kafka是一個分布式消息隊列。具有高性能托享、持久化骚烧、多副本備份、橫向擴(kuò)展能力闰围。生產(chǎn)者往隊列里寫消息赃绊,消費(fèi)者從隊列里取消息進(jìn)行業(yè)務(wù)邏輯。一般在架構(gòu)設(shè)計中起到解耦羡榴、削峰碧查、異步處理的作用。kafka對外使用topic的概念传惠,生產(chǎn)者往topic里寫消息,消費(fèi)者從讀消息稻扬。為了做到水平擴(kuò)展,一個topic實(shí)際是由多個partition組成的泰佳,遇到瓶頸時,可以通過增加partition的數(shù)量來進(jìn)行橫向擴(kuò)容逝她。單個parition內(nèi)是保證消息有序。每新寫一條消息汽绢,kafka就是在對應(yīng)的文件append寫侧戴,所以性能非常高。
基礎(chǔ)知識
什么是消息隊列(Message Queue)酗宋?
- 消息(Message)
網(wǎng)絡(luò)中的兩臺計算機(jī)或者兩個通訊設(shè)備之間傳遞的數(shù)據(jù)。例如說:文本蜕猫、音樂、視頻等內(nèi)容回右。
- 隊列(Queue)
一種特殊的線性表(數(shù)據(jù)元素首尾相接),特殊之處在于只允許在首部刪除元素和在尾部追加元素翔烁。入隊、出隊蹬屹。
- 消息隊列(MQ)
消息+隊列,保存消息的隊列慨默。消息的傳輸過程中的容器贩耐;主要提供生產(chǎn)潮太、消費(fèi)接口供外部調(diào)用做數(shù)據(jù)的存儲和獲取。
MQ分類
MQ主要分為兩類:點(diǎn)對點(diǎn)(p2p)虾攻、發(fā)布訂閱(Pub/Sub)
共同點(diǎn):
*消息生產(chǎn)者生產(chǎn)消息發(fā)送到queue中抛蚤,然后消息消費(fèi)者從queue中讀取并且消費(fèi)消息寻狂。
不同點(diǎn):
p2p模型包括:消息隊列(Queue)、發(fā)送者(Sender)蛇券、接收者(Receiver)
一個生產(chǎn)者生產(chǎn)的消息只有一個消費(fèi)者(Consumer)(即一旦被消費(fèi)缀壤,消息就不在消息隊列中)纠亚。比如說打電話。
Pub/Sub包含:消息隊列(Queue)蒂胞、主題(Topic)、發(fā)布者(Publisher)骗随、訂閱者(Subscriber)。每個消息可以有多個消費(fèi)者鸿染,彼此互不影響。比如我發(fā)布一個微博:關(guān)注我的人都能夠看到涨椒。
那么在大數(shù)據(jù)領(lǐng)域呢,為了滿足日益增長的數(shù)據(jù)量蚕冬,也有一款可以滿足百萬級別消息的生成和消費(fèi),分布式囤热、持久穩(wěn)定的產(chǎn)品——Kafka。
vKafka概念
在要了解Kafka之前赢乓,必須先了解主題忧侧,經(jīng)紀(jì)人蚓炬,生產(chǎn)者和消費(fèi)者等主要術(shù)語。 下圖說明了主要術(shù)語肯夏,表格詳細(xì)描述了圖表組件。如已了解的可以跳過此部分。
![Linux安裝Kafka](https://upload-images.jianshu.io/upload_im#2-d61b70b3ee8bc906.jpg?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
在上圖中烁兰,主題配置為三個分區(qū)。 分區(qū)1具有兩個偏移因子0和1.分區(qū)2具有四個偏移因子0,1,2和3.分區(qū)3具有一個偏移因子0.副本的id與承載它的服務(wù)器的id相同沪斟。
假設(shè),如果主題的復(fù)制因子設(shè)置為3暇矫,那么Kafka將創(chuàng)建每個分區(qū)的3個相同的副本,并將它們放在集群中以使其可用于其所有操作李根。 為了平衡集群中的負(fù)載,每個代理都存儲一個或多個這些分區(qū)房轿。 多個生產(chǎn)者和消費(fèi)者可以同時發(fā)布和檢索消息。
Topics(主題):每條發(fā)布到Kafka集群的消息都有一個類別囱持,這個類別被稱為topic。(物理上不同topic的消息分開存儲洪唐,邏輯上一個topic的消息雖然保存于一個或多個broker上但用戶只需指定消息的topic即可生產(chǎn)或消費(fèi)數(shù)據(jù)而不必關(guān)心數(shù)據(jù)存于何處)
Partition(分區(qū)):parition是物理上的概念吼蚁,每個topic包含一個或多個partition,創(chuàng)建topic時可指定parition數(shù)量肝匆。每個partition對應(yīng)于一個文件夾,該文件夾下存儲該partition的數(shù)據(jù)和索引文件
Partition offset(分區(qū)偏移):每個分區(qū)消息具有稱為 offset 的唯一序列標(biāo)識旗国。
Replicas of partition(分區(qū)備份):副本只是一個分區(qū)的備份。 副本從不讀取或?qū)懭霐?shù)據(jù)能曾。 它們用于防止數(shù)據(jù)丟失。
Broker:Kafka集群包含一個或多個服務(wù)器寿冕,這種服務(wù)器被稱為broker
Brokers(經(jīng)紀(jì)人):代理是負(fù)責(zé)維護(hù)發(fā)布數(shù)據(jù)的簡單系統(tǒng)。 每個代理中的每個主題可以具有零個或多個分區(qū)驼唱。 假設(shè),如果在一個主題和N個代理中有N個分區(qū),每個代理將有一個分區(qū)辨赐。假設(shè)在一個主題中有N個分區(qū)并且多于N個代理(n + m),則第一個N代理將具有一個分區(qū)掀序,并且下一個M代理將不具有用于該特定主題的任何分區(qū)。假設(shè)在一個主題中有N個分區(qū)并且小于N個代理(n-m)森枪,每個代理將在它們之間具有一個或多個分區(qū)共享。 由于代理之間的負(fù)載分布不相等县袱,不推薦使用此方案。
Kafka Cluster(Kafka集群):Kafka有多個代理被稱為Kafka集群式散。 可以擴(kuò)展Kafka集群,無需停機(jī)暴拄。 這些集群用于管理消息數(shù)據(jù)的持久性和復(fù)制漓滔。
Producers(生產(chǎn)者):生產(chǎn)者是發(fā)送給一個或多個Kafka主題的消息的發(fā)布者响驴。 生產(chǎn)者向Kafka經(jīng)紀(jì)人發(fā)送數(shù)據(jù)。 每當(dāng)生產(chǎn)者將消息發(fā)布給代理時撕蔼,代理只需將消息附加到最后一個段文件。 實(shí)際上鲸沮,該消息將被附加到分區(qū)。 生產(chǎn)者還可以向他們選擇的分區(qū)發(fā)送消息讼溺。
Consumers(消費(fèi)者):消費(fèi)消息。每個consumer屬于一個特定的consumer group(可為每個consumer指定group name怒坯,若不指定group name則屬于默認(rèn)的group)。使用consumer high level API時剔猿,同一topic的一條消息只能被同一個consumer group內(nèi)的一個consumer消費(fèi),但多個consumer group可同時消費(fèi)這一消息艳馒。
Consumer Group(消費(fèi)者組):是邏輯上的概念员寇,是Kafka實(shí)現(xiàn)單播和廣播兩種消息模型的手段。同一個topic的數(shù)據(jù)第美,會廣播給不同的group蝶锋;同一個group中的worker什往,只有一個worker能拿到這個數(shù)據(jù)。換句話說别威,對于同一個topic,每個group都可以拿到同樣的所有數(shù)據(jù)省古,但是數(shù)據(jù)進(jìn)入group后只能被其中的一個worker消費(fèi)。group內(nèi)的worker可以使用多線程或多進(jìn)程來實(shí)現(xiàn)豺妓,也可以將進(jìn)程分散在多臺機(jī)器上,worker的數(shù)量通常不超過partition的數(shù)量琳拭,且二者最好保持整數(shù)倍關(guān)系,因?yàn)镵afka在設(shè)計時假定了一個partition只能被一個worker消費(fèi)(同一group內(nèi))白嘁。簡單的理解就是,實(shí)現(xiàn)了隊列的方式絮缅。同一個groupid 的 consumer 屬于一個隊列方式,消費(fèi)了就完事了
Leader(領(lǐng)導(dǎo)者): Leader 是負(fù)責(zé)給定分區(qū)的所有讀取和寫入的節(jié)點(diǎn)盟蚣。 每個分區(qū)都有一個服務(wù)器充當(dāng)Leader.
Follower(追隨者):跟隨領(lǐng)導(dǎo)者指令的節(jié)點(diǎn)被稱為Follower卖怜。 如果領(lǐng)導(dǎo)失敗屎开,一個追隨者將自動成為新的領(lǐng)導(dǎo)者马靠。 跟隨者作為正常消費(fèi)者,拉取消息并更新其自己的數(shù)據(jù)存儲甩鳄。
Kafka的特性:
- 可靠性:Kafka是分布式,分區(qū)妙啃,復(fù)制和容錯的俊戳。
- 可擴(kuò)展性:Kafka消息傳遞系統(tǒng)輕松縮放,無需停機(jī)抑胎。
- 耐用性/持久性:Kafka使用分布式提交日志,這意味著消息會盡可能快地保留在磁盤上阿逃,因此它是持久的。
- 性能:Kafka對于發(fā)布和訂閱消息都具有高吞吐量赃蛛。 即使存儲了許多TB的消息,它也保持穩(wěn)定的性能呕臂。
- 高并發(fā):支持?jǐn)?shù)千個客戶端同時讀寫
使用場景:
- 指標(biāo):Kafka通常用于操作監(jiān)控數(shù)據(jù)。 這涉及聚合來自分布式應(yīng)用程序的統(tǒng)計信息诵闭,以產(chǎn)生操作數(shù)據(jù)的集中饋送。
- 運(yùn)營指標(biāo):Kafka也經(jīng)常用來記錄運(yùn)營監(jiān)控數(shù)據(jù)疏尿。包括收集各種分布式應(yīng)用的數(shù)據(jù),生產(chǎn)各種操作的集中反饋褥琐,比如報警和報告。
- 日志聚合解決方案:Kafka可用于跨組織從多個服務(wù)收集日志敌呈,并使它們以標(biāo)準(zhǔn)格式提供給多個服務(wù)器。
- 消息系統(tǒng):解耦和生產(chǎn)者和消費(fèi)者磕洪、緩存消息等。
- 流處理:流行的框架(如Storm和Spark Streaming)從主題中讀取數(shù)據(jù)析显,對其進(jìn)行處理,并將處理后的數(shù)據(jù)寫入新主題谷异,供用戶和應(yīng)用程序使用。 Kafka的強(qiáng)耐久性在流處理的上下文中也非常有用歹嘹。
安裝Kafka之前,先確認(rèn)是否已安裝Java和Zookeeper
沒有安裝Java JDK的朋友可以直接看這里尺上。《CentOS安裝Java JDK》
沒有安裝Zookeeper的朋友可以直接看這里材蛛。《安裝ZooKeeper》
安裝Kafka
2.1 下載
wget http://mirrors.hust.edu.cn/apache/kafka/2.0.0/kafka_2.12-2.0.0.tgz
如果下載很慢或者不方便圆到,也可以用這里已經(jīng)下載好的壓縮包。鏈接: https://pan.baidu.com/s/1u8mSfubwZupFqKtK6PH6Qw 提取碼: v5em
2.2 解壓
tar -xzf kafka_2.12-2.0.0.tgz
注意构资,kafka_2.12-2.0.0.tgz版本是已經(jīng)編譯好的版本,解壓就能使用陨簇。
2.3 配置server.properties
默認(rèn)配置advertised.listeners=PLAINTEXT://:your.host.name:9092
修改為advertised.listeners=PLAINTEXT://:ip:9092
ip為服務(wù)器ip。
hostname和端口是用來建議給生產(chǎn)者和消費(fèi)者使用的河绽,如果沒有設(shè)置,將會使用listeners的配置耙饰,如果listeners也沒有配置,將使用java.net.InetAddress.getCanonicalHostName()來獲取這個hostname和port苟跪,對于ipv4,基本就是localhost了件已。
"PLAINTEXT"表示協(xié)議,可選的值有PLAINTEXT和SSL篷扩,hostname可以指定IP地址,也可以用"0.0.0.0"表示對所有的網(wǎng)絡(luò)接口有效鉴未,如果hostname為空表示只對默認(rèn)的網(wǎng)絡(luò)接口有效。也就是說如果你沒有配置advertised.listeners铜秆,就使用listeners的配置通告給消息的生產(chǎn)者和消費(fèi)者饰豺,這個過程是在生產(chǎn)者和消費(fèi)者獲取源數(shù)據(jù)(metadata)领跛。
更多介紹:
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.server.KafkaConfig for additional details and defaults
############################# Server Basics #############################
##################################################################################
# broker就是一個kafka的部署實(shí)例,在一個kafka集群中值纱,每一臺kafka都要有一個broker.id
# 并且鳞贷,該id唯一,且必須為整數(shù)
##################################################################################
broker.id=10
############################# Socket Server Settings #############################
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = security_protocol://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092
##################################################################################
#The number of threads handling network requests
# 默認(rèn)處理網(wǎng)絡(luò)請求的線程個數(shù) 3個
##################################################################################
num.network.threads=3
##################################################################################
# The number of threads doing disk I/O
# 執(zhí)行磁盤IO操作的默認(rèn)線程個數(shù) 8
##################################################################################
num.io.threads=8
##################################################################################
# The send buffer (SO_SNDBUF) used by the socket server
# socket服務(wù)使用的進(jìn)行發(fā)送數(shù)據(jù)的緩沖區(qū)大小搀愧,默認(rèn)100kb
##################################################################################
socket.send.buffer.bytes=102400
##################################################################################
# The receive buffer (SO_SNDBUF) used by the socket server
# socket服務(wù)使用的進(jìn)行接受數(shù)據(jù)的緩沖區(qū)大小疆偿,默認(rèn)100kb
##################################################################################
socket.receive.buffer.bytes=102400
##################################################################################
# The maximum size of a request that the socket server will accept (protection against OOM)
# socket服務(wù)所能夠接受的最大的請求量搓幌,防止出現(xiàn)OOM(Out of memory)內(nèi)存溢出,默認(rèn)值為:100m
# (應(yīng)該是socker server所能接受的一個請求的最大大小溉愁,默認(rèn)為100M)
##################################################################################
socket.request.max.bytes=104857600
############################# Log Basics (數(shù)據(jù)相關(guān)部分,kafka的數(shù)據(jù)稱為log)#############################
##################################################################################
# A comma seperated list of directories under which to store log files
# 一個用逗號分隔的目錄列表拐揭,用于存儲kafka接受到的數(shù)據(jù)
##################################################################################
log.dirs=/home/uplooking/data/kafka
##################################################################################
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
# 每一個topic所對應(yīng)的log的partition分區(qū)數(shù)目,默認(rèn)1個堂污。更多的partition數(shù)目會提高消費(fèi)
# 并行度,但是也會導(dǎo)致在kafka集群中有更多的文件進(jìn)行傳輸
# (partition就是分布式存儲盟猖,相當(dāng)于是把一份數(shù)據(jù)分開幾份來進(jìn)行存儲,即劃分塊式镐、劃分分區(qū)的意思)
##################################################################################
num.partitions=1
##################################################################################
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
# 每一個數(shù)據(jù)目錄用于在啟動kafka時恢復(fù)數(shù)據(jù)和在關(guān)閉時刷新數(shù)據(jù)的線程個數(shù)。如果kafka數(shù)據(jù)存儲在磁盤陣列中
# 建議此值可以調(diào)整更大娘汞。
##################################################################################
num.recovery.threads.per.data.dir=1
############################# Log Flush Policy (數(shù)據(jù)刷新策略)#############################
# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs(平衡) here:
# 1. Durability 持久性: Unflushed data may be lost if you are not using replication.
# 2. Latency 延時性: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
# 3. Throughput 吞吐量: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.
# kafka中只有基于消息條數(shù)和時間間隔數(shù)來制定數(shù)據(jù)刷新策略,而沒有大小的選項价说,這兩個選項可以選擇配置一個
# 當(dāng)然也可以兩個都配置,默認(rèn)情況下兩個都配置鳖目,配置如下。
# The number of messages to accept before forcing a flush of data to disk
# 消息刷新到磁盤中的消息條數(shù)閾值
#log.flush.interval.messages=10000
# The maximum amount of time a message can sit in a log before we force a flush
# 消息刷新到磁盤生成一個log數(shù)據(jù)文件的時間間隔
#log.flush.interval.ms=1000
############################# Log Retention Policy(數(shù)據(jù)保留策略) #############################
# The following configurations control the disposal(清理) of log segments(分片). The policy can
# be set to delete segments after a period of time, or after a given size has accumulated(累積).
# A segment will be deleted whenever(無論什么時間) *either* of these criteria(標(biāo)準(zhǔn)) are met. Deletion always happens
# from the end of the log.
# 下面的配置用于控制數(shù)據(jù)片段的清理领迈,只要滿足其中一個策略(基于時間或基于大谐勾拧),分片就會被刪除
# The minimum age of a log file to be eligible for deletion
# 基于時間的策略衷蜓,刪除日志數(shù)據(jù)的時間,默認(rèn)保存7天
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes. 1G
# 基于大小的策略磁浇,1G
#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
# 數(shù)據(jù)分片策略
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies 5分鐘
# 每隔多長時間檢測數(shù)據(jù)是否達(dá)到刪除條件
log.retention.check.interval.ms=300000
############################# Zookeeper #############################
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=uplooking01:2181,uplooking02:2181,uplooking03:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
啟動Kafka
3.1 啟動ZooKeeper
/usr/local/zookeeper-3.4.13/bin/zkServer.sh start
注意朽褪,需要先啟動ZooKeeper再啟動kafka无虚,不然會報錯。如下圖:
[圖片上傳中...(image-e84db3-1578707854882-5)]
3.2 啟動kafka
bin/kafka-server-start.sh config/server.properties
啟動Kafka Broker后衍锚,在ZooKeeper終端上鍵入命令 jps,效果如下:
3.3 停止kafka
bin/kafka-server-stop.sh config/server.properties
vKafka topic
4.1 創(chuàng)建topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic demo
其中demo為創(chuàng)建的topic名稱。
如上圖戴质,創(chuàng)建了一個名為 demo 的主題,其中包含一個分區(qū)和一個副本因子告匠。 創(chuàng)建成功之后會輸出:Created topic "demo".
如上圖斗埂,創(chuàng)建主題后,系統(tǒng)會在config / server.properties文件中的"/ tmp / kafka-logs /"中指定的創(chuàng)建主題的日志呛凶。
4.2 查詢topic列表
bin/kafka-topics.sh --list --zookeeper localhost:2181
4.3 查看topic信息
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic demo
4.3 刪除topic
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic demo
vKafka 生產(chǎn)/消費(fèi)
5.1 啟動生產(chǎn)者
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic demo
從上面的語法,生產(chǎn)者命令行客戶端需要兩個主要參數(shù) -
代理列表 - 我們要發(fā)送郵件的代理列表漾稀。 在這種情況下,我們只有一個代理崭捍。 Config / server.properties文件包含代理端口ID,因?yàn)槲覀冎牢覀兊拇碚趥陕牰丝?092啰脚,因此您可以直接指定它。主題名稱:demo橄浓。
5.2 啟動消費(fèi)者
為了方便測試,另啟一個sheel窗口 這樣效果更明顯荸实。需要注意的是舊版本和新版本的命令是不一樣的
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic demo --from-beginning
報錯提示:zookeeper is not a recognized option
發(fā)現(xiàn)在啟動的時候說使用 --zookeeper是一個過時的方法,最新的版本中命令如下:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic demo --from-beginning
可以開啟兩個終端准给,一個發(fā)送消息,一個接受消息露氮。效果如下:
Kafka 博客總結(jié)
Kafka是一個統(tǒng)一的平臺,用于處理所有實(shí)時數(shù)據(jù)Feed畔规。 Kafka支持低延遲消息傳遞局扶,并在出現(xiàn)機(jī)器故障時提供對容錯的保證详民。 它具有處理大量不同消費(fèi)者的能力。 Kafka非成蚩纾快,執(zhí)行2百萬寫/秒兔综。 Kafka將所有數(shù)據(jù)保存到磁盤,這實(shí)質(zhì)上意味著所有寫入都會進(jìn)入操作系統(tǒng)(RAM)的頁面緩存软驰。 這使得將數(shù)據(jù)從頁面緩存?zhèn)鬏數(shù)骄W(wǎng)絡(luò)套接字非常有效。