# Basic Operations
Reference: http://kafka.apache.org/quickstart#quickstart_multibroker
## Installing, configuring, and starting Kafka
http://kafka.apache.org/quickstart
- Start ZooKeeper
root:~/kafka$ ./bin/zookeeper-server-start.sh config/zookeeper.properties
- Start Kafka
root:~/kafka_2.12-0.10.2.1$ ./bin/kafka-server-start.sh config/server.properties
The broker.id in server.properties must be unique for each broker.
- Create a topic:
root:~/kafka_2.12-0.10.2.1$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Created topic "test".
- Start a producer
root:~/kafka_2.12-0.10.2.1$ ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
- Start a consumer
root:~/kafka_2.12-0.10.2.1$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
Type messages into the producer terminal; each one will appear in the consumer terminal.
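For comparison, here is a minimal Java producer doing what the console producer above does. It assumes the broker at localhost:9092 and the test topic; the class name SimpleProducer and the key/value strings are made up for illustration, this is a sketch rather than the console tool's actual implementation:

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SimpleProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // the broker started above
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        // send() is asynchronous; close() flushes any pending records before exiting.
        producer.send(new ProducerRecord<>("test", "key-1", "hello kafka"));
        producer.close();
    }
}
```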
## Viewing topic details
Create a replicated topic (replication factor 3 requires the three-broker setup described below):
root:~/kafka_2.12-0.10.2.1$ ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
Describe it:
root:~/kafka_2.12-0.10.2.1$ ./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
Each line describes one partition:
- leader: the node that handles all reads and writes for the partition (here, the broker with broker.id=2)
- replicas: the list of nodes that replicate this partition's log (brokers 2, 0, and 1)
- isr: the "in-sync replicas", the subset of replicas that are currently alive and caught up, and therefore eligible to become leader (brokers 2, 0, and 1)
## Setting up a multi-broker cluster
- Create a config file for each additional broker
cp config/server.properties config/server-1.properties
Edit the following settings:
broker.id=1
listeners=PLAINTEXT://:9093
log.dir=/tmp/kafka-logs-1
- Start the new broker
bin/kafka-server-start.sh config/server-1.properties
## Importing and exporting data with Kafka Connect
- Start Kafka Connect in standalone mode with a file source and a file sink
root:~/kafka_2.12-0.10.2.1$ bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
- Start a consumer on the connect-test topic
root:~/kafka_2.12-0.10.2.1$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
- Append a line to the input file
root:~/kafka_2.12-0.10.2.1$ echo "line" >> test.txt
connect-file-source.properties configures where data is imported from, and connect-file-sink.properties configures where the data is written.
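For reference, the stock quickstart versions of these two files look roughly like this (connector names and file paths may vary by distribution):

```properties
# connect-file-source.properties: read lines from test.txt into the connect-test topic
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test

# connect-file-sink.properties: write the connect-test topic out to test.sink.txt
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=connect-test
```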
## Processing data with Kafka Streams
- Create the input topic
root:~/kafka_2.12-0.10.2.1$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic streams-file-input
- Feed the producer input from a file
root:~/kafka_2.12-0.10.2.1$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-file-input < file-input.txt
- View the demo application
root:~/kafka_2.12-0.10.2.1$ vi org.apache.kafka.streams.examples.wordcount.WordCountDemo
- Run the application
root:~/kafka_2.12-0.10.2.1$ bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo
- Consume the output
root:~/kafka_2.12-0.10.2.1$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic streams-wordcount-output --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer \
    --property print.key=true
all 1
lead 1
to 1
hello 1
streams 2
join 1
kafka 3
summit 1
Stream processing re-processes messages after they are published: here the demo consumes streams-file-input, counts word occurrences, and continuously writes the updated counts to streams-wordcount-output.
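For orientation, the core of WordCountDemo is a topology along these lines. This is a condensed sketch against the 0.10.x Streams API (KStreamBuilder), not the demo's exact source; the class name WordCountSketch and the store name "Counts" are illustrative:

```java
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

public class WordCountSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> source = builder.stream("streams-file-input");

        // Split each line into words, re-key each record by its word, and keep a running count.
        KTable<String, Long> counts = source
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                .groupBy((key, word) -> word)
                .count("Counts"); // "Counts" names the local state store

        // Write the running counts to the output topic; values are now Longs.
        counts.to(Serdes.String(), Serdes.Long(), "streams-wordcount-output");

        new KafkaStreams(builder, props).start();
    }
}
```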
## Java development
Reference: http://kafka.apache.org/documentation
The Kafka source tree includes Java example programs.
- Add the Maven dependencies

```xml
<!--kafka-->
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka_2.10 -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.10.2.1</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.2.1</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>0.10.2.1</version>
</dependency>
<!--kafka-->
```
## Streams
- Add a producer and a consumer, and create the topic the stream will process
- Create a KafkaStreams object
- Send messages from the producer and start the KafkaStreams instance (a minimal consumer for checking the results is sketched below)
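As a sketch of the verification side of these steps, a minimal Java consumer that polls a topic; the class name SimpleConsumer, the group id, and the topic name are placeholders:

```java
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SimpleConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "demo-group");        // placeholder consumer group
        props.put("auto.offset.reset", "earliest"); // read from the start on first run
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("topic1")); // placeholder topic
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100); // timeout in ms
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("%s: %s%n", record.key(), record.value());
            }
        }
    }
}
```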
## KStream usage
- Processing each message record individually:
```java
// KStream example: consume a topic and handle each record individually
import java.io.FileNotFoundException;
import java.io.PrintWriter;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class WordCountDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount1"); // each streams application needs its own id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        // Setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data.
        // Note: to re-run the demo, you need to use the offset reset tool:
        // https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> source = builder.stream("topic1"); // input topic
        processMessage(source);

        KafkaStreams streams = new KafkaStreams(builder, props);
        streams.start();
    }

    private static void processMessage(KStream<String, String> source) throws FileNotFoundException {
        final PrintWriter printWriter = new PrintWriter(WordCountDemo.class.getResource("").getPath() + "outputfile");
        // Side effect per record: append "|key| |value|" to the output file.
        source.foreach(new ForeachAction<String, String>() {
            @Override
            public void apply(String key, String value) {
                printWriter.append("|").append(key).append("|").append(" ").append("|").append(value).append("|").append("\n");
                printWriter.flush();
            }
        });
        source.print(); // print each record to the console
        source.writeAsText(WordCountDemo.class.getResource("").getPath() + "outputfile"); // also write records to a file
    }
}
```
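Besides foreach, KStream also offers stateless transformations such as filter, mapValues, and to. A fragment that would drop into a main method like the one above, reusing its props and imports; the output topic name is made up:

```java
// Drop empty values, upper-case the rest, and forward to another topic.
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> source = builder.stream("topic1");        // input topic as above
source.filter((key, value) -> value != null && !value.isEmpty())  // keep non-empty records
      .mapValues(value -> value.toUpperCase())                    // transform each value
      .to("topic1-uppercased");                                   // hypothetical output topic
```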
## Kafka use cases
Reference: http://kafka.apache.org/uses#uses_commitlog
- Messaging
- Website activity tracking
- Metrics and monitoring
- Log aggregation
- Stream processing
- Event sourcing (events are stored in order)
- Commit log for distributed systems
## Building from source
- Download the source
git clone http://git-wip-us.apache.org/repos/asf/kafka.git <kafka.project.dir>
- Build with Gradle
- cd <kafka.project.dir>
- gradle idea