Kafka生產者 Java API
Srping整合kafka之生成者
導入依賴:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.2.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.2.1</version>
<exclusions>
<exclusion>
<artifactId>jmxri</artifactId>
<groupId>com.sun.jmx</groupId>
</exclusion>
<exclusion>
<artifactId>jms</artifactId>
<groupId>javax.jms</groupId>
</exclusion>
<exclusion>
<artifactId>jmxtools</artifactId>
<groupId>com.sun.jdmk</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>5.1.7.RELEASE</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
創(chuàng)建生產者配置文件:
<!--參數(shù)配置 -->
<bean id="producerProperties" class="java.util.HashMap">
<constructor-arg>
<map>
<!-- kafka服務地址,可能是集群 value="localhost:9092,localhost:9093,localhost:9094"-->
<entry key="bootstrap.servers" value="192.168.25.133:9092" />
<!-- 有可能導致broker接收到重復的消息-->
<entry key="retries" value="0" />
<!-- 每次批量發(fā)送消息的數(shù)量 -->
<entry key="batch.size" value="1638" />
<!-- 默認0ms嵌巷,在異步IO線程被觸發(fā)后(任何一個topic坤学,partition滿都可以觸發(fā)) -->
<entry key="linger.ms" value="1" />
<!--producer可以用來緩存數(shù)據的內存大小些举。如果數(shù)據產生速度大于向broker發(fā)送的速度,producer會阻塞或者拋出異常 -->
<entry key="buffer.memory" value="33554432 " />
<entry key="key.serializer"
value="org.apache.kafka.common.serialization.StringSerializer" />
<entry key="value.serializer"
value="org.apache.kafka.common.serialization.StringSerializer" />
</map>
</constructor-arg>
</bean>
<!-- 創(chuàng)建kafkatemplate需要使用的producerfactory bean -->
<bean id="producerFactory"
class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
<constructor-arg>
<ref bean="producerProperties" />
</constructor-arg>
</bean>
<!-- 創(chuàng)建kafkatemplate bean窗价,使用的時候射沟,只需要注入這個bean,即可使用template的send消息方法 -->
<bean id="KafkaTemplate"
class="org.springframework.kafka.core.KafkaTemplate">
<constructor-arg ref="producerFactory" />
<!--設置對應topic -->
<property name="defaultTopic" value="1704D" />
</bean>
編寫生產者測試類:
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations="classpath:spring-producer.xml")
public class TestProducer {
@Autowired
KafkaTemplate kafkaTemplate;
@Test
public void testSend(){
kafkaTemplate.send("1704D", "你好,我是spring發(fā)來的消息");
}
}
11.3 Kafka消費者 Java API
Srping整合kafka之消費者
創(chuàng)建消費者配置文件:
<bean id="consumerProperties" class="java.util.HashMap">
<constructor-arg>
<map>
<!--Kafka服務地址 -->
<entry key="bootstrap.servers" value="192.168.25.133:9092" />
<!--Consumer的組ID太雨,相同group.id的consumer屬于同一個組。 -->
<entry key="group.id" value="test-consumer-group" />
<!--如果此值設置為true魁蒜,consumer會周期性的把當前消費的offset值保存到zookeeper囊扳。當consumer失敗重啟之后將會使用此值作為新開始消費的值吩翻。 -->
<entry key="enable.auto.commit" value="true" />
<!--網絡請求的socket超時時間。實際超時時間由max.fetch.wait + socket.timeout.ms 確定 -->
<entry key="session.timeout.ms" value="15000 " />
<entry key="key.deserializer"
value="org.apache.kafka.common.serialization.StringDeserializer" />
<entry key="value.deserializer"
value="org.apache.kafka.common.serialization.StringDeserializer" />
</map>
</constructor-arg>
</bean>
<!-- 創(chuàng)建consumerFactory bean -->
<bean id="consumerFactory"
class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
<constructor-arg>
<ref bean="consumerProperties" />
</constructor-arg>
</bean>
<bean id="messageListenerContainer"
class="org.springframework.kafka.listener.KafkaMessageListenerContainer"
init-method="doStart">
<constructor-arg ref="consumerFactory" />
<constructor-arg ref="containerProperties" />
</bean>
<!-- 記得修改主題 -->
<bean id="containerProperties"
class="org.springframework.kafka.listener.ContainerProperties">
<!-- 構造函數(shù) 就是 主題的參數(shù)值 -->
<constructor-arg value="1704D" />
<property name="messageListener" ref="messageListernerConsumerService" />
</bean>
<!--指定具體監(jiān)聽類的bean -->
<bean id="messageListernerConsumerService" class="com.bawei.demo01.CmsKafkaListener" />
創(chuàng)建消費者所需監(jiān)聽器,用來監(jiān)聽相關主題Topics
public class CmsKafkaListener implements MessageListener<String,String> {
public void onMessage(ConsumerRecord<String, String> data) {
String value = data.value();
System.out.println(value);
}
}
創(chuàng)建消費者測試類:
public class TestConsumer {
public static void main(String[] args) {
ClassPathXmlApplicationContext ac = new ClassPathXmlApplicationContext("classpath:spring-consumer.xml");
}
}
啟動zk:
/opt/kafka/bin/zookeeper-server-start.sh /opt/kafka/config/zookeeper.properties
啟動kafka:
/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
創(chuàng)建主題:
/opt/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic 1704D
查看主題:
/opt/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181
發(fā)送一些消息:
/opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic 1704D
-->等待輸入發(fā)送的消息
啟動消費者:
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.25.133:9092 --topic 1704D --from-beginning 從第一條開始接受
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.25.133:9092 --topic 1704D從現(xiàn)在生產者發(fā)送開始接受锥咸。分別啟動java代碼進行測試
Kafka 生產過程分析
(1)點對點模式(一對一狭瞎,消費者主動拉取數(shù)據,消息收到后消息清除)
點對點模型通常是一個基于拉取或者輪詢的消息傳送模型她君,這種模型從隊列中請求信息脚作,而不是將消息推送到客戶端葫哗。這個模型的特點是發(fā)送到隊列的消息被一個且只有一個接收者接收處理缔刹,即使有多個消息監(jiān)聽者也是如此。
(2)發(fā)布/訂閱模式(一對多劣针,數(shù)據生產后校镐,推送給所有訂閱者)
發(fā)布訂閱模型則是一個基于推送的消息傳送模型萎胰。發(fā)布訂閱模型可以有多種不同的訂閱者蒸其,臨時訂閱者只在主動監(jiān)聽主題時才接收消息时鸵,而持久訂閱者則監(jiān)聽主題的所有消息董虱,即使當前訂閱者不可用眷篇,處于離線狀態(tài)犬第。
1)Producer :消息生產者型宙,就是向kafka broker發(fā)消息的客戶端疮茄。
2)Consumer :消息消費者擎浴,向kafka broker取消息的客戶端
3)Topic :可以理解為一個隊列员咽。
4) Consumer Group (CG):這是kafka用來實現(xiàn)一個topic消息的廣播(發(fā)給所有的consumer)和單播(發(fā)給任意一個consumer)的手段。一個topic可以有多個CG贮预。topic的消息會復制(不是真的復制贝室,是概念上的)到所有的CG,但每個partion只會把消息發(fā)給該CG中的一個consumer仿吞。如果需要實現(xiàn)廣播滑频,只要每個consumer有一個獨立的CG就可以了。要實現(xiàn)單播只要所有的consumer在同一個CG唤冈。用CG還可以將consumer進行自由的分組而不需要多次發(fā)送消息到不同的topic峡迷。
5)Broker :一臺kafka服務器就是一個broker。一個集群由多個broker組成你虹。一個broker可以容納多個topic绘搞。
6)Partition:為了實現(xiàn)擴展性,一個非常大的topic可以分布到多個broker(即服務器)上售葡,一個topic可以分為多個partition看杭,每個partition是一個有序的隊列。partition中的每條消息都會被分配一個有序的id(offset)挟伙。kafka只保證按一個partition中的順序將消息發(fā)給consumer楼雹,不保證一個topic的整體(多個partition間)的順序模孩。
7)Offset:kafka的存儲文件都是按照offset.kafka來命名,用offset做名字的好處是方便查找贮缅。例如你想找位于2049的位置榨咐,只要找到2048.kafka的文件即可。當然the first offset就是00000000000.kafka
1)producer先從zookeeper的 "/brokers/.../state"節(jié)點找到該partition的leader
2)producer將消息發(fā)送給該leader
3)leader將消息寫入本地log
4)followers從leader pull消息谴供,寫入本地log后向leader發(fā)送ACK
5)leader收到所有ISR中的replication的ACK后块茁,增加HW(high watermark,最后commit 的offset)并向producer發(fā)送ACK
Broker 保存消息
Kafka文件存儲基本結構
l 在Kafka文件存儲中桂肌,同一個topic下有多個不同partition数焊,每個partition為一個目錄,partiton命名規(guī)則為topic名稱+有序序號崎场,第一個partiton序號從0開始佩耳,序號最大值為partitions數(shù)量減1。
l 每個partion(目錄)相當于一個巨型文件被平均分配到多個大小相等segment(段)數(shù)據文件中谭跨。但每個段segment file消息數(shù)量不一定相等干厚,這種特性方便old segment file快速被刪除。默認保留7天的數(shù)據螃宙。
l 每個partiton只需要支持順序讀寫就行了蛮瞄,segment文件生命周期由服務端配置參數(shù)決定。(什么時候創(chuàng)建谆扎,什么時候刪除)
數(shù)據有序挂捅?
? 一個partition的數(shù)據是否是有序的? 間隔性有序燕酷,不連續(xù)
? 針對一個topic里面的數(shù)據籍凝,只能做到partition內部有序,不能做到全局有序苗缩。
? 特別加入消費者的場景后饵蒂,如何保證消費者消費的數(shù)據全局有序的?偽命題酱讶。
只有一種情況下才能保證全局有序退盯?就是只有一個partition。
11.5.2 KafkaPartitionSegment
Segment file組成:由2大部分組成泻肯,分別為index file和data file渊迁,此2個文件一一對應,成對出現(xiàn)灶挟,后綴".index"和“.log”分別表示為segment索引文件琉朽、數(shù)據文件。
Segment文件命名規(guī)則:partion全局的第一個segment從0開始稚铣,后續(xù)每個segment文件名為上一個segment文件最后一條消息的offset值箱叁。數(shù)值最大為64位long大小墅垮,19位數(shù)字字符長度,沒有數(shù)字用0填充耕漱。
索引文件存儲大量元數(shù)據算色,數(shù)據文件存儲大量消息,索引文件中元數(shù)據指向對應數(shù)據文件中message的物理偏移地址螟够。
3灾梦,497:當前l(fā)og文件中的第幾條信息,存放在磁盤上的那個地方
上述圖中索引文件存儲大量元數(shù)據妓笙,數(shù)據文件存儲大量消息若河,索引文件中元數(shù)據指向對應數(shù)據文件中message的物理偏移地址。
其中以索引文件中元數(shù)據3,497為例给郊,依次在數(shù)據文件中表示第3個message(在全局partiton表示第368772個message)牡肉、以及該消息的物理偏移地址為497。
segment data file由許多message組成淆九, qq物理結構如下:
Kafka 查找message
讀取offset=368776的message,需要通過下面2個步驟查找毛俏。
查找segment file
00000000000000000000.index表示最開始的文件炭庙,起始偏移量(offset)為0
00000000000000368769.index的消息量起始偏移量為368770 = 368769 + 1
00000000000000737337.index的起始偏移量為737338=737337 + 1
其他后續(xù)文件依次類推。
以起始偏移量命名并排序這些文件煌寇,只要根據offset 二分查找文件列表焕蹄,就可以快速定位到具體文件。當offset=368776時定位到00000000000000368769.index和對應log文件阀溶。
通過segment file查找message
當offset=368776時腻脏,依次定位到00000000000000368769.index的元數(shù)據物理位置和00000000000000368769.log的物理偏移地址
然后再通過00000000000000368769.log順序查找直到offset=368776為止。
消費過程分析
本質上kafka只支持Topic银锻;
每個group中可以有多個consumer永品,每個consumer屬于一個consumer group;
通常情況下击纬,一個group中會包含多個consumer鼎姐,這樣不僅可以提高topic中消息的并發(fā)消費能力,而且還能提高"故障容錯"性更振,如果group中的某個consumer失效那么其消費的partitions將會有其他consumer自動接管炕桨。
對于Topic中的一條特定的消息,只會被訂閱此Topic的每個group中的其中一個consumer消費肯腕,此消息不會發(fā)送給一個group的多個consumer献宫;
那么一個group中所有的consumer將會交錯的消費整個Topic,每個group中consumer消息消費互相獨立实撒,我們可以認為一個group是一個"訂閱"者姊途。
在kafka中,一個partition中的消息只會被group中的一個consumer消費(同一時刻)帖池;
一個Topic中的每個partions,只會被一個"訂閱者"中的一個consumer消費吭净,不過一個consumer可以同時消費多個partitions中的消息睡汹。
kafka的設計原理決定,對于一個topic,同一個group中不能有多于partitions個數(shù)的consumer同時消費寂殉,否則將意味著某些consumer將無法得到消息囚巴。
kafka只能保證一個partition中的消息被某個consumer消費時是順序的;事實上友扰,從Topic角度來說,當有多個partitions時,消息仍不是全局有序的彤叉。