写点什么

python 小知识 - 并发编程(3)

作者:AIWeker
  • 2022-11-08
    福建
  • 本文字数:2466 字

    阅读完需:约 8 分钟

python小知识-并发编程(3)

python 多进程

上一小节中分享了 python 并发的特性,如果想真正利用现在多核的 cpu 进行并行计算就需要利用多进程。


就如前面所说的,进程是拥有独立数据和内存空间的,进程之间彼此独立;所以如果说进程之间如果要进行数据交互或者返回数据等交互操作,就需要利用中间的服务机制来协调;


python 的多进程方式, 有:


  • 有官方提供的concurrentProcessPoolExecutor

  • multiprocessing

  • 第三方的任务处理队列库 celery(常见的中间服务机制是 redis 和 RabbitMQ)

  • 分布式计算框架 Ray(以 redis 为任务管理)

ProcessPoolExecutor

from concurrent import futuresfrom time import sleep, strftimeimport time
def cpu_task(file): st = time.time() t = 0 for i in range(1000000): t += i**2 print('one task time is ',time.time()-st) return file def run(files): workers = min(5, len(files)) result_list = [] ret = [] with futures.ProcessPoolExecutor(workers) as executor: for file in sorted(files): future_result = executor.submit(cpu_task, file) result_list.append(future_result) for future in result_list: res = future.result() ret.append(res) return ret if __name__ == "__main__": ts = time.time() files = ['{}.png'.format(i) for i in range(5)] res = run(files) print(res)
print('all cost is ', time.time()-ts)
# one task time is 0.503483772277832# one task time is 0.503483772277832# one task time is 0.5034773349761963# one task time is 0.5095388889312744# one task time is 0.5119566917419434# ['0.png', '1.png', '2.png', '3.png', '4.png']# all cost is 0.6866297721862793
复制代码


从上面的执行结果可知,每个 cpu_task 在 0.5s,5 个任务执行完只要 0.68s,真正的并行计算。


从并发编程步骤来看:


  • 任务分解:cpu_task 每次处理一个 file

  • 通过 submit 并行提交任务

  • 每个 submit 会返回一个 future 对象

  • 通过 future 对象 result()来获取进行的返回值,需要注意的是 result()方法在没有获得返回值时阻塞进程

multiprocessing

multiprocessing 库也提供了关于 python 多线程开发的基本功能和进阶功能(如资源共享和同步锁)。


我们看上面ProcessPoolExecutor同样的例子multiprocessing是怎么实现的。


from multiprocessing import Poolimport time
def cpu_task(file): st = time.time() t = 0 for i in range(1000000): t += i**2 print('one task time is ',time.time()-st) return file
def end_call(arg): print("end_call",arg)

def run(files): workers = min(5, len(files)) p = Pool(workers) result_list = []
for file in sorted(files): res = p.apply_async(func=cpu_task, args=(file,), callback=end_call) result_list.append(res) p.close() p.join()
for ret in result_list: print(ret.get()) return "" if __name__ == "__main__": ts = time.time() files = ['{}.png'.format(i) for i in range(5)] res = run(files)
print('all cost is ', time.time()-ts) # one task time is 0.44005799293518066# end_call 1.png# one task time is 0.45325803756713867# end_call 3.png# one task time is 0.497267484664917# end_call 2.png# one task time is 0.4813237190246582# end_call 4.png# one task time is 0.5435652732849121# end_call 0.png# 0.png# 1.png# 2.png# 3.png# 4.png# all cost is 0.7456285953521729
复制代码


multiprocessing是通过 Pool 来构建进程池,并通过apply_async异步提交任务,apply_async的返回值类型是 ApplyResult, 该对象的 get 方法可以获取任务的返回值;close表示线程池在接受这些任务之后不再接收其他任务,而join是表示等待线程池所有任务执行完。


多进程之间的进程共享数据,multiprocessing通过 multiprocessing.Manager 来进行进程间通信。


一个 multiprocessing.Manager 对象会控制一个服务器进程,其他进程可以通过代理的方式来访问这个服务器进程。从而达到多进程间数据通信且安全。


Manager() 返回的管理器支持类型: list 、 dict 、 Namespace 、 Lock 、 RLock 、 Semaphore 、 BoundedSemaphore 、 Condition 、 Event 、 Barrier 、 Queue 、 Value 和 Array。


from multiprocessing import Pool, Managerimport time
def cpu_task(file, l): st = time.time() t = 0 for i in range(1000000): t += i**2 l.append(file) print('one task time is ',time.time()-st) return file
def end_call(arg): print("end_call",arg)

def run(files): workers = min(5, len(files)) p = Pool(workers) with Manager() as manager: l = manager.list() for file in sorted(files): res = p.apply_async(func=cpu_task, args=(file,l, ), callback=end_call) p.close() p.join()
print(list(l)) return "" if __name__ == "__main__": ts = time.time() files = ['{}.png'.format(i) for i in range(5)] res = run(files)
print('all cost is ', time.time()-ts)
# one task time is 0.5007572174072266# end_call 2.png# one task time is 0.5093979835510254# end_call 3.png# one task time is 0.5186748504638672# end_call 4.png# one task time is 0.518937349319458# end_call 0.png# one task time is 0.5269668102264404# end_call 1.png# ['2.png', '3.png', '4.png', '0.png', '1.png']# all cost is 0.7396669387817383
复制代码


发布于: 刚刚阅读数: 3
用户头像

AIWeker

关注

InfoQ签约作者 / 公众号:人工智能微客 2019-11-21 加入

人工智能微客(aiweker)长期跟踪和分享人工智能前沿技术、应用、领域知识,不定期的发布相关产品和应用,欢迎关注和转发

评论

发布
暂无评论
python小知识-并发编程(3)_Python_AIWeker_InfoQ写作社区