使用channels實(shí)現(xiàn)websocket伤提,利用restframework-jwt功能進(jìn)行token驗(yàn)證衅谷。
流程如下:
Http登陸-->獲取token-->websocket請(qǐng)求連接攜帶token-->自定義channels的Authentication-->驗(yàn)證token-->允許接入
自定義channels的Authentication官方鏈接
下面的是rest_framework_jwt(不是channels)的JSONWebTokenAuthentication
默認(rèn)的token驗(yàn)證方法, 此方法是同步的涝缝,還執(zhí)行了user = self.authenticate_credentials(payload)
數(shù)據(jù)庫(kù)的查詢畅铭,這些操作都要修改為異步的(channels的異步才能盡其能)
def authenticate(self, request):
"""
Returns a two-tuple of `User` and token if a valid signature has been
supplied using JWT-based authentication. Otherwise returns `None`.
"""
jwt_value = self.get_jwt_value(request)
if jwt_value is None:
return None
try:
payload = jwt_decode_handler(jwt_value)
except jwt.ExpiredSignature:
msg = _('Signature has expired.')
raise exceptions.AuthenticationFailed(msg)
except jwt.DecodeError:
msg = _('Error decoding signature.')
raise exceptions.AuthenticationFailed(msg)
except jwt.InvalidTokenError:
raise exceptions.AuthenticationFailed()
user = self.authenticate_credentials(payload)
return (user, jwt_value)
上面的代碼是從request中獲取token的菠隆,但websocket提供的是scope兵琳。
下面是channels的官方樣例的權(quán)限驗(yàn)證的中間件。
class QueryAuthMiddleware:
"""
Custom middleware (insecure) that takes user IDs from the query string.
"""
def __init__(self, inner):
# Store the ASGI application we were passed
self.inner = inner
def __call__(self, scope):
# Close old database connections to prevent usage of timed out connections
#close_old_connections()
# Look up user from query string (you should also do things like
# checking if it is a valid user ID, or if scope["user"] is already
# populated).
user = User.objects.get(id=int(scope["query_string"]))
# Return the inner application directly and let it run everything else
return self.inner(dict(scope, user=user))
從上面的JWT和自定義授權(quán)類中得知骇径,支持websocket的JWT驗(yàn)證要做的兩件事:
- 從socpe中獲取token
- 調(diào)用
JSONWebTokenAuthentication
驗(yàn)證Token躯肌,并獲取用戶
從channels的官方文檔說(shuō)明中,執(zhí)行數(shù)據(jù)庫(kù)的查詢要使用channels.db.database_sync_to_async
破衔,即是JSONWebTokenAuthentication
類中的authenticate
方法要變?yōu)楫惒角迮移鋬?nèi)部調(diào)用的用戶查詢也得是database_sync_to_async
。
修改代碼晰筛,支持異步的websocket支持JWT如下:
新增WebsocketTokenAuthentication
類(繼承JSONWebTokenAuthentication
)嫡丙,修改部分有:
- 從scope中獲取url中的token。
- 修改其用戶數(shù)據(jù)庫(kù)查詢代碼读第,支持異步曙博。
class WebsocketTokenAuthentication(JSONWebTokenAuthentication):
"""
支持channels中間件的token處理和驗(yàn)證。
"""
def get_jwt_value(self, scope):
"""
websocket的url:/ws/chat/<str:room_name>?token
token格式: 'JWT eyJ0eXAiOiJKV1QiLCJhbGciOiJ'
"""
token = str(scope['query_string'].decode('utf-8'))
token = urllib.parse.unquote(token)
token = token.split(" ")[1].encode('utf-8')
return token
async def authenticate_credentials(self, payload):
"""
覆蓋此方法的user查詢卦方,使得其支持異步羊瘩。
Returns an active user that matches the payload's user id and email.
"""
User = get_user_model()
username = jwt_get_username_from_payload(payload)
if not username:
msg = _('Invalid payload.')
raise exceptions.AuthenticationFailed(msg)
try:
# user = User.objects.get_by_natural_key(username)
user = await self._get_user_by_natural_key(User, username)
except User.DoesNotExist:
msg = _('Invalid signature.')
raise exceptions.AuthenticationFailed(msg)
if not user.is_active:
msg = _('User account is disabled.')
raise exceptions.AuthenticationFailed(msg)
return user
@database_sync_to_async
def _get_user_by_natural_key(self, User, username):
return User.objects.get_by_natural_key(username)
下面就是創(chuàng)建一個(gè)QueryAuthMiddleware
中間件泰佳,從WebsocketTokenAuthentication
驗(yàn)證token 并獲取用戶盼砍。
class QueryAuthMiddleware:
"""
Custom middleware (insecure) that takes user IDs from the query string.
"""
def __init__(self, inner):
# Store the ASGI application we were passed
self.inner = inner
def __call__(self, scope):
# Close old database connections to prevent usage of timed out connections
# close_old_connections()
# Look up user from query string (you should also do things like
# checking if it is a valid user ID, or if scope["user"] is already
# populated).
auth = WebsocketTokenAuthentication()
user, token = auth.authenticate(scope)
# Return the inner application directly and let it run everything else
return self.inner(dict(scope, user=user))
在routing.py中注冊(cè)中間件
application = ProtocolTypeRouter({
# 'websocket': AuthMiddlewareStack(
# URLRouter(
# chat.routing.websocket_urlpatterns
# )
# ),
'websocket': QueryAuthMiddleware(
URLRouter(
chat.routing.websocket_urlpatterns
)
),
})
至此尘吗,修改完畢。
使得可以使用AsyncWebsocketConsumer
的Consumer和支持JWT驗(yàn)證浇坐。
后面只需要在consumer中驗(yàn)證是否存在用戶睬捶,驗(yàn)證則accept。
class ChatConsumer(AsyncWebsocketConsumer):
async def connect(self):
user = await self.scope['user']
if not user:
await self.close()
else:
self.room_name = self.scope['url_route']['kwargs']['room_name']
self.room_group_name = 'chat_%s' % self.room_name
# Join room group
await self.channel_layer.group_add(
self.room_group_name,
self.channel_name
)
await self.accept()