環(huán)境 ws4py+
pip install ws4py
from ws4py.client.threadedclient import WebSocketClient
一、websocket協(xié)議
- 先與wss://broadcastlv.chat.bilibili.com/sub建立連接
- 發(fā)送登錄包
{
"uid": 0表示未登錄哑姚,否則為用戶ID,
"roomid": 房間ID,
"protover": 1,
"platform": "web",
"clientver": "1.4.0"
} - 每隔一段時間發(fā)送心跳包(30秒)
Python只有延遲功能threading.Timer(delay,fun)
需要自定義一個不斷循環(huán)的定時器 -
接收響應(yīng)
響應(yīng)由頭部和數(shù)據(jù)組成
圖片.png
圖片.png - 解析響應(yīng)得到數(shù)據(jù)
這里有個細節(jié) b站可能一次返回了好幾幀數(shù)據(jù)
先解析操作碼再由操作碼解析數(shù)據(jù)段 - 把數(shù)據(jù)交給主程序去處理
需要自定義一個事件類
用來實現(xiàn)事件的注冊on和分發(fā)功能emit
二祭饭、工具層 utils.py - 定時器類
可取消 - 事件類
注冊事件 (可重復(fù))
分發(fā)事件
取消事件
class Timer():
def __init__(self,delay,fun):
self.delay,self.f=delay,fun
self.t=threading.Timer(self.delay,self.fun)
self.t.start()
def fun(self):
if self.f:self.f()
self.t=threading.Timer(self.delay,self.fun)
self.t.start()
def cancel(self):
self.t.cancel()
print("threading cancel")
class Event():
def __init__(self):
self.map=[]
self.keys=[]
def index(self,k):
i=-1
for key in self.keys:
i+=1
if key==k:return i
return -1
def on(self,key,fun):
i=self.index(key)
if i==-1:
self.map.append({"key":key,"funs":[fun]})
self.keys.append(key)
else:
self.map[i]["funs"].append(fun)
def emit(self,key,data=None):
i=self.index(key)
if i==-1:
print("no regist event:"+str(key))
return
for f in self.map[i]["funs"]:f(data)
def rm(self,key,fun):
i=self.index(key)
if i==-1:
print("no regist event:"+str(key))
return
funs=self.map[i]["funs"]
for j in range(len(funs)):
if funs[j]==fun:funs[j]=None
self.map[i]["funs"]=list(filter(None,funs))
三、服務(wù)層 DanmuWS.py
opened(self) 連接建立后父類會自動調(diào)用
closed(self,code,reason) 連接關(guān)閉后父類會自動調(diào)用
需要進行斷線重連
received_message(self,message) 接收到數(shù)據(jù)時會自動調(diào)用
在這里解析數(shù)據(jù)并分發(fā)給主程序處理
send(self,data)父類發(fā)送數(shù)據(jù)的方法
sendLoginPacket 發(fā)送登錄包
sendHeartBeatPacket 發(fā)送心跳包
bind 綁定主程序處理數(shù)據(jù)和事件的函數(shù)
import threading
import json
import struct
from ws4py.client.threadedclient import WebSocketClient
from utils import Event,Timer
event=Event()
class DanmuWebSocket(WebSocketClient):
def __init__(self,info,serveraddress='wss://broadcastlv.chat.bilibili.com/sub'):
self.serveraddress=serveraddress
WebSocketClient.__init__(self,serveraddress)
DanmuWebSocket.event=event
DanmuWebSocket.headerLength=16
self.Info=info
def opened(self):
self.sendLoginPacket(self.Info['uid'],self.Info['roomid'],self.Info['protover'],self.Info['platform'],self.Info['clientver'])
self.sendHeartBeatPacket();
self.heartBeatHandler = Timer(20,self.sendHeartBeatPacket)
print("opened")
def delay_close(self):
dws=DanmuWebSocket(self.Info,self.serveraddress)
event.emit('reconnect',dws);
def closed(self, code, reason=None):
print("Closed", code, reason)
if hasattr(self,"heartBeatHandler"):self.heartBeatHandler.cancel();
if code == 1000: return
threading.Timer(5,self.delay_close).start()
print("Closed", code, reason)
def received_message(self, message):
position,length=0,len(message.data)-1
while position<length:
header_pack=struct.unpack(">IHHII",message.data[position:position+16])
length_pack=header_pack[0]
operation=header_pack[3]
if operation==3:
num=header_pack[1]+position
num=struct.unpack(">I",message.data[num:num+4])[0]
event.emit('heartbeat',num)
elif operation==5:
data=json.loads(message.data[position+16:position+length_pack])
event.emit('cmd',data)
#print("recv:"+data["cmd"])
else:
event.emit('login');
position+=length_pack
def sendData(self,data, protover = 1, operation = 2, sequence = 1):
if type(data)==dict:
data=json.dumps(data).encode()
elif type(data)==str:
data=data.encode()
header=struct.pack(">IHHII",DanmuWebSocket.headerLength+len(data),DanmuWebSocket.headerLength,protover,operation,sequence)
self.send(header+data)
def sendLoginPacket(self,uid, roomid, protover = 1, platform = 'web', clientver = '1.4.6'):
# Uint(4byte) + 00 10 + 00 01 + 00 00 00 07 + 00 00 00 01 + Data 登錄數(shù)據(jù)包
data = {
'uid': int(uid),
'roomid': int(roomid),
'protover': protover,
'platform': platform,
'clientver': clientver
}
print("sendLoginPacket")
data=json.dumps(data)
data=data.replace(' ','')
self.sendData(data.encode(),1,7,1)
def sendHeartBeatPacket(self):
# Uint(4byte) + 00 10 + 00 01 + 00 00 00 02 + 00 00 00 01 + Data 心跳數(shù)據(jù)包
self.sendData(b'[object Object]', 1, 2, 1);
def bind(self,onreconnect=None,onlogin=None,onheartbeat=None,oncmd=None,onreceive =None):
if "cmd" in event.keys:return
if hasattr(onreconnect,"__call__"):event.on("reconnect",onreconnect)
if hasattr(onlogin,"__call__"):event.on("login",onlogin)
if hasattr(onheartbeat,"__call__"):event.on("heartbeat",onheartbeat)
if hasattr(oncmd,"__call__"):event.on("cmd",oncmd)
if hasattr(onreceive,"__call__"):event.on("receive",onreceive)
四叙量、測試代碼
利用前面寫的二維碼登錄代碼
登錄上Bilibili獲取個人uid
oncmd 處理彈幕數(shù)據(jù)
onlogin 連接成功時的處理函數(shù)
onreconnect 斷線重連時調(diào)用更新ws
onheartbeat 處理服務(wù)器發(fā)送的直播間人氣值
from server import Login
headers={
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:62.0) Gecko/20100101 Firefox/62.0',
'Accept': 'application/json, text/plain, */*',
'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2',
'Accept-Encoding': 'gzip, deflate, br',
'Referer': 'https://live.bilibili.com/',
'Origin': 'https://live.bilibili.com',
'Connection': 'keep-alive'
}
s=session(headers,'cookie.txt')
login=Login(s)
while not login.isLogin():
login.get_vdcode()
login.loop_vdcode()
info={
"uid": login.info['uid'],
"roomid": 7603080,
"protover": 1,
"platform": "web",
"clientver": "1.4.0"
}
from DanmuWS import DanmuWebSocket
def oncmd(data):
cmd=data["cmd"]
if cmd=="SYS_MSG":
print(data)
elif cmd=="SPECIAL_GIFT":
print(data)
else:
print(data)
def onlogin(data):
print("login success")
def onreconnect(dws):
global ws
ws=dws
def onheartbeat(num):
print(num)
try:
ws = DanmuWebSocket(info,'wss://broadcastlv.chat.bilibili.com/sub')
ws.connect()
ws.bind(None,onlogin,onheartbeat,oncmd)
ws.run_forever()
except:
ws.close()
圖片.png