概念
producer : 消息生產(chǎn)者田藐,發(fā)布消息到 kafka 集群的終端或服務(wù)秋茫。
consumer : 從 kafka 集群中消費(fèi)消息的終端或服務(wù)夯膀。
Consumer group : high-level consumer API 中鲸匿,每個(gè) consumer 都屬于一個(gè) consumer group逗抑,每條消息只能被 consumer group 中的一個(gè) Consumer 消費(fèi)弦疮,但可以被多個(gè) consumer group 消費(fèi)夹攒。
topic: 每條發(fā)布到 kafka 集群的消息屬于的類別,即 kafka 是面向 topic 的胁塞。
broker:kafka 集群中包含的服務(wù)器咏尝。
partition:partition 是物理上的概念压语,每個(gè) topic 包含一個(gè)或多個(gè) partition。kafka 分配的單位是 partition编检。
replica: partition 的副本胎食,保障 partition 的高可用。
kafka配置文件
# 核心配置
broker.id=0
############### Socket Server Settings ##############
# 監(jiān)聽端口
listeners=PLAINTEXT://:9092
# 配置服務(wù)提供遠(yuǎn)端訪問能力
advertised.listeners=PLAINTEXT://192.168.199.101:9092
# 配置Https的連接
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
# server用來處理網(wǎng)絡(luò)請求的網(wǎng)絡(luò)線程數(shù)目允懂;一般你不需要更改這個(gè)屬性厕怜。
num.network.threads=3
# server用來處理請求的I/O線程的數(shù)目;這個(gè)線程數(shù)目至少要等于硬盤的個(gè)數(shù)蕾总。
num.io.threads=8
# SO_SNDBUFF 緩存大小粥航,server進(jìn)行socket 連接所用
socket.send.buffer.bytes=102400
# SO_RCVBUFF緩存大小,server進(jìn)行socket連接時(shí)所用
socket.receive.buffer.bytes=102400
# server允許的最大請求尺寸生百; 這將避免server溢出递雀,它應(yīng)該小于Java heap size
socket.request.max.bytes=10485760
################ Log Basics ####################
# 此目錄每次重啟會被清理,測試用就不改了
log.dirs=/tmp/kafka-logs
# 如果創(chuàng)建topic時(shí)沒有給出劃分partitions個(gè)數(shù)蚀浆,這個(gè)數(shù)字將是topic下partitions數(shù)目的默認(rèn)數(shù)值缀程。
num.partitions=1
num.recovery.threads.per.data.dir=1
################# Internal Topic Settings #################
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
################# Log Flush Policy ###################
# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000
# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000
################# Log Retention Policy ##################
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
################# Zookeeper #################
zookeeper.connect=localhost:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
############ Group Coordinator Settings ############
group.initial.rebalance.delay.ms=0
例子 (kafka_2.11-0.10.1.1)
# Licensed to the Apache Software Foundation (ASF)
############ Server Basics ###################
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
# Switch to enable topic deletion or not, default value is false
#delete.topic.enable=true
############## Socket Server Settings ##################
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = security_protocol://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://192.168.199.101:9091
# The number of threads handling network requests
num.network.threads=3
# The number of threads doing disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
############# Log Basics ####################
# A comma seperated list of directories under which to store log files
log.dirs=/home/kfk/ka1_logs
# the brokers.
num.partitions=1
num.recovery.threads.per.data.dir=1
################# Log Flush Policy ###################
# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000
############## Log Retention Policy #################
# The minimum age of a log file to be eligible for deletion
log.retention.hours=168
# The maximum size of a log segment file.
log.segment.bytes=1073741824
# to the retention policies
log.retention.check.interval.ms=300000
################ Zookeeper ################
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# root directory for all kafka znodes.
zookeeper.connect=192.168.199.101:2181,192.168.199.101:2182,192.168.199.101:2183
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
啟動命令
#!/bin/bash JMX_PORT=9981 bin/kafka-server-start.sh config/server1.properties >/dev/null 2>&1 &
停止命令
ps -ef |grep kafka |grep java|awk '{print $2}' |xargs kill -9
常用操作
創(chuàng)建topic test1
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test1
查看topic列表
bin/kafka-topics.sh --list --zookeeper localhost:2181
查看一個(gè) topic 的分區(qū)及副本狀態(tài)信息。
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test1
啟動生產(chǎn)者
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test1
啟動消費(fèi)者
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test1 --from-beginning
**在生產(chǎn)者發(fā)送的消息市俊,會在消費(fèi)者接收到杨凑,用于測試基礎(chǔ)功能
創(chuàng)建3個(gè)副本的topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic test2
將已有的topic修改為3個(gè)分區(qū)
kafka-topics.sh --alter --zookeeper localhost:2181 --topic test1 --partitions 3
zookeeper
配置文件實(shí)例 (zookeeper-3.4.14)
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/home/zkdata
dtaLogDir=/home/zkdata/logs
clientPort=2181
server.1=192.168.199.101:2881:3881
server.2=192.168.199.101:2882:3882
server.3=192.168.199.101:2883:3883
啟動命令
sh /opt/zookeeper1/bin/zkServer.sh start
start/status/stop/restart
連接服務(wù)器
zkCli.sh -server 127.0.0.1:2181
常用命令
1.ls -- 查看某個(gè)目錄包含的所有文件,例如:
[zk: 127.0.0.1:2181(CONNECTED) 1] ls /2.ls2 -- 查看某個(gè)目錄包含的所有文件秕衙,
與ls不同的是它查看到time蠢甲、version等信息僵刮,例如:
[zk: 127.0.0.1:2181(CONNECTED) 1] ls2 /3.create -- 創(chuàng)建znode据忘,并設(shè)置初始內(nèi)容,例如:
[zk: 127.0.0.1:2181(CONNECTED) 1] create /test "test"
Created /test創(chuàng)建一個(gè)新的 znode節(jié)點(diǎn)“ test ”以及與它關(guān)聯(lián)的字符串4.get -- 獲取znode的數(shù)據(jù)搞糕,
如下:
[zk: 127.0.0.1:2181(CONNECTED) 1] get /test5.set -- 修改znode內(nèi)容勇吊,例如:
[zk: 127.0.0.1:2181(CONNECTED) 1] set /test "ricky"6.delete -- 刪除znode,例如:
[zk: 127.0.0.1:2181(CONNECTED) 1] delete /test7.quit -- 退出客戶端8.help -- 幫助命令
zookeeper 四字命令 (服務(wù)器)
conf 輸出相關(guān)服務(wù)配置的詳細(xì)信息
cons 列出所有連接到服務(wù)器的客戶端的完全的連接 / 會話的詳細(xì)信息窍仰。
包括“接受 / 發(fā)送”的包數(shù)量汉规、會話 id 、操作延遲驹吮、最后的操作執(zhí)行等等信息
dump 列出未經(jīng)處理的會話和臨時(shí)節(jié)點(diǎn)针史。
envi 輸出關(guān)于服務(wù)環(huán)境的詳細(xì)信息(區(qū)別于 conf 命令)。
reqs 列出未經(jīng)處理的請求 ruok 測試服務(wù)是否處于正確狀態(tài)碟狞。
如果確實(shí)如此啄枕,那么服務(wù)返回“ imok ”,否則不做任何相應(yīng) stat 輸出關(guān)于性能和連接的客戶端的列表族沃。
wchs 列出服務(wù)器 watch 的詳細(xì)信息
wchc 通過 session 列出服務(wù)器 watch 的詳細(xì)信息频祝,它的輸出是一個(gè)與 watch 相關(guān)的會話的列表
wchp 通過路徑列出服務(wù)器 watch 的詳細(xì)信息泌参。它輸出一個(gè)與 session 相關(guān)的路徑