1 消費異常關鍵代碼分析
1.1 獲取消息
Message message = canalConnector.getWithoutAck(BatchSize());
long batchId = message.getId();
getWithoutAck(batchSize)是基于所謂流式調用的寂呛,應該為了支持多線程調用提高消費的效率镊尺,但是這在不覺間就有可能埋下調用的疑問,尤其是在消費message時出異常時刀崖,客戶端需要重試消費炬转;
1.2 處理message出現(xiàn)異常笙瑟,重試處理
重試處理的只能是對出現(xiàn)異常的message進行再次處理,不能再次調用getWithoutAck(batchSize)
1.3 重試處理message失敗后的處理
可以rollback()或者默認處理即告訴canal服務端這一批數(shù)據(jù)處理成功喷橙,可以刪除
2 與canal服務端建立連接
2.1 原因分析:
canal client與canal服務端建立連接成功后才能接收服務端發(fā)來的message進行消費啥么,但是客戶端并不能保證一定能與服務端建立連接或者說一直保持連接,因為難免出現(xiàn)網絡問題或者服務宕機的情況贰逾,為此客戶端為了更為健壯悬荣,需要重點處理建立連接的邏輯
2.2 建立連接關鍵代碼
解決的思路很簡單,調用CanalConnector.connect()不斷進行連接即可
2.3 關于建立連接的坑
2.3.1 失去連接的異常捕獲
canal client阿里官方封裝的異常CanalClientException疙剑,會封裝IOException與ConnectException
2.3.2 再次連接的disconnect
項目代碼建立連接的邏輯是不重新new CanalConnector氯迂,而是利用已經存在CanalConnector再次去建立連接践叠,這時候就需要先調用disconnect()去改變連接狀態(tài),然后才能與canal服務端重新建立連接嚼蚀,不然canal服務端會因為connector的連接狀態(tài)為true(private volatile boolean connected;)拒絕再次連接禁灼;