ThankNeko's Blog ThankNeko's Blog
首页
  • 操作系统

    • Linux基础
    • Linux服务
    • WindowsServer笔记
    • Ansible笔记
    • Shell笔记
  • 容器服务

    • Docker笔记
    • Kubernetes笔记
    • Git笔记
  • 数据库服务

    • MySQL笔记
    • ELK笔记
    • Redis笔记
  • 监控服务

    • Zabbix笔记
  • Web服务

    • Nginx笔记
    • Tomcat笔记
  • 数据处理

    • Kettle笔记
  • Python笔记
  • Bootstrap笔记
  • C笔记
  • C++笔记
  • Arduino笔记
  • 分类
  • 标签
  • 归档
  • 随笔
  • 关于
GitHub (opens new window)

Hoshinozora

尽人事,听天命。
首页
  • 操作系统

    • Linux基础
    • Linux服务
    • WindowsServer笔记
    • Ansible笔记
    • Shell笔记
  • 容器服务

    • Docker笔记
    • Kubernetes笔记
    • Git笔记
  • 数据库服务

    • MySQL笔记
    • ELK笔记
    • Redis笔记
  • 监控服务

    • Zabbix笔记
  • Web服务

    • Nginx笔记
    • Tomcat笔记
  • 数据处理

    • Kettle笔记
  • Python笔记
  • Bootstrap笔记
  • C笔记
  • C++笔记
  • Arduino笔记
  • 分类
  • 标签
  • 归档
  • 随笔
  • 关于
GitHub (opens new window)
  • Python笔记

    • 基础知识

    • 并发编程

      • 并发相关介绍
      • 多进程与进程间通信
      • 多线程与线程间通信
        • 线程理论
        • Python线程
        • 守护线程
        • 线程互斥锁
        • GIL全局解释器锁
        • 进程池和线程池
        • Python多进程和多线程的选择
      • 其他锁与队列
      • 网络IO模型与协程
    • 爬虫笔记

    • 模块笔记

    • 后端笔记

  • C笔记

  • C++笔记

  • Arduino笔记

  • Web笔记

  • Dev
  • Python笔记
  • 并发编程
Hoshinozora
2023-02-25
目录

多线程与线程间通信

# 线程理论

# 什么是线程

进程和线程都是虚拟单位,只是为了我们更加方便的进行描述。

进程

每一个进程必然自带一个线程。

进程是资源单位,每起一个进程就需要向计算机申请一块内存空间,用于存放资源和给线程提供所需资源(如变量等)。

线程

进程是执行单位,真正被CPU执行的其实是进程里面的线程,线程指的就是代码的执行过程。

执行代码所需要使用到的资源(如变量等),都是找所在的进程去获取。

# 为什么要有线程

多进程和多线程都能实现并行和并发,但开设一个进程需要申请内存空间、拷贝代码。而在进程内开设更多线程,却无需额外申请内存空间和拷贝代码的操作。

所以开设线程的开销要远远小于开设进程的开销,并且同一进程下的多个线程之间数据是共享的。

例如:

from threading import Thread
def one():
    global a
    a = 666
def two():
    global a
    print(a)
thr1 = Thread(target=one)
thr2 = Thread(target=two)
thr1.start()
thr1.join()
thr2.start()
1
2
3
4
5
6
7
8
9
10
11
12

# 什么时候用线程

多个功能之间需要数据共享的时候,开多线程处理更加合适。

# Python线程

# 创建启动线程

# 第一种方式

直接实例化Thread类。

# 导入线程模块
from threading import Thread
# 实例化一个线程对象
thr = Thread(target=函数, args=(实参1, 实参2...))
# 告诉操作系统帮你创建一个线程(线程启动时只会执行指定的函数)
thr.start()
1
2
3
4
5
6

例如:

from threading import Thread
def task(var):
    print(var)
thr = Thread(target=task,args=['子'])
thr.start()
print('主')
1
2
3
4
5
6
# 第二种创建方式

编写自己的线程类,需要继承Thread类。

# 导入线程模块
from threading import Thread
# 继承Thread类
class MyThread(Thread):
    def __init__(self,var):
        super().__init__()
        self.var = var
    # 使用继承类的方法时,需要在类中定义run方法
    def run(self):
        print(self.var)

# 实例化对象
thr = MyThread()
# 告诉操作系统帮你创建一个线程(线程启动时,只会运行run方法中的代码)
thr.start()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

例如:

from threading import Thread
class MyThread(Thread):
    def __init__(self,var):
        super().__init__()
        self.var = var
    def run(self):
        print(self.var)

thr = MyThread('子')
thr.start()
print('主')
1
2
3
4
5
6
7
8
9
10
11

# 等待线程

[线程对象].join()

线程对象的join方法,可以使主线程等待子线程执行完毕,然后再继续执行,和进程的join方法差不多。

例如:

import time
from threading import Thread
def test():
    time.sleep(3)
    print('子')
thr = Thread(target=test)
thr.start()
# 执行join()方法后,主线程会等待子线程执行完再继续执行后面的代码
thr.join()
print('主')
1
2
3
4
5
6
7
8
9
10

# 获取当前线程的线程名

current_thread().name

from threading import Thread, current_thread
def task():
    print('子线程名: ' + current_thread().name)
thr = Thread(target=task)
print('主线程名: ' + current_thread().name)
thr.start()
1
2
3
4
5
6

# 获取正在活跃的线程数

active_count()

会获取当前活跃的主线程数和子线程数的和。

import time
from threading import Thread, active_count
def task():
    time.sleep(2)
    print('子线程执行完毕')
thr = Thread(target=task)
thr.start()
print('活跃线程数: {}'.format(active_count()))
thr.join()
print('活跃线程数: {}'.format(active_count()))
1
2
3
4
5
6
7
8
9
10

# 守护线程

# 介绍

主线程的代码运行结束之后不会立刻结束进程,而是会等待所有其他非守护线程运行结束才会结束。

就是说,主线程如果要结束,不会管守护线程是否执行完,只会管非守护线程是否执行完,但在主线程等待非守护线程执行完毕的期间,守护线程仍然会在执行。

# 实现方式

[线程对象].daemon = True
[线程对象].start()
1
2

# 例子

import time
from threading import Thread
def task(i):
    print('子线程执行开始')
    time.sleep(i)
    print('子线程执行完毕')
thr1 = Thread(target=task,args=(2,))
thr2 = Thread(target=task,args=(1,))
thr1.daemon = 1
thr1.start()
thr2.start()
print('主')
1
2
3
4
5
6
7
8
9
10
11
12

# 线程互斥锁

# 介绍

同进程的互斥锁一样,也是将并发变成串行,牺牲效率,以保证数据安全性的。

# 使用例子

import time
from threading import Thread, Lock
money = 20
mutex = Lock()
def task(mutex):
    mutex.acquire()
    global money
    tmp = money
    time.sleep(1)
    money = tmp - 1
    mutex.release()
t_list = []
for t in range(10):
    t = Thread(target=task,args=(mutex,))
    t.start()
    t_list.append(t)
for t in t_list:
    t.join()
print(money)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

# GIL全局解释器锁

# 介绍

GIL不是Python的特点,而是CPython解释器的特点。CPython解释器中存在一把GIL互斥锁,用来阻止同一进程下的多个线程的同时执行,因此使用CPython解释器运行多线程代码无法利用多核优势。

GIL的优点是可以保证解释器级别的数据的安全,缺点是只支持并发,不支持并行。

GIL是解释型语言的通病,因为解释型语言是一行一行执行的。而编译型是事先编译,事先已经规避。

# 原因

CPython中的内存管理(垃圾回收机制)不是线程安全的。线程不安全就是指在多线程写操作时,可能出现数据写入不正常的情况。

CPython的垃圾回收使用的是引用计数等,假如CPython支持多线程同时执行,则垃圾回收线程作为一个线程,会一直去扫描内存中没有被名称引用的值。

如果此时其他线程刚申请到一个值,但是还没来得及赋值给名称,这个还没来的及赋值给名称的值,被垃圾回收扫描到了就会直接回收掉它,这就是垃圾回收在多线程情况下的线程不安全。

# 实现方式

GIL锁是加在CPython解释器上的,在我们线程通过CPython解释器去执行代码的时候,就需要线程抢到了锁,它才能执行。

至于执行时看起来像是同时执行,是因为线程之间的保持状态和切换,所以是多线程并发。

# 检验方法

import time
from threading import Thread
money = 20
def task():
    global money
    tmp = money
    money = tmp - 1
t_list = []
for t in range(10):
    t = Thread(target=task)
    t.start()
    t_list.append(t)
for t in t_list:
    t.join()
print(money)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

由于GIL锁的原因,多个线程之间并非同时执行,也就是说在无延迟的情况下,GIL锁可以保证一定的数据安全,所以执行结果会为正确的10。

这种情况是理想情况,实际使用时几乎不可能无延迟,所以只要有一点延迟,就需要我们自己加锁来保证数据的安全性。

# 进程池和线程池

# 介绍

池的是用来保证计算机硬件安全,它使我们能够最大限度的利用计算机,它降低了程序的运行效率,但是保证了程序的运行稳定性。

所谓进程池和线程池,就是用来管理限制进程数或线程数的。如果线程或进程不加以管理导致过多,就可能导致程序或计算机卡死、崩溃等。

# 线程池方法

导入线程池类:from concurrent.futures import ThreadPoolExecutor

创建线程池:thread_pool = ThreadPoolExecutor()

创建线程池,同时指定max_workers固定线程数,为该线程池固定线程数,以限制线程数量,如果为None,则默认为逻辑CPU数 + 4。

线程池创建后,线程池内会在生成同等于max_workers值的固定线程数。

这些被创建出来的线程是固定不变的,每一次执行任务,都是这些线程在执行,不会再出现线程的创建和销毁的过程,这样可以节省资源,且提高效率。

提交线程任务:future = thread_pool.submit(task, var1, var2...)

朝线程池中,以异步的方式提交线程任务,需要传入要执行的函数名,后面跟需要传入的参数。

submit()提交任务后会返回一个Future对象,可用于获取运行状态、返回结果等。

获取任务执行的返回结果:future.result()

获取任务执行的返回结果,调用submit返回的对象的result方法可手动获取任务的返回结果。

调用该方法时,如果任务还没有执行完,则会阻塞直到任务执行完毕,然后再返回结果。

绑定回调函数:thread_pool.submit(task).add_done_callback(call_back)

线程池绑定回调函数,当任务执行完时,会将submit生成的future对象,传入回调函数中。

关闭线程池:thread_pool.shutdown(wait=True, cancel_futures=False)

阻塞到线程池中的所有任务执行完毕,然后关闭线程池。

默认会等到线程池中的任务全部执行完,再执行关闭线程池操作,关闭后就不能再往里面提交任务了,需要重新创建线程池。

wait表示是否阻塞到线程池中的任务全部执行完后,再执行关闭操作。

cancel_futures表示是否直接清除所以线程任务。

# 进程池方法

与线程池方法基本一致。

导入进程池类:from concurrent.futures import ProcessPoolExecutor

创建进程池:thread_pool = ProcessPoolExecutor()

max_workers指固定进程池,与线程池不同的是,进程池如果为None,则默认为逻辑CPU数。

提交进程任务:process_pool.submit(task, var1, var2...)

绑定回调函数:process_pool.submit(task).add_done_callback(call_back)

获取任务执行的返回结果:future.result()

关闭进程池:process_pool.shutdown(wait=True, cancel_futures=False)

# 回调机制

在提交异步任务时,给每个异步提交的任务绑定一个回调函数,当任务执行完后,会将其结果传入回调函数。

例如:

# 定义一个回调函数
def call_back(future):
	# 当任务执行完时,会将submit生成的future对象,传入回调函数中,回调函数就可以获取其返回值进行处理
    print(future.result())
# 提交线程或进程的任务时,绑定回调函数
thread_pool.submit(task).add_done_callback(call_back)
1
2
3
4
5
6

# Python多进程和多线程的选择

# 单核情况

单核情况下一律使用多线程即可,因为单核开多进程不会提高效率,却会消耗额外资源。

# 多核情况

IO密集型(IO工作比较多)

  • 多进程

    运算效率不会提升,却会消耗额外资源。

    同时进程创建时也会有时间开销。

  • 多线程 (推荐)

    可以节省资源,不会有额外的资源开销。

计算密集型(CPU工作比较多)

  • 多进程 (推荐)

    虽然会消耗额外资源,但会大幅提高运算效率。

  • 多线程

    虽然不会消耗额外资源,但也不会提高运算效率。

# 实际对比

IO密集型使用多进程测试:

import time
from multiprocessing import Process
def task():
    time.sleep(2)
if __name__ == '__main__':
    start_time = time.time()
    # 测试2核所以起2个进程
    p_list = []
    for i in range(2):
        p = Process(target=task)
        p.start()
        p_list.append(p)
    
    for p in p_list:
        p.join()
    run_time = time.time() - start_time
    # 执行耗时: 2.117844820022583 秒
    print('执行耗时: {} 秒'.format(run_time))
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

IO密集型使用多线程测试:

import time
from threading import Thread
def task():
    time.sleep(2)
if __name__ == '__main__':
    start_time = time.time()
    # 测试2核所以起2个进程
    p_list = []
    for i in range(2):
        p = Thread(target=task)
        p.start()
        p_list.append(p)
    
    for p in p_list:
        p.join()
    run_time = time.time() - start_time
    # 执行耗时: 2.0063345432281494 秒
    print('执行耗时: {} 秒'.format(run_time))
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

IO密集型测试结论

多进程没有提高效率,而且会消耗额外资源,所以多线程胜出。

计算密集型使用多进程测试:

import time
from multiprocessing import Process
def task():
    a = 0
    for i in range(100000000):
        a = a * i
if __name__ == '__main__':
    start_time = time.time()
    # 测试2核所以起2个进程
    p_list = []
    for i in range(2):
        p = Process(target=task)
        p.start()
        p_list.append(p)
    
    for p in p_list:
        p.join()
    run_time = time.time() - start_time
    # 执行耗时: 4.36698579788208 秒
    print('执行耗时: {} 秒'.format(run_time))
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

计算密集型使用多线程测试:

import time
from threading import Thread
def task():
    a = 0
    for i in range(100000000):
        a = a * i
if __name__ == '__main__':
    start_time = time.time()
    # 测试2核所以起2个进程
    p_list = []
    for i in range(2):
        p = Thread(target=task)
        p.start()
        p_list.append(p)
    for p in p_list:
        p.join()
    run_time = time.time() - start_time
    # 执行耗时: 8.072399139404297 秒
    print('执行耗时: {} 秒'.format(run_time))
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

计算密集型测试结论

多进程虽然会消耗额外资源,但是会大幅提高运算效率,所以多进程胜出。

# 总结

多进程和多线程都有各自的优势,一般写程序时,我们会在多进程下再开设多线程,这样我们既可以利用多核优势,也可以节省资源消耗。

#线程理论#线程使用#线程对象方法#守护线程#互斥锁#GIL全局解释器锁#进程池#线程池
多进程与进程间通信
其他锁与队列

← 多进程与进程间通信 其他锁与队列→

最近更新
01
Vue路由
12-09
02
FastAPI实现用户管理
11-23
03
Tortoise ORM
11-23
更多文章>
Theme by Vdoing | Copyright © 2022-2026 Hoshinozora | MIT License
湘ICP备2022022820号-1
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式