在线预览:.并发编程-线程篇.html
示例代码:.concurrent/Thread
终于说道线程了,心酸啊,进程还有点东西下次接着聊,这周4天外出,所以注定发文少了+_+
用过Java或者Net的重点都在线程这块,Python的重点其实在上篇,但线程自有其独到之处~比如资源共享(更轻量级)
这次采用循序渐进的方式讲解,先使用,再深入,然后扩展,最后来个案例
,呃.呃.呃.先这样计划~欢迎纠正错误
官方文档:.html
进程是由若干线程组成的(一个进程至少有一个线程)
用法和Process
差不多,咱先看个案例:Thread(target=test, args=(i, ))
import os
from threading import Thread, current_thread def test(name):# current_thread()返回当前线程的实例thread_name = current_thread().name # 获取线程名print(f"[编号:{name}],ThreadName:{thread_name}nPID:{os.getpid()},PPID:{os.getppid()}")def main():t_list = [Thread(target=test, args=(i, )) for i in range(5)]for t in t_list:t.start() # 批量启动for t in t_list:t.join() # 批量回收# 主线程print(f"[Main]ThreadName:{current_thread().name}nPID:{os.getpid()},PPID:{os.getppid()}")if __name__ == '__main__':main()
输出:(同一个进程ID)
[编号:0],ThreadName:Thread-1
PID:20533,PPID:19830
[编号:1],ThreadName:Thread-2
PID:20533,PPID:19830
[编号:2],ThreadName:Thread-3
PID:20533,PPID:19830
[编号:3],ThreadName:Thread-4
PID:20533,PPID:19830
[编号:4],ThreadName:Thread-5
PID:20533,PPID:19830
[Main]ThreadName:MainThread
PID:22636,PPID:19830
注意一点:Python里面的线程是Posix Thread
如果想给线程设置一个Div的名字呢?:
from threading import Thread, current_threaddef test():# current_thread()返回当前线程的实例print(f"ThreadName:{current_thread().name}")def main():t1 = Thread(target=test, name="小明")t2 = Thread(target=test)t1.start()t2.start()t1.join()t2.join()# 主线程print(f"[Main],ThreadName:{current_thread().name}")if __name__ == '__main__':main()
输出:(你指定有特点的名字,没指定就使用默认命令【联想古时候奴隶名字都是编号,主人赐名就有名了】)
ThreadName:小明
ThreadName:Thread-1
[Main],ThreadName:MainThread
类的方式创建线程
from threading import Threadclass MyThread(Thread):def __init__(self, name):# 设个坑,你可以自行研究下super().__init__() # 放在后面就报错了self.name = namedef run(self):print(self.name)def main():t = MyThread(name="小明")t.start()t.join()if __name__ == '__main__':main()
输出:(和Thread初始化的name冲突了【变量名得注意哦】)
小明
from multiprocessing.dummy import Pool as ThreadPool, current_processdef test(i):# 本质调用了:threading.current_threadprint(f"[编号{i}]{current_process().name}")def main():p = ThreadPool()for i in range(5):p.apply_async(test, args=(i, ))p.close()p.join()print(f"{current_process().name}")if __name__ == '__main__':main()
输出:
[编号0]Thread-3
[编号1]Thread-4
[编号3]Thread-2
[编号2]Thread-1
[编号4]Thread-3
MainThread
对上面代码,项目里面一般都会这么优化:(并行这块线程后面会讲,不急)
from multiprocessing.dummy import Pool as ThreadPool, current_processdef test(i):# 源码:current_process = threading.current_threadprint(f"[编号{i}]{current_process().name}")def main():p = ThreadPool()p.map_async(test, range(5))p.close()p.join()print(f"{current_process().name}")if __name__ == '__main__':main()
输出:
[编号0]Thread-2
[编号1]Thread-4
[编号2]Thread-3
[编号4]Thread-2
[编号3]Thread-1
MainThread
代码改动很小(循环换成了map)性能提升很明显(密集型操作)
Thread初始化参数:
Thread实例对象的方法:
threading模块提供的一些方法:
看一个小案例:
import time
from threading import Thread, active_countdef test1():print("test1")time.sleep(1)print("test1 ok")def test2():print("test2")time.sleep(2)print("test2 ok")def main():t1 = Thread(target=test1)t2 = Thread(target=test2, daemon=True)t1.start()t2.start()t1.join()print(active_count())print(t1.is_alive)print(t2.is_alive)# 除非加这一句才等daemon线程,不然主线程退出的时候后台线程就退出了# t2.join()if __name__ == '__main__':main()
下次就以multiprocessing.dummy
模块为例了,API和threading
几乎一样,进行了一些并发的封装,性价比更高
其实以前的Linux中
是没有线程这个概念的,Windows
程序员经常使用线程,这一看~方便啊,然后可能是当时程序员偷懒了,就把进程模块改了改(这就是为什么之前说Linux下的多进程编程其实没有Win下那么“重量级”),弄了个精简版进程==>线程
(内核是分不出进程和线程
的,反正PCB
个数都是一样)
多线程和多进程最大的不同在于,多进程中,同一个变量,各自有一份拷贝存在于每个进程中,互不影响,而多线程中,所有变量都由所有线程共享(全局变量和堆 ==> 线程间共享。进程的栈 ==> 线程平分而独占)
还记得通过current_thread()
获取的线程信息吗?难道线程也没个id啥的?一起看看:(通过ps -Lf pid 来查看LWP
)
回顾:进程共享的内容:(回顾:.html)
线程之间共享数据的确方便,但是也容易出现数据混乱的现象,来看个例子:
from multiprocessing.dummy import threadingnum = 0 # def global numdef test(i):print(f"子进程:{i}")global numfor i in range(100000):num += 1def main():p_list = [threading.Thread(target=test, args=(i, )) for i in range(5)]for i in p_list:i.start()for i in p_list:i.join()print(num) # 应该是500000,发生了数据混乱,结果少了很多if __name__ == '__main__':main()
输出:(应该是500000
,发生了数据混乱,只剩下358615
)
子进程:0
子进程:1
子进程:2
子进程:3
子进程:4
452238
共享资源+CPU调度==>数据混乱==解决==>线程同步
这时候Lock
就该上场了
互斥锁是实现线程同步最简单的一种方式,读写都加锁(读写都会串行)
先看看上面例子怎么解决调:
from multiprocessing.dummy import threading, Locknum = 0 # def global numdef test(i, lock):print(f"子进程:{i}")global numfor i in range(100000):with lock:num += 1def main():lock = Lock()p_list = [threading.Thread(target=test, args=(i, lock)) for i in range(5)]for i in p_list:i.start()for i in p_list:i.join()print(num)if __name__ == '__main__':main()
输出:time python3 1.thread.2.py
子进程:0
子进程:1
子进程:2
子进程:3
子进程:4
500000real 0m2.846s
user 0m1.897s
sys 0m3.159s
lock设置为全局或者局部,性能几乎一样。循环换成map后性能有所提升(测试案例在Code中)
from multiprocessing.dummy import Pool as ThreadPool, Locknum = 0 # def global num
lock = Lock()def test(i):print(f"子进程:{i}")global numglobal lockfor i in range(100000):with lock:num += 1def main():p = ThreadPool()p.map_async(test, list(range(5)))p.close()p.join()print(num)if __name__ == '__main__':main()
输出:
time python3 1.thread.2.py
子进程:0
子进程:1
子进程:3
子进程:2
子进程:4
500000real 0m2.468s
user 0m1.667s
sys 0m2.644s
本来多线程访问共享资源的时候可以并行,加锁后就部分串行了(没获取到的线程就阻塞等了)
【项目中可以多次加锁,每次加锁只对修改部分加(尽量少的代码) 】(以后会说协程和Actor模型)
补充:以前都是这么写的,现在支持with
托管了(有时候还会用到,所以了解下):【net是直接lock大括号包起来
】
#### 以前写法:
lock.acquire() # 获取锁
try:num += 1
lease() # 释放锁#### 等价简写
with lock:num += 1
扩展知识:(GIL在扩展篇会详说)
看个场景:小明欠小张2000,欠小周5000,现在需要同时转账给他们:(规定:几次转账加几次锁)
小明啥也没管,直接撸起袖子就写Code了:(错误Code示意)
from multiprocessing.dummy import Pool as ThreadPool, Lockxiaoming = 8000
xiaozhang = 3000
xiaozhou = 5000def test(lock):global xiaomingglobal xiaozhangglobal xiaozhou# 小明想一次搞定:with lock:# 小明转账2000给小张xiaoming -= 2000xiaozhang += 2000with lock:# 小明转账5000给小周xiaoming -= 5000xiaozhou += 5000def main():print(f"[还钱前]小明{xiaoming},小张{xiaozhang},小周{xiaozhou}")lock = Lock()p = ThreadPool()p.apply_async(test, args=(lock, ))p.close()p.join()print(f"[还钱后]小明{xiaoming},小张{xiaozhang},小周{xiaozhou}")if __name__ == '__main__':main()
小明写完代码就出去了,这可把小周和小张等急了,打了N个电话来催,小明心想啥情况?
一看代码楞住了,改了改代码,轻轻松松把钱转出去了:
from multiprocessing.dummy import Pool as ThreadPool, Lockxiaoming = 8000
xiaozhang = 3000
xiaozhou = 5000# 小明转账2000给小张
def a_to_b(lock):global xiaomingglobal xiaozhangwith lock:xiaoming -= 2000xiaozhang += 2000# 小明转账5000给小周
def a_to_c(lock):global xiaomingglobal xiaozhouwith lock:xiaoming -= 5000xiaozhou += 5000def main():print(f"[还钱前]小明{xiaoming},小张{xiaozhang},小周{xiaozhou}")lock = Lock()p = ThreadPool()p.apply_async(a_to_b, args=(lock, ))p.apply_async(a_to_c, args=(lock, ))p.close()p.join()print(f"[还钱后]小明{xiaoming},小张{xiaozhang},小周{xiaozhou}")if __name__ == '__main__':main()
输出:
[还钱前]小明8000,小张3000,小周5000
[还钱后]小明1000,小张5000,小周10000
就这么算了吗?不不不,不符合小明性格,于是小明研究了下,发现~还有个递归锁RLock
呢,正好解决他的问题:
from multiprocessing.dummy import Pool as ThreadPool, RLock # 就把这边换了下xiaoming = 8000
xiaozhang = 3000
xiaozhou = 5000def test(lock):global xiaomingglobal xiaozhangglobal xiaozhou# 小明想一次搞定:with lock:# 小明转账2000给小张xiaoming -= 2000xiaozhang += 2000with lock:# 小明转账5000给小周xiaoming -= 5000xiaozhou += 5000def main():print(f"[还钱前]小明{xiaoming},小张{xiaozhang},小周{xiaozhou}")lock = RLock() # 就把这边换了下p = ThreadPool()p.apply_async(test, args=(lock, ))p.close()p.join()print(f"[还钱后]小明{xiaoming},小张{xiaozhang},小周{xiaozhou}")if __name__ == '__main__':main()
RLock内部维护着一个Lock和一个counter
变量,counter记录了acquire
的次数,从而使得资源可以被多次require
。直到一个线程所有的acquire都被release
,其他的线程才能获得资源
小明想到了之前说的(互斥锁Lock
读写都加锁)就把代码拆分研究了下:
print("[开始]小明转账2000给小张")
lock.acquire() # 获取锁
xiaoming -= 2000
xiaozhang += 2000print("[开始]小明转账5000给小周")
lock.acquire() # 获取锁(互斥锁第二次加锁)
xiaoming -= 5000
xiaozhou += 5000
lease() # 释放锁
print("[结束]小明转账5000给小周")lease() # 释放锁
print("[开始]小明转账2000给小张")
输出发现:(第二次加锁的时候,变成阻塞等了【死锁】)
[还钱前]小明8000,小张3000,小周5000
[开始]小明转账2000给小张
[开始]小明转账5000给小周
这种方式,Python提供的RLock就可以解决了
看个场景:小明和小张需要流水帐,经常互刷~小明给小张转账1000,小张给小明转账1000
一般来说,有几个共享资源就加几把锁(小张、小明就是两个共享资源,所以需要两把Lock
)
先描述下然后再看代码:
正常流程 小明给小张转1000:小明自己先加个锁==>小明-1000==>获取小张的锁==>小张+1000==>转账完毕
死锁情况 小明给小张转1000:小明自己先加个锁==>小明-1000==>准备获取小张的锁。可是这时候小张准备转账给小明,已经把自己的锁获取了,在等小明的锁(两个人相互等,于是就一直死锁了)
代码模拟一下过程:
from time import sleep
from multiprocessing.dummy import Pool as ThreadPool, Lockxiaoming = 5000
xiaozhang = 8000
m_lock = Lock() # 小明的锁
z_lock = Lock() # 小张的锁# 小明转账1000给小张
def a_to_b():global xiaomingglobal xiaozhangglobal m_lockglobal z_lockwith m_lock:xiaoming -= 1000sleep(0.01)with z_lock:xiaozhang += 1000# 小张转账1000给小明
def b_to_a():global xiaomingglobal xiaozhangglobal m_lockglobal z_lockwith z_lock:xiaozhang -= 1000sleep(0.01)with m_lock:xiaoming += 1000def main():print(f"[还钱前]小明{xiaoming},小张{xiaozhang}")p = ThreadPool()p.apply_async(a_to_b)p.apply_async(b_to_a)p.close()p.join()print(f"[还钱后]小明{xiaoming},小张{xiaozhang}")if __name__ == '__main__':main()
输出:(卡在这边了)
[转账前]小明5000,小张8000
项目中像这类的情况,一般都是这几种解决方法:(还有其他解决方案,后面会继续说)
Lock(False)
)比如上面的情况,我们如果规定,不管是谁先转账,先从小明开始,然后再小张,那么就没问题了。或者谁钱多就谁(权重高的优先)
from time import sleep
from multiprocessing.dummy import Pool as ThreadPool, Lockxiaoming = 5000
xiaozhang = 8000
m_lock = Lock() # 小明的锁
z_lock = Lock() # 小张的锁# 小明转账1000给小张
def a_to_b():global xiaomingglobal xiaozhangglobal m_lockglobal z_lock# 以上次代码为例,这边只修改了这块with z_lock: # 小张权重高,大家都先获取小张的锁xiaozhang += 1000sleep(0.01)with m_lock:xiaoming -= 1000# 小张转账1000给小明
def b_to_a():global xiaomingglobal xiaozhangglobal m_lockglobal z_lockwith z_lock:xiaozhang -= 1000sleep(0.01)with m_lock:xiaoming += 1000def main():print(f"[转账前]小明{xiaoming},小张{xiaozhang}")p = ThreadPool()p.apply_async(a_to_b)p.apply_async(b_to_a)p.close()p.join()print(f"[转账后]小明{xiaoming},小张{xiaozhang}")if __name__ == '__main__':main()
输出:
[转账前]小明5000,小张8000
[转账后]小明5000,小张8000
PS:lock.locked()
:判断 lock 当前是否上锁,如果上锁,返回True,否则返回False【上锁失败时候的处理】
条件变量一般都不是锁,能阻塞线程,从而减少不必要的竞争,Python内置了RLock
(不指定就是RLock)
看看源码:
class Condition:"""
实现条件变量的类。
条件变量允许一个或多个线程等到另一个线程通知它们为止
如果给出了lock参数而不是None,那必须是Lock或RLock对象作底层锁。
否则,一个新的RLock对象被创建并用作底层锁。
"""def __init__(self, lock=None):if lock is None:lock = RLock()self._lock = lock# 设置lock的acquire()和release()方法self.acquire = lease = lease
再看看可不可以进行with托管:(支持)
def __enter__(self):return self._lock.__enter__()def __exit__(self, *args):return self._lock.__exit__(*args)
看个生产消费者的简单例子:(生产完就通知消费者)
from multiprocessing.dummy import Pool as ThreadPool, Conditions_list = []
con = Condition()def Shop(i):global conglobal s_list# 加锁保护共享资源for x in range(5):with con:s_list.append(x)print(f"[生产者{i}]生产商品{x}")ify_all() # 通知消费者有货了def User(i):global conglobal s_listwhile True:with con:if s_list:print(f"列表商品:{s_list}")name = s_list.pop() # 消费商品print(f"[消费者{i}]消费商品{name}")print(f"列表剩余:{s_list}")else:con.wait()def main():p = ThreadPool()# 两个生产者p.map_async(Shop, range(2))# 五个消费者p.map_async(User, range(5))p.close()p.join()if __name__ == '__main__':main()
输出:(list之类的虽然可以不加global标示,但是为了后期维护方便,建议加上)
[生产者0]生产商品0
[生产者0]生产商品1
列表商品:[0, 1]
[消费者0]消费商品1
列表剩余:[0]
列表商品:[0]
[消费者0]消费商品0
列表剩余:[]
[生产者0]生产商品2
列表商品:[2]
[消费者1]消费商品2
列表剩余:[]
[生产者0]生产商品3
[生产者1]生产商品0
[生产者0]生产商品4
列表商品:[3, 0, 4]
[消费者1]消费商品4
列表剩余:[3, 0]
[生产者1]生产商品1
[生产者1]生产商品2
[生产者1]生产商品3
[生产者1]生产商品4
列表商品:[3, 0, 1, 2, 3, 4]
[消费者2]消费商品4
列表剩余:[3, 0, 1, 2, 3]
列表商品:[3, 0, 1, 2, 3]
[消费者0]消费商品3
列表剩余:[3, 0, 1, 2]
列表商品:[3, 0, 1, 2]
[消费者1]消费商品2
列表剩余:[3, 0, 1]
列表商品:[3, 0, 1]
[消费者3]消费商品1
列表剩余:[3, 0]
列表商品:[3, 0]
[消费者3]消费商品0
列表剩余:[3]
列表商品:[3]
[消费者3]消费商品3
列表剩余:[]
通知方法:
记得当时在分析multiprocessing.Queue
源码的时候,有提到过(点我回顾)
同进程的一样,semaphore
管理一个内置的计数器,每当调用acquire()
时内置函数-1
,每当调用release()
时内置函数+1
通俗讲就是:在互斥锁的基础上封装了下,实现一定程度的并行
举个例子,以前使用互斥锁的时候:(厕所就一个坑位,必须等里面的人出来才能让另一个人上厕所)
使用信号量之后:厕所坑位增加到5个(自己指定),这样可以5个人一起上厕所了==>实现了一定程度的并发
举个例子:(Python在语法这点特别爽,不用你记太多异同,功能差不多基本上代码也就差不多)
from time import sleep
from multiprocessing.dummy import Pool as ThreadPool, Semaphoresem = Semaphore(5) # 限制最大连接数为5def goto_wc(i):global semwith sem:print(f"[线程{i}]上厕所")sleep(0.1)def main():p = ThreadPool()p.map_async(goto_wc, range(50))p.close()p.join()if __name__ == '__main__':main()
输出:
可能看了上节回顾的会疑惑:源码里面明明是BoundedSemaphore
,搞啥呢?
其实BoundedSemaphore
就比Semaphore
多了个在调用release()
时检查计数器的值是否超过了计数器的初始值,如果超过了将抛出一个异常
以上一个案例说事:你换成BoundedSemaphore
和上面效果一样==>sem = BoundedSemaphore(5)
Semaphore
补充¶之前有人问Semaphore
信号量在项目中有什么应用?(⊙o⊙)…额
,这个其实从概念就推出场景了,控制并发嘛~举个例子:
( ⊙ o ⊙ )捂脸
避免触发反爬虫的一种方式
,其他部分后面会逐步引入)这些虚的说完了,来个控制并发数的案例,然后咱们就继续并发编程的衍生了:
import time
from multiprocessing.dummy import threading, Semaphoreclass MyThread(threading.Thread):def __init__(self, id, sem):super().__init__()self.__id = idself.__sem = semdef run(self):self.__sem.acquire() # 获取self.api_test()def api_test(self):"""模拟api请求"""time.sleep(1)print(f"id={self.__id}")self.__lease() # 释放def main():sem = Semaphore(10) # 控制并发数t_list = [MyThread(i, sem) for i in range(1000)]for t in t_list:t.start()for t in t_list:t.join()if __name__ == '__main__':main()
输出图示:
运行分析:
性能全图:
在多线程程序中,死锁问题很大一部分是由于线程同时获取多个锁造成的,eg:一个线程获取了第一个锁,然后在获取第二个锁的 时候发生阻塞,那么这个线程就可能阻塞其他线程的执行,从而导致整个程序假死。
解决死锁问题的一种方案是为程序中的每一个锁分配一个唯一的id,然后只允许按照升序规则来使用多个锁,当时举了个小明小张转账的简单例子,来避免死锁,这次咱们再看一个案例:(这个规则使用上下文管理器非常简单)
先看看源码,咱们怎么使用:
# 装饰器方法
def contextmanager(func):"""
方法格式
@contextmanager
def some_generator(<arguments>):
<setup>
try:
yield <value>
finally:
<cleanup> 然后就可以直接使用with托管了
with some_generator(<arguments>) as <variable>:
<body>
"""@wraps(func)def helper(*args, **kwds):return _GeneratorContextManager(func, args, kwds)return helper
翻译成代码就是这样了:(简化)
from contextlib import contextmanager # 引入上下文管理器@contextmanager
def lock_manager(*args):# 先排个序(按照id排序)args = sorted(args, key=lambda x: id(x))try:for lock in args:lock.acquire()yieldfinally:# 先释放最后加的锁(倒序释放)for lock in reversed(args):lease()
基础忘记了可以点我(lambda)
以上面小明小张转账案例为例子:(不用再管锁顺序之类的了,直接全部丢进去:with lock_manager(...)
)
from contextlib import contextmanager # 引入上下文管理器
from multiprocessing.dummy import Pool as ThreadPool, Lock@contextmanager
def lock_manager(*args):# 先排个序(按照id排序)args = sorted(args, key=lambda x: id(x))try:for lock in args:lock.acquire()yieldfinally:# 先释放最后加的锁(倒序释放)for lock in reversed(args):lease()xiaoming = 5000
xiaozhang = 8000
m_lock = Lock() # 小明的锁
z_lock = Lock() # 小张的锁# 小明转账1000给小张
def a_to_b():global xiaomingglobal xiaozhangglobal m_lockglobal z_lockprint(f"[转账前]小明{xiaoming},小张{xiaozhang}")with lock_manager(m_lock, z_lock):xiaoming -= 1000xiaozhang += 1000print(f"[转账后]小明{xiaoming},小张{xiaozhang}")# 小张转账1000给小明
def b_to_a():global xiaomingglobal xiaozhangglobal m_lockglobal z_lockprint(f"[转账前]小明{xiaoming},小张{xiaozhang}")with lock_manager(m_lock, z_lock):xiaozhang -= 1000xiaoming += 1000print(f"[转账后]小明{xiaoming},小张{xiaozhang}")def main():print(f"[互刷之前]小明{xiaoming},小张{xiaozhang}")p = ThreadPool()for _ in range(5):p.apply_async(a_to_b)p.apply_async(b_to_a)p.close()p.join()print(f"[互刷之后]小明{xiaoming},小张{xiaozhang}")if __name__ == '__main__':main()
输出:
[互刷之前]小明5000,小张8000
[转账前]小明5000,小张8000
[转账前]小明5000,小张8000
[转账后]小明4000,小张9000
[转账前]小明5000,小张8000
[转账后]小明5000,小张8000
[转账前]小明5000,小张8000
[转账前]小明4000,小张9000
[转账后]小明4000,小张9000
[转账后]小明5000,小张8000
[转账前]小明5000,小张8000
[转账后]小明4000,小张9000
[转账前]小明4000,小张9000
[转账前]小明4000,小张9000
[转账后]小明5000,小张8000
[转账前]小明5000,小张8000
[转账后]小明4000,小张9000
[转账后]小明5000,小张8000
[转账前]小明5000,小张8000
[转账后]小明4000,小张9000
[转账后]小明5000,小张8000
[互刷之后]小明5000,小张8000
再来个验证,在他们互刷的过程中,小潘还了1000元给小明
from time import sleep
from contextlib import contextmanager # 引入上下文管理器
from multiprocessing.dummy import Pool as ThreadPool, Lock@contextmanager
def lock_manager(*args):# 先排个序(按照id排序)args = sorted(args, key=lambda x: id(x))try:for lock in args:lock.acquire()yieldfinally:# 先释放最后加的锁(倒序释放)for lock in reversed(args):lease()xiaopan = 9000
xiaoming = 5000
xiaozhang = 8000
m_lock = Lock() # 小明的锁
z_lock = Lock() # 小张的锁
p_lock = Lock() # 小潘的锁# 小明转账1000给小张
def a_to_b():global xiaomingglobal xiaozhangglobal m_lockglobal z_lockprint(f"[转账前]小明{xiaoming},小张{xiaozhang}")with lock_manager(m_lock, z_lock):xiaoming -= 1000xiaozhang += 1000print(f"[转账后]小明{xiaoming},小张{xiaozhang}")# 小张转账1000给小明
def b_to_a():global xiaomingglobal xiaozhangglobal m_lockglobal z_lockprint(f"[转账前]小明{xiaoming},小张{xiaozhang}")with lock_manager(m_lock, z_lock):xiaozhang -= 1000xiaoming += 1000print(f"[转账后]小明{xiaoming},小张{xiaozhang}")# 小潘还1000给小明
def c_to_a():global xiaomingglobal xiaopanglobal m_lockglobal p_lockprint(f"[转账前]小明{xiaoming},小潘{xiaopan}")with lock_manager(m_lock, p_lock):xiaopan -= 1000xiaoming += 1000print(f"[转账后]小明{xiaoming},小潘{xiaopan}")def main():print(f"[互刷之前]小明{xiaoming},小张{xiaozhang},小潘{xiaopan}")p = ThreadPool()for _ in range(5):p.apply_async(a_to_b)# 在他们互刷的过程中,小潘还了1000元给小明if _ == 3:p.apply_async(c_to_a)p.apply_async(b_to_a)p.close()p.join()print(f"[互刷之后]小明{xiaoming},小张{xiaozhang},小潘{xiaopan}")if __name__ == '__main__':main()
输出:
[互刷之前]小明5000,小张8000,小潘9000
[转账前]小明5000,小张8000
[转账前]小明5000,小张8000
[转账后]小明4000,小张9000
[转账前]小明5000,小张8000
[转账前]小明4000,小张9000
[转账后]小明5000,小张8000
[转账前]小明5000,小张8000
[转账后]小明4000,小张9000
[转账后]小明5000,小张8000
[转账前]小明5000,小张8000
[转账后]小明4000,小张9000
[转账前]小明4000,小张9000
[转账前]小明4000,小潘9000 # 注意下这个
[转账后]小明5000,小张8000
[转账前]小明5000,小张8000
[转账后]小明4000,小张9000
[转账后]小明5000,小潘8000 # 注意下这个
[转账前]小明5000,小张9000
[转账后]小明6000,小张8000
[转账后]小明5000,小张9000
[转账前]小明6000,小张8000
[转账后]小明6000,小张8000
[互刷之后]小明6000,小张8000,小潘8000
from contextlib import contextmanager
from multiprocessing.dummy import threading # or import threading# ThreadLocal 下节会说
_local = threading.local()@contextmanager
def acquire(*args):# 以id将锁进行排序args = sorted(args, key=lambda x: id(x))# 确保不违反以前获取的锁顺序acquired = getattr(_local, 'acquired', [])if acquired and max(id(lock) for lock in acquired) >= id(args[0]):raise RuntimeError('锁顺序有问题')# 获取所有锁d(args)_local.acquired = acquired # ThreadLocal:每个线程独享acquired# 固定格式try:for lock in args:lock.acquire()yieldfinally:# 逆向释放锁资源for lock in reversed(args):lease()# 把释放掉的锁给删了del acquired[-len(args):]
先看看场景:五个外国哲学家到中国来吃饭了,因为不了解行情,每个人只拿了一双筷子,然后点了一大份的面。碍于面子,他们不想再去拿筷子了,于是就想通过脑子来解决这个问题。
每个哲学家吃面都是需要两只筷子的,这样问题就来了:(只能拿自己两手边的筷子)
把现实问题转换成代码就是:
有了上面基础这个就简单了,使用死锁避免机制解决哲学家就餐问题的实现:(不用再操心锁顺序了)
from contextlib import contextmanager # 引入上下文管理器
from multiprocessing.dummy import Pool as ThreadPool, Lock, current_process as current_thread# 使用简化版,便于你们理解
@contextmanager
def lock_manager(*args):# 先排个序(按照id排序)args = sorted(args, key=lambda x: id(x))try:# 依次加锁for lock in args:lock.acquire()yieldfinally:# 先释放最后加的锁(倒序释放)for lock in reversed(args):lease()#########################################def eat(l_lock, r_lock):while True:with lock_manager(l_lock, r_lock):# 获取当前线程的名字print(f"{current_thread().name},正在吃面")sleep(0.5)def main():resource = 5 # 5个筷子,5个哲学家locks = [Lock() for i in range(resource)] # 几个资源几个锁p = ThreadPool(resource) # 让线程池里面有5个线程(默认是cup核数)for i in range(resource):# 抢左手筷子(locks[i])和右手的筷子(locks[(i + 1) % resource])# 举个例子更清楚:i=0 ==> 0,1;i=4 ==> 4,0p.apply_async(eat, args=(locks[i], locks[(i + 1) % resource]))p.close()p.join()if __name__ == '__main__':main()
输出图示:
PS:这个一般都是操作系统的算法,了解下就可以了,上面哲学家吃面用的更多一点(欢迎投稿~)
我们可以把操作系统看作是银行家,操作系统管理的资源相当于银行家管理的资金,进程向操作系统请求分配资源相当于用户向银行家贷款。 为保证资金的安全,银行家规定:
操作系统按照银行家制定的规则为进程分配资源,当进程首次申请资源时,要测试该进程对资源的最大需求量,如果系统现存的资源可以满足它的最大需求量则按当前的申请量分配资源,否则就推迟分配。当进程在执行中继续申请资源时,先测试该进程本次申请的资源数是否超过了该资源所剩余的总量。若超过则拒绝分配资源,若能满足则按当前的申请量分配资源,否则也要推迟分配。
通俗讲就是:当一个进程申请使用资源的时候,银行家算法通过先试探分配给该进程资源,然后通过安全性算法判断分配后的系统是否处于安全状态,若不安全则试探分配作废,让该进程继续等待。
参考链接:
.html
.html
Python里面没找到读写锁,这个应用场景也是有的,先简单说说这个概念,你可以结合RLock
实现读写锁(了解下,用到再研究)
读写锁(一把锁):
扩展参考:
/?p=2384
.htm
上次说了锁相关,把问题稍微汇聚提炼一下~重点在思想,语言无差别
正常执行线程任务没什么好说的,可以通过isAlive
判断当前线程状态,对于耗时操作可以设置超时时间t.join(timeout=1)
+重试机制
但是后台线程Thread(daemon=True)
就没那么好控制了:这些线程会在主线程终止时自动销毁。除了如上所示的两个操作,并没有太多可以对线程做的事情(无法结束一个线程,无法给它发送信号,无法调整它的调度,也无法执行其他高级操作)
比如说,如果你需要在不终止主线程的情况下杀死线程,那么这个线程就不能通过daemon
的方式了,必须通过编程在某个特定点轮询来退出:
from time import sleep
from multiprocessing.dummy import threadingclass MyThread(threading.Thread):def __init__(self):self.__running = Truesuper().__init__()def terminate(self):self.__running = Falsedef run(self):# 轮询方式必须根据业务来,不然没有意义while self.__running:print("do something")sleep(2)
def main():t = MyThread()t.start()t.terminate() # 调用的时候可以通过`terminate`来结束线程t.join()# t.join(timeout=1) # 超时时间print("over")if __name__ == '__main__':main()
输出:(再提醒一下,轮循必须根据业务来,不管是重试机制还是其他,这边只是举个例子)
do something
over
上面这种方式,比较好理解,但是比较依赖threading.Thread
,项目里面一般这么改下:
from time import sleep
from multiprocessing.dummy import threadingclass ShutdownTask(object):def __init__(self):self.__running = Truedef terminate(self):self.__running = Falsedef run(self):# 轮询方式必须根据业务来,不然没有意义while self.__running:print("do something")sleep(2)def main():task = ShutdownTask()t = threading.Thread(target=task.run)t.start()inate() # 结束线程t.join()print("over")if __name__ == '__main__':main()
输出:(ShutdownTask
就解耦了,不依赖threading
库了,你放在进程中使用也没事了)
do something
over
是不是心想着现在都妥妥的了?但是遗憾的是~如果遇到了IO阻塞的情况,轮循形同虚设,这时候就需要超时时间来解决了:
伪代码实现:(加上重试机制更完善)
class IOTask:def __init__(self):self.__running = Truedef terminate(self):self.__running = Falsedef run(self, socket):socket.settimeout(3) # 设置超时时间while self.__running:try:print("正在忙.....")v(8192)sleep(1)breakexcept Exception:print("超时处理")break
由于全局解释锁(GIL)的原因,Python 的线程被限制到同一时刻只允许一个线程执行这样一个执行模型。所以,Python 的线程更适用于处理I/O和其他需要并发执行的阻塞操作(比如等待I/O、等待从数据库获取数据等等),而不是需要多处理器并行的计算密集型任务。【这也是为什么我说Python和其他语言并发编程的重点不一样:进程+协程
】
Python进程Process
可以通过:terminate()
or signal
的方式终止:(点我回顾)
terminate
联合signal
进行退出前处理:
from time import sleep
from signal import signal, SIGTERM
from multiprocessing import Process# 可以释放锁、记录日记之类的操作
def save_data(signalnum, frame):print(f"[退出前处理]signalnum:{signalnum},frame:{frame}")exit(0)def test():# 信号处理signal(SIGTERM, save_data)print("subProcess start")sleep(2)print("subProcess over")def main():p = Process(target=test)p.start()sleep(inate() # 进程结束p.join()print("mainProcess over")if __name__ == '__main__':main()
输出:
subProcess start
[退出前处理]signalnum:15,frame:<frame object at 0x7f27df6c6210>
mainProcess over
还有一种方式,通过进程间状态共享(点我回顾),实现优雅的退出子进程
这块上面说很多了,再介绍几种:
在多线程环境下,每个线程都有自己的数据,想要互不干扰又不想定义成局部变量传来传去,怎么办?
一开始是这么解决的:
from multiprocessing.dummy import threadingglobal_dict = {}def task1():# 根据当前线程查找:global_dict[threading.current_thread()] = 10global_dict[threading.current_thread()] += 10def task2():# 根据当前线程查找:global_dict[threading.current_thread()] = 10global_dict[threading.current_thread()] -= 10def main():t1 = threading.Thread(target=task1)t2 = threading.Thread(target=task2)t1.start()t2.start()t1.join()t2.join()print(global_dict)if __name__ == '__main__':main()
但这么搞也很麻烦,于是就有了ThreadLocal
:
from multiprocessing.dummy import threadingglobal_local = threading.local()def show_name():print(f"[{threading.current_thread().name}]{global_local.name}")def task1():global_local.name = "小明"show_name()def task2():global_local.name = "小张"show_name()def main():t1 = threading.Thread(target=task1)t2 = threading.Thread(target=task2)t1.start()t2.start()t1.join()t2.join()if __name__ == '__main__':main()
输出:(同样存的是name属性,不同线程间互不影响)
[Thread-1]小明
[Thread-2]小张
再来谈谈常用的两种死锁解决思路:(这次不仅仅局限在Python
了)
tryLock
说说顺序锁的算法:hash Sort
(3种情况),先看看几种hash的对比吧:
%time
from multiprocessing.dummy import Lockm_lock = Lock()
z_lock = Lock()
print(f"是否相等:{m_lock==z_lock}n{m_lock}n{z_lock}") # 地址不一样
CPU times: user 3 µs, sys: 1 µs, total: 4 µs
Wall time: 9.3 µs
是否相等:False
<unlocked _thread.lock object at 0x7fdc5d7bd9e0>
<unlocked _thread.lock object at 0x7fdc640f2878>
In [2]: %timem_code = hash(m_lock)
z_code = hash(z_lock)
print(f"是否相等:{m_code==z_code}n{m_code}n{z_code}") # 值一样
CPU times: user 2 µs, sys: 0 ns, total: 2 µs
Wall time: 5.48 µs
是否相等:False
8786527370654
-9223363250320510329
In [3]: %timefrom hashlib import sha1# Java可以使用:identityhashcode
m_code = sha1(str(m_lock).encode("utf-8")).hexdigest()
z_code = sha1(str(z_code).encode("utf-8")).hexdigest()
print(f"是否相等:{m_code==z_code}n{m_code}n{z_code}") # 不相等
CPU times: user 3 µs, sys: 0 ns, total: 3 µs
Wall time: 5.25 µs
是否相等:False
f330fa642adfe965795dc5e88df13f21deff8afc
3ef62508c341fe5c6f3595cd6e1864d3b4ae9f28
In [4]: %timem_code = id(m_lock)
z_code = id(z_lock)
print(f"是否相等:{m_code==z_code}n{m_code}n{z_code}") # 不相等
CPU times: user 2 µs, sys: 1 µs, total: 3 µs
Wall time: 5.01 µs
是否相等:False
140584437930464
140584548247672
如果是一般的顺序死锁,那么程序代码改改逻辑基本上就可以避免了。比如调试的时候就知晓,或者借助类似于jstack
or 开发工具查看:
怕就怕在动态上==>举个例子:(还是小明小张互刷的案例)
有人实践后很多疑问,说明明我就按照顺序加锁了啊,先加转出账号,再加锁转入账号?
其实...换位思考就懂了==>伪代码
def transfer(p_from, p_to, money):with p_from.lock: -= with p_to.lock:p_to += money
这个虽然按照了所谓的顺序,但是转帐人其实在变,也就变成了动态的,所以也会出现死锁:
from time import sleep
from multiprocessing.dummy import Pool as ThreadPool, Lockclass People(object):def __init__(self, name, money=5000):self.name = nameself.lock = Lock() = money # 设置一个初始金额def transfer(p_from, p_to, money):with p_from.lock: -= moneysleep(1) # 模拟网络延迟with p_to.lock:p_to += moneydef main():xiaoming = People("小明")xiaozhang = People("小张")print(f"[互刷前]小明:{},小张:{}")p = ThreadPool()p.apply_async(transfer, args=(xiaoming, xiaozhang, 1000))p.apply_async(transfer, args=(xiaozhang, xiaoming, 1000))p.close()p.join()print(f"[互刷后]小明:{},小张:{}")if __name__ == '__main__':main()
输出:(死锁了,联想哲学家吃面~每个人先拿自己的筷子再抢人的筷子)
[互刷前]小明:5000,小张:5000
解决方案~伪代码思路:
def transfer(cls, p_from, p_to, money):"""p_from:谁转账,p_to:转给谁,money:转多少"""from_hash = get_hash(p_from)to_hash = get_hash(p_to)# 规定:谁大先锁谁if from_hash > to_hash:with p_from.lock: -= moneysleep(1) # 模拟网络延迟with p_to.lock: += moneyelif from_hash < to_hash:with p_to.lock: += moneysleep(1) # 模拟网络延迟with p_from.lock: -= money# hash出现碰撞时处理:(可能性很低)else:# 平局的时候,大家一起抢一个中间锁,谁抢到谁先转账with cls.tie_lock:with p_from.lock: -= moneysleep(1) # 模拟网络延迟with p_to.lock: += money
完整Python代码示意:
from time import sleep
from multiprocessing.dummy import Pool as ThreadPool, Lockclass Account(object):def __init__(self, name, money=5000):self.name = nameself.lock = Lock() = money # 设置一个初始金额class Bank(object):tie_lock = Lock()@classmethoddef __get_hash(cls, obj):return id(obj) # hash_func(obj)@classmethoddef transfer(cls, p_from, p_to, money):"""p_from:谁转账,p_to:转给谁,money:转多少"""from_hash = cls.__get_hash(p_from)to_hash = cls.__get_hash(p_to)print(f"from:{p_from.name}to{p_to.name}=>{money}")# 规定:谁大先锁谁if from_hash > to_hash:print("from_hash > to_hash")with p_from.lock: -= moneysleep(1) # 模拟网络延迟with p_to.lock: += moneyelif from_hash < to_hash:print("from_hash < to_hash")with p_to.lock: += moneysleep(1) # 模拟网络延迟with p_from.lock: -= money# hash出现碰撞时处理:(可能性很低)else:print("from_hash < to_hash")# 平局的时候,大家一起抢一个中间锁,谁抢到谁先转账with cls.tie_lock:with p_from.lock: -= moneysleep(1) # 模拟网络延迟with p_to.lock: += money
def main():xiaoming = Account("小明")xiaozhang = Account("小张")xiaopan = Account("小潘")print(f"[互刷前]小明:{},小张:{},小潘{}")p = ThreadPool()for i in range(3):p.apply_ansfer, args=(xiaoming, xiaozhang, 1000))if i == 1: # 小潘突然间还了1000给小明p.apply_ansfer, args=(xiaopan, xiaoming, 1000))p.apply_ansfer, args=(xiaozhang, xiaoming, 1000))p.close()p.join()print(f"[互刷后]小明:{},小张:{},小潘{}")if __name__ == '__main__':main()
输出:
[互刷前]小明:5000,小张:5000,小潘5000
from:小明to小张=>1000
from_hash < to_hash
from:小张to小明=>1000
from:小明to小张=>1000
from_hash > to_hash
from_hash < to_hash
from:小潘to小明=>1000
from_hash < to_hash
from:小张to小明=>1000
from:小明to小张=>1000
from_hash > to_hash
from_hash < to_hash
from:小张to小明=>1000
from_hash > to_hash
[互刷后]小明:6000,小张:5000,小潘4000
Python
上下文管理器我就不说了,上面说过了,思路和“顺序锁”基本一样:
from contextlib import contextmanager
from multiprocessing.dummy import threading # or import threading_local = threading.local()@contextmanager
def acquire(*args):# 以id将锁进行排序args = sorted(args, key=lambda x: id(x))# 确保不违反以前获取的锁顺序acquired = getattr(_local, 'acquired', [])if acquired and max(id(lock) for lock in acquired) >= id(args[0]):raise RuntimeError('锁顺序有问题')# 获取所有锁d(args)_local.acquired = acquired # ThreadLocal:每个线程独享acquired# 固定格式try:for lock in args:lock.acquire()yieldfinally:# 逆向释放锁资源for lock in reversed(args):lease()# 把释放掉的锁给删了del acquired[-len(args):]
大家都听说过死锁deadlock,但是很少有人听说过活锁livelock。活锁主要由两个线程过度谦让造成,两个线程都想让对方先干话,结果反而都无法继续执行下去。因为两个线程都在活跃状态,故称活锁。
trylock
可以解决死锁问题,但是用不好也会出现少见的活锁问题:
from time import sleep
from random import random
from multiprocessing.dummy import Pool as ThreadPool, Lockclass People(object):def __init__(self, name, money=5000):self.name = nameself.lock = Lock() # 非阻塞等 = money # 设置一个初始金额def transfer(p_from, p_to, money):flag = Truewhile flag:# 尝试获取p_from.lockif p_from.lock.acquire(False): # 非阻塞try:sleep(1) # 模拟网络延迟# 尝试获取p_to.lockif p_to.lock.acquire(False):try: -= += moneyflag = Falsefinally:print("p_to release")p_lease() # 释放锁finally:p_lease() # 释放锁sleep(random()) # 随机睡[0,1)sdef main():xiaoming = People("小明")xiaozhang = People("小张")xiaopan = People("小潘")print(f"[互刷前]小明:{},小张:{},小潘:{}")p = ThreadPool()for i in range(3):p.apply_async(transfer, args=(xiaoming, xiaozhang, 1000))if i == 1:p.apply_async(transfer, args=(xiaopan, xiaoming, 1000))p.apply_async(transfer, args=(xiaozhang, xiaoming, 1000))p.close()p.join()print(f"[互刷后]小明:{},小张:{},小潘:{}")if __name__ == '__main__':main()
输出:(没有sleep(random()) # 随机睡[0,1)s
就是一个活锁了)
[互刷前]小明:5000,小张:5000,小潘:5000
p_to release
p_to release
p_to release
p_to release
p_to release
p_to release
[互刷后]小明:6000,小张:5000,小潘:4000
可以思考一下,为什么trylock
的时候 -= money
和 += money
都要放在code最里面
参考链接:
守护线程参考:.html
Posix Thread:.html
一句话实现并行:
进程与线程的一个简单解释:.html线程分离方面的参考:
/
:
.html
.html死锁调试参考:
.html
.html
:
老外对杀死子线程的探讨:
线程同步这块,之前讲了锁系列,现在把剩下的也说说
Queue大家都很熟悉,应用场景很多很多,不仅仅局限在线(进)程同步,很多业务场景都在使用。
在开始之前先看一个秒杀场景:一般都使用乐观锁,也就是大家经常提的CAS机制来实现,数据所在的内存值,预期值,新值。当需要更新时,判断当前内存值与之前取到的值是否相等,若相等,则用新值更新,若失败则不断重试(sleep(random)
)
从数据库层面控制就是这样:(原子性操作)
update table set amout=amout-#{buys}, version=version+1 where id=#{id} and version=#{version}orupdate table set amout=amout-#{buys} where id=#{id} and amout-#{buys}>=0
我们用代码模拟一下:(Python里面没有CountDownLatch
,我们用之前学的条件变量实现一个)
# 模拟Java里的CountDownLatch(条件变量模拟)
# 可以理解为赛跑,当运动员全部准备好了,裁判一枪下去,开始比赛
class CountDownLatch(object):def __init__(self): = Condition() # 条件变量def wait(self)::wait()def countDown(self)::ify_all() # 开枪(唤醒所有线程)
模拟:
count = 100 # 库存100件class MyThread(threading.Thread):def __init__(self, id, con):self.id = = consuper().__init__()def run(self):wait()if count > 0: # if count - 1 >= 0:count -= 1print(f"线程{self.id}~抢到一件商品")def main():con = CountDownLatch() # 条件变量t_list = [MyThread(id=i, con=con) for i in range(1000)]for t in t_list:t.start()print("准备开抢")untDown() # 唤醒所有for t in t_list:t.join()print(f"剩余库存{count}")if __name__ == '__main__':main()
输出:(没错,没用锁一样高并发~)
线程42~抢到一件商品
线程49~抢到一件商品
线程50~抢到一件商品
线程63~抢到一件商品
线程84~抢到一件商品
线程113~抢到一件商品
线程135~抢到一件商品
线程161~抢到一件商品
线程183~抢到一件商品
线程220~抢到一件商品
线程271~抢到一件商品
线程278~抢到一件商品
线程302~抢到一件商品
线程359~抢到一件商品
线程379~抢到一件商品
....
线程10~抢到一件商品
线程18~抢到一件商品
线程23~抢到一件商品
线程26~抢到一件商品
线程33~抢到一件商品
线程44~抢到一件商品
线程52~抢到一件商品
线程53~抢到一件商品
线程158~抢到一件商品
线程177~抢到一件商品
线程227~抢到一件商品
线程289~抢到一件商品
线程15~抢到一件商品
线程37~抢到一件商品
线程134~抢到一件商品
线程212~抢到一件商品
线程72~抢到一件商品
线程305~抢到一件商品
线程365~抢到一件商品
剩余库存0real 0m0.189s
user 0m0.161s
sys 0m0.101s
如果你把if count > 0:
注释掉:(瞬间呵呵哒了)
剩余库存-900real 0m0.215s
user 0m0.188s
sys 0m0.088s
如果你在修改的时候加个锁:
real 0m0.195s
user 0m0.157s
sys 0m0.100s
在这里说,其实没有多大意义,了解下即可(数据库最大连接数是有瓶颈的,后端项目里面一般都是使用缓存的CAS机制
,比如Redis
的watch
、memcached
的gets
和cas
,还有就是我们下面要介绍的Queue
了)
后面会说,引入部分不用深究,记住两个即可:
CountDownLatch
的模拟Queue在讲进程的时候就有说过(进程间通信),线程用法也差不多,看个经典案例:
import time
import random
from multiprocessing.dummy import Pool as ThreadPool, Queuedef consumer(q, i):while True:data = q.get()print(f"[消费者{i}]商品{data}抢光了")def producer(q):while True:num = random.random()q.put(num)print(f"[生产者]商品{num}出厂了n")time.sleep(num)def main():q = Queue(10) # 为了演示,我这边限制一下pool = ThreadPool()# 一个生产者pool.apply_async(producer, args=(q,))# 两个消费者pool.apply_async(consumer, args=(q, 1))pool.apply_async(consumer, args=(q, 2))pool.close()pool.join()if __name__ == '__main__':main()
输出图示:(非阻塞可以使用put_nowait
和get_nowait
)
Queue是线程安全的放心使用,我们来看看Queue源码:(条件变量Condition
和Lock
的综合使用)
class Queue:def __init__(self, maxsize=0):self.maxsize = maxsizeself._init(maxsize)self.mutex = threading.Lock() # 三个Condition公用# get的时候使用,如果队列空了就等待_empty = threading.Condition(self.mutex)# put的时候使用,如果队列满了就等待_full = threading.Condition(self.mutex)# 每当未完成任务的数量降至零时,通知所有线程self.all_tasks_done = threading.Condition(self.mutex)self.unfinished_tasks = 0 # 未完成任务def put(self, item, block=True, timeout=None):# 如果队列满了就等待# _full = threading.Condition(self.mutex)_full:if self.maxsize > 0:if not block:if self._qsize() >= self.maxsize:raise Fullelif timeout is None:while self._qsize() >= self._full.wait()elif timeout < 0:raise ValueError("'timeout' must be a non-negative number")else:endtime = time() + timeoutwhile self._qsize() >= self.maxsize:remaining = endtime - time()if remaining <= 0.0:_full.wait(remaining)self._put(item)self.unfinished_tasks += _ify()def get(self, block=True, timeout=None):# 如果队列空了就等待# _empty = threading.Condition(self.mutex)_empty:if not block:if not self._qsize():raise Emptyelif timeout is None:while not self._qsize():_empty.wait()elif timeout < 0:raise ValueError("'timeout' must be a non-negative number")else:endtime = time() + timeoutwhile not self._qsize():remaining = endtime - time()if remaining <= 0.0:_empty.wait(remaining)item = self._get()_ify()return item
来个场景,厂家倒闭(任务列表完成了)怎么通知消费者不用等待了?
回顾一下使用协程是怎么解决的:协程yield实现多任务调度
def consumer():status = ""while True:tmp = yield statusif not tmp:print("消费者已经睡觉了...")returnprint("消费者:获得商品%s号..." % tmp)status = "ok"def produce(c):# 启动消费者c.send(None)for i in range(1, 3):print("生产者:出产商品%s号..." % i)# 生产商品,并提交给消费者status = c.send(i)print("生产者:生产者消费状态: %s" % status)# c.send(None) 执行这个会引发StopIterationc.close() # 使用close就可以避免了(手动关闭生成器函数,后面的调用会直接返回StopIteration异常)if __name__ == '__main__':# 创建消费者c = consumer()produce(c)
输出:
生产者:出产商品1号...
消费者:获得商品1号...
生产者:生产者消费状态: ok
生产者:出产商品2号...
消费者:获得商品2号...
生产者:生产者消费状态: ok
当使用Queue
时,协调生产者和消费者的关闭问题可以在队列中放置一个特殊的值,当消费者读到这个值的时候,终止执行:
import time, random, uuid
from multiprocessing.dummy import Pool as ThreadPool, Queuestop_obj = uuid.uuid1() # 获取UUID(GUID)def consumer(q, i):while True:data = q.get()if data == stop_obj:print(f"[消费者{i}]光荣退伍了")q.put(data) # 如果不加这个,其他消费者就不知道了(Queue里面的数据取出来就没了)breakprint(f"[消费者{i}]商品{data}抢光了")def producer(q):for i in range(10):num = random.random()q.put(num)print(f"[生产者]商品{num}出厂了")time.sleep(num)q.put(stop_obj) # 发送结束命令def main():q = Queue(10) # 为了演示,我这边限制一下pool = ThreadPool()# 一个生产者pool.apply_async(producer, args=(q,))# 两个消费者pool.apply_async(consumer, args=(q, 1))pool.apply_async(consumer, args=(q, 2))pool.close()pool.join()if __name__ == '__main__':main()
如果读到特殊值没有再放进队列就不能保证所有消费者都退出任务~Queue里面的数据取出来就没了 输出:(你可以把上面那句注释调看结果)
[生产者]商品0.33594145145041265出厂了
[消费者1]商品0.33594145145041265抢光了
[生产者]商品0.49907511942411487出厂了
[消费者1]商品0.49907511942411487抢光了
[生产者]商品0.6875075709064151出厂了
[消费者2]商品0.6875075709064151抢光了
[生产者]商品0.4039336126048405出厂了
[消费者1]商品0.4039336126048405抢光了
[生产者]商品0.4339014739644075出厂了
[消费者2]商品0.4339014739644075抢光了
[生产者]商品0.7101415304586235出厂了
[消费者1]商品0.7101415304586235抢光了
[生产者]商品0.39303515351899出厂了
[消费者2]商品0.39303515351899抢光了
[生产者]商品0.07572426360227902出厂了
[消费者1]商品0.07572426360227902抢光了
[生产者]商品0.8054064710812884出厂了
[消费者2]商品0.8054064710812884抢光了
[生产者]商品0.8085151230789658出厂了
[消费者1]商品0.8085151230789658抢光了
[消费者2]光荣退伍了
[消费者1]光荣退伍了
在上面案例里面,你把uuid.uuid1()
换成object()
,然后比较部分的==
换成is
也是可以的,但是分布式系统的话还是使用UUID
吧
如果想在Queue
的基础上扩展,可以自定义数据结构并添加所需的锁和同步机制(eg:Condition
)来实现线程间通信(同步)
写demo前说说理论:
以最小堆为例,画个图演示一下:
插入新节点
排序后的二叉树
准备删除节点2
把最后一个节点拿过来充数(维护二叉树稳定)
进行比较排序,把左右节点最小的拉上来
构建二叉堆:把一个无序的完全二叉树调整为二叉堆(让所有非叶子节点依次下沉
)
来个乱序的二叉树
从最后一个非叶子节点开始,和最小的子节点交换位置(8和1交换)
右边的也走一波(6和4交换)
节点5和1互换
现在根节点最小了(3和1互换)
从上往下再排个序,这时候就是最小堆了
看个完全二叉树
的规律:若从上至下、从左至右编号,则编号为i的结点:
2i+1
,其右孩子编号=2i+2
i/2
(根节点没有父节点)把上面二叉树转换成数组:
这时候再去理解优先队列就简单了:
Python提供了一个heapq
的模块:.html
来看个最小二叉堆的案例:
In [5]:%timeimport heapqh_list = []
# 来个乱序的二叉树(和图示一样)
for i in [3, 5, 6, 8, 2, 4, 7, 1, 9]:heapq.heappush(h_list, i) # 构建最小二叉堆
# 弹出最小值
heapq.heappop(h_list) # 查看堆中最小值,不弹出 heap[0]
CPU times: user 2 µs, sys: 0 ns, total: 2 µs
Wall time: 5.25 µs
Out[5]: 1
In [6]: %timeimport heapqh_list = []
# 堆元素可以是元组,可以拓展优先级的概念
heapq.heappush(h_list, (9,"小明"))
heapq.heappush(h_list, (5,"小张"))
heapq.heappush(h_list, (7,"小周"))
heapq.heappush(h_list, (3,"小潘"))heapq.heappop(h_list) # 弹出优先级最低的
CPU times: user 2 µs, sys: 1 µs, total: 3 µs
Wall time: 4.77 µs
Out[6]: (3, '小潘')
举个使用Condition
+二叉堆
实现一个优先级队列:
import heapq
from uuid import uuid1
from multiprocessing.dummy import Pool as ThreadPool, Conditionclass MaxPriorityQueue(object):"""自定义一个最大优先队列"""def __init__(self):self.__h_list = []self.__con = Condition() # 条件变量self.__index = 0 # 索引def put(self, value, sort=0):with self.__con:# heapq是最小二叉堆,优先级取负就是最大二叉堆了heapq.heappush(self.__h_list, (-sort, self.__index, value))self.__index += 1self.__ify() # 随机通知一个阻塞等的线程def get(self):with self.__con:while 1:# 0 => Falseif not self.qsize():self.__con.wait() # 列表为空则阻塞等return heapq.heappop(self.__h_list)[-1] # 返回元组最后一个元素(value)def qsize(self):return len(self.__h_list)stop_obj = uuid1() # 获取UUID(GUID)def task_put(queue):queue.put("小周", 5)queue.put("小潘", 7)queue.put("小明", 3)queue.put("小张", 9)global stop_objqueue.put(stop_obj)def task_get(queue):global stop_obj# 全部读出来while 1:data = ()if data == stop_obj:print("光荣退伍了")queue.put(stop_obj) # 保证其他消费者也能安全退出breakprint(data)if __name__ == '__main__':queue = MaxPriorityQueue()pool = ThreadPool()pool.apply_async(task_get, args=(queue,))pool.apply_async(task_put, args=(queue,))pool.close()pool.join()
输出:
小张
小潘
小周
小明
光荣退伍了
multiprocessing
、multiprocessing.dummy
、threading
¶multiprocessing.dummy
上面只列举了常用的模块,Queue这块就两个:Queue
和JoinableQueue
。既然提到了就顺便说几句,之前写进程篇的时候因为外出,急急忙忙就收尾了,像上面的Semaphore
和Condition
以及下面准备说的Event
和Barrier
等进程和线程都是通用的
如果要是非要找点不同,那么Queue这块还真有点不同,eg:Queue
里面没有task_done
和join
方法,而JoinableQueue
扩展了,而线程的Queue
是有task_done
和join
的,其他常用的进程api和线程基本上一样,用到的时候查下源码或者看看官方文档即可~
进程的Queue
与JoinableQueue
:
线程的Queue
:
threading
:
__all__ = ['get_ident', 'active_count', 'Condition', 'current_thread', 'enumerate','main_thread', 'TIMEOUT_MAX', 'Event', 'Lock', 'RLock', 'Semaphore','BoundedSemaphore', 'Thread', 'Barrier', 'BrokenBarrierError', 'Timer','ThreadError', 'setprofile', 'settrace', 'local', 'stack_size'
]
multiprocessing.dummy
:
__all__ = ['Process', 'current_process', 'active_children', 'freeze_support','Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition','Event', 'Barrier', 'Queue', 'Manager', 'Pipe', 'Pool', 'JoinableQueue']
multiprocessing.dummy
可以理解为multiprocessing
的轻量级并发库:api基本上和multiprocessing
一致,很多都是在threading
的基础上修改下或者直接使用(multiprocessing
在Process
基础上修改)比如:
# 被轻量化了,本质还是线程
# Process模块:Process = DummyProcess(threading.Thread)# 这就是为什么前面的代码的都是 as ThreadPool,这是怕和Process一起使用的时候把你们带坑里
# Pool:multiprocessing.pool.ThreadPool(processes, initializer, initargs)# 为了和进程api使用起来一致
# current_process:current_process = threading.current_thread# 再看看导入的模块就知道dummy的本质了:
from threading import Lock, RLock, Semaphore, BoundedSemaphore
from threading import Event, Condition, Barrier
from queue import Queue
看看内部实现:(比我们实现的还精简,秒懂)
class PriorityQueue(Queue):'''以优先级顺序检索打开条目的队列的变体(最低的第一个)
item通常是以下形式的元组:(优先级编号,数据)'''def _init(self, maxsize):self.queue = []def _qsize(self):return len(self.queue)def _put(self, item):heapq.heappush(self.queue, item)def _get(self):return heapq.heappop(self.queue)
看个上面MaxPriorityQueue
的案例:(想要大数字优先级高就变负数)
from uuid import uuid1
from queue import PriorityQueue
from multiprocessing.dummy import Pool as ThreadPoolstop_obj = uuid1() # 获取UUID(GUID)def task_put(queue):queue.put((-5, "小周"))queue.put((-7, "小潘"))queue.put((-3, "小明"))queue.put((-9, "小张"))global stop_obj# 可以思考一下为什么用0,如果按照小到大的顺序又该如何设置呢?queue.put((0, stop_obj))def task_get(queue):global stop_obj# 全部读出来while 1:data = ()if data[-1] == stop_obj:print("光荣退伍了")queue.put((0, stop_obj)) # 保证其他消费者也能安全退出breakprint(data[-1])def error_print(msg):print(msg)if __name__ == '__main__':queue = PriorityQueue()pool = ThreadPool()pool.apply_async(task_get, args=(queue, ), error_callback=error_print)pool.apply_async(task_put, args=(queue, ), error_callback=error_print)pool.close()pool.join()
输出:(如果功能不够用还是自己设计吧,设计的太简单调用的时候会比较麻烦)
小张
小潘
小周
小明
光荣退伍了
一看好像很高大上,翻翻源码:(其实就是基于List封装了个类,看来multiprocessing.dummy
重写这个是有原因的)
class LifoQueue(Queue):def _init(self, maxsize):self.queue = []def _qsize(self):return len(self.queue)def _put(self, item):self.queue.append(item)def _get(self):return self.queue.pop()
看个使用案例:(完全可以直接使用)
from queue import LifoQueuedef main():queue = LifoQueue()for i in range(10):queue.put(i)for i in range(queue.qsize()):())if __name__ == '__main__':main()
输出:
9
8
7
6
5
4
3
2
1
0
SimpleQueue
就不说了,和Queue使用基本上一样。线程和进程有点不一样,注意下:(进程间通信手段毕竟比线程少
)
threading
中的SimpleQueue
是FIFO
简单队列multiprocessing
中的SimpleQueue
是在PIPE
管道的基础上封装版JoinableQueue
在multiprocessing.dummy
就是Queue
:(等会直接使用Queue
即可)
# multiprocessing/dummy/__init__.py
from queue import Queue
JoinableQueue = Queue
相关源码:(下面会和Queue
对比举例)
class Queue:def __init__(self, maxsize=0):self.maxsize = maxsizeself._init(maxsize)self.mutex = threading.Lock()_empty = threading.Condition(self._full = threading.Condition(self.mutex)self.all_tasks_done = threading.Condition(self.mutex)# 进程在这边使用的是Semaphoreself.unfinished_tasks = 0def task_done(self):with self.all_tasks_done:unfinished = self.unfinished_tasks - 1if unfinished <= 0:if unfinished < 0:raise ValueError('task_done() called too many times')self.all_ify_all()self.unfinished_tasks = unfinisheddef join(self):with self.all_tasks_done:while self.unfinished_tasks:self.all_tasks_done.wait()
在multiprocessing
中的Queue
没有task_done
和join
方法,所以有了JoinableQueue
:
# multiprocessing/queues.pyclass JoinableQueue(Queue):def __init__(self, maxsize=0, *, ctx):Queue.__init__(self, maxsize, ctx=ctx)self._unfinished_tasks = ctx.Semaphore(0)self._cond = ctx.Condition()def task_done(self):with self._cond:if not self._unfinished_tasks.acquire(False):raise ValueError('task_done() called too many times')if self._unfinished_tasks._semlock._is_zero():self._ify_all()def join(self):with self._cond:if not self._unfinished_tasks._semlock._is_zero():self._cond.wait()
使用队列来进行线程间通信是一个单向、不确定的过程。通常情况下,没法知道接收数据的线程是什么时候接收到数据并开始工作的。这时候就可以使用Queue
提供的task_done()
和join()
了~
之前通知消费者退出是使用发一个消息的方式,这次换种思路~直接设置后台线(进)程,然后使用Queue
的join
方法:
from multiprocessing.dummy import threading, Queuedef consumer(queue):while 1:data = ()print(f"[消费者]消费商品{data}号")# 通知Queue完成任务了queue.task_done()def producer(queue):for i in range(10):print(f"[生产者]生产商品{i}号")queue.put(i)def main():queue = Queue()# 开启生产消费者线程任务t_list = [threading.Thread(target=func, args=(queue, ))for func in (producer, consumer)]# 启动两个线程for t in t_list:# 设置后台线程,就算是死循环当主线程退出的时候也会退出的t.setDaemon(True) # 进程是daemon属性,t.daemon=Truet.start()# 等待所有任务完成queue.join() # 你可以把这句话注释掉看输出print(f"当前队列未完成的数量:{queue.unfinished_tasks}")if __name__ == '__main__':main()
输出:
[生产者]生产商品0号
[生产者]生产商品1号
[消费者]消费商品0号
[生产者]生产商品2号
[消费者]消费商品1号
[生产者]生产商品3号
[消费者]消费商品2号
[生产者]生产商品4号
[消费者]消费商品3号
[生产者]生产商品5号
[消费者]消费商品4号
[生产者]生产商品6号
[消费者]消费商品5号
[生产者]生产商品7号
[消费者]消费商品6号
[生产者]生产商品8号
[消费者]消费商品7号
[生产者]生产商品9号
[消费者]消费商品8号
[消费者]消费商品9号
当前队列未完成的数量:0
进程案例见:/BaseCode/tree/urrent/Thread/2.lock_queue/3.queue/6.JoinableQueue.py
PS:其实Queue的完整写法应该是每次收到消息的时候调用一下q.task_done()
,便于记录未完成状态,大家进程的Queue
用多了,也就不太写了。现在task_done
讲过了,以后用线程的Queue
和进程的JoinableQueue
记得加上哦~
再扩展一下,看看queue.join
源码:(如果还不清楚,下面还有一个手写线程池的demo)
def join(self):# Condition条件变量with self.all_tasks_done:# 如果还有没有完成的任务就调用Condition的wait()方法while self.unfinished_tasks:self.all_tasks_done.wait()
Queue对象的方法:
q.full()
:判断队列是否已满q.qsize()
:返回当前队列中的元素个数q.put_nowait()
:非阻塞发送消息,等价于q.put(block=Flase)
q.join()
:等待所有任务完成q.task_done()
:在Queue中标记任务完成PS:q.qsize()
、q.full()
、
可能你对一个队列使用empty()
判断出这个队列为空,但同时另外一个线程可能已经向这个队列中插入一个数据项。所以,你最好不要在你的代码中使用这些方法。
queue模块定义的异常类:
queue.Full
:非阻塞发送消息时,如果队列满了~抛异常queue.Empty
:非阻塞获取消息时,如果队列为空~抛异常eg:
try:data = q.get_nowait() # get(timeout=5)
except queue.Empty:pass
基于简单队列编写多线程程序在线程安全队列的底层实现来看,你无需在你的代码中使用锁和其他底层的同步机制,使用队列这种基于消息的通信机制可以被扩展到更大的应用范畴,比如,你可以把你的程序放入多个进程甚至是分布式系统而无需改变底层的队列结构。
使用线程队列有一个要注意的问题:向队列中添加数据项时并不会复制此数据项,线程间通信实际上是在线程间传递对象引用。如果担心对象的共享状态,那最好只传递不可修改的数据结构(如:整型、字符串或者元组)或者一个对象的深拷贝copy.deepcopy(data)
和网络整合版的线程池后面再说,ThreadPoolExecutor
深入篇后会说,先模仿官方Pool
来个精简版:
from multiprocessing.dummy import threading, Queueclass Task(threading.Thread):def __init__(self, queue):super().__init__()self.queue = queueself.setDaemon(True) # 设置后台线程,主线程结束就终止self.start() # 开启线程,执行run方法print(f"开启一个线程~{self.name}")def run(self):func, args, kws = ()try:func(args, kws)except Exception as ex:print(ex)finally:self.queue.task_done()class ThreadPool(object):def __init__(self, count=0):# 设置Pool运行状态self.running = Truefrom os import cpu_count # 用到的时候导入对应模块即可# 默认是CPU核数,且至少有一个线程if count <= 0:count = cpu_count() or 1# 设置线程数self.queue = Queue(count)# 启动对应个数的线程for _ in range(count):Task(self.queue) # 不能在这直接启动,会阻塞Pool的def apply_async(self, func, args=(), kws={}):if self.running:# 执行任务self.queue.put((func, args, kws))def close(self):# 不再运行加入任务self.running = Falsedef join(self):# 等待任务执行完退出self.queue.join()
调用和官方风格一致:
def call_dad(*args, **kws):from time import sleepfrom random import randintn = randint(1, 2) # [1,2]print(f"休息{n}s")sleep(n)print(f"{args}~{kws}")def main():pool = ThreadPool()pool.apply_async(call_dad, args=(1, 2, 3), kws={"dad": "小明"})pool.apply_async(call_dad, args=(1, 2, 3), kws={"dad": "小张"})pool.close()pool.join()if __name__ == '__main__':main()
输出:(有些偶尔用的模块可以用的时候再导入【别放循环里,虽然重复导入模块不怎么耗时,但是总归有损耗的】)
开启一个线程~Thread-1
开启一个线程~Thread-2
开启一个线程~Thread-3
开启一个线程~Thread-4
休息1s
休息2s
((1, 2, 3), {'dad': '小明'})~{}
((1, 2, 3), {'dad': '小张'})~{}
线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就比较麻烦。这时候我们就可以使用Event
了~eg:(类比JQ里面的事件~eg:单击事件)
from time import sleep
from multiprocessing.dummy import Pool as ThreadPool, Eventevent = Event()def click():# event.clear() # 设置标准为假(默认是False)print("用户在修改网页表单")sleep(2)print("点击了修改案例")event.set() # 设置标准为真def update():print(f"事件状态:{event.is_set()}")event.wait() # 等待到标志为真print("修改成功")print(f"事件状态:{event.is_set()}")def main():pool = ThreadPool()pool.apply_async(click)pool.apply_async(update)pool.apply_async(click)pool.close()pool.join()if __name__ == '__main__':main()
输出:
用户在修改网页表单
事件状态:False
用户在修改网页表单
点击了修改案例
点击了修改案例
修改成功
事件状态:True
常用方法:
event.clear()
:恢复event的状态值为False(并发场景下有大用)event.wait()
:如果event.is_set()==False
将阻塞线程event.set()
: 设置event
的状态值为True
,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度event.is_set()
:返回event
的状态值(如果想非阻塞等可以使用这个先判断)线程有个重命名的方法叫isSet
。PS:进程线程中都有is_set
方法Event
对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生:
Event
对象中的信号标志被设置为假。等待Event
对象的线程将会被一直阻塞至标志为真。Event
对象的信号标志设置为真,它将唤醒所有等待这个Event
对象的线程。等待Event
的线程将忽略这个事件, 继续执行再来个简单版的生产消费者的案例:
from time import sleep
from random import random
from multiprocessing.dummy import Pool as ThreadPool, Eventglobal_list = []
event = Event()
stop_event = Event()n = 0def consumer(i):print(f"消费者{i}等待ing")while 1:event.wait()count = len(global_list)# 防止List空的时候pop出错if count > 0:print(f"消费了产品{global_list.pop()}")# 重置状态(加这一句能减少很多次循环)event.clear() # 可以思考一下为什么(提示:Lock)# 防止生产者结束了,但是消费者还没处理完成elif len(global_list) == 0 and stop_event.is_set():breakglobal nn += 1print(f"消费者{i}完成任务~总共循环{n}次")def producer():print("生产者正在生产商品")for i in range(10):global_list.append(i)sleep(random()) # 模拟网络延迟event.set() # 通知消费者生产结束stop_event.set() # 通知消费者已经可以结束线程了def main():pool = ThreadPool()pool.map_async(consumer, range(2)) # 两个消费者pool.apply_async(producer) #pool.close()pool.join()if __name__ == '__main__':main()
输出:(是不是又感觉多了种消费者安全退出的方式?)
消费者0等待ing
消费者1等待ing
生产者正在生产商品
消费了产品1
消费了产品0
消费了产品2
消费了产品3
消费了产品4
消费了产品5
消费了产品6
消费了产品7
消费了产品8
消费了产品9
消费者0完成任务
消费者1完成任务
PS:while条件换成:while not (len(global_list) == 0 and stop_event.is_set()):
也行
如果一个线程需要在一个“消费者”线程处理完特定的数据项时立即得到通知,你可以把要发送的数据和一个Event
一起使用,这样“生产者”就可以通过这个Event
对象来监测处理的过程了
from multiprocessing.dummy import Pool as ThreadPool, Queue, Eventdef producer(queue):for i in range(10):event = Event()queue.put((event, i))print(f"[生产者]生产了产品{i}")event.wait() # 等待消费者通知print(f"生产者已经收到消费情况的反馈{i}")def consumer(queue):while True:evt, data = ()print(f"[消费者]消费了产品{data}")evt.set() # 通知生产者def main():queue = Queue()pool = ThreadPool()pool.apply_async(consumer, args=(queue, ))pool.apply_async(producer, args=(queue, ))pool.close()pool.join()if __name__ == '__main__':main()
输出:(进程只需微微改动即可使用)
[生产者]生产了产品0
[消费者]消费了产品0
生产者已经收到消费情况的反馈0
[生产者]生产了产品1
[消费者]消费了产品1
生产者已经收到消费情况的反馈1
[生产者]生产了产品2
[消费者]消费了产品2
生产者已经收到消费情况的反馈2
[生产者]生产了产品3
[消费者]消费了产品3
生产者已经收到消费情况的反馈3
[生产者]生产了产品4
[消费者]消费了产品4
生产者已经收到消费情况的反馈4
[生产者]生产了产品5
[消费者]消费了产品5
生产者已经收到消费情况的反馈5
[生产者]生产了产品6
[消费者]消费了产品6
生产者已经收到消费情况的反馈6
[生产者]生产了产品7
[消费者]消费了产品7
生产者已经收到消费情况的反馈7
[生产者]生产了产品8
[消费者]消费了产品8
生产者已经收到消费情况的反馈8
[生产者]生产了产品9
[消费者]消费了产品9
生产者已经收到消费情况的反馈9
来看看Event
到底是何方神圣:(本质就是基于Condition
封装了一个标识位,来标记事件是否完成)
class Event:def __init__(self):self._cond = Condition(Lock()) # 条件变量self._flag = Falsedef is_set(self):return self._flagisSet = is_set # 建议用is_set,这样进程和线程方法就一致了def set(self):with self._cond:self._flag = Trueself._ify_all()def clear(self):with self._cond:self._flag = Falsedef wait(self, timeout=None):with self._cond:signaled = self._flagif not signaled:signaled = self._cond.wait(timeout)return signaled
其实应用场景很多,用起来比Condition
方便,比如在连接远程数据库或者访问api的时候设置一个重试机制,成功后再执行SQL或者数据处理:
from time import sleep
from multiprocessing.dummy import Pool as ThreadPool, Eventevent = Event()def conn_redis():n = 1time_out = 0.5# 重试机制while not event.is_set():if n == 4: # 自定义重试次数raise TimeoutError("