centOS7.6下安裝kafka2.8.0詳解

一.安裝jdk

#安裝jdk
yum install -y java-1.8.0-openjdk.x86_64

#查看版本號
java -version

二.安裝kafka_2.12-2.8.0

下載頁面:https://kafka.apache.org/downloads
1.配置kafka

#切換到自己的安裝目錄
cd /usr/local/apps/

#拷貝
rz

#解壓
tar -xvf kafka_2.12-2.8.0.tg

#切換到kafka目錄
cd kafka_2.12-2.8.0/

#創(chuàng)建日志目錄
mkdir log

#修改配置文件
vim ./config/server.properties
image.png

2.配置zookeeper服務(我這里用默認配置算了)

三、創(chuàng)建啟動和關閉的 kafka 執(zhí)行腳本

1.創(chuàng)建啟動文件

#啟動zookeeper
vim kafka_start.sh

文件內(nèi)容

#!/bin/sh

/usr/local/apps/kafka_2.12-2.8.0/bin/zookeeper-server-start.sh /usr/local/apps/kafka_2.12-2.8.0/config/zookeeper.properties &
 
sleep 3 #等3秒后執(zhí)行
 
#啟動kafka
/usr/local/apps/kafka_2.12-2.8.0/bin/kafka-server-start.sh /usr/local/apps/kafka_2.12-2.8.0/config/server.properties &

2..創(chuàng)建停止文件

vim kafka_stop.sh

文件內(nèi)容

#!/bin/sh
#關閉zookeeper
/usr/local/apps/kafka_2.12-2.8.0/bin/zookeeper-server-stop.sh /usr/local/apps/kafka_2.12-2.8.0/config/zookeeper.properties &
 
sleep 3 #等3秒后執(zhí)行
 
#關閉kafka
/usr/local/apps/kafka_2.12-2.8.0/bin/kafka-server-stop.sh /usr/local/apps/kafka_2.12-2.8.0/config/server.properties &

3.為腳本執(zhí)行權(quán)限

chmod +x kafka_start.sh
chmod +x kafka_stop.sh
  1. 啟動腳本,設置開機自啟動
sh /usr/local/apps/kafka_2.12-2.8.0/kafka_start.sh &

四雪情、Kafka配置文件詳解 (2022.05.02增加)

1.producer.properties:生產(chǎn)端的配置文件

#指定kafka節(jié)點列表劣纲,用于獲取metadata挎塌,不必全部指定
#需要kafka的服務器地址采缚,來獲取每一個topic的分片數(shù)等元數(shù)據(jù)信息嘴脾。
metadata.broker.list=kafka01:9092,kafka02:9092,kafka03:9092
 
#生產(chǎn)者生產(chǎn)的消息被發(fā)送到哪個block视乐,需要一個分組策略洛搀。
#指定分區(qū)處理類。默認kafka.producer.DefaultPartitioner佑淀,表通過key哈希到對應分區(qū)
#partitioner.class=kafka.producer.DefaultPartitioner
 
#生產(chǎn)者生產(chǎn)的消息可以通過一定的壓縮策略(或者說壓縮算法)來壓縮留美。消息被壓縮后發(fā)送到broker集群,
#而broker集群是不會進行解壓縮的伸刃,broker集群只會把消息發(fā)送到消費者集群谎砾,然后由消費者來解壓縮。
#是否壓縮捧颅,默認0表示不壓縮景图,1表示用gzip壓縮,2表示用snappy壓縮碉哑。
#壓縮后消息中會有頭來指明消息壓縮類型症歇,故在消費者端消息解壓是透明的無需指定。
#文本數(shù)據(jù)會以1比10或者更高的壓縮比進行壓縮谭梗。
compression.codec=none
 
#指定序列化處理類忘晤,消息在網(wǎng)絡上傳輸就需要序列化,它有String激捏、數(shù)組等許多種實現(xiàn)设塔。
serializer.class=kafka.serializer.DefaultEncoder
 
#如果要壓縮消息,這里指定哪些topic要壓縮消息远舅,默認empty闰蛔,表示不壓縮。
#如果上面啟用了壓縮图柏,那么這里就需要設置
#compressed.topics= 
#這是消息的確認機制序六,默認值是0。在面試中常被問到蚤吹。
#producer有個ack參數(shù)例诀,有三個值随抠,分別代表:
#(1)不在乎是否寫入成功;
#(2)寫入leader成功繁涂;
#(3)寫入leader和所有副本都成功拱她;
#要求非炒戆睿可靠的話可以犧牲性能設置成最后一種仅偎。
#為了保證消息不丟失疟位,至少要設置為1欠动,也就
#是說至少保證leader將消息保存成功箱靴。
#設置發(fā)送數(shù)據(jù)是否需要服務端的反饋,有三個值0,1,-1只嚣,分別代表3種狀態(tài):
#0: producer不會等待broker發(fā)送ack穗椅。生產(chǎn)者只要把消息發(fā)送給broker之后蔓倍,就認為發(fā)送成功了全肮,這是第1種情況敞咧;
#1: 當leader接收到消息之后發(fā)送ack。生產(chǎn)者把消息發(fā)送到broker之后倔矾,并且消息被寫入到本地文件妄均,才認為發(fā)送成功,這是第二種情況哪自;#-1: 當所有的follower都同步消息成功后發(fā)送ack丰包。不僅是主的分區(qū)將消息保存成功了,
#而且其所有的分區(qū)的副本數(shù)也都同步好了壤巷,才會被認為發(fā)動成功邑彪,這是第3種情況。
request.required.acks=0
 
#broker必須在該時間范圍之內(nèi)給出反饋胧华,否則失敗寄症。
#在向producer發(fā)送ack之前,broker允許等待的最大時間 ,如果超時,
#broker將會向producer發(fā)送一個error ACK.意味著上一次消息因為某種原因
#未能成功(比如follower未能同步成功)
request.timeout.ms=10000
 
#生產(chǎn)者將消息發(fā)送到broker矩动,有兩種方式有巧,一種是同步,表示生產(chǎn)者發(fā)送一條悲没,broker就接收一條篮迎;
#還有一種是異步,表示生產(chǎn)者積累到一批的消息示姿,裝到一個池子里面緩存起來甜橱,再發(fā)送給broker,
#這個池子不會無限緩存消息栈戳,在下面岂傲,它分別有一個時間限制(時間閾值)和一個數(shù)量限制(數(shù)量閾值)的參數(shù)供我們來設置。
#一般我們會選擇異步子檀。
#同步還是異步發(fā)送消息镊掖,默認“sync”表同步乃戈,"async"表異步。異步可以提高發(fā)送吞吐量,
#也意味著消息將會在本地buffer中,并適時批量發(fā)送堰乔,但是也可能導致丟失未發(fā)送過去的消息
producer.type=sync
 
#在async模式下,當message被緩存的時間超過此值后,將會批量發(fā)送給broker,
#默認為5000ms
#此值和batch.num.messages協(xié)同工作.
queue.buffering.max.ms = 5000
 
#異步情況下偏化,緩存中允許存放消息數(shù)量的大小脐恩。
#在async模式下,producer端允許buffer的最大消息量
#無論如何,producer都無法盡快的將消息發(fā)送給broker,從而導致消息在producer端大量沉積
#此時,如果消息的條數(shù)達到閥值,將會導致producer端阻塞或者消息被拋棄镐侯,默認為10000條消息。
queue.buffering.max.messages=20000
 
#如果是異步驶冒,指定每次批量發(fā)送數(shù)據(jù)量苟翻,默認為200
batch.num.messages=500
 
#在生產(chǎn)端的緩沖池中,消息發(fā)送出去之后骗污,在沒有收到確認之前崇猫,該緩沖池中的消息是不能被刪除的,
#但是生產(chǎn)者一直在生產(chǎn)消息需忿,這個時候緩沖池可能會被撐爆诅炉,所以這就需要有一個處理的策略。
#有兩種處理方式屋厘,一種是讓生產(chǎn)者先別生產(chǎn)那么快涕烧,阻塞一下,等會再生產(chǎn)汗洒;另一種是將緩沖池中的消息清空议纯。
#當消息在producer端沉積的條數(shù)達到"queue.buffering.max.meesages"后阻塞一定時間后,
#隊列仍然沒有enqueue(producer仍然沒有發(fā)送出任何消息)
#此時producer可以繼續(xù)阻塞或者將消息拋棄,此timeout值用于控制"阻塞"的時間
#-1: 不限制阻塞超時時間,讓produce一直阻塞,這個時候消息就不會被拋棄
#0: 立即清空隊列,消息被拋棄
queue.enqueue.timeout.ms=-1
 
 
#當producer接收到error ACK,或者沒有接收到ACK時,允許消息重發(fā)的次數(shù)
#因為broker并沒有完整的機制來避免消息重復,所以當網(wǎng)絡異常時(比如ACK丟失)
#有可能導致broker接收到重復的消息,默認值為3.
message.send.max.retries=3
 
#producer刷新topic metada的時間間隔,producer需要知道partition leader
#的位置,以及當前topic的情況
#因此producer需要一個機制來獲取最新的metadata,當producer遇到特定錯誤時,
#將會立即刷新
#(比如topic失效,partition丟失,leader失效等),此外也可以通過此參數(shù)來配置
#額外的刷新機制溢谤,默認值600000
topic.metadata.refresh.interval.ms=60000

2瞻凤、consumer.properties:消費端的配置文件

#消費者集群通過連接Zookeeper來找到broker。
#zookeeper連接服務器地址
zookeeper.connect=zk01:2181,zk02:2181,zk03:2181
 
#zookeeper的session過期時間世杀,默認5000ms阀参,用于檢測消費者是否掛掉
zookeeper.session.timeout.ms=5000
 
#當消費者掛掉,其他消費者要等該指定時間才能檢查到并且觸發(fā)重新負載均衡
zookeeper.connection.timeout.ms=10000
 
#這是一個時間閾值瞻坝。
#指定多久消費者更新offset到zookeeper中蛛壳。
#注意offset更新時基于time而不是每次獲得的消息。
#一旦在更新zookeeper發(fā)生異常并重啟湿镀,將可能拿到已拿到過的消息
zookeeper.sync.time.ms=2000
 
#指定消費
group.id=xxxxx
 
#這是一個數(shù)量閾值炕吸,經(jīng)測試是500條。
#當consumer消費一定量的消息之后,將會自動向zookeeper提交offset信息#注意offset信息并不是每消費一次消息就向zk提交
#一次,而是現(xiàn)在本地保存(內(nèi)存),并定期提交,默認為true
auto.commit.enable=true
 
# 自動更新時間勉痴。默認60 * 1000
auto.commit.interval.ms=1000
 
# 當前consumer的標識,可以設定,也可以有系統(tǒng)生成,
#主要用來跟蹤消息消費情況,便于觀察
conusmer.id=xxx
 
# 消費者客戶端編號赫模,用于區(qū)分不同客戶端,默認客戶端程序自動產(chǎn)生
client.id=xxxx
 
# 最大取多少塊緩存到消費者(默認10)
queued.max.message.chunks=50
 
# 當有新的consumer加入到group時,將會reblance,此后將會
#有partitions的消費端遷移到新  的consumer上,如果一個
#consumer獲得了某個partition的消費權(quán)限,那么它將會向zk
#注冊 "Partition Owner registry"節(jié)點信息,但是有可能
#此時舊的consumer尚沒有釋放此節(jié)點, 此值用于控制,
#注冊節(jié)點的重試次數(shù).
rebalance.max.retries=5
 
#每拉取一批消息的最大字節(jié)數(shù)
#獲取消息的最大尺寸,broker不會像consumer輸出大于
#此值的消息chunk 每次feth將得到多條消息,此值為總大小,
#提升此值,將會消耗更多的consumer端內(nèi)存
fetch.min.bytes=6553600
 
#當消息的尺寸不足時,server阻塞的時間,如果超時,
#消息將立即發(fā)送給consumer
#數(shù)據(jù)一批一批到達蒸矛,如果每一批是10條消息瀑罗,如果某一批還
#不到10條胸嘴,但是超時了,也會立即發(fā)送給consumer斩祭。
fetch.wait.max.ms=5000
socket.receive.buffer.bytes=655360
 
# 如果zookeeper沒有offset值或offset值超出范圍劣像。
#那么就給個初始的offset。有smallest摧玫、largest耳奕、
#anything可選,分別表示給當前最小的offset诬像、
#當前最大的offset屋群、拋異常。默認largest
auto.offset.reset=smallest
 
# 指定序列化處理類
derializer.class=kafka.serializer.DefaultDecoder

3.server.properties:服務端的配置文件

#broker的全局唯一編號坏挠,不能重復
broker.id=0
 
#用來監(jiān)聽鏈接的端口芍躏,producer或consumer將在此端口建立連接
port=9092
 
#處理網(wǎng)絡請求的線程數(shù)量,也就是接收消息的線程數(shù)降狠。
#接收線程會將接收到的消息放到內(nèi)存中对竣,然后再從內(nèi)存中寫入磁盤。
num.network.threads=3
 
#消息從內(nèi)存中寫入磁盤是時候使用的線程數(shù)量榜配。
#用來處理磁盤IO的線程數(shù)量
num.io.threads=8
 
#發(fā)送套接字的緩沖區(qū)大小
socket.send.buffer.bytes=102400
 
#接受套接字的緩沖區(qū)大小
socket.receive.buffer.bytes=102400
 
#請求套接字的緩沖區(qū)大小
socket.request.max.bytes=104857600
 
#kafka運行日志存放的路徑
log.dirs=/export/servers/logs/kafka
 
#topic在當前broker上的分片個數(shù)
num.partitions=2
 
#我們知道segment文件默認會被保留7天的時間否纬,超時的話就
#會被清理,那么清理這件事情就需要有一些線程來做芥牌。這里就是
#用來設置恢復和清理data下數(shù)據(jù)的線程數(shù)量
num.recovery.threads.per.data.dir=1
 
#segment文件保留的最長時間烦味,默認保留7天(168小時),
#超時將被刪除壁拉,也就是說7天之前的數(shù)據(jù)將被清理掉谬俄。
log.retention.hours=168
 
#滾動生成新的segment文件的最大時間
log.roll.hours=168
 
#日志文件中每個segment的大小,默認為1G
log.segment.bytes=1073741824
 
#上面的參數(shù)設置了每一個segment文件的大小是1G弃理,那么
#就需要有一個東西去定期檢查segment文件有沒有達到1G溃论,
#多長時間去檢查一次,就需要設置一個周期性檢查文件大小
#的時間(單位是毫秒)痘昌。
log.retention.check.interval.ms=300000
 
#日志清理是否打開
log.cleaner.enable=true
 
#broker需要使用zookeeper保存meta數(shù)據(jù)
zookeeper.connect=zk01:2181,zk02:2181,zk03:2181
 
#zookeeper鏈接超時時間
zookeeper.connection.timeout.ms=6000
 
#上面我們說過接收線程會將接收到的消息放到內(nèi)存中钥勋,然后再從內(nèi)存
#寫到磁盤上,那么什么時候?qū)⑾膬?nèi)存中寫入磁盤辆苔,就有一個
#時間限制(時間閾值)和一個數(shù)量限制(數(shù)量閾值)算灸,這里設置的是
#數(shù)量閾值,下一個參數(shù)設置的則是時間閾值驻啤。
#partion buffer中菲驴,消息的條數(shù)達到閾值,將觸發(fā)flush到磁盤骑冗。
log.flush.interval.messages=10000
 
#消息buffer的時間赊瞬,達到閾值先煎,將觸發(fā)將消息從內(nèi)存flush到磁盤,
#單位是毫秒巧涧。
log.flush.interval.ms=3000
 
#刪除topic需要server.properties中設置delete.topic.enable=true否則只是標記刪除
delete.topic.enable=true
 
#此處的host.name為本機IP(重要),如果不改,則客戶端會拋出:
#Producer connection to localhost:9092 unsuccessful 錯誤!
host.name=kafka01
 
advertised.host.name=192.168.239.128

五薯蝎、相關命令

1.創(chuàng)建topic主題

bin/kafka-topics.sh --create --topic topicname --replication-factor 1 --partitions 1 --zookeeper localhost:2181

2.查看相關主題

bin/kafka-topics.sh --list --zookeeper localhost:2181
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市谤绳,隨后出現(xiàn)的幾起案子占锯,更是在濱河造成了極大的恐慌,老刑警劉巖闷供,帶你破解...
    沈念sama閱讀 216,470評論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件烟央,死亡現(xiàn)場離奇詭異统诺,居然都是意外死亡歪脏,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,393評論 3 392
  • 文/潘曉璐 我一進店門粮呢,熙熙樓的掌柜王于貴愁眉苦臉地迎上來婿失,“玉大人,你說我怎么就攤上這事啄寡『拦瑁” “怎么了?”我有些...
    開封第一講書人閱讀 162,577評論 0 353
  • 文/不壞的土叔 我叫張陵挺物,是天一觀的道長懒浮。 經(jīng)常有香客問我,道長识藤,這世上最難降的妖魔是什么砚著? 我笑而不...
    開封第一講書人閱讀 58,176評論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮痴昧,結(jié)果婚禮上稽穆,老公的妹妹穿的比我還像新娘。我一直安慰自己赶撰,他們只是感情好舌镶,可當我...
    茶點故事閱讀 67,189評論 6 388
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著豪娜,像睡著了一般餐胀。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上瘤载,一...
    開封第一講書人閱讀 51,155評論 1 299
  • 那天否灾,我揣著相機與錄音,去河邊找鬼惕虑。 笑死坟冲,一個胖子當著我的面吹牛磨镶,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播健提,決...
    沈念sama閱讀 40,041評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼琳猫,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了私痹?” 一聲冷哼從身側(cè)響起脐嫂,我...
    開封第一講書人閱讀 38,903評論 0 274
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎紊遵,沒想到半個月后账千,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,319評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡暗膜,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,539評論 2 332
  • 正文 我和宋清朗相戀三年匀奏,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片学搜。...
    茶點故事閱讀 39,703評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡娃善,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出瑞佩,到底是詐尸還是另有隱情聚磺,我是刑警寧澤,帶...
    沈念sama閱讀 35,417評論 5 343
  • 正文 年R本政府宣布炬丸,位于F島的核電站瘫寝,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏稠炬。R本人自食惡果不足惜焕阿,卻給世界環(huán)境...
    茶點故事閱讀 41,013評論 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望酸纲。 院中可真熱鬧捣鲸,春花似錦、人聲如沸闽坡。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,664評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽疾嗅。三九已至外厂,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間代承,已是汗流浹背汁蝶。 一陣腳步聲響...
    開封第一講書人閱讀 32,818評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人掖棉。 一個月前我還...
    沈念sama閱讀 47,711評論 2 368
  • 正文 我出身青樓墓律,卻偏偏與公主長得像,于是被迫代替她去往敵國和親幔亥。 傳聞我的和親對象是個殘疾皇子耻讽,可洞房花燭夜當晚...
    茶點故事閱讀 44,601評論 2 353

推薦閱讀更多精彩內(nèi)容