Flink 與rabbitmq集成 并開啟checkpoint

如果不開啟checkpoint機(jī)制,flink job 在運(yùn)行時(shí)如果遇到異常整個(gè)job就會(huì)停止匪补。

如果開啟了checkpoint機(jī)制材泄,就會(huì)根據(jù)恢復(fù)點(diǎn)進(jìn)行數(shù)據(jù)重試,這個(gè)是一個(gè)非常復(fù)雜的機(jī)制对室,需要單獨(dú)的文章進(jìn)行解析。

所以開啟checkpoint是必然要做的配置咖祭。

在與rabbitmq集成時(shí)有個(gè)點(diǎn)必須要注意掩宜,就是mq 發(fā)送消息時(shí)候,必須要帶上CorrelationId么翰。我們看一下flink的官方文檔锭亏。

env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

env.getCheckpointConfig().setCheckpointInterval(10000);

DataStream dataStreamSource = env.addSource(new RMQSource(connectionConfig,

"kgraph",true,new SimpleStringSchema()));

dataStreamSource.print();

上面是構(gòu)造RMQSource(…)的參數(shù),如下

queueName: The RabbitMQ queue name.

usesCorrelationId:?true?when correlation ids should be used,?false?otherwise (default is?false).

deserializationScehma: Deserialization schema to turn messages into Java objects.

根據(jù)參數(shù)不同硬鞍,有如下三種模式

Exactly-once (when checkpointed) with RabbitMQ transactions and messages with unique correlation IDs.

At-least-once (when checkpointed) with RabbitMQ transactions but no deduplication mechanism (correlation id is not set).

No strong delivery guarantees (without checkpointing) with RabbitMQ auto-commit mode.

那么 開啟 exactly-once 確保消費(fèi)一次的特性慧瘤,就必須在傳遞 mq消息的時(shí)候帶上 correlationId。

correlation Id 是 mq 消息的一個(gè)基本屬性固该,可以用來標(biāo)識(shí)消息的唯一id锅减,通常是mq實(shí)現(xiàn)rpc調(diào)用時(shí)使用,flink 利用其唯一id的特性來做 exactly once的消費(fèi)伐坏。所以在發(fā)送mq消息時(shí) 加上 correlation_id 的properties 就不會(huì)有問題了怔匣。


如果使用 spring 結(jié)合 rabbitmq 作為客戶端,需要對(duì) correlationId 做一個(gè)特別的處理

就是需要自己手動(dòng)設(shè)置correaltionId桦沉, rabbittemplate 沒有自動(dòng)的封裝這個(gè)屬性每瞒,convertAndSend這個(gè)方法非常讓人confuse,

里面支持傳入correlationData字段纯露,但是這個(gè)是寫入到消息頭的剿骨,而不是correlation_id,flink那邊永遠(yuǎn)無(wú)法讀取到埠褪。

public void sendMsg(KgraphUpdateMessage kgraphUpdateMessage)

{

CorrelationData correlationId =new CorrelationData(UUID.randomUUID().toString());

ObjectMapper jsonReader =new ObjectMapper();

try

? ? {

MessageProperties properties =new MessageProperties();

properties.setCorrelationId(correlationId.getId().getBytes());

Message message =new Message(jsonReader.writeValueAsBytes(kgraphUpdateMessage), properties);

rabbitTemplate.convertAndSend(KgraphMqConfig.KGRAPH_EXCHANGE, KgraphMqConfig.KGRAPH_TOPIC_EVENT, message,correlationId);

}catch (JsonProcessingException e)

{

e.printStackTrace();

}

}

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末浓利,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子钞速,更是在濱河造成了極大的恐慌贷掖,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,639評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件渴语,死亡現(xiàn)場(chǎng)離奇詭異苹威,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)驾凶,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,277評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門牙甫,熙熙樓的掌柜王于貴愁眉苦臉地迎上來潮改,“玉大人,你說我怎么就攤上這事腹暖。” “怎么了翰萨?”我有些...
    開封第一講書人閱讀 157,221評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵脏答,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我亩鬼,道長(zhǎng)殖告,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,474評(píng)論 1 283
  • 正文 為了忘掉前任雳锋,我火速辦了婚禮黄绩,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘玷过。我一直安慰自己爽丹,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,570評(píng)論 6 386
  • 文/花漫 我一把揭開白布辛蚊。 她就那樣靜靜地躺著粤蝎,像睡著了一般。 火紅的嫁衣襯著肌膚如雪袋马。 梳的紋絲不亂的頭發(fā)上初澎,一...
    開封第一講書人閱讀 49,816評(píng)論 1 290
  • 那天,我揣著相機(jī)與錄音虑凛,去河邊找鬼碑宴。 笑死,一個(gè)胖子當(dāng)著我的面吹牛桑谍,可吹牛的內(nèi)容都是我干的延柠。 我是一名探鬼主播,決...
    沈念sama閱讀 38,957評(píng)論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼锣披,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼捕仔!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起盈罐,我...
    開封第一講書人閱讀 37,718評(píng)論 0 266
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤榜跌,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后盅粪,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體钓葫,經(jīng)...
    沈念sama閱讀 44,176評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,511評(píng)論 2 327
  • 正文 我和宋清朗相戀三年票顾,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了础浮。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片帆调。...
    茶點(diǎn)故事閱讀 38,646評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖豆同,靈堂內(nèi)的尸體忽然破棺而出番刊,到底是詐尸還是另有隱情,我是刑警寧澤影锈,帶...
    沈念sama閱讀 34,322評(píng)論 4 330
  • 正文 年R本政府宣布芹务,位于F島的核電站,受9級(jí)特大地震影響鸭廷,放射性物質(zhì)發(fā)生泄漏枣抱。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,934評(píng)論 3 313
  • 文/蒙蒙 一辆床、第九天 我趴在偏房一處隱蔽的房頂上張望佳晶。 院中可真熱鬧,春花似錦讼载、人聲如沸轿秧。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,755評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)淤刃。三九已至,卻和暖如春吱型,著一層夾襖步出監(jiān)牢的瞬間逸贾,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,987評(píng)論 1 266
  • 我被黑心中介騙來泰國(guó)打工津滞, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留铝侵,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,358評(píng)論 2 360
  • 正文 我出身青樓触徐,卻偏偏與公主長(zhǎng)得像咪鲜,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子撞鹉,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,514評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容