[Kafka] Kafka Consumer 0.9 Notes

Changes

  1. Supports SSL/SASL
  2. Supports the group management protocol, allowing consumer groups to scale as brokers are added
  3. Smaller dependency footprint; no longer depends on kafka core

Concepts

  1. Consumer Group
    A set of consumers that consume the same topic. Each consumer joining or leaving the group triggers a rebalance of the partition assignment across the consumers.
    One of the brokers acts as the coordinator; it stores the partition assignment and the group membership.
  2. Offset Management
    The starting offset position (earliest or latest) is read from the configuration. Offsets can be committed automatically or manually; in automatic mode a commit happens periodically at a fixed interval.

Configuration

  1. Core Configuration
    Always set bootstrap.servers, and it is a good idea to also set a client.id.
  2. Group Configuration
  • Set group.id.
  • session.timeout.ms: normally 30s. If the consumer and the record processing run in the same thread, consider raising this value to avoid rebalancing too quickly. The only downside is that detecting a failed consumer takes longer, so some partitions may be consumed slowly in the meantime; normally, though, a consumer that exits cleanly notifies the coordinator immediately.
  • heartbeat.interval.ms: increase it to reduce rebalances.
  3. Offset Management (a combined configuration sketch follows this list)

    • enable.auto.commit
    • auto.commit.interval.ms
    • auto.offset.reset (earliest/latest)
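
As a rough sketch, the settings above might be combined like this; the broker address, client id, and group id are placeholders and the values are illustrative only:

Properties config = new Properties();
// Core configuration
config.put("bootstrap.servers", "host:9092");     // placeholder broker address
config.put("client.id", "my-client");             // placeholder client id
// Group configuration
config.put("group.id", "my-group");               // placeholder group id
config.put("session.timeout.ms", "30000");
config.put("heartbeat.interval.ms", "3000");
// Offset management
config.put("enable.auto.commit", "false");        // set to "true" plus auto.commit.interval.ms for auto mode
config.put("auto.offset.reset", "earliest");      // or "latest"
// Deserializers are always required
config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
config.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config);

A Properties object like this is what the BasicConsumeLoop constructor below expects.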

Administration

  • List Groups

    bin/kafka-consumer-groups --bootstrap-server host:9092 --new-consumer --list

  • Describe Group

    bin/kafka-consumer-groups --bootstrap-server host:9092 --new-consumer --describe --group foo

Examples

Basic Poll Loop

import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public abstract class BasicConsumeLoop<K, V> implements Runnable {
  private final KafkaConsumer<K, V> consumer;
  private final List<String> topics;
  private final AtomicBoolean shutdown;
  private final CountDownLatch shutdownLatch;

  public BasicConsumeLoop(Properties config, List<String> topics) {
    this.consumer = new KafkaConsumer<>(config);
    this.topics = topics;
    this.shutdown = new AtomicBoolean(false);
    this.shutdownLatch = new CountDownLatch(1);
  }

  public abstract void process(ConsumerRecord<K, V> record);

  public void run() {
    try {
      consumer.subscribe(topics);
      while (!shutdown.get()) {
        // Alternatively, use the wakeup pattern: poll(Long.MAX_VALUE) here and
        // call consumer.wakeup() from another thread to break out of the poll.
        // ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
        ConsumerRecords<K, V> records = consumer.poll(500);
        records.forEach(record -> process(record));
      }
    } finally {
      consumer.close();
      shutdownLatch.countDown();
    }
  }

  public void shutdown() throws InterruptedException {
    shutdown.set(true);
    // Block until consumer.close() has completed in run()
    shutdownLatch.await();
  }
}

Committing Offsets

The problem with auto-commit is that records may be processed again after a restart; shrinking the commit interval reduces the amount of duplication.
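
For instance, with auto-commit enabled, a smaller interval narrows the window of records that can be re-delivered after a crash; the value below is illustrative only:

config.put("enable.auto.commit", "true");
// Offsets are committed roughly every second, so at most about one second's worth
// of records can be reprocessed after a restart (illustrative value).
config.put("auto.commit.interval.ms", "1000");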

Synchronous commits are the safest:

private void doCommitSync() {
  try {
    consumer.commitSync();
  } catch (WakeupException e) {
    // we're shutting down, but finish the commit first and then
    // rethrow the exception so that the main loop can exit
    doCommitSync();
    throw e;
  } catch (CommitFailedException e) {
    // the commit failed with an unrecoverable error. if there is any
    // internal state which depended on the commit, you can clean it
    // up here. otherwise it's reasonable to ignore the error and go on
    log.debug("Commit failed", e);
  }
}

public void run() {
  try {
    consumer.subscribe(topics);

    while (true) {
      ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
      records.forEach(record -> process(record));
      doCommitSync();
    }
  } catch (WakeupException e) {
    // ignore, we're closing
  } catch (Exception e) {
    log.error("Unexpected error", e);
  } finally {
    consumer.close();
    shutdownLatch.countDown();
  }
}

If the group has already been rebalanced, the commit fails and throws a CommitFailedException. This typically happens when record processing takes longer than the session timeout. There are two ways to deal with it:

  1. Increase session.timeout.ms so it is large enough, and lower max.partition.fetch.bytes to reduce the number of records returned by a single batch.
  2. Move record processing to another thread, for example by handing records off to a BlockingQueue.
    The problem is that heartbeat requests are only handled between two poll() calls, so if the offer into the queue blocks for too long, the consumer gets kicked out of the group.
    Reference: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design

Option 1 is recommended; a sketch of that tuning follows.
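
A minimal sketch of option 1, with illustrative values only; the right numbers depend on how long a batch actually takes to process:

// Assumes the rest of the configuration (bootstrap.servers, group.id, ...) is set as shown earlier.
Properties config = new Properties();
// Give processing enough headroom before the coordinator declares the consumer dead.
// Note: values above the broker's group.max.session.timeout.ms are rejected, so that
// broker setting may need to be raised as well.
config.put("session.timeout.ms", "60000");
// Fetch fewer bytes per partition so each poll() returns a smaller batch that can be
// processed well within the session timeout.
config.put("max.partition.fetch.bytes", "65536");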

Delivery Guarantees

  • at least once:
    auto-commit, or commit synchronously after processing (as in the loop above)
  • at most once:
    commit synchronously before processing, as below:
private boolean doCommitSync() {
  try {
      consumer.commitSync();
      return true;
  } catch (CommitFailedException e) {
    // the commit failed with an unrecoverable error. if there is any
    // internal state which depended on the commit, you can clean it
    // up here. otherwise it's reasonable to ignore the error and go on
    log.debug("Commit failed", e);
    return false;
  }
}

public void run() {
  try {
    consumer.subscribe(topics);

    while (true) {
      ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
      if (doCommitSync())
        records.forEach(record -> process(record));
    }
  } catch (WakeupException e) {
    // ignore, we're closing
  } catch (Exception e) {
    log.error("Unexpected error", e);
  } finally {
    consumer.close();
    shutdownLatch.countDown();
  }
}
  • Exactly-once delivery:
    Not supported.

Asynchronous Offset Commits

Asynchronous commits can improve throughput, but they carry a risk: a failed commit is not retried.
Record the failed offsets yourself in the commit callback:

public void run() {
  try {
    consumer.subscribe(topics);

    while (true) {
      ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
      records.forEach(record -> process(record));
      consumer.commitAsync(new OffsetCommitCallback() {
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
          if (exception != null)
            log.debug("Commit failed for offsets {}", offsets, exception);
        }
      });
    }
  } catch (WakeupException e) {
    // ignore, we're closing
  } catch (Exception e) {
    log.error("Unexpected error", e);
  } finally {
    consumer.close();
    shutdownLatch.countDown();
  }
}

A failed offset commit usually does not cause much trouble, since it does not by itself re-read any data. A common pattern is to commit asynchronously in the poll loop, and synchronously on rebalance and on shutdown:

private void doCommitSync() {
  try {
    consumer.commitSync();
  } catch (WakeupException e) {
    // we're shutting down, but finish the commit first and then
    // rethrow the exception so that the main loop can exit
    doCommitSync();
    throw e;
  } catch (CommitFailedException e) {
    // the commit failed with an unrecoverable error. if there is any
    // internal state which depended on the commit, you can clean it
    // up here. otherwise it's reasonable to ignore the error and go on
    log.debug("Commit failed", e);
  }
}

public void run() {
  try {
    consumer.subscribe(topics, new ConsumerRebalanceListener() {
      public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        doCommitSync();
      }

      public void onPartitionsAssigned(Collection<TopicPartition> partitions) {}
    });

    while (true) {
      ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
      records.forEach(record -> process(record));
      consumer.commitAsync();
    }
  } catch (WakeupException e) {
    // ignore, we're closing
  } catch (Exception e) {
    log.error("Unexpected error", e);
  } finally {
    try {
      doCommitSync();
    } finally {
      consumer.close();
      shutdownLatch.countDown();
    }
  }
}

Asynchronous commits can only support at-least-once delivery. For at-most-once, the commit would have to be confirmed as successful before processing the data, which an asynchronous commit cannot guarantee, unless we had some kind of "unread" semantics.

Multi-threaded Processing

The Kafka consumer is NOT thread-safe. All network I/O happens in the thread of the application making the call. It is the responsibility of the user to ensure that multi-threaded access is properly synchronized. Un-synchronized access will result in ConcurrentModificationException. The only exception to this rule is wakeup(), which can safely be used from an external thread to interrupt an active operation. In this case, a WakeupException will be thrown from the thread blocking on the operation. This can be used to shut down the consumer from another thread. The following snippet shows the typical pattern:

public class KafkaConsumerRunner implements Runnable {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final KafkaConsumer<String, String> consumer;

    public KafkaConsumerRunner(KafkaConsumer<String, String> consumer) {
        this.consumer = consumer;
    }

    public void run() {
        try {
            consumer.subscribe(Arrays.asList("topic"));
            while (!closed.get()) {
                ConsumerRecords<String, String> records = consumer.poll(10000);
                // Handle new records
            }
        } catch (WakeupException e) {
            // Ignore exception if closing
            if (!closed.get()) throw e;
        } finally {
            consumer.close();
        }
    }

    // Shutdown hook which can be called from a separate thread
    public void shutdown() {
        closed.set(true);
        consumer.wakeup();
    }
}

Then in a separate thread, the consumer can be shutdown by setting the closed flag and waking up the consumer.

 closed.set(true);
 consumer.wakeup();

We have intentionally avoided implementing a particular threading model for processing. This leaves several options for implementing multi-threaded processing of records.

  1. One Consumer Per Thread
    A simple option is to give each thread its own consumer instance. Here are the pros and cons of this approach:
    PRO: It is the easiest to implement
    PRO: It is often the fastest as no inter-thread co-ordination is needed
    PRO: It makes in-order processing on a per-partition basis very easy to implement (each thread just processes messages in the order it receives them).
    CON: More consumers means more TCP connections to the cluster (one per thread). In general Kafka handles connections very efficiently so this is generally a small cost.
    CON: Multiple consumers means more requests being sent to the server and slightly less batching of data which can cause some drop in I/O throughput.
    CON: The number of total threads across all processes will be limited by the total number of partitions.

  2. Decouple Consumption and Processing
    Another alternative is to have one or more consumer threads that do all data consumption and hand off ConsumerRecords instances to a blocking queue consumed by a pool of processor threads that actually handle the record processing. This option likewise has pros and cons:
    PRO: This option allows independently scaling the number of consumers and processors. This makes it possible to have a single consumer that feeds many processor threads, avoiding any limitation on partitions.
    CON: Guaranteeing order across the processors requires particular care as the threads will execute independently; an earlier chunk of data may actually be processed after a later chunk of data just due to the luck of thread execution timing. For processing that has no ordering requirements this is not a problem.
    CON: Manually committing the position becomes harder as it requires that all threads co-ordinate to ensure that processing is complete for that partition.

There are many possible variations on this approach. For example each processor thread can have its own queue, and the consumer threads can hash into these queues using the TopicPartition to ensure in-order consumption and simplify commit.
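
As one possible sketch of that last variation (not taken from the docs above): a single consumer thread hashes each record's TopicPartition into a fixed set of queues, one per worker, so per-partition ordering is preserved. The class name, pool size, and process() hook are all hypothetical.

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;

public class DecoupledConsumeLoop implements Runnable {
  private final KafkaConsumer<String, String> consumer;
  private final List<String> topics;
  private final List<BlockingQueue<ConsumerRecord<String, String>>> queues = new ArrayList<>();
  private final AtomicBoolean closed = new AtomicBoolean(false);

  public DecoupledConsumeLoop(Properties config, List<String> topics, int numWorkers) {
    this.consumer = new KafkaConsumer<>(config);
    this.topics = topics;
    ExecutorService workers = Executors.newFixedThreadPool(numWorkers);
    for (int i = 0; i < numWorkers; i++) {
      BlockingQueue<ConsumerRecord<String, String>> queue = new LinkedBlockingQueue<>();
      queues.add(queue);
      // One worker per queue: records from the same partition always land in the
      // same queue, so per-partition ordering is preserved.
      workers.submit(() -> {
        try {
          while (true)
            process(queue.take());
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
    }
  }

  private void process(ConsumerRecord<String, String> record) {
    // application-specific handling goes here
  }

  public void run() {
    try {
      consumer.subscribe(topics);
      while (!closed.get()) {
        for (ConsumerRecord<String, String> record : consumer.poll(500)) {
          TopicPartition tp = new TopicPartition(record.topic(), record.partition());
          int idx = Math.floorMod(tp.hashCode(), queues.size());
          // put() can block if a worker falls behind; if it blocks for longer than the
          // session timeout, the consumer is kicked out of the group (see above).
          queues.get(idx).put(record);
        }
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } catch (WakeupException e) {
      if (!closed.get()) throw e;
    } finally {
      consumer.close();
    }
  }

  public void shutdown() {
    closed.set(true);
    consumer.wakeup();
  }
}

Offset commits are deliberately omitted here; as the last CON above notes, committing manually in this model requires coordinating with the workers to know when processing for a partition has actually finished.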


http://docs.confluent.io/2.0.1/clients/consumer.html#asynchronous-commits
