前言
我们实战经常会遇到以下几个问题:
1、遇到一个利用步骤十分繁琐的漏洞,中间错一步就无法利用
2、挖到一个通用漏洞,想要批量刷洞小赚一波,但手动去测试每个网站工作量太大
这个时候编写一个 poc 脚本将会将会减轻我们很多工作。本文将以编写一个高效通用的 poc 脚本为目的,学习一些必要的 python 知识,这周也是拒绝做工具小子努力学习的一周
requests 模块使用技巧
Requests 是 Python 中一个常用的 HTTP 请求库,使用 Requests 库来发起网络请求非常简单,具体的使用方法这里就不多介绍了,这里只提几个 Requests 模块的使用技巧,请收好
取消重定向
Requests 会自动处理所有重定向,但有时我们并不需要重定向,可以通过allow_redirects参数禁用重定向处理:
r = requests.get('http://github.com', allow_redirects=False)
复制代码
SSL 证书验证
Requests 在请求 https 网站默认会验证 SSL 证书,可有些网站并没有证书,可增加verify=False参数忽略证书验证,但此时可能会遇到烦人的InsecureRequestWarning警告消息。最终能没有警告消息也能访问无证书的 https 网站的方法如下:
import requestsfrom requests.packages.urllib3.exceptions import InsecureRequestWarningrequests.packages.urllib3.disable_warnings(InsecureRequestWarning)requests.get('https://github.com', verify=False)
复制代码
代理
使用代理的目的就不说了,使用方法如下:
# http代理,需要指定访问的http协议和https协议两种proxies = { "http": "http://127.0.0.1:8080", "https": "http://127.0.0.1:1080",}# socks5代理proxies = { 'http': 'socks5://user:pass@host:port', 'https': 'socks5://user:pass@host:port'}requests.get("http://example.org", proxies=proxies)
复制代码
有个使用技巧就是代理到 burp 中,检查一下 python 发包。如我本地抓到 requests 请求包如下,可以发现特征十分明显,所以我们在实战使用时尽量修改 User-Agent
保持 cookie
使用 session 会话对象,向同一主机发送多个请求,底层的 TCP 连接将会被重用,不仅能提性能还能保持 cookie
s = requests.Session()s.get('http://httpbin.org/cookies/set/sessioncookie/123456789')r = s.get("http://httpbin.org/cookies")
复制代码
在编写 poc 脚本时我们只需要利用 Requests 模块发送带有 payload 的数据即可,配合上这里的小技巧可能会有更好的体验
验证结果
发送带有 payload 的请求后,我们需要通过分析响应包判断是否存在漏洞。往往存在漏洞的响应包都有一些特殊值,我们只需要在响应包中找到这样的特殊值即可证明存在漏洞,所以这里我们通常有两种写法
成员运算符 - in
if 'xxx' in r.text: print('存在漏洞')else: print('不存在漏洞')
复制代码
正则匹配 - re.search()
if re.search('xxx',r.text): print('存在漏洞')else: print('不存在漏洞')
复制代码
这两种写法差不多,不过 re.search()有个好处是可以使用正则表达式,在漏洞特征是动态变化的情况时也能有效的捕捉
单线程 poc 脚本
此时我们已经能写一个单线程 poc 脚本了,我对单线程的 poc 脚本的要求十分简单,就是简单,在面对不同的漏洞时简单修改几行代码就可以了。这里提供一个我自己写的单线程 poc 脚本,大概意思就是这样
import requestsimport refrom requests.packages.urllib3.exceptions import InsecureRequestWarningrequests.packages.urllib3.disable_warnings(InsecureRequestWarning)def Poc(url): proxy = { 'http':'http://127.0.0.1:8080', 'https':'http://127.0.0.1:8080' } headers = { 'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.107 Safari/537.36', 'Connection':'close' } data = {'name':'xxxx','value':'xxxx'} try: response = requests.post(url=url,headers=headers,data=data,verify=False,proxies=proxy,timeout=10) if 'baidu' in response.text: print('存在漏洞') else: print('none') except Exception as e: print(f'请求失败:{e}')
if __name__ == '__main__': url = 'https://www.baidu.com' Poc(url)
复制代码
使用多线程
当我们想批量验证数个网站是否存在漏洞时,就需要多线程来提升效率了。关于 Python 多线程的详细知识这里就不是这里的重点了。这里我们将利用 Threading 和 queue 做一个多线程 poc 脚本,我计划的流程如下
把所有目标 url 放到 queue 队列中;
启动多线程从 queue 队列中获取目标并执行;
保存执行结果。
具体代码最后会给出
颜色标记
我在使用多线程时就遇到一个问题,因为多线程处理的数据比较多,终端瞬间会输出大量信息,很容易就会忽略一些关键的信息
然后我就想用颜色来区分不同的信息,在 linux 终端中使用\033[显示方式;前景色;背景色m的格式就能输出各个颜色的字体。这里推荐 python 第三方库:colorama
colorama 是一个可以跨多终端显示不同颜色字符与背景的第三方库,在 linux 终端上,使用 ANSI 转义字符来实现彩色字体的输出。在 windows 的终端上,通过包装 stdout 实现。在 windows 和 linux 上有不同的实现方案,从而达到跨平台的效果
安装第三方库的命令各位应该都会吧
具体使用参考官方文档:https://pypi.org/project/colorama/
我的使用习惯就是报错信息的字体使用红色,发现漏洞的字体使用绿色,此时部分代码如下:
from colorama import init,Foreinit(autoreset=True)print(Fore.GREEN + '[+]存在漏洞')print(Fore.RED + '[!]连接错误')
复制代码
使用颜色后的终端输出如下,现在使用体验上明显会更好一点,大家也可以根据自己的喜好去设置
添加进度条
我们使用多线程的目的就是为了更快的处理更多的 url,但目标过多,我们还是免不了更长时间的等待。我们经常会把脚本挂在那里跑,然后呆呆的等着。然后我就想做一个进度条,根据进度条能大概的去预估时间,然后安排自己的工作,提升工作效率,这不就是使用脚本的意义吗
我首先就找到了很多人推荐的第三方库:tqdm,在多线程中使用时可以使用手动更新进度
import timefrom tqdm import tqdm
with tqdm(total=200) as pbar: pbar.set_description('Processing:') for i in range(20): time.sleep(0.1) pbar.update(10)
复制代码
但我这里就遇到一个问题,很多人使用 tqdm 时只输出一个 Progress bar 任务条,但我们的需求是希望同时输出每次漏洞探测结果和任务条,这会导致这两者在终端中显示的混乱,如下图所示
有没有一种办法能让任务条一直固定输出在终端的末尾呢,这样两个信息都能很清晰的显示
我找了好久解决方法,但官方似乎说没有找到在多个平台上平等使用 tqdm 的方法,就没有这个功能。不过我最终找到官方提供了一个 tqdm.write()方法,似乎能解决这个问题,只需要把脚本中所有 print()方法换成 tqdm.write()方法
到这里我们就成功的拥有了一个进度条
【一>所有资源获取<一】1、网络安全学习路线 2、电子书籍(白帽子)3、安全大厂内部视频 4、100 份 src 文档 5、常见安全面试题 6、ctf 大赛经典题目解析 7、全套工具包 8、应急响应笔记
多线程 poc 脚本
我最终写好的多线程 poc 脚本如下,中间还有很多可以优化的地方,希望能得到大家的指点
import requestsfrom requests.packages.urllib3.exceptions import InsecureRequestWarningimport threadingimport queuefrom colorama import init,Forefrom tqdm import tqdminit(autoreset=True)requests.packages.urllib3.disable_warnings(InsecureRequestWarning)global_file_target_url = 'url.txt'global_file_result = 'right.txt'global_threads_num = 12global_q = queue.Queue()global_list_result = []# 目标uriglobal_where = ''# payload 成功时页面标志信息global_payload = 'test'global_request_proxy = { 'http':'socks5://127.0.0.1:8080', 'https':'socks5://127.0.0.1:8080'}global_request_headers = { 'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.107 Safari/537.36', 'cookie':'xxxxxxxxxxxxxx', 'Connection':'close' #关闭多余的连接请求}global_request_data = {'name':'xxxx','value':'xxxx'} #POST传递的数据global_error = 0def req(url): global global_error i = 0 while i<3: if i>0: #print(f'[!]第{i}次重试请求{url}') tqdm.write(f'[!]第{i}次重试请求{url}') try: response = requests.get(url=url,headers=global_request_headers,verify=False,timeout=10) response.encoding = response.apparent_encoding text = response.text if global_payload in text: return True else: return False except Exception as e: if i==0: global_error +=1 i = i+1 #print(Fore.RED+f'[!]{url}请求失败') tqdm.write(Fore.RED+f'[!]{url}请求失败')def poc(pbar): while not global_q.empty(): target_url = global_q.get() url = target_url+global_where if req(url): #print(Fore.GREEN+'[+]存在漏洞:'+target_url) tqdm.write(Fore.GREEN+'[+]存在漏洞:'+target_url) global_list_result.append(target_url) else: #print('[-]未发现漏洞') tqdm.write('[-]未发现漏洞') pbar.update(1)def main(): # 1、添加目标url队列 with open(global_file_target_url,'r') as f: urls = f.readlines() for url in urls: url = url.strip() global_q.put(url) num_url = global_q.qsize() pbar = tqdm(total=num_url) pbar.set_description('Processing:') tqdm.write('url总数:'+str(num_url)) # 2、启动多线程poc验证 threads = [] for _ in range(global_threads_num): t = threading.Thread(target=poc,args=(pbar,)) threads.append(t) t.start() for t in threads: t.join() # 3、保存结果到文件 global_file_result if global_list_result: file = open(global_file_result,'w') for res in global_list_result: file.write(res+"\n") file.close() tqdm.write(f'失败请求数{global_error}')if __name__ == '__main__': main()
复制代码
参考:
https://mp.weixin.qq.com/s/aWbPUANyglL5kWiiFfSRYwhttps://mp.weixin.qq.com/s/Dwec68-ROfiqWeRUY4wEpg~~~~
评论