本文首发于:行者AI
在近期的编码工作过程中遇到了 async 和 await 装饰的函数,查询资料后了解到这种函数是基于协程的异步函数。这类编程方式称为异步编程,常用在 IO 较频繁的系统中,如:Tornado web 框架、文件下载、网络爬虫等应用。协程能够在 IO 等待时间就去切换执行其他任务,当 IO 操作结束后再自动回调,那么就会大大节省资源并提供性能。接下来便简单的讲解一下异步编程相关概念以及案例演示。
1. 协程简介
1.1 协程的含义及实现方法
协程(Coroutine),也可以被称为微线程,是一种用户态内的上下文切换技术。简而言之,其实就是通过一个线程实现代码块相互切换执行。例如:
def func1():
print(1)
... # 协程介入
print(2)
def func2():
print(3)
... # 协程介入
print(4)
func1()
func2()
复制代码
上述代码是普通的函数定义和执行,按流程分别执行两个函数中的代码,并先后会输出:1、2、3、4
。但如果介入协程技术那么就可以实现函数见代码切换执行,最终输入:1、3、2、4
。
在 Python 中有多种方式可以实现协程,例如:
greenlet,是一个第三方模块,用于实现协程代码(Gevent 协程就是基于 greenlet 实现);
yield,生成器,借助生成器的特点也可以实现协程代码;
asyncio,在 Python3.4 中引入的模块用于编写协程代码;
async & awiat,在 Python3.5 中引入的两个关键字,结合 asyncio 模块可以更方便的编写协程代码。
前两种实现方式较为老旧,所以重点关注后面的方式
标准库实现方法
asyncio 是 Python 3.4 版本引入的标准库,直接内置了对异步 IO 的支持。
import asyncio
@asyncio.coroutine
def func1():
print(1)
yield from asyncio.sleep(2) # 遇到IO耗时操作,自动化切换到tasks中的其他任务
print(2)
@asyncio.coroutine
def func2():
print(3)
yield from asyncio.sleep(2) # 遇到IO耗时操作,自动化切换到tasks中的其他任务
print(4)
tasks = [
asyncio.ensure_future( func1() ),
asyncio.ensure_future( func2() )
]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
复制代码
关键字实现方法
async & await
关键字在 Python3.5 版本中正式引入,代替了asyncio.coroutine
装饰器,基于他编写的协程代码其实就是上一示例的加强版,让代码可以更加简便可读。
import asyncio
async def func1():
print(1)
await asyncio.sleep(2) # 耗时操作
print(2)
async def func2():
print(3)
await asyncio.sleep(2) # 耗时操作
print(4)
tasks = [
asyncio.ensure_future(func1()),
asyncio.ensure_future(func2())
]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
复制代码
1.2 案例演示
例如:用代码实现下载 url_list
中的图片。
# requests库仅支持同步的http网络请求
import requests
def download_image(url):
print("开始下载:",url)
# 发送网络请求,下载图片
response = requests.get(url)
# 图片保存到本地文件
file_name = url.rsplit('_')[-1]
with open(file_name, mode='wb') as file_object:
file_object.write(response.content)
print("下载完成")
if __name__ == '__main__':
url_list = [
'https://www.1.jpg',
'https://www.2.jpg',
'https://www.3.jpg'
]
for item in url_list:
download_image(item)
复制代码
输出:按顺序发送请求,请求一次下载一张图片,假如每次下载花费 1s,完成任务需要 3s 以上。
# aiohttp 为支持异步编程的http请求库
import aiohttp
import asyncio
async def fetch(session, url):
print("发送请求:", url)
async with session.get(url, verify_ssl=False) as response:
content = await response.content.read()
file_name = url.rsplit('_')[-1]
with open(file_name, mode='wb') as file_object:
file_object.write(content)
async def main():
async with aiohttp.ClientSession() as session:
url_list = [
'https://www.1.jpg',
'https://www.2.jpg',
'https://www.3.jpg'
]
tasks = [asyncio.create_task(fetch(session, url)) for url in url_list]
await asyncio.wait(tasks)
if __name__ == '__main__':
asyncio.run(main())
复制代码
输出:一次发送三个下载请求,同时下载,假如每次下载花费 1s,完成任务仅需要 1s 左右,第一种方法的耗时为第二种的三倍。
1.3 小结
协程可以让原来要使用异步+回调方式写的非人类代码,用看似同步的方式写出来。
2. 异步编程简介
2.1 同步和异步的区别
同步 :循序渐进执行操作、请求异步:无需等待上一步操作、请求完成,就开始下一步(每个操作仍然有先后顺序)
目前 python 异步相关的主流技术是通过包含关键字 async&await 的 async 模块实现。
2.2 异步编程-事件循环
事件循环,可以把他当做是一个 while 循环,这个 while 循环在周期性的运行并执行一些任务,在特定条件下终止循环。
# 伪代码
任务列表 = [ 任务1, 任务2, 任务3,... ]
while True:
可执行的任务列表,已完成的任务列表 = 去任务列表中检查所有的任务,将'可执行'和'已完成'的任务返回
for 就绪任务 in 已准备就绪的任务列表:
执行已就绪的任务
for 已完成的任务 in 已完成的任务列表:
在任务列表中移除 已完成的任
如果 任务列表 中的任务都已完成,则终止循环
复制代码
在编写程序时候可以通过如下代码来获取和创建事件循环。
# 方式一:
import asyncio
# 生成或获取一个事件循环
loop = asyncio.get_event_loop()
# 将任务添加到事件循环中
loop.run_until_complete(任务)
# 方式二(python3.7及以上版本支持):
asyncio.run( 任务 )
复制代码
2.3 异步编程-快速上手
async 关键字
# 协程函数
async def func():
pass
# 协程对象
result = func()
复制代码
注意:执行协程函数只会创建协程对象,函数内部代码不会执行。如果想要运行协程函数内部代码,必须要将协程对象交给事件循环来处理。
import asyncio
async def func():
print("执行协程函数内部代码!")
result = func()
# 调用方法1:
# loop = asyncio.get_event_loop()
# loop.run_until_complete( result )
# 调用方法2:
asyncio.run( result )
复制代码
await 关键字
await + 可等待的对象(协程对象、Future、Task 对象 -> IO 等待),遇到 IO 操作挂起当前协程(任务),等 IO 操作完成之后再继续往下执行。当前协程挂起时,事件循环可以去执行其他协程(任务)。
import asyncio
async def others():
print("start")
await asyncio.sleep(2)
print('end')
return '返回值'
async def func():
print("执行协程函数内部代码")
# await等待对象的值得到结果之后再继续向下走
response = await others()
print("IO请求结束,结果为:", response)
asyncio.run( func() )
复制代码
Task 对象
Task 对象的作用是在事件循环中添加多个任务,用于并发调度协程,通过asyncio.create_task(协程对象)
的方式创建 Task 对象,这样可以让协程加入事件循环中等待被调度执行。
async def module_a():
print("start module_a")
await asyncio.sleep(2) # 模拟 module_a 的io操作
print('end module_a')
return 'module_a 完成'
async def module_b():
print("start module_b")
await asyncio.sleep(1) # 模拟 module_a 的io操作
print('end module_b')
return 'module_b 完成'
task_list = [
module_a(),
module_b(),
]
done,pending = asyncio.run( asyncio.wait(task_list) )
print(done)
复制代码
2.4 案例演示
例如:用代码实现连接并查询数据库的同时,下载一个 APK 文件到本地。
import asyncio
import aiomysql
import os
import aiofiles as aiofiles
from aiohttp import ClientSession
async def get_app():
url = "http://www.123.apk"
async with ClientSession() as session:
# 网络IO请求,获取响应
async with session.get(url)as res:
if res.status == 200:
print("下载成功", res)
# 磁盘IO请求,读取响应数据
apk = await res.content.read()
async with aiofiles.open("demo2.apk", "wb") as f:
# 磁盘IO请求,数据写入本地磁盘
await f.write(apk)
else:
print("下载失败")
async def excute_sql(sql):
# 网络IO操作:连接MySQL
conn = await aiomysql.connect(host='127.0.0.1', port=3306, user='root', password='123', db='mysql', )
# 网络IO操作:创建CURSOR
cur = await conn.cursor()
# 网络IO操作:执行SQL
await cur.execute(sql)
# 网络IO操作:获取SQL结果
result = await cur.fetchall()
print(result)
# 网络IO操作:关闭链接
await cur.close()
conn.close()
task_list = [get_app(), execute_sql(sql="SELECT Host,User FROM user")]
asyncio.run(asyncio.wait(task_list))
复制代码
代码逻辑分析:
【step1】asyncio.run()
创建了事件循环。wait()
方法将 task 任务列表加入到当前的事件循环中;(注意:必须先创建事件循环,后加入任务列表,否则会报错)
【step2】事件循环监听事件状态,开始执行代码,先执行列表中的get_app()
方法,当代码执行到async with session.get(url)as res:
时,遇到 await 关键字表示有 IO 耗时操作,线程会将该任务挂起在后台执行,并切换到另外一个异步函数excute_sql()
;
【step3】当代码执行到excute_sql()
的第一个 IO 耗时操作后,线程会重复先前的操作,将该任务挂起,去执行其他可执行代码。假如此时事件循环监听到get_app()
中的第一 IO 耗时操作已经执行完成,那么线程会切换到该方法第一个 IO 操作后的代码,并按顺序执行直到遇上下一个 await 装饰的 IO 操作;假如事件循环监听到excute_sql()
中的第一个 IO 操作先于get_app()
的第一个 IO 操作完成,那么线程会继续执行excute_sql
的后续代码;
【step4】线程会重复进行上述第 3 点中的步骤,直到代码全部执行完成,事件循环也会随之停止。
2.5 小节
一般来说 CPU 的耗时运算方式有:
计算密集型的操作:计算密集型任务的特点是要进行大量的计算、逻辑判断,消耗 CPU 资源,比如计算圆周率、对视频进行高清解码等等。
IO 密集型的操作:涉及到网络、磁盘 IO 的任务都是 IO 密集型任务,这类任务的特点是 CPU 消耗很少,任务的大部分时间都在等待 IO 操作完成(因为 IO 的速度远远低于 CPU 和内存的速度)。
异步编程基于协程实现,如果利用协程实现计算密集型操作,因为线程在上下文之间的来回切换总会经历类似于”计算“-->”保存“-->”创建新环境“ 的一系列操作,导致系统的整体性能反而会下降。所以异步编程并不适用于计算密集型的程序。然而在 IO 密集型操作汇总,协程在 IO 等待时间就去切换执行其他任务,当 IO 操作结束后再自动回调,那么就会大大节省资源并提供性能。
评论