生产者消费者模型
主要用于解耦
from multiprocessing import Queue #队列是安全的,不用加锁. q = Queue(num) num : 队列的最大长度 q.get()#阻塞等待获取数据,如果有数据直接获取,如果没有数据,阻塞等待 q.put()#阻塞,如果可以继续往队列中放数据就直接放,不能放就阻塞等待 q.get_nowait()#不阻塞,如果有数据就直接获取,没有数据就报错 q.put_nowait()#不阻塞, 如果能继续往队列中放数据,就直接放,不能放就报错 q = Queue(3) q.put(1) q.put('abc') q.put([4,5,6]) print('此时队列已不能再放入了') q.put('呵呵')#此处阻塞等待空位置放入 #q.putnowait('呵呵')#队列已满,不再等待,直接报错 print('此处不会被打印')
print(q.get())#先进先出,先取出 1
print(q.get())
print(q.get())
#print(q.get())#队列为空,取不出会阻塞等待新数据取出
print(q.getnowait())#不再等待直接报错
借助队列实现生产者消费者模型 (队列(First In First Out 简称 : FIFO) : 先进先出 )
from multiprocessing import Queue ,Process def consumer(q,name): while 1: pro_info = q.get()#如果消费者不知道生产者停止生产,将会一直等待获取 if pro_info : print('%s拿走了%s' % (name,pro_info)) else:#当收到None时,结束获取,退出程序 break def producer(q,product): for i in range(10): pro_info = product + '的成品%s号' % str(i) q.put(pro_info) q.put(None)#生产者停止生产的标识 if __name__ == '__main__' : q = Queue(5)#规定队列最大为5 pro = Process(target=producer, args=(q,'版本一')) con = Process(target=consumer, args=(q,'小潘')) pro.start() con.start() #把成产表示符放入父进程 from multiprocessing import Queue ,Process def consumer(q,name): while 1: pro_info = q.get()#如果消费者不知道生产者停止生产,将会一直等待获取 if pro_info : print('%s拿走了%s' % (name,pro_info)) else:#当收到None时,结束获取,退出程序 break def producer(q,product): for i in range(10): pro_info = product + '的成品%s号' % str(i) q.put(pro_info) if __name__ == '__main__' : q = Queue(5)#规定队列最大为5 pro = Process(target=producer, args=(q,'版本一')) con = Process(target=consumer, args=(q,'小潘')) pro.start() con.start() pro.join() q.put(None)#生产者停止生产的标识
#多个生产者消费者
from multiprocessing import Queue ,Process
def consumer(q,name):
while 1:
pro_info = q.get()#如果消费者不知道生产者停止生产,将会一直等待获取
if pro_info :
print('%s拿走了%s' % (name,pro_info))
else:#当收到None时,结束获取,退出程序
break
def producer(q,product):
for i in range(20):
pro_info = product + '的成品%s号' % str(i)
q.put(pro_info)
if __name__ == '__main__' :
q = Queue(5)#规定队列最大为5
pro1 = Process(target=producer, args=(q,'版本一'))
pro2 = Process(target=producer, args=(q, '版本二'))
pro3 = Process(target=producer, args=(q, '版本三'))
con1 = Process(target=consumer, args=(q,'小潘'))
con2 = Process(target=consumer, args=(q, '李四'))
li = [pro1,pro2,pro3,con1,con2]
[i.start() for i in li]
pro1.join()
pro2.join()
pro3.join()
q.put(None)#生产者停止生产的标识
q.put(None)
joinablequeue模块
from multiprocessing import JoinableQueue #继承了multiprocessing.Queue 类,新添加了join(),q.task_done() q = JoinableQueue() q.join()#等待q.task_done的返回结果 q.task_done()#用于消费者,表示每消费队列中一个数据,就给join返回一个标识 from multiprocessing import JoinableQueue ,Process def consumer(q,name): while 1: pro_info = q.get()#如果消费者不知道生产者停止生产,将会一直等待获取 if pro_info : print('%s拿走了%s' % (name,pro_info)) q.task_done()#从队列中,每拿走一个数据,就传给join发送一个标识,共十个数据,则十个标识 def producer(q,product): for i in range(10): pro_info = product + '的成品%s号' % str(i) q.put(pro_info) q.join()#记录生产了20个数据在队列中,此时阻塞等待着对列中的所有数据均被拿取 if __name__ == '__main__' : q = JoinableQueue(5)#规定队列最大为5 pro = Process(target=producer, args=(q,'版本一')) con = Process(target=consumer, args=(q,'小潘')) con.daemon = True#把消费者进程设为守护