Kafka從入門到進(jìn)階

1. Apache Kafka是一個(gè)分布式流平臺(tái)

1.1 流平臺(tái)有三個(gè)關(guān)鍵功能:

發(fā)布和訂閱流記錄佑惠,類似于一個(gè)消息隊(duì)列或企業(yè)消息系統(tǒng)

以一種容錯(cuò)的持久方式存儲(chǔ)記錄流

在流記錄生成的時(shí)候就處理它們

1.2 Kafka通常用于兩大類應(yīng)用:

如果想學(xué)習(xí)Java工程化栈虚、高性能及分布式蒙揣、深入淺出。微服務(wù)昌犹、Spring忘晤,MyBatis,Netty源碼分析的朋友可以加我的Java高級交流:854630135,群里有阿里大牛直播講解技術(shù)惑惶,以及Java大型互聯(lián)網(wǎng)技術(shù)的視頻免費(fèi)分享給大家煮盼。

構(gòu)建實(shí)時(shí)流數(shù)據(jù)管道,在系統(tǒng)或應(yīng)用程序之間可靠地獲取數(shù)據(jù)

構(gòu)建對數(shù)據(jù)流進(jìn)行轉(zhuǎn)換或輸出的實(shí)時(shí)流媒體應(yīng)用程序

1.3 有幾個(gè)特別重要的概念:

Kafka is run as a cluster on one or more servers that can span multiple datacenters.

The Kafka cluster stores streams of records in categories called topics.

Each record consists of a key, a value, and a timestamp.

Kafka作為集群運(yùn)行在一個(gè)或多個(gè)可以跨多個(gè)數(shù)據(jù)中心的服務(wù)器上

從這句話表達(dá)了三個(gè)意思:

Kafka是以集群方式運(yùn)行的

集群中可以只有一臺(tái)服務(wù)器带污,也有可能有多臺(tái)服務(wù)器僵控。也就是說,一臺(tái)服務(wù)器也是一個(gè)集群鱼冀,多臺(tái)服務(wù)器也可以組成一個(gè)集群

這些服務(wù)器可以跨多個(gè)數(shù)據(jù)中心

Kafka集群按分類存儲(chǔ)流記錄报破,這個(gè)分類叫做主題

這句話表達(dá)了以下幾個(gè)信息:

流記錄是分類存儲(chǔ)的,也就說記錄是歸類的

我們稱這種分類為主題

簡單地來講千绪,記錄是按主題劃分歸類存儲(chǔ)的

每個(gè)記錄由一個(gè)鍵充易、一個(gè)值和一個(gè)時(shí)間戳組成

1.4 Kafka有四個(gè)核心API:

Producer API?:允許應(yīng)用發(fā)布一條流記錄到一個(gè)或多個(gè)主題

Consumer API?:允許應(yīng)用訂閱一個(gè)或多個(gè)主題,并處理流記錄

Streams API?:允許應(yīng)用作為一個(gè)流處理器荸型,從一個(gè)或多個(gè)主題那里消費(fèi)輸入流盹靴,并將輸出流輸出到一個(gè)或多個(gè)輸出主題,從而有效地講輸入流轉(zhuǎn)換為輸出流

Connector API?:允許將主題連接到已經(jīng)存在的應(yīng)用或者數(shù)據(jù)系統(tǒng)瑞妇,以構(gòu)建并允許可重用的生產(chǎn)者或消費(fèi)者稿静。例如,一個(gè)關(guān)系型數(shù)據(jù)庫的連接器可能捕獲到一張表的每一次變更

(畫外音:我理解這四個(gè)核心API其實(shí)就是:發(fā)布辕狰、訂閱自赔、轉(zhuǎn)換處理、從第三方采集數(shù)據(jù)柳琢。)

在Kafka中,客戶端和服務(wù)器之間的通信是使用簡單的润脸、高性能的柬脸、與語言無關(guān)的TCP協(xié)議完成的。

2. Topics and Logs(主題和日志)

一個(gè)topic是一個(gè)分類毙驯,或者說是記錄被發(fā)布的時(shí)候的一個(gè)名字(畫外音:可以理解為記錄要被發(fā)到哪兒去)倒堕。

在Kafka中,topic總是有多個(gè)訂閱者爆价,因此垦巴,一個(gè)topic可能有0個(gè),1個(gè)或多個(gè)訂閱該數(shù)據(jù)的消費(fèi)者铭段。

對于每個(gè)主題骤宣,Kafka集群維護(hù)一個(gè)分區(qū)日志,如下圖所示:

每個(gè)分區(qū)都是一個(gè)有序的序愚、不可變的記錄序列憔披,而且記錄會(huì)不斷的被追加,一條記錄就是一個(gè)結(jié)構(gòu)化的提交日志(a structured commit log)。

分區(qū)中的每條記錄都被分配了一個(gè)連續(xù)的id號芬膝,這個(gè)id號被叫做offset(偏移量)望门,這個(gè)偏移量唯一的標(biāo)識出分區(qū)中的每條記錄。(PS:如果把分區(qū)比作數(shù)據(jù)庫表的話锰霜,那么偏移量就是主鍵)

Kafka集群持久化所有已發(fā)布的記錄筹误,無論它們有沒有被消費(fèi),記錄被保留的時(shí)間是可以配置的癣缅。例如厨剪,如果保留策略被設(shè)置為兩天,那么在記錄發(fā)布后的兩天內(nèi)所灸,可以使用它丽惶,之后將其丟棄以釋放空間。在對數(shù)據(jù)大小方面爬立,Kafka的性能是高效的钾唬,恒定常量級的,因此長時(shí)間存儲(chǔ)數(shù)據(jù)不是問題侠驯。

事實(shí)上抡秆,唯一維護(hù)在每個(gè)消費(fèi)者上的元數(shù)據(jù)是消費(fèi)者在日志中的位置或者叫偏移量。偏移量是由消費(fèi)者控制的:通常消費(fèi)者在讀取記錄的時(shí)候會(huì)線性的增加它的偏移量吟策,但是儒士,事實(shí)上,由于位置(偏移量)是由消費(fèi)者控制的檩坚,所有它可以按任意它喜歡的順序消費(fèi)記錄着撩。例如:一個(gè)消費(fèi)者可以重置到一個(gè)較舊的偏移量來重新處理之前已經(jīng)處理過的數(shù)據(jù),或者跳轉(zhuǎn)到最近的記錄并從“現(xiàn)在”開始消費(fèi)匾委。

這種特性意味著消費(fèi)者非常廉價(jià)————他們可以來來去去的消息而不會(huì)對集群或者其它消費(fèi)者造成太大影響拖叙。

日志中的分區(qū)有幾個(gè)用途。首先赂乐,它們允許日志的規(guī)模超出單個(gè)服務(wù)器的大小薯鳍。每個(gè)獨(dú)立分區(qū)都必須與宿主的服務(wù)器相匹配唤衫,但一個(gè)主題可能有多個(gè)分區(qū)是辕,所以它可以處理任意數(shù)量的數(shù)據(jù)竞帽。第二忍抽,它們作為并行的單位——稍后再進(jìn)一步拭嫁。

畫外音:簡單地來說银萍,日志分區(qū)的作用有兩個(gè):一络凿、日志的規(guī)模不再受限于單個(gè)服務(wù)器憨攒;二觉既、分區(qū)意味著可以并行砸民。

什么意思呢?主題建立在集群之上,每個(gè)主題維護(hù)了一個(gè)分區(qū)日志岭参,顧名思義反惕,日志是分區(qū)的;每個(gè)分區(qū)所在的服務(wù)器的資源(比如:CPU演侯、內(nèi)存姿染、帶寬、磁盤等)是有限的秒际,如果不分區(qū)(可以理解為等同于只有一個(gè))的話悬赏,必然受限于這個(gè)分區(qū)所在的服務(wù)器,那么多個(gè)分區(qū)的話就不一樣了娄徊,就突破了這種限制闽颇,服務(wù)器可以隨便加,分區(qū)也可以隨便加寄锐。

3. Distribution(分布)

日志的分區(qū)分布在集群中的服務(wù)器上兵多,每個(gè)服務(wù)器處理數(shù)據(jù),并且分區(qū)請求是共享的橄仆。每個(gè)分區(qū)被復(fù)制到多個(gè)服務(wù)器上以實(shí)現(xiàn)容錯(cuò)剩膘,到底復(fù)制到多少個(gè)服務(wù)器上是可以配置的。

Each partition is replicated across a configurable number of servers for fault tolerance.

每個(gè)分區(qū)都有一個(gè)服務(wù)器充當(dāng)“leader”角色盆顾,并且有0個(gè)或者多個(gè)服務(wù)器作為“followers”怠褐。leader處理對這個(gè)分區(qū)的所有讀和寫請求,而followers被動(dòng)的從leader那里復(fù)制數(shù)據(jù)您宪。如果leader失敗奈懒,followers中的其中一個(gè)會(huì)自動(dòng)變成新的leader。每個(gè)服務(wù)器充當(dāng)一些分區(qū)的“leader”的同時(shí)也是其它分區(qū)的“follower”宪巨,因此在整個(gè)集群中負(fù)載是均衡的筐赔。

也就是說,每個(gè)服務(wù)器既是“leader”也是“follower”揖铜。我們知道一個(gè)主題可能有多個(gè)分區(qū),一個(gè)分區(qū)可能在一個(gè)服務(wù)器上也可能跨多個(gè)服務(wù)器达皿,然而這并不以為著一臺(tái)服務(wù)器上只有一個(gè)分區(qū)天吓,是可能有多個(gè)分區(qū)的。每個(gè)分區(qū)中有一個(gè)服務(wù)器充當(dāng)“leader”峦椰,其余是“follower”龄寞。leader負(fù)責(zé)處理這個(gè)它作為leader所負(fù)責(zé)的分區(qū)的所有讀寫請求,而該分區(qū)中的follow只是被動(dòng)復(fù)制leader的數(shù)據(jù)汤功。這個(gè)有點(diǎn)兒像HDFS中的副本機(jī)制物邑。例如:分區(qū)-1有服務(wù)器A和B組成,A是leader,B是follower色解,有請求要往分區(qū)-1中寫數(shù)據(jù)的時(shí)候就由A處理茂嗓,然后A把剛才寫的數(shù)據(jù)同步給B,這樣的話正常請求相當(dāng)于A和B的數(shù)據(jù)是一樣的科阎,都有分區(qū)-1的全部數(shù)據(jù)述吸,如果A宕機(jī)了,B成為leader锣笨,接替A繼續(xù)處理對分區(qū)-1的讀寫請求蝌矛。

需要注意的是,分區(qū)是一個(gè)虛擬的概念错英,是一個(gè)邏輯單元入撒。

4. Producers(生產(chǎn)者)

如果想學(xué)習(xí)Java工程化、高性能及分布式椭岩、深入淺出茅逮。微服務(wù)、Spring簿煌,MyBatis氮唯,Netty源碼分析的朋友可以加我的Java高級交流:854630135,群里有阿里大牛直播講解技術(shù)姨伟,以及Java大型互聯(lián)網(wǎng)技術(shù)的視頻免費(fèi)分享給大家惩琉。

生產(chǎn)者發(fā)布數(shù)據(jù)到它們選擇的主題中。生產(chǎn)者負(fù)責(zé)選擇將記錄投遞到哪個(gè)主題的哪個(gè)分區(qū)中夺荒。要做這件事情瞒渠,可以簡單地用循環(huán)方式以到達(dá)負(fù)載均衡,或者根據(jù)一些語義分區(qū)函數(shù)(比如:基于記錄中的某些key)

5. Consumers(消費(fèi)者)

消費(fèi)者用一個(gè)消費(fèi)者組名來標(biāo)識它們自己(PS:相當(dāng)于給自己貼一個(gè)標(biāo)簽技扼,標(biāo)簽的名字是組名伍玖,以表明自己屬于哪個(gè)組),并且每一條發(fā)布到主題中的記錄只會(huì)投遞給每個(gè)訂閱的消費(fèi)者組中的其中一個(gè)消費(fèi)者實(shí)例剿吻。消費(fèi)者實(shí)例可能是單獨(dú)的進(jìn)程或者在單獨(dú)的機(jī)器上窍箍。

如果所有的消費(fèi)者實(shí)例都使用相同的消費(fèi)者組,那么記錄將會(huì)在這些消費(fèi)者之間有效的負(fù)載均衡丽旅。

如果所有的消費(fèi)者實(shí)例都使用不同的消費(fèi)者組椰棘,那么每條記錄將會(huì)廣播給所有的消費(fèi)者進(jìn)程。

上圖中其實(shí)那個(gè)Kafka Cluster換成Topic會(huì)更準(zhǔn)確一些

一個(gè)Kafka集群有2個(gè)服務(wù)器榄笙,4個(gè)分區(qū)(P0-P3)邪狞,有兩個(gè)消費(fèi)者組。組A中有2個(gè)消費(fèi)者實(shí)例茅撞,組B中有4個(gè)消費(fèi)者實(shí)例帆卓。

通常我們會(huì)發(fā)現(xiàn)巨朦,主題不會(huì)有太多的消費(fèi)者組,每個(gè)消費(fèi)者組是一個(gè)“邏輯訂閱者”(以消費(fèi)者組的名義訂閱主題剑令,而非以消費(fèi)者實(shí)例的名義去訂閱)糊啡。每個(gè)組由許多消費(fèi)者實(shí)例組成,以實(shí)現(xiàn)可擴(kuò)展性和容錯(cuò)尚洽。這仍然是發(fā)布/訂閱悔橄,只不過訂閱者是一個(gè)消費(fèi)者群體,而非單個(gè)進(jìn)程腺毫。

在Kafka中癣疟,這種消費(fèi)方式是通過用日志中的分區(qū)除以使用者實(shí)例來實(shí)現(xiàn)的,這樣可以保證在任意時(shí)刻每個(gè)消費(fèi)者都是排它的消費(fèi)潮酒,即“公平共享”睛挚。Kafka協(xié)議動(dòng)態(tài)的處理維護(hù)組中的成員。如果有心的實(shí)例加入到組中急黎,它們將從組中的其它成員那里接管一些分區(qū)扎狱;如果組中有一個(gè)實(shí)例死了,那么它的分區(qū)將會(huì)被分給其它實(shí)例勃教。

(畫外音:什么意思呢淤击?舉個(gè)例子,在上面的圖中故源,4個(gè)分區(qū)污抬,組A有2個(gè)消費(fèi)者,組B有4個(gè)消費(fèi)者绳军,那么對A來講組中的每個(gè)消費(fèi)者負(fù)責(zé)4/2=2個(gè)分區(qū)印机,對組B來說組中的每個(gè)消費(fèi)者負(fù)責(zé)4/4=1個(gè)分區(qū),而且同一時(shí)間消息只能被組中的一個(gè)實(shí)例消費(fèi)门驾。如果組中的成員數(shù)量有變化射赛,則重新分配。)

Kafka只提供分區(qū)下的記錄的總的順序奶是,而不提供主題下不同分區(qū)的總的順序楣责。每個(gè)分區(qū)結(jié)合按key劃分?jǐn)?shù)據(jù)的能力排序?qū)Υ蠖鄶?shù)應(yīng)用來說是足夠的。然而聂沙,如果你需要主題下總的記錄順序秆麸,你可以只使用一個(gè)分區(qū),這樣做的做的話就意味著每個(gè)消費(fèi)者組中只能有一個(gè)消費(fèi)者實(shí)例逐纬。

6. 保證

在一個(gè)高級別的Kafka給出下列保證:

被一個(gè)生產(chǎn)者發(fā)送到指定主題分區(qū)的消息將會(huì)按照它們被發(fā)送的順序追加到分區(qū)中。也就是說削樊,如果記錄M1和M2是被同一個(gè)生產(chǎn)者發(fā)送到同一個(gè)分區(qū)的豁生,而且M1是先發(fā)送的兔毒,M2是后發(fā)送的,那么在分區(qū)中M1的偏移量一定比M2小甸箱,并且M1出現(xiàn)在日志中的位置更靠前育叁。

一個(gè)消費(fèi)者看到記錄的順序和它們在日志中存儲(chǔ)的順序是一樣的。

對于一個(gè)副本因子是N的主題芍殖,我們可以容忍最多N-1個(gè)服務(wù)器失敗豪嗽,而不會(huì)丟失已經(jīng)提交給日志的任何記錄。

7. Spring Kafka

Spring提供了一個(gè)“模板”作為發(fā)送消息的高級抽象豌骏。它也通過使用@KafkaListener注釋和“監(jiān)聽器容器”提供對消息驅(qū)動(dòng)POJOs的支持龟梦。這些庫促進(jìn)了依賴注入和聲明式的使用。

7.1 純Java方式

1 package com.cjs.example.quickstart;

2

3 import org.apache.kafka.clients.consumer.ConsumerConfig;

4 import org.apache.kafka.clients.consumer.ConsumerRecord;

5 import org.apache.kafka.clients.producer.ProducerConfig;

6 import org.apache.kafka.common.serialization.IntegerDeserializer;

7 import org.apache.kafka.common.serialization.IntegerSerializer;

8 import org.apache.kafka.common.serialization.StringDeserializer;

9 import org.apache.kafka.common.serialization.StringSerializer;

10 import org.springframework.kafka.core.*;

11 import org.springframework.kafka.listener.KafkaMessageListenerContainer;

12 import org.springframework.kafka.listener.MessageListener;

13 import org.springframework.kafka.listener.config.ContainerProperties;

14

15 import java.util.HashMap;

16 import java.util.Map;

17

18 public class PureJavaDemo {

19

20 /**

21 * 生產(chǎn)者配置

22 */

23 private static Map senderProps() {

24 Map props = new HashMap<>();

25 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.101.5:9093");

26 props.put(ProducerConfig.RETRIES_CONFIG, 0);

27 props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);

28 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);

29 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

30 return props;

31 }

32

33 /**

34 * 消費(fèi)者配置

35 */

36 private static Map consumerProps() {

37 Map props = new HashMap<>();

38 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.101.5:9093");

39 props.put(ConsumerConfig.GROUP_ID_CONFIG, "hello");

40 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

41 props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");

42 props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");

43 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);

44 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

45 return props;

46 }

47

48 /**

49 * 發(fā)送模板配置

50 */

51 private static KafkaTemplate createTemplate() {

52 Map senderProps = senderProps();

53 ProducerFactory producerFactory = new DefaultKafkaProducerFactory<>(senderProps);

54 KafkaTemplate kafkaTemplate = new KafkaTemplate<>(producerFactory);

55 return kafkaTemplate;

56 }

57

58 /**

59 * 消息監(jiān)聽器容器配置

60 */

61 private static KafkaMessageListenerContainer createContainer() {

62 Map consumerProps = consumerProps();

63 ConsumerFactory consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProps);

64 ContainerProperties containerProperties = new ContainerProperties("test");

65 KafkaMessageListenerContainer container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);

66 return container;

67 }

68

69

70 public static void main(String[] args) throws InterruptedException {

71 String topic1 = "test"; // 主題

72

73 KafkaMessageListenerContainer container = createContainer();

74 ContainerProperties containerProperties = container.getContainerProperties();

75 containerProperties.setMessageListener(new MessageListener() {

76 @Override

77 public void onMessage(ConsumerRecord record) {

78 System.out.println("Received: " + record);

79 }

80 });

81 container.setBeanName("testAuto");

82

83 container.start();

84

85 KafkaTemplate kafkaTemplate = createTemplate();

86 kafkaTemplate.setDefaultTopic(topic1);

87

88 kafkaTemplate.sendDefault(0, "foo");

89 kafkaTemplate.sendDefault(2, "bar");

90 kafkaTemplate.sendDefault(0, "baz");

91 kafkaTemplate.sendDefault(2, "qux");

92

93 kafkaTemplate.flush();

94 container.stop();

95

96 System.out.println("結(jié)束");

97 }

98

99 }

運(yùn)行結(jié)果:

Received: ConsumerRecord(topic = test, partition = 0, offset = 67, CreateTime = 1533300970788, serialized key size = 4, serialized value size = 3, headers = RecordHeaders(headers = [], isReadOnly = false), key = 0, value = foo)

Received: ConsumerRecord(topic = test, partition = 0, offset = 68, CreateTime = 1533300970793, serialized key size = 4, serialized value size = 3, headers = RecordHeaders(headers = [], isReadOnly = false), key = 2, value = bar)

Received: ConsumerRecord(topic = test, partition = 0, offset = 69, CreateTime = 1533300970793, serialized key size = 4, serialized value size = 3, headers = RecordHeaders(headers = [], isReadOnly = false), key = 0, value = baz)

Received: ConsumerRecord(topic = test, partition = 0, offset = 70, CreateTime = 1533300970793, serialized key size = 4, serialized value size = 3, headers = RecordHeaders(headers = [], isReadOnly = false), key = 2, value = qux)

7.2 更簡單一點(diǎn)兒窃躲,用SpringBoot

1 package com.cjs.example.quickstart;

2

3 import org.apache.kafka.clients.consumer.ConsumerRecord;

4 import org.springframework.beans.factory.annotation.Autowired;

5 import org.springframework.boot.CommandLineRunner;

6 import org.springframework.context.annotation.Bean;

7 import org.springframework.context.annotation.Configuration;

8 import org.springframework.kafka.annotation.KafkaListener;

9 import org.springframework.kafka.core.KafkaTemplate;

10

11 @Configuration

12 public class JavaConfigurationDemo {

13

14 @KafkaListener(topics = "test")

15 public void listen(ConsumerRecord record) {

16 System.out.println("收到消息: " + record);

17 }

18

19 @Bean

20 public CommandLineRunner commandLineRunner() {

21 return new MyRunner();

22 }

23

24 class MyRunner implements CommandLineRunner {

25

26 @Autowired

27 private KafkaTemplate kafkaTemplate;

28

29 @Override

30 public void run(String... args) throws Exception {

31 kafkaTemplate.send("test", "foo1");

32 kafkaTemplate.send("test", "foo2");

33 kafkaTemplate.send("test", "foo3");

34 kafkaTemplate.send("test", "foo4");

35 }

36 }

37 }

application.properties配置

spring.kafka.bootstrap-servers=192.168.101.5:9092

spring.kafka.consumer.group-id=world

8. 生產(chǎn)者

1 package com.cjs.example.send;

2

3 import org.apache.kafka.clients.producer.ProducerConfig;

4 import org.apache.kafka.common.serialization.IntegerSerializer;

5 import org.apache.kafka.common.serialization.StringSerializer;

6 import org.springframework.context.annotation.Bean;

7 import org.springframework.context.annotation.Configuration;

8 import org.springframework.kafka.core.DefaultKafkaProducerFactory;

9 import org.springframework.kafka.core.KafkaTemplate;

10 import org.springframework.kafka.core.ProducerFactory;

11

12 import java.util.HashMap;

13 import java.util.Map;

14

15 @Configuration

16 public class Config {

17

18 public Map producerConfigs() {

19 Map props = new HashMap<>();

20 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.101.5:9092");

21 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);

22 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

23 return props;

24 }

25

26 public ProducerFactory producerFactory() {

27 return new DefaultKafkaProducerFactory<>(producerConfigs());

28 }

29

30 @Bean

31 public KafkaTemplate kafkaTemplate() {

32 return new KafkaTemplate(producerFactory());

33 }

34

35 }

1 package com.cjs.example.send;

2

3 import org.springframework.beans.factory.annotation.Autowired;

4 import org.springframework.boot.CommandLineRunner;

5 import org.springframework.kafka.core.KafkaTemplate;

6 import org.springframework.kafka.support.SendResult;

7 import org.springframework.stereotype.Component;

8 import org.springframework.util.concurrent.ListenableFuture;

9 import org.springframework.util.concurrent.ListenableFutureCallback;

10

11 @Component

12 public class MyCommandLineRunner implements CommandLineRunner {

13

14 @Autowired

15 private KafkaTemplate kafkaTemplate;

16

17 public void sendTo(Integer key, String value) {

18 ListenableFuture> listenableFuture = kafkaTemplate.send("test", key, value);

19 listenableFuture.addCallback(new ListenableFutureCallback>() {

20 @Override

21 public void onFailure(Throwable throwable) {

22 System.out.println("發(fā)送失敗啦");

23 throwable.printStackTrace();

24 }

25

26 @Override

27 public void onSuccess(SendResult sendResult) {

28 System.out.println("發(fā)送成功计贰," + sendResult);

29 }

30 });

31 }

32

33 @Override

34 public void run(String... args) throws Exception {

35 sendTo(1, "aaa");

36 sendTo(2, "bbb");

37 sendTo(3, "ccc");

38 }

39

40

41 }

運(yùn)行結(jié)果:

發(fā)送成功,SendResult [producerRecord=ProducerRecord(topic=test, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=1, value=aaa, timestamp=null), recordMetadata=test-0@37]

發(fā)送成功蒂窒,SendResult [producerRecord=ProducerRecord(topic=test, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=2, value=bbb, timestamp=null), recordMetadata=test-0@38]

發(fā)送成功躁倒,SendResult [producerRecord=ProducerRecord(topic=test, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=3, value=ccc, timestamp=null), recordMetadata=test-0@39]

9. 消費(fèi)者@KafkaListener

1 package com.cjs.example.receive;

2

3 import org.apache.kafka.clients.consumer.ConsumerConfig;

4 import org.apache.kafka.clients.consumer.ConsumerRecord;

5 import org.apache.kafka.common.serialization.IntegerDeserializer;

6 import org.apache.kafka.common.serialization.StringDeserializer;

7 import org.springframework.context.annotation.Bean;

8 import org.springframework.context.annotation.Configuration;

9 import org.springframework.kafka.annotation.KafkaListener;

10 import org.springframework.kafka.annotation.TopicPartition;

11 import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;

12 import org.springframework.kafka.config.KafkaListenerContainerFactory;

13 import org.springframework.kafka.core.ConsumerFactory;

14 import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

15 import org.springframework.kafka.listener.AbstractMessageListenerContainer;

16 import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

17 import org.springframework.kafka.listener.config.ContainerProperties;

18 import org.springframework.kafka.support.Acknowledgment;

19 import org.springframework.kafka.support.KafkaHeaders;

20 import org.springframework.messaging.handler.annotation.Header;

21 import org.springframework.messaging.handler.annotation.Payload;

22

23 import java.util.HashMap;

24 import java.util.List;

25 import java.util.Map;

26

27 @Configuration

28 public class Config2 {

29

30 @Bean

31 public KafkaListenerContainerFactory> kafkaListenerContainerFactory() {

32 ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();

33 factory.setConsumerFactory(consumerFactory());

34 factory.setConcurrency(3);

35 ContainerProperties containerProperties = factory.getContainerProperties();

36 containerProperties.setPollTimeout(2000);

37 // containerProperties.setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);

38 return factory;

39 }

40

41 private ConsumerFactory consumerFactory() {

42 return new DefaultKafkaConsumerFactory<>(consumerProps());

43 }

44

45 private Map consumerProps() {

46 Map props = new HashMap<>();

47 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.101.5:9092");

48 props.put(ConsumerConfig.GROUP_ID_CONFIG, "hahaha");

49 // props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

50 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);

51 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

52 return props;

53 }

54

55

56 @KafkaListener(topics = "test")

57 public void listen(String data) {

58 System.out.println("listen 收到: " + data);

59 }

60

61

62 @KafkaListener(topics = "test", containerFactory = "kafkaListenerContainerFactory")

63 public void listen2(String data, Acknowledgment ack) {

64 System.out.println("listen2 收到: " + data);

65 ack.acknowledge();

66 }

67

68 @KafkaListener(topicPartitions = {@TopicPartition(topic = "test", partitions = "0")})

69 public void listen3(ConsumerRecord record) {

70 System.out.println("listen3 收到: " + record.value());

71 }

72

73

74 @KafkaListener(id = "xyz", topics = "test")

75 public void listen4(@Payload String foo,

76 @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key,

77 @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,

78 @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,

79 @Header(KafkaHeaders.OFFSET) List offsets) {

80 System.out.println("listen4 收到: ");

81 System.out.println(foo);

82 System.out.println(key);

83 System.out.println(partition);

84 System.out.println(topic);

85 System.out.println(offsets);

86 }

87

88 }

9.1 Committing Offsets

如果想學(xué)習(xí)Java工程化、高性能及分布式洒琢、深入淺出秧秉。微服務(wù)、Spring衰抑,MyBatis象迎,Netty源碼分析的朋友可以加我的Java高級交流:854630135,群里有阿里大牛直播講解技術(shù)停士,以及Java大型互聯(lián)網(wǎng)技術(shù)的視頻免費(fèi)分享給大家挖帘。

如果enable.auto.commit設(shè)置為true,那么kafka將自動(dòng)提交offset恋技。如果設(shè)置為false拇舀,則支持下列AckMode(確認(rèn)模式)。

消費(fèi)者poll()方法將返回一個(gè)或多個(gè)ConsumerRecords

RECORD :處理完記錄以后蜻底,當(dāng)監(jiān)聽器返回時(shí)骄崩,提交offset

BATCH :當(dāng)對poll()返回的所有記錄進(jìn)行處理完以后,提交偏offset

TIME :當(dāng)對poll()返回的所有記錄進(jìn)行處理完以后薄辅,只要距離上一次提交已經(jīng)過了ackTime時(shí)間后就提交

COUNT :當(dāng)poll()返回的所有記錄都被處理時(shí)要拂,只要從上次提交以來收到了ackCount條記錄,就可以提交

COUNT_TIME :和TIME以及COUNT類似站楚,只要這兩個(gè)中有一個(gè)為true脱惰,則提交

MANUAL :消息監(jiān)聽器負(fù)責(zé)調(diào)用Acknowledgment.acknowledge()方法,此后和BATCH是一樣的

MANUAL_IMMEDIATE :當(dāng)監(jiān)聽器調(diào)用Acknowledgment.acknowledge()方法后立即提交

10. Spring Boot Kafka

10.1 application.properties

spring.kafka.bootstrap-servers=192.168.101.5:9092

10.2 發(fā)送消息

1 package com.cjs.example;

2

3 import org.springframework.beans.factory.annotation.Autowired;

4 import org.springframework.kafka.core.KafkaTemplate;

5 import org.springframework.web.bind.annotation.RequestMapping;

6 import org.springframework.web.bind.annotation.RestController;

7

8 import javax.annotation.Resource;

9

10 @RestController

11 @RequestMapping("/msg")

12 public class MessageController {

13

14 @Resource

15 private KafkaTemplate kafkaTemplate;

16

17 @RequestMapping("/send")

18 public String send(String topic, String key, String value) {

19 kafkaTemplate.send(topic, key, value);

20 return "ok";

21 }

22

23 }

10.3 接收消息

1 package com.cjs.example;

2

3 import org.apache.kafka.clients.consumer.ConsumerRecord;

4 import org.springframework.kafka.annotation.KafkaListener;

5 import org.springframework.kafka.annotation.KafkaListeners;

6 import org.springframework.stereotype.Component;

7

8 @Component

9 public class MessageListener {

10

11 /**

12 * 監(jiān)聽訂單消息

13 */

14 @KafkaListener(topics = "ORDER", groupId = "OrderGroup")

15 public void listenToOrder(String data) {

16 System.out.println("收到訂單消息:" + data);

17 }

18

19 /**

20 * 監(jiān)聽會(huì)員消息

21 */

22 @KafkaListener(topics = "MEMBER", groupId = "MemberGroup")

23 public void listenToMember(ConsumerRecord record) {

24 System.out.println("收到會(huì)員消息:" + record);

25 }

26

27 /**

28 * 監(jiān)聽所有消息

29 *

30 * 任意時(shí)刻窿春,一條消息只會(huì)發(fā)給組中的一個(gè)消費(fèi)者

31 *

32 * 消費(fèi)者組中的成員數(shù)量不能超過分區(qū)數(shù)拉一,這里分區(qū)數(shù)是1采盒,因此訂閱該主題的消費(fèi)者組成員不能超過1

33 */

34 // @KafkaListeners({@KafkaListener(topics = "ORDER", groupId = "OrderGroup"),

35 // @KafkaListener(topics = "MEMBER", groupId = "MemberGroup")})

36 // public void listenToAll(String data) {

37 // System.out.println("啊啊啊");

38 // }

39

40 }

11. pom.xml

如果想學(xué)習(xí)Java工程化、高性能及分布式蔚润、深入淺出磅氨。微服務(wù)、Spring嫡纠,MyBatis烦租,Netty源碼分析的朋友可以加我的Java高級交流:854630135,群里有阿里大牛直播講解技術(shù)除盏,以及Java大型互聯(lián)網(wǎng)技術(shù)的視頻免費(fèi)分享給大家叉橱。


xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

4.0.0

com.cjs.example

cjs-kafka-example

0.0.1-SNAPSHOT

jar

cjs-kafka-example

org.springframework.boot

spring-boot-starter-parent

2.0.4.RELEASE

UTF-8

UTF-8

1.8

org.springframework.boot

spring-boot-starter-web

org.springframework.kafka

spring-kafka

org.springframework.boot

spring-boot-starter-test

test

org.springframework.boot

spring-boot-maven-plugin

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市痴颊,隨后出現(xiàn)的幾起案子赏迟,更是在濱河造成了極大的恐慌,老刑警劉巖蠢棱,帶你破解...
    沈念sama閱讀 218,546評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件锌杀,死亡現(xiàn)場離奇詭異,居然都是意外死亡泻仙,警方通過查閱死者的電腦和手機(jī)糕再,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,224評論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來玉转,“玉大人突想,你說我怎么就攤上這事【孔ィ” “怎么了猾担?”我有些...
    開封第一講書人閱讀 164,911評論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長刺下。 經(jīng)常有香客問我绑嘹,道長,這世上最難降的妖魔是什么橘茉? 我笑而不...
    開封第一講書人閱讀 58,737評論 1 294
  • 正文 為了忘掉前任工腋,我火速辦了婚禮,結(jié)果婚禮上畅卓,老公的妹妹穿的比我還像新娘擅腰。我一直安慰自己,他們只是感情好翁潘,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,753評論 6 392
  • 文/花漫 我一把揭開白布趁冈。 她就那樣靜靜地躺著,像睡著了一般拜马。 火紅的嫁衣襯著肌膚如雪渗勘。 梳的紋絲不亂的頭發(fā)上矾飞,一...
    開封第一講書人閱讀 51,598評論 1 305
  • 那天,我揣著相機(jī)與錄音呀邢,去河邊找鬼。 笑死豹绪,一個(gè)胖子當(dāng)著我的面吹牛价淌,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播瞒津,決...
    沈念sama閱讀 40,338評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼蝉衣,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了巷蚪?” 一聲冷哼從身側(cè)響起病毡,我...
    開封第一講書人閱讀 39,249評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎屁柏,沒想到半個(gè)月后啦膜,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,696評論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡淌喻,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,888評論 3 336
  • 正文 我和宋清朗相戀三年僧家,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片裸删。...
    茶點(diǎn)故事閱讀 40,013評論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡八拱,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出涯塔,到底是詐尸還是另有隱情肌稻,我是刑警寧澤,帶...
    沈念sama閱讀 35,731評論 5 346
  • 正文 年R本政府宣布匕荸,位于F島的核電站爹谭,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏每聪。R本人自食惡果不足惜旦棉,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,348評論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望药薯。 院中可真熱鬧绑洛,春花似錦、人聲如沸童本。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,929評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽穷娱。三九已至绑蔫,卻和暖如春运沦,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背配深。 一陣腳步聲響...
    開封第一講書人閱讀 33,048評論 1 270
  • 我被黑心中介騙來泰國打工携添, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人篓叶。 一個(gè)月前我還...
    沈念sama閱讀 48,203評論 3 370
  • 正文 我出身青樓烈掠,卻偏偏與公主長得像,于是被迫代替她去往敵國和親缸托。 傳聞我的和親對象是個(gè)殘疾皇子左敌,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,960評論 2 355

推薦閱讀更多精彩內(nèi)容