file->flume->kafka
Flume組件選型
1)Source
(1)Taildir Source相比Exec Source从隆、Spooling Directory Source的優(yōu)勢
TailDir Source:斷點(diǎn)續(xù)傳踪区、多目錄刺彩。Flume1.6以前需要自己自定義Source記錄每次讀取文件位置血久,實(shí)現(xiàn)斷點(diǎn)續(xù)傳屹培。
Exec Source可以實(shí)時搜集數(shù)據(jù),但是在Flume不運(yùn)行或者Shell命令出錯的情況下沟绪,數(shù)據(jù)將會丟失。
Spooling Directory Source監(jiān)控目錄空猜,支持?jǐn)帱c(diǎn)續(xù)傳绽慈。
(2)batchSize大小如何設(shè)置?
Event 1K左右時辈毯,500-1000合適(默認(rèn)為100)
2)Channel
采用Kafka Channel坝疼,省去了Sink,提高了效率谆沃。KafkaChannel數(shù)據(jù)存儲在Kafka里面钝凶,所以數(shù)據(jù)是存儲在磁盤中。
配置文件實(shí)例
#為各組件命名
a1.sources = r1
a1.channels = c1
#描述source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*
a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.ETLInterceptor$Builder
#描述channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092
a1.channels.c1.kafka.topic = topic_log
a1.channels.c1.parseAsFlumeEvent = false
#綁定source和channel以及sink和channel的關(guān)系
a1.sources.r1.channels = c1
kafka->flume->hdfs
Flume組件選型
1)FileChannel和MemoryChannel區(qū)別
MemoryChannel傳輸數(shù)據(jù)速度更快唁影,但因?yàn)閿?shù)據(jù)保存在JVM的堆內(nèi)存中耕陷,Agent進(jìn)程掛掉會導(dǎo)致數(shù)據(jù)丟失,適用于對數(shù)據(jù)質(zhì)量要求不高的需求据沈。
FileChannel傳輸速度相對于Memory慢哟沫,但數(shù)據(jù)安全保障高,Agent進(jìn)程掛掉也可以從失敗中恢復(fù)數(shù)據(jù)锌介。
2)FileChannel優(yōu)化
- 通過配置dataDirs指向多個路徑嗜诀,每個路徑對應(yīng)不同的硬盤猾警,增大Flume吞吐量。
- Comma separated list of directories for storing log files. Using multiple directories on separate disks can improve file channel peformance
- checkpointDir和backupCheckpointDir也盡量配置在不同硬盤對應(yīng)的目錄中隆敢,保證checkpoint損壞后发皿,可以快速使用backupCheckpointDir恢復(fù)數(shù)據(jù)
3)Sink:HDFS Sink
(1)HDFS存入大量小文件,有什么影響拂蝎?
元數(shù)據(jù)層面:每個小文件都有一份元數(shù)據(jù)穴墅,其中包括文件路徑,文件名匣屡,所有者封救,所屬組,權(quán)限捣作,創(chuàng)建時間等誉结,這些信息都保存在Namenode內(nèi)存中。所以小文件過多券躁,會占用Namenode服務(wù)器大量內(nèi)存惩坑,影響Namenode性能和使用壽命
計算層面:默認(rèn)情況下MR會對每個小文件啟用一個Map任務(wù)計算,非常影響計算性能也拜。同時也影響磁盤尋址時間以舒。
(2)HDFS小文件處理
官方默認(rèn)的這三個參數(shù)配置寫入HDFS后會產(chǎn)生小文件,hdfs.rollInterval慢哈、hdfs.rollSize蔓钟、hdfs.rollCount
基于以上hdfs.rollInterval=3600,hdfs.rollSize=134217728卵贱,hdfs.rollCount =0幾個參數(shù)綜合作用滥沫,效果如下:
(1)文件在達(dá)到128M時會滾動生成新文件
(2)文件創(chuàng)建超3600秒時會滾動生成新文件
配置文件實(shí)例
## 組件
a1.sources=r1
a1.channels=c1
a1.sinks=k1
## source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topics=topic_log
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.TimeStampInterceptor$Builder
## channel1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1/
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6
## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log-
a1.sinks.k1.hdfs.round = false
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
## 控制輸出文件是原生文件。
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = lzop
## 拼裝
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1
內(nèi)存優(yōu)化
1)問題描述:如果啟動消費(fèi)Flume拋出如下異常:
ERROR hdfs.HDFSEventSink: process failed
java.lang.OutOfMemoryError: GC overhead limit exceeded
2)解決方案
vim flume/conf/flume-env.sh
export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"
# JVM heap一般設(shè)置為4G或更高
# -Xmx與-Xms最好設(shè)置一致键俱,減少內(nèi)存抖動帶來的性能影響兰绣,如果設(shè)置不一致容易導(dǎo)致頻繁fullgc。
# -Xms表示JVM Heap(堆內(nèi)存)最小尺寸编振,初始分配缀辩;-Xmx 表示JVM Heap(堆內(nèi)存)最大允許的尺寸,按需分配踪央。如果不設(shè)置一致臀玄,容易在初始化時,由于內(nèi)存不夠杯瞻,頻繁觸發(fā)fullgc镐牺。