写点什么

Python 笔记四之协程

作者:Hunter熊
  • 2024-01-28
    北京
  • 本文字数:3435 字

    阅读完需:约 11 分钟

Python笔记四之协程

本文首发于公众号:Hunter 后端

原文链接:Python笔记四之协程


协程是一种运行在单线程下的并发编程模型,它的特点是能够在一个线程内实现多个任务的并发操作,通过在执行任务时主动让出执行权,让其他任务继续执行,从而实现并发。


以下所有的代码都是在 Python 3.8 版本中运行。


本篇笔记目录如下:


  1. asyncioasyncawait

  2. 并发运行协程任务

  3. 获取协程返回结果

  4. asyncio.gather()

  5. 报错处理

  6. 超时处理

  7. 用协程的方式访问网络接口

1、asyncio

在 Python 中,协程使用 asyncio 模块来实现,asyncio 是用来编写并发代码的库,使用的 async/await 语法。

async

我们使用 async 做前缀将普通函数变成异步函数,比如:


import asyncio import time
async def say_after(delay, what): now = time.time() await asyncio.sleep(delay) print(what, " 花时间:", time.time() - now) return time.time(
async def main(): print("started at: ", time.strftime("%X")) await say_after(1, "hello") await say_after(2, "world") print("finished at: ", time.strftime("%X"))
asyncio.run(main())
复制代码


函数前加上 async 就将其变成了一个异步函数,在这里我们通过 asyncio.run() 的方式在外层调用异步函数。

await

在 main() 函数里,我们通过 await 的方式表示在异步函数,也就是 main 函数里暂停当前的操作,等待后面跟着的 say_after() 异步函数执行完成。


await say_after() 就是我们前面说过的在执行任务的时候主动让出执行权,让其他任务执行。

2、并发运行协程任务

在上面 main() 函数的两个 await say_after() 中,可以看到两次 print() 出来的时间差约为 3s,因为我们两次调用 say_after() 分别用了 1 秒和 2 秒时间,所以这两次 await 操作是暂停当前任务的串行执行。


如果我们想要实现协程的并发操作,可以使用 asyncio.create_task()


async def main():
task1 = asyncio.create_task(say_after(1, "hello")) task2 = asyncio.create_task(say_after(2, "hello")) print("started at: ", time.strftime("%X")) await task1 await task2 print("finished at: ", time.strftime("%X"))
asyncio.run(main())
# started at: 11:40:03# hello 花时间: 1.0013182163238525# hello 花时间: 2.001201868057251# finished at: 11:40:05
复制代码


say_after() 函数中,有一个 await asyncio.sleep() 的操作,它的作用是在协程中主动挂起当前任务一段时间,并将控制权返回给事件循环,允许其他协程继续执行。


它模拟了在协程中等待一定时间的行为,比如在协程中发起网络请求后,协程会挂起等待网络请求的响应返回,或者异步 IO 操作中的等待 IO 操作完成等。


所以在上面这个函数操作中,我们通过 asyncio.create_task() 将协程函数 say_after() 添加到事件循环中进行自动调度,并在合适的时机执行。


所以在上面的操作中,程序检测到 say_after() 中需要进行 sleep 的操作,就会自动对其进行调度,切换到事件循环的下一个任务执行,这样就实现了协程任务的并发操作。


也因此,程序执行的整体时间会比前面的操作快 1 秒左右。

获取协程返回结果

协程的返回结果直接在 await 前赋值即可:


result1 = await task1print(result1)
复制代码

asyncio.gather()

asyncio.gather() 也可以用于并发执行协程任务,但是与 asyncio.create_task() 略有不同。


create_task() 的操作是将协程函数添加到事件循环中进行调度,返回的是一个 Task 对象,而 gather() 则可以直接接收多个协程任务并发执行,并等待他们全部完成,返回 Future 对象表示任务结果。


gather() 的使用方法如下:


async def main():    results = await asyncio.gather(        say_after(1, "hello"),        say_after(2, "world"),    )
复制代码


asyncio.gather() 除了可以接收异步函数,还可以接受 asyncio.create_task() 返回的结果,也就是返回的 task 对象,比如下面的操作也是合法的:


async def main():    task = asyncio.create_task(say_after(1, "hello"))    results = await asyncio.gather(        say_after(1, "hello"),        say_after(2, "world"),        task,    )
复制代码

3、报错处理

如果在并发操作中有一些报错,比如下面的示例:


import asyncioimport time
async def say_after(delay, what): now = time.time() await asyncio.sleep(delay) print(what, " 花时间:", time.time() - now) return time.time()
async def say_error(delay, err_msg="error"): await asyncio.sleep(delay) raise Exception(err_msg

async def main(): results = await asyncio.gather( say_after(1, "hello"), say_error(2, "error"), say_after(3, "world"), )
print(results)
asyncio.run(main())
复制代码


在上面的操作中,三个协程函数,在执行到第二个的时候,程序其实就直接返回报错了,如果想要忽略报错继续执行之后的操作,可以加上 return_exceptions 参数,设为 True


async def main():    results = await asyncio.gather(        say_after(1, "hello"),        say_error(2, "error"),        say_after(3, "world"),        return_exceptions=True,    )        print(result)
# [1691045418.774685, Exception('error'), 1691045420.774549]
复制代码


这样就会将报错信息直接也返回,且执行之后的协程函数。

4、超时处理

我们可以为协程函数执行的时间预设一个时间,如果超出这个时间则返回报错信息,我们可以使用 asyncio.wait_for(),比如:


async def main_4():    results = await asyncio.gather(        say_after(1, "hello"),        say_error(2, "error"),        asyncio.wait_for(say_after(30, "world"), timeout=3),        return_exceptions=True,    )
print(results)# [1691045925.265661, Exception('error'), TimeoutError()]
复制代码


在上面的操作中,我们给第三个任务加了个 3 秒的超时处理,但是该协程会执行 30 秒,所以返回的报错里是一个 TimeoutError()

5、用协程的方式访问网络接口

接下来我们用协程的方式来访问一个接口,与不用协程的方式进行比对。


首先我们建立一个服务端,用 Django、Flask 都可以,只是提供一个访问接口,以下是用 Flask 建立的示例:


from flask import Flaskimport time
def create_app(): app = Flask(__name__) @app.route("/test") def test(): time.sleep(1) return str(time.time()) return app
复制代码


运行这段代码就提供了我们需要的服务器接口。


使用协程的方式访问接口我们这里用到的是 aiohttp,是第三方库,需要提前安装:


pip3 install aiohttp==3.8.5
复制代码


进行测试的脚本如下:


import asyncioimport aiohttpimport requestsimport time
CALL_TIMES = 10000

def connect_url(url): return requests.get(url)

def run_connect_url(url): results = [] for i in range(CALL_TIMES): result = connect_url(url) results.append(result) return results

async def connect_url_by_session(session, url): async with session.get(url) as response: return await response.text()

async def run_connect(url): async with aiohttp.ClientSession() as session: tasks = [] for i in range(CALL_TIMES): tasks.append(connect_url_by_session(session, url)) results = await asyncio.gather(*tasks) return results

if __name__ == "__main__": url = "http://127.0.0.1:5000/test"
t1 = time.time() run_connect_url(url) print(f"串行调用次数: {CALL_TIMES},耗时:{time.time() - t1}")
t2 = time.time() asyncio.run(run_connect(url)) print(f"协程调用次数:{CALL_TIMES},耗时:{time.time() - t2}")
复制代码


在这里,aiohttp 的具体用法看代码即可,我们可以通过修改 CALL_TIMES 来修改调用次数,我这里调用 1000 次和 10000 次的结果分别如下:


串行调用次数: 1000,耗时:3.2450389862060547协程调用次数:1000,耗时:1.3642120361328125
串行调用次数: 10000,耗时:32.830286741256714协程调用次数:10000,耗时:12.519049882888794
复制代码


可以看到使用协程的方式对于接口的访问效率有了明显的提升。

发布于: 刚刚阅读数: 4
用户头像

Hunter熊

关注

公众号:Hunter后端 2018-09-17 加入

Python后端工程师,欢迎互相沟通交流

评论

发布
暂无评论
Python笔记四之协程_Python_Hunter熊_InfoQ写作社区