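A Flink program applies distributed processing and transformations to data streams. Input streams are created from sources (for example by reading files, consuming a Kafka topic, or reading in-memory collections); results are emitted through sinks, which can write to a file system or to standard output. A Flink program can run standalone or be embedded in another program, and it can execute in a local JVM or be submitted to a multi-machine cluster.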
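Flink programs come in two kinds: batch processing, which handles bounded data sets, and stream processing, which handles continuous streams of data. The basic programming model is similar for both; batch programs use the DataSet API and streaming programs use the DataStream API. The rest of this post is based on the DataStream API.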
A Flink program has five parts:
1 Obtain an execution environment
2 Load the data
3 Process/transform the data
4 Specify where the results are written
5 Start the program and trigger execution
The following real program illustrates these parts. Its input is taxi drivers' completed-ride data, and its output is the cumulative number of completed rides for each driver:
ParameterTool params = ParameterTool.fromArgs(args);
final String input = params.get("input", ExerciseBase.pathToRideData);

final int maxEventDelay = 60;       // events are out of order by max 60 seconds
final int servingSpeedFactor = 600; // events of 10 minutes are served every second

// 1. set up streaming execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// 2. start the data generator (the source)
DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(input, maxEventDelay, servingSpeedFactor));

// 3. map each ride to a tuple of (driverId, 1)
DataStream<Tuple2<Long, Long>> tuples = rides.map(new MapFunction<TaxiRide, Tuple2<Long, Long>>() {
    @Override
    public Tuple2<Long, Long> map(TaxiRide ride) throws Exception {
        return new Tuple2<Long, Long>(ride.driverId, 1L);
    }
});

// partition the stream by the driverId (tuple field 0)
KeyedStream<Tuple2<Long, Long>, Tuple> keyedByDriverId = tuples.keyBy(0);

// count the rides for each driver (rolling sum over tuple field 1)
DataStream<Tuple2<Long, Long>> rideCounts = keyedByDriverId.sum(1);

// 4. sink: print the counts to standard output
// (we could, in fact, print out any or all of these streams)
rideCounts.print();

// 5. run the ride count pipeline
env.execute("Ride Count");
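
To make the output concrete: sum(1) on a keyed stream is a rolling aggregation, so the job emits an updated count for a driver every time one of that driver's rides arrives, rather than a single final total. The sketch below imitates that behavior in plain Java without any Flink dependency. The class RollingRideCount, the method rollingCounts, and the sample driver IDs are hypothetical and exist only for illustration; they are not part of Flink or of the program above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RollingRideCount {

    // Plain-Java stand-in for keyBy(driverId) followed by a rolling sum:
    // for every incoming ride, record "(driverId,countSoFar)" for that driver.
    static List<String> rollingCounts(long[] driverIds) {
        Map<Long, Long> countPerDriver = new HashMap<>();
        List<String> emitted = new ArrayList<>();
        for (long driverId : driverIds) {
            // add 1 to this driver's running count (starting from 1 if unseen)
            long updated = countPerDriver.merge(driverId, 1L, Long::sum);
            emitted.add("(" + driverId + "," + updated + ")");
        }
        return emitted;
    }

    public static void main(String[] args) {
        // hypothetical sequence of driver IDs, one per completed ride
        long[] rides = {7L, 3L, 7L, 7L, 3L};
        for (String record : rollingCounts(rides)) {
            System.out.println(record);
        }
        // prints (7,1) (3,1) (7,2) (7,3) (3,2), one record per line
    }
}
```

In the real job, print() may prefix each record with the index of the parallel subtask that produced it, and the interleaving of records for different drivers is not guaranteed when the job runs with parallelism greater than one.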