1.管道
进程间通信(IPC)方式二:管道(不推荐使用,了解即可),端口易导致数据不安全的情况出现。
1 from multiprocessing import Pipe,Process 2 3 4 def func(conn1,conn2): 5 msg = conn1.recv() # 接收了conn2传递的 6 # msg1 = conn2.recv() # 接收了conn1传递的 7 print('>>>',msg) 8 # print('>>>',msg1) 9 10 11 if __name__ == '__main__': 12 # 拿到管道的两端,双工通信方式,两端都可以收发消息 13 conn1,conn2 = Pipe() # 必须在Process之前产生管道 14 p = Process(target=func,args=(conn1,conn2,)) # 管道给子进程 15 p.start() 16 conn1.send('hello') 17 conn1.close() 18 conn2.send('小子') 19 conn2.close() 20 21 print('进程结束') 22 23 # 注意管道不用了就关闭防止异常
2.共享数据
进程之间数据共享的模块之一Manager模块(少用):
进程间数据是独立的,可以借助于队列或管道实现通信,二者都是基于消息传递的虽然进程间数据独立,但可以通过Manager实现数据共享:
1 from multiprocessing import Manager,Process,Lock 2 3 4 def func1(dic,loc): 5 # loc.acquire() # 不加锁易出错 6 dic['num'] -= 1 7 # loc.release() 8 9 10 if __name__ == '__main__': 11 m = Manager() 12 loc = Lock() 13 dic = m.dict({'num':100}) 14 p_list = [] 15 for i in range(100): 16 p = Process(target=func1, args=(dic,loc)) 17 p_list.append(p) 18 p.start() 19 20 [pp.join() for pp in p_list] 21 22 print('>>>>>',dic['num']) 23 # 共享时不加锁,很可能导致同一个数据被多个子进程取用,数据是不安全的,且超多进程消耗大量资源易导致卡死.
多进程共同去处理共享数据的时候,就和我们多进程同时去操作一个文件中的数据是一样的,不加锁就会出现错误的结果,进程不安全的,所以也需要加锁。
总结:进程间应该尽量避免通信,即便需要通信,也应该选择进程安全的工具来避免加锁带来的问题。
3.进程池 Pool
创建进程需要消耗时间,销毁进程(空间,变量,文件信息等等的内容)也需要消耗时间。开启成千上万的进程,操作系统无法让他们同时执行,维护一个很大的进程列表的同时,调度的时候,还需要进行切换并且记录每个进程的执行节点,也就是记录上下文(各种变量等等乱七八糟的东西,虽然你看不到,但是操作系统都要做),这样反而会影响程序的效率。因此我们不能无限制的根据任务开启或者结束进程,这就需要用到进程池:
定义一个池子,在里面放上固定数量的进程,有需求来了,就拿一个池中的进程来处理任务,等到处理完毕,进程并不关闭,而是将进程再放回进程池中继续等待任务。如果有很多任务需要执行,池中的进程数量不够,任务就要等待之前的进程执行任务完毕归来,拿到空闲进程才能继续执行。也就是说,池中进程的数量是固定的,那么同一时间最多有固定数量的进程在运行。这样不会增加操作系统的调度难度,还节省了开闭进程的时间,也一定程度上能够实现并发效果。
创建方法:
Pool([numprocess [,initializer [, initargs]]]):创建进程池
参数介绍:
1 numprocess:要创建的进程数,如果省略,将默认使用cpu_count()的值 2 initializer:是每个工作进程启动时要执行的可调用对象,默认为None 3 initargs:是要传给initializer的参数组
常用方法:
p.apply(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。 '''需要强调的是:此操作并不会在所有池工作进程中并执行func函数。如果要通过不同参数并发地执行func函数,必须从不同线程调用p.apply()函数或者使用p.apply_async()''' p.apply_async(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。 '''此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,将理解传递给callback。callback禁止执行任何阻塞操作,否则将接收其他异步操作中的结果。''' p.close():关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成 P.jion():等待所有工作进程退出。此方法只能在close()或teminate()之后调用 主要方法介绍
1 import time 2 from multiprocessing import Process,Pool 3 4 5 def func1(i): 6 num = 0 7 for j in range(5): 8 num += i 9 10 11 if __name__ == '__main__': 12 pool = Pool(6) # 创建进程池 13 14 p_list = [] 15 start_time = time.time() 16 for i in range(500): 17 p = Process(target=func1,args=(i,)) 18 p_list.append(p) 19 p.start() 20 21 [pp.join() for pp in p_list] 22 end_time = time.time() 23 print('耗时:',end_time-start_time) 24 25 s_time = time.time() 26 pool.map(func1,range(500)) # map 27 e_time = time.time() 28 print('耗时:',e_time - s_time) # 耗时远远小于直接开500进程
apply同步方法:
1 from multiprocessing import Process,Pool 2 import time 3 4 5 def func1(i): 6 num = 0 7 for j in range(3): 8 num += i 9 time.sleep(1) 10 print(num) 11 return num 12 13 14 if _