源作者:埋头苦干的小码农
下面程序的运行结果是multiprocessing模块
解释:综合的处理进程的包
multiprocess模块下的Process模块:和创建进程相关
创建一个进程
# 创建一个进程
from multiprocessing import Process # 导入创建进程模块
def func(): # 定义一个函数
print('我是一个进程')
if __name__ =='__main__':
p = Process(target=func) # 创建一个进程对象p = Process(target=func),target:目标函数名
p.start() # 知道执行start才有了进程
# 结果
# 我是一个进程
证明创建一个进程的方法
import os
from multiprocessing import Process # 导入创建进程模块
def func(): # 定义一个函数
print('子进程:', os.getpid())
print('我是一个进程')
if __name__ =='__main__':
p = Process(target=func) # 创建一个进程对象p = Process(target=func),target:目标函数名
print('主进程', os.getpid())
p.start() # 知道执行start才有了进程
print('主进程', os.getpid())
# 结果
# 主进程 23756
# 主进程 23756
# 子进程: 17900
# 我是一个进程
上程序运行方法异步程序
import os # 1
from multiprocessing import Process # 2
def func(): # 3
print('子进程:', os.getpid())
print('我是一个进程')
if __name__ =='__main__':
p = Process(target=func) # 4
print('主进程', os.getpid()) # 5
p.start() # 6 解释:只是告知操作系统我要创建程序(至于操作系统做不做我们无法控制)PS:异步
print('主进程', os.getpid()) # 7
# 运行方法见标号
异步进程传参
# 异步进程传参
import os
from multiprocessing import Process
def func(exes):
print('我是一个%s'%exes)
if __name__ =='__main__':
p = Process(target=func, args=('name',)) # 注意进程传参必须是传一个元组,即时只有一个值也要写成(123,)形式
print('我是第一顺序执行者')
p.start()
# 结果:
# 我是第一顺序执行者
# 我是一个name
# PS这是一个异步程序
join介绍(阻塞)PS:主进程会阻塞在jion出等待子进程运行结束
# join介绍(阻塞)
from multiprocessing import Process
def func(exes):
print('我是一个%s'%exes)
if __name__ =='__main__':
p = Process(target=func, args=('name',)) # 注意进程传参必须是传一个元组,即时只有一个值也要写成(123,)形式
p.start()
p.join() # 调用join方法:实际是一个阻塞,只有子进程结束才能继续执行:实际是变成同步程序了
print('我是第一顺序执行者')
# 结果:
# 我是一个name
# 我是第一顺序执行者
# PS加上join变成同步程序
PS:在Windows操作系统中,创建进程的语句必须放在 if __name__ =='__main__':下面
开启多个子进程
开启多进程方法(可for循环开启多个进程)
# 开启多进程方法
import os
from multiprocessing import Process
def func():
print('子进程%s的主进程是%s' % (os.getpid(), os.getppid()))
if __name__ =='__main__':
p1 = Process(target=func)
p2 = Process(target=func)
p3 = Process(target=func)
p4 = Process(target=func)
p1.start()
p2.start()
p3.start()
p4.start()
print('------主进程%s-------'% os.getpid())
# 结果:
# ------主进程2104-------
# 子进程13440的主进程是2104
# 子进程7576的主进程是2104
# 子进程11600的主进程是2104
# 子进程14832的主进程是2104
证明进程杂乱的方法
# 证明进程杂乱的方法
import time
import os
from multiprocessing import Process
def func(i):
time.sleep(1)
print('%d 子进程%s的主进程是%s' % (i, os.getpid(), os.getppid()))
if __name__ =='__main__':
for i in range(10):
p = Process(target=func, args=(i,))
p.start()
print('------主进程%s-------' % os.getpid())
# 结果:
# ------主进程12556-------
# 0 子进程23168的主进程是12556
# 2 子进程22832的主进程是12556
# 1 子进程15976的主进程是12556
# 3 子进程14344的主进程是12556
# 4 子进程8216的主进程是12556
# 8 子进程22960的主进程是12556
# 6 子进程17812的主进程是12556
# 5 子进程22932的主进程是12556
# 9 子进程4696的主进程是12556
# 7 子进程14492的主进程是12556
# 子进程的运行顺序是杂乱无章的
控制先输出子进程再输出主进程方法
import time
import os
from multiprocessing import Process
def func(i):
time.sleep(1)
print('%d 子进程%s的主进程是%s' % (i, os.getpid(), os.getppid()))
if __name__ =='__main__':
p_lst = []
for i in range(10):
p = Process(target=func, args=(i,))
p.start()
p_lst.append(p) # 创建一个列表,将所有子进程地址装入列表中
for j in p_lst:
j.join()
print('------主进程%s-------' % os.getpid())
# 结果:
# 3 子进程17612的主进程是23516
# 2 子进程18616的主进程是23516
# 1 子进程16812的主进程是23516
# 0 子进程21592的主进程是23516
# 5 子进程23480的主进程是23516
# 4 子进程18064的主进程是23516
# 6 子进程16284的主进程是23516
# 8 子进程4524的主进程是23516
# 7 子进程21924的主进程是23516
# 9 子进程13000的主进程是23516
# ------主进程23516-------
# 控制先运行子进程再运行主进程方法
创建进程的其他方法class方法
# 创建进程的其他方法class方法
import os
from multiprocessing import Process
class MyProcess(Process): # PS:必须创建一个类(可以叫任何名字)但必须继承Process类
def __init__(self, arg1, arg2, arg3): # 传进子进程的方法
super().__init__() # PS:继承父类(Process)的init方法
self.arg1 = arg1
self.arg2 = arg2
self.arg3 = arg3
def run(self): # PS:必须有一个run方法
print('子进程:{} 参数是{} {} {}'.format(os.getpid(), self.arg1, self.arg2, self.arg3))
self.work() # 在子进程中调用work方法的方法
def work(self):
print('我是work方法', os.getpid())
if __name__ == '__main__':
p = MyProcess(1, 2, 3) # 传参方法
p.start() # 默认调用run方法
p.work() # work直接在主进程中调用,并没有在子进程中执行 是同步
print('主进程%s' % os.getpid())
# 结果:
# 我是work方法 6220
# 主进程6220
# 子进程:18236 参数是1 2 3
# 我是work方法 18236
# 使用时注意:实例化MyProcess得到一个对象
# 使用对象调用start方法
# 传参直接在实例化对象的时候传参
守护进程daemon
定义:守护进程会随着主进程的代码执行结束而结束,不等待其他子进程结束
如果行等待子进程完成(在主进程最后加join方法)
正常的子进程没有执行完的时候主进程要一直等着
守护进程
import time
from multiprocessing import Process
def func():
while True:
time.sleep(1)
print('过了一秒')
if __name__ == '__main__':
p = Process(target=func)
p.daemon = True # 一定在开启进程之前设置
p.start()
for i in range(100):
time.sleep(0.1)
print('*' * i)
# 注意
# 守护进程要在start之前设置
# 守护进程中不能再开启子进程
进程的其他方法
进程两方法p.is_alive()/p.terminate()
# p.is_alive() # 判断进程是否还活着True代表进程还在,False代表进程不在了
# p.terminate() # 结束一个进程,但是这个进程不会立即被杀死(异步操作)
import time
from multiprocessing import Process
def func():
print('子进程开始')
time.sleep(5)
print('子进程结束')
if __name__ == '__main__':
p = Process(target=func)
p.start()
print(p.is_alive()) # 判断进程是否还活着True代表进程还在,False代表进程不在了
time.sleep(1)
p.terminate() # 结束一个进程,但是这个进程不会立即被杀死(异步操作)
print(p.is_alive())
进程两属性name/pid
# name:查看进程名字为进程名重命名
# pid:查看进程id
# 函数中使用
import time
from multiprocessing import Process
def func():
print('子进程开始')
if __name__ == '__main__':
p = Process(target=func)
p.start()
print(p.name, p.pid)
p.name = '重命名进程名'
print(p.name)
# 结果:
# Process-1 16876
# 重命名进程名
# 子进程开始
# 类中使用
class MyProcess(Process):
def run(self):
print('子进程开始', self.name, self.pid)
if __name__ == '__main__':
p = MyProcess()
p.start()
# 结果
# 子进程开始 MyProcess-2 13684
锁Lock
作用:就是在并发编程中,保证数据安全
定义锁和使用锁
from multiprocessing import Lock
lock = Lock() # 实例化一个锁
lock.acquire() # 需要锁==拿钥匙
lock.release() # 释放锁==还钥匙
使用锁
from multiprocessing import Lock
from multiprocessing import Process
def func(k, lock):
lock.release()
print(k)
lock.acquire()
if __name__ == '__main__':
lock = Lock()
for i in range(10):
p = Process(target=func, args=(i, lock))
p.start()
信号量Semaphore
信号量应用
import time
import random
from multiprocessing import Process
from multiprocessing import Semaphore # 导入信号量
def toilet(i, sem):
sem.acquire() # 需要锁==拿钥匙
print('%d进厕所' % i)
time.sleep(random.randint(1, 5))
print('%d出厕所' % i)
sem.release() # 释放锁==还钥匙
if __name__ == '__main__':
sem = Semaphore(4) # 实例化信号量并定义钥匙数量
for i in range(10):
p = Process(target=toilet, args=(i, sem))
p.start()
事件Event
事件基础
# 事件基础
from multiprocessing import Event # 导入事件
e = Event() # 实例化一个事件
e.set() # 将标志变成非阻塞
e.wait() # 刚实例化出的一个事件对象,默认的信号是阻塞信号
e.clear() # 将标志又变成阻塞
PS:e.is_set() # 是否阻塞 True就是非阻塞, False就是阻塞
红绿灯事件
import time
import random
from multiprocessing import Event
from multiprocessing import Process
def traffic_car(e): # 控制红绿灯
while True:
if e.is_set():
time.sleep(3)
print('红灯亮')
e.clear() # 绿变红
else:
time.sleep(3)
print('绿灯亮')
e.set() # 红变绿
def car(i, e):
e.wait()
print('%s车通过' % i)
if __name__ == '__main__':
e = Event() # 立一个红绿灯
tra = Process(target=traffic_car, args=(e,)) # 实例化一个进程
tra.start() # 启动一个进程来控制红绿灯
for i in range(100):
if i % 6 == 0:
time.sleep(random.randint(1, 3))
car_lll = Process(target=car, args=(i, e)) # 实例化一辆车
car_lll.start() # 启动车辆
队列Queue
1》进程之间通信,可以使用multiprocessing的Queue模块
下面程序的运行结果是a=10 def func2》队列有两种创建方式,第一种不传参数,这种队列没有长度限制,第二种传参数,创建一个有最大长度限制的队列q = Queue(3)
3》提供两种重要方法:put get
4》qsize 有的操作系统会报错,有时不准(查看队列大小)
5》通过队列可以实现主进程与子进程的通信, 子进程与子进程之间的通信
队列简单使用
from multiprocessing import Queue # 引用队列
q = Queue() # 实例化队列
q.put(1) # 往队列放数据
print(q.get()) # 在队列中取数据
主进程与子进程之间的通信
from multiprocessing import Queue
from multiprocessing import Process
def q_put(q):
q.put('hello')
if __name__ == '__main__':
q = Queue()
p = Process(target=q_put, args=(q,))
p.start()
print(q.get())
# 结果:
# hello
子进程与子进程之间的通信
from multiprocessing import Queue
from multiprocessing import Process
def q_put(q):
q.put('hello')
def q_get(q):
print(q.get())
if __name__ == '__main__':
q = Queue()
p = Process(target=q_put, args=(q,))
p.start()
p2 = Process(target=q_get, args=(q,))
p2.start()
# 结果:
# hello
初级生产者消费者模型
生产者消费者初阶模型
import time
import random
from multiprocessing import Queue
from multiprocessing import Process
def producer(q, food): # 生产者
for i in range(5):
q.put('%s-%s' % (food, i))
print('生产了%s' % food)
time.sleep(random.random()) # 模拟生产时间
q.put(None)
q.put(None)
q.put(None)
def consumer(q,name): # 消费者
while True:
food = q.get() # 生产者不生产还是生产的慢
if food == None:
break
print('%s 吃了 %s' % (name, food))
if __name__ == '__main__':
q = Queue()
p1 = Process(target=producer, args=(q, '牛奶')).start()
p2 = Process(target=producer, args=(q, '面包')).start()
c1 = Process(target=consumer, args=(q, 'li')).start()
c2 = Process(target=consumer, args=(q, 'wang')).start()
c3 = Process(target=consumer, args=(q, 'sun')).start()
解读生产者消费者模型
# 解读
# 队列是安全的
# 现在是通过queue
# 生产者消费者模型问题:
# 消费者要处理多少数据是不确定的
# 故只能用while循环来处理数据,但是while循环无法结束
# 故需要生产者来发送信号
# PS:有多少消费者,就需要发送多少信号
# 但是发送的信号数量需要根据生产者消费者的数量进行计算,所以非常不方便
生产者消费者模型进阶
import time
import random
from multiprocessing import JoinableQueue
from multiprocessing import Process
def producer(q, food): # 生产者
for i in range(5):
q.put('%s-%s' % (food, i))
print('生产了%s' % food)
time.sleep(random.random()) # 模拟生产时间
q.join() # 等待消费者把所有的数据处理完
def consumer(q,name): # 消费者
while True:
food = q.get() # 生产者不生产还是生产的慢
print('%s 吃了 %s' % (name, food))
q.task_done() # 消费者每处理完一条数据通知生产者
if __name__ == '__main__':
q = JoinableQueue()
p1 = Process(target=producer, args=(q, '牛奶')).start()
p2 = Process(target=producer, args=(q, '面包')).start()
c1 = Process(target=consumer, args=(q, 'li'))
c1.daemon = True
c1.start()
c2 = Process(target=consumer, args=(q, 'qun'))
c2.daemon = True
c2.start()
c3 = Process(target=consumer, args=(q, 'mun'))
c3.daemon = True
c3.start()
p1.join() # 等待p1执行完毕
p2.join() # 等待p2执行完毕
管道Pipe
特点:双向通信
简单使用
from multiprocessing import Pipe
p1, p2 = Pipe() # 支持双向通信
p1.send('abc') # 发送
print(p2.recv()) # 接收
p2.send('123')
print(p1.recv())
p1.close() # 关闭管道
PS:EOFError 报错
from multiprocessing import Pipe
p1, p2 = Pipe() # 支持双向通信
p1.send('abc') # 发送
print(p2.recv()) # 接收
p2.send('123')
print(p1.recv())
p2.close() # 关闭管道
print(p1.recv())
# 结果 EOFError 报错
# 解释:管道中已经无数据还要继续读取报EOFError错误,后期可以以此错误来关闭管道,(try:……except)
子进程与主进程通信
from multiprocessing import Process
from multiprocessing import Pipe
def func(p):
foo, son = p
foo.close()
while True:
try:
print(son.recv())
except EOFError:
break
if __name__ == '__main__':
foo, son = Pipe()
p = Process(target=func, args=((foo, son),))
p.start()
son.close()
foo.send('hello')
foo.close()
进程池Pool
# 进程池出现原因:开过多的进程1、开启进程浪费时间 2、操作系统调度太多的进程也会影响效率
# 进程池理解:有几个现成的进程在池子里,有任务来了,就用这个池子中的进程去处理任务,
# 任务处理之后,再把进程放回池子里,池子中的进程就能去处理其它任务了。
# 当所有的任务都都处理完了,进程池关闭,回收所有的进程
开进进程池进程数 = cpu核数 + 1
检查cpu办法
import os
print(os.cpu_count())
开启进程池简易方法
import os
print(os.cpu_count())
import os
import random
import time
from multiprocessing import Pool
def func(i):
print(i, os.getpid())
time.sleep(random.randint(1, 3))
if __name__ == '__main__':
p = Pool(5) # 实例化进程池中的进程数
p.map(func, range(20)) # 开启进程
p.close() # 不允许向进程池中添加任务 为保证在进程池工作完成前不会有其他任务进入
p.join()
print('========>>>>>')
# 解释: 进程池一次开五个进程,那个进程执行完便执行下一个任务,且不会出现阻塞情况
# 结果
# 0 6080
# 1 5668
# 2 8016
# 3 3524
# 4 2992
# 5 6080
# 6 6080
# 7 3524
# 8 5668
# 9 8016
# 10 3524
# 11 2992
# 12 6080
# 13 3524
# 14 6080
# 15 5668
# 16 5668
# 17 8016
# 18 3524
# 19 2992
# ========>>>>>
开启进程池与开启多进程时间对比
# 开启进程池与开启多进程时间对比
import time
from multiprocessing import Process
from multiprocessing import Pool
def func(i):
i = i + 1
if __name__ == '__main__':
start1 = time.time()
p = Pool(5)
p.map(func, range(100))
print(time.time() - start1)
start2 = time.time()
l = []
for i in range(100):
pr = Process(target=func, args=(i, ))
pr.start()
l.append(pr)
for j in l:
j.join()
print(time.time() - start2)
# 结果
# 0.26270198822021484
# 5.297751426696777
apply进程池同步提交任务机制
import time
from multiprocessing import Pool
def func(i):
time.sleep(1)
i += 1
print(i)
if __name__ == '__main__':
p = Pool(5)
for i in range(20):
p.apply(func, args=(i, )) # apple是同步提交任务机制
apply_async进程池异步提交任务机制
# apply_async进程池异步提交任务机制
import time
from multiprocessing import Pool
def func(i):
time.sleep(1)
i = i + 1
print(i)
if __name__ == '__main__':
p = Pool(5)
for i in range(20):
p.apply_async(func, args=(i, )) # apply_async是异步提交任务的机制
p.close() # close必须加在join之前,不允许再添加新的任务了
p.join() # 等待子进程结束再往下执行
回调函数
import os
import time
from multiprocessing import Pool
def func(i): # 多进程中的IO多
print('子进程%s:%s' % (i, os.getpid()))
return i * '*'
def call(arg): # 回调函数是在主进程中完成的,不能传参数,且只能接受多进程中函数的返回值
print('回调:', os.getpid())
print(arg)
if __name__ == '__main__':
print('---->', os.getpid())
p = Pool(5)
for i in range(10):
p.apply_async(func, args=(i, ), callback=call)
p.close()
p.join()
#python#
#开发#
#软件开发#
#Python编程从入门到实践#
#科技新星创作营#