簡介
部分引用來自博客:https://juejin.im/post/5a67f7e7f265da3e3c6c4f8b
Apache Kafka是一個分布式的發(fā)布-訂閱系統(tǒng)和一個強大的隊列狡孔。
特點:
1陵像、高吞吐(單機每秒10W條消息傳輸)
2盖桥、消息持久化(磁盤)
3、分布式(支持 Server 間的消息分區(qū)及分布式消費锻狗,同時保證每個 partition 內(nèi)的消息順序傳輸康二。這樣易于向外擴展楣导,所有的producer蝌衔、broker 和 consumer 都會有多個,均為分布式的)
4嚎研、消費消息采用pull模式(由consumer保存offset)
5蓖墅、支持online和offline場景
基本概念:
1库倘、 Broker
Kafka 集群中的一臺或多臺服務(wù)器統(tǒng)稱為 Broker,可以簡單理解為一個kafka節(jié)點论矾。
*Kafka集群通常使用多個Broker來實現(xiàn)集群的負載均衡
*Kafka brokers 是無狀態(tài)的教翩,它們使用 ZooKeeper 來保持集群信息
2、ZooKeeper
是一個分布式配置和同步服務(wù)贪壳,管理和協(xié)調(diào)Kafka broker
3饱亿、Topic
每條發(fā)布到 Kafka 的消息都有一個類別,這個類別被稱為 Topic 闰靴。(物理上不同Topic 的消息分開存儲彪笼。邏輯上一個 Topic 的消息雖然保存于一個或多個broker上,但用戶只需指定消息的 Topic 即可生產(chǎn)或消費數(shù)據(jù)而不必關(guān)心數(shù)據(jù)存于何處)
4蚂且、 Partition
Topic物理上的分組
配猫,一個 Topic 可以分為多個 Partition ,每個 Partition 是一個有序的隊列杏死。Partition 中的每條消息都會被分配一個有序的 id(offset)泵肄。但整個Topic消息不一定有序。
5淑翼、Segment
包含消息內(nèi)容的指定大小的文件
*由 index 文件和 log 文件組成;
*一個 Partition 由多個 Segment 文件組成
6腐巢、Offset
Segment 文件中消息的索引值, 從 0 開始計數(shù)
7、Replica (N)
消息的冗余備份
*每個 Partition 都會有 N 個完全相同的冗余備份, 這些備份會被盡量分散存儲在不同的機器上玄括。
8.冯丙、Producer
消息和數(shù)據(jù)的生產(chǎn)者,可以理解為往 Kafka 發(fā)消息的客戶端
9惠豺、 Consumer
消息和數(shù)據(jù)的消費者,可以理解為從 Kafka 取消息的客戶端
10风宁、Consumers
由于 Kafka brokers 是無狀態(tài)的洁墙, 因此需要Consumer來維護根據(jù)partition offset
已經(jīng)消費的消息數(shù)量信息。
*如果 consumer 確認了一個指定消息的offset
戒财,那也就意味著 consumer 已經(jīng)消費了該offset
之前的所有消息热监。
*Consumer可以向Broker異步發(fā)起一個拉取消息的請求來緩存待消費的消息。consumers 也可以通過提供一個指定的offset
值來回溯或跳過Partition中的消息饮寞。
*Consumer 消費消息的offset
值是保存在ZooKeeper中的孝扛。
11、Consumer Group
每個 Consumer 屬于一個特定的 Consumer Group(可為每個 Consumer 指定Group Name幽崩,若不指定 Group Name 則屬于默認的 Group
)苦始。
這是 Kafka 用來實現(xiàn)一個 Topic 消息的廣播(發(fā)給所有的 Consumer )和單播(發(fā)給任意一個 Consumer )的手段。一個 Topic 可以有多個 Consumer Group慌申。Topic 的消息會復(fù)制(不是真的復(fù)制陌选,是概念上的)到所有的 Consumer Group,但每個 Consumer Group 只會把消息發(fā)給該 Consumer Group 中的一個 Consumer。如果要實現(xiàn)廣播咨油,只要每個 Consumer 有一個獨立的 Consumer Group 就可以了您炉。如果要實現(xiàn)單播只要所有的 Consumer 在同一個 Consumer Group 。用 Consumer Group 還可以將 Consumer 進行自由的分組而不需要多次發(fā)送消息到不同的 Topic 役电。
本旨在探究kafka的部署及使用赚爵,下面介紹以docker方式安裝部署:
單機部署
第一步:下載鏡像。
// 選一個下載
docker pull wurstmeister/kafka
docker pull zookeeper
第二步:創(chuàng)建通信網(wǎng)絡(luò)法瑟。由于要涉及到zookeeper和kafka之間的通信冀膝,運用docker內(nèi)部容器通信機制先新建一個網(wǎng)絡(luò)。
[root@sz-ben-dev-01 ~]# docker network create hbl_test (新建網(wǎng)絡(luò))
dcb683a23044e902b251e01f493c814f940bd5bb592025c9eb4b78902f45091f
[root@sz-ben-dev-01 ~]# docker network ls (查看網(wǎng)絡(luò))
NETWORK ID NAME DRIVER SCOPE
dcb683a23044 hbl_test bridge local
[root@sz-ben-dev-01 ~]# docker network inspect hbl_test (查看網(wǎng)絡(luò)詳細信息)
[
{
"Name": "hbl_test",
"Id": "dcb683a23044e902b251e01f493c814f940bd5bb592025c9eb4b78902f45091f",
"Created": "2019-11-07T11:30:01.007966557+08:00",
"Scope": "local",
"Driver": "bridge",
"EnableIPv6": false,
"IPAM": {
"Driver": "default",
"Options": {},
"Config": [
{
"Subnet": "172.19.0.0/16",
"Gateway": "172.19.0.1"
}
]
},
"Internal": false,
"Attachable": false,
"Containers": {}, (連接的容器為空)
"Options": {},
"Labels": {}
}
]
第三步:創(chuàng)建zookeeper和kafka容器. (kafka容器啟動有坑瓢谢,接著往下看)
docker run --net=hbl_test --name hbl_zookeeper -p 21810:2181 -d docker.io/zookeeper
docker run --net=hbl_test --name hbl_kafka -p 9093:9092 \
--link hbl_zookeeper \
-e KAFKA_ZOOKEEPER_CONNECT=172.19.0.2:2181 \
-e KAFKA_ADVERTISED_HOST_NAME=172.28.2.104 \
-e KAFKA_ADVERTISED_PORT=9092 \
-d wurstmeister/kafka
KAFKA_ADVERTISED_HOST_NAME參數(shù)需要設(shè)置為宿主機地址172.28.2.104畸写。
KAFKA_ZOOKEEPER_CONNECT參數(shù)設(shè)置hbl-zookeeper容器內(nèi)部地址和端口(同一宿主機內(nèi)的容器互相訪問要用容器內(nèi)地址,查看指令為docker inspect hbl_zookeeper氓扛,在Networks字段可以看到容器內(nèi)ip地址)枯芬。
"Networks": {
"hbl_test": {
......
"IPAddress": "172.19.0.2",
......
}
}
這時在去查看網(wǎng)絡(luò)hbl_test,zookeeper和kafka容器都已經(jīng)加入hbl_test網(wǎng)絡(luò)了采郎。
[root@sz-ben-dev-01 ~]# docker network inspect hbl_test
[
{
"Name": "hbl_test",
......
"Containers": {
"1ea442cae355872506cfa6d1aab71149da8dfa0fd2d1b05c126f32112cbe957a": {
"Name": "hbl_zookeeper",
"EndpointID": "414a64059817d85133f5bcc32f4789aecfccdda5a5f987828fd59b86a3d8dcd6",
"MacAddress": "02:42:ac:13:00:02",
"IPv4Address": "172.19.0.2/16",
"IPv6Address": ""
},
"761485f630e9fd09e4737c511744da74018612e9a07f672927b123d3fc46b908": {
"Name": "hbl_kafka",
"EndpointID": "24bd176e9c828b577f8f269f081fcb83508e66764f6a46e08d3f59b36e6e1528",
"MacAddress": "02:42:ac:13:00:03",
"IPv4Address": "172.19.0.3/16",
"IPv6Address": ""
}
},
......
}
]
測試發(fā)送消息和接收消息
進入容器查看消費者信息或發(fā)布生產(chǎn)者消息
// 進入容器
docker exec -it hbl_kafka bash
// 進入kafka所在目錄
cd opt/kafka_2.12-2.3.0/bin/
// 啟動消息發(fā)送方
./kafka-console-producer.sh --broker-list localhost:9092 --topic mykafka
// 再開一個終端進入容器千所,啟動消息接收方
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mykafka --from-beginning
中途踩坑
1、啟動時kafka容器docker logs -f hbl_kafka查看日志報錯
WARN [Controller id=1001, targetBrokerId=1001] Connection to node 1001 (/172.28.2.104:9095) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
解決:防火墻對應(yīng)端口沒開
// 開啟9093端口
firewall-cmd --add-port=9093/tcp --permanent --zone=public
// 重載配置
firewall-cmd --reload
// 查看端口
firewall-cmd --list-port
2蒜埋、進入生產(chǎn)者發(fā)送消息報錯
WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 105 : {mykafka=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
解決:在一篇博客上看到說后來發(fā)現(xiàn)最新版本0.10.x broker配置棄用了advertised.host.name
和 advertised.port
這兩個個配置項淫痰,就配置advertised.listeners就可以了。
advertised.listeners=PLAINTEXT://59.64.11.22:9092
繼續(xù)
運行指令docker stop hbl_kafka停掉之前的kafka整份,重新啟動一個kafka容器:
docker run --net=hbl_test --name hbl_kafka2 -p 9094:9092 \
--link hbl_zookeeper \
-e KAFKA_BROKER_ID=4 \
-e KAFKA_ZOOKEEPER_CONNECT=172.19.0.2:2181 \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.28.2.104:9094 \
-d wurstmeister/kafka
從第一個坑里還能看到如果我們不指定broker id待错,會隨機分配一個,所以還是指定一個吧烈评。(小伙伴們這里要開啟防火墻9094端口喲)
進入kafka容器發(fā)送消息
bash-4.4# ./kafka-console-producer.sh --broker-list localhost:9092 --topic test1
>hh
>kk
>nihao
>ss
>yapi
>
接收消息
bash-4.4# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test1 --from-beginning
hh
kk
nihao
ss
yapi
上述做法是單個容器部署的火俄,其實已經(jīng)有點麻煩且容易出錯了,而作為分布式消息隊列讲冠,生產(chǎn)環(huán)境上肯定是集群部署瓜客,所以推薦用下面的docker compose容器編排工具來部署。
首先編寫
docker-compose.yml
version: '3'
services:
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2182:2181"
networks:
- hbl_net
kafka:
image: wurstmeister/kafka
depends_on:
- zookeeper
links:
- zookeeper
networks:
- hbl_net
ports:
- "9095:9092"
environment:
KAFKA_BROKER_ID: 5
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://172.28.2.104:9095 #宿主機監(jiān)聽端口
volumes:
- /var/run/docker.sock:/var/run/docker.sock
networks:
hbl_net:
driver: bridge # 生成一個橋接網(wǎng)絡(luò)竿开,用于容器內(nèi)部通信谱仪,注意實際生成的網(wǎng)絡(luò)名稱會帶有docker-compose.yml文件所在文件夾的前綴,比如我的.yml文件放在了hbl文件夾下否彩,所以執(zhí)行后生成的網(wǎng)絡(luò)名為hbl_hbl_net
# external: true 如果外部已有網(wǎng)絡(luò)就用這個配置
執(zhí)行docker-compose up -d
即可疯攒。
集群部署
https://www.cnblogs.com/yingww/p/9188701.html
docker-compose.yml
version: '3'
services:
zoo1:
image: zookeeper
restart: always
container_name: zoo1
ports:
- "21811:2181"
environment:
ZOO_MY_ID: 1
ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888 server.4=zoo4:2888:3888:observer
zoo2:
image: zookeeper
restart: always
container_name: zoo2
ports:
- "21812:2181"
environment:
ZOO_MY_ID: 2
ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888 server.4=zoo4:2888:3888:observer
zoo3:
image: zookeeper
restart: always
container_name: zoo3
ports:
- "21813:2181"
environment:
ZOO_MY_ID: 3
ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888 server.4=zoo4:2888:3888:observer
zoo4:
image: zookeeper
restart: always
container_name: zoo4
ports:
- "21814:2181"
environment:
ZOO_MY_ID: 4
PEER_TYPE: observer
ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888 server.4=zoo4:2888:388:observer
broker1:
image: wurstmeister/kafka
restart: always
container_name: broker1
ports:
- "9192:9092"
depends_on:
- zoo1
- zoo2
- zoo3
- zoo4
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181,zoo3:2181,zoo4:2181
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://172.28.2.104:9192
volumes:
- /var/run/docker.sock:/var/run/docker.sock
broker2:
image: wurstmeister/kafka
restart: always
container_name: broker2
ports:
- "9292:9092"
depends_on:
- zoo1
- zoo2
- zoo3
- zoo4
environment:
KAFKA_BROKER_ID: 2
KAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181,zoo3:2181,zoo4:2181
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://172.28.2.104:9292
volumes:
- /var/run/docker.sock:/var/run/docker.sock
broker3:
image: wurstmeister/kafka
restart: always
container_name: broker3
ports:
- "9392:9092"
depends_on:
- zoo1
- zoo2
- zoo3
- zoo4
environment:
KAFKA_BROKER_ID: 3
KAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181,zoo3:2181,zoo4:2181
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://172.28.2.104:9392
volumes:
- /var/run/docker.sock:/var/run/docker.sock
后續(xù)待驗證,服務(wù)器內(nèi)存不夠
探究:
1列荔、不顯示指定消費者組是否會默認分配到同一個默認group卸例?
劃重點
1称杨、為什么要配置KAFKA_LISTENERS?
listeners 監(jiān)聽列表筷转,使用0.0.0.0綁定到所有接口
2姑原、為什么要配置KAFKA_ADVERTISED_LISTENERS?
advertised.listeners 發(fā)布到zookeeper供客戶端使用的監(jiān)聽器呜舒,可以簡單理解為kafka提供的外部訪問地址
3锭汛、為什么要掛載 /var/run/docker.sock?
Docker Daemon默認監(jiān)聽的是/var/run/docker.sock這個文件袭蝗,所以docker客戶端只要把請求發(fā)往這里唤殴,daemon就能收到并且做出響應(yīng)。把宿主機的/var/run/docker.sock映射到容器內(nèi)后到腥,在容器內(nèi)也能夠向/var/run/docker.sock發(fā)送http請求和Docker Daemon通信朵逝。
(參考博文https://blog.csdn.net/boling_cavalry/article/details/92846483)
小Tips
1、查看kafka的配置
位于kafka容器內(nèi) opt/kafka_2.12-2.3.0/config/ 路徑下乡范,查看server.properties文件部分信息如下:
......
group.initial.rebalance.delay.ms=0
advertised.port=9093
advertised.host.name=172.28.2.104
port=9092
2配名、查看kafka在zookeeper中的信息
進入zookeeper容器,/apache-zookeeper-3.5.6-bin/bin 路徑下執(zhí)行./zkCli.sh 進入客戶端晋辆,可以查看當(dāng)前broker id或topic分區(qū)等等信息
[zk: localhost:2181(CONNECTED) 6] ls /brokers/ids
[1011]
[zk: localhost:2181(CONNECTED) 7] ls /brokers/ids
[4]
[zk: localhost:2181(CONNECTED) 8] ls /brokers/topics/mykafka/partitions
[0]
[zk: localhost:2181(CONNECTED) 9] ls /brokers/topics/test1/partitions
[0]
[zk: localhost:2181(CONNECTED) 10]
2渠脉、docker-compose安裝使用
// 下載
curl -L https://github.com/docker/compose/releases/download/1.12.0/docker-compose-`uname -s`-`uname -m` > /usr/local/bin/docker-compose
// 設(shè)置可執(zhí)行權(quán)限
chmod +x /usr/local/bin/docker-compose