avro 類型的 source
監(jiān)聽 Avro 端口來接收外部 avro 客戶端的事件流荤堪。和 netcat 不同的是朵夏,avro-source 接收到的是經(jīng)過 avro 序列化后的數(shù)據(jù)范嘱,然后反序列化數(shù)據(jù)繼續(xù)傳輸脯厨。所以若贮,如果是 avro-source 的話奈应,源數(shù)據(jù)必須經(jīng)過 avro 序列化后的數(shù)據(jù)定躏。而 netcat 接收的是字符串格式账磺。
利用avro source可以實(shí)現(xiàn)多級(jí)流動(dòng)、扇出流痊远、扇入流等效果垮抗,另外,也可以接收通過flume提供的avro客戶端發(fā)送的日志信息碧聪。
avro source配置說明
在 /opt/software/flume-1.8.0/conf 下創(chuàng)建 source-avro.conf
# a1 代表一個(gè)flume 給每個(gè)組件匿名
a1.sources=r1
a1.channels=c1
a1.sinks=s1
# 指定source 的數(shù)據(jù)來源以及堆外開放的端口
a1.sources.r1.type=avro
a1.sources.r1.bind=node113
a1.sources.r1.port=8888
# 指定a1的channels基于內(nèi)存
a1.channels.c1.type=memory
# 指定a1的sinks 輸出到控制臺(tái)
a1.sinks.s1.type=logger
# 綁定a1 sources和channle 的關(guān)系
a1.sources.r1.channels=c1
# 綁定a1 sinks 和 channel 的關(guān)系
a1.sinks.s1.channel=c1
啟動(dòng)
flume-ng agent -n a1 -c /opt/software/flume-1.8.0/conf -f /opt/software/flume-1.8.0/conf/source-avro.conf -Dflume.root.logger=INFO,console
測試
在 node103 的 flume 執(zhí)行命令冒版,把配置文件發(fā)過去
./flume-ng avro-client -H node113 -p 8888 -F /opt/software/flume-1.8.0/conf/source-avro.conf -c /opt/software/flume-1.8.0/conf/
node113 接收會(huì)打印
exec 類型的 source
可以將命令產(chǎn)生的輸出做為源
exec 配置
在 /opt/software/flume-1.8.0/conf 下創(chuàng)建 source-exec.conf
將type改成exec,并添加 command 命令逞姿,會(huì)執(zhí)行命令做為 source 的數(shù)據(jù)源辞嗡。
# a1 代表一個(gè)flume 給每個(gè)組件匿名
a1.sources=r1
a1.channels=c1
a1.sinks=s1
#
a1.sources.r1.type=exec
a1.sources.r1.command=ping node103
# 指定a1的channels基于內(nèi)存
a1.channels.c1.type=memory
# 指定a1的sinks 輸出到控制臺(tái)
a1.sinks.s1.type=logger
# 綁定a1 sources和channle 的關(guān)系
a1.sources.r1.channels=c1
# 綁定a1 sinks 和 channel 的關(guān)系
a1.sinks.s1.channel=c1
spooling directory 類型的 source
將指定得文件加入到"自動(dòng)搜集"目錄中。flume會(huì)持續(xù)監(jiān)聽這個(gè)目錄滞造,把文件當(dāng)作source來處理续室。注意:一旦文件被放到 “自動(dòng)收集” 目錄中,便不能修改谒养,如果修改猎贴,flume 會(huì)報(bào)錯(cuò)。此外蝴光,他不能有重名的文件她渴,否則也會(huì)報(bào)錯(cuò)。
當(dāng)一個(gè)文件被 flume 讀了以后蔑祟,會(huì)在末尾 添加 .COMPLETED 標(biāo)識(shí)
spooling directory 配置 source
在 /opt/software/flume-1.8.0/conf 下創(chuàng)建 source-spooldir.conf
# a1 代表一個(gè)flume 給每個(gè)組件匿名
a1.sources=r1
a1.channels=c1
a1.sinks=s1
#
a1.sources.r1.type=spooldir
# 目錄需要提前建立
a1.sources.r1.spoolDir=/home/data
# 指定a1的channels基于內(nèi)存
a1.channels.c1.type=memory
# 指定a1的sinks 輸出到控制臺(tái)
a1.sinks.s1.type=logger
# 綁定a1 sources和channle 的關(guān)系
a1.sources.r1.channels=c1
# 綁定a1 sinks 和 channel 的關(guān)系
a1.sinks.s1.channel=c1
sequence generator source(序列發(fā)生源) 類型的 source
一個(gè)簡單的序列發(fā)生器趁耗,不斷的產(chǎn)生事件,值是從0開始每次遞增1.主要用來測試疆虚。測試消費(fèi)能力苛败。
spooling directory 配置 source
在 /opt/software/flume-1.8.0/conf 下創(chuàng)建 source-seq.conf
# a1 代表一個(gè)flume 給每個(gè)組件匿名
a1.sources=r1
a1.channels=c1
a1.sinks=s1
a1.sources.r1.type=seq
# 指定a1的channels基于內(nèi)存
a1.channels.c1.type=memory
# 指定a1的sinks 輸出到控制臺(tái)
a1.sinks.s1.type=logger
# 綁定a1 sources和channle 的關(guān)系
a1.sources.r1.channels=c1
# 綁定a1 sinks 和 channel 的關(guān)系
a1.sinks.s1.channel=c1
http 類型的 source
此 source 接受 htpp 的 get 和 post 請(qǐng)求做為f lume 的事件。其中 get 方式應(yīng)該只用于試驗(yàn)径簿。
如果想讓flume正確解析http協(xié)議信息罢屈,比如解析出請(qǐng)求頭、請(qǐng)求體等信息篇亭,需要提供一個(gè)可插拔的 “處理器” 來將請(qǐng)求轉(zhuǎn)換為事件對(duì)象缠捌,這個(gè)處理器必須實(shí)現(xiàn) HTTPSourceHandler 接口。
這個(gè)處理器接受一個(gè) HttpServletRequest 對(duì)象译蒂,并返回一個(gè) Flume Event 對(duì)象集合曼月。
Flume 提供了一些常用的 Handler(處理器)谊却。
-
JSONHandler
可以處理JSON格式的數(shù)據(jù),并支持UTF-8 UTF-16 UTF-32 字符集哑芹,該 handler 接受 Event 數(shù)組炎辨,并根據(jù)請(qǐng)求頭中的編碼將其轉(zhuǎn)換位 Flume Event,如果沒有指定的編碼聪姿,默認(rèn)編碼為 UTF-8.
spooling directory 配置 source
在 /opt/software/flume-1.8.0/conf 下創(chuàng)建 source-http.conf
# a1 代表一個(gè)flume 給每個(gè)組件匿名
a1.sources=r1
a1.channels=c1
a1.sinks=s1
a1.sources.r1.type=http
a1.sources.r1.bind=node113
a1.sources.r1.port=8888
# 指定a1的channels基于內(nèi)存
a1.channels.c1.type=memory
# 指定a1的sinks 輸出到控制臺(tái)
a1.sinks.s1.type=logger
# 綁定a1 sources和channle 的關(guān)系
a1.sources.r1.channels=c1
# 綁定a1 sinks 和 channel 的關(guān)系
a1.sinks.s1.channel=c1
啟動(dòng)測試
啟動(dòng)
flume-ng agent -n a1 -c /opt/software/flume-1.8.0/conf -f /opt/software/flume-1.8.0/conf/source-http.conf -Dflume.root.logger=INFO,console
測試碴萧,從node103發(fā)送數(shù)據(jù)
curl -X POST -d '[{"headers":{"text":"hello wold"},"body":"hello hello"}]' http://node113:8888
node113接收數(shù)據(jù)
2021-05-17 17:37:23,102 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{text=hello wold} body: 68 65 6C 6C 6F 20 68 65 6C 6C 6F hello hello }
Kafka 類型
flume-kafka-source 是flume內(nèi)置的kafka source數(shù)據(jù)組件,是為了拉取kafka數(shù)據(jù)末购。flume-kafka-source 的offset是交由zk集群去維護(hù)offset破喻。
flume 屬于單線程拉取數(shù)據(jù)并將數(shù)據(jù)發(fā)送內(nèi)置channel并通過sink組件進(jìn)行數(shù)據(jù)轉(zhuǎn)發(fā)和處理,故對(duì)于kafka集群多副本方式拉取數(shù)據(jù)的時(shí)候招盲,應(yīng)適當(dāng)考慮多個(gè)flume節(jié)點(diǎn)拉取kafka多副本數(shù)據(jù)低缩,以避免flume節(jié)點(diǎn)在多個(gè)kafka集群副本中輪詢。加大flume拉取kafka數(shù)據(jù)的速率曹货。
屬性 | 默認(rèn)值 | 描述 |
---|---|---|
channels | – | 配置的channels 可配置多個(gè)channels 后續(xù)文章會(huì)說到 |
type | – | org.apache.flume.source.kafka.KafkaSource |
kafka.bootstrap.servers | – | 配置kafka集群地址 |
kafka.consumer.group.id | flume | 唯一確定的消費(fèi)者群體咆繁。 在多個(gè)源或代理中設(shè)置相同的ID表示它們是同一個(gè)使用者組的一部分 |
kafka.topics | – | 你需要消費(fèi)的topic |
kafka.topics.regex | – | 正則表達(dá)式,用于定義源訂閱的主題集顶籽。 此屬性的優(yōu)先級(jí)高于kafka.topics 玩般,如果存在則覆蓋kafka.topics 。 |
batchSize | 1000 | 一批中寫入Channel的最大消息數(shù) (優(yōu)化項(xiàng)) |
batchDurationMillis | 1000 | 將批次寫入通道之前的最長時(shí)間(以毫秒為單位)只要達(dá)到第一個(gè)大小和時(shí)間礼饱,就會(huì)寫入批次坏为。(優(yōu)化項(xiàng)) |
backoffSleepIncrement | 1000 | Kafka主題顯示為空時(shí)觸發(fā)的初始和增量等待時(shí)間。 等待時(shí)間將減少對(duì)空kafka 主題的激進(jìn)ping操作镊绪。 一秒鐘是攝取用例的理想選擇匀伏,但使用攔截器的低延遲操作可能需要較低的值。 |
maxBackoffSleep | 5000 | Kafka主題顯示為空時(shí)觸發(fā)的最長等待時(shí)間蝴韭。 5秒是攝取用例的理想選擇够颠,但使用攔截器的低延遲操作可能需要較低的值。 |
useFlumeEventFormat | false | 默認(rèn)情況下榄鉴,事件從Kafka主題直接作為字節(jié)直接進(jìn)入事件主體履磨。 設(shè)置為true以將事件讀取為Flume Avro二進(jìn)制格式。 與KafkaSink上的相同屬性或Kafka Channel上的parseAsFlumeEvent屬性一起使用時(shí)庆尘,這將保留在生成端發(fā)送的任何Flume標(biāo)頭剃诅。 |
setTopicHeader | true | 設(shè)置為true時(shí),將檢索到的消息的主題存儲(chǔ)到標(biāo)題中驶忌,該標(biāo)題由topicHeader 屬性定義矛辕。 |
topicHeader | topic | 如果setTopicHeader 屬性設(shè)置為true ,則定義用于存儲(chǔ)接收消息主題名稱的標(biāo)題的名稱。 如果與Kafka SinktopicHeader 屬性結(jié)合使用如筛,應(yīng)該小心堡牡,以避免在循環(huán)中將消息發(fā)送回同一主題抒抬。 |
migrateZookeeperOffsets | true | 如果找不到Kafka存儲(chǔ)的偏移量杨刨,請(qǐng)?jiān)赯ookeeper中查找偏移量并將它們提交給Kafka。 這應(yīng)該是支持從舊版本的Flume無縫Kafka客戶端遷移擦剑。 遷移后妖胀,可以將其設(shè)置為false,但通常不需要這樣做惠勒。 如果未找到Zookeeper偏移量赚抡,則Kafka配置kafka.consumer.auto.offset.reset定義如何處理偏移量。 查看[Kafka文檔](http://kafka.apache.org/documentation.html#newconsumerconfigs)了解詳細(xì)信息 |
kafka.consumer.security.protocol | PLAINTEXT | 如果使用某種級(jí)別的安全性寫入Kafka纠屋,則設(shè)置為SASL_PLAINTEXT涂臣,SASL_SSL或SSL。 |
Other Kafka Consumer Properties | – | 這些屬性用于配置Kafka Consumer售担。 可以使用Kafka支持的任何消費(fèi)者財(cái)產(chǎn)赁遗。 唯一的要求是在前綴為“kafka.consumer”的前綴中添加屬性名稱。 例如:kafka.consumer.auto.offset.reset
|
Kafka source 覆蓋了兩個(gè) Kafka 消費(fèi)者參數(shù):source 將 auto.commit.enable 設(shè)置為“false”族铆,以批次進(jìn)行提交岩四。Kafka source 保證至少一次消息檢索策略。source 啟動(dòng)時(shí)可能會(huì)出現(xiàn)重復(fù)項(xiàng)哥攘。Kafka Source 還為key.deserializer(org.apache.kafka.common.serialization.StringSerializer) 和 value.deserializer(org.apache.kafka.common.serialization.ByteArraySerializer) 提供了默認(rèn)值剖煌。不建議修改這些參數(shù)。
#1 代表一個(gè)flume 給每個(gè)組件匿名
a1.sources=r1
a1.channels=c1
a1.sinks=s1
# 設(shè)置kafka
a1.sources.r1.type=org.apache.flume.source.kafka.KafkaSource
# 一批寫入 Channel 的最大消息數(shù)
a1.sources.r1.batchSize=5000
# 將批次寫入通道之前的最長時(shí)間(以毫秒為單位)只要達(dá)到第一個(gè)大小和時(shí)間逝淹,就會(huì)寫入批次耕姊。(優(yōu)化項(xiàng))
a1.sources.r1.batchDurationMillis=2000
a1.sources.r1.kafka.bootstrap.servers=192.168.81.101:9092
a1.sources.r1.kafka.topics=flink_yx_produce,flink_yc_produce
a1.sources.r1.kafka.consumer.group.id=flume_consume_1
# 指定a1的channels基于內(nèi)存
a1.channels.c1.type=memory
# 指定a1的sinks 輸出到控制臺(tái)
a1.sinks.s1.type=logger
# 綁定a1 sources和channle 的關(guān)系
a1.sources.r1.channels=c1
# 綁定a1 sinks 和 channel 的關(guān)系
a1.sinks.s1.channel=c1
agent.sources.r1.batchSize = 5000; agent.sources.r1.batchDurationMillis = 2000,即每2秒鐘拉取 kafka 一批數(shù)據(jù)栅葡,批數(shù)據(jù)大小為5000放入到 flume-channels 中 茉兰。這兩個(gè)值總和考慮以下兩項(xiàng):
- 需要配置kafka單條數(shù)據(jù) broker.conf 中配置 message.max.bytes
- 當(dāng)前flume channel sink 組件最大消費(fèi)能力如何
文檔地址 https://github.com/apache/flume/blob/trunk/flume-ng-doc/sphinx/FlumeUserGuide.rst
Taildir Source
在日志收集服務(wù)器的某個(gè)目錄下,會(huì)按照一段時(shí)間生成一個(gè)日志文件妥畏,并且日志會(huì)不斷的追加到這個(gè)文件中邦邦,比如,每小時(shí)一個(gè)命名規(guī)則為log_20151015_10.log的日志文件醉蚁,所有10點(diǎn)產(chǎn)生的日志都會(huì)追加到這個(gè)文件中燃辖,到了11點(diǎn),就會(huì)生成另一個(gè)log_20151015_11.log的文件网棍。
這種場景如果通過flume(1.6)收集黔龟,當(dāng)前提供的Spooling Directory Source和Exec Source均不能滿足動(dòng)態(tài)實(shí)時(shí)收集的需求,在當(dāng)前正在開發(fā)的flume1.7版本中,提供了一個(gè)非常好用的TaildirSource氏身,使用這個(gè)source巍棱,可以監(jiān)控一個(gè)目錄,并且使用正則表達(dá)式匹配該目錄中的文件名進(jìn)行實(shí)時(shí)收集蛋欣。
Taildir Source可實(shí)時(shí)監(jiān)控一批文件航徙,并記錄每個(gè)文件最新消費(fèi)位置,agent進(jìn)程重啟后不會(huì)有重復(fù)消費(fèi)的問題陷虎。
# source的名字
agent.sources = s1
# channels的名字
agent.channels = c1
# sink的名字
agent.sinks = r1
# 指定source使用的channel
agent.sources.s1.channels = c1
# 指定sink使用的channel
agent.sinks.r1.channel = c1
######## source相關(guān)配置 ########
# source類型
agent.sources.s1.type = TAILDIR
# 元數(shù)據(jù)位置
agent.sources.s1.positionFile = /Users/wangpei/tempData/flume/taildir_position.json
# 監(jiān)控的目錄
agent.sources.s1.filegroups = f1
agent.sources.s1.filegroups.f1=/Users/wangpei/tempData/flume/data/.*log
agent.sources.s1.fileHeader = true
######## channel相關(guān)配置 ########
# channel類型
agent.channels.c1.type = file
# 數(shù)據(jù)存放路徑
agent.channels.c1.dataDirs = /Users/wangpei/tempData/flume/filechannle/dataDirs
# 檢查點(diǎn)路徑
agent.channels.c1.checkpointDir = /Users/wangpei/tempData/flume/filechannle/checkpointDir
# channel中最多緩存多少
agent.channels.c1.capacity = 1000
# channel一次最多吐給sink多少
agent.channels.c1.transactionCapacity = 100
######## sink相關(guān)配置 ########
# sink類型
agent.sinks.r1.type = org.apache.flume.sink.kafka.KafkaSink
# brokers地址
agent.sinks.r1.kafka.bootstrap.servers = localhost:9092
# topic
agent.sinks.r1.kafka.topic = testTopic3
# 壓縮
agent.sinks.r1.kafka.producer.compression.type = snappy