Kafka學(xué)習(xí)(三)-------- Kafka核心之Cosumer

了解了什么是kafka( https://www.cnblogs.com/tree1123/p/11226880.html)以后

學(xué)習(xí)核心api之消費(fèi)者奸柬,kafka的消費(fèi)者經(jīng)過(guò)幾次版本變化埂蕊,特別容易混亂篮绰,所以一定要搞清楚是哪個(gè)版本再研究魂拦。

一哺呜、舊版本consumer

只有舊版本(0.9以前)才有 high-level consumer 和 low-level consumer之分蚁吝,很多的文章提到的就是這兩個(gè):低階消費(fèi)者和高階消費(fèi)者宇立,低階消費(fèi)者更靈活但是需要自己維護(hù)很多東西诸蚕,高階就死板一點(diǎn)但是不需要維護(hù)太多東西步势。

high-level consumer就是消費(fèi)者組氧猬。

low-level consumer是單獨(dú)一個(gè)消費(fèi)者,單個(gè)consumer沒(méi)有什么消費(fèi)者組的概念坏瘩,與其他consumer相互之間不關(guān)聯(lián)盅抚。

1、low-level consumer

low-level consumer底層實(shí)現(xiàn)是

SimpleConsumer 他可以自行管理消費(fèi)者

Storm的Kafka插件 storm-kafka就是使用了SimpleConsumer

優(yōu)點(diǎn)是靈活 倔矾, 可以從任意位置拿消息 妄均。

如果需要: 重復(fù)讀取數(shù)據(jù) 只消費(fèi)部分分區(qū)數(shù)據(jù) 精確消費(fèi) 就得用這個(gè),

不過(guò)必須自己處理位移提交 尋找分區(qū)leader broker 處理leader變更哪自。

接口中的方法:
fetch
send  發(fā)送請(qǐng)求
getOffsetBefore
commitOffsets
fetchOffsets
earliestOrlatestOffset
close

使用步驟:

參照官網(wǎng)丰包,比較復(fù)雜需要好幾步來(lái)拉取消息。

Find an active Broker and find out which Broker is the leader for your topic and partition

找到活躍的broker 找到哪個(gè)broker是你的topic和partition的leader

Determine who the replica Brokers are for your topic and partition

查出replica 的brokers

Build the request defining what data you are interested in

建立請(qǐng)求

Fetch the data

拿數(shù)據(jù)

Identify and recover from leader changes

leader變化時(shí)恢復(fù)

也可以查詢一些offset等metadata信息壤巷,具體代碼如下邑彪。

//根據(jù)指定的分區(qū)從主題元數(shù)據(jù)中找到主副本
SimpleConsumer consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024,
                        "leaderLookup");
List<String> topics = Collections.singletonList(a_topic);
TopicMetadataRequest req = new TopicMetadataRequest(topics);                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
List<TopicMetadata> metaData = resp.topicsMetadata();

String  leader = metaData.leader().host();

//獲取分區(qū)的offset等信息
//比如獲取lastoffset
TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition); 

Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();  

requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1)); 

kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);

OffsetResponse response = consumer.getOffsetsBefore(request);

long[] offsets = response.offsets(topic, partition);
long lastoffset = offsets[0];

這個(gè)api現(xiàn)在應(yīng)用不多,除非你有特殊需求胧华,比如要自己寫(xiě)監(jiān)控寄症,你可能需要更多的元數(shù)據(jù)信息。

2撑柔、high-level consumer

主要使用的類:ConsumerConnector

屏蔽了每個(gè)topic的每個(gè)Partition的offset的管理(自動(dòng)讀取zookeeper中該Consumer group的last offset)

Broker失敗轉(zhuǎn)移瘸爽,增減Partition Consumer時(shí)的負(fù)載均衡(當(dāng)Partiotion和Consumer增減時(shí),Kafka自動(dòng)負(fù)載均衡)

這些功能low-level consumer都需要自己實(shí)現(xiàn)的铅忿。

主要方法如下:
createMessageStreams
createMessageStreamsByFilter
commitOffsets
setconsumerReblanceListener
shutdown

group通過(guò)zookeeper完成核心功能剪决,

zookeeper目錄結(jié)構(gòu)如下:

/consumers/groupId/ids/consumre.id

記錄該consumer的訂閱信息,還被用來(lái)監(jiān)聽(tīng)consumer存活狀態(tài)檀训。這是一個(gè)臨時(shí)節(jié)點(diǎn)柑潦,會(huì)話失效將會(huì)自動(dòng)刪除。

/consumers/groupId/owners/topic/partition

保存consumer各個(gè)消費(fèi)線程的id峻凫,執(zhí)行rebalance時(shí)保存渗鬼。

/consumers/groupId/offsets/topic/partition

保存該group消費(fèi)指定分區(qū)的位移信息。

這個(gè)consumer支持多線程設(shè)計(jì)荧琼,只創(chuàng)建一個(gè)consumer實(shí)例譬胎,但如果是多個(gè)分區(qū),將會(huì)自動(dòng)創(chuàng)建多個(gè)線程消費(fèi)命锄。

使用步驟:

   Properties properties = new Properties();  
   properties.put("zookeeper.connect", "ip1:2181,ip2:2181,ip3:2181");//聲明zk  
   properties.put("group.id", "group03");
   ConsumerConnector  consumer =  Consumer.createJavaConsumerConnector(new ConsumerConfig(properties)); 
   
   Map<String, Integer> topicCountMap = new HashMap<String, Integer>();  
   topicCountMap.put(topic, 1); // 一次從主題中獲取一個(gè)數(shù)據(jù)  
   Map<String, List<KafkaStream<byte[], byte[]>>>  messageStreams = consumer.createMessageStreams(topicCountMap);  
   KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0);// 獲取每次接收到的這個(gè)數(shù)據(jù)  如果是多線程在這里處理多分區(qū)的情況
   ConsumerIterator<byte[], byte[]> iterator =  stream.iterator();  
   while(iterator.hasNext()){  
        String message = new String(iterator.next().message());  
        System.out.println("接收到: " + message);  
   }  

//auto.offset.reset 默認(rèn)值為largest
//從頭消費(fèi) properties.put("auto.offset.reset", "smallest"); 

很簡(jiǎn)單堰乔,我們0.9版本之前使用的很多都是他,集成spring的方法等等脐恩。不過(guò)0.9版本以后新的consumer出現(xiàn)了镐侯。

二、新版本consumer

先說(shuō)一下版本的問(wèn)題:

Kafka 0.10.0.0之后 增加了 Kafka Streams 所以Kafka1.0開(kāi)始Streams 就穩(wěn)定了驶冒。

kafka security 0.9.0.0以后 0.10.0.1之后穩(wěn)定

0.10.1.0之后 新版本consumer穩(wěn)定

storm有兩個(gè)連kafka的包:

storm-kafka 使用了舊版本的consumer

storm-kafka-client 使用了新版本consumer

kafka 0.9.0.0廢棄了舊版producer和consumer 舊版時(shí)scala版 新版用java開(kāi)發(fā)

版本 推薦producer 推薦consumer 原因
0.8.2.2 舊版 舊版 新producer尚不穩(wěn)定
0.9.0.x 新版 舊版 新producer穩(wěn)定
0.10.0.x 新版 舊版 新consumer不穩(wěn)定
0.10.1.0 新版 新版 新consumer穩(wěn)定
0.10.2.x 新版 新版 都穩(wěn)定了

舊版本中offset管理依托zookeeper苟翻,新版本中不在依靠zookeeper韵卤。

?

語(yǔ)言 包名 主要使用類
舊版本 scala kafka.consumer.* ZookeeperConsumerConnector SimpleConsumer
新版本 java org.apache.kafka.clients.consumer.* KafkaConsumer

?

新版本的幾個(gè)核心概念:

consumer group

消費(fèi)者使用一個(gè)消費(fèi)者組名(group.id)來(lái)標(biāo)記自己,topic的每條消息都只會(huì)發(fā)送到每個(gè)訂閱他的消費(fèi)者組的一個(gè)消費(fèi)者實(shí)例上崇猫。

1沈条、一個(gè)消費(fèi)者組有若干個(gè)消費(fèi)者。

2邓尤、對(duì)于同一個(gè)group拍鲤,topic的每條消息只能被發(fā)送到group下的一個(gè)consumer實(shí)例上。

3汞扎、topic消息可以被發(fā)送到多個(gè)group中季稳。

consumer端offset

記錄每一個(gè)consumer消費(fèi)的分區(qū)的位置

kafka沒(méi)有把這個(gè)放在服務(wù)器端,保存在了consumer group中澈魄,并定期持久化景鼠。

舊版本會(huì)把這個(gè)offset定期存在zookeeper中:路徑是 /consumers/groupid/offsets/topic/partitionid

新版本將offset放在了一個(gè)內(nèi)部topic中:__consumer_offsets(前面兩個(gè)下劃線) 里面有50個(gè)分區(qū)

所以新版本的consumer就不需要連zookeeper了。

舊版本設(shè)置offsets.storage=kafka設(shè)置位移提交到這痹扇,不常使用铛漓。

__consumer_offsets中的結(jié)構(gòu): key = group.id+topic+partition value=offset

consumer group reblance

單個(gè)consumer是沒(méi)有rebalance的。

他規(guī)定了一個(gè)consumer group下的所有consumer如何去分配所有的分區(qū)鲫构。

單線程示例代碼:
Properties props = new Properties();
        props.put("bootstrap.servers", "kafka01:9092浓恶,kafka02:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        
        props.put("auto.offset.reset","earliest");
        
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("foo", "bar"));
      try{  
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
         }
        }finally{
          consumer.close();
        }

很簡(jiǎn)單,1结笨、只需要配置kafka的server groupid autocommit 序列化 autooffsetreset(其中 bootstrap.server group.id key.deserializer value.deserializer 必須指定);

2包晰、用這些Properties構(gòu)建consumer對(duì)象(KafkaConsumer還有其他構(gòu)造,可以把序列化傳進(jìn)去);

3炕吸、subscribe訂閱topic列表(可以用正則訂閱Pattern.compile("kafka.*")

使用正則必須指定一個(gè)listener subscribe(Pattern pattern, ConsumerRebalanceListener listener)); 可以重寫(xiě)這個(gè)接口來(lái)實(shí)現(xiàn) 分區(qū)變更時(shí)的邏輯伐憾。如果設(shè)置了enable.auto.commit = true 就不用理會(huì)這個(gè)邏輯。

4赫模、然后循環(huán)poll消息(這里的1000是超時(shí)設(shè)定树肃,如果沒(méi)有很多數(shù)據(jù),也就等一秒);

5瀑罗、處理消息(打印了offset key value 這里寫(xiě)處理邏輯)胸嘴。

6、關(guān)閉KafkaConsumer(可以傳一個(gè)timeout值 等待秒數(shù) 默認(rèn)是30)斩祭。

Properties詳解:

bootstrap.server(最好用主機(jī)名不用ip kafka內(nèi)部用的主機(jī)名 除非自己配置了ip)

deserializer 反序列化consumer從broker端獲取的是字節(jié)數(shù)組劣像,還原回對(duì)象類型。

默認(rèn)有十幾種:StringDeserializer LongDeserializer DoubleDeserializer停忿。驾讲。

也可以自定義:定義serializer格式 創(chuàng)建自定義deserializer類實(shí)現(xiàn)Deserializer 接口 重寫(xiě)邏輯

?

除了四個(gè)必傳的 bootstrap.server group.id key.deserializer value.deserializer

還有session.timeout.ms "coordinator檢測(cè)失敗的時(shí)間"

是檢測(cè)consumer掛掉的時(shí)間 為了可以及時(shí)的rebalance 默認(rèn)是10秒 可以設(shè)置更小的值避免消息延遲蚊伞。

max.poll.interval.ms "consumer處理邏輯最大時(shí)間"

處理邏輯比較復(fù)雜的時(shí)候 可以設(shè)置這個(gè)值 避免造成不必要的 rebalance 席赂,因?yàn)閮纱蝡oll時(shí)間超過(guò)了這個(gè)參數(shù)吮铭,kafka認(rèn)為這個(gè)consumer已經(jīng)跟不上了,會(huì)踢出組颅停,而且不能提交offset谓晌,就會(huì)重復(fù)消費(fèi)。默認(rèn)是5分鐘癞揉。

auto.offset.reset "無(wú)位移或者位移越界時(shí)kafka的應(yīng)對(duì)策略"

所以如果啟動(dòng)了一個(gè)group從頭消費(fèi) 成功提交位移后 重啟后還是接著消費(fèi) 這個(gè)參數(shù)無(wú)效

所以3個(gè)值的解釋是:

earliset 當(dāng)各分區(qū)下有已提交的offset時(shí)纸肉,從提交的offset開(kāi)始消費(fèi);無(wú)提交的offset時(shí)喊熟,從最早的位移消費(fèi)

latest 當(dāng)各分區(qū)下有已提交的offset時(shí)柏肪,從提交的offset開(kāi)始消費(fèi);無(wú)提交的offset時(shí)芥牌,消費(fèi)新產(chǎn)生的該分區(qū)下的數(shù)據(jù) none topic各分區(qū)都存在已提交的offset時(shí)烦味,從offset后開(kāi)始消費(fèi);只要有一個(gè)分區(qū)不存在已提交的offset壁拉,則拋出異常

(注意kafka-0.10.1.X版本之前: auto.offset.reset 的值為smallest,和,largest.(offest保存在zk中) 谬俄、

我們這是說(shuō)的是新版本:kafka-0.10.1.X版本之后: auto.offset.reset 的值更改為:earliest,latest,和none (offest保存在kafka的一個(gè)特殊的topic名為:__consumer_offsets里面))

enable.auto.commit 是否自動(dòng)提交位移

true 自動(dòng)提交 false需要用戶手動(dòng)提交 有只處理一次需要的 最近設(shè)置為false自己控制。

fetch.max.bytes consumer單次獲取最大字節(jié)數(shù)

max.poll.records 單次poll返回的最大消息數(shù)

默認(rèn)500條 如果消費(fèi)很輕量 可以適當(dāng)提高這個(gè)值 增加消費(fèi)速度弃理。

hearbeat.interval.ms consumer其他組員感知rabalance的時(shí)間

該值必須小于 session.timeout.ms 如果檢測(cè)到 consumer掛掉 也就根本無(wú)法感知rabalance了

connections.max.idle.ms 定期關(guān)閉連接的時(shí)間

默認(rèn)是9分鐘 可以設(shè)置為-1 永不關(guān)閉

poll方法詳解:

(舊版本:多分區(qū)多線程 新版本:一個(gè)線程管理多個(gè)socket連接)

但新版本KafkaConsumer是雙線程的溃论,主線程負(fù)責(zé):消息獲取,rebalance痘昌,coordinator钥勋,位移提交等等,

另一個(gè)是后臺(tái)心跳線程控汉。

根據(jù)上邊的各種配置析藕,poll方法會(huì)找到offset,當(dāng)獲取了足夠多的可用數(shù)據(jù)漾狼,或者等待時(shí)間超過(guò)了指定的超時(shí)時(shí)間肠阱,就會(huì)返回。

java consumer不是線程安全的街佑,同一個(gè)KafkaConsumer用在了多個(gè)線程中谢翎,將會(huì)報(bào)Kafka Consumer is not safe for multi-threaded assess異常°逯迹可以加一個(gè)同步鎖進(jìn)行保護(hù)森逮。

poll的超時(shí)參數(shù),已經(jīng)說(shuō)過(guò)1000的話是超時(shí)設(shè)定磁携,如果沒(méi)有很多數(shù)據(jù)褒侧,也就等一秒,就返回了,比如定時(shí)5秒的將消息寫(xiě)入闷供,就可以將超時(shí)參數(shù)設(shè)置為5000烟央,達(dá)到效率最大化。

如果沒(méi)有定時(shí)任務(wù)呢歪脏,那就設(shè)置為 Long.MAX_VALUE 未獲取足夠多的數(shù)據(jù)就無(wú)限等待疑俭。這里要捕獲一下WakeupException。

consumer offset詳解:

consumer需要定期向kafka提交自己的offset信息婿失。已經(jīng)學(xué)過(guò) 新版本將他提交到了一個(gè)topic中 __consumer_offsets钞艇。

offset有一個(gè)更大的作用是實(shí)現(xiàn)交付語(yǔ)義:

最多一次 at most once 可能丟失 不會(huì)重復(fù)

最少一次 at least once 可能重復(fù) 不會(huì)丟失

精確一次 exactly once 不丟失 不重復(fù) 就一次

若consumer在消費(fèi)之前提交位移 就實(shí)現(xiàn)了at most once

若是消費(fèi)后提交 就實(shí)現(xiàn)了 at least once 默認(rèn)是這個(gè)。

consumer的多個(gè)位置信息:

? 上次提交的位置 當(dāng)前位置 水位 日志最新位移

0 1 豪硅。哩照。 5 。懒浮。 10 葡秒。。 15

上次提交位置:consumer最近一次提交的offset值;

當(dāng)前位置:consumer上次poll 到了這個(gè)位置 但是還沒(méi)提交;

水位:這是分區(qū)日志的管理 consumer無(wú)法讀取水位以上的消息;

最新位移: 也是分區(qū)日志的管理 最大的位移值 一定不會(huì)比水位小嵌溢。

新版本的consumer會(huì)在broker選一個(gè)broker作為consumergroup的coordinator眯牧,用于實(shí)現(xiàn)組成員管理,消費(fèi)分配方案赖草,提交位移学少。如果consumer崩潰,他負(fù)責(zé)的分區(qū)就分配給其他consumer秧骑,如果沒(méi)有做好位移提交就可能重復(fù)消費(fèi)版确。

多次提交的情況,kafka只關(guān)注最新一次的提交乎折。

默認(rèn)consumer自動(dòng)提交位移 提交間隔為5秒 可以通過(guò) auto.commit.interval.ms 設(shè)置這個(gè)間隔绒疗。

自動(dòng)提交可以減少開(kāi)發(fā),但是可能重復(fù)消費(fèi)骂澄,所以需要精準(zhǔn)消費(fèi)時(shí)還是要手動(dòng)提交吓蘑。設(shè)置手動(dòng)提交 enable.auto.commit = false,然后調(diào)用 consumer.commitSync() 或者 consumer.commitAync() Sync為同步方式坟冲,阻塞 Aync為異步方式磨镶,不會(huì)阻塞。這兩個(gè)方法可以傳參健提,指定為哪個(gè)分區(qū)提交琳猫,這樣更合理一些。

(舊版本的自動(dòng)提交設(shè)置是 auto.commit.enable 默認(rèn)間隔為60秒)

rebalance詳解:

rebalance是consumer group如何分配topic的所有分區(qū)私痹。

正常情況脐嫂,比如有10個(gè)分區(qū)统刮,5個(gè)consumer 那么consumer group將為每個(gè)consumer 平均分配兩個(gè)分區(qū)。

每個(gè)分區(qū)只會(huì)分給一個(gè)consumer實(shí)例账千。有consumer出現(xiàn)問(wèn)題网沾,會(huì)重新執(zhí)行這個(gè)過(guò)程,這個(gè)過(guò)程就是rebalance蕊爵。

(舊版本通過(guò)zookeeper管理rebalance,新版本會(huì)選取某個(gè)broker為group coordinator來(lái)管理)

rebalance的觸發(fā)條件:

1桦山、有新的consumer加入攒射,或者有consumer離開(kāi)或者掛掉。

2恒水、group訂閱的topic發(fā)生變更会放,比如正則訂閱。

3钉凌、group訂閱的分區(qū)數(shù)發(fā)生變化咧最。

第一個(gè)經(jīng)常出現(xiàn),不一定是掛掉御雕,也可能是處理太慢矢沿,為了避免頻繁rebalance,要調(diào)整好request.timeout.ms max.poll.records和ma.poll.interval.

rebalance分區(qū)策略:

partition.assignment.strategy 設(shè)置 自定義分區(qū)策略-創(chuàng)建分區(qū)器 assignor

range策略(默認(rèn))酸纲,將分區(qū)劃分為分區(qū)段捣鲸,一次分配給每個(gè)consumer。

round-robin策略闽坡,輪詢分配栽惶。

sticky策略(0.11.0.0出現(xiàn),更優(yōu)秀)疾嗅,range策略在訂閱多個(gè)topic時(shí)會(huì)不均勻外厂。

sticky有兩個(gè)原則,當(dāng)兩者發(fā)生沖突時(shí)代承,第一個(gè)目標(biāo)優(yōu)先于第二個(gè)目標(biāo)汁蝶。

  1. 分區(qū)的分配要盡可能的均勻;
  2. 分區(qū)的分配盡可能的與上次分配的保持相同论悴。

rebalance generation分代機(jī)制保證rabalance時(shí)重復(fù)提交的問(wèn)題穿仪,延遲的offset提交時(shí)舊的generation信息會(huì)報(bào)異常ILLEGAL_GENERATION

rebalance過(guò)程:

1、確定coordinator所在的broker意荤,建立socket連接啊片。

確定算法: Math.abs(groupID.hashCode) % offsets.topic.num.partition 參數(shù)值(默認(rèn)50)

尋找__consumer_offset分區(qū)50的leader副本所在的broker,該broker即為這個(gè)group的coordinator

2玖像、加入組

所有consumer會(huì)向coordinator發(fā)送JoinGroup請(qǐng)求紫谷,收到所有請(qǐng)求后選一個(gè)consumer做leader(這個(gè)leader是consumer coordinator是broker)齐饮,coordinator把成員和訂閱信息發(fā)給coordinator。

3笤昨、同步分配方案

leader制定分配方案祖驱,通過(guò)SyncGroup請(qǐng)求發(fā)給coordinator,每個(gè)consumer也會(huì)發(fā)請(qǐng)求返回方案瞒窒。

kafka也支持offset不提交到__consumer_offset捺僻,可以自定義,這時(shí)候就需要實(shí)現(xiàn)一個(gè)監(jiān)聽(tīng)器ConsumerRebalanceListener崇裁,在這里重新處理Rebalance的邏輯匕坯。

多線程示例代碼:
這里要根據(jù)自身需求開(kāi)發(fā),我這里只舉一個(gè)簡(jiǎn)單的例子拔稳,就是幾個(gè)分區(qū)就啟動(dòng)幾個(gè)consumer葛峻,一一對(duì)應(yīng)。
三個(gè)類:
Main:
public static void main(String[] args) {
        
        String bootstrapServers = "kafka01:9092巴比,kafka02:9092"; 
        String groupId = "test";
        String topic = "testtopic";
        int consumerNum = 3;
        ConsumerGroup cg = new ConsumerGroup(consumerNum,bootstrapServers,groupId,topic);
        cg.execute();
}



import java.util.ArrayList;
import java.util.List;


public class ConsumerGroup {
    
    private List<ConsumerRunnable> consumers;
    
    public ConsumerGroup(int consumerNum,String bootstrapServers,String groupId,String topic){
        
        consumers = new ArrayList<>(consumerNum);
        
        for(int i=0;i < consumerNum;i++){
            ConsumerRunnable ConsumerRunnable = new ConsumerRunnable(bootstrapServers,groupId,topic);
            consumers.add(ConsumerRunnable);
        }
    }
    
    public void execute(){
        
        for(ConsumerRunnable consumerRunnable:consumers){
            new Thread(consumerRunnable).start();
        }
    }
}



import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerRunnable implements Runnable{
    
    private final KafkaConsumer<String,String> consumer;
    
    public ConsumerRunnable(String bootstrapServers,String groupId,String topic){
        
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset","earliest");
        this.consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));
    }

    @Override
    public void run() {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(10);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }
    }
}
standalone consumer

有一些需求术奖,需要指定一個(gè)消費(fèi)者消費(fèi)某一個(gè)分區(qū)。彼此之間不干擾轻绞,一個(gè)standalone consumer崩潰不會(huì)影響其他采记。

類似舊版本的低階消費(fèi)者。

示例代碼如下:consumer.assign方法訂閱分區(qū)

public static void main(String[] args) {
        
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka01:9092,kafka02:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        
        props.put("auto.offset.reset","earliest");
        
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        List<TopicPartition> partitions = new ArrayList<>();
        List<PartitionInfo> allpartitions = consumer.partitionsFor("testtopic");
        if(allpartitions!=null && !allpartitions.isEmpty()){
            for(PartitionInfo partitionInfo:allpartitions){
                partitions.add(new TopicPartition(partitionInfo.topic(),partitionInfo.partition()));
            }
            consumer.assign(partitions);
        }
        
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(10);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }
        
    }

以上為kafka消費(fèi)者的學(xué)習(xí)政勃,不同的具體細(xì)節(jié)還需要通過(guò)官網(wǎng)文檔仔細(xì)學(xué)習(xí)挺庞。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市稼病,隨后出現(xiàn)的幾起案子选侨,更是在濱河造成了極大的恐慌,老刑警劉巖然走,帶你破解...
    沈念sama閱讀 216,744評(píng)論 6 502
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件援制,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡芍瑞,警方通過(guò)查閱死者的電腦和手機(jī)晨仑,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,505評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)拆檬,“玉大人洪己,你說(shuō)我怎么就攤上這事【构幔” “怎么了答捕?”我有些...
    開(kāi)封第一講書(shū)人閱讀 163,105評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)屑那。 經(jīng)常有香客問(wèn)我拱镐,道長(zhǎng)艘款,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,242評(píng)論 1 292
  • 正文 為了忘掉前任沃琅,我火速辦了婚禮哗咆,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘益眉。我一直安慰自己晌柬,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,269評(píng)論 6 389
  • 文/花漫 我一把揭開(kāi)白布郭脂。 她就那樣靜靜地躺著年碘,像睡著了一般。 火紅的嫁衣襯著肌膚如雪朱庆。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 51,215評(píng)論 1 299
  • 那天闷祥,我揣著相機(jī)與錄音娱颊,去河邊找鬼。 笑死凯砍,一個(gè)胖子當(dāng)著我的面吹牛箱硕,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播悟衩,決...
    沈念sama閱讀 40,096評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼剧罩,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了座泳?” 一聲冷哼從身側(cè)響起惠昔,我...
    開(kāi)封第一講書(shū)人閱讀 38,939評(píng)論 0 274
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎挑势,沒(méi)想到半個(gè)月后镇防,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,354評(píng)論 1 311
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡潮饱,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,573評(píng)論 2 333
  • 正文 我和宋清朗相戀三年来氧,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片香拉。...
    茶點(diǎn)故事閱讀 39,745評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡啦扬,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出凫碌,到底是詐尸還是另有隱情扑毡,我是刑警寧澤,帶...
    沈念sama閱讀 35,448評(píng)論 5 344
  • 正文 年R本政府宣布盛险,位于F島的核電站僚楞,受9級(jí)特大地震影響勤晚,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜泉褐,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,048評(píng)論 3 327
  • 文/蒙蒙 一赐写、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧膜赃,春花似錦挺邀、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,683評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至疲眷,卻和暖如春禾蚕,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背狂丝。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,838評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工换淆, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人几颜。 一個(gè)月前我還...
    沈念sama閱讀 47,776評(píng)論 2 369
  • 正文 我出身青樓倍试,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親蛋哭。 傳聞我的和親對(duì)象是個(gè)殘疾皇子县习,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,652評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容

  • 大致可以通過(guò)上述情況進(jìn)行排除 1.kafka服務(wù)器問(wèn)題 查看日志是否有報(bào)錯(cuò),網(wǎng)絡(luò)訪問(wèn)問(wèn)題等谆趾。 2. kafka p...
    生活的探路者閱讀 7,587評(píng)論 0 10
  • Kafka的基本概念 BrokerKafka集群中包含多個(gè)服務(wù)器躁愿,其中每個(gè)服務(wù)器稱為一個(gè)broker。有一點(diǎn)需要注...
    frmark閱讀 372評(píng)論 0 0
  • Kafka系列一- Kafka背景及架構(gòu)介紹 Kafka簡(jiǎn)介 Kafka是一種分布式的沪蓬,基于發(fā)布/訂閱的消息系統(tǒng)攘已。...
    raincoffee閱讀 2,204評(píng)論 0 22
  • 一、consumer特點(diǎn) consumer不使用zk保存消費(fèi)位移(offset)怜跑,而是使用內(nèi)部的消息隊(duì)列样勃。因?yàn)楫?dāng)數(shù)...
    ands999閱讀 812評(píng)論 0 3
  • 追了那么久的街舞終于結(jié)束了,第一次這么認(rèn)真的追一檔綜藝性芬,第一次看到一期有224分鐘的綜藝節(jié)目峡眶,第一次了解街舞文化,...
    96613380fd6e閱讀 3,903評(píng)論 0 0