Mrli
别装作很努力,
因为结局不会陪你演戏。
Contacts:
QQ博客园

Python多进程

2019/10/07 Python
Word count: 2,567 | Reading time: 12min

Python多进程

守护进程

守护进程其实就是**“子进程“是否伴随主进程一起结束**:守护==>伴随,即守护进程会伴随主进程的代码运行完毕后而死掉

进程:当父进程需要将一个任务并发出去执行,需要将该任务放到以个子进程里
守护:当该子进程内的代码在父进程代码运行完毕后就没有存在的意义了,就应该
将该子进程设置为守护进程,会在父进程代码结束后死掉

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from multiprocessing import Process
import time
def foo():
print(123)
time.sleep(1)
print("end123")

def bar():
print(456)
time.sleep(3)
print("end456")

if __name__ == '__main__':
p1=Process(target=foo)
p2=Process(target=bar)

# 将子进程p1设置为守护进程,守护进程要放在进程start之前
# 所以p1会在print("main-------")打印完成后死掉,所以p1进程不会打印
p1.daemon=True
p1.start()
p2.start()
# time.sleep(1)
print("main-------")

进程锁

主要使用multiprocessing下的Lock对象

就是将要执行任务的部门代码(只涉及到修改共享数据的代码)变成串行,作用是让进程不乱掉,下面代码就是可以避免 i 乱打印

1
2
3
4
5
6
7
8
9
10
11
12
13
from multiprocessing import Process, Lock

def f(l, i):
l.acquire() #锁住进程
try:
print('hello world', i)
finally:
l.release() #释放锁

if __name__ == '__main__':
lock = Lock()
for num in range(10):
Process(target=f, args=(lock, num)).start()

(理解为用锁来限制,同一时间只能让一个人拿着锁去改数据,先抢到锁的人
就有优先购买的权限)

1
2
3
4
# 主要就是用的这三个
lock = lock()
lock.acquire() # 拿钥匙,开门
lock.release() #还钥匙,关门

抢票demo

写法一

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
#模拟抢票软件的原理:
from multiprocessing import Lock,Process
import json,os,time,random

def check():
#查票功能实现并行访问
time.sleep(1)
with open('db.txt','rt',encoding='utf-8') as f:
dic=json.load(f)
print('%s 查看余票数为 %s'%(os.getpid(),dic['count']))

def get():
#购票因为牵涉到对后台数据的修改,所以加互斥锁目的是逐一进行访问修改,以免数据错乱
with open('db.txt', 'rt',encoding='utf-8') as f:
dic = json.load(f)
time.sleep(2)
if dic['count'] >0:
#有票
dic['count']-=1
time.sleep(random.randint(1, 3))
#在购票时,模拟网络延迟...
with open('db.txt', 'wt',encoding='utf-8') as f:
json.dump(dic,f)
print('%s 购票成功'%os.getpid())
else:
print('%s 没有余票'%os.getpid())

def task(mutex):
#查看(并行访问)
check()
#抢票(加入互斥锁,实现串行访问,先到先得原则)
mutex.acquire()
get()
mutex.release() #第一个购买完成后,解锁,后续进入继续购买


if __name__ == '__main__':
mutex=Lock() #调用Lock类拿到一个对象
for i in range(10):
p=Process(target=task,args=(mutex,))
p.start()

写法二:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# 锁

# 火车票
import json
import time
from multiprocessing import Process
from multiprocessing import Lock

def show(i):
with open('ticket') as f:
dic = json.load(f)
print('余票: %s'%dic['ticket'])

def buy_ticket(i,lock):
lock.acquire() #拿钥匙进门
with open('ticket') as f:
dic = json.load(f)
time.sleep(0.1)
if dic['ticket'] > 0 :
dic['ticket'] -= 1
print('\033[32m%s买到票了\033[0m'%i)
else:
print('\033[31m%s没买到票\033[0m'%i)
time.sleep(0.1)
with open('ticket','w') as f:
json.dump(dic,f)
lock.release() # 还钥匙

if __name__ == '__main__':
for i in range(10):
p = Process(target=show,args=(i,))
p.start()
lock = Lock()
for i in range(10):
p = Process(target=buy_ticket, args=(i,lock))
p.start()

进程间的通信**

用一块儿共享的内存==>实现进程间的共享

特点:

  • 一定是内存空间
  • 能够自动帮忙处理锁的问题

实现进程间的通讯(IPC)的方式有很多种,如:管道(Pipe),消息队列(Queue: == PIPE+锁(队列)),共享内存,信号,信号量,套接字

管道 消息队列 共享内存
开辟空间 内存 内存 内存
读写方式 两端读写[双向/单向] 先进先出 覆盖之前的内容
效率 一般 一般 较高
应用 多用于父子进程 广泛灵活 需要注意互斥

注意:
1.队列占用的是内存空间
2.不应该往队列中放大数据,应该只存放数据量较小的精简的内容

生产者消费者模型

生产者:比喻的是程序中负责产生数据的任务
消费者:比喻的是程序中负责处理数据的任务

生产者———>共享的介质(队列)<————消费者

Q:作用是什么?
A: 实现生产者与消费者的解耦和,生产者可以不停的生产,消费者也可以不停的消费从而平衡了生产者的生产能力与消费者消费能力,提升了而整体运行的效率

Q:什么时候用?
A:当我们程序中存在明显的两类任务,一类是负责产生数据,一类是负责处理数据,此时就应该考虑使用生产者消费者模型来提升程序的效率

多进程中的join()

在进程中可以阻塞主进程的执行, 直到等待子线程全部完成之后, 才继续运行主线程后面的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import threading
import time
def test(num):
time.sleep(1)
print(num)
#定义一个用来装子线程的列表
threads = []
for i in range(5):
#target 指定子线程要执行的funtion, args 指定该funtion需要传入的参数
thread = threading.Thread(target = test, args = [i])
#上面的 thread 是一个个参数i都不同的线程, 现在把它一个个装进列表 threads 里面
threads.append(thread)
for i in threads:
#for 循环执行 threads 列表里面的全部线程, 没有用 join()线程是无序执行的,
# 就连最后一句print('end')可能比所有子线程都要先执行
i.start()
print('end')
'''
end
4
1
2
3
0
'''

将其修改为

1
2
3
4
5
6
7
8
9
10
11
12
for i in tsreads:
i.start()
i.join()
'''
0
1
2
3
4
end
[Finished in 5.2s]
'''

每个都会隔一秒的进行输出,因为for循环也是主进程操作,也会被阻塞直到子进程完成。

1
2
3
4
5
6
7
8
9
10
11
12
13
for i in tsreads:
i.start()
for i in threads:
i.join()
'''
2
3
1
0
4
end
[Finished in 1.2s]
'''

在这里看一看出来, join()还是阻挡了主线程的执行, 让所有的子线程执行完毕之后再执行, 而且前面的子线程的执行都是无序地执行完毕了

△.一般来说,join函数应该在所有的start函数之后(即一般情况下,让所有进程都执行,然后等待子进程结束,再进行下面的主进程)。

Python多进程之Manager

可以用来进程间共享对象、资源、变量===>使用公共内存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from multiprocessing import Process,Manager
import os
# 这里实现的就是多个进程之间共享内存,并修改数据
# 这里不需要加锁,因为manager已经默认给你加锁了

def f(d,l):
d[1] = '1'
d['2'] = 2
d[0.25] = None
l.append(os.getpid())
print(l)

if __name__ == '__main__':
with Manager() as manager:
d = manager.dict() #生成一个字典
l = manager.list(range(5)) #生成一个列表
p_list = []
for i in range(10):
p = Process(target=f,args=(d,l))
p.start()
p_list.append(p)
for res in p_list:
res.join()
print(d)
print(l)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
from multiprocessing import Process, Lock, Value
from multiprocessing.managers import BaseManager


class Employee(object):
def __init__(self, name, salary):
self.name = name
self.salary = Value('i', salary)
self.data = []

def increase(self):
self.salary.value += 100
self.data.append(self.salary.value)
print(self.data)

def getPay(self):
return self.name + ':' + str(self.salary.value)


class MyManager(BaseManager):
'''
继承即可
'''
pass


def Manager():
m = MyManager()
m.start()
return m

MyManager.register('Employee', Employee)


def func(em, lock):
with lock:
em.increase()


if __name__ == '__main__':
manager = Manager()
em = manager.Employee('zhangsan', 1000)
lock = Lock()
proces = [Process(target=func, args=(em, lock)) for i in range(10)]
for p in proces:
p.start()
for p in proces:
p.join()
print(em.getPay())
'''
>>>
[1100]
[1100, 1200]
[1100, 1200, 1300]
[1100, 1200, 1300, 1400]
[1100, 1200, 1300, 1400, 1500]
[1100, 1200, 1300, 1400, 1500, 1600]
[1100, 1200, 1300, 1400, 1500, 1600, 1700]
[1100, 1200, 1300, 1400, 1500, 1600, 1700, 1800]
[1100, 1200, 1300, 1400, 1500, 1600, 1700, 1800, 1900]
[1100, 1200, 1300, 1400, 1500, 1600, 1700, 1800, 1900, 2000]
zhangsan:2000
'''

Queue

其实就是一个队列(FIFO),两个进程可以通过传入的参数q,来获得里面的内容,从而数据共享、通信。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#导入Queue,Process
from multiprocessing import Queue,Process
import os

#写入进程
def wp(q):
print("%s开始写入:"%os.getpid)
for i in "WANG":
#将信息写入队列
q.put(i)
print(i)

#读取进程
def rd(q):
print("%s开始读取"%os.getpid())
while True:
if not q.empty():
#从队列读取信息
print("read to %s"%q.get())

if __name__=="__main__":
#创建队列
q = Queue()

#创建写入进程
w = Process(target=wp,args=(q,))
#启动写入进程
w.start()

#创建读取进程
r = Process(target=rd,args=(q,))
#启动读取进程
r.start()

Pipe管道

和队列的功能差不多,实现两个进程之间数据的传递,只不过是FILO的:

pipe

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
'''管道pipe'''
from multiprocessing import Process, Pipe

def f(conn):
conn.send('hello') #发送hello
print(conn.recv()) #收到牛逼
conn.close() #关闭

if __name__ == '__main__':
parent_conn, child_conn = Pipe() #管道会产生两个返回值
p = Process(target=f, args=(child_conn,)) #创建一个子进程
p.start()
print(parent_conn.recv()) #收到'hello'"
parent_conn.send('牛逼') #发送牛逼
p.join()

借鉴:

Python守护进程、进程互斥锁、进程间通信ICP(Queue队列)、生产者消费者模型

多进程 之 join()

Java 多线程中两个线程交替执行

Python多进程消息队列实现进程间通讯

Queue详细内容

Author: Mrli

Link: https://nymrli.top/2019/10/06/Python多进程/

Copyright: All articles in this blog are licensed under CC BY-NC-SA 3.0 unless stating additionally.

< PreviousPost
蒙特卡洛树搜索MCTS
NextPost >
Openmv使用
CATALOG
  1. 1. Python多进程
    1. 1.1. 守护进程
    2. 1.2. 进程锁
      1. 1.2.1. 抢票demo
    3. 1.3. 进程间的通信**
    4. 1.4. 生产者消费者模型
    5. 1.5. 多进程中的join()
    6. 1.6. Python多进程之Manager
    7. 1.7. Queue
    8. 1.8. Pipe管道
    9. 1.9. 借鉴: