如果不開啟checkpoint機(jī)制,flink job 在運(yùn)行時(shí)如果遇到異常整個(gè)job就會(huì)停止匪补。
如果開啟了checkpoint機(jī)制材泄,就會(huì)根據(jù)恢復(fù)點(diǎn)進(jìn)行數(shù)據(jù)重試,這個(gè)是一個(gè)非常復(fù)雜的機(jī)制对室,需要單獨(dú)的文章進(jìn)行解析。
所以開啟checkpoint是必然要做的配置咖祭。
在與rabbitmq集成時(shí)有個(gè)點(diǎn)必須要注意掩宜,就是mq 發(fā)送消息時(shí)候,必須要帶上CorrelationId么翰。我們看一下flink的官方文檔锭亏。
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointInterval(10000);
DataStream dataStreamSource = env.addSource(new RMQSource(connectionConfig,
"kgraph",true,new SimpleStringSchema()));
dataStreamSource.print();
上面是構(gòu)造RMQSource(…)的參數(shù),如下
queueName: The RabbitMQ queue name.
usesCorrelationId:?true?when correlation ids should be used,?false?otherwise (default is?false).
deserializationScehma: Deserialization schema to turn messages into Java objects.
根據(jù)參數(shù)不同硬鞍,有如下三種模式
Exactly-once (when checkpointed) with RabbitMQ transactions and messages with unique correlation IDs.
At-least-once (when checkpointed) with RabbitMQ transactions but no deduplication mechanism (correlation id is not set).
No strong delivery guarantees (without checkpointing) with RabbitMQ auto-commit mode.
那么 開啟 exactly-once 確保消費(fèi)一次的特性慧瘤,就必須在傳遞 mq消息的時(shí)候帶上 correlationId。
correlation Id 是 mq 消息的一個(gè)基本屬性固该,可以用來標(biāo)識(shí)消息的唯一id锅减,通常是mq實(shí)現(xiàn)rpc調(diào)用時(shí)使用,flink 利用其唯一id的特性來做 exactly once的消費(fèi)伐坏。所以在發(fā)送mq消息時(shí) 加上 correlation_id 的properties 就不會(huì)有問題了怔匣。
如果使用 spring 結(jié)合 rabbitmq 作為客戶端,需要對(duì) correlationId 做一個(gè)特別的處理
就是需要自己手動(dòng)設(shè)置correaltionId桦沉, rabbittemplate 沒有自動(dòng)的封裝這個(gè)屬性每瞒,convertAndSend這個(gè)方法非常讓人confuse,
里面支持傳入correlationData字段纯露,但是這個(gè)是寫入到消息頭的剿骨,而不是correlation_id,flink那邊永遠(yuǎn)無(wú)法讀取到埠褪。
public void sendMsg(KgraphUpdateMessage kgraphUpdateMessage)
{
CorrelationData correlationId =new CorrelationData(UUID.randomUUID().toString());
ObjectMapper jsonReader =new ObjectMapper();
try
? ? {
MessageProperties properties =new MessageProperties();
properties.setCorrelationId(correlationId.getId().getBytes());
Message message =new Message(jsonReader.writeValueAsBytes(kgraphUpdateMessage), properties);
rabbitTemplate.convertAndSend(KgraphMqConfig.KGRAPH_EXCHANGE, KgraphMqConfig.KGRAPH_TOPIC_EVENT, message,correlationId);
}catch (JsonProcessingException e)
{
e.printStackTrace();
}
}