Python多进程
守护进程
守护进程其实就是**“子进程“是否伴随主进程一起结束**:守护==>伴随,即守护进程会伴随主进程的代码运行完毕后而死掉
进程:当父进程需要将一个任务并发出去执行,需要将该任务放到以个子进程里
守护:当该子进程内的代码在父进程代码运行完毕后就没有存在的意义了,就应该
将该子进程设置为守护进程,会在父进程代码结束后死掉
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 from multiprocessing import Processimport timedef foo () : print(123 ) time.sleep(1 ) print("end123" ) def bar () : print(456 ) time.sleep(3 ) print("end456" ) if __name__ == '__main__' : p1=Process(target=foo) p2=Process(target=bar) p1.daemon=True p1.start() p2.start() print("main-------" )
进程锁
主要使用multiprocessing下的Lock对象
就是将要执行任务的部门代码(只涉及到修改共享数据的代码)变成串行,作用是让进程不乱掉,下面代码就是可以避免 i 乱打印
1 2 3 4 5 6 7 8 9 10 11 12 13 from multiprocessing import Process, Lock def f (l, i) : l.acquire() try : print('hello world' , i) finally : l.release() if __name__ == '__main__' : lock = Lock() for num in range(10 ): Process(target=f, args=(lock, num)).start()
(理解为用锁来限制,同一时间只能让一个人拿着锁去改数据,先抢到锁的人
就有优先购买的权限)
1 2 3 4 lock = lock() lock.acquire() lock.release()
抢票demo
写法一
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 from multiprocessing import Lock,Processimport json,os,time,randomdef check () : time.sleep(1 ) with open('db.txt' ,'rt' ,encoding='utf-8' ) as f: dic=json.load(f) print('%s 查看余票数为 %s' %(os.getpid(),dic['count' ])) def get () : with open('db.txt' , 'rt' ,encoding='utf-8' ) as f: dic = json.load(f) time.sleep(2 ) if dic['count' ] >0 : dic['count' ]-=1 time.sleep(random.randint(1 , 3 )) with open('db.txt' , 'wt' ,encoding='utf-8' ) as f: json.dump(dic,f) print('%s 购票成功' %os.getpid()) else : print('%s 没有余票' %os.getpid()) def task (mutex) : check() mutex.acquire() get() mutex.release() if __name__ == '__main__' : mutex=Lock() for i in range(10 ): p=Process(target=task,args=(mutex,)) p.start()
写法二:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 import jsonimport timefrom multiprocessing import Processfrom multiprocessing import Lockdef show (i) : with open('ticket' ) as f: dic = json.load(f) print('余票: %s' %dic['ticket' ]) def buy_ticket (i,lock) : lock.acquire() with open('ticket' ) as f: dic = json.load(f) time.sleep(0.1 ) if dic['ticket' ] > 0 : dic['ticket' ] -= 1 print('\033[32m%s买到票了\033[0m' %i) else : print('\033[31m%s没买到票\033[0m' %i) time.sleep(0.1 ) with open('ticket' ,'w' ) as f: json.dump(dic,f) lock.release() if __name__ == '__main__' : for i in range(10 ): p = Process(target=show,args=(i,)) p.start() lock = Lock() for i in range(10 ): p = Process(target=buy_ticket, args=(i,lock)) p.start()
进程间的通信**
用一块儿共享的内存==>实现进程间的共享
特点:
实现进程间的通讯(IPC)的方式有很多种,如:管道(Pipe),消息队列(Queue: == PIPE+锁(队列)),共享内存,信号,信号量,套接字
管道
消息队列
共享内存
开辟空间
内存
内存
内存
读写方式
两端读写[双向/单向]
先进先出
覆盖之前的内容
效率
一般
一般
较高
应用
多用于父子进程
广泛灵活
需要注意互斥
注意:
1.队列占用的是内存空间
2.不应该往队列中放大数据,应该只存放数据量较小的精简的内容
生产者消费者模型
生产者:比喻的是程序中负责产生数据的任务
消费者:比喻的是程序中负责处理数据的任务
生产者———>共享的介质(队列)<————消费者
Q:作用是什么?
A: 实现生产者与消费者的解耦和,生产者可以不停的生产,消费者也可以不停的消费从而平衡了生产者的生产能力与消费者消费能力,提升了而整体运行的效率
Q:什么时候用?
A:当我们程序中存在明显的两类任务,一类是负责产生数据,一类是负责处理数据,此时就应该考虑使用生产者消费者模型来提升程序的效率
多进程中的join()
在进程中可以阻塞主进程的执行, 直到等待子线程全部完成之后, 才继续运行主线程后面的代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 import threadingimport timedef test (num) : time.sleep(1 ) print(num) threads = [] for i in range(5 ): thread = threading.Thread(target = test, args = [i]) threads.append(thread) for i in threads: i.start() print('end' ) ''' end 4 1 2 3 0 '''
将其修改为
1 2 3 4 5 6 7 8 9 10 11 12 for i in tsreads: i.start() i.join() ''' 0 1 2 3 4 end [Finished in 5.2s] '''
每个都会隔一秒的进行输出,因为for循环也是主进程操作,也会被阻塞直到子进程完成。
1 2 3 4 5 6 7 8 9 10 11 12 13 for i in tsreads: i.start() for i in threads: i.join() ''' 2 3 1 0 4 end [Finished in 1.2s] '''
在这里看一看出来, join()还是阻挡了主线程的执行, 让所有的子线程执行完毕之后再执行, 而且前面的子线程的执行都是无序地执行完毕了
△.一般来说,join函数应该在所有的start函数之后(即一般情况下,让所有进程都执行,然后等待子进程结束,再进行下面的主进程)。
Python多进程之Manager
可以用来进程间共享对象、资源、变量===>使用公共内存
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 from multiprocessing import Process,Managerimport osdef f (d,l) : d[1 ] = '1' d['2' ] = 2 d[0.25 ] = None l.append(os.getpid()) print(l) if __name__ == '__main__' : with Manager() as manager: d = manager.dict() l = manager.list(range(5 )) p_list = [] for i in range(10 ): p = Process(target=f,args=(d,l)) p.start() p_list.append(p) for res in p_list: res.join() print(d) print(l)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 from multiprocessing import Process, Lock, Valuefrom multiprocessing.managers import BaseManagerclass Employee (object) : def __init__ (self, name, salary) : self.name = name self.salary = Value('i' , salary) self.data = [] def increase (self) : self.salary.value += 100 self.data.append(self.salary.value) print(self.data) def getPay (self) : return self.name + ':' + str(self.salary.value) class MyManager (BaseManager) : ''' 继承即可 ''' pass def Manager () : m = MyManager() m.start() return m MyManager.register('Employee' , Employee) def func (em, lock) : with lock: em.increase() if __name__ == '__main__' : manager = Manager() em = manager.Employee('zhangsan' , 1000 ) lock = Lock() proces = [Process(target=func, args=(em, lock)) for i in range(10 )] for p in proces: p.start() for p in proces: p.join() print(em.getPay()) ''' >>> [1100] [1100, 1200] [1100, 1200, 1300] [1100, 1200, 1300, 1400] [1100, 1200, 1300, 1400, 1500] [1100, 1200, 1300, 1400, 1500, 1600] [1100, 1200, 1300, 1400, 1500, 1600, 1700] [1100, 1200, 1300, 1400, 1500, 1600, 1700, 1800] [1100, 1200, 1300, 1400, 1500, 1600, 1700, 1800, 1900] [1100, 1200, 1300, 1400, 1500, 1600, 1700, 1800, 1900, 2000] zhangsan:2000 '''
Queue
其实就是一个队列(FIFO),两个进程可以通过传入的参数q,来获得里面的内容,从而数据共享、通信。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 from multiprocessing import Queue,Processimport os def wp (q) : print("%s开始写入:" %os.getpid) for i in "WANG" : q.put(i) print(i) def rd (q) : print("%s开始读取" %os.getpid()) while True : if not q.empty(): print("read to %s" %q.get()) if __name__=="__main__" : q = Queue() w = Process(target=wp,args=(q,)) w.start() r = Process(target=rd,args=(q,)) r.start()
Pipe管道
和队列的功能差不多,实现两个进程之间数据的传递,只不过是FILO的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 '''管道pipe''' from multiprocessing import Process, Pipe def f (conn) : conn.send('hello' ) print(conn.recv()) conn.close() if __name__ == '__main__' : parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) parent_conn.send('牛逼' ) p.join()
借鉴:
Python守护进程、进程互斥锁、进程间通信ICP(Queue队列)、生产者消费者模型
多进程 之 join()
Java 多线程中两个线程交替执行
Python多进程消息队列实现进程间通讯
Queue详细内容