京东商智_Python并发处理模块multiprocessing
1.背景
在python脚本中经常有一些逻辑需要使用多线程并发处理,以此来加快处理速度,缩短处理时间。和java中一样,可以通过new Thread新建线程等方式来实现多线程处理,但是往往还需要处理线程通信、数据共享、加锁等问题,所以java基础库中封装了线程池来帮助用户快捷使用多线程。同样的,python基础库也封装了类似的线程池工具模块multiprocessing,方便用户快捷使用多线程。
2.Pool模块的常见方法
Pool(threadNum)
创建线程池对象,入参threadNum为int类型,表示线程池中最大线程数。
apply(func, args=(), kwds={})
同步执行func函数,该方法会阻塞主线程和加入线程池中的其他task,直到当前正在执行task结果返回。args是函数 func 的参数元组。
apply_async(func, args=(), kwds={}, callback=None, error_callback=None)
异步版本的 apply方法。它立即返回一个 AsyncResult对象,可以使用此对象查询或获取函数func的结果。callback是一个可选的可调用对象,当结果准备好时,会传递给callback,error_callback是在任务执行出错时调用。
*close() *
阻止有新的任务提交到池中。在所有任务执行完毕后,工作进程会退出。
terminate()
立刻停止工作进程,不再处理未完成的任务。
join()
等待所有工作进程退出,必须在调用close()或terminate()方法之后才能调用此方法。
3.ClickHouse推数组件使用实例
使用case如下代码,业务逻辑为将多天数据按天拆分,并行推数:
1 | from multiprocessing.dummy import Pool |
注意multiprocessing和multiprocessing.dummy两个不同模块都有Pool类,但是它们是不同的类且并发实现机制并不相同,from multiprocessing import Pool实现的是进程并发,要求CPU多核,且是在多核之间实现并发,from multiprocessing.dummy import Pool实现的是线程并发。
apply()和apply_async()方法返回的数据类型是AsyncResult,可以通过AsyncResult.get()方法获取到通过apply()和apply_async()方法传递给线程池的目标函数的执行结果。在对应task执行完毕之后,就可以使用get()方法获取目标函数的返回值,如果task未执行完,get()方法也会阻塞住知道task执行完毕返回结果。
那么如上case中的获取结果逻辑就是先将任务都提交到线程池,然后使用一个resultList存放所有AsyncResult对象,等待线程池执行完毕之后,再依次获取AsyncResult对象执行get()方法获取目标函数执行结果。