創(chuàng)建kafka topic
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic topicname --partitions 12 --replication-factor 2
注:?partitions指定topic分區(qū)數(shù)榛搔,replication-factor指定topic每個分區(qū)的副本數(shù)
partitions分區(qū)數(shù):
partitions :分區(qū)數(shù)倦始,控制topic將分片成多少個log∶畋裕可以顯示指定吉懊,如果不指定則會使用broker(server.properties)中的num.partitions配置的數(shù)量
雖然增加分區(qū)數(shù)可以提供kafka集群的吞吐量、但是過多的分區(qū)數(shù)或者或是單臺服務(wù)器上的分區(qū)數(shù)過多假勿,會增加不可用及延遲的風(fēng)險(xiǎn)借嗽。因?yàn)槎嗟姆謪^(qū)數(shù),意味著需要打開更多的文件句柄转培、增加點(diǎn)到點(diǎn)的延時恶导、增加客戶端的內(nèi)存消耗。
分區(qū)數(shù)也限制了consumer的并行度浸须,即限制了并行consumer消息的線程數(shù)不能大于分區(qū)數(shù)
分區(qū)數(shù)也限制了producer發(fā)送消息是指定的分區(qū)惨寿。如創(chuàng)建topic時分區(qū)設(shè)置為1,producer發(fā)送消息時通過自定義的分區(qū)方法指定分區(qū)為2或以上的數(shù)都會出錯的删窒;這種情況可以通過alter –partitions 來增加分區(qū)數(shù)裂垦。
replication-factor副本
replication factor 控制消息保存在幾個broker(服務(wù)器)上,一般情況下等于broker的個數(shù)肌索。
如果沒有在創(chuàng)建時顯示指定或通過API向一個不存在的topic生產(chǎn)消息時會使用broker(server.properties)中的default.replication.factor配置的數(shù)量
查看所有topic列表
bin/kafka-topics.sh --zookeeper localhost:2181 --list
查看指定topic信息
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic t_cdr
控制臺向topic生產(chǎn)數(shù)據(jù)
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic t_cdr
控制臺消費(fèi)topic的數(shù)據(jù)
bin/kafka-console-consumer.sh? --zookeeper localhost:2181? --topic t_cdr --from-beginning
查看topic某分區(qū)偏移量最大(薪堵!)值
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --topic hive-mdatabase-hostsltable? --time -1 --broker-list localhost:9092 --partitions 0
注:?time為-1時表示最大值,time為-2時表示最小值
增加topic分區(qū)數(shù)
為topic t_cdr 增加10個分區(qū)
bin/kafka-topics.sh --zookeeper localhost:2181? --alter --topic t_cdr --partitions 10
刪除topic,慎用晕换,只會刪除zookeeper中的元數(shù)據(jù)午乓,消息文件須手動刪除
bin/kafka-run-class.sh kafka.admin.DeleteTopicCommand --zookeeper localhost:2181 --topic t_cdr
查看topic消費(fèi)進(jìn)度
這個會顯示出consumer group的offset情況, 必須參數(shù)為--group闸准, 不指定--topic益愈,默認(rèn)為所有topic
Displays the: Consumer Group, Topic, Partitions, Offset, logSize, Lag, Owner for the specified set of Topics and Consumer Group
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker
required argument: [group]
Option Description
------ -----------
--broker-info Print broker info
--group Consumer group.
--help Print this message.
--topic Comma-separated list of consumer
? topics (all topics if absent).
--zkconnect ZooKeeper connect string. (default: localhost:2181)
Example,
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group pv
Group? ? ? ? ? Topic? ? ? ? ? ? ? Pid Offset? logSize? ? Lag? ? Owner
pv? ? ? ? ? ? ? page_visits? ? ? ? 0? 21? ? ? 21? ? ? ? 0? ? ? none
pv? ? ? ? ? ? ? page_visits? ? ? ? 1? 19? ? ? 19? ? ? ? 0? ? ? none
pv? ? ? ? ? ? ? page_visits? ? ? ? 2? 20? ? ? 20? ? ? ? 0? ? ? none
Kafka 原理?
Kafka是最初由Linkedin公司開發(fā),是一個分布式夷家、分區(qū)的蒸其、多副本的、多訂閱者瘾英,基于zookeeper協(xié)調(diào)的分布式日志系統(tǒng)(也可以當(dāng)做MQ系統(tǒng))枣接,常見可以用于web/nginx日志、訪問日志缺谴,消息服務(wù)等等但惶,Linkedin于2010年貢獻(xiàn)給了Apache基金會并成為頂級開源項(xiàng)目。
Kafka主要設(shè)計(jì)目標(biāo)如下:
以時間復(fù)雜度為O(1)的方式提供消息持久化能力湿蛔,即使對TB級以上數(shù)據(jù)也能保證常數(shù)時間的訪問性能膀曾。
高吞吐率。即使在非常廉價(jià)的商用機(jī)器上也能做到單機(jī)支持每秒100K條(也就是100000條——十萬)消息的傳輸阳啥。
支持Kafka Server間的消息分區(qū)添谊,及分布式消費(fèi),同時保證每個partition內(nèi)的消息順序傳輸察迟。
同時支持離線數(shù)據(jù)處理和實(shí)時數(shù)據(jù)處理斩狱。
Kafka專用術(shù)語:
Broker:消息中間件處理結(jié)點(diǎn),一個Kafka節(jié)點(diǎn)就是一個broker扎瓶,多個broker可以組成一個Kafka集群所踊。
Topic:一類消息,Kafka集群能夠同時負(fù)責(zé)多個topic的分發(fā)概荷。
Partition:topic物理上的分組秕岛,一個topic可以分為多個partition,每個partition是一個有序的隊(duì)列误证。
Segment:partition物理上由多個segment組成继薛。
offset:每個partition都由一系列有序的、不可變的消息組成愈捅,這些消息被連續(xù)的追加到partition中遏考。partition中的每個消息都有一個連續(xù)的序列號叫做offset,用于partition唯一標(biāo)識一條消息
topic & partition
在Kafka文件存儲中蓝谨,同一個topic下有多個不同partition诈皿,每個partition為一個目錄林束,partiton命名規(guī)則為topic名稱+有序序號,第一個partiton序號從0開始稽亏,序號最大值為partitions數(shù)量減1壶冒。
這里也就是broker——>topic——>partition——>segment?
segment file組成:由2大部分組成,分別為index file和data file截歉,此2個文件一一對應(yīng)胖腾,成對出現(xiàn),后綴".index"和“.log”分別表示為segment索引文件瘪松、數(shù)據(jù)文件咸作。
segment文件命名規(guī)則:partion全局的第一個segment從0開始,后續(xù)每個segment文件名為上一個segment文件最后一條消息的offset值宵睦。數(shù)值最大為64位long大小记罚,19位數(shù)字字符長度,沒有數(shù)字用0填充壳嚎。
segment中index與data file對應(yīng)關(guān)系物理結(jié)構(gòu)如下:
索引文件存儲大量元數(shù)據(jù)桐智,數(shù)據(jù)文件存儲大量消息,索引文件中元數(shù)據(jù)指向?qū)?yīng)數(shù)據(jù)文件中message的物理偏移地址烟馅。
其中以索引文件中元數(shù)據(jù)3,497為例说庭,依次在數(shù)據(jù)文件中表示第3個message(在全局partiton表示第368772個message),以及該消息的物理偏移地址為497郑趁。
副本(replication)策略
? 1.數(shù)據(jù)同步
kafka 0.8后提供了Replication機(jī)制來保證Broker的failover刊驴。
引入Replication之后,同一個Partition可能會有多個Replica寡润,而這時需要在這些Replication之間選出一個Leader捆憎,Producer和Consumer只與這個Leader交互,其它Replica作為Follower從Leader中復(fù)制數(shù)據(jù)梭纹。
2.副本放置策略
Kafka分配Replica的算法如下(注意!!! 下面的broker掷酗、partition副本數(shù)這些編號都是從0開始編號的):
????將所有存活的N個Brokers和待分配的Partition排序
將第i個Partition分配到第(i mod n)個Broker上读虏,這個Partition的第一個Replica存在于這個分配的Broker上陕悬,并且會作為partition的優(yōu)先副本(?這里就基本說明了一個topic的partition在集群上的大致分布情況?)
將第i個Partition的第j個Replica分配到第((i + j) mod n)個Broker上
假設(shè)集群一共有4個brokers指孤,一個topic有4個partition欢摄,每個Partition有3個副本金顿。下圖是每個Broker上的副本分配情況醒颖。
對于Kafka而言剿另,定義一個Broker是否“活著”包含兩個條件:
一是它必須維護(hù)與ZooKeeper的session(這個通過ZooKeeper的Heartbeat機(jī)制來實(shí)現(xiàn))雇锡。
二是Follower必須能夠及時將Leader的消息復(fù)制過來逛钻,不能“落后太多”。
重點(diǎn)在于kafka中partition的副本放置算法锰提,同時間接說明了一個topic的partition在集群中的分配情況...