前言
???????? 上篇文章給大家講解了如何安裝一個(gè)Canal,以及講解了一部分的原理焚鲜,今天我們就來(lái)深度聊一聊Canal的工作流程,以及他是怎么工作的,以及架構(gòu)師怎樣的钮莲。?????????
? ? ? ? ? ? 首先我們深度了解Canal時(shí)必須深度了解了一下MySQL主從復(fù)制原理。
一序目、MySQL主從復(fù)制
- MySQL master
將數(shù)據(jù)變更寫(xiě)入二進(jìn)制日志
( binary log, 其中記錄叫做二進(jìn)制日志事件 ?log events
臂痕,可以通過(guò)show binlog events
進(jìn)行查看) - MySQL slave 將 master 的 binary log events 拷貝到它的中繼日志(relay log)
- MySQL slave 重放 relay log 中事件,將數(shù)據(jù)變更反映它自己的數(shù)據(jù)猿涨,以此來(lái)達(dá)到數(shù)據(jù)一致握童。
MySQL的binLog?????????
? ? ? ?它記錄了所有的DDL和DML(除了數(shù)據(jù)查詢(xún)語(yǔ)句)語(yǔ)句,以事件形式記錄叛赚,還包含語(yǔ)句所執(zhí)行的消耗的時(shí)間澡绩。主要用來(lái)備份和數(shù)據(jù)同步稽揭。binlog 有三種:STATEMENT
、ROW
肥卡、MIXED
- STATEMENT 記錄的是執(zhí)行的sql語(yǔ)句
- ROW 記錄的是真實(shí)的行數(shù)據(jù)記錄
- MIXED 記錄的是1+2溪掀,優(yōu)先按照1的模式記錄
名詞解釋:什么是中繼日志
???????? 從服務(wù)器I/O線程將主服務(wù)器的二進(jìn)制日志讀取過(guò)來(lái)記錄到從服務(wù)器本地文件,然后從服務(wù)器SQL線程會(huì)讀取relay-log日志的內(nèi)容并應(yīng)用到從服務(wù)器步鉴,從而使從服務(wù)器和主服務(wù)器的數(shù)據(jù)保持一致
二揪胃、Canal架構(gòu)
- server 代表一個(gè) canal 運(yùn)行實(shí)例,對(duì)應(yīng)于一個(gè) jvm
- instance 對(duì)應(yīng)于一個(gè)數(shù)據(jù)隊(duì)列 (1個(gè) canal server 對(duì)應(yīng) 1..n 個(gè) instance )
- instance 下的子模塊
- eventParser: 數(shù)據(jù)源接入氛琢,模擬 slave 協(xié)議和 master 進(jìn)行交互喊递,協(xié)議解析
- eventSink: Parser 和 Store 鏈接器,進(jìn)行數(shù)據(jù)過(guò)濾阳似,加工骚勘,分發(fā)的工作
- eventStore: 數(shù)據(jù)存儲(chǔ)
- metaManager: 增量訂閱 & 消費(fèi)信息管理器
- EventSink起到一個(gè)類(lèi)似channel的功能,可以對(duì)數(shù)據(jù)進(jìn)行過(guò)濾撮奏、分發(fā)/路由(1:n)俏讹、歸并(n:1)和加工。EventSink是連接EventParser和EventStore的橋梁畜吊。
- EventStore實(shí)現(xiàn)模式是內(nèi)存模式泽疆,內(nèi)存結(jié)構(gòu)為環(huán)形隊(duì)列,由三個(gè)指針(Put定拟、Get和Ack)標(biāo)識(shí)數(shù)據(jù)存儲(chǔ)和讀取的位置于微。
- MetaManager是增量訂閱&消費(fèi)信息管理器,增量訂閱和消費(fèi)之間的協(xié)議包括get/ack/rollback青自,分別為:
- Message getWithoutAck(int batchSize)株依,允許指定batchSize,一次可以獲取多條延窜,每次返回的對(duì)象為Message恋腕,包含的內(nèi)容為:batch id[唯一標(biāo)識(shí)]和entries[具體的數(shù)據(jù)對(duì)象]
- void rollback(long batchId),顧名思義逆瑞,回滾上次的get請(qǐng)求荠藤,重新獲取數(shù)據(jù)』窀撸基于get獲取的batchId進(jìn)行提交哈肖,避免誤操作
- void ack(long batchId),顧名思議念秧,確認(rèn)已經(jīng)消費(fèi)成功淤井,通知server刪除數(shù)據(jù)。基于get獲取的batchId進(jìn)行提交币狠,避免誤操作
三游两、server/client交互協(xié)議
???????? canal client與canal server之間是C/S模式的通信,客戶(hù)端采用NIO漩绵,服務(wù)端采用Netty贱案。canal server啟動(dòng)后,如果沒(méi)有canal client止吐,那么canal server不會(huì)去mysql拉取binlog柒室。即Canal客戶(hù)端主動(dòng)發(fā)起拉取請(qǐng)求婿崭,服務(wù)端才會(huì)模擬一個(gè)MySQL Slave節(jié)點(diǎn)去主節(jié)點(diǎn)拉取binlog危队。通常Canal客戶(hù)端是一個(gè)死循環(huán)宙枷,這樣客戶(hù)端一直調(diào)用get方法,服務(wù)端也就會(huì)一直拉取binlog
BIO蕴忆、NIO、AIO的區(qū)別
IO的方式通常分為幾種悲幅,同步阻塞的BIO
套鹅、同步非阻塞的NIO
、異步非阻塞的AIO
汰具。
異步非阻塞IO:在此種模式下卓鹿,用戶(hù)進(jìn)程只需要發(fā)起一個(gè)IO操作然后立即返回,等IO操作真正的完成以后留荔,應(yīng)用程序會(huì)得到IO操作完成的通知吟孙,此時(shí)用戶(hù)進(jìn)程只需要對(duì)數(shù)據(jù)進(jìn)行處理就好了,不需要進(jìn)行實(shí)際的IO讀寫(xiě)操作聚蝶,因?yàn)檎嬲腎O讀取或者寫(xiě)入操作已經(jīng)由內(nèi)核完成了杰妓。目前Java中還沒(méi)有支持此種IO模型。
- handshake碘勉,
- ClientAuthentication巷挥。
canal client調(diào)用subscribe()
方法,類(lèi)型為[subscription]验靡。對(duì)應(yīng)服務(wù)端采用netty處理RPC請(qǐng)求(CanalServerWithNetty
):
public?class?CanalServerWithNetty?extends?AbstractCanalLifeCycle?implements?CanalServer?{
????public?void?start()?{
????????bootstrap.setPipelineFactory(new?ChannelPipelineFactory()?{
????????????public?ChannelPipeline?getPipeline()?throws?Exception?{
????????????????ChannelPipeline?pipelines?=?Channels.pipeline();
????????????????pipelines.addLast(FixedHeaderFrameDecoder.class.getName(),?new?FixedHeaderFrameDecoder());
????????????????//?處理客戶(hù)端的HANDSHAKE請(qǐng)求
????????????????pipelines.addLast(HandshakeInitializationHandler.class.getName(),
????????????????????new?HandshakeInitializationHandler(childGroups));
????????????????//?處理客戶(hù)端的CLIENTAUTHENTICATION請(qǐng)求
????????????????pipelines.addLast(ClientAuthenticationHandler.class.getName(),
????????????????????new?ClientAuthenticationHandler(embeddedServer));
????????????????//?處理客戶(hù)端的會(huì)話(huà)請(qǐng)求倍宾,包括SUBSCRIPTION,GET等
????????????????SessionHandler?sessionHandler?=?new?SessionHandler(embeddedServer);
????????????????pipelines.addLast(SessionHandler.class.getName(),?sessionHandler);
????????????????return?pipelines;
????????????}
????????});
????}
}
ClientAuthenticationHandler處理鑒權(quán)后胜嗓,會(huì)移除HandshakeInitializationHandler和ClientAuthenticationHandler高职。最重要的是會(huì)話(huà)處理器SessionHandler。
以client發(fā)送GET辞州,server從mysql得到binlog后怔锌,返回MESSAGES給client為例,說(shuō)明client和server的rpc交互過(guò)程:
SimpleCanalConnector發(fā)送GET請(qǐng)求,并讀取響應(yīng)結(jié)果的流程:
public?Message?getWithoutAck(int?batchSize,?Long?timeout,?TimeUnit?unit)?throws?CanalClientException?{
????waitClientRunning();
????int?size?=?(batchSize?<=?0)???1000?:?batchSize;
????long?time?=?(timeout?==?null?||?timeout?<?0)???-1?:?timeout;?//?-1代表不做timeout控制
????if?(unit?==?null)?unit?=?TimeUnit.MILLISECONDS;??//默認(rèn)是毫秒
????//?client發(fā)送GET請(qǐng)求
????writeWithHeader(Packet.newBuilder()
????????.setType(PacketType.GET)
????????.setBody(Get.newBuilder()
????????????.setAutoAck(false)
????????????.setDestination(clientIdentity.getDestination())
????????????.setClientId(String.valueOf(clientIdentity.getClientId()))
????????????.setFetchSize(size)
????????????.setTimeout(time)
????????????.setUnit(unit.ordinal())
????????????.build()
????????????.toByteString())
????????.build()
????????.toByteArray());
????//?client獲取GET結(jié)果????
????return?receiveMessages();
}
private?Message?receiveMessages()?throws?IOException?{
????//?讀取server發(fā)送的數(shù)據(jù)包
????Packet?p?=?Packet.parseFrom(readNextPacket());
????switch?(p.getType())?{
????????case?MESSAGES:?{
????????????Messages?messages?=?Messages.parseFrom(p.getBody());
????????????Message?result?=?new?Message(messages.getBatchId());
????????????for?(ByteString?byteString?:?messages.getMessagesList())?{
????????????????result.addEntry(Entry.parseFrom(byteString));
????????????}
????????????return?result;
????????}
????}
}
服務(wù)端SessionHandler處理客戶(hù)端發(fā)送的GET請(qǐng)求流程:
case?GET:
????//?讀取客戶(hù)端發(fā)送的數(shù)據(jù)包产禾,封裝為Get對(duì)象
????Get?get?=?CanalPacket.Get.parseFrom(packet.getBody());
????//?destination表示canal?instance
????if?(StringUtils.isNotEmpty(get.getDestination())?&&?StringUtils.isNotEmpty(get.getClientId()))?{
????????clientIdentity?=?new?ClientIdentity(get.getDestination(),?Short.valueOf(get.getClientId()));
????????Message?message?=?null;
????????if?(get.getTimeout()?==?-1)?{//?是否是初始值
????????????message?=?embeddedServer.getWithoutAck(clientIdentity,?get.getFetchSize());
????????}?else?{
????????????TimeUnit?unit?=?convertTimeUnit(get.getUnit());
????????????message?=?embeddedServer.getWithoutAck(clientIdentity,?get.getFetchSize(),?get.getTimeout(),?unit);
????????}
????????//?設(shè)置返回給客戶(hù)端的數(shù)據(jù)包類(lèi)型為MESSAGES???
????????Packet.Builder?packetBuilder?=?CanalPacket.Packet.newBuilder();
????????packetBuilder.setType(PacketType.MESSAGES);
????????//?構(gòu)造Message
????????Messages.Builder?messageBuilder?=?CanalPacket.Messages.newBuilder();
????????messageBuilder.setBatchId(message.getId());
????????if?(message.getId()?!=?-1?&&?!CollectionUtils.isEmpty(message.getEntries()))?{
????????????for?(Entry?entry?:?message.getEntries())?{
????????????????messageBuilder.addMessages(entry.toByteString());
????????????}
????????}
????????packetBuilder.setBody(messageBuilder.build().toByteString());
????????//?輸出數(shù)據(jù)排作,返回給客戶(hù)端
????????NettyUtils.write(ctx.getChannel(),?packetBuilder.build().toByteArray(),?null);
????}
具體的網(wǎng)絡(luò)協(xié)議格式,可參見(jiàn):CanalProtocol.proto
get/ack/rollback協(xié)議介紹:
- Message getWithoutAck(int batchSize)
- batch id 唯一標(biāo)識(shí)
- entries 具體的數(shù)據(jù)對(duì)象亚情,對(duì)應(yīng)的數(shù)據(jù)對(duì)象格式:EntryProtocol.proto
- 允許指定batchSize妄痪,一次可以獲取多條,每次返回的對(duì)象為Message楞件,包含的內(nèi)容為:
- getWithoutAck(int batchSize, Long timeout, TimeUnit unit)
- 拿夠batchSize條記錄或者超過(guò)timeout時(shí)間
- timeout=0衫生,阻塞等到足夠的batchSize
- 相比于getWithoutAck(int batchSize),允許設(shè)定獲取數(shù)據(jù)的timeout超時(shí)時(shí)間
- void rollback(long batchId)
- 回滾上次的get請(qǐng)求土浸,重新獲取數(shù)據(jù)罪针。基于get獲取的batchId進(jìn)行提交黄伊,避免誤操作
- void ack(long batchId)
- 確認(rèn)已經(jīng)消費(fèi)成功泪酱,通知server刪除數(shù)據(jù)』棺睿基于get獲取的batchId進(jìn)行提交墓阀,避免誤操作
EntryProtocol.protod對(duì)應(yīng)的canal消息結(jié)構(gòu)如下:
Entry??
????Header??
????????logfileName?[binlog文件名]??
????????logfileOffset?[binlog?position]??
????????executeTime?[binlog里記錄變更發(fā)生的時(shí)間戳,精確到秒]??
????????schemaName???
????????tableName??
????????eventType?[insert/update/delete類(lèi)型]??
????entryType???[事務(wù)頭BEGIN/事務(wù)尾END/數(shù)據(jù)ROWDATA]??
????storeValue??[byte數(shù)據(jù),可展開(kāi),對(duì)應(yīng)的類(lèi)型為RowChange]??
??????
RowChange??
????isDdl???????[是否是ddl變更操作拓轻,比如create?table/drop?table]??
????sql?????????[具體的ddl?sql]??
????rowDatas????[具體insert/update/delete的變更數(shù)據(jù)斯撮,可為多條,1個(gè)binlog?event事件可對(duì)應(yīng)多條變更扶叉,比如批處理]??
????????beforeColumns?[Column類(lèi)型的數(shù)組勿锅,變更前的數(shù)據(jù)字段]??
????????afterColumns?[Column類(lèi)型的數(shù)組,變更后的數(shù)據(jù)字段]??
??????????
Column???
????index?????????
????sqlType?????[jdbc?type]??
????name????????[column?name]??
????isKey???????[是否為主鍵]??
????updated?????[是否發(fā)生過(guò)變更]??
????isNull??????[值是否為null]??
????value???????[具體的內(nèi)容枣氧,注意為string文本]
SessionHandler中服務(wù)端處理客戶(hù)端的其他類(lèi)型請(qǐng)求溢十,都會(huì)調(diào)用CanalServerWithEmbedded的相關(guān)方法:
case?SUBSCRIPTION:
????????Sub?sub?=?Sub.parseFrom(packet.getBody());
????????embeddedServer.subscribe(clientIdentity);
case?GET:
????????Get?get?=?CanalPacket.Get.parseFrom(packet.getBody());
????????message?=?embeddedServer.getWithoutAck(clientIdentity,?get.getFetchSize());
case?CLIENTACK:
????????ClientAck?ack?=?CanalPacket.ClientAck.parseFrom(packet.getBody());
????????embeddedServer.ack(clientIdentity,?ack.getBatchId());
case?CLIENTROLLBACK:
????????ClientRollback?rollback?=?CanalPacket.ClientRollback.parseFrom(packet.getBody());
????????embeddedServer.rollback(clientIdentity);//?回滾所有批次
所以真正的處理邏輯在CanalServerWithEmbedded
中,下面重點(diǎn)來(lái)了作瞄。茶宵。。
3.1 CanalServerWithEmbedded
???????? CanalServer包含多個(gè)Instance宗挥,它的成員變量canalInstances
記錄了instance名稱(chēng)與實(shí)例的映射關(guān)系乌庶。????????
? ? ? ? 因?yàn)槭且粋€(gè)Map,所以同一個(gè)Server不允許出現(xiàn)相同instance名稱(chēng)(本例中實(shí)例名稱(chēng)為example)契耿,比如不能同時(shí)有兩個(gè)example在一個(gè)server上瞒大。但是允許一個(gè)Server上有example1和example2。
注意:CanalServer
中最重要的是CanalServerWithEmbedded
搪桂,而CanalServerWithEmbedded中最重要的是CanalInstance
透敌。
理解下各個(gè)組件的對(duì)應(yīng)關(guān)系:
- Canal Client通過(guò)destination找出Canal Server中對(duì)應(yīng)的Canal Instance盯滚。
- 一個(gè)Canal Server可以配置多個(gè)Canal Instances。
下面以CanalServerWithEmbedded的訂閱方法為例:
- 根據(jù)客戶(hù)端標(biāo)識(shí)獲取CanalInstance
- 向CanalInstance的元數(shù)據(jù)管理器訂閱當(dāng)前客戶(hù)端
- 從元數(shù)據(jù)管理中獲取客戶(hù)端的游標(biāo)
- 通知CanalInstance訂閱關(guān)系發(fā)生變化
注意:提供訂閱方法的作用是:MySQL新增了一張表酗电,客戶(hù)端原先沒(méi)有同步這張表魄藕,現(xiàn)在需要同步,所以需要重新訂閱撵术。
public?void?subscribe(ClientIdentity?clientIdentity)?throws?CanalServerException?{
???//?ClientIdentity表示Canal?Client客戶(hù)端背率,從中可以獲取出客戶(hù)端指定連接的Destination
???//?由于CanalServerWithEmbedded記錄了每個(gè)Destination對(duì)應(yīng)的Instance,可以獲取客戶(hù)端對(duì)應(yīng)的Instance
???CanalInstance?canalInstance?=?canalInstances.get(clientIdentity.getDestination());
???if?(!canalInstance.getMetaManager().isStart())?{
???????canalInstance.getMetaManager().start();?//?啟動(dòng)Instance的元數(shù)據(jù)管理器
???}
???canalInstance.getMetaManager().subscribe(clientIdentity);?//?執(zhí)行一下meta訂閱
???Position?position?=?canalInstance.getMetaManager().getCursor(clientIdentity);
???if?(position?==?null)?{
???????position?=?canalInstance.getEventStore().getFirstPosition();//?獲取一下store中的第一條
???????if?(position?!=?null)?{
???????????canalInstance.getMetaManager().updateCursor(clientIdentity,?position);?//?更新一下cursor
???????}
???}
???//?通知下訂閱關(guān)系變化
???canalInstance.subscribeChange(clientIdentity);
}
每個(gè)CanalInstance中包括了四個(gè)組件:EventParser嫩与、EventSink寝姿、EventStore、MetaManager划滋。
服務(wù)端主要的處理方法包括get/ack/rollback饵筑,這三個(gè)方法都會(huì)用到Instance上面的幾個(gè)內(nèi)部組件,主要還是EventStore和MetaManager:
在這之前处坪,要先理解EventStore的含義根资,EventStore是一個(gè)RingBuffer,有三個(gè)指針:Put同窘、Get嫂冻、Ack。
- Put: Canal Server從MySQL拉取到數(shù)據(jù)后塞椎,放到內(nèi)存中,Put增加
- Get: 消費(fèi)者(Canal Client)從內(nèi)存中消費(fèi)數(shù)據(jù)睛低,Get增加
- Ack: 消費(fèi)者消費(fèi)完成案狠,Ack增加。并且會(huì)刪除Put中已經(jīng)被Ack的數(shù)據(jù)
- 如果timeout為null钱雷,則采用tryGet方式骂铁,即時(shí)獲取
- 如果timeout不為null
- timeout為0,則采用get阻塞方式罩抗,獲取數(shù)據(jù)拉庵,不設(shè)置超時(shí),直到有足夠的batchSize數(shù)據(jù)才返回
- timeout不為0套蒂,則采用get+timeout方式钞支,獲取數(shù)據(jù),超時(shí)還沒(méi)有batchSize足夠的數(shù)據(jù)操刀,有多少返回多少
private?Events<Event>?getEvents(CanalEventStore?eventStore,?Position?start,?int?batchSize,?Long?timeout,????????????????????????????????TimeUnit?unit)?{
????if?(timeout?==?null)?{
????????return?eventStore.tryGet(start,?batchSize);?//?即時(shí)獲取
????}?else?if?(timeout?<=?0){
????????return?eventStore.get(start,?batchSize);?//?阻塞獲取
????}?else?{
????????return?eventStore.get(start,?batchSize,?timeout,?unit);?//?異步獲取
????}
}
注意:EventStore的實(shí)現(xiàn)采用了類(lèi)似Disruptor的RingBuffer環(huán)形緩沖區(qū)烁挟。RingBuffer的實(shí)現(xiàn)類(lèi)是MemoryEventStoreWithBuffer
get方法和getWithoutAck方法的區(qū)別是:
- get方法會(huì)立即調(diào)用ack
- getWithoutAck方法不會(huì)調(diào)用ack
3.2 ?EventStore
下面是Put填充環(huán)形緩沖區(qū)的代碼,檢查可用slot(checkFreeSlotAt方法)在幾個(gè)put方法中骨坑。
public?class?MemoryEventStoreWithBuffer?extends?AbstractCanalStoreScavenge?implements?CanalEventStore<Event>,?CanalStoreScavenge?{
????private?static?final?long?INIT_SQEUENCE?=?-1;
????private?int???????????????bufferSize????=?16?*?1024;
????private?int???????????????bufferMemUnit?=?1024;?????????????????????????//?memsize的單位撼嗓,默認(rèn)為1kb大小
????private?int???????????????indexMask;
????private?Event[]???????????entries;
????//?記錄下put/get/ack操作的三個(gè)下標(biāo)
????private?AtomicLong????????putSequence???=?new?AtomicLong(INIT_SQEUENCE);?//?代表當(dāng)前put操作最后一次寫(xiě)操作發(fā)生的位置
????private?AtomicLong????????getSequence???=?new?AtomicLong(INIT_SQEUENCE);?//?代表當(dāng)前get操作讀取的最后一條的位置
????private?AtomicLong????????ackSequence???=?new?AtomicLong(INIT_SQEUENCE);?//?代表當(dāng)前ack操作的最后一條的位置
????//?啟動(dòng)EventStore時(shí),創(chuàng)建指定大小的緩沖區(qū),Event數(shù)組的大小是16*1024
????//?也就是說(shuō)算個(gè)數(shù)的話(huà)且警,數(shù)組可以容納16000個(gè)事件粉捻。算內(nèi)存的話(huà),大小為16MB
????public?void?start()?throws?CanalStoreException?{
????????super.start();
????????indexMask?=?bufferSize?-?1;
????????entries?=?new?Event[bufferSize];
????}
????//?EventParser解析后斑芜,會(huì)放入內(nèi)存中(Event數(shù)組肩刃,緩沖區(qū))
????private?void?doPut(List<Event>?data)?{
????????long?current?=?putSequence.get();?//?取得當(dāng)前的位置,初始時(shí)為-1押搪,第一個(gè)元素為-1+1=0
????????long?end?=?current?+?data.size();?//?最末尾的位置树酪,假設(shè)Put了10條數(shù)據(jù),end=-1+10=9
????????//?先寫(xiě)數(shù)據(jù)大州,再更新對(duì)應(yīng)的cursor,并發(fā)度高的情況续语,putSequence會(huì)被get請(qǐng)求可見(jiàn),拿出了ringbuffer中的老的Entry值
????????for?(long?next?=?current?+?1;?next?<=?end;?next++)?{
????????????entries[getIndex(next)]?=?data.get((int)?(next?-?current?-?1));
????????}
????????putSequence.set(end);
????}?
}
???????? Put是生產(chǎn)數(shù)據(jù)厦画,Get是消費(fèi)數(shù)據(jù)疮茄,Get一定不會(huì)超過(guò)Put。比如Put了10條數(shù)據(jù)根暑,Get最多只能獲取到10條數(shù)據(jù)力试。但有時(shí)候?yàn)榱吮WCGet處理的速度,Put和Get并不會(huì)相等排嫌』眩可以把Put看做是生產(chǎn)者,Get看做是消費(fèi)者淳地。生產(chǎn)者速度可以很快怖糊,消費(fèi)者則可以慢慢地消費(fèi)。比如Put了1000條颇象,而Get我們只需要每次處理10條數(shù)據(jù)伍伤。
???????? 仍然以前面的示例來(lái)說(shuō)明Get的流程,初始時(shí)current=-1遣钳,假設(shè)Put了兩批數(shù)據(jù)一共15條扰魂,maxAbleSequence=14,而Get的BatchSize假設(shè)為10蕴茴。初始時(shí)next=current=-1劝评,end=-1。通過(guò)startPosition倦淀,會(huì)設(shè)置next=0付翁。最后end又被賦值為9,即循環(huán)緩沖區(qū)[0,9]一共10個(gè)元素晃听。
private?Events<Event>?doGet(Position?start,?int?batchSize)?throws?CanalStoreException?{
????LogPosition?startPosition?=?(LogPosition)?start;
????long?current?=?getSequence.get();
????long?maxAbleSequence?=?putSequence.get();
????long?next?=?current;
????long?end?=?current;
????//?如果startPosition為null百侧,說(shuō)明是第一次砰识,默認(rèn)+1處理
????if?(startPosition?==?null?||?!startPosition.getPostion().isIncluded())?{?//?第一次訂閱之后,需要包含一下start位置佣渴,防止丟失第一條記錄
????????next?=?next?+?1;
????}
????end?=?(next?+?batchSize?-?1)?<?maxAbleSequence???(next?+?batchSize?-?1)?:?maxAbleSequence;
????//?提取數(shù)據(jù)并返回
????for?(;?next?<=?end;?next++)?{
????????Event?event?=?entries[getIndex(next)];
????????if?(ddlIsolation?&&?isDdl(event.getEntry().getHeader().getEventType()))?{
????????????//?如果是ddl隔離辫狼,直接返回
????????????if?(entrys.size()?==?0)?{
????????????????entrys.add(event);//?如果沒(méi)有DML事件,加入當(dāng)前的DDL事件
????????????????end?=?next;?//?更新end為當(dāng)前
????????????}?else?{
????????????????//?如果之前已經(jīng)有DML事件辛润,直接返回了膨处,因?yàn)椴话?dāng)前next這記錄,需要回退一個(gè)位置
????????????????end?=?next?-?1;?//?next-1一定大于current砂竖,不需要判斷
????????????}
????????????break;
????????}?else?{
????????????entrys.add(event);
????????}
????}
????//?處理PositionRange真椿,然后設(shè)置getSequence為end
????getSequence.compareAndSet(current,?end)
}
ack操作的上限是Get,假設(shè)Put了15條數(shù)據(jù)乎澄,Get了10條數(shù)據(jù)突硝,最多也只能Ack10條數(shù)據(jù)。Ack的目的是清空緩沖區(qū)中已經(jīng)被Get過(guò)的數(shù)據(jù)
public?void?ack(Position?position)?throws?CanalStoreException?{
????cleanUntil(position);
}
public?void?cleanUntil(Position?position)?throws?CanalStoreException?{
????long?sequence?=?ackSequence.get();
????long?maxSequence?=?getSequence.get();
????boolean?hasMatch?=?false;
????long?memsize?=?0;
????for?(long?next?=?sequence?+?1;?next?<=?maxSequence;?next++)?{
????????Event?event?=?entries[getIndex(next)];
????????memsize?+=?calculateSize(event);
????????boolean?match?=?CanalEventUtils.checkPosition(event,?(LogPosition)?position);
????????if?(match)?{//?找到對(duì)應(yīng)的position置济,更新ack?seq
????????????hasMatch?=?true;
????????????if?(batchMode.isMemSize())?{
????????????????ackMemSize.addAndGet(memsize);
????????????????//?嘗試清空buffer中的內(nèi)存解恰,將ack之前的內(nèi)存全部釋放掉
????????????????for?(long?index?=?sequence?+?1;?index?<?next;?index++)?{
????????????????????entries[getIndex(index)]?=?null;//?設(shè)置為null
????????????????}
????????????}
????????????ackSequence.compareAndSet(sequence,?next)
????????}
????}
}
rollback回滾方法的實(shí)現(xiàn)則比較簡(jiǎn)單,將getSequence回退到ack位置浙于。
public?void?rollback()?throws?CanalStoreException?{
????getSequence.set(ackSequence.get());
????getMemSize.set(ackMemSize.get());
}
3.3 EventParser WorkFlow
EventStore負(fù)責(zé)存儲(chǔ)解析后的Binlog事件护盈,而解析動(dòng)作負(fù)責(zé)拉取Binlog,它的流程比較復(fù)雜羞酗。需要和MetaManager進(jìn)行交互腐宋。比如要記錄每次拉取的Position,這樣下一次就可以從上一次的最后一個(gè)位置繼續(xù)拉取檀轨。所以MetaManager應(yīng)該是有狀態(tài)的脏款。
EventParser的流程如下:
- Connection獲取上一次解析成功的位置 (如果第一次啟動(dòng),則獲取初始指定的位置或者是當(dāng)前數(shù)據(jù)庫(kù)的binlog位點(diǎn))
- Connection建立鏈接裤园,發(fā)送BINLOG_DUMP指令
- Mysql開(kāi)始推送Binaly Log
- 接收到的Binaly Log的通過(guò)Binlog parser進(jìn)行協(xié)議解析,補(bǔ)充一些特定信息
- 傳遞給EventSink模塊進(jìn)行數(shù)據(jù)存儲(chǔ)剂府,是一個(gè)阻塞操作拧揽,直到存儲(chǔ)成功
總結(jié)
???????? ?上述我們講了一些架構(gòu)和一些交互模式,和比較多原理腺占,做為一名優(yōu)秀的程序員不能只單純的會(huì)使用淤袜,而是多去了解他的思想和為什么這么寫(xiě),這樣你的代碼能力才一天比一天強(qiáng)衰伯。我在這里為大家提供大數(shù)據(jù)的資源
需要的朋友可以去下面GitHub去下載铡羡,信自己,努力和汗水總會(huì)能得到回報(bào)的意鲸。我是大數(shù)據(jù)老哥烦周,我們下期見(jiàn)~~~
資源獲取 獲取Flink面試題尽爆,Spark面試題,程序員必備軟件读慎,hive面試題漱贱,Hadoop面試題,Docker面試題夭委,簡(jiǎn)歷模板等資源請(qǐng)去?
GitHub自行下載 https://github.com/lhh2002/Framework-Of-BigData?
Gitee 自行下載 ?https://gitee.com/li_hey_hey/dashboard/projects?
實(shí)時(shí)數(shù)倉(cāng)代碼:https://github.com/lhh2002/Real_Time_Data_WareHouse