FastAPI实现用户管理
# 用户登录
# 用户登录逻辑
- 接收登录请求:客户端发送用户名(或邮箱)和密码。
- 验证输入格式:使用Pydantic模型校验字段是否合法(如非空、长度等)。
- 查询用户名和密码:根据用户名和密码在数据库中查找账号是否存在且密码正确。
- 生成身份凭证:登录成功后,签发一个访问令牌(Access Token),用于后续请求的身份认证(如 JWT Token)。
- 返回响应:返回token或用户信息给客户端。
# JWT访问令牌
# 介绍
JWT是一种签名传输方式,用于在各方之间安全地传输JSON格式数据,常用于身份认证,比如生成及验证用户Token等。
一个 JWT 由三部分组成(用.分隔):Header.Payload.Signature
Header:JWT头,算法和token类型(如 HS256 + JWT)。
也是JSON格式,例如:
{"typ":"JWT","alg":"HS256"}。
Payload:有效载荷,你要传递的JSON格式数据(如用户ID、过期时间等)。
JWT是签名,Header和Payload只是被base64编码,所以可被解码,不要放敏感信息。
Signature:签名,用于验证token是否被篡改。
签名是通过将header和payload拼接然后进行base64编码并对其进行对称加密生成的。
验证时通过对header和payload进行同样的编码加密操作得到签名,然后是否和验证JWT令牌的签名一致,一致则说明未被篡改。
攻击者由于没有密钥,所以修改了header和payload也无法生成正确的签名。
# 安装
# Python中处理JWT的主流库,用于生成/解析JWT
pip install python-jose[cryptography]
1
2
2
# 生成
from datetime import datetime, timedelta
from jose import jwt, JWTError
from typing import Optional
# 对称加密算法
ALGORITHM = "HS256"
# 密钥,随机生成即可,HS256推荐32字节及以上
SECRET_KEY = "your_very_secret_key_here"
def create_access_token(payload: dict, expires: int = 30) -> str:
"""
生成JWT访问令牌
:param payload: 要编码的数据
:param expires: 过期时长,单位分钟
:return: 生成的Token字符串
"""
# 复制副本,避免修改到原字典
_payload = payload.copy()
# 添加过期时间(当前时间+过期时长)
_payload["exp"] = datetime.utcnow() + timedelta(minutes=expires)
# 生成JWT签名
encoded_jwt = jwt.encode(_payload, SECRET_KEY, ALGORITHM)
return encoded_jwt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 验证
def verify_token(token: str) -> dict:
"""
验证并解码JWT访问令牌
:param token: 要验证解码的Token
:return: 验证解码后的Payload字典,失败则返回None
"""
# 验证JWT令牌是否过期或被篡改,并返回解码后的Payload数据
payload = jwt.decode(token,
SECRET_KEY,
algorithms=[ALGORITHM]
)
return payload
1
2
3
4
5
6
7
8
9
10
11
12
2
3
4
5
6
7
8
9
10
11
12
在调用
jwt.decode()时,如果Payload数据中包含exp字段,则会自动验证token是否过期。在当前时间大于过期时间时,会抛出ExpiredSignatureError。另外Payload中一般还有令牌主题字段(比如用户ID),官方推荐的标准化字段名是
sub,虽然也可以用其他字段名,但使用sub能保证与其他系统兼容,是行业通用约定。
# 定义请求体验证模型
class UserAuth(BaseModel):
username: str = Field(
min_length=6,
max_length=24,
pattern=r"^[a-zA-Z0-9]+$",
description="用户名,仅支持字母、数字",
example="alice2025"
)
password: str = Field(
min_length=6,
max_length=48,
description="用户密码",
example="qwe123456"
)
@field_validator("username")
@classmethod
def username_to_lower(cls, v: str) -> str:
return v.lower()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 处理登录请求
# 使用UserAuth模型验证请求体格式
@router.post("/login")
async def user_login(response: Response, body: UserAuth):
# 查询用户名和密码
user = await models.User.get_or_none(username=body.username, password=body.password)
# 登录成功则处理
if user:
print(f"用户[{body.username}]登录系统成功。")
# 生成Token身份凭证
token = create_jwt_token({"sub": user.username})
# cookies中设置token,并返回Json格式信息
response.set_cookie(key="session_id", value=token, httponly=True, expires=None)
return {"code": 0, "message": "登录成功"}
1
2
3
4
5
6
7
8
9
10
11
12
13
2
3
4
5
6
7
8
9
10
11
12
13
# 验证用户信息
编写验证函数并通过Depends依赖注入进行验证。
from typing import Optional
from fastapi import Cookie, Depends, HTTPException
def get_current_user(
token: Optional[str] = Cookie(
default=None, # 没有找到所需Cookie时的默认值
alias="access_token", # 显示指定要匹配的Cookie名,不显示指定则默认使用形参名称
description="JWT 访问令牌" # 可选在FastAPI文档中显示说明
)) -> int:
if not token:
raise HTTPException(status_code=401, detail="未登录")
# 解码并验证token
payload = jwt.decode(token,
SECRET_KEY,
algorithms=[ALGORITHM]
)
user_id = payload.get("sub")
return int(user_id)
@app.get("/dashboard")
def dashboard(user_id: int = Depends(get_current_user)):
return {"user_id": user_id}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22