I. Kafka Overview
1. Kafka --- a distributed streaming platform
Many companies still use it mainly as message middleware; other message-middleware options include MQ and Redis.
2. Terminology
broker: the intermediary; a Kafka process whose job is to store data
producer: writes data into Kafka
consumer: reads data out of Kafka
Typical enterprise pipeline: Flume --> Kafka --> Spark Streaming
3. Features:
- Publish and subscribe: read and write streams of data like a messaging system
- Real-time apps: write scalable stream processing applications that react to events in real-time
- Distributed, replicated, fault-tolerant: store streams of data safely in a distributed, replicated, fault-tolerant cluster
4. Kafka's source code is written in Scala
5. Basic concepts:
- topic: a named stream of records
- partitions: partition indexes start at 0, so a topic with 3 partitions gets directories suffixed -0, -1, and -2; multiple partitions are what enable highly concurrent reads and writes
[hadoop@hadoop000 kafka-logs]$ ll
total 32
-rw-rw-r--. 1 hadoop hadoop 0 Sep 26 23:51 cleaner-offset-checkpoint
drwxrwxr-x. 2 hadoop hadoop 4096 Sep 26 23:58 huluwa_kafka_streaming-0
drwxrwxr-x. 2 hadoop hadoop 4096 Sep 27 03:10 huluwa_offset-0
-rw-rw-r--. 1 hadoop hadoop 54 Sep 26 23:51 meta.properties
-rw-rw-r--. 1 hadoop hadoop 76 Sep 27 09:13 recovery-point-offset-checkpoint
-rw-rw-r--. 1 hadoop hadoop 77 Sep 27 09:13 replication-offset-checkpoint
drwxrwxr-x. 2 hadoop hadoop 4096 Sep 27 08:54 test-0
drwxrwxr-x. 2 hadoop hadoop 4096 Sep 27 08:54 test-1
drwxrwxr-x. 2 hadoop hadoop 4096 Sep 27 08:54 test-2
[hadoop@hadoop000 kafka-logs]$ cd test-0
[hadoop@hadoop000 test-0]$ ll
total 4
-rw-rw-r--. 1 hadoop hadoop 10485760 Sep 27 08:54 00000000000000000000.index
-rw-rw-r--. 1 hadoop hadoop 70 Sep 27 08:55 00000000000000000000.log
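The files inside a partition directory are log segments: the 20-digit number is the base offset (the offset of the first record in that segment), zero-padded so the files sort lexicographically by offset. A minimal sketch of the naming scheme:

```python
# Sketch: how Kafka names log-segment files inside a partition directory.
# The file name is the segment's base offset, zero-padded to 20 digits.

def segment_file_names(base_offset: int) -> tuple[str, str]:
    name = f"{base_offset:020d}"
    return name + ".log", name + ".index"

print(segment_file_names(0))       # ('00000000000000000000.log', '00000000000000000000.index')
print(segment_file_names(368769))  # ('00000000000000368769.log', '00000000000000368769.index')
```

A brand-new partition therefore always starts with `00000000000000000000.log`, exactly as in the listing above; later segments are named after the offset at which they begin.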
- replication-factor: the number of replicas of each partition; this is what provides fault tolerance
Suppose a Kafka cluster of three machines, with 3 partitions and 3 replicas, one replica per machine. When a record is written to partition-0 on machine 1, its other two replicas are necessarily written to partition-0 on machine 2 and partition-0 on machine 3. The three replicas are copies of the same partition; they are not spread across partition-0, partition-1, and partition-2.
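The placement rule above can be sketched with a simplified version of Kafka's round-robin replica assignment. This is a hypothetical sketch: the real algorithm also picks a random starting broker and an extra per-replica shift, while this version starts at the first broker for clarity.

```python
# Simplified round-robin replica assignment: partition p's leader sits on
# broker (p mod N), and its followers on the next brokers in a circle.
# (Real Kafka randomizes the starting broker; this sketch does not.)

def assign_replicas(brokers, num_partitions, replication_factor):
    assignment = {}
    for p in range(num_partitions):
        leader = p % len(brokers)
        assignment[p] = [brokers[(leader + r) % len(brokers)]
                         for r in range(replication_factor)]
    return assignment

# 3 brokers, 3 partitions, replication factor 3:
print(assign_replicas([1, 2, 3], 3, 3))
# partition 0 -> [1, 2, 3], partition 1 -> [2, 3, 1], partition 2 -> [3, 1, 2]
```

Note how this reproduces the `Replicas: 1,2,3 / 2,3,1 / 3,1,2` pattern that the `--describe` output shows later in these notes: each partition's replica list is a rotation, and the first entry is the preferred leader.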
二刊棕、常用命令
1.創(chuàng)建一個topic
bin/kafka-topics.sh --create \
--zookeeper localhost:2181 \
--replication-factor 1 \
--partitions 3 \
--topic test
Cluster: --zookeeper computer1:2181,computer2:2181,computer3:2181/kafka
2. List current topics
bin/kafka-topics.sh \
--list \
--zookeeper localhost:2181
Cluster: --zookeeper computer1:2181,computer2:2181,computer3:2181/kafka
3. Send data (producer)
bin/kafka-console-producer.sh \
--broker-list localhost:9092 \
--topic test
Cluster: --broker-list computer1:9092,computer2:9092,computer3:9092
4. Receive data (consumer)
bin/kafka-console-consumer.sh \
--zookeeper localhost:2181 \
--topic test \
--from-beginning
Cluster: --zookeeper computer1:2181,computer2:2181,computer3:2181/kafka
Send a few messages to test:
[hadoop@hadoop000 kafka]$ bin/kafka-console-producer.sh \
> --broker-list localhost:9092 \
> --topic test
spark
1
2
3
4
5
-------------------------------------------------------------------------------------------------------------
[hadoop@hadoop000 kafka]$ bin/kafka-console-consumer.sh \
> --zookeeper localhost:2181 \
> --topic test \
> --from-beginning
spark
2
1
3
4
5
Messages can be sent and received normally.
Now shut down both the producer and the consumer, then reopen the consumer, and observe:
[hadoop@hadoop000 kafka]$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
spark
3
2
5
1
4
The order of the data has changed. This is per-partition ordering without global ordering: records within a single partition are ordered, but across the topic as a whole they are not. Think about how to guarantee the ordering you need in production.
Key point: build a key out of some characteristic of the data, so that all records sharing that characteristic (for example, all statements operating on id=1 in MySQL) are sent to the same partition of the topic, by taking hash(key) modulo the partition count.
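A minimal sketch of this idea. Kafka's default partitioner actually hashes the key bytes with murmur2; md5 is used here only as a deterministic stand-in, because Python's built-in hash() is randomized per process.

```python
import hashlib

# Key-based partitioning: records with the same key always land in the
# same partition, so they stay ordered relative to each other.
# (md5 is a stand-in for Kafka's murmur2-based default partitioner.)

def partition_for(key: str, num_partitions: int) -> int:
    h = int.from_bytes(hashlib.md5(key.encode()).digest()[:4], "big")
    return h % num_partitions

# All operations on id=1 go to one partition, preserving their order:
p1 = partition_for("id=1", 3)
assert partition_for("id=1", 3) == p1  # same key -> same partition, always
```

With this scheme you get total order per key, which is usually what "global order" means in practice; true global order across all keys would require a single partition and would sacrifice the concurrency that partitions provide.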
5. Describe a topic
bin/kafka-topics.sh \
--describe \
--zookeeper localhost:2181 \
--topic test
Cluster: --zookeeper computer1:2181,computer2:2181,computer3:2181/kafka
In a cluster environment:
[hadoop@hadoop000 kafka]$ bin/kafka-topics.sh \
> --describe \
> --zookeeper computer1:2181,computer2:2181,computer3:2181/kafka \
> --topic test
Topic:test PartitionCount:3 ReplicationFactor:3 Configs:
Topic: test Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: test Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topic: test Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
After the machine with broker id=1 goes down:
Topic:test PartitionCount:3 ReplicationFactor:3 Configs:
Topic: test Partition: 0 Leader: 2 Replicas: 1,2,3 Isr: 2,3
Topic: test Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topic: test Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
6. Increase the partition count (used when concurrency is insufficient and machines are added)
bin/kafka-topics.sh \
--alter \
--zookeeper localhost:2181 \
--topic test \
--partitions 4
Adding partitions does not redistribute existing data.
Topic:test PartitionCount:4 ReplicationFactor:3 Configs:
Topic: test Partition: 0 Leader: 2 Replicas: 1,2,3 Isr: 2,3,1
Topic: test Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topic: test Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
Topic: test Partition: 3 Leader: 1 Replicas: 1,3,2 Isr: 1,3,2
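This is also why increasing the partition count must be done with care when ordering relies on keys: once the partition count changes, hash(key) mod numPartitions can send the same key to a different partition than before. A hypothetical sketch (md5 again stands in for the real murmur2-based partitioner):

```python
import hashlib

# The key -> partition mapping depends on the partition count, so new
# writes for a key may land in a different partition after --alter,
# while the key's old records stay where they were written.

def partition_for(key: str, num_partitions: int) -> int:
    h = int.from_bytes(hashlib.md5(key.encode()).digest()[:4], "big")
    return h % num_partitions

keys = [f"id={i}" for i in range(10)]
before = {k: partition_for(k, 3) for k in keys}  # while the topic had 3 partitions
after = {k: partition_for(k, 4) for k in keys}   # after --alter ... --partitions 4
moved = [k for k in keys if before[k] != after[k]]
print(moved)  # keys whose future writes now target a different partition
```

Any key in `moved` loses its per-key ordering guarantee across the resize, since its old and new records live in different partitions.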
7. Delete a topic
bin/kafka-topics.sh \
--delete \
--zookeeper localhost:2181 \
--topic test
Cluster: --zookeeper computer1:2181,computer2:2181,computer3:2181/kafka
[hadoop@hadoop000 kafka]$ bin/kafka-topics.sh \
> --delete \
> --zookeeper localhost:2181 \
> --topic test
Topic test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
[hadoop@hadoop000 kafka]$ bin/kafka-topics.sh \
> --list \
> --zookeeper localhost:2181
huluwa_kafka_streaming
huluwa_offset
test - marked for deletion
If delete.topic.enable=true is not added to server.properties, this command does not actually delete anything; it only marks the topic for deletion in ZooKeeper. So after marking test for deletion, how does the topic actually get removed? Below is a way to delete it completely by hand:
[hadoop@hadoop000 bin]$ ./zkCli.sh
// first delete the metadata in ZooKeeper
[zk: localhost:2181(CONNECTED) 0] rmr /kafka/admin/delete_topics/test
[zk: localhost:2181(CONNECTED) 0] rmr /kafka/config/topics/test
[zk: localhost:2181(CONNECTED) 0] rmr /kafka/brokers/topics/test
// then delete the actual data on disk
[hadoop@hadoop000 ~]$ cd $KAFKA_HOME
[hadoop@hadoop000 kafka]$ rm -rf logs/test-*