Having tried Processing Time in the previous part, here I look at Event Time, together with the thing you cannot ignore in event-time scenarios: Watermarks.
EventTime & Watermark
Event time programs must specify how to generate Event Time Watermarks, which is the mechanism that signals progress in event time.
The following is quoted from the 《從0到1學習Flink》 (Learning Flink from 0 to 1) series and the official documentation:
A stream processor that supports event time needs a way to measure the progress of event time. For example, a window operator that builds hourly windows needs to be notified when event time has passed beyond the end of an hour, so that the operator can close the window in progress.
Event time can progress independently of processing time (measured by wall clocks). For example, in one program the current event time of an operator may trail slightly behind the processing time (accounting for a delay in receiving the events), while both proceed at the same speed. On the other hand, another streaming program might progress through weeks of event time with only a few seconds of processing, by fast-forwarding through some historic data already buffered in a Kafka topic (or another message queue).
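The mechanism in Flink to measure progress in event time is watermarks. Watermarks flow as part of the data stream and carry a timestamp t. A Watermark(t) declares that event time has reached time t in that stream, meaning that there should be no more elements from the stream with a timestamp t' <= t (i.e. events with timestamps older than or equal to the watermark).
The figure below shows a stream of events with (logical) timestamps, and watermarks flowing inline. In this example the events are in order (with respect to their timestamps), meaning that the watermarks are simply periodic markers in the stream.
Watermarks are crucial for out-of-order streams, as illustrated below, where the events are not ordered by their timestamps. In general a watermark is a declaration that by that point in the stream, all events up to a certain timestamp should have arrived. Once a watermark reaches an operator, the operator can advance its internal event time clock to the value of the watermark.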
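To make this concrete, here is a minimal plain-Java simulation (not Flink's API; `EventTimeClockSketch` and its methods are invented for illustration) of one operator reading an out-of-order stream with inline watermarks. Each watermark moves the operator's event-time clock forward, and an event whose timestamp t' satisfies t' <= t for the current watermark t is counted as late.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of how an operator tracks event time; not Flink code.
final class EventTimeClockSketch {

    /** One stream item: either an event with a timestamp or a watermark. */
    static final class Item {
        final boolean isWatermark;
        final long time; // event timestamp or watermark value, in ms

        private Item(boolean isWatermark, long time) {
            this.isWatermark = isWatermark;
            this.time = time;
        }

        static Item event(long ts) { return new Item(false, ts); }
        static Item watermark(long t) { return new Item(true, t); }
    }

    /** Returns the timestamps of events that arrived after a watermark had already covered them. */
    static List<Long> lateEvents(List<Item> stream) {
        long currentEventTime = Long.MIN_VALUE; // the operator's event-time clock
        List<Long> late = new ArrayList<>();
        for (Item item : stream) {
            if (item.isWatermark) {
                // watermark(t) moves the clock forward to t, never backward
                currentEventTime = Math.max(currentEventTime, item.time);
            } else if (item.time <= currentEventTime) {
                // watermark(t) promised no more events with t' <= t, so this one is late
                late.add(item.time);
            }
        }
        return late;
    }

    public static void main(String[] args) {
        List<Item> stream = new ArrayList<>();
        stream.add(Item.event(7));
        stream.add(Item.event(11));
        stream.add(Item.event(9));
        stream.add(Item.watermark(9));
        stream.add(Item.event(15));
        stream.add(Item.event(8)); // older than watermark 9
        stream.add(Item.watermark(15));
        stream.add(Item.event(17));
        System.out.println(lateEvents(stream)); // prints [8]
    }
}
```

Events 11 and 9 arrive out of order but before any watermark covers them, so they are not late; only event 8, which shows up after watermark 9, is.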
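My understanding: if the time characteristic in Flink is set to Event Time, watermarks must be set up, because they are the markers that tell Flink how far event time has progressed.
Once watermark(time1) has been emitted, all data in the stream with a timestamp time2 earlier than or equal to time1 is assumed to have already arrived, whether the stream is ordered or out of order. This is a promise made by the watermark, not a guarantee: an element that still shows up with such a timestamp afterwards is a late element.
Who generates watermarks? The job code running in Flink (a source function or a timestamp assigner), not the external system the data comes from.
Does every record get its own watermark? It can be 1:1, but it does not have to be; decide based on your needs and the actual situation.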
It is possible to generate a watermark on every single event. However, because each watermark causes some computation downstream, an excessive number of watermarks degrades performance.
Watermarks in Parallel Streams
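Watermarks are generated at, or directly after, source functions. Each parallel subtask of a source function usually generates its watermarks independently. These watermarks define the event time at that particular parallel source.
As the watermarks flow through the streaming program, they advance the event time at the operators where they arrive. Whenever an operator advances its event time, it generates a new watermark downstream for its successor operators.
Some operators consume multiple input streams; a union, for example, or operators following a keyBy(…) or partition(…) function. Such an operator's current event time is the minimum of its input streams' event times. As its input streams update their event times, so does the operator.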
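The "minimum of the inputs" rule can be sketched in plain Java (a simulation with invented names, not Flink's internal classes): the operator remembers the latest watermark per input, takes the minimum, and forwards a new watermark only when that minimum grows.

```java
import java.util.Arrays;

// Hypothetical sketch of a multi-input operator's event-time clock; not Flink code.
final class MultiInputClockSketch {
    private final long[] inputWatermarks; // latest watermark seen on each input
    private long currentEventTime = Long.MIN_VALUE;

    MultiInputClockSketch(int numInputs) {
        inputWatermarks = new long[numInputs];
        Arrays.fill(inputWatermarks, Long.MIN_VALUE);
    }

    /**
     * Records a watermark from one input. Returns the watermark to forward downstream,
     * or null if the operator's event time (the minimum over all inputs) did not advance.
     */
    Long onWatermark(int input, long watermark) {
        inputWatermarks[input] = Math.max(inputWatermarks[input], watermark);
        long min = Long.MAX_VALUE;
        for (long wm : inputWatermarks) {
            min = Math.min(min, wm);
        }
        if (min > currentEventTime) {
            currentEventTime = min;
            return min; // advanced: emit a new watermark to successor operators
        }
        return null;
    }

    long currentEventTime() {
        return currentEventTime;
    }

    public static void main(String[] args) {
        MultiInputClockSketch op = new MultiInputClockSketch(2);
        System.out.println(op.onWatermark(0, 29)); // null: input 1 has not reported yet
        System.out.println(op.onWatermark(1, 14)); // 14
        System.out.println(op.onWatermark(1, 35)); // 29
    }
}
```

In this sketch the minimum stays at Long.MIN_VALUE until every input has reported, which mirrors why a single input with no watermarks holds back event time for the whole operator.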
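The figure below shows an example of events and watermarks flowing through parallel streams, and operators tracking event time.
From the figure: event time originates at the source, and so do the watermarks.
The data goes from the source through a map transformation and is then processed in a window.
I didn't understand the rest of it...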
About Timestamps and Watermarks
In order to work with event time, Flink needs to know the events’ timestamps, meaning each element in the stream needs to have its event timestamp assigned. This is usually done by accessing/extracting the timestamp from some field in the element.
Timestamp assignment goes hand-in-hand with generating watermarks, which tell the system about progress in event time.
There are two ways to assign timestamps and generate watermarks:
- Directly in the data stream source
- Via a timestamp assigner / watermark generator: in Flink, timestamp assigners also define the watermarks to be emitted
Attention: Both timestamps and watermarks are specified as milliseconds since the Java epoch of 1970-01-01T00:00:00Z.
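Since Flink expects plain `long` milliseconds, converting a record's time field is usually a one-liner. A small sketch with `java.time` and `java.util.Date` (the class name is invented; the later examples call `getCurrentTime().getTime()`, which suggests, as an assumption, that the `UrlInfo` field is a `java.util.Date` or a subclass such as `java.sql.Timestamp`):

```java
import java.time.Instant;
import java.util.Date;

// Converting event times to the milliseconds-since-epoch longs Flink expects.
final class EpochMillisSketch {

    /** From an ISO-8601 UTC string such as "2019-01-01T00:00:00Z". */
    static long fromIso(String isoInstant) {
        return Instant.parse(isoInstant).toEpochMilli();
    }

    /** From a java.util.Date (or a subclass such as java.sql.Timestamp). */
    static long fromDate(Date date) {
        return date.getTime();
    }

    public static void main(String[] args) {
        System.out.println(fromIso("1970-01-01T00:00:01Z")); // 1000
        System.out.println(fromDate(new Date(1000L)));       // 1000
    }
}
```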
Directly in the data source
Stream sources can directly assign timestamps to the elements they produce, and they can also emit watermarks. When this is done, no timestamp assigner is needed. Note that if a timestamp assigner is used, any timestamps and watermarks provided by the source will be overwritten.
To assign a timestamp to an element in the source directly, the source must use the `collectWithTimestamp(...)` method on the `SourceContext`. To generate watermarks, the source must call the `emitWatermark(Watermark)` function.
For example, the MySQL data source with Spring from an earlier post was implemented like this:
```java
@Override
public void run(SourceContext<UrlInfo> sourceContext) throws Exception {
    log.info("------query ");
    if (urlInfoManager == null) {
        init();
    }
    List<UrlInfo> urlInfoList = urlInfoManager.queryAll();
    urlInfoList.parallelStream().forEach(urlInfo -> sourceContext.collect(urlInfo));
}
```
To attach a timestamp, call collectWithTimestamp; to generate a watermark, call emitWatermark.
The modified version:
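```java
@Override
public void run(SourceContext<UrlInfo> sourceContext) throws Exception {
    log.info("------query ");
    if (urlInfoManager == null) {
        init();
    }
    List<UrlInfo> urlInfoList = urlInfoManager.queryAll();
    // Sequential loop: with parallelStream() a watermark emitted by one thread could
    // overtake an element with a smaller timestamp from another thread, making it late.
    for (UrlInfo urlInfo : urlInfoList) {
        // Use the event's own time, falling back to wall-clock time if the field is empty
        long eventTime = urlInfo.getCurrentTime() == null
                ? System.currentTimeMillis()
                : urlInfo.getCurrentTime().getTime();
        // Emit the element with its timestamp (replaces collect(), which would emit it twice)
        sourceContext.collectWithTimestamp(urlInfo, eventTime);
        // Declare that event time has reached eventTime
        sourceContext.emitWatermark(new Watermark(eventTime));
    }
}
```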
Note the added calls: both the timestamp and the watermark are emitted per element here.
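Emitting each row's own timestamp as a watermark is only safe if the query returns rows in ascending time order. Otherwise the watermark would move backwards, contradicting its meaning ("nothing older will follow"). A hypothetical guard in plain Java (not part of Flink) that only lets advancing watermarks through:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: decides which watermarks a source should emit so they never regress.
final class MonotonicWatermarkSketch {
    private long lastEmitted = Long.MIN_VALUE;

    /** Returns the watermark to emit after an element with this timestamp, or null to skip. */
    Long next(long elementTimestamp) {
        if (elementTimestamp > lastEmitted) {
            lastEmitted = elementTimestamp;
            return elementTimestamp;
        }
        return null; // would not advance event time, so there is nothing to emit
    }

    public static void main(String[] args) {
        MonotonicWatermarkSketch guard = new MonotonicWatermarkSketch();
        List<Long> emitted = new ArrayList<>();
        for (long ts : new long[] {1000, 3000, 2000, 4000}) {
            Long wm = guard.next(ts);
            if (wm != null) {
                // in a SourceFunction this is where sourceContext.emitWatermark(new Watermark(wm)) would go
                emitted.add(wm);
            }
        }
        System.out.println(emitted); // [1000, 3000, 4000]
    }
}
```

The row with timestamp 2000 still arrives after watermark 3000, so downstream it counts as late; the guard only keeps the watermark itself from going backwards.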
Via Timestamp Assigners / Watermark Generators
Timestamp assigners take a stream and produce a new stream with timestamped elements and watermarks. If the original stream had timestamps and/or watermarks already, the timestamp assigner overwrites them.
Timestamp assigners are usually specified immediately after the data source, but it is not strictly required to do so. A common pattern, for example, is to parse (MapFunction) and filter (FilterFunction) before the timestamp assigner. In any case, the timestamp assigner needs to be specified before the first operation on event time (such as the first window operation). As a special case, when using Kafka as the source of a streaming job, Flink allows the specification of a timestamp assigner / watermark emitter inside the source (or consumer) itself. More information on how to do so can be found in the Kafka Connector documentation.
Let's look at an example first:
```java
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    MysqlDSWithSpringForFlink streamSource = new MysqlDSWithSpringForFlink();
    DataStreamSource<UrlInfo> dataStreamSource = env.addSource(streamSource);
    SingleOutputStreamOperator<UrlInfo> withTimestampAndWatermarkStream = dataStreamSource
            // keep only BAIDU records, then assign timestamps and watermarks
            .filter((FilterFunction<UrlInfo>) o -> o.getDomain() == UrlInfo.BAIDU)
            .assignTimestampsAndWatermarks(new MyTimestampAndWatermarkAssigner());
    // print the filtered, timestamped stream (the original printed the raw source)
    withTimestampAndWatermarkStream.addSink(new PrintSinkFunction<>());
    env.execute("mysql Datasource with pool and spring");
}
```
As you can see, assignTimestampsAndWatermarks is applied after the filter here.
With Periodic Watermarks (emitting watermarks periodically)
`AssignerWithPeriodicWatermarks` assigns timestamps and generates watermarks periodically (possibly depending on the stream elements, or purely based on processing time). The interval (every n milliseconds) in which the watermark will be generated is defined via `ExecutionConfig.setAutoWatermarkInterval(...)`. The assigner's `getCurrentWatermark()` method will be called each time, and a new watermark will be emitted if the returned watermark is non-null and larger than the previous watermark.
In other words, to generate watermarks periodically rather than for every element, implement the AssignerWithPeriodicWatermarks interface; the interval is given in milliseconds and set with ExecutionConfig.setAutoWatermarkInterval.
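The emission rule quoted above can be simulated in plain Java. This is a sketch with invented names; in Flink the framework runs this loop on a timer and calls your assigner's `getCurrentWatermark()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Simulates the framework side of a periodic assigner; not Flink's actual implementation.
final class PeriodicEmitterSketch {
    private long lastEmitted = Long.MIN_VALUE;
    private final List<Long> emitted = new ArrayList<>();

    /** Called once per auto-watermark interval (e.g. every 300 ms). */
    void onTimer(Supplier<Long> getCurrentWatermark) {
        Long wm = getCurrentWatermark.get();
        // forward only non-null watermarks that are larger than the previous one
        if (wm != null && wm > lastEmitted) {
            lastEmitted = wm;
            emitted.add(wm);
        }
    }

    List<Long> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        PeriodicEmitterSketch emitter = new PeriodicEmitterSketch();
        Long[] polled = {null, 100L, 100L, 90L, 250L}; // values an assigner might return
        for (Long value : polled) {
            emitter.onTimer(() -> value);
        }
        System.out.println(emitter.emitted()); // [100, 250]
    }
}
```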
```java
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    // Set the watermark interval (getCurrentWatermark() is polled every 300 ms). Call this after
    // setStreamTimeCharacteristic, which resets the interval to 200 ms for event time.
    ExecutionConfig config = env.getConfig();
    config.setAutoWatermarkInterval(300);
    MysqlDSWithSpringForFlink streamSource = new MysqlDSWithSpringForFlink();
    DataStreamSource<UrlInfo> dataStreamSource = env.addSource(streamSource);
    SingleOutputStreamOperator<UrlInfo> withTimestampAndWatermarkStream = dataStreamSource
            .filter((FilterFunction<UrlInfo>) o -> o.getDomain() == UrlInfo.BAIDU)
            .assignTimestampsAndWatermarks(new TimeLagWatermarkGenerator());
    // print the filtered, timestamped stream (the original printed the raw source)
    withTimestampAndWatermarkStream.addSink(new PrintSinkFunction<>());
    env.execute("mysql Datasource with pool and spring");
}
```
Here the watermark interval is set to 300 ms through ExecutionConfig, and a TimeLagWatermarkGenerator is applied after the filter. Its code is below (taken from the official docs, slightly modified):
```java
import myflink.model.UrlInfo;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

/**
 * This generator generates watermarks that are lagging behind processing time by a fixed amount.
 * It assumes that elements arrive in Flink after a bounded delay.
 */
public class TimeLagWatermarkGenerator implements AssignerWithPeriodicWatermarks<UrlInfo> {

    private final long maxTimeLag = 5000; // 5 seconds

    @Override
    public long extractTimestamp(UrlInfo element, long previousElementTimestamp) {
        // the event time is taken from the record itself
        return element.getCurrentTime().getTime();
    }

    @Override
    public Watermark getCurrentWatermark() {
        // return the watermark as current time minus the maximum time lag
        return new Watermark(System.currentTimeMillis() - maxTimeLag);
    }
}
```
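TimeLagWatermarkGenerator derives its watermark from the wall clock and ignores the extracted timestamps, so it relies on events reaching Flink within 5 seconds. The Flink docs show an alternative next to it that follows the largest event timestamp seen so far minus a fixed bound; Flink also ships that logic as `BoundedOutOfOrdernessTimestampExtractor`. Below is a plain-Java sketch of the logic only (invented class, without the Flink interface so it runs standalone):

```java
// Plain-Java version of the "max timestamp seen minus a bound" watermark logic; not a Flink class.
final class BoundedOutOfOrdernessSketch {
    private final long maxOutOfOrdernessMs;
    private long currentMaxTimestamp = Long.MIN_VALUE;

    BoundedOutOfOrdernessSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    /** Same role as extractTimestamp(...): remember the largest timestamp seen. */
    long extractTimestamp(long elementTimestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, elementTimestamp);
        return elementTimestamp;
    }

    /** Same role as getCurrentWatermark(): trail the largest timestamp by the bound. */
    long getCurrentWatermark() {
        if (currentMaxTimestamp == Long.MIN_VALUE) {
            return Long.MIN_VALUE; // nothing seen yet; avoid underflow
        }
        return currentMaxTimestamp - maxOutOfOrdernessMs;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch gen = new BoundedOutOfOrdernessSketch(3500);
        for (long ts : new long[] {10_000, 12_000, 9_000}) {
            gen.extractTimestamp(ts);
        }
        System.out.println(gen.getCurrentWatermark()); // 8500
    }
}
```

Because the watermark follows the data rather than the clock, replaying old data (for example from Kafka) still produces meaningful watermarks.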
With Punctuated Watermarks (emitted when specific events occur)
To generate watermarks whenever a certain event indicates that a new watermark might be generated, use `AssignerWithPunctuatedWatermarks`. For this class Flink will first call the `extractTimestamp(...)` method to assign the element a timestamp, and then immediately call the `checkAndGetNextWatermark(...)` method on that element.
The `checkAndGetNextWatermark(...)` method is passed the timestamp that was assigned in the `extractTimestamp(...)` method, and can decide whether it wants to generate a watermark. Whenever the `checkAndGetNextWatermark(...)` method returns a non-null watermark, and that watermark is larger than the latest previous watermark, that new watermark will be emitted.
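The per-element call order described above can be simulated in plain Java (invented names; the boolean marker array stands in for whatever field tells you a watermark is due, such as the `hasWatermarkMarker()` check in the PunctuatedAssigner of this section):

```java
import java.util.ArrayList;
import java.util.List;

// Simulates the punctuated-watermark call sequence; not Flink's implementation.
final class PunctuatedSketch {

    /**
     * For each element: extract the timestamp, then ask whether it carries a watermark.
     * A watermark is emitted only if it is non-null and larger than the last one emitted.
     */
    static List<Long> run(long[] timestamps, boolean[] markers) {
        List<Long> emitted = new ArrayList<>();
        long last = Long.MIN_VALUE;
        for (int i = 0; i < timestamps.length; i++) {
            long extracted = timestamps[i];                 // extractTimestamp(...)
            Long candidate = markers[i] ? extracted : null; // checkAndGetNextWatermark(...)
            if (candidate != null && candidate > last) {
                last = candidate;
                emitted.add(candidate);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        long[] ts = {100, 200, 150, 300, 250};
        boolean[] marker = {false, true, true, false, true};
        System.out.println(run(ts, marker)); // [200, 250]
    }
}
```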
```java
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    MysqlDSWithSpringForFlink streamSource = new MysqlDSWithSpringForFlink();
    DataStreamSource<UrlInfo> dataStreamSource = env.addSource(streamSource);
    SingleOutputStreamOperator<UrlInfo> withTimestampAndWatermarkStream = dataStreamSource
            // keep only BAIDU records, then assign timestamps and punctuated watermarks
            .filter((FilterFunction<UrlInfo>) o -> o.getDomain() == UrlInfo.BAIDU)
            .assignTimestampsAndWatermarks(new PunctuatedAssigner());
    // print the filtered, timestamped stream (the original printed the raw source)
    withTimestampAndWatermarkStream.addSink(new PrintSinkFunction<>());
    env.execute("mysql Datasource with pool and spring");
}
```
```java
import myflink.model.UrlInfo;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class PunctuatedAssigner implements AssignerWithPunctuatedWatermarks<UrlInfo> {

    @Override
    public long extractTimestamp(UrlInfo element, long previousElementTimestamp) {
        return element.getCurrentTime().getTime();
    }

    @Override
    public Watermark checkAndGetNextWatermark(UrlInfo lastElement, long extractedTimestamp) {
        // Emit a watermark at this element's timestamp only if the element carries a marker;
        // hasWatermarkMarker() is assumed to exist on UrlInfo (like MyEvent in the Flink docs).
        // Returning null means "no watermark for this element".
        return lastElement.hasWatermarkMarker() ? new Watermark(extractedTimestamp) : null;
    }
}
```
About Kafka
When using Apache Kafka as a data source, each Kafka partition may have a simple event time pattern (ascending timestamps or bounded out-of-orderness). However, when consuming streams from Kafka, multiple partitions often get consumed in parallel, interleaving the events from the partitions and destroying the per-partition patterns (this is inherent in how Kafka’s consumer clients work).
In that case, you can use Flink’s Kafka-partition-aware watermark generation. Using that feature, watermarks are generated inside the Kafka consumer, per Kafka partition, and the per-partition watermarks are merged in the same way as watermarks are merged on stream shuffles.
For example, if event timestamps are strictly ascending per Kafka partition, generating per-partition watermarks with the ascending timestamps watermark generator will result in perfect overall watermarks.
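A plain-Java simulation of why per-partition generation helps (invented names, not the Kafka consumer's code): each partition is ascending but the interleaved stream is not, so a single ascending extractor would see violations. Tracking the newest timestamp per partition and taking the minimum, the same way watermarks are merged on shuffles, gives a watermark that every partition has already reached.

```java
import java.util.HashMap;
import java.util.Map;

// Simulation of per-partition watermarking; not the Flink Kafka consumer's code.
final class PerPartitionWatermarkSketch {
    private final Map<Integer, Long> partitionWatermarks = new HashMap<>();
    private final int numPartitions;

    PerPartitionWatermarkSketch(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    /** Records an element from a partition whose timestamps are ascending. */
    void onElement(int partition, long timestamp) {
        partitionWatermarks.merge(partition, timestamp, Long::max);
    }

    /** Overall watermark: minimum over partitions; MIN_VALUE until every partition has data. */
    long currentWatermark() {
        if (partitionWatermarks.size() < numPartitions) {
            return Long.MIN_VALUE;
        }
        long min = Long.MAX_VALUE;
        for (long wm : partitionWatermarks.values()) {
            min = Math.min(min, wm);
        }
        return min;
    }

    /** Counts how often the interleaved stream goes backwards in time. */
    static int ascendingViolations(long[] interleaved) {
        int violations = 0;
        for (int i = 1; i < interleaved.length; i++) {
            if (interleaved[i] < interleaved[i - 1]) violations++;
        }
        return violations;
    }

    public static void main(String[] args) {
        // partition 0: 10, 20, 30   partition 1: 5, 15, 25 (each ascending)
        int[] partition = {0, 1, 0, 1, 0, 1};
        long[] ts = {10, 5, 20, 15, 30, 25};
        PerPartitionWatermarkSketch wm = new PerPartitionWatermarkSketch(2);
        for (int i = 0; i < ts.length; i++) {
            wm.onElement(partition[i], ts[i]);
        }
        System.out.println(ascendingViolations(ts)); // 3
        System.out.println(wm.currentWatermark());   // 25
    }
}
```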
The illustrations below show how to use the per-Kafka-partition watermark generation, and how watermarks propagate through the streaming dataflow in that case.
In this case you can use Flink's Kafka-partition-aware watermark generation, as in the code below:
```java
public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    Properties properties = new Properties();
    properties.put("bootstrap.servers", "localhost:9092");
    properties.put("zookeeper.connect", "localhost:2181");
    properties.put("group.id", "metric-group");
    properties.put("auto.offset.reset", "latest");
    properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    FlinkKafkaConsumer010<String> consumer = new FlinkKafkaConsumer010<>(
            "testjin", // topic
            new SimpleStringSchema(),
            properties);
    // Assign timestamps/watermarks on the consumer itself, so watermarks are generated per
    // Kafka partition; assigning them after map() would only see the interleaved stream.
    consumer.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<String>() {
        @Override
        public long extractAscendingTimestamp(String element) {
            return JSON.parseObject(element, UrlInfo.class).getCurrentTime().getTime();
        }
    });
    // map: convert one stream into another, here from JSON string to UrlInfo
    SingleOutputStreamOperator<UrlInfo> urlInfoStream = env.addSource(consumer)
            .setParallelism(1)
            .map(string -> JSON.parseObject(string, UrlInfo.class));
    urlInfoStream.print(); // output the parsed records
    env.execute("save url to db");
}
```
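Note that AscendingTimestampExtractor is used here, i.e. a timestamp assigner for ascending timestamps; it fits only if the timestamps within each Kafka partition are ascending.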
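As far as I can tell from the Flink 1.7 sources, AscendingTimestampExtractor emits the last extracted timestamp minus 1 ms as its watermark, and by default only logs a warning when a timestamp goes backwards. A plain-Java sketch of that behaviour (invented class that counts violations instead of logging):

```java
// Mirrors the ascending-timestamp watermark logic in plain Java; not the Flink class itself.
final class AscendingWatermarkSketch {
    private long currentTimestamp = Long.MIN_VALUE;
    private int violations = 0;

    long extract(long timestamp) {
        if (timestamp >= currentTimestamp) {
            currentTimestamp = timestamp;
        } else {
            violations++; // Flink's default handler logs a warning here instead
        }
        return timestamp;
    }

    /** Trails the newest timestamp by 1 ms, so further elements with the same timestamp are not late. */
    long currentWatermark() {
        return currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1;
    }

    int violations() {
        return violations;
    }

    public static void main(String[] args) {
        AscendingWatermarkSketch a = new AscendingWatermarkSketch();
        for (long ts : new long[] {1000, 2000, 1500}) {
            a.extract(ts);
        }
        System.out.println(a.currentWatermark()); // 1999
        System.out.println(a.violations());       // 1
    }
}
```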
References:
http://www.54tianzhisheng.cn/2018/12/11/Flink-time/
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/event_time.html
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/event_timestamps_watermarks.html