翻譯作品,水平有限放前,如有錯誤干奢,煩請留言指正痊焊。原文請見 官網(wǎng)英文文檔
引言
概述
Apache Flume是一個分布式的、可靠的、易用的系統(tǒng)薄啥,可以有效地將來自很多不同源系統(tǒng)的大量日志數(shù)據(jù)收集辕羽、匯總或者轉(zhuǎn)移到一個數(shù)據(jù)中心存儲。
Apache Flume的作用不僅限于日志匯總垄惧,因為數(shù)據(jù)源是可以自定義的刁愿,F(xiàn)lume也可以被用于傳輸大量的事件數(shù)據(jù),包括但不限于網(wǎng)絡(luò)流量數(shù)據(jù)到逊、社交媒體產(chǎn)生的數(shù)據(jù)酌毡、電子郵件和幾乎所有可能的數(shù)據(jù)源。
Apache Flume是Apache軟件基金會的頂級項目蕾管。
目前有兩個已發(fā)布的代碼版本可以使用枷踏,分別是0.9.x和1.x.
0.9.x系列的文檔獲取方式是 Flume 0.9.x User Guide.
這一篇文檔適用于1.4.x系列版本。
鼓勵新用戶和現(xiàn)存用戶使用1.x系列的發(fā)布版本掰曾,因為在最新的版本中具有顯著的性能提升和靈活配置的特性旭蠕。
系統(tǒng)要求
- java運行環(huán)境-java 1.6或者更新(推薦使用1.7)
- 內(nèi)存-有足夠的內(nèi)存滿足于sources、channels旷坦、和sinks的配置
- 存儲空間-有足夠的空間滿足于channels和sinks的配置
- 目錄權(quán)限-agent使用的目錄要有讀寫權(quán)限
架構(gòu)
數(shù)據(jù)流模型
Flume event是一個數(shù)據(jù)流的基本單元掏熬,包含一個字節(jié)的載荷和一些可選的字符屬性;Flume agent是一個JVM進程包含一些組件秒梅,通過這些組件旗芬,event從一個外部源流向下一個目的地(即一跳的傳輸)。
Flume source消費像web server一樣的外部source交付給它的event捆蜀,外部source是以一種可以被目標Flume source識別的格式向Flume發(fā)送event疮丛。例如,一個Avro Flume source可以接收Avro event辆它,這些event可以來自Avro client誊薄,也可以來自流中其它的Flume agent從Avro sink中發(fā)送的event。一個類似的數(shù)據(jù)流可以是這樣的锰茉,Thrift Flume Source可以接收的event包括來自Thrift Sink呢蔫,來自Flume Thrift RPC Client,來自其他任何語言根據(jù)Flume Thrift協(xié)議實現(xiàn)的Thrift Client飒筑。當Flume Source接收到一個event時片吊,它會將其存儲到一個或者多個Channel中,Channel是一個被動存儲协屡,它會存儲接收的event一直到該event被Flume Sink消費掉俏脊。就拿file channel舉例,它是依靠本地文件系統(tǒng)的著瓶。Sink將event從channel中清除掉联予,然后把它放到一個外部的倉庫(如HDFS)中啼县,這可以通過Flume HDFS Sink實現(xiàn),或者把它傳遞給整個流中下一個Flume agent(即下一跳)的Flume Source沸久。因為有channel的存儲季眷,在一個給定的agent中的Source和Sink可以分階段地異步地處理event。
復(fù)雜數(shù)據(jù)流
Flume允許用戶創(chuàng)建多跳的流程卷胯,其中的event在到達最終的目的地之前會經(jīng)過多個agent子刮。Flume也允許扇入扇出(扇入:一個模塊被多個模塊調(diào)用,扇出:一個模塊調(diào)用多個模塊)模式的數(shù)據(jù)流窑睁,基于上下文的路由和作為失敗跳轉(zhuǎn)的備份路由(故障轉(zhuǎn)移)挺峡。
可靠性
在每一個agent中的event都是存儲在相應(yīng)的channel中的,然后這些event才會被傳送到下一個agent或者最終的存儲系統(tǒng)(如HDFS)担钮,這些event只用存儲在下一個agent中的channel中或者最終存儲系統(tǒng)中之后才會從當前的channel中清除橱赠。這也就是Flume在一次跳轉(zhuǎn)的數(shù)據(jù)流中如何提供端到端可靠性的。
Flume使用一種事務(wù)性的方法來保證event傳送過程中的可靠性箫津,Source和Sink封裝在了一個存儲/檢索event的事務(wù)中狭姨,分別對應(yīng)channel提供的放入/取出的事務(wù),這就保證了event在數(shù)據(jù)流的點對點傳輸過程中的可靠性苏遥。在多跳數(shù)據(jù)流的情形下饼拍,上一跳的sink和下一跳的source它們在一個事務(wù)中運行,確保數(shù)據(jù)能夠安全地存儲到下一跳的channel中田炭。
可恢復(fù)性
Event被存儲在channel中师抄,同時channel管理著事務(wù)的失敗恢復(fù)。Flume有一個基于本地文件系統(tǒng)的可持久化的文件channel教硫,還有一個內(nèi)存channel叨吮,它能簡單地將event存儲在內(nèi)存隊列中,它的速度更快栋豫,但是當一個agent進程死掉的時候挤安,還在channel中的event就不能恢復(fù)了谚殊。
安裝
安裝agent
Flume agent的配置是存儲在本地的配置文件中的丧鸯,這是一個文本文件,它是java properties文件格式嫩絮,可以在一個配置文件中指定一個或者多個agent的配置丛肢。這個配置文件包括一個agent中每一個source,sink和channel的屬性剿干,而且還有它們是如何綁在一些形成數(shù)據(jù)流的蜂怎。
配置單個組件
在數(shù)據(jù)流中的每一個組件(source,sink或者channel)都有一個name置尔,type和一些跟這些type和實例相關(guān)的屬性杠步,例如,一個Avro source需要一個hostname(或者Ip地址)和一個端口號來接收數(shù)據(jù),一個內(nèi)存channel需要有最大的隊列長度(容量)幽歼,一個HDFS sink需要一個已知的文件系統(tǒng)URI朵锣,創(chuàng)建文件路徑和文件的循環(huán)頻次(hdfs.rollInterval)等等。一個組件的所有這些屬性都被設(shè)置在Flume agent的properties文件中甸私。
組件串聯(lián)
agent需要知道哪一個組件需要加載诚些,它們是怎樣有序地連接在一起組成一個數(shù)據(jù)流的。這就需要列舉出一個agent中的每一個source皇型,sink和channel诬烹,并且給每一個source和sink指定連接的channel。例如弃鸦,一個agent從一個叫做“avroWeb”的Avro source中將event傳送到一個叫做“hdfs-cluster1”的HDFS sink中绞吁,它們之間是通過一個叫做“file-channel”的文件channel,因此在配置文件中需要包含這些組件的名字和avroWeb source and hdfs-cluster1 sink共享的通道file-channel唬格。
啟動一個agent
agent是用一個叫做flume-ng的shell腳本啟動的掀泳,這個腳本位于flume的bin目錄下面,執(zhí)行時你需要指定agent的名字西轩,配置文件目錄和配置文件员舵,命令如下:
$ bin/flume-ng agent -n $agent_name -c conf -f conf/flume-conf.properties.template
現(xiàn)在agent將啟動運行指定properties文件配置的source和sink。
一個簡單的例子
這里我們給出了一個配置文件的例子藕畔,描述了一個單點的flume的部署马僻,這個配置文件能使用戶生產(chǎn)的event順序地打印在console控制臺上。
# example.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
在這個配置文件里面僅定義了一個agent注服,它的名字叫a1韭邓,a1有一個source監(jiān)聽的是端口44444的數(shù)據(jù),有一個channel是在內(nèi)存中緩存event數(shù)據(jù)溶弟,還有一個sink將event數(shù)據(jù)打印到console控制臺女淑。在這個配置文件中配置了多個組件,然后描述了它們的type和配置參數(shù)辜御。一個給定的配置文件可以指定多個不同名字的agent鸭你,當一個flume進程啟動的時候,一個標志會被傳進去告訴它啟動哪一個agent擒权。
考慮這個配置文件袱巨,我們可以使用下面的命令來啟動flume:
$ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console
注:在實際的部署中我們通常會多包含一個參數(shù):--conf=<conf-dir>
,這個目錄<conf-dir>
包含一個shell腳本 flume-env.sh 和一個強大的log4j的properties文件碳抄,在這個例子中我們還傳進去了一個java選項愉老,強制Flume將日志打印在控制臺console,而不是用定制的環(huán)境腳本剖效。
在另外一個終端我們可以telnet端口44444嫉入,發(fā)送給flume一個event:
$ telnet localhost 44444
Trying 127.0.0.1...
Connected to localhost.localdomain (127.0.0.1).
Escape character is '^]'.
Hello world! <ENTER>
OK
在原來的flume的終端上將會輸出這個event的log信息:
12/06/19 15:32:19 INFO source.NetcatSource: Source starting
12/06/19 15:32:19 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:44444]
12/06/19 15:32:34 INFO sink.LoggerSink: Event: { headers:{} body: 48 65 6C 6C 6F 20 77 6F 72 6C 64 21 0D Hello world!. }
祝賀焰盗,你已經(jīng)成功地配置并部署了一個Flume agent,在接下來的部分會更加詳細地介紹agent的配置信息咒林。
基于Zookeeper的配置
Flume支持通過zookeeper配置agent姨谷,但這是一個實驗性特性,配置文件需要上傳到zookeeper上去映九,在一個可配置的前綴下面梦湘,這個配置文件被存儲在zookeeper的節(jié)點數(shù)據(jù)中,下面是zookeeper的節(jié)點樹件甥,比如 agent a1和a2.
- /flume
|- /a1 [Agent config file]
|- /a2 [Agent config file]
配置文件上傳到zookeeper之后捌议,agent的啟動命令如下:
$ bin/flume-ng agent –conf conf -z zkhost:2181,zkhost1:2181 -p /flume –name a1 -Dflume.root.logger=INFO,console
參數(shù)解釋如下:
參數(shù)名 | 默認值 | 描述 |
---|---|---|
z | - | zookeeper的連接字符串,hostname:port列表用逗號分隔 |
p | /flume | zookeeper中存儲agent配置文件的根路徑 |
安裝第三方插件
Flume有一個完整的基于插件的架構(gòu)引有。Flume自帶了很多開箱即用的source瓣颅,sink,channel譬正,serializer宫补,并且在發(fā)Flume之外存在很多第三方的實現(xiàn)。
當然你也可以自定義很多組件曾我,只要將它們的jar包放在flume-env.sh
文件中的FLUME_CLASSPATH
的變量中就可以粉怕。Flume現(xiàn)在支持一個特殊的目錄叫plugins.d
,其中按照一定格式打包的插件能夠被自動檢測到抒巢,這樣使得插件的打包管理很方便贫贝,而且也使調(diào)試和問題跟蹤變得簡單了,尤其是依賴包的沖突問題蛉谜。
plugins.d目錄
plugins.d
目錄的位置是$FLUME_HOME/plugins.d
. 在啟動的時候稚晚,flume-ng
的啟動腳本會檢查plugins.d
目錄下符合以下格式的插件,把它們包含到java
啟動時合適的路徑上型诚。
插件的目錄布局
在plugins.d
目錄下面的每一個插件(子目錄)可以有三個子目錄:
- lib - 存放插件的jar包客燕。
- libext - 插件依賴的jar包。
- native - 存放任何的本地依賴包狰贯,如
.so
文件也搓。
下面有兩個存儲在目錄plugins.d
里的插件的例子:
plugins.d/
plugins.d/custom-source-1/
plugins.d/custom-source-1/lib/my-source.jar
plugins.d/custom-source-1/libext/spring-core-2.5.6.jar
plugins.d/custom-source-2/
plugins.d/custom-source-2/lib/custom.jar
plugins.d/custom-source-2/native/gettext.so
數(shù)據(jù)的采集
Flume支持很多種從外部source中采集數(shù)據(jù)的方式。
RPC
在flume的發(fā)布版本中包含了一個Avro的客戶端暮现,它能通過avro RPC方式發(fā)送一個指定的文件到Flume Avro source中:
bin/flume-ng avro-client -H localhost -p 41414 -F /usr/logs/log.10
上面的命令將會發(fā)送文件/usr/logs/log.10
的內(nèi)容到監(jiān)聽在端口上的Flume source还绘。
執(zhí)行命令
有一個exec source,它可以執(zhí)行一個給定的命令栖袋,然后消費這個輸出,輸出是個單行文本抚太,文本以\r
或者\n
或者\r\n
結(jié)尾塘幅。
注:Flume不支持tail命令的source昔案,有一個可以使用exec source執(zhí)行的tail命令,同樣可以流式的輸出文件电媳。
網(wǎng)絡(luò)流
Flume支持以下幾種方式從常見的日志流中讀取數(shù)據(jù):
- Avro
- Thrift
- Syslog
- Netcat
配置多個agent的數(shù)據(jù)流
為了能使數(shù)據(jù)流跨越多個agent或者跳踏揣,前一個agent的sink和當前一跳的source需要同樣是avro類型的,并且sink需要指定source的hostname(或者ip地址)和端口號匾乓。
數(shù)據(jù)流合并
在做日志收集的時候一個常見的場景就是捞稿,大量的生產(chǎn)日志的客戶端發(fā)送數(shù)據(jù)到少量的附屬于存儲子系統(tǒng)的消費者agent。例如拼缝,從數(shù)百個web服務(wù)器中收集日志娱局,它們發(fā)送數(shù)據(jù)到十幾個負責將數(shù)據(jù)寫入HDFS集群的agent。
這個可在Flume中可以實現(xiàn)咧七,需要配置大量第一層的agent衰齐,每一個agent都有一個avro sink,讓它們都指向同一個agent的avro source(強調(diào)一下继阻,在這樣一個場景下你也可以使用thrift source/sink/client)耻涛。在第二層agent上的source將收到的event合并到一個channel中,event被一個sink消費到它的最終的目的地瘟檩。
數(shù)據(jù)流復(fù)用
Flume支持多路輸出event流到一個或多個目的地抹缕。這是靠定義一個多路數(shù)據(jù)流實現(xiàn)的,它可以實現(xiàn)復(fù)制和選擇性路由一個event到一個或者多個channel墨辛。
上面的例子展示了agent foo中source扇出數(shù)據(jù)流到三個不同的channel歉嗓,這個扇出可以是復(fù)制或者多路輸出。在復(fù)制數(shù)據(jù)流的情況下背蟆,每一個event被發(fā)送所有的三個channel鉴分;在多路輸出的情況下,一個event被發(fā)送到一部分可用的channel中带膀,它們是根據(jù)event的屬性和預(yù)先配置的值選擇channel的志珍,例如一個event的
txnType
屬性的值是customer
,這個它應(yīng)該被發(fā)送到channel1和channel3垛叨,如果值是vendor
伦糯,它應(yīng)該被發(fā)送到channel2,如果沒有到達channel2則會被發(fā)送到channel3. 這些映射關(guān)系應(yīng)該被填寫在agent的配置文件中嗽元。
配置
在上面幾部分中都有提到敛纲,F(xiàn)lume agent的配置是從一個文件中讀取的,該文件是一個具有層次結(jié)構(gòu)的java properties文件剂癌。
數(shù)據(jù)流的定義
為了定義單個agent內(nèi)部的數(shù)據(jù)流淤翔,你需要通過channel將source和sink連接起來。你需要列出給定agent的source佩谷,sink和channel旁壮,然后將source和sink指向一個channel监嗜。一個source實例可以指定多個channel,但是一個sink只能指定一個channel抡谐。相關(guān)配置格式如下:
# list the sources, sinks and channels for the agent
<Agent>.sources = <Source>
<Agent>.sinks = <Sink>
<Agent>.channels = <Channel1> <Channel2>
# set channel for source
<Agent>.sources.<Source>.channels = <Channel1> <Channel2> ...
# set channel for sink
<Agent>.sinks.<Sink>.channel = <Channel1>
例如裁奇,一個名為agent_foo
,從一個外部的avro客戶端讀取數(shù)據(jù)麦撵,然后通過一個內(nèi)存channel將數(shù)據(jù)發(fā)送到HDFS系統(tǒng)中刽肠。配置文件weblog.config
看起來是下面這樣的:
# list the sources, sinks and channels for the agent
agent_foo.sources = avro-appserver-src-1
agent_foo.sinks = hdfs-sink-1
agent_foo.channels = mem-channel-1
# set channel for source
agent_foo.sources.avro-appserver-src-1.channels = mem-channel-1
# set channel for sink
agent_foo.sinks.hdfs-sink-1.channel = mem-channel-1
這樣的配置會使event通過 mem-channel-1
從source avro-appserver-src-1
中流向sink hdfs-sink-1
。當一個agent使用weblog.config
作為它的配置文件啟動時免胃,它將實例化那個數(shù)據(jù)流音五。
單個組件的配置
定義過了數(shù)據(jù)流之后,你需要設(shè)置每一個組件(source杜秸、sink放仗、channel)的屬性。在properties文件中為每個組件的type指定值撬碟,都在同一個有層次的命名空間下完成的:
# properties for sources
<Agent>.sources.<Source>.<someProperty> = <someValue>
# properties for channels
<Agent>.channel.<Channel>.<someProperty> = <someValue>
# properties for sinks
<Agent>.sources.<Sink>.<someProperty> = <someValue>
每一個組件都需要設(shè)置type
屬性诞挨,這樣Flume才能理解需要什么類型的對象。每個source呢蛤、sink和channel都有屬于自己的一組屬性惶傻,來實現(xiàn)它們預(yù)期的功能。所有這些屬性都是按需設(shè)置的其障,在前一個例子中银室,我們有一個數(shù)據(jù)流是通過內(nèi)存channel mem-channel-1
從 avro-AppSrv-source
到 hdfs-Cluster1-sink
,下面是一個展示其中每一個組件的配置的例子:
agent_foo.sources = avro-AppSrv-source
agent_foo.sinks = hdfs-Cluster1-sink
agent_foo.channels = mem-channel-1
# set channel for sources, sinks
# properties of avro-AppSrv-source
agent_foo.sources.avro-AppSrv-source.type = avro
agent_foo.sources.avro-AppSrv-source.bind = localhost
agent_foo.sources.avro-AppSrv-source.port = 10000
# properties of mem-channel-1
agent_foo.channels.mem-channel-1.type = memory
agent_foo.channels.mem-channel-1.capacity = 1000
agent_foo.channels.mem-channel-1.transactionCapacity = 100
# properties of hdfs-Cluster1-sink
agent_foo.sinks.hdfs-Cluster1-sink.type = hdfs
agent_foo.sinks.hdfs-Cluster1-sink.hdfs.path = hdfs://namenode/flume/webdata
#...
在一個agent中配置多個數(shù)據(jù)流
在一個單一的Flume agent中可以包含多個獨立的數(shù)據(jù)流励翼,在一個配置文件中你可以列出多個source蜈敢、sink 和 channel。這些組件會被連接起來形成多個數(shù)據(jù)流:
# list the sources, sinks and channels for the agent
<Agent>.sources = <Source1> <Source2>
<Agent>.sinks = <Sink1> <Sink2>
<Agent>.channels = <Channel1> <Channel2>
然后你可以利用相應(yīng)的channel將對應(yīng)的 source 和 sink 連接起來形成兩個不同的數(shù)據(jù)流汽抚。例如抓狭,如果你需要在一個agent中配置兩個數(shù)據(jù)流,一個來自外部的avro客戶端流向外部的HDFS造烁,另一個來自外部的 tail
流向avro sink否过,需要下面這樣的配置:
# list the sources, sinks and channels in the agent
agent_foo.sources = avro-AppSrv-source1 exec-tail-source2
agent_foo.sinks = hdfs-Cluster1-sink1 avro-forward-sink2
agent_foo.channels = mem-channel-1 file-channel-2
# flow #1 configuration
agent_foo.sources.avro-AppSrv-source1.channels = mem-channel-1
agent_foo.sinks.hdfs-Cluster1-sink1.channel = mem-channel-1
# flow #2 configuration
agent_foo.sources.exec-tail-source2.channels = file-channel-2
agent_foo.sinks.avro-forward-sink2.channel = file-channel-2
多agent數(shù)據(jù)流的配置
為了配置一個多層數(shù)據(jù)流,你需要將第一跳的 avro/thrift sink指向下一跳的 avro/thrift source惭蟋。這樣第一個Flume agent就會將event傳送到下一個flume agent中苗桂。例如,如果你使用avro 客戶端周期性地傳送一個文件(每個event一個文件)到本地的flume agent中告组,然后這個本地agent將它傳送到下一個agent煤伟,然后把它存儲起來。
Weblog agent 配置信息:
# list sources, sinks and channels in the agent
agent_foo.sources = avro-AppSrv-source
agent_foo.sinks = avro-forward-sink
agent_foo.channels = file-channel
# define the flow
agent_foo.sources.avro-AppSrv-source.channels = file-channel
agent_foo.sinks.avro-forward-sink.channel = file-channel
# avro sink properties
agent_foo.sources.avro-forward-sink.type = avro
agent_foo.sources.avro-forward-sink.hostname = 10.1.1.100
agent_foo.sources.avro-forward-sink.port = 10000
# configure other pieces
#...
HDFS agent 配置信息:
# list sources, sinks and channels in the agent
agent_foo.sources = avro-collection-source
agent_foo.sinks = hdfs-sink
agent_foo.channels = mem-channel
# define the flow
agent_foo.sources.avro-collection-source.channels = mem-channel
agent_foo.sinks.hdfs-sink.channel = mem-channel
# avro sink properties
agent_foo.sources.avro-collection-source.type = avro
agent_foo.sources.avro-collection-source.bind = 10.1.1.100
agent_foo.sources.avro-collection-source.port = 10000
# configure other pieces
#...
這樣我們就將 weblog agent 的 avro-forward-sink 和 hdfs agent 的 avro-collection-source 連接起來了。最后持偏,來自外部 appserver source 的 event 就存儲在了HDFS中驼卖。
扇出數(shù)據(jù)流
在前面部分我們提到氨肌,F(xiàn)lume支持從一個source中扇出數(shù)據(jù)流到多個 channel 中鸿秆,F(xiàn)lume有兩種扇出模式,復(fù)制和多路復(fù)用怎囚。在復(fù)制數(shù)據(jù)流中卿叽,event 被發(fā)送到所有配置的channel中,在多路復(fù)用數(shù)據(jù)流中恳守,event只被發(fā)送到特定的channel(所有channel的一個子集)中考婴。為了扇出數(shù)據(jù)流,需要為一個source指定一個channel列表和扇出策略催烘,扇出策略是通過添加channel 的 Selector
完成的沥阱,selector 的類型可能是復(fù)制也可能是多路復(fù)用,如果是多路復(fù)用伊群,需要進一步配置選擇規(guī)則考杉,如果你不指定一個 Selector
默認就是復(fù)制:
# List the sources, sinks and channels for the agent
<Agent>.sources = <Source1>
<Agent>.sinks = <Sink1> <Sink2>
<Agent>.channels = <Channel1> <Channel2>
# set list of channels for source (separated by space)
<Agent>.sources.<Source1>.channels = <Channel1> <Channel2>
# set channel for sinks
<Agent>.sinks.<Sink1>.channel = <Channel1>
<Agent>.sinks.<Sink2>.channel = <Channel2>
<Agent>.sources.<Source1>.selector.type = replicating
多路選擇器為了使數(shù)據(jù)分流需要進一步的設(shè)置,這需要指定一個event的mapping屬性舰始,使event流向指定的channel崇棠。selector會檢查在event header中配置的每個屬性,如果它匹配到了特定的值丸卷,event會被發(fā)送到那個值映射到的所有channel中枕稀,如果沒有匹配的,event會被發(fā)送到若干默認配置的channel:
# Mapping for multiplexing selector
<Agent>.sources.<Source1>.selector.type = multiplexing
<Agent>.sources.<Source1>.selector.header = <someHeader>
<Agent>.sources.<Source1>.selector.mapping.<Value1> = <Channel1>
<Agent>.sources.<Source1>.selector.mapping.<Value2> = <Channel1> <Channel2>
<Agent>.sources.<Source1>.selector.mapping.<Value3> = <Channel2>
#...
<Agent>.sources.<Source1>.selector.default = <Channel2>
每個值的mapping允許channel重疊谜嫉。
下面有一個單個數(shù)據(jù)流通過多路選擇流向兩個路徑例子萎坷,名叫agent_foo
的agent有一個avro source 兩個 channel 連接著兩個 sink:
# list the sources, sinks and channels in the agent
agent_foo.sources = avro-AppSrv-source1
agent_foo.sinks = hdfs-Cluster1-sink1 avro-forward-sink2
agent_foo.channels = mem-channel-1 file-channel-2
# set channels for source
agent_foo.sources.avro-AppSrv-source1.channels = mem-channel-1 file-channel-2
# set channel for sinks
agent_foo.sinks.hdfs-Cluster1-sink1.channel = mem-channel-1
agent_foo.sinks.avro-forward-sink2.channel = file-channel-2
# channel selector configuration
agent_foo.sources.avro-AppSrv-source1.selector.type = multiplexing
agent_foo.sources.avro-AppSrv-source1.selector.header = State
agent_foo.sources.avro-AppSrv-source1.selector.mapping.CA = mem-channel-1
agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.mapping.NY = mem-channel-1 file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.default = mem-channel-1
通過上面的配置,Selector 會檢查event中一個名叫 State
的 header沐兰,如果它的值是CA
哆档,這樣的event會被發(fā)送到 channel mem-channel-1
中,如果它的值是 AZ
僧鲁,這樣的event會被發(fā)送到 channel file-channel-2
中虐呻,如果它的值是 NY
,這樣的event則會被發(fā)送到兩個channel中寞秃,如果State
header沒有設(shè)值或者沒有匹配到三個中的任何一個斟叼,這樣的event會被發(fā)送到 default
設(shè)置的 mem-channel-1
中。
Selector 還支持可選擇的 channel春寿,為了設(shè)置可選擇的 channel 使用了一個 header朗涩,在下面的參數(shù)中配置參數(shù)optional
被使用:
# channel selector configuration
agent_foo.sources.avro-AppSrv-source1.selector.type = multiplexing
agent_foo.sources.avro-AppSrv-source1.selector.header = State
agent_foo.sources.avro-AppSrv-source1.selector.mapping.CA = mem-channel-1
agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.mapping.NY = mem-channel-1 file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.optional.CA = mem-channel-1 file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.default = mem-channel-1
Selector 首先嘗試寫入第一個必需的channels,如果這些消費event的channel中有一個失敗绑改,則整個事務(wù)失敗谢床,事務(wù)會再次嘗試所有的這些channel兄一,如果所有的必需的channel都消費到了event,然后selector會嘗試寫入可選的channel中识腿,如果這些可選的channel中的任何一個消費event失敗出革,都會被簡單的忽略,而不會發(fā)起再次嘗試渡讼。
如果在一個指定的header的可選的channel和必需的channel有重疊骂束,這個channel 會被認為是必須的,這個channel的失敗同樣會導(dǎo)致所有必需channel的重試成箫。舉個例子展箱,在上面的配置樣例中,header值等于 CA
的 channnel mem-channel-1
是被認為是必需的 channel蹬昌,盡管它在必需的和可選的中都有標記混驰,一旦寫入這個channel失敗,將會導(dǎo)致失敗的event重試 Selector 中配置的所有的channel皂贩。
注意栖榨,如果一個header 沒有任何的必需的channel,然后這個event會被寫入到默認的channel中先紫,并且嘗試寫入到這個header的可選channel中治泥。如果在沒有指定必需的channel的情況下,設(shè)置可選的channel會導(dǎo)致event被寫入默認的channel 中 遮精。如果沒有channel被指定為默認的居夹,也沒有必需的channel,Selector 會嘗試將event寫入到可選的channel中本冲,在這種情況下准脂,任何的失敗都會被忽略。
Flume Sources
Avro Source
監(jiān)聽在Avro端口上接收來自外部的Avro客戶端的數(shù)據(jù)流檬洞,當它與內(nèi)嵌在另一個Flume agent (上一跳)的Avro Sink 成對出現(xiàn)時狸膏,它可以創(chuàng)建出分層的信息收集拓撲結(jié)構(gòu),下表中加粗的是必需的屬性添怔。
參數(shù)名 | 默認值 | 描述 |
---|---|---|
channels | - | |
type | - | 組件的類型名湾戳,值必須是 avro
|
bind | - | 監(jiān)聽的主機名或者IP地址 |
port | - | bind 的端口號 |
threads | - | 能產(chǎn)生的最大線程數(shù) |
selector.type | ||
selector.* | ||
interceptors | - | 以空格分隔的interceptor列表 |
interceptors.* | ||
compression-type | none | 它的值可以是“none”或者“deflate”,但是必須和AvroSource匹配广料。 |
ssl | false | 設(shè)其值為true時砾脑,啟用SSL加密,同時還必須指定一個 “keystore” 和 “keystore-password”艾杏。 |
keystore | - | 它的值是 Java keystore 的路徑韧衣,SSL必需的。 |
keystore-password | - | 為 Java keystore 設(shè)置密碼,SSL必需的畅铭。 |
keystore-type | JKS | Java keystore 的類型氏淑,它的值可以是 “JKS” 或者 “PKCS12”。 |
exclude-protocols | SSLv3 | 由空格分隔的要排除的SSL/TLS 協(xié)議列表硕噩,除了指定的協(xié)議之外假残,SSLv3總是被排除在外。 |
ipFilter | false | 其值設(shè)置為true時榴徐,啟用netty的ipFiltering守问。 |
ipFilterRules | - | 使用此配置定義N netty ipFilter的模式規(guī)則 |
一個名叫 a1 的 agent 的例子:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
ipFilterRules 的例子:
ipFilterRules 定義的 N netty ipFilters 的規(guī)則必須是逗號隔開的匀归,并且必須是下面的格式:
<’allow’ or deny>:<’ip’ or ‘name’ 機器名>:<pattern>
或者 allow/deny:ip/name:pattern
例子:
ipFilterRules=allow:ip:127.*,allow:name:localhost,deny:ip:*
注坑资,匹配到的第一條規(guī)則適用于下面的例子,在本地客戶端上展示穆端。
配置“allow:name:localhost,deny:ip:
袱贮,這將允許來自本地的客戶端,拒絕來自其它任何ip的客戶端体啰,配置deny:name:localhost,allow:ip:
攒巍,這將拒絕來自本地的客戶端允許其它任何ip的客戶端。
Thrift Source
監(jiān)聽 Thrift 端口荒勇,接收外部 Thrift 客戶端流進來的 event柒莉。當它與內(nèi)嵌有Thrift Sink的另一個(上一跳)Flume agent成對出現(xiàn)時,它能創(chuàng)建分層的數(shù)據(jù)收集拓撲結(jié)構(gòu)沽翔。Thrift Source可以通過配置kerberos 認證實現(xiàn)在安全模式下啟動兢孝,agent-principal 和 agent-keytab 是 Thrift Source 向 kerberos KDC 授權(quán)的兩個屬性。粗體是必須的屬性仅偎。
參數(shù)名 | 默認值 | 描述 |
---|---|---|
channels | - | |
type | - | 組件的類型名跨蟹,值必須是 thrift
|
bind | - | 監(jiān)聽的主機名或者IP地址 |
port | - | bind 的端口號 |
threads | - | 能產(chǎn)生的最大線程數(shù) |
selector.type | ||
selector.* | ||
interceptors | - | 以空格分隔的interceptor列表 |
interceptors.* | ||
ssl | false | 設(shè)其值為true時,啟用SSL加密橘沥,同時還必須指定一個 “keystore” 和 “keystore-password”窗轩。 |
keystore | - | 它的值是 Java keystore 的路徑,SSL必需的座咆。 |
keystore-password | - | 為 Java keystore 設(shè)置密碼痢艺,SSL必需的。 |
keystore-type | JKS | Java keystore 的類型介陶,它的值可以是 “JKS” 或者 “PKCS12”堤舒。 |
exclude-protocols | SSLv3 | 由空格分隔的要排除的SSL/TLS 協(xié)議列表,除了指定的協(xié)議之外斤蔓,SSLv3總是被排除在外植酥。 |
kerberos | false | 值為true時晨另,啟用kerberos認證,在kerberos模式下洞渔,agent-principal 和 agent-keytab是成功認證通過必需的屬性蹬屹,Thrift source在安全模式下,僅接受來自啟用kerberos并且成功通過kerberos KDC認證的thrift客戶端的連接卸留。 |
agent-principal | - | 它被用于 thrift source 向 kerberos KDC 授權(quán)時使用走越。 |
agent-keytab | - | keytab的位置和agent-principal結(jié)合使用。 |
一個名為a1的agent的例子:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = thrift
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
Exec Source
Exec Source 可以運行一個unix命令耻瑟,這個命令是啟動命令或者是一個持續(xù)在標準輸出上生產(chǎn)數(shù)據(jù)的進程(stderr會被簡單的拋棄旨指,除非配置logStdErr
這個屬性的值為true)。如果這個進程不管任何原因退出喳整,這個resource也會退出谆构,并且也不會產(chǎn)生進一步的數(shù)據(jù)。這意味著配置例如cat[namedpipe]
或者tail-F[file]
這樣的命令來產(chǎn)生我們希望的結(jié)果框都,而命令 date
不一樣搬素,前面兩個命令將產(chǎn)生一個數(shù)據(jù)流,而后面的一個命令只會產(chǎn)生一個event而退出魏保。 粗體是必需的參數(shù)熬尺。
參數(shù)名 | 默認值 | 描述 | |
---|---|---|---|
channels | - | ||
type | - | 組件的類型名,值必須是 exec
|
|
command | - | 要執(zhí)行的命令 | |
shell | - | 用于執(zhí)行command的shell外殼谓罗,例如 /bin/sh -c 粱哼,僅在使用的命令依賴某種shell類型時,這個屬性是必需的檩咱,比如通配符揭措、返回標記、管道符等税手。 |
|
restartThrottle | 10000 | 在嘗試重新啟動之前等待的總時長(單位毫秒) | |
restart | false | 如果進程死掉了是否執(zhí)行命令重新啟動 | |
logStdErr | false | 命令的標準錯誤輸出是否應(yīng)該被記錄 | |
batchSize | 20 | 同時讀取和發(fā)送到channel的最大行數(shù) | |
batchTimeout | 3000 | 如果數(shù)據(jù)在被推到下游之前蜂筹,緩沖區(qū)的大小未達到,要等待的時長(單位毫秒) | |
selector.type | replicating | replicating(復(fù)制) 或者 multiplexing(多路) | |
selector.* | |||
interceptors | - | 以空格分隔的interceptor列表 | |
interceptors.* |
警告:ExecSource和其它的異步的source都有的一個問題是芦倒,如果有一個event放入channel失敗了艺挪,source不可能保證客戶端會知道。在這種情況下會出現(xiàn)數(shù)據(jù)丟失兵扬。作為一個實例麻裳,我們最常需要的一個功能是
tail-F[file]
,使用場景是當一個應(yīng)用在硬盤上寫日志文件的時候器钟,F(xiàn)lume tail 那個文件津坑,然后將每一行當作一個event發(fā)送出去。有一個很可能出現(xiàn)的很明顯的問題是傲霸,如果channel被填滿了疆瑰,F(xiàn)lume不能發(fā)送event了會發(fā)生什么事情眉反?Flume沒有辦法知道應(yīng)用是正在寫日志文件需要它保存日志呢,還是由于某種原因event沒有被發(fā)送呢穆役。如果這是沒有意義的寸五,你僅需要知道它:當你使用一個單向的、異步的接口(如ExecSource)時耿币,你的應(yīng)用可能從來都不能保證數(shù)據(jù)已經(jīng)被發(fā)送出去了梳杏!作為這個警告的延伸,你要完全清楚一件事淹接,使用這個source十性,event的發(fā)送完全是零保證的。為了有更強的可靠性保證塑悼,可以考慮使用 Spooling Directory Source 或者使用flume的SDK直接集成到應(yīng)用中去劲适。
and
注意:你可以模仿Flume 0.9x(flume og)的 TailSource 來使用 ExecSource。它只能使用unix命令
tail-F/full/path/to/your/file
拢肆。在這種情況下减响,參數(shù)-F
比-f
更好,因為它支持文件的輪替郭怪。
一個名為a1的agent的例子:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/secure
a1.sources.r1.channels = c1
shell
參數(shù)被用于配置執(zhí)行 command
的一個shell(如Bash 或 Powershell), command
的值會被作為一個參數(shù)傳給 shell
執(zhí)行刊橘,這就允許 command
使用一些該 shell 的特性鄙才,例如通配符、返回標記促绵、管道攒庵、循環(huán)、條件等败晴。如果缺少 shell
的配置浓冒,command
將會被直接調(diào)用。shell
常見的值包括/bin/sh -c
, /bin/ksh -c
, cmd /c
, powershell -Command
等尖坤。
a1.sources.tailsource-1.type = exec
a1.sources.tailsource-1.shell = /bin/bash -c
a1.sources.tailsource-1.command = for i in /path/*.txt; do cat $i; done
JMS Source
JMS Source 從 JMS 的目的地(如隊列稳懒、主題)中讀取消息。作為一個 JMS 應(yīng)用應(yīng)該能夠和任何的 JMS 提供者一起工作慢味,但是目前僅測試過 ActiveMQ 场梆。JMS Source 可配置 batch size
, message selector
, 用戶名/密碼, 和從 message 到 flume event 的轉(zhuǎn)碼器。注意供應(yīng)商提供的 JMS jar 包應(yīng)該被包含在Flume的classpath中纯路,首選放在plugins.d文件夾下或油,或者在命令行中使用 –classpath
參數(shù),再或者通過在文件 flume-env.sh
中定義的 FLUME_CLASSPATH
設(shè)置驰唬。
粗體是必需的參數(shù)顶岸。
每天持續(xù)更新中...