Changes
- Supports SSL/SASL
- Supports the group management protocol, which lets consumer groups scale as the number of brokers grows
- Fewer dependencies: it no longer depends on Kafka core
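Concepts
- Consumer group
A set of consumers that consume the same topic. Whenever a consumer joins or leaves, a rebalance redistributes the partitions among the consumers.
One of the brokers acts as the coordinator. It stores the partition assignment and the group's membership.
- Offset Management
When there is no committed offset, the starting position (earliest or latest) comes from the configuration. Offsets can be committed automatically or manually. In automatic mode they are committed periodically.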
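The following sketch makes the rebalance idea concrete. It is a stdlib-only model of how one topic's partitions might be split among group members in the spirit of a range strategy. It is a simplified illustration and not Kafka's actual `RangeAssignor`. The member ids and partition counts are hypothetical. If you recompute the assignment after a member joins or leaves, you can see why every membership change moves partitions around.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

// Simplified range-style assignment of ONE topic's partitions to group members.
// Not Kafka's RangeAssignor; member ids and partition counts are hypothetical.
final class RangeAssignmentSketch {
    static Map<String, List<Integer>> assign(int numPartitions, List<String> members) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        if (members.isEmpty()) {
            return result; // nobody to assign to
        }
        // Sort (and de-duplicate) member ids so the result is deterministic.
        List<String> sorted = new ArrayList<>(new TreeSet<>(members));
        int base = numPartitions / sorted.size();  // partitions every member gets
        int extra = numPartitions % sorted.size(); // the first `extra` members get one more
        int next = 0;
        for (int i = 0; i < sorted.size(); i++) {
            List<Integer> parts = new ArrayList<>();
            int count = base + (i < extra ? 1 : 0);
            for (int j = 0; j < count; j++) {
                parts.add(next++);
            }
            result.put(sorted.get(i), parts);
        }
        return result;
    }
}
```

With 5 partitions and members c1 and c2, c1 gets partitions 0 to 2 and c2 gets 3 and 4. When c3 joins, the split becomes [0, 1], [2, 3], [4]. When there are more members than partitions, the extra members receive nothing.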
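Configuration
- Core Configuration
Always set bootstrap.servers, and set a client.id.
- Group Configuration
  - Set group.id.
  - session.timeout.ms: normally 30 s. If the application polls and processes records in the same thread, consider raising this value so that the group does not rebalance too eagerly. The only downside is that a failed consumer takes longer to detect, so its partitions may lag. Usually, though, a consumer that shuts down cleanly notifies the coordinator immediately.
  - heartbeat.interval.ms: how often the consumer sends heartbeats to the coordinator. Heartbeats are also how a consumer learns that a rebalance has started. A larger value therefore reduces heartbeat traffic but makes rebalances slower to complete. Keep it well below session.timeout.ms.
- Offset Management
  - enable.auto.commit: turns automatic offset commits on or off
  - auto.commit.interval.ms: how often offsets are committed when auto-commit is on
  - auto.offset.reset (earliest/latest): where to start when the group has no committed offset for a partition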
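Here the settings above are collected into a `java.util.Properties` object, which is what `KafkaConsumer` takes as its config. This is a sketch only. The broker address, client id, group id and timeout values are placeholders. The two deserializer entries are not mentioned in the list above, but `KafkaConsumer` needs them. `StringDeserializer` is assumed here as the key/value type.

```java
import java.util.Properties;

// Consumer settings from the list above as plain java.util.Properties.
// Addresses, ids and timeout values are placeholders, not recommendations.
final class ConsumerConfigSketch {
    static Properties build(String bootstrap, String clientId, String groupId) {
        Properties props = new Properties();
        // Core configuration
        props.put("bootstrap.servers", bootstrap);
        props.put("client.id", clientId);
        // Group configuration
        props.put("group.id", groupId);
        props.put("session.timeout.ms", "30000");
        props.put("heartbeat.interval.ms", "3000"); // well below the session timeout
        // Offset management
        props.put("enable.auto.commit", "false");
        props.put("auto.commit.interval.ms", "5000"); // only used when auto-commit is on
        props.put("auto.offset.reset", "earliest");
        // KafkaConsumer also needs key/value deserializers (String assumed here)
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}
```

The resulting object is what the `config` argument of the BasicConsumeLoop example below would receive.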
Administration
List groups
bin/kafka-consumer-groups --bootstrap-server host:9092 --new-consumer --list
Describe Group
bin/kafka-consumer-groups --bootstrap-server host:9092 --new-consumer --describe --group foo
Examples
Basic Poll Loop
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public abstract class BasicConsumeLoop<K, V> implements Runnable {
    private final KafkaConsumer<K, V> consumer;
    private final List<String> topics;
    private final AtomicBoolean shutdown;
    private final CountDownLatch shutdownLatch;

    public BasicConsumeLoop(Properties config, List<String> topics) {
        this.consumer = new KafkaConsumer<>(config);
        this.topics = topics;
        this.shutdown = new AtomicBoolean(false);
        this.shutdownLatch = new CountDownLatch(1);
    }

    public abstract void process(ConsumerRecord<K, V> record);

    public void run() {
        try {
            consumer.subscribe(topics);
            while (!shutdown.get()) {
                // Alternative: the wakeup pattern. Poll with
                //   ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
                // and call consumer.wakeup() from another thread to break out.
                ConsumerRecords<K, V> records = consumer.poll(500);
                records.forEach(record -> process(record));
            }
        } finally {
            consumer.close();
            shutdownLatch.countDown();
        }
    }

    public void shutdown() throws InterruptedException {
        shutdown.set(true);
        // Return only after run() has closed the consumer
        shutdownLatch.await();
    }
}
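The flag-plus-latch shutdown handshake above can be tested without a broker. The stdlib-only model below keeps the same `AtomicBoolean`/`CountDownLatch` logic. A hypothetical `fakePoll()` stands in for `consumer.poll(500)`, and setting a `closed` flag stands in for `consumer.close()`. Neither is Kafka code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Stdlib-only model of the shutdown handshake in BasicConsumeLoop.
// fakePoll() stands in for consumer.poll(500) and setting `closed` stands in
// for consumer.close(); neither is Kafka code.
final class ShutdownHandshakeSketch implements Runnable {
    private final AtomicBoolean shutdown = new AtomicBoolean(false);
    private final CountDownLatch shutdownLatch = new CountDownLatch(1);
    volatile boolean closed = false;

    private void fakePoll() throws InterruptedException {
        Thread.sleep(10); // pretend to wait up to the poll timeout for records
    }

    @Override
    public void run() {
        try {
            while (!shutdown.get()) {
                fakePoll();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            closed = true;             // consumer.close() would go here
            shutdownLatch.countDown(); // wake up the thread waiting in shutdown()
        }
    }

    // Requests a stop and blocks until run() has passed its finally block.
    void shutdown() throws InterruptedException {
        shutdown.set(true);
        shutdownLatch.await();
    }
}
```

The flag is only checked between polls, so `shutdown()` can take up to one poll timeout (500 ms in the original loop) to return. The `wakeup()` alternative mentioned in the loop's comment avoids that delay.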
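Committing offsets
The drawback of auto-commit is that after a restart, records processed since the last commit are processed again. A shorter commit interval narrows that window and so reduces the duplication.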
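As a rough worked estimate: at R records/s and a commit interval of T s, up to about R·T records can be replayed after a crash. For example, at 1,000 records/s with a 5 s interval that is roughly 5,000 records. The toy model below makes this concrete. It is an assumption-laden simplification: it counts the interval in records rather than milliseconds, and it ignores that the real client commits from inside `poll()`.

```java
// Hypothetical model: auto-commit every `commitEvery` records
// (a stand-in for auto.commit.interval.ms), crash after `processed` records.
final class AutoCommitReplaySketch {
    // Offset that had been committed when the crash happened.
    static long lastCommitted(long processed, long commitEvery) {
        return (processed / commitEvery) * commitEvery;
    }

    // Records processed before the crash that will be processed again on restart.
    static long replayed(long processed, long commitEvery) {
        return processed - lastCommitted(processed, commitEvery);
    }
}
```

With 1,234 records processed, committing every 500 replays 234 records, while committing every 100 replays only 34.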
Synchronous commits are the safest:
private void doCommitSync() {
    try {
        consumer.commitSync();
    } catch (WakeupException e) {
        // we're shutting down, but finish the commit first and then
        // rethrow the exception so that the main loop can exit
        doCommitSync();
        throw e;
    } catch (CommitFailedException e) {
        // the commit failed with an unrecoverable error. if there is any
        // internal state which depended on the commit, you can clean it
        // up here. otherwise it's reasonable to ignore the error and go on
        log.debug("Commit failed", e);
    }
}

public void run() {
    try {
        consumer.subscribe(topics);
        while (true) {
            ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
            records.forEach(record -> process(record));
            doCommitSync();
        }
    } catch (WakeupException e) {
        // ignore, we're closing
    } catch (Exception e) {
        log.error("Unexpected error", e);
    } finally {
        consumer.close();
        shutdownLatch.countDown();
    }
}
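If the group has already been rebalanced, the commit fails with a CommitFailedException. This typically happens when processing the events takes longer than the session timeout. There are two ways to deal with it:
- Raise session.timeout.ms high enough, and lower max.partition.fetch.bytes to reduce the number of records returned per batch.
- Move event processing to another thread, for example by putting events into a BlockingQueue.
However, heartbeats are only sent while poll() is being called. If the polling thread blocks in offer() for a long time (for example because the queue is full), the consumer will be kicked out of the group.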
Reference: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design
Recommendation: option 1.
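If option 2 is used anyway, the polling thread must never block on a full queue. The stdlib-only sketch below shows one way to hand records to a worker thread with a bounded wait. Strings stand in for `ConsumerRecord`, and the capacity and timeout are arbitrary. Records that do not fit are returned so the caller can retry them on a later loop iteration. In a real consumer, that would also require pausing the affected partitions (e.g. via `KafkaConsumer`'s pause/resume) while the caller keeps polling. That part is assumed and not shown.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Stdlib-only sketch of handing records from the polling thread to a worker.
// Strings stand in for ConsumerRecord; capacity and timeout are arbitrary.
final class HandoffSketch {
    private final BlockingQueue<String> queue;

    HandoffSketch(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    // Called on the polling thread. Uses a bounded offer() instead of put(), so a
    // full queue cannot stall the thread that must keep calling poll().
    // Returns the records that did not fit and must be retried later.
    List<String> handOff(List<String> batch, long waitMs) throws InterruptedException {
        List<String> leftover = new ArrayList<>();
        for (String record : batch) {
            // Once one record fails, keep the rest aside too, to preserve order.
            if (!leftover.isEmpty() || !queue.offer(record, waitMs, TimeUnit.MILLISECONDS)) {
                leftover.add(record);
            }
        }
        return leftover;
    }

    // Called on the worker thread; blocks until a record is available.
    String take() throws InterruptedException {
        return queue.take();
    }
}
```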
Delivery Guarantees
- At least once: auto-commit.
- At most once: commit before processing the records:
private boolean doCommitSync() {
    try {
        consumer.commitSync();
        return true;
    } catch (CommitFailedException e) {
        // the commit failed with an unrecoverable error. if there is any
        // internal state which depended on the commit, you can clean it
        // up here. otherwise it's reasonable to ignore the error and go on
        log.debug("Commit failed", e);
        return false;
    }
}

public void run() {
    try {
        consumer.subscribe(topics);
        while (true) {
            ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
            if (doCommitSync())
                records.forEach(record -> process(record));
        }
    } catch (WakeupException e) {
        // ignore, we're closing
    } catch (Exception e) {
        log.error("Unexpected error", e);
    } finally {
        consumer.close();
        shutdownLatch.countDown();
    }
}
- Exactly once: not supported by this consumer.
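The two commit orders above can be compared with a toy model that involves no Kafka at all. The consumer polls a batch of offsets [0, batchSize) and crashes just before finishing record `crashAt`. It then restarts from whatever offset was committed. The batch size and crash point are hypothetical parameters. The result counts how many times each offset was processed.

```java
import java.util.Map;
import java.util.TreeMap;

// Toy model of the two commit orders above; no Kafka involved.
// A batch [0, batchSize) is polled, the process crashes before finishing record
// `crashAt`, then restarts from the committed offset and processes the rest.
final class DeliverySemanticsSketch {
    static Map<Long, Integer> run(boolean commitBeforeProcessing, int batchSize, int crashAt) {
        Map<Long, Integer> timesProcessed = new TreeMap<>();
        long committed = 0;
        if (commitBeforeProcessing) {
            committed = batchSize; // at most once: the whole batch is committed up front
        }
        // First attempt: records before the crash point get processed.
        for (long off = 0; off < crashAt; off++) {
            timesProcessed.merge(off, 1, Integer::sum);
        }
        // Crash. In the at-least-once order the commit after processing never ran.
        // Restart: resume from the committed position.
        for (long off = committed; off < batchSize; off++) {
            timesProcessed.merge(off, 1, Integer::sum);
        }
        return timesProcessed;
    }
}
```

With 5 records and a crash at record 3, committing after processing yields every offset at least once, and offsets 0 to 2 twice. Committing first processes offsets 0 to 2 exactly once and loses 3 and 4.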
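Asynchronous offset commits
Asynchronous commits can improve throughput, but there is a risk: a failed commit is not retried.
Handle failures yourself in the callback (here they are just logged):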
public void run() {
    try {
        consumer.subscribe(topics);
        while (true) {
            ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
            records.forEach(record -> process(record));
            consumer.commitAsync(new OffsetCommitCallback() {
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                    if (exception != null)
                        log.debug("Commit failed for offsets {}", offsets, exception);
                }
            });
        }
    } catch (WakeupException e) {
        // ignore, we're closing
    } catch (Exception e) {
        log.error("Unexpected error", e);
    } finally {
        consumer.close();
        shutdownLatch.countDown();
    }
}
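The callback above only logs failures. One commonly suggested way to retry failed asynchronous commits safely uses a monotonically increasing sequence number. This technique is an assumption here, not something this post prescribes. A failed attempt is retried only if no newer commit has been issued since, so an old offset never overwrites a newer one. The class below is hypothetical and not a Kafka API. `AtomicLong` is used defensively.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical retry guard for asynchronous commits (not a Kafka API).
// Each attempt takes a sequence number; a failed attempt may be retried only
// if it is still the latest one, so stale offsets are never re-committed.
final class AsyncCommitGuard {
    private final AtomicLong latestSeq = new AtomicLong();

    // Call right before consumer.commitAsync(...) and capture the value in the callback.
    long nextAttempt() {
        return latestSeq.incrementAndGet();
    }

    // Call from the callback when `exception != null`.
    boolean shouldRetry(long attemptSeq) {
        return attemptSeq == latestSeq.get();
    }
}
```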
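A failed offset commit is usually harmless as long as a later commit succeeds, because it does not by itself cause any data to be re-read. If the last commit before a rebalance or shutdown fails, though, the consumer resumes from the previous commit and duplicates appear. A common pattern is therefore to commit asynchronously in the poll loop and synchronously on rebalance and shutdown: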
private void doCommitSync() {
    try {
        consumer.commitSync();
    } catch (WakeupException e) {
        // we're shutting down, but finish the commit first and then
        // rethrow the exception so that the main loop can exit
        doCommitSync();
        throw e;
    } catch (CommitFailedException e) {
        // the commit failed with an unrecoverable error. if there is any
        // internal state which depended on the commit, you can clean it
        // up here. otherwise it's reasonable to ignore the error and go on
        log.debug("Commit failed", e);
    }
}

public void run() {
    try {
        consumer.subscribe(topics, new ConsumerRebalanceListener() {
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                doCommitSync();
            }
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {}
        });
        while (true) {
            ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
            records.forEach(record -> process(record));
            consumer.commitAsync();
        }
    } catch (WakeupException e) {
        // ignore, we're closing
    } catch (Exception e) {
        log.error("Unexpected error", e);
    } finally {
        try {
            doCommitSync();
        } finally {
            consumer.close();
            shutdownLatch.countDown();
        }
    }
}
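Asynchronous commits only fit at-least-once delivery. At-most-once requires knowing that the commit succeeded before the records are processed. An asynchronous commit cannot tell you that, unless the consumer had some kind of "unread" operation to hand records back.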
Multi-threaded Processing
The Kafka consumer is NOT thread-safe. All network I/O happens in the thread of the application making the call. It is the responsibility of the user to ensure that multi-threaded access is properly synchronized. Un-synchronized access will result in ConcurrentModificationException.
The only exception to this rule is wakeup(), which can safely be used from an external thread to interrupt an active operation. In this case, a WakeupException will be thrown from the thread blocking on the operation. This can be used to shut down the consumer from another thread. The following snippet shows the typical pattern:
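import java.util.Arrays;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class KafkaConsumerRunner implements Runnable {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    // String keys and values are assumed for this example
    private final KafkaConsumer<String, String> consumer;

    public KafkaConsumerRunner(KafkaConsumer<String, String> consumer) {
        this.consumer = consumer;
    }

    public void run() {
        try {
            consumer.subscribe(Arrays.asList("topic"));
            while (!closed.get()) {
                ConsumerRecords<String, String> records = consumer.poll(10000);
                // Handle new records
            }
        } catch (WakeupException e) {
            // Ignore exception if closing
            if (!closed.get()) throw e;
        } finally {
            consumer.close();
        }
    }

    // Shutdown hook which can be called from a separate thread
    public void shutdown() {
        closed.set(true);
        consumer.wakeup();
    }
}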
Then in a separate thread, the consumer can be shut down by setting the closed flag and waking up the consumer.
closed.set(true);
consumer.wakeup();
We have intentionally avoided implementing a particular threading model for processing. This leaves several options for implementing multi-threaded processing of records.
One Consumer Per Thread
A simple option is to give each thread its own consumer instance. Here are the pros and cons of this approach:
- PRO: It is the easiest to implement.
- PRO: It is often the fastest, as no inter-thread co-ordination is needed.
- PRO: It makes in-order processing on a per-partition basis very easy to implement (each thread just processes messages in the order it receives them).
- CON: More consumers means more TCP connections to the cluster (one per thread). In general Kafka handles connections very efficiently, so this is generally a small cost.
- CON: Multiple consumers means more requests being sent to the server and slightly less batching of data, which can cause some drop in I/O throughput.
- CON: The number of total threads across all processes will be limited by the total number of partitions.
Decouple Consumption and Processing
Another alternative is to have one or more consumer threads that do all data consumption and hand off ConsumerRecords instances to a blocking queue consumed by a pool of processor threads that actually handle the record processing. This option likewise has pros and cons:
- PRO: This option allows independently scaling the number of consumers and processors. This makes it possible to have a single consumer that feeds many processor threads, avoiding any limitation on partitions.
- CON: Guaranteeing order across the processors requires particular care. Because the threads execute independently, an earlier chunk of data may actually be processed after a later chunk just due to the luck of thread execution timing. For processing that has no ordering requirements this is not a problem.
- CON: Manually committing the position becomes harder, as it requires that all threads co-ordinate to ensure that processing is complete for that partition.
There are many possible variations on this approach. For example each processor thread can have its own queue, and the consumer threads can hash into these queues using the TopicPartition to ensure in-order consumption and simplify commit.
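The per-queue variation above can be sketched with the standard library alone. Each record is routed by its (topic, partition) pair to one of N worker queues. All records of a partition then land in the same queue, in arrival order. Strings stand in for record values and the worker count is arbitrary. Real code could hash the `TopicPartition` object directly, since it is designed to be used as a map key.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Routes records to per-worker queues by (topic, partition), so each partition is
// always handled by the same worker, in arrival order.
// Strings stand in for ConsumerRecord values; the worker count is arbitrary.
final class PartitionRouterSketch {
    private final List<BlockingQueue<String>> queues = new ArrayList<>();

    PartitionRouterSketch(int workers) {
        for (int i = 0; i < workers; i++) {
            queues.add(new LinkedBlockingQueue<>());
        }
    }

    int queueIndex(String topic, int partition) {
        // floorMod keeps the index non-negative even for negative hash codes
        return Math.floorMod(Objects.hash(topic, partition), queues.size());
    }

    // Called by the consumer thread for every record it polls.
    void dispatch(String topic, int partition, String value) {
        queues.get(queueIndex(topic, partition)).add(value);
    }

    // Each worker thread would take() from its own queue.
    BlockingQueue<String> queue(int index) {
        return queues.get(index);
    }
}
```

Because a partition maps to exactly one worker, that worker can also track the last processed offset per partition. This is what simplifies committing in this variation.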
Reference: http://docs.confluent.io/2.0.1/clients/consumer.html#asynchronous-commits