Flume-1.8.0(三)Source支持的類型

avro 類型的 source

監(jiān)聽 Avro 端口來接收外部 avro 客戶端的事件流荤堪。和 netcat 不同的是朵夏,avro-source 接收到的是經(jīng)過 avro 序列化后的數(shù)據(jù)范嘱,然后反序列化數(shù)據(jù)繼續(xù)傳輸脯厨。所以若贮,如果是 avro-source 的話奈应,源數(shù)據(jù)必須經(jīng)過 avro 序列化后的數(shù)據(jù)定躏。而 netcat 接收的是字符串格式账磺。

利用avro source可以實(shí)現(xiàn)多級(jí)流動(dòng)、扇出流痊远、扇入流等效果垮抗,另外,也可以接收通過flume提供的avro客戶端發(fā)送的日志信息碧聪。

avro source配置說明

在 /opt/software/flume-1.8.0/conf 下創(chuàng)建 source-avro.conf

# a1 代表一個(gè)flume 給每個(gè)組件匿名
a1.sources=r1
a1.channels=c1
a1.sinks=s1

# 指定source 的數(shù)據(jù)來源以及堆外開放的端口
a1.sources.r1.type=avro
a1.sources.r1.bind=node113
a1.sources.r1.port=8888

# 指定a1的channels基于內(nèi)存
a1.channels.c1.type=memory
# 指定a1的sinks 輸出到控制臺(tái)
a1.sinks.s1.type=logger

# 綁定a1 sources和channle 的關(guān)系
a1.sources.r1.channels=c1
# 綁定a1 sinks 和 channel 的關(guān)系
a1.sinks.s1.channel=c1

啟動(dòng)

flume-ng agent -n a1 -c /opt/software/flume-1.8.0/conf -f /opt/software/flume-1.8.0/conf/source-avro.conf -Dflume.root.logger=INFO,console

測試

在 node103 的 flume 執(zhí)行命令冒版,把配置文件發(fā)過去

./flume-ng avro-client -H node113 -p 8888 -F /opt/software/flume-1.8.0/conf/source-avro.conf -c /opt/software/flume-1.8.0/conf/

node113 接收會(huì)打印

exec 類型的 source

可以將命令產(chǎn)生的輸出做為源

exec 配置

在 /opt/software/flume-1.8.0/conf 下創(chuàng)建 source-exec.conf
將type改成exec,并添加 command 命令逞姿,會(huì)執(zhí)行命令做為 source 的數(shù)據(jù)源辞嗡。

# a1 代表一個(gè)flume 給每個(gè)組件匿名
a1.sources=r1
a1.channels=c1
a1.sinks=s1

# 
a1.sources.r1.type=exec
a1.sources.r1.command=ping node103

# 指定a1的channels基于內(nèi)存
a1.channels.c1.type=memory
# 指定a1的sinks 輸出到控制臺(tái)
a1.sinks.s1.type=logger

# 綁定a1 sources和channle 的關(guān)系
a1.sources.r1.channels=c1
# 綁定a1 sinks 和 channel 的關(guān)系
a1.sinks.s1.channel=c1

spooling directory 類型的 source

將指定得文件加入到"自動(dòng)搜集"目錄中。flume會(huì)持續(xù)監(jiān)聽這個(gè)目錄滞造,把文件當(dāng)作source來處理续室。注意:一旦文件被放到 “自動(dòng)收集” 目錄中,便不能修改谒养,如果修改猎贴,flume 會(huì)報(bào)錯(cuò)。此外蝴光,他不能有重名的文件她渴,否則也會(huì)報(bào)錯(cuò)。

當(dāng)一個(gè)文件被 flume 讀了以后蔑祟,會(huì)在末尾 添加 .COMPLETED 標(biāo)識(shí)

spooling directory 配置 source

在 /opt/software/flume-1.8.0/conf 下創(chuàng)建 source-spooldir.conf

# a1 代表一個(gè)flume 給每個(gè)組件匿名
a1.sources=r1
a1.channels=c1
a1.sinks=s1

# 
a1.sources.r1.type=spooldir
# 目錄需要提前建立
a1.sources.r1.spoolDir=/home/data

# 指定a1的channels基于內(nèi)存
a1.channels.c1.type=memory
# 指定a1的sinks 輸出到控制臺(tái)
a1.sinks.s1.type=logger

# 綁定a1 sources和channle 的關(guān)系
a1.sources.r1.channels=c1
# 綁定a1 sinks 和 channel 的關(guān)系
a1.sinks.s1.channel=c1

sequence generator source(序列發(fā)生源) 類型的 source

一個(gè)簡單的序列發(fā)生器趁耗,不斷的產(chǎn)生事件,值是從0開始每次遞增1.主要用來測試疆虚。測試消費(fèi)能力苛败。

spooling directory 配置 source

在 /opt/software/flume-1.8.0/conf 下創(chuàng)建 source-seq.conf

# a1 代表一個(gè)flume 給每個(gè)組件匿名
a1.sources=r1
a1.channels=c1
a1.sinks=s1


a1.sources.r1.type=seq

# 指定a1的channels基于內(nèi)存
a1.channels.c1.type=memory
# 指定a1的sinks 輸出到控制臺(tái)
a1.sinks.s1.type=logger

# 綁定a1 sources和channle 的關(guān)系
a1.sources.r1.channels=c1
# 綁定a1 sinks 和 channel 的關(guān)系
a1.sinks.s1.channel=c1

http 類型的 source

此 source 接受 htpp 的 get 和 post 請(qǐng)求做為f lume 的事件。其中 get 方式應(yīng)該只用于試驗(yàn)径簿。

如果想讓flume正確解析http協(xié)議信息罢屈,比如解析出請(qǐng)求頭、請(qǐng)求體等信息篇亭,需要提供一個(gè)可插拔的 “處理器” 來將請(qǐng)求轉(zhuǎn)換為事件對(duì)象缠捌,這個(gè)處理器必須實(shí)現(xiàn) HTTPSourceHandler 接口。

這個(gè)處理器接受一個(gè) HttpServletRequest 對(duì)象译蒂,并返回一個(gè) Flume Event 對(duì)象集合曼月。

Flume 提供了一些常用的 Handler(處理器)谊却。

  • JSONHandler
    可以處理JSON格式的數(shù)據(jù),并支持UTF-8 UTF-16 UTF-32 字符集哑芹,該 handler 接受 Event 數(shù)組炎辨,并根據(jù)請(qǐng)求頭中的編碼將其轉(zhuǎn)換位 Flume Event,如果沒有指定的編碼聪姿,默認(rèn)編碼為 UTF-8.

spooling directory 配置 source

在 /opt/software/flume-1.8.0/conf 下創(chuàng)建 source-http.conf

# a1 代表一個(gè)flume 給每個(gè)組件匿名
a1.sources=r1
a1.channels=c1
a1.sinks=s1


a1.sources.r1.type=http
a1.sources.r1.bind=node113
a1.sources.r1.port=8888

# 指定a1的channels基于內(nèi)存
a1.channels.c1.type=memory
# 指定a1的sinks 輸出到控制臺(tái)
a1.sinks.s1.type=logger

# 綁定a1 sources和channle 的關(guān)系
a1.sources.r1.channels=c1
# 綁定a1 sinks 和 channel 的關(guān)系
a1.sinks.s1.channel=c1

啟動(dòng)測試

啟動(dòng)

flume-ng agent -n a1 -c /opt/software/flume-1.8.0/conf -f /opt/software/flume-1.8.0/conf/source-http.conf -Dflume.root.logger=INFO,console

測試碴萧,從node103發(fā)送數(shù)據(jù)

curl -X POST -d '[{"headers":{"text":"hello wold"},"body":"hello hello"}]' http://node113:8888

node113接收數(shù)據(jù)

2021-05-17 17:37:23,102 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{text=hello wold} body: 68 65 6C 6C 6F 20 68 65 6C 6C 6F                hello hello }

Kafka 類型

flume-kafka-source 是flume內(nèi)置的kafka source數(shù)據(jù)組件,是為了拉取kafka數(shù)據(jù)末购。flume-kafka-source 的offset是交由zk集群去維護(hù)offset破喻。

flume 屬于單線程拉取數(shù)據(jù)并將數(shù)據(jù)發(fā)送內(nèi)置channel并通過sink組件進(jìn)行數(shù)據(jù)轉(zhuǎn)發(fā)和處理,故對(duì)于kafka集群多副本方式拉取數(shù)據(jù)的時(shí)候招盲,應(yīng)適當(dāng)考慮多個(gè)flume節(jié)點(diǎn)拉取kafka多副本數(shù)據(jù)低缩,以避免flume節(jié)點(diǎn)在多個(gè)kafka集群副本中輪詢。加大flume拉取kafka數(shù)據(jù)的速率曹货。

屬性 默認(rèn)值 描述
channels 配置的channels 可配置多個(gè)channels 后續(xù)文章會(huì)說到
type org.apache.flume.source.kafka.KafkaSource
kafka.bootstrap.servers 配置kafka集群地址
kafka.consumer.group.id flume 唯一確定的消費(fèi)者群體咆繁。 在多個(gè)源或代理中設(shè)置相同的ID表示它們是同一個(gè)使用者組的一部分
kafka.topics 你需要消費(fèi)的topic
kafka.topics.regex 正則表達(dá)式,用于定義源訂閱的主題集顶籽。 此屬性的優(yōu)先級(jí)高于kafka.topics玩般,如果存在則覆蓋kafka.topics
batchSize 1000 一批中寫入Channel的最大消息數(shù) (優(yōu)化項(xiàng))
batchDurationMillis 1000 將批次寫入通道之前的最長時(shí)間(以毫秒為單位)只要達(dá)到第一個(gè)大小和時(shí)間礼饱,就會(huì)寫入批次坏为。(優(yōu)化項(xiàng))
backoffSleepIncrement 1000 Kafka主題顯示為空時(shí)觸發(fā)的初始和增量等待時(shí)間。 等待時(shí)間將減少對(duì)空kafka主題的激進(jìn)ping操作镊绪。 一秒鐘是攝取用例的理想選擇匀伏,但使用攔截器的低延遲操作可能需要較低的值。
maxBackoffSleep 5000 Kafka主題顯示為空時(shí)觸發(fā)的最長等待時(shí)間蝴韭。 5秒是攝取用例的理想選擇够颠,但使用攔截器的低延遲操作可能需要較低的值。
useFlumeEventFormat false 默認(rèn)情況下榄鉴,事件從Kafka主題直接作為字節(jié)直接進(jìn)入事件主體履磨。 設(shè)置為true以將事件讀取為Flume Avro二進(jìn)制格式。 與KafkaSink上的相同屬性或Kafka Channel上的parseAsFlumeEvent屬性一起使用時(shí)庆尘,這將保留在生成端發(fā)送的任何Flume標(biāo)頭剃诅。
setTopicHeader true 設(shè)置為true時(shí),將檢索到的消息的主題存儲(chǔ)到標(biāo)題中驶忌,該標(biāo)題由topicHeader屬性定義矛辕。
topicHeader topic 如果setTopicHeader屬性設(shè)置為true,則定義用于存儲(chǔ)接收消息主題名稱的標(biāo)題的名稱。 如果與Kafka SinktopicHeader屬性結(jié)合使用如筛,應(yīng)該小心堡牡,以避免在循環(huán)中將消息發(fā)送回同一主題抒抬。
migrateZookeeperOffsets true 如果找不到Kafka存儲(chǔ)的偏移量杨刨,請(qǐng)?jiān)赯ookeeper中查找偏移量并將它們提交給Kafka。 這應(yīng)該是支持從舊版本的Flume無縫Kafka客戶端遷移擦剑。 遷移后妖胀,可以將其設(shè)置為false,但通常不需要這樣做惠勒。 如果未找到Zookeeper偏移量赚抡,則Kafka配置kafka.consumer.auto.offset.reset定義如何處理偏移量。 查看[Kafka文檔](http://kafka.apache.org/documentation.html#newconsumerconfigs)了解詳細(xì)信息
kafka.consumer.security.protocol PLAINTEXT 如果使用某種級(jí)別的安全性寫入Kafka纠屋,則設(shè)置為SASL_PLAINTEXT涂臣,SASL_SSL或SSL。
Other Kafka Consumer Properties 這些屬性用于配置Kafka Consumer售担。 可以使用Kafka支持的任何消費(fèi)者財(cái)產(chǎn)赁遗。 唯一的要求是在前綴為“kafka.consumer”的前綴中添加屬性名稱。 例如:kafka.consumer.auto.offset.reset

Kafka source 覆蓋了兩個(gè) Kafka 消費(fèi)者參數(shù):source 將 auto.commit.enable 設(shè)置為“false”族铆,以批次進(jìn)行提交岩四。Kafka source 保證至少一次消息檢索策略。source 啟動(dòng)時(shí)可能會(huì)出現(xiàn)重復(fù)項(xiàng)哥攘。Kafka Source 還為key.deserializer(org.apache.kafka.common.serialization.StringSerializer) 和 value.deserializer(org.apache.kafka.common.serialization.ByteArraySerializer) 提供了默認(rèn)值剖煌。不建議修改這些參數(shù)。

#1 代表一個(gè)flume 給每個(gè)組件匿名
a1.sources=r1
a1.channels=c1
a1.sinks=s1

# 設(shè)置kafka
a1.sources.r1.type=org.apache.flume.source.kafka.KafkaSource
# 一批寫入 Channel 的最大消息數(shù)
a1.sources.r1.batchSize=5000
# 將批次寫入通道之前的最長時(shí)間(以毫秒為單位)只要達(dá)到第一個(gè)大小和時(shí)間逝淹,就會(huì)寫入批次耕姊。(優(yōu)化項(xiàng))
a1.sources.r1.batchDurationMillis=2000
a1.sources.r1.kafka.bootstrap.servers=192.168.81.101:9092
a1.sources.r1.kafka.topics=flink_yx_produce,flink_yc_produce
a1.sources.r1.kafka.consumer.group.id=flume_consume_1


# 指定a1的channels基于內(nèi)存
a1.channels.c1.type=memory
# 指定a1的sinks 輸出到控制臺(tái)
a1.sinks.s1.type=logger

# 綁定a1 sources和channle 的關(guān)系
a1.sources.r1.channels=c1
# 綁定a1 sinks 和 channel 的關(guān)系
a1.sinks.s1.channel=c1

agent.sources.r1.batchSize = 5000; agent.sources.r1.batchDurationMillis = 2000,即每2秒鐘拉取 kafka 一批數(shù)據(jù)栅葡,批數(shù)據(jù)大小為5000放入到 flume-channels 中 茉兰。這兩個(gè)值總和考慮以下兩項(xiàng):

  • 需要配置kafka單條數(shù)據(jù) broker.conf 中配置 message.max.bytes
  • 當(dāng)前flume channel sink 組件最大消費(fèi)能力如何

文檔地址 https://github.com/apache/flume/blob/trunk/flume-ng-doc/sphinx/FlumeUserGuide.rst

Taildir Source

在日志收集服務(wù)器的某個(gè)目錄下,會(huì)按照一段時(shí)間生成一個(gè)日志文件妥畏,并且日志會(huì)不斷的追加到這個(gè)文件中邦邦,比如,每小時(shí)一個(gè)命名規(guī)則為log_20151015_10.log的日志文件醉蚁,所有10點(diǎn)產(chǎn)生的日志都會(huì)追加到這個(gè)文件中燃辖,到了11點(diǎn),就會(huì)生成另一個(gè)log_20151015_11.log的文件网棍。

這種場景如果通過flume(1.6)收集黔龟,當(dāng)前提供的Spooling Directory Source和Exec Source均不能滿足動(dòng)態(tài)實(shí)時(shí)收集的需求,在當(dāng)前正在開發(fā)的flume1.7版本中,提供了一個(gè)非常好用的TaildirSource氏身,使用這個(gè)source巍棱,可以監(jiān)控一個(gè)目錄,并且使用正則表達(dá)式匹配該目錄中的文件名進(jìn)行實(shí)時(shí)收集蛋欣。

Taildir Source可實(shí)時(shí)監(jiān)控一批文件航徙,并記錄每個(gè)文件最新消費(fèi)位置,agent進(jìn)程重啟后不會(huì)有重復(fù)消費(fèi)的問題陷虎。

# source的名字
agent.sources = s1
# channels的名字
agent.channels = c1
# sink的名字
agent.sinks = r1

# 指定source使用的channel
agent.sources.s1.channels = c1
# 指定sink使用的channel
agent.sinks.r1.channel = c1

######## source相關(guān)配置 ########
# source類型
agent.sources.s1.type = TAILDIR
# 元數(shù)據(jù)位置
agent.sources.s1.positionFile = /Users/wangpei/tempData/flume/taildir_position.json
# 監(jiān)控的目錄
agent.sources.s1.filegroups = f1
agent.sources.s1.filegroups.f1=/Users/wangpei/tempData/flume/data/.*log
agent.sources.s1.fileHeader = true

######## channel相關(guān)配置 ########
# channel類型
agent.channels.c1.type = file
# 數(shù)據(jù)存放路徑
agent.channels.c1.dataDirs = /Users/wangpei/tempData/flume/filechannle/dataDirs
# 檢查點(diǎn)路徑
agent.channels.c1.checkpointDir = /Users/wangpei/tempData/flume/filechannle/checkpointDir
# channel中最多緩存多少
agent.channels.c1.capacity = 1000
# channel一次最多吐給sink多少
agent.channels.c1.transactionCapacity = 100

######## sink相關(guān)配置 ########
# sink類型
agent.sinks.r1.type = org.apache.flume.sink.kafka.KafkaSink
# brokers地址
agent.sinks.r1.kafka.bootstrap.servers = localhost:9092
# topic
agent.sinks.r1.kafka.topic = testTopic3
# 壓縮
agent.sinks.r1.kafka.producer.compression.type = snappy
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末到踏,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子尚猿,更是在濱河造成了極大的恐慌窝稿,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,591評(píng)論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件凿掂,死亡現(xiàn)場離奇詭異伴榔,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)庄萎,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,448評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門踪少,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人惨恭,你說我怎么就攤上這事秉馏。” “怎么了脱羡?”我有些...
    開封第一講書人閱讀 162,823評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵萝究,是天一觀的道長。 經(jīng)常有香客問我锉罐,道長帆竹,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,204評(píng)論 1 292
  • 正文 為了忘掉前任脓规,我火速辦了婚禮栽连,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘侨舆。我一直安慰自己秒紧,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,228評(píng)論 6 388
  • 文/花漫 我一把揭開白布挨下。 她就那樣靜靜地躺著熔恢,像睡著了一般。 火紅的嫁衣襯著肌膚如雪臭笆。 梳的紋絲不亂的頭發(fā)上叙淌,一...
    開封第一講書人閱讀 51,190評(píng)論 1 299
  • 那天秤掌,我揣著相機(jī)與錄音,去河邊找鬼鹰霍。 笑死闻鉴,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的茂洒。 我是一名探鬼主播孟岛,決...
    沈念sama閱讀 40,078評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼获黔!你這毒婦竟也來了蚀苛?” 一聲冷哼從身側(cè)響起在验,我...
    開封第一講書人閱讀 38,923評(píng)論 0 274
  • 序言:老撾萬榮一對(duì)情侶失蹤玷氏,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后腋舌,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體盏触,經(jīng)...
    沈念sama閱讀 45,334評(píng)論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,550評(píng)論 2 333
  • 正文 我和宋清朗相戀三年块饺,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了赞辩。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,727評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡授艰,死狀恐怖辨嗽,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情淮腾,我是刑警寧澤糟需,帶...
    沈念sama閱讀 35,428評(píng)論 5 343
  • 正文 年R本政府宣布,位于F島的核電站谷朝,受9級(jí)特大地震影響洲押,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜圆凰,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,022評(píng)論 3 326
  • 文/蒙蒙 一杈帐、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧专钉,春花似錦挑童、人聲如沸髓考。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,672評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽峰弹。三九已至回怜,卻和暖如春大年,著一層夾襖步出監(jiān)牢的瞬間换薄,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,826評(píng)論 1 269
  • 我被黑心中介騙來泰國打工翔试, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留轻要,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,734評(píng)論 2 368
  • 正文 我出身青樓垦缅,卻偏偏與公主長得像冲泥,于是被迫代替她去往敵國和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子壁涎,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,619評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容