写点什么

家务活中的 python 协程

用户头像
行者AI
关注
发布于: 2021 年 04 月 22 日

本文首发于:行者AI


为什么要用协程,通常在 Python 中我们进行并发编程都是使用多线程或者多进程来实现的,对于计算型任务由于 GIL 的存在我们通常使用多进程来实现,而对于 IO 型任务我们可以通过线程调度来让线程在执行 IO 任务时让出 GIL,从而实现表面上的并发。


协程是运行在单线程当中的“并发”,协程相比多线程一大优势就是省去了多线程之间的切换开销,获得了更大的运行效率。本文不会对 python 协程的实现机制展开讨论,只是通过简单的示例展示协程最常见的用法,可以很快上手一些基于协程的高性能的 web 框架,比如 FastAPI。

1. 家务活

假设要干三样家务活,分别是烧水、洗衣服和扫地,作为一个程序员,干活前我总是规划的很有条理,下面是我规划的具体任务线:


  • 烧水壶接水

  • 等待烧水壶烧水

  • 洗衣机放衣服和加入热水

  • 等待洗衣机洗衣服

  • 晾衣服

  • 扫地


想一想我就像那个勤劳的 CPU,不过我还有很多机器可以帮助我干这些活,或许对于 CPU 来说,网卡就像烧水壶,硬盘就像洗衣机。再来分析一下,烧水和洗衣服是同一类活,我们需要做的是把水接到水壶里或者把衣服放进洗衣机里然后打开开关,具体的细节有机器帮我们完成。扫地又是另外一类活,因为没有机器人帮我做,所以需要我自己扫。


如果将烧水和洗衣服类比为 IO 型任务,扫地就是计算密集型任务。

2. 程序描述

上一节中的家务活用程序模拟一下:



def get_network_number() -> int:  """通过网络获取一个整数"""   ... 
def get_file_number(filename: str) -> int: """读取磁盘文件获取一个整数""" ...
def cumulative_sum(start: int, end: int) -> int: """累加求和""" sum = 0 for number in range(start, end): sum += number return sum
def task(): """任务""" result = 1 + 2 network_number = get_network_number() result += network_number file_number = get_file_number(f"{result}.txt") result *= file_number sum = cumulative_sum(0, 10000) task()
复制代码

3. 问题分析及程序改进

母亲作为我们的管家看不下去了,一看我就在偷懒,扫地和另外的两个工作完全没有先后关系,烧水壶烧水和洗衣机工作的时候还可以去扫地,于是就开始指挥我干活,比如在水烧到一半的时候安排我去扫地,地还没扫完又安排我去烧水。


在管家的安排下我这个人力资源被高效的利用了起来,很难有机会闲下来。


这个管家就如同我们的操作系统,于是就有了如下优化后的代码:


from threading import Thread 
def get_network_number() -> int: """通过网络获取一个整数""" ...
def get_file_number(filename: str) -> int: """读取磁盘文件获取一个整数""" ...
def cumulative_sum(start: int, end: int) -> int: """累加求和""" sum = 0 for number in range(start, end): sum += number return sum
def task1(): ""任务1""" result = 1 + 2 network_number = get_network_number() result += network_number file_number = get_file_number() result *= file_number
def task2(): """任务2""" sum = cumulative_sum(0, 10000)
t1 = Thread(target=task1) t2 = Thread(target=task2) t1.start() t2.start() t1.join() t2.join()
复制代码


扫地跟烧水洗衣服没多大关系,是一个需要我们另外执行的任务,这两个任务是并发的关系,所以我们可以将这个任务安排到另一个线程中。于是 CPU 就会在两个线程之间来回的切换,同时执行两个任务。


这种由操作系统指挥的方式有一个很大的弊端,需要频繁的切换任务,这浪费了很多的时间。

4. 引入协程

像我这样聪明的人是不需要管家指挥的,烧水壶接完水打开开关之后,我直接拿起扫帚开始扫地,再也不会傻傻的等了,于是就有了下面这个运行逻辑:



这样干活比管家指挥好多了,也不用浪费来回的任务切换时间,自己根据情况自己安排,下面是最新的协程代码实现:


import asyncio 
# io任务改为协程 async def get_network_number() -> int: """通过网络获取一个整数""" ...
# io任务改为协程 async def get_file_number(filename) -> int: """读取磁盘文件获取一个整数""" ...
# 计算密集型任务不用改协程 def cumulative_sum(start: int, end: int) -> int: """累加求和""" sum = 0 for number in range(start, end): sum += number return sum
async def task1(): """任务1""" result = 1 + 2 network_number = await get_network_number() result += network_number file_number = await get_file_number(f'{result}.txt') result *= file_number
async def task2(): """任务2""" sum = cumulative_sum(0, 10000)
async def main(): task1 = asyncio.create_task(task1()) task2 = asyncio.create_task(task2()) await task1 await task2 asyncio.run(main())
复制代码


协程版本我们会发现这样一个问题,扫地是一个计算密集型任务,所以干起活来就停不下来,水可能已经烧好了,但是必须干完扫地这个活才能回去洗衣服。


为了解决这个问题,扫地的途中可以主动停一下,可以把扫地的活分几次干,这样不就可以去看看有没有其他活可以干了吗。在 python 协程中我们可以用 asyncio.sleep 让我们停下手头的活去干其它的活,下面是对计算密集型任务的改造:


async def cumulative_sum(start: int, end: int):   result = 0   for i in range(start, end):     if i % 100 == 0:     await asyncio.sleep(1)     result += i     return result 
async def task2(): """任务2""" sum = await cumulative_sum(0, 10000)
复制代码


我们每加 100 次就去看一下还有没有其它活可以干,就好比水烧好了我们就可以去洗衣服了,衣服放到洗衣机里我们再回来,如果啥活都没有就休息会,时间到了继续干(sleep 是有时间长度的)。


上述的思路是我们把扫地一个任务分成了好几件事情做。

5. 任务和事件

通过前几节的分析,我们从家务活中找到了两个很重要的概念:


  • 任务

  • 事情


我们发现任务是由很多有顺序关系的事情组成的,我们完成各类任务的时候都是在做一件一件的事情。


回头分析一下 python 程序,我们从中找一下哪些可以对应日常生活中的事情。


对于 task1


async def task1():   """任务1"""   result = 1 + 2   network_number = await get_network_number()   result += network_number   file_number = await get_file_number(f"{result}.txt")   result *= file_number 
复制代码


我们发现了这些 CPU 要干的事情(简化起见忽略网络请求和读取文件时 CPU 使用,同样忽略 netwok_number 和 file_number 的赋值操作):


  1. result = 1 + 2

  2. result += network_number

  3. result *= file_number


对于 task2


async def task2():   """任务2"""   sum = await cumulative_sum(0, 10000) 
复制代码


由于 task2 执行的逻辑在 cumulative_sum 中,所以我们还要继续分析 cumulative_sum 这个协程产生的事件


async def cumulative_sum(start: int, end: int):   result = 0   for i in range(start, end):     if i % 100 == 0:       await asyncio.sleep(1)   result += i   return result 
复制代码


累加 100 次我们看作一个事情,那么 task2 这个任务就是由很多累加 100 次的事情组成的。我们通过上面的分析可以看到,生活中的事情就是 python 协程中的事件,await 就是很明显的事件分割点。我们的程序可以由很多并发的任务组成,这些任务当中又包含着大量的事件,程序实际执行过程中的最小单位是这些事件。

6. 事件循环

我们按执行事件的思路完成程序的整个执行过程,应该怎么实现呢?


我们可以创建一个循环,这个循环就是用来执行事件的。最开始这个循环里面什么都没有,之后我们创建了一个任务,这个任务里面有好几个事件,于是我们先把这个任务里的第一个事件放到事件循环中,于是事件循环执行我们放入的这个事件,当这个事件结束的时候我们再把之后需要执行的事件再放到事件循环中,就这样经过有序的多次事件添加之后事件循环执行完了我们任务里所有的事件,任务结束。


由于事件循环一次只能执行一个事件,当我们有好几个任务的时候,事件就会排起队依次等待执行。

7. 细节讨论

我们来看看文件读取整数的操作,正常的读取是这样的:


async def get_file_number(filename):   with open(filename) as f:     number = int(f.read())     return number 
复制代码


我们发现读取操作中没有 await,它的执行和我们未加入 asyncio.sleep 的 cumulative_sum 是一样的,所以主程序即便是在做磁盘 IO 的时候也是在等待状态的,不会去执行其它事件,我们需要对磁盘 IO 操作也做改造处理,以最大化利用 CPU 资源。


这个时候线程就派上用场了,python 提供的改造是这样的:


import asyncio 
def _read_file(fd): return fd.read()
async def get_file_number(filename): loop = asyncio.get_event_loop() with open(filename) as f: number = await loop.run_in_executor(None, _read_file, f) return int(number)
复制代码


通过操作系统线程的调度,我们将磁盘 IO 的操作分割出去,给其它事件让出一定的执行权,就好比两个事件可以抢占 CPU 资源,具体哪个执行,操作系统来裁决。同样的 time.sleep 也会阻塞事件循环,所以在使用协程的时候要用 asyncio.sleep。上述的改造方式同样可以用于 cumulative_sum 的改造,替换掉原来的 asyncio.sleep 的改造方式,改用线程执行在 python3.9 中有了更好用的 asyncio.to_thread,协程使用的细节还是得仔细阅读 python 官方文档。

8. 协程的用武之地

通过上一节的细节讨论又引出了两个问题:


1)为什么磁盘 IO 需要用线程调度而网络 IO 不需要?

2)引入线程改造后的协程不是还会存在频繁的任务切换浪费 CPU 时间吗,这样做效率会比多线程方式高吗?


理解上述两个问题是我们灵活使用 python 协程的关键,以下几点是我个人的理解心得,并未分析源码,仅供参考:


  • 网络编程中有同步和异步的方式,异步的方式就是 IO 多路复用。

  • IO 多路复用支持的文件描述符类型和操作系统有关。

  • python 协程中任务的切换依赖于 IO 多路复用。

  • Windows 下磁盘 IO 不支持 IO 多路复用,即便有操作系统支持,如果标准库未做封装,需要我们自己封装。

  • 如果程序中未涉及网络 IO,那么使用协程并不能有效的降低任务切换的开销,但协程良好的同步编程方式依旧可用。

  • 不同的编程语言对协程的实现有所不同,使用方法和应用场景也不尽相同。

发布于: 2021 年 04 月 22 日阅读数: 17
用户头像

行者AI

关注

行者AI,为游戏插上人工智能的翅膀。 2020.12.18 加入

行者AI(成都潜在人工智能科技有限公司)专注于人工智能在游戏领域的研究和应用,凭借自研算法,推出游戏AI、智能内容审核、数据平台等产品服务。

评论

发布
暂无评论
家务活中的python协程