kafkaAPI實戰(zhàn)

Kafka生產者 Java API

Srping整合kafka之生成者

導入依賴:

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.2.0.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.8.2.1</version>
            <exclusions>
                <exclusion>
                    <artifactId>jmxri</artifactId>
                    <groupId>com.sun.jmx</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>jms</artifactId>
                    <groupId>javax.jms</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>jmxtools</artifactId>
                    <groupId>com.sun.jdmk</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>5.1.7.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>

創(chuàng)建生產者配置文件:

<!--參數(shù)配置 -->
    <bean id="producerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <!-- kafka服務地址,可能是集群 value="localhost:9092,localhost:9093,localhost:9094"-->
                <entry key="bootstrap.servers" value="192.168.25.133:9092" />
                
                <!-- 有可能導致broker接收到重復的消息-->
                <entry key="retries" value="0" />
                <!-- 每次批量發(fā)送消息的數(shù)量 -->
                <entry key="batch.size" value="1638" />
                <!-- 默認0ms嵌巷,在異步IO線程被觸發(fā)后(任何一個topic坤学,partition滿都可以觸發(fā)) -->
                <entry key="linger.ms" value="1" />
                
                <!--producer可以用來緩存數(shù)據的內存大小些举。如果數(shù)據產生速度大于向broker發(fā)送的速度,producer會阻塞或者拋出異常 -->
                <entry key="buffer.memory" value="33554432 " />
                
                <entry key="key.serializer"
                    value="org.apache.kafka.common.serialization.StringSerializer" />
                    
                <entry key="value.serializer"
                    value="org.apache.kafka.common.serialization.StringSerializer" />
            </map>
        </constructor-arg>
    </bean>

    <!-- 創(chuàng)建kafkatemplate需要使用的producerfactory bean -->
    <bean id="producerFactory"
        class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
        <constructor-arg>
            <ref bean="producerProperties" />
        </constructor-arg>
    </bean>

    <!-- 創(chuàng)建kafkatemplate bean窗价,使用的時候射沟,只需要注入這個bean,即可使用template的send消息方法 -->
    <bean id="KafkaTemplate"
        class="org.springframework.kafka.core.KafkaTemplate">
        <constructor-arg ref="producerFactory" />
        <!--設置對應topic -->
        <property name="defaultTopic" value="1704D" />
    </bean>

編寫生產者測試類:

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations="classpath:spring-producer.xml")
public class TestProducer {

    @Autowired
    KafkaTemplate kafkaTemplate;
    @Test
    public void testSend(){
        kafkaTemplate.send("1704D", "你好,我是spring發(fā)來的消息");
    }
}

11.3 Kafka消費者 Java API

Srping整合kafka之消費者

創(chuàng)建消費者配置文件:

<bean id="consumerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <!--Kafka服務地址 -->
                <entry key="bootstrap.servers" value="192.168.25.133:9092" />
                <!--Consumer的組ID太雨,相同group.id的consumer屬于同一個組。 -->
                <entry key="group.id" value="test-consumer-group" />
                <!--如果此值設置為true魁蒜,consumer會周期性的把當前消費的offset值保存到zookeeper囊扳。當consumer失敗重啟之后將會使用此值作為新開始消費的值吩翻。 -->
                <entry key="enable.auto.commit" value="true" />
                <!--網絡請求的socket超時時間。實際超時時間由max.fetch.wait + socket.timeout.ms 確定 -->
                <entry key="session.timeout.ms" value="15000 " />
                
                <entry key="key.deserializer"
                    value="org.apache.kafka.common.serialization.StringDeserializer" />
                    
                <entry key="value.deserializer"
                    value="org.apache.kafka.common.serialization.StringDeserializer" />
            </map>
        </constructor-arg>
    </bean>


    <!-- 創(chuàng)建consumerFactory bean -->
    <bean id="consumerFactory"
        class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
        <constructor-arg>
            <ref bean="consumerProperties" />
        </constructor-arg>
    </bean>
    
    
    <bean id="messageListenerContainer"
        class="org.springframework.kafka.listener.KafkaMessageListenerContainer"
        init-method="doStart">
        <constructor-arg ref="consumerFactory" />
        <constructor-arg ref="containerProperties" />
    </bean>
    
    

    <!-- 記得修改主題 -->
    <bean id="containerProperties"
        class="org.springframework.kafka.listener.ContainerProperties">
        <!-- 構造函數(shù) 就是 主題的參數(shù)值 -->
        <constructor-arg value="1704D" />
        <property name="messageListener" ref="messageListernerConsumerService" />
    </bean>


    
    <!--指定具體監(jiān)聽類的bean -->
    <bean id="messageListernerConsumerService" class="com.bawei.demo01.CmsKafkaListener" />

創(chuàng)建消費者所需監(jiān)聽器,用來監(jiān)聽相關主題Topics

public class CmsKafkaListener implements MessageListener<String,String> {


    public void onMessage(ConsumerRecord<String, String> data) {
        String value = data.value();
        System.out.println(value);
    }
}

創(chuàng)建消費者測試類:

public class TestConsumer {


    public static void main(String[] args) {
        ClassPathXmlApplicationContext ac = new ClassPathXmlApplicationContext("classpath:spring-consumer.xml");
    }
}

啟動zk:

/opt/kafka/bin/zookeeper-server-start.sh /opt/kafka/config/zookeeper.properties

啟動kafka:

/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties

創(chuàng)建主題:

/opt/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic 1704D

查看主題:

/opt/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181

發(fā)送一些消息:

/opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic 1704D

-->等待輸入發(fā)送的消息

啟動消費者:

/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.25.133:9092 --topic 1704D --from-beginning 從第一條開始接受
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.25.133:9092 --topic 1704D從現(xiàn)在生產者發(fā)送開始接受锥咸。

分別啟動java代碼進行測試


Kafka 生產過程分析

1.png

(1)點對點模式(一對一狭瞎,消費者主動拉取數(shù)據,消息收到后消息清除)
點對點模型通常是一個基于拉取或者輪詢的消息傳送模型她君,這種模型從隊列中請求信息脚作,而不是將消息推送到客戶端葫哗。這個模型的特點是發(fā)送到隊列的消息被一個且只有一個接收者接收處理缔刹,即使有多個消息監(jiān)聽者也是如此。
(2)發(fā)布/訂閱模式(一對多劣针,數(shù)據生產后校镐,推送給所有訂閱者)
發(fā)布訂閱模型則是一個基于推送的消息傳送模型萎胰。發(fā)布訂閱模型可以有多種不同的訂閱者蒸其,臨時訂閱者只在主動監(jiān)聽主題時才接收消息时鸵,而持久訂閱者則監(jiān)聽主題的所有消息董虱,即使當前訂閱者不可用眷篇,處于離線狀態(tài)犬第。

2.png

1)Producer :消息生產者型宙,就是向kafka broker發(fā)消息的客戶端疮茄。
2)Consumer :消息消費者擎浴,向kafka broker取消息的客戶端
3)Topic :可以理解為一個隊列员咽。
4) Consumer Group (CG):這是kafka用來實現(xiàn)一個topic消息的廣播(發(fā)給所有的consumer)和單播(發(fā)給任意一個consumer)的手段。一個topic可以有多個CG贮预。topic的消息會復制(不是真的復制贝室,是概念上的)到所有的CG,但每個partion只會把消息發(fā)給該CG中的一個consumer仿吞。如果需要實現(xiàn)廣播滑频,只要每個consumer有一個獨立的CG就可以了。要實現(xiàn)單播只要所有的consumer在同一個CG唤冈。用CG還可以將consumer進行自由的分組而不需要多次發(fā)送消息到不同的topic峡迷。
5)Broker :一臺kafka服務器就是一個broker。一個集群由多個broker組成你虹。一個broker可以容納多個topic绘搞。
6)Partition:為了實現(xiàn)擴展性,一個非常大的topic可以分布到多個broker(即服務器)上售葡,一個topic可以分為多個partition看杭,每個partition是一個有序的隊列。partition中的每條消息都會被分配一個有序的id(offset)挟伙。kafka只保證按一個partition中的順序將消息發(fā)給consumer楼雹,不保證一個topic的整體(多個partition間)的順序模孩。
7)Offset:kafka的存儲文件都是按照offset.kafka來命名,用offset做名字的好處是方便查找贮缅。例如你想找位于2049的位置榨咐,只要找到2048.kafka的文件即可。當然the first offset就是00000000000.kafka

3.png

1)producer先從zookeeper的 "/brokers/.../state"節(jié)點找到該partition的leader
2)producer將消息發(fā)送給該leader
3)leader將消息寫入本地log
4)followers從leader pull消息谴供,寫入本地log后向leader發(fā)送ACK
5)leader收到所有ISR中的replication的ACK后块茁,增加HW(high watermark,最后commit 的offset)并向producer發(fā)送ACK

4.png

Broker 保存消息

Kafka文件存儲基本結構

l 在Kafka文件存儲中桂肌,同一個topic下有多個不同partition数焊,每個partition為一個目錄,partiton命名規(guī)則為topic名稱+有序序號崎场,第一個partiton序號從0開始佩耳,序號最大值為partitions數(shù)量減1。

l 每個partion(目錄)相當于一個巨型文件被平均分配到多個大小相等segment(段)數(shù)據文件中谭跨。但每個段segment file消息數(shù)量不一定相等干厚,這種特性方便old segment file快速被刪除。默認保留7天的數(shù)據螃宙。

5.jpg

l 每個partiton只需要支持順序讀寫就行了蛮瞄,segment文件生命周期由服務端配置參數(shù)決定。(什么時候創(chuàng)建谆扎,什么時候刪除)

6.png

數(shù)據有序挂捅?

? 一個partition的數(shù)據是否是有序的? 間隔性有序燕酷,不連續(xù)

? 針對一個topic里面的數(shù)據籍凝,只能做到partition內部有序,不能做到全局有序苗缩。

? 特別加入消費者的場景后饵蒂,如何保證消費者消費的數(shù)據全局有序的?偽命題酱讶。

只有一種情況下才能保證全局有序退盯?就是只有一個partition。

11.5.2 KafkaPartitionSegment

Segment file組成:由2大部分組成泻肯,分別為index file和data file渊迁,此2個文件一一對應,成對出現(xiàn)灶挟,后綴".index"和“.log”分別表示為segment索引文件琉朽、數(shù)據文件。

8.jpg

Segment文件命名規(guī)則:partion全局的第一個segment從0開始稚铣,后續(xù)每個segment文件名為上一個segment文件最后一條消息的offset值箱叁。數(shù)值最大為64位long大小墅垮,19位數(shù)字字符長度,沒有數(shù)字用0填充耕漱。

索引文件存儲大量元數(shù)據算色,數(shù)據文件存儲大量消息,索引文件中元數(shù)據指向對應數(shù)據文件中message的物理偏移地址螟够。

9.jpg

3灾梦,497:當前l(fā)og文件中的第幾條信息,存放在磁盤上的那個地方

上述圖中索引文件存儲大量元數(shù)據妓笙,數(shù)據文件存儲大量消息若河,索引文件中元數(shù)據指向對應數(shù)據文件中message的物理偏移地址。

其中以索引文件中元數(shù)據3,497為例给郊,依次在數(shù)據文件中表示第3個message(在全局partiton表示第368772個message)牡肉、以及該消息的物理偏移地址為497。

segment data file由許多message組成淆九, qq物理結構如下:

10.png

Kafka 查找message

讀取offset=368776的message,需要通過下面2個步驟查找毛俏。

11.jpg

查找segment file

00000000000000000000.index表示最開始的文件炭庙,起始偏移量(offset)為0

00000000000000368769.index的消息量起始偏移量為368770 = 368769 + 1

00000000000000737337.index的起始偏移量為737338=737337 + 1

其他后續(xù)文件依次類推。

以起始偏移量命名并排序這些文件煌寇,只要根據offset 二分查找文件列表焕蹄,就可以快速定位到具體文件。當offset=368776時定位到00000000000000368769.index和對應log文件阀溶。

通過segment file查找message

當offset=368776時腻脏,依次定位到00000000000000368769.index的元數(shù)據物理位置和00000000000000368769.log的物理偏移地址

然后再通過00000000000000368769.log順序查找直到offset=368776為止。

消費過程分析

本質上kafka只支持Topic银锻;

每個group中可以有多個consumer永品,每個consumer屬于一個consumer group;

通常情況下击纬,一個group中會包含多個consumer鼎姐,這樣不僅可以提高topic中消息的并發(fā)消費能力,而且還能提高"故障容錯"性更振,如果group中的某個consumer失效那么其消費的partitions將會有其他consumer自動接管炕桨。

對于Topic中的一條特定的消息,只會被訂閱此Topic的每個group中的其中一個consumer消費肯腕,此消息不會發(fā)送給一個group的多個consumer献宫;

那么一個group中所有的consumer將會交錯的消費整個Topic,每個group中consumer消息消費互相獨立实撒,我們可以認為一個group是一個"訂閱"者姊途。

在kafka中,一個partition中的消息只會被group中的一個consumer消費(同一時刻)帖池;

一個Topic中的每個partions,只會被一個"訂閱者"中的一個consumer消費吭净,不過一個consumer可以同時消費多個partitions中的消息睡汹。

kafka的設計原理決定,對于一個topic,同一個group中不能有多于partitions個數(shù)的consumer同時消費寂殉,否則將意味著某些consumer將無法得到消息囚巴。

kafka只能保證一個partition中的消息被某個consumer消費時是順序的;事實上友扰,從Topic角度來說,當有多個partitions時,消息仍不是全局有序的彤叉。

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
禁止轉載,如需轉載請通過簡信或評論聯(lián)系作者村怪。
  • 序言:七十年代末秽浇,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子甚负,更是在濱河造成了極大的恐慌柬焕,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,561評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件梭域,死亡現(xiàn)場離奇詭異斑举,居然都是意外死亡,警方通過查閱死者的電腦和手機病涨,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,218評論 3 385
  • 文/潘曉璐 我一進店門富玷,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人既穆,你說我怎么就攤上這事赎懦。” “怎么了幻工?”我有些...
    開封第一講書人閱讀 157,162評論 0 348
  • 文/不壞的土叔 我叫張陵励两,是天一觀的道長。 經常有香客問我会钝,道長伐蒋,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,470評論 1 283
  • 正文 為了忘掉前任迁酸,我火速辦了婚禮先鱼,結果婚禮上,老公的妹妹穿的比我還像新娘奸鬓。我一直安慰自己焙畔,他們只是感情好,可當我...
    茶點故事閱讀 65,550評論 6 385
  • 文/花漫 我一把揭開白布串远。 她就那樣靜靜地躺著宏多,像睡著了一般儿惫。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上伸但,一...
    開封第一講書人閱讀 49,806評論 1 290
  • 那天肾请,我揣著相機與錄音,去河邊找鬼更胖。 笑死铛铁,一個胖子當著我的面吹牛,可吹牛的內容都是我干的却妨。 我是一名探鬼主播饵逐,決...
    沈念sama閱讀 38,951評論 3 407
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼彪标!你這毒婦竟也來了倍权?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 37,712評論 0 266
  • 序言:老撾萬榮一對情侶失蹤捞烟,失蹤者是張志新(化名)和其女友劉穎薄声,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體坷襟,經...
    沈念sama閱讀 44,166評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡奸柬,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 36,510評論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了婴程。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,643評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡抱婉,死狀恐怖档叔,靈堂內的尸體忽然破棺而出,到底是詐尸還是另有隱情蒸绩,我是刑警寧澤衙四,帶...
    沈念sama閱讀 34,306評論 4 330
  • 正文 年R本政府宣布,位于F島的核電站患亿,受9級特大地震影響传蹈,放射性物質發(fā)生泄漏。R本人自食惡果不足惜步藕,卻給世界環(huán)境...
    茶點故事閱讀 39,930評論 3 313
  • 文/蒙蒙 一惦界、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧咙冗,春花似錦沾歪、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,745評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽挫望。三九已至,卻和暖如春狂窑,著一層夾襖步出監(jiān)牢的瞬間媳板,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,983評論 1 266
  • 我被黑心中介騙來泰國打工泉哈, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留蛉幸,地道東北人。 一個月前我還...
    沈念sama閱讀 46,351評論 2 360
  • 正文 我出身青樓旨巷,卻偏偏與公主長得像巨缘,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子采呐,可洞房花燭夜當晚...
    茶點故事閱讀 43,509評論 2 348

推薦閱讀更多精彩內容