写点什么

Spider 实战系列 - 爬取鬼吹灯小说

作者:浅辄
  • 2023-04-15
    吉林
  • 本文字数:4266 字

    阅读完需:约 14 分钟

Spider实战系列-爬取鬼吹灯小说

第一次发表实战类型的爬虫文章,如果有那里不明白或者出现 bug 的可以找我私信,欢迎大家在下面评论,可以给出我更好的建议,欢迎大家指正.


网站链接放在这里了鬼吹灯


主要是以协程为主来爬取小说得章节内容,协程爬取不懂得小伙伴可以先关注我一手,后续会整理理论的知识放在专栏里



整体思路

  1. 得到鬼吹灯页面的源码

  2. 解析源码得到每一个章节的 url

  3. 得到书名,这个书名通过切片得到

  4. 通过 url 得到一个页面的内容

  5. 使用并发执行多个任务下载

代码实现

导包

import asyncioimport osfrom aiohttp import ClientSessionimport requestsimport aiofilesfrom bs4 import BeautifulSoup
复制代码

得到页面源码的方法

参数是传入的 url


返回出页面的源码


def get_page_source(url):    """    获取页面源码的方法    :param url: 传入的url    :return: 返回的是页面的源码    """    headers={        'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36 Edg/109.0.1518.78'    }    res = requests.get(url,headers=headers)    data = res.content.decode()    return data
复制代码

解析页面得到 url

在这里,我使用了集合来存储每一个章节的 url,使用 xpath 来得到章节 url,我个人是比较喜欢使用 xpath,在这里给出另一种写法,使用的是的 beautifulSoup



在页面 F12 查看,我们找到的是 div 下的 ul 下的 li 下的 a 标签的属性 href

写法一:使用 xpath

def parse_page_source(html):    """    对页面进行解析,得到我们每一个章节的url    :param html: 传入的页面源码    :return: 返回的是一个带有所有章节url的集合    """    book_list=[]    tree = etree.HTML(html)    mulu_list = tree.xpath('//div[@class="mulu-list quanji"]')    for mulu in mulu_list:        # 抓取整个页面下章节的url        a = mulu.xpath('./ul/li/a/@href')        # 注意这里一定要在for循环里添加集合        book_list.append(a)    return book_list
复制代码

写法二:bs4

def parse_page_source(html):    """    对页面进行解析,得到我们每一个章节的url    :param html: 传入的页面源码    :return: 返回的是一个带有所有章节url的集合    """    book_list = []    soup = BeautifulSoup(html, 'html.parser')    a_list = soup.find_all('div', attrs={'class': 'mulu-list quanji'})    for a in a_list:        a_list = a.find_all('a')        for href in a_list:            chapter_url = href['href']            book_list.append(chapter_url)    return book_list
复制代码

得到和章节名

通过传入的章节 url,进行切片以下面一个链接为例


https://www.51shucheng.net/daomu/guichuideng/jingjuegucheng/2464.html


我们按照/切分就有了['https:', '', 'www.51shucheng.net', 'daomu', 'guichuideng', 'jingjuegucheng', '2464.html'] 然后我们取倒数第二个元素就有了 b_name


def get_book_name(chapter_url):    """    得到名称,为了后续下载好分辨    :param chapter_url:    :return:    """    book_chapter_name = chapter_url.split('/')[-2]    return book_chapter_name
复制代码

下载一个章节的内容


使用 xpath 来写


async def aio_download_one_content(chapter_url, single):    """    下载一个章节内容    :param chapter_url: 传入得到的章节url    :param single: 使用async with single就是10个并发    :return:    """    c_name = get_book_name(chapter_url)    for i in range(10):        try:            async with single:                async with ClientSession() as session:                    async with session.get(chapter_url) as res:                        # 得到章节内容的页面的源码                        page_source = await res.content.read()                        tree = etree.HTML(page_source)                        # 章节名称                        base_title = tree.xpath('//h1/text()')[0]                        if(':'in base_title):                            number = base_title.split(':')[0]                            con = base_title.split(':')[1]                            title = number+con                        else:                            title=base_title                        # 章节内容                        content = tree.xpath('//div[@class="neirong"]/p/text()')                        chapter_content = '\n'.join(content)                        if not os.path.exists(f'{book_name}/{c_name}'):                            os.makedirs(f'{book_name}/{c_name}')                            async with aiofiles.open(f'{book_name}/{c_name}/{title}.txt', mode="w",                                                     encoding='utf-8') as f:                                await f.write(chapter_content)                            print(chapter_url, "下载完毕!")                            return ""        except Exception as e:            print(e)            print(chapter_url, "下载失败!, 重新下载. ")    return chapter_url
复制代码


这段代码单独拿出来是因为有的章节名称是这样的<<第 19 章 : 考古队>>,这样的数据是不对的,放在文件里无法命名,这就导致了后续能写入文件的只有章节名没有:的内容,所以我对第一次筛选出的数据进行切片,如果遇到:就把前面和后面的数据切出来在组合,如果没遇到就让一个新的变量来接收 base_title


# 章节名称                    base_title = tree.xpath('//h1/text()')[0]                    if(':'in base_title):                        number = base_title.split(':')[0]                        con = base_title.split(':')[1]                        title = number+con                    else:                        title=base_title
复制代码


使用 bs4 来写


async def aio_download_one(chapter_url, signal):    """    下载一个章节内容    :param chapter_url: 传入得到的章节url    :param single: 使用async with single就是10个并发    :return:    """    c_name = get_book_name(chapter_url)    for c in range(10):        try:            async with signal:                async with aiohttp.ClientSession() as session:                    async with session.get(chapter_url) as resp:                        # 得到章节内容的页面的源码                        page_source = await resp.text()                        soup = BeautifulSoup(page_source, 'html.parser')                        # 章节名称                        base_title = soup.find('h1').text                        if (':' in base_title):                            number = base_title.split(':')[0]                            con = base_title.split(':')[1]                            title = number + con                        else:                            title = base_title                        # 章节内容                        p_content = soup.find('div', attrs={'class': 'neirong'}).find_all('p')                        content = [p.text + '\n' for p in p_content]                        chapter_content = '\n'.join(content)                        if not os.path.exists(f'{book_name}/{c_name}'):                            os.makedirs(f'{book_name}/{c_name}')                        async with aiofiles.open(f'{book_name}/{c_name}/{title}.txt', mode="w",                                                 encoding='utf-8') as f:                            await f.write(chapter_content)                        print(chapter_url, "下载完毕!")                        return ""        except Exception as e:            print(e)            print(chapter_url, "下载失败!, 重新下载. ")    return chapter_url
复制代码

协程下载

async def aio_download(url_list):    # 创建一个任务列表    tasks = []    # 设置最多10个任务并行运作    semaphore = asyncio.Semaphore(10)    for h in url_list:        tasks.append(asyncio.create_task(aio_download_one_content(h, semaphore)))    await asyncio.wait(tasks)
复制代码

主函数运行

主函数运行就没什么可说的了,这里注意一点就是最后不要 loop.close(),这样的话会导致你还没有爬取完数据,loop.close()就会关闭,情况如下,还剩一点就爬完了,结果报错了



if __name__ == '__main__':    url = 'https://www.51shucheng.net/daomu/guichuideng'    book_name = '鬼吹灯'    if not os.path.exists(book_name):        os.makedirs(book_name)    source = get_page_source(url)    href_list = parse_page_source(source)    loop = asyncio.get_event_loop()    loop.run_until_complete(aio_download(href_list))
复制代码

完成效果图



我就不一一截图了

总结

为什么我在这里比对了 xpath 和 bs4 两种代码,小伙伴可以仔细看一下,在 xpath 中,我想拿到数据,找到它,大量的使用了//这种,这样的话就会从源码内全局检索,这就导致了我想爬取文章内容会很慢,有些时候还会超时导致报错.所以我们使用 xpath 的时候,想让他的速度提高,最好有一个指定的点 到了再//


可以理解为下面这种情况


def getList():    url = 'https://www.xiurenba.cc/XiuRen/'    response = requests.get(url, headers=headers)    data = response.c    ontent.decode()    # print(data)    tree = etree.HTML(data)    li_list = tree.xpath('//ul[@class="update_area_lists cl"]/li')    return li_list
复制代码


def get_single_url():
li_list = getList()
for li in li_list:
single_url = 'https://www.xiurenba.cc' + li.xpath('./a/@href')[0]
复制代码


还有就是遇到了特殊符号要把它干掉,或者替换掉,这样就可以正常爬取数据


如果有小伙伴想要直接拿取源码的话,可以顺着代码实现一步步粘贴过去

发布于: 刚刚阅读数: 12
用户头像

浅辄

关注

大丈夫生于天地之间,岂能郁郁久居人之下 2022-11-08 加入

Java、Spider、Go

评论

发布
暂无评论
Spider实战系列-爬取鬼吹灯小说_案例分享_浅辄_InfoQ写作社区