1
Python 多线程 VS 多进程(二)
发布于: 2021 年 05 月 20 日
共享变量
共享变量:当多个线程同时访问同一个变量的时候,会产生共享变量的问题
案例 11
import threading
sum = 0
loopSum = 1000000
def myAdd():
global sum, loopSum
for i in range(1, loopSum):
sum += 1
def myMinu():
global sum, loopSum
for i in range(1, loopSum):
sum -= 1
if __name__ == '__main__':
print("Starting ....{0}".format(sum))
# 开始多线程的实现,看执行结果是否一样
t1 = threading.Thread(target=myAdd, args=())
t2 = threading.Thread(target=myMinu, args=())
t1.start()
t2.start()
t1.join()
t2.join()
print("Done ....{0}".format(sum))
复制代码
Starting ....0
Done ....157577
- 解决变量:锁,信号灯
- 锁(Lock):
- 是一个标志,表示一个线程在占用一些资源
- 使用方法
- 上锁
- 使用共享变量,放心的用
- 取消锁,释放锁
- 案例12
复制代码
import threading
sum = 0
loopSum = 1000000
lock = threading.Lock()
def myAdd():
global sum, loopSum
for i in range(1, loopSum):
# 上锁,申请锁
lock.acquire()
sum += 1
# 释放锁
lock.release()
def myMinu():
global sum, loopSum
for i in range(1, loopSum):
lock.acquire()
sum -= 1
lock.release()
if __name__ == '__main__':
print("Starting ....{0}".format(sum))
# 开始多线程的实现,看执行结果是否一样
t1 = threading.Thread(target=myAdd, args=())
t2 = threading.Thread(target=myMinu, args=())
t1.start()
t2.start()
t1.join()
t2.join()
print("Done ....{0}".format(sum))
复制代码
Starting ....0
Done ....0
- 锁谁:哪个资源需要多个线程共享,锁哪个
- 理解锁:锁其实不是锁住谁,而是一个令牌
- 线程安全问题:
- 如果一个资源/变量,他对于多线程来讲,不用加锁也不会引起任何问题,则称为线程安全
- 线程不安全变量类型:list,set,dict
- 线程安全变量类型:queue(队列)
- 生产者消费者问题
- 一个模型,可以用来搭建消息队列
- queue是一个用来存放变量的数据结构,特点是先进先出,内部元素排队,可以理解成一个特殊的list
- 案例13
复制代码
#encoding=utf-8
import threading
import time
# Python2
# from Queue import Queue
# Python3
import queue
class Producer(threading.Thread):
def run(self):
global queue
count = 0
while True:
# qsize返回queue内容长度
if queue.qsize() < 1000:
for i in range(100):
count = count +1
msg = '生成产品'+str(count)
# put是往queue中放入一个值
queue.put(msg)
print(msg)
time.sleep(0.5)
class Consumer(threading.Thread):
def run(self):
global queue
while True:
if queue.qsize() > 100:
for i in range(3):
# get是从queue中取出一个值
msg = self.name + '消费了 '+queue.get()
print(msg)
time.sleep(1)
if __name__ == '__main__':
queue = queue.Queue()
for i in range(500):
queue.put('初始产品'+str(i))
for i in range(2):
p = Producer()
p.start()
for i in range(5):
c = Consumer()
c.start()
复制代码
生成产品1
生成产品2
生成产品3
生成产品4
生成产品5
生成产品6
生成产品7
生成产品8
生成产品9
生成产品10
生成产品11
生成产品12
生成产品13
生成产品14
生成产品15
生成产品16
生成产品17
生成产品18生成产品1
生成产品2
生成产品3
生成产品4
生成产品5
生成产品6
生成产品7
生成产品8
生成产品9
生成产品10
生成产品11
生成产品19
生成产品12
生成产品13
生成产品14
生成产品20
生成产品21
生成产品22
生成产品23
生成产品24
生成产品25
生成产品26
生成产品15
生成产品16
生成产品17
生成产品18
生成产品19
生成产品20Thread-3消费了 初始产品0
Thread-3消费了 初始产品1
Thread-3消费了 初始产品2
生成产品27
Thread-4消费了 初始产品3
Thread-4消费了 初始产品4
Thread-4消费了 初始产品5
生成产品21生成产品28
生成产品29
生成产品30
生成产品31
生成产品32
生成产品33
生成产品34
生成产品22
生成产品23
生成产品24
生成产品25
生成产品26
生成产品27Thread-5消费了 初始产品6
Thread-5消费了 初始产品7
Thread-5消费了 初始产品8
...
- 死锁问题,案例14
复制代码
import threading
import time
lock_1 = threading.Lock()
lock_2 = threading.Lock()
def func_1():
print("func_1 starting.........")
lock_1.acquire()
print("func_1 申请了 lock_1....")
time.sleep(2)
print("func_1 等待 lock_2.......")
lock_2.acquire()
print("func_1 申请了 lock_2.......")
lock_2.release()
print("func_1 释放了 lock_2")
lock_1.release()
print("func_1 释放了 lock_1")
print("func_1 done..........")
def func_2():
print("func_2 starting.........")
lock_2.acquire()
print("func_2 申请了 lock_2....")
time.sleep(4)
print("func_2 等待 lock_1.......")
lock_1.acquire()
print("func_2 申请了 lock_1.......")
lock_1.release()
print("func_2 释放了 lock_1")
lock_2.release()
print("func_2 释放了 lock_2")
print("func_2 done..........")
if __name__ == "__main__":
print("主程序启动..............")
t1 = threading.Thread(target=func_1, args=())
t2 = threading.Thread(target=func_2, args=())
t1.start()
t2.start()
t1.join()
t2.join()
print("主程序启动..............")
复制代码
主程序启动..............
func_1 starting.........
func_1 申请了 lock_1....
func_2 starting.........
func_2 申请了 lock_2....
func_1 等待 lock_2.......
func_2 等待 lock_1.......
- 锁的等待时间问题,案例15
复制代码
import threading
import time
lock_1 = threading.Lock()
lock_2 = threading.Lock()
def func_1():
print("func_1 starting.........")
lock_1.acquire(timeout=4)
print("func_1 申请了 lock_1....")
time.sleep(2)
print("func_1 等待 lock_2.......")
rst = lock_2.acquire(timeout=2)
if rst:
print("func_1 已经得到锁 lock_2")
lock_2.release()
print("func_1 释放了锁 lock_2")
else:
print("func_1 注定没申请到lock_2.....")
lock_1.release()
print("func_1 释放了 lock_1")
print("func_1 done..........")
def func_2():
print("func_2 starting.........")
lock_2.acquire()
print("func_2 申请了 lock_2....")
time.sleep(4)
print("func_2 等待 lock_1.......")
lock_1.acquire()
print("func_2 申请了 lock_1.......")
lock_1.release()
print("func_2 释放了 lock_1")
lock_2.release()
print("func_2 释放了 lock_2")
print("func_2 done..........")
if __name__ == "__main__":
print("主程序启动..............")
t1 = threading.Thread(target=func_1, args=())
t2 = threading.Thread(target=func_2, args=())
t1.start()
t2.start()
t1.join()
t2.join()
print("主程序结束..............")
复制代码
主程序启动..............
func_1 starting.........
func_1 申请了 lock_1....
func_2 starting.........
func_2 申请了 lock_2....
func_1 等待 lock_2.......
func_1 注定没申请到lock_2.....
func_1 释放了 lock_1
func_2 等待 lock_1.......
func_1 done..........
func_2 申请了 lock_1.......
func_2 释放了 lock_1
func_2 释放了 lock_2
func_2 done..........
主程序结束..............
- semphore
- 允许一个资源对多有几个多线程同时使用
- 案例16
复制代码
import threading
import time
# 参数定义最多几个线程同时使用资源
semaphore = threading.Semaphore(3)
def func():
if semaphore.acquire():
for i in range(5):
print(threading.currentThread().getName() + 'get semaphore')
time.sleep(15)
semaphore.release()
print(threading.currentThread().getName() + 'release semaphore')
for i in range(8):
t1 = threading.Thread(target=func)
t1.start()
复制代码
Thread-1get semaphore
Thread-1get semaphore
Thread-1get semaphore
Thread-1get semaphore
Thread-1get semaphore
Thread-2get semaphore
Thread-2get semaphore
Thread-2get semaphore
Thread-2get semaphore
Thread-2get semaphore
Thread-3get semaphore
Thread-3get semaphore
Thread-3get semaphore
Thread-3get semaphore
Thread-3get semaphore
Thread-2release semaphoreThread-1release semaphore
Thread-5get semaphore
Thread-5get semaphore
Thread-5get semaphore
Thread-5get semaphore
Thread-5get semaphore
Thread-4get semaphore
Thread-4get semaphore
Thread-4get semaphore
Thread-4get semaphore
Thread-4get semaphore
Thread-3release semaphoreThread-6get semaphore
Thread-6get semaphore
Thread-6get semaphore
Thread-6get semaphore
Thread-6get semaphore
...
- threading.Timer
- 案例17
复制代码
import threading
import time
def func():
print("I am running.........")
time.sleep(4)
print("I am done...")
if __name__ == "__main__":
t = threading.Timer(6, func)
t.start()
i = 0
while True:
print("{0}*****************".format(i))
time.sleep(3)
i += 1
复制代码
0*****************
1*****************
I am running.........
2*****************
3*****************
I am done...
4*****************
5*****************
6*****************
7*****************
8*****************
...
- Timer是利用多线程,在指定时间后启动一个功能
- 可重入锁
- 一个锁,可以被一个线程多次申请
- 主要解决递归调用的时候,需要申请锁的情况
- 案例18
复制代码
import threading
import time
class MyThread(threading.Thread):
def run(self):
global num
time.sleep(1)
if mutex.acquire(1):
num = num+1
msg = self.name+' set num to '+str(num)
print(msg)
mutex.acquire()
mutex.release()
mutex.release()
num = 0
mutex = threading.RLock()
def testTh():
for i in range(5):
t = MyThread()
t.start()
if __name__ == "__main__":
testTh()
复制代码
Thread-1 set num to 1
Thread-3 set num to 2
Thread-2 set num to 3
Thread-4 set num to 4
Thread-5 set num to 5
复制代码
线程替代方案
subprocess
完全跳过线程,使用进程
是派生进程的主要替代方案
python2.4 后引入
multiprocessiong
使用 threading 接口派生,使用子进程
允许为多核或者多 cpu 派生进程,接口跟 threading 非常相似
python2.6 后引入
concurrent.futures
新的异步执行模块
任务级别的操作
python3.2 后引入
多进程
进程间通讯(InterprocessCommunication, IPC)
进程之间无任何共享状态
进程的创建
直接生成 Process 实例对象,案例 19
import multiprocessing
from time import sleep, ctime
def clock(interval):
while True:
print("The time is %s" % ctime())
sleep(interval)
if __name__ == '__main__':
p = multiprocessing.Process(target=clock, args=(5, ))
p.start()
while True:
print("sleeping......")
sleep(1)
复制代码
sleeping......
The time is Tue Aug 13 19:47:41 2019
sleeping......
sleeping......
sleeping......
sleeping......
sleeping......
The time is Tue Aug 13 19:47:46 2019
sleeping......
sleeping......
sleeping......
sleeping......
...
- 派生子类,案例20
复制代码
import multiprocessing
from time import sleep, ctime
class ClockProcess(multiprocessing.Process):
'''
两个函数比较重要
1. init构造函数
2. run
'''
def __init__(self, interval):
super(ClockProcess, self).__init__()
self.interval = interval
def run(self):
while True:
print("The time is %s" % ctime())
sleep(self.interval)
if __name__ == '__main__':
p = ClockProcess(3)
p.start()
复制代码
The time is Tue Aug 13 19:48:49 2019
The time is Tue Aug 13 19:48:52 2019
The time is Tue Aug 13 19:48:55 2019
The time is Tue Aug 13 19:48:58 2019
The time is Tue Aug 13 19:49:01 2019
The time is Tue Aug 13 19:49:04 2019
...
复制代码
在 os 中查看 pid,ppid 以及他们的关系
案例 21
from multiprocessing import Process
import os
def info(title):
print(title)
print("module name:", __name__)
# 得到父亲进程的id
print("parent process:", os.getppid())
# 得到本身进程的id
print("process id:", os.getpid())
def f(name):
info('function f')
print('hello', name)
if __name__ == '__main__':
info('main line')
p = Process(target=f, args=('bob', ))
p.start()
p.join()
复制代码
main line
module name: __main__
parent process: 11900
process id: 18832
function f
module name: __mp_main__
parent process: 18832
process id: 20868
hello bob
复制代码
生产者消费者模型
JoinableQueue
案例 22
import multiprocessing
from time import ctime
def consumer(input_q):
print("Into consumer:", ctime)
while True:
# 处理项
item = input_q.get()
print('pull', item, 'out of q')
input_q.task_done() # 发出信号通知任务完成
print("Out of consumer:", ctime()) ## 此句未执行,因为q.join()收集到四个task_done()信号后,主进程启动,未等到print此句完成,程序就结束了
def producer(sequence, output_q):
print("Into procuder:", ctime())
for item in sequence:
output_q.put(item)
print('put', item, 'into q')
print('Out of procuder', ctime())
# 建立进程
if __name__ == '__main__':
q = multiprocessing.JoinableQueue()
# 进行消费者进程
cons_p = multiprocessing.Process(target=consumer, args=(q, ))
cons_p.daemon = True
cons_p.start()
# 生产多个项,sequence代表要发送给消费者的项序列
# 在实践中,这可能是生成器的输出或通过一些其他地方生产出来
sequence = [1, 2, 3, 4]
producer(sequence, q)
# 等待所有项被处理
q.join()
复制代码
Into procuder: Tue Aug 13 19:50:38 2019
put 1 into q
put 2 into q
put 3 into q
put 4 into q
Out of procuder Tue Aug 13 19:50:38 2019
Into consumer: <built-in function ctime>
pull 1 out of q
pull 2 out of q
pull 3 out of q
pull 4 out of q
- 队列中哨兵的使用,案例23
复制代码
import multiprocessing
from time import ctime
# 设置哨兵问题
def consumer(input_q):
print("Into consumer:", ctime())
while True:
item = input_q.get()
if item is None:
break
print("pull", item, "out of q")
print ("Out of consumer:", ctime()) ## 此句执行完成,再转入主进程
def producer(sequence, output_q):
print ("Into procuder:", ctime())
for item in sequence:
output_q.put(item)
print ("put", item, "into q")
print ("Out of procuder:", ctime())
if __name__ == '__main__':
q = multiprocessing.Queue()
cons_p = multiprocessing.Process(target = consumer, args = (q,))
cons_p.start()
sequence = [1,2,3,4]
producer(sequence, q)
q.put(None)
cons_p.join()
复制代码
Into procuder: Tue Aug 13 19:51:23 2019
put 1 into q
put 2 into q
put 3 into q
put 4 into q
Out of procuder: Tue Aug 13 19:51:23 2019
Into consumer: Tue Aug 13 19:51:24 2019
pull 1 out of q
pull 2 out of q
pull 3 out of q
pull 4 out of q
Out of consumer: Tue Aug 13 19:51:24 2019
- 哨兵的改进,案例24
复制代码
import multiprocessing
from time import ctime
def consumer(input_q):
print ("Into consumer:", ctime())
while True:
item = input_q.get()
if item is None:
break
print("pull", item, "out of q")
print ("Out of consumer:", ctime())
def producer(sequence, output_q):
for item in sequence:
print ("Into procuder:", ctime())
output_q.put(item)
print ("Out of procuder:", ctime())
if __name__ == '__main__':
q = multiprocessing.Queue()
cons_p1 = multiprocessing.Process (target = consumer, args = (q,))
cons_p1.start()
cons_p2 = multiprocessing.Process (target = consumer, args = (q,))
cons_p2.start()
sequence = [1,2,3,4]
producer(sequence, q)
q.put(None)
q.put(None)
cons_p1.join()
cons_p2.join()
复制代码
Into procuder: Tue Aug 13 19:52:08 2019
Out of procuder: Tue Aug 13 19:52:08 2019
Into procuder: Tue Aug 13 19:52:08 2019
Out of procuder: Tue Aug 13 19:52:08 2019
Into procuder: Tue Aug 13 19:52:08 2019
Out of procuder: Tue Aug 13 19:52:08 2019
Into procuder: Tue Aug 13 19:52:08 2019
Out of procuder: Tue Aug 13 19:52:08 2019
Into consumer: Tue Aug 13 19:52:08 2019
pull 1 out of q
pull 2 out of q
pull 3 out of q
pull 4 out of q
Out of consumer: Tue Aug 13 19:52:08 2019
Into consumer: Tue Aug 13 19:52:08 2019
Out of consumer: Tue Aug 13 19:52:08 2019
复制代码
划线
评论
复制
发布于: 2021 年 05 月 20 日阅读数: 8
版权声明: 本文为 InfoQ 作者【若尘】的原创文章。
原文链接:【http://xie.infoq.cn/article/d1079cf411bfe35ae4bb9d448】。文章转载请联系作者。
若尘
关注
还未添加个人签名 2021.01.11 加入
还未添加个人简介
评论