在上一篇StreamOperator源碼簡析從源碼角度分析了StreamOperator以及其實現(xiàn)類,此篇幅主要分析一下如何自定義一個StreamOperator仿荆。
StreamOperator接口提供了其生命周期的抽象方法,例如初始化方法setup坏平、open拢操、initializeState,checkpoint相關(guān)方法prepareSnapshotPreBarrier舶替、snapshotState令境,但是我們沒有必要去自己一一實現(xiàn)這些方法,可以繼承其抽象類AbstractStreamOperator顾瞪,覆蓋一些我們需要重寫的方法舔庶。在上一篇分析中提到對于source端不需要接受上游數(shù)據(jù)抛蚁,也就不需要實現(xiàn)OneInputStreamOperator或者TwoInputStreamOperator接口,如果我們需要接收上游數(shù)據(jù)就必須實現(xiàn)這兩個接口中的一個惕橙,主要看一個輸入還是兩個輸入來選擇瞧甩。
案例:假設(shè)我們現(xiàn)在需要實現(xiàn)一個通用的定時、定量的輸出的StreamOperator吕漂。
實現(xiàn)步驟:
繼承AbstractStreamOperator抽象類亲配,實現(xiàn)OneInputStreamOperator接口
重寫open方法尘应,調(diào)用flink 提供的定時接口惶凝,并且注冊定時器
重寫initializeState/snapshotState方法,由于批量寫需要做緩存犬钢,那么需要保證數(shù)據(jù)的一致性苍鲜,將緩存數(shù)據(jù)存在狀態(tài)中
重寫processElement方法,將數(shù)據(jù)存在緩存中玷犹,達(dá)到一定大小然后輸出
由于需要做定時調(diào)用混滔,那么需要有一個定時調(diào)用的回調(diào)方法,那么定義的類需要實現(xiàn)ProcessingTimeCallback接口歹颓,并且實現(xiàn)其onProcessingTime方法(關(guān)于flink定時可以參考定時系列文章)
代碼:
publicabstractclassCommonSinkOperator<T extendsSerializable>extendsAbstractStreamOperator<Object>
implementsProcessingTimeCallback,OneInputStreamOperator<T,Object>{
privateList<T> list;
privateListState<T> listState;
privateint batchSize;
privatelong interval;
privateProcessingTimeService processingTimeService;
publicCommonSinkOperator(){
}
publicCommonSinkOperator(int batchSize,long interval){
this.chainingStrategy =ChainingStrategy.ALWAYS;
this.batchSize = batchSize;
this.interval = interval;
}
@Overridepublicvoid open()throwsException{
super.open();
if(interval >0&& batchSize >1){
//獲取AbstractStreamOperator里面的ProcessingTimeService坯屿, 該對象用來做定時調(diào)用
//注冊定時器將當(dāng)前對象作為回調(diào)對象,需要實現(xiàn)ProcessingTimeCallback接口
processingTimeService = getProcessingTimeService();
long now = processingTimeService.getCurrentProcessingTime();
processingTimeService.registerTimer(now + interval,this);
}
}
//狀態(tài)恢復(fù)
@Overridepublicvoid initializeState(StateInitializationContext context)throwsException{
super.initializeState(context);
this.list =newArrayList<T>();
listState = context.getOperatorStateStore().getSerializableListState("batch-interval-sink");
if(context.isRestored()){
listState.get().forEach(x ->{
list.add(x);
});
}
}
@Overridepublicvoid processElement(StreamRecord<T> element)throwsException{
list.add(element.getValue());
if(list.size()>= batchSize){
saveRecords(list);
}
}
//checkpoint
@Overridepublicvoid snapshotState(StateSnapshotContext context)throwsException{
super.snapshotState(context);
if(list.size()>0){
listState.clear();
listState.addAll(list);
}
}
//定時回調(diào)
@Overridepublicvoid onProcessingTime(long timestamp)throwsException{
if(list.size()>0){
saveRecords(list);
list.clear();
}
long now = processingTimeService.getCurrentProcessingTime();
processingTimeService.registerTimer(now + interval,this);//再次注冊
}
publicabstractvoid saveRecords(List<T> datas);
}
如何調(diào)用巍扛?直接使用dataStream.transform方式即可领跛。
整體來說這個demo相對來說是比較簡單的,但是這里面涉及的定時撤奸、狀態(tài)管理也是值得研究吠昭,比喻說在這里定時我們直接選擇ProcessingTimeService,而沒有選擇InternalTimerService來完成定時注冊胧瓜,主要是由于InternalTimerService會做定時調(diào)用狀態(tài)保存矢棚,在窗口操作中需要任務(wù)失敗重啟仍然可以觸發(fā)定時,但是在我們案例中不需要府喳,直接下次啟動重新注冊即可蒲肋,因此選擇了ProcessingTimeService。
推薦閱讀
1. Flink中延時調(diào)用設(shè)計與實現(xiàn)
2. Flink維表關(guān)聯(lián)系列之Hbase維表關(guān)聯(lián):LRU策略
4. Flink exactly-once系列之事務(wù)性輸出實現(xiàn)
5. Flink時間系統(tǒng)系列之實例講解:如何做定時輸出
6. Flink實戰(zhàn):全局TopN分析與實現(xiàn)
7. Flink per-Job模式InfluxdbReporter上報JobName
關(guān)注回復(fù)Flink獲取更多信息~