python 中的 asyncio 使用详解与异步协程的处理流程分析
一些核心概念
异步函数的定义
普通函数的定义是使用 def 关键词,异步的函数,协程函数(Coroutine)本质上是一个函数,特点是在代码块中可以将执行权交给其他协程,使用 async def 来定义
如何调用协程并且得到它的运行结果?
调用普通的函数只需要 result = add2(2),这时函数就可以得到运行,并且将结果 4 返回给 result,如果使用 result = add3(2),此时再打印 result 呢?
得到的是一个 coroutine 对象,<coroutine object add3 at 0x000002ED564A5048>,并不是 2+3=5 这个结果,怎样才能得到结果呢?
协程函数想要执行需要放到事件循环里执行。
事件循环 Eventloop
Eventloop 是 asyncio 应用的核心,把一些异步函数注册到这个事件循环上,事件循环会循环执行这些函数,当执行到某个函数时,如果它正在等待 I/O 返回,如它正在进行网络请求,或者 sleep 操作,事件循环会暂停它的执行去执行其他的函数;当某个函数完成 I/O 后会恢复,下次循环到它的时候继续执行。因此,这些异步函数可以协同(Cooperative)运行:这就是事件循环的目标。
返回到上面的函数,想要得到函数执行结果,需要有一个 Eventloop
或者使用 await 关键字来修饰函数的调用,如 result = await add3(2),但是 await 只能用在协程函数中,所以想要用 await 关键字就还需要定义一个协程函数
但最终的执行还是需要放到一个事件循环中进行
稍微复杂一点的例子
这段代码定义了两个协程,并将它们放到另外一个协程 main 函数中,想要获得它们运行的结果,事件循环的特点是当它遇到某个 I/O 需要等待(如这里的 asyncio.sleep()函数)的时候,可以去执行其它的函数,这样,整个函数执行所需要的时间,应该是所有协程中执行时间最长的那个,对于上面这个代码来说,一个 sleep 了 3 秒,一个 sleep 了 1 秒,总的用时应该是 3 秒多一点,但结果是这样吗?
它的输出是这样的
它的用时是 4 秒多一点,而且是先执行了 testa 函数,然后再执行了 testb 函数,是串行的依次执行的,并没有像我们想象中的并发执行。那应该怎样才能并发执行呢?
需要将协程放到 asyncio.gather() 中运行,上面的代码得到的输出是
可以看到,testa 和 testb 是同步在运行,由于 testb 只 sleep 了 1 秒钟,所以 testb 先输出了 Resuming b,最后将每个协程函数的结果返回,注意,这里是 gather()函数里的每一个协程函数都执行完了,它才结果,结果是一个列表,列表里的值顺序和放到 gather 函数里的协程的顺序是一致的。
除了使用 asyncio.gather 来执行协程函数以外,还可以使用 Task 任务对象
使用 asyncio.ensure_future(testa(1))返回一个 task 对象,此时 task 进入 pending 状态,并没有执行,这时 print(taska) 得到<Task pending coro=<testa() running at F:/python/python3Test/asynctest.py:7>> 些时,taska.done()返回 False,表示它还没有结束,当调用 await taska 时表示开始执行该协程,当执行结束以后,taska.done() 返回 True,这时可以调用 taska.result() 得到函数的返回值,如果协程还没有结束就调用 result()方法则会抛个异常,raise InvalidStateError(‘Result is not ready.’)
创建 task 对象除了使用 asyncio.ensure_future()方法还可以使用 loop.create_task() 方法
上面一直在使用 asyncio.gather()函数来执行协程函数,还有一个 asyncio.wait()函数,它的参数是协程的列表。
使用 wait 和 gather 有哪些区别呢?
首先,gather 是需要所有任务都执行结束,如果某一个协程函数崩溃了,则会抛异常,都不会有结果。
wait 可以定义函数返回的时机,可以是 FIRST_COMPLETED(第一个结束的),FIRST_EXCEPTION(第一个出现异常的),ALL_COMPLETED(全部执行完,默认的)
这段代码要求在出现第一个异常的时候就结果,函数整体不会崩溃,只是如果这里想要获取结果的话它是一个异常对象。
可以在实际的工作中,由于以前写了太多的多线程与多进程,所以对于以前编写风格和一些由于没有异步支持的库函数来说,由于要写在异步里,所以对于编写代码来说还是要处理很多同步的方法,今天在这里整理一下在异步操作中如果处理同步的函数问题。
为了更好的演示,我准备了三个函数,一个同步的函数,两个异步的函数
协程中控制任务
异步函数的定义
上面的函数,比如说我只想将 asyncfunc1() 函数运行并且得结果,可以使用 loop.create_task()方法创建一个 task 对象,task 是 Futures 的子类,当调用 loop.run_until_complete() 以后,协程跑完以后,通过 task.result()获取协程函数的返回结果。
输出结果为
主线程和跑的协程函数是在同一个线程中。
也可以给 task 对象添加一个回调方法
输出结果为
loop.run_until_complete 是一个阻塞方法,只有当它里面的协程运行结束以后这个方法才结束,才会运行之后的代码。
其实也可以不调用 loop.run_until_complete 方法,创建一个 task 以后,其实就已经在跑协程函数了,只不过当事件循环如果准备开始运行了,此时的 task 状态是 pending,如果不调用事件循环的话,则不会运行协程函数,由于主线程跑完了,子线程也就被销毁了,如代码写成这样:
得到的输出是
所以想要使得协程函数得到执行,需要调用事件循环来执行任务,上面的 loop.run_until_complete 就是使循环开始跑了,其实也可以使用 loop.run_forever(),这个函数就像它的名字一样,会一直跑。只有事件循环跑起来了,那么使用该循环注册的协程才会得到执行,但是如果使用 loop.run_forever()则会阻塞在这里,事件循环还有一个 stop 方法,可以结束循环,我们可以在 task 对象上添加一个回调方法,当协程执行结束以后,调用事件循环的 stop 方法来结束整个循环。
除了使用 loop.run_until_complete 方法,还可以使用 asyncio.ensure_future() 方法来运行协程,将上面代码中的 task = loop.create_task(asyncfunc1()) 改为 task = asyncio.ensure_future(asyncfunc1())会得到相同的结果,它的参数是协程对象或者 futures,也可以传 task 对象,因为 task 是 futures 的子类,当传入的是一个协程对象时,返回一个 task 对象,传入一个 futures 的时候,直接返回 futures 对象,也就是说,在调用 asyncio.ensure_future()以后,都会返回一个 task 对象,都可以为它添加一个回调方法,并且可以调用 task.result()方法得到结果(注意如果 task 没有执行结束就调用 result 方法,则会抛异常)。
多个协程任务的并行
最上面我准备了两个异步的函数 asyncfunc1 和 asyncfunc2,如果我想要这两个函数同时执行,并且得到它们的返回值该怎么操作呢?
有了上面单协程的经验,我们也可以使用事件循环创建两个 task,然后在 run_forever()来执行,可以对 task 添加回调,将结果输出。
输出结果是
此时由于 loop 调用了 run_forever 方法,且没有方法调用 stop 方法,所以程序会一直卡着。
这样是可以将多个协程跑起来,但这样的处理一是繁琐,二是不方便结果的回收。
asyncio 有一个 gather 方法,可以传入多个任务对象,当调用 await asyncio.gather(*) 时,它会将结果全部返回。
由于 await 只能写在 async def 函数中,所以这里还需要新创建一个函数。
两种定义方式都可以,一个是向 gather 函数传的是协程对象,一个是传的 task 对象。之后在调用
得到的输出为
这样就达到的协程的并行与结果的回收。
依然是之前准备的三个函数,一个阻塞的,两个异步的。
使用传统的多线程的方式跑同步代码
输出结果
可以看到,主线程和子线程跑在了不同的线程中。
在事件循环中动态的添加同步函数
解决方案是,先启一个子线程,这个线程用来跑事件循环 loop,然后动态的将同步函数添加到事件循环中
由于使用 ping 命令得到很多输出,所以我对函数稍稍做了修改,只是模拟打印了一行文字,但是函数中的 time.sleep(2) 这个是一个阻塞式的函数。
得到的输出为
从输出结果可以看出,loop.call_soon_threadsafe()和主线程不是跑在同一个线程中的,虽然 loop.call_soon_threadsafe()没有阻塞主线程的运行,但是由于需要跑的函数 ping 是阻塞式函数,所以调用了三次,这三次结果是顺序执行的,并没有实现并发。
如果想要实现并发,需要通过 run_in_executor 把同步函数在一个执行器里去执行。该方法需要传入三个参数,run_in_executor(self, executor, func, *args) 第一个是执行器,默认可以传入 None,如果传入的是 None,将使用默认的执行器,一般执行器可以使用线程或者进程执行器。
得到的输出结果
可以看到同步函数实现了并发,但是它们跑在了不同的线程中,这个就和之前传统的使用多线程是一样的了。
上文说到,run_in_executor 的第一个参数是执行器,这里执行器是使用 concurrent.futures 下的两个类,一个是 thread 一个是 process,也就是执行器可以分为线程执行器和进程执行器。它们在初始化的时候都有一个 max_workers 参数,如果不传则根据系统自身决定。
这里初始化了两个执行器,一个是线程的,一个是进程的,它们执行的效果一样,只是一个跑在了多线程,一个跑在了多进程。
使用 concurrent.futures.ThreadPoolExecutor()执行器的结果是
这们的进程 ID 都是 8188,是跑在了同一个进程下。另外注意一下,我这里在初始化的时候传一个 max_workers 为 2,注意看结果的输出,它是先执行了前两个,当有一个执行完了以后再开始执行第三个,而不是三个同时运行的。
使用 concurrent.futures.ProcessPoolExecutor()执行器的执行结果
可以看出来它们的进程 ID 是不同的。
这样看使用 run_in_executor 和使用多进程和多线程其实意义是一样的。别着急,在讲完异步函数以后就可以看到区别了。
在事件循环中动态的添加异步函数
通过 asyncio.run_coroutine_threadsafe 方法来动态的将一个协程绑定到事件循环上,并且不会阻塞主线程
通过 asyncio.run_coroutine_threadsafe 在 loop 上绑定了四个协程函数,得到的输出结果为
主线程不会被阻塞,起的四个协程函数几乎同时返回的结果,但是注意,协程所在的线程和主线程不是同一个线程,因为此时事件循环 loop 是放到了另外的子线程中跑的,所以此时这四个协程放到事件循环的线程中运行的。
注意这里只有 run_coroutine_threadsafe 方法,没有 run_coroutine_thread 方法。
获取协程的返回结果
获取结果可以使用 asyncio.gather()方法,这里面传的是 coros_or_futures 就是协程或者 task 对象,asyncio.run_coroutine_threadsafe()和 run_in_executor()返回的都是 Future 对象,所以可以将它们共同放到 gather 里,获取返回值
代码执行结果:
总的时间是取决于所有运行的函数中耗时最长的,这里同步函数有个阻塞的 sleep(4) ,所以总的时间是 4 秒多一点点。
关于在异步协程中的处理流程先总结这么多,之后再学习总结一个与异步相关的各种库如 aiohttp 的使用等等。
更多学习资料获取点击下方
评论