1.进程同步/串行(锁)
进程之间数据不共享,但共享同一套文件系统,所以访问同一个文件,或同一个打印终端,没有问题,但共享带来的是竞争容易错乱,如抢票时。这就需让进程一个个的进去保证数据安全,也就是加锁处理,Lock
并发,效率高,但是竞争同一个文件时,导致数据混乱
加锁,由并发改成了串行,牺牲了运行效率,但避免数据竞争
以模拟抢票为例:
1 # 注意:首先在当前文件目录下创建一个名为db的文件 2 # 文件db的内容为:{"count":1},只有这一行数据,并且注意,每次运行完了之后,文件中的1变成了0,你需要手动将0改为1,然后在去运行代码。 3 # 注意一定要用双引号,不然json无法识别 4 # 加锁保证数据安全,不出现混乱 5 from multiprocessing import Process,Lock 6 import time,json,random 7 8 9 # 查看剩余票数 10 def search(i): 11 dic=json.load(open('db')) # 打开文件,直接load文件中的内容,拿到文件中的包含剩余票数的字典 12 print('客户%s查看剩余票数%s' %(i,dic['count'])) 13 14 15 # 抢票 16 def get(i): 17 dic = json.load(open('db')) 18 time.sleep(0.1) # 模拟读数据的网络延迟,那么进程之间的切换,所有人拿到的字典都是{"count": 1},也就是每个人都拿到了这一票。 19 if dic['count'] > 0: 20 dic['count'] -= 1 21 time.sleep(random.randint(0,1)) # 模拟写数据的网络延迟 22 json.dump(dic,open('db','w')) 23 # 若不加限制最终导致,每个人显示都抢到了票,这就出现了问题 24 print('客户%s购票成功'%i) 25 else: 26 print('sorry,客户%s 没票了亲!'%i) 27 28 29 def task(i,lock): 30 search(i) 31 # 抢票时是发生数据变化的时候,所以我们将锁加到这里,让进程串行执行 32 lock.acquire() # 加锁 33 get(i) 34 lock.release() # 解锁 35 36 37 if __name__ == '__main__': 38 lock = Lock() # 创建一个锁 39 for i in range(10): # 模拟并发10个客户端抢票 40 p = Process(target=task,args=(i,lock,)) # 将锁作为参数传给task函数 41 p.start()
总结:
加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,速度是慢了,但牺牲了速度却保证了数据安全。
因此需一种解决方案能够兼顾:1、效率高(多个进程共享一块内存的数据)2、帮我们处理好锁问题。这就是 mutiprocessing 模块提供的基于消息的IPC通信机制:队列和管道(见后续)。
2.进程守护
子进程是不会随着主进程结束而结束,子进程全部执行完后,程序才结束,那如果需求主进程结束,子进程必须跟着结束,怎么办?这就需要用到守护进程了!
运用:如,系统关机,其他一切都要跟着结束
1 import time 2 from multiprocessing import Process 3 4 def func1(m): 5 time.sleep(1) 6 print('我是func1',m) 7 8 9 # 注意:进程之间是互相独立的,主进程代码运行结束,不管有没有运行完,守护进程随即终止 10 if __name__ == '__main__': 11 p = Process(target=func1,args=(666,)) 12 p.daemon = True # 守护进程,在start之前 13 p.start() 14 15 print('主进程执行结束')
总结:
其一:守护进程会在主进程代码执行结束后就终止
其二:守护进程内无法再开启子进程,否则出异常
3.队列
进程之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持 队列和管道,这两种方式都是使用消息传递队列就像一个特殊的列表,但是可以设置固定长度,并且从前面插入数据,从后面取出数据,先进先出,取出就没有这个数据了。
方法介绍:
1 ''' 2 q = Queue([maxsize]) 3 创建共享的进程队列。maxsize是队列中允许的最大项数。如果省略此参数,则无大小限制。底层队列使用管道和锁定实现。另外,还需要运行支持线程以便队列中的数据传输到底层管道中。 4 Queue的实例q具有以下方法: 5 6 q.get( [ block [ ,timeout ] ] ) 7 返回q中的一个项目。如果q为空,此方法将阻塞,直到队列中有项目可用为止。block用于控制阻塞行为,默认为True. 如果设置为False,将引发Queue.Empty异常(定义在Queue模块中)。timeout是可选超时时间,用在阻塞模式中。如果在制定的时间间隔内没有项目变为可用,将引发Queue.Empty异常。 8 9 q.get_nowait( ) 10 同q.get(False)方法。 11 12 q.put(item [, block [,timeout ] ] ) 13 将item放入队列。如果队列已满,此方法将阻塞至有空间可用为止。block控制阻塞行为,默认为True。如果设置为False,将引发Queue.Empty异常(定义在Queue库模块中)。timeout指定在阻塞模式中等待可用空间的时间长短。超时后将引发Queue.Full异常。 14 15 q.qsize() 16 返回队列中目前项目的正确数量。此函数的结果并不可靠,因为在返回结果和在稍后程序中使用结果之间,队列中可能添加或删除了项目。在某些系统上,此方法可能引发NotImplementedError异常。 17 18 19 q.empty() 20 如果调用此方法时 q为空,返回True。如果其他进程或线程正在往队列中添加项目,结果是不可靠的。也就是说,在返回和使用结果之间,队列中可能已经加入新的项目。 21 22 q.full() 23 如果q已满,返回为True. 由于线程的存在,结果也可能是不可靠的(参考q.empty()方法) 24 25 '''
示例代码:
1 from multiprocessing import Process,Queue 2 # Queue 先进先出 fifo first in first out,队列里面的数据,只能取一次,取出就没了 3 4 q = Queue(3) # Queue(参数)可理解成一个可限制长度(参数)的列表 5 # 添加数据 6 # print(q.full()) 7 q.put(4) 8 q.put(3) 9 q.put(2) 10 # print(q.full()) # 查看序列是否满了,但不可信的(如多进程时) 11 12 # 取出数据 13 print('---