python 多进程
上一小节中分享了 python 并发的特性,如果想真正利用现在多核的 cpu 进行并行计算就需要利用多进程。
就如前面所说的,进程是拥有独立数据和内存空间的,进程之间彼此独立;所以如果说进程之间如果要进行数据交互或者返回数据等交互操作,就需要利用中间的服务机制来协调;
python 的多进程方式, 有:
有官方提供的concurrent的ProcessPoolExecutor
multiprocessing
第三方的任务处理队列库 celery(常见的中间服务机制是 redis 和 RabbitMQ)
分布式计算框架 Ray(以 redis 为任务管理)
ProcessPoolExecutor
from concurrent import futuresfrom time import sleep, strftimeimport time
def cpu_task(file): st = time.time() t = 0 for i in range(1000000): t += i**2 print('one task time is ',time.time()-st) return file def run(files): workers = min(5, len(files)) result_list = [] ret = [] with futures.ProcessPoolExecutor(workers) as executor: for file in sorted(files): future_result = executor.submit(cpu_task, file) result_list.append(future_result) for future in result_list: res = future.result() ret.append(res) return ret if __name__ == "__main__": ts = time.time() files = ['{}.png'.format(i) for i in range(5)] res = run(files) print(res)
print('all cost is ', time.time()-ts)
# one task time is 0.503483772277832# one task time is 0.503483772277832# one task time is 0.5034773349761963# one task time is 0.5095388889312744# one task time is 0.5119566917419434# ['0.png', '1.png', '2.png', '3.png', '4.png']# all cost is 0.6866297721862793
复制代码
从上面的执行结果可知,每个 cpu_task 在 0.5s,5 个任务执行完只要 0.68s,真正的并行计算。
从并发编程步骤来看:
任务分解:cpu_task 每次处理一个 file
通过 submit 并行提交任务
每个 submit 会返回一个 future 对象
通过 future 对象 result()来获取进行的返回值,需要注意的是 result()方法在没有获得返回值时阻塞进程
multiprocessing
multiprocessing 库也提供了关于 python 多线程开发的基本功能和进阶功能(如资源共享和同步锁)。
我们看上面ProcessPoolExecutor同样的例子multiprocessing是怎么实现的。
from multiprocessing import Poolimport time
def cpu_task(file): st = time.time() t = 0 for i in range(1000000): t += i**2 print('one task time is ',time.time()-st) return file
def end_call(arg): print("end_call",arg)
def run(files): workers = min(5, len(files)) p = Pool(workers) result_list = []
for file in sorted(files): res = p.apply_async(func=cpu_task, args=(file,), callback=end_call) result_list.append(res) p.close() p.join()
for ret in result_list: print(ret.get()) return "" if __name__ == "__main__": ts = time.time() files = ['{}.png'.format(i) for i in range(5)] res = run(files)
print('all cost is ', time.time()-ts) # one task time is 0.44005799293518066# end_call 1.png# one task time is 0.45325803756713867# end_call 3.png# one task time is 0.497267484664917# end_call 2.png# one task time is 0.4813237190246582# end_call 4.png# one task time is 0.5435652732849121# end_call 0.png# 0.png# 1.png# 2.png# 3.png# 4.png# all cost is 0.7456285953521729
复制代码
multiprocessing是通过 Pool 来构建进程池,并通过apply_async异步提交任务,apply_async的返回值类型是 ApplyResult, 该对象的 get 方法可以获取任务的返回值;close表示线程池在接受这些任务之后不再接收其他任务,而join是表示等待线程池所有任务执行完。
多进程之间的进程共享数据,multiprocessing通过 multiprocessing.Manager 来进行进程间通信。
一个 multiprocessing.Manager 对象会控制一个服务器进程,其他进程可以通过代理的方式来访问这个服务器进程。从而达到多进程间数据通信且安全。
Manager() 返回的管理器支持类型: list 、 dict 、 Namespace 、 Lock 、 RLock 、 Semaphore 、 BoundedSemaphore 、 Condition 、 Event 、 Barrier 、 Queue 、 Value 和 Array。
from multiprocessing import Pool, Managerimport time
def cpu_task(file, l): st = time.time() t = 0 for i in range(1000000): t += i**2 l.append(file) print('one task time is ',time.time()-st) return file
def end_call(arg): print("end_call",arg)
def run(files): workers = min(5, len(files)) p = Pool(workers) with Manager() as manager: l = manager.list() for file in sorted(files): res = p.apply_async(func=cpu_task, args=(file,l, ), callback=end_call) p.close() p.join()
print(list(l)) return "" if __name__ == "__main__": ts = time.time() files = ['{}.png'.format(i) for i in range(5)] res = run(files)
print('all cost is ', time.time()-ts)
# one task time is 0.5007572174072266# end_call 2.png# one task time is 0.5093979835510254# end_call 3.png# one task time is 0.5186748504638672# end_call 4.png# one task time is 0.518937349319458# end_call 0.png# one task time is 0.5269668102264404# end_call 1.png# ['2.png', '3.png', '4.png', '0.png', '1.png']# all cost is 0.7396669387817383
复制代码
评论