多线程与线程间通信
# 线程理论
# 什么是线程
进程和线程都是虚拟单位,只是为了我们更加方便的进行描述。
进程
每一个进程必然自带一个线程。
进程是资源单位,每起一个进程就需要向计算机申请一块内存空间,用于存放资源和给线程提供所需资源(如变量等)。
线程
进程是执行单位,真正被CPU执行的其实是进程里面的线程,线程指的就是代码的执行过程。
执行代码所需要使用到的资源(如变量等),都是找所在的进程去获取。
# 为什么要有线程
多进程和多线程都能实现并行和并发,但开设一个进程需要申请内存空间、拷贝代码。而在进程内开设更多线程,却无需额外申请内存空间和拷贝代码的操作。
所以开设线程的开销要远远小于开设进程的开销,并且同一进程下的多个线程之间数据是共享的。
例如:
from threading import Thread
def one():
global a
a = 666
def two():
global a
print(a)
thr1 = Thread(target=one)
thr2 = Thread(target=two)
thr1.start()
thr1.join()
thr2.start()
2
3
4
5
6
7
8
9
10
11
12
# 什么时候用线程
多个功能之间需要数据共享的时候,开多线程处理更加合适。
# Python线程
# 创建启动线程
# 第一种方式
直接实例化Thread类。
# 导入线程模块
from threading import Thread
# 实例化一个线程对象
thr = Thread(target=函数, args=(实参1, 实参2...))
# 告诉操作系统帮你创建一个线程(线程启动时只会执行指定的函数)
thr.start()
2
3
4
5
6
例如:
from threading import Thread
def task(var):
print(var)
thr = Thread(target=task,args=['子'])
thr.start()
print('主')
2
3
4
5
6
# 第二种创建方式
编写自己的线程类,需要继承Thread类。
# 导入线程模块
from threading import Thread
# 继承Thread类
class MyThread(Thread):
def __init__(self,var):
super().__init__()
self.var = var
# 使用继承类的方法时,需要在类中定义run方法
def run(self):
print(self.var)
# 实例化对象
thr = MyThread()
# 告诉操作系统帮你创建一个线程(线程启动时,只会运行run方法中的代码)
thr.start()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
例如:
from threading import Thread
class MyThread(Thread):
def __init__(self,var):
super().__init__()
self.var = var
def run(self):
print(self.var)
thr = MyThread('子')
thr.start()
print('主')
2
3
4
5
6
7
8
9
10
11
# 等待线程
[线程对象].join()
线程对象的join方法,可以使主线程等待子线程执行完毕,然后再继续执行,和进程的join方法差不多。
例如:
import time
from threading import Thread
def test():
time.sleep(3)
print('子')
thr = Thread(target=test)
thr.start()
# 执行join()方法后,主线程会等待子线程执行完再继续执行后面的代码
thr.join()
print('主')
2
3
4
5
6
7
8
9
10
# 获取当前线程的线程名
current_thread().name
from threading import Thread, current_thread
def task():
print('子线程名: ' + current_thread().name)
thr = Thread(target=task)
print('主线程名: ' + current_thread().name)
thr.start()
2
3
4
5
6
# 获取正在活跃的线程数
active_count()
会获取当前活跃的主线程数和子线程数的和。
import time
from threading import Thread, active_count
def task():
time.sleep(2)
print('子线程执行完毕')
thr = Thread(target=task)
thr.start()
print('活跃线程数: {}'.format(active_count()))
thr.join()
print('活跃线程数: {}'.format(active_count()))
2
3
4
5
6
7
8
9
10
# 守护线程
# 介绍
主线程的代码运行结束之后不会立刻结束进程,而是会等待所有其他非守护线程运行结束才会结束。
就是说,主线程如果要结束,不会管守护线程是否执行完,只会管非守护线程是否执行完,但在主线程等待非守护线程执行完毕的期间,守护线程仍然会在执行。
# 实现方式
[线程对象].daemon = True
[线程对象].start()
2
# 例子
import time
from threading import Thread
def task(i):
print('子线程执行开始')
time.sleep(i)
print('子线程执行完毕')
thr1 = Thread(target=task,args=(2,))
thr2 = Thread(target=task,args=(1,))
thr1.daemon = 1
thr1.start()
thr2.start()
print('主')
2
3
4
5
6
7
8
9
10
11
12
# 线程互斥锁
# 介绍
同进程的互斥锁一样,也是将并发变成串行,牺牲效率,以保证数据安全性的。
# 使用例子
import time
from threading import Thread, Lock
money = 20
mutex = Lock()
def task(mutex):
mutex.acquire()
global money
tmp = money
time.sleep(1)
money = tmp - 1
mutex.release()
t_list = []
for t in range(10):
t = Thread(target=task,args=(mutex,))
t.start()
t_list.append(t)
for t in t_list:
t.join()
print(money)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# GIL全局解释器锁
# 介绍
GIL不是Python的特点,而是CPython解释器的特点。CPython解释器中存在一把GIL互斥锁,用来阻止同一进程下的多个线程的同时执行,因此使用CPython解释器运行多线程代码无法利用多核优势。
GIL的优点是可以保证解释器级别的数据的安全,缺点是只支持并发,不支持并行。
GIL是解释型语言的通病,因为解释型语言是一行一行执行的。而编译型是事先编译,事先已经规避。
# 原因
CPython中的内存管理(垃圾回收机制)不是线程安全的。线程不安全就是指在多线程写操作时,可能出现数据写入不正常的情况。
CPython的垃圾回收使用的是引用计数等,假如CPython支持多线程同时执行,则垃圾回收线程作为一个线程,会一直去扫描内存中没有被名称引用的值。
如果此时其他线程刚申请到一个值,但是还没来得及赋值给名称,这个还没来的及赋值给名称的值,被垃圾回收扫描到了就会直接回收掉它,这就是垃圾回收在多线程情况下的线程不安全。
# 实现方式
GIL锁是加在CPython解释器上的,在我们线程通过CPython解释器去执行代码的时候,就需要线程抢到了锁,它才能执行。
至于执行时看起来像是同时执行,是因为线程之间的保持状态和切换,所以是多线程并发。
# 检验方法
import time
from threading import Thread
money = 20
def task():
global money
tmp = money
money = tmp - 1
t_list = []
for t in range(10):
t = Thread(target=task)
t.start()
t_list.append(t)
for t in t_list:
t.join()
print(money)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
由于GIL锁的原因,多个线程之间并非同时执行,也就是说在无延迟的情况下,GIL锁可以保证一定的数据安全,所以执行结果会为正确的10。
这种情况是理想情况,实际使用时几乎不可能无延迟,所以只要有一点延迟,就需要我们自己加锁来保证数据的安全性。
# 进程池和线程池
# 介绍
池的是用来保证计算机硬件安全,它使我们能够最大限度的利用计算机,它降低了程序的运行效率,但是保证了程序的运行稳定性。
所谓进程池和线程池,就是用来管理限制进程数或线程数的。如果线程或进程不加以管理导致过多,就可能导致程序或计算机卡死、崩溃等。
# 线程池方法
导入线程池类:from concurrent.futures import ThreadPoolExecutor
创建线程池:thread_pool = ThreadPoolExecutor()
创建线程池,同时指定max_workers固定线程数,为该线程池固定线程数,以限制线程数量,如果为None,则默认为
逻辑CPU数 + 4。线程池创建后,线程池内会在生成同等于max_workers值的固定线程数。
这些被创建出来的线程是固定不变的,每一次执行任务,都是这些线程在执行,不会再出现线程的创建和销毁的过程,这样可以节省资源,且提高效率。
提交线程任务:future = thread_pool.submit(task, var1, var2...)
朝线程池中,以异步的方式提交线程任务,需要传入要执行的函数名,后面跟需要传入的参数。
submit()提交任务后会返回一个Future对象,可用于获取运行状态、返回结果等。
获取任务执行的返回结果:future.result()
获取任务执行的返回结果,调用submit返回的对象的result方法可手动获取任务的返回结果。
调用该方法时,如果任务还没有执行完,则会阻塞直到任务执行完毕,然后再返回结果。
绑定回调函数:thread_pool.submit(task).add_done_callback(call_back)
线程池绑定回调函数,当任务执行完时,会将submit生成的future对象,传入回调函数中。
关闭线程池:thread_pool.shutdown(wait=True, cancel_futures=False)
阻塞到线程池中的所有任务执行完毕,然后关闭线程池。
默认会等到线程池中的任务全部执行完,再执行关闭线程池操作,关闭后就不能再往里面提交任务了,需要重新创建线程池。
wait表示是否阻塞到线程池中的任务全部执行完后,再执行关闭操作。
cancel_futures表示是否直接清除所以线程任务。
# 进程池方法
与线程池方法基本一致。
导入进程池类:from concurrent.futures import ProcessPoolExecutor
创建进程池:thread_pool = ProcessPoolExecutor()
max_workers指固定进程池,与线程池不同的是,进程池如果为None,则默认为逻辑CPU数。
提交进程任务:process_pool.submit(task, var1, var2...)
绑定回调函数:process_pool.submit(task).add_done_callback(call_back)
获取任务执行的返回结果:future.result()
关闭进程池:process_pool.shutdown(wait=True, cancel_futures=False)
# 回调机制
在提交异步任务时,给每个异步提交的任务绑定一个回调函数,当任务执行完后,会将其结果传入回调函数。
例如:
# 定义一个回调函数
def call_back(future):
# 当任务执行完时,会将submit生成的future对象,传入回调函数中,回调函数就可以获取其返回值进行处理
print(future.result())
# 提交线程或进程的任务时,绑定回调函数
thread_pool.submit(task).add_done_callback(call_back)
2
3
4
5
6
# Python多进程和多线程的选择
# 单核情况
单核情况下一律使用多线程即可,因为单核开多进程不会提高效率,却会消耗额外资源。
# 多核情况
IO密集型(IO工作比较多)
- 多进程
运算效率不会提升,却会消耗额外资源。
同时进程创建时也会有时间开销。
- 多线程 (推荐)
可以节省资源,不会有额外的资源开销。
计算密集型(CPU工作比较多)
多进程 (推荐)
虽然会消耗额外资源,但会大幅提高运算效率。
多线程
虽然不会消耗额外资源,但也不会提高运算效率。
# 实际对比
IO密集型使用多进程测试:
import time
from multiprocessing import Process
def task():
time.sleep(2)
if __name__ == '__main__':
start_time = time.time()
# 测试2核所以起2个进程
p_list = []
for i in range(2):
p = Process(target=task)
p.start()
p_list.append(p)
for p in p_list:
p.join()
run_time = time.time() - start_time
# 执行耗时: 2.117844820022583 秒
print('执行耗时: {} 秒'.format(run_time))
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
IO密集型使用多线程测试:
import time
from threading import Thread
def task():
time.sleep(2)
if __name__ == '__main__':
start_time = time.time()
# 测试2核所以起2个进程
p_list = []
for i in range(2):
p = Thread(target=task)
p.start()
p_list.append(p)
for p in p_list:
p.join()
run_time = time.time() - start_time
# 执行耗时: 2.0063345432281494 秒
print('执行耗时: {} 秒'.format(run_time))
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
IO密集型测试结论
多进程没有提高效率,而且会消耗额外资源,所以多线程胜出。
计算密集型使用多进程测试:
import time
from multiprocessing import Process
def task():
a = 0
for i in range(100000000):
a = a * i
if __name__ == '__main__':
start_time = time.time()
# 测试2核所以起2个进程
p_list = []
for i in range(2):
p = Process(target=task)
p.start()
p_list.append(p)
for p in p_list:
p.join()
run_time = time.time() - start_time
# 执行耗时: 4.36698579788208 秒
print('执行耗时: {} 秒'.format(run_time))
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
计算密集型使用多线程测试:
import time
from threading import Thread
def task():
a = 0
for i in range(100000000):
a = a * i
if __name__ == '__main__':
start_time = time.time()
# 测试2核所以起2个进程
p_list = []
for i in range(2):
p = Thread(target=task)
p.start()
p_list.append(p)
for p in p_list:
p.join()
run_time = time.time() - start_time
# 执行耗时: 8.072399139404297 秒
print('执行耗时: {} 秒'.format(run_time))
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
计算密集型测试结论
多进程虽然会消耗额外资源,但是会大幅提高运算效率,所以多进程胜出。
# 总结
多进程和多线程都有各自的优势,一般写程序时,我们会在多进程下再开设多线程,这样我们既可以利用多核优势,也可以节省资源消耗。