1. Environment Setup
Kafka relies on ZooKeeper for coordination and for leader election, so ZooKeeper must be installed first.
1.1 Install ZooKeeper
Download a suitable version of ZooKeeper from the official download page; the latest stable release at the time of writing is 3.4.9 (the commands below use 3.4.6, so substitute your version where needed).
Extract the archive and create a data directory named data; it is used in the configuration step below.
$ cd opt/ && tar -zxf zookeeper-3.4.6.tar.gz && cd zookeeper-3.4.6
$ mkdir data
1.2 Configure ZooKeeper
$ vi conf/zoo.cfg
# length of a single tick in milliseconds, ZooKeeper's basic time unit
tickTime=2000
# directory where ZooKeeper stores its snapshots (the data directory created above)
dataDir=/path/to/zookeeper/data
# port on which clients connect
clientPort=2181
# number of ticks a follower may take to connect and sync to the leader
initLimit=5
# number of ticks a follower may lag behind the leader
syncLimit=2
1.3 Start ZooKeeper
$ bin/zkServer.sh start
The corresponding command to stop ZooKeeper is:
$ bin/zkServer.sh stop
1.4 Start the ZooKeeper CLI
$ bin/zkCli.sh
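To sanity-check the installation, a few basic commands can be issued inside the CLI session (this assumes ZooKeeper is running on localhost:2181; /test is just an example znode name):

```
# list the root znodes; a fresh install shows at least /zookeeper
ls /
# create a test znode, read it back, then delete it
create /test "hello"
get /test
delete /test
```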
1.5 Install Kafka
1.5.1 Download and extract
Download the Kafka tarball, then:
$ cd opt/
$ tar -zxf kafka_2.11-0.10.1.0.tgz
$ cd kafka_2.11-0.10.1.0
1.5.2 Start and stop Kafka
Start Kafka:
$ bin/kafka-server-start.sh config/server.properties
Stop Kafka (the stop script takes no arguments):
$ bin/kafka-server-stop.sh
2. Testing a Single Broker
My Kafka service runs in a Linux virtual machine with the IP address 192.168.61.131 (substitute your own IP as needed). In config/server.properties, set advertised.host.name to the VM's IP address (advertised.host.name=192.168.61.131); otherwise the host machine cannot reach the service running inside the VM.
2.1 Testing a topic with shell commands
2.1.1 Create a topic
From the Kafka directory, run:
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic page_visits
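If the topic was created successfully, it appears when listing or describing topics (these commands assume ZooKeeper is running on localhost:2181):

```
$ bin/kafka-topics.sh --list --zookeeper localhost:2181
$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic page_visits
```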
2.1.2 Test the producer
Run the following command to open the console producer:
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic page_visits
In the CLI, type two test messages:
Hello kafka
How are you?
2.1.3 Test the consumer
Run the following command to open the console consumer (--bootstrap-server selects the new consumer and must not be combined with --zookeeper):
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic page_visits
If everything is working, the consumer displays the two messages just sent by the producer.
2.2 Publishing and subscribing with Java clients
First add the Kafka dependencies to the pom:
<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.9.2</artifactId>
        <version>0.8.1.1</version>
        <scope>compile</scope>
        <exclusions>
            <exclusion>
                <artifactId>jmxri</artifactId>
                <groupId>com.sun.jmx</groupId>
            </exclusion>
            <exclusion>
                <artifactId>jms</artifactId>
                <groupId>javax.jms</groupId>
            </exclusion>
            <exclusion>
                <artifactId>jmxtools</artifactId>
                <groupId>com.sun.jdmk</groupId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.9.0.0</version>
    </dependency>
</dependencies>
2.2.1 Create a producer to publish messages
The following code publishes one test message every 3 seconds (100 messages in total):
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class MyProducer {
    private final static String TOPIC = "page_visits";

    public static void main(String[] args) throws InterruptedException {
        long events = 100;
        Properties properties = new Properties();
        // broker address; this is the old (0.8.x) Scala producer API
        properties.put("metadata.broker.list", "192.168.61.131:9092");
        properties.put("serializer.class", "kafka.serializer.StringEncoder");
        ProducerConfig config = new ProducerConfig(properties);
        Producer<String, String> producer = new Producer<String, String>(config);

        for (long nEvent = 0; nEvent < events; nEvent++) {
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            // key = event number, value = timestamped test message
            KeyedMessage<String, String> data = new KeyedMessage<String, String>(
                    TOPIC, String.valueOf(nEvent),
                    "Test message from java program " + sdf.format(new Date()));
            Thread.sleep(3000);
            producer.send(data);
        }
        producer.close();
    }
}
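The producer above attaches a key (the event number) to every message. With a keyed message, Kafka's default partitioners choose a partition roughly by hashing the key modulo the partition count (the exact hash function differs between client versions; the newer Java client uses murmur2). A minimal sketch of that idea, using a hypothetical partitionFor helper rather than any real Kafka API:

```java
public class PartitionSketch {
    // Hypothetical helper illustrating hash-based key-to-partition mapping;
    // real partitioners use different hash functions, but the idea is the same.
    public static int partitionFor(String key, int numPartitions) {
        // mask the sign bit so the result is always a valid partition index
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // with a single partition (as the topic was created above),
        // every key necessarily maps to partition 0
        System.out.println(partitionFor("42", 1));
    }
}
```

Because the mapping is a pure function of the key, all messages with the same key land in the same partition, which is what preserves per-key ordering.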
2.2.2 Create a consumer to subscribe to messages
The following code connects to the Kafka service on the virtual machine; whenever the producer publishes messages, the consumer continuously prints them:
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class MyConsumer {
    private final static String TOPIC = "page_visits";

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "192.168.61.131:9092");
        properties.put("group.id", "test");
        // commit offsets automatically, once per second
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("session.timeout.ms", "30000");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        consumer.subscribe(Arrays.asList(TOPIC));
        System.out.println("Subscribed to topic " + TOPIC);

        while (true) {
            // poll blocks for up to 100 ms waiting for new records
            ConsumerRecords<String, String> consumerRecords = consumer.poll(100);
            for (ConsumerRecord<String, String> record : consumerRecords) {
                System.out.printf("offset = %d, key = %s, value = %s%n",
                        record.offset(), record.key(), record.value());
            }
        }
    }
}