Flume的三要素(Source/Channel/Sink)问欠、攔截器肝匆、選擇器、Sink組

三要素(Source/Channel/Sink)

  • Source:負(fù)責(zé)接收數(shù)據(jù)到flume的組件

    • 1.Netcat:基于TCP端口的數(shù)據(jù)源接收器
      # 配置Agent中的三要素
      a1.sources = r1
      a1.sinks = k1
      a1.channels = c1
      
      # 配置Source部分
      a1.sources.r1.type = netcat
      a1.sources.r1.bind = 192.0.0.2
      a1.sources.r1.port = 44444
      
      # 配置Sink部分
      a1.sinks.k1.type = logger
      
      # 配置Channel部分
      a1.channels.c1.type = memory
      a1.channels.c1.capacity = 1000
      a1.channels.c1.transactionCapacity = 100
      
      # 綁定相關(guān)組件
      a1.sources.r1.channels = c1
      a1.sinks.k1.channel = c1
      
      • logger4j配置文件log4j.properties(控制臺(tái)輸出+文件輸出)
      # 設(shè)置Logger的日志級(jí)別為INFO顺献,同時(shí)增加兩個(gè)日志輸出項(xiàng)叫A1,A2.
      log4j.rootLogger=INFO, A1, A2
      
      # A1這個(gè)設(shè)置項(xiàng)被配置為控制臺(tái)輸出 ConsoleAppender.
      log4j.appender.A1=org.apache.log4j.ConsoleAppender
      
      # A1 輸出項(xiàng)的輸出格式.
      log4j.appender.A1.layout=org.apache.log4j.PatternLayout
      log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
      
      # A2 這個(gè)配置項(xiàng)的設(shè)置旗国,文件輸出
      log4j.appender.A2=org.apache.log4j.FileAppender
      
      # 設(shè)置日志的文件名
      log4j.appender.A2.File=./logs/log.out
      
      # 定義輸出的日志格式
      log4j.appender.A2.layout=org.apache.log4j.PatternLayout
      log4j.appender.A2.layout.conversionPattern=%m%n
      
      • 啟動(dòng)程序
        flume-ng agent --name a1 --conf-file /root/trainging/flume-1.9.0/conf/example1.conf --conf /root/trainging/flume-1.9.0/conf/

      • 發(fā)送數(shù)據(jù)
        telnet 192.0.0.2 44444

    • 2.Exec:基于命令行標(biāo)準(zhǔn)輸出來(lái)產(chǎn)生數(shù)據(jù)的數(shù)據(jù)源接收器
      # 配置Agent中的三要素
      a1.sources = r1
      a1.sinks = k1
      a1.channels = c1
      
      # 配置Source部分
      a1.sources.r1.type = exec
      a1.sources.r1.command = tail -F /root/data/flume/access.log
      
      # 配置Sink部分
      a1.sinks.k1.type = logger
      
      # 配置Channel部分
      a1.channels.c1.type = memory
      a1.channels.c1.capacity = 1000
      a1.channels.c1.transactionCapacity = 100
      
      # 綁定相關(guān)組件
      a1.sources.r1.channels = c1
      a1.sinks.k1.channel = c1
      
      • logger4j配置文件log4j.properties(控制臺(tái)輸出+文件輸出):配置同上

      • 啟動(dòng)程序
        flume-ng agent --name a1 --conf-file /root/trainging/flume-1.9.0/conf/example2.conf --conf /root/trainging/flume-1.9.0/conf/

      • 發(fā)送數(shù)據(jù)
        echo hello >> /root/data/flume/access.log

    • 3.avro:高擴(kuò)展的RPC數(shù)據(jù)源(最常用)
      # 配置Agent中的三要素
      a1.sources = r1
      a1.sinks = k1
      a1.channels = c1
      
      # 配置Source部分
      a1.sources.r1.type = avro
      a1.sources.r1.bind = 192.0.0.2
      a1.sources.r1.port = 44444
      
      # 配置Sink部分
      a1.sinks.k1.type = logger
      
      # 配置Channel部分
      a1.channels.c1.type = memory
      a1.channels.c1.capacity = 1000
      a1.channels.c1.transactionCapacity = 100
      
      # 綁定相關(guān)組件
      a1.sources.r1.channels = c1
      a1.sinks.k1.channel = c1
      
      • logger4j配置文件log4j.properties(控制臺(tái)輸出+文件輸出):配置同上

      • 啟動(dòng)程序
        flume-ng agent --name a1 --conf-file /root/trainging/flume-1.9.0/conf/example3.conf --conf /root/trainging/flume-1.9.0/conf/

      • 發(fā)送端Java代碼

      package pkg01;
      
      import java.nio.charset.Charset;
      import org.apache.flume.Event;
      import org.apache.flume.EventDeliveryException;
      import org.apache.flume.api.RpcClient;
      import org.apache.flume.api.RpcClientFactory;
      import org.apache.flume.event.EventBuilder;
      
      public class class1 {
      
          public static void main(String[] args) throws EventDeliveryException {
      
              String ip = "192.0.0.2";
              int port = 44444;
      
              RpcClient client = RpcClientFactory.getDefaultInstance(ip, port); // Avro
      //      RpcClient client = RpcClientFactory.getThriftInstance(ip, port); // Thrift
              Event event = EventBuilder.withBody("hello flume!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!",
                      Charset.forName("UTF8"));
              client.append(event);
              client.close();
          }
      }
      
  • Channel:位于Source和Sink之間的緩沖塊,允許Source和Sink運(yùn)行在不同速率上

    • 1.MemoryChannel:建立在內(nèi)存中的通道注整,數(shù)據(jù)存儲(chǔ)在JVM的堆上
      • 允許數(shù)據(jù)少量丟失可使用內(nèi)存通道能曾,不允許數(shù)據(jù)丟失可使用文件通道
      • 內(nèi)存通道支持事務(wù)特性,如下所示:


        image.png
      # 配置Agent中的三要素
      a1.sources = r1
      a1.sinks = k1
      a1.channels = c1
      
      # 配置Source部分
      a1.sources.r1.type = netcat
      a1.sources.r1.bind = 192.0.0.2
      a1.sources.r1.port = 44444
      
      # 配置Sink部分
      a1.sinks.k1.type = logger
      
      # 配置Channel部分
        # 設(shè)置內(nèi)存通道
      a1.channels.c1.type = memory
        # 可以存最大10萬(wàn)個(gè)事件event
      a1.channels.c1.capacity = 100000
        # 每個(gè)事務(wù)可以存取最大100個(gè)事件event
      a1.channels.c1.transactionCapacity = 100
        # 內(nèi)存通道大小為500MB
      a1.channels.c1.byteCapacity = 500000000
        # 其中10%存放頭文件肿轨,500MB*10%
      a1.channels.c1.byteCapacityBufferPercentage = 10
      
      # 綁定相關(guān)組件
      a1.sources.r1.channels = c1
      a1.sinks.k1.channel = c1
      
  • Sink:從channel中獲取數(shù)據(jù)并推送給下一級(jí)Flume Agent或者存儲(chǔ)數(shù)據(jù)到指定位置

    • Sink組概念:


      image.png
    • Sink優(yōu)化:


      image.png
    • Sink事務(wù)特性:


      image.png
    • 一般配置:
      # sink類型
      a1.sinks.k1.type = hdfs
      # hdfs目錄
      a1.sinks.k1.hdfs.path = /user/hduser/logs/data_%Y-%m-%d
      # 文件前綴
      a1.sinks.k1.hdfs.filePrefix = retail
      # 文件后綴
      a1.sinks.k1.hdfs.fileSuffix = .txt
      # 60秒或128兆或100個(gè)事件寿冕,則關(guān)閉文件
      a1.sinks.k1.hdfs.rollInterval = 60
      a1.sinks.k1.hdfs.rollSize  = 128000000
      a1.sinks.k1.hdfs.rollCount = 100
      # 30秒沒(méi)數(shù)據(jù)寫(xiě)入則關(guān)閉文件
      a1.sinks.k1.hdfs.idleTimeout= 30
      # 最多打開(kāi)100個(gè)文件
      a1.sinks.k1.hdfs.maxOpenFiles= 30
      # 文件不壓縮
      # a1.sinks.k1.hdfs.fileType = DataStream
      # 文件壓縮,snappy方式壓縮
      a1.sinks.k1.hdfs.fileType = CompressedStream
      a1.sinks.k1.hdfs.codeC = snappy
      # 使用本地時(shí)間戳
      a1.sinks.k1.hdfs.useLocalTimeStamp = true
      # 每10分鐘寫(xiě)到一個(gè)bucket(目錄)
      a1.sinks.k1.hdfs.round = true
      a1.sinks.k1.hdfs.roundUnit = minute
      a1.sinks.k1.hdfs.roundValue= 10
      
  • 攔截器:工作在Source和Channel之間萝招,在Source接收到數(shù)據(jù)后蚂斤,攔截器基于自定義的規(guī)則刪除或轉(zhuǎn)換相關(guān)事件存捺,如果一個(gè)鏈路上存在多個(gè)攔截器槐沼,將按順序依次執(zhí)行

    • 主機(jī)攔截器:Agent將主機(jī)IP或主機(jī)名添加到事件的報(bào)文頭中,事件報(bào)文頭使用hostHeader配置捌治,默認(rèn)是host
      a1.sources.r1.interceptors = i1
      a1.sources.r1.interceptors.i1.type = host
      # 是否覆蓋原有的值
      a1.sources.r1.interceptors.i1.preserveExisting = true
      # 默認(rèn)是使用IP岗钩,這里不使用IP
      a1.sources.r1.interceptors.i1.useIP = false
      # 使用主機(jī)名
      a1.sources.r1.interceptors.i1.hostHeader = hostname
      
      image.png

      image.png
    • 時(shí)間攔截器(常用):事件報(bào)文頭帶有timestamp鍵,可以方便HDFS Sink進(jìn)行分桶
      a1.sources.r1.interceptors = i1
      a1.sources.r1.interceptors.i1.type = timestamp
      
      image.png
    • 靜態(tài)攔截器:簡(jiǎn)單的將固定報(bào)文的KV對(duì)插入到報(bào)文事件中
      a1.sources.r1.interceptors = i1
      a1.sources.r1.interceptors.i1.type = static
      a1.sources.r1.interceptors.i1.key = Zone
      a1.sources.r1.interceptors.i1.value = NEW_YORK
      
      image.png
    • UUID攔截器:通過(guò)攔截器給每個(gè)事件添加唯一標(biāo)識(shí)符UUID
      a1.sources.r1.interceptors = i1
      a1.sources.r1.interceptors.i1.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
      
      image.png
      • 正則過(guò)濾攔截器:選擇性保留數(shù)據(jù)
        a1.sources.r1.interceptors = i1 i2
        
        a1.sources.r1.interceptors.i1.type = regex_filter
        a1.sources.r1.interceptors.i1.regex = .*info*
        # 保留此類事件
        a1.sources.r1.interceptors.i1.excludeEvents = false 
        
        a1.sources.r1.interceptors.i2.type = regex_filter
        a1.sources.r1.interceptors.i2.regex = .*data3*
        # 排除此類事件
        a1.sources.r1.interceptors.i2.excludeEvents = true
        
        image.png
      • 自定義攔截器:自定義的攔截器肖油,java實(shí)現(xiàn)
        Map<String, String> headers = new HashMap<String, String>(); 
        headers.put("ClientServer", "Client01srv");
        
        List<Event> events = new ArrayList<Event>();
        events.add(EventBuilder.withBody("info ", Charset.forName("UTF8"), headers));
        
        RpcClient client = RpcClientFactory.getDefaultInstance(ip, port); // Avro
        client.appendBatch(events);
        client.close();
        
        image.png
  • 通道選擇器:Channel處理器選擇哪些事件進(jìn)入哪個(gè)Channel

    • 復(fù)制Channel選擇器:復(fù)制Source過(guò)來(lái)的數(shù)據(jù)到每個(gè)Channel中
      # 配置Agent中的三要素
      a1.sources = r1
      a1.sinks = k1 k2
      a1.channels = c1 c2
      
      # 配置Source部分
      a1.sources.r1.type = avro
      a1.sources.r1.bind = 10.0.1.213
      a1.sources.r1.port = 44444
      
      # 配置Sink部分
      a1.sinks.k1.type = logger
      a1.sinks.k2.type = logger
      
      # 配置Channel部分
      a1.channels.c1.type = memory
      a1.channels.c1.capacity = 100
      
      a1.channels.c2.type = memory
      a1.channels.c2.capacity = 100
      
      # 綁定相關(guān)組件
      a1.sources.r1.channels = c1 c2
      a1.sinks.k1.channel = c1
      a1.sinks.k2.channel = c2
      
    • 多路復(fù)用Channel選擇器:根據(jù)事件頭數(shù)據(jù)中key對(duì)應(yīng)的value來(lái)選擇此事件進(jìn)入哪個(gè)Channel
      # 配置Agent中的三要素
      a1.sources = r1
      a1.sinks = k1 k2
      a1.channels = c1 c2
      
      # 配置Source部分
      a1.sources.r1.type = avro
      a1.sources.r1.bind = 10.0.1.213
      a1.sources.r1.port = 44444
      a1.sources.r1.selector.type = multiplexing
      a1.sources.r1.selector.header = op
      # op為1兼吓,事件進(jìn)入 c1 Channel
      a1.sources.r1.selector.mapping.1 = c1
      # op為2,事件進(jìn)入 c2 Channel
      a1.sources.r1.selector.mapping.2 = c2
      # op為其他值森枪,事件進(jìn)入 c2 Channel
      a1.sources.r1.selector.default = c2
      
      # 配置Sink部分
      a1.sinks.k1.type = logger
      a1.sinks.k2.type = logger
      
      # 配置Channel部分
      a1.channels.c1.type = memory
      a1.channels.c1.capacity = 100
      
      a1.channels.c2.type = memory
      a1.channels.c2.capacity = 100
      
      # 綁定相關(guān)組件
      a1.sources.r1.channels = c1 c2
      a1.sinks.k1.channel = c1
      a1.sinks.k2.channel = c2
      
  • Sink組:多個(gè)Sink組成一個(gè)Sink組视搏,可用于負(fù)載均衡和故障轉(zhuǎn)移

    • 負(fù)載均衡:多條鏈路都發(fā)送數(shù)據(jù),不保證數(shù)據(jù)順序
      image.png
      # 配置Sink部分
      a1.sinkgroups = g1
      a1.sinkgroups.g1.sinks = k1 k2
      a1.sinkgroups.g1.processor.type = load_balance
      a1.sinkgroups.g1.processor.backoff = true
      #a1.sinkgroups.g1.processor.selector = random
      a1.sinkgroups.g1.processor.selector = round_robin
      a1.sinks.k1.type = logger
      a1.sinks.k2.type = logger
      
    • 故障轉(zhuǎn)移:一個(gè)鏈路壞了县袱,其他鏈路會(huì)按照優(yōu)先級(jí)起來(lái)頂替工作浑娜,保證數(shù)據(jù)順序
      image.png
      # 配置Sink部分
      a1.sinkgroups = g1
      a1.sinkgroups.g1.sinks = k1 k2
      a1.sinkgroups.g1.processor.type = failover
      a1.sinkgroups.g1.processor.priority.k1 = 5
      a1.sinkgroups.g1.processor.priority.k2 = 10
      a1.sinkgroups.g1.processor.maxpenalty = 10000
      #a1.sinks.k1.type = logger
      a1.sinks.k2.type = logger
      a1.sinks.k1.type = avro
      a1.sinks.k1.hostname = localhost
      a1.sinks.k1.port = 44444
      
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市式散,隨后出現(xiàn)的幾起案子筋遭,更是在濱河造成了極大的恐慌,老刑警劉巖暴拄,帶你破解...
    沈念sama閱讀 218,607評(píng)論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件漓滔,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡乖篷,警方通過(guò)查閱死者的電腦和手機(jī)响驴,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,239評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)撕蔼,“玉大人豁鲤,你說(shuō)我怎么就攤上這事石蔗。” “怎么了畅形?”我有些...
    開(kāi)封第一講書(shū)人閱讀 164,960評(píng)論 0 355
  • 文/不壞的土叔 我叫張陵养距,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我日熬,道長(zhǎng)棍厌,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,750評(píng)論 1 294
  • 正文 為了忘掉前任竖席,我火速辦了婚禮耘纱,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘毕荐。我一直安慰自己束析,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,764評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布憎亚。 她就那樣靜靜地躺著员寇,像睡著了一般。 火紅的嫁衣襯著肌膚如雪第美。 梳的紋絲不亂的頭發(fā)上蝶锋,一...
    開(kāi)封第一講書(shū)人閱讀 51,604評(píng)論 1 305
  • 那天,我揣著相機(jī)與錄音什往,去河邊找鬼扳缕。 笑死,一個(gè)胖子當(dāng)著我的面吹牛别威,可吹牛的內(nèi)容都是我干的躯舔。 我是一名探鬼主播,決...
    沈念sama閱讀 40,347評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼省古,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼粥庄!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起衫樊,我...
    開(kāi)封第一講書(shū)人閱讀 39,253評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤飒赃,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后科侈,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體载佳,經(jīng)...
    沈念sama閱讀 45,702評(píng)論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,893評(píng)論 3 336
  • 正文 我和宋清朗相戀三年臀栈,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了蔫慧。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,015評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡权薯,死狀恐怖姑躲,靈堂內(nèi)的尸體忽然破棺而出睡扬,到底是詐尸還是另有隱情,我是刑警寧澤黍析,帶...
    沈念sama閱讀 35,734評(píng)論 5 346
  • 正文 年R本政府宣布卖怜,位于F島的核電站,受9級(jí)特大地震影響阐枣,放射性物質(zhì)發(fā)生泄漏马靠。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,352評(píng)論 3 330
  • 文/蒙蒙 一蔼两、第九天 我趴在偏房一處隱蔽的房頂上張望甩鳄。 院中可真熱鬧,春花似錦额划、人聲如沸妙啃。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,934評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)揖赴。三九已至,卻和暖如春品抽,著一層夾襖步出監(jiān)牢的瞬間储笑,已是汗流浹背甜熔。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,052評(píng)論 1 270
  • 我被黑心中介騙來(lái)泰國(guó)打工圆恤, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人腔稀。 一個(gè)月前我還...
    沈念sama閱讀 48,216評(píng)論 3 371
  • 正文 我出身青樓盆昙,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親焊虏。 傳聞我的和親對(duì)象是個(gè)殘疾皇子淡喜,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,969評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容