设为首页 加入收藏

TOP

Python并发之多进程(二)
2018-10-19 16:45:53 】 浏览:46
Tags:Python 并发 进程
gs=(s,i)) p.start() print('-----主进程-------')

 

注意:其实信号量和锁类似,只是限制进程运行某个代码块的数量(锁为1个),并不是能限制并发的进程,如上述代码,一次性还是创建了20个进程

 

事件(Event)

from multiprocessing import Process,Event
import time, random
def eating(event):
    event.wait()
    print('去吃饭的路上...')

def makeing(event):
    print('做饭中')
    time.sleep(random.randint(1,2))
    print('做好了,快来...')
    event.set()

if __name__ == '__main__':
    event=Event()
    t1 = Process(target=eating,args=(event,))
    t2 = Process(target=makeing,args=(event,))
    t1.start()
    t2.start()
    # 做饭中
    # 做好了,快来...
    # 去吃饭的路上...
和线程事件几乎一致

 

 

进程队列(Queue)

进程队列是进程通讯的方式之一。使用multiprocessing 下的Queue

from multiprocessing import Process,Queue
import time
def func1(queue):
    while True:
        info=queue.get()
        if info == None:
            return 
        print(info)

def func2(queue):
    for i in range(10):
        time.sleep(1)
        queue.put('is %d'%i)
    queue.put(None) #结束的标志
if __name__ == '__main__':
    q = Queue()
    p1 = Process(target=func1,args=(q,))
    p2 = Process(target=func2, args=(q,))

    p1.start()
    p2.start()

 

Queue类的方法,源码如下:

class Queue(object):
    def __init__(self, maxsize=-1):  #可以传参设置队列最大容量
        self._maxsize = maxsize

    def qsize(self): #返回当前时刻队列中的个数
        return 0

    def empty(self): #是否为空
        return False

    def full(self):    是否满了
        return False

    def put(self, obj, block=True, timeout=None): #放值,blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常
        pass

    def put_nowait(self, obj):  #=put(False)
        pass

    def get(self, block=True, timeout=None): 获取值,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常.
        pass

    def get_nowait(self): # = get(False)
        pass

    def close(self): #将队列关闭
        pass

    def join_thread(self): #略,几乎不用
        pass

    def cancel_join_thread(self):
        pass
进程队列源码注释

 

进程池

  进程的消耗是很大的,因此我们不能无节制的开启新进程,因此我们可以通过维护一个进程池来控制进程的数量。这就不同于信号量,进程池可以从源头控制进程数量。在Python中可以通过如下方法使用

同步调用

from multiprocessing import Pool
import time, random, os
def func(n):
    pid = os.getpid()
    print('进程%s正在处理第%d个任务'%(pid,n),'时间%s'%time.strftime('%H-%M-%S'))
    time.sleep(2)
    res = '处理%s'%random.choice(['成功','失败'])
    return res

if __name__ == '__main__':
    p = Pool(4) #创建4个进程,
    li = []
    for i in range(10):
        res = p.apply(func,args=(i,)) 交给进程池处理,处理完成才返回值,会阻塞,即使池内还有空余进程,相当于顺序执行
        li.append(res)

   
    for i in li:
        print(i)

#进程1916正在处理第0个任务 时间21-02-53
#进程1240正在处理第1个任务 时间21-02-55
#进程3484正在处理第2个任务 时间21-02-57
#进程7512正在处理第3个任务 时间21-02-59
#进程1916正在处理第4个任务 时间21-03-01
#进程1240正在处理第5个任务 时间21-03-03
#进程3484正在处理第6个任务 时间21-03-05
#进程7512正在处理第7个任务 时间21-03-07
#进程1916正在处理第8个任务 时间21-03-09
#进程1240正在处理第9个任务 时间21-03-11

 

从结果可以发现两点:

  1. 不是并发处理
  2. 一直都只有四个进程,串行执行

 

因此进程池提供了异步处理的方式

from multiprocessing import Pool
import time, random, os
def func(n):
    pid = os.getpid()
    print('进程%s正在处理第%d个任务'%(pid,n),'时间%s'%time.strftime('%H-%M-%S'))
    time.sleep(2)
    res = '处理%s'%random.choice(['成功','失败'])
    return res

if __name__ == '__main__':
    p = Pool(4)
    li = []
    for i in range(10):
        res = p.apply_async(func,args=(i,)) 结果不会立刻返回,遇到阻塞,开启下一个进程,在这,相当于几乎同时出现四个打印结果(一个线程处理一个任务,处理完下个任务才能进来)
        li.append(res)

    p.close() #join之前需要关闭进程池
    p.join()  #因为异
首页 上一页 1 2 3 下一页 尾页 2/3/3
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Python开发【第四篇】函数 下一篇Python算法基础

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目