1. Abstraction layers
- ProcessFunction: offers fine-grained control over time, events, and state; used for complex event-processing logic, at the cost of ease of use (a short sketch follows this list)
- DataStream API & DataSet API: the core APIs, offering functional-style operations on streaming/batch data; simple and easy to use
- SQL & Table API: Flink SQL integration is built on Apache Calcite; the most concise and convenient of the APIs, though less flexible than the lower layers
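As a quick illustration of the lowest layer, here is a minimal KeyedProcessFunction sketch (not from the original text; the class and state names are assumed) that combines keyed state with a processing-time timer:
```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// hypothetical example: counts elements per key and reports the count 10s later
public class CountWithTimeout extends KeyedProcessFunction<String, String, String> {

    private transient ValueState<Long> countState;

    @Override
    public void open(Configuration parameters) {
        countState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        Long count = countState.value();
        countState.update(count == null ? 1L : count + 1);
        // register a processing-time timer 10 seconds from now
        ctx.timerService().registerProcessingTimeTimer(
                ctx.timerService().currentProcessingTime() + 10_000);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        out.collect("key=" + ctx.getCurrentKey() + ", count=" + countState.value());
    }
}
```
It would be attached to a keyed stream with stream.keyBy(...).process(new CountWithTimeout()).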
2. DataStream API
The DataStream API mainly covers the following three parts:
1. DataSource
The input side of a job. The main kinds of sources are:
- From files: readTextFile reads a text file according to the TextInputFormat specification and returns its lines as strings
```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = env.readTextFile("file:///filePath");
```
- From collections: fromCollection(Collection), fromElements(T...), and similar methods, as shown below
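A minimal self-contained sketch (class name assumed) showing both collection-based sources:
```java
import java.util.Arrays;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CollectionSourceDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Integer> nums = env.fromElements(1, 2, 3, 4);                       // from varargs
        DataStream<String> letters = env.fromCollection(Arrays.asList("a", "b", "c")); // from a Collection
        nums.print();
        letters.print();
        env.execute("collectionSourceDemo");
    }
}
```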
- From a socket
```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter);
```
- Custom sources
There are two ways to define a custom source:
- Implement the SourceFunction interface for a source without parallelism (its parallelism is fixed at 1)
Demo: a source that emits one record per second:
```java
package streaming.source;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

/**
 * @author xiaolong
 */
public class InputSource implements SourceFunction<Long> {

    // volatile, because cancel() is called from a different thread than run()
    private volatile boolean isRunning = true;
    private Long counter = 1L;

    @Override
    public void run(SourceContext<Long> context) throws Exception {
        while (isRunning) {
            context.collect(counter);
            counter++;
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}
```
- Implement the ParallelSourceFunction interface, or extend RichParallelSourceFunction, for a source with configurable parallelism, as sketched below
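A minimal sketch of such a parallel source (the class name ParallelInputSource mirrors the InputSource demo above and is assumed); each parallel subtask runs its own copy of run():
```java
package streaming.source;

import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;

public class ParallelInputSource implements ParallelSourceFunction<Long> {

    private volatile boolean isRunning = true;
    private long counter = 1L;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        // every parallel subtask executes this loop independently
        while (isRunning) {
            ctx.collect(counter++);
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}
```
It can be added with env.addSource(new ParallelInputSource()).setParallelism(4), which is not possible for a plain SourceFunction.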
2. Transform
Flink provides many operators; the most frequently used are listed below.
- Map: takes one element, applies some transformation logic, and emits one element
- FlatMap: takes one element and emits zero or more elements
- Filter: keeps only the elements that satisfy a predicate (a combined example of these three follows)
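A combined sketch (assumed, in the spirit of the other demos in this article) that chains all three operators:
```java
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class TestBasicOperators {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> lines = env.fromElements("hello flink", "hello world");

        lines.map(new MapFunction<String, String>() {            // one in, one out
                    @Override
                    public String map(String s) {
                        return s.toUpperCase();
                    }
                })
                .flatMap(new FlatMapFunction<String, String>() { // one in, zero or more out
                    @Override
                    public void flatMap(String s, Collector<String> out) {
                        for (String word : s.split(" ")) {
                            out.collect(word);
                        }
                    }
                })
                .filter(new FilterFunction<String>() {           // keep non-"HELLO" words
                    @Override
                    public boolean filter(String s) {
                        return !"HELLO".equals(s);
                    }
                })
                .print();

        env.execute("testBasicOperators");
    }
}
```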
- Union: merges multiple streams; all of the merged streams must have the same element type
Change InputSource's element type to String, then add an InputStringSource:
```java
package streaming.source;

import java.util.Arrays;
import java.util.List;
import java.util.Random;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

/**
 * @author xiaolong
 */
public class InputStringSource implements SourceFunction<String> {

    private volatile boolean isRunning = true;
    private final List<String> alphabet = Arrays.asList("a", "b", "c", "d", "e", "f", "g");
    private final Random random = new Random();

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (isRunning) {
            // emit a random letter once per second
            ctx.collect(alphabet.get(random.nextInt(alphabet.size())));
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}
```
Test code:
```java
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import streaming.source.InputSource;
import streaming.source.InputStringSource;

/**
 * @author xiaolong
 */
public class TestSource {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // both sources now emit String, so they can be unioned
        DataStreamSource<String> source = env.addSource(new InputSource());
        DataStreamSource<String> source2 = env.addSource(new InputStringSource());
        source.union(source2).print();
        env.execute("testInputSource");
    }
}
```
The output interleaves the counter values emitted by InputSource with the random letters emitted by InputStringSource.
- Connect: merges exactly two streams, which may have different element types
- CoMap/CoFlatMap: applied to a ConnectedStreams; the two callbacks play the same roles as Map and FlatMap, one per input stream. The example below assumes InputSource emits Long again:
```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

import streaming.source.InputSource;
import streaming.source.InputStringSource;

/**
 * @author xiaolong
 */
public class TestSource {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Long> intSource = env.addSource(new InputSource());
        DataStreamSource<String> strSource = env.addSource(new InputStringSource());

        // flatMap1 handles elements of the first stream, flatMap2 those of the second
        DataStream<List<String>> result = intSource.connect(strSource)
                .flatMap(new CoFlatMapFunction<Long, String, List<String>>() {

                    List<String> list = new ArrayList<>();

                    @Override
                    public void flatMap1(Long aLong, Collector<List<String>> collector) throws Exception {
                        list.add(aLong.toString());
                        collector.collect(list);
                    }

                    @Override
                    public void flatMap2(String s, Collector<List<String>> collector) throws Exception {
                        list.add(s);
                        collector.collect(list);
                    }
                });
        result.print();
        env.execute("testInputSource");
    }
}
```
Each arriving element from either stream is appended to the shared list and the whole list is printed again, so the printed list keeps growing.
- Split: splits one stream into several streams according to tagging rules (deprecated in later Flink versions in favor of side outputs)
- Select: picks one or more of the split streams; used together with Split, as in the sketch below
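A minimal sketch (assumed) of Split/Select with the old OutputSelector API, splitting a numeric stream into even and odd substreams:
```java
import java.util.Collections;

import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TestSplitSelect {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Long> source = env.fromElements(1L, 2L, 3L, 4L, 5L, 6L);

        // tag each element as "even" or "odd"
        SplitStream<Long> split = source.split(new OutputSelector<Long>() {
            @Override
            public Iterable<String> select(Long value) {
                return Collections.singletonList(value % 2 == 0 ? "even" : "odd");
            }
        });

        split.select("even").print(); // only the elements tagged "even"
        env.execute("testSplitSelect");
    }
}
```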
- KeyBy: partitions the stream by the given key; records with the same key land in the same partition
- Aggregation: aggregate operators such as sum, max, etc., as in the sketch below
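A short sketch (assumed) of keyBy combined with a built-in aggregation:
```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TestAggregation {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Tuple2<Integer, Integer>> source = env.fromElements(
                Tuple2.of(1, 10), Tuple2.of(2, 20), Tuple2.of(1, 11));

        // group by the first tuple field, then keep a running sum of the second;
        // keyBy(0).max(1) would keep a running maximum instead
        source.keyBy(0).sum(1).print();
        env.execute("testAggregation");
    }
}
```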
- Reduce: combines the previously aggregated value with the current element and emits a new aggregated value
```java
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author xiaolong
 */
public class TestSource {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Tuple2<Integer, Integer>> source = env.fromElements(
                Tuple2.of(1, 10), Tuple2.of(2, 20), Tuple2.of(2, 21),
                Tuple2.of(1, 11), Tuple2.of(2, 22));

        // per key: fold the previous result (t2) with the new element (t1)
        SingleOutputStreamOperator<Tuple2<Integer, Integer>> reduce = source.keyBy(0)
                .reduce(new ReduceFunction<Tuple2<Integer, Integer>>() {
                    @Override
                    public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> t2,
                                                           Tuple2<Integer, Integer> t1) throws Exception {
                        return new Tuple2<>(t1.f0, t2.f1 + t1.f1);
                    }
                });
        reduce.print();
        env.execute("testInputSource");
    }
}
```
Per key, the running sums are printed: (1,10) then (1,21) for key 1, and (2,20), (2,41), (2,63) for key 2.
- Partitioning:
1. Random partitioning: dataStream.shuffle()
2. Rebalance: dataStream.rebalance(); redistributes records round-robin to even out the partitions and eliminate data skew
3. Rescale: dataStream.rescale()
The difference between rebalance and rescale: rebalance performs a full repartition across all downstream subtasks, while rescale redistributes only locally. For example, with 4 upstream parallel subtasks and 2 downstream subtasks, after rescale each downstream subtask receives data from a fixed pair of upstream subtasks, and vice versa; see the sketch below.
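A sketch of the setup just described, reusing the hypothetical ParallelInputSource from the custom-source section:
```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import streaming.source.ParallelInputSource;

public class TestRescale {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Long> upstream = env.addSource(new ParallelInputSource()).setParallelism(4);

        // each of the 2 printing subtasks reads from a fixed pair of the 4 upstream subtasks
        upstream.rescale().print().setParallelism(2);
        // upstream.rebalance().print().setParallelism(2); // full round-robin over all subtasks
        env.execute("testRescale");
    }
}
```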
4. Custom partitioning: implement the Partitioner interface and pass it to partitionCustom()
```java
import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import streaming.source.InputStringSource;

/**
 * @author xiaolong
 */
public class TestSource {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> strSource = env.addSource(new InputStringSource());
        List<String> list = Arrays.asList("a", "b", "c", "d");

        strSource.map(new MapFunction<String, Tuple1<String>>() {
            @Override
            public Tuple1<String> map(String s) throws Exception {
                return new Tuple1<>(s);
            }
        }).partitionCustom(new Partitioner<String>() {
            @Override
            public int partition(String s, int numPartitions) {
                System.out.println("number of partitions: " + numPartitions);
                // letters a-d go to partition 0, everything else to partition 1
                if (list.contains(s)) {
                    return 0;
                } else {
                    return 1;
                }
            }
        }, 0).print();

        env.execute("testFlinkJob");
    }
}
```
Elements a through d are routed to partition 0 and the remaining letters to partition 1; each call to partition() also logs the number of downstream partitions.
3喇完、sink
flink有如下幾種sink操作:
標(biāo)準(zhǔn)輸出:print()/printToErr()
輸出到文檔或socket:writeAsCsv伦泥,writeAsText,writeToSocket
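A small sketch (path and class name assumed) of writing a stream to a text file:
```java
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TestFileSink {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> source = env.fromElements("a", "b", "c");
        // one element per line; OVERWRITE replaces an existing file
        source.writeAsText("file:///tmp/flink-out.txt", FileSystem.WriteMode.OVERWRITE);
        env.execute("testFileSink");
    }
}
```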
- Write to third-party storage via Flink connectors: Elasticsearch, Redis, Kafka producers, etc.
Test: read data from a socket and write it to Kafka.
```java
import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

/**
 * @author xiaolong
 */
public class TestSource {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> strSource = env.socketTextStream("localhost", 9000, "\n");

        Properties properties = new Properties();
        properties.put("bootstrap.servers", "xxxxxx"); // broker addresses
        // FlinkKafkaProducer011's transaction timeout defaults to 1h, but the Kafka
        // broker's default transaction.max.timeout.ms is 15min; without lowering it
        // here, the job fails at startup
        properties.put("transaction.timeout.ms", 15 * 60 * 1000);

        FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<>(
                "kafkaDruid",                                                    // Kafka topic
                new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), // serialization
                properties,
                FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);                    // delivery semantics

        // Kafka 0.10+ allows attaching the record's event timestamp when writing;
        // this method does not work with earlier Kafka versions
        myProducer.setWriteTimestampToKafka(true);

        strSource.addSink(myProducer);
        strSource.print();
        env.execute("testFlinkJob");
    }
}
```
Feed lines into the socket (for example with nc -lk 9000); the latest messages then appear on the Kafka topic.
- Custom sinks: implement the SinkFunction interface or extend RichSinkFunction, as sketched below
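A minimal custom sink sketch (class name LogSink assumed) extending RichSinkFunction; the open/close hooks are the natural place for connection handling:
```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class LogSink extends RichSinkFunction<String> {

    @Override
    public void open(Configuration parameters) throws Exception {
        // open connections/resources here, once per parallel subtask
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        // write one record to the external system; here we just print it
        System.out.println("sink received: " + value);
    }

    @Override
    public void close() throws Exception {
        // release resources here
    }
}
```
It would be attached with strSource.addSink(new LogSink()).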