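Because of a work requirement, I looked into Flume plugin development and got a development environment with breakpoint debugging working.
Our company uses CDH 5.10.1, which ships Flume 1.6. For the documentation and source code, see the link below.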
http://archive.cloudera.com/cdh5/cdh/5/flume-ng-1.6.0-cdh5.10.1/index.html
Here is a direct download link for the Flume 1.6 source used by CDH. Finding it cost me a lot of time.
https://git-wip-us.apache.org/repos/asf?p=flume.git;a=commit;h=refs/tags/release-1.6.0
Error on the first build:
[ERROR] Failed to execute goal on project flume-hdfs-sink: Could not resolve dependencies for project org.apache.flume.flume-ng-sinks:flume-hdfs-sink:jar:1.5.0: Failure to find org.apache.hadoop:hadoop-test:jar:2.4.0 in http://repo1.maven.org/maven2 was cached in the local repository, resolution will not be reattempted until the update interval of repo1.maven.org has elapsed or updates are forced -> [Help 1]
Add the following inside the <repositories> element of pom.xml in the Flume root directory:
<repository>
<id>p2.jfrog.org</id>
<url>http://p2.jfrog.org/libs-releases</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
After you run mvn clean install, the flume-ng-dist module produces the packaged distribution. Its conf folder is:
flume-ng-dist\target\apache-flume-1.6.0-bin\apache-flume-1.6.0-bin\conf
[Only the Windows procedure is covered here]
Prepare the Flume configuration file
Create a new file named test-header.conf in the flume-ng-dist\target\apache-flume-1.6.0-bin\apache-flume-1.6.0-bin\conf folder.
Contents of test-header.conf:
a1.channels = c1
a1.sources = r1
a1.sinks = k1
#channel
a1.channels.c1.type = memory
a1.channels.c1.capacity=100000
a1.channels.c1.transactionCapacity=50000
#source
a1.sources.r1.channels = c1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = E:\\flumefile
a1.sources.r1.batchSize= 100
a1.sources.r1.inputCharset = UTF-8
a1.sources.r1.fileHeader = true
a1.sources.r1.fileHeaderKey = file_path
a1.sources.r1.basenameHeader = true
a1.sources.r1.basenameHeaderKey = file_name
## source interceptor
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_extractor
#a1.sources.r1.interceptors.i1.regex = cookieid is (.*) and ip is (.*)
a1.sources.r1.interceptors.i1.regex = cookieid is (.*) and ip is (.*)
a1.sources.r1.interceptors.i1.serializers = s1 s2
a1.sources.r1.interceptors.i1.serializers.s1.name = cookieid
a1.sources.r1.interceptors.i1.serializers.s2.name = ip
#sink
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
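You can check which headers the regex_extractor interceptor above should add to each event by reproducing the matching step with plain java.util.regex. This is a simplified sketch, not Flume's RegexExtractorInterceptor. It assumes the default pass-through serializer, which stores the matched text unchanged. The class name RegexExtractorDemo is hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RegexExtractorDemo {
    // Same pattern as a1.sources.r1.interceptors.i1.regex
    private static final Pattern REGEX = Pattern.compile("cookieid is (.*) and ip is (.*)");
    // Header names in serializer order: s1.name, s2.name
    private static final String[] NAMES = {"cookieid", "ip"};

    /** Returns the headers the configured extractor is expected to add for one event body. */
    static Map<String, String> extract(String body) {
        Map<String, String> headers = new LinkedHashMap<>();
        Matcher m = REGEX.matcher(body);
        if (m.find()) {
            // group i+1 is stored under the i-th serializer name
            for (int i = 0; i < NAMES.length && i < m.groupCount(); i++) {
                headers.put(NAMES[i], m.group(i + 1));
            }
        }
        return headers;
    }

    public static void main(String[] args) {
        System.out.println(extract("cookieid is c_1 and ip is 127.0.0.1")); // {cookieid=c_1, ip=127.0.0.1}
    }
}
```

For the test line that is later dropped into E:\flumefile, this gives the headers cookieid=c_1 and ip=127.0.0.1.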
Starting Flume, method 1:
Prepare a batch file that starts Flume on Windows.
Create a new file named flume-ng_windows.bat in the flume-ng-dist\target\apache-flume-1.6.0-bin\apache-flume-1.6.0-bin\bin folder:
setlocal
::config file name (looked up in the conf folder)
set CONF_FILE=test-header.conf
::agent name
set AGENT_NAME=a1
::FLUME_HOME = the folder above this bin folder
for %%i in ("%~dp0..") do set "FLUME_HOME=%%~fi"
::optional: -Dflume.monitoring.type=ganglia
::remote debugging: uncomment the second line below to listen on port 8000
set DEBUG_OPTS=
::set DEBUG_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,address=8000,server=y,suspend=y
"%JAVA_HOME%\bin\java.exe" -Xms512m -Xmx512m %DEBUG_OPTS% "-Dlog4j.configuration=file:///%FLUME_HOME%\conf\log4j.properties" -Dflume.root.logger=INFO,console -cp "%FLUME_HOME%\lib\*;%FLUME_HOME%\plugins.d\KolazzInterceptor\lib\*" org.apache.flume.node.Application -f "%FLUME_HOME%\conf\%CONF_FILE%" -n %AGENT_NAME%
pause
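Starting Flume, method 2 (recommended):
---------- Command-line start: begin. The PowerShell startup script in this Flume version has a bug. It is solved, and the fix is described below. Recommended. --------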
Flume 1.6 has a bug here, as shown in the figure.
First replace the part of apache-flume-1.6.0-bin\bin\flume-ng.ps1 at line 322 that is shown in the figure
with:
$pluginTmp1 = (@(Get-ChildItem "$plugin\*\lib") -join "\*"";""")
if( "$pluginTmp1" -ne "" ) {
$javaClassPath="$javaClassPath;""" + $pluginTmp1 + "\*"";"
}
$pluginTmp2 = (@(Get-ChildItem "$plugin\*\libext") -join "\*"";""")
if( "$pluginTmp2" -ne "" ) {
$javaClassPath="$javaClassPath;""" + $pluginTmp2 + "\*"";"
}
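The replacement appears intended to put each plugin's lib and libext folders on the classpath. According to the Flume user guide, a plugin under plugins.d keeps its own jars in lib and its dependency jars in libext. The following Java sketch lists the wildcard classpath entries a given plugins.d layout should produce under that convention. The class and method names are hypothetical, and the code is not part of Flume.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

public class PluginClasspath {
    /**
     * For each plugin folder under pluginsDir, returns "<plugin>/lib/*" and
     * "<plugin>/libext/*" entries for the subfolders that actually exist.
     */
    static List<String> entries(Path pluginsDir) throws IOException {
        List<String> result = new ArrayList<>();
        if (!Files.isDirectory(pluginsDir)) {
            return result;
        }
        List<Path> plugins = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(pluginsDir)) {
            for (Path p : stream) {
                if (Files.isDirectory(p)) {
                    plugins.add(p);
                }
            }
        }
        plugins.sort(null); // natural Path order, so the output is deterministic
        for (Path plugin : plugins) {
            for (String sub : new String[] {"lib", "libext"}) {
                Path dir = plugin.resolve(sub);
                if (Files.isDirectory(dir)) {
                    // a trailing "*" makes the JVM load every .jar in that folder
                    result.add(dir + File.separator + "*");
                }
            }
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        Path plugins = Paths.get(args.length > 0 ? args[0] : "plugins.d");
        // joined with the platform path separator (";" on Windows)
        System.out.println(String.join(File.pathSeparator, entries(plugins)));
    }
}
```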
Next, copy conf\flume-env.ps1.template to a new file named flume-env.ps1 and set $JAVA_OPTS in it as follows:
$JAVA_OPTS="-Xmx100m -Xdebug -Xrunjdwp:transport=dt_socket,address=8000,server=y,suspend=y"
Run the following command in the bin directory:
flume-ng.cmd agent --conf ../conf --conf-file ../conf/test-header.conf --name a1 -property flume.root.logger=INFO,console
---------- Command-line start: end --------
Enabling Flume remote debugging
Make sure the java command in flume-ng_windows.bat is started with the following options (remote debug port 8000):
-Xdebug -Xrunjdwp:transport=dt_socket,address=8000,server=y,suspend=y
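The suboptions mean the following:

- transport=dt_socket selects a TCP socket.
- address=8000 is the port.
- server=y makes the Flume JVM listen for the debugger instead of connecting out.
- suspend=y holds the JVM before main runs until a debugger attaches.

On Java 5 and later, the same agent can also be requested as -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=8000, and -Xdebug is no longer required. On JDK 9 and later, address=8000 listens on localhost only. Use address=*:8000 if IDEA runs on another machine.

The Java sketch below parses such an option string and reports whether the running JVM was started with a JDWP agent. It uses the standard RuntimeMXBean.getInputArguments(). The class name is hypothetical.

```java
import java.lang.management.ManagementFactory;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class JdwpOptions {
    /** Parses the key=value list after -Xrunjdwp: or -agentlib:jdwp=; empty map otherwise. */
    static Map<String, String> parse(String jvmArg) {
        String body;
        if (jvmArg.startsWith("-Xrunjdwp:")) {
            body = jvmArg.substring("-Xrunjdwp:".length());
        } else if (jvmArg.startsWith("-agentlib:jdwp=")) {
            body = jvmArg.substring("-agentlib:jdwp=".length());
        } else {
            return new LinkedHashMap<>(); // not a JDWP option
        }
        Map<String, String> options = new LinkedHashMap<>();
        for (String pair : body.split(",")) {
            int eq = pair.indexOf('=');
            if (eq > 0) {
                options.put(pair.substring(0, eq), pair.substring(eq + 1));
            }
        }
        return options;
    }

    public static void main(String[] args) {
        // Prints the JDWP settings of this JVM, if it was started with a debug agent
        List<String> jvmArgs = ManagementFactory.getRuntimeMXBean().getInputArguments();
        for (String arg : jvmArgs) {
            Map<String, String> opts = parse(arg);
            if (!opts.isEmpty()) {
                System.out.println("address=" + opts.get("address")
                        + ", listens=" + "y".equals(opts.get("server"))
                        + ", waits for debugger=" + "y".equals(opts.get("suspend")));
            }
        }
    }
}
```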
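In IDEA, add a Remote debug configuration and set its port to 8000.
Run flume-ng_windows.bat to start Flume.
The console now stops and waits. This is expected: suspend=y makes the JVM wait for a debugger. Start the Remote configuration you just created in IDEA to debug the Flume project. Start Flume from the command line first, then start the IDEA debug session.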
This walkthrough uses an interceptor as the example.
Find RegexExtractorInterceptor.java in the Flume source and set a breakpoint at the place shown in the figure.
Because the source is a spooldir source, drop a file whose content is cookieid is c_1 and ip is 127.0.0.1 into the E:\flumefile folder.
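According to the Flume user guide, the spooling directory source expects files to be complete once they appear in spoolDir. A file that is written to after being placed there, or a file name that is reused, makes Flume log an error and stop processing. A safe way to drop a test file is to write it next to the spool directory first, then rename it in with one atomic move.

The following is a minimal sketch under that assumption. The class name, the test-<millis>.log naming, and the staging location are hypothetical.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.Collections;

public class SpoolDrop {
    /**
     * Writes one line to a staging file beside spoolDir, then renames it into spoolDir,
     * so the spooldir source never sees a half-written file. The timestamp in the
     * name keeps file names unique.
     */
    static Path drop(Path spoolDir, String line) throws IOException {
        String name = "test-" + System.currentTimeMillis() + ".log";
        Path parent = spoolDir.toAbsolutePath().getParent();
        Path staging = Files.createTempFile(parent, "staging-", ".tmp");
        // UTF-8 to match a1.sources.r1.inputCharset
        Files.write(staging, Collections.singletonList(line), StandardCharsets.UTF_8);
        // same volume as spoolDir, so this is a plain rename
        return Files.move(staging, spoolDir.resolve(name), StandardCopyOption.ATOMIC_MOVE);
    }

    public static void main(String[] args) throws IOException {
        // E:\flumefile matches a1.sources.r1.spoolDir in test-header.conf
        Path spoolDir = Paths.get(args.length > 0 ? args[0] : "E:\\flumefile");
        System.out.println("dropped " + drop(spoolDir, "cookieid is c_1 and ip is 127.0.0.1"));
    }
}
```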
Execution now stops at the breakpoint.
From here on, you are free to develop your own Flume plugins.
Demo plugin
Sample log data read by the source:
2018-07-11 13:00:07.122|LOG_INFO |dev14|recv_term|13922222222|{"port":"30011","pro":"808","ver":"2013","buss":"anxin","thread":"140543769519872","connid":"15131","len",220}|7E 02 00 00 CF 01 39 22 22 22 22 87 10 08 00 08 20 00 0C 00 01 02 11 49 C1 06 C6 BB CC 00 00 00 00 00 00 18 07 11 13 00 05 01 04 00 00 00 14 02 02 00 00 03 02 00 00 04 02 00 00 25 04 80 00 00 00 2A 02 00 00 2B 04 00 7A 00 B7 30 01 05 31 01 00 20 02 FF FF 21 02 FF FF 22 02 FF FF 23 02 FF FF 24 04 00 00 00 00 26 04 FF FF FF FF 28 01 00 3A 01 00 3B 04 FF FF FF FF 3F 04 FF FF FF FF 40 04 FF FF FF FF 42 02 00 EF 43 02 FF FF 44 02 FF FF 45 02 FF FF 46 02 FF FF 47 02 FF FF 48 02 FF FF 38 04 00 00 0F 12 37 01 00 E1 30 01 01 FF 02 01 05 03 01 00 04 04 00 00 0F 12 05 04 FF FF FF FF 06 04 FF FF FF FF 08 01 02 09 02 FF FF 0A 02 FF FF 0B 02 FF FF 0C 01 51 0D 01 FF 7B 7E
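The plugin that follows splits such a line with (.*)\|(.*)\|(.*)\|(.*)\|(.*)\|(.*)\|(.*) and reads group 1 as the timestamp. Because every group is greedy, group 1 holds only the timestamp while the line contains exactly six | characters. If a later field ever contained a |, group 1 would swallow the next field as well.

The sketch below compares that pattern with [^|]* for the first six fields, which is a suggested alternative and not part of the original plugin. It runs on a shortened copy of the sample line. The class name is hypothetical.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class LogLineSplit {
    // Pattern used by the demo plugin: seven greedy groups
    static final Pattern GREEDY =
            Pattern.compile("(.*)\\|(.*)\\|(.*)\\|(.*)\\|(.*)\\|(.*)\\|(.*)");
    // Suggested variant: the first six fields cannot contain '|', the last one takes the rest
    static final Pattern STRICT =
            Pattern.compile("([^|]*)\\|([^|]*)\\|([^|]*)\\|([^|]*)\\|([^|]*)\\|([^|]*)\\|(.*)");

    /** Returns group 1 (the timestamp field), or null if the whole line does not match. */
    static String firstField(Pattern p, String line) {
        Matcher m = p.matcher(line);
        return m.matches() ? m.group(1) : null;
    }

    public static void main(String[] args) {
        // Shortened copy of the sample line (JSON and hex payload trimmed)
        String line = "2018-07-11 13:00:07.122|LOG_INFO |dev14|recv_term|13922222222|{\"port\":\"30011\"}|7E 02 00 7E";
        System.out.println(firstField(GREEDY, line)); // 2018-07-11 13:00:07.122
        System.out.println(firstField(STRICT, line)); // 2018-07-11 13:00:07.122

        // Hypothetical extra '|' in the payload: the greedy group 1 also swallows the level field
        String withPipe = line + "|extra";
        System.out.println(firstField(GREEDY, withPipe));
        System.out.println(firstField(STRICT, withPipe)); // 2018-07-11 13:00:07.122
    }
}
```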
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.kolazz.flume.interceptor;

import com.google.common.base.Charsets;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Reads the timestamp at the start of a '|'-separated log line and stores it,
 * reformatted, in an event header (the current time if it cannot be parsed).
 */
public class KolazzInterceptor implements Interceptor {
    private static final Logger logger = LoggerFactory.getLogger(KolazzInterceptor.class);
    // name of the header that receives the date (property kolazz_dt, default kolazz_date)
    private final String headerName;
    // SimpleDateFormat pattern of the timestamp inside the log line
    private final String formatter;
    // SimpleDateFormat pattern written to the header
    private final String dest_formatter;
    // seven '|'-separated fields; group 1 is the timestamp
    private final Pattern regex;

    /**
     * Only {@link KolazzInterceptor.Builder} can build me
     */
    protected KolazzInterceptor(Context context) {
        headerName = context.getString(Constants.HEADER_NAME, Constants.HEADER_NAME_DFLT);
        formatter = context.getString(Constants.FORMATTER, Constants.FORMATTER_DFLT);
        dest_formatter = context.getString(Constants.DEST_FORMATTER, Constants.DEST_FORMATTER_DFLT);
        regex = Pattern.compile("(.*)\\|(.*)\\|(.*)\\|(.*)\\|(.*)\\|(.*)\\|(.*)");
    }

    @Override
    public void initialize() {
        // no-op
    }

    /** Manual check of the date conversion; not called by Flume. */
    public static void main(String[] args) {
        try {
            SimpleDateFormat timeFormater = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
            String time = "2018-7-12 15:22:22.123";
            Date dscDt = timeFormater.parse(time);
            System.out.println(dscDt);
            SimpleDateFormat outformatter = new SimpleDateFormat("yyyyMMddHH");
            String dateString = outformatter.format(dscDt);
            System.out.println(dateString);
        } catch (ParseException e) {
            logger.error(e.getMessage(), e);
        }
    }

    /**
     * Modifies events in-place.
     */
    @Override
    public Event intercept(Event event) {
        try {
            Map<String, String> headers = event.getHeaders();
            String logTimeStr = "";
            Matcher matcher = regex.matcher(new String(event.getBody(), Charsets.UTF_8));
            if (matcher.matches()) {
                // group 1 holds the timestamp field
                logTimeStr = matcher.group(1);
            }
            // SimpleDateFormat is not thread-safe; a new instance per call is safe
            SimpleDateFormat timeFormater = new SimpleDateFormat(formatter);
            Date dscDt = null;
            try {
                dscDt = timeFormater.parse(logTimeStr);
            } catch (ParseException e) {
                logger.error(e.getMessage(), e);
            }
            // if the time cannot be parsed, use the current time
            if (dscDt == null) {
                dscDt = new Date();
            }
            SimpleDateFormat destFormat = new SimpleDateFormat(dest_formatter);
            // use the configured header name (HEADER_NAME is the property key, not the header)
            headers.put(headerName, destFormat.format(dscDt));
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
        return event;
    }

    /**
     * Delegates to {@link #intercept(Event)} in a loop.
     * @param events the batch of events
     * @return the same list, each event modified in place
     */
    @Override
    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            intercept(event);
        }
        return events;
    }

    @Override
    public void close() {
        // no-op
    }

    /**
     * Builder which builds new instances of the KolazzInterceptor.
     */
    public static class Builder implements Interceptor.Builder {
        private Context context;

        @Override
        public Interceptor build() {
            return new KolazzInterceptor(context);
        }

        @Override
        public void configure(Context context) {
            this.context = context;
        }
    }

    public static class Constants {
        public static final String HEADER_NAME = "kolazz_dt";
        public static final String FORMATTER = "formatter";
        public static final String DEST_FORMATTER = "dest_formatter";
        public static final String HEADER_NAME_DFLT = "kolazz_date";
        public static final String FORMATTER_DFLT = "yyyy-MM-dd HH:mm:ss.SSS";
        public static final String DEST_FORMATTER_DFLT = "yyyyMMdd";
    }
}
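intercept creates new SimpleDateFormat objects on every call. That is safe, since SimpleDateFormat is not thread-safe, but it costs allocations per event. If the agent runs on Java 8 or later, java.time.format.DateTimeFormatter is immutable and thread-safe, so the formatters could be built once in the constructor instead.

Below is a standalone sketch of only the header computation under that assumption. It uses no Flume classes. The class and method names are hypothetical, and the values in main mirror the Constants defaults.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class DateHeaderSketch {
    // Same seven-field pattern as KolazzInterceptor; group 1 is the timestamp
    private static final Pattern LINE =
            Pattern.compile("(.*)\\|(.*)\\|(.*)\\|(.*)\\|(.*)\\|(.*)\\|(.*)");

    private final String headerName;
    // Built once: DateTimeFormatter is immutable and can be shared between threads
    private final DateTimeFormatter source;
    private final DateTimeFormatter dest;

    DateHeaderSketch(String headerName, String sourcePattern, String destPattern) {
        this.headerName = headerName;
        this.source = DateTimeFormatter.ofPattern(sourcePattern);
        this.dest = DateTimeFormatter.ofPattern(destPattern);
    }

    /** Puts the reformatted log time into headers, or the current time if parsing fails. */
    void apply(String body, Map<String, String> headers) {
        LocalDateTime time = null;
        Matcher m = LINE.matcher(body);
        if (m.matches()) {
            try {
                time = LocalDateTime.parse(m.group(1), source);
            } catch (DateTimeParseException e) {
                // unparsable timestamp: fall back to the current time below
            }
        }
        if (time == null) {
            time = LocalDateTime.now();
        }
        headers.put(headerName, dest.format(time));
    }

    public static void main(String[] args) {
        // Values mirror HEADER_NAME_DFLT, FORMATTER_DFLT and DEST_FORMATTER_DFLT
        DateHeaderSketch sketch =
                new DateHeaderSketch("kolazz_date", "yyyy-MM-dd HH:mm:ss.SSS", "yyyyMMdd");
        Map<String, String> headers = new HashMap<>();
        sketch.apply("2018-07-11 13:00:07.122|LOG_INFO |dev14|recv_term|13922222222|{}|7E 7E", headers);
        System.out.println(headers); // {kolazz_date=20180711}
    }
}
```

DateTimeFormatter is stricter than the lenient SimpleDateFormat. For example, the main method above parses 2018-7-12 successfully. This sketch rejects the single-digit month, so such a line would get the current date.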
Log output: