python多线程及condition版生产者

阅读: 评论:0

python多线程及condition版生产者

python多线程及condition版生产者

  • 写在前面
  • 多线程的概念
    • Python中的多线程
  • 生产者-消费者模型
    • Condition版生产者-消费者模型代码示例
  • 线程优先级队列( Queue)
  • GIL全局解释锁(Global Interceptor Lock)
  • 扩展

写在前面

python(基于cpython解释器)的多线程其实是假多线程,是单个CPU在多个线程间迅速切换营造出来的‘错觉’,所以Python的多线程适用于IO密集型(进程运行时间大部分花在input与output上),而不适用于CPU密集型(进程运行时间大部分花在计算上)

多线程的概念

多线程即多条顺序执行流 “同时” 执行同一个任务

我们将我们需要某一进程执行的任务比作一桌子菜
单线程就是一个人在一张桌子上吃一道菜
多线程就是多个人在一张桌子上吃同一道菜


Python中的多线程

Python中有两种使用线程的方法:函数式(def)类式(class)
Python通过threadthreading两个标准库提供对线程的支持
Thread较为简陋,本文将使用Threading进行说明


多说无益,上代码
单线程示例:

from time import sleepdef Write():for i in range(3):print('作家正在写第%d篇文章'%(i+1))sleep(1)  # 加一个休眠,方便观察def Sing():for i in range(3):print('歌手正在唱第%d首歌'%(i+1))sleep(1)if __name__ == '__main__':Write()Sing()

执行结果:

作家正在写第1篇文章
作家正在写第2篇文章
作家正在写第3篇文章
歌手正在唱第1首歌
歌手正在唱第2首歌
歌手正在唱第3首歌
(耗时大约6秒)

利用threading模块中的Thread方法开启新线程,基本参数如下

threading.Thread(target,args,name)
*target*:需要执行的函数/类名
*arges*:传递给函数/类的参数(需要以列表[]或元祖()的形式)
*name*:该线程的名字(可以随便取)
用.start()启动线程

多线程示例(双线程):

import threading
import timedef Write():for i in range(3):print('作家正在写第%d篇文章'%(i+1))time.sleep(1)  # 程序运行到这里休眠一秒,方便观察def Sing():for i in range(3):print('歌手正在唱第%d首歌'%(i+1))time.sleep(1)if __name__ == '__main__':tr1 = threading.Thread(target=Write)  # 传入函数名tr2 = threading.Thread(target=Sing)tr1.start()tr2.start()

运行结果:

作家正在写第1篇文章
歌手正在唱第1首歌
作家正在写第2篇文章
歌手正在唱第2首歌
作家正在写第3篇文章
歌手正在唱第3首歌
(耗时大约3秒)

我们说过多线程是多个人在同一张桌子上吃同一道菜

多个人同时吃一道菜的时候容易发生争抢,例如两个人同时夹一个菜,一个人刚伸出筷子,结果伸到的时候已经被夹走菜了。。。此时就必须等一个人夹一口之后,在还给另外一个人夹菜,也就是说资源共享就会发生冲突争抢。

也就是说,多线程在数据量过大的情况下容易出现脏数据

正常数据示例(单线程下):

sum = 0def sum1():global sumfor i in range(10000000):sum += 1print(sum)if __name__ == '__main__':for i in range(2):sum1()

运行结果:

1000000
2000000

脏数据示例(多线程下):

import threadingsum = 0def sum1():global sumfor i in range(10000000):sum += 1print(sum)if __name__ == '__main__':'''开启两个线程执行sum1'''for i in range(2):th = threading.Thread(target=sum1)th.start()

运行结果:

11997916
12162670
(正确输出应该是1000000与2000000)

多线程时资源共享的弊端就在这里,所以我们要制定一个规定:在一个线程执行任务时,其它执行同一任务的线程需要排队等候,在该线程执行完后,其它线程才能按顺序继续执行(就像公共厕所我们会锁门,同理,当前线程也可以给它当前正在使用的数据加锁,于是我们可以称其为:上厕所锁门定律

刚好threading模块给我们提供了这个锁:Lock方法

lk = threading.Lock()
上锁:lk.acquire()
执行:任务(上厕所)
解锁:

代码示例:

import threadingsum = 0lk = threading.Lock()def sum1():global sumlk.acquire()  # 进厕所,锁门for i in range(10000000):sum += lease()  # 结束,开门,出厕所print(sum)if __name__ == '__main__':for i in range(2):th = threading.Thread(target=sum1)th.start()

运行结果:

10000000
20000000
(上锁就是好,数据正常)

但是!但是!!但是!!!Lock在线程较多,循环较多,且需要判断数据是否满足执行条件的时候(比如Lock版的生产者-消费者模型),会进行反复的上锁与解锁,这样是非常消耗CPU资源的
为了解决这个问题,我们需要引入一个继承自Lock的新方法:Condition

ct = dition()
上锁:ct.acquire()
解锁:
ct.wait(self,timeout=None):将当前线程处于等待(即阻塞)状态并释放锁。等待状态中的线程可以被其他线程使用notify函数或notify_all函数唤醒,被唤醒后,该线程会继续等待上锁,上锁后继续执行下面的代码
notify(self,n=1):唤醒某一指定线程,默认唤醒等待中的第一个线程
notify_all(self):唤醒所有等待中的线程

生产者-消费者模型

Created with Raphaël 2.1.2 生产者(生产资源) 资源 消费者(消费资源)

Condition版生产者-消费者模型代码示例

import threading
import random
from time import sleepct = threading.Condition()all_money = 1000  # 基础金钱1000元
count = 10  # 限制生产者只可以生产十次class producers(threading.Thread):'''生产者模式'''def run(self):global all_moneyglobal countwhile True:ct.acquire()  # 处理数据前,先上锁if count > 0:  # 如果生产次数小于十次money = random.randint(200,1000)  # 随机生产200-1000元all_money += money  # 总金钱数 = 原总金钱数+生产金钱数count -= 1  # 允许生产次数-1print('生产者%s生产了%d元,剩余金钱%d元' % (threading.current_thread(), money, all_money))else:  # 如果生产次数已满10次ct.release()  # 解锁break  # 生产结束,跳出循环ct.notify_all()  # 通知所有等待中的消费者,生产已完成,可以开始消费ct.release()  # 解锁sleep(0.5)class comsumer(threading.Thread):'''消费者模式'''def run(self):global all_moneyglobal countwhile True:ct.acquire()  # 处理数据前,先上锁money = random.randint(200,1000)  # 随机消费200-1000元# 下面这个while是重点!(敲黑板,记笔记,后面我会说到的)while money > all_money:  # 如果需消费金额大于总金额,则等待至总金额大于需消费金钱if count == 0:  # 如果生产者生产次数已达上限ct.release()  # 结束前解锁return  # 结束函数print('消费者%s需要消费%d元,剩余金钱%d元,不足' % (threading.current_thread(), money, all_money))ct.wait()  # 进入等待(阻塞进程)all_money -= money  # 剩余金额大于消费金额时,总金额 = 原总金额 - 消费金额print('消费者%s消费了%d元,剩余金钱%d元' % (threading.current_thread(), money, all_money))ct.release()  # 解锁sleep(0.5)if __name__ == '__main__':for i in range(3):th = comsumer(name='线程%d'%i)th.start()for i in range(5):th = producers(name='线程%d'%i)th.start()

运行结果:

消费者<comsumer(线程0, started 42404)>消费了669元,剩余金钱331元
消费者<comsumer(线程1, started 15420)>需要消费796元,剩余金钱331元,不足
消费者<comsumer(线程2, started 15748)>需要消费718元,剩余金钱331元,不足
生产者<producers(线程0, started 15356)>生产了449元,剩余金钱780元
消费者<comsumer(线程2, started 15748)>消费了718元,剩余金钱62元
消费者<comsumer(线程1, started 15420)>需要消费796元,剩余金钱62元,不足
生产者<producers(线程1, started 15584)>生产了544元,剩余金钱606元
生产者<producers(线程2, started 42460)>生产了251元,剩余金钱857元
消费者<comsumer(线程1, started 15420)>消费了796元,剩余金钱61元
生产者<producers(线程3, started 15856)>生产了904元,剩余金钱965元
生产者<producers(线程4, started 17556)>生产了996元,剩余金钱1961元
生产者<producers(线程0, started 15356)>生产了886元,剩余金钱2847元
消费者<comsumer(线程2, started 15748)>消费了989元,剩余金钱1858元
消费者<comsumer(线程0, started 42404)>消费了376元,剩余金钱1482元
生产者<producers(线程1, started 15584)>生产了646元,剩余金钱2128元
生产者<producers(线程2, started 42460)>生产了219元,剩余金钱2347元
生产者<producers(线程4, started 17556)>生产了604元,剩余金钱2951元
消费者<comsumer(线程1, started 15420)>消费了411元,剩余金钱2540元
生产者<producers(线程3, started 15856)>生产了828元,剩余金钱3368元
消费者<comsumer(线程0, started 42404)>消费了538元,剩余金钱2830元
消费者<comsumer(线程2, started 15748)>消费了515元,剩余金钱2315元
消费者<comsumer(线程1, started 15420)>消费了240元,剩余金钱2075元
消费者<comsumer(线程2, started 15748)>消费了229元,剩余金钱1846元
消费者<comsumer(线程0, started 42404)>消费了348元,剩余金钱1498元
消费者<comsumer(线程1, started 15420)>消费了677元,剩余金钱821元
消费者<comsumer(线程0, started 42404)>消费了412元,剩余金钱409元
消费者<comsumer(线程1, started 15420)>消费了213元,剩余金钱196元

完美输出,使用Condition方法,可以有效避免不必要的CPU浪费

还有一个问题,关于wait(),wait()中的线程被唤醒后,并非立马拿到锁(因为它被阻塞后,很可能会被排到队列的后面),这样可能会导致数据的不对等(拿到其他线程处理之前的数据处理),所以在消费者类的while True下面,我们并没有用if来判断金额大小,而是用while,这样可以确保线程一直在原位置等待,一被唤醒就可以拿到锁

线程优先级队列( Queue)

线程安全队列
我们给列表A的某一个索引[0]赋值分为三个步骤:
1)指出我们要将1赋给A[0]:A[0] = 1;
2)找出A[0]所指向的内存地址;
3)将1存入A[0]对应的地址的内存中

不安全的队列,很可能在第二步的时候被终止,而被另一个线程调去执行任务,但是队列中的操作已经在前一个线程中被执行到一半,即到第二个线程中时,第二个线程可能会将数据存入第一个线程内的数据应该存入的内存地址中,导致脏数据产生(放苹果的篮子被放入了梨)

python中的list并非线程安全的(即执行时是可以被打断的,这样会造成脏数据),为了解决这个问题,python为我们提供了Queue模块,Queue模块中包含了同步的线程安全的队列类,包括FIFO(先入先出)队列QueueLIFO(后入先出)队列LifoQueue(类似‘栈’),和优先级队列PriorityQueue。这些队列都实现了锁原语(可以理解为原子级操作最小级,不可分割的操作,开始执行就不能被打断)),能够在多线程中直接使用。可以使用队列来实现线程间的同步。

qu = Queue(maxsize):创建一个先进先出的列队 maxsize为列队允许的最大元素数

qu.qsize(): 返回队列的大小
:如果队列为空,返回True,反之False
qu.full() :如果队列满了,返回True,反之False,Queue.full 与 maxsize 大小对应
<([block[, timeout]]):获取队列,timeout等待时间
qu.put(item) :写入队列,timeout等待时间

代码示例:

import time
from queue import Queuequ = Queue(4) #创建一个最大数据数为4的列队
print('初始队列是否为空:',qu.empty())
qu.put('数据一')
qu.put(['数据二'])
print('加入两个数据后,列队是否已满:',qu.full())
print('加入两个数据后,列队大小:',qu.qsize())
print('获取列队中第一个数据,block默认为True(即阻塞,直到队列中有数据再取):',qu.get(block=True))
print('加入两个数据后,又获取一个数据后,列队大小:',qu.qsize())
qu.put('数据一')
qu.put(('数据三'))
qu.put(4)
print('加入四个数据后,列队是否已满:',qu.full())
if qu.full():start = time.time()try:qu.put(5,timeout=3)  # 如果列队已满,三秒后结束插入except:end = time.time()print('列队已满,插入数据失败,等待时间为:',end-start)for i in range(qu.qsize()):())

运行结果:

初始队列是否为空: True
加入两个数据后,列队是否已满: False
加入两个数据后,列队大小: 2
获取列队中第一个数据,block默认为True(即阻塞,直到队列中有数据再取): 数据一
加入两个数据后,又获取一个数据后,列队大小: 1
加入四个数据后,列队是否已满: True
列队已满,插入数据失败,等待时间为: 3.0081722736358643
[‘数据二’]
数据一
数据三
4

多线程中使用Queue:

import threading
from queue import Queue
import timedef set_value(qu):'''生成元素放入列队'''index = 0while True:qu.put(index)index += 1start = time.time()time.sleep(2)  # 每隔三秒放入一个end = time.time()print('阻塞时间为:',end-start,'秒threading.Thread')def get_value(qu):while True:print('数据:',qu.get())  # 列队中有数据就取出来,没有就等待if __name__ == '__main__':qu = Queue(4)t1 = threading.Thread(target=set_value,args=[qu])t2 = threading.Thread(target=get_value,args=[qu])t1.start()t2.start()

运行结果:

数据: 0
阻塞时间为: 2.0001144409179688 秒threading.Thread
数据: 1
阻塞时间为: 2.0001144409179688 秒threading.Thread
数据: 2
阻塞时间为: 2.0001142024993896 秒threading.Thread
数据: 3
阻塞时间为: 2.0001144409179688 秒threading.Thread
数据: 4
阻塞时间为: 2.0001144409179688 秒threading.Thread
数据: 5
阻塞时间为: 2.0001144409179688 秒threading.Thread
数据: 6
阻塞时间为: 2.0001144409179688 秒threading.Thread
数据: 7
阻塞时间为: 2.0001144409179688 秒threading.Thread
数据: 8
阻塞时间为: 2.0001144409179688 秒threading.Thread
数据: 9

最后重点

GIL全局解释锁(Global Interceptor Lock)

Python代码的执行由Python 虚拟机(也叫解释器主循环,CPython版本)来控制,Python 在设计之初就考虑到要在解释器的主循环中,同时只有一个线程在执行,即在任意时刻,只有一个线程在解释器中运行。对Python 虚拟机的访问由全局解释器锁(GIL)来控制,正是这个锁能保证同一时刻只有一个线程在运行。
在多线程环境中,Python 虚拟机按以下方式执行:

  1. 设置GIL
  2. 切换到一个线程去运行
  3. 运行:
    a. 指定数量的字节码指令,或者
    b. 线程主动让出控制(可以调用time.sleep(0))
  4. 把线程设置为睡眠状态
  5. 解锁GIL
  6. 再次重复以上所有步骤

在调用外部代码(如C/C++扩展函数)的时候,GIL 将会被锁定,直到这个函数结束为止(由于在这期间没有Python 的字节码被运行,所以不会做线程切换)。

以上,就是我对Python多线程的理解与一些经验,到这里结束啦

扩展

下面是扩展环节(参考资料):

菜鸟教程
知乎(吃菜梗的来源)
网易云课堂的一个爬虫教程,中间有一段讲多线程的,讲得挺好的(厕所梗来源)

这里真的结束啦!

本文发布于:2024-01-31 14:53:09,感谢您对本站的认可!

本文链接:https://www.4u4v.net/it/170668398929314.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

上一篇:kafka消费者
标签:生产者   多线程   python   condition
留言与评论(共有 0 条评论)
   
验证码:

Copyright ©2019-2022 Comsenz Inc.Powered by ©

网站地图1 网站地图2 网站地图3 网站地图4 网站地图5 网站地图6 网站地图7 网站地图8 网站地图9 网站地图10 网站地图11 网站地图12 网站地图13 网站地图14 网站地图15 网站地图16 网站地图17 网站地图18 网站地图19 网站地图20 网站地图21 网站地图22/a> 网站地图23