京东商智_Python并发处理模块multiprocessing

京东商智_Python并发处理模块multiprocessing

1.背景

在python脚本中经常有一些逻辑需要使用多线程并发处理,以此来加快处理速度,缩短处理时间。和java中一样,可以通过new Thread新建线程等方式来实现多线程处理,但是往往还需要处理线程通信、数据共享、加锁等问题,所以java基础库中封装了线程池来帮助用户快捷使用多线程。同样的,python基础库也封装了类似的线程池工具模块multiprocessing,方便用户快捷使用多线程。

2.Pool模块的常见方法

Pool(threadNum)

创建线程池对象,入参threadNum为int类型,表示线程池中最大线程数。

apply(func, args=(), kwds={})

同步执行func函数,该方法会阻塞主线程和加入线程池中的其他task,直到当前正在执行task结果返回。args是函数 func 的参数元组。

apply_async(func, args=(), kwds={}, callback=None, error_callback=None)

异步版本的 apply方法。它立即返回一个 AsyncResult对象,可以使用此对象查询或获取函数func的结果。callback是一个可选的可调用对象,当结果准备好时,会传递给callback,error_callback是在任务执行出错时调用。

*close() *

阻止有新的任务提交到池中。在所有任务执行完毕后,工作进程会退出。

terminate()

立刻停止工作进程,不再处理未完成的任务。

join()

等待所有工作进程退出,必须在调用close()或terminate()方法之后才能调用此方法。

3.ClickHouse推数组件使用实例

使用case如下代码,业务逻辑为将多天数据按天拆分,并行推数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from multiprocessing.dummy import Pool

def run(self):
pool = Pool(self.poolThreadNum)
index_dt = datetime.datetime.strptime(tar_start_dt, '%Y-%m-%d')
end_dt = datetime.datetime.strptime(tar_end_dt, '%Y-%m-%d')
results = []
while index_dt <= end_dt:
log_info("index_dt------>{}".format(str(index_dt)))
index_dt_str = index_dt.strftime('%Y-%m-%d')
hive_sql = self.get_source_sql(index_dt_str, index_dt_str)

results.append(pool.apply_async(self.run_unit_flow, (hive_sql, index_dt_str, index_dt_str)))

index_dt = index_dt + datetime.timedelta(1)
log_info('Waiting for all subprocesses done...')
pool.close()
pool.join()
log_info('All subprocesses done.')
for result in results:
log_info('result.get()-->{}'.format(result.get()))
if not result.get():
sys.exit(1)

注意multiprocessing和multiprocessing.dummy两个不同模块都有Pool类,但是它们是不同的类且并发实现机制并不相同,from multiprocessing import Pool实现的是进程并发,要求CPU多核,且是在多核之间实现并发,from multiprocessing.dummy import Pool实现的是线程并发。

apply()和apply_async()方法返回的数据类型是AsyncResult,可以通过AsyncResult.get()方法获取到通过apply()和apply_async()方法传递给线程池的目标函数的执行结果。在对应task执行完毕之后,就可以使用get()方法获取目标函数的返回值,如果task未执行完,get()方法也会阻塞住知道task执行完毕返回结果。

那么如上case中的获取结果逻辑就是先将任务都提交到线程池,然后使用一个resultList存放所有AsyncResult对象,等待线程池执行完毕之后,再依次获取AsyncResult对象执行get()方法获取目标函数执行结果。