FastAPI 学习之路(五十六)将token缓存到redis

在之前的文章中,FastAPI 学习之路(二十九)使用(哈希)密码和 JWT Bearer 令牌的 OAuth2,FastAPI 学习之路(二十八)使用密码和 Bearer 的简单 OAuth2,FastAPI 学习之路(三十四)数据库多表操作,我们分享了基于jwt认证token和基于数据库创建用户,那么我们今天把这些代码整理下,形成基于数据库用户名密码,登陆验证token存储到redis中。

首先我们看下之前基于jwt认证token的代码:

# 见chapter18

"""
-*- encoding=utf-8 -*-
Time: 2024/6/28 16:16
Author: lc
Email: 15101006331@163.com
File: chapter18.py
"""


"""
使用(哈希)密码和 JWT Bearer 令牌的 OAuth2
"""
from fastapi import FastAPI, Depends, status, HTTPException
from fastapi.security import OAuth2PasswordRequestForm, OAuth2PasswordBearer

from pydantic import BaseModel
from typing import Optional
from jose import JWTError, jwt
from datetime import datetime, timedelta
from passlib.context import CryptContext

SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

fake_db_users = {
    "mrli": {
        "username": "mrli",
        "full_name": "mrli_hanjing",
        "email": "mrli@qq.com",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
        "disabled": False
    }
}

app = FastAPI()


def fake_hash_password(password: str):
    """模拟加密密码"""
    return password


class Token(BaseModel):
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: Optional[str] = None


class User(BaseModel):
    username: str
    full_name: Optional[str] = None
    email: Optional[str] = None
    disabled: Optional[bool] = None


class UserInDB(User):
    hashed_password: str


pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


def verify_password(plain_password: str, hashed_password: str):
    """校验密码"""
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password: str):
    """密码加密"""
    return pwd_context.hash(password)


def get_user(db_users: dict, username: str):
    if username in db_users:
        user_info = db_users[username]
        return UserInDB(**user_info)


def authenticate_user(db_users: dict, username: str, password: str):
    """校验用户权限"""
    user = get_user(db_users, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user


def create_access_token(data: dict, expires: Optional[timedelta] = None):
    """创建jwt"""
    to_encode = data.copy()
    if expires:
        expire_time = datetime.utcnow() + expires
    else:
        expire_time = datetime.utcnow() + timedelta(minutes=30)
    to_encode.update({"exp": expire_time})
    encode_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encode_jwt


def fake_decode_token(token):
    """模拟解码token"""
    user = get_user(fake_db_users, token)
    return user


def get_current_user(token: str = Depends(oauth2_scheme)):
    exc = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Authentication Failed",
        headers={"WWW-Authenticate": "Bearer"}
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username = payload.get("sub")
        if username is None:
            raise exc
        token_data = TokenData(username=username)
    except JWTError:
        raise exc
    user = get_user(fake_db_users, username=token_data.username)
    if not user:
        raise exc
    return user


def get_current_active_user(current_user: User = Depends(get_current_user)):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


@app.post("/token", response_model=Token)  # 必须实现路径为/token的接口来返回access_token,在文档页面点击Authorize时就是调用的这个接口
def login(form_data: OAuth2PasswordRequestForm = Depends()):
    user = authenticate_user(fake_db_users, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"}
        )
    access_token_expire = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": form_data.username}, expires=access_token_expire
    )
    return {
        "access_token": access_token,
        "token_type": "bearer"
    }


@app.get("/me")
def get_me(current_user: User = Depends(get_current_active_user)):
    return current_user

我们需要把这部分代码进行整理,我们吊证到routers中的users.py。实际上就是把之前的方法柔和到新的方法中,需要调整下之前的创建用户,把登录实现了。

我们看下修改后的代码:

from fastapi import APIRouter, status
from fastapi import Depends, HTTPException
from starlette.requests import Request

from models.crud import *
from models.schemas import UserToken
from datetime import timedelta, datetime
from jose import JWTError, jwt
from typing import Optional


SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

user_router = APIRouter()
from . import create_db
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


def verify_password(plain_password: str, hashed_password: str):
    """校验密码"""
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password: str):
    """密码加密"""
    return pwd_context.hash(password)


def create_access_token(data: dict, expires: Optional[timedelta] = None):
    """创建jwt"""
    to_encode = data.copy()
    if expires:
        expire_time = datetime.utcnow() + expires
    else:
        expire_time = datetime.utcnow() + timedelta(minutes=30)
    to_encode.update({"exp": expire_time})
    encode_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encode_jwt


def get_current_user(token):
    exc = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Authentication Failed",
        headers={"WWW-Authenticate": "Bearer"}
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username = payload.get("sub")
        if username is None:
            raise exc
        return username
    except JWTError:
        raise exc


@user_router.post("/user", response_model=UserOut)
def create_user(user: UserModel, db: Session = Depends(create_db)):
    return create_user_method(db, user)


@user_router.get("/user", response_model=UserOut)
def get_user(uid: int, db: Session = Depends(create_db)):
    return get_user_method(db, uid)


@user_router.post("/login")
async def login(request: Request, user: UserModel, db: Session = Depends(create_db)):
    db_user = get_user_by_email(db, user.email)
    pass

现在登录还未完全实现,接下来我们实现这个api。

这里的UserOut在schemas中实现。

class UserToken(BaseUser):
    token: str

登录的具体实现:

@user_router.post("/login")
async def login(request: Request, user: UserModel, db: Session = Depends(create_db)):
    db_user = get_user_by_email(db, user.email)
    # 密码校验
    verify = verify_password(user.password, db_user.hashed_password)
    if verify:
        # 产生token
        token = create_access_token(data={"sub": user.email})
        is_cached = await request.app.state.redis.get(user.email)
        if is_cached:
            raise HTTPException(status_code=200, detail="请勿重复登录")
        await request.app.state.redis.set(user.email, token, expire=ACCESS_TOKEN_EXPIRE_MINUTES*60)
        user_token = UserToken(token=token, email=user.email)
        return user_token
    else:
        raise HTTPException(status_code=200, detail="用户名或密码错误")

redis的相关操作还是使用上次分享时的startup和shutdown方法

我们启动测试,看是否正确,由于我们更改了创建用户时密码的hash方式,所以我们先创建用户

 接下来,我们调用登录api

这样token就产生了,redis中也有对应的缓存信息

通过本节,我们将登录的用户存储到了数据库中,并将登录后的用户token缓存到了redis,接下来我们将分享如何校验token 

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-07-18 14:50:04       66 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-07-18 14:50:04       70 阅读
  3. 在Django里面运行非项目文件

    2024-07-18 14:50:04       57 阅读
  4. Python语言-面向对象

    2024-07-18 14:50:04       68 阅读

热门阅读

  1. 网络安全-网络安全及其防护措施10

    2024-07-18 14:50:04       21 阅读
  2. 反悔贪心和例题

    2024-07-18 14:50:04       24 阅读
  3. Docker 镜像存储目录的位置修改教程

    2024-07-18 14:50:04       24 阅读
  4. 生成Elasticsearch xpack安全认证证书

    2024-07-18 14:50:04       20 阅读
  5. 计算机视觉篇5 图像的位置--边框

    2024-07-18 14:50:04       18 阅读
  6. 大龄程序员的出路在哪里?

    2024-07-18 14:50:04       22 阅读
  7. [ptrade交易实战] 第十六篇 融资融券查询类函数!

    2024-07-18 14:50:04       24 阅读
  8. 项目随笔【环境变量的获取】

    2024-07-18 14:50:04       20 阅读