Overview
- Systematically work through Flink's streaming operators: learn a new technology, broaden my personal tech stack, and cover both the Java and Scala APIs;
- Official link for the Flink streaming operator API: DataStream Transformations
- Reading the official docs is recommended; driven by Alibaba, Flink is developing rapidly, and the site now offers a Chinese version worth reading and summarizing as you go;
- Operator definitions below are quoted from the Flink 1.9.0 documentation
Data preparation
First prepare some test data; Flink consumes the data from Kafka 0.10 rather than using the batch API.
See GitHub for the Scala API exercises.
The Java code consumes a single Kafka topic, while the Scala code consumes two topics.
In the Java code, first run the method under the mockup package to generate test data, then run the operator demos.
Streaming operators
Map
- Applies a function to each element of the DataStream;
- Official definition:
DataStream → DataStream
Takes one element and produces one element. A map function that doubles the values of the input stream:
DataStream<Integer> dataStream = //...
dataStream.map(new MapFunction<Integer, Integer>() {
@Override
public Integer map(Integer value) throws Exception {
return 2 * value;
}
});
- demo
/**
* Receives and returns the whole POJO, e.g.
* MockUpModel(name=yahui, gender=female, timestamp=1563090399305, age=34)
* @param kafkaData
* @return
*/
private static SingleOutputStreamOperator<MockUpModel> getAddAgePojo(SingleOutputStreamOperator<MockUpModel> kafkaData) {
// Returns a MockUpModel POJO with the age field increased by 5
return kafkaData.map(new MapFunction<MockUpModel, MockUpModel>() {
@Override
public MockUpModel map(MockUpModel value) throws Exception {
MockUpModel mockUpModel = new MockUpModel();
mockUpModel.name = value.name;
mockUpModel.gender = value.gender;
mockUpModel.age = value.age + 5;
mockUpModel.timestamp = value.timestamp;
return mockUpModel;
}
});
}
FlatMap
- Takes each element of the DataStream and produces zero, one, or more elements, emitting them through a Collector;
- Official definition:
DataStream → DataStream
Takes one element and produces zero, one, or more elements. A flatmap function that splits sentences to words:
dataStream.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out)
throws Exception {
for(String word: value.split(" ")){
out.collect(word);
}
}
});
- demo
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
SingleOutputStreamOperator<MockUpModel> kafkaData = getKafka010Data(env);
// This does not really demonstrate flatMap: the code below effectively acts as a filter. Output: MockUpModel(name=liyahui-0, gender=male, timestamp=1561516105296, age=0)
kafkaData.flatMap(new FlatMapFunction<MockUpModel, MockUpModel>() {
@Override
public void flatMap(MockUpModel value, Collector<MockUpModel> out) throws Exception {
if (value.age % 2 == 0) {
out.collect(value);
}
}
}).print().setParallelism(1);
// flatMap flattens nested collections into a single flat collection;
// the official sentence-splitting example above is the clearest illustration.
env.execute("flink kafka010 demo");
}
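To see the actual flattening behavior without a Flink runtime, here is a minimal plain-Java sketch; `java.util.stream.Stream#flatMap` follows the same contract as the official sentence-splitting example above (one input element, zero or more output elements, all flattened into one stream):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class FlatMapSketch {
    // Expands each sentence into zero or more words and flattens the
    // results into one list -- the same contract as Flink's
    // FlatMapFunction collecting words into a Collector.
    static List<String> splitToWords(List<String> sentences) {
        return sentences.stream()
                .flatMap(line -> Stream.of(line.split(" ")))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(splitToWords(Arrays.asList("hello flink", "hello kafka")));
        // [hello, flink, hello, kafka]
    }
}
```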
Filter
- Filtering: evaluates a boolean predicate on each element of the DataStream and keeps the elements for which it returns true
DataStream → DataStream
Evaluates a boolean function for each element and retains those for which the function returns true. A filter that filters out zero values:
dataStream.filter(new FilterFunction<Integer>() {
@Override
public boolean filter(Integer value) throws Exception {
return value != 0;
}
});
- demo
// method 1
/**
* Keep only the people whose age is even
*
* @param kafkaData
* @return
*/
private static SingleOutputStreamOperator<MockUpModel> getFilterDS2(SingleOutputStreamOperator<MockUpModel> kafkaData) {
return kafkaData.filter(new FilterFunction<MockUpModel>() {
@Override
public boolean filter(MockUpModel value) throws Exception {
return value.age % 2 == 0;
}
});
}
// method-2
/**
* Lambda version
*
* @param kafkaData
* @return
*/
private static SingleOutputStreamOperator<MockUpModel> getFilterDS(SingleOutputStreamOperator<MockUpModel> kafkaData) {
return kafkaData.filter(line -> line.age % 2 == 0);
}
KeyBy
- Partitions the DataStream by the specified key
- Note: a POJO cannot be used as a key unless it overrides hashCode();
DataStream → KeyedStream
Logically partitions a stream into disjoint partitions. All records with the same key are assigned to the same partition. Internally, *keyBy()* is implemented with hash partitioning. There are different ways to [specify keys](https://ci.apache.org/projects/flink/flink-docs-master/dev/api_concepts.html#specifying-keys).
This transformation returns a *KeyedStream*, which is, among other things, required to use [keyed state](https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/state/state.html#keyed-state).
dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple
Attention A type **cannot be a key** if:
1. it is a POJO type but does not override the *hashCode()* method and relies on the *Object.hashCode()* implementation.
2. it is an array of any type.
- demo
/**
* keyBy with age as the grouping key
*
* @param kafkaData
* @return
*/
private static KeyedStream<MockUpModel, Integer> getKeyedDS(SingleOutputStreamOperator<MockUpModel> kafkaData) {
// Lambda version:
//kafkaData.keyBy(line -> line.age).print().setParallelism(1);
return kafkaData.keyBy(new KeySelector<MockUpModel, Integer>() {
@Override
public Integer getKey(MockUpModel value) throws Exception {
return value.age;
}
});
}
Reduce
- A rolling aggregation: combines the current element with the last reduced value and emits the new value
- Official definition:
KeyedStream → DataStream
A "rolling" reduce on a keyed data stream. Combines the current element with the last reduced value and emits the new value.
A reduce function that creates a stream of partial sums:
keyedStream.reduce(new ReduceFunction<Integer>() {
@Override
public Integer reduce(Integer value1, Integer value2)
throws Exception {
return value1 + value2;
}
});
- demo
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
SingleOutputStreamOperator<MockUpModel> kafka010Data = getKafka010Data(env);
// keyBy via a lambda, reduce via an anonymous class
kafka010Data.keyBy(line -> line.gender).reduce(new ReduceFunction<MockUpModel>() {
@Override
public MockUpModel reduce(MockUpModel value1, MockUpModel value2) throws Exception {
MockUpModel mockUpModel = new MockUpModel();
mockUpModel.name = value1.name + "--" + value2.name;
mockUpModel.gender = value1.gender;
mockUpModel.age = (value1.age + value2.age) / 2;
return mockUpModel;
}
}).print().setParallelism(1);
env.execute("flink kafka010 demo");
}
Fold
- Rolls a KeyedStream up into a DataStream with an initial value; deprecated as of Flink 1.9
- Official definition:
KeyedStream → DataStream
A "rolling" fold on a keyed data stream with an initial value. Combines the current element with the last folded value and emits the new value.
A fold function that, when applied on the sequence (1,2,3,4,5), emits the sequence "start-1", "start-1-2", "start-1-2-3", ...
DataStream<String> result =
keyedStream.fold("start", new FoldFunction<Integer, String>() {
@Override
public String fold(String current, Integer value) {
return current + "-" + value;
}
});
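Fold is deprecated, but its rolling semantics are easy to see in a plain-Java sketch (no Flink runtime assumed). Applying the FoldFunction from the official example to the sequence (1,2,3) emits "start-1", "start-1-2", "start-1-2-3":

```java
import java.util.ArrayList;
import java.util.List;

public class FoldSketch {
    // Simulates a rolling fold: combines each element with the last
    // folded value and emits every intermediate result.
    static List<String> rollingFold(String initial, int[] values) {
        List<String> emitted = new ArrayList<>();
        String current = initial;
        for (int v : values) {
            current = current + "-" + v; // the FoldFunction from the official example
            emitted.add(current);
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(rollingFold("start", new int[]{1, 2, 3}));
        // [start-1, start-1-2, start-1-2-3]
    }
}
```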
Aggregations
- Aggregation functions compute the max/min/sum over a KeyedStream. The difference between max and maxBy is that max returns only the maximum value of the field, while maxBy returns the element that holds the maximum value; min and minBy behave analogously.
KeyedStream → DataStream
Rolling aggregations on a keyed data stream. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy).
keyedStream.sum(0);
keyedStream.sum("key");
keyedStream.min(0);
keyedStream.min("key");
keyedStream.max(0);
keyedStream.max("key");
keyedStream.minBy(0);
keyedStream.minBy("key");
keyedStream.maxBy(0);
keyedStream.maxBy("key");
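The max/maxBy distinction is worth pinning down. A plain-Java sketch of the semantics (not the Flink API; the `Person` type is invented for illustration):

```java
import java.util.Arrays;
import java.util.List;

public class AggregationSketch {
    static class Person {
        final String name;
        final int age;
        Person(String name, int age) { this.name = name; this.age = age; }
    }

    // max: returns only the maximum VALUE of the field.
    static int max(List<Person> people) {
        int best = people.get(0).age;
        for (Person p : people) best = Math.max(best, p.age);
        return best;
    }

    // maxBy: returns the ELEMENT that carries the maximum value of the field.
    static Person maxBy(List<Person> people) {
        Person best = people.get(0);
        for (Person p : people) if (p.age > best.age) best = p;
        return best;
    }

    public static void main(String[] args) {
        List<Person> people = Arrays.asList(new Person("a", 30), new Person("b", 42));
        System.out.println(max(people));        // 42
        System.out.println(maxBy(people).name); // b
    }
}
```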
union
- A merging operator: unions multiple DataStreams that share the same element type;
DataStream* → DataStream
Union of two or more data streams creating a new stream containing all the elements from all the streams. Note: If you union a data stream with itself you will get each element twice in the resulting stream.
dataStream.union(otherStream1, otherStream2, ...);
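The note about self-union is easy to verify: union simply concatenates, with no deduplication. A plain-Java sketch of the semantics (not the Flink API):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class UnionSketch {
    // union concatenates all elements of all input streams;
    // no deduplication is performed.
    static <T> List<T> union(List<T> first, List<T> second) {
        List<T> out = new ArrayList<>(first);
        out.addAll(second);
        return out;
    }

    public static void main(String[] args) {
        List<Integer> stream = Arrays.asList(1, 2);
        // Unioning a stream with itself yields each element twice.
        System.out.println(union(stream, stream)); // [1, 2, 1, 2]
    }
}
```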
connect
- Combines two DataStreams into one ConnectedStreams while each stream keeps its original element type; this lets us process two differently-typed streams with shared logic and state;
DataStream,DataStream → ConnectedStreams
"Connects" two data streams retaining their types. Connect allowing for shared state between the two streams.
DataStream<Integer> someStream = //...
DataStream<String> otherStream = //...
ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);
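A ConnectedStreams is typically processed with Flink's CoMapFunction, which has one handler per input type (map1/map2) but a single output type. A plain-Java sketch of that shape (no Flink runtime; the handler bodies are invented for illustration):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ConnectSketch {
    // Mirrors CoMapFunction<Integer, String, String>: each input stream
    // keeps its own type and gets its own handler, but both handlers
    // feed one common downstream output type.
    static String map1(Integer value) { return "int:" + value; }
    static String map2(String value)  { return "str:" + value; }

    public static void main(String[] args) {
        List<Integer> someStream = Arrays.asList(1, 2);
        List<String> otherStream = Arrays.asList("a");
        List<String> out = new ArrayList<>();
        for (Integer i : someStream) out.add(map1(i));
        for (String s : otherStream) out.add(map2(s));
        System.out.println(out); // [int:1, int:2, str:a]
    }
}
```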
split
- Split divides one DataStream into two or more DataStreams
DataStream → SplitStream
Split the stream into two or more streams according to some criterion.
SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() {
@Override
public Iterable<String> select(Integer value) {
List<String> output = new ArrayList<String>();
if (value % 2 == 0) {
output.add("even");
}
else {
output.add("odd");
}
return output;
}
});
select
- Select retrieves the corresponding stream(s) from a SplitStream after a split
SplitStream → DataStream
Select one or more streams from a split stream.
SplitStream<Integer> split;
DataStream<Integer> even = split.select("even");
DataStream<Integer> odd = split.select("odd");
DataStream<Integer> all = split.select("even","odd");
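The split/select pair from the two official examples above can be simulated in plain Java: the selector routes each element to a named output, and select reads a named output back (a sketch of the semantics, not the Flink API):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SplitSelectSketch {
    // Simulates split: each element is routed to the output name chosen
    // by the selector ("even" or "odd"); select is then just a map lookup.
    static Map<String, List<Integer>> split(List<Integer> stream) {
        Map<String, List<Integer>> outputs = new HashMap<>();
        outputs.put("even", new ArrayList<>());
        outputs.put("odd", new ArrayList<>());
        for (Integer value : stream) {
            outputs.get(value % 2 == 0 ? "even" : "odd").add(value);
        }
        return outputs;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> split = split(Arrays.asList(1, 2, 3, 4));
        System.out.println(split.get("even")); // [2, 4]
        System.out.println(split.get("odd"));  // [1, 3]
    }
}
```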
iterate
DataStream → IterativeStream → DataStream
Creates a "feedback" loop in the flow, by redirecting the output of one operator to some previous operator. This is especially useful for defining algorithms that continuously update a model. The following code starts with a stream and applies the iteration body continuously. Elements that are greater than 0 are sent back to the feedback channel, and the rest of the elements are forwarded downstream. See [iterations](https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/index.html#iterations) for a complete description.
IterativeStream<Long> iteration = initialStream.iterate();
DataStream<Long> iterationBody = iteration.map (/*do something*/);
DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>(){
@Override
public boolean filter(Long value) throws Exception {
return value > 0;
}
});
iteration.closeWith(feedback);
DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>(){
@Override
public boolean filter(Long value) throws Exception {
return value <= 0;
}
});
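The feedback loop above can be traced with a plain-Java sketch. The iteration body here is assumed to be `map(value -> value - 1)` (hypothetical; the official snippet leaves it as `/*do something*/`): values > 0 go back into the feedback channel, values <= 0 are forwarded downstream:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

public class IterateSketch {
    // Simulates the iterate/closeWith feedback loop with a work queue.
    static List<Long> iterate(List<Long> initial) {
        Deque<Long> feedback = new ArrayDeque<>(initial);
        List<Long> output = new ArrayList<>();
        while (!feedback.isEmpty()) {
            long mapped = feedback.poll() - 1;    // iteration body (assumed)
            if (mapped > 0) feedback.add(mapped); // back into the feedback channel
            else output.add(mapped);              // forwarded downstream
        }
        return output;
    }

    public static void main(String[] args) {
        System.out.println(iterate(Arrays.asList(3L, 1L))); // [0, 0]
    }
}
```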
Summary
- Part one of my study of Flink's stream operators, simply done; to be polished later.
Li Xiaoli can't afford to fall behind!