第一次发表实战类型的爬虫文章,如果有那里不明白或者出现 bug 的可以找我私信,欢迎大家在下面评论,可以给出我更好的建议,欢迎大家指正.
网站链接放在这里了鬼吹灯
主要是以协程为主来爬取小说得章节内容,协程爬取不懂得小伙伴可以先关注我一手,后续会整理理论的知识放在专栏里
整体思路
得到鬼吹灯页面的源码
解析源码得到每一个章节的 url
得到书名,这个书名通过切片得到
通过 url 得到一个页面的内容
使用并发执行多个任务下载
代码实现
导包
import asyncioimport osfrom aiohttp import ClientSessionimport requestsimport aiofilesfrom bs4 import BeautifulSoup
复制代码
得到页面源码的方法
参数是传入的 url
返回出页面的源码
def get_page_source(url): """ 获取页面源码的方法 :param url: 传入的url :return: 返回的是页面的源码 """ headers={ 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36 Edg/109.0.1518.78' } res = requests.get(url,headers=headers) data = res.content.decode() return data
复制代码
解析页面得到 url
在这里,我使用了集合来存储每一个章节的 url,使用 xpath 来得到章节 url,我个人是比较喜欢使用 xpath,在这里给出另一种写法,使用的是的 beautifulSoup
在页面 F12 查看,我们找到的是 div 下的 ul 下的 li 下的 a 标签的属性 href
写法一:使用 xpath
def parse_page_source(html): """ 对页面进行解析,得到我们每一个章节的url :param html: 传入的页面源码 :return: 返回的是一个带有所有章节url的集合 """ book_list=[] tree = etree.HTML(html) mulu_list = tree.xpath('//div[@class="mulu-list quanji"]') for mulu in mulu_list: # 抓取整个页面下章节的url a = mulu.xpath('./ul/li/a/@href') # 注意这里一定要在for循环里添加集合 book_list.append(a) return book_list
复制代码
写法二:bs4
def parse_page_source(html): """ 对页面进行解析,得到我们每一个章节的url :param html: 传入的页面源码 :return: 返回的是一个带有所有章节url的集合 """ book_list = [] soup = BeautifulSoup(html, 'html.parser') a_list = soup.find_all('div', attrs={'class': 'mulu-list quanji'}) for a in a_list: a_list = a.find_all('a') for href in a_list: chapter_url = href['href'] book_list.append(chapter_url) return book_list
复制代码
得到和章节名
通过传入的章节 url,进行切片以下面一个链接为例
https://www.51shucheng.net/daomu/guichuideng/jingjuegucheng/2464.html
我们按照/切分就有了['https:', '', 'www.51shucheng.net', 'daomu', 'guichuideng', 'jingjuegucheng', '2464.html'] 然后我们取倒数第二个元素就有了 b_name
def get_book_name(chapter_url): """ 得到名称,为了后续下载好分辨 :param chapter_url: :return: """ book_chapter_name = chapter_url.split('/')[-2] return book_chapter_name
复制代码
下载一个章节的内容
使用 xpath 来写
async def aio_download_one_content(chapter_url, single): """ 下载一个章节内容 :param chapter_url: 传入得到的章节url :param single: 使用async with single就是10个并发 :return: """ c_name = get_book_name(chapter_url) for i in range(10): try: async with single: async with ClientSession() as session: async with session.get(chapter_url) as res: # 得到章节内容的页面的源码 page_source = await res.content.read() tree = etree.HTML(page_source) # 章节名称 base_title = tree.xpath('//h1/text()')[0] if(':'in base_title): number = base_title.split(':')[0] con = base_title.split(':')[1] title = number+con else: title=base_title # 章节内容 content = tree.xpath('//div[@class="neirong"]/p/text()') chapter_content = '\n'.join(content) if not os.path.exists(f'{book_name}/{c_name}'): os.makedirs(f'{book_name}/{c_name}') async with aiofiles.open(f'{book_name}/{c_name}/{title}.txt', mode="w", encoding='utf-8') as f: await f.write(chapter_content) print(chapter_url, "下载完毕!") return "" except Exception as e: print(e) print(chapter_url, "下载失败!, 重新下载. ") return chapter_url
复制代码
这段代码单独拿出来是因为有的章节名称是这样的<<第 19 章 : 考古队>>,这样的数据是不对的,放在文件里无法命名,这就导致了后续能写入文件的只有章节名没有:的内容,所以我对第一次筛选出的数据进行切片,如果遇到:就把前面和后面的数据切出来在组合,如果没遇到就让一个新的变量来接收 base_title
# 章节名称 base_title = tree.xpath('//h1/text()')[0] if(':'in base_title): number = base_title.split(':')[0] con = base_title.split(':')[1] title = number+con else: title=base_title
复制代码
使用 bs4 来写
async def aio_download_one(chapter_url, signal): """ 下载一个章节内容 :param chapter_url: 传入得到的章节url :param single: 使用async with single就是10个并发 :return: """ c_name = get_book_name(chapter_url) for c in range(10): try: async with signal: async with aiohttp.ClientSession() as session: async with session.get(chapter_url) as resp: # 得到章节内容的页面的源码 page_source = await resp.text() soup = BeautifulSoup(page_source, 'html.parser') # 章节名称 base_title = soup.find('h1').text if (':' in base_title): number = base_title.split(':')[0] con = base_title.split(':')[1] title = number + con else: title = base_title # 章节内容 p_content = soup.find('div', attrs={'class': 'neirong'}).find_all('p') content = [p.text + '\n' for p in p_content] chapter_content = '\n'.join(content) if not os.path.exists(f'{book_name}/{c_name}'): os.makedirs(f'{book_name}/{c_name}') async with aiofiles.open(f'{book_name}/{c_name}/{title}.txt', mode="w", encoding='utf-8') as f: await f.write(chapter_content) print(chapter_url, "下载完毕!") return "" except Exception as e: print(e) print(chapter_url, "下载失败!, 重新下载. ") return chapter_url
复制代码
协程下载
async def aio_download(url_list): # 创建一个任务列表 tasks = [] # 设置最多10个任务并行运作 semaphore = asyncio.Semaphore(10) for h in url_list: tasks.append(asyncio.create_task(aio_download_one_content(h, semaphore))) await asyncio.wait(tasks)
复制代码
主函数运行
主函数运行就没什么可说的了,这里注意一点就是最后不要 loop.close(),这样的话会导致你还没有爬取完数据,loop.close()就会关闭,情况如下,还剩一点就爬完了,结果报错了
if __name__ == '__main__': url = 'https://www.51shucheng.net/daomu/guichuideng' book_name = '鬼吹灯' if not os.path.exists(book_name): os.makedirs(book_name) source = get_page_source(url) href_list = parse_page_source(source) loop = asyncio.get_event_loop() loop.run_until_complete(aio_download(href_list))
复制代码
完成效果图
我就不一一截图了
总结
为什么我在这里比对了 xpath 和 bs4 两种代码,小伙伴可以仔细看一下,在 xpath 中,我想拿到数据,找到它,大量的使用了//这种,这样的话就会从源码内全局检索,这就导致了我想爬取文章内容会很慢,有些时候还会超时导致报错.所以我们使用 xpath 的时候,想让他的速度提高,最好有一个指定的点 到了再//
可以理解为下面这种情况
def getList(): url = 'https://www.xiurenba.cc/XiuRen/' response = requests.get(url, headers=headers) data = response.c ontent.decode() # print(data) tree = etree.HTML(data) li_list = tree.xpath('//ul[@class="update_area_lists cl"]/li') return li_list
复制代码
def get_single_url():
li_list = getList()
for li in li_list:
single_url = 'https://www.xiurenba.cc' + li.xpath('./a/@href')[0]
复制代码
还有就是遇到了特殊符号要把它干掉,或者替换掉,这样就可以正常爬取数据
如果有小伙伴想要直接拿取源码的话,可以顺着代码实现一步步粘贴过去
评论