写点什么

python 基于 aiohttp 的异步爬虫实战时间

作者:Java-fenn
  • 2022 年 9 月 09 日
    湖南
  • 本文字数:6547 字

    阅读完需:约 21 分钟

主题


Python 协程钢铁知识库,一个学习 python 爬虫、数据分析的知识库。人生苦短,快用 python。


之前我们使用 requests 库爬取某个站点的时候,每发出一个请求,程序必须等待网站返回响应才能接着运行,而在整个爬虫过程中,整个爬虫程序是一直在等待的,实际上没有做任何事情。


像这种占用磁盘/内存 IO、网络 IO 的任务,大部分时间是 CPU 在等待的操作,就叫 IO 密集型任务。对于这种情况有没有优化方案呢,当然有,那就是使用 aiohttp 库实现异步爬虫。


aiohttp 是什么我们在使用 requests 请求时,只能等一个请求先出去再回来,才会发送下一个请求。明显效率不高阿,这时候如果换成异步请求的方式,就不会有这个等待。一个请求发出去,不管这个请求什么时间响应,程序通过 await 挂起协程对象后直接进行下一个请求。


解决方法就是通过 aiohttp + asyncio,什么是 aiohttp?一个基于 asyncio 的异步 HTTP 网络模块,可用于实现异步爬虫,速度明显快于 requests 的同步爬虫。


requests 和 aiohttp 区别区别就是一个同步一个是异步。话不多说直接上代码看效果。


安装 aiohttppip install aiohttprequests 同步示例:#!/usr/bin/env python

-- coding: utf-8 --

author: 钢铁知识库

import timeimport requests

同步请求

def main():start = time.time()for i in range(5):res = requests.get('http://httpbin.org/delay/2')print(f'当前时间:{datetime.datetime.now()}, status_code = {res.status_code}')print(f'requests 同步耗时:{time.time() - start}')


if name == 'main':main()


'''当前时间:2022-09-05 15:44:51.991685, status_code = 200 当前时间:2022-09-05 15:44:54.528918, status_code = 200 当前时间:2022-09-05 15:44:57.057373, status_code = 200 当前时间:2022-09-05 15:44:59.643119, status_code = 200 当前时间:2022-09-05 15:45:02.167362, status_code = 200requests 同步耗时:12.785893440246582'''可以看到 5 次请求总共用 12.7 秒,再来看同样的请求异步多少时间。


aiohttp 异步示例:#!/usr/bin/env python

file: day6-9 同步和异步.py

author: 钢铁知识库

import asyncioimport timeimport aiohttp


async def async_http():# 声明一个支持异步的上下文管理器 async with aiohttp.ClientSession() as session:res = await session.get('http://httpbin.org/delay/2')print(f'当前时间:{datetime.datetime.now()}, status_code = {res.status}')


tasks = [async_http() for _ in range(5)]start = time.time()

Python 3.7 及以后,不需要显式声明事件循环,可以使用 asyncio.run()来代替最后的启动操作

asyncio.run(asyncio.wait(tasks))print(f'aiohttp 异步耗时:{time.time() - start}')


'''当前时间:2022-09-05 15:42:32.363966, status_code = 200 当前时间:2022-09-05 15:42:32.366957, status_code = 200 当前时间:2022-09-05 15:42:32.374973, status_code = 200 当前时间:2022-09-05 15:42:32.384909, status_code = 200 当前时间:2022-09-05 15:42:32.390318, status_code = 200aiohttp 异步耗时:2.5826876163482666'''两次对比可以看到执行过程,时间一个是顺序执行,一个是同时执行。这就是同步和异步的区别。


aiohttp 使用介绍接下来我们会详细介绍 aiohttp 库的用法和爬取实战。aiohttp 是一个支持异步请求的库,它和 asyncio 配合使用,可以使我们非常方便地实现异步请求操作。asyncio 模块,其内部实现了对 TCP、UDP、SSL 协议的异步操作,但是对于 HTTP 请求,就需要 aiohttp 实现了。


aiohttp 分为两部分,一部分是 Client,一部分是 Server。下面来说说 aiohttp 客户端部分的用法。


基本实例先写一个简单的案例


#!/usr/bin/env python

-- coding: utf-8 --

@Author : 钢铁知识库

import asyncioimport aiohttp


async def get_api(session, url):# 声明一个支持异步的上下文管理器 async with session.get(url) as response:return await response.text(), response.status


async def main():async with aiohttp.ClientSession() as session:html, status = await get_api(session, 'http://httpbin.org/delay/2')print(f'html: {html[:50]}')print(f'status : {status}')


if name == 'main':# Python 3.7 及以后,不需要显式声明事件循环,可以使用 asyncio.run(main())来代替最后的启动操作 asyncio.get_event_loop().run_until_complete(main())'''html: {"args": {},"data": "","files": {},


status : 200


Process finished with exit code 0'''aiohttp 请求的方法和之前有明显区别,主要包括如下几点:


除了导入 aiohttp 库,还必须引入 asyncio 库,因为要实现异步,需要启动协程。异步的方法定义不同,前面都要统一加 async 来修饰。with as 用于声明上下文管理器,帮我们自动分配和释放资源,加上 async 代码支持异步。对于返回协程对象的操作,前面需要加 await 来修饰。response.text()返回的是协程对象。最后运行启用循环事件注意:Python3.7 及以后的版本中,可以使用 asyncio.run(main())代替最后的启动操作。


URL 参数设置对于 URL 参数的设置,我们可以借助 params 设置,传入一个字典即可,实例如下:


#!/usr/bin/env python

-- coding: utf-8 --

@Author : 钢铁知识库

import aiohttpimport asyncio


async def main():params = {'name': '钢铁知识库', 'age': 23}async with aiohttp.ClientSession() as session:async with session.get('https://www.httpbin.org/get', params=params) as res:print(await res.json())


if name == 'main':asyncio.get_event_loop().run_until_complete(main())'''{'args': {'age': '23', 'name': '钢铁知识库'}, 'headers': {'Accept': '/', 'Accept-Encoding': 'gzip, deflate', 'Host': 'www.httpbin.org', 'User-Agent': 'Python/3.8 aiohttp/3.8.1', 'X-Amzn-Trace-Id': 'Root=1-63162e34-1acf7bde7a6d801368494c72'}, 'origin': '122.55.11.188', 'url': 'https://www.httpbin.org/get?name=钢铁知识库 &age=23'}'''可以看到实际请求的 URL 后面带了后缀,这就是 params 的内容。


请求类型除了 get 请求,aiohttp 还支持其它请求类型,如 POST、PUT、DELETE 等,和 requests 使用方式类似。


session.post('http://httpbin.org/post', data=b'data')session.put('http://httpbin.org/put', data=b'data')session.delete('http://httpbin.org/delete')session.head('http://httpbin.org/get')session.options('http://httpbin.org/get')session.patch('http://httpbin.org/patch', data=b'data')要使用这些方法,只需要把对应的方法和参数替换一下。用法和 get 类似就不再举例。


响应的几个方法对于响应来说,我们可以用如下方法分别获取其中的响应情况。状态码、响应头、响应体、响应体二进制内容、响应体 JSON 结果,实例如下:


#!/usr/bin/env python

@Author : 钢铁知识库

import aiohttpimport asyncio


async def main():data = {'name': '钢铁知识库', 'age': 23}async with aiohttp.ClientSession() as session:async with session.post('https://www.httpbin.org/post', data=data) as response:print('status:', response.status) # 状态码 print('headers:', response.headers) # 响应头 print('body:', await response.text()) # 响应体 print('bytes:', await response.read()) # 响应体二进制内容 print('json:', await response.json()) # 响应体 json 数据


if name == 'main':asyncio.get_event_loop().run_until_complete(main())'''status: 200headers: <CIMultiDictProxy('Date': 'Tue, 06 Sep 2022 00:18:36 GMT', 'Content-Type': 'application/json', 'Content-Length': '534', 'Connection': 'keep-alive', 'Server': 'gunicorn/19.9.0', 'Access-Control-Allow-Origin': '', 'Access-Control-Allow-Credentials': 'true')>body: {"args": {},"data": "","files": {},"form": {"age": "23","name": "\u94a2\u94c1\u77e5\u8bc6\u5e93"},"headers": {"Accept": "/*","Accept-Encoding": "gzip, deflate","Content-Length": "57","Content-Type": "application/x-www-form-urlencoded","Host": "www.httpbin.org","User-Agent": "Python/3.8 aiohttp/3.8.1","X-Amzn-Trace-Id": "Root=1-631691dc-6aa1b2b85045a1a0481d06e1"},"json": null,"origin": "122.55.11.188","url": "https://www.httpbin.org/post"}


bytes: b'{\n "args": {}, \n "data": "", \n "files": {}, \n "form": {\n "age": "23", \n "name": "\u94a2\u94c1\u77e5\u8bc6\u5e93"\n }, \n "headers": {\n "Accept": "/", \n "Accept-Encoding": "gzip, deflate", \n "Content-Length": "57", \n "Content-Type": "application/x-www-form-urlencoded", \n "Host": "www.httpbin.org", \n "User-Agent": "Python/3.8 aiohttp/3.8.1", \n "X-Amzn-Trace-Id": "Root=1-631691dc-6aa1b2b85045a1a0481d06e1"\n }, \n "json": null, \n "origin": "122.5.132.196", \n "url": "https://www.httpbin.org/post"\n}\n'json: {'args': {}, 'data': '', 'files': {}, 'form': {'age': '23', 'name': '钢铁知识库'}, 'headers': {'Accept': '/', 'Accept-Encoding': 'gzip, deflate', 'Content-Length': '57', 'Content-Type': 'application/x-www-form-urlencoded', 'Host': 'www.httpbin.org', 'User-Agent': 'Python/3.8 aiohttp/3.8.1', 'X-Amzn-Trace-Id': 'Root=1-631691dc-6aa1b2b85045a1a0481d06e1'}, 'json': None, 'origin': '122.55.11.188', 'url': 'https://www.httpbin.org/post'}'''可以看到有些字段前面需要加 await,因为其返回的是一个协程对象(如 async 修饰的方法),那么前面就要加 await。


超时设置我们可以借助 ClientTimeout 对象设置超时,例如要设置 1 秒的超时时间,可以这么实现:


#!/usr/bin/env python

@Author : 钢铁知识库

import aiohttpimport asyncio


async def main():# 设置 1 秒的超时 timeout = aiohttp.ClientTimeout(total=1)data = {'name': '钢铁知识库', 'age': 23}async with aiohttp.ClientSession(timeout=timeout) as session:async with session.get('https://www.httpbin.org/delay/2', data=data) as response:print('status:', response.status) # 状态码


if name == 'main':asyncio.get_event_loop().run_until_complete(main())'''Traceback (most recent call last):####中间省略 ####raise asyncio.TimeoutError from Noneasyncio.exceptions.TimeoutError'''这里设置了超时 1 秒请求延时 2 秒,发现抛出异常 asyncio.TimeoutError ,如果正常则响应 200。


并发限制 aiohttp 可以支持非常高的并发量,但面对高并发网站可能会承受不住,随时有挂掉的危险,这时需要对并发进行一些控制。现在我们借助 asyncio 的 Semaphore 来控制并发量,实例如下:


#!/usr/bin/env python

-- coding: utf-8 --

@Author : 钢铁知识库

import asynciofrom datetime import datetimeimport aiohttp

声明最大并发量

semaphore = asyncio.Semaphore(2)


async def get_api():async with semaphore:print(f'scrapting...{datetime.now()}')async with session.get('https://www.baidu.com') as response:await asyncio.sleep(2)# print(f'当前时间:{datetime.now()}, {response.status}')


async def main():global sessionsession = aiohttp.ClientSession()tasks = [asyncio.ensure_future(get_api()) for _ in range(1000)]await asyncio.gather(*tasks)await session.close()


if name == 'main':asyncio.get_event_loop().run_until_complete(main())'''scrapting...2022-09-07 08:11:14.190000scrapting...2022-09-07 08:11:14.292000scrapting...2022-09-07 08:11:16.482000scrapting...2022-09-07 08:11:16.504000scrapting...2022-09-07 08:11:18.520000scrapting...2022-09-07 08:11:18.521000'''在 main 方法里,我们声明了 1000 个 task,如果没有通过 Semaphore 进行并发限制,那这 1000 放到 gather 方法后会被同时执行,并发量相当大。有了信号量的控制之后,同时运行的 task 数量就会被控制,这样就能给 aiohttp 限制速度了。


aiohttp 异步爬取实战接下来我们通过异步方式练手一个小说爬虫,需求如下:


需求页面: https://dushu.baidu.com/pc/detail?gid=4308080950


目录接口: https://dushu.baidu.com/api/pc/getCatalog?data={"book_id":"4308080950"}


详情接口: https://dushu.baidu.com/api/pc/getChapterContent?data={"book_id":"4295122774","cid":"4295122774|116332"}


关键参数: book_id :小说 ID、 cid :章节 id


采集要求:使用协程方式写入,数据存放进 mongo


需求分析:点开需求页面,通过 F12 抓包可以发现两个接口。一个目录接口,一个详情接口。


首先第一步先请求目录接口拿到 cid 章节 id,然后将 cid 传递给详情接口拿到小说数据,最后存入 mongo 即可。


话不多说,直接上代码:


#!/usr/bin/env python

-- coding: utf-8 --

@Author : 钢铁知识库

不合适就是不合适,真正合适的,你不会有半点犹豫。

import asyncioimport json,reimport loggingimport aiohttpimport requestsfrom utils.conn_db import ConnDb

日志格式

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s')

章节目录 api

b_id = '4308080950'url = 'https://dushu.baidu.com/api/pc/getCatalog?data={"book_id":"'+b_id+'"}'headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) ""Chrome/104.0.0.0 Safari/537.36"}

并发声明

semaphore = asyncio.Semaphore(5)


async def download(title,b_id, cid):data = {"book_id": b_id,"cid": f'{b_id}|{cid}',}data = json.dumps(data)detail_url = 'https://dushu.baidu.com/api/pc/getChapterContent?data={}'.format(data)async with semaphore:async with aiohttp.ClientSession(headers=headers) as session:async with session.get(detail_url) as response:res = await response.json()content = {'title': title,'content': res['data']['novel']['content']}# print(title)await save_data(content)


async def save_data(data):if data:client = ConnDb().conn_motor_mongo()db = client.baidu_novelcollection = db.novellogging.info('saving data %s', data)await collection.update_one({'title': data.get('title')},{'$set': data},upsert=True)


async def main():res = requests.get(url, headers=headers)tasks = []for re in res.json()['data']['novel']['items']: # 拿到某小说目录 cidtitle = re['title']cid = re['cid']tasks.append(download(title, b_id, cid)) # 将请求放到列表里,再通过 gather 执行并发 await asyncio.gather(*tasks)


if name == 'main':asyncio.run(main())至此,我们就使用 aiohttp 完成了对小说章节的爬取。


要实现异步处理,得先要有挂起操作,当一个任务需要等待 IO 结果的时候,可以挂起当前任务,转而去执行其他任务,这样才能充分利用好资源,要实现异步,需要了解 await 的用法,使用 await 可以将耗时等待的操作挂起,让出控制权。当协程执行的时候遇到 await,时间循环就会将本协程挂起,转而去执行别的协程,直到其他的协程挂起或执行完毕。


await 后面的对象必须是如下格式之一:


A native coroutine object returned from a native coroutine function,一个原生 coroutine 对象。A generator-based coroutine object returned from a function decorated with types.coroutine,一个由 types.coroutine 修饰的生成器,这个生成器可以返回 coroutine 对象。An object with an await method returning an iterator,一个包含 await 方法的对象返回的一个迭代器。---- 20220909 钢铁知识库


总结以上就是借助协程 async 和异步 aiohttp 两个主要模块完成异步爬虫的内容,


aiohttp 以异步方式爬取网站的耗时远小于 requests 同步方式,以上列举的例子希望对你有帮助。


注意,线程和协程是两个概念,后面找机会我们再聊聊进程和线程、线程和协程的关系。

用户头像

Java-fenn

关注

需要Java资料或者咨询可加我v : Jimbye 2022.08.16 加入

还未添加个人简介

评论

发布
暂无评论
python 基于aiohttp的异步爬虫实战时间_Java_Java-fenn_InfoQ写作社区