Kafka 原理介紹及安裝部署
標(biāo)簽:kafka 安裝
簡(jiǎn)介
Kafka 是 Linkedin 于 2010 年 12 月份開源的消息系統(tǒng)窍帝,它主要用于處理活躍的流式數(shù)據(jù)榕茧,包括網(wǎng)站的點(diǎn)擊量垃沦、用戶訪問或搜索的內(nèi)容等。
Kafka 是一個(gè)輕量級(jí)的/分布式的/具備 replication 能力的日志采集組件,通常被集成到應(yīng)用系統(tǒng)中,收集“用戶行為日志”等,并可以使用各種消費(fèi)終端(consumer)將消息轉(zhuǎn)存到 HDFS 等其他結(jié)構(gòu)化數(shù)據(jù)存儲(chǔ)系統(tǒng)中用押。
Kafka 的作用類似于緩存肢簿,即活躍的數(shù)據(jù)和離線處理系統(tǒng)之間的緩存。
特性
高吞吐率:即使在普通的節(jié)點(diǎn)上每秒鐘也能處理成百上千的 message蜻拨。
顯式分布式:即所有的 producer池充、broker 和 consumer 都會(huì)有多個(gè),均為分布式的缎讼。
易于擴(kuò)展:可以由一個(gè)節(jié)點(diǎn)擴(kuò)展至數(shù)千個(gè)節(jié)點(diǎn)收夸,不需要停止集群。
使用場(chǎng)景
Messaging
對(duì)于一些常規(guī)的消息系統(tǒng),kafka 是個(gè)不錯(cuò)的選擇血崭。Kafka 的 partitons/replication 和容錯(cuò),使其具有良好的擴(kuò)展性和性能優(yōu)勢(shì)卧惜。
但是厘灼,kafka 并沒有提供 JMS 中的“事務(wù)性”、“消息傳輸擔(dān)保(消息確認(rèn)機(jī)制)”咽瓷、“消息分組”等企業(yè)級(jí)特性设凹。kafka 只能使用作為"常規(guī)"的消息系統(tǒng),在一定程度上,尚未確保消息的發(fā)送與接收絕對(duì)可靠(比如,消息重發(fā),消息發(fā)送丟失等)。
Websit activity tracking
Kafka 可以作為“網(wǎng)站活性跟蹤”的最佳工具茅姜,可以將網(wǎng)頁(yè)/用戶操作等信息發(fā)送到 kafka 中闪朱,并進(jìn)行實(shí)時(shí)監(jiān)控,或者離線統(tǒng)計(jì)分析等。
Log Aggregation
kafka 的特性決定它非常適合作為“日志收集中心”钻洒,應(yīng)用程序可以將操作日志“批量”“異步”的發(fā)送到 kafka 集群中,而不是保存在本地或者數(shù)據(jù)庫(kù)中奋姿。
Kafka 可以批量提交消息/壓縮消息等,這對(duì)生產(chǎn)者而言,幾乎感覺不到性能的開支。此時(shí)消費(fèi)者可以使用 Hadoop 等其他系統(tǒng)化的存儲(chǔ)和分析系統(tǒng)航唆。
原理架構(gòu)
原理
Kafka 的設(shè)計(jì)初衷是希望做為一個(gè)統(tǒng)一的信息收集平臺(tái)胀蛮,能夠?qū)崟r(shí)的收集反饋信息,并需要能夠支撐較大的數(shù)據(jù)量,且具備良好的容錯(cuò)能力糯钙。
Kafka 使用文件存儲(chǔ)消息(append only log)粪狼,這就直接決定了 kafka 在性能上嚴(yán)重依賴文件系統(tǒng)的本身特性。為了減少磁盤寫入的次數(shù)任岸,broker 會(huì)將消息暫時(shí)緩存起來再榄,當(dāng)消息的個(gè)數(shù)(或尺寸)達(dá)到一定閥值時(shí),再一起刷新到磁盤享潜,這樣會(huì)減少磁盤 IO 調(diào)用的次數(shù)困鸥。
Producer 將會(huì)和 topic 下所有 partition leader 保持 socket 連接。消息由 producer 直接通過 socket 發(fā)送到 broker剑按,中間不會(huì)經(jīng)過任何“路由層”疾就。
事實(shí)上,消息被路由到哪個(gè) partition 上艺蝴,由 producer 客戶端決定猬腰,默認(rèn)方式為“輪詢”。
Consumer 端向 broker 發(fā)送 “fetch” 請(qǐng)求猜敢,并告知其獲取消息的 offset姑荷,此后 consumer 將會(huì)獲得一定條數(shù)的消息。Consumer 端也可以重置 offset 來重新消費(fèi)消息缩擂。
Kafka 將每個(gè) partition 數(shù)據(jù)復(fù)制到多個(gè) server 上鼠冕,任何一個(gè) partition 都有一個(gè) leader 和多個(gè) follower (可以沒有)。
備份的個(gè)數(shù)可以通過 broker 配置文件來設(shè)定胯盯,其中 leader 處理所有的讀寫請(qǐng)求懈费,follower 需要和 leader 保持同步。
當(dāng) leader 失效時(shí)博脑,需在 followers 中重新選取出新的 leader憎乙,可能此時(shí) follower 落后于 leader薄坏,因此需要選擇一個(gè) “up-to-date” 的 follower。選擇 follower 時(shí)需要兼顧一個(gè)問題寨闹,就是新的 leader server 上所已經(jīng)承載的 partition leader 的個(gè)數(shù),如果一個(gè) server 上有過多的 partition leader君账,意味著此 server 將承受著更多的 IO 壓力繁堡,因此在選舉新 leader 時(shí),需要考慮到“負(fù)載均衡”乡数。
Kafka 中所有的 topic 內(nèi)容都是以日志的形式存儲(chǔ)在 broker 上椭蹄。如果一個(gè) topic 的名稱為 “my_topic”,它有 2 個(gè) partitions净赴,那么日志將會(huì)保存在 my_topic_0 和 my_topic_1 兩個(gè)目錄中绳矩。
日志文件中保存了一序列 “l(fā)og entries” (日志條目),每個(gè) log entry 格式為“4個(gè)字節(jié)的數(shù)字 N 表示消息的長(zhǎng)度” + “N 個(gè)字節(jié)的消息內(nèi)容”玖翅。每個(gè)日志都有一個(gè) offset 來唯一的標(biāo)記一條消息翼馆,offset 的值為8個(gè)字節(jié)的數(shù)字,表示此消息在此 partition 中所處的起始位置金度。
部署架構(gòu)
Kafka 集群应媚、producer 和 consumer 都依賴于 zookeeper 來保證系統(tǒng)的可用性,保存一些元數(shù)據(jù)信息猜极。
kafka 集群幾乎不需要維護(hù)任何 consumer 和 producer 狀態(tài)信息中姜,這些信息由 zookeeper 保存,因此 producer 和 consumer 的客戶端實(shí)現(xiàn)非常輕量級(jí)跟伏,它們可以隨意離開丢胚,而不會(huì)對(duì)集群造成額外的影響。
Producer 端使用 zookeeper 用來發(fā)現(xiàn) broker 列表受扳,以及和 Topic 下每個(gè) partition leader 建立 socket 連接并發(fā)送消息携龟。
Broker 端使用 zookeeper 用來注冊(cè) broker 信息,監(jiān)測(cè) partition leader 存活性辞色。
Consumer 端使用 zookeeper 用來注冊(cè) consumer 信息骨宠,其中包括 consumer 消費(fèi)的 partition 列表等,同時(shí)也用來發(fā)現(xiàn) broker 列表相满,并和 partition leader 建立 socket 連接层亿,獲取消息。
安裝部署
安裝
Kafka 的安裝比較簡(jiǎn)單立美,只需要保證 zookeeper 集群運(yùn)行正常匿又,并配置好 server.properties 文件即可。
修改配置文件中的以下幾項(xiàng)建蹄,并保證在各節(jié)點(diǎn)上保持一致:
broker.id=0 //該屬性的值要保證各個(gè)節(jié)點(diǎn)之間不能重復(fù)碌更,該值可以為隨意的整數(shù)
port=9092
log.dirs=/opt/kafka-0.8.2/data
zookeeper.connect=localhost:2181 //此處需要修改成使用的 zookeeper 集群的信息裕偿,逗號(hào)分隔
啟動(dòng)
保證 zookeeper 集群正常運(yùn)行,然后在每個(gè)節(jié)點(diǎn)上執(zhí)行以下命令痛单,啟動(dòng)進(jìn)程:
/opt/kafka-0.8.2/bin/kafka-server-start.sh /opt/kafka-0.8.2/config/server.properties &
驗(yàn)證
可以使用 kafka 自帶的 producer 和 consumer 來驗(yàn)證集群是否能正常工作嘿棘。
使用 bin 目錄下的 kafka-console-consumer.sh 和 kafka-console-producer.sh 腳本可以啟動(dòng) consumer 和 producer 客戶端。
- 進(jìn)入 kafka 的安裝目錄旭绒,執(zhí)行以下命令(假設(shè) zookeeper 集群信息為:server1:2181,server2:2181,server3:2181)鸟妙,創(chuàng)建一個(gè)名為 “my_topic”的topic:
bin/kafka-topics.sh --create --zookeeper server1:2181,server2:2181,server3:2181 --replication-factor 1 --partitions 1 --topic my_topic
- 啟動(dòng)一個(gè) producer,將消息發(fā)送到 “my_topic”挥吵,執(zhí)行以下命令(假設(shè) kafka 集群信息為:server1:9092,server2:9092,server3:9092):
bin/kafka-console-producer.sh --borker-list server1:9092,server2:9092,server3:9092 --topic my_topic
- 輸入以下消息:
This is a message.
This is another message.
- 在集群中的另一個(gè)節(jié)點(diǎn)上重父,進(jìn)入 kafka 的安裝目錄,然后啟動(dòng)一個(gè) consumer忽匈,訂閱 “my_topic” 的消息房午,執(zhí)行以下命令:
bin/kafka-console-consumer.sh --zookeeper server1:2181,server2:2181,server3:2181 --topic my_topic --from-beginning
- 然后可以看到終端上輸出以下內(nèi)容,證明集群可以正常使用:
This is a message.
This is another message.
API
Producer
0.8 以前版本的 Procuder API 有兩種:kafka.producer.SyncProducer 和 kafka.producer.async.AsyncProducer.它們都實(shí)現(xiàn)了同一個(gè)接口丹允。
0.8 以后的新版本 Producer API 提供了以下功能:
可以將多個(gè)消息緩存到本地隊(duì)列里郭厌,然后異步的批量發(fā)送到 broker,可以通過參數(shù) producer.type=async 做到雕蔽。
自己編寫 Encoder 來序列化消息沪曙,只需實(shí)現(xiàn)下面這個(gè)接口。默認(rèn)的 Encoder 是 kafka.serializer.DefaultEncoder萎羔。
提供了基于 Zookeeper 的 broker 自動(dòng)感知能力液走,可以通過參數(shù) zk.connect 實(shí)現(xiàn)。如果不使用 Zookeeper贾陷,也可以使用 broker.list 參數(shù)指定一個(gè)靜態(tài)的 brokers 列表缘眶,這樣消息將被隨機(jī)的發(fā)送到一個(gè) broker 上记某,一旦選中的 broker 失敗了屈张,消息發(fā)送也就失敗了号俐。
通過分區(qū)函數(shù) kafka.producer.Partitioner 類對(duì)消息分區(qū)堪澎,可以通過參數(shù) partitioner.class 定制分區(qū)函數(shù)。
Consumer
Consumer API 有兩個(gè)級(jí)別:低級(jí)別和高級(jí)別拣宰。
低級(jí)別的和一個(gè)指定的 broker 保持連接庇麦,并在接收完消息后關(guān)閉連接饿敲,這個(gè)級(jí)別是無狀態(tài)的冈爹,每次讀取消息都帶著 offset涌攻。
高級(jí)別的 API 隱藏了和 brokers 連接的細(xì)節(jié),在不必關(guān)心服務(wù)端架構(gòu)的情況下和服務(wù)端通信频伤。還可以自己維護(hù)消費(fèi)狀態(tài)恳谎,并可以通過一些條件指定訂閱特定的 topic,比如白名單黑名單或者正則表達(dá)式。
低級(jí)別 API
低級(jí)別的 API 是高級(jí)別 API 實(shí)現(xiàn)的基礎(chǔ)因痛,也是為了一些對(duì)維持消費(fèi)狀態(tài)有特殊需求的場(chǎng)景婚苹,比如 Hadoop consumer 這樣的離線 consumer。
高級(jí)別 API
這個(gè) API 圍繞著由 KafkaStream 實(shí)現(xiàn)的迭代器展開鸵膏,每個(gè)流代表一系列從一個(gè)或多個(gè)分區(qū)的 broker 上匯聚來的消息膊升,每個(gè)流由一個(gè)線程處理,所以客戶端可以在創(chuàng)建的時(shí)候通過參數(shù)指定想要幾個(gè)流谭企。
一個(gè)流是多個(gè)分區(qū)多個(gè) broker 的合并用僧,但是每個(gè)分區(qū)的消息只會(huì)流向一個(gè)流。
代碼示例
以下是兩個(gè)簡(jiǎn)單的 Producer 和 Consumer 的代碼示例赞咙。
Producer(循環(huán)向topic中發(fā)送消息):
import java.util.Properties;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
public class Producer extends Thread{
private final kafka.javaapi.producer.Producer<Integer, String> producer;
private final String topic;
private final Properties props = new Properties();
public Producer(String topic){
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("metadata.broker.list", "10.106.1.234:9092"); //需要替換成自己的broker信息
producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));
this.topic = topic;
}
public void run() {
int messageNo = 1;
while(true){
String messageStr = new String("Message_" + messageNo);
producer.send(new KeyedMessage<Integer, String>(topic, messageStr));
messageNo++;
}
}
}
Consumer(訂閱topic消息,并在控制臺(tái)輸出):
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
public class Consumer extends Thread{
private final ConsumerConnector consumer;
private final String topic;
public Consumer(String topic){
consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
this.topic = topic;
}
private static ConsumerConfig createConsumerConfig(){
Properties props = new Properties();
props.put("zookeeper.connect", zkConnect); //需要將zkConnect替換成自己的Zookeeper集群信息
props.put("group.id", "group1");
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
return new ConsumerConfig(props);
}
public void run() {
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
ConsumerIterator<byte[], byte[]> it = stream.iterator();
while(it.hasNext())
System.out.println(new String(it.next().message()));
}
}
對(duì)比
Kafka VS Flume
Kafka 是一個(gè)非常通用的系統(tǒng)糟港。多個(gè)生產(chǎn)者和消費(fèi)者可以共享多個(gè)主題攀操。相比之下,F(xiàn)lume 被設(shè)計(jì)為一個(gè)旨在往 HDFS 或 HBase 發(fā)送數(shù)據(jù)的專用工具秸抚,它對(duì) HDFS 有特殊的優(yōu)化速和,并且集成了 Hadoop 的安全特性。
Flume 內(nèi)置了很多的 source 和 sink 組件剥汤。而 Kafka 只有一個(gè)更小的生產(chǎn)消費(fèi)者生態(tài)系統(tǒng)颠放,并且 Kafka 的社區(qū)支持不好。使用 Kafka 通常需要自己編寫生產(chǎn)者和消費(fèi)者代碼吭敢。
Flume 可以使用攔截器實(shí)時(shí)處理數(shù)據(jù)碰凶,這對(duì)于數(shù)據(jù)屏蔽或者過量是很有用的。而 Kafka 需要外部的流處理系統(tǒng)才能做到鹿驼。
Kafka 和 Flume 都是可靠的系統(tǒng)欲低,通過適當(dāng)?shù)呐渲枚寄鼙WC零數(shù)據(jù)丟失。然而畜晰,F(xiàn)lume 不支持副本事件砾莱,如果 Flume 代理的一個(gè)節(jié)點(diǎn)奔潰了,即使使用了可靠的文件管道方式凄鼻,也會(huì)丟失這些事件直到恢復(fù)這些磁盤腊瑟。而 Kafka 則沒有這個(gè)問題。
Kafka VS RabbitMQ
RabbitMQ 遵循 AMQP 協(xié)議块蚌,以 broker 為中心闰非,有消息的確認(rèn)機(jī)制。Kafka 遵從一般的 MQ 結(jié)構(gòu)峭范,以 consumer 為中心河胎,無消息確認(rèn)機(jī)制。
Kafka 具有很高的吞吐量虎敦,內(nèi)部采用消息的批量處理游岳,消息處理的效率很高政敢。RabbitMQ 在吞吐量方面稍遜于 Kafka,支持對(duì)消息的可靠的傳遞胚迫,支持事務(wù)喷户,但不支持批量的操作,基于存儲(chǔ)的可靠性的要求存儲(chǔ)可以采用內(nèi)存或者硬盤访锻。
Kafka 采用 Zookeeper 對(duì)集群中的 broker褪尝、consumer 進(jìn)行管理,可以注冊(cè) topic 到 Zookeeper 上期犬;通過 Zookeeper 的協(xié)調(diào)機(jī)制河哑,producer 保存對(duì)應(yīng) topic 的 broker 信息,可以隨機(jī)或者輪詢發(fā)送到 broker 上龟虎;并且 producer 可以基于語義指定分片璃谨,消息發(fā)送到 broker 的某分片上。而 RabbitMQ 的負(fù)載均衡需要單獨(dú)的 loadbalancer 進(jìn)行支持鲤妥。