client的工作過程缕棵,需要我們自己去編寫對應的邏輯性锭,我們目前只能從example寫的例子來看赠潦。目前examle中提供了兩個例子,一個是單機的草冈,一個是集群的cluster她奥,我們后續(xù)如果需要進行開發(fā)的話,其實也是開發(fā)我們自己的client怎棱,以及client的一些邏輯哩俭。我們主要看下集群的client是如何實現(xiàn)和消費的,又是怎么和server進行數(shù)據(jù)交互的蹄殃。
我們來看看具體的代碼:
protected void process() {
int batchSize = 5 * 1024;
while (running) {
try {
MDC.put("destination", destination);
connector.connect();
connector.subscribe();
waiting = false;
while (running) {
Message message = connector.getWithoutAck(batchSize); // 獲取指定數(shù)量的數(shù)據(jù)
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
// try {
// Thread.sleep(1000);
// } catch (InterruptedException e) {
// }
} else {
printSummary(message, batchId, size);
printEntry(message.getEntries());
}
connector.ack(batchId); // 提交確認
// connector.rollback(batchId); // 處理失敗, 回滾數(shù)據(jù)
}
} catch (Exception e) {
logger.error("process error!", e);
} finally {
connector.disconnect();
MDC.remove("destination");
}
}
}
這個的這樣的過程是這樣的
- 連接携茂,connector.connect()
- 訂閱,connector.subscribe
- 獲取數(shù)據(jù)诅岩,connector.getWithoutAck()
- 業(yè)務處理
- 提交確認,connector.ack()
- 回滾,connector.rollback()
- 斷開連接带膜,connector.disconnect()
我們具體來看下吩谦。
一、建立連接
CanalConnector主要有兩個實現(xiàn)膝藕,一個是SimpleCanalConnector式廷,一個是ClusterCanalConnector,我們主要看下ClusterCanalConnector芭挽,這也是我們要用的一個模式滑废。
我們用的時候,通過一個工廠類生成我們需要的Connector袜爪,這里的工廠類是CanalConnectors棋凳,里面包含了生成ClusterCanalConnector的方法皂冰。
public static CanalConnector newClusterConnector(String zkServers, String destination, String username,
String password) {
ClusterCanalConnector canalConnector = new ClusterCanalConnector(username,
password,
destination,
new ClusterNodeAccessStrategy(destination, ZkClientx.getZkClient(zkServers)));
canalConnector.setSoTimeout(30 * 1000);
return canalConnector;
}
用到的參數(shù)有zk的地址,canal的名稱,數(shù)據(jù)庫的賬號密碼骄噪。里面有個ClusterNodeAccessStrategy是用來選擇client的策略丐重,這個ClusterNodeAccessStrategy的構(gòu)造方法里面有些東西需要我們關(guān)注下。
1.1 ClusterNodeAccessStrategy
public ClusterNodeAccessStrategy(String destination, ZkClientx zkClient){
this.zkClient = zkClient;
childListener = new IZkChildListener() {
public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
initClusters(currentChilds);
}
};
dataListener = new IZkDataListener() {
public void handleDataDeleted(String dataPath) throws Exception {
runningAddress = null;
}
public void handleDataChange(String dataPath, Object data) throws Exception {
initRunning(data);
}
};
String clusterPath = ZookeeperPathUtils.getDestinationClusterRoot(destination);
this.zkClient.subscribeChildChanges(clusterPath, childListener);
initClusters(this.zkClient.getChildren(clusterPath));
String runningPath = ZookeeperPathUtils.getDestinationServerRunning(destination);
this.zkClient.subscribeDataChanges(runningPath, dataListener);
initRunning(this.zkClient.readData(runningPath, true));
}
這邊起了兩個監(jiān)聽器,都是監(jiān)聽server端的活動服務器的诱咏。一個是獲取所有的server列表,一個是獲取活動的server服務器缴挖,都是從zk的對應節(jié)點上去取的袋狞。
1.2 連接connect
獲取到CanalConnector之后,就是真正的連接了映屋。在ClusterCanalConnector中苟鸯,我們可以看到,其實他底層用的也是SimpleCanalConnector秧荆,只不過加了一個選擇的策略倔毙。
public void connect() throws CanalClientException {
if (connected) {
return;
}
if (runningMonitor != null) {
if (!runningMonitor.isStart()) {
runningMonitor.start();
}
} else {
waitClientRunning();
if (!running) {
return;
}
doConnect();
if (filter != null) { // 如果存在條件,說明是自動切換乙濒,基于上一次的條件訂閱一次
subscribe(filter);
}
if (rollbackOnConnect) {
rollback();
}
}
connected = true;
}
如果是集群模式的客戶端陕赃,那么這邊的runningMonitor不為空,因為他進行了初始化颁股。我們主要看下runningMonitor.start()里面的操作么库。
public void start() {
super.start();
String path = ZookeeperPathUtils.getDestinationClientRunning(this.destination, clientData.getClientId());
zkClient.subscribeDataChanges(path, dataListener);
initRunning();
}
這邊監(jiān)聽的路徑是:/otter/canal/destinations/{destination}/{clientId}/running。如果有任何的變化甘有,或節(jié)點的刪除诉儒,那么執(zhí)行dataListener里面的操作。
dataListener = new IZkDataListener() {
public void handleDataChange(String dataPath, Object data) throws Exception {
MDC.put("destination", destination);
ClientRunningData runningData = JsonUtils.unmarshalFromByte((byte[]) data, ClientRunningData.class);
if (!isMine(runningData.getAddress())) {
mutex.set(false);
}
if (!runningData.isActive() && isMine(runningData.getAddress())) { // 說明出現(xiàn)了主動釋放的操作亏掀,并且本機之前是active
release = true;
releaseRunning();// 徹底釋放mainstem
}
activeData = (ClientRunningData) runningData;
}
public void handleDataDeleted(String dataPath) throws Exception {
MDC.put("destination", destination);
mutex.set(false);
// 觸發(fā)一下退出,可能是人為干預的釋放操作或者網(wǎng)絡閃斷引起的session expired timeout
processActiveExit();
if (!release && activeData != null && isMine(activeData.getAddress())) {
// 如果上一次active的狀態(tài)就是本機忱反,則即時觸發(fā)一下active搶占
initRunning();
} else {
// 否則就是等待delayTime,避免因網(wǎng)絡瞬端或者zk異常滤愕,導致出現(xiàn)頻繁的切換操作
delayExector.schedule(new Runnable() {
public void run() {
initRunning();
}
}, delayTime, TimeUnit.SECONDS);
}
}
};
這里的注釋比較清楚温算,基本上如果數(shù)據(jù)發(fā)生了變化,那么進行節(jié)點釋放后间影,將運行節(jié)點置為活動節(jié)點注竿。如果發(fā)生了數(shù)據(jù)刪除,那么直接觸發(fā)退出魂贬,如果上一次的active狀態(tài)是本機巩割,那么觸發(fā)一下active搶占,否則等待delayTime付燥,默認5s后重試宣谈。下面我們主要看下initRunning。
1.3 initRunning
這塊主要是創(chuàng)建運行節(jié)點的臨時節(jié)點机蔗。節(jié)點路徑是/otter/canal/destinations/{destination}/{clientId}蒲祈,節(jié)點內(nèi)容是ClientRunningData的json序列化結(jié)果甘萧。連接的代碼:
public InetSocketAddress processActiveEnter() {
InetSocketAddress address = doConnect();
mutex.set(true);
if (filter != null) { // 如果存在條件,說明是自動切換梆掸,基于上一次的條件訂閱一次
subscribe(filter);
}
if (rollbackOnConnect) {
rollback();
}
return address;
}
這塊有幾段邏輯扬卷,我們慢慢看下。
1.3.1 doConnect
這里是client直接連上了server酸钦,通過socket連接怪得,也就是server暴露的socket端口。
private InetSocketAddress doConnect() throws CanalClientException {
try {
channel = SocketChannel.open();
channel.socket().setSoTimeout(soTimeout);
SocketAddress address = getAddress();
if (address == null) {
address = getNextAddress();
}
channel.connect(address);
readableChannel = Channels.newChannel(channel.socket().getInputStream());
writableChannel = Channels.newChannel(channel.socket().getOutputStream());
Packet p = Packet.parseFrom(readNextPacket());
if (p.getVersion() != 1) {
throw new CanalClientException("unsupported version at this client.");
}
if (p.getType() != PacketType.HANDSHAKE) {
throw new CanalClientException("expect handshake but found other type.");
}
//
Handshake handshake = Handshake.parseFrom(p.getBody());
supportedCompressions.addAll(handshake.getSupportedCompressionsList());
//
ClientAuth ca = ClientAuth.newBuilder()
.setUsername(username != null ? username : "")
.setPassword(ByteString.copyFromUtf8(password != null ? password : ""))
.setNetReadTimeout(soTimeout)
.setNetWriteTimeout(soTimeout)
.build();
writeWithHeader(Packet.newBuilder()
.setType(PacketType.CLIENTAUTHENTICATION)
.setBody(ca.toByteString())
.build()
.toByteArray());
//
Packet ack = Packet.parseFrom(readNextPacket());
if (ack.getType() != PacketType.ACK) {
throw new CanalClientException("unexpected packet type when ack is expected");
}
Ack ackBody = Ack.parseFrom(ack.getBody());
if (ackBody.getErrorCode() > 0) {
throw new CanalClientException("something goes wrong when doing authentication: "
+ ackBody.getErrorMessage());
}
connected = true;
return new InetSocketAddress(channel.socket().getLocalAddress(), channel.socket().getLocalPort());
} catch (IOException e) {
throw new CanalClientException(e);
}
}
這邊采用NIO編程卑硫,建立和server的socket連接后徒恋,發(fā)送了握手包和認證包,當收到ack包后欢伏,認為連接成功入挣。認證包的服務端處理在ClientAuthenticationHandler類中,握手處理在HandshakeInitializationHandler類硝拧。
server接收到認證的消息后径筏,會做如下的處理:
public void messageReceived(final ChannelHandlerContext ctx, MessageEvent e) throws Exception {
ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
final Packet packet = Packet.parseFrom(buffer.readBytes(buffer.readableBytes()).array());
switch (packet.getVersion()) {
case SUPPORTED_VERSION:
default:
final ClientAuth clientAuth = ClientAuth.parseFrom(packet.getBody());
// 如果存在訂閱信息
if (StringUtils.isNotEmpty(clientAuth.getDestination())
&& StringUtils.isNotEmpty(clientAuth.getClientId())) {
ClientIdentity clientIdentity = new ClientIdentity(clientAuth.getDestination(),
Short.valueOf(clientAuth.getClientId()),
clientAuth.getFilter());
try {
MDC.put("destination", clientIdentity.getDestination());
embeddedServer.subscribe(clientIdentity);
ctx.setAttachment(clientIdentity);// 設置狀態(tài)數(shù)據(jù)
// 嘗試啟動,如果已經(jīng)啟動障陶,忽略
if (!embeddedServer.isStart(clientIdentity.getDestination())) {
ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(clientIdentity.getDestination());
if (!runningMonitor.isStart()) {
runningMonitor.start();
}
}
} finally {
MDC.remove("destination");
}
}
NettyUtils.ack(ctx.getChannel(), new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception {
//忽略
}
});
break;
}
}
主要的邏輯在subscribe里面滋恬。如果metaManager沒有啟動,那么需要進行啟動抱究。啟動時恢氯,會從zk節(jié)點下面拉取一些數(shù)據(jù),包括客戶端的消費位點情況等等鼓寺。然后就是訂閱勋拟,訂閱是新建一個zk節(jié)點,路徑為/otter/canal/destinations/{destination}/{clientId}妈候。然后還有一些過濾器指黎,也需要寫到zk中。之后就是獲取一下本client的位點信息州丹,如果原來zk中包含,那么直接從內(nèi)存中獲取杂彭,否則取eventStore的第一條數(shù)據(jù)墓毒。
1.3.2 subscribe
發(fā)送訂閱消息給server,通過socket的方式亲怠。這邊是判斷所计,如果filter不為空,才發(fā)送訂閱消息团秽。服務端的處理過程是這樣的:
case SUBSCRIPTION:
Sub sub = Sub.parseFrom(packet.getBody());
if (StringUtils.isNotEmpty(sub.getDestination()) && StringUtils.isNotEmpty(sub.getClientId())) {
clientIdentity = new ClientIdentity(sub.getDestination(),
Short.valueOf(sub.getClientId()),
sub.getFilter());
MDC.put("destination", clientIdentity.getDestination());
// 嘗試啟動主胧,如果已經(jīng)啟動叭首,忽略
if (!embeddedServer.isStart(clientIdentity.getDestination())) {
ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(clientIdentity.getDestination());
if (!runningMonitor.isStart()) {
runningMonitor.start();
}
}
embeddedServer.subscribe(clientIdentity);
ctx.setAttachment(clientIdentity);// 設置狀態(tài)數(shù)據(jù)
NettyUtils.ack(ctx.getChannel(), null);
} else {
NettyUtils.error(401,
MessageFormatter.format("destination or clientId is null", sub.toString()).getMessage(),
ctx.getChannel(),
null);
}
break;
類似于connect的過程,不過這邊帶上了filter的參數(shù)踪栋。這邊啟動了server以及他的監(jiān)聽器焙格。
1.3.3 rollback
這里的回滾是指回滾server端記錄的本client的位點信息。
public void rollback() throws CanalClientException {
waitClientRunning();
rollback(0);// 0代筆未設置
}
這里發(fā)送了rollback的指令夷都。服務端是這么處理的:
case CLIENTROLLBACK:
ClientRollback rollback = CanalPacket.ClientRollback.parseFrom(packet.getBody());
MDC.put("destination", rollback.getDestination());
if (StringUtils.isNotEmpty(rollback.getDestination())
&& StringUtils.isNotEmpty(rollback.getClientId())) {
clientIdentity = new ClientIdentity(rollback.getDestination(),
Short.valueOf(rollback.getClientId()));
if (rollback.getBatchId() == 0L) {
embeddedServer.rollback(clientIdentity);// 回滾所有批次
} else {
embeddedServer.rollback(clientIdentity, rollback.getBatchId()); // 只回滾單個批次
}
} else {
NettyUtils.error(401,
MessageFormatter.format("destination or clientId is null", rollback.toString())
.getMessage(),
ctx.getChannel(),
null);
}
break;
這里的batchId傳入的是0眷唉,也就是要回滾所有的批次。我們來看下這個回滾的動作:
@Override
public void rollback(ClientIdentity clientIdentity) throws CanalServerException {
checkStart(clientIdentity.getDestination());
CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
// 因為存在第一次鏈接時自動rollback的情況囤官,所以需要忽略未訂閱
boolean hasSubscribe = canalInstance.getMetaManager().hasSubscribe(clientIdentity);
if (!hasSubscribe) {
return;
}
synchronized (canalInstance) {
// 清除batch信息
canalInstance.getMetaManager().clearAllBatchs(clientIdentity);
// rollback eventStore中的狀態(tài)信息
canalInstance.getEventStore().rollback();
logger.info("rollback successfully, clientId:{}", new Object[] { clientIdentity.getClientId() });
}
}
這里回滾的冬阳,其實是eventStore中的指針,把get的指針設置為之前ack的指針党饮。
二肝陪、訂閱數(shù)據(jù)
當client連接server完成后,就需要進行binlog數(shù)據(jù)的訂閱刑顺。
public void subscribe() throws CanalClientException {
subscribe(""); // 傳遞空字符即可
}
public void subscribe(String filter) throws CanalClientException {
int times = 0;
while (times < retryTimes) {
try {
currentConnector.subscribe(filter);
this.filter = filter;
return;
} catch (Throwable t) {
if (retryTimes == -1 && t.getCause() instanceof InterruptedException) {
logger.info("block waiting interrupted by other thread.");
return;
} else {
logger.warn(String.format(
"something goes wrong when subscribing from server: %s",
currentConnector != null ? currentConnector.getAddress() : "null"),
t);
times++;
restart();
logger.info("restart the connector for next round retry.");
}
}
}
throw new CanalClientException("failed to subscribe after " + times + " times retry.");
}
訂閱這塊的內(nèi)容不再贅述氯窍,在上面的connect過程中有提到。這邊還有一個失敗重試的機制捏检,當異常不是中斷異常的情況下荞驴,會重試重啟client connector,直到達到了閾值retryTimes贯城。
三熊楼、獲取數(shù)據(jù)
在建立連接和進行數(shù)據(jù)訂閱之后,就可以開始進行binlog數(shù)據(jù)的獲取了能犯。主要的方法是getWithOutAck這個方法鲫骗,這種是需要client自己進行數(shù)據(jù)ack的,保證了只有數(shù)據(jù)真正的被消費踩晶,而且進行了業(yè)務邏輯處理之后执泰,才會ack。當然渡蜻,如果有了異常术吝,也會進行一定次數(shù)的重試和重啟。
public Message getWithoutAck(int batchSize, Long timeout, TimeUnit unit) throws CanalClientException {
waitClientRunning();
try {
...//忽略
writeWithHeader(Packet.newBuilder()
.setType(PacketType.GET)
.setBody(Get.newBuilder()
.setAutoAck(false)
.setDestination(clientIdentity.getDestination())
.setClientId(String.valueOf(clientIdentity.getClientId()))
.setFetchSize(size)
.setTimeout(time)
.setUnit(unit.ordinal())
.build()
.toByteString())
.build()
.toByteArray());
return receiveMessages();
} catch (IOException e) {
throw new CanalClientException(e);
}
}
我們可以看到茸苇,其實是發(fā)送了一個GET命令給server端排苍,然后傳遞了一個參數(shù)batchSize,還有超時時間学密,而且不是自動提交的淘衙。服務端的處理是這樣的:
embeddedServer.getWithoutAck(clientIdentity, get.getFetchSize());
也是調(diào)用的這個方法:
@Override
public Message getWithoutAck(ClientIdentity clientIdentity, int batchSize, Long timeout, TimeUnit unit)
throws CanalServerException {
checkStart(clientIdentity.getDestination());
checkSubscribe(clientIdentity);
CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
synchronized (canalInstance) {
// 獲取到流式數(shù)據(jù)中的最后一批獲取的位置
PositionRange<LogPosition> positionRanges = canalInstance.getMetaManager().getLastestBatch(clientIdentity);
Events<Event> events = null;
if (positionRanges != null) { // 存在流數(shù)據(jù)
events = getEvents(canalInstance.getEventStore(), positionRanges.getStart(), batchSize, timeout, unit);
} else {// ack后第一次獲取
Position start = canalInstance.getMetaManager().getCursor(clientIdentity);
if (start == null) { // 第一次,還沒有過ack記錄腻暮,則獲取當前store中的第一條
start = canalInstance.getEventStore().getFirstPosition();
}
events = getEvents(canalInstance.getEventStore(), start, batchSize, timeout, unit);
}
if (CollectionUtils.isEmpty(events.getEvents())) {
logger.debug("getWithoutAck successfully, clientId:{} batchSize:{} but result is null",
clientIdentity.getClientId(), batchSize);
return new Message(-1, new ArrayList<Entry>()); // 返回空包彤守,避免生成batchId毯侦,浪費性能
} else {
// 記錄到流式信息
Long batchId = canalInstance.getMetaManager().addBatch(clientIdentity, events.getPositionRange());
List<Entry> entrys = Lists.transform(events.getEvents(), new Function<Event, Entry>() {
public Entry apply(Event input) {
return input.getEntry();
}
});
if (logger.isInfoEnabled()) {
logger.info("getWithoutAck successfully, clientId:{} batchSize:{} real size is {} and result is [batchId:{} , position:{}]",
clientIdentity.getClientId(),
batchSize,
entrys.size(),
batchId,
events.getPositionRange());
}
return new Message(batchId, entrys);
}
}
}
最主要的邏輯在這里:
- 判斷canalInstance是否已經(jīng)啟動:checkStart
- 判斷訂閱列表中是否包含當前的client:checkSubscribe
- 根據(jù)client信息從metaManager中獲取最后消費的批次:getLastestBatch,這塊在運行起來后具垫,是從內(nèi)存中取的侈离,但是在instance啟動時,是從zk中拉取的做修,是從/otter/canal/destinations/{destination}/{clientId}/mark下面獲取的霍狰,后續(xù)也會定時(1s)刷新到這里面
- 如果能獲取到消費的批次,直接從eventStore的隊列中獲取數(shù)據(jù)饰及。
- 如果positionRanges為空蔗坯,那么從metaManager中獲取指針。如果指針也沒有燎含,說明原來沒有ack過數(shù)據(jù)宾濒,需要從store中第一條開始獲取。這個過程其實就是找start屏箍,也就是上一次ack的位置绘梦。
- 調(diào)用getEvent,獲取數(shù)據(jù)赴魁。根據(jù)傳入的參數(shù)不同卸奉,調(diào)用不同的方法去獲取數(shù)據(jù),但是最終都是調(diào)用的goGet方法颖御。這個doGet方法不是很復雜榄棵,主要是根據(jù)參數(shù)從store隊列中獲取數(shù)據(jù),然后把指針進行新的設置潘拱。
- 如果沒有取到binlog數(shù)據(jù)疹鳄,那么直接返回,批次號為-1芦岂。
- 如果取到了數(shù)據(jù)瘪弓,記錄一下流式數(shù)據(jù)后返回。
結(jié)果封裝在Messages中禽最,最終改為Message腺怯,包含批次號和binlog列表。
四川无、業(yè)務處理
拿到message后瓢喉,需要進行判斷batchId,如果batchId=-1或者binlog大小為0舀透,說明沒有拿到數(shù)據(jù)。否則在message基礎上進行邏輯處理决左。
Message的內(nèi)容愕够,后續(xù)我們再進行討論。
五惑芭、提交確認
connector.ack(batchId); // 提交確認
提交批次id坠狡,底層發(fā)送CLIENTACK命令到server。server調(diào)用CanalServerWithEmbedded的ack方法來進行提交遂跟。
public void ack(ClientIdentity clientIdentity, long batchId) throws CanalServerException {
checkStart(clientIdentity.getDestination());
checkSubscribe(clientIdentity);
CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
PositionRange<LogPosition> positionRanges = null;
positionRanges = canalInstance.getMetaManager().removeBatch(clientIdentity, batchId); // 更新位置
if (positionRanges == null) { // 說明是重復的ack/rollback
throw new CanalServerException(String.format("ack error , clientId:%s batchId:%d is not exist , please check",
clientIdentity.getClientId(),
batchId));
}
// 更新cursor
if (positionRanges.getAck() != null) {
canalInstance.getMetaManager().updateCursor(clientIdentity, positionRanges.getAck());
if (logger.isInfoEnabled()) {
logger.info("ack successfully, clientId:{} batchId:{} position:{}",
clientIdentity.getClientId(),
batchId,
positionRanges);
}
}
// 可定時清理數(shù)據(jù)
canalInstance.getEventStore().ack(positionRanges.getEnd());
}
首先更新metaManager中的batch逃沿,然后更新ack指針,同時清理store中到ack指針位置的數(shù)據(jù)幻锁。
六凯亮、回滾
如果有失敗的情況,需要進行回滾哄尔。發(fā)送CLIENTROLLBACK命令給server端假消,進行數(shù)據(jù)回滾×虢樱回滾單個批次時的處理邏輯是這樣的:
@Override
public void rollback(ClientIdentity clientIdentity, Long batchId) throws CanalServerException {
checkStart(clientIdentity.getDestination());
CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
// 因為存在第一次鏈接時自動rollback的情況富拗,所以需要忽略未訂閱
boolean hasSubscribe = canalInstance.getMetaManager().hasSubscribe(clientIdentity);
if (!hasSubscribe) {
return;
}
synchronized (canalInstance) {
// 清除batch信息
PositionRange<LogPosition> positionRanges = canalInstance.getMetaManager().removeBatch(clientIdentity,
batchId);
if (positionRanges == null) { // 說明是重復的ack/rollback
throw new CanalServerException(String.format("rollback error, clientId:%s batchId:%d is not exist , please check",
clientIdentity.getClientId(),
batchId));
}
// lastRollbackPostions.put(clientIdentity,
// positionRanges.getEnd());// 記錄一下最后rollback的位置
// TODO 后續(xù)rollback到指定的batchId位置
canalInstance.getEventStore().rollback();// rollback
// eventStore中的狀態(tài)信息
logger.info("rollback successfully, clientId:{} batchId:{} position:{}",
clientIdentity.getClientId(),
batchId,
positionRanges);
}
}
這里的rollback到指定的batchId,其實是假的鸣戴。他的rollback也是全量回滾到ack的指針位置啃沪。
七、斷開連接
在發(fā)生異常情況時窄锅,client會斷開與server的連接创千,也就是disconnect方法。
public void disconnect() throws CanalClientException {
if (rollbackOnDisConnect && channel.isConnected()) {
rollback();
}
connected = false;
if (runningMonitor != null) {
if (runningMonitor.isStart()) {
runningMonitor.stop();
}
} else {
doDisconnnect();
}
}
判斷是否在斷開連接的時候回滾參數(shù)(默認false)和當前socket通道是否連接中签餐,進行回滾。
否則調(diào)用runningMonitor.stop方法進行停止盯串。主要的過程是這樣的:
- 取消監(jiān)聽/otter/canal/destinations/{destination}/{clientId}/running/節(jié)點變化信息
- 刪除上面這個節(jié)點
- 關(guān)閉socket的讀通道
- 關(guān)閉socket的寫通道
- 關(guān)閉socket channel