Kafka是由美國(guó)的職業(yè)網(wǎng)站LinkedIn創(chuàng)造,作為一個(gè)社交企業(yè)漆魔,LinkedIn 有非常多的IT 系統(tǒng)而且日活量非常大硝全, 由此產(chǎn)生的數(shù)據(jù)被許多系統(tǒng)產(chǎn)生及使用。剛開(kāi)始抄腔,LinkedIn使用ActiveMQ作為數(shù)據(jù)通道去處理這些數(shù)據(jù)瓢湃,但是后來(lái)發(fā)現(xiàn)經(jīng)常會(huì)出現(xiàn)阻塞及服務(wù)不可用,然后LinkedIn就開(kāi)發(fā)了kafka赫蛇。由于Kafka解決的是生產(chǎn)環(huán)境中上下游系統(tǒng)的耦合問(wèn)題绵患,所以kafka不僅是一個(gè)消息中間件,還是一個(gè)數(shù)據(jù)引擎悟耘,或者分布式實(shí)時(shí)流處理平臺(tái)落蝙。
使用場(chǎng)景
1. 消息傳遞
消息傳遞就是發(fā)送消息,可以把kafka作為一個(gè)MQ(ActiveMQ作煌、RabbitMQ...)掘殴,實(shí)現(xiàn)異步、解耦粟誓、削峰奏寨,而且吞吐量比這些消息中間件更大。
2. 網(wǎng)站活動(dòng)追蹤
可以把用戶(hù)活動(dòng)信息鹰服,比如登錄病瞳、點(diǎn)擊、瀏覽等各種行為進(jìn)行監(jiān)控悲酷、追蹤套菜,然后分析并發(fā)送到下游系統(tǒng),給用戶(hù)提供更加精確的內(nèi)容推薦设易。
3. 日志聚合
Kafka可以實(shí)現(xiàn)日志聚合逗柴,這樣就不用把日志記錄到磁盤(pán)或者數(shù)據(jù)庫(kù),實(shí)現(xiàn)分布式的日志聚合顿肺。
4. 應(yīng)用指標(biāo)監(jiān)控
還可以用來(lái)做運(yùn)維系統(tǒng)監(jiān)控戏溺。比如監(jiān)控交易系統(tǒng)的訂單信息渣蜗,用戶(hù)的年齡分布,的確分布旷祸,購(gòu)買(mǎi)偏好等耕拷。或者監(jiān)控應(yīng)用服務(wù)器的內(nèi)存托享、CPU骚烧、磁盤(pán)、網(wǎng)絡(luò)等的使用情況闰围,并進(jìn)行緊急情況提醒赃绊。
5. 數(shù)據(jù)集成+流計(jì)算
kafka 內(nèi)置的kafka Streams 可以更加方便的進(jìn)行數(shù)據(jù)流的處理,把數(shù)據(jù)導(dǎo)入的離線(xiàn)數(shù)據(jù)庫(kù)比如Hadoop辫诅、Hbased等凭戴,實(shí)現(xiàn)數(shù)據(jù)分析。所以Kafka不僅僅是一個(gè)MQ中間件炕矮,還是一個(gè)流處理平臺(tái)。在kafka中者冤,消息就是日志肤视。日志就是消息的數(shù)據(jù)文件。
簡(jiǎn)單使用
首先需要搭建一套Kafka的環(huán)境涉枫,在這就不過(guò)多介紹了邢滑,網(wǎng)上一大堆教程。但是要說(shuō)明一點(diǎn)愿汰,kafka需要ZK的服務(wù)困后,Zookeeper做了什么?因?yàn)閆ookeeper的有序節(jié)點(diǎn)衬廷、臨時(shí)節(jié)點(diǎn)和監(jiān)聽(tīng)機(jī)制摇予,所以ZK幫kafka做了這些事情:配置中心(管理Broker、Topic吗跋、Partition侧戴、Consumer的信息,包括元數(shù)據(jù)的變動(dòng))跌宛、負(fù)載均衡酗宋、集群管理和選舉、分布式鎖等疆拘。
1. Jave Basic Demo蜕猫。
導(dǎo)入Kafka的Maven依賴(lài):
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.6.0</version>
</dependency>
創(chuàng)建一個(gè)Counsumer類(lèi),先初始化Consumer需要的配置哎迄,然后創(chuàng)建一個(gè)KafkaConsumer對(duì)象并訂閱MyTopic
主題, 然后隔一段時(shí)間從Kafka中拉取消息回右。
public class Consumer {
public static void main(String[] args) {
Properties props = new Properties();
// 綁定KafkaServer
props.put("bootstrap.servers","192.168.1.91:9092");
// 綁定一個(gè)Consumer Group
props.put("group.id","yq-test-group");
// 消費(fèi)Messgae之后自動(dòng)提交到Kafka
props.put("enable.auto.commit","true");
// 消費(fèi)Messgae之后自動(dòng)提交的時(shí)間間隔
props.put("auto.commit.interval.ms","1000");
// 新來(lái)的消費(fèi)者從哪里開(kāi)始消費(fèi)隆圆,值有:earlist / lastest / none
props.put("auto.offset.reset","earliest");
// key 和 value的序列化工具
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
// 創(chuàng)建一個(gè)Consumer對(duì)象
KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(props);
consumer.subscribe(Arrays.asList("MyTopic"));
try {
while (true){
// 從服務(wù)器隔一段時(shí)間去拉取最新消息
ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String,String> record : records) {
System.out.printf("offset = %d, key = %s, value= %s, partition = %s\n",
record.offset(),record.key(),record.value(),record.partition());
}
}
}finally {
// 關(guān)閉消費(fèi)者
consumer.close();
}
}
}
然后創(chuàng)建一個(gè)Producer類(lèi),初始化producer配置楣黍,并創(chuàng)建一個(gè)KafkaProducer對(duì)象匾灶,并發(fā)送MyTopic主題的消息.
public class Producer {
public static void main(String[] args) {
Properties props = new Properties();
// 綁定服務(wù)器
props.put("bootstrap.servers","192.168.1.91:9092");
// 重發(fā)次數(shù)
props.put("ack","1");
// 批量發(fā)送的大小
props.put("batch.size",16384);
// 發(fā)送
props.put("group.id","yq-test-group");
props.put("buffer.memory",33554432); props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i <= 100; i++) {
producer.send(new ProducerRecord<String, String>("MyTopic",Integer.toString(i),Integer.toString(i)));
}
producer.close();
}
}
先運(yùn)行Consumer類(lèi),Consumer開(kāi)始監(jiān)聽(tīng)服務(wù)器租漂,然后運(yùn)行Producer類(lèi)阶女,然后在Consumer的Console就可以看到類(lèi)似下面的log:
offset = 592, key = 88, value= 88, partition = 0
offset = 592, key = 88, value= 88, partition = 0
offset = 593, key = 89, value= 89, partition = 0
offset = 594, key = 90, value= 90, partition = 0
offset = 595, key = 91, value= 91, partition = 0
offset = 596, key = 92, value= 92, partition = 0
offset = 597, key = 93, value= 93, partition = 0
offset = 598, key = 94, value= 94, partition = 0
offset = 599, key = 95, value= 95, partition = 0
offset = 600, key = 96, value= 96, partition = 0
offset = 601, key = 97, value= 97, partition = 0
offset = 602, key = 98, value= 98, partition = 0
offset = 603, key = 99, value= 99, partition = 0
2. SpringBoot Demo
創(chuàng)建一個(gè)SpringBoot Project,同上先導(dǎo)入Maven依賴(lài)哩治,然后再application.properties中添加如下配置
server.port=7271
spring.kafka.bootstrap-servers=192.168.1.91:9092
spring.kafka.group.id=yq-test-group
spring.kafka.enable.auto.commit=true
spring.kafka.auto.commit.interval.ms=1000
spring.kafka.auto.offset.reset=earliest
spring.kafka.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.ack=1
spring.kafka.retries=1
spring.kafka.batch.size=16384
spring.kafka.linger.ms=5
spring.kafka.buffer.memory=33554432
spring.kafka.max.block.ms=3000
spring.kafka.key.serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.value.serializer=org.apache.kafka.common.serialization.StringSerializer
Consumer類(lèi):
@Component
public class Consumer {
@KafkaListener(topics = "SpringBootTopics",groupId = "SpringBootTopic-group")
public void onMessage(String msg){
System.out.println("------ 收到消息 : " + msg);
}
}
Producer類(lèi)
@RestController
public class Producer {
@Autowired
private KafkaTemplate template;
@GetMapping("/send")
public String sent(@RequestParam("msg") String msg) {
template.send("SpringBootTopics", msg);
return "Ok";
}
}
再瀏覽器打開(kāi)如下鏈接:http://localhost:7271/send?msg=12313秃踩,在編輯器控制臺(tái)就可以看到如下log:
2021-03-02 21:24:44.378 INFO 62448 --- [nio-7271-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 66563e712b0b9f84
2021-03-02 21:24:44.378 INFO 62448 --- [nio-7271-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1614691484378
2021-03-02 21:24:44.386 INFO 62448 --- [ad | producer-1] org.apache.kafka.clients.Metadata : [Producer clientId=producer-1] Cluster ID: i815xO16TgeNlXHhQPfEzA
------ 收到消息 : 12313
上面就是Kafka的簡(jiǎn)單使用。
Kafka架構(gòu)
這個(gè)經(jīng)典的圖是kafka的data flow业筏,下面就來(lái)分析一下這個(gè)圖中的信息憔杨。
- 有2個(gè)生產(chǎn)者,Producer0生產(chǎn)主題為
Topic0
的消息 蒜胖,Producer1生產(chǎn)Topic0
和Topic1
兩種主題的消息消别。 - 有3個(gè)
Broker
,可以認(rèn)為是有三臺(tái)服務(wù)器台谢。 -
Topic0
有2個(gè)partition寻狂,3個(gè)replica(副本),Topic1
有一個(gè)partition朋沮,也有3個(gè)replica蛇券。紅色字體代表是該分區(qū)的leader節(jié)點(diǎn),剩下兩個(gè)代表該主題的follower節(jié)點(diǎn)樊拓。生產(chǎn)者向leader節(jié)點(diǎn)中寫(xiě)入數(shù)據(jù)纠亚,follower節(jié)點(diǎn)向leader節(jié)點(diǎn)同步最新數(shù)據(jù)。 - 有2個(gè)Consumer Group筋夏,
Consumer group0
中的Consumer 0
消費(fèi)Topic0
蒂胞;Consumer group1
中的Consumer 0
消費(fèi)Topic0
和Topic1
,Consumer 1
消費(fèi)Topic0
叁丧,Consumer 2
什么也不消費(fèi)啤誊。
1. Producer
Producer就是生產(chǎn)者,就是發(fā)送消息的一方拥娄。Kafka發(fā)消息不是一條一條發(fā)送的蚊锹,是批量發(fā)送的,這樣就能夠提高發(fā)送效率稚瘾。
由這個(gè)字段決定:
props.put("batch.size",16384);
2. Consumer
Consumer就是消費(fèi)者牡昆,就是消費(fèi)消息的一方。Kafka使用pull來(lái)獲取消息,這樣消費(fèi)者就可以控制消費(fèi)的速率丢烘,就不會(huì)出現(xiàn)消息太多消費(fèi)不了的情況柱宦。
3. Broker
Broker就是kafka的服務(wù)播瞳。生產(chǎn)者和消費(fèi)者都需要連接broker才能實(shí)現(xiàn)消息的轉(zhuǎn)發(fā)。
4. Message
客戶(hù)端之間傳輸?shù)臄?shù)據(jù)就叫做message忧侧,也叫做record。生產(chǎn)者中為ProducerRecord牌芋,消費(fèi)者中為ConsumerRecord。
消息在傳輸過(guò)程中需要進(jìn)行序列化躺屁,可以使用kafka內(nèi)置的幾種序列化工具,不滿(mǎn)足需求也可以創(chuàng)建自定義的序列化工具犀暑。
5. Topic
生產(chǎn)者和消費(fèi)者需要一個(gè)隊(duì)列才能關(guān)聯(lián)起來(lái)驯击,生產(chǎn)者要把消息發(fā)送到某個(gè)主題的隊(duì)列中,消費(fèi)者接受消息耐亏,也要接收這個(gè)隊(duì)列的消息苹熏。這個(gè)隊(duì)列在kafka中叫做Topic币喧。一個(gè)生產(chǎn)者可以生產(chǎn)多種不同topic的消息,一個(gè)消費(fèi)這個(gè)也可以消費(fèi)不同topic的消息干发。
如果要?jiǎng)h除一個(gè)topic,就要把a(bǔ)uto.create.topics.enable 設(shè)置為false枉长,默認(rèn)為true琼讽。
6. Partition
如果一個(gè)topic中消息過(guò)多钻蹬,就會(huì)產(chǎn)生下面的問(wèn)題:
并發(fā)或者負(fù)載的問(wèn)題,如果客戶(hù)端的操作都指向一個(gè)topic,在高并發(fā)情況下性能就會(huì)下降问欠。
還有不方便橫向擴(kuò)展,如果想把數(shù)據(jù)分散到不同的機(jī)器上做集群旗国,而不是升級(jí)硬件,這個(gè)topic就無(wú)法在物理上拆分到各個(gè)機(jī)器上能曾。
為了解決這些問(wèn)題kafka就提出了Partition的概念,其實(shí)就是把Topic分割為不同的分區(qū)(分片思想借浊,類(lèi)似數(shù)據(jù)庫(kù)中的分庫(kù)分表)蚂斤。
舉個(gè)例子, Topic有3個(gè)Partition,發(fā)送了9條消息曙蒸,第一個(gè)分區(qū)存儲(chǔ)1纽窟、4、7森枪,第二個(gè)分區(qū)存儲(chǔ)2审孽、5、8式散,第三個(gè)分區(qū)存儲(chǔ)3打颤、6编饺、9。這樣就實(shí)現(xiàn)了負(fù)載均衡那伐。
在服務(wù)器中的/tmp/kafka-logs/下面就可以看到每個(gè)Partition的數(shù)據(jù),而且kafka的數(shù)據(jù)文件是順序追加寫(xiě)入的畅形,這也是為啥kafka吞吐量很大的一個(gè)原因诉探。
7. Replica機(jī)制
如果Partition的數(shù)據(jù)只有一份,如果這個(gè)broker 掛掉了竖席,就會(huì)出現(xiàn)服務(wù)不可用毕荐,甚至數(shù)據(jù)丟失不可恢復(fù)艳馒。這個(gè)副本機(jī)制就是為了提高kafka的容錯(cuò)率而設(shè)計(jì)的。
8. Segment
kafka的消息是放在.log文件中第美,如果一個(gè)Partition只有一個(gè)log文件陆爽,如果寫(xiě)入很多數(shù)據(jù)后慌闭,檢索效率也會(huì)變得很差,因此這時(shí)候kafka就會(huì)重新拆分Partition文件兔港,切分處理后的每一個(gè).log文件就是一個(gè)segment仔拟。segment的大小可以由log.segment.bytes
控制利花。
比如這個(gè)圖中就有2個(gè)segment, 每一個(gè)segment都有2個(gè)索引文件和一個(gè)數(shù)據(jù)文件载佳。
xxx.index:記錄的Consumer當(dāng)前讀取消息的偏移量
xxx.log: 數(shù)據(jù)文件
xxx.timeindex : 記錄的是數(shù)據(jù)創(chuàng)建或者寫(xiě)入的時(shí)間
9. Consumer group
Consumer group是為了確定這個(gè)組內(nèi)的消費(fèi)者是不是消費(fèi)同一個(gè)topci然后引入的蔫慧。消費(fèi)同一topic的消費(fèi)者不一定在同一個(gè)組。只有g(shù)roup id相同才是同一個(gè)消費(fèi)者組睡扬。
注意:同一group的消費(fèi)者不能同時(shí)消費(fèi)相同的Partition,這樣會(huì)造成重復(fù)消費(fèi)的問(wèn)題屎开,這也是為什么上面那個(gè)data flow圖中 Consumer 3 沒(méi)有消費(fèi)任何topic的原因马靠,因?yàn)橹挥?個(gè)Partition甩鳄,但是有3個(gè)Consumer,而最多能讓2個(gè)消費(fèi)者去消費(fèi)妙啃。
10. Consumer offset
因?yàn)閗afka是順序?qū)懭氲模绻鸆onsumer掛掉了茁瘦,后來(lái)起來(lái)后我么如何知道它上一次消費(fèi)到哪里了呢? kafka對(duì)消息者進(jìn)行了編號(hào)储笑,而且存儲(chǔ)了每個(gè)消費(fèi)者消費(fèi)每個(gè)partition的offset, 這個(gè)offset就存儲(chǔ)在/tmp/kafka-logs/下的類(lèi)似這樣__consumer_offsets-46的folder中突倍。總共有50個(gè)焊虏,會(huì)根據(jù)cousumer id的hash code 與50取模秕磷,然后放到相應(yīng)的Consumer 索引文件中,這樣即使服務(wù)掛掉疏尿,只要這個(gè)文件不丟失易桃,依然可以確定上次消費(fèi)到那個(gè)offset晤郑。
進(jìn)階功能
1. 消息冪等性
如果生產(chǎn)者無(wú)法確定消費(fèi)者有沒(méi)有收到或者消費(fèi)消息贸宏,如果要重發(fā)消息吭练,這就可能造成消息重復(fù)消費(fèi)的情況褐鸥。消息重復(fù)消費(fèi)要在消費(fèi)者端去處理,kafka干脆在broker端實(shí)現(xiàn)了消息的重復(fù)問(wèn)題處理浑侥,這樣就大大減輕了消費(fèi)者的工作寓落。
在Producer端設(shè)置enable.idempotence 為true后荞下,這個(gè)producer就是冪等性的producer,kafka會(huì)自動(dòng)去重仰税。主要是通過(guò)這兩個(gè)值去確定唯一消息抽诉。
1. PID(Producer ID, 冪等性的每個(gè)客戶(hù)端都有一個(gè)唯一的ID
2. Sequence Number, 冪等性的每個(gè)生產(chǎn)者發(fā)送的消息都有一個(gè)唯一的Seq number
Broker 就是根據(jù)上面這兩個(gè)值來(lái)確保數(shù)據(jù)是否重復(fù)迹淌。但是這也只能保證在單partition上冪等性或者單一回話(huà)上的冪等性,即重啟producer后的冪等性就不能保證了耙饰。如果想要保證全局?jǐn)?shù)據(jù)的冪等性纹份,就要用到kafka的事物機(jī)制了蔓涧。
2. 生產(chǎn)者事務(wù)處理
通過(guò)Kafka的事物機(jī)制就可以保證全局消息的冪等性。在下面這幾種情況就需要使用事務(wù)來(lái)處理了。
- 只有1個(gè)broker, 1個(gè)topic 1個(gè)副本昨寞,發(fā)送一組消息,要么全部失敗歼狼,要么全部成功。
- 如果發(fā)送到多個(gè)partition或者多個(gè)topic园蝠,它們可能分布在不同的服務(wù)器上,需要它們要么全部失敗鳞贷,要么全部成功搀愧。
- 消費(fèi)者后生產(chǎn)者在同一塊代碼中(Consume-Process-Produce),即先消費(fèi)然后再作為生產(chǎn)者發(fā)送給下游系統(tǒng)搓幌,就需要保證接受消息和發(fā)送消息全部成功迅箩。
使用方式:
// 開(kāi)啟事務(wù)一定要設(shè)置transaction.id
props.put("transactional.id", UUID.randomUUID().toString());
Producer<String,String> producer = new KafkaProducer<String,String>(props);
// 初始化事務(wù)
producer.initTransactions();
try {
producer.beginTransaction();
producer.send(new ProducerRecord<String,String>("transaction-test","1","1"));
producer.send(new ProducerRecord<String,String>("transaction-test","2","2"));
// Integer i = 1/0;
producer.send(new ProducerRecord<String,String>("transaction-test","3","3"));
// 提交事務(wù)
producer.commitTransaction();
} catch (KafkaException e) {
// 中止事務(wù)
producer.abortTransaction();
}
因?yàn)橄⒖赡芸绶?wù)器沙热,所以這里的事務(wù)就是分布式事務(wù)篙贸。采用的是2PC(2階段提交)。大家都可以commit才commit敷鸦,否則就abort寝贡。kafka會(huì)場(chǎng)景一個(gè)Coordinator圃泡,并把事務(wù)的日志用topic_transaction_state記錄下來(lái),就跟消費(fèi)的offset一樣价说。這樣生產(chǎn)者掛了之后下次重啟后還能根據(jù)這個(gè)offset繼續(xù)上次的事務(wù)。
總結(jié)
kafka作為一個(gè)流處理平臺(tái)扮叨,有許多優(yōu)點(diǎn)彻磁。
- 高吞吐狸捅、低延遲:kakfa的優(yōu)點(diǎn)就是收發(fā)消息特別快,每秒可以處理幾十萬(wàn)條消息恍箭。
- 高伸縮性:通過(guò)分區(qū)partition的機(jī)制扯夭,不同的分區(qū)可以處在不同的broker中鞍匾。然后通過(guò)ZK去管理橡淑,實(shí)現(xiàn)負(fù)載。
- 持久性置森、可靠性:通過(guò)把消息持久化在磁盤(pán)中符糊,并在不同broker中的replica機(jī)制男娄,保證數(shù)據(jù)可靠性。
- 容錯(cuò)性:集群中的某個(gè)節(jié)點(diǎn)掛掉建瘫,整體服務(wù)也可用尸折。
- 高并發(fā):同時(shí)支持?jǐn)?shù)千個(gè)客戶(hù)端同時(shí)讀寫(xiě)实夹。