1比然、單節(jié)點(diǎn) 單 broker
1.1丈氓、ZK 的安裝
1. Download ZooKeeper and extract it to /app, then set the environment variables.
2. Go to $ZOOKEEPER_HOME/conf and create a zoo.cfg (copied from zoo_sample.cfg; zkServer.sh looks for this filename by default).
3. Set the data path: dataDir=/app/zookeeper-3.4.12/data. This directory must be created manually.
4. Start the service: zkServer.sh start
1.2硕淑、KafKa安裝
1课竣、下載 kafka_2.11-2.0.0,注意對(duì)應(yīng)的 Scala置媳,
2于樟、配置環(huán)境變量
1.3 Configuring server.properties
broker.id=0                                # must be unique for each broker in a cluster
listeners=PLAINTEXT://localhost:9092       # default port is 9092
host.name=localhost                        # the current machine
log.dirs=/app/kafka_2.11-2.0.0/kafka-logs  # Kafka data (log) directory
zookeeper.connect=localhost:2181           # ZooKeeper address
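The settings above can be collected into a minimal server.properties. A sketch, using the paths assumed earlier in this guide (it writes to /tmp purely for illustration):

```shell
# Write a minimal single-broker server.properties.
# Paths match the install layout assumed in this guide;
# /tmp is used as the target just for illustration.
cat > /tmp/server.properties <<'EOF'
broker.id=0
listeners=PLAINTEXT://localhost:9092
host.name=localhost
log.dirs=/app/kafka_2.11-2.0.0/kafka-logs
zookeeper.connect=localhost:2181
EOF

# Sanity check: all five key=value settings are present
grep -c '=' /tmp/server.properties   # prints 5
```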
1.4迂曲、啟動(dòng)
kafka-server-start.sh $KAFKA_HOME/config/server.properties
1.5 Creating a Topic
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic hello_topic
Where:
--zookeeper           the ZooKeeper address
--replication-factor  the number of replicas
--partitions          the number of partitions
--topic               the topic name
1.6寥袭、查看所有 Topic
kafka-topics.sh --list --zookeeper localhost:2181
Check a topic's status:
kafka-topics.sh --describe --zookeeper localhost:2181 --topic hello_topic
1.7路捧、產(chǎn)生消息
kafka-console-producer.sh --broker-list localhost:9092 --topic hello_topic
Messages are produced into the topic, so --broker-list must be specified here. The console producer then blocks, waiting for input.
1.8杰扫、消費(fèi)消息
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hello_topic --from-beginning
--from-beginning means consume from the first message onward. The console consumer then blocks, waiting for new messages.
1.9 Testing
Type messages in the producer terminal; they should show up in the consumer terminal, confirming that messages are produced and consumed correctly.
2识埋、單節(jié)點(diǎn)多 broker
2.1 Start ZooKeeper
Same as above.
2.2 Creating multiple copies of server.properties
cp $KAFKA_HOME/config/server.properties $KAFKA_HOME/config/server-1.properties
cp $KAFKA_HOME/config/server.properties $KAFKA_HOME/config/server-2.properties
Modify the following in each copy:
#config/server-1.properties:
broker.id=1
listeners=PLAINTEXT://localhost:9093
log.dirs=/tmp/kafka-logs-1
#config/server-2.properties:
broker.id=2
listeners=PLAINTEXT://localhost:9094
log.dirs=/tmp/kafka-logs-2
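Rather than editing each copy by hand, the three changed keys can be rewritten with sed. A sketch (it operates on a stand-in base file so it runs anywhere; in a real setup start from $KAFKA_HOME/config/server.properties):

```shell
# Derive server-1.properties and server-2.properties from a base config.
# A stand-in base file is created here; in practice, replace it with
# $KAFKA_HOME/config/server.properties.
cat > /tmp/server.properties <<'EOF'
broker.id=0
listeners=PLAINTEXT://localhost:9092
log.dirs=/tmp/kafka-logs
EOF

for i in 1 2; do
  sed -e "s|^broker.id=.*|broker.id=$i|" \
      -e "s|^listeners=.*|listeners=PLAINTEXT://localhost:909$((2 + i))|" \
      -e "s|^log.dirs=.*|log.dirs=/tmp/kafka-logs-$i|" \
      /tmp/server.properties > "/tmp/server-$i.properties"
done

grep '^broker.id=' /tmp/server-2.properties   # prints broker.id=2
```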
2.3 Running the brokers in the background
kafka-server-start.sh $KAFKA_HOME/config/server.properties &
kafka-server-start.sh $KAFKA_HOME/config/server-1.properties &
kafka-server-start.sh $KAFKA_HOME/config/server-2.properties &
2.4 Creating the Topic
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
2.5 Describing the Topic
kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
In the describe output:
Leader    the id of the broker that leads the partition (broker 2 here)
Replicas  the brokers that hold a replica (3 replicas here)
Isr       the in-sync replicas, i.e. the replica brokers that are alive and caught up
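For scripting, the leader id can be pulled out of the describe output with awk. A sketch against a hard-coded sample line in the Kafka 2.0 console format (the broker ids shown are made up for illustration):

```shell
# Parse the leader broker id out of a `kafka-topics.sh --describe` line.
# The sample line mimics the 2.0 output format; in practice, pipe the
# real describe output in instead of this hard-coded example.
line="$(printf 'Topic: my-replicated-topic\tPartition: 0\tLeader: 2\tReplicas: 2,0,1\tIsr: 2,0,1')"
leader=$(printf '%s\n' "$line" | awk -F'Leader: ' '{split($2, a, "\t"); print a[1]}')
echo "$leader"   # prints 2
```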
2.6 Producing and Consuming Messages
kafka-console-producer.sh --broker-list PLAINTEXT://localhost:9092,PLAINTEXT://localhost:9093,PLAINTEXT://localhost:9094 --topic my-replicated-topic
kafka-console-consumer.sh --bootstrap-server PLAINTEXT://localhost:9092,PLAINTEXT://localhost:9093,PLAINTEXT://localhost:9094 --from-beginning --topic my-replicated-topic
2.7 Stop the brokers one at a time and repeat the test to verify failover.
3. Using the Java API
Add the Maven dependencies to pom.xml:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>2.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.0.0</version>
</dependency>
Producer:
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;

public class MyProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.31.122:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // send messages to the topic
        String topic = "my-replicated-topic";
        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        for (int i = 1; i <= 10; i++) {
            String value = "value_" + i;
            ProducerRecord<String, String> msg = new ProducerRecord<String, String>(topic, value);
            producer.send(msg);
        }

        // list the topic's partition metadata
        List<PartitionInfo> partitions = producer.partitionsFor(topic);
        for (PartitionInfo p : partitions) {
            System.out.println(p);
        }
        System.out.println("send message over.");
        producer.close(100, TimeUnit.MILLISECONDS);
    }
}
Consumer:
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class MyConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.31.122:9092");
        props.put("group.id", "test"); // consumer group id
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        // subscribe to a list of topics
        consumer.subscribe(Arrays.asList("my-replicated-topic"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }
}