ThankNeko's Blog ThankNeko's Blog
首页
  • 操作系统

    • Linux基础
    • Linux服务
    • WindowsServer笔记
    • Ansible笔记
    • Shell笔记
  • 容器服务

    • Docker笔记
    • Kubernetes笔记
    • Git笔记
  • 数据库服务

    • MySQL笔记
    • ELK笔记
    • Redis笔记
  • 监控服务

    • Zabbix笔记
  • Web服务

    • Nginx笔记
    • Tomcat笔记
  • 数据处理

    • Kettle笔记
  • Python笔记
  • Bootstrap笔记
  • C笔记
  • C++笔记
  • Arduino笔记
  • 分类
  • 标签
  • 归档
  • 随笔
  • 关于
GitHub (opens new window)

Hoshinozora

尽人事,听天命。
首页
  • 操作系统

    • Linux基础
    • Linux服务
    • WindowsServer笔记
    • Ansible笔记
    • Shell笔记
  • 容器服务

    • Docker笔记
    • Kubernetes笔记
    • Git笔记
  • 数据库服务

    • MySQL笔记
    • ELK笔记
    • Redis笔记
  • 监控服务

    • Zabbix笔记
  • Web服务

    • Nginx笔记
    • Tomcat笔记
  • 数据处理

    • Kettle笔记
  • Python笔记
  • Bootstrap笔记
  • C笔记
  • C++笔记
  • Arduino笔记
  • 分类
  • 标签
  • 归档
  • 随笔
  • 关于
GitHub (opens new window)
  • Python笔记

    • 基础知识

    • 并发编程

      • 并发相关介绍
      • 多进程与进程间通信
      • 多线程与线程间通信
      • 其他锁与队列
      • 网络IO模型与协程
        • 网络IO模型介绍
        • 五种网络IO模型
          • 阻塞IO
          • 非阻塞IO
          • IO多路复用
          • 信号驱动IO
          • 异步IO
        • 协程
        • asyncio模块
          • 介绍
          • 协程函数与协程对象
          • 协程语法糖
          • 运行协程的方式
    • 爬虫笔记

    • 模块笔记

    • 后端笔记

  • C笔记

  • C++笔记

  • Arduino笔记

  • Web笔记

  • Dev
  • Python笔记
  • 并发编程
Hoshinozora
2023-02-25
目录

网络IO模型与协程

# 网络IO模型介绍

主机A

由于应用程序是不能直接操作硬件的,所以在进程想要发送数据到其他主机时,就需要将应用数据拷贝给内核,再由内核进行各种协议的封装,最后通过网络发送出去(copy datagram)。

主机B

发送系统调用,等待数据准备,根据使用的IO模型不同,接收数据的方式也不同。

操作系统接收到数据并处理之后,会将数据从内核拷贝到对应的进程中(copy datagram),然后应用程序再对接收到的数据进行处理。

# 五种网络IO模型

# 阻塞IO

一直IO阻塞,直到接收到数据为止,比如accept、recv等。

# 非阻塞IO

系统调用之后,无论是否有数据,都会立刻获得一个结果,它会将所有的阻塞操作变成非阻塞,收到响应后,即可以选择再次系统调用获取数据,也可以做其他操作。

如果系统调用时,内核已经获取到了数据,则内核会将数据拷贝到进程。

虽然非阻塞IO看上去厉害,但是该模型会长时间占用CPU,并且不干活,类似于while死循环。实际应用中一般不会考虑直接使用非阻塞IO模型。

import socket
server = socket.socket()
server.bind(('0.0.0.0', 8080))
server.listen(5)
server.setblocking(False)
conn_list = []
while 1:
    try:
        conn, addr = server.accept()
        conn_list.append(conn)
        print(f'接收到客户端[{addr}]的连接!')
    except BlockingIOError:
        pass
    # 通过循环和切换来处理连接和消息
    for conn in conn_list.copy():
        try:
            data = conn.recv(1024)
            # 如果连接断开
            if len(data) == 0:
                conn.close()
                # 删除无用的连接
                conn_list.remove(conn)
                print(f'客户端[{addr}]断开连接!')
            conn.send(data.upper())
        except BlockingIOError:
            continue
        except ConnectionResetError:
            conn.close()
            conn_list.remove(conn)
            print(f'客户端[{addr}]断开连接!')
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

# IO多路复用

# 介绍

IO多路复用模型可以将多个进程的IO注册到同一个管道上,内核会通过该管道帮你监管多个IO对象,多个IO对象只需要阻塞一次。

操作系统会轮询的去检查对象是否被触发。只要被监管的对象被触发,就会返回被监管对象自身,进程就可以调用该对象的方法获取数据。

相当于由IO多路复用模型,来帮你判断被监管对象是否有收到数据。

监管机制(如select、poll、epoll等)是操作系统提供的,python中有对应模块供我们调用。

被监管的IO需要是非阻塞的,阻塞的任务将由内核来完成 (IO多路复用)。

当监管的对象只有一个的时候,IO多路复用的效率甚至不如阻塞IO,但是IO多路复用可以一次性监管很多个对象,且多个对象用一条管道,可以提高效率。

# select函数

多个进程的IO可以注册到同一个select上,select会监听所有注册好的IO对象。

在被监听的IO对象还没有数据时,select会阻塞直到任意一个IO对象接收到数据。

当有IO对象接收到数据,它就会返回IO对象自身,进程就可以通过调用该IO对象的方法来获取数据,比如accept、recv等等。

from os import read
# 导入模块
import select
import socket
server = socket.socket()
server.bind(('127.0.0.1', 8080))
server.listen(5)
server.setblocking(False)
# 监听非阻塞对象,需要将对象放到列表里
read_list = [server]
while 1:
    # 阻塞直到IO对象有数据
    r_list, w_list, x_list = select.select(read_list, [], [])
    for i in r_list:
        # 如果接收到数据的对象是server
        if i is server:
            # 调用接收到的server对象,其accept方法
            # 因为前面的阻塞已经监测到有数据,所以此处一定能获取到数据
            conn, addr = i.accept()
            # 将conn对象,也添加到监听对象列表
            read_list.append(conn)
        
        # 如果接收到数据的对象是客户端的conn
        else:
            try:
                res = i.recv(1024)
                # 如果被关闭连接,则回收连接资源并从监管列表中移除
                if (len(res)) == 0:
                    i.close()
                    read_list.remove(i)
                print(res)
                i.send(res.upper())
            except ConnectionResetError:
                continue
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

# select机制 & poll机制 & epoll机制

select特点

  • select在windows和linux都可以使用。

  • 单个进程监控的文件描述符有限,通常为1024*8个文件描述符。

    决定着最多能监管多少个对象,文件描述符数量越多,性能越差。

  • 内核/用户数据拷贝频繁,操作复杂。

    select在调用之前,需要在程序里将要监控的对象添加到fed_set集合中,然后加载到内核进行监控。

    当内核事件发生,再将fed_set集合中没有发生的文件描述符清空,然后拷贝到用户区,和数组中的文件描述符进行比对。再调用selecct也是如此,每次调用,都需要了来回拷贝。

  • 轮询时间效率低,需要遍历整个数组才知道谁发生了变化,轮询代价大,同时可能会造成延迟。

poll特点

  • poll只能在linux使用。
  • poll基于链表来存储,没有最大轮询数量的限制。
  • 除以上之外,和select基本没有差别。

epoll特点

  • epoll只能在linux使用。
  • 它会给每一个监管对象绑定一个回调机制,一旦有响应,回调机制立即发起提醒。

# selectors模块

该模块会自动针对不同操作系统,选择不同的监管机制select/poll/epoll。

# 信号驱动IO

基本不适用。

# 异步IO

异步IO模型是所有IO模型中效率最高、使用最广泛的模型。提交任务后,可以做自己的事,直到有收到通知触发回调函数,我们可以通过协程在实现异步IO。

相关模块:asyncio

相关框架:sanic、tronado、twisted

# 协程

协程是一种用户态的轻量级线程,用于实现异步IO。协程通过在程序代码层面对IO操作进行切换处理来实现,使系统认为我们的程序一直在运行没有进行IO。

Python中可以通过asyncio模块实现。

进程是在操作系统中执行的程序,线程是在进程内部执行的程序,而协程是在线程内部执行的程序,协程可以使单个线程实现并发的效果。

优点:协程相较线程开销更小,因为GIL锁限制同一时刻只能运行一个线程,但在一个线程内不会限制协程数,可以最大程度的利用CPU资源。

缺点:协程的本质是单线程下的多任务处理,所以也没办法利用多核优势。并且一旦协程遇到阻塞,也会阻塞整个线程。

# asyncio模块

# 介绍

asyncio模块是Python中用于实现异步的一个模块,它通过事件循环(EventLoop)来调度这些协程的执行。

# 协程函数与协程对象

被@asyncio.coroutine装饰的函数就是协程函数,协程函数无法被直接调用。

协程函数被调用后的返回值就是协程对象(coro)。

# 导入asyncio模块
import asyncio

# @asyncio.coroutine会将函数装饰成一个协程函数
@asyncio.coroutine
def hello():
    print("Hello world!")
    # yield from会暂时挂起当前协程,使事件循环去执行其他协程,直到yield from拿到其后方协程的返回值才会切换回来
    r = yield from asyncio.sleep(1)
    print("Hello again!")
1
2
3
4
5
6
7
8
9
10

当该函数作为协程被执行时会经历以下过程:

hello()会首先打印出Hello world!。

然后yield from语法会挂起当前协程,去执行其他协程,并等待该语法所在的协程返回值,再继续执行。

asyncio.sleep()是一个协程对象,所以线程不会真的等待asyncio.sleep(1)完成1秒的IO操作,而是切换去执行其他协程,也就是说实现了并发执行。

直到asyncio.sleep(1)返回时,线程从yield from拿到返回值,才会接着执行下一行语句 print("Hello again!")。

# 协程语法糖

async关键字:可以在协程函数前面加async关键字,以代替使用@asyncio.coroutine装饰器。

await关键字:可以在需要IO操作的协程对象前面加await关键字,以代替使用yield from挂起等待协程对象。

await表示等待这个地方的协程执行完再往下执行。

await只能在带有async关键字的函数中运行,且await后面的对象需要是一个协程对象,或者实现了相关的协议。

async def hello()
# 同等于
@asyncio.coroutine
def hello()

r = await asyncio.sleep(1)
# 同等于
r = yield from asyncio.sleep(1)
1
2
3
4
5
6
7
8

使用例子

import asyncio

async def hello():
    print("Hello world!")
    r = await asyncio.sleep(1)
    print("Hello again!")
1
2
3
4
5
6

# 运行协程的方式

# 运行单个协程

# asyncio.run()函数

使用asyncio.run(协程函数())直接运行单个协程(一般运行主协程),它能够自动管理事件循环的创建、运行、关闭。

该函数仅Python 3.7及其之后的版本才支持。

注意:它在启动前会检查当前线程是否已有运行中的事件循环,如果有则会抛出异常。

import asyncio

async def main():
    print('Hello world!')
    await asyncio.sleep(1)
    print('Hello again!')

# 运行单个协程,直到其执行完成
asyncio.run(main())
1
2
3
4
5
6
7
8
9
# 事件循环管理

手动创建、运行和关闭事件循环,适用于需要显式控制事件循环的底层场景。

import asyncio

async def main():
    print('Hello world!')
    await asyncio.sleep(1)
    print('Hello again!')

# 获取当前线程的事件循环,如果不存在则会创建一个新事件循环。
loop = asyncio.get_event_loop()
# 使用异常捕获确保资源回收。
try:
    # 在事件循环中运行单个协程,直到其执行完成。
    loop.run_until_complete(main())
finally:
    # 手动调用关闭事件循环,避免资源泄漏。
    loop.close()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

# 并发运行多个协程

# asyncio.create_task()函数

使用asyncio.create_task(协程函数())会将协程包装成一个Task对象并将其加入事件循环中运行。

import asyncio

# 定义协程函数1
async def hello_one():
    print('Hello one!')
    await asyncio.sleep(1)
    print('Hello one again!')
    return 'one'

# 定义协程函数2
async def hello_two():
    print('Hello two!')
    await asyncio.sleep(1)
    print('Hello two again!')
    return 'two'

# 定义主协程函数
async def main():
    # 将子协程加入事件循环并返回其任务对象
    task1 = asyncio.create_task(hello_one())
    task2 = asyncio.create_task(hello_two())

    # 等需要并发运行的协程都启动,再await等待即可
    task1_res = await task1
    task2_res = await task2
    # 输出任务对象的返回值
    print(task1_res, task2_res)

if __name__ == "__main__":
    # 运行主协程函数
    asyncio.run(main())
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# asyncio.gather()函数

使用asyncio.gather(协程函数1(), 协程函数2(), ...)会多个协程加入事件循环中运行,并将所有结果以列表方式返回,结果顺序和传入顺序一致。

如果任一协程被取消或抛出异常,其他协程也会被取消。

import asyncio

# 定义协程函数1
async def hello_one():
    print('Hello one!')
    await asyncio.sleep(1)
    print('Hello one again!')
    return "one"

# 定义协程函数2
async def hello_two():
    print('Hello two!')
    await asyncio.sleep(1)
    print('Hello two again!')
    return "two"

# 定义主协程函数
async def main():
    coros = [hello_one(), hello_two()]
    # await等待传入的多个协程运行完成
    results = await asyncio.gather(*coros)
    # 打印返回值
    for r in results:
        print(r)

if __name__ == "__main__":
    # 运行主协程函数
    asyncio.run(main())
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# asyncio.wait()函数

使用asyncio.wait([Task对象1(), Task对象2(), ...], return_when=返回条件, timeout=超时时间)会多个协程加入事件循环中运行,相比于直接await它提供了更灵活的等待策略,可以设置返回条件。

return_when参数:

  • asyncio.ALL_COMPLETED:等待所有任务完成再返回(默认)。

  • asyncio.FIRST_COMPLETED:等待第一个任务完成就返回,未完成的任务将继续执行。

timeout参数:

  • 单位秒,任务最大超时时间,如果任务未在指定时间内完成,则任务将会取消。

返回值:

  1. done:一个集合,包含已完成的任务(Task或Future对象)。

  2. pending:一个集合,包含未完成的任务。

  3. 可以遍历done集合中的每个任务对象,并调用其.result()方法来获取结果,或.exception()方法来捕获异常。

import asyncio

# 定义协程函数1
async def hello_one():
    print('Hello one!')
    await asyncio.sleep(1)
    print('Hello one again!')
    return "one"

# 定义协程函数2
async def hello_two():
    print('Hello two!')
    # 等待更久一点,以提供差异对比
    await asyncio.sleep(3)
    print('Hello two again!')
    return "two"

# 定义主协程函数
async def main():
    # 将协程加入事件循环并返回任务对象
    tasks = [asyncio.create_task(hello_one()), asyncio.create_task(hello_two())]
    # 等待有任意一个协程完成任务就返回,因为需要等待协程运行完成,所以需要await挂起
    done, pedding = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    # 输出所有已完成任务对象的返回结果
    print([i.result() for i in done])
    # 输入所有未完成任务的任务对象
    print(pedding)


if __name__ == "__main__":
    # 运行主协程函数
    asyncio.run(main())
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#网络IO模型介绍#阻塞IO#非阻塞IO#IO多路复用#异步IO#协程#asyncio
其他锁与队列
爬虫相关介绍

← 其他锁与队列 爬虫相关介绍→

最近更新
01
Vue路由
12-09
02
FastAPI实现用户管理
11-23
03
Tortoise ORM
11-23
更多文章>
Theme by Vdoing | Copyright © 2022-2026 Hoshinozora | MIT License
湘ICP备2022022820号-1
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式