1.介紹
本開(kāi)發(fā)人員指南的目的是為讀者提供了解Apache NiFi擴(kuò)展的開(kāi)發(fā)方式所需的信息屯曹,并幫助他們解釋開(kāi)發(fā)組件背后的思考過(guò)程捆交。它提供了對(duì)用于開(kāi)發(fā)擴(kuò)展的API的介紹和說(shuō)明残揉。但是列林,由于本指南旨在補(bǔ)充API的JavaDoc,而不是替換它們,因此并未詳細(xì)介紹API中的每種方法予权。本指南還假定讀者熟悉Java 7和Apache Maven。
2.NiFi組件
NiFi提供了幾個(gè)擴(kuò)展點(diǎn)浪册,使開(kāi)發(fā)人員能夠向應(yīng)用程序添加功能以滿足他們的需求扫腺。以下列表對(duì)最常見(jiàn)的擴(kuò)展點(diǎn)進(jìn)行了高級(jí)描述:
- 處理器(Processer)
Processor接口的機(jī)制是通過(guò)NiFi暴露訪問(wèn)FlowFile,和其屬性和內(nèi)容 村象。Processor是構(gòu)成NiFi數(shù)據(jù)流的基本模塊笆环。Processor可以完成以下任務(wù):
創(chuàng)建FlowFiles,讀取FlowFile內(nèi)容厚者,編寫FlowFile內(nèi)容躁劣,讀取FlowFile屬性,更新FlowFile屬性库菲,提取數(shù)據(jù)账忘,出口數(shù)據(jù),路線數(shù)據(jù)熙宇,提取數(shù)據(jù)鳖擒,修改資料 - ReportingTask
ReportingTask接口的機(jī)制是允許將度量標(biāo)準(zhǔn),監(jiān)視信息和內(nèi)部NiFi狀態(tài)發(fā)布到外部端點(diǎn)烫止,例如日志文件蒋荚,電子郵件和遠(yuǎn)程Web服務(wù)。 - ControllerService
ControllerService在單個(gè)JVM中跨處理器馆蠕,其他ControllerService和ReportingTask提供共享狀態(tài)和功能期升。一個(gè)示例用例可能包括將非常大的數(shù)據(jù)集加載到內(nèi)存中。通過(guò)在ControllerService中執(zhí)行此工作互躬,數(shù)據(jù)可以一次加載并通過(guò)此服務(wù)公開(kāi)給所有處理器播赁,而不需要許多不同的處理器自己加載數(shù)據(jù)集。 - FlowFilePrioritizer
FlowFilePrioritizer接口提供了一種機(jī)制吨铸,通過(guò)該機(jī)制行拢,可以對(duì)隊(duì)列中的FlowFile進(jìn)行優(yōu)先級(jí)排序或排序,以便可以按對(duì)特定用例最有效的順序處理FlowFiles诞吱。 - 授權(quán)提供者
AuthorityProvider負(fù)責(zé)確定應(yīng)授予給定用戶哪些特權(quán)和角色(如果有)舟奠。
3.處理器API
處理器是NiFi中使用最廣泛的組件。處理器是唯一有權(quán)訪問(wèn)以創(chuàng)建房维,刪除沼瘫,修改或檢查FlowFiles(數(shù)據(jù)和屬性)的組件。
使用Java的ServiceLoader機(jī)制加載和實(shí)例化所有處理器咙俩。這意味著所有處理器都必須遵守以下規(guī)則:
處理器必須具有默認(rèn)構(gòu)造函數(shù)耿戚。
處理器的JAR文件必須包含META-INF / services目錄中名為的條目 org.apache.nifi.processor.Processor湿故。這是一個(gè)文本文件,其中每一行都包含處理器的完全限定的類名膜蛔。
盡管Processor是可以直接實(shí)現(xiàn)的接口坛猪,但是這樣做org.apache.nifi.processor.AbstractProcessor幾乎是很少見(jiàn)的,因?yàn)閹缀跛刑幚砥鲗?shí)現(xiàn)都是該類的基類皂股。本AbstractProcessor類提供的功能的顯著量墅茉,這使得開(kāi)發(fā)的處理器更容易,更方便的任務(wù)呜呐。對(duì)于本文檔就斤,我們將AbstractProcessor在處理Processor API時(shí)重點(diǎn)關(guān)注該類。
并發(fā)注釋
NiFi是一個(gè)高度并發(fā)的框架蘑辑。這意味著所有擴(kuò)展都必須是線程安全的洋机。如果不熟悉用Java編寫并發(fā)軟件,強(qiáng)烈建議您熟悉Java并發(fā)性原理洋魂。
3.1支持API
為了理解Processor API绷旗,我們必須首先(至少在較高層次上)理解幾個(gè)支持的類和接口,下面將對(duì)其進(jìn)行討論忧设。
3.1.1 FlowFile
FlowFile是一種邏輯概念刁标,它使一條數(shù)據(jù)與該數(shù)據(jù)的一組屬性相關(guān)聯(lián)颠通。這些屬性包括FlowFile的唯一標(biāo)識(shí)符址晕,其名稱,大小以及任何其他特定于流的值顿锰。雖然FlowFile的內(nèi)容和屬性可以更改谨垃,但FlowFile對(duì)象是不可變的。通過(guò)ProcessSession可以對(duì)FlowFile進(jìn)行修改硼控。
FlowFiles的核心屬性在org.apache.nifi.flowfile.attributes.CoreAttributes枚舉中定義刘陶。你會(huì)看到最常用的屬性是filename,path和uuid牢撼。括號(hào)中的字符串是CoreAttributes枚舉中屬性的值以及它在UI / API中的顯示方式匙隔。
- 文件名(filename):FlowFile的文件名。文件名不應(yīng)包含任何目錄結(jié)構(gòu)熏版。
- UUID(uuid):分配給此FlowFile的通用唯一標(biāo)識(shí)符纷责,用于將FlowFile與系統(tǒng)中的其他FlowFile區(qū)分開(kāi)。
- 路徑(path):FlowFile的路徑指示FlowFile所屬的相對(duì)目錄撼短,并且不包含文件名再膳。
絕對(duì)路徑(absolute.path):FlowFile的絕對(duì)路徑指示FlowFile所屬的絕對(duì)目錄,并且不包含文件名曲横。 - 優(yōu)先級(jí)(priority):表示FlowFile優(yōu)先級(jí)的數(shù)值喂柒。
- MIME類型(mime.type):此FlowFile的MIME類型。
- 丟棄原因(discard.reason):指定丟棄FlowFile的原因。
- 備用標(biāo)識(shí)符(alternate.identifier):表示除FlowFile的UUID之外的已知標(biāo)識(shí)符灾杰,該標(biāo)識(shí)符引用該FlowFile蚊丐。
Additional Common Attributes
3.1.2 ProcessSession
ProcessSession,通常簡(jiǎn)稱為“session”艳吠,提供了一種機(jī)制吠撮,通過(guò)它可以實(shí)現(xiàn)FlowFiles創(chuàng)建,銷毀讲竿,檢查泥兰,克隆以及將其轉(zhuǎn)移到其他處理器。此外题禀,ProcessSession提供了一種機(jī)制鞋诗,用于通過(guò)添加或刪除屬性或修改FlowFile的內(nèi)容來(lái)創(chuàng)建修改版本的FlowFiles。ProcessSession還公開(kāi)了一種發(fā)出來(lái)源事件的機(jī)制迈嘹,該機(jī)制提供了跟蹤FlowFile的傳輸路徑和歷史記錄的功能削彬。在一個(gè)或多個(gè)FlowFiles上執(zhí)行操作后,可以提交或回滾ProcessSession秀仲。
3.1.3 ProcessContext
ProcessContext在處理器與框架之間的提供了橋梁融痛。它提供了處理器怎樣進(jìn)行當(dāng)前配置和允許處理器執(zhí)行特定框架的任務(wù)的信息,例如產(chǎn)生其資源神僵,以便框架可以調(diào)度其他處理器運(yùn)行雁刷,而無(wú)需消耗不必要的資源。
3.1.4 PropertyDescriptor
PropertyDescriptor定義一個(gè)可以由Processor保礼,ReportingTask或ControllerService使用的屬性沛励。屬性的定義包括其名稱,屬性描述炮障,可選的默認(rèn)值目派,驗(yàn)證邏輯,以及有關(guān)是否需要該屬性才能使Processor有效的指示符胁赢。通過(guò)實(shí)例化該類的實(shí)例PropertyDescriptor.Builder 企蹭,調(diào)用適當(dāng)?shù)姆椒ㄒ蕴畛溆嘘P(guān)屬性的詳細(xì)信息并最終調(diào)用該build方法來(lái)創(chuàng)建PropertyDescriptor 。
3.1.5 Validator
PropertyDescriptor必須指定一個(gè)或多個(gè)驗(yàn)證器智末,這些驗(yàn)證器可用于確保用戶輸入的屬性值有效谅摄。如果驗(yàn)證器指示屬性值無(wú)效,組件無(wú)法運(yùn)行或者說(shuō)在屬性變?yōu)橛行е安荒苁褂么岛ΑH绻粗付?yàn)證器螟凭,則該組件將被視為無(wú)效,并且NiFi將報(bào)告該屬性不受支持它呀。
3.1.6 ValidationContext
驗(yàn)證屬性值時(shí)螺男,ValidationContext用于獲取ControllerServices棒厘,創(chuàng)建PropertyValue對(duì)象,以及使用表達(dá)式語(yǔ)言編譯和評(píng)估屬性值下隧。
3.1.7 PropertyValue
所有返回給Processor的屬性值都以PropertyValue對(duì)象的形式返回奢人。該對(duì)象具有方便的方法,可以將值從字符串轉(zhuǎn)換為其他形式淆院,例如數(shù)字和時(shí)間段何乎,以及提供用于評(píng)估表達(dá)式語(yǔ)言的API。
3.1.8 Relationship
Relationship定義了FlowFile可能從處理器傳輸?shù)降穆酚赏帘纭elationships通過(guò)實(shí)例化Relationship.Builder 創(chuàng)建支救,調(diào)用適當(dāng)?shù)姆椒▉?lái)填充Relationship的詳細(xì)信息,最后調(diào)用該build()方法 拷淘。
3.1.9 StateManager
StateManager為處理器各墨,報(bào)告任務(wù)和控制器服務(wù)提供了一種易于存儲(chǔ)和檢索狀態(tài)的機(jī)制。該API與ConcurrentHashMap相似启涯,但是每個(gè)操作都需要一個(gè)Scope贬堵。Scope表明是在局部還是在整個(gè)集群范圍內(nèi)檢索/存儲(chǔ)。有關(guān)更多信息结洼,請(qǐng)參見(jiàn) 狀態(tài)管理器部分黎做。
3.1.10 ProcessorInitializationContext
創(chuàng)建處理器后,它的initialize()方法將被調(diào)用松忍,同時(shí)創(chuàng)建InitializationContext對(duì)象蒸殿。該對(duì)象向處理器暴露在整個(gè)處理器的生命周期內(nèi)都不會(huì)改變的配置,例如處理器的唯一標(biāo)識(shí)符挽铁。
3.1.11 組件日志
鼓勵(lì)處理器通過(guò)該ComponentLog接口處理其日志記錄 伟桅,而不是由第三方記錄器的實(shí)例獲得。這是因?yàn)橥ㄟ^(guò)ComponentLog進(jìn)行的日志記錄允許框架將超出可配置嚴(yán)重性級(jí)別的日志消息呈現(xiàn)給用戶界面叽掘,允許在發(fā)生重要事件時(shí)通知監(jiān)視數(shù)據(jù)流的人員。此外玖雁,它為所有的處理器提供一種統(tǒng)一的日志格式更扁,通過(guò)在調(diào)試模式下記錄堆棧跟蹤并在日志消息中提供處理器的唯一標(biāo)識(shí)符。
3.2 AbstractProcessor API
由于絕大多數(shù)處理器將通過(guò)繼承AbstractProcessor來(lái)創(chuàng)建赫冬,因此我們將在本節(jié)中研究它的抽象類浓镜。AbstractProcessor提供了一些處理器開(kāi)發(fā)人員感興趣的方法。如下:
3.2.1 Processor Initialization
當(dāng)要?jiǎng)?chuàng)建處理器時(shí)劲厌,在其他方法被調(diào)用之前膛薛,AbstractProcessor 的init()方法將被調(diào)用。該方法只有一個(gè)參數(shù)补鼻,類型為 ProcessorInitializationContext哄啄。上下文對(duì)象為處理器提供了ComponentLog雅任,處理器的唯一標(biāo)識(shí)符和ControllerServiceLookup,可用于與已配置的ControllerServices進(jìn)行交互咨跌。每個(gè)這樣的對(duì)象是由AbstractProcessor存儲(chǔ)沪么,并且可以由子類經(jīng)由getLogger,getIdentifier和 getControllerServiceLookup方法分別獲得锌半。
3.2.2 Exposing Processor’s Relationships
為了使處理器將FlowFile傳輸?shù)叫碌哪康牡剡M(jìn)行后續(xù)處理禽车,處理器必須首先能夠向框架公開(kāi)其當(dāng)前支持的所有關(guān)系订歪。這允許應(yīng)用程序的用戶通過(guò)在處理器之間創(chuàng)建連接并為這些連接分配適當(dāng)?shù)年P(guān)系來(lái)將處理器彼此連接务热。
處理器通過(guò)覆蓋getRelationships方法公開(kāi)有效的關(guān)系集 。這個(gè)方法沒(méi)有參數(shù)舵变,并返回Set的Relationship 對(duì)象记焊。對(duì)于大多數(shù)處理器钦勘,此Set將是靜態(tài)的,但是其他處理器將根據(jù)用戶配置動(dòng)態(tài)生成Set亚亲。對(duì)于那些Set是靜態(tài)的Processor彻采,建議在Processor的構(gòu)造函數(shù)或init方法中創(chuàng)建一個(gè)不可變的Set并返回該值,而不是動(dòng)態(tài)生成Set捌归。這種模式使其更干凈的代碼和更好的性能肛响。
3.2.3 Exposing Processor’s Properties
大多數(shù)處理器在使用前都需要一定數(shù)量的用戶配置。處理器支持的屬性通過(guò)getSupportedPropertyDescriptors方法公開(kāi)給框架 惜索。這個(gè)方法沒(méi)有參數(shù)特笋,并返回List的 PropertyDescriptor對(duì)象。列表中對(duì)象的順序很重要巾兆,因?yàn)樗鼪Q定了在用戶界面中呈現(xiàn)屬性的順序猎物。
甲PropertyDescriptor目的是通過(guò)創(chuàng)建一個(gè)新的實(shí)例構(gòu)造PropertyDescriptor.Builder對(duì)象,調(diào)用構(gòu)建器的適當(dāng)?shù)姆椒ń撬埽⒆罱K調(diào)用build方法蔫磨。
盡管此方法涵蓋了大多數(shù)用例,但有時(shí)還是希望允許用戶配置名稱未知的其他屬性圃伶。這可以通過(guò)重寫getSupportedDynamicPropertyDescriptor方法來(lái)實(shí)現(xiàn) 堤如。此方法以String作為其唯一參數(shù),該參數(shù)指示屬性的名稱搀罢。該方法返回一個(gè)PropertyDescriptor對(duì)象侥猩,該 對(duì)象可用于驗(yàn)證屬性名稱和值欺劳。從此方法返回的任何PropertyDescriptor都應(yīng)構(gòu)建isDynamic,并將PropertyDescriptor.Builder類中的值設(shè)置為true 彩匕。AbstractProcessor的默認(rèn)行為是不允許任何動(dòng)態(tài)創(chuàng)建的屬性驼仪。
3.2.4 Validating Processor Properties
如果處理器的配置無(wú)效绪爸,則無(wú)法啟動(dòng)它宙攻∽颍可以通過(guò)在PropertyDescriptor上設(shè)置Validator或通過(guò)PropertyDescriptor.Builder的allowableValues方法來(lái)限制屬性的允許值溢陪,來(lái)實(shí)現(xiàn)Processor屬性的驗(yàn)證identifiesControllerService形真。
但是咆霜,有時(shí)僅驗(yàn)證處理器的屬性還不夠。為此光酣,AbstractProcessor公開(kāi)了一個(gè)customValidate方法挂疆。該方法采用type的單個(gè)參數(shù)ValidationContext。此方法的返回值是一個(gè)Collection的 ValidationResult描述驗(yàn)證過(guò)程中發(fā)現(xiàn)的任何問(wèn)題的對(duì)象视事。僅應(yīng)返回isValid方法返回的那些ValidationResult對(duì)象 false俐东。僅當(dāng)所有屬性均根據(jù)其關(guān)聯(lián)的“驗(yàn)證器”和“允許值”有效時(shí)虏辫,才會(huì)調(diào)用此方法砌庄。即娄昆,僅當(dāng)所有屬性本身都有效時(shí)萌焰,才調(diào)用此方法扒俯,并且此方法允許對(duì)處理器的配置進(jìn)行整體驗(yàn)證撼玄。
3.2.5 Responding to Changes in Configuration
有時(shí)需要讓處理器在其屬性發(fā)生更改時(shí)迅速做出反應(yīng)互纯。該onPropertyModified 方法允許處理器做到這一點(diǎn)留潦。當(dāng)用戶更改處理器的屬性值時(shí),onPropertyModified將為每個(gè)修改后的屬性調(diào)用該 方法。該方法采用三個(gè)參數(shù):PropertyDescriptor(指示修改了哪個(gè)屬性)孵稽,舊值和新值十偶。如果該屬性沒(méi)有先前的值惦积,則第二個(gè)參數(shù)為null狮崩。如果刪除了該屬性,則第三個(gè)參數(shù)為null毡熏。重要的是要注意痢法,無(wú)論值是否有效疯暑,都將調(diào)用此方法。僅在實(shí)際修改值時(shí)才調(diào)用此方法越锈,而不是在用戶更新Processor而不更改其值時(shí)調(diào)用此方法甘凭。在調(diào)用此方法時(shí)丹弱,可以確保調(diào)用此方法的線程是處理器中當(dāng)前正在執(zhí)行代碼的唯一線程躲胳,除非處理器本身創(chuàng)建了自己的線程坯苹。
3.2.6 Performing the Work
當(dāng)處理器有工作要做時(shí)摇天,安排它onTrigger通過(guò)框架調(diào)用其方法來(lái)這樣做。該方法有兩個(gè)參數(shù):a ProcessContext和a ProcessSession为鳄。該onTrigger方法的第一步通常是通過(guò)get在ProcessSession上調(diào)用方法之一來(lái)獲取要在其上執(zhí)行工作的FlowFile 。對(duì)于從外部來(lái)源將數(shù)據(jù)導(dǎo)入NiFi的處理器司训,將跳過(guò)此步驟壳猜。然后统扳,處理器可以自由檢查FlowFile屬性咒钟。添加,刪除或修改屬性萍嬉;讀取或修改FlowFile內(nèi)容壤追;并將FlowFiles傳輸?shù)竭m當(dāng)?shù)腞elationships。
3.2.7 When Processors are Triggered
onTrigger僅當(dāng)計(jì)劃運(yùn)行處理器并且該處理器存在工作時(shí)悼做,才會(huì)調(diào)用處理器的方法。如果滿足以下任何條件羹与,則據(jù)說(shuō)存在處理器的工作:
以處理器為目的地的連接在其隊(duì)列中至少有一個(gè)FlowFile
處理器沒(méi)有傳入的連接
處理器帶有@TriggerWhenEmpty批注
存在一些因素,這些因素將在onTrigger調(diào)用Processor 方法時(shí)起作用腾誉。首先趣效,除非用戶將處理器配置為運(yùn)行猪贪,否則不會(huì)觸發(fā)處理器热押。如果安排處理器運(yùn)行拥褂,則框架會(huì)定期(該時(shí)間段由用戶在用戶界面中配置)檢查處理器是否有工作要做,如上所述尤慰。如果是這樣,框架將檢查處理器的下游目標(biāo)责蝠。如果處理器的任何出站連接已滿,則默認(rèn)情況下不會(huì)安排處理器運(yùn)行。
但是医男,@TriggerWhenAnyDestinationAvailable可以將注釋添加到處理器的類踱启。在這種情況下透罢,將更改需求芽隆,以使只有一個(gè)下游目標(biāo)必須“可用”(如果連接隊(duì)列未滿牙躺,則將目標(biāo)視為“可用”)吨掌,而不是要求所有下游目標(biāo)均可用炼幔。
@TriggerSerially 注釋也與處理器調(diào)度有關(guān)肛著。使用此注釋的處理器永遠(yuǎn)不會(huì)有多個(gè)線程onTrigger同時(shí)運(yùn)行該方法刀脏。但是耀态,必須指出的是擎析,執(zhí)行代碼的線程可能會(huì)因調(diào)用而改變棚瘟。因此瞬内,仍必須注意確保處理器是線程安全的迷雪!
3.3 生命周期
NiFi API通過(guò)使用Java注解提供生命周期支持。該org.apache.nifi.annotation.lifecycle軟件包包含一些用于生命周期管理的注釋虫蝶。以下注釋可以應(yīng)用于NiFi組件中的Java方法章咧,以指示框架何時(shí)應(yīng)調(diào)用這些方法。為了討論組件生命周期能真,我們將NiFi組件定義為Processor赁严,ControllerServices或ReportingTask。
3.3.1 @OnAdded
3.3.2@OnEnabled
3.3.3 @OnRemoved
3.3.4 @OnScheduled
3.3.5 @OnUnscheduled
3.3.6 @OnStopped
3.3.7 @OnShutdown
3.4 組件通知
3.4.1 @OnPrimaryNodeStateChange
3.5 Restricted
受限組件是一種組件粉铐,可以用來(lái)執(zhí)行操作員通過(guò)NiFi REST API / UI提供的任意未經(jīng)消毒的代碼疼约,也可以用于使用NiFi OS憑據(jù)在NiFi主機(jī)系統(tǒng)上獲取或更改數(shù)據(jù)。這些組件可能會(huì)由其他經(jīng)過(guò)授權(quán)的NiFi用戶使用蝙泼,以超出應(yīng)用程序的預(yù)期用途程剥,提升特權(quán),或者可能公開(kāi)有關(guān)NiFi進(jìn)程或主機(jī)系統(tǒng)內(nèi)部的數(shù)據(jù)踱承。所有這些功能都應(yīng)被視為特權(quán)倡缠,管理員應(yīng)意識(shí)到這些功能,并為一部分受信任的用戶顯式啟用它們茎活。
可以使用@Restricted批注標(biāo)記處理器昙沦,控制器服務(wù)或報(bào)告任務(wù)。這將導(dǎo)致該組件被視為受限組件载荔,并要求將用戶明確添加到可以訪問(wèn)受限組件的用戶列表中盾饮。一旦允許用戶訪問(wèn)受限制的組件,將在允許所有其他權(quán)限的情況下允許他們創(chuàng)建和修改那些組件懒熙。如果無(wú)法訪問(wèn)受限制的組件丘损,則用戶仍然會(huì)知道這些類型的組件的存在,但是即使擁有足夠的權(quán)限工扎,也無(wú)法創(chuàng)建或修改它們徘钥。