寫在開頭
僅用于自己學(xué)習(xí)記錄汗唱,本章節(jié)學(xué)習(xí)目的是快速上手kafka和體驗go操作kafka
目錄
1.消息隊列
消息隊列MQ(Message Queue)也被稱為中間件朦拖,不存儲消息內(nèi)容裁僧,只是消息的搬運工因篇,具體表現(xiàn)在:
- 在不同進程間傳遞消息
- 在統(tǒng)一進程的不同線程間傳遞消息
在該模式下磕仅,生產(chǎn)者只需要向消息隊列投遞消息,生產(chǎn)者只需要等消息隊列搬運消息鸵荠,此時冕茅,生產(chǎn)者和消費者就解耦了
2.Kafka
kafka是一個分布式,支持多分區(qū),基于zookeeper的分布式消息流平臺(元數(shù)據(jù)都保存在zookeeper中姨伤,因此3.7版本之前都需要先安裝zookeeper)它同時也是一款開源的基于發(fā)布訂閱模式的消息引擎系統(tǒng)哨坪。
為什么要學(xué)習(xí)kafka,對于數(shù)據(jù)密集型應(yīng)用來說乍楚,kafka能很好幫助我們應(yīng)對數(shù)據(jù)量的激增当编,舉個例子,上游比如是300個示例的大型數(shù)據(jù)中心徒溪,下游是一個搜索和查詢的引擎忿偷,中間件使用kafka隔離上下游業(yè)務(wù),將上游激增的流量緩存起來臊泌,以平滑的方式傳到到下游子系統(tǒng)中鲤桥,避免了流量的不規(guī)則沖擊。
2.1消息引擎系統(tǒng)
看名字就知道缺虐,它比MQ逼格更高芜壁,wiki上的介紹是
消息引擎是一組規(guī)范,企業(yè)利用這組規(guī)范在不同系統(tǒng)之間傳遞準(zhǔn)確的消息高氮,實現(xiàn)松耦合的異步式數(shù)據(jù)傳遞
即:
- 用于不同系統(tǒng)之間
- 傳輸?shù)膶ο笫窍?/li>
這么一看是不是和MQ大差不差慧妄,但是之所以把他叫做引擎,是它能把消息轉(zhuǎn)換成一定的格式剪芍,即如何傳輸消息塞淹,如何設(shè)計待傳輸消息的格式都屬于消息引擎設(shè)計的一部分(摩托車引擎把燃油轉(zhuǎn)為動能,消息引擎也是如此罪裹,所以才叫引擎)饱普。
實際上kafka在傳輸時使用的是純二進制的字節(jié)序列
2.2為什么使用kafka
在這章開頭舉了如何對抗峰值流量例子,就是削峰填谷状共,緩沖上下游突發(fā)的流量套耕,使其平滑,來保護下游服務(wù)
2.3Kafka術(shù)語
消息:record峡继,指kafka處理的主要對象冯袍,類比就是數(shù)據(jù)庫表中的一行記錄
生產(chǎn)者/消費者:指發(fā)布/消費消息的應(yīng)用程序
-
主題:Topic,承載消息的容器碾牌,類比就是數(shù)據(jù)庫中的表康愤,更直觀點解釋就是一個業(yè)務(wù)就是一個topicimage.png
-
分區(qū):一個有序不變的消息序列,每個主題下可以有多個分區(qū)image.png
消息位移offset:分區(qū)中每條消息的位置
副本replica:Kafka 中同一條消息能夠被拷貝到多個地方以提供數(shù)據(jù)冗余舶吗,這些地方就是所謂的副本征冷,副本分為領(lǐng)導(dǎo)者副本和追隨者副本,生產(chǎn)者只與領(lǐng)導(dǎo)者副本交互
消費者組:多個消費者實例組成一個組誓琼,同時消費多個消息以實現(xiàn)提高吞吐量(如果一個 topic 有 N 個分區(qū)检激,那么同一個消費組最多有 N 個消費者肴捉。多于這個數(shù)字的消
費者會被忽略。)消費者位移:表示消費者消費進度叔收,每個消費者都有自己的消費者位移
重平衡:組內(nèi)某個消費者掛了每庆,其他實例自動重新分配訂閱主題分區(qū)的過程
2.4集群配置參數(shù)
2.4.1Broker端參數(shù)
Broker需要配置存儲信息,即Broker使用哪些磁盤今穿,針對存儲信息的重要參數(shù)有以下幾個:
- log.dirs:指定Broker需要使用的若干個文件目錄路徑,沒有默認(rèn)值必須手動指定
- log.dir:補充上一個參數(shù)
實際只需要配置log.dirs即可伦籍,線上生產(chǎn)環(huán)境一定要配置多個路徑(提升讀寫性能蓝晒,實現(xiàn)故障轉(zhuǎn)移),采用CSV格式(用逗號分隔多個路徑帖鸦,如/home/kafka1,/home/kafka2,/home/kafka3)
與zooKeeper相關(guān)設(shè)置:
- zookeeper.connect :zooKeeper集群連接芝薇,采用csv格式(zk1:2181,zk2:2181,zk3:2181)
Broker連接相關(guān)(客戶端連接或與其他broker連接)
- listeners:告訴外部連接者要通過什么協(xié)議訪問指定主機名和端口開放的 Kafka 服務(wù)
- advertised.listeners 這組監(jiān)聽器是 Broker 用于對外發(fā)布的,即外網(wǎng)訪問
- host.name/port:列出這兩個參數(shù)就是想說你把它們忘掉吧作儿,壓根不要為它們指定值洛二,畢竟都是過期的參數(shù)了
Topic相關(guān)
- auto.create.topics.enable:是否允許自動創(chuàng)建 Topic ,推薦設(shè)置false
- unclean.leader.election.enable:是否允許 Unclean Leader 選舉攻锰,建議設(shè)置成false 晾嘶,堅決不能讓那些落后太多的副本競選 Leader
- auto.leader.rebalance.enable是否允許定期進行 Leader 選舉,推薦設(shè)置成false娶吞,在生產(chǎn)環(huán)境中換一次 Leader 代價很高的垒迂,原本向 A 發(fā)送請求的所有客戶端都要切換成向 B 發(fā)送請求,而且這種換 Leader 本質(zhì)上沒有任何性能收益妒蛇,因此我建議你在生產(chǎn)環(huán)境中把這個參數(shù)設(shè)置成 false机断。
數(shù)據(jù)保留
- log.retention.{hour|minutes|ms}:這是個“三兄弟”,都是控制一條消息數(shù)據(jù)被保存多長時間
- log.retention.bytes:這是指定 Broker 為消息保存的總磁盤容量大小绣夺,默認(rèn)值-1吏奸,表示保存多少數(shù)據(jù)都可以
- message.max.bytes:控制 Broker 能夠接收的最大消息大小
2.4.1 Topic級別參數(shù)
Kafka支持為不同的topic設(shè)置不同的參數(shù)值,Topic級別參數(shù)會覆蓋全局broker參數(shù)
- retention.ms:規(guī)定了該 Topic 消息被保存的時長陶耍。默認(rèn)是 7 天
- retention.bytes:規(guī)定了要為該 Topic 預(yù)留多大的磁盤空間
如何設(shè)置topic級別參數(shù)奋蔚?
- 創(chuàng)建時設(shè)置( Kafka 開放了kafka-topics命令供我們來創(chuàng)建 Topic,--config用于設(shè)置topic級別參數(shù) )
- 修改時設(shè)置(更推薦使用該種)
3.快速上手kafka
我用的3.7.0 不需要額外安裝zookeeper
參考Docker---apache/kafka
sudo docker pull apache/kafka:3.7.0
sudo docker run -d --name kafka -p 9092:9092 apache/kafka:3.7.0
但是這種方式有一個弊端物臂,我的kafka是安裝在云服務(wù)器上的旺拉,本地的windows上無法訪問!棵磷!這時我們想到可能是上面的參數(shù)在作祟
advertised.listeners
我們進入容器查看
sudo docker exec -it kafka /bin/bash
cd opt/kafka/config
cat server.properties | grep listeners
發(fā)現(xiàn)advertised.listeners的值為localhost:9092蛾狗,只允許本地訪問,我們需要將他修改成以下形式
//我在云服務(wù)器上仪媒,這個ip就是我云服務(wù)器的彈性公網(wǎng)ip
PLAINTEXT://ip:9092
但是很遺憾沉桌,在docker里該文件是只讀谢鹊,我們也沒root權(quán)限,那么是否啟動時修改配置參數(shù)就行了留凭,可以佃扼,但很麻煩,根據(jù)kafka的docker介紹
Apache Kafka 支持多種代理配置蔼夜,您可以通過環(huán)境變量覆蓋這些配置兼耀。環(huán)境變量必須以 開頭KAFKA_,代理配置中的任何點都應(yīng)在相應(yīng)的環(huán)境變量中指定為下劃線求冷。
需要注意的是瘤运,如果您要覆蓋任何配置,則不會使用任何默認(rèn)配置匠题。
沒錯你不能光寫一個
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localxxxxxx:9092
還要連其他的一起補充
docker run -d \
--name broker \
-e KAFKA_NODE_ID=1 \
-e KAFKA_PROCESS_ROLES=broker,controller \
-e KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
-e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \
-e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
-e KAFKA_CONTROLLER_QUORUM_VOTERS=1@localhost:9093 \
-e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
-e KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1 \
-e KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1 \
-e KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0 \
-e KAFKA_NUM_PARTITIONS=3 \
apache/kafka:latest
學(xué)習(xí)成本蹭蹭上去了拯坟,所以我選擇直接復(fù)制一個配置文件給他啟動
將kafka里的配置拷貝出一份
mkdir -p config
sudo docker kafka:/opt/kafka/config/server.properties ./config
cd config
vim server.properties //修改成你的彈性公網(wǎng)ip
vim Dockerfile //寫入下面兩行
FROM apache/kafka:3.7.0
COPY server.properties /etc/kafka/docker
//構(gòu)建kafka
sudo docker build -t="mykafka:1.0.0" .
//停止并刪除之前的容器
sudo docker stop kafka
sudo docker rm kafka
//啟動自己封裝的鏡像
sudo docker run -d -p 9092:9092 --name kafka mykafka:1.0.0
添加topic
//進入容器
sudo docker exec -it kafka /bin/bash
cd opt/kafka/bin
./kafka-topics.sh --create --bootstrap-server localhost:9092 \
--topic tests
檢查topic
./kafka-topics.sh --list --bootstrap-server localhost:9092
3.1go連接kafka
選用sarama 因為用戶多,注意現(xiàn)在文件移動到了IBM
go get -u github.com/IBM/sarama
下載消費者模擬工具模擬消費者消費消息
go install github.com/IBM/sarama/tools/kafka-console-consumer@latest
啟動成功表示已經(jīng)能成功連接遠程kafka
kafka-console-consumer -topic tests -brokers ip地址:9092
編寫生產(chǎn)者
func TestProducer(t *testing.T) {
cfg := sarama.NewConfig()
cfg.Producer.Return.Successes = true
cfg.Producer.Return.Errors = true
cfg.Version = sarama.MaxVersion
borkers := []string{"xxxxxxxx:9092"}
producer, err := sarama.NewAsyncProducer(borkers, cfg)
assert.NoError(t, err)
defer producer.Close()
msg := &sarama.ProducerMessage{
Topic: "tests",
Value: sarama.StringEncoder("hello"),
}
producer.Input() <- msg
select {
case success := <-producer.Successes():
t.Log(success.Partition, success.Offset)
return
case err := <-producer.Errors():
t.Log("發(fā)送失敗", err)
}
}
消費端輸出如下
Partition: 0
Offset: 2
Key:
Value: hello
生產(chǎn)者輸出如下:
0,2