首页 技术 正文
技术 2022年11月21日
0 收藏 591 点赞 4,587 浏览 2947 个字

9.8 生产者消费者模型

该模型中包含两类重要的角色:

1、生产者:将负责造数据的任务比喻为生产者 2、消费者:接收生产者造出的数据来做进一步的处理的被比喻成消费者

实现生产者消费者模型三要素:1、生产者 2、消费者 3、队列

什么时候用该模型:

程序中出现明显的两类任何,一类任务是负责生产,另外一类任务是负责处理生产的数据的

该模型的好处:

1、实现了生产者与消费者解耦和

2、平衡了生产力与消费力,即生产者可以一直不停地生产,消费者可以不停地处理,因为二者不再直接沟通的,而是跟队列沟通,从而提高程序整体处理数据的速度

import time
import random
from multiprocessing import Process,Queue
def consumer(name,q):
while True:
res=q.get()
time.sleep(random.randint(1,3))
print('\033[46m消费者===》%s 吃了 %s\033[0m' %(name,res))

def producer(name,q,food):
for i in range(5):
time.sleep(random.randint(1,2))
res='%s%s' %(food,i)
q.put(res)
print('\033[45m生产者者===》%s 生产了 %s\033[0m' %(name,res))

if __name__ == '__main__':
q=Queue() #1、共享的盆 p1=Process(target=producer,args=('egon',q,'包子')) #2、生产者们
p2=Process(target=producer,args=('刘清政',q,'泔水'))
p3=Process(target=producer,args=('杨军',q,'米饭'))

c1=Process(target=consumer,args=('alex',q)) #3、消费者们
c2=Process(target=consumer,args=('xxx',q))

p1.start()
p2.start()
p3.start()
c1.start()
c2.start()

9.81 守护进程的应用

问题:消费者c1和c2在取空了q之后,则一直处于死循环中且卡在q.get()这一步

解决方式无非是让生产者在生产完毕后,往队列中再发一个结束信号,这样消费者在接收到结束信号后就可以break退出死循环

import time
import random
from multiprocessing import Process,Queue

def consumer(name,q):
while True:
res=q.get()
if res is None:break
time.sleep(random.randint(1,3))
print('\033[46m消费者===》%s 吃了 %s\033[0m' %(name,res))

def producer(name,q,food):
for i in range(5):
time.sleep(random.randint(1,2))
res='%s%s' %(food,i)
q.put(res)
print('\033[45m生产者者===》%s 生产了 %s\033[0m' %(name,res))
#q.put(None)
if __name__ == '__main__':
#1、共享的盆
q=Queue()
#2、生产者们
p1=Process(target=producer,args=('egon',q,'包子'))
p2=Process(target=producer,args=('刘清政',q,'泔水'))
p3=Process(target=producer,args=('杨军',q,'米饭'))
#3、消费者们
c1=Process(target=consumer,args=('alex',q))
c2=Process(target=consumer,args=('梁书东',q))

p1.start()
p2.start()
p3.start()
c1.start()
c2.start()

p1.join()# 在生产者生产完毕后,往队列的末尾添加一个结束信号None
p2.join()
p3.join()
# 有几个消费者就应该放几个结束信号
q.put(None)#队列是共享的,主进程同样可以往队列里放None
q.put(None)

升级版:设置守护进程,向队列发送结束信号,解决管道取空阻塞问题

JoinableQueue([maxsize]):这就像是一个Queue对象,但队列允许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。

  • maxsize是队列中允许最大项数,省略则无大小限制

  • q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常

  • q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止

import time
import random
from multiprocessing import Process,JoinableQueue

def consumer(name,q):
while True:
res=q.get()
if res is None:break
time.sleep(random.randint(1,3))
print('\033[46m消费者===》%s 吃了 %s\033[0m' %(name,res))
q.task_done() #向q.join()发送一次信号,证明一个数据已经被取走了

def producer(name,q,food):
..........
if __name__ == '__main__':
#1、共享的盆
q=JoinableQueue()
#2、生产者们
p1=Process(target=producer,args=('egon',q,'包子'))
p2=Process(target=producer,args=('刘清政',q,'泔水'))
p3=Process(target=producer,args=('杨军',q,'米饭'))
#3、消费者们
c1=Process(target=consumer,args=('alex',q))
c2=Process(target=consumer,args=('梁书东',q))
c1.daemon=True
c2.daemon=True

p1.start()
p2.start()
p3.start()
c1.start()
c2.start()

p1.join()# 确定生产者确确实实已经生产完毕
p2.join()
p3.join()
# 在生产者生产完毕后,拿到队列中元素的总个数,然后直到元素总数变为0,q.join()这一行代码才算运行完毕
q.join()
#q.join()一旦结束就意味着队列确实被取空,消费者已经确确实实把数据都取干净了
print('主进程结束')
相关推荐
python开发_常用的python模块及安装方法
adodb:我们领导推荐的数据库连接组件bsddb3:BerkeleyDB的连接组件Cheetah-1.0:我比较喜欢这个版本的cheeta…
日期:2022-11-24 点赞:878 阅读:9,082
Educational Codeforces Round 11 C. Hard Process 二分
C. Hard Process题目连接:http://www.codeforces.com/contest/660/problem/CDes…
日期:2022-11-24 点赞:807 阅读:5,557
下载Ubuntn 17.04 内核源代码
zengkefu@server1:/usr/src$ uname -aLinux server1 4.10.0-19-generic #21…
日期:2022-11-24 点赞:569 阅读:6,406
可用Active Desktop Calendar V7.86 注册码序列号
可用Active Desktop Calendar V7.86 注册码序列号Name: www.greendown.cn Code: &nb…
日期:2022-11-24 点赞:733 阅读:6,179
Android调用系统相机、自定义相机、处理大图片
Android调用系统相机和自定义相机实例本博文主要是介绍了android上使用相机进行拍照并显示的两种方式,并且由于涉及到要把拍到的照片显…
日期:2022-11-24 点赞:512 阅读:7,815
Struts的使用
一、Struts2的获取  Struts的官方网站为:http://struts.apache.org/  下载完Struts2的jar包,…
日期:2022-11-24 点赞:671 阅读:4,898