常用的登录方式
网站、小程序、APP 是否已经登录所代表的状态,代表一个概念是登录态。
我们常用的登录态验证方式主要是如下3种 cookie,session,token
cookie和session 采用的是缓存机制,是需要在浏览器端或服务器端,储存用户登录状态。
两者的特点,cookie 采用的浏览器缓存 相对于 session 采用的服务器缓存机制安全较差,但后者很怕用户过多给服务器代理过大的压力。
token 提供了另外一种不需要缓存账户和密码的登录状态验证方式。首次登录后,服务器会给客户端发送一个Token,这个Token相当于门禁卡,每次访问服务器的时候携带这个Token,服务器端就可以验证是谁登录了。
但为了防止Token被盗用,我们会把Token的有效期设置的很短。Token过期后,向服务器再申请新的Token.这样就可以更加安全的使用Token方式进行登录了。
用Flask实现TOKEN登录的解决方案
环境准备
python flask模板
flask-login 登录验证模板
Redis 缓存模板
登录验证机制
通过解密TOKEN获得有效的登录用户相关信息,并把登录信息保存在服务器端的Redis内。
这样,确保即使别人盗取了你的Token.如果和服务器Redis中保存的Token不一致。同样系统会判断登录不成功。
第一步 用户登录
auth.py
通过 https://自己的服务器地址/login 登录成功,并返回 access_token和refresh_token到前端。
from flask_restful import Api,Resource
from db.models import *
admin_login_parser = reqparse.RequestParser()
admin_login_parser.add_argument("username", type=str, location='values', help='输入管理员用户名')
admin_login_parser.add_argument("password", type=str, location='values', help='输入管理员密码')
class Getlogin(Resource):
"""系统登录"""
def post(self):
args = admin_login_parser.parse_args()
admin = User.query.filter(User.username == args.username).first()
front_access_token, front_refresh_token = admin.get_id(life_time=7200, last_login_time=last_login_time, need_refresh_token=True)
rd.set_redis_data('front_access_token'+'_'+str(admin.id), front_access_token)
rd.set_redis_data('front_refresh_token'+'_'+str(admin.id),front_refresh_token)
return jsonify(get_result(0, '登录成功', {'access_token': front_access_token, 'refresh_token': front_refresh_token, 'userInfo': userInfo}))
第二步 前端保存 token 每一个新的请求中(这里用的是Uniapp前端),把access_token和refresh_token 保存在前端。
/**
* 调用微信登录
*/
function login() {
Utils.request(api.AuthLogin,{
username:'你的登录名',
password:'你的密码'
},'POST').then(res=>{
uni.setStorageSync('isLogin',true)
uni.setStorageSync('userInfo', res.data.userInfo);
uni.setStorageSync('Access-Token', res.data.access_token);
uni.setStorageSync('refresh_token',res.data.refresh_token);
setTimeout(()=>{
resolve({
userInfo:res.data.userInfo,
access_token:res.data.access_token,
})
},3000)
})
}
第三步 给前端的请求头添加access_token
uni.addInterceptor('request',{
// 在发送请求之前做一些处理
invoke(requestConfig){
// 添加请求头、身份验证等
requestConfig.header['Access-Token'] = uni.getStorageSync('Access-Token')
// requestConfig.header['Refresh-Token'] = uni.getStorageSync('refresh_token')
// 添加token到请求头
return requestConfig
},
success(response){
switch (response.data.errno) {
case 401:
uni.setStorageSync("Access-Token",'')
uni.setStorageSync("refresh_token",'')
uni.setStorageSync('isLogin',false)
uni.navigateTo({
url:'/pages/login/login/index'
})
break
case 403:
Utils.reloadMessage()
break
}
},
fail(error){
console.error('请求失败',error)
}
})
第四步 后端登录效验
这里采用了flask_login 登录模块,flask_login 这里我们从新定义 登录装饰器 login_required_token,用于效验利用token进行的登录。
def login_required_token(func):
@wraps(func)
def decorated_view(*args, **kwargs):
# 用户登录状态
if not current_user.is_authenticated:
if g.refresh:
g.refresh = False
return jsonify(base.get_result(403, 'token过期重建', {'token': 'refresh'}))
return current_app.login_manager.unauthorized()
if callable(getattr(current_app, "ensure_sync", None)):
return current_app.ensure_sync(func)(*args, **kwargs)
return func(*args, **kwargs)
return decorated_view
第五步 token过期,前端向后端发送 front_refresh_token
token 过期后,g.refresh 会设置为 True
g.refresh 为True是想后前端反馈状态码403
前端接受到403 的反馈后,执行Utils.reloadMessage函数获取新的token
public static reloadMessage = ()=>{
let header = {
'Content-Type': 'application/json; charset=UTF-8',
"Access-Control-Allow-Origin": "*",
"Refresh-Token":uni.getStorageSync('refresh_token')
}
Utils.request(api.getNewToken,{},"POST",header).then(res=>{
uni.setStorageSync('Access-Token', res.data.access_token)
})
}
第六步 前端向后端请求,获取新的Access-Token
https://自己的服务器地址/getNewToken
def check_refresh_token(func):
"""refresh_token"""
@wraps(func)
def inner(*args, **kwargs):
refresh_data = get_token_key(get_key(request.headers.get('Refresh-Token')))
if refresh_data is None:
rd.del_redis_data('front_refresh_token' + '_' + str(refresh_data.get('user_id')))
return jsonify(base.get_result(401,'系统退出2'))
try:
refresh_token = rd.get_redis_data('front_refresh_token' + '_' + str(refresh_data.get('user_id')))
refresh_token = get_token_key(refresh_token)
except Exception as e:
return jsonify(base.get_result(401,'系统退出3'))
else:
# 生成新的token,保存在g中
user = User.query.get(refresh_token.get('user_id'))
last_login_time=int(time.time())
front_access_token, front_refresh_token = user.get_id(life_time=10, last_login_time=last_login_time,
need_refresh_token=True)
rd.set_redis_data('front_access_token'+'_'+str(user.id),front_access_token)
g.new_token = front_access_token
return func(*args, **kwargs)
return inner
class GetNewToken(Resource):
"""申请新的Token"""
@check_refresh_token
def post(self):
return jsonify(get_result(0,'申请成功',{'access_token':g.new_token}))
api.add_resource(GetNewToken, '/getNewToken')
decorators.py 完整代码
from functools import wraps
from flask import session,redirect,url_for,request,jsonify,Markup,flash,abort,current_app,g
from db.models import User,Guest
from flask_login import LoginManager,login_user,logout_user,login_required,current_user,UserMixin
from apps.model import Token
import lib.base as base
import json
import apps.redis as rd
import time
#访问控制
# 实例化LoginManager类
login_manager = LoginManager()
# 提示信息
login_manager.login_message = "请先登录"
# 提示样式
login_manager.login_message_category = 'danger'
login_manager.anonymous_user = Guest
class bcolors:
HEADER = '\033[95m'
OKBLUE = '\033[94m'
OKGREEN = '\033[92m'
WARNING = '\033[93m'
FAIL = '\033[91m'
ENDC = '\033[0m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
def load_token(tok):
"""通过loads()方法来解析浏览器发送过来的token,从而进行初步的验证"""
try:
api_key = Token.validate_token(tok)
user = User.query.get(api_key.get('user_id'))
except Exception as e:
return None
if user:
user.token = tok
else:
print('用户不存在,令牌不正确!')
return None
return user
def get_token_key(key):
"""解密token到数据"""
try:
key = key.replace('Basic ', '', 1)
key = key.encode('utf-8')
key = Token.validate_token(key)
except Exception as e:
# except TypeError:
return None
return key
def get_key(value):
if value == '':
value = None
return value
@login_manager.unauthorized_handler
def unauthorized():
"""系统超时退出"""
return jsonify(base.get_result(401,'系统退出2'))
@login_manager.request_loader
def load_user_from_request(request):
"""效验请求中的token"""
if request.method == "POST":
g.refresh = False
"""# 请求正文中携带token时,执行如下代码
捕捉请求参数中的 'api_key' 变量,这个api_key和数据库中存储的api_key比较,确认登录身份。
"""
api_access_key = request.args.get('api_key')
if api_access_key:
user = User.query.filter_by(api_key=api_access_key).first()
if user:
return user
"""请求头中携带token,执行如下代码
捕捉请求参数中的 'X-Token' 变量,这个Access-Token通过服务端解码后,获得用户ID和Access-Token有效期,确认登录身份。
"""
# 取请求头中的access-token
api_access_key = get_key(request.headers.get('Access-Token'))
# 对比前端的token和服务器中保存的token是否一致,判断用户是否已经登录了
if api_access_key:
login_data = get_token_key(api_access_key)
# 判断token是否过期,没有过期返回用户对象。
if login_data and api_access_key == get_key(
rd.get_redis_data('front_access_token' + '_' + str(login_data.get('user_id')))):
g.user = User.query.get(login_data.get('user_id'))
return g.user
# 判断如果token已经过期了,但是front_refresh_token没有过期,执行如下代码?
else:
g.refresh = True
return None
return None
def confirm_required(func):
def decorated_function(*args, **kwargs):
if current_user.state.name == 'inactive':
logout_user()
message = Markup(
'账户未激活')
flash(message, 'warning')
return redirect(url_for('wx.login'))
return func(*args, **kwargs)
return decorated_function
def permission_required(permission_name):
def decorator(func):
@wraps(func)
def decorated_function(*args, **kwargs):
if not current_user.can(permission_name):
abort(403)
return func(*args, **kwargs)
return decorated_function
return decorator
def admin_required(func):
return permission_required('ADMINISTER')(func)
def login_required_token(func):
@wraps(func)
def decorated_view(*args, **kwargs):
# 用户登录状态
if not current_user.is_authenticated:
if g.refresh:
g.refresh = False
return jsonify(base.get_result(403, 'token过期重建', {'token': 'refresh'}))
return current_app.login_manager.unauthorized()
if callable(getattr(current_app, "ensure_sync", None)):
return current_app.ensure_sync(func)(*args, **kwargs)
return func(*args, **kwargs)
return decorated_view
def check_refresh_token(func):
"""refresh_token"""
@wraps(func)
def inner(*args, **kwargs):
refresh_data = get_token_key(get_key(request.headers.get('Refresh-Token')))
if refresh_data is None:
rd.del_redis_data('front_refresh_token' + '_' + str(refresh_data.get('user_id')))
return jsonify(base.get_result(401,'系统退出2'))
try:
refresh_token = rd.get_redis_data('front_refresh_token' + '_' + str(refresh_data.get('user_id')))
refresh_token = get_token_key(refresh_token)
except Exception as e:
return jsonify(base.get_result(401,'系统退出3'))
else:
# 生成新的token,保存在g中
user = User.query.get(refresh_token.get('user_id'))
last_login_time=int(time.time())
front_access_token, front_refresh_token = user.get_id(life_time=10, last_login_time=last_login_time,
need_refresh_token=True)
rd.set_redis_data('front_access_token'+'_'+str(user.id),front_access_token)
g.new_token = front_access_token
return func(*args, **kwargs)
return inner