Flume Plugin Development: Remote Debugging

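For work I looked into Flume plugin development and got a breakpoint-debugging environment working.
Our company uses CDH 5.10.1, whose Flume version is 1.6. See the links below for the documentation and source code.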
http://archive.cloudera.com/cdh5/cdh/5/flume-ng-1.6.0-cdh5.10.1/index.html
Here is the direct source download link for the Flume 1.6 release that the CDH build corresponds to (finding it cost me quite a lot of time):
https://git-wip-us.apache.org/repos/asf?p=flume.git;a=commit;h=refs/tags/release-1.6.0

image.png


First build error

[ERROR] Failed to execute goal on project flume-hdfs-sink: Could not resolve dependencies for project org.apache.flume.flume-ng-sinks:flume-hdfs-sink:jar:1.5.0: Failure to find org.apache.hadoop:hadoop-test:jar:2.4.0 in http://repo1.maven.org/maven2 was cached in the local repository, resolution will not be reattempted until the update interval of repo1.maven.org has elapsed or updates are forced -> [Help 1] 

In the pom.xml in the Flume root directory, add the following inside <repositories>:

    <repository>
      <id>p2.jfrog.org</id>
      <url>http://p2.jfrog.org/libs-releases</url>
      <releases>
        <enabled>true</enabled>
      </releases>
      <snapshots>
        <enabled>false</enabled>
      </snapshots>
    </repository>

After running mvn clean install, the flume-ng-dist module produces the packaged distribution.

flume-ng-dist\target\apache-flume-1.6.0-bin\apache-flume-1.6.0-bin\conf

[Only the Windows procedure is covered here]
Prepare the Flume configuration file
Create a new file named test-header.conf in the flume-ng-dist\target\apache-flume-1.6.0-bin\apache-flume-1.6.0-bin\conf folder.

Contents of test-header.conf:

a1.channels = c1
a1.sources = r1
a1.sinks = k1
#channel
a1.channels.c1.type = memory
a1.channels.c1.capacity=100000
a1.channels.c1.transactionCapacity=50000
#source
a1.sources.r1.channels = c1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = E:\\flumefile
a1.sources.r1.batchSize= 100
a1.sources.r1.inputCharset = UTF-8
a1.sources.r1.fileHeader = true
a1.sources.r1.fileHeaderKey = file_path
a1.sources.r1.basenameHeader = true
a1.sources.r1.basenameHeaderKey = file_name
## source interceptor
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_extractor
#a1.sources.r1.interceptors.i1.regex = cookieid is (.*) and ip is (.*)
a1.sources.r1.interceptors.i1.regex = cookieid is (.*) and ip is (.*)
a1.sources.r1.interceptors.i1.serializers = s1 s2
a1.sources.r1.interceptors.i1.serializers.s1.name = cookieid
a1.sources.r1.interceptors.i1.serializers.s2.name = ip
#sink
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
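
To make the interceptor part of this configuration more concrete, here is a minimal standalone sketch of what regex_extractor is expected to do with one event body: match the configured regex and store each capture group under the serializer name. It is not Flume's own code; the class name RegexExtractorSketch and the method extractHeaders are hypothetical names used only for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RegexExtractorSketch {

    // Same pattern and serializer names as in test-header.conf.
    private static final Pattern REGEX = Pattern.compile("cookieid is (.*) and ip is (.*)");
    private static final String[] NAMES = {"cookieid", "ip"};

    /** Returns the headers extracted from one event body (empty map if the regex does not match). */
    public static Map<String, String> extractHeaders(String body) {
        Map<String, String> headers = new LinkedHashMap<>();
        Matcher m = REGEX.matcher(body);
        if (m.find()) {
            // Capture group i + 1 is stored under the i-th serializer name.
            for (int i = 0; i < NAMES.length && i < m.groupCount(); i++) {
                headers.put(NAMES[i], m.group(i + 1));
            }
        }
        return headers;
    }

    public static void main(String[] args) {
        // prints {cookieid=c_1, ip=127.0.0.1}
        System.out.println(extractHeaders("cookieid is c_1 and ip is 127.0.0.1"));
    }
}
```

With the logger sink configured here, these two headers should show up next to the event body in the console output.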

Flume startup method 1:

Prepare a Windows batch file to start Flume:
Create a new file named flume-ng_windows.bat in the flume-ng-dist\target\apache-flume-1.6.0-bin\apache-flume-1.6.0-bin\bin folder.

set CONF_FILE=test-header.conf
::set agent name 
set AGENT_NAME=a1

setlocal
for %%i in ("%~dp0..") do set "folder=%%~fi"
set FLUME_HOME="%folder%"

::Dflume.monitoring.type=ganglia
::-Xdebug -Xrunjdwp:transport=dt_socket,address=8000,server=y,suspend=y
%JAVA_HOME%\bin\java.exe -Xms512m -Xmx512m -Dlog4j.configuration=file:///E:\git\flume-4652849\flume-ng-dist\target\apache-flume-1.6.0-bin1\apache-flume-1.6.0-bin\conf\log4j.properties -Dflume.root.logger=INFO,console -cp %FLUME_HOME%\lib\*;%FLUME_HOME%\plugins.d\KolazzInterceptor\lib\* org.apache.flume.node.Application -f %FLUME_HOME%\conf\%CONF_FILE% -n %AGENT_NAME%

pause

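Flume startup method 2 (recommended):

---------- Command-line startup, begin. In this Flume version the PowerShell startup script has a problem (already solved; the fix is described below). Recommended. --------
Flume 1.6 has a problem, as shown in the screenshot: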


image.png

First, replace the part shown in the screenshot, at line 322 of apache-flume-1.6.0-bin\bin\flume-ng.ps1.


image.png

Replace it with:

    $pluginTmp1 = (@(Get-ChildItem "$plugin\*\lib") -join "\*"";""")
    if( "$pluginTmp1" -ne "" ) {
      $javaClassPath="$javaClassPath;""" + $pluginTmp1 + "\*"";"
    }

    $pluginTmp2 = (@(Get-ChildItem "$plugin\*\libext") -join "\*"";""")
    if( "$pluginTmp2" -ne "" ) {
      $javaClassPath="$javaClassPath;""" + $pluginTmp2 + "\*"";"
    }
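
As I read it, the intent of this replacement is to append every plugin's lib and libext directory to the classpath as a wildcard entry, and to skip the append when nothing is found. The same idea can be sketched in Java as follows. This is only an illustration under that assumption, not part of Flume; the class name PluginClasspathSketch and its methods are hypothetical.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class PluginClasspathSketch {

    /** Collects "<plugin>/lib/*" and then "<plugin>/libext/*" entries for every plugin directory. */
    public static List<String> pluginEntries(Path pluginsDir) throws IOException {
        List<String> entries = new ArrayList<>();
        if (!Files.isDirectory(pluginsDir)) {
            return entries; // no plugins.d folder: nothing to add
        }
        List<Path> plugins = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(pluginsDir)) {
            for (Path p : stream) {
                if (Files.isDirectory(p)) {
                    plugins.add(p);
                }
            }
        }
        Collections.sort(plugins); // deterministic order
        for (String sub : new String[] {"lib", "libext"}) {
            for (Path plugin : plugins) {
                Path dir = plugin.resolve(sub);
                if (Files.isDirectory(dir)) {
                    entries.add(dir + File.separator + "*");
                }
            }
        }
        return entries;
    }

    /** Joins the entries with the platform path separator (";" on Windows). */
    public static String classpath(Path pluginsDir) throws IOException {
        return String.join(File.pathSeparator, pluginEntries(pluginsDir));
    }
}
```

An empty result corresponds to the -ne "" checks in the script: when no plugin folders exist, the classpath is left unchanged.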

Then copy conf\flume-env.ps1.template to a new file named flume-env.ps1, and in that file set $JAVA_OPTS="-Xmx100m -Xdebug -Xrunjdwp:transport=dt_socket,address=8000,server=y,suspend=y"
Run the following command from the bin directory:
flume-ng.cmd agent --conf ../conf --conf-file ../conf/test-header.conf --name a1 -property flume.root.logger=INFO,console
---------- Command-line startup, end --------

Enable Flume remote debugging

Check flume-ng_windows.bat and make sure the remote debugging options are enabled in it, with port 8000:

 -Xdebug -Xrunjdwp:transport=dt_socket,address=8000,server=y,suspend=y
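
If it is unclear whether a JVM was really started with these options, one possible check is to look at the JVM's own input arguments. The sketch below parses the address value out of a -Xrunjdwp or -agentlib:jdwp option; the class name JdwpCheck and the method findJdwpAddress are hypothetical helpers, not part of Flume.

```java
import java.lang.management.ManagementFactory;
import java.util.List;
import java.util.Optional;

public class JdwpCheck {

    /** Returns the JDWP "address" value if a JDWP agent option is present in the JVM arguments. */
    public static Optional<String> findJdwpAddress(List<String> jvmArgs) {
        for (String arg : jvmArgs) {
            String opts;
            if (arg.startsWith("-Xrunjdwp:")) {
                opts = arg.substring("-Xrunjdwp:".length());
            } else if (arg.startsWith("-agentlib:jdwp=")) {
                opts = arg.substring("-agentlib:jdwp=".length());
            } else {
                continue; // not a JDWP option
            }
            // Options are comma-separated key=value pairs, e.g. transport=dt_socket,address=8000
            for (String kv : opts.split(",")) {
                if (kv.startsWith("address=")) {
                    return Optional.of(kv.substring("address=".length()));
                }
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        List<String> jvmArgs = ManagementFactory.getRuntimeMXBean().getInputArguments();
        System.out.println(findJdwpAddress(jvmArgs)
                .map(a -> "JDWP enabled, address=" + a)
                .orElse("JDWP not enabled"));
    }
}
```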

Add a debug configuration in IDEA


image.png
image.png

Change the port to 8000


image.png

Run flume-ng_windows.bat to start Flume


image.png

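The console will pause at this point. Don't worry: with suspend=y the JVM waits until a debugger attaches. Now start debugging the Flume project with the configuration just created in IDEA (start Flume from the command line first, then start the debug session in IDEA).

This walkthrough uses a plugin as the example.
First find RegexExtractorInterceptor.java in the Flume source code and set a breakpoint at the location shown in the screenshot.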


image.png

Because the source type is spooldir, drop a file whose content is cookieid is c_1 and ip is 127.0.0.1 into the E:\flumefile folder.
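
The spooling directory source expects files to be complete and unchanged once they appear in the watched folder, so when generating test files from code it is safer to write them elsewhere first and then move them in. A minimal sketch, assuming a hypothetical staging folder E:\flumestaging on the same volume as the spool folder (an atomic move will not work across volumes); the class name SpoolFileDropper and the file name test-1.log are also made up for illustration.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

public class SpoolFileDropper {

    /** Writes the content to a staging file, then moves the finished file into the spool directory. */
    public static Path drop(Path stagingDir, Path spoolDir, String fileName, String content)
            throws IOException {
        Files.createDirectories(stagingDir);
        Files.createDirectories(spoolDir);
        Path staged = stagingDir.resolve(fileName);
        Files.write(staged, content.getBytes(StandardCharsets.UTF_8));
        // The file only becomes visible in the spool directory after it is fully written.
        return Files.move(staged, spoolDir.resolve(fileName), StandardCopyOption.ATOMIC_MOVE);
    }

    public static void main(String[] args) throws IOException {
        // E:\flumestaging is an assumed staging folder; E:\flumefile is the spoolDir from the config.
        drop(Paths.get("E:\\flumestaging"), Paths.get("E:\\flumefile"),
                "test-1.log", "cookieid is c_1 and ip is 127.0.0.1\n");
    }
}
```

Each dropped file should get a new name, because the source renames processed files and does not expect a name to be reused.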


image.png

The breakpoint is now hit:


image.png
image.png

From here on, enjoy developing Flume plugins.

Demo plugin

Sample source log data:

2018-07-11 13:00:07.122|LOG_INFO |dev14|recv_term|13922222222|{"port":"30011","pro":"808","ver":"2013","buss":"anxin","thread":"140543769519872","connid":"15131","len",220}|7E 02 00 00 CF 01 39 22 22 22 22 87 10 08 00 08 20 00 0C 00 01 02 11 49 C1 06 C6 BB CC 00 00 00 00 00 00 18 07 11 13 00 05 01 04 00 00 00 14 02 02 00 00 03 02 00 00 04 02 00 00 25 04 80 00 00 00 2A 02 00 00 2B 04 00 7A 00 B7 30 01 05 31 01 00 20 02 FF FF 21 02 FF FF 22 02 FF FF 23 02 FF FF 24 04 00 00 00 00 26 04 FF FF FF FF 28 01 00 3A 01 00 3B 04 FF FF FF FF 3F 04 FF FF FF FF 40 04 FF FF FF FF 42 02 00 EF 43 02 FF FF 44 02 FF FF 45 02 FF FF 46 02 FF FF 47 02 FF FF 48 02 FF FF 38 04 00 00 0F 12 37 01 00 E1 30 01 01 FF 02 01 05 03 01 00 04 04 00 00 0F 12 05 04 FF FF FF FF 06 04 FF FF FF FF 08 01 02 09 02 FF FF 0A 02 FF FF 0B 02 FF FF 0C 01 51 0D 01 FF 7B 7E
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.kolazz.flume.interceptor;

import com.google.common.base.Charsets;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class KolazzInterceptor implements Interceptor {

    private static final Logger logger = LoggerFactory.getLogger(KolazzInterceptor.class);
    private String headerName;
    private String formatter;
    private String dest_formatter;
    private final Pattern regex;

    /**
     * Only {@link KolazzInterceptor.Builder} can build me
     */
    protected KolazzInterceptor(Context context) {
        headerName = context.getString(Constants.HEADER_NAME, Constants.HEADER_NAME_DFLT);
        formatter = context.getString(Constants.FORMATTER, Constants.FORMATTER_DFLT);
        dest_formatter = context.getString(Constants.DEST_FORMATTER, Constants.DEST_FORMATTER_DFLT);
        regex = Pattern.compile("(.*)\\|(.*)\\|(.*)\\|(.*)\\|(.*)\\|(.*)\\|(.*)");
    }

    @Override
    public void initialize() {
        // no-op
    }

    public static void main(String[] args) {
        try {
            SimpleDateFormat timeFormater = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
            String time = "2018-7-12 15:22:22.123";
            Date dscDt = timeFormater.parse(time);

            System.out.println(dscDt);

            SimpleDateFormat outformatter = new SimpleDateFormat("yyyyMMddHH");
            String dateString = outformatter.format(dscDt);
            System.out.println(dateString);

        } catch (ParseException e) {
            logger.error(e.getMessage(), e);
        }
    }

    /**
     * Modifies events in-place.
     */
    @Override
    public Event intercept(Event event) {
        try {
            Map<String, String> headers = event.getHeaders();
            String logTimeStr = "";
            Matcher matcher = regex.matcher(
                    new String(event.getBody(), Charsets.UTF_8));
            int count = matcher.groupCount();
            if (matcher.matches()) {
                if (count > 1) {
                    logTimeStr = matcher.group(1);
                }
            }

            SimpleDateFormat timeFormater = new SimpleDateFormat(formatter);

            Date dscDt = null;
            try {
                dscDt = timeFormater.parse(logTimeStr);
            } catch (ParseException e) {
                logger.error(e.getMessage(), e);
            }

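            // If the time cannot be parsed, fall back to the current time
            if (dscDt == null) {
                dscDt = new Date();
            }

            // Named destFormatter so it does not shadow the String field "formatter"
            SimpleDateFormat destFormatter = new SimpleDateFormat(dest_formatter);
            String destDtStr = destFormatter.format(dscDt);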

            headers.put(KolazzInterceptor.Constants.HEADER_NAME, destDtStr);

        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
        return event;
    }

    /**
     * Delegates to {@link #intercept(Event)} in a loop.
     * @param events
     * @return
     */
    @Override
    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            intercept(event);
        }
        return events;
    }

    @Override
    public void close() {
        // no-op
    }

    /**
     * Builder which builds new instances of the KolazzInterceptor.
     */
    public static class Builder implements Interceptor.Builder {

        private Context context;

        @Override
        public Interceptor build() {
            return new KolazzInterceptor(context);
        }

        @Override
        public void configure(Context context) {
            this.context = context;
        }

    }

    public static class Constants {
        public static final String HEADER_NAME = "kolazz_dt";
        public static final String FORMATTER = "formatter";
        public static final String DEST_FORMATTER = "dest_formatter";

        public static String HEADER_NAME_DFLT = "kolazz_date";
        public static String FORMATTER_DFLT = "yyyy-MM-dd HH:mm:ss.SSS";
        public static final String DEST_FORMATTER_DFLT = "yyyyMMdd";
    }

}
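
To see what header value this interceptor should produce without starting Flume, the core of intercept() can be tried in isolation. The sketch below is a simplified stand-in, not the plugin itself: the class name KolazzHeaderSketch and the method headerValue are hypothetical, the sample line is shortened, and it returns null on a parse failure where the interceptor falls back to the current time.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class KolazzHeaderSketch {

    // Same seven-field, pipe-separated pattern as in KolazzInterceptor.
    private static final Pattern REGEX =
            Pattern.compile("(.*)\\|(.*)\\|(.*)\\|(.*)\\|(.*)\\|(.*)\\|(.*)");

    /** Returns the reformatted timestamp of the first field, or null if the line cannot be parsed. */
    public static String headerValue(String line, String inFormat, String outFormat) {
        Matcher m = REGEX.matcher(line);
        if (!m.matches()) {
            return null;
        }
        try {
            Date d = new SimpleDateFormat(inFormat).parse(m.group(1));
            return new SimpleDateFormat(outFormat).format(d);
        } catch (ParseException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        // Shortened version of the sample log line (JSON and hex payload abbreviated).
        String line = "2018-07-11 13:00:07.122|LOG_INFO |dev14|recv_term|13922222222"
                + "|{\"port\":\"30011\"}|7E 02 00";
        // prints 20180711
        System.out.println(headerValue(line, "yyyy-MM-dd HH:mm:ss.SSS", "yyyyMMdd"));
    }
}
```

Note that SimpleDateFormat is not thread-safe, which is presumably why the interceptor creates a new instance on each call instead of sharing one.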


Log output:


image.png