Token的配置
生成Token: TimedJSONWebSignatureSerializer進行序列化
from itsdangerous import TimedJSONWebSignatureSerializer as Serializer
from flask import current_app, jsonify
from itsdangerous import TimedJSONWebSignatureSerializer as Serializer
from app.libs.enums import ClientTypeEnum
from app.libs.redprint import RedPrint
from app.models.user import User
from app.validators.forms import ClientForm
api = RedPrint('token')
@api.route('', methods=['POST'])
def get_token():
form = ClientForm().validate_for_api()
promise = {
ClientTypeEnum.USER_EMAIL: User.verify
}
identify = promise[ClientTypeEnum(form.type.data)](
form.account.data,
form.secret.data
)
# 生成令牌
expiration = current_app.config['TOKEN_EXPIRATION']
token = generate_auth_token(identify['uid'],
form.type.data,
identify['scope'],
expiration)
# 注意需要進行 ascii 編碼
t = {
'token': token.decode('ascii')
}
return jsonify(t, 201)
def generate_auth_token(uid, ac_type, scope=None, expiration=7200):
"""
生成token议谷,將用戶的id,作用域鞠眉,用戶類型猾封,過期時間寫入token
:param uid: 用戶id
:param ac_type: 用戶類型
:param scope: 權(quán)限域
:param expiration: 過期時間 秒
:return:
"""
s = Serializer(current_app.config['SECRET_KEY'],
expires_in=expiration,
)
return s.dumps({'uid': uid,
'type': ac_type.value,
'scope': scope
})
Token的驗證,在用戶調(diào)取接口等相關(guān)的操作的時候奢米,對token進行驗證的操作淹真,傳遞token是在http的請求頭中 Authorization 字段中
#使用示例讶迁,注意必須放到放到方法名上
@api.route('', methods=['GET'])
@auth.login_required
def get_user():
uid = g.user.uid
user = User.query.filter_by(id=uid).first_or_404()
return jsonify(user)
#驗證相關(guān)代碼
from collections import namedtuple
from flask import current_app, g, request
from flask_httpauth import HTTPBasicAuth
from itsdangerous import TimedJSONWebSignatureSerializer as Serializer, BadSignature, SignatureExpired
from app.libs.error_code import AuthFailed, Forbidden
from app.libs.scope import is_in_scope
auth = HTTPBasicAuth()
#使用nametuple生成類
User = namedtuple('User', ['uid', 'ac_type', 'scope'])
@auth.verify_password
def verify_password(token, password):
# http的協(xié)議規(guī)范
# key = Authorization
# value = basic base64(rjl:111111)
user_info = verify_auth_token(token)
if not user_info:
return False
else:
g.user = user_info
return True
# 驗證不通過,直接返回相關(guān)的信息,從token中獲取用戶相關(guān)的信息
def verify_auth_token(token):
s = Serializer(current_app.config['SECRET_KEY'])
try:
data = s.loads(token)
except BadSignature:
raise AuthFailed(msg='token is invalid', error_code=1002)
except SignatureExpired:
raise AuthFailed(msg='token is expired', error_code=1003)
uid = data['uid']
ac_type = data['type']
scope = data['scope']
# request 視圖函數(shù)
allow = is_in_scope(scope, request.endpoint)
if not allow:
raise Forbidden()
return User(uid, ac_type, scope)
用戶對接口操作權(quán)限的判斷
在用戶登錄操作的時候把用戶的等級標志寫入token中核蘸,根據(jù)token的信息巍糯,與當前用戶的等級進行判斷!
將相應(yīng)用戶可以訪問的視圖函數(shù),存儲起來客扎,當用戶請求的時候祟峦,查看是> 否在該作用域中。
定義作于域:
class Scope:
# 視圖函數(shù)級別的區(qū)分
allow_api = []
# 模塊級別的定義區(qū)分
allow_module = []
# 排除
forbidden_api = []
def __add__(self, other):
self.allow_api = self.allow_api + other.allow_api
# set去重徙鱼,再轉(zhuǎn)為list
self.allow_api = list(set(self.allow_api))
self.allow_module = self.allow_module + other.allow_module
self.allow_module = list(set(self.allow_module))
return self
#定義管理權(quán)限組
class AdminScope(Scope):
allow_module = [
'v1.user'
]
# def __init__(self):
# self + UserScope()
#普通用戶組
class UserScope(Scope):
allow_api = [
'v1.user+get_user'
]
def __init__(self):
pass
#超級管理員組
class SuperScope(Scope):
"""
超級管理員宅楞,權(quán)限相加
"""
allow_api = [
]
allow_module = [
'v1.user'
]
def __init__(self):
self + AdminScope() + UserScope()
#驗證權(quán)限
def is_in_scope(scope, endpoint):
"""
根據(jù)視圖函數(shù)名判斷,該視圖函數(shù)是否在 權(quán)限組中
:param scope:
:param endpoint:
:return:
"""
# 反射 ,注意endpoint的路徑是帶有 blueprint的路徑的
scope = globals()[scope]()
# 查看定義 redprint的 位置
splits = endpoint.split('+')
red_name = splits[0]
if endpoint in scope.forbidden_api:
return False
if endpoint in scope.allow_api:
return True
if red_name in scope.allow_module:
return True
else:
return False
與token的數(shù)據(jù)進行校驗
. . .
uid = data['uid']
ac_type = data['type']
scope = data['scope']
# request 視圖函數(shù)
allow = is_in_scope(scope, request.endpoint)
if not allow:
raise Forbidden()
. . .
tips:模塊級別的校驗袱吆,需要更改 redprint的 endpoint
class RedPrint:
def __init__(self, name):
self.name = name
self.mound = []
def route(self, rule, **options):
def decorator(f):
self.mound.append((f, rule, options))
return f
return decorator
def register(self, bp, url_prefix=None):
"""
將 redprint注冊到 blueprint厌衙,實際調(diào)用 blueprint代碼
"""
if url_prefix is None:
url_prefix = '/' + self.name
for f, rule, options in self.mound:
# 自定義 endpoint
endpoint = self.name + '+' + options.pop("endpoint", f.__name__)
bp.add_url_rule(url_prefix + rule, endpoint, f, **options)