Kafka 多線程消費者

Kafka Java Consumer設計
Kafka Java Consumer采用的是單線程的設計吏饿。其入口類KafkaConsumer是一個雙線程的設計,即用戶主線程和心跳線程笆怠。
用戶主線程铝耻,指的是啟動Consumer應用程序main方法的線程,新引入的心跳線程(Heartbeat Thread)只負責定期給對應的Broker機器發(fā)送心跳請求,以表示消費者應用的存活性瓢捉。

多線程方案
KafkaConsumer不是線程安全的频丘,在使用中必須確保線程安全。

線程安全:多線程訪問時泡态,采用加鎖機制搂漠,當一個線程訪問該類的某個數(shù)據(jù)時,進行保護某弦,其他線程不能進行訪問直到該線程讀取完桐汤,
          其他線程才可使用。不會出現(xiàn)數(shù)據(jù)不一致或數(shù)據(jù)污染問題刀崖。
線程不安全:不提供數(shù)據(jù)訪問保護惊科,有可能出現(xiàn)多個線程先后更改數(shù)據(jù)造成所得到的數(shù)據(jù)時臟數(shù)據(jù)。

鑒于以上事實亮钦,我們可以指定兩套多線程方案:
1.消費者程序啟動多個線程,每個線程維護專屬的KafkaConsumer充活,負責完整的消息獲取蜂莉、消息處理流程。


image.png

2.消費者程序使用單或多線程獲取消息混卵,同時創(chuàng)建多個消費線程執(zhí)行消息處理邏輯映穗。獲取消息的線程可以是一個或多個,每個維護專屬KafkaConsumer實例幕随,處理消息交由特定線程池來做蚁滋,從而實現(xiàn)消息獲取與消息處理的真正解耦。


image.png

以下為兩種方案的優(yōu)缺點:


image.png

實例代碼
1.方案一:

public class KafkaConsumerRunner implements Runnable {
  private final AtomicBoolean closed = new AtomicBoolean(false);
  private final KafkaConsumer consumer;

  public void run() {
    try{
      consumer.subscribe(Arrays.asList("topic"));
      while(!closed.get()){
        ConsumerRecords records = consumer.poll(Duration.ofMillis(10000));
      }
    }catch (WakeupException e){
      if (!closed.get()) throw e;
    }finally {
      consumer.close();
    }
  }

  public void shutdown() {
    closed.set(true);
    consumer.wakeup();
  }
}

2.方案二:

private final KafkaConsumer<String, String> consumer;
private ExecutorService executors;
...

private int workerNum = ...;
executors = new ThreadPoolExecutor(
            workerNum, workerNum, 0L, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<>(1000),
            new ThreadPoolExecutor.CallerRunsPolicy());

...
while (true) {
  ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
  for (final ConsumerRecord record : records) {
    executors.submit(new Worker(record));
  }
}
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末赘淮,一起剝皮案震驚了整個濱河市辕录,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌梢卸,老刑警劉巖走诞,帶你破解...
    沈念sama閱讀 210,978評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異蛤高,居然都是意外死亡蚣旱,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 89,954評論 2 384
  • 文/潘曉璐 我一進店門戴陡,熙熙樓的掌柜王于貴愁眉苦臉地迎上來塞绿,“玉大人,你說我怎么就攤上這事恤批∫煳牵” “怎么了?”我有些...
    開封第一講書人閱讀 156,623評論 0 345
  • 文/不壞的土叔 我叫張陵开皿,是天一觀的道長涧黄。 經(jīng)常有香客問我篮昧,道長,這世上最難降的妖魔是什么笋妥? 我笑而不...
    開封第一講書人閱讀 56,324評論 1 282
  • 正文 為了忘掉前任懊昨,我火速辦了婚禮,結果婚禮上春宣,老公的妹妹穿的比我還像新娘酵颁。我一直安慰自己,他們只是感情好月帝,可當我...
    茶點故事閱讀 65,390評論 5 384
  • 文/花漫 我一把揭開白布躏惋。 她就那樣靜靜地躺著,像睡著了一般嚷辅。 火紅的嫁衣襯著肌膚如雪簿姨。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,741評論 1 289
  • 那天簸搞,我揣著相機與錄音扁位,去河邊找鬼。 笑死趁俊,一個胖子當著我的面吹牛域仇,可吹牛的內容都是我干的。 我是一名探鬼主播寺擂,決...
    沈念sama閱讀 38,892評論 3 405
  • 文/蒼蘭香墨 我猛地睜開眼暇务,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了怔软?” 一聲冷哼從身側響起垦细,我...
    開封第一講書人閱讀 37,655評論 0 266
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎爽雄,沒想到半個月后蝠检,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,104評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡挚瘟,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 36,451評論 2 325
  • 正文 我和宋清朗相戀三年叹谁,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片乘盖。...
    茶點故事閱讀 38,569評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡跌宛,死狀恐怖店溢,靈堂內的尸體忽然破棺而出,到底是詐尸還是另有隱情,我是刑警寧澤黔帕,帶...
    沈念sama閱讀 34,254評論 4 328
  • 正文 年R本政府宣布,位于F島的核電站,受9級特大地震影響,放射性物質發(fā)生泄漏国旷。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,834評論 3 312
  • 文/蒙蒙 一茫死、第九天 我趴在偏房一處隱蔽的房頂上張望跪但。 院中可真熱鬧,春花似錦峦萎、人聲如沸屡久。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,725評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽被环。三九已至,卻和暖如春详幽,著一層夾襖步出監(jiān)牢的瞬間筛欢,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,950評論 1 264
  • 我被黑心中介騙來泰國打工唇聘, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留悴能,地道東北人。 一個月前我還...
    沈念sama閱讀 46,260評論 2 360
  • 正文 我出身青樓雳灾,卻偏偏與公主長得像,于是被迫代替她去往敵國和親冯凹。 傳聞我的和親對象是個殘疾皇子谎亩,可洞房花燭夜當晚...
    茶點故事閱讀 43,446評論 2 348

推薦閱讀更多精彩內容