kafka基本概念:
- Kafka是一個分布式消息隊列。Kafka對消息保存時根據(jù)Topic進行歸類僚纷,發(fā)送消息者稱為Producer,消息接受者稱為Consumer拗盒,此外kafka集群有多個kafka實例組成怖竭,每個實例(server)稱為broker。無論是kafka集群陡蝇,還是consumer都依賴于zookeeper集群保存一些meta信息痊臭,來保證系統(tǒng)高可用性。
kafka的優(yōu)點:
- batch機制和request機制解決頻繁網(wǎng)絡通信帶來的性能低下問題;
- ACK應答機制解決消息一定能夠被消費到,就算傳輸過程中出現(xiàn)故障,只要消息到達了kafka,就會被保存到offset中,方便恢復數(shù)據(jù);
- 每個主題topic可以有多個分區(qū);kafka將分區(qū)均勻地分配到整個集群中,提高吞吐量;
- 順序讀寫:kafka是個可持久化的日志服務登夫,它將數(shù)據(jù)以數(shù)據(jù)日志的形式進行追加趣兄,最后持久化在磁盤中。利用了磁盤的順序讀寫悼嫉,來提高讀寫效率艇潭。時間復雜度為O(1)。
kafka的缺點:
- 部署集群的話,至少需要6臺服務器,3臺zookeeper(kafka的topic和consumer依賴于zookeeper);
- 復雜性:Kafka依賴Zookeeper進行元數(shù)據(jù)管理,Topic一般需要人工創(chuàng)建蹋凝,部署和維護比一般MQ成本更高;
- 消息亂序鲁纠。Kafka某一個固定的Partition內(nèi)部的消息是保證有序的,如果一個Topic有多個Partition鳍寂,partition之間的消息送達不保證有序改含。
- 監(jiān)控不完善,需要安裝插件迄汛;(rabbitmq自帶可視化監(jiān)控web界面,能夠清晰的看到各種參數(shù).)
kafka和其它消息中間件的優(yōu)缺點見鏈接: https://www.cnblogs.com/mengchunchen/p/9999774.html
kafka性能基準測試見鏈接:http://www.cnblogs.com/xiaodf/p/6023531.html
[kafka和其它消息中間件的優(yōu)缺點見鏈接:]: https://www.cnblogs.com/mengchunchen/p/9999774.html
[kafka性能基準測試見鏈接:]: http://www.cnblogs.com/xiaodf/p/6023531.html
kafka安裝及配置:
kafka安裝及配置見鏈接: https://www.cnblogs.com/RUReady/p/6479464.html
[kafka安裝及配置見鏈接]: https://www.cnblogs.com/RUReady/p/6479464.html
kafka實戰(zhàn)demo:
- 導入pom依賴:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>0.11.0.0</version>
</dependency>
-
創(chuàng)建生產(chǎn)者:
package com.byavs.kafka.produce; import java.util.Properties; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; public class CustomProducer { public static void main(String[] args) { Properties props = new Properties(); // Kafka服務端的主機名和端口號 props.put("bootstrap.servers", "47.98.63.22:9092"); // 等待所有副本節(jié)點的應答 props.put("acks", "all"); // 消息發(fā)送最大嘗試次數(shù) props.put("retries", 0); // 一批消息處理大小 props.put("batch.size", 16384); // 請求延時 props.put("linger.ms", 1); // 發(fā)送緩存區(qū)內(nèi)存大小 props.put("buffer.memory", 33554432); // key序列化 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // value序列化 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer<>(props); for (int i = 0; i < 50; i++) { producer.send(new ProducerRecord<String, String>("first", Integer.toString(i), "hello world-" + i)) } producer.close(); } }
- 創(chuàng)建消費者:
package com.byavs.kafka.consume;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class CustomNewConsumer {
public static void main(String[] args) {
Properties props = new Properties();
// 定義kakfa 服務的地址捍壤,不需要將所有broker指定上
props.put("bootstrap.servers", "47.98.63.22:9092");
// 制定consumer group
props.put("group.id", "test");
// 是否自動確認offset
props.put("enable.auto.commit", "true");
// 自動確認offset的時間間隔
props.put("auto.commit.interval.ms", "1000");
// key的序列化類
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// value的序列化類
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 定義consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 消費者訂閱的topic, 可同時訂閱多個
consumer.subscribe(Arrays.asList("first", "second","third"));
while (true) {
// 讀取數(shù)據(jù),讀取超時時間為100ms
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}
spring也提供了一套對kafka操作的API,更加方便.
- 導入pom依賴
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
- application.yml配置
spring:
kafka:
bootstrap-servers: 47.98.63.22:9092
consumer:
group-id: kafka2
auto-offset-reset: latest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
? 主配置類加注解@EnableKafka
? 3. 生產(chǎn)者消費者代碼:
@Component
@EnableScheduling
public class KafkaProducer {
@Autowired
private KafkaTemplate kafkaTemplate;
/**
* 定時任務
*/
@Scheduled(cron = "* * * * * ?")
public void send(){
String message = UUID.randomUUID().toString();
// topic1為你在kafka中手動創(chuàng)建的分區(qū)
ListenableFuture future = kafkaTemplate.send("topic1", message);
future.addCallback(o -> System.out.println("send-消息發(fā)送成功:" + message), throwable -> System.out.println("消息發(fā)送失敯鞍:" + message));
}
}
/**
* kafka消費者測試
*/
@Component
public class TestConsumer {
@KafkaListener(topics = "topic1")
public void listen (ConsumerRecord<?, ?> record) {
System.out.printf("接受到消息: topic = %s, offset = %d, value = %s \n", record.topic(), record.offset(), record.value());
}
}
消息隊列內(nèi)部實現(xiàn)原理:
kafka架構:
各名詞解釋:
1)Producer :消息生產(chǎn)者鹃觉,就是向kafka broker發(fā)消息的客戶端;
2)Consumer :消息消費者睹逃,向kafka broker取消息的客戶端盗扇;
3)Topic :可以理解為一個隊列;
4) Consumer Group (CG):這是kafka用來實現(xiàn)一個topic消息的廣播(發(fā)給所有的consumer)和單播(發(fā)給任意一個consumer)的手段沉填。一個topic可以有多個CG疗隶。topic的消息會復制(不是真的復制,是概念上的)到所有的CG翼闹,但每個partion只會把消息發(fā)給該CG中的一個consumer斑鼻。如果需要實現(xiàn)廣播,只要每個consumer有一個獨立的CG就可以了猎荠。要實現(xiàn)單播只要所有的consumer在同一個CG坚弱。用CG還可以將consumer進行自由的分組而不需要多次發(fā)送消息到不同的topic;
5)Broker :一臺kafka服務器就是一個broker法牲。一個集群由多個broker組成。一個broker可以容納多個topic琼掠;
6)Partition:為了實現(xiàn)擴展性拒垃,一個非常大的topic可以分布到多個broker(即服務器)上,一個topic可以分為多個partition瓷蛙,每個partition是一個有序的隊列悼瓮。partition中的每條消息都會被分配一個有序的id(offset)。kafka只保證按一個partition中的順序?qū)⑾l(fā)給consumer艰猬,不保證一個topic的整體(多個partition間)的順序横堡;
7)Offset:kafka的存儲文件都是按照offset.kafka來命名,用offset做名字的好處是方便查找冠桃。例如你想找位于2049的位置命贴,只要找到2048.kafka的文件即可。當然the first offset就是00000000000.kafka。