1. Apache Kafka是一個(gè)分布式流平臺(tái)
1.1 流平臺(tái)有三個(gè)關(guān)鍵功能:
發(fā)布和訂閱流記錄佑惠,類似于一個(gè)消息隊(duì)列或企業(yè)消息系統(tǒng)
以一種容錯(cuò)的持久方式存儲(chǔ)記錄流
在流記錄生成的時(shí)候就處理它們
1.2 Kafka通常用于兩大類應(yīng)用:
如果想學(xué)習(xí)Java工程化栈虚、高性能及分布式蒙揣、深入淺出。微服務(wù)昌犹、Spring忘晤,MyBatis,Netty源碼分析的朋友可以加我的Java高級交流:854630135,群里有阿里大牛直播講解技術(shù)惑惶,以及Java大型互聯(lián)網(wǎng)技術(shù)的視頻免費(fèi)分享給大家煮盼。
構(gòu)建實(shí)時(shí)流數(shù)據(jù)管道,在系統(tǒng)或應(yīng)用程序之間可靠地獲取數(shù)據(jù)
構(gòu)建對數(shù)據(jù)流進(jìn)行轉(zhuǎn)換或輸出的實(shí)時(shí)流媒體應(yīng)用程序
1.3 有幾個(gè)特別重要的概念:
Kafka is run as a cluster on one or more servers that can span multiple datacenters.
The Kafka cluster stores streams of records in categories called topics.
Each record consists of a key, a value, and a timestamp.
Kafka作為集群運(yùn)行在一個(gè)或多個(gè)可以跨多個(gè)數(shù)據(jù)中心的服務(wù)器上
從這句話表達(dá)了三個(gè)意思:
Kafka是以集群方式運(yùn)行的
集群中可以只有一臺(tái)服務(wù)器带污,也有可能有多臺(tái)服務(wù)器僵控。也就是說,一臺(tái)服務(wù)器也是一個(gè)集群鱼冀,多臺(tái)服務(wù)器也可以組成一個(gè)集群
這些服務(wù)器可以跨多個(gè)數(shù)據(jù)中心
Kafka集群按分類存儲(chǔ)流記錄报破,這個(gè)分類叫做主題
這句話表達(dá)了以下幾個(gè)信息:
流記錄是分類存儲(chǔ)的,也就說記錄是歸類的
我們稱這種分類為主題
簡單地來講千绪,記錄是按主題劃分歸類存儲(chǔ)的
每個(gè)記錄由一個(gè)鍵充易、一個(gè)值和一個(gè)時(shí)間戳組成
1.4 Kafka有四個(gè)核心API:
Producer API?:允許應(yīng)用發(fā)布一條流記錄到一個(gè)或多個(gè)主題
Consumer API?:允許應(yīng)用訂閱一個(gè)或多個(gè)主題,并處理流記錄
Streams API?:允許應(yīng)用作為一個(gè)流處理器荸型,從一個(gè)或多個(gè)主題那里消費(fèi)輸入流盹靴,并將輸出流輸出到一個(gè)或多個(gè)輸出主題,從而有效地講輸入流轉(zhuǎn)換為輸出流
Connector API?:允許將主題連接到已經(jīng)存在的應(yīng)用或者數(shù)據(jù)系統(tǒng)瑞妇,以構(gòu)建并允許可重用的生產(chǎn)者或消費(fèi)者稿静。例如,一個(gè)關(guān)系型數(shù)據(jù)庫的連接器可能捕獲到一張表的每一次變更
(畫外音:我理解這四個(gè)核心API其實(shí)就是:發(fā)布辕狰、訂閱自赔、轉(zhuǎn)換處理、從第三方采集數(shù)據(jù)柳琢。)
在Kafka中,客戶端和服務(wù)器之間的通信是使用簡單的润脸、高性能的柬脸、與語言無關(guān)的TCP協(xié)議完成的。
2. Topics and Logs(主題和日志)
一個(gè)topic是一個(gè)分類毙驯,或者說是記錄被發(fā)布的時(shí)候的一個(gè)名字(畫外音:可以理解為記錄要被發(fā)到哪兒去)倒堕。
在Kafka中,topic總是有多個(gè)訂閱者爆价,因此垦巴,一個(gè)topic可能有0個(gè),1個(gè)或多個(gè)訂閱該數(shù)據(jù)的消費(fèi)者铭段。
對于每個(gè)主題骤宣,Kafka集群維護(hù)一個(gè)分區(qū)日志,如下圖所示:
每個(gè)分區(qū)都是一個(gè)有序的序愚、不可變的記錄序列憔披,而且記錄會(huì)不斷的被追加,一條記錄就是一個(gè)結(jié)構(gòu)化的提交日志(a structured commit log)。
分區(qū)中的每條記錄都被分配了一個(gè)連續(xù)的id號芬膝,這個(gè)id號被叫做offset(偏移量)望门,這個(gè)偏移量唯一的標(biāo)識出分區(qū)中的每條記錄。(PS:如果把分區(qū)比作數(shù)據(jù)庫表的話锰霜,那么偏移量就是主鍵)
Kafka集群持久化所有已發(fā)布的記錄筹误,無論它們有沒有被消費(fèi),記錄被保留的時(shí)間是可以配置的癣缅。例如厨剪,如果保留策略被設(shè)置為兩天,那么在記錄發(fā)布后的兩天內(nèi)所灸,可以使用它丽惶,之后將其丟棄以釋放空間。在對數(shù)據(jù)大小方面爬立,Kafka的性能是高效的钾唬,恒定常量級的,因此長時(shí)間存儲(chǔ)數(shù)據(jù)不是問題侠驯。
事實(shí)上抡秆,唯一維護(hù)在每個(gè)消費(fèi)者上的元數(shù)據(jù)是消費(fèi)者在日志中的位置或者叫偏移量。偏移量是由消費(fèi)者控制的:通常消費(fèi)者在讀取記錄的時(shí)候會(huì)線性的增加它的偏移量吟策,但是儒士,事實(shí)上,由于位置(偏移量)是由消費(fèi)者控制的檩坚,所有它可以按任意它喜歡的順序消費(fèi)記錄着撩。例如:一個(gè)消費(fèi)者可以重置到一個(gè)較舊的偏移量來重新處理之前已經(jīng)處理過的數(shù)據(jù),或者跳轉(zhuǎn)到最近的記錄并從“現(xiàn)在”開始消費(fèi)匾委。
這種特性意味著消費(fèi)者非常廉價(jià)————他們可以來來去去的消息而不會(huì)對集群或者其它消費(fèi)者造成太大影響拖叙。
日志中的分區(qū)有幾個(gè)用途。首先赂乐,它們允許日志的規(guī)模超出單個(gè)服務(wù)器的大小薯鳍。每個(gè)獨(dú)立分區(qū)都必須與宿主的服務(wù)器相匹配唤衫,但一個(gè)主題可能有多個(gè)分區(qū)是辕,所以它可以處理任意數(shù)量的數(shù)據(jù)竞帽。第二忍抽,它們作為并行的單位——稍后再進(jìn)一步拭嫁。
(
畫外音:簡單地來說银萍,日志分區(qū)的作用有兩個(gè):一络凿、日志的規(guī)模不再受限于單個(gè)服務(wù)器憨攒;二觉既、分區(qū)意味著可以并行砸民。
什么意思呢?主題建立在集群之上,每個(gè)主題維護(hù)了一個(gè)分區(qū)日志岭参,顧名思義反惕,日志是分區(qū)的;每個(gè)分區(qū)所在的服務(wù)器的資源(比如:CPU演侯、內(nèi)存姿染、帶寬、磁盤等)是有限的秒际,如果不分區(qū)(可以理解為等同于只有一個(gè))的話悬赏,必然受限于這個(gè)分區(qū)所在的服務(wù)器,那么多個(gè)分區(qū)的話就不一樣了娄徊,就突破了這種限制闽颇,服務(wù)器可以隨便加,分區(qū)也可以隨便加寄锐。
)
3. Distribution(分布)
日志的分區(qū)分布在集群中的服務(wù)器上兵多,每個(gè)服務(wù)器處理數(shù)據(jù),并且分區(qū)請求是共享的橄仆。每個(gè)分區(qū)被復(fù)制到多個(gè)服務(wù)器上以實(shí)現(xiàn)容錯(cuò)剩膘,到底復(fù)制到多少個(gè)服務(wù)器上是可以配置的。
Each partition is replicated across a configurable number of servers for fault tolerance.
每個(gè)分區(qū)都有一個(gè)服務(wù)器充當(dāng)“leader”角色盆顾,并且有0個(gè)或者多個(gè)服務(wù)器作為“followers”怠褐。leader處理對這個(gè)分區(qū)的所有讀和寫請求,而followers被動(dòng)的從leader那里復(fù)制數(shù)據(jù)您宪。如果leader失敗奈懒,followers中的其中一個(gè)會(huì)自動(dòng)變成新的leader。每個(gè)服務(wù)器充當(dāng)一些分區(qū)的“leader”的同時(shí)也是其它分區(qū)的“follower”宪巨,因此在整個(gè)集群中負(fù)載是均衡的筐赔。
也就是說,每個(gè)服務(wù)器既是“leader”也是“follower”揖铜。我們知道一個(gè)主題可能有多個(gè)分區(qū),一個(gè)分區(qū)可能在一個(gè)服務(wù)器上也可能跨多個(gè)服務(wù)器达皿,然而這并不以為著一臺(tái)服務(wù)器上只有一個(gè)分區(qū)天吓,是可能有多個(gè)分區(qū)的。每個(gè)分區(qū)中有一個(gè)服務(wù)器充當(dāng)“leader”峦椰,其余是“follower”龄寞。leader負(fù)責(zé)處理這個(gè)它作為leader所負(fù)責(zé)的分區(qū)的所有讀寫請求,而該分區(qū)中的follow只是被動(dòng)復(fù)制leader的數(shù)據(jù)汤功。這個(gè)有點(diǎn)兒像HDFS中的副本機(jī)制物邑。例如:分區(qū)-1有服務(wù)器A和B組成,A是leader,B是follower色解,有請求要往分區(qū)-1中寫數(shù)據(jù)的時(shí)候就由A處理茂嗓,然后A把剛才寫的數(shù)據(jù)同步給B,這樣的話正常請求相當(dāng)于A和B的數(shù)據(jù)是一樣的科阎,都有分區(qū)-1的全部數(shù)據(jù)述吸,如果A宕機(jī)了,B成為leader锣笨,接替A繼續(xù)處理對分區(qū)-1的讀寫請求蝌矛。
需要注意的是,分區(qū)是一個(gè)虛擬的概念错英,是一個(gè)邏輯單元入撒。
4. Producers(生產(chǎn)者)
如果想學(xué)習(xí)Java工程化、高性能及分布式椭岩、深入淺出茅逮。微服務(wù)、Spring簿煌,MyBatis氮唯,Netty源碼分析的朋友可以加我的Java高級交流:854630135,群里有阿里大牛直播講解技術(shù)姨伟,以及Java大型互聯(lián)網(wǎng)技術(shù)的視頻免費(fèi)分享給大家惩琉。
生產(chǎn)者發(fā)布數(shù)據(jù)到它們選擇的主題中。生產(chǎn)者負(fù)責(zé)選擇將記錄投遞到哪個(gè)主題的哪個(gè)分區(qū)中夺荒。要做這件事情瞒渠,可以簡單地用循環(huán)方式以到達(dá)負(fù)載均衡,或者根據(jù)一些語義分區(qū)函數(shù)(比如:基于記錄中的某些key)
5. Consumers(消費(fèi)者)
消費(fèi)者用一個(gè)消費(fèi)者組名來標(biāo)識它們自己(PS:相當(dāng)于給自己貼一個(gè)標(biāo)簽技扼,標(biāo)簽的名字是組名伍玖,以表明自己屬于哪個(gè)組),并且每一條發(fā)布到主題中的記錄只會(huì)投遞給每個(gè)訂閱的消費(fèi)者組中的其中一個(gè)消費(fèi)者實(shí)例剿吻。消費(fèi)者實(shí)例可能是單獨(dú)的進(jìn)程或者在單獨(dú)的機(jī)器上窍箍。
如果所有的消費(fèi)者實(shí)例都使用相同的消費(fèi)者組,那么記錄將會(huì)在這些消費(fèi)者之間有效的負(fù)載均衡丽旅。
如果所有的消費(fèi)者實(shí)例都使用不同的消費(fèi)者組椰棘,那么每條記錄將會(huì)廣播給所有的消費(fèi)者進(jìn)程。
上圖中其實(shí)那個(gè)Kafka Cluster換成Topic會(huì)更準(zhǔn)確一些
一個(gè)Kafka集群有2個(gè)服務(wù)器榄笙,4個(gè)分區(qū)(P0-P3)邪狞,有兩個(gè)消費(fèi)者組。組A中有2個(gè)消費(fèi)者實(shí)例茅撞,組B中有4個(gè)消費(fèi)者實(shí)例帆卓。
通常我們會(huì)發(fā)現(xiàn)巨朦,主題不會(huì)有太多的消費(fèi)者組,每個(gè)消費(fèi)者組是一個(gè)“邏輯訂閱者”(以消費(fèi)者組的名義訂閱主題剑令,而非以消費(fèi)者實(shí)例的名義去訂閱)糊啡。每個(gè)組由許多消費(fèi)者實(shí)例組成,以實(shí)現(xiàn)可擴(kuò)展性和容錯(cuò)尚洽。這仍然是發(fā)布/訂閱悔橄,只不過訂閱者是一個(gè)消費(fèi)者群體,而非單個(gè)進(jìn)程腺毫。
在Kafka中癣疟,這種消費(fèi)方式是通過用日志中的分區(qū)除以使用者實(shí)例來實(shí)現(xiàn)的,這樣可以保證在任意時(shí)刻每個(gè)消費(fèi)者都是排它的消費(fèi)潮酒,即“公平共享”睛挚。Kafka協(xié)議動(dòng)態(tài)的處理維護(hù)組中的成員。如果有心的實(shí)例加入到組中急黎,它們將從組中的其它成員那里接管一些分區(qū)扎狱;如果組中有一個(gè)實(shí)例死了,那么它的分區(qū)將會(huì)被分給其它實(shí)例勃教。
(畫外音:什么意思呢淤击?舉個(gè)例子,在上面的圖中故源,4個(gè)分區(qū)污抬,組A有2個(gè)消費(fèi)者,組B有4個(gè)消費(fèi)者绳军,那么對A來講組中的每個(gè)消費(fèi)者負(fù)責(zé)4/2=2個(gè)分區(qū)印机,對組B來說組中的每個(gè)消費(fèi)者負(fù)責(zé)4/4=1個(gè)分區(qū),而且同一時(shí)間消息只能被組中的一個(gè)實(shí)例消費(fèi)门驾。如果組中的成員數(shù)量有變化射赛,則重新分配。)
Kafka只提供分區(qū)下的記錄的總的順序奶是,而不提供主題下不同分區(qū)的總的順序楣责。每個(gè)分區(qū)結(jié)合按key劃分?jǐn)?shù)據(jù)的能力排序?qū)Υ蠖鄶?shù)應(yīng)用來說是足夠的。然而聂沙,如果你需要主題下總的記錄順序秆麸,你可以只使用一個(gè)分區(qū),這樣做的做的話就意味著每個(gè)消費(fèi)者組中只能有一個(gè)消費(fèi)者實(shí)例逐纬。
6. 保證
在一個(gè)高級別的Kafka給出下列保證:
被一個(gè)生產(chǎn)者發(fā)送到指定主題分區(qū)的消息將會(huì)按照它們被發(fā)送的順序追加到分區(qū)中。也就是說削樊,如果記錄M1和M2是被同一個(gè)生產(chǎn)者發(fā)送到同一個(gè)分區(qū)的豁生,而且M1是先發(fā)送的兔毒,M2是后發(fā)送的,那么在分區(qū)中M1的偏移量一定比M2小甸箱,并且M1出現(xiàn)在日志中的位置更靠前育叁。
一個(gè)消費(fèi)者看到記錄的順序和它們在日志中存儲(chǔ)的順序是一樣的。
對于一個(gè)副本因子是N的主題芍殖,我們可以容忍最多N-1個(gè)服務(wù)器失敗豪嗽,而不會(huì)丟失已經(jīng)提交給日志的任何記錄。
7. Spring Kafka
Spring提供了一個(gè)“模板”作為發(fā)送消息的高級抽象豌骏。它也通過使用@KafkaListener注釋和“監(jiān)聽器容器”提供對消息驅(qū)動(dòng)POJOs的支持龟梦。這些庫促進(jìn)了依賴注入和聲明式的使用。
7.1 純Java方式
1 package com.cjs.example.quickstart;
2
3 import org.apache.kafka.clients.consumer.ConsumerConfig;
4 import org.apache.kafka.clients.consumer.ConsumerRecord;
5 import org.apache.kafka.clients.producer.ProducerConfig;
6 import org.apache.kafka.common.serialization.IntegerDeserializer;
7 import org.apache.kafka.common.serialization.IntegerSerializer;
8 import org.apache.kafka.common.serialization.StringDeserializer;
9 import org.apache.kafka.common.serialization.StringSerializer;
10 import org.springframework.kafka.core.*;
11 import org.springframework.kafka.listener.KafkaMessageListenerContainer;
12 import org.springframework.kafka.listener.MessageListener;
13 import org.springframework.kafka.listener.config.ContainerProperties;
14
15 import java.util.HashMap;
16 import java.util.Map;
17
18 public class PureJavaDemo {
19
20 /**
21 * 生產(chǎn)者配置
22 */
23 private static Map senderProps() {
24 Map props = new HashMap<>();
25 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.101.5:9093");
26 props.put(ProducerConfig.RETRIES_CONFIG, 0);
27 props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
28 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
29 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
30 return props;
31 }
32
33 /**
34 * 消費(fèi)者配置
35 */
36 private static Map consumerProps() {
37 Map props = new HashMap<>();
38 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.101.5:9093");
39 props.put(ConsumerConfig.GROUP_ID_CONFIG, "hello");
40 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
41 props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
42 props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
43 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
44 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
45 return props;
46 }
47
48 /**
49 * 發(fā)送模板配置
50 */
51 private static KafkaTemplate createTemplate() {
52 Map senderProps = senderProps();
53 ProducerFactory producerFactory = new DefaultKafkaProducerFactory<>(senderProps);
54 KafkaTemplate kafkaTemplate = new KafkaTemplate<>(producerFactory);
55 return kafkaTemplate;
56 }
57
58 /**
59 * 消息監(jiān)聽器容器配置
60 */
61 private static KafkaMessageListenerContainer createContainer() {
62 Map consumerProps = consumerProps();
63 ConsumerFactory consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProps);
64 ContainerProperties containerProperties = new ContainerProperties("test");
65 KafkaMessageListenerContainer container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
66 return container;
67 }
68
69
70 public static void main(String[] args) throws InterruptedException {
71 String topic1 = "test"; // 主題
72
73 KafkaMessageListenerContainer container = createContainer();
74 ContainerProperties containerProperties = container.getContainerProperties();
75 containerProperties.setMessageListener(new MessageListener() {
76 @Override
77 public void onMessage(ConsumerRecord record) {
78 System.out.println("Received: " + record);
79 }
80 });
81 container.setBeanName("testAuto");
82
83 container.start();
84
85 KafkaTemplate kafkaTemplate = createTemplate();
86 kafkaTemplate.setDefaultTopic(topic1);
87
88 kafkaTemplate.sendDefault(0, "foo");
89 kafkaTemplate.sendDefault(2, "bar");
90 kafkaTemplate.sendDefault(0, "baz");
91 kafkaTemplate.sendDefault(2, "qux");
92
93 kafkaTemplate.flush();
94 container.stop();
95
96 System.out.println("結(jié)束");
97 }
98
99 }
運(yùn)行結(jié)果:
Received: ConsumerRecord(topic = test, partition = 0, offset = 67, CreateTime = 1533300970788, serialized key size = 4, serialized value size = 3, headers = RecordHeaders(headers = [], isReadOnly = false), key = 0, value = foo)
Received: ConsumerRecord(topic = test, partition = 0, offset = 68, CreateTime = 1533300970793, serialized key size = 4, serialized value size = 3, headers = RecordHeaders(headers = [], isReadOnly = false), key = 2, value = bar)
Received: ConsumerRecord(topic = test, partition = 0, offset = 69, CreateTime = 1533300970793, serialized key size = 4, serialized value size = 3, headers = RecordHeaders(headers = [], isReadOnly = false), key = 0, value = baz)
Received: ConsumerRecord(topic = test, partition = 0, offset = 70, CreateTime = 1533300970793, serialized key size = 4, serialized value size = 3, headers = RecordHeaders(headers = [], isReadOnly = false), key = 2, value = qux)
7.2 更簡單一點(diǎn)兒窃躲,用SpringBoot
1 package com.cjs.example.quickstart;
2
3 import org.apache.kafka.clients.consumer.ConsumerRecord;
4 import org.springframework.beans.factory.annotation.Autowired;
5 import org.springframework.boot.CommandLineRunner;
6 import org.springframework.context.annotation.Bean;
7 import org.springframework.context.annotation.Configuration;
8 import org.springframework.kafka.annotation.KafkaListener;
9 import org.springframework.kafka.core.KafkaTemplate;
10
11 @Configuration
12 public class JavaConfigurationDemo {
13
14 @KafkaListener(topics = "test")
15 public void listen(ConsumerRecord record) {
16 System.out.println("收到消息: " + record);
17 }
18
19 @Bean
20 public CommandLineRunner commandLineRunner() {
21 return new MyRunner();
22 }
23
24 class MyRunner implements CommandLineRunner {
25
26 @Autowired
27 private KafkaTemplate kafkaTemplate;
28
29 @Override
30 public void run(String... args) throws Exception {
31 kafkaTemplate.send("test", "foo1");
32 kafkaTemplate.send("test", "foo2");
33 kafkaTemplate.send("test", "foo3");
34 kafkaTemplate.send("test", "foo4");
35 }
36 }
37 }
application.properties配置
spring.kafka.bootstrap-servers=192.168.101.5:9092
spring.kafka.consumer.group-id=world
8. 生產(chǎn)者
1 package com.cjs.example.send;
2
3 import org.apache.kafka.clients.producer.ProducerConfig;
4 import org.apache.kafka.common.serialization.IntegerSerializer;
5 import org.apache.kafka.common.serialization.StringSerializer;
6 import org.springframework.context.annotation.Bean;
7 import org.springframework.context.annotation.Configuration;
8 import org.springframework.kafka.core.DefaultKafkaProducerFactory;
9 import org.springframework.kafka.core.KafkaTemplate;
10 import org.springframework.kafka.core.ProducerFactory;
11
12 import java.util.HashMap;
13 import java.util.Map;
14
15 @Configuration
16 public class Config {
17
18 public Map producerConfigs() {
19 Map props = new HashMap<>();
20 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.101.5:9092");
21 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
22 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
23 return props;
24 }
25
26 public ProducerFactory producerFactory() {
27 return new DefaultKafkaProducerFactory<>(producerConfigs());
28 }
29
30 @Bean
31 public KafkaTemplate kafkaTemplate() {
32 return new KafkaTemplate(producerFactory());
33 }
34
35 }
1 package com.cjs.example.send;
2
3 import org.springframework.beans.factory.annotation.Autowired;
4 import org.springframework.boot.CommandLineRunner;
5 import org.springframework.kafka.core.KafkaTemplate;
6 import org.springframework.kafka.support.SendResult;
7 import org.springframework.stereotype.Component;
8 import org.springframework.util.concurrent.ListenableFuture;
9 import org.springframework.util.concurrent.ListenableFutureCallback;
10
11 @Component
12 public class MyCommandLineRunner implements CommandLineRunner {
13
14 @Autowired
15 private KafkaTemplate kafkaTemplate;
16
17 public void sendTo(Integer key, String value) {
18 ListenableFuture> listenableFuture = kafkaTemplate.send("test", key, value);
19 listenableFuture.addCallback(new ListenableFutureCallback>() {
20 @Override
21 public void onFailure(Throwable throwable) {
22 System.out.println("發(fā)送失敗啦");
23 throwable.printStackTrace();
24 }
25
26 @Override
27 public void onSuccess(SendResult sendResult) {
28 System.out.println("發(fā)送成功计贰," + sendResult);
29 }
30 });
31 }
32
33 @Override
34 public void run(String... args) throws Exception {
35 sendTo(1, "aaa");
36 sendTo(2, "bbb");
37 sendTo(3, "ccc");
38 }
39
40
41 }
運(yùn)行結(jié)果:
發(fā)送成功,SendResult [producerRecord=ProducerRecord(topic=test, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=1, value=aaa, timestamp=null), recordMetadata=test-0@37]
發(fā)送成功蒂窒,SendResult [producerRecord=ProducerRecord(topic=test, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=2, value=bbb, timestamp=null), recordMetadata=test-0@38]
發(fā)送成功躁倒,SendResult [producerRecord=ProducerRecord(topic=test, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=3, value=ccc, timestamp=null), recordMetadata=test-0@39]
9. 消費(fèi)者@KafkaListener
1 package com.cjs.example.receive;
2
3 import org.apache.kafka.clients.consumer.ConsumerConfig;
4 import org.apache.kafka.clients.consumer.ConsumerRecord;
5 import org.apache.kafka.common.serialization.IntegerDeserializer;
6 import org.apache.kafka.common.serialization.StringDeserializer;
7 import org.springframework.context.annotation.Bean;
8 import org.springframework.context.annotation.Configuration;
9 import org.springframework.kafka.annotation.KafkaListener;
10 import org.springframework.kafka.annotation.TopicPartition;
11 import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
12 import org.springframework.kafka.config.KafkaListenerContainerFactory;
13 import org.springframework.kafka.core.ConsumerFactory;
14 import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
15 import org.springframework.kafka.listener.AbstractMessageListenerContainer;
16 import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
17 import org.springframework.kafka.listener.config.ContainerProperties;
18 import org.springframework.kafka.support.Acknowledgment;
19 import org.springframework.kafka.support.KafkaHeaders;
20 import org.springframework.messaging.handler.annotation.Header;
21 import org.springframework.messaging.handler.annotation.Payload;
22
23 import java.util.HashMap;
24 import java.util.List;
25 import java.util.Map;
26
27 @Configuration
28 public class Config2 {
29
30 @Bean
31 public KafkaListenerContainerFactory> kafkaListenerContainerFactory() {
32 ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
33 factory.setConsumerFactory(consumerFactory());
34 factory.setConcurrency(3);
35 ContainerProperties containerProperties = factory.getContainerProperties();
36 containerProperties.setPollTimeout(2000);
37 // containerProperties.setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
38 return factory;
39 }
40
41 private ConsumerFactory consumerFactory() {
42 return new DefaultKafkaConsumerFactory<>(consumerProps());
43 }
44
45 private Map consumerProps() {
46 Map props = new HashMap<>();
47 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.101.5:9092");
48 props.put(ConsumerConfig.GROUP_ID_CONFIG, "hahaha");
49 // props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
50 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
51 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
52 return props;
53 }
54
55
56 @KafkaListener(topics = "test")
57 public void listen(String data) {
58 System.out.println("listen 收到: " + data);
59 }
60
61
62 @KafkaListener(topics = "test", containerFactory = "kafkaListenerContainerFactory")
63 public void listen2(String data, Acknowledgment ack) {
64 System.out.println("listen2 收到: " + data);
65 ack.acknowledge();
66 }
67
68 @KafkaListener(topicPartitions = {@TopicPartition(topic = "test", partitions = "0")})
69 public void listen3(ConsumerRecord record) {
70 System.out.println("listen3 收到: " + record.value());
71 }
72
73
74 @KafkaListener(id = "xyz", topics = "test")
75 public void listen4(@Payload String foo,
76 @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key,
77 @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
78 @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
79 @Header(KafkaHeaders.OFFSET) List offsets) {
80 System.out.println("listen4 收到: ");
81 System.out.println(foo);
82 System.out.println(key);
83 System.out.println(partition);
84 System.out.println(topic);
85 System.out.println(offsets);
86 }
87
88 }
9.1 Committing Offsets
如果想學(xué)習(xí)Java工程化、高性能及分布式洒琢、深入淺出秧秉。微服務(wù)、Spring衰抑,MyBatis象迎,Netty源碼分析的朋友可以加我的Java高級交流:854630135,群里有阿里大牛直播講解技術(shù)停士,以及Java大型互聯(lián)網(wǎng)技術(shù)的視頻免費(fèi)分享給大家挖帘。
如果enable.auto.commit設(shè)置為true,那么kafka將自動(dòng)提交offset恋技。如果設(shè)置為false拇舀,則支持下列AckMode(確認(rèn)模式)。
消費(fèi)者poll()方法將返回一個(gè)或多個(gè)ConsumerRecords
RECORD :處理完記錄以后蜻底,當(dāng)監(jiān)聽器返回時(shí)骄崩,提交offset
BATCH :當(dāng)對poll()返回的所有記錄進(jìn)行處理完以后,提交偏offset
TIME :當(dāng)對poll()返回的所有記錄進(jìn)行處理完以后薄辅,只要距離上一次提交已經(jīng)過了ackTime時(shí)間后就提交
COUNT :當(dāng)poll()返回的所有記錄都被處理時(shí)要拂,只要從上次提交以來收到了ackCount條記錄,就可以提交
COUNT_TIME :和TIME以及COUNT類似站楚,只要這兩個(gè)中有一個(gè)為true脱惰,則提交
MANUAL :消息監(jiān)聽器負(fù)責(zé)調(diào)用Acknowledgment.acknowledge()方法,此后和BATCH是一樣的
MANUAL_IMMEDIATE :當(dāng)監(jiān)聽器調(diào)用Acknowledgment.acknowledge()方法后立即提交
10. Spring Boot Kafka
10.1 application.properties
spring.kafka.bootstrap-servers=192.168.101.5:9092
10.2 發(fā)送消息
1 package com.cjs.example;
2
3 import org.springframework.beans.factory.annotation.Autowired;
4 import org.springframework.kafka.core.KafkaTemplate;
5 import org.springframework.web.bind.annotation.RequestMapping;
6 import org.springframework.web.bind.annotation.RestController;
7
8 import javax.annotation.Resource;
9
10 @RestController
11 @RequestMapping("/msg")
12 public class MessageController {
13
14 @Resource
15 private KafkaTemplate kafkaTemplate;
16
17 @RequestMapping("/send")
18 public String send(String topic, String key, String value) {
19 kafkaTemplate.send(topic, key, value);
20 return "ok";
21 }
22
23 }
10.3 接收消息
1 package com.cjs.example;
2
3 import org.apache.kafka.clients.consumer.ConsumerRecord;
4 import org.springframework.kafka.annotation.KafkaListener;
5 import org.springframework.kafka.annotation.KafkaListeners;
6 import org.springframework.stereotype.Component;
7
8 @Component
9 public class MessageListener {
10
11 /**
12 * 監(jiān)聽訂單消息
13 */
14 @KafkaListener(topics = "ORDER", groupId = "OrderGroup")
15 public void listenToOrder(String data) {
16 System.out.println("收到訂單消息:" + data);
17 }
18
19 /**
20 * 監(jiān)聽會(huì)員消息
21 */
22 @KafkaListener(topics = "MEMBER", groupId = "MemberGroup")
23 public void listenToMember(ConsumerRecord record) {
24 System.out.println("收到會(huì)員消息:" + record);
25 }
26
27 /**
28 * 監(jiān)聽所有消息
29 *
30 * 任意時(shí)刻窿春,一條消息只會(huì)發(fā)給組中的一個(gè)消費(fèi)者
31 *
32 * 消費(fèi)者組中的成員數(shù)量不能超過分區(qū)數(shù)拉一,這里分區(qū)數(shù)是1采盒,因此訂閱該主題的消費(fèi)者組成員不能超過1
33 */
34 // @KafkaListeners({@KafkaListener(topics = "ORDER", groupId = "OrderGroup"),
35 // @KafkaListener(topics = "MEMBER", groupId = "MemberGroup")})
36 // public void listenToAll(String data) {
37 // System.out.println("啊啊啊");
38 // }
39
40 }
11. pom.xml
如果想學(xué)習(xí)Java工程化、高性能及分布式蔚润、深入淺出磅氨。微服務(wù)、Spring嫡纠,MyBatis烦租,Netty源碼分析的朋友可以加我的Java高級交流:854630135,群里有阿里大牛直播講解技術(shù)除盏,以及Java大型互聯(lián)網(wǎng)技術(shù)的視頻免費(fèi)分享給大家叉橱。
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
4.0.0
com.cjs.example
cjs-kafka-example
0.0.1-SNAPSHOT
jar
cjs-kafka-example
org.springframework.boot
spring-boot-starter-parent
2.0.4.RELEASE
UTF-8
UTF-8
1.8
org.springframework.boot
spring-boot-starter-web
org.springframework.kafka
spring-kafka
org.springframework.boot
spring-boot-starter-test
test
org.springframework.boot
spring-boot-maven-plugin