Python多核调用系统命令的方法及相应异常处理

背景介绍

最近用python写了段代码,串行的形式如下,可是实在太慢。

def test_run():
     global files_dir
     for f1 in os.listdir(files_dir):
          for f2 os.listdir(files_dir):
               os.system("run program x on f1 and f2")

想提高速度,调用多核执行这个系统命令,相当于每个核都提交os.system(‘……’)这样的任务。使用的模块是multiprocessing,之前知道里面的Pool非常好用,能自动跑满核,但尝试了一会发现使用map并不能实现并行,也不晓得原因是什么。使用某歌搜过发现已有大神在Stack Overflow进行了解读,链接在此。这里摘抄翻译记录一下,欢迎讨论。不多说直接上代码。代码实现方式至少有两种,请看:

代码实现方式1:

import os
NUM_CPUS = None  # defaults to all available

def worker(f1, f2):
    os.system("run program x on f1 and f2") # 希望并行的一个函数,调用system命令

def test_run(pool):
     filelist = os.listdir(files_dir)
     for f1 in filelist: # 遍历f1变量
          for f2 in filelist: # 遍历f2变量
               pool.apply_async(worker, args=(f1, f2)) # 提交给pool任务并传递参数。

if __name__ == "__main__":
     import multiprocessing as mp
     pool = mp.Pool(NUM_CPUS) # 创建pool
     test_run(pool) # 调用函数,并传递pool对象作为参数
     pool.close() # 结束pool,不再添加任务进队列
     pool.join() # 等待未完成的任务,直至结束

代码写的很简洁。

补充

最新版的python3已经支持Pool的上下文管理器

if __name__ == "__main__":
     import multiprocessing as mp
     with mp.Pool(NUM_CPUS) as pool:
         test_run(pool)

代码实现方式2:

使用concurrent.futures,可以使代码更简洁。

def worker(f1, f2):
    os.system("run program x on f1 and f2")

def test_run():
     import concurrent.futures as cf  # 使用concurrent.futures
     filelist = os.listdir(files_dir)
     with cf.ProcessPoolExecutor(NUM_CPUS) as pp: # 上下文管理器
         for f1 in filelist:
             for f2 in filelist:
                 pp.submit(worker, f1, f2)

if __name__ == "__main__":
     test_run()

并行时,异常的抛出

并行是,worker里如果出现了异常,通常会被隐藏掉,因为该异常不会被主进程察觉,这样是不好的,相当于除了问题而自己不知道。一种实现能够捕捉异常的方法是,直接检查每个函数执行的结果,代码如下。

def test_run():
     import concurrent.futures as cf
     filelist = os.listdir(files_dir)
     futures = []
     with cf.ProcessPoolExecutor(NUM_CPUS) as pp:
         for f1 in filelist:
             for f2 in filelist:
                 futures.append(pp.submit(worker, f1, f2))
     for future in cf.as_completed(futures):
         future.result()

在主进程future.result()会检查每个结果,并抛出异常

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注

此站点使用Akismet来减少垃圾评论。了解我们如何处理您的评论数据