写点什么

运筹帷幄决胜千里,Python3.10 原生协程 asyncio 工业级真实协程异步消费任务调度实践

  • 2022 年 8 月 08 日
  • 本文字数:4803 字

    阅读完需:约 16 分钟

运筹帷幄决胜千里,Python3.10原生协程asyncio工业级真实协程异步消费任务调度实践

我们一直都相信这样一种说法:协程是比多线程更高效的一种并发工作方式,它完全由程序本身所控制,也就是在用户态执行,协程避免了像线程切换那样产生的上下文切换,在性能方面得到了很大的提升。毫无疑问,这是颠扑不破的业界共识,是放之四海而皆准的真理。


但事实上,协程远比大多数人想象中的复杂,正因为协程的“用户态”特性,任务调度权掌握在撰写协程任务的人手里,而仅仅依赖 async 和 await 关键字远远达不到“调度”的级别,有时候反而会拖累任务效率,使其在任务执行效率上还不及“系统态”的多线程和多进程,本次我们来探讨一下 Python3 原生协程任务的调度管理。

Python3.10 协程库 async.io 的基本操作

事件循环(Eventloop)是 原生协程库 asyncio 的核心,可以理解为总指挥。Eventloop 实例提供了注册、取消和执行任务和回调的方法。


Eventloop 可以将一些异步方法绑定到事件循环上,事件循环会循环执行这些方法,但是和多线程一样,同时只能执行一个方法,因为协程也是单线程执行。当执行到某个方法时,如果它遇到了阻塞,事件循环会暂停它的执行去执行其他的方法,与此同时为这个方法注册一个回调事件,当某个方法从阻塞中恢复,下次轮询到它的时候将会继续执行,亦或者,当没有轮询到它,它提前从阻塞中恢复,也可以通过回调事件进行切换,如此往复,这就是事件循环的简单逻辑。


而上面最核心的动作就是切换别的方法,怎么切换?用 await 关键字:


import asyncio      async def job1():      print('job1开始')      await asyncio.sleep(1)      print('job1结束')      async def job2():      print('job2开始')      async def main():      await job1()      await job2()      if __name__ == '__main__':      asyncio.run(main())
复制代码


系统返回:


job1开始  job1结束  job2开始
复制代码


是的,切则切了,可切的对吗?事实上这两个协程任务并没有达成“协作”,因为它们是同步执行的,所以并不是在方法内 await 了,就可以达成协程的工作方式,我们需要并发启动这两个协程任务:


import asyncio      async def job1():      print('job1开始')      await asyncio.sleep(1)      print('job1结束')      async def job2():      print('job2开始')      async def main():      #await job1()      #await job2()      await asyncio.gather(job1(), job2())      if __name__ == '__main__':      asyncio.run(main())
复制代码


系统返回:


job1开始  job2开始  job1结束
复制代码


如果没有 asyncio.gather 的参与,协程方法就是普通的同步方法,就算用 async 声明了异步也无济于事。而 asyncio.gather 的基础功能就是将协程任务并发执行,从而达成“协作”。


但事实上,Python3.10 也支持“同步写法”的协程方法:


async def create_task():      task1 = asyncio.create_task(job1())      task2 = asyncio.create_task(job2())      await task1      await task2
复制代码


这里我们通过 asyncio.create_task 对 job1 和 job2 进行封装,返回的对象再通过 await 进行调用,由此两个单独的异步方法就都被绑定到同一个 Eventloop 了,这样虽然写法上同步,但其实是异步执行:


import asyncio      async def job1():      print('job1开始')      await asyncio.sleep(1)      print('job1结束')      async def job2():      print('job2开始')      async def create_task():      task1 = asyncio.create_task(job1())      task2 = asyncio.create_task(job2())      await task1      await task2      async def main():      #await job1()      #await job2()      await asyncio.gather(job1(), job2())      if __name__ == '__main__':      asyncio.run(create_task())
复制代码


系统返回:


job1开始  job2开始  job1结束
复制代码

协程任务的上下游监控

解决了并发执行的问题,现在假设每个异步任务都会返回一个操作结果:


async def job1():      print('job1开始')      await asyncio.sleep(1)      print('job1结束')        return "job1任务结果"      async def job2():      print('job2开始')        return "job2任务结果"
复制代码


通过 asyncio.gather 方法,我们可以收集到任务执行结果:


async def main():        res = await asyncio.gather(job1(), job2())      print(res)
复制代码


并发执行任务:


import asyncio      async def job1():      print('job1开始')      await asyncio.sleep(1)      print('job1结束')        return "job1任务结果"      async def job2():      print('job2开始')        return "job2任务结果"        async def main():        res = await asyncio.gather(job1(), job2())      print(res)      if __name__ == '__main__':      asyncio.run(main())
复制代码


系统返回:


job1开始  job2开始  job1结束  ['job1', 'job2']
复制代码


但任务结果仅仅也就是方法的返回值,除此之外,并没有其他有价值的信息,对协程任务的执行明细讳莫如深。


现在我们换成 asyncio.wait 方法:


async def main():        res = await asyncio.wait([job1(), job2()])      print(res)
复制代码


依然并发执行:


import asyncio      async def job1():      print('job1开始')      await asyncio.sleep(1)      print('job1结束')        return "job1任务结果"      async def job2():      print('job2开始')        return "job2任务结果"        async def main():        res = await asyncio.wait([job1(), job2()])      print(res)      if __name__ == '__main__':      asyncio.run(main())
复制代码


系统返回:


job1开始  job2开始  job1结束  ({<Task finished name='Task-2' coro=<job1() done, defined at /Users/liuyue/Downloads/upload/test/test_async.py:4> result='job1任务结果'>, <Task finished name='Task-3' coro=<job2() done, defined at /Users/liuyue/Downloads/upload/test/test_async.py:12> result='job2任务结果'>}, set())
复制代码


可以看出,asyncio.wait 返回的是任务对象,里面存储了大部分的任务信息,包括执行状态。


在默认情况下,asyncio.wait 会等待全部任务完成 (return_when='ALL_COMPLETED'),它还支持 return_when='FIRST_COMPLETED'(第一个协程完成就返回)和 return_when='FIRST_EXCEPTION'(出现第一个异常就返回)。


这就非常令人兴奋了,因为如果异步消费任务是发短信之类的需要统计达到率的任务,利用 asyncio.wait 特性,我们就可以第一时间记录任务完成或者异常的具体时间。

协程任务守护

假设由于某种原因,我们手动终止任务消费:


import asyncio      async def job1():      print('job1开始')      await asyncio.sleep(1)      print('job1结束')        return "job1任务结果"      async def job2():      print('job2开始')        return "job2任务结果"        async def main():      task1 = asyncio.create_task(job1())      task2 = asyncio.create_task(job2())      task1.cancel()      res = await asyncio.gather(task1, task2)      print(res)      if __name__ == '__main__':      asyncio.run(main())
复制代码


系统报错:


File "/Users/liuyue/Downloads/upload/test/test_async.py", line 23, in main      res = await asyncio.gather(task1, task2)  asyncio.exceptions.CancelledError  
复制代码


这里 job1 被手动取消,但会影响 job2 的执行,这违背了协程“互相提携”的特性。


事实上,asyncio.gather 方法可以捕获协程任务的异常:


import asyncio      async def job1():      print('job1开始')      await asyncio.sleep(1)      print('job1结束')        return "job1任务结果"      async def job2():      print('job2开始')        return "job2任务结果"        async def main():      task1 = asyncio.create_task(job1())      task2 = asyncio.create_task(job2())      task1.cancel()      res = await asyncio.gather(task1, task2,return_exceptions=True)      print(res)      if __name__ == '__main__':      asyncio.run(main())
复制代码


系统返回:


job2开始  [CancelledError(''), 'job2任务结果']
复制代码


可以看到 job1 没有被执行,并且异常替代了任务结果作为返回值。


但如果协程任务启动之后,需要保证任务情况下都不会被取消,此时可以使用 asyncio.shield 方法守护协程任务:


import asyncio      async def job1():      print('job1开始')      await asyncio.sleep(1)      print('job1结束')        return "job1任务结果"      async def job2():      print('job2开始')        return "job2任务结果"        async def main():      task1 = asyncio.shield(job1())      task2 = asyncio.create_task(job2())            res = await asyncio.gather(task1, task2,return_exceptions=True)        task1.cancel()      print(res)      if __name__ == '__main__':      asyncio.run(main())
复制代码


系统返回:


job1开始  job2开始  job1结束  ['job1任务结果', 'job2任务结果']
复制代码

协程任务回调

假设协程任务执行完毕之后,需要立刻进行回调操作,比如将任务结果推送到其他接口服务上:


import asyncio      async def job1():      print('job1开始')      await asyncio.sleep(1)      print('job1结束')        return "job1任务结果"      async def job2():      print('job2开始')        return "job2任务结果"      def callback(future):      print(f'回调任务: {future.result()}')        async def main():      task1 = asyncio.shield(job1())      task2 = asyncio.create_task(job2())        task1.add_done_callback(callback)            res = await asyncio.gather(task1, task2,return_exceptions=True)        print(res)      if __name__ == '__main__':      asyncio.run(main())
复制代码


这里我们通过 add_done_callback 方法对 job1 指定了 callback 方法,当任务执行完以后,callback 会被调用,系统返回:


job1开始  job2开始  job1结束  回调任务: job1任务结果  ['job1任务结果', 'job2任务结果']
复制代码


与此同时,add_done_callback 方法不仅可以获取协程任务返回值,它自己也支持参数参数传递:


import asyncio  from functools import partial    async def job1():      print('job1开始')      await asyncio.sleep(1)      print('job1结束')        return "job1任务结果"      async def job2():      print('job2开始')        return "job2任务结果"      def callback(future,num):      print(f"回调参数{num}")      print(f'回调任务: {future.result()}')        async def main():      task1 = asyncio.shield(job1())      task2 = asyncio.create_task(job2())        task1.add_done_callback(partial(callback,num=1))            res = await asyncio.gather(task1, task2,return_exceptions=True)        print(res)      if __name__ == '__main__':      asyncio.run(main())
复制代码


系统返回:


job1开始  job2开始  job1结束  回调参数1  回调任务: job1任务结果  ['job1任务结果', 'job2任务结果']
复制代码

结语

成也用户态,败也用户态。所谓水能载舟亦能覆舟,协程消费任务的调度远比多线程的系统级调度要复杂,稍不留神就会造成业务上的“同步”阻塞,弄巧成拙,适得其反。这也解释了为什么相似场景中多线程的出场率要远远高于协程,就是因为多线程不需要考虑启动后的“切换”问题,无为而为,简单粗暴。

发布于: 刚刚阅读数: 2
用户头像

专注技术,凝聚意志,解决问题 v3u.cn 2020.12.21 加入

还未添加个人简介

评论

发布
暂无评论
运筹帷幄决胜千里,Python3.10原生协程asyncio工业级真实协程异步消费任务调度实践_Python_刘悦的技术博客_InfoQ写作社区