引言
有小伙伴看了之前 Requests
自动重试的文章,在问有没有 aiohttp
aiohttp 特点
aiohttp
是一个异步的请求库,本身可以作为客户端也可以作为服务端对外提供服务。aiohttp
采用异步非阻塞模式,区别于传统同步库。发送多个 HTTP 请求时,不会因等待某个请求响应而阻塞整个程序,能同时处理多个请求,一旦有响应即可处理,极大提升效率与资源利用率。
重试
启用日志
首先导入日志包,开启所有的日志,但是我看了一下 aiohttp
好像没怎么打日志,开了日志也没有打印多少日志,代码见下面。
安装 aiohttp_retry
由于 aiohttp
本身没有自带重试相关的逻辑,所以这里需要安装专门为 aiohttp
设计的重试库,aiohttp_retry
。
使用 pip 安装,我这里使用 poetry 安装。
pip install aiohttp_retry
poetry add aiohttp_retry
复制代码
重试逻辑
aiohttp_retry
自带了多种重试方法,从指数重试到随机重试可以随意选择。
指数重试
import asyncio
import logging
from aiohttp_retry import RetryClient, ExponentialRetry
# 开启 urllib3 的日志,以便于查看重试过程
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s-%(filename)s-%(funcName)s-[%(lineno)d]-%(levelname)s-%(message)s')
logging.getLogger("aiohttp.access").setLevel(logging.DEBUG)
logging.getLogger("aiohttp.client").setLevel(logging.DEBUG)
logging.getLogger("aiohttp.internal").setLevel(logging.DEBUG)
logging.getLogger("aiohttp.server").setLevel(logging.DEBUG)
logging.getLogger("aiohttp.web").setLevel(logging.DEBUG)
logging.getLogger("aiohttp.websocket").setLevel(logging.DEBUG)
logging.getLogger('aiohttp_retry').setLevel(logging.DEBUG)
loop = asyncio.new_event_loop()
loop.set_debug(True)
async def main():
retry_options = ExponentialRetry(attempts=3, start_timeout=1)
retry_client = RetryClient(raise_for_status=True, retry_options=retry_options)
try:
async with retry_client.get('https://httpbin.org/status/503') as response:
print((await response.text())[:100])
except Exception as exc:
print(exc)
finally:
await retry_client.close()
asyncio.run(main())
复制代码
使用 ExponentialRetry
指定重试次数,默认参数是 0.1 秒进行指数重试,使用 RetryClient
中的 raise_for_status
可以指定是否在指定的状态码进行重试,这里设置为 True
,所以默认会重试三次。为了明显看出效果,这里将 start_timeout
设置成 1 秒,运行结果:
2024-11-21 21:17:04,644-selector_events.py-__init__-[54]-DEBUG-Using selector: KqueueSelector
2024-11-21 21:17:04,644-selector_events.py-__init__-[54]-DEBUG-Using selector: KqueueSelector
2024-11-21 21:17:04,644-client.py-_do_request-[110]-DEBUG-Attempt 1 out of 3
2024-11-21 21:17:06,374-client.py-_do_request-[151]-DEBUG-Retrying after response code: 503
2024-11-21 21:17:08,377-client.py-_do_request-[110]-DEBUG-Attempt 2 out of 3
2024-11-21 21:17:08,636-client.py-_do_request-[151]-DEBUG-Retrying after response code: 503
2024-11-21 21:17:12,637-client.py-_do_request-[110]-DEBUG-Attempt 3 out of 3
503, message='SERVICE UNAVAILABLE', url=URL('https://httpbin.org/status/503')
2024-11-21 21:17:13,058-base_events.py-close-[672]-DEBUG-Close <_UnixSelectorEventLoop running=False closed=False debug=True>
复制代码
可以看到,重试间隔从 1 秒到 2 秒到 4 秒成指数递增。
指定 session
注意,上面的代码中,RetryClient
会在没有传入 session
的时候使用默认的 session
,如果想要使用自定义的 session
的话,需要手动传入你自己的 session。
loop = asyncio.new_event_loop()
loop.set_debug(True)
session = ClientSession()
async def main():
retry_options = ExponentialRetry(attempts=3)
retry_client = RetryClient(raise_for_status=True, retry_options=retry_options, client_session=session)
try:
async with retry_client.get('https://httpbin.org/status/503') as response:
print((await response.text())[:100])
except Exception as exc:
print(exc)
finally:
await retry_client.close()
await session.close()
asyncio.run(main())
复制代码
传输指定的 session
可以自定义 headers
和 cookie
等参数,这里就不演示了。
随机重试
async def main():
retry_options = RandomRetry(attempts=3, random_func=random.random)
retry_client = RetryClient(raise_for_status=True, retry_options=retry_options)
try:
async with retry_client.get('https://httpbin.org/status/503') as response:
print((await response.text())[:100])
except Exception as exc:
print(exc)
finally:
await retry_client.close()
asyncio.run(main())
复制代码
可以通过 RandomRetry
来进行随机时间后重试,随机数函数可以自定义。
跟踪请求
可以在 http 请求在发送的整个生命周期中跟踪它,并且修改它的数据。此方法是 aiohttp
原生提供的,aiohttp_retry
只是对它进行了透传。
loop = asyncio.new_event_loop()
loop.set_debug(True)
retry_options = ExponentialRetry(attempts=3)
async def on_request_start(
session: ClientSession,
trace_config_ctx: SimpleNamespace,
params: TraceRequestStartParams,
) -> None:
# 可以在这里修改请求数据
logger.debug('on_request_start')
async def on_connection_create_end(
session: ClientSession,
trace_config_ctx: SimpleNamespace,
params: TraceRequestStartParams,
) -> None:
# 可以在这里修改请求数据
logger.debug('on_connection_create_end')
async def connection_reuseconn(
session: ClientSession,
trace_config_ctx: SimpleNamespace,
params: TraceRequestStartParams,
) -> None:
# 可以在这里修改请求数据
logger.debug('connection_reuseconn')
async def main():
trace_config = TraceConfig()
trace_config.on_request_start.append(on_request_start)
trace_config.on_connection_create_end.append(on_connection_create_end)
trace_config.on_connection_reuseconn.append(connection_reuseconn)
retry_client = RetryClient(retry_options=retry_options, trace_configs=[trace_config])
try:
async with retry_client.get('https://httpbin.org/status/503') as response:
print((await response.text())[:100])
except Exception as exc:
print(exc)
finally:
await retry_client.close()
asyncio.run(main())
复制代码
运行结果:
2024-11-21 21:43:58,362-selector_events.py-__init__-[54]-DEBUG-Using selector: KqueueSelector
2024-11-21 21:43:58,362-selector_events.py-__init__-[54]-DEBUG-Using selector: KqueueSelector
2024-11-21 21:43:58,362-client.py-_do_request-[110]-DEBUG-Attempt 1 out of 3
2024-11-21 21:43:58,363-test7.py-on_request_start-[32]-DEBUG-on_request_start
2024-11-21 21:43:59,268-test7.py-on_connection_create_end-[41]-DEBUG-on_connection_create_end
2024-11-21 21:43:59,764-client.py-_do_request-[151]-DEBUG-Retrying after response code: 503
2024-11-21 21:43:59,965-client.py-_do_request-[110]-DEBUG-Attempt 2 out of 3
2024-11-21 21:43:59,966-test7.py-on_request_start-[32]-DEBUG-on_request_start
2024-11-21 21:43:59,966-test7.py-connection_reuseconn-[50]-DEBUG-connection_reuseconn
2024-11-21 21:44:00,209-client.py-_do_request-[151]-DEBUG-Retrying after response code: 503
2024-11-21 21:44:00,611-client.py-_do_request-[110]-DEBUG-Attempt 3 out of 3
2024-11-21 21:44:00,620-test7.py-on_request_start-[32]-DEBUG-on_request_start
2024-11-21 21:44:00,621-test7.py-connection_reuseconn-[50]-DEBUG-connection_reuseconn
2024-11-21 21:44:00,869-base_events.py-close-[672]-DEBUG-Close <_UnixSelectorEventLoop running=False closed=False debug=True>
复制代码
从日志上可以看出,请求开始,请求结束,连接复用日志都被打印出来了。当然,跟踪请求的方式不止这些,还有更多的可以查看文档。此方法可以用来调试请求,也可以用来在发生异常的时候修改 headers
、body
、params
等等,或许可以修改代理?先挖个坑哈哈,后面再填。
更换 params 重试
loop = asyncio.new_event_loop()
loop.set_debug(True)
retry_options = ExponentialRetry(attempts=3)
async def on_request_start(
session: ClientSession,
trace_config_ctx: SimpleNamespace,
params: TraceRequestStartParams,
) -> None:
# 可以在这里修改请求数据
logger.debug('on_request_start')
logger.debug(params.headers)
async def on_connection_create_end(
session: ClientSession,
trace_config_ctx: SimpleNamespace,
params: TraceRequestStartParams,
) -> None:
# 可以在这里修改请求数据
logger.debug('on_connection_create_end')
async def connection_reuseconn(
session: ClientSession,
trace_config_ctx: SimpleNamespace,
params: TraceRequestStartParams,
) -> None:
# 可以在这里修改请求数据
logger.debug('connection_reuseconn')
async def main():
trace_config = TraceConfig()
trace_config.on_request_start.append(on_request_start)
trace_config.on_connection_create_end.append(on_connection_create_end)
trace_config.on_connection_reuseconn.append(connection_reuseconn)
retry_client = RetryClient(retry_options=retry_options, trace_configs=[trace_config])
try:
async with retry_client.requests(
params_list=[
RequestParams(method="POST", url='http://httpbin.org/status/503', headers={'headers': 'headers1'}),
RequestParams(method="POST", url='http://httpbin.org/status/503', headers={'headers': 'headers2'})
]
) as response:
print((await response.text())[:100])
except Exception as exc:
print(exc)
finally:
await retry_client.close()
asyncio.run(main())
复制代码
运行结果:
2024-11-21 22:05:26,582-selector_events.py-__init__-[54]-DEBUG-Using selector: KqueueSelector
2024-11-21 22:05:26,582-selector_events.py-__init__-[54]-DEBUG-Using selector: KqueueSelector
2024-11-21 22:05:26,582-client.py-_do_request-[110]-DEBUG-Attempt 1 out of 3
2024-11-21 22:05:26,582-test7.py-on_request_start-[32]-DEBUG-on_request_start
2024-11-21 22:05:26,582-test7.py-on_request_start-[33]-DEBUG-<CIMultiDict('headers': 'headers1')>
2024-11-21 22:05:26,590-test7.py-on_connection_create_end-[42]-DEBUG-on_connection_create_end
2024-11-21 22:05:27,295-client.py-_do_request-[151]-DEBUG-Retrying after response code: 503
2024-11-21 22:05:27,497-client.py-_do_request-[110]-DEBUG-Attempt 2 out of 3
2024-11-21 22:05:27,498-test7.py-on_request_start-[32]-DEBUG-on_request_start
2024-11-21 22:05:27,498-test7.py-on_request_start-[33]-DEBUG-<CIMultiDict('headers': 'headers2')>
2024-11-21 22:05:27,500-test7.py-connection_reuseconn-[51]-DEBUG-connection_reuseconn
2024-11-21 22:05:27,746-client.py-_do_request-[151]-DEBUG-Retrying after response code: 503
2024-11-21 22:05:28,147-client.py-_do_request-[110]-DEBUG-Attempt 3 out of 3
2024-11-21 22:05:28,148-test7.py-on_request_start-[32]-DEBUG-on_request_start
2024-11-21 22:05:28,148-test7.py-on_request_start-[33]-DEBUG-<CIMultiDict('headers': 'headers2')>
2024-11-21 22:05:28,148-test7.py-connection_reuseconn-[51]-DEBUG-connection_reuseconn
2024-11-21 22:05:29,326-base_events.py-close-[672]-DEBUG-Close <_UnixSelectorEventLoop running=False closed=False debug=True>
复制代码
可以看到,在每次重试的时候自动切换了 header
,使用了提前传入的 header
。
总结
以上就是使用 aiohttp_retry
进行自动重试的内容了,当然上面只是举了一些例子,aiohttp_retry
还有更多的用法,大家可以根据自己的需求进行自定义或者查看官方文档。如果有无法满足的地方,可以尝试自己实现重试类,这种传入 retry_options
方式提供了极大的灵活度,我们可以根据自己的需求传入自定义的类,方便自定义逻辑。如果想要控制请求的流转流程,也可以使用 aiohttp
官方提供的 TraceConfig
,它可以在请求流转的各个时间对请求进行修改或者拦截。
评论