【Canal源碼分析】client工作過程

client的工作過程缕棵,需要我們自己去編寫對應的邏輯性锭,我們目前只能從example寫的例子來看赠潦。目前examle中提供了兩個例子,一個是單機的草冈,一個是集群的cluster她奥,我們后續(xù)如果需要進行開發(fā)的話,其實也是開發(fā)我們自己的client怎棱,以及client的一些邏輯哩俭。我們主要看下集群的client是如何實現(xiàn)和消費的,又是怎么和server進行數(shù)據(jù)交互的蹄殃。

我們來看看具體的代碼:

protected void process() {
    int batchSize = 5 * 1024;
    while (running) {
        try {
            MDC.put("destination", destination);
            connector.connect();
            connector.subscribe();
            waiting = false;
            while (running) {
                Message message = connector.getWithoutAck(batchSize); // 獲取指定數(shù)量的數(shù)據(jù)
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    // try {
                    // Thread.sleep(1000);
                    // } catch (InterruptedException e) {
                    // }
                } else {
                    printSummary(message, batchId, size);
                    printEntry(message.getEntries());
                }

                connector.ack(batchId); // 提交確認
                // connector.rollback(batchId); // 處理失敗, 回滾數(shù)據(jù)
            }
        } catch (Exception e) {
            logger.error("process error!", e);
        } finally {
            connector.disconnect();
            MDC.remove("destination");
        }
    }
}

這個的這樣的過程是這樣的

  • 連接携茂,connector.connect()
  • 訂閱,connector.subscribe
  • 獲取數(shù)據(jù)诅岩,connector.getWithoutAck()
  • 業(yè)務處理
  • 提交確認,connector.ack()
  • 回滾,connector.rollback()
  • 斷開連接带膜,connector.disconnect()

我們具體來看下吩谦。

一、建立連接

CanalConnector主要有兩個實現(xiàn)膝藕,一個是SimpleCanalConnector式廷,一個是ClusterCanalConnector,我們主要看下ClusterCanalConnector芭挽,這也是我們要用的一個模式滑废。

我們用的時候,通過一個工廠類生成我們需要的Connector袜爪,這里的工廠類是CanalConnectors棋凳,里面包含了生成ClusterCanalConnector的方法皂冰。

public static CanalConnector newClusterConnector(String zkServers, String destination, String username,
                                                 String password) {
    ClusterCanalConnector canalConnector = new ClusterCanalConnector(username,
        password,
        destination,
        new ClusterNodeAccessStrategy(destination, ZkClientx.getZkClient(zkServers)));
    canalConnector.setSoTimeout(30 * 1000);
    return canalConnector;
}

用到的參數(shù)有zk的地址,canal的名稱,數(shù)據(jù)庫的賬號密碼骄噪。里面有個ClusterNodeAccessStrategy是用來選擇client的策略丐重,這個ClusterNodeAccessStrategy的構(gòu)造方法里面有些東西需要我們關(guān)注下。

1.1 ClusterNodeAccessStrategy

public ClusterNodeAccessStrategy(String destination, ZkClientx zkClient){
    this.zkClient = zkClient;
    childListener = new IZkChildListener() {

        public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
            initClusters(currentChilds);
        }

    };

    dataListener = new IZkDataListener() {

        public void handleDataDeleted(String dataPath) throws Exception {
            runningAddress = null;
        }

        public void handleDataChange(String dataPath, Object data) throws Exception {
            initRunning(data);
        }

    };

    String clusterPath = ZookeeperPathUtils.getDestinationClusterRoot(destination);
    this.zkClient.subscribeChildChanges(clusterPath, childListener);
    initClusters(this.zkClient.getChildren(clusterPath));

    String runningPath = ZookeeperPathUtils.getDestinationServerRunning(destination);
    this.zkClient.subscribeDataChanges(runningPath, dataListener);
    initRunning(this.zkClient.readData(runningPath, true));
}

這邊起了兩個監(jiān)聽器,都是監(jiān)聽server端的活動服務器的诱咏。一個是獲取所有的server列表,一個是獲取活動的server服務器缴挖,都是從zk的對應節(jié)點上去取的袋狞。

1.2 連接connect

獲取到CanalConnector之后,就是真正的連接了映屋。在ClusterCanalConnector中苟鸯,我們可以看到,其實他底層用的也是SimpleCanalConnector秧荆,只不過加了一個選擇的策略倔毙。

public void connect() throws CanalClientException {
    if (connected) {
        return;
    }

    if (runningMonitor != null) {
        if (!runningMonitor.isStart()) {
            runningMonitor.start();
        }
    } else {
        waitClientRunning();
        if (!running) {
            return;
        }
        doConnect();
        if (filter != null) { // 如果存在條件,說明是自動切換乙濒,基于上一次的條件訂閱一次
            subscribe(filter);
        }
        if (rollbackOnConnect) {
            rollback();
        }
    }

    connected = true;
}

如果是集群模式的客戶端陕赃,那么這邊的runningMonitor不為空,因為他進行了初始化颁股。我們主要看下runningMonitor.start()里面的操作么库。

public void start() {
    super.start();

    String path = ZookeeperPathUtils.getDestinationClientRunning(this.destination, clientData.getClientId());
    zkClient.subscribeDataChanges(path, dataListener);
    initRunning();
}

這邊監(jiān)聽的路徑是:/otter/canal/destinations/{destination}/{clientId}/running。如果有任何的變化甘有,或節(jié)點的刪除诉儒,那么執(zhí)行dataListener里面的操作。

dataListener = new IZkDataListener() {

    public void handleDataChange(String dataPath, Object data) throws Exception {
        MDC.put("destination", destination);
        ClientRunningData runningData = JsonUtils.unmarshalFromByte((byte[]) data, ClientRunningData.class);
        if (!isMine(runningData.getAddress())) {
            mutex.set(false);
        }

        if (!runningData.isActive() && isMine(runningData.getAddress())) { // 說明出現(xiàn)了主動釋放的操作亏掀,并且本機之前是active
            release = true;
            releaseRunning();// 徹底釋放mainstem
        }

        activeData = (ClientRunningData) runningData;
    }

    public void handleDataDeleted(String dataPath) throws Exception {
        MDC.put("destination", destination);
        mutex.set(false);
        // 觸發(fā)一下退出,可能是人為干預的釋放操作或者網(wǎng)絡閃斷引起的session expired timeout
        processActiveExit();
        if (!release && activeData != null && isMine(activeData.getAddress())) {
            // 如果上一次active的狀態(tài)就是本機忱反,則即時觸發(fā)一下active搶占
            initRunning();
        } else {
            // 否則就是等待delayTime,避免因網(wǎng)絡瞬端或者zk異常滤愕,導致出現(xiàn)頻繁的切換操作
            delayExector.schedule(new Runnable() {

                public void run() {
                    initRunning();
                }
            }, delayTime, TimeUnit.SECONDS);
        }
    }

};

這里的注釋比較清楚温算,基本上如果數(shù)據(jù)發(fā)生了變化,那么進行節(jié)點釋放后间影,將運行節(jié)點置為活動節(jié)點注竿。如果發(fā)生了數(shù)據(jù)刪除,那么直接觸發(fā)退出魂贬,如果上一次的active狀態(tài)是本機巩割,那么觸發(fā)一下active搶占,否則等待delayTime付燥,默認5s后重試宣谈。下面我們主要看下initRunning。

1.3 initRunning

這塊主要是創(chuàng)建運行節(jié)點的臨時節(jié)點机蔗。節(jié)點路徑是/otter/canal/destinations/{destination}/{clientId}蒲祈,節(jié)點內(nèi)容是ClientRunningData的json序列化結(jié)果甘萧。連接的代碼:

public InetSocketAddress processActiveEnter() {
    InetSocketAddress address = doConnect();
    mutex.set(true);
    if (filter != null) { // 如果存在條件,說明是自動切換梆掸,基于上一次的條件訂閱一次
        subscribe(filter);
    }

    if (rollbackOnConnect) {
        rollback();
    }

    return address;
}

這塊有幾段邏輯扬卷,我們慢慢看下。

1.3.1 doConnect

這里是client直接連上了server酸钦,通過socket連接怪得,也就是server暴露的socket端口。

private InetSocketAddress doConnect() throws CanalClientException {
    try {
        channel = SocketChannel.open();
        channel.socket().setSoTimeout(soTimeout);
        SocketAddress address = getAddress();
        if (address == null) {
            address = getNextAddress();
        }
        channel.connect(address);
        readableChannel = Channels.newChannel(channel.socket().getInputStream());
        writableChannel = Channels.newChannel(channel.socket().getOutputStream());
        Packet p = Packet.parseFrom(readNextPacket());
        if (p.getVersion() != 1) {
            throw new CanalClientException("unsupported version at this client.");
        }

        if (p.getType() != PacketType.HANDSHAKE) {
            throw new CanalClientException("expect handshake but found other type.");
        }
        //
        Handshake handshake = Handshake.parseFrom(p.getBody());
        supportedCompressions.addAll(handshake.getSupportedCompressionsList());
        //
        ClientAuth ca = ClientAuth.newBuilder()
            .setUsername(username != null ? username : "")
            .setPassword(ByteString.copyFromUtf8(password != null ? password : ""))
            .setNetReadTimeout(soTimeout)
            .setNetWriteTimeout(soTimeout)
            .build();
        writeWithHeader(Packet.newBuilder()
            .setType(PacketType.CLIENTAUTHENTICATION)
            .setBody(ca.toByteString())
            .build()
            .toByteArray());
        //
        Packet ack = Packet.parseFrom(readNextPacket());
        if (ack.getType() != PacketType.ACK) {
            throw new CanalClientException("unexpected packet type when ack is expected");
        }

        Ack ackBody = Ack.parseFrom(ack.getBody());
        if (ackBody.getErrorCode() > 0) {
            throw new CanalClientException("something goes wrong when doing authentication: "
                                       + ackBody.getErrorMessage());
        }

        connected = true;
        return new InetSocketAddress(channel.socket().getLocalAddress(), channel.socket().getLocalPort());
    } catch (IOException e) {
        throw new CanalClientException(e);
    }
}

這邊采用NIO編程卑硫,建立和server的socket連接后徒恋,發(fā)送了握手包和認證包,當收到ack包后欢伏,認為連接成功入挣。認證包的服務端處理在ClientAuthenticationHandler類中,握手處理在HandshakeInitializationHandler類硝拧。

server接收到認證的消息后径筏,會做如下的處理:

public void messageReceived(final ChannelHandlerContext ctx, MessageEvent e) throws Exception {
    ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
    final Packet packet = Packet.parseFrom(buffer.readBytes(buffer.readableBytes()).array());
    switch (packet.getVersion()) {
        case SUPPORTED_VERSION:
        default:
            final ClientAuth clientAuth = ClientAuth.parseFrom(packet.getBody());
            // 如果存在訂閱信息
            if (StringUtils.isNotEmpty(clientAuth.getDestination())
                && StringUtils.isNotEmpty(clientAuth.getClientId())) {
                ClientIdentity clientIdentity = new ClientIdentity(clientAuth.getDestination(),
                    Short.valueOf(clientAuth.getClientId()),
                    clientAuth.getFilter());
                try {
                    MDC.put("destination", clientIdentity.getDestination());
                    embeddedServer.subscribe(clientIdentity);
                    ctx.setAttachment(clientIdentity);// 設置狀態(tài)數(shù)據(jù)
                    // 嘗試啟動,如果已經(jīng)啟動障陶,忽略
                    if (!embeddedServer.isStart(clientIdentity.getDestination())) {
                        ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(clientIdentity.getDestination());
                        if (!runningMonitor.isStart()) {
                            runningMonitor.start();
                        }
                    }
                } finally {
                    MDC.remove("destination");
                }
            }

            NettyUtils.ack(ctx.getChannel(), new ChannelFutureListener() {

                public void operationComplete(ChannelFuture future) throws Exception {
                    //忽略
                }

            });
            break;
    }
}

主要的邏輯在subscribe里面滋恬。如果metaManager沒有啟動,那么需要進行啟動抱究。啟動時恢氯,會從zk節(jié)點下面拉取一些數(shù)據(jù),包括客戶端的消費位點情況等等鼓寺。然后就是訂閱勋拟,訂閱是新建一個zk節(jié)點,路徑為/otter/canal/destinations/{destination}/{clientId}妈候。然后還有一些過濾器指黎,也需要寫到zk中。之后就是獲取一下本client的位點信息州丹,如果原來zk中包含,那么直接從內(nèi)存中獲取杂彭,否則取eventStore的第一條數(shù)據(jù)墓毒。

1.3.2 subscribe

發(fā)送訂閱消息給server,通過socket的方式亲怠。這邊是判斷所计,如果filter不為空,才發(fā)送訂閱消息团秽。服務端的處理過程是這樣的:

case SUBSCRIPTION:
    Sub sub = Sub.parseFrom(packet.getBody());
    if (StringUtils.isNotEmpty(sub.getDestination()) && StringUtils.isNotEmpty(sub.getClientId())) {
        clientIdentity = new ClientIdentity(sub.getDestination(),
                            Short.valueOf(sub.getClientId()),
                            sub.getFilter());
        MDC.put("destination", clientIdentity.getDestination());

        // 嘗試啟動主胧,如果已經(jīng)啟動叭首,忽略
        if (!embeddedServer.isStart(clientIdentity.getDestination())) {
            ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(clientIdentity.getDestination());
            if (!runningMonitor.isStart()) {
                runningMonitor.start();
            }
        }

        embeddedServer.subscribe(clientIdentity);
        ctx.setAttachment(clientIdentity);// 設置狀態(tài)數(shù)據(jù)
        NettyUtils.ack(ctx.getChannel(), null);
    } else {
        NettyUtils.error(401,
            MessageFormatter.format("destination or clientId is null", sub.toString()).getMessage(),
            ctx.getChannel(),
            null);
}
break;

類似于connect的過程,不過這邊帶上了filter的參數(shù)踪栋。這邊啟動了server以及他的監(jiān)聽器焙格。

1.3.3 rollback

這里的回滾是指回滾server端記錄的本client的位點信息。

public void rollback() throws CanalClientException {
    waitClientRunning();
    rollback(0);// 0代筆未設置
}

這里發(fā)送了rollback的指令夷都。服務端是這么處理的:

case CLIENTROLLBACK:
    ClientRollback rollback = CanalPacket.ClientRollback.parseFrom(packet.getBody());
    MDC.put("destination", rollback.getDestination());
    if (StringUtils.isNotEmpty(rollback.getDestination())
        && StringUtils.isNotEmpty(rollback.getClientId())) {
        clientIdentity = new ClientIdentity(rollback.getDestination(),
            Short.valueOf(rollback.getClientId()));
        if (rollback.getBatchId() == 0L) {
            embeddedServer.rollback(clientIdentity);// 回滾所有批次
        } else {
            embeddedServer.rollback(clientIdentity, rollback.getBatchId()); // 只回滾單個批次
        }
    } else {
        NettyUtils.error(401,
            MessageFormatter.format("destination or clientId is null", rollback.toString())
                .getMessage(),
            ctx.getChannel(),
            null);
    }
    break;

這里的batchId傳入的是0眷唉,也就是要回滾所有的批次。我們來看下這個回滾的動作:

@Override
public void rollback(ClientIdentity clientIdentity) throws CanalServerException {
    checkStart(clientIdentity.getDestination());
    CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
    // 因為存在第一次鏈接時自動rollback的情況囤官,所以需要忽略未訂閱
    boolean hasSubscribe = canalInstance.getMetaManager().hasSubscribe(clientIdentity);
    if (!hasSubscribe) {
        return;
    }

    synchronized (canalInstance) {
        // 清除batch信息
        canalInstance.getMetaManager().clearAllBatchs(clientIdentity);
        // rollback eventStore中的狀態(tài)信息
        canalInstance.getEventStore().rollback();
        logger.info("rollback successfully, clientId:{}", new Object[] { clientIdentity.getClientId() });
    }
}

這里回滾的冬阳,其實是eventStore中的指針,把get的指針設置為之前ack的指針党饮。

二肝陪、訂閱數(shù)據(jù)

當client連接server完成后,就需要進行binlog數(shù)據(jù)的訂閱刑顺。

public void subscribe() throws CanalClientException {
    subscribe(""); // 傳遞空字符即可
}

public void subscribe(String filter) throws CanalClientException {
    int times = 0;
    while (times < retryTimes) {
        try {
            currentConnector.subscribe(filter);
            this.filter = filter;
            return;
        } catch (Throwable t) {
            if (retryTimes == -1 && t.getCause() instanceof InterruptedException) {
                logger.info("block waiting interrupted by other thread.");
                return;
            } else {
                logger.warn(String.format(
                        "something goes wrong when subscribing from server: %s",
                        currentConnector != null ? currentConnector.getAddress() : "null"),
                        t);
                times++;
                restart();
                logger.info("restart the connector for next round retry.");
            }

        }
    }

    throw new CanalClientException("failed to subscribe after " + times + " times retry.");
}

訂閱這塊的內(nèi)容不再贅述氯窍,在上面的connect過程中有提到。這邊還有一個失敗重試的機制捏检,當異常不是中斷異常的情況下荞驴,會重試重啟client connector,直到達到了閾值retryTimes贯城。

三熊楼、獲取數(shù)據(jù)

在建立連接和進行數(shù)據(jù)訂閱之后,就可以開始進行binlog數(shù)據(jù)的獲取了能犯。主要的方法是getWithOutAck這個方法鲫骗,這種是需要client自己進行數(shù)據(jù)ack的,保證了只有數(shù)據(jù)真正的被消費踩晶,而且進行了業(yè)務邏輯處理之后执泰,才會ack。當然渡蜻,如果有了異常术吝,也會進行一定次數(shù)的重試和重啟。

public Message getWithoutAck(int batchSize, Long timeout, TimeUnit unit) throws CanalClientException {
    waitClientRunning();
    try {
        ...//忽略
        writeWithHeader(Packet.newBuilder()
            .setType(PacketType.GET)
            .setBody(Get.newBuilder()
            .setAutoAck(false)
            .setDestination(clientIdentity.getDestination())
            .setClientId(String.valueOf(clientIdentity.getClientId()))
                .setFetchSize(size)
                .setTimeout(time)
                .setUnit(unit.ordinal())
                .build()
                .toByteString())
            .build()
            .toByteArray());
        return receiveMessages();
    } catch (IOException e) {
        throw new CanalClientException(e);
    }
}

我們可以看到茸苇,其實是發(fā)送了一個GET命令給server端排苍,然后傳遞了一個參數(shù)batchSize,還有超時時間学密,而且不是自動提交的淘衙。服務端的處理是這樣的:

embeddedServer.getWithoutAck(clientIdentity, get.getFetchSize());

也是調(diào)用的這個方法:

@Override
public Message getWithoutAck(ClientIdentity clientIdentity, int batchSize, Long timeout, TimeUnit unit)
                                                throws CanalServerException {
    checkStart(clientIdentity.getDestination());
    checkSubscribe(clientIdentity);

    CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
    synchronized (canalInstance) {
        // 獲取到流式數(shù)據(jù)中的最后一批獲取的位置
        PositionRange<LogPosition> positionRanges = canalInstance.getMetaManager().getLastestBatch(clientIdentity);

        Events<Event> events = null;
        if (positionRanges != null) { // 存在流數(shù)據(jù)
            events = getEvents(canalInstance.getEventStore(), positionRanges.getStart(), batchSize, timeout, unit);
        } else {// ack后第一次獲取
            Position start = canalInstance.getMetaManager().getCursor(clientIdentity);
            if (start == null) { // 第一次,還沒有過ack記錄腻暮,則獲取當前store中的第一條
                start = canalInstance.getEventStore().getFirstPosition();
            }

            events = getEvents(canalInstance.getEventStore(), start, batchSize, timeout, unit);
        }

        if (CollectionUtils.isEmpty(events.getEvents())) {
            logger.debug("getWithoutAck successfully, clientId:{} batchSize:{} but result is null",
                    clientIdentity.getClientId(), batchSize);
            return new Message(-1, new ArrayList<Entry>()); // 返回空包彤守,避免生成batchId毯侦,浪費性能
        } else {
            // 記錄到流式信息
            Long batchId = canalInstance.getMetaManager().addBatch(clientIdentity, events.getPositionRange());
            List<Entry> entrys = Lists.transform(events.getEvents(), new Function<Event, Entry>() {

                public Entry apply(Event input) {
                    return input.getEntry();
                }
            });
            if (logger.isInfoEnabled()) {
                logger.info("getWithoutAck successfully, clientId:{} batchSize:{}  real size is {} and result is [batchId:{} , position:{}]",
                        clientIdentity.getClientId(),
                        batchSize,
                        entrys.size(),
                        batchId,
                        events.getPositionRange());
            }
            return new Message(batchId, entrys);
        }

    }
}

最主要的邏輯在這里:

  • 判斷canalInstance是否已經(jīng)啟動:checkStart
  • 判斷訂閱列表中是否包含當前的client:checkSubscribe
  • 根據(jù)client信息從metaManager中獲取最后消費的批次:getLastestBatch,這塊在運行起來后具垫,是從內(nèi)存中取的侈离,但是在instance啟動時,是從zk中拉取的做修,是從/otter/canal/destinations/{destination}/{clientId}/mark下面獲取的霍狰,后續(xù)也會定時(1s)刷新到這里面
  • 如果能獲取到消費的批次,直接從eventStore的隊列中獲取數(shù)據(jù)饰及。
  • 如果positionRanges為空蔗坯,那么從metaManager中獲取指針。如果指針也沒有燎含,說明原來沒有ack過數(shù)據(jù)宾濒,需要從store中第一條開始獲取。這個過程其實就是找start屏箍,也就是上一次ack的位置绘梦。
  • 調(diào)用getEvent,獲取數(shù)據(jù)赴魁。根據(jù)傳入的參數(shù)不同卸奉,調(diào)用不同的方法去獲取數(shù)據(jù),但是最終都是調(diào)用的goGet方法颖御。這個doGet方法不是很復雜榄棵,主要是根據(jù)參數(shù)從store隊列中獲取數(shù)據(jù),然后把指針進行新的設置潘拱。
  • 如果沒有取到binlog數(shù)據(jù)疹鳄,那么直接返回,批次號為-1芦岂。
  • 如果取到了數(shù)據(jù)瘪弓,記錄一下流式數(shù)據(jù)后返回。

結(jié)果封裝在Messages中禽最,最終改為Message腺怯,包含批次號和binlog列表。

四川无、業(yè)務處理

拿到message后瓢喉,需要進行判斷batchId,如果batchId=-1或者binlog大小為0舀透,說明沒有拿到數(shù)據(jù)。否則在message基礎上進行邏輯處理决左。

Message的內(nèi)容愕够,后續(xù)我們再進行討論。

五惑芭、提交確認

connector.ack(batchId); // 提交確認

提交批次id坠狡,底層發(fā)送CLIENTACK命令到server。server調(diào)用CanalServerWithEmbedded的ack方法來進行提交遂跟。

public void ack(ClientIdentity clientIdentity, long batchId) throws CanalServerException {
    checkStart(clientIdentity.getDestination());
    checkSubscribe(clientIdentity);

    CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
    PositionRange<LogPosition> positionRanges = null;
    positionRanges = canalInstance.getMetaManager().removeBatch(clientIdentity, batchId); // 更新位置
    if (positionRanges == null) { // 說明是重復的ack/rollback
        throw new CanalServerException(String.format("ack error , clientId:%s batchId:%d is not exist , please check",
            clientIdentity.getClientId(),
            batchId));
    }

    // 更新cursor
    if (positionRanges.getAck() != null) {
        canalInstance.getMetaManager().updateCursor(clientIdentity, positionRanges.getAck());
        if (logger.isInfoEnabled()) {
            logger.info("ack successfully, clientId:{} batchId:{} position:{}",
                    clientIdentity.getClientId(),
                    batchId,
                    positionRanges);
        }
    }

    // 可定時清理數(shù)據(jù)
    canalInstance.getEventStore().ack(positionRanges.getEnd());

}

首先更新metaManager中的batch逃沿,然后更新ack指針,同時清理store中到ack指針位置的數(shù)據(jù)幻锁。

六凯亮、回滾

如果有失敗的情況,需要進行回滾哄尔。發(fā)送CLIENTROLLBACK命令給server端假消,進行數(shù)據(jù)回滾×虢樱回滾單個批次時的處理邏輯是這樣的:

@Override
public void rollback(ClientIdentity clientIdentity, Long batchId) throws CanalServerException {
    checkStart(clientIdentity.getDestination());
    CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());

    // 因為存在第一次鏈接時自動rollback的情況富拗,所以需要忽略未訂閱
    boolean hasSubscribe = canalInstance.getMetaManager().hasSubscribe(clientIdentity);
    if (!hasSubscribe) {
        return;
    }
    synchronized (canalInstance) {
        // 清除batch信息
        PositionRange<LogPosition> positionRanges = canalInstance.getMetaManager().removeBatch(clientIdentity,
            batchId);
        if (positionRanges == null) { // 說明是重復的ack/rollback
            throw new CanalServerException(String.format("rollback error, clientId:%s batchId:%d is not exist , please check",
                clientIdentity.getClientId(),
                batchId));
        }

        // lastRollbackPostions.put(clientIdentity,
        // positionRanges.getEnd());// 記錄一下最后rollback的位置
        // TODO 后續(xù)rollback到指定的batchId位置
        canalInstance.getEventStore().rollback();// rollback
                                                 // eventStore中的狀態(tài)信息
        logger.info("rollback successfully, clientId:{} batchId:{} position:{}",
            clientIdentity.getClientId(),
            batchId,
            positionRanges);
    }
}

這里的rollback到指定的batchId,其實是假的鸣戴。他的rollback也是全量回滾到ack的指針位置啃沪。

七、斷開連接

在發(fā)生異常情況時窄锅,client會斷開與server的連接创千,也就是disconnect方法。

public void disconnect() throws CanalClientException {
    if (rollbackOnDisConnect && channel.isConnected()) {
        rollback();
    }

    connected = false;
    if (runningMonitor != null) {
        if (runningMonitor.isStart()) {
            runningMonitor.stop();
        }
    } else {
        doDisconnnect();
    }
}

判斷是否在斷開連接的時候回滾參數(shù)(默認false)和當前socket通道是否連接中签餐,進行回滾。

否則調(diào)用runningMonitor.stop方法進行停止盯串。主要的過程是這樣的:

  • 取消監(jiān)聽/otter/canal/destinations/{destination}/{clientId}/running/節(jié)點變化信息
  • 刪除上面這個節(jié)點
  • 關(guān)閉socket的讀通道
  • 關(guān)閉socket的寫通道
  • 關(guān)閉socket channel
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末氯檐,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子体捏,更是在濱河造成了極大的恐慌冠摄,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,919評論 6 502
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件几缭,死亡現(xiàn)場離奇詭異河泳,居然都是意外死亡,警方通過查閱死者的電腦和手機年栓,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,567評論 3 392
  • 文/潘曉璐 我一進店門拆挥,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人,你說我怎么就攤上這事纸兔《韫希” “怎么了?”我有些...
    開封第一講書人閱讀 163,316評論 0 353
  • 文/不壞的土叔 我叫張陵汉矿,是天一觀的道長崎坊。 經(jīng)常有香客問我,道長洲拇,這世上最難降的妖魔是什么奈揍? 我笑而不...
    開封第一講書人閱讀 58,294評論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮赋续,結(jié)果婚禮上男翰,老公的妹妹穿的比我還像新娘。我一直安慰自己蚕捉,他們只是感情好奏篙,可當我...
    茶點故事閱讀 67,318評論 6 390
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著迫淹,像睡著了一般秘通。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上敛熬,一...
    開封第一講書人閱讀 51,245評論 1 299
  • 那天肺稀,我揣著相機與錄音,去河邊找鬼应民。 笑死话原,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的诲锹。 我是一名探鬼主播繁仁,決...
    沈念sama閱讀 40,120評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼归园!你這毒婦竟也來了黄虱?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,964評論 0 275
  • 序言:老撾萬榮一對情侶失蹤庸诱,失蹤者是張志新(化名)和其女友劉穎捻浦,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體桥爽,經(jīng)...
    沈念sama閱讀 45,376評論 1 313
  • 正文 獨居荒郊野嶺守林人離奇死亡朱灿,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,592評論 2 333
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了钠四。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片盗扒。...
    茶點故事閱讀 39,764評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出环疼,到底是詐尸還是另有隱情习霹,我是刑警寧澤,帶...
    沈念sama閱讀 35,460評論 5 344
  • 正文 年R本政府宣布炫隶,位于F島的核電站,受9級特大地震影響阎曹,放射性物質(zhì)發(fā)生泄漏伪阶。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,070評論 3 327
  • 文/蒙蒙 一处嫌、第九天 我趴在偏房一處隱蔽的房頂上張望栅贴。 院中可真熱鬧,春花似錦熏迹、人聲如沸檐薯。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,697評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽坛缕。三九已至,卻和暖如春捆昏,著一層夾襖步出監(jiān)牢的瞬間赚楚,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,846評論 1 269
  • 我被黑心中介騙來泰國打工骗卜, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留宠页,地道東北人。 一個月前我還...
    沈念sama閱讀 47,819評論 2 370
  • 正文 我出身青樓寇仓,卻偏偏與公主長得像举户,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子遍烦,可洞房花燭夜當晚...
    茶點故事閱讀 44,665評論 2 354

推薦閱讀更多精彩內(nèi)容