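Scenario:
Data flow: kafka --> spark --> es.
The Spark stage involves steps such as filtering and ETL, and different requirements call for different combinations. For example, scenario A needs filter + etl; scenario B needs filter + etl1 + etl2; scenario C needs etl2 + etl3. Each combination provides a different function.
Implementation: use the Composite pattern.
Definitions:
The ChildProcess interface is the common interface for every component in the composition: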
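import org.apache.spark.sql.Dataset;

public interface ChildProcess {
    // Takes the incoming Dataset and returns the processed Dataset.
    Dataset doChildProcess(Dataset dataset) throws Exception;
}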
Composite class:
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.sql.Dataset;

public class CompositeChildProcess implements ChildProcess, Serializable {
    // Children run in the order in which they were added.
    private List<ChildProcess> childProcesses = new ArrayList<>();

    public void add(ChildProcess childProcess) {
        childProcesses.add(childProcess);
    }

    public void remove(ChildProcess childProcess) {
        childProcesses.remove(childProcess);
    }

    @Override
    public Dataset doChildProcess(Dataset dataset) throws Exception {
        // The output of each child becomes the input of the next one.
        for (ChildProcess childProcess : childProcesses) {
            dataset = childProcess.doChildProcess(dataset);
        }
        return dataset;
    }
}
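To see the chaining behaviour without a Spark cluster, here is a minimal sketch of the same idea. It is only an illustration: `Step`, `CompositeStep` and `CompositeSketch` are hypothetical names, a `List<String>` stands in for the Spark `Dataset`, and the checked `throws Exception` is left out for brevity.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in for ChildProcess: List<String> replaces Spark's Dataset.
interface Step {
    List<String> apply(List<String> rows);
}

// Mirrors CompositeChildProcess: runs its children in insertion order.
class CompositeStep implements Step {
    private final List<Step> children = new ArrayList<>();

    CompositeStep add(Step child) {
        children.add(child);
        return this; // returning this allows chained add() calls
    }

    @Override
    public List<String> apply(List<String> rows) {
        List<String> current = rows;
        for (Step child : children) {
            current = child.apply(current); // output of one child feeds the next
        }
        return current;
    }
}

class CompositeSketch {
    // Leaf playing the role of the filter component: drops blank records.
    static final Step DROP_BLANK = rows ->
            rows.stream().filter(r -> !r.trim().isEmpty()).collect(Collectors.toList());

    // Leaf playing the role of an ETL component: trims and upper-cases records.
    static final Step NORMALIZE = rows ->
            rows.stream().map(r -> r.trim().toUpperCase()).collect(Collectors.toList());

    static List<String> run(List<String> input) {
        return new CompositeStep().add(DROP_BLANK).add(NORMALIZE).apply(input);
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList(" a ", "", "b"))); // prints [A, B]
    }
}
```

The caller only sees a single `Step`, so swapping the order of the two `add()` calls changes the pipeline without touching either leaf.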
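ETL leaf component:
// Serializable is added here so that a Serializable composite holding this
// child can itself be serialized.
public class CustomEtlProcess implements ChildProcess, Serializable {
    @Override
    public Dataset doChildProcess(Dataset dataset) throws Exception {
        ....
        return etl;
    }
}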
Filter leaf component:
public class CustomkafkaFilterProcess implements ChildProcess, Serializable {
    @Override
    public Dataset doChildProcess(Dataset dataset) throws Exception {
        ....
        return filter;
    }
}
Client call site:
For example, a kafka-filter-etl-es pipeline:
CompositeChildProcess compositeChildProcess = new CompositeChildProcess();
CustomkafkaFilterProcess customkafkaFilterProcess = new CustomkafkaFilterProcess();
compositeChildProcess.add(customkafkaFilterProcess);
CustomEtlProcess customEtlProcess = new CustomEtlProcess();
compositeChildProcess.add(customEtlProcess);
// dataset is assumed to be the Dataset already read from Kafka (not shown here).
compositeChildProcess.doChildProcess(dataset);
For example, a kafka-etl-filter-etl-es pipeline:
CompositeChildProcess compositeChildProcess = new CompositeChildProcess();
CustomEtlProcess firstEtlProcess = new CustomEtlProcess();
compositeChildProcess.add(firstEtlProcess);
CustomkafkaFilterProcess customkafkaFilterProcess = new CustomkafkaFilterProcess();
compositeChildProcess.add(customkafkaFilterProcess);
// A second variable name is needed; declaring customEtlProcess twice would not compile.
CustomEtlProcess secondEtlProcess = new CustomEtlProcess();
compositeChildProcess.add(secondEtlProcess);
// dataset is assumed to be the Dataset already read from Kafka (not shown here).
compositeChildProcess.doChildProcess(dataset);
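The call sites differ only in which components are added and in what order. As a rough sketch of one way to avoid repeating that wiring, scenarios A, B and C from the opening could be described as data and assembled in a loop. Everything here is hypothetical: `ScenarioSketch`, its nested `Step` interface and the placeholder leaves are illustration only, a `String` trace stands in for the `Dataset`, and `throws Exception` is omitted for brevity.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ScenarioSketch {
    // Hypothetical stand-in for ChildProcess; a String trace replaces the Dataset.
    interface Step {
        String apply(String trace);
    }

    // Scenario name -> ordered step names, as listed for scenarios A, B and C.
    static final Map<String, List<String>> SCENARIOS = new LinkedHashMap<>();
    static {
        SCENARIOS.put("A", Arrays.asList("filter", "etl"));
        SCENARIOS.put("B", Arrays.asList("filter", "etl1", "etl2"));
        SCENARIOS.put("C", Arrays.asList("etl2", "etl3"));
    }

    // Builds one composite step from the step list of the given scenario.
    static Step build(String scenario) {
        List<String> names = SCENARIOS.get(scenario);
        if (names == null) {
            throw new IllegalArgumentException("Unknown scenario: " + scenario);
        }
        List<Step> children = new ArrayList<>();
        for (String name : names) {
            // Placeholder leaf: a real leaf would transform the data;
            // this one only records its name in the trace.
            children.add(trace -> trace + ">" + name);
        }
        // The returned step plays the role of the composite.
        return trace -> {
            for (Step child : children) {
                trace = child.apply(trace);
            }
            return trace;
        };
    }

    public static void main(String[] args) {
        System.out.println(build("B").apply("kafka")); // prints kafka>filter>etl1>etl2
    }
}
```

In a real job the placeholder leaves would be replaced by lookups of actual `ChildProcess` implementations; how those are registered is left open here.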
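Because every component, including the composite class itself, implements the component interface, components can be combined arbitrarily and nested across multiple levels, which gives flexible control over the processing flow.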
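Multi-level nesting can be sketched as follows. This is an illustration under assumptions, not the Spark code itself: `TraceStep`, `TraceComposite` and `NestedSketch` are hypothetical names, a `String` replaces the `Dataset` so that the execution order shows up in the result, and `throws Exception` is omitted for brevity.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for ChildProcess; a String replaces Spark's Dataset.
interface TraceStep {
    String apply(String trace);
}

// Mirrors CompositeChildProcess: it is itself a TraceStep, so it can be nested.
class TraceComposite implements TraceStep {
    private final List<TraceStep> children = new ArrayList<>();

    TraceComposite add(TraceStep child) {
        children.add(child);
        return this;
    }

    @Override
    public String apply(String trace) {
        for (TraceStep child : children) {
            trace = child.apply(trace);
        }
        return trace;
    }
}

class NestedSketch {
    // Leaf factory: each leaf appends its own name to the trace.
    static TraceStep named(String name) {
        return trace -> trace + "->" + name;
    }

    static String run() {
        // Inner composite: a reusable "filter + etl1" block.
        TraceComposite cleaning = new TraceComposite()
                .add(named("filter"))
                .add(named("etl1"));
        // Outer composite treats the inner composite as a single child.
        TraceComposite pipeline = new TraceComposite()
                .add(cleaning)
                .add(named("etl2"));
        return pipeline.apply("kafka") + "->es";
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints kafka->filter->etl1->etl2->es
    }
}
```

The outer composite never needs to know whether a child is a leaf or another composite, which is what makes the nesting possible.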