背景介绍
最近用python写了段代码,串行的形式如下,可是实在太慢。
def test_run(): global files_dir for f1 in os.listdir(files_dir): for f2 os.listdir(files_dir): os.system("run program x on f1 and f2")
想提高速度,调用多核执行这个系统命令,相当于每个核都提交os.system(‘……’)这样的任务。使用的模块是multiprocessing,之前知道里面的Pool非常好用,能自动跑满核,但尝试了一会发现使用map并不能实现并行,也不晓得原因是什么。使用某歌搜过发现已有大神在Stack Overflow进行了解读,链接在此。这里摘抄翻译记录一下,欢迎讨论。不多说直接上代码。代码实现方式至少有两种,请看:
代码实现方式1:
import os NUM_CPUS = None # defaults to all available def worker(f1, f2): os.system("run program x on f1 and f2") # 希望并行的一个函数,调用system命令 def test_run(pool): filelist = os.listdir(files_dir) for f1 in filelist: # 遍历f1变量 for f2 in filelist: # 遍历f2变量 pool.apply_async(worker, args=(f1, f2)) # 提交给pool任务并传递参数。 if __name__ == "__main__": import multiprocessing as mp pool = mp.Pool(NUM_CPUS) # 创建pool test_run(pool) # 调用函数,并传递pool对象作为参数 pool.close() # 结束pool,不再添加任务进队列 pool.join() # 等待未完成的任务,直至结束
代码写的很简洁。
补充
最新版的python3已经支持Pool的上下文管理器
if __name__ == "__main__": import multiprocessing as mp with mp.Pool(NUM_CPUS) as pool: test_run(pool)
代码实现方式2:
使用concurrent.futures,可以使代码更简洁。
def worker(f1, f2): os.system("run program x on f1 and f2") def test_run(): import concurrent.futures as cf # 使用concurrent.futures filelist = os.listdir(files_dir) with cf.ProcessPoolExecutor(NUM_CPUS) as pp: # 上下文管理器 for f1 in filelist: for f2 in filelist: pp.submit(worker, f1, f2) if __name__ == "__main__": test_run()
并行时,异常的抛出
并行是,worker里如果出现了异常,通常会被隐藏掉,因为该异常不会被主进程察觉,这样是不好的,相当于除了问题而自己不知道。一种实现能够捕捉异常的方法是,直接检查每个函数执行的结果,代码如下。
def test_run(): import concurrent.futures as cf filelist = os.listdir(files_dir) futures = [] with cf.ProcessPoolExecutor(NUM_CPUS) as pp: for f1 in filelist: for f2 in filelist: futures.append(pp.submit(worker, f1, f2)) for future in cf.as_completed(futures): future.result()
在主进程future.result()会检查每个结果,并抛出异常