本文首发于公众号:Hunter 后端
原文链接:Python笔记二之多线程
这一篇笔记介绍一下在 Python 中使用多线程。
注意:以下的操作都是在 Python 3.8 版本中试验,不同版本可能有不同之处,需要注意。
本篇笔记目录如下:
概念
多线程的使用示例 daemonrun()
线程对象的属性和设置
线程模块相关函数
threading.active_count()
threading.current_thread()
threading.enumerate()
线程的异常和函数结果获取
锁
线程池
result()
done()
exception()
cancel()
running()
如何探索出最佳的线程池线程数量
1、概念
关于进程与线程的概念,这里简单介绍下。
一个进程是一个独立的执行环境,包括代码、数据和系统资源等,每个进程都有自己的内存空间、文件描述符、环境变量等。
而线程存在于进程中,共享进程内的内存和资源。
至于多进程与多线程,多进程可以充分利用计算机的多核 CPU,适用于 CPU 密集型的任务,,比如进行大量计算操作
而多线程则适用于涉及到大量的 IO 操作的任务,比如网络请求,文件读写等,在 Python 中有一个 GIL 的概念,它的全称是 Global Interpreter Lock,为全局解释器锁。
GIL 的存在是为了使同一时刻只有一个线程在运行 Python 代码,保护解释器的内部数据避免收到并发访问的影响。
所以 Python 中的多线程操作实际上是在多个线程中进行切换,以此来实现想要的并发效果。
2、多线程的使用示例
前面介绍了 Python 中多线程的操作适用于 IO 密集型的任务,所以这里以访问某个接口为例介绍一下多线程的使用。
那个接口我们这里用 Flask 创建一个服务器,其内容如下:
# app/__init__.py
from flask import Flask
import time
def create_app():
app = Flask(__name__)
@app.route("/test/<int:delay>")
def test(delay):
time.sleep(delay)
return str(time.time())
return app
复制代码
这个接口通过 delay 参数可以指定接口的休眠时间返回,比如 /test/4
,那么接口响应时间大约会是 4 秒。
在 Python 中,用到多线程的模块是 threading
模块,以下是一个使用示例:
import threading
import time
import requests
def get_response(url):
response = requests.get(url)
print(response.content)
def test_multi_threading():
url = "http://192.168.1.6:5000/test/2"
threads = []
for i in range(20):
threads.append(threading.Thread(target=get_response, args=(url,)))
for t in threads:
t.start()
for t in threads:
t.join()
def test_single():
url = "http://192.168.1.6:5000/test/2"
for i in range(5):
get_response(url)
if __name__ == "__main__":
start_time = time.time()
test_multi_threading()
print("运行耗时:", time.time() - start_time)
start_time = time.time()
test_single()
print("运行耗时:", time.time() - start_time)
复制代码
在这里我们可以比对单个线程执行五次,需要的时间大约是 10 秒,而使用多线程的方式虽然调用了 20 次接口,但是耗时大约只有 2 秒,这就是多线程在 IO 密集型的情况下的好处。
接下来具体介绍下多线程的使用方法:
def test_multi_threading():
url = "http://192.168.1.6:5000/test/2"
threads = []
for i in range(20):
threads.append(threading.Thread(target=get_response, args=(url,)))
for t in threads:
t.start()
for t in threads:
t.join()
复制代码
在这里,我们通过 threading.Thread()
的方式创建一个线程,然后通过 .start()
方法开始线程活动。
接着通过 join()
方法阻塞调用这个方法的线程,在这里也就是主线程,等待 t 线程完成后再执行主线程后面的操作。
如果我们尝试注释掉 t.join()
这两行,那么主线程就会不等待 t 线程直接往后面执行,造成我们后面在主函数里计算的时间不准确。
daemon
可以根据这个参数设置线程是否为守护线程,所有线程创建的时候默认都不是守护线程,如果需要设置线程为守护线程,需要额外做设置。
守护线程是一种特殊类型的线程,生命周期受到主线程的影响,也就是说当主线程结束时,守护线程会被强制终止,它不会阻止主线程的正常执行,主线程也不会像其他线程调用了 join()
一样被阻塞。
守护线程通常用于执行一些辅助性任务,比如日志记录、定时任务等,示例如下,我们开启了一个守护线程用于定时 print() 某些信息:
def print_info():
while True:
print("daemon threading, curr_time:", time.time())
time.sleep(1)
def test_daemon_threading():
base_url = "http://192.168.1.6:5000/test/"
t1 = threading.Thread(target=get_response, args=(base_url + str(6),))
t2 = threading.Thread(target=get_response, args=(base_url + str(2),))
daemon_t = threading.Thread(target=print_info, args=(), daemon=True)
t1.start()
t2.start()
daemon_t.start()
t1.join()
t2.join()
复制代码
这样,守护线程 daemon_t 就会在后台一直循环打印信息,直到主线程结束,守护线程也会被强制终止。
run()
run()
和 start()
方法都和线程的执行有关。
start()
用于启动线程,线程变量调用 start()
后,比如前面的 t.start()
,会立即开始执行线程,且线程的执行与主线程并行进行。
而 run()
定义的是线程内的执行逻辑,是线程的入口点,表示的是线程活动的方法,线程开启后就会调用 run()
方法,执行线程的任务。
在执行 start()
方法后,线程会自动调用 run()
方法,以此来执行线程内需要调用的函数,我们可以通过重写 run()
方法来实现我们想要的定制化功能,比如在后面我们就是通过重写 run()
方法来实现线程的异常信息以及函数的结果返回的,
3、线程对象的属性和设置
线程本身有一些属性可以用于设置和获取,我们先创建一条线程:
t1 = threading.Thread(target=get_response, args=(base_url + str(6),))
复制代码
查看线程名称
线程名称只是用于标记线程的,并无实际意义,根据用户设置而定,比如前面创建了线程,默认名为 Thread-1
,我们可以通过下面的两个操作获取,两个操作是等效的:
设置线程名称
设置线程名称的方法如下:
t1.setName("test_thread")
复制代码
判断线程是否存活
在未进行 start() 操作前,不是存活状态:
判断线程是否是守护线程
t1.daemon
t1.isDaemon()
# False
复制代码
设置线程为守护线程
将线程设置为守护线程:
True
为是,False
为否
4、线程模块相关函数
对于 threading
模块,有一些函数可以用于进行相关操作,比如当前存活的线程对象,异常处理等。
接下来先介绍这些函数及其功能,之后会用一个示例应用上这些函数
1. threading.active_count()
返回当前存活的 Thread
对象的数量
2. threading.current_thread()
返回当前对应调用者的线程
3. threading.enumerate()
列表形式返回当前所有存活的 Thread
对象
接下来我们修改 print_info()
函数,运用我们刚刚介绍的这几种函数:
def print_info():
while True:
active_count = threading.active_count()
print("当前存活的线程数量为:", active_count)
for thread in threading.enumerate():
print("存活的线程分别是:", thread.getName())
print("当前所处的的线程名称为:", threading.current_thread().getName())
print("\n")
time.sleep(1)
复制代码
还是执行 test_daemon_threading()
就可以看到对应的输出信息。
5、线程的异常和函数结果获取
Python 中使用 threading
模块创建的线程中的默认异常以及函数执行结果是不会被主线程捕获的,因为线程是独立运行的,我们可以通过定义全局的变量,比如 dict 或者队列来获取对应的信息。
这里介绍一下通过改写 run()
方法来实现我们的功能。
import threading
import traceback
import time
import request
def get_response(url):
response = requests.get(url)
if url.endswith("2"):
1/0
return time.time()
def print_info():
while True:
active_count = threading.active_count()
print("当前存活的线程数量为:", active_count)
for thread in threading.enumerate():
print("存活的线程分别是:", thread.getName())
print("当前所处的的线程名称为:", threading.current_thread().getName())
print("\n")
time.sleep(1)
class MyThread(threading.Thread):
def __init__(self, func, *args, **kwargs):
super(MyThread, self).__init__()
self.func = func
self.args = args
self.kwargs = kwargs
self.result = None
self.is_error = None
self.trace_info = None
def run(self):
try:
self.result = self.func(*self.args, **self.kwargs)
except Exception as e:
self.is_error = True
self.trace_info = traceback.format_exc()
def get_result(self):
return self.result if self.is_error is not True else None
def test_get_exception_and_result():
base_url = "http://192.168.1.6:5000/test/"
t1 = MyThread(get_response, base_url + str(3))
t2 = MyThread(get_response, base_url + str(2))
daemon_t = MyThread(print_info)
daemon_t.setDaemon(True)
t1.start()
t2.start()
daemon_t.start()
t1.join()
t2.join()
print(t1.get_result())
print(t2.is_error)
print(t2.trace_info)
if __name__ == "__main__":
test_get_exception_and_result()
复制代码
在这里,我们调用 get_response
函数时,通过判断 delay
的值,手动触发了报错,以及添加了一个 return
返回值,且通过 MyThread
这个重写的 threading.Thread
来进行操作,获取到线程执行是否有异常,以及异常信息,以及函数返回的结果。
6、锁
如果有时候多个线程需要访问同一个全局变量,可能会导致数据不一致的问题,我们使用线程里的锁来控制对相关资源的访问,以此来确保线程安全,下面是一个示例:
import threading
counter = 0
lock_counter = 0
lock = threading.Lock()
def test_no_lock():
global counter
for i in range(1000000):
counter += 1
counter -= 1
def run_no_lock_thread():
t1 = threading.Thread(target=test_no_lock)
t2 = threading.Thread(target=test_no_lock)
t1.start()
t2.start()
t1.join()
t2.join()
def test_lock():
global lock_counter
for i in range(1000000):
lock.acquire()
lock_counter += 1
lock_counter -= 1
lock.release()
def run_lock_thread():
t1 = threading.Thread(target=test_lock)
t2 = threading.Thread(target=test_lock)
t1.start()
t2.start()
t1.join()
t2.join()
if __name__ == "__main__":
print("before: ", counter)
run_no_lock_thread()
print("after: ", counter)
print("before: ", lock_counter)
run_lock_thread()
print("after: ", lock_counter)
复制代码
在上面的示例中,通过比对两个加锁和不加锁的情况下全局变量的值,可以发现,多执行几次的话,可以看法 counter
的值并不总是为 0 的,而 lock_counter
的值的结果一直是 0。
我们通过这种加锁的方式来保证 lock_counter
的值是安全的。
锁的引入我们使用的是:
获取以及释放的方法是:
lock.acquire()
lock.release()
复制代码
在这里对于 lock.acquire()
获取锁,有两个参数,blocking
和 timeout
。
blocking
表示是否阻塞,默认为 True,表示如果锁没有被释放,则会一直阻塞到锁被其他线程释放,为 False 的话,则表示不阻塞地获取锁,获取到返回为 True
,没有获取到返回为 False
lock.acquire()
# 返回为 True,表示获取到锁
lock.acquire()
lock.acquire(blocking=True)
# 这两个操作都是阻塞获取锁,因为前一个操作已经获取到锁,所以这一步会被一直阻塞
is_lock = lock.acquire(blocking=False)
# 不阻塞的获取锁,如果拿到了锁并加锁,则返回为 True,否则返回为 False,表示没有拿到锁
复制代码
还有一个参数为 timeout
,表示 blocking
为True
,也就是阻塞的时候,等待的秒数之后,超时没有拿到锁,返回为 False
。
release()
表示为锁的释放,没有返回值,当前面获取锁之后,可以通过 lock.release()
的方式释放锁。
locked()
返回为布尔型数据,判断是否获得了锁。
7、线程池
我们可以通过线程池的方式来自动管理我们的线程,用到的模块是 concurrent.futures.ThreadPoolExecutor
以下是一个使用示例:
from concurrent.futures import ThreadPoolExecutor
import concurrent.futures
def get_response(url):
return True
with ThreadPoolExecutor(max_workers=8) as executor:
future_list = [executor.submit(get_response, base_url) for _ in range(20)]
for future in concurrent.futures.as_completed(future_list):
print(future.result()
复制代码
在这里,首先实例化一个线程池,然后输入 max_workers 参数,表示线程池开启的最大的线程数。
之后通过 submit()
方法向线程池提交两个任务,并返回一个 Future
对象,我们可以通过这个 Future
对象获取线程函数执行的各种情况,比如线程函数的返回结果,线程异常情况等。
在这里有一个 concurrent.futures.as_completed()
输入的是一个 Future
列表,会按照 任务完成的顺序
逐个返回已经完成的 Future
对象,这个完成,可以是线程函数执行完成,也可以是出现异常的结果。
接下来介绍一下 Future
对象的几个方法,在此之前,我们设置一下用于试验的基本数据:
from concurrent.futures import ThreadPoolExecutor
import concurrent.futures
import requests
import time
def get_response(url):
response = requests.get(url)
if url.endswith("2"):
1/0
return time.time()
base_url = "http://192.168.1.6:5000/test/"
executor = ThreadPoolExecutor(max_workers=2)
future_1 = executor.submit(get_response, base_url + "3")
future_2 = executor.submit(get_response, base_url + "2")
复制代码
其中,future_1 线程是正常运行,future_2 在线程里执行报错了。
1. result()
用于获取线程执行的函数返回的结果,如果线程还未完成,那么调用这个方法会阻塞,直到返回结果。
而如果线程里函数执行异常了,调用 result()
方法会重新抛出异常,希望程序正常运行的话,可以加上一个 try-except
操作,或者先通过后面的 exception()
方法进行判断。
我们调用 future_1.result()
可以正常返回,而 future_2.result()
会重新报异常。
2. done()
返回一个布尔值,表示线程是否已经完成:
future_1.done() # True
future_2.done() # True
复制代码
线程执行发生异常也属于完成。
3. exception()
如果线程执行发生异常,可以用这个方法来获取异常对象,如果没有异常就会返回 None
。
future_2.exception()
# ZeroDivisionError('division by zero')
复制代码
4. cancel()
尝试取消线程的执行,如果线程还没有开始执行,线程会被标记为取消状态,如果线程已经在执行中或者执行完毕,则不会被取消:
判断一个线程是否已经被取消,使用方法 cancelled()
,返回布尔型数据
5. running()
判断线程是否还在执行中,比如下面的操作:
future_3 = executor.submit(get_response, base_url + "65")
future_3.running() # True
复制代码
8、如何探索出最佳的线程池线程数量
对于线程池中线程的数量需要指定多少个,是一个需要探索的问题。
比如需要判断我们的任务是否是 IO 密集型的,比如网络请求等,这种的话可以设置相对较高,但也并非无限高,因为等待的过程中,线程间的切换也是一部分开销。
在执行真正的任务前,我们可以通过一小部分任务来进行性能测试,逐步调整线程池的线程数量,然后观察服务器的内存啊,CPU 利用率啊,以及整个操作的消耗时间等,来综合判断出比较适合的线程数量作为最终的结果。
评论