Apache nifi
開發(fā)指南
版本:V 1.6
日期:2018年6月13日
[if !supportLists]1. [endif]Apache Nifi 概念
[if !supportLists]1.1. [endif]NiFi簡介
? Apache NiFi 是一個易于使用、功能強大而且可靠的數(shù)據(jù)拉取、數(shù)據(jù)處理和分發(fā)系統(tǒng)叮姑,用于自動化管理系統(tǒng)間的數(shù)據(jù)流粮呢。它支持高度可配置的指示圖的數(shù)據(jù)路由喇潘、轉(zhuǎn)換和系統(tǒng)中介邏輯枕赵,支持從多種數(shù)據(jù)源動態(tài)拉取數(shù)據(jù)渴析。NiFi原來是NSA的一個項目晚伙,目前已經(jīng)代碼開源吮龄,是Apache基金會的頂級項目之一。
NiFi是基于Java的咆疗,使用Maven支持包的構(gòu)建管理漓帚。 NiFi基于Web方式工作,后臺在服務器上進行調(diào)度午磁。用戶可以為數(shù)據(jù)處理定義為一個流程尝抖,然后進行處理,后臺具有數(shù)據(jù)處理引擎迅皇、任務調(diào)度等組件昧辽。
[if !supportLists]1.2. [endif]Nifi核心概念
FlowFile:表示通過系統(tǒng)移動的每個對象,包含數(shù)據(jù)流的基本屬性
FlowFile Processor(處理器):負責實際對數(shù)據(jù)流執(zhí)行工作
Connection(連接線):負責不同處理器之間的連接登颓,是數(shù)據(jù)的有界緩沖區(qū)
Flow Controller(流量控制器):管理進程使用的線程及其分配
Process Group(過程組):進程組是一組特定的進程及其連接搅荞,允許組合其他組件創(chuàng)建新組件
[if !supportLists]1.3. [endif]NIFI架構(gòu)
NiFi是基于Java的,NiFi的核心部件在JVM里的位置如下圖所示:
NiFi在主機操作系統(tǒng)上的JVM內(nèi)執(zhí)行挺据。JVM上的NiFi的主要組件如下:
3.1 網(wǎng)絡服務器
Web服務器的目的是托管NiFi的基于HTTP的命令和控制API取具。
3.2 流控制器
流控制器是操作的大腦。它提供用于擴展程序運行的線程扁耐,并管理擴展程序接收資源以執(zhí)行的時間表。
3.3 擴展
有各種類型的NiFi擴展在其他文檔中描述产阱。這里的關(guān)鍵是擴展在JVM中運行和執(zhí)行婉称。
3.4 FlowFile存儲庫
FlowFile存儲庫是NiFi跟蹤目前在流程中活動的給定FlowFile的知識狀態(tài)。存儲庫
實現(xiàn)是可插拔的构蹬。默認方法是位于指定磁盤分區(qū)上的持久寫入前端日志王暗。
3.5 內(nèi)容存儲庫
Content Repository是給定FlowFile的實際內(nèi)容字節(jié)。存儲庫的實現(xiàn)是可插拔的庄敛。默認方法是一個相當簡單的機制俗壹,它將數(shù)據(jù)塊存儲在文件系統(tǒng)中≡蹇荆可以指定多個文件系統(tǒng)存儲位置绷雏,以便獲得不同的物理分區(qū),以減少任何單個卷上的爭用怖亭。
3.6 源頭存儲庫
Provenance Repository是存儲所有來源的事件數(shù)據(jù)的地方涎显。存儲庫構(gòu)造是可插入的,默認實現(xiàn)是使用一個或多個物理磁盤卷兴猩。在每個位置內(nèi)期吓,事件數(shù)據(jù)被索引和可搜索。
3.7 作為功能強大的數(shù)據(jù)處理和分發(fā)組件倾芝,NiFi自然原生支持集群部署方式(推薦部署方式)讨勤。NiFi集群部署模式如下圖:
集群模式下箭跳,NiFi集群中的每個節(jié)點對數(shù)據(jù)執(zhí)行相同的任務,但是每個節(jié)點都在不同的數(shù)據(jù)集上進行操作潭千。和大部分大數(shù)據(jù)組件一樣衅码,NiFi集群使用Apache ZooKeeper提供協(xié)調(diào)服務。 Apache ZooKeeper選擇一個NiFi節(jié)點作為集群協(xié)調(diào)器脊岳,故障轉(zhuǎn)移由ZooKeeper自動處理逝段。 所有集群節(jié)點向集群協(xié)調(diào)器報告心跳和狀態(tài)信息。集群協(xié)調(diào)器負責節(jié)點的斷開和連接割捅。 此外奶躯,ZooKeeper會為每個集群選舉一個節(jié)點作為集群主節(jié)點。 作為DataFlow管理器亿驾,您可以通過任何節(jié)點的用戶界面(UI)與NiFi集群進行交互嘹黔。您所做的任何更改都會同步到集群中的所有節(jié)點,從而允許多個入口點莫瞬。
[if !supportLists]2. [endif]NiFi的搭建
[if !supportLists]2.1. [endif]單機開發(fā)環(huán)境搭建
1運行環(huán)境準備儡蔓。
Apache nifi即可運行在Windows平臺,也可運行在Linux平臺疼邀,需要安裝jdk(nifi 1.x以上需要jdk8以上喂江,0.x需jdk7以上)和maven(至少3.1.0以上版本)。
2下載
NIFI下載地址:http://nifi.apache.org/download.html
下載當前版本的NiFi二進制工程旁振,目前最新的版本為1.6.0获询。
3支持瀏覽器:
· Internet Explorer 9+ (see note below)
· Mozilla FireFox 24+
· Google Chrome 36+
· Safari 8
4修改配置文件。
由于NIFI默認端口為8080拐袜,所以需要檢查一下8080端口是否被占用吉嚣,如果被占用可以使用別的未被占用的端口,如9090,9091等蹬铺。
檢查端口是否被占用命令:netstat -ano|findstr "8080"
NIFI配置文件:/usr/local/conf/nifi.properties,配置ip(134行nifi.web.http.host)和端口(135行:nifi.web.http.port)?
5啟動服務尝哆。
在linux平臺,啟動服務使用命令({NIFI ROOT})/bin/nifi.sh start;
在window平臺使用命令{NIFI ROOT})\bin\run-nifi.bat甜攀。
(雙擊啟動文件:({NIFI ROOT})\bin\run-nifi.bat)
6驗證測試
啟動服務后過大概3到5分鐘秋泄,在瀏覽器中輸入:http://localhost/nifi? 或者:http://localhost:8080/nifi,即可開始使用了赴邻。
7基本命令
啟動:./nifi.sh start
關(guān)閉:./nifi.sh stop
重啟:./nifi.sh restart
狀態(tài):./nifi.sh status
報表:./nifi-app.log
8印衔、NiFi的操作
(1)UI界面介紹
· 工具欄這里主要是構(gòu)造數(shù)據(jù)流操作的主要面板。
添加模塊(processor) nifi內(nèi)部會提供各個處理模塊姥敛,當我們在進行數(shù)據(jù)處理的過程中奸焙,可以選擇不同的模塊并調(diào)整變量進行拼裝,從而組合成一個完整的數(shù)據(jù)流處理的組。
? 添加數(shù)據(jù)流傳入點(input-port)雖說是數(shù)據(jù)流輸入點与帆,但是并不是整體數(shù)據(jù)流的起點了赌。它是作為組與組之間的數(shù)據(jù)流連接的傳入點與輸出點。
添加數(shù)據(jù)流輸出點(output-port) 同理上面的輸入點玄糟。它是作為組與組之間的數(shù)據(jù)流連接的傳入點與輸出點勿她。
添加組(process-group)組相當于系統(tǒng)中的文件夾,作用就是使數(shù)據(jù)流的各個部分看起來更工整阵翎,思路更清晰逢并,不至于從頭到尾一條線閱讀起來十分不方便。
添加遠端的組(remote process-group)根據(jù)彈出框進行信息配置郭卫,可加入遠程的組砍聊。
拉取已有的文件(template)每當做好一個完整的數(shù)據(jù)流后,可存儲到本地為xml文件贰军,nifi支持本地的template上傳玻蝌,這個按鈕就是在上傳本地template之后,選擇上傳過的一個獲取到操作畫布上词疼。
添加便簽(label)相當于便簽俯树,可放置在畫布空白處,寫上備注信息贰盗。
· Navigate這一部分是對區(qū)域一這個畫布的縮小預覽莲蜘,點擊放大縮小可調(diào)整視野儒飒,藍框區(qū)域就是畫布當前的界面叉信,可用鼠標在這部分進行移動從而調(diào)整畫布的視野栓始。
· 操作欄
開始運行選中模塊并點擊運行按鈕,開始進行對數(shù)據(jù)流的處理书释。
停止運行選中模塊并點擊停止按鈕,則停止了進行對數(shù)據(jù)流的處理赊窥。
保存template選擇你要保存的一個template爆惧,點擊這個保存按鈕,可把這個template保存到
nifi系統(tǒng)里(并不是電腦本地锨能,如果想保存到電腦本地扯再,可點擊右上角
這個按鈕,選擇Template址遇,彈出的頁面上有下載選項)熄阻。
上傳template可上傳本地的template(xml文件)到nifi系統(tǒng)里。
(2)模板
創(chuàng)建模板:在要創(chuàng)建模板的group中點擊模板左側(cè)的create template或者鼠標右鍵空白處倔约。
下載模板:
使用模板:選擇界面上方的template拖放至畫布秃殉,選擇要使用的模板。
NiFi的模板會保存組中的處理器配置及controller servres。例如數(shù)據(jù)庫連接钾军,但是不會保存密碼鳄袍。
[if !supportLists](3)[endif]Processor
添加處理器:
點擊add
將處理器拖到畫布上后,可以通過右鍵單擊處理器并從上下文菜單中選擇一個選項來與其進行交互吏恭。根據(jù)分配給您的權(quán)限拗小,上下文菜單中可用的選項會有所不同。
雖然上下文菜單中的選項有所不同樱哼,但是當您具有使用處理器的完全權(quán)限時哀九,通常可以使用以下選項:
· Configure(配置):此選項允許用戶建立或更改處理器的配置搅幅。
· Start(啟動或停止):此選項允許用戶啟動或停止處理器; 該選項可以是Start或Stop阅束,具體取決于處理器的當前狀態(tài)。
· Disable(啟用或禁用):此選項允許用戶啟用或啟用處理器; 該選項將為“啟用”或“禁用”盏筐,具體取決于處理器的當前狀態(tài)围俘。
· View data provenance(查看數(shù)據(jù)來源):此選項顯示NiFi數(shù)據(jù)來源表,其中包含有關(guān)通過該處理器路由的FlowFiles的數(shù)據(jù)來源事件的信息琢融。
· View status history(查看狀態(tài)歷史記錄):此選項打開處理器統(tǒng)計信息隨時間的圖形表示界牡。
· View usage(查看用法):此選項將用戶帶到處理器的使用文檔。
· View connection → Upstream(查看連接→上游):此選項允許用戶查看和“跳轉(zhuǎn)”
入處理器的上游連接漾抬。當處理器連接進出其他進程組時宿亡,這尤其有用。
· View connection → Downstream(查看連接→下游):此選項允許用戶查看和“跳轉(zhuǎn)”
到處理器外的下游連接纳令。當處理器連接進出其他進程組時挽荠,這尤其有用。
· Centere in view(視圖中心):此選項將畫布的視圖置于給定的處理器上平绩。
· Change color(更改顏色):此選項允許用戶更改處理器的顏色圈匆,這可以使大流量的可視化管理更容易。
· Create template(創(chuàng)建模板):此選項允許用戶從所選處理器創(chuàng)建模板捏雌。
· Copy(復制):此選項將所選處理器的副本放在剪貼板上跃赚,以便可以通過右鍵單擊畫布并選擇“粘貼”將其粘貼到畫布上的其他位置。復制/粘貼操作也可以使用按鍵Ctrl-C(Command-C)和Ctrl-V(Command-V)完成性湿。
· Delete(刪除):此選項允許從畫布中刪除處理器纬傲。
(4)配置processor屬性
要配置處理器,請右鍵單擊處理器肤频,然后Configure從上下文菜單中選擇該選項叹括。或者宵荒,只需雙擊處理器即可汁雷。
· 設置選項卡
“處理器配置”對話框中的第一個選項卡是“設置”選項卡
Name:Processor名稱净嘀,默認與處理器類型相同,可以更改摔竿。處理器名稱旁邊是一個復選框面粮,指示處理器是否已啟用。
Id:Processor唯一標識符以及Processor的類型和NAR包继低,無法修改熬苍。
Type:Processor類型,無法更改袁翁。
Bundle:Processor 的NAR包柴底,無法更改。
Penalty Duration(懲罰持續(xù)時間):在處理一段數(shù)據(jù)(FlowFile)的正常過程期間粱胜,可能發(fā)生事件柄驻,該事件指示此時不但是數(shù)據(jù)可以在稍后的時間處理。
Yield Duration::處理器可以確定存在某種情況焙压,使得處理器不再能夠進行任何進展鸿脓,而不管其正在處理的數(shù)據(jù),這將阻止處理器被安排運行一段時間涯曲。
Bulletin Level(公告):每當處理器寫入其日志時野哭,處理器也將生成公告。此設置指示應在用戶界面中顯示的最低級別的公告幻件。默認情況下拨黔,公告級別設置為WARN,這意味著它將顯示所有警告和錯誤級別公告绰沥。
Automatically Terminate Relationships(自動終止關(guān)系):為了使處理器被視為有效且能夠運行篱蝇,處理器定義的每個關(guān)系必須連接到下游組件或自動終止。
· 調(diào)度選項卡
“處理器配置”對話框中的第二個選項卡是“計劃”選項卡:
NiFi支持三種調(diào)度策略徽曲,包括Time Driven(時間驅(qū)動)零截、CRON Driven(CRON驅(qū)動)和Event Driven(事件驅(qū)動,非可選):
Time Driven:這是默認模式秃臣。處理器將安排定期運行瞻润。處理器運行的時間間隔由“運行計劃”選項定義。
Event Driven:當選擇此模式時甜刻,處理器將被觸發(fā)以事件運行,并且當FlowFiles輸入連接到此處理器的連接時會發(fā)生該事件正勒。此模式目前被認為是實驗性的得院,并且不受所有處理器的支持。選擇此模式時章贞,“運行計劃”選項不可配置祥绞,因為處理器不會觸發(fā)為定期運行,而是作為事件的結(jié)果。此外蜕径,這是“并行任務”選項可以設置為0的唯一模式两踏。在這種情況下,線程數(shù)量僅受管理員配置的事件驅(qū)動線程池大小的限制兜喻。
CRON驅(qū)動:當使用CRON驅(qū)動的調(diào)度模式時梦染,處理器被安排定期運行,類似于定時器驅(qū)動的調(diào)度模式朴皆。然而帕识,CRON驅(qū)動模式提供了更大的靈活性,但增加了配置的復雜性遂铡。CRON驅(qū)動的調(diào)度值是由六個必填字段和一個可選字段組成的字符串肮疗,每個字段由一個空格分隔。
CRON的各參數(shù)含義分別代表:秒扒接、分伪货、時、日钾怔、月碱呼、周、年蒂教,需要配合*巍举、?和L共同執(zhí)行(*代表字段的值都有效凝垛;?代表對于指定的字段不指定值懊悯;L代表長整形)。如:“0 0 13 * * ?”代表想要在每天下午1點進行調(diào)度執(zhí)行梦皮。根據(jù)業(yè)務需求進行參數(shù)的調(diào)度配置炭分。
詳情請參閱Quartz文檔中的Chron Trigger教程。
http://www.quartz-scheduler.org/documentation/quartz-2.x/tutorials/crontrigger.html
· 屬性選項卡
Properties選項卡提供了一種配置特定于Processor的行為的機制剑肯。
(5)連接processor
一旦處理器和其他組件被添加到畫布中并進行配置捧毛,下一步就是將它們彼此連接起來,以便NiFi知道在處理完每個FlowFile后如何處理让网。這是通過在每個組件之間創(chuàng)建一個連接來完成的呀忧。用戶將連接氣泡從一個組件拖動到另一個組件,直到第二個組件被突出顯示溃睹。當用戶釋放鼠標時而账,會出現(xiàn)一個“創(chuàng)建連接”對話框。必須至少選擇一個關(guān)系因篇。如果只有一個關(guān)系可用泞辐,則會自動選擇它笔横。
設置
“設置”選項卡提供配置連接名稱,F(xiàn)lowFile到期咐吼,背壓閾值和優(yōu)先級的功能:
· FlowFlie Expiration
通過FlowFile到期可以自動從流中刪除無法及時處理的數(shù)據(jù)吹缔。比如說,如果給定連接上的文件到期時間設置為“1小時”锯茄,并且已經(jīng)在NiFi實例中一小時的文件到達該連接厢塘,則該文件將過期。默認值為0 sec表示數(shù)據(jù)永不過期撇吞。當設置了“0秒”以外的文件到期時俗冻,連接標簽上會出現(xiàn)一個小時鐘圖標,因此當查看畫布上的流時牍颈,DFM可以一目了然地看到它迄薄。
· Back Pressure
NiFi為背壓提供兩種配置元素。這允許系統(tǒng)避免數(shù)據(jù)溢出煮岁。
Back pressure object threshold(背壓對象閾值):在應用背壓之前可以在隊列中的FlowFiles的數(shù)量讥蔽。
Back pressure data size threshold(背壓數(shù)據(jù)大小閾值):指定了在應用反壓之前應排隊的最大數(shù)據(jù)量(大小)画机。
啟用背壓時冶伞,連接標簽上會出現(xiàn)小進度條,因此在查看畫布上的流時步氏,DFM可以一目了然地看到它响禽。進度條根據(jù)隊列百分比更改顏色:綠色(0-60%),黃色(61-85%)和紅色(86-100%)荚醒。
將鼠標懸停在條形圖上會顯示確切的百分比芋类。
隊列完全填滿后,Connection將以紅色突出顯示界阁。
· 優(yōu)先級
選項卡的右側(cè)提供了對隊列中數(shù)據(jù)進行優(yōu)先級排序的功能侯繁,以便首先處理更高優(yōu)先級的數(shù)據(jù)。優(yōu)先級可以從頂部('可用的優(yōu)先級排序器')拖動到底部('選擇優(yōu)先級排序器')泡躯。
可以選擇多個優(yōu)先級排序器贮竟。位于“所選優(yōu)先級”列表頂部的優(yōu)先級排序是最高優(yōu)先級。如果兩個FlowFiles根據(jù)此優(yōu)先級排序器具有相同的值较剃,則第二個優(yōu)先級排序器將確定首先處理哪個FlowFile咕别,依此類推。如果不再需要優(yōu)先級排序器写穴,則可以將其從“選定的優(yōu)先級排序器”列表拖動到“可用的優(yōu)先級排序器”列表顷级。
可以使用以下優(yōu)先順序:
FirstInFirstOutPrioritizer:給定兩個FlowFiles,首先處理首先到達連接的FlowFiles确垫。
NewestFlowFileFirstPrioritizer:給定兩個FlowFiles弓颈,將首先處理數(shù)據(jù)流中最新的FlowFiles。
OldestFlowFileFirstPrioritizer:給定兩個FlowFiles删掀,將首先處理數(shù)據(jù)流中最舊的FlowFiles翔冀。
PriorityAttributePrioritizer:給定兩個都具有“priority”屬性的FlowFile,將首先處理具有最高優(yōu)先級值的FlowFiles披泪。請注意纤子,應該使用UpdateAttribute處理器將“priority”屬性添加到FlowFiles,然后才能到達具有此優(yōu)先級設置的連接款票】嘏穑“優(yōu)先級”屬性的值可以是字母數(shù)字,其中“a”是比“z”更高的優(yōu)先級艾少,“1”是比“9”更高的優(yōu)先級卡乾。
(6)處理器驗證
在嘗試啟動處理器之前,確保處理器的配置有效非常重要缚够。狀態(tài)指示器顯示在處理器的左上角幔妨。如果處理器無效,指示器將顯示黃色警告指示器谍椅,并帶有感嘆號误堡,表示存在問題:
在這種情況下,使用鼠標懸停在指示器圖標上將提供工具提示雏吭,顯示處理器的所有驗證錯誤锁施。一旦解決了所有驗證錯誤,狀態(tài)指示器將變?yōu)镾top圖標杖们,表示處理器有效并準備啟動但當前未運行:
(7)啟動processor
為了啟動組件悉抵,必須滿足以下條件:
· 組件的配置必須有效
· 所有為組件定義的關(guān)系必須連接到另一個組件或自動終止
· 組件必須停止
· 該組件必須沒有活動任務
可以通過右鍵單擊一個組件并從上下文菜單中選擇Start來啟動組件。
如果啟動進程組胀莹,則該進程組中的所有組件(包括子進程組)都將啟動基跑,但那些無效或禁用的組件除外。
一旦啟動描焰,處理器的狀態(tài)指示器將變?yōu)椴シ欧枴?/p>
[if !supportLists]2.2. [endif]集群環(huán)境搭建
從NiFi 1.0版本開始媳否,NiFi采用Zero-Master聚類范例。NiFi集群中的每個節(jié)點都對數(shù)據(jù)執(zhí)行相同的任務荆秦,但每個節(jié)點都運行在不同的數(shù)據(jù)集上篱竭。Apache ZooKeeper選擇其中一個節(jié)點作為集群協(xié)調(diào)器,故障轉(zhuǎn)移由ZooKeeper自動處理步绸。所有群集節(jié)點都會向群集協(xié)調(diào)器報告心跳和狀態(tài)信息掺逼。群集協(xié)調(diào)器負責斷開和連接節(jié)點。作為DataFlow管理器瓤介,您可以通過群集中任何節(jié)點的UI與NiFi群集進行交互吕喘。您所做的任何更改都會復制到群集中的所有節(jié)點赘那,從而允許多個入口點進入群集。
2.1核心模塊: NiFi Cluster Coordinator(集群協(xié)調(diào)器):集群中節(jié)點氯质,負責控制任務和管理節(jié)點有負載均衡的功能募舟。節(jié)點:負責實際的數(shù)據(jù)處理主節(jié)點:有zookeeper自動選擇,此節(jié)點上運行隔離處理器Isolated Processors(隔離處理器):不希望在每個節(jié)點上運行的任務闻察。獨立運行拱礁。Heartbeats(心跳):傳達節(jié)點的運行狀態(tài)。與集群協(xié)調(diào)器通信特點:采用零主集群范例辕漂。每個節(jié)點對數(shù)據(jù)執(zhí)行相同的任務呢灶,但每個節(jié)點對不同的數(shù)據(jù)集進行操作
2.2搭建集群
以一臺電腦,兩臺虛擬機(最小的Centos 7)為例钉嘹,在三個實例上部署二進制文件并解壓縮⊙炷耍現(xiàn)在每個節(jié)點上都有一個NiFi目錄。
首先要在配置文件“./conf/zookeep.properties”中配置ZK(ZooKeeper)實例的列表:
server.1=node-1:2888:3888
server.2=node-2:2888:3888
server.3=node-3:2888:3888
2.3配置myid
如果多個NiFi節(jié)點正在運行嵌入式ZK隧期,則告訴服務器哪一個是重要的飒责。
在nifi目錄下創(chuàng)建文件夾/state/zookeeper/并創(chuàng)建文件myid,文件內(nèi)容與第二步中的server.id一致仆潮。
2.4配置state-management.xml:
<property name="Connect String">
node-1:2888,node-2:2888,node-3:2888
</property>
2.5配置nifi節(jié)點屬性
目錄:conf/nifi.properties
指定NiFi必須運行嵌入式ZK實例宏蛉,并具有以下屬性:
nifi.state.management.embedded.zooker.start =true
使用內(nèi)置zookeeper:nifi.zookeeper.connect.string=node-1:2181,node-2:2181,node-3:2181
下面需每個節(jié)點單獨配,根據(jù)節(jié)點的IP相應配置性置,保持集群中節(jié)點使用的端口一致
nifi.cluster.is.node=true
nifi.cluster.node.address=node-1
nifi.cluster.node.protocol.port=9999
nifi.remote.input.host=node-1
nifi.remote.input.secure=false
nifi.remote.input.socket.port=9998
nifi.web.http.host =node-1
配置完成后就可以此啟用節(jié)點拾并,集群將選取產(chǎn)生主節(jié)點。
2.6測試集群
正如在左上角看到的鹏浅,集群中有3個節(jié)點嗅义。此外,如果我們進入菜單(右上角的按鈕)并選擇群集頁面隐砸,將會出現(xiàn)三個節(jié)點的詳細信息:
node-2已被選為集群協(xié)調(diào)器之碗,而node-3則是主節(jié)點。這種區(qū)別很重要季希,因為某些處理器必須運行在一個唯一的節(jié)點上(為了數(shù)據(jù)一致性)褪那,在這種情況下,我們希望它運行在“主節(jié)點上”式塌。
我們可以在特定節(jié)點上顯示細節(jié)(左側(cè)的“信息”圖標):
[if !supportLists]3. [endif]典型技術(shù)場景
[if !supportLists]3.1. [endif]GetFile To PutFile
[if !supportLists]1. [endif]整體流程圖
涉及到的處理器以及功能
- GetFile:從指定的路徑中讀取文件?
- PutFile:移動文件到指定位置
[if !supportLists]2. [endif]細節(jié)說明:
[if !supportLists](1)[endif]GetFile:讀取文件
· Input Directory:從中提取文件的輸入目錄
· File Filter:僅拾取名稱與給定正則表達式匹配的文件
[if !supportLists](2)[endif]PutFile:存放文件
· Directory:文件存放目錄
[if !supportLists]3.2. [endif]從csv到mysql
[if !supportLists]3. [endif]整體流程圖
涉及到的處理器以及功能· GetFile:從指定的路徑中讀取文件? · ConvertRecord:通過指定Reader和Writer的類型博敬,完成文件格式轉(zhuǎn)換
·? Splitjson:將JSON文件拆分為多個獨立的FlowFiles
· ConvertJSONToSQL:將json中的元素轉(zhuǎn)化為sql中的insert語句· PutSQL:執(zhí)行SQL UPDATE或INSERT命令
[if !supportLists]4. [endif]細節(jié)說明:
[if !supportLists](3)[endif]GetFile:讀取文件
設置循環(huán)時間為1 days ,防止數(shù)據(jù)重復插入
· Input Directory:輸入目錄峰尝,從中提取文件的輸入目錄
· File Filter:文件過濾器偏窝,只有名稱與給定正則表達式匹配的文件才會被拾取
· Keep Source File:默認情況下,會將源文件刪除
(2)ConvertRecord:轉(zhuǎn)換文件格式
首先添加一個Record Reader和Record Writer,對于Record Reader祭往,我們選擇的是CSVReader伦意,因為我們讀取的文件是CSV格式,這個需要根據(jù)讀取文件的格式選擇链沼。對于RecordWriter默赂,我們選擇的是JsonRecordSetWriter。
· Record Reader:CSVReader(根據(jù)所要讀入數(shù)據(jù)的格式進行設定),點擊右側(cè)的箭頭括勺,? ? 對CSVReader的屬性進行設定
· Schema Access Strategy:這里我們選擇通過Schema Test來找到對應的schema· Schema Registry:需要選擇Scheme Registry的類型,這里選擇的是AvroSchemaRegis? try曲掰,右側(cè)又出現(xiàn)一個小箭頭疾捍,需要對AvroSchemaRegistry進行設置。
· Record Write進行類似的設置即可栏妖。
· 啟動控制器
(3)SplitJson :將JSON文件拆分為由JsonPath表達式指定的數(shù)組元素的多個獨立的
FlowFiles
然后從ConvertAvroToJson拖一條線到SplitJson乱豆,關(guān)系為success。
· JsonPathExpression:一個JsonPath表達式吊趾,用于指示要拆分為JSON 的數(shù)組元素
(4)ConvertJSONToSQL處理器:將JSON格式的FlowFile轉(zhuǎn)換為SQL語句
【注意】該處理器有一個特性宛裕,只能處理flat json,所謂flat是由一個JSON元素組成论泛,每個字段映射到一個簡單類型
· JDBC Connection Pool:根據(jù)要連接的數(shù)據(jù)庫類型選擇揩尸,我要連接的是mysql數(shù)據(jù)庫,因此選擇DBCPConnectionPool
· Statement Type:設置要執(zhí)行的操作屁奏,INSERT和UPDATE等岩榆,這里要執(zhí)行的是插入操作
· Table Name:語句應更新的表的名稱
· Translate Field Name: 如果json中元素的屬性名稱與數(shù)據(jù)表中的列名稱一致,則選擇false坟瓢,否則選擇true
· JDBC Connection Pool的屬性后面有一個小箭頭勇边,點擊箭頭對此項進行設置:
實際上這個java連接數(shù)據(jù)的設置是一致的, · Database Driver Class Name: 根據(jù)要連接的數(shù)據(jù)庫類型選擇
jdbc:mysql://localhost:3306/test
【注意】數(shù)據(jù)庫和系統(tǒng)時區(qū)差異問題折联,在jdbc連接的url后面加上serverTimezone=GMT即可解決問題粒褒,如果需要使用gmt+8時區(qū),需要寫成GMT%2B8诚镰,否則會被解析為空奕坟。再一個解決辦法就是使用低版本的MySQL jdbc驅(qū)動,5.1.28不會存在時區(qū)的問題怕享≈瓷模· Database Driver location:選擇對應數(shù)據(jù)庫連接jar包的完整路徑
【注意】Jar包版本要與MySQL版本相匹配
D:\Java\maven-3.5.3\.m2\repository\mysql\mysql-connector-java\5.1.6
· Database User:登錄數(shù)據(jù)庫的用戶名 · Password:用戶名對應的密碼
(5)PutSQL處理器:這里只設定了 JDBC Connection Pool
[if !supportLists]3.3. [endif]MySQL To Oracle
1. 整體流程圖
涉及到的處理器以及功能 - ExecuteSQL:執(zhí)行提供的SQL選擇查詢,查詢結(jié)果將轉(zhuǎn)換為Avro格式
- ConvertAvroToJson:將avro格式的數(shù)據(jù)轉(zhuǎn)化成json格式
-? Splitjson:將JSON文件拆分為多個獨立的FlowFiles
- ConvertJSONToSQL:將json中的元素轉(zhuǎn)化為sql中的insert語句
- PutSQL:執(zhí)行SQL UPDATE或INSERT命令
2. 細節(jié)說明
[if !supportLists](1)[endif]ExecuteSQL:
· 設置SQL select query為 select * from user
· service-->DBCPConnectionPool函筋,然后再點擊右側(cè)的箭頭沙合,配置下一個選項
·Database Connection URL:jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf8&useSSL=true
· Database Driver Class Name: com.mysql.jdbc.Driver
· Database Driver location:/home/xxxx/mysql-connector-java-5.1.39.jar
· Database User:登錄數(shù)據(jù)庫的用戶名
· Password:用戶名對應的密碼
(2)ConvertAvroToJson:將二進制Avro記錄轉(zhuǎn)換為JSON對象然后,從ExecuteSQL拖一條線到ConvertAvroToJson跌帐,關(guān)系為success首懈。
[if !supportLists](2)[endif]SplitJson :將JSON文件拆分為由JsonPath表達式指定的數(shù)組元素的多個獨立的
FlowFiles
然后從ConvertAvroToJson拖一條線到SplitJson绊率,關(guān)系為success。
· JsonPathExpression:一個JsonPath表達式究履,用于指示要拆分為JSON 的數(shù)組元素
[if !supportLists](3)[endif]添加一個ConvertJSONToSQL到界面滤否,然后配置
(5)PutSQL
[if !supportLists]3.4. [endif]執(zhí)行javaScript腳本
ExecuteScript是一個多功能處理器,允許用戶使用編程語言編寫自定義邏輯最仑,每次觸發(fā)
ExecuteScript處理器時都會執(zhí)行該編程語言藐俺。
以下變量綁定被提供給腳本以允許訪問NiFi組件:
· session(會話):這是對分配給處理器的ProcessSession的引用。會話允許您對流文件(如create()泥彤,putAttribute()和transfer()以及read()和write()()進行操作欲芹。
· context(上下文):這是對處理器的ProcessContext的引用。它可以用來檢索處理器屬性吟吝,關(guān)系菱父,Controller服務和StateManager。
· log:這是對處理器ComponentLog的引用剑逃。用它來記錄消息給NiFi浙宜,比如log.info('Hello world!')
· REL_SUCCESS:這是對處理器定義的“成功”關(guān)系的引用蛹磺。它也可以通過引用父類(ExecuteScript)的靜態(tài)成員來繼承粟瞬,但是一些引擎(如Lua)不允許引用靜態(tài)成員,所以這是一個方便的變量称开。這也節(jié)省了必須使用關(guān)系的完全合格的名稱亩钟。
· REL_FAILURE:這是對處理器定義的“失敗”關(guān)系的引用。和REL_SUCCESS一樣鳖轰,它也可以通過引用父類(ExecuteScript)的靜態(tài)成員來繼承清酥,但是一些引擎(如Lua)不允許引用靜態(tài)成員,所以這是一個方便的變量蕴侣。這也節(jié)省了必須使用關(guān)系的完全合格的名稱焰轻。
· Dynamic Properties : 在ExecuteScript中定義的任何動態(tài)屬性都將作為設置為與動態(tài)屬性對應的PropertyValue對象的變量傳遞給腳本引擎。這允許您獲取屬性的String值昆雀,還可以針對NiFi表達式語言評估該屬性辱志,將該值作為適當?shù)臄?shù)據(jù)類型(例如布爾值)等進行轉(zhuǎn)換。由于動態(tài)屬性名稱會成為腳本的變量名稱狞膘,您必須知道所選腳本引擎的變量命名屬性揩懒。例如,Groovy不允許在變量名稱中使用句點(挽封。)已球,因此如果“my.property”是一個動態(tài)屬性名稱,則會發(fā)生錯誤。
· Script Engine:腳本引擎選擇ECMAScript中
· Script File:腳本文件
· Script Body:腳本內(nèi)容
(1)從會話中獲取傳入的流文件
方法:使用會話對象中的get()方法智亮。
var flowFile = session.get();
if (flowFile != null) {
// All processing code goes here
}
[if !supportLists](4)[endif]從會話中獲取多個傳入的流文件
方法:使用會話對象中的get(maxResults)方法忆某。
flowFileList = session.get(100) ;
if(!flowFileList.isEmpty()) {
for each (var flowFile in flowFileList) {
// Process each FlowFile here
}
}
[if !supportLists](5)[endif]從父級FlowFile創(chuàng)建一個新的FlowFile
方法:使用會話對象的create(parentFlowFile)方法。
var flowFile = session.get();
if (flowFile != null) {
var newFlowFile = session.create(flowFile);
// Additional processing here
}
[if !supportLists](6)[endif]為流文件添加一個屬性
方法:使用會話對象中的putAttribute(flowFile阔蛉,attributeKey弃舒,attributeValue)
方法。
var flowFile = session.get();
if (flowFile != null) {
flowFile = session.putAttribute(flowFile, 'myAttr', 'myValue')
}
(5)將多個屬性添加到流文件
方法:使用會話對象中的putAllAttributes(flowFile状原,attributeMap)方法聋呢。
var number2 = 2;
var attrMap = {'myAttr1':'1', 'myAttr2': number2.toString()}
var flowFile = session.get()
if (flowFile != null) {
flowFile = session.putAllAttributes(flowFile, attrMap)
}
(6)從流文件中獲取屬性
方法:使用FlowFile對象的getAttribute(attributeKey)方法。
var flowFile = session.get();
if (flowFile != null) {
var myAttr = flowFile.getAttribute('filename')
}
(7)從流文件獲取所有屬性
方法:使用FlowFile對象的getAttributes()方法颠区。
var flowFile = session.get() if (flowFile != null) {
var attrs = flowFile.getAttributes();
for each (var attrKey in attrs.keySet()) {
// Do something with attrKey (the key) and/or attrs[attrKey] (the value)
}
}
(8)將流文件轉(zhuǎn)移到關(guān)系
方法:使用會話對象的transfer(flowFile坝冕,relationship)方法。
var flowFile = session.get();
if (flowFile != null) {
// All processing code goes here
if(errorOccurred) {
session.transfer(flowFile, REL_FAILURE)
} else {
session.transfer(flowFile, REL_SUCCESS)
}
}
(9)以指定的日志記錄級別向日志發(fā)送消息
方法:使用帶有warn()瓦呼,trace()栓辜,debug()箱熬,info()或error()方法的log變量。
var ObjectArrayType = Java.type("java.lang.Object[]");
var objArray = new ObjectArrayType(3);
objArray[0] = 'Hello';
objArray[1] = 1;
objArray[2] = true;
log.info('Found these things: {} {} {}', objArray)
[if !supportLists]3.5. [endif]Hive To Elasticsearch
1. 整體流程圖
涉及到的處理器以及功能- SelectHiveQL:從Hive庫中查取數(shù)據(jù)
- ConvertAvroToJson:將查出來的數(shù)據(jù)轉(zhuǎn)換為Json格式
-? Splitjson:將Json文件拆分為多個獨立的FlowFiles
- PutElasticsearchHttp:將數(shù)據(jù)插入到ES庫中
2. 細節(jié)說明
(1)SelectHiveQL:讀取文件
· HiveQl Select Query :查詢語句
· Hive Database Connection Pooling Service : 點擊箭頭配置Hive庫連接
· Database Connection URL :
jdbc:hive2://192.168.51.103:24002/sg_udm;serviceDiscoveryMode=zooKeeper;
zooKeeperNamespace=hiveserver2
· Database User : 用戶名
· Password :密碼
(2)ConvertAvroToJson? 默認設置
(3)Splitjson默認設置
(4)PutElasticsearchHttp
· Elasticsearch URL : 設置ES的ip:端口
注意:此處是http協(xié)議
· Index :設置ES庫的Index
· Type :設置ES庫的Type
【查詢】http://192.168.6.244:9200/index1/_search?pretty
[if !supportLists]3.6. [endif]Elasticsearch To MySQL
1. 整體流程圖
涉及到的處理器以及功能- InvokeHTTP:從ES庫中查取數(shù)據(jù)
-? Splitjson:將Json文件拆分為多個獨立的FlowFiles
- ConvertJSONToSQL:將Json格式的數(shù)據(jù)轉(zhuǎn)換為SQL語句
- PutSQL:將數(shù)據(jù)插入到MySQL數(shù)據(jù)庫中
2.細節(jié)說明:
[if !supportLists](1)[endif]InvokeHTTP:查詢數(shù)據(jù)
· HTTP Method :采用GET請求方式
· Remote URL : ES查詢 Rest API
(2)Splitjson:默認設置
(3)ConvertJSONToSQL:
· JDBC Connection Pool :配置MySQL數(shù)據(jù)庫連接
· Statement Type :執(zhí)行INSERT 方式
· Table Name : 數(shù)據(jù)庫的表名
[if !supportLists](4)[endif]PutSQL
· JDBC Connection Pool :配置MySQL數(shù)據(jù)庫連接
[if !supportLists]3.7. [endif]hbase To Kafka
[if !supportLists]1. [endif]? 整體流程圖
涉及到的處理器以及功能 - GetHBase:為HBase查詢指定表中的任何記錄
- PutKafka:將FlowFile的內(nèi)容作為消息發(fā)送到Apache Kafka
[if !supportLists]2. [endif] 細節(jié)說明
(1)GetHBase:查詢hbase中的數(shù)據(jù)
· Table Name是所要查詢hbase中的表名
· HBase Client Service用于連接hbase约谈,需要創(chuàng)建一個連接
創(chuàng)建hbase的連接配置如下
· ZooKeeper Quorum是zookeeper的ip地址列表
· ZooKeeper Client Port是zookeeper的端口號
· ZooKeeper ZNode Parent是hbase在zookeeper中的節(jié)點目錄
[if !supportLists](2)[endif]PutKafka:將數(shù)據(jù)發(fā)布到kafka
· Known Brokers是連接kafka的ip與端口质和;
· Topic Name是發(fā)布到kafka上的topic名稱。
[if !supportLists]3.8. [endif]Hive To Kafka
[if !supportLists]1. [endif]整體流程圖
涉及到的處理器以及功能 - SelectHiveQL:為HBase查詢指定表中的任何記錄
- ConvertAvroToJson:將查詢出來的數(shù)據(jù)轉(zhuǎn)換成Json格式
- PutKafka:將FlowFile的內(nèi)容作為消息發(fā)送到Apache Kafka
2. 細節(jié)說明
(1)SelectHiveQL:查詢hive中的數(shù)據(jù)
· HiveQL Select Query :查詢hive中數(shù)據(jù)的查詢語句稚字。
· Hive Database Connection Pooling Service:用于連接hive饲宿,需要創(chuàng)建一個連接
· Database Connection URL是連接hive的url
· Database User是連接hive的用戶名
· Password是連接hive的密碼
配置完成后需要啟動連接胆描,
[if !supportLists](2)[endif]ConvertAvroToJSON
(3)PutKafka:將數(shù)據(jù)發(fā)布到kafka
· Known Brokers:連接kafka的ip與端口
· Topic Name是發(fā)布到kafka上的topic名稱瘫想。
[if !supportLists]4. [endif]組件擴展開發(fā)
[if !supportLists]4.1. [endif]開始
Nifi有很多可用的、文檔化的Processor資源昌讲,但是某些時候你依然需要去開發(fā)屬于你自己的Processor国夜,例如從某些特殊的數(shù)據(jù)庫中提取數(shù)據(jù)、提取不常見的文件格式短绸,或者其他特殊情況车吹。
[if !supportLists]4.2. [endif]項目依賴
本文以Eclipse開發(fā)為例,創(chuàng)建了一個基礎(chǔ)的json文件讀取Processor醋闭,將內(nèi)容轉(zhuǎn)化為屬性值窄驹。
(1)安裝JDK8,Maven证逻,Eclipse配置Maven管理工具
(2)Eclipse新建Maven項目乐埠,如下圖所示
[if !supportLists](3)[endif]點擊next,進入如下圖所示頁面
[if !supportLists](4)[endif]默認選項,點擊next進入如下頁面
[if !supportLists](5)[endif]點擊上圖中箭頭所指的按鈕Add Archetype進入如下所示頁面
[if !supportLists](6)[endif]填寫:
Archetype Group Id:org.apache.nifi
Archetype Artifact Id:nifi-processor-bundle-archetype
Archetype Version:1.2.0
點擊OK,可以看到nifi的archetype已經(jīng)添加到meven中了饮戳。
[if !supportLists](7)[endif]選擇org.apache.nifi豪治,點擊Next,如下圖所示
[if !supportLists](8)[endif]填寫項目信息后點擊finish完成項目創(chuàng)建
[if !supportLists](9)[endif]創(chuàng)建完成以后項目下新生成3個目錄扯罐,我們要開發(fā)的東西在nifi-nifitest-processors中完成负拟。
[if !supportLists](10)[endif]打開nifi-nifitest-processors目錄如下:
[if !supportLists]4.3. [endif]JSON Processor
現(xiàn)在自定義Nifi Processor的前期準備工作都做完了,可以開始構(gòu)建屬于我們自己的Processor了歹河。
(1)在包下新加類起名JsonProcessor使之繼承AbstractProcessor
@Tag標簽是為了在web GUI中掩浙,能夠使用搜索的方式快速找到我們自己定義的這個Processor。
@CapabilityDescription內(nèi)的值會暫時在Processor選擇的那個頁面中秸歧,相當于一個備注厨姚。
一般來說只需要繼承AbstractProcessor就可以了,但是某些復雜的任務可能需要去繼承更底層的AbstractSessionFactoryProcessor這個抽象類键菱。
@Tags({"JSON"})//快速搜索標簽
@CapabilityDescription("提取的json文件的屬性")
@SeeAlso({})
@ReadsAttributes({@ReadsAttribute(attribute="",description="")})
@WritesAttributes({@WritesAttribute(attribute="",description="")})
public class JsonProcessor extends AbstractProcessor{
(2)新建幾個PropertyDescriptor(接受頁面配置的參數(shù)谬墙,如果不需要,可以不進行配置)
public static final PropertyDescriptor JSON_PATH =?
new PropertyDescriptor
? ? ? .Builder().name("Json Path")
? ? ? .required(true)
? ? ? .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
? ? ? .build();
(3)新建幾個Relationship(輸出狀態(tài)经备,成功或者失敗或者其他)
public static final Relationship SUCCESS = new Relationship.Builder()
.name("success").description("SUCCESS")
.build();
public static final Relationship FAILURE = new Relationship.Builder()
.name("failure").description("FAILURE")
.build();
[if !supportLists](3)[endif]定義兩個集合添加上面創(chuàng)建的PropertyDescriptor和Relationship
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
descriptors.add(JSON_PATH);
this.descriptors = Collections.unmodifiableList(descriptors);
final Set<Relationship> relationships = new HashSet<Relationship>();
relationships.add(SUCCESS);
relationships.add(FAILURE);
this.relationships = Collections.unmodifiableSet(relationships);
}
[if !supportLists](4)[endif]添加新創(chuàng)建屬性的get方法
public Set<Relationship> getRelationships(){
return relationships;
}
public final List<PropertyDescriptor> getSupportedPropertyDescriptors(){
return descriptors;
}
如上是初始化Nifi進程拭抬,由于Nifi是高度并發(fā)條件,所以descriptors和relationship是存儲在一個不可變的集合中侵蒙。
[if !supportLists](5)[endif]onTrigger方法中實現(xiàn)自己的業(yè)務造虎,onTrigger方法會在一個flowfile被傳入處理器? ? 時調(diào)用。
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
final AtomicReference<String> value = new AtomicReference<>();
//獲取flowFile中的內(nèi)容
FlowFile flowFile = session.get();
session.read(flowFile, in ->{
try {
String json = IOUtils.toString(in);
String result = JsonPath.read(json, "$.hello");
value.set(result);
} catch (Exception e) {
e.printStackTrace();
getLogger().error("Failed to read json string");
}
});
//將讀取json數(shù)據(jù)寫入flowFile中
String results = value.get();
if(results != null && !results.isEmpty()) {
flowFile = session.putAttribute(flowFile, "match", results);
}
//將處理結(jié)果返回flowFile
flowFile = session.write(flowFile, out -> out.write(value.get().getBytes()));
session.transfer(flowFile , SUCCESS);
}
[if !supportLists]4.4. [endif]打包部署
找到文件org.apache.nifi.processor.Processor
在里面添加:包名+類名纷闺,將processor暴露出來
Maven運行nifitest算凿,會在項目nifi-nifitest-nar下生成一個nar的包,將包放在nifi目錄下的lib目錄中犁功,重新啟動nifi服務器即可查看到自己添加的processor了氓轰。
[if !supportLists]4.5. [endif]單元測試
Apache Nifi框架的單元測試是基于Junit的Test Runners的,在這一階段波桩,我們會將單元測試功能加入我們之前創(chuàng)建的JsonProcessor中戒努。
處理器或控制器服務的大多數(shù)單元測試都是通過創(chuàng)建 TestRunner 類的實例來開始的。為了向處理器添加必要的類镐躲,我們需要在maven中添加nifi對應的依賴:
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<version>${nifi version}</version>
</dependency>
(2)在測試中储玫,有幾個org.apache.nifi.utils包是需要被import的,比如TestRunner萤皂、TestRunners撒穷、MockFlowFile這三個類。
(3)在測試方法上添加@Test標簽裆熙,在添加了這個JUnit 標簽后端礼,就可以在方法中去初始化Nifi提供的TestRunner等組件了禽笑。
(4)創(chuàng)建一個TestRunner類,然后把自定義的Processor傳給它蛤奥,接著對其的PropertiesDescription進行傳值佳镜,為了測試可以模擬一個本地的json文件作為資源文件。
(5)當一個test runner創(chuàng)建時凡桥,使用runner.setProperties(PropertyDescriptor)以及runner.enqueue(content)進行值賦予蟀伸。然后使用一些斷言進行單元測試,測試結(jié)果情況缅刽。
public class TestProcessor{
@Test
public void testOnTrigger() throws IOException {
//json文件內(nèi)容
InputStream content = new ByteArrayInputStream("{\"hello\":\"nifi rocks\"}".getBytes());
//模擬處理器
TestRunner runner =? TestRunners.newTestRunner(new JsonProcessor());
//設置JSON_PATH
runner.setProperty(JsonProcessor.JSON_PATH, "$.hello");
//向處理器添加內(nèi)容
runner.enqueue(content);
runner.run(1);
? ? runner.assertQueueEmpty();
? ? List<MockFlowFile> results = runner.getFlowFilesForRelationship(JsonProcessor.SUCCESS);
? ? assertTrue("1 match", results.size() == 1);
? ? MockFlowFile result = results.get(0);
? ? String resultValue = new String(runner.getContentAsByteArray(result));
? ? System.out.println("Match: " + IOUtils.toString(runner.getContentAsByteArray(result)));
? ? //測試屬性和內(nèi)容
? ? result.assertAttributeEquals(JsonProcessor.MATCH_ATTR, "nifi rocks");
? ? result.assertContentEquals("nifi rocks");
}
}
[if !supportLists]5. [endif]Nifi RestAPI
為方便用戶使用NiFi 進行二次開發(fā)啊掏,NiFi 為開發(fā)者提供了 NIFI RestAPI。Rest Api提供實時命令和控制NiFi實例的編程訪問衰猛。啟動和停止處理器迟蜜,監(jiān)視隊列,查詢起源數(shù)據(jù)等啡省。
5.1.控制器
· 創(chuàng)建一個新公告
/controller/bulletin
· 獲取集群的內(nèi)容
/controller/cluster
· 獲取集群中的節(jié)點
/controller/cluster/nodes/{id}
· 檢索此NiFi控制器的配置
/controller/config
·? 創(chuàng)建一個新的控制器服務
/controller/controller-services
·? 清除歷史
/controller/history
·? 獲取可用注冊表客戶端的列表
/controller/registry-clients
·? 獲取注冊表客戶端
/controller/registry-clients/{id}
· 創(chuàng)建新的報告任務
/controller/reporting-tasks
5.2.控制器服務
獲取控制器的服務
獲取控制器服務屬性描述符/controller-services/{id}/descriptors
獲取控制器服務的狀態(tài)/controller-services/{id}/state
5.3.報告任務
· 獲取報告任務
/reporting-tasks/{id}
· 獲取報告任務屬性描述符
/reporting-tasks/{id}/descriptors
· 獲取報告任務的狀態(tài)
/reporting-tasks/{id}/state
· 清除報告任務的狀態(tài)
/reporting-tasks/{id}/state/clear-requests
5.4.服務器
· 獲取處理器
/processors/{id}
· 獲取處理器屬性的描述符
/processors/{id}/descriptors
· 獲取有關(guān)處理器的診斷信息
/processors/{id}/diagnostics
· 獲取處理器的狀態(tài)
/processors/{id}/state
· 清除處理器的狀態(tài)
/processors/{id}/state/clear-requests
· 終止處理器娜睛,實質(zhì)上是“刪除”其線程和任何活動任務
/processors/{id}/threads
5.5.連接
· 獲取連接/connections/{id}
5.6.? FlowFile隊列
· 創(chuàng)建刪除此連接中隊列內(nèi)容的請求。/flowfile-queues/{id}/drop-requests
· 獲取指定連接的丟棄請求的當前狀態(tài)/flowfile-queues/{id}/drop-requests/{drop-request-id}
· 從Connection獲取FlowFile
/flowfile-queues/{id}/flowfiles/{flowfile-uuid}
· 獲取Connection中FlowFile的內(nèi)容
/flowfile-queues/{id}/flowfiles/{flowfile-uuid}/content
· 列出此連接中隊列的內(nèi)容
/flowfile-queues/{id}/listing-requests
· 獲取指定連接的列表請求的當前狀態(tài)
/flowfile-queues/{id}/listing-requests/{listing-request-id}
[if !supportLists]6. [endif]集群壓力測試
[if !supportLists]6.1. [endif]壓力測試場景
1.1背景
考慮到大數(shù)據(jù)管理平臺有數(shù)據(jù)接入量大卦睹、數(shù)據(jù)源多樣化微姊、對數(shù)據(jù)的完整性和容錯率要求高、延遲率低等特點分预,因此計劃對Nifi的數(shù)據(jù)完整性、異常狀態(tài)下的容錯性以及服務器在高負載情況下的性能做一個全面的測試評估薪捍,以便于了解nifi的優(yōu)點和缺陷笼痹,從而優(yōu)化整個大數(shù)據(jù)管理平臺架構(gòu)。1.2測試概要(1)測試環(huán)境Nifi 1.6.0 集群版(4個節(jié)點)
(2)測試目標· 數(shù)據(jù)完整性測試酪穿〉矢桑· 異常狀態(tài)容錯機制測試”患茫· 不同負載下的響應時間測試救赐。· Nifi集群模式下的主從切換測試只磷。
[if !supportLists]6.2. [endif]壓力測試結(jié)果
2.1積壓數(shù)據(jù)量越大经磅,數(shù)據(jù)處理性能越差,處理時間隨著數(shù)據(jù)量的增加呈指數(shù)級增長钮追。2.2 數(shù)據(jù)是否丟失和連接池最大連接數(shù)參數(shù)以及批量處理SQL的批次條數(shù)有關(guān)预厌,這個應該是數(shù)據(jù)處理代碼層面的BUG,和Nifi本身無關(guān)元媚。Nifi的數(shù)據(jù)完整性在小數(shù)據(jù)量下還是可以的轧叽。大數(shù)據(jù)量時候?qū)?shù)優(yōu)化要求顯得比較嚴格苗沧。 2.3 數(shù)據(jù)處理速度和SQL批量處理的批次條數(shù)有關(guān),每批處理的越多炭晒,處理性能越好待逞。2.4? Nifi自身發(fā)生錯誤: Nifi集群的節(jié)點如果有宕機情況,會導致整個集群的任務流程無法啟動网严,主節(jié)點掛掉會導致nifi的UI界面不可用识樱。如果在任務執(zhí)行過程中kill掉某個節(jié)點進程,會發(fā)生丟失數(shù)據(jù)情況屿笼,必須等待節(jié)點重新啟動后數(shù)據(jù)會自動恢復牺荠。 2.5 處理模塊發(fā)生錯誤:如:mysql掛掉后,數(shù)據(jù)會自動在入庫操作的上游堆積驴一,等待數(shù)據(jù)庫恢復休雌。數(shù)據(jù)庫恢復后,可以完成自動入庫肝断,整體數(shù)據(jù)無丟失杈曲。Kafka掛掉后數(shù)據(jù)流也會進入等待,直至kafka恢復后數(shù)據(jù)自動流轉(zhuǎn)胸懈。
[if !supportLists]6.3. [endif]性能調(diào)優(yōu)
3.1 Nifi的數(shù)據(jù)完整性還是有保障的担扑,測試中出現(xiàn)的數(shù)據(jù)丟失問題主要是由于現(xiàn)在的代碼層面對入庫失敗的數(shù)據(jù)未做處理造成的。 3.2 Nifi集群的處理性能和穩(wěn)定性遠高于Nifi單機模式趣钱。 3.3 Nifi集群的處理性能和數(shù)據(jù)冗余量有直接關(guān)系涌献,即nifi處理數(shù)據(jù)主要依賴磁盤IO。 3.4 Nifi自身的集群容錯率較低首有,并非傳統(tǒng)的主從結(jié)構(gòu)燕垃,但對數(shù)據(jù)處理模塊中的組件容錯率較強。