Kafka學習筆記一

1.簡介

kafka是一個分布式的流平臺祭陷,一個流平臺有以下三個能力:

  • 發(fā)布和訂閱記錄流涉瘾,就像一個消息隊列或者企業(yè)級消息系統(tǒng)
  • 以容錯的持久方式存儲記錄流厘线。
  • 在記錄發(fā)生時處理記錄流奄毡。

Kafka通常用于兩大類應用程序:

  • 構(gòu)建實時流數(shù)據(jù)管道鸦做,在系統(tǒng)或應用程序之間可靠地獲取數(shù)據(jù)
  • 構(gòu)建實時流應用程序励烦,轉(zhuǎn)換或響應數(shù)據(jù)流

為了理解Kafka是如何做這些事情的,讓我們深入研究一下Kafka的能力泼诱。
首先是幾個概念:

  • Kafka作為集群運行在一個或多個服務器上坛掠,這些服務器可以跨多個數(shù)據(jù)中心
  • Kafka集群將記錄流存儲在稱為主題的類別中。
  • 每條記錄由一個鍵、一個值和一個時間戳組成却音。

卡夫卡有四個核心api:

  • 生產(chǎn)者接口改抡,允許一個應用發(fā)布記錄流到一個或者多個kafka主題中
  • 消費者接口矢炼,允許一個用用訂閱一個或者多個主題并且處理產(chǎn)生給它們的記錄流系瓢。
  • 流接口,允許一個應用作為一個流處理器句灌,從一個或者多個主題中消費一個輸入流夷陋,產(chǎn)生一個輸出流到一個或者多個主題中,有效的轉(zhuǎn)換輸入流到輸出流胰锌。
  • 連接接口骗绕,允許構(gòu)建和運行可重用的生產(chǎn)者或消費者,將Kafka主題連接到現(xiàn)有的應用程序或數(shù)據(jù)系統(tǒng)资昧。例如酬土,關系數(shù)據(jù)庫的連接器可能捕獲對表的每個更改。

在Kafka中格带,客戶端和服務器之間的通信使用簡單撤缴、高性能、語言無關的TCP協(xié)議完成叽唱。該協(xié)議是版本化的屈呕,并與舊版本保持向后兼容性。我們?yōu)榭ǚ蚩ㄌ峁┝艘粋€Java客戶端棺亭,但是客戶端有多種語言版本虎眨。

Topics and Logs

讓我們首先深入卡夫卡為記錄流提供的核心抽象——主題。
主題是記錄發(fā)布到的類別或提要名稱镶摘,主題在kafka中常是多訂閱用戶的嗽桩,也就是說,一個主題可以有零個凄敢、一個或多個訂閱寫入其中的數(shù)據(jù)的消費者碌冶。

對于每個主題,Kafka集群維護一個分區(qū)日志贡未,每個分區(qū)都是一個有序的种樱,不變的記錄序列,這些記錄連續(xù)地附加到一個結(jié)構(gòu)化的提交日志中俊卤。分區(qū)中的每條記錄都被分配了一個名為偏移量的連續(xù)id號嫩挤,該偏移量惟一地標識分區(qū)中的每條記錄。

卡夫卡群集使用可配置的保留期持久保存所有已發(fā)布的記錄消恍,無論它們是否已被使用岂昭。例如,如果保留策略設置為兩天狠怨,則在記錄發(fā)布后的兩天內(nèi)约啊,該記錄可供使用邑遏,之后將被丟棄以釋放空間。Kafka的性能在數(shù)據(jù)大小方面是穩(wěn)定的恰矩,所以長時間存儲數(shù)據(jù)不是問題记盒。

事實上,基于每個消費者保留的唯一元數(shù)據(jù)是該消費者在日志中的偏移量或位置外傅。該偏移量由消費者控制:通常消費者在讀取記錄時會線性地推進偏移量纪吮,但是,事實上萎胰,因為位置是由消費者控制的碾盟,所以它可以按照自己喜歡的任何順序消費記錄。例如技竟,消費者可以重置為較舊的偏移量來重新處理過去的數(shù)據(jù)冰肴,或者跳到最近的記錄并從“現(xiàn)在”開始消費。

這些功能的結(jié)合意味著卡夫卡的消費者非常便宜——他們可以來去自由榔组,而不會對集群或其他消費者產(chǎn)生太大影響熙尉。例如,您可以使用我們的命令行工具來“跟蹤”任何主題的內(nèi)容瓷患,而不改變?nèi)魏维F(xiàn)有消費者所消費的內(nèi)容骡尽。

日志中的分區(qū)有幾個目的。首先擅编,它們允許日志擴展到適合單個服務器的大小之外攀细。每個單獨的分區(qū)必須適合托管它的服務器,但是一個主題可能有許多分區(qū)爱态,因此它可以處理任意數(shù)量的數(shù)據(jù)谭贪。其次,它們充當并行性的單位——稍后會詳細介紹锦担。

Distribution

日志的分區(qū)分布在卡夫卡集群中的服務器上俭识,每個服務器處理數(shù)據(jù)和分區(qū)共享請求。為了容錯洞渔,每個分區(qū)在可配置數(shù)量的服務器上復制套媚。

每個分區(qū)有一個充當“領導者”的服務器和零個或多個充當“追隨者”的服務器。領導者處理分區(qū)的所有讀寫請求磁椒,而追隨者被動地復制領導者堤瘤。如果領導者失敗,其中一個追隨者將自動成為新的領導者浆熔。每個服務器充當它的一些分區(qū)的領導者和其他分區(qū)的追隨者本辐,因此集群內(nèi)的負載非常平衡。

Geo-Replication

Kafka MirrorMaker為集群提供地理復制支持。使用MirrorMaker慎皱,消息可以跨多個數(shù)據(jù)中心或云區(qū)域復制老虫。您可以在主動/被動情況下使用它進行備份和恢復;或者在主動/主動場景中茫多,將數(shù)據(jù)放置得更靠近用戶祈匙,或者支持數(shù)據(jù)局部性要求。

Producers

生產(chǎn)者發(fā)布數(shù)據(jù)到選擇的主題上地梨,生產(chǎn)者負責選擇將哪個記錄分配給主題中的哪個分區(qū)菊卷。這可以簡單地通過循環(huán)方式來平衡負載缔恳,也可以根據(jù)一些語義分區(qū)函數(shù)(比如基于記錄中的某個鍵)來完成宝剖。稍后將詳細介紹分區(qū)的使用!

Consumers

消費者用一個消費者組名稱來標記自己歉甚,發(fā)布到一個主題的每個記錄被傳遞到每個訂閱消費者組中的一個消費者實例万细。消費者實例可以在不同的過程中,也可以在不同的機器上纸泄。

如果所有的消費者實例具有相同的消費者組赖钞,那么記錄將在消費者實例上有效地負載平衡。

如果所有的消費者實例都有不同的消費者組聘裁,那么每個記錄將被廣播給所有的消費者進程雪营。

一個雙服務器卡夫卡集群托管四個分區(qū)(P0-P3),有兩個消費群衡便。消費者組甲有兩個消費者實例献起,組乙有四個。

然而镣陕,更常見的是谴餐,我們發(fā)現(xiàn)主題有少量的消費者群體,每個“邏輯訂戶”有一個呆抑。每個組由許多可伸縮性和容錯的消費者實例組成岂嗓。這只不過是發(fā)布-訂閱語義,其中訂閱者是一群消費者鹊碍,而不是一個流程厌殉。

卡夫卡實現(xiàn)消費的方式是在消費實例上劃分日志中的分區(qū),這樣每個實例在任何時間點都是分區(qū)“公平份額”的唯一消費方侈咕」保卡夫卡協(xié)議動態(tài)地處理保持團體成員身份的過程。如果新實例加入該組乎完,它們將從該組的其他成員那里接管一些分區(qū)熏兄;如果一個實例死亡,它的分區(qū)將被分配給其余的實例。

卡夫卡只提供了一個分區(qū)內(nèi)記錄的總順序摩桶,而不是一個主題中不同分區(qū)之間的順序桥状。對于大多數(shù)應用程序來說,按分區(qū)排序以及按鍵劃分數(shù)據(jù)的能力就足夠了硝清。但是辅斟,如果您需要對記錄進行總排序,這可以通過只有一個分區(qū)的主題來實現(xiàn)芦拿,盡管這意味著每個消費者組只有一個消費者進程士飒。

Multi-tenancy

您可以將kafka部署為多租戶解決方案。多租戶是通過配置哪些主題可以生產(chǎn)或消費數(shù)據(jù)來實現(xiàn)的蔗崎。配額也有運營支持酵幕。管理員可以對請求定義和實施配額,以控制客戶端使用的代理資源缓苛。有關更多信息芳撒,請參見安全文檔。

Guarantees

kafka給出以下保障:

  • 生產(chǎn)者發(fā)送到特定主題分區(qū)的消息將按照發(fā)送順序進行附加未桥。也就是說笔刹,如果記錄M1和M2是由同一個生產(chǎn)者發(fā)送的,而M1是第一個發(fā)送的冬耿,那么M1的偏移量將低于M2舌菜,并且會出現(xiàn)在日志中的更早位置
  • 消費者實例按照記錄在日志中的存儲順序查看記錄。
  • 對于復制因子為N的主題亦镶,我們將容忍多達N-1個服務器故障日月,而不會丟失提交給日志的任何記錄。

文檔的Design部分給出了關于這些保證的更多細節(jié)染乌。

Kafka as a Messaging System

Kafka的流概念與傳統(tǒng)的企業(yè)消息系統(tǒng)相比如何?
消息傳遞傳統(tǒng)上有兩種模式:隊列和發(fā)布訂閱山孔。在一個隊列中,一個消費者池可以從服務器中讀取荷憋,并且每個記錄都到達其中一個台颠;在發(fā)布-訂閱中,記錄被廣播給所有消費者勒庄。這兩種模式各有優(yōu)缺點串前。隊列的優(yōu)勢在于,它允許您將數(shù)據(jù)處理劃分到多個消費者實例上实蔽,這使您可以擴展您的處理荡碾。不幸的是,隊列不是多用戶的——一旦一個進程讀取了數(shù)據(jù)局装,它就消失了坛吁。發(fā)布-訂閱允許您向多個進程廣播數(shù)據(jù)劳殖,但是沒有擴展處理的方法,因為每個消息都發(fā)送給每個訂閱者拨脉。

卡夫卡的消費組概念概括了這兩個概念哆姻。與隊列一樣,消費者組允許您將處理劃分為多個進程(消費者組的成員)玫膀。如同發(fā)布訂閱一樣矛缨,卡夫卡允許你向多個消費者組廣播信息。

卡夫卡模型的優(yōu)點是每個主題都有這兩個屬性——它可以擴展處理帖旨,也是多用戶的——沒有必要選擇其中一個箕昭。

卡夫卡也比傳統(tǒng)的信息系統(tǒng)有更強的順序保證。

傳統(tǒng)的隊列在服務器上按順序保留記錄解阅,如果多個消費者從隊列中消費落竹,那么服務器將按照記錄的存儲順序分發(fā)記錄。然而瓮钥,盡管服務器按順序分發(fā)記錄筋量,但記錄是異步傳遞給消費者的,因此它們可能會在不同的消費者中無序到達碉熄。這實際上意味著在并行消費的情況下,記錄的排序會丟失肋拔。消息傳遞系統(tǒng)通常通過“獨占消費者”的概念來解決這個問題锈津,該概念只允許一個進程從隊列中消費,但這當然意味著在處理中沒有并行性凉蜂。

卡夫卡做得更好琼梆。通過在主題中引入并行性(分區(qū))的概念,卡夫卡能夠在消費者進程池中提供排序保證和負載平衡窿吩。這是通過將主題中的分區(qū)分配給消費者組中的消費者來實現(xiàn)的茎杂,這樣每個分區(qū)正好被組中的一個消費者使用。通過這樣做纫雁,我們確保消費者是該分區(qū)的唯一讀者煌往,并且按順序消費數(shù)據(jù)。由于有許多分區(qū)轧邪,這仍然平衡了許多消費者實例的負載刽脖。但是,請注意忌愚,消費者組中的消費者實例不能多于分區(qū)曲管。

Kafka as a Storage System

任何允許發(fā)布消息和消費消息分離的消息隊列都是同樣作為一個存儲系統(tǒng)。Kafka不同的地方在于硕糊,他是一個很好的存儲系統(tǒng)院水。

寫入卡夫卡的數(shù)據(jù)被寫入磁盤并復制以實現(xiàn)容錯腊徙。卡夫卡允許生產(chǎn)者等待確認檬某,這樣寫操作就不會被認為是完整的昧穿,直到它被完全復制,并保證存留橙喘,即使寫入的服務器發(fā)生故障时鸵。

卡夫卡的磁盤結(jié)構(gòu)很好地利用了可伸縮性——無論服務器上有50 KB還是50 TB的持久數(shù)據(jù),卡夫卡都會執(zhí)行相同的操作厅瞎。

由于重視存儲并允許客戶端控制其讀取位置饰潜,您可以將卡夫卡視為一種專用分布式文件系統(tǒng),專門用于高性能和簸、低延遲的提交日志存儲彭雾、復制和傳播。

有關卡夫卡提交日志存儲和復制設計的詳細信息锁保,請閱讀本頁

Kafka for Stream Processing

僅僅讀取薯酝、寫入和存儲數(shù)據(jù)流是不夠的,目的是實現(xiàn)數(shù)據(jù)流的實時處理爽柒。

在Kafka中吴菠,流處理器是指從輸入主題獲取連續(xù)的數(shù)據(jù)流,對這個輸入執(zhí)行一些處理浩村,并產(chǎn)生連續(xù)的數(shù)據(jù)流到輸出主題做葵。

例如,一個零售應用程序可能接受銷售和發(fā)貨的輸入流心墅,并根據(jù)這些數(shù)據(jù)計算出重新訂購和價格調(diào)整的輸出流酿矢。

可以直接使用生產(chǎn)者和消費者APIs進行簡單的處理。然而怎燥,對于更復雜的轉(zhuǎn)換瘫筐,卡夫卡提供了一個完全集成的流應用編程接口。這允許構(gòu)建執(zhí)行非平凡處理的應用程序铐姚,這些應用程序通過流計算聚合或?qū)⒘鬟B接在一起策肝。

此功能有助于解決此類應用程序所面臨的難題:處理無序數(shù)據(jù)、在代碼更改時重新處理輸入谦屑、執(zhí)行有狀態(tài)計算等等驳糯。

流應用編程接口建立在卡夫卡提供的核心原語之上:它使用生產(chǎn)者和消費者應用編程接口作為輸入,使用卡夫卡作為狀態(tài)存儲氢橙,并在流處理器實例之間使用相同的組機制作為容錯酝枢。

Putting the Pieces Together

這種消息傳遞、存儲和流處理的結(jié)合可能看起來不尋常悍手,但對于卡夫卡作為流平臺的角色來說卻是至關重要的帘睦。

像HDFS這樣的分布式文件系統(tǒng)允許為批處理存儲靜態(tài)文件袍患。像這樣的系統(tǒng)實際上允許存儲和處理過去的歷史數(shù)據(jù)。

傳統(tǒng)的企業(yè)消息系統(tǒng)允許處理您訂閱后將到達的未來消息竣付。以這種方式構(gòu)建的應用程序在未來數(shù)據(jù)到達時對其進行處理诡延。

卡夫卡結(jié)合了這兩種能力,這種結(jié)合對于卡夫卡作為流媒體應用平臺的使用以及流媒體數(shù)據(jù)管道都至關重要古胆。

通過結(jié)合存儲和低延遲訂閱肆良,流式應用程序可以以相同的方式處理過去和未來的數(shù)據(jù)。也就是說逸绎,單個應用程序可以處理歷史的惹恃、存儲的數(shù)據(jù),但不會在到達最后一條記錄時結(jié)束棺牧,而是可以在未來數(shù)據(jù)到達時繼續(xù)處理巫糙。這是流處理的廣義概念,包括批處理和消息驅(qū)動的應用程序颊乘。

同樣参淹,對于流式數(shù)據(jù)管道,對實時事件的訂閱的組合使得將卡夫卡用于非常低延遲的管道成為可能乏悄;但是可靠地存儲數(shù)據(jù)的能力使得它可以用于必須保證數(shù)據(jù)交付的關鍵數(shù)據(jù)浙值,或者用于與離線系統(tǒng)集成,離線系統(tǒng)僅周期性地加載數(shù)據(jù)纲爸,或者可能長時間停機進行維護亥鸠。流處理設施使數(shù)據(jù)到達時進行轉(zhuǎn)換成為可能。

有關卡夫卡提供的保證识啦、APIs和功能的更多信息,請參見其他文檔神妹。

2.使用案例

下面是kafka的一些常見的使用案例

Messaging

Kafka可以很好地替代傳統(tǒng)的消息代理颓哮。消息代理用于各種各樣的原因(將處理與數(shù)據(jù)生成器解耦,緩沖未處理的消息鸵荠,等等)冕茅。與大多數(shù)消息傳遞系統(tǒng)相比,Kafka具有更好的吞吐量蛹找、內(nèi)置分區(qū)姨伤、復制和容錯能力,這使它成為大型消息處理應用程序的一個很好的解決方案庸疾。

根據(jù)我們的經(jīng)驗乍楚,消息傳遞的使用通常是相對較低的吞吐量,但是可能需要較低的端到端延遲届慈,并且通常取決于卡夫卡提供的強大的持久性保證徒溪。

在這一領域是卡夫卡比得上傳統(tǒng)的消息系統(tǒng)忿偷,例如ActiveMQ的或RabbitMQ的。

Website Activity Tracking

卡夫卡最初的使用案例是能夠?qū)⒂脩艋顒痈櫣艿乐亟橐唤M實時發(fā)布訂閱源臊泌。這意味著網(wǎng)站活動(頁面視圖鲤桥、搜索或用戶可能采取的其他操作)發(fā)布到中心主題,每個活動類型有一個主題渠概。這些源可用于訂閱一系列用例茶凳,包括實時處理、實時監(jiān)控播揪,以及加載到Hadoop或離線數(shù)據(jù)倉庫系統(tǒng)中進行離線處理和報告贮喧。

活動跟蹤通常非常大,因為每個用戶頁面視圖都會生成許多活動消息剪芍。

Metrics

Kafka通常用于操作監(jiān)控數(shù)據(jù)塞淹。這包括聚合來自分布式應用程序的統(tǒng)計信息,以生成操作數(shù)據(jù)的集中提要罪裹。

Log Aggregation

許多人用卡夫卡來代替日志聚合解決方案饱普。日志聚合通常從服務器收集物理日志文件,并將它們放在中心位置(可能是文件服務器或HDFS)進行處理状共√赘卡夫卡將文件的細節(jié)抽象化,并將日志或事件數(shù)據(jù)更清晰地抽象化為消息流峡继。這允許更低延遲的處理冯袍,更容易支持多個數(shù)據(jù)源和分布式數(shù)據(jù)消耗。與以日志為中心的系統(tǒng)(如抄寫員或水槽)相比碾牌,卡夫卡提供了同樣好的性能康愤、更強的復制耐久性保證以及低得多的端到端延遲。

Stream Processing

卡夫卡的許多用戶在由多個階段組成的處理管道中處理數(shù)據(jù)舶吗,在這些階段征冷,原始輸入數(shù)據(jù)從卡夫卡主題中被消費,然后被聚集、豐富或以其他方式轉(zhuǎn)換成新的主題以供進一步消費或后續(xù)處理。例如连茧,用于推薦新聞文章的處理管道可能從RSS源抓取文章內(nèi)容,并將其發(fā)布到“文章”主題叔收;進一步的處理可能會對該內(nèi)容進行規(guī)范化或重復數(shù)據(jù)消除,并將清理后的文章內(nèi)容發(fā)布到新主題傲隶;最后的處理階段可能會嘗試向用戶推薦該內(nèi)容饺律。這種處理流水線基于各個主題創(chuàng)建實時數(shù)據(jù)流的圖形。從0.10.0.0開始伦籍,Apache Kafka提供了一個稱為Kafka Streams的輕量級但功能強大的流處理庫來執(zhí)行上述數(shù)據(jù)處理蓝晒。除了卡夫卡流腮出,其他開源流處理工具包括阿帕奇風暴(Apache Storm)和阿帕奇薩姆扎(Apache Samza)。

Event Sourcing

事件源是應用程序設計的一種風格芝薇,其中狀態(tài)變化被記錄為按時間順序排列的記錄序列胚嘲。卡夫卡對非常大的存儲日志數(shù)據(jù)的支持使得它成為以這種風格構(gòu)建的應用程序的優(yōu)秀后端洛二。

Commit Log

Kafka可以作為分布式系統(tǒng)的一種外部提交日志馋劈。日志幫助在節(jié)點之間復制數(shù)據(jù),并充當失敗節(jié)點恢復數(shù)據(jù)的重新同步機制晾嘶。Kafka中的日志壓縮特性有助于支持這種用法妓雾。在這種用法中,Kafka類似于Apache BookKeeper項目垒迂。

快速開始

本教程假設你是新手械姻,并且沒有Kafka和Zookeeper數(shù)據(jù),因為kafka控制臺腳本在unix和windows平臺是不同的机断,在windows平臺上楷拳,使用bin\windows\ 取代bin/,并且腳本擴展名為.bat

1.下載并解壓

下載地址2.3.0版本

tar -xzf kafka_2.12-2.3.0.tgz
cd kafka_2.12-2.3.0

2.啟動服務

kafka使用Zookeeper吏奸,所以你需要首先啟動一個Zookeeper服務器欢揖,你可以使用包中的腳本,開啟一個單節(jié)點的Zookeeper實例:

?  kafka_2.12-2.3.0 bin/zookeeper-server-start.sh config/zookeeper.properties
[2019-09-27 13:13:12,607] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2019-09-27 13:13:12,612] INFO autopurge.snapRetainCount set to 3 (org.apache.zookeeper.server.DatadirCleanupManager)
[2019-09-27 13:13:12,613] INFO autopurge.purgeInterval set to 0 (org.apache.zookeeper.server.DatadirCleanupManager)
[2019-09-27 13:13:12,613] INFO Purge task is not scheduled. (org.apache.zookeeper.server.DatadirCleanupManager)
[2019-09-27 13:13:12,613] WARN Either no config or no quorum defined in config, running  in standalone mode (org.apache.zookeeper.server.quorum.QuorumPeerMain)
[2019-09-27 13:13:12,641] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2019-09-27 13:13:12,641] INFO Starting server (org.apache.zookeeper.server.ZooKeeperServerMain)
[2019-09-27 13:13:12,674] INFO Server environment:zookeeper.version=3.4.14-4c25d480e66aadd371de8bd2fd8da255ac140bcf, built on 03/06/2019 16:18 GMT (org.apache.zookeeper.server.ZooKeeperServer)
[2019-09-27 13:13:12,674] INFO Server environment:host.name=192.168.44.193 (org.apache.zookeeper.server.ZooKeeperServer)
[2019-09-27 13:13:12,674] INFO Server environment:java.version=1.8.0_211 (org.apache.zookeeper.server.ZooKeeperServer)
[2019-09-27 13:13:12,674] INFO Server environment:java.vendor=Oracle Corporation (org.apache.zookeeper.server.ZooKeeperServer)
[2019-09-27 13:13:12,674] INFO Server environment:java.home=/Library/Java/JavaVirtualMachines/jdk1.8.0_211.jdk/Contents/Home/jre (org.apache.zookeeper.server.ZooKeeperServer)
......

現(xiàn)在就可以開啟kafka服務了

?  kafka_2.12-2.3.0 bin/kafka-server-start.sh config/server.properties
[2019-09-27 13:14:35,753] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2019-09-27 13:14:36,630] INFO Registered signal handlers for TERM, INT, HUP (org.apache.kafka.common.utils.LoggingSignalHandler)
[2019-09-27 13:14:36,631] INFO starting (kafka.server.KafkaServer)
[2019-09-27 13:14:36,633] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
[2019-09-27 13:14:36,674] INFO [ZooKeeperClient Kafka server] Initializing a new session to localhost:2181. (kafka.zookeeper.ZooKeeperClient)
[2019-09-27 13:14:36,687] INFO Client environment:zookeeper.version=3.4.14-4c25d480e66aadd371de8bd2fd8da255ac140bcf, built on 03/06/2019 16:18 GMT (org.apache.zookeeper.ZooKeeper)
[2019-09-27 13:14:36,687] INFO Client environment:host.name=192.168.44.193 (org.apache.zookeeper.ZooKeeper)
[2019-09-27 13:14:36,687] INFO Client environment:java.version=1.8.0_211 (org.apache.zookeeper.ZooKeeper)
[2019-09-27 13:14:36,687] INFO Client environment:java.vendor=Oracle Corporation (org.apache.zookeeper.ZooKeeper)
[2019-09-27 13:14:36,687] INFO Client environment:java.home=/Library/Java/JavaVirtualMachines/jdk1.8.0_211.jdk/Contents/Home/jre (org.apache.zookeeper.ZooKeeper)
......

現(xiàn)在奋蔚,讓我們創(chuàng)建一個主題她混,名為test,只有一個分區(qū),一個副本

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

現(xiàn)在查看主題泊碑,運行以下命令:

bin/kafka-topics.sh --list --bootstrap-server localhost:9092



test

此外坤按,您還可以配置代理,使其在發(fā)布不存在的主題時自動創(chuàng)建主題馒过,而不是手動創(chuàng)建主題晋涣。

卡夫卡有一個命令行客戶端,它將從文件或標準輸入中獲取輸入沉桌,并將其作為消息發(fā)送給卡夫卡集群。默認情況下算吩,每一行都將作為單獨的消息發(fā)送留凭。
運行生成器,然后在控制臺中鍵入一些消息發(fā)送到服務器:

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message

卡夫卡還有一個命令行消費者偎巢,可以將消息轉(zhuǎn)儲到標準輸出中蔼夜。

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message

如果您在不同的終端上運行上述每個命令,那么您現(xiàn)在應該能夠在生產(chǎn)者終端中鍵入消息压昼,并看到它們出現(xiàn)在消費者終端中求冷。

所有的命令行工具都有額外的選項;在沒有參數(shù)的情況下運行該命令將顯示詳細記錄它們的使用信息瘤运。

最后,設置多代理集群

到目前為止匠题,我們一直在運行一個單一代理拯坟,但這一點都不好玩。對kafka來說韭山,一個單一的代理只是一個大小為1的集群郁季,所以除了再啟動幾個代理實例之外,沒有什么變化钱磅。但是為了感受一下梦裂,讓我們將集群擴展到三個節(jié)點(仍然都在本地機器上)。

首先盖淡,我們?yōu)槊總€代理創(chuàng)建一個配置文件(在windows平臺上使用copy命令):

> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties

現(xiàn)在編輯新的文件年柠,設置以下屬性:

config/server-1.properties:
    broker.id=1
    listeners=PLAINTEXT://:9093
    log.dirs=/tmp/kafka-logs-1
 
config/server-2.properties:
    broker.id=2
    listeners=PLAINTEXT://:9094
    log.dirs=/tmp/kafka-logs-2

broker.id屬性是群集中每個節(jié)點的唯一和永久名稱。我們必須覆蓋端口和日志目錄褪迟,只是因為我們在同一臺機器上運行這些目錄冗恨,并且我們希望阻止代理在同一端口上注冊或覆蓋彼此的數(shù)據(jù)

我們已經(jīng)有了Zookeeper,我們的單個節(jié)點已經(jīng)啟動牵咙,所以我們只需要啟動兩個新節(jié)點:

> bin/kafka-server-start.sh config/server-1.properties &
...
> bin/kafka-server-start.sh config/server-2.properties &
...

現(xiàn)在創(chuàng)建一個復制因子為3的新主題:

> bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 1 --topic my-replicated-topic

好吧派近,但是現(xiàn)在我們有了一個集群,我們怎么知道哪個代理在做什么洁桌?要查看渴丸,運行“describe topics”命令:

> bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replicated-topic
Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:
    Topic: my-replicated-topic  Partition: 0    Leader: 1   Replicas: 1,2,0 Isr: 1,2,0
    
    

以下是對輸出的解釋。第一行給出了所有分區(qū)的概要另凌,每一行給出了關于一個分區(qū)的信息谱轨。因為這個主題只有一個分區(qū),所以只有一行吠谢。

  • “l(fā)eader”是負責給定分區(qū)的所有讀寫的節(jié)點土童。每個節(jié)點將是隨機選擇的分區(qū)部分的領導者。
  • “replicas”是復制這個分區(qū)日志的節(jié)點列表工坊,無論它們是主節(jié)點還是當前活動節(jié)點献汗。
  • “isr”是一組“同步”副本。這是副本列表的子集王污,它當前是活動的罢吃,并趕上了領導者。

請注意昭齐,在我的示例中尿招,節(jié)點1是主題唯一分區(qū)的領導者。

我們可以對我們創(chuàng)建的原始主題運行相同的命令來查看它在哪里:

> bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test
Topic:test  PartitionCount:1    ReplicationFactor:1 Configs:
    Topic: test Partition: 0    Leader: 0   Replicas: 0 Isr: 0

因此,一點也不奇怪--原來的主題沒有副本且位于0服務節(jié)點上就谜,也是我們創(chuàng)建的集群中唯一的服務節(jié)點怪蔑。

讓我們在新的主題上發(fā)布一些消息:

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
...
my test message 1
my test message 2
^C

現(xiàn)在,來消費這些消息:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C

現(xiàn)在讓我們測試一下容錯能力丧荐。節(jié)點1是領導者缆瓣,所以讓我們殺了它:

> ps aux | grep server-1.properties
7564 ttys002    0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java...
> kill -9 7564

在windows平臺上,可使用:

> wmic process where "caption = 'java.exe' and commandline like '%server-1.properties%'" get processid
ProcessId
6016
> taskkill /pid 6016 /f

領導層已切換到其中一個追隨者篮奄,節(jié)點1不再在同步副本集中:

> bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replicated-topic
Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:
    Topic: my-replicated-topic  Partition: 0    Leader: 2   Replicas: 1,2,0 Isr: 2,0

但是這些信息仍然可以被消費捆愁,即使最初記錄這些信息的領導者已經(jīng)離開了:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C

從控制臺寫入數(shù)據(jù)并將其寫入控制臺是一個方便的起點,但是您可能希望使用其他來源的數(shù)據(jù)或?qū)afka的數(shù)據(jù)導出到其他系統(tǒng)窟却。對于許多系統(tǒng)昼丑,您可以使用Kafka Connect導入或?qū)С鰯?shù)據(jù),而不是編寫定制的集成代碼夸赫。

Kafka Connect是Kafka附帶的一個工具菩帝,可以導入和導出數(shù)據(jù)到Kafka。它是一個運行連接器的可擴展工具茬腿,連接器實現(xiàn)了與外部系統(tǒng)交互的自定義邏輯呼奢。在這個快速入門中,我們將看到如何使用簡單的連接器運行Kafka Connect切平,這些連接器將數(shù)據(jù)從文件導入Kafka主題握础,并將數(shù)據(jù)從Kafka主題導出到文件。

首先悴品,我們將創(chuàng)建一些用于測試的種子數(shù)據(jù):

> echo -e "foo\nbar" > test.txt

在windows平臺:

> echo foo> test.txt
> echo bar>> test.txt

接下來禀综,我們將啟動兩個以獨立模式運行的連接器,這意味著它們運行在單個本地專用進程中苔严。我們提供了三個配置文件作為參數(shù)定枷。第一種始終是Kafka連接進程的配置,包括要連接的常見配置届氢,如Kafka代理和數(shù)據(jù)的序列化格式欠窒。其余的配置文件每個都指定要創(chuàng)建的連接器。這些文件包括一個惟一的連接器名稱退子、要實例化的連接器類以及連接器所需的任何其他配置岖妄。

> bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

kafka附帶的這些示例配置文件使用了您之前啟動的默認本地集群配置,并創(chuàng)建了兩個連接器:第一個是源連接器寂祥,它從輸入文件中讀取行衣吠,并將每一行發(fā)送到一個kafka主題;第二個是接收連接器壤靶,它從kafka主題中讀取消息,并將每一行生成輸出文件中的行惊搏。

在啟動過程中贮乳,您會看到許多日志消息忧换,包括一些指示連接器正在實例化的消息。一旦kafka連接進程開啟向拆,源連接器應該開始從test.txt中讀取行亚茬,并將其生成到主題connect-test,而接收連接器應該開始從主題connect-test中讀取消息浓恳,并將其寫入文件test.sink.txt.我們可以通過檢查輸出文件的內(nèi)容來驗證數(shù)據(jù)是否已經(jīng)通過整個管道傳送:

> more test.sink.txt
foo
bar

數(shù)據(jù)被存貯在kafka名為connect-test的主題中刹缝,所以,我們可以運行一個控制臺消費者來查看主題中的數(shù)據(jù)颈将,如下:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
...

連接器繼續(xù)處理數(shù)據(jù)梢夯,因此我們可以將數(shù)據(jù)添加到文件中,并看到它在管道中移動:

> echo Another line>> test.txt

您應該看到這一行出現(xiàn)在控制臺消費者輸出和接收器文件中晴圾。

kafka Streams是一個客戶庫颂砸,用于構(gòu)建關鍵任務實時應用和微服務,其中輸入和/或輸出數(shù)據(jù)存儲在卡夫卡集群中死姚。kafka Streams將在客戶端編寫和部署標準的Java和Scala應用程序的簡單性與卡夫卡的服務器端集群技術的優(yōu)勢結(jié)合起來人乓,使這些應用程序具有高度的可伸縮性、彈性都毒、容錯性色罚、分布式等等。這個快速入門示例將演示如何運行在這個庫中編碼的流式應用程序账劲。

Ecosystem

在主要發(fā)行版之外戳护,有太多的工具與卡夫卡相結(jié)合。生態(tài)系統(tǒng)頁面列出了其中的許多涤垫,包括流處理系統(tǒng)姑尺、Hadoop集成、監(jiān)控和部署工具蝠猬。

?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末切蟋,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子榆芦,更是在濱河造成了極大的恐慌柄粹,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,839評論 6 482
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件匆绣,死亡現(xiàn)場離奇詭異驻右,居然都是意外死亡,警方通過查閱死者的電腦和手機崎淳,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,543評論 2 382
  • 文/潘曉璐 我一進店門堪夭,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人,你說我怎么就攤上這事森爽『藁恚” “怎么了?”我有些...
    開封第一講書人閱讀 153,116評論 0 344
  • 文/不壞的土叔 我叫張陵爬迟,是天一觀的道長橘蜜。 經(jīng)常有香客問我,道長付呕,這世上最難降的妖魔是什么计福? 我笑而不...
    開封第一講書人閱讀 55,371評論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮徽职,結(jié)果婚禮上象颖,老公的妹妹穿的比我還像新娘。我一直安慰自己活箕,他們只是感情好力麸,可當我...
    茶點故事閱讀 64,384評論 5 374
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著育韩,像睡著了一般克蚂。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上筋讨,一...
    開封第一講書人閱讀 49,111評論 1 285
  • 那天埃叭,我揣著相機與錄音,去河邊找鬼悉罕。 笑死赤屋,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的壁袄。 我是一名探鬼主播类早,決...
    沈念sama閱讀 38,416評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼嗜逻!你這毒婦竟也來了涩僻?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,053評論 0 259
  • 序言:老撾萬榮一對情侶失蹤栈顷,失蹤者是張志新(化名)和其女友劉穎逆日,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體萄凤,經(jīng)...
    沈念sama閱讀 43,558評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡室抽,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,007評論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了靡努。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片坪圾。...
    茶點故事閱讀 38,117評論 1 334
  • 序言:一個原本活蹦亂跳的男人離奇死亡晓折,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出神年,到底是詐尸還是另有隱情已维,我是刑警寧澤,帶...
    沈念sama閱讀 33,756評論 4 324
  • 正文 年R本政府宣布已日,位于F島的核電站,受9級特大地震影響栅屏,放射性物質(zhì)發(fā)生泄漏飘千。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,324評論 3 307
  • 文/蒙蒙 一栈雳、第九天 我趴在偏房一處隱蔽的房頂上張望护奈。 院中可真熱鬧,春花似錦哥纫、人聲如沸霉旗。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,315評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽厌秒。三九已至,卻和暖如春擅憔,著一層夾襖步出監(jiān)牢的瞬間鸵闪,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,539評論 1 262
  • 我被黑心中介騙來泰國打工暑诸, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留蚌讼,地道東北人。 一個月前我還...
    沈念sama閱讀 45,578評論 2 355
  • 正文 我出身青樓个榕,卻偏偏與公主長得像篡石,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子西采,可洞房花燭夜當晚...
    茶點故事閱讀 42,877評論 2 345

推薦閱讀更多精彩內(nèi)容