萬(wàn)字帶你深入阿里開(kāi)源的Canal工作原理


前言

???????? 上篇文章給大家講解了如何安裝一個(gè)Canal,以及講解了一部分的原理焚鲜,今天我們就來(lái)深度聊一聊Canal的工作流程,以及他是怎么工作的,以及架構(gòu)師怎樣的钮莲。?????????


? ? ? ? ? ? 首先我們深度了解Canal時(shí)必須深度了解了一下MySQL主從復(fù)制原理。

一序目、MySQL主從復(fù)制

  • MySQL master 將數(shù)據(jù)變更寫(xiě)入二進(jìn)制日志( binary log, 其中記錄叫做二進(jìn)制日志事件 ?log events臂痕,可以通過(guò) show binlog events 進(jìn)行查看)
  • MySQL slave 將 master 的 binary log events 拷貝到它的中繼日志(relay log)
  • MySQL slave 重放 relay log 中事件,將數(shù)據(jù)變更反映它自己的數(shù)據(jù)猿涨,以此來(lái)達(dá)到數(shù)據(jù)一致握童。

MySQL的binLog?????????

? ? ? ?它記錄了所有的DDL和DML(除了數(shù)據(jù)查詢(xún)語(yǔ)句)語(yǔ)句,以事件形式記錄叛赚,還包含語(yǔ)句所執(zhí)行的消耗的時(shí)間澡绩。主要用來(lái)備份和數(shù)據(jù)同步稽揭。binlog 有三種:STATEMENTROW肥卡、MIXED

  • STATEMENT 記錄的是執(zhí)行的sql語(yǔ)句
  • ROW 記錄的是真實(shí)的行數(shù)據(jù)記錄
  • MIXED 記錄的是1+2溪掀,優(yōu)先按照1的模式記錄

名詞解釋什么是中繼日志

???????? 從服務(wù)器I/O線程將主服務(wù)器的二進(jìn)制日志讀取過(guò)來(lái)記錄到從服務(wù)器本地文件,然后從服務(wù)器SQL線程會(huì)讀取relay-log日志的內(nèi)容并應(yīng)用到從服務(wù)器步鉴,從而使從服務(wù)器和主服務(wù)器的數(shù)據(jù)保持一致

二揪胃、Canal架構(gòu)

  • server 代表一個(gè) canal 運(yùn)行實(shí)例,對(duì)應(yīng)于一個(gè) jvm
  • instance 對(duì)應(yīng)于一個(gè)數(shù)據(jù)隊(duì)列 (1個(gè) canal server 對(duì)應(yīng) 1..n 個(gè) instance )
  • instance 下的子模塊
    • eventParser: 數(shù)據(jù)源接入氛琢,模擬 slave 協(xié)議和 master 進(jìn)行交互喊递,協(xié)議解析
    • eventSink: Parser 和 Store 鏈接器,進(jìn)行數(shù)據(jù)過(guò)濾阳似,加工骚勘,分發(fā)的工作
    • eventStore: 數(shù)據(jù)存儲(chǔ)
    • metaManager: 增量訂閱 & 消費(fèi)信息管理器
  • EventSink起到一個(gè)類(lèi)似channel的功能,可以對(duì)數(shù)據(jù)進(jìn)行過(guò)濾撮奏、分發(fā)/路由(1:n)俏讹、歸并(n:1)和加工。EventSink是連接EventParser和EventStore的橋梁畜吊。
  • EventStore實(shí)現(xiàn)模式是內(nèi)存模式泽疆,內(nèi)存結(jié)構(gòu)為環(huán)形隊(duì)列,由三個(gè)指針(Put定拟、Get和Ack)標(biāo)識(shí)數(shù)據(jù)存儲(chǔ)和讀取的位置于微。
  • MetaManager是增量訂閱&消費(fèi)信息管理器,增量訂閱和消費(fèi)之間的協(xié)議包括get/ack/rollback青自,分別為:
    • Message getWithoutAck(int batchSize)株依,允許指定batchSize,一次可以獲取多條延窜,每次返回的對(duì)象為Message恋腕,包含的內(nèi)容為:batch id[唯一標(biāo)識(shí)]和entries[具體的數(shù)據(jù)對(duì)象]
    • void rollback(long batchId),顧名思義逆瑞,回滾上次的get請(qǐng)求荠藤,重新獲取數(shù)據(jù)』窀撸基于get獲取的batchId進(jìn)行提交哈肖,避免誤操作
    • void ack(long batchId),顧名思議念秧,確認(rèn)已經(jīng)消費(fèi)成功淤井,通知server刪除數(shù)據(jù)。基于get獲取的batchId進(jìn)行提交币狠,避免誤操作

三游两、server/client交互協(xié)議

???????? canal client與canal server之間是C/S模式的通信,客戶(hù)端采用NIO漩绵,服務(wù)端采用Netty贱案。canal server啟動(dòng)后,如果沒(méi)有canal client止吐,那么canal server不會(huì)去mysql拉取binlog柒室。即Canal客戶(hù)端主動(dòng)發(fā)起拉取請(qǐng)求婿崭,服務(wù)端才會(huì)模擬一個(gè)MySQL Slave節(jié)點(diǎn)去主節(jié)點(diǎn)拉取binlog危队。通常Canal客戶(hù)端是一個(gè)死循環(huán)宙枷,這樣客戶(hù)端一直調(diào)用get方法,服務(wù)端也就會(huì)一直拉取binlog

BIO蕴忆、NIO、AIO的區(qū)別

IO的方式通常分為幾種悲幅,同步阻塞的BIO套鹅、同步非阻塞的NIO異步非阻塞的AIO汰具。

異步非阻塞IO:在此種模式下卓鹿,用戶(hù)進(jìn)程只需要發(fā)起一個(gè)IO操作然后立即返回,等IO操作真正的完成以后留荔,應(yīng)用程序會(huì)得到IO操作完成的通知吟孙,此時(shí)用戶(hù)進(jìn)程只需要對(duì)數(shù)據(jù)進(jìn)行處理就好了,不需要進(jìn)行實(shí)際的IO讀寫(xiě)操作聚蝶,因?yàn)檎嬲腎O讀取或者寫(xiě)入操作已經(jīng)由內(nèi)核完成了杰妓。目前Java中還沒(méi)有支持此種IO模型。

  1. handshake碘勉,
  2. ClientAuthentication巷挥。

canal client調(diào)用subscribe()方法,類(lèi)型為[subscription]验靡。對(duì)應(yīng)服務(wù)端采用netty處理RPC請(qǐng)求(CanalServerWithNetty):

public?class?CanalServerWithNetty?extends?AbstractCanalLifeCycle?implements?CanalServer?{
????public?void?start()?{
????????bootstrap.setPipelineFactory(new?ChannelPipelineFactory()?{
????????????public?ChannelPipeline?getPipeline()?throws?Exception?{
????????????????ChannelPipeline?pipelines?=?Channels.pipeline();
????????????????pipelines.addLast(FixedHeaderFrameDecoder.class.getName(),?new?FixedHeaderFrameDecoder());
????????????????//?處理客戶(hù)端的HANDSHAKE請(qǐng)求
????????????????pipelines.addLast(HandshakeInitializationHandler.class.getName(),
????????????????????new?HandshakeInitializationHandler(childGroups));
????????????????//?處理客戶(hù)端的CLIENTAUTHENTICATION請(qǐng)求
????????????????pipelines.addLast(ClientAuthenticationHandler.class.getName(),
????????????????????new?ClientAuthenticationHandler(embeddedServer));

????????????????//?處理客戶(hù)端的會(huì)話(huà)請(qǐng)求倍宾,包括SUBSCRIPTION,GET等
????????????????SessionHandler?sessionHandler?=?new?SessionHandler(embeddedServer);
????????????????pipelines.addLast(SessionHandler.class.getName(),?sessionHandler);
????????????????return?pipelines;
????????????}
????????});
????}
}

ClientAuthenticationHandler處理鑒權(quán)后胜嗓,會(huì)移除HandshakeInitializationHandler和ClientAuthenticationHandler高职。最重要的是會(huì)話(huà)處理器SessionHandler

以client發(fā)送GET辞州,server從mysql得到binlog后怔锌,返回MESSAGES給client為例,說(shuō)明client和server的rpc交互過(guò)程:

SimpleCanalConnector發(fā)送GET請(qǐng)求,并讀取響應(yīng)結(jié)果的流程:

public?Message?getWithoutAck(int?batchSize,?Long?timeout,?TimeUnit?unit)?throws?CanalClientException?{
????waitClientRunning();
????int?size?=?(batchSize?<=?0)???1000?:?batchSize;
????long?time?=?(timeout?==?null?||?timeout?<?0)???-1?:?timeout;?//?-1代表不做timeout控制
????if?(unit?==?null)?unit?=?TimeUnit.MILLISECONDS;??//默認(rèn)是毫秒

????//?client發(fā)送GET請(qǐng)求
????writeWithHeader(Packet.newBuilder()
????????.setType(PacketType.GET)
????????.setBody(Get.newBuilder()
????????????.setAutoAck(false)
????????????.setDestination(clientIdentity.getDestination())
????????????.setClientId(String.valueOf(clientIdentity.getClientId()))
????????????.setFetchSize(size)
????????????.setTimeout(time)
????????????.setUnit(unit.ordinal())
????????????.build()
????????????.toByteString())
????????.build()
????????.toByteArray());
????//?client獲取GET結(jié)果????
????return?receiveMessages();
}

private?Message?receiveMessages()?throws?IOException?{
????//?讀取server發(fā)送的數(shù)據(jù)包
????Packet?p?=?Packet.parseFrom(readNextPacket());
????switch?(p.getType())?{
????????case?MESSAGES:?{
????????????Messages?messages?=?Messages.parseFrom(p.getBody());
????????????Message?result?=?new?Message(messages.getBatchId());
????????????for?(ByteString?byteString?:?messages.getMessagesList())?{
????????????????result.addEntry(Entry.parseFrom(byteString));
????????????}
????????????return?result;
????????}
????}
}

服務(wù)端SessionHandler處理客戶(hù)端發(fā)送的GET請(qǐng)求流程:

case?GET:
????//?讀取客戶(hù)端發(fā)送的數(shù)據(jù)包产禾,封裝為Get對(duì)象
????Get?get?=?CanalPacket.Get.parseFrom(packet.getBody());
????//?destination表示canal?instance
????if?(StringUtils.isNotEmpty(get.getDestination())?&&?StringUtils.isNotEmpty(get.getClientId()))?{
????????clientIdentity?=?new?ClientIdentity(get.getDestination(),?Short.valueOf(get.getClientId()));
????????Message?message?=?null;
????????if?(get.getTimeout()?==?-1)?{//?是否是初始值
????????????message?=?embeddedServer.getWithoutAck(clientIdentity,?get.getFetchSize());
????????}?else?{
????????????TimeUnit?unit?=?convertTimeUnit(get.getUnit());
????????????message?=?embeddedServer.getWithoutAck(clientIdentity,?get.getFetchSize(),?get.getTimeout(),?unit);
????????}
????????//?設(shè)置返回給客戶(hù)端的數(shù)據(jù)包類(lèi)型為MESSAGES???
????????Packet.Builder?packetBuilder?=?CanalPacket.Packet.newBuilder();
????????packetBuilder.setType(PacketType.MESSAGES);
????????//?構(gòu)造Message
????????Messages.Builder?messageBuilder?=?CanalPacket.Messages.newBuilder();
????????messageBuilder.setBatchId(message.getId());
????????if?(message.getId()?!=?-1?&&?!CollectionUtils.isEmpty(message.getEntries()))?{
????????????for?(Entry?entry?:?message.getEntries())?{
????????????????messageBuilder.addMessages(entry.toByteString());
????????????}
????????}
????????packetBuilder.setBody(messageBuilder.build().toByteString());
????????//?輸出數(shù)據(jù)排作,返回給客戶(hù)端
????????NettyUtils.write(ctx.getChannel(),?packetBuilder.build().toByteArray(),?null);
????}

具體的網(wǎng)絡(luò)協(xié)議格式,可參見(jiàn):CanalProtocol.proto

get/ack/rollback協(xié)議介紹:

  • Message getWithoutAck(int batchSize)
    • batch id 唯一標(biāo)識(shí)
    • entries 具體的數(shù)據(jù)對(duì)象亚情,對(duì)應(yīng)的數(shù)據(jù)對(duì)象格式:EntryProtocol.proto
    • 允許指定batchSize妄痪,一次可以獲取多條,每次返回的對(duì)象為Message楞件,包含的內(nèi)容為:
  • getWithoutAck(int batchSize, Long timeout, TimeUnit unit)
    • 拿夠batchSize條記錄或者超過(guò)timeout時(shí)間
    • timeout=0衫生,阻塞等到足夠的batchSize
    • 相比于getWithoutAck(int batchSize),允許設(shè)定獲取數(shù)據(jù)的timeout超時(shí)時(shí)間
  • void rollback(long batchId)
    • 回滾上次的get請(qǐng)求土浸,重新獲取數(shù)據(jù)罪针。基于get獲取的batchId進(jìn)行提交黄伊,避免誤操作
  • void ack(long batchId)
    • 確認(rèn)已經(jīng)消費(fèi)成功泪酱,通知server刪除數(shù)據(jù)』棺睿基于get獲取的batchId進(jìn)行提交墓阀,避免誤操作

EntryProtocol.protod對(duì)應(yīng)的canal消息結(jié)構(gòu)如下:

Entry??
????Header??
????????logfileName?[binlog文件名]??
????????logfileOffset?[binlog?position]??
????????executeTime?[binlog里記錄變更發(fā)生的時(shí)間戳,精確到秒]??
????????schemaName???
????????tableName??
????????eventType?[insert/update/delete類(lèi)型]??
????entryType???[事務(wù)頭BEGIN/事務(wù)尾END/數(shù)據(jù)ROWDATA]??
????storeValue??[byte數(shù)據(jù),可展開(kāi),對(duì)應(yīng)的類(lèi)型為RowChange]??
??????
RowChange??
????isDdl???????[是否是ddl變更操作拓轻,比如create?table/drop?table]??
????sql?????????[具體的ddl?sql]??
????rowDatas????[具體insert/update/delete的變更數(shù)據(jù)斯撮,可為多條,1個(gè)binlog?event事件可對(duì)應(yīng)多條變更扶叉,比如批處理]??
????????beforeColumns?[Column類(lèi)型的數(shù)組勿锅,變更前的數(shù)據(jù)字段]??
????????afterColumns?[Column類(lèi)型的數(shù)組,變更后的數(shù)據(jù)字段]??
??????????
Column???
????index?????????
????sqlType?????[jdbc?type]??
????name????????[column?name]??
????isKey???????[是否為主鍵]??
????updated?????[是否發(fā)生過(guò)變更]??
????isNull??????[值是否為null]??
????value???????[具體的內(nèi)容枣氧,注意為string文本]

SessionHandler中服務(wù)端處理客戶(hù)端的其他類(lèi)型請(qǐng)求溢十,都會(huì)調(diào)用CanalServerWithEmbedded的相關(guān)方法:

case?SUBSCRIPTION:
????????Sub?sub?=?Sub.parseFrom(packet.getBody());
????????embeddedServer.subscribe(clientIdentity);
case?GET:
????????Get?get?=?CanalPacket.Get.parseFrom(packet.getBody());
????????message?=?embeddedServer.getWithoutAck(clientIdentity,?get.getFetchSize());
case?CLIENTACK:
????????ClientAck?ack?=?CanalPacket.ClientAck.parseFrom(packet.getBody());
????????embeddedServer.ack(clientIdentity,?ack.getBatchId());
case?CLIENTROLLBACK:
????????ClientRollback?rollback?=?CanalPacket.ClientRollback.parseFrom(packet.getBody());
????????embeddedServer.rollback(clientIdentity);//?回滾所有批次

所以真正的處理邏輯在CanalServerWithEmbedded中,下面重點(diǎn)來(lái)了作瞄。茶宵。。

3.1 CanalServerWithEmbedded

???????? CanalServer包含多個(gè)Instance宗挥,它的成員變量canalInstances記錄了instance名稱(chēng)與實(shí)例的映射關(guān)系乌庶。????????

? ? ? ? 因?yàn)槭且粋€(gè)Map,所以同一個(gè)Server不允許出現(xiàn)相同instance名稱(chēng)(本例中實(shí)例名稱(chēng)為example)契耿,比如不能同時(shí)有兩個(gè)example在一個(gè)server上瞒大。但是允許一個(gè)Server上有example1和example2。

注意:CanalServer中最重要的是CanalServerWithEmbedded搪桂,而CanalServerWithEmbedded中最重要的是CanalInstance透敌。

理解下各個(gè)組件的對(duì)應(yīng)關(guān)系:

  • Canal Client通過(guò)destination找出Canal Server中對(duì)應(yīng)的Canal Instance盯滚。
  • 一個(gè)Canal Server可以配置多個(gè)Canal Instances。

下面以CanalServerWithEmbedded的訂閱方法為例:

  1. 根據(jù)客戶(hù)端標(biāo)識(shí)獲取CanalInstance
  2. 向CanalInstance的元數(shù)據(jù)管理器訂閱當(dāng)前客戶(hù)端
  3. 從元數(shù)據(jù)管理中獲取客戶(hù)端的游標(biāo)
  4. 通知CanalInstance訂閱關(guān)系發(fā)生變化

注意:提供訂閱方法的作用是:MySQL新增了一張表酗电,客戶(hù)端原先沒(méi)有同步這張表魄藕,現(xiàn)在需要同步,所以需要重新訂閱撵术。

public?void?subscribe(ClientIdentity?clientIdentity)?throws?CanalServerException?{
???//?ClientIdentity表示Canal?Client客戶(hù)端背率,從中可以獲取出客戶(hù)端指定連接的Destination
???//?由于CanalServerWithEmbedded記錄了每個(gè)Destination對(duì)應(yīng)的Instance,可以獲取客戶(hù)端對(duì)應(yīng)的Instance
???CanalInstance?canalInstance?=?canalInstances.get(clientIdentity.getDestination());
???if?(!canalInstance.getMetaManager().isStart())?{
???????canalInstance.getMetaManager().start();?//?啟動(dòng)Instance的元數(shù)據(jù)管理器
???}
???canalInstance.getMetaManager().subscribe(clientIdentity);?//?執(zhí)行一下meta訂閱
???Position?position?=?canalInstance.getMetaManager().getCursor(clientIdentity);
???if?(position?==?null)?{
???????position?=?canalInstance.getEventStore().getFirstPosition();//?獲取一下store中的第一條
???????if?(position?!=?null)?{
???????????canalInstance.getMetaManager().updateCursor(clientIdentity,?position);?//?更新一下cursor
???????}
???}
???//?通知下訂閱關(guān)系變化
???canalInstance.subscribeChange(clientIdentity);
}

每個(gè)CanalInstance中包括了四個(gè)組件:EventParser嫩与、EventSink寝姿、EventStore、MetaManager划滋。

服務(wù)端主要的處理方法包括get/ack/rollback饵筑,這三個(gè)方法都會(huì)用到Instance上面的幾個(gè)內(nèi)部組件,主要還是EventStore和MetaManager:

在這之前处坪,要先理解EventStore的含義根资,EventStore是一個(gè)RingBuffer,有三個(gè)指針:Put同窘、Get嫂冻、Ack

  • Put: Canal Server從MySQL拉取到數(shù)據(jù)后塞椎,放到內(nèi)存中,Put增加
  • Get: 消費(fèi)者(Canal Client)從內(nèi)存中消費(fèi)數(shù)據(jù)睛低,Get增加
  • Ack: 消費(fèi)者消費(fèi)完成案狠,Ack增加。并且會(huì)刪除Put中已經(jīng)被Ack的數(shù)據(jù)
  • 如果timeout為null钱雷,則采用tryGet方式骂铁,即時(shí)獲取
  • 如果timeout不為null
  1. timeout為0,則采用get阻塞方式罩抗,獲取數(shù)據(jù)拉庵,不設(shè)置超時(shí),直到有足夠的batchSize數(shù)據(jù)才返回
  2. timeout不為0套蒂,則采用get+timeout方式钞支,獲取數(shù)據(jù),超時(shí)還沒(méi)有batchSize足夠的數(shù)據(jù)操刀,有多少返回多少
private?Events<Event>?getEvents(CanalEventStore?eventStore,?Position?start,?int?batchSize,?Long?timeout,????????????????????????????????TimeUnit?unit)?{
????if?(timeout?==?null)?{
????????return?eventStore.tryGet(start,?batchSize);?//?即時(shí)獲取
????}?else?if?(timeout?<=?0){
????????return?eventStore.get(start,?batchSize);?//?阻塞獲取
????}?else?{
????????return?eventStore.get(start,?batchSize,?timeout,?unit);?//?異步獲取
????}
}

注意:EventStore的實(shí)現(xiàn)采用了類(lèi)似Disruptor的RingBuffer環(huán)形緩沖區(qū)烁挟。RingBuffer的實(shí)現(xiàn)類(lèi)是MemoryEventStoreWithBuffer

get方法和getWithoutAck方法的區(qū)別是:

  • get方法會(huì)立即調(diào)用ack
  • getWithoutAck方法不會(huì)調(diào)用ack

3.2 ?EventStore

下面是Put填充環(huán)形緩沖區(qū)的代碼,檢查可用slot(checkFreeSlotAt方法)在幾個(gè)put方法中骨坑。

public?class?MemoryEventStoreWithBuffer?extends?AbstractCanalStoreScavenge?implements?CanalEventStore<Event>,?CanalStoreScavenge?{
????private?static?final?long?INIT_SQEUENCE?=?-1;
????private?int???????????????bufferSize????=?16?*?1024;
????private?int???????????????bufferMemUnit?=?1024;?????????????????????????//?memsize的單位撼嗓,默認(rèn)為1kb大小
????private?int???????????????indexMask;
????private?Event[]???????????entries;

????//?記錄下put/get/ack操作的三個(gè)下標(biāo)
????private?AtomicLong????????putSequence???=?new?AtomicLong(INIT_SQEUENCE);?//?代表當(dāng)前put操作最后一次寫(xiě)操作發(fā)生的位置
????private?AtomicLong????????getSequence???=?new?AtomicLong(INIT_SQEUENCE);?//?代表當(dāng)前get操作讀取的最后一條的位置
????private?AtomicLong????????ackSequence???=?new?AtomicLong(INIT_SQEUENCE);?//?代表當(dāng)前ack操作的最后一條的位置

????//?啟動(dòng)EventStore時(shí),創(chuàng)建指定大小的緩沖區(qū),Event數(shù)組的大小是16*1024
????//?也就是說(shuō)算個(gè)數(shù)的話(huà)且警,數(shù)組可以容納16000個(gè)事件粉捻。算內(nèi)存的話(huà),大小為16MB
????public?void?start()?throws?CanalStoreException?{
????????super.start();
????????indexMask?=?bufferSize?-?1;
????????entries?=?new?Event[bufferSize];
????}

????//?EventParser解析后斑芜,會(huì)放入內(nèi)存中(Event數(shù)組肩刃,緩沖區(qū))
????private?void?doPut(List<Event>?data)?{
????????long?current?=?putSequence.get();?//?取得當(dāng)前的位置,初始時(shí)為-1押搪,第一個(gè)元素為-1+1=0
????????long?end?=?current?+?data.size();?//?最末尾的位置树酪,假設(shè)Put了10條數(shù)據(jù),end=-1+10=9
????????//?先寫(xiě)數(shù)據(jù)大州,再更新對(duì)應(yīng)的cursor,并發(fā)度高的情況续语,putSequence會(huì)被get請(qǐng)求可見(jiàn),拿出了ringbuffer中的老的Entry值
????????for?(long?next?=?current?+?1;?next?<=?end;?next++)?{
????????????entries[getIndex(next)]?=?data.get((int)?(next?-?current?-?1));
????????}
????????putSequence.set(end);
????}?
}

???????? Put是生產(chǎn)數(shù)據(jù)厦画,Get是消費(fèi)數(shù)據(jù)疮茄,Get一定不會(huì)超過(guò)Put。比如Put了10條數(shù)據(jù)根暑,Get最多只能獲取到10條數(shù)據(jù)力试。但有時(shí)候?yàn)榱吮WCGet處理的速度,Put和Get并不會(huì)相等排嫌』眩可以把Put看做是生產(chǎn)者,Get看做是消費(fèi)者淳地。生產(chǎn)者速度可以很快怖糊,消費(fèi)者則可以慢慢地消費(fèi)。比如Put了1000條颇象,而Get我們只需要每次處理10條數(shù)據(jù)伍伤。

???????? 仍然以前面的示例來(lái)說(shuō)明Get的流程,初始時(shí)current=-1遣钳,假設(shè)Put了兩批數(shù)據(jù)一共15條扰魂,maxAbleSequence=14,而Get的BatchSize假設(shè)為10蕴茴。初始時(shí)next=current=-1劝评,end=-1。通過(guò)startPosition倦淀,會(huì)設(shè)置next=0付翁。最后end又被賦值為9,即循環(huán)緩沖區(qū)[0,9]一共10個(gè)元素晃听。

private?Events<Event>?doGet(Position?start,?int?batchSize)?throws?CanalStoreException?{
????LogPosition?startPosition?=?(LogPosition)?start;

????long?current?=?getSequence.get();
????long?maxAbleSequence?=?putSequence.get();
????long?next?=?current;
????long?end?=?current;
????//?如果startPosition為null百侧,說(shuō)明是第一次砰识,默認(rèn)+1處理
????if?(startPosition?==?null?||?!startPosition.getPostion().isIncluded())?{?//?第一次訂閱之后,需要包含一下start位置佣渴,防止丟失第一條記錄
????????next?=?next?+?1;
????}

????end?=?(next?+?batchSize?-?1)?<?maxAbleSequence???(next?+?batchSize?-?1)?:?maxAbleSequence;
????//?提取數(shù)據(jù)并返回
????for?(;?next?<=?end;?next++)?{
????????Event?event?=?entries[getIndex(next)];
????????if?(ddlIsolation?&&?isDdl(event.getEntry().getHeader().getEventType()))?{
????????????//?如果是ddl隔離辫狼,直接返回
????????????if?(entrys.size()?==?0)?{
????????????????entrys.add(event);//?如果沒(méi)有DML事件,加入當(dāng)前的DDL事件
????????????????end?=?next;?//?更新end為當(dāng)前
????????????}?else?{
????????????????//?如果之前已經(jīng)有DML事件辛润,直接返回了膨处,因?yàn)椴话?dāng)前next這記錄,需要回退一個(gè)位置
????????????????end?=?next?-?1;?//?next-1一定大于current砂竖,不需要判斷
????????????}
????????????break;
????????}?else?{
????????????entrys.add(event);
????????}
????}
????//?處理PositionRange真椿,然后設(shè)置getSequence為end
????getSequence.compareAndSet(current,?end)
}

ack操作的上限是Get,假設(shè)Put了15條數(shù)據(jù)乎澄,Get了10條數(shù)據(jù)突硝,最多也只能Ack10條數(shù)據(jù)。Ack的目的是清空緩沖區(qū)中已經(jīng)被Get過(guò)的數(shù)據(jù)

public?void?ack(Position?position)?throws?CanalStoreException?{
????cleanUntil(position);
}

public?void?cleanUntil(Position?position)?throws?CanalStoreException?{
????long?sequence?=?ackSequence.get();
????long?maxSequence?=?getSequence.get();

????boolean?hasMatch?=?false;
????long?memsize?=?0;
????for?(long?next?=?sequence?+?1;?next?<=?maxSequence;?next++)?{
????????Event?event?=?entries[getIndex(next)];
????????memsize?+=?calculateSize(event);
????????boolean?match?=?CanalEventUtils.checkPosition(event,?(LogPosition)?position);
????????if?(match)?{//?找到對(duì)應(yīng)的position置济,更新ack?seq
????????????hasMatch?=?true;

????????????if?(batchMode.isMemSize())?{
????????????????ackMemSize.addAndGet(memsize);
????????????????//?嘗試清空buffer中的內(nèi)存解恰,將ack之前的內(nèi)存全部釋放掉
????????????????for?(long?index?=?sequence?+?1;?index?<?next;?index++)?{
????????????????????entries[getIndex(index)]?=?null;//?設(shè)置為null
????????????????}
????????????}

????????????ackSequence.compareAndSet(sequence,?next)
????????}
????}
}

rollback回滾方法的實(shí)現(xiàn)則比較簡(jiǎn)單,將getSequence回退到ack位置浙于。

public?void?rollback()?throws?CanalStoreException?{
????getSequence.set(ackSequence.get());
????getMemSize.set(ackMemSize.get());
}

3.3 EventParser WorkFlow

EventStore負(fù)責(zé)存儲(chǔ)解析后的Binlog事件护盈,而解析動(dòng)作負(fù)責(zé)拉取Binlog,它的流程比較復(fù)雜羞酗。需要和MetaManager進(jìn)行交互腐宋。比如要記錄每次拉取的Position,這樣下一次就可以從上一次的最后一個(gè)位置繼續(xù)拉取檀轨。所以MetaManager應(yīng)該是有狀態(tài)的脏款。

EventParser的流程如下:

  1. Connection獲取上一次解析成功的位置 (如果第一次啟動(dòng),則獲取初始指定的位置或者是當(dāng)前數(shù)據(jù)庫(kù)的binlog位點(diǎn))
  2. Connection建立鏈接裤园,發(fā)送BINLOG_DUMP指令
  3. Mysql開(kāi)始推送Binaly Log
  4. 接收到的Binaly Log的通過(guò)Binlog parser進(jìn)行協(xié)議解析,補(bǔ)充一些特定信息
  5. 傳遞給EventSink模塊進(jìn)行數(shù)據(jù)存儲(chǔ)剂府,是一個(gè)阻塞操作拧揽,直到存儲(chǔ)成功

總結(jié)

???????? ?上述我們講了一些架構(gòu)和一些交互模式,和比較多原理腺占,做為一名優(yōu)秀的程序員不能只單純的會(huì)使用淤袜,而是多去了解他的思想和為什么這么寫(xiě),這樣你的代碼能力才一天比一天強(qiáng)衰伯。我在這里為大家提供大數(shù)據(jù)的資源需要的朋友可以去下面GitHub去下載铡羡,信自己,努力和汗水總會(huì)能得到回報(bào)的意鲸。我是大數(shù)據(jù)老哥烦周,我們下期見(jiàn)~~~

資源獲取 獲取Flink面試題尽爆,Spark面試題,程序員必備軟件读慎,hive面試題漱贱,Hadoop面試題,Docker面試題夭委,簡(jiǎn)歷模板等資源請(qǐng)去?

GitHub自行下載 https://github.com/lhh2002/Framework-Of-BigData?

Gitee 自行下載 ?https://gitee.com/li_hey_hey/dashboard/projects?

實(shí)時(shí)數(shù)倉(cāng)代碼:https://github.com/lhh2002/Real_Time_Data_WareHouse


?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末幅狮,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子株灸,更是在濱河造成了極大的恐慌崇摄,老刑警劉巖,帶你破解...
    沈念sama閱讀 210,978評(píng)論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件慌烧,死亡現(xiàn)場(chǎng)離奇詭異逐抑,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)杏死,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 89,954評(píng)論 2 384
  • 文/潘曉璐 我一進(jìn)店門(mén)泵肄,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人淑翼,你說(shuō)我怎么就攤上這事腐巢。” “怎么了玄括?”我有些...
    開(kāi)封第一講書(shū)人閱讀 156,623評(píng)論 0 345
  • 文/不壞的土叔 我叫張陵冯丙,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我遭京,道長(zhǎng)胃惜,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,324評(píng)論 1 282
  • 正文 為了忘掉前任哪雕,我火速辦了婚禮船殉,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘斯嚎。我一直安慰自己利虫,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,390評(píng)論 5 384
  • 文/花漫 我一把揭開(kāi)白布堡僻。 她就那樣靜靜地躺著糠惫,像睡著了一般。 火紅的嫁衣襯著肌膚如雪钉疫。 梳的紋絲不亂的頭發(fā)上硼讽,一...
    開(kāi)封第一講書(shū)人閱讀 49,741評(píng)論 1 289
  • 那天,我揣著相機(jī)與錄音牲阁,去河邊找鬼固阁。 笑死壤躲,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的您炉。 我是一名探鬼主播柒爵,決...
    沈念sama閱讀 38,892評(píng)論 3 405
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼赚爵!你這毒婦竟也來(lái)了棉胀?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 37,655評(píng)論 0 266
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤冀膝,失蹤者是張志新(化名)和其女友劉穎唁奢,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體窝剖,經(jīng)...
    沈念sama閱讀 44,104評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡麻掸,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,451評(píng)論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了赐纱。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片脊奋。...
    茶點(diǎn)故事閱讀 38,569評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖疙描,靈堂內(nèi)的尸體忽然破棺而出诚隙,到底是詐尸還是另有隱情,我是刑警寧澤起胰,帶...
    沈念sama閱讀 34,254評(píng)論 4 328
  • 正文 年R本政府宣布久又,位于F島的核電站,受9級(jí)特大地震影響效五,放射性物質(zhì)發(fā)生泄漏地消。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,834評(píng)論 3 312
  • 文/蒙蒙 一畏妖、第九天 我趴在偏房一處隱蔽的房頂上張望脉执。 院中可真熱鬧,春花似錦戒劫、人聲如沸半夷。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,725評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至否彩,卻和暖如春疯攒,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背列荔。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,950評(píng)論 1 264
  • 我被黑心中介騙來(lái)泰國(guó)打工敬尺, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留枚尼,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,260評(píng)論 2 360
  • 正文 我出身青樓砂吞,卻偏偏與公主長(zhǎng)得像署恍,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子蜻直,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,446評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容