网络IO模型与协程
# 网络IO模型介绍
主机A
由于应用程序是不能直接操作硬件的,所以在进程想要发送数据到其他主机时,就需要将应用数据拷贝给内核,再由内核进行各种协议的封装,最后通过网络发送出去(copy datagram)。
主机B
发送系统调用,等待数据准备,根据使用的IO模型不同,接收数据的方式也不同。
操作系统接收到数据并处理之后,会将数据从内核拷贝到对应的进程中(copy datagram),然后应用程序再对接收到的数据进行处理。
# 五种网络IO模型
# 阻塞IO
一直IO阻塞,直到接收到数据为止,比如
accept、recv等。
# 非阻塞IO
系统调用之后,无论是否有数据,都会立刻获得一个结果,它会将所有的阻塞操作变成非阻塞,收到响应后,即可以选择再次系统调用获取数据,也可以做其他操作。
如果系统调用时,内核已经获取到了数据,则内核会将数据拷贝到进程。
虽然非阻塞IO看上去厉害,但是该模型会长时间占用CPU,并且不干活,类似于while死循环。实际应用中一般不会考虑直接使用非阻塞IO模型。
import socket
server = socket.socket()
server.bind(('0.0.0.0', 8080))
server.listen(5)
server.setblocking(False)
conn_list = []
while 1:
try:
conn, addr = server.accept()
conn_list.append(conn)
print(f'接收到客户端[{addr}]的连接!')
except BlockingIOError:
pass
# 通过循环和切换来处理连接和消息
for conn in conn_list.copy():
try:
data = conn.recv(1024)
# 如果连接断开
if len(data) == 0:
conn.close()
# 删除无用的连接
conn_list.remove(conn)
print(f'客户端[{addr}]断开连接!')
conn.send(data.upper())
except BlockingIOError:
continue
except ConnectionResetError:
conn.close()
conn_list.remove(conn)
print(f'客户端[{addr}]断开连接!')
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# IO多路复用
# 介绍
IO多路复用模型可以将多个进程的IO注册到同一个管道上,内核会通过该管道帮你监管多个IO对象,多个IO对象只需要阻塞一次。
操作系统会轮询的去检查对象是否被触发。只要被监管的对象被触发,就会返回被监管对象自身,进程就可以调用该对象的方法获取数据。
相当于由IO多路复用模型,来帮你判断被监管对象是否有收到数据。
监管机制(如select、poll、epoll等)是操作系统提供的,python中有对应模块供我们调用。
被监管的IO需要是非阻塞的,阻塞的任务将由内核来完成 (IO多路复用)。
当监管的对象只有一个的时候,IO多路复用的效率甚至不如阻塞IO,但是IO多路复用可以一次性监管很多个对象,且多个对象用一条管道,可以提高效率。
# select函数
多个进程的IO可以注册到同一个select上,select会监听所有注册好的IO对象。
在被监听的IO对象还没有数据时,select会阻塞直到任意一个IO对象接收到数据。
当有IO对象接收到数据,它就会返回IO对象自身,进程就可以通过调用该IO对象的方法来获取数据,比如accept、recv等等。
from os import read
# 导入模块
import select
import socket
server = socket.socket()
server.bind(('127.0.0.1', 8080))
server.listen(5)
server.setblocking(False)
# 监听非阻塞对象,需要将对象放到列表里
read_list = [server]
while 1:
# 阻塞直到IO对象有数据
r_list, w_list, x_list = select.select(read_list, [], [])
for i in r_list:
# 如果接收到数据的对象是server
if i is server:
# 调用接收到的server对象,其accept方法
# 因为前面的阻塞已经监测到有数据,所以此处一定能获取到数据
conn, addr = i.accept()
# 将conn对象,也添加到监听对象列表
read_list.append(conn)
# 如果接收到数据的对象是客户端的conn
else:
try:
res = i.recv(1024)
# 如果被关闭连接,则回收连接资源并从监管列表中移除
if (len(res)) == 0:
i.close()
read_list.remove(i)
print(res)
i.send(res.upper())
except ConnectionResetError:
continue
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# select机制 & poll机制 & epoll机制
select特点
select在windows和linux都可以使用。
单个进程监控的文件描述符有限,通常为1024*8个文件描述符。
决定着最多能监管多少个对象,文件描述符数量越多,性能越差。
内核/用户数据拷贝频繁,操作复杂。
select在调用之前,需要在程序里将要监控的对象添加到fed_set集合中,然后加载到内核进行监控。
当内核事件发生,再将fed_set集合中没有发生的文件描述符清空,然后拷贝到用户区,和数组中的文件描述符进行比对。再调用selecct也是如此,每次调用,都需要了来回拷贝。
轮询时间效率低,需要遍历整个数组才知道谁发生了变化,轮询代价大,同时可能会造成延迟。
poll特点
- poll只能在linux使用。
- poll基于链表来存储,没有最大轮询数量的限制。
- 除以上之外,和select基本没有差别。
epoll特点
- epoll只能在linux使用。
- 它会给每一个监管对象绑定一个回调机制,一旦有响应,回调机制立即发起提醒。
# selectors模块
该模块会自动针对不同操作系统,选择不同的监管机制select/poll/epoll。
# 信号驱动IO
基本不适用。
# 异步IO
异步IO模型是所有IO模型中效率最高、使用最广泛的模型。提交任务后,可以做自己的事,直到有收到通知触发回调函数,我们可以通过协程在实现异步IO。
相关模块:asyncio
相关框架:sanic、tronado、twisted
# 协程
协程是一种用户态的轻量级线程,用于实现异步IO。协程通过在程序代码层面对IO操作进行切换处理来实现,使系统认为我们的程序一直在运行没有进行IO。
Python中可以通过asyncio模块实现。
进程是在操作系统中执行的程序,线程是在进程内部执行的程序,而协程是在线程内部执行的程序,协程可以使单个线程实现并发的效果。
优点:协程相较线程开销更小,因为GIL锁限制同一时刻只能运行一个线程,但在一个线程内不会限制协程数,可以最大程度的利用CPU资源。
缺点:协程的本质是单线程下的多任务处理,所以也没办法利用多核优势。并且一旦协程遇到阻塞,也会阻塞整个线程。
# asyncio模块
# 介绍
asyncio模块是Python中用于实现异步的一个模块,它通过事件循环(EventLoop)来调度这些协程的执行。
# 协程函数与协程对象
被@asyncio.coroutine装饰的函数就是协程函数,协程函数无法被直接调用。
协程函数被调用后的返回值就是协程对象(coro)。
# 导入asyncio模块
import asyncio
# @asyncio.coroutine会将函数装饰成一个协程函数
@asyncio.coroutine
def hello():
print("Hello world!")
# yield from会暂时挂起当前协程,使事件循环去执行其他协程,直到yield from拿到其后方协程的返回值才会切换回来
r = yield from asyncio.sleep(1)
print("Hello again!")
2
3
4
5
6
7
8
9
10
当该函数作为协程被执行时会经历以下过程:
hello()会首先打印出Hello world!。
然后yield from语法会挂起当前协程,去执行其他协程,并等待该语法所在的协程返回值,再继续执行。
asyncio.sleep()是一个协程对象,所以线程不会真的等待asyncio.sleep(1)完成1秒的IO操作,而是切换去执行其他协程,也就是说实现了并发执行。
直到asyncio.sleep(1)返回时,线程从yield from拿到返回值,才会接着执行下一行语句 print("Hello again!")。
# 协程语法糖
async关键字:可以在协程函数前面加async关键字,以代替使用@asyncio.coroutine装饰器。
await关键字:可以在需要IO操作的协程对象前面加await关键字,以代替使用yield from挂起等待协程对象。
await表示等待这个地方的协程执行完再往下执行。
await只能在带有async关键字的函数中运行,且await后面的对象需要是一个协程对象,或者实现了相关的协议。
async def hello()
# 同等于
@asyncio.coroutine
def hello()
r = await asyncio.sleep(1)
# 同等于
r = yield from asyncio.sleep(1)
2
3
4
5
6
7
8
使用例子
import asyncio
async def hello():
print("Hello world!")
r = await asyncio.sleep(1)
print("Hello again!")
2
3
4
5
6
# 运行协程的方式
# 运行单个协程
# asyncio.run()函数
使用asyncio.run(协程函数())直接运行单个协程(一般运行主协程),它能够自动管理事件循环的创建、运行、关闭。
该函数仅Python 3.7及其之后的版本才支持。
注意:它在启动前会检查当前线程是否已有运行中的事件循环,如果有则会抛出异常。
import asyncio
async def main():
print('Hello world!')
await asyncio.sleep(1)
print('Hello again!')
# 运行单个协程,直到其执行完成
asyncio.run(main())
2
3
4
5
6
7
8
9
# 事件循环管理
手动创建、运行和关闭事件循环,适用于需要显式控制事件循环的底层场景。
import asyncio
async def main():
print('Hello world!')
await asyncio.sleep(1)
print('Hello again!')
# 获取当前线程的事件循环,如果不存在则会创建一个新事件循环。
loop = asyncio.get_event_loop()
# 使用异常捕获确保资源回收。
try:
# 在事件循环中运行单个协程,直到其执行完成。
loop.run_until_complete(main())
finally:
# 手动调用关闭事件循环,避免资源泄漏。
loop.close()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 并发运行多个协程
# asyncio.create_task()函数
使用asyncio.create_task(协程函数())会将协程包装成一个Task对象并将其加入事件循环中运行。
import asyncio
# 定义协程函数1
async def hello_one():
print('Hello one!')
await asyncio.sleep(1)
print('Hello one again!')
return 'one'
# 定义协程函数2
async def hello_two():
print('Hello two!')
await asyncio.sleep(1)
print('Hello two again!')
return 'two'
# 定义主协程函数
async def main():
# 将子协程加入事件循环并返回其任务对象
task1 = asyncio.create_task(hello_one())
task2 = asyncio.create_task(hello_two())
# 等需要并发运行的协程都启动,再await等待即可
task1_res = await task1
task2_res = await task2
# 输出任务对象的返回值
print(task1_res, task2_res)
if __name__ == "__main__":
# 运行主协程函数
asyncio.run(main())
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# asyncio.gather()函数
使用asyncio.gather(协程函数1(), 协程函数2(), ...)会多个协程加入事件循环中运行,并将所有结果以列表方式返回,结果顺序和传入顺序一致。
如果任一协程被取消或抛出异常,其他协程也会被取消。
import asyncio
# 定义协程函数1
async def hello_one():
print('Hello one!')
await asyncio.sleep(1)
print('Hello one again!')
return "one"
# 定义协程函数2
async def hello_two():
print('Hello two!')
await asyncio.sleep(1)
print('Hello two again!')
return "two"
# 定义主协程函数
async def main():
coros = [hello_one(), hello_two()]
# await等待传入的多个协程运行完成
results = await asyncio.gather(*coros)
# 打印返回值
for r in results:
print(r)
if __name__ == "__main__":
# 运行主协程函数
asyncio.run(main())
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# asyncio.wait()函数
使用asyncio.wait([Task对象1(), Task对象2(), ...], return_when=返回条件, timeout=超时时间)会多个协程加入事件循环中运行,相比于直接await它提供了更灵活的等待策略,可以设置返回条件。
return_when参数:
asyncio.ALL_COMPLETED:等待所有任务完成再返回(默认)。
asyncio.FIRST_COMPLETED:等待第一个任务完成就返回,未完成的任务将继续执行。timeout参数:
- 单位秒,任务最大超时时间,如果任务未在指定时间内完成,则任务将会取消。
返回值:
done:一个集合,包含已完成的任务(Task或Future对象)。
pending:一个集合,包含未完成的任务。可以遍历
done集合中的每个任务对象,并调用其.result()方法来获取结果,或.exception()方法来捕获异常。
import asyncio
# 定义协程函数1
async def hello_one():
print('Hello one!')
await asyncio.sleep(1)
print('Hello one again!')
return "one"
# 定义协程函数2
async def hello_two():
print('Hello two!')
# 等待更久一点,以提供差异对比
await asyncio.sleep(3)
print('Hello two again!')
return "two"
# 定义主协程函数
async def main():
# 将协程加入事件循环并返回任务对象
tasks = [asyncio.create_task(hello_one()), asyncio.create_task(hello_two())]
# 等待有任意一个协程完成任务就返回,因为需要等待协程运行完成,所以需要await挂起
done, pedding = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
# 输出所有已完成任务对象的返回结果
print([i.result() for i in done])
# 输入所有未完成任务的任务对象
print(pedding)
if __name__ == "__main__":
# 运行主协程函数
asyncio.run(main())
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32