写点什么

Python 协程

用户头像
若尘
关注
发布于: 2021 年 05 月 18 日
Python 协程

协程

  • 参考资料

  • http://python.jobbole.com/86481/

  • http://python.jobbole.com/87310/

  • http://segmentfault.com/a/1190000009781688

迭代器

  • 可迭代(Iterable):直接作用于 for 循环的变量

  • 迭代器(Iterator):不但可以作用于 for 循环,还可以被 next 调用

  • list 是典型的可迭代对象,但不是迭代器

  • 通过 isinstance 判断

  • iterable 和 iterator 可以转换

  • 通过 iter 函数


# 可迭代l = [i for i in range(10)]# l是可迭代的,但不是迭代器for idx in l:    print(idx)    # range是个迭代器for i in range(5):    print(i)
复制代码


012345678901234
复制代码


# isinstance案例# 判断某个变量是否是一个实例
# 判断是否可迭代from collections import Iterablell = [1, 2, 3, 4, 5]
print(isinstance(ll, Iterable))
from collections import Iteratorprint(isinstance(ll, Iterator))
复制代码


TrueFalse
复制代码


# iter函数
s = 'i love you'
print(isinstance(s, Iterable))print(isinstance(s, Iterator))
s_iter = iter(s)print(isinstance(s_iter, Iterable))print(isinstance(s_iter, Iterator))
复制代码


TrueFalseTrueTrue
复制代码

生成器

  • generator:一边循环一边计算下一个元素的机制/算法

  • 需要满足三个条件

  • 每次调用都生产出 for 循环需要的下一个元素或者

  • 如果达到最后一个后,爆出 StopIteration 异常

  • 可以被 next 函数调用

  • 如何生成一个生成器

  • 直接使用

  • 如果函数中包含 yield,则这个函数就叫生成器

  • next 调用函数,遇到 yield 返回


# 直接使用生成器
L = [x*x for x in range(5)] # 放在中括号中是列表生成器g = (x*x for x in range(5)) # 放在小括号中就是生成器
print(type(L))print(type(g))
复制代码


<class 'list'><class 'generator'>
复制代码


# 函数案例
def odd(): print("Step 1") print("Step 2") print("Step 3") return None
odd()
复制代码


Step 1Step 2Step 3
复制代码


# 生成器的案例# 在函数odd中, yield负责返回def odd():    print("Step 1")    yield 1    print("Step 2")    yield 2    print("Step 3")    yield 3
# odd() 是调用生成器g = odd()
one = next(g)print(one)
two = next(g)print(two)

three = next(g)print(three)
复制代码


Step 11Step 22Step 33
复制代码


# for循环调用生成器# 斐波那契数列def fib(max):    n, a, b = 0, 0, 1 # 注意写法    while n < max:        print(b)        a, b = b, a+b # 注意写法        n += 1            return 'Done'
fib(5)
复制代码


11235




'Done'
复制代码


# 斐波那契数列的生成器写法def fib(max):    n, a, b = 0, 0, 1 # 注意写法    while n < max:        yield b        a, b = b, a+b # 注意写法        n += 1           # 需要注意,爆出异常时的返回值是return的返回值    return 'Done'
g = fib(5)
for i in range(6): rst = next(g) print(rst)
复制代码


11235


---------------------------------------------------------------------------
StopIteration Traceback (most recent call last)
<ipython-input-34-71c1fd16a38d> in <module> 13 14 for i in range(6):---> 15 rst = next(g) 16 print(rst)

StopIteration: Done
复制代码


ge = fib(10)'''生成器的典型用法是在for中使用比较常用的典型生成器就是range'''for i in ge: # 在for循环中使用生成器    print(i)
复制代码


11235813213455
复制代码

协程

  • 历史历程

  • 3.4 引入协程,用 yield 实现

  • 3.5 引入协程语法

  • 实现的协程比较好的包有 asyncio,tornado,gevent

  • 定义:协程 是为非抢占式多任务产生子程序的计算机程序组件,协程允许不同入口点在不用位置暂停或者执行程序。

  • 从技术角度讲,协程就是一个你可以暂停执行的函数,或者干脆把协程理解成生成器

  • 协程的实现:

  • yield 返回

  • send 调用

  • 协程的四个状态

  • inspect.getgeneratorstate(...) 函数确定,该函数会返回下述字符串中的一个:

  • GEN_CREATED:等待开始执行

  • GEN_RUNNING:解释器正在执行

  • GEN_SUSPENED:在 yield 表达式处暂停

  • GEN_CLOSED:执行结束

  • next 预激(prime)

  • 代码案例 v2

  • 协程终止

  • 协程中未处理的异常会向上冒泡,传给 next 函数或 send 方法的调用方(即触发协程的对象)

  • 终止协程的一种方式:发送某个哨符值,让协程退出。内置的 None 和 Ellipsis 等常量经常用作哨符值==。

  • yield from

  • 调用协程为了得到返回值,协程必须正常终止

  • 生成器正常终止会发出 StopIteration 异常,异常对象的 vlaue 属性保存返回值

  • yield from 从内部捕获 StopIterator 异常

  • 案例 v03

  • 委派生成器

  • 包含 yield from 表达式的生成器函数

  • 委派生成器在 yield from 表达式处暂停,调用方可以直接把数据发给子生成器

  • 子生成器再把产出的值发给调用方

  • 子生成器在最后,解释器会抛出 StopIteration,并且把返回值附加到异常对象上

  • 案例 v04


# 协程代码案例1
def simple_coroutine(): print('-> start') x = yield print('-> recived', x) # 主线程sc = simple_coroutine()print(1111)# 可以使用sc.send(None), 效果一样next(sc) # 预激
print(2222)sc.send('zhuxiao')
复制代码


1111-> start2222-> recived zhuxiao


---------------------------------------------------------------------------
StopIteration Traceback (most recent call last)
<ipython-input-38-daec0bf83892> in <module> 13 14 print(2222)---> 15 sc.send('zhuxiao')

StopIteration:
复制代码


# 案例v2,协程的状态
def simple_coroutine(a): print('-> start') b = yield a print('-> recived', a, b) c = yield a + b print('-> recived', a, b, c) # runsc = simple_coroutine(5)
aa = next(sc)print(aa)bb = sc.send(6) # 5, 6print(bb)cc = sc.send(7) # 5, 6, 7print(cc)
复制代码


-> start5-> recived 5 611-> recived 5 6 7


---------------------------------------------------------------------------
StopIteration Traceback (most recent call last)
<ipython-input-40-1f54596be296> in <module> 17 bb = sc.send(6) # 5, 6 18 print(bb)---> 19 cc = sc.send(7) # 5, 6, 7 20 print(cc)

StopIteration:
复制代码


# 案例v03
def gen(): for c in 'AB': yield c # list直接用生成器作为参数print(list(gen()))
def gen_new(): yield from 'AB' print(list(gen_new()))
复制代码


['A', 'B']['A', 'B']
复制代码


# 案例v04,委派生成器
from collections import namedtuple
'''解释:1. 外层 for 循环每次迭代会新建一个 grouper 实例,赋值给 coroutine 变量;grouper 是委派生成器。2. 调用 next(coroutine), 预激委派生成器 grouper,此时进入 while True 循环,调用子生成器 averager 后,在 yield from 表达式处暂停。3. 内层 for 循环调用 coroutine.send(value), 直接把值传给子生成器 averager。同时,当前的 grouper 实例(coroutine)在 yield from 表达式处暂停。4. 内层循环结束后,grouper 实例依旧在 yield form 表达式处暂停。因此,grouper 函数定义体中为 results[key] 赋值的语句还没有执行。5. coroutine.send(None) 终止 averager 子生成器,子生成器抛出 StopIteration 异常并将返回的数据包含在异常对象的 value 中,yield from 可以直接抓取 StopIteration 异常并将异对象的 value 赋值给 results[key].'''ResClass = namedtuple('Res', 'count average')
# 子生成器def averager(): total = 0.0 count = 0 average = None while True: term = yield # None是哨兵值 if term is None: break total += term count += 1 average = total / count return ResClass(count, average)
# 委派生成器def grouper(storages, key): while True: # 获取average()返回的值 storages[key] = yield from averager() # 客户端代码def client(): process_data = { 'boys_2': [39.0, 40.8, 43.5, 43.6, 38.5, 49.5, 34.5, 43.5, 34.7], 'boys_1': [1.78, 1.79, 1.43, 1.23, 1.34, 1.54, 1.43, 1.53, 1.89] } storages = {} for k, v in process_data.items(): # 获得协程 coroutine = grouper(storages, k) # 预激协程 next(coroutine) # 发送数据到协程 for dt in v: coroutine.send(dt) # 终止协程 coroutine.send(None) print(storages) # runclient()
复制代码


{'boys_2': Res(count=9, average=40.84444444444444), 'boys_1': Res(count=9, average=1.551111111111111)}
复制代码

asyncio

  • python3.4 开始引入标准库当中,内置对异步 io 的支持

  • asyncio 本身是一个消息循环

  • 步骤:

  • 创建消息循环

  • 把协程导入

  • 关闭


import threading# 引入异步io包import asyncio
# 使用协程@asyncio.coroutinedef hello(): print('Hello world! (%s)' % threading.currentThread()) print('Start.... (%s)' % threading.currentThread()) yield from asyncio.sleep(5) print('Done.... (%s)' % threading.currentThread()) print('Hello again! (%s)' % threading.currentThread()) # 启动消息循环loop = asyncio.get_event_loop()# 定义任务tasks = [hello(), hello()]# asyncio使用wait等待task执行完毕loop.run_until_complete(asyncio.wait(tasks))# 关闭消息循环loop.close()
复制代码


---------------------------------------------------------------------------
RuntimeError Traceback (most recent call last)
<ipython-input-8-377939537d5d> in <module> 17 tasks = [hello(), hello()] 18 # asyncio使用wait等待task执行完毕---> 19 loop.run_until_complete(asyncio.wait(tasks)) 20 # 关闭消息循环 21 loop.close()

D:\Anaconda3\lib\asyncio\base_events.py in run_until_complete(self, future) 569 future.add_done_callback(_run_until_complete_cb) 570 try:--> 571 self.run_forever() 572 except: 573 if new_task and future.done() and not future.cancelled():

D:\Anaconda3\lib\asyncio\base_events.py in run_forever(self) 524 self._check_closed() 525 if self.is_running():--> 526 raise RuntimeError('This event loop is already running') 527 if events._get_running_loop() is not None: 528 raise RuntimeError(

RuntimeError: This event loop is already running

Hello world! (<_MainThread(MainThread, started 22472)>)Start.... (<_MainThread(MainThread, started 22472)>)Hello world! (<_MainThread(MainThread, started 22472)>)Start.... (<_MainThread(MainThread, started 22472)>)Done.... (<_MainThread(MainThread, started 22472)>)Hello again! (<_MainThread(MainThread, started 22472)>)Done.... (<_MainThread(MainThread, started 22472)>)Hello again! (<_MainThread(MainThread, started 22472)>)
复制代码


import asyncio
@asyncio.coroutinedef wget(host): print('wget %s...' % host) # 异步请求网络地址 connect = asyncio.open_connection(host, 80) # 注意yield from的用法 reader, writer = yield from connect header = 'Get / HTTP/1.0\r\nHost: %s\r\n\r\n' % host write.write(header.encode('utf-8')) yield from write.drain() while True: line = yield from reader.readline() # http协议的换行使用\r\n if line == b'\r\n': break print('%s header > %s' % (host, line.decode('utf-8').rstrip())) # Ignore the body, close the socket write.close() loop = asyncio.get_event_loop()tasks = [wget(host) for host in ['www.sina.com.cn', 'www.sohu.com', 'www.163.com']]loop.run_until_complete(asyncio.wait(tasks))loop.close()
复制代码

asyn and await

  • 为了更好的表示异步 io

  • python3.5 引入

  • 让协程代码更简洁

  • 使用上,可以简单的进行替换

  • 用 async 替换 @asyncio.coroutine

  • 用 await 替换 yield from


import threading# 引入异步io包import asyncio
# 使用协程# @asyncio.coroutineasync def hello(): print('Hello world! (%s)' % threading.currentThread()) print('Start.... (%s)' % threading.currentThread()) await asyncio.sleep(5) print('Done.... (%s)' % threading.currentThread()) print('Hello again! (%s)' % threading.currentThread()) # 启动消息循环loop = asyncio.get_event_loop()# 定义任务tasks = [hello(), hello()]# asyncio使用wait等待task执行完毕loop.run_until_complete(asyncio.wait(tasks))# 关闭消息循环loop.close()
复制代码


---------------------------------------------------------------------------
RuntimeError Traceback (most recent call last)
<ipython-input-10-ceee64b7a419> in <module> 17 tasks = [hello(), hello()] 18 # asyncio使用wait等待task执行完毕---> 19 loop.run_until_complete(asyncio.wait(tasks)) 20 # 关闭消息循环 21 loop.close()

D:\Anaconda3\lib\asyncio\base_events.py in run_until_complete(self, future) 569 future.add_done_callback(_run_until_complete_cb) 570 try:--> 571 self.run_forever() 572 except: 573 if new_task and future.done() and not future.cancelled():

D:\Anaconda3\lib\asyncio\base_events.py in run_forever(self) 524 self._check_closed() 525 if self.is_running():--> 526 raise RuntimeError('This event loop is already running') 527 if events._get_running_loop() is not None: 528 raise RuntimeError(

RuntimeError: This event loop is already running

Hello world! (<_MainThread(MainThread, started 22472)>)Start.... (<_MainThread(MainThread, started 22472)>)Hello world! (<_MainThread(MainThread, started 22472)>)Start.... (<_MainThread(MainThread, started 22472)>)Done.... (<_MainThread(MainThread, started 22472)>)Hello again! (<_MainThread(MainThread, started 22472)>)Done.... (<_MainThread(MainThread, started 22472)>)Hello again! (<_MainThread(MainThread, started 22472)>)
复制代码

aiohttp

  • asyncio 实现单线程的并发 io,在客户端用处不大

  • 在服务器端可以 asyncio+coroutine 配合,因为 http 是 io 操作

  • asyncio 实现了 tcp,udp,ssl 等协议

  • aiohttp 是给予 asyncio 实现的 http 框架

  • pip install aiohttp 安装


# aiohttp案例
import asyncio
from aiohttp import web
async def index(request): await asyncio.sleep(0.5) return web.Response(body=b'<h1>Index</h1>')
async def hello(request): await asyncio.sleep(0.5) text = '<h1>hello, %s!</h1>' % request.match_info['name'] return web.Response(body=text.encode('utf-8'))
async def init(loop): app = web.Application(loop=loop) app.router.add_route('GET', '/', index) app.router.add_route('GET', '/hello/{name}', hello) srv = await loop.create_serve(app.make_handler(), '127.0.0.1', 8000) print('Serve started at http://127.0.0.1:8000...') return srv
loop = asyncio.get_event_loop()loop.run_until_complete(init(loop))loop.run_forever()
复制代码


---------------------------------------------------------------------------
ModuleNotFoundError Traceback (most recent call last)
<ipython-input-4-e9c02c1f95af> in <module> 3 import asyncio 4 ----> 5 from aiohttp import web 6 7 async def index(request):

ModuleNotFoundError: No module named 'aiohttp'
复制代码

concurrent.futures

  • python3 新增的库

  • 类似其他语言的线程池的概念

  • 利用 multiprocessiong 实现真正的并行计算

  • 核心原理:以子进程的形式,并行运行多个 python 解释器,从而令 python 程序可以利用多核 CPU 来提升运行速度。由于子进程与主解释器相分离,所以他们的全局解释器锁也是相互独立的。每个子进程都能够完整的使用一个 CPU 内核。

  • concurrent.futures.Executor

  • ThreadPoolExecutor

  • ProcessPoolExecutor

  • 执行的时候需要自行选择

  • sunmit(fn, args, kwargs)

  • fn: 异步执行的函数

  • args, kwargs 参数


# 关于concurrent的案例from concurrent.futures import ThreadPoolExecutorimport time
def return_future(msg): time.sleep(3) return msg
# 创建一个线程池pool = ThreadPoolExecutor(max_workers=2)
# 往线程池加入2个taskf1 = pool.submit(return_future, 'hello')f2 = pool.submit(return_future, 'world')
# 等待执行完毕print(f1.done())time.sleep(3)print(f2.done())
# 结果print(f1.result())print(f2.result())
复制代码


FalseTruehelloworld
复制代码

current 中 map 函数

  • map(fn, *iterables, timeout=None)

  • 跟 map 函数类似

  • 函数需要异步执行

  • timeout:超时时间

  • map 跟 submit 使用一个就行


# map案例
import time,reimport os,datetimefrom concurrent import futures
data = ['1', '2']
def wait_on(argument): print(argument) time.sleep(2) return "OK"
ex = futures.ThreadPoolExecutor(max_workers=2)for i in ex.map(wait_on, data): print(i)
复制代码


12OKOK
复制代码


发布于: 2021 年 05 月 18 日阅读数: 9
用户头像

若尘

关注

还未添加个人签名 2021.01.11 加入

还未添加个人简介

评论

发布
暂无评论
Python 协程