写点什么

Python 笔记六之多进程

作者:Hunter熊
  • 2024-03-12
    北京
  • 本文字数:5657 字

    阅读完需:约 19 分钟

Python笔记六之多进程

本文首发于公众号:Hunter 后端

原文链接:Python笔记六之多进程


在 Python 里,我们使用 multiprocessing 这个模块来进行多进程的操作。


multiprocessing 模块通过创建子进程的方式来运行多进程,因此绕过了 Python 里 GIL 的限制,可以充分利用机器上的多个处理器。

1、多进程使用示例

多进程的使用方式和多线程的方式类似,这里使用到的是 multiprocessing.Process 类,下面是一个简单的示例:


from multiprocessing import Processimport time
def f(x): if x % 2 == 1: time.sleep(x+1) print(x * x) return x * x

def test_multi_process(): processes = []
for i in range(5): processes.append(Process(target=f, args=(i,)))
for p in processes: p.start()
for p in processes: p.join(0.5)
for p in processes: print(p, p.is_alive(), p.pid, p._parent_pid)

if __name__ == "__main__": test_multi_process()
复制代码


在上面的示例中,test_multi_process() 函数里使用多进程的方式调用 f 函数,和多线程的调用方式一致,通过 start() 方法启动进程活动,使用 join() 方法阻塞调用其的进程。


接下来介绍一下 multiprocessing.Process 的一些方法和属性。

1. run()

表示进程活动的方法,可以在子类中重载此方法,比如多线程笔记的操作里重写 run() 对函数执行报错进行了处理,并返回了执行结果

2. start()

启动进程活动,将对象的 run() 方法在一个单独的进程中调用

3. join()

阻塞调用 join() 方法的进程,在上面的示例中也就是父进程,默认值为 None,也就表示阻塞操作。


如果设置为其他正数值,那么则最多会阻塞多少秒,比如上面的示例为 0.5 秒,如果超时,那么父进程则会继续往后执行。


比如上面的示例输出结果如下:


0416<Process name='Process-1' pid=6600 parent=24248 stopped exitcode=0> False 6600 24248<Process name='Process-2' pid=4368 parent=24248 started> True 4368 24248<Process name='Process-3' pid=13024 parent=24248 stopped exitcode=0> False 13024 24248<Process name='Process-4' pid=3288 parent=24248 started> True 3288 24248<Process name='Process-5' pid=16880 parent=24248 stopped exitcode=0> False 16880 2424819
复制代码


在打印每个进程的信息时,f() 函数内部进行 sleep 的进程还没有执行结束,但是进程已经超时了,所以不再阻塞父进程向下执行。

4. is_alive()

上面有打印出信息,返回布尔值,表示该进程是否还活着。

5. pid 和 parent_pid

上面使用 .pid 和 ._parent_pid 属性打印出了每个进程的 id 和其父进程的 id。

2、进程池

进程使用的对象是 multiprocessing.pool.Pool()。


接受 processes 参数为进程数,表示要使用的工作进程数目,如果不传入,则默认使用 cpu 的核数,根据 os.cpu_count() 获取。


接下来分别使用示例介绍 multiprocessing.pool 下的几个调用方法,进程池的使用可以使用 map() 和 starmap() 两个函数。

1. map()

map() 接受两个参数,func 表示多进程要执行的函数,iterable 表示要执行的 func 函数输入的参数的迭代对象。


这里需要注意一下,map() 函数使用的 func 函数只能接受一个参数,比如我们前面定义的 f 函数,下面是其使用示例:


def f(x):    return x * x

def test_pool_map(): with Pool(processes=4) as pool: results = pool.map(func=f, iterable=range(10))
print(results)
复制代码

2. starmap()

starmap() 函数与 map() 使用方法类似,但是 iterable 迭代参数的元素是 func 函数的多个参数,比如我们想要对下面的 add() 函数使用多进程:


def f_add(x, y):    return x + y
复制代码


它的调用方式如下:


def test_pool_starmap():    with Pool(processes=4) as pool:        results = pool.starmap(func=f_add, iterable=zip(range(6), range(6, 12)))        print(results)
复制代码


这里返回的 results 是一个列表,元素是每个进程执行的函数的返回结果。

3、进程间交换对象

前面介绍了,多进程的运行方式是通过建立子进程的形式来操作,而不同进程间数据是不共享的,这一点不同于多线程。


因为多线程的操作是在同一个进程内实现的,所以线程间数据是共享数据资源的。


接下来介绍一下如何在进程间进行对象的交换,其实进程间进行对象的交换是一个子命题,更高层级的概括是在进程间进行通信,在官方的文档中对其进行了细分,所以这里也对其进行分类别的介绍。


在进程间进行对象交换的方式有两种,一种是队列,一种是管道。

1. 队列

1) 队列的代码示例

这里的模块的引入是 multiprocess.Queue,这个类近似于是 queue.Queue 的克隆,以下是官方文档的一个示例,内容是在父进程中创建一个队列,然后在子进程中写入数据,然后再在父进程中读取:



from multiprocessing import Process, Queue
def f(q): q.put([42, None, 'hello'])
if __name__ == '__main__': q = Queue() p = Process(target=f, args=(q,)) p.start() print(q.get()) # prints "[42, None, 'hello']" p.join()
复制代码


队列的写入使用 put(),读取使用 get()。


get() 还可以加上两个参数,block 和 timeout,block 表示是否阻塞,timeout 表示获取的超时时间。


接下来我们再实现一个功能,两个子进程写入数据,一个子进程读取数据,代码示例如下:


from multiprocessing import Queue, Process

def f_write(q, n, name): for i in range(n): q.put(f"{name}_{i}") time.sleep(0.1)

def f_read(q): while q.qsize() > 0: print(q.get(block=False, timeout=1)) time.sleep(0.5)

def test_queue(): # 三个进程,一个写进程,两个读进程 q = Queue() q.put("origin_value") q.put("b")
# p1 = Process(target=f_queue, args=(q, "c")) # p2 = Process(target=f_queue, args=(q, )) p1 = Process(target=f_write, args=(q, 5, "a")) p2 = Process(target=f_write, args=(q, 8, "b")) p3 = Process(target=f_read, args=(q,))
p1.start() p2.start() p3.start()
p1.join() p2.join() p3.join()
print("total: ", q.qsize())
if __name__ == "__main__": test_queue()
复制代码

2) 队列的相关方法

关于队列的相关函数,除了前面介绍的几种,还有比如判断队列的长度,是否为空等。

a) Queue()

在定义一个队列的时候,我们前面是直接定义 q=Queue(),不为其设置元素长度,而如果我们想要为其设置一个最大的长度,可以加上 maxsize 参数:


q = Queue(maxsize=3)
复制代码


那么队列里最多只能有三个元素,而如果队列满了还往其中 put() 加入操作,则会阻塞,直到其他进程对其读取其中的数据。

b) put()

put() 函数表示的是往队列里添加元素,元素的类型不限,添加数字,字符串,字典,列表都可以:


q = Queue()q.put(1)q.put({"a": 4})q.put([1,3,4])
复制代码


前面介绍了,如果队列满了,还往队列里进行 put() 操作,则会进入阻塞操作,可以通过添加 block 或者 timeout 来进行避免。


block 表示是否阻塞,为 True 的话则会进入阻塞等待状态。False 的话则会引发异常。


timeout 表示超时,尝试往队列里添加数据,超出等待时间同样已发队列已满的异常。

c) get()

get() 函数表示从队列中读取元素,队列的写入和读取的原则是先入先出,最先进去的最先出来。


而为了避免队列为空的情况下进行 get() 进入阻塞状态,get() 可以使用两个参数,一个是 block,表示是否阻塞,一个是 timeout,表示超时时间。


如果队列为空还进行 get() 操作,使用上面这两个操作则会 raise 一个 Empty 的 error。

d) qsize()

返回队列的长度,但由于多进程或多线程的上下文,这个数字是不可靠的。

e) empty()

如果队列是空的,则返回 True,否则返回 False,由于多进程或多线程的环境,该状态是不可靠的。

f) full()

如果队列设置了 maxsize 参数,那么如果队列满了,则返回 True,否则返回 False,由于多进程或多线程的环境,该状态是不可靠的。

g) close()

关闭队列,如果执行了 q.close(),再往里面添加元素执行 q.put() 操作,则会引发报错。

2. 管道

1) 管道的相关函数

管道的引入方式如下:


from multiprocessing import Pipe
复制代码


管道的定义可以直接实例化 Pipe,返回管道的两端:


conn1, conn2 = Pipe()
复制代码


默认情况下,Pipe() 的参数 duplex 值为 True,表示管道是双工的,也就是可以双向通信的,比如 conn1 可以写入,也可以读出,conn2 可以写入也可以读出数据。


而如果手动设置 duplex 为 False,那么管道则是单向的,conn1 只能用于接收消息,conn2 只能发送消息。


管道用于发送和接收的函数分别如下:


发送信息


conn.send(obj)
复制代码


发送的对象可以是字符串,也可以是其他对象,比如列表,字典等。


接收信息


conn.recv()
复制代码


关闭连接对象


我们可以使用 close() 来关闭连接对象,当连接对象被垃圾回收时会自动调用:


conn.close()
复制代码


判断连接对象中是否有可以读取的数据


如果我们直接使用 conn.recv() 的时候,如果管道内没有可接收的对象,会进入阻塞状态,直到管道内传入数据。


我们可以使用 poll() 函数判断管道内是否有可以读取的数据,返回的是一个布尔型数据,表示是否有数据:


has_data = conn.poll()
复制代码


但是如果不设置超时时间,同样会进入等待状态,所以可以设置一个最大阻塞秒数:


has_data = conn.pool(timeout=3)  # 等待 3 秒
复制代码

2) 管道的代码示例

接下来我们用下面的代码来进行管道的双工测试,即从管道的两端分别写入和读取数据。


from multiprocessing import Process, Pipe

def send_info(conn, info): conn.send(info) conn.close()

def read_info(conn): while conn.poll(timeout=2): info = conn.recv() print(info)

def test_pipe(): # 两个 conn 分别都往里面读和写 parent_conn, child_conn = Pipe()
# p1 向 child 管道写入 print("id out of func: ", id(child_conn)) p1 = Process(target=send_info, args=(child_conn, "send_info_from_child")) p1.start() p1.join()
# p2 从 parent 管道读取 p2 = Process(target=read_info, args=(parent_conn,)) p2.start() p2.join()
# p3 向 parent 管道写入 p3 = Process(target=send_info, args=(parent_conn, "send_info_from_parent")) p3.start() p3.join()
# p4 从 child 管道读取 p4 = Process(target=read_info, args=(child_conn,)) p4.start() p4.join()

if __name__ == "__main__": test_pipe()
复制代码


注意 :如果两个进程(或线程)同时尝试读取或写入管道的 同一 端,则管道中的数据可能会损坏。当然,在不同进程中同时使用管道的不同端的情况下不存在损坏的风险。

4、进程间同步

与多线程一样,多进程也可以使用锁来确保一次只有一个进程来执行一个操作,比如有一个打印到标准输出的操作,我们需要确保其打印的日志不紊乱,就可以使用下面的操作:


from multiprocessing import Process, Lock
def f(l, i): l.acquire() try: print("hello ", i) finally: l.release()
if __name__ == "__main__": lock = Lock() for num in range(10): Process(target=f, args=(lock, num)).start()
复制代码


而如果不使用锁,我们重写 f 函数如下:


def f(l, i):    print("hello ", i)
复制代码


多执行几次,我们可以看到控制台的输出会出现错乱的情况,这样就可能对输出信息不能直观查看,比如:


hello  2hello  0hello  4hello hello  3 1hello  5hello  6hello  8hello  9hello  7
复制代码

5、进程间共享状态

在并发编程的时候,应当尽量避免使用共享状态,尤其是多进程操作时,但如果真的有这个需求,需要共享一些数据,multiprocessing 提供了两种方法,一种是共享内存,一种是服务进程。

1. 共享内存

我们可以使用 Value 或者 Array 将数据存储在共享内存映射中。


Value 是存储的单个变量,Array 存储的是数组,注意下,这里的 Value 和 Array 在定义的时候都需要指定元素类型。


其引入及代码示例如下:


from multiprocessing import Process, Value, Array

def f(n, a): n.value = 5 a[0] = 100

if __name__ == "__main__": num = Value('d', 1) arr = Array('i', range(5)) print(num.value) print(arr[:])
p = Process(target=f, args=(num, arr)) p.start() p.join()
print(num.value) print(arr[:])
复制代码


其中,引入的方式可以直接从 multiprocessing 中引入,在定义 Value 和 Array 的时候,第一个参数是 'd' 和 'i',分别表示类型是双精度浮点数和有符号整数。


这些共享对象将是进程和线程安全的。


更多的关于共享内存的信息,可以使用 multiprocessing.sharedctypes 模块。

2. 服务进程

我们可以使用 Manager() 返回的管理对象控制一个服务进程,这个进程还可以保存 Python 对象并允许其他进程使用代理操作它们。


这个操作的意思就是使用 Manager() 会跟多进程的操作方式一样,创建一个子进程,然后将一些需要共享的数据都放到这个子进程里,其他子进程可以操作这个子进程的数据来达到数据共享的目的。


Manager() 支持的数据类型有:list,dict,Namespace,Lock,Value,Array 等,下面介绍一下代码示例:


from multiprocessing import Process, Manager

def f(d, l): d["a"] = 1 d["b"] = 2 l[0] = 100

if __name__ == "__main__": with Manager() as manager: d = manager.dict() l = manager.list(range(5))
p = Process(target=f, args=(d, l)) p.start() p.join()
print(d) print(l)
复制代码


使用服务进程的管理器比使用共享内存对象更灵活,因为它们可以支持任意对象类型。


此外,单个管理器可以通过网络由不同计算机上的进程共享。但是,它们比使用共享内存慢。


发布于: 刚刚阅读数: 4
用户头像

Hunter熊

关注

公众号:Hunter后端 2018-09-17 加入

Python后端工程师,欢迎互相沟通交流

评论

发布
暂无评论
Python笔记六之多进程_Python_Hunter熊_InfoQ写作社区