1.1.自定義Source說(shuō)明
Source是負(fù)責(zé)接收數(shù)據(jù)到Flume Agent的組件工坊。官方提供的source類型已經(jīng)很多聪全,但是有時(shí)候并不能滿足實(shí)際開(kāi)發(fā)當(dāng)中的需求熏矿,此時(shí)就需要根據(jù)實(shí)際需求自定義某些source艰亮。
官方提供了自定義source的接口說(shuō)明:
<u>https://flume.apache.org/FlumeDeveloperGuide.html#source</u>
自定義source中有一種類型叫做PollableSource。底層是通過(guò)線程不斷去調(diào)用process方法误债,主動(dòng)拉取消息恨溜。
1.2.自定義Source原理
根據(jù)官方說(shuō)明自定義source組件需要繼承AbstractSource類并實(shí)現(xiàn)Configurable和PollableSource接口,并實(shí)現(xiàn)相應(yīng)方法:
1.3.示例說(shuō)明
public class MySource extends AbstractSource implements Configurable, PollableSource {
private String myProp;
@Override
public void configure(Context context) {
String myProp = context.getString("myProp", "defaultValue");
// 獲取處理自定義 conf 文件中的屬性值(驗(yàn)證找前,或類型轉(zhuǎn)換......)
// preText = context.getString("preText");
// 將 myProp 放在一個(gè)變量中 后續(xù) process() 方法需要用
this.myProp = myProp;
}
@Override
public void start() {
// 初始化與外部客戶端的連接
}
@Override
public void stop () {
// 釋放資源或清空字段等
}
@Override
public Status process() throws EventDeliveryException {
Status status = null;
try {
// 自定義處理
// 接收新數(shù)據(jù)
Event e = getSomeData();
// 將 Event 存儲(chǔ)到此 Source 相關(guān)Channel(s) 中
getChannelProcessor().processEvent(e);
status = Status.READY;
} catch (Throwable t) {
// 異常記錄, 處理
status = Status.BACKOFF;
// 異常拋出
if (t instanceof Error) {
throw (Error)t;
}
} finally {
txn.close();
}
return status;
}
}