kafka介紹:
Kafka最初由LinkedIn公司開發(fā),使用Scala語(yǔ)言編寫镀赌,之后成為Apache項(xiàng)目的一部分氯哮。Kafka是一個(gè)分布式,可劃分的商佛,多訂閱者喉钢,冗余備份的持久性的日志服務(wù)。它主要用于處理活躍的流式數(shù)據(jù)良姆。在大系統(tǒng)中肠虽,我們經(jīng)常會(huì)碰到這樣的一個(gè)問題,大系統(tǒng)下的各個(gè)子系統(tǒng)需要數(shù)據(jù)高性能玛追、低延遲的不停流轉(zhuǎn)税课。kafka很適合處理這樣的問題!
消息隊(duì)列的分類:
點(diǎn)對(duì)點(diǎn):消息生產(chǎn)者生產(chǎn)消息發(fā)送到queue中痊剖,然后消息消費(fèi)者從queue中取出并且消費(fèi)消息韩玩,消息被消費(fèi)以后,queue中不再有儲(chǔ)存陆馁,所以消息消費(fèi)者不可能消費(fèi)到已經(jīng)被消費(fèi)的消息找颓。,即對(duì)消息而言氮惯,只會(huì)有一個(gè)消費(fèi)者叮雳。
發(fā)布/訂閱:消息生產(chǎn)者將消息發(fā)不到topic中想暗,同時(shí)可以有多個(gè)消息消費(fèi)者消費(fèi)該消息。和點(diǎn)對(duì)點(diǎn)方式不同帘不,發(fā)不到topic的消息會(huì)被所有訂閱者消費(fèi),kafka 就是典型發(fā)布储狭。
kafka的特點(diǎn):
1捣郊、同時(shí)為發(fā)布和訂閱提供高吞吐量刮萌,kafka每秒可以產(chǎn)生約25萬的消息(50MB),每秒能夠處理55萬消息(110MB)涮阔。
2敬特、可進(jìn)行持久化操作,將消息持久化到磁盤减俏。
3、分布式系統(tǒng)怕篷,易于向外擴(kuò)展。所有的producer蒸痹、broker和consumer都會(huì)有多個(gè)匿沛。均為分布式的逃呼。
4、 消息被處理的狀態(tài)是在consumer端維護(hù)推姻,而不是在server端維護(hù)。當(dāng)失敗是能自動(dòng)平衡校翔。
5防症、支持online和offline的場(chǎng)景。
下面介紹一下Kafka的架構(gòu)和組成:
Producer:是能夠發(fā)布消息到topic的任何對(duì)象奈嘿。
Consumer:消息和數(shù)據(jù)的消費(fèi)者,訂閱topics 并處理其發(fā)布的消息叶圃。
Consumer Group:可以并行消費(fèi)Topic中的partition的消息。
Broker:緩存代理德崭,Kafka集群中的一個(gè)kafka節(jié)點(diǎn)就是一個(gè)broker眉厨。
Topic: 特指Kafka 處理的消息源(feeds of messages)的不同分類缨叫。
Partition:topic物理上的分組耻姥,一個(gè) topic 可以分為多個(gè) partition,每個(gè) partition 是一個(gè)有序的隊(duì)列婉商。partition中的每條消息都會(huì)被分配一個(gè)有序的 id(offset)。
Message:消息蘑秽,是通信的基本單位,每個(gè) producer 可以向一個(gè)topic(主題)發(fā)布一些消息缀雳。
kafka的安裝和使用
kafka下載和相關(guān)文檔地址:
修改zookeper配置文件:
dataDir=D:\tmp\kafka\zookeeper
clientPort=2181
maxClientCnxns=0
修改server配置文件:
broker.id=0
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=D:\tmp\kafka\kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
啟動(dòng)服務(wù):
先啟動(dòng)zookeper服務(wù):
./bin/zookeeper-server-start.sh ./config/zookeeper.properties
再啟動(dòng)server服務(wù):
./bin/kafka-server-start.sh ./config/server.properties
pom依賴:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.0.1</version>
</dependency>
Config.java文件代碼:
package com.lqq.demo1;
public class Config {
public final static String TOPIC = "TEST-TOPIC";
public final static String bootstrap_servers = "localhost:9092";
public final static String group_id = "jd-group";
public final static String key_serializer="org.apache.kafka.common.serialization.StringSerializer";
public final static String value_serializer="org.apache.kafka.common.serialization.StringSerializer";
public final static String key_deserializer="org.apache.kafka.common.serialization.StringDeserializer";
public final static String value_deserializer="org.apache.kafka.common.serialization.StringDeserializer";
}
生產(chǎn)者端:
package com.lqq.demo1;
public class KProducer {
private final Producer<String, String> producer;
public KProducer() {
Properties props = new Properties();
props.put("bootstrap.servers", Config.bootstrap_servers);
props.put("acks", "all");
props.put("retries", 1);
props.put("batch.size", 16384);
props.put("key.serializer", Config.key_deserializer);
props.put("value.serializer", Config.value_deserializer);
producer = new KafkaProducer<>(props);
}
public void produce() {
for(int i=0;i<1000;i++){
String key = String.valueOf(i);
String data = "hello kafka message " + i;
ProducerRecord<String, String> record=new ProducerRecord<String, String>(Config.TOPIC, key, data);
System.out.println("Produce record key: "+key+" value: "+data);
producer.send(record);
}
}
public void close(){
producer.close();
}
public static void main(String[] args) {
KProducer producer=new KProducer();
producer.produce();
producer.close();
}
}
指定broker和序列化類型挤牛,然后向broker發(fā)送消息竞膳。
消費(fèi)者端:
package com.lqq.demo1;
public class KConsumer {
private final Consumer<String, String> consumer;
public KConsumer() {
Properties props = new Properties();
props.put("bootstrap.servers", Config.bootstrap_servers);
props.put("group.id", Config.group_id);
props.put("enable.auto.commit", "true");
props.put("client.id", "25424tg2");
props.put("heartbeat.interval.ms","1000");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", Config.key_deserializer);
props.put("value.deserializer", Config.value_deserializer);
consumer = new KafkaConsumer<String, String>(props);
}
public void consume() {
consumer.subscribe(Arrays.asList(Config.TOPIC));
consumer.seekToBeginning(new ArrayList<TopicPartition>());
while(true){
ConsumerRecords<String,String> records=consumer.poll(1000);
for(ConsumerRecord<String,String> record:records){
System.out.println("Consumer record offset="+record.offset()+" key="+record.key()+" value="+record.value());
}
}
}
public void close(){
consumer.close();
}
public static void main(String[] args) {
KConsumer kConsumer=new KConsumer();
kConsumer.consume();
kConsumer.close();
}
}