python 多进程
上一小节中分享了 python 并发的特性,如果想真正利用现在多核的 cpu 进行并行计算就需要利用多进程。
就如前面所说的,进程是拥有独立数据和内存空间的,进程之间彼此独立;所以如果说进程之间如果要进行数据交互或者返回数据等交互操作,就需要利用中间的服务机制来协调;
python 的多进程方式, 有:
有官方提供的concurrent
的ProcessPoolExecutor
multiprocessing
第三方的任务处理队列库 celery(常见的中间服务机制是 redis 和 RabbitMQ)
分布式计算框架 Ray(以 redis 为任务管理)
ProcessPoolExecutor
from concurrent import futures
from time import sleep, strftime
import time
def cpu_task(file):
st = time.time()
t = 0
for i in range(1000000):
t += i**2
print('one task time is ',time.time()-st)
return file
def run(files):
workers = min(5, len(files))
result_list = []
ret = []
with futures.ProcessPoolExecutor(workers) as executor:
for file in sorted(files):
future_result = executor.submit(cpu_task, file)
result_list.append(future_result)
for future in result_list:
res = future.result()
ret.append(res)
return ret
if __name__ == "__main__":
ts = time.time()
files = ['{}.png'.format(i) for i in range(5)]
res = run(files)
print(res)
print('all cost is ', time.time()-ts)
# one task time is 0.503483772277832
# one task time is 0.503483772277832
# one task time is 0.5034773349761963
# one task time is 0.5095388889312744
# one task time is 0.5119566917419434
# ['0.png', '1.png', '2.png', '3.png', '4.png']
# all cost is 0.6866297721862793
复制代码
从上面的执行结果可知,每个 cpu_task 在 0.5s,5 个任务执行完只要 0.68s,真正的并行计算。
从并发编程步骤来看:
任务分解:cpu_task 每次处理一个 file
通过 submit 并行提交任务
每个 submit 会返回一个 future 对象
通过 future 对象 result()来获取进行的返回值,需要注意的是 result()方法在没有获得返回值时阻塞进程
multiprocessing
multiprocessing 库也提供了关于 python 多线程开发的基本功能和进阶功能(如资源共享和同步锁)。
我们看上面ProcessPoolExecutor
同样的例子multiprocessing
是怎么实现的。
from multiprocessing import Pool
import time
def cpu_task(file):
st = time.time()
t = 0
for i in range(1000000):
t += i**2
print('one task time is ',time.time()-st)
return file
def end_call(arg):
print("end_call",arg)
def run(files):
workers = min(5, len(files))
p = Pool(workers)
result_list = []
for file in sorted(files):
res = p.apply_async(func=cpu_task, args=(file,), callback=end_call)
result_list.append(res)
p.close()
p.join()
for ret in result_list:
print(ret.get())
return ""
if __name__ == "__main__":
ts = time.time()
files = ['{}.png'.format(i) for i in range(5)]
res = run(files)
print('all cost is ', time.time()-ts)
# one task time is 0.44005799293518066
# end_call 1.png
# one task time is 0.45325803756713867
# end_call 3.png
# one task time is 0.497267484664917
# end_call 2.png
# one task time is 0.4813237190246582
# end_call 4.png
# one task time is 0.5435652732849121
# end_call 0.png
# 0.png
# 1.png
# 2.png
# 3.png
# 4.png
# all cost is 0.7456285953521729
复制代码
multiprocessing
是通过 Pool 来构建进程池,并通过apply_async
异步提交任务,apply_async
的返回值类型是 ApplyResult, 该对象的 get 方法可以获取任务的返回值;close
表示线程池在接受这些任务之后不再接收其他任务,而join
是表示等待线程池所有任务执行完。
多进程之间的进程共享数据,multiprocessing
通过 multiprocessing.Manager 来进行进程间通信。
一个 multiprocessing.Manager 对象会控制一个服务器进程,其他进程可以通过代理的方式来访问这个服务器进程。从而达到多进程间数据通信且安全。
Manager() 返回的管理器支持类型: list 、 dict 、 Namespace 、 Lock 、 RLock 、 Semaphore 、 BoundedSemaphore 、 Condition 、 Event 、 Barrier 、 Queue 、 Value 和 Array。
from multiprocessing import Pool, Manager
import time
def cpu_task(file, l):
st = time.time()
t = 0
for i in range(1000000):
t += i**2
l.append(file)
print('one task time is ',time.time()-st)
return file
def end_call(arg):
print("end_call",arg)
def run(files):
workers = min(5, len(files))
p = Pool(workers)
with Manager() as manager:
l = manager.list()
for file in sorted(files):
res = p.apply_async(func=cpu_task, args=(file,l, ), callback=end_call)
p.close()
p.join()
print(list(l))
return ""
if __name__ == "__main__":
ts = time.time()
files = ['{}.png'.format(i) for i in range(5)]
res = run(files)
print('all cost is ', time.time()-ts)
# one task time is 0.5007572174072266
# end_call 2.png
# one task time is 0.5093979835510254
# end_call 3.png
# one task time is 0.5186748504638672
# end_call 4.png
# one task time is 0.518937349319458
# end_call 0.png
# one task time is 0.5269668102264404
# end_call 1.png
# ['2.png', '3.png', '4.png', '0.png', '1.png']
# all cost is 0.7396669387817383
复制代码
评论