ThankNeko's Blog ThankNeko's Blog
首页
  • 操作系统

    • Linux基础
    • Linux服务
    • WindowsServer笔记
    • Ansible笔记
    • Shell笔记
  • 容器服务

    • Docker笔记
    • Kubernetes笔记
    • Git笔记
  • 数据库服务

    • MySQL笔记
    • ELK笔记
    • Redis笔记
  • 监控服务

    • Zabbix笔记
  • Web服务

    • Nginx笔记
    • Tomcat笔记
  • 数据处理

    • Kettle笔记
  • Python笔记
  • Bootstrap笔记
  • C笔记
  • C++笔记
  • Arduino笔记
  • 分类
  • 标签
  • 归档
  • 随笔
  • 关于
GitHub (opens new window)

Hoshinozora

尽人事,听天命。
首页
  • 操作系统

    • Linux基础
    • Linux服务
    • WindowsServer笔记
    • Ansible笔记
    • Shell笔记
  • 容器服务

    • Docker笔记
    • Kubernetes笔记
    • Git笔记
  • 数据库服务

    • MySQL笔记
    • ELK笔记
    • Redis笔记
  • 监控服务

    • Zabbix笔记
  • Web服务

    • Nginx笔记
    • Tomcat笔记
  • 数据处理

    • Kettle笔记
  • Python笔记
  • Bootstrap笔记
  • C笔记
  • C++笔记
  • Arduino笔记
  • 分类
  • 标签
  • 归档
  • 随笔
  • 关于
GitHub (opens new window)
  • Python笔记

    • 基础知识

    • 并发编程

      • 并发相关介绍
      • 多进程与进程间通信
        • 进程的创建和终止
        • Python子进程
        • 僵尸进程与孤儿进程
        • 守护进程
        • 互斥锁
        • 队列
        • IPC (进程间通信)
        • 生产者/消费者模型
      • 多线程与线程间通信
      • 其他锁与队列
      • 网络IO模型与协程
    • 爬虫笔记

    • 模块笔记

    • 后端笔记

  • C笔记

  • C++笔记

  • Arduino笔记

  • Web笔记

  • Dev
  • Python笔记
  • 并发编程
Hoshinozora
2023-02-25
目录

多进程与进程间通信

# 进程的创建和终止

# 创建进程介绍

创建进程会在内存中申请一块内存空间,将需要运行的代码丢进去运行。一个进程对应在内存中就是一块独立的内存空间。多个进程对应在内存中就是多块独立的内存空间。

进程与进程、主进程与子进程之间的数据默认情况下都是相互隔离,如果想要交互可以借助第三方工具/模块。

不同操作系统中创建进程所调用的底层函数不一样:

unix系统通过fork创建一个和父进程一样的副本。

windows系统通过内置的进程调度CreateProcess创建一个进程。

# 创建进程的方式

  1. 系统初始化,系统启动时操作系统会自启动一些内置进程,例如任务管理器、资源管理器等。
  2. 进程中开启子进程,一个进程中又开启一些子进程。
  3. 交互式请求,例如双击启动一个应用程序。
  4. 批处理作业的初始化,大型的任务开始时,会有一些批处理作业连串的执行启动进程。

# 终止进程的方式

  1. 正常退出,自己关闭软件。
  2. 出错退出,在启动阶段就报错导致退出,例如:启动阶段发现要启动的文件不存在。
  3. 严重错误退出,在执行到错误的指令时被迫退出,例如:int('a')。
  4. 被其他进程杀死,例如:通过kill命令杀死。

# Python子进程

# 创建子进程对象

# 导入多进程模块
from multiprocessing import Process

# 实例化一个进程对象,并指定目标函数和函数需要的参数
prc = Process(target=函数, args=(实参1, 实参2...))
1
2
3
4
5

target:指定子进程要执行的目标函数对象,可为空。

args:按位置传递目标函数所需的参数,可为空。

kwargs:按关键字传递目标函数所需的参数,可为空。

name:子进程名称,可为空。

# 启动子进程

[进程对象].start()

start方法会让进程对象自上而下再次运行该Python脚本的代码,另外附带执行进程对象创建时指定的函数。

还有一个[进程对象].run()方法,其作用一致。

例如:

import time
from multiprocessing import Process

def task(name):
    print("你的名字是: {}".format(name))
    time.sleep(3)
    print("{} is over.".format(name))

print("outside")

if __name__ == "__main__":
    prc = Process(target=task, args=('cris',))
    prc.start()
    print('test')

输出 >> outside
输出 >> test
输出 >> outside
输出 >> 你的名字是: cris
输出 >> cris is over.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

注意事项:

windows下,创建进程的代码一定要书写在main内创建,linux则中不用,但还是建议写在main之中以提高跨平台性。也就是:if __name__ == "__main__": ... ...

因为windows下没有fork,多处理模块会启动一个新的Python进程并导入模块进行调用,也就会再次读取并执行python脚本中的代码。如果此时不判断是不是在main内,那么等再次读取到创建进程的代码时,又会去创建进程,这就会进入一个死循环的状态,所以会报错。

# 等待子进程

[进程对象].join()

join方法会让主进程等待指定的子进程代码运行结束,再继续运行,已经在运行中的子进程不受影响。

需在start()后调用,join()会阻塞主进程,直到子进程子进程运行完。

例子:

import time
from multiprocessing import Process

def task(name, t):
    print("{}进程运行中...".format(name))
    time.sleep(t)
    print("{}进程运行结束!".format(name))

if __name__ == "__main__":
    prc1 = Process(target=task, args=('01', 1))
    prc2 = Process(target=task, args=('02', 2))
    start = time.time()
    # 启动子进程prc1
    prc1.start()
    # 启动子进程prc2
    prc2.start()
    # 如果不使用join()阻塞主进程,则会直接执行到下面的print()。
    prc1.join()
    prc2.join()
    # 执行到prc2.join()时,prc2已经运行了1秒,再过1秒就会继续执行print(),所以执行完只需要2秒
    print("主 " + str(time.time() - start))
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

# 杀死子进程

[进程对象].terminate()

告诉操作系统去杀死进程对象的进程。

但杀死进程需要一定的时间,而代码的运行速度比之更快,所以如果杀死后需要判断进程是否存活,则需要sleep(0.1)等待一会再去判断结果才会准确。

# 判断子进程存活

[进程对象].is_alive()

判断进程对象的子进程是否存活。

# 进程对象常用属性

[进程对象].daemon:默认是False,当前进程是否开启守护进程。

[进程对象].name:获取子进程的名字。

[进程对象].pid:获取子进程的进程ID。

[进程对象].exitcode:获取子进程的退出状态码。

# 区分不同进程的方式

系统会给每一个运行的进程分配一个PID号,便于区分与管理不同的进程。

Python第一种查看方式(推荐)。

# 导入os模块
import os
# 获取当前进程的PID号
os.getpid()
# 获取当前进程的父进程的PID号
os.getppid()
1
2
3
4
5
6

Python第二种查看方式。

from multiprocecssing import current_process
# 获取当前进程的PID号
current_process().pid
1
2
3

windows系统查看方式。

cmd中执行tasklist命令。

linux系统查看方式。

shell中执行ps aux命令。

# 僵尸进程与孤儿进程

# 僵尸进程

僵尸进程就是指子进程死亡后,父进程和其他进程都没有管他,该子进程的资源没有得到正确的释放,从而导致占用持续资源。

解决方法:

  1. 主动杀死子进程。
  2. 子线程启动后,使用join方法,join方法在阻塞结束后会主动回收僵尸进程。
  3. 建立一个信号,当子进程结束后向主线程反馈信号,主线程获取到当前信号后主动杀死该子进程。

# 孤儿进程

孤儿进程就是在子进程结束之前,父进程先结束了,导致子进程与父进程之间的通信消失。操作系统会自行去管理孤儿进程,回收相关资源等。

# 守护进程

# 介绍

守护着某个进程的进程,随着所守护的进程的存活而存活,终止而终止。当主进程死后,子进程也会跟着结束运行。

# 实现方式

[进程对象].daemon = True
[进程对象].start()
1
2

# 例子

import time
from multiprocessing import Process
def task(name, t):
    print("{}进程运行中...".format(name))
    time.sleep(t)
    print("{}进程运行结束!".format(name))
if __name__ == "__main__":
    # 直接在创建进程对象时指定为守护进程
    prc1 = Process(target=task, args=('01', 2), daemon=True)
    # 或者在start()之前手动修改为守护进程
    prc1.daemon = True
    prc1.start()
    time.sleep(1)
    print("主")
1
2
3
4
5
6
7
8
9
10
11
12
13
14

子进程后面的那句话不会执行,是因为子进程已经跟着主进程死了,主进程执行完Print就死了。

# 互斥锁

# 介绍

针对多个进程,操作同一份数据的时候,会出现数据错乱的问题。

针对上述问题,解决方式就是加锁处理,加锁可以在特定时刻将并发变成串行,牺牲效率,但是保证了数据的安全。

注意:

锁不要轻易地使用,容易造成死锁现象,锁应当只在处理数据的部分加,以来保证数据的安全可靠性,因为加的锁越多,效率越低。

# 使用方法

# 先导入 Process 和 Lock
from multiprocessing import Process, Lock

def task(mutex):
    search()
    # 在遇到mutex.acquire()时,子进程需要抢到锁才会继续执行,否则会一直等待
    mutex.acquire()
    buy()
    # 执行完操作后释放锁,不然其他子进程会一直等待锁释放
    mutex.release()

if __name__ == '__main__':
    # 在主进程中生成一把锁,让所有的子进程抢,谁先抢到谁先执行
    mutex = Lock()
    # 在创建子进程对象时,将锁作为参数传给子进程
    p1 = Process(target=task, args=(mutex,))
    p2 = Process(target=task, args=(mutex,))
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

某个子进程拿到锁之后,其他有着相同锁的子进程,除非等抢到锁的进程释放锁,否者会一直等待,直到锁释放后然后抢锁,接着运行。

# 使用例子

data.json文件:

{"ticket": 1}
1

mutex.py文件:

import json
from multiprocessing import Process, Lock

def search(name):
    # 查票
    with open('data.json', 'rt', encoding='utf-8') as f:
        data = json.load(f)
    print('用户: {}, 查询余票: {}'.format(name, data.get('ticket')))

def buyticket(name):
    # 查票
    with open('data.json', 'rt', encoding='utf-8') as f:
        data = json.load(f)
    # 买票
    if data.get('ticket') > 0:
        data['ticket'] -= 1
        with open('data.json', 'wt', encoding='utf-8') as f:
            json.dump(data, f)
        print('用户: {}, 购票成功!'.format(name))
    else:
        print('用户: {}, 购票失败,余票不足!'.format(name))

def run(name, mutex):
    search(name)
    mutex.acquire()
    buyticket(name)
    mutex.release()

if __name__ == '__main__':
    mutex = Lock()
    pr1 = Process(target=run, args=('cecilia',mutex))
    pr2 = Process(target=run, args=('anli',mutex))
    pr1.start()
    pr2.start()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

# 队列

# 介绍

队列:先进先出,类似于管道加锁。

堆栈:先进后出。

# queue模块

import queue
que = queue.Queue(maxsize=0)
1
2

Queue类用于创建队列对象。

maxsize参数:指定队列可以存放的最大数据量,默认0,表示无限制大小。

# 队列插入元素

que.put(item, block=True, timeout=None)

默认情况下,当队列数据存满了之后如果还往里存数据,则程序会进入阻塞状态,直到有位置让出来。

item参数:要插入的元素。

block参数:如果block为True,则在队列存满时,进入阻塞状态,可通过timeout控制阻塞时长。

timeout参数:用于控制block为true时阻塞的时长,超过设定的时间没有空位置供插入元素则报错,None为一直阻塞。

# 队列弹出元素

que.get(block=True, timeout=None)

默认情况下,当队列数据为空之后,如果还执行弹出操作,则程序会进入阻塞状态,直到有数据可以弹出。

block参数:如果block为True,则在队列为空时,进入阻塞状态,可通过timeout控制阻塞时长。

timeout参数:用于控制block为true时阻塞的时长,超过设定的时间队列还没有元素则报错,None为一直阻塞。

# 其他方法

que.full():判断当前队列是否已满。

``que.empty()`:判断当前队列是否为空。

que.get_nowait():队列弹出元素,如果队列数据为空,直接抛出异常。

这些方法在多进程中,可能是不准的,除非加锁,可能会出现前面的进程刚判断,后面的进程就已经更改了操作,这样就会造成不准的情况,所以一般用put()和get()就行。

# IPC (进程间通信)

# IPC介绍

IPC即进程之间通讯,我们可以利用队列来实现进程与进程之间的通信。

两个进程共用一个队列,然后一个进程put,另一个进程get,这就实现了进程间的通信。

# 例如

子进程和主进程交互:

from multiprocessing import Process, Queue
def producer(que):
    print('hello girl!')
    que.put('你好!帅哥')
if __name__ == '__main__':
    que = Queue()
    pcs = Process(target=producer, args=[que])
    pcs.start()
    print(que.get())
1
2
3
4
5
6
7
8
9

子进程和子进程交互:

from multiprocessing import Process, Queue
def producer(que):
    print('生产!')
    que.put('消费!')
def consumer(que):
    print(que.get())
if __name__ == '__main__':
    que = Queue()
    pcs1 = Process(target=producer, args=[que])
    pcs2 = Process(target=consumer, args=[que])
    pcs1.start()
    pcs2.start()
1
2
3
4
5
6
7
8
9
10
11
12

注意:子进程传入的队列对象,需要是multiprocessing中的Queue类实例化得来的,否则无法正常启动子进程。

# 生产者/消费者模型

# 生产者

生产、制作消息,并往队列中插入消息。

# 消息队列

作为消费者和生产者交互的媒介,用于存储消息。

# 消费者

从队列中取出消息,并消费、处理消息。

# 使用例子

from multiprocessing import Process, Queue

def producer(que):
    for i in range(0, 10):
        que.put(i)
        print("produce: {}".format(i))

def consumer(que):
    while True:
        data = que.get()
        if data is None:
            print('consume done!')
            break
        else:
            print('consume: {}'.format(data))

if __name__ == '__main__':
    que = Queue()
    producer_process = Process(target=producer, args=(que,))
    consumer01_process = Process(target=consumer, args=(que,))
    consumer02_process = Process(target=consumer, args=(que,))
    producer_process.start()
    consumer01_process.start()
    consumer02_process.start()
    producer_process.join()
    que.put(None)
    que.put(None)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

等生产者都生产完了,就插入一个None,而消费者就判断如果取到None就结束。有几个消费者就要put个None。

# JoinableQueue队列

from multiprocessing import JoinableQueue

与Queue的区别是每当往JoinableQueue队列实例里put数据,内部计数器就会加一。

当调用task_done时,计数器就会加一。

#多进程使用#进程对象方法#僵尸进程#孤儿进程#守护进程#互斥锁#队列#IPC#生产者消费者模型
并发相关介绍
多线程与线程间通信

← 并发相关介绍 多线程与线程间通信→

最近更新
01
Vue路由
12-09
02
FastAPI实现用户管理
11-23
03
Tortoise ORM
11-23
更多文章>
Theme by Vdoing | Copyright © 2022-2026 Hoshinozora | MIT License
湘ICP备2022022820号-1
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式