1.應(yīng)用的一致性保障
Fink的檢查點和恢復(fù)機制和可以重置讀位置的source連接器結(jié)合使用娩鹉,可以保證應(yīng)用程序不會丟失任何數(shù)據(jù)攻谁。盡管如此,應(yīng)用程序可能會發(fā)出兩次計算結(jié)果弯予,因為從上一次檢查點恢復(fù)的應(yīng)用程序所計算的結(jié)果將會被重新發(fā)送一次(一些結(jié)果已經(jīng)發(fā)送出去了戚宦,這時任務(wù)故障,然后從上一次檢查點恢復(fù)锈嫩,這些結(jié)果將被重新計算一次然后發(fā)送出去)受楼。所以垦搬,可重置讀位置的source和Flink的恢復(fù)機制不足以提供端到端的恰好處理一次語義,即使應(yīng)用程序的狀態(tài)是恰好處理一次一致性級別艳汽。
(精準一次性)
冪等性寫
一個冪等操作無論執(zhí)行多少次都會返回同樣的結(jié)果猴贰。例如,重復(fù)的向hashmap中插入同樣的key-value對就是冪等操作河狐,因為頭一次插入操作之后所有的插入操作都不會改變這個hashmap米绕,因為hashmap已經(jīng)包含這個key-value對了。另一方面馋艺,append操作就不是冪等操作了栅干,因為多次append同一個元素將會導(dǎo)致列表每次都會添加一個元素。在流處理程序中捐祠,冪等寫入操作是很有意思的碱鳞,因為冪等寫入操作可以執(zhí)行多次但不改變結(jié)果。所以它們可以在某種程度上緩和Flink檢查點機制帶來的重播計算結(jié)果的效應(yīng)雏赦。
需要注意的是劫笙,依賴于冪等性sink來達到exactly-once語義的應(yīng)用程序,必須保證在從檢查點恢復(fù)以后星岗,它將會覆蓋之前已經(jīng)寫入的結(jié)果填大。例如,一個包含有sink操作的應(yīng)用在sink到一個key-value存儲時必須保證它能夠確定的計算出將要更新的key值俏橘。同時允华,從Flink程序sink到的key-value存儲中讀取數(shù)據(jù)的應(yīng)用,在Flink從檢查點恢復(fù)的過程中寥掐,可能會看到不想看到的結(jié)果靴寂。當重播開始時,之前已經(jīng)發(fā)出的計算結(jié)果可能會被更早的結(jié)果所覆蓋(因為在恢復(fù)過程中)召耘。所以百炬,一個消費Flink程序輸出數(shù)據(jù)的應(yīng)用,可能會觀察到時間回退污它,例如讀到了比之前小的計數(shù)剖踊。也就是說,當流處理程序處于恢復(fù)過程中時衫贬,流處理程序的結(jié)果將處于不穩(wěn)定的狀態(tài)德澈,因為一些結(jié)果被覆蓋掉,而另一些結(jié)果還沒有被覆蓋固惯。一旦重播完成梆造,也就是說應(yīng)用程序已經(jīng)通過了之前出故障的點,結(jié)果將會繼續(xù)保持一致性葬毫。
事務(wù)性寫
第二種實現(xiàn)端到端的恰好處理一次一致性語義的方法基于事務(wù)性寫入镇辉。其思想是只將最近一次成功保存的檢查點之前的計算結(jié)果寫入到外部系統(tǒng)中去屡穗。這樣就保證了在任務(wù)故障的情況下,端到端恰好處理一次語義忽肛。應(yīng)用將被重置到最近一次的檢查點鸡捐,而在這個檢查點之后并沒有向外部系統(tǒng)發(fā)出任何計算結(jié)果。通過只有當檢查點保存完成以后再寫入數(shù)據(jù)這種方法麻裁,事務(wù)性的方法將不會遭受冪等性寫入所遭受的重播不一致的問題。盡管如此源祈,事務(wù)性寫入?yún)s帶來了延遲煎源,因為只有在檢查點完成以后,我們才能看到計算結(jié)果香缺。
Flink提供了兩種構(gòu)建模塊來實現(xiàn)事務(wù)性sink連接器:write-ahead-log(WAL手销,預(yù)寫式日志)sink和兩階段提交sink。WAL式sink將會把所有計算結(jié)果寫入到應(yīng)用程序的狀態(tài)中图张,等接到檢查點完成的通知锋拖,才會將計算結(jié)果發(fā)送到sink系統(tǒng)。因為sink操作會把數(shù)據(jù)都緩存在狀態(tài)后段祸轮,所以WAL可以使用在任何外部sink系統(tǒng)上兽埃。盡管如此爪喘,WAL還是無法提供刀槍不入的恰好處理一次語義的保證惊奇,再加上由于要緩存數(shù)據(jù)帶來的狀態(tài)后段的狀態(tài)大小的問題摧莽,WAL模型并不十分完美委可。
與之形成對比的价卤,2PC sink需要sink系統(tǒng)提供事務(wù)的支持或者可以模擬出事務(wù)特性的模塊礼华。對于每一個檢查點砚殿,sink開始一個事務(wù)丙号,然后將所有的接收到的數(shù)據(jù)都添加到事務(wù)中疫萤,并將這些數(shù)據(jù)寫入到sink系統(tǒng)颂跨,但并沒有提交(commit)它們。當事務(wù)接收到檢查點完成的通知時扯饶,事務(wù)將被commit恒削,數(shù)據(jù)將被真正的寫入sink系統(tǒng)。這項機制主要依賴于一次sink可以在檢查點完成之前開始事務(wù)帝际,并在應(yīng)用程序從一次故障中恢復(fù)以后再commit的能力蔓同。
2PC協(xié)議依賴于Flink的檢查點機制。檢查點屏障是開始一個新的事務(wù)的通知蹲诀,所有操作符自己的檢查點成功的通知是它們可以commit的投票斑粱,而作業(yè)管理器通知一個檢查點成功的消息是commit事務(wù)的指令。于WAL sink形成對比的是脯爪,2PC sinks依賴于sink系統(tǒng)和sink本身的實現(xiàn)可以實現(xiàn)恰好處理一次語義则北。更多的矿微,2PC sink不斷的將數(shù)據(jù)寫入到sink系統(tǒng)中,而WAL寫模型就會有之前所述的問題尚揣。
2.內(nèi)置連接器
Apache Kafka Source連接器
Kafka將事件流組織為所謂的topics涌矢。一個主題就是一個事件日志系統(tǒng),Kafka可以保證主題中的數(shù)據(jù)在被讀取時和這些數(shù)據(jù)在被寫入時相同的順序快骗。為了擴大讀寫的規(guī)模娜庇,主題可以分裂為多個分區(qū),這些分區(qū)分布在一個集群上面方篮。這時名秀,讀寫順序的保證就限制到了分區(qū)這個粒度, Kafka并沒有提供從不同分區(qū)讀取數(shù)據(jù)時的順序保證藕溅。Kafka分區(qū)的讀位置稱為偏移量(offset)匕得。
Kafka的依賴引入如下:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version></version>
</dependency>
kafka連接器使用代碼
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "test")
val stream: DataStream[String] = env.addSource(
new FlinkKafkaConsumer[String](
"topic",
new SimpleStringSchema(),
properties))
- 第一個參數(shù)定義了從哪些topic中讀取數(shù)據(jù),可以是一個topic巾表,也可以是topic列表汁掠,還可以是匹配所有想要讀取的topic的正則表達式。當從多個topic中讀取數(shù)據(jù)時集币,Kafka連接器將會處理所有topic的分區(qū)考阱,將這些分區(qū)的數(shù)據(jù)放到一條流中去。
- 第二個參數(shù)是一個DeserializationSchema或者KeyedDeserializationSchema惠猿。Kafka消息被存儲為原始的字節(jié)數(shù)據(jù)羔砾,所以需要反序列化成Java或者Scala對象。上例中使用的SimpleStringSchema偶妖,是一個內(nèi)置的DeserializationSchema姜凄,它僅僅是簡單的將字節(jié)數(shù)組反序列化成字符串。DeserializationSchema和KeyedDeserializationSchema是公共的接口趾访,所以我們可以自定義反序列化邏輯态秧。
- 第三個參數(shù)是一個Properties對象,設(shè)置了用來讀寫的Kafka客戶端的一些屬性扼鞋。
為了抽取事件時間的時間戳然后產(chǎn)生水印申鱼,我們可以通過調(diào)用
FlinkKafkaConsumer.assignTimestampsAndWatermark()
Apache Kafka Sink連接器
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version></version>
</dependency>
Kakfa Sink的at-least-once保證
Flink的Kafka sink提供了基于配置的一致性保證。Kafka sink使用下面的條件提供了至少處理一次保證:
Flink檢查點機制開啟云头,所有的數(shù)據(jù)源都是可重置的捐友。
當寫入失敗時,sink連接器將會拋出異常溃槐,使得應(yīng)用程序掛掉然后重啟匣砖。這是默認行為。應(yīng)用程序內(nèi)部的Kafka客戶端還可以配置為重試寫入,只要提前聲明當寫入失敗時猴鲫,重試幾次這樣的屬性(retries property)对人。
sink連接器在完成它的檢查點之前會等待Kafka發(fā)送已經(jīng)將數(shù)據(jù)寫入的通知。
Kafka Sink的恰好處理一次語義保證
Kafka 0.11版本引入了事務(wù)寫特性拂共。由于這個新特性牺弄,F(xiàn)link Kafka sink可以為輸出結(jié)果提供恰好處理一次語義的一致性保證,只要經(jīng)過合適的配置就行宜狐。Flink程序必須開啟檢查點機制势告,并從可重置的數(shù)據(jù)源進行消費。FlinkKafkaProducer還提供了包含Semantic參數(shù)的構(gòu)造器來控制sink提供的一致性保證抚恒∨嗷牛可能的取值如下:
- Semantic.NONE,不提供任何一致性保證柑爸。數(shù)據(jù)可能丟失或者被重寫多次。
- Semantic.AT_LEAST_ONCE盒音,保證無數(shù)據(jù)丟失表鳍,但可能被處理多次。這個是默認設(shè)置祥诽。
- Semantic.EXACTLY_ONCE譬圣,基于Kafka的事務(wù)性寫入特性實現(xiàn),保證每條數(shù)據(jù)恰好處理一次雄坪。
文件系統(tǒng)source連接器
val lineReader = new TextInputFormat(null)
val lineStream: DataStream[String] = env.readFile[String](
lineReader, // The FileInputFormat
"hdfs:///path/to/my/data", // The path to read
FileProcessingMode
.PROCESS_CONTINUOUSLY, // The processing mode
30000L) // The monitoring interval in ms
StreamExecutionEnvironment.readFile()接收如下參數(shù):
FileInputFormat參數(shù)厘熟,負責讀取文件中的內(nèi)容。
文件路徑维哈。如果文件路徑指向單個文件绳姨,那么將會讀取這個文件。如果路徑指向一個文件夾阔挠,F(xiàn)ileInputFormat將會掃描文件夾中所有的文件飘庄。
PROCESS_CONTINUOUSLY將會周期性的掃描文件,以便掃描到文件新的改變购撼。
30000L表示多久掃描一次監(jiān)聽的文件跪削。
FileInputFormat是一個特定的InputFormat,用來從文件系統(tǒng)中讀取文件迂求。FileInputFormat分兩步讀取文件碾盐。首先掃描文件系統(tǒng)的路徑,然后為所有匹配到的文件創(chuàng)建所謂的input splits揩局。一個input split將會定義文件上的一個范圍毫玖,一般通過讀取的開始偏移量和讀取長度來定義。在將一個大的文件分割成一堆小的splits以后,這些splits可以分發(fā)到不同的讀任務(wù)孕豹,這樣就可以并行的讀取文件了涩盾。FileInputFormat的第二步會接收一個input split,讀取被split定義的文件范圍励背,然后返回對應(yīng)的數(shù)據(jù)春霍。
DataStream應(yīng)用中使用的FileInputFormat需要實現(xiàn)CheckpointableInputFormat接口。這個接口定義了方法來做檢查點和重置文件片段的當前的讀取位置叶眉。
在Flink 1.7中址儒,F(xiàn)link提供了一些類,這些類繼承了FileInputFormat衅疙,并實現(xiàn)了CheckpointableInputFormat接口莲趣。TextInputFormat一行一行的讀取文件,而CsvInputFormat使用逗號分隔符來讀取文件饱溢。
文件系統(tǒng)sink連接器
在將流處理應(yīng)用配置成exactly-once檢查點機制喧伞,以及配置成所有源數(shù)據(jù)都能在故障的情況下可以重置,F(xiàn)link的StreamingFileSink提供了端到端的恰好處理一次語義保證绩郎。下面的例子展示了StreamingFileSink的使用方式潘鲫。
val input: DataStream[String] = …
val sink: StreamingFileSink[String] = StreamingFileSink
.forRowFormat(
new Path("/base/path"),
new SimpleStringEncoder[String]("UTF-8"))
.build()
input.addSink(sink)
當StreamingFileSink接到一條數(shù)據(jù),這條數(shù)據(jù)將被分配到一個桶(bucket)中肋杖。一個桶是我們配置的“/base/path”的子目錄溉仑。
Flink使用BucketAssigner來分配桶。BucketAssigner是一個公共的接口状植,為每一條數(shù)據(jù)返回一個BucketId浊竟,BucketId決定了數(shù)據(jù)被分配到哪個子目錄。如果沒有指定BucketAssigner津畸,F(xiàn)link將使用DateTimeBucketAssigner來將每條數(shù)據(jù)分配到每個一個小時所產(chǎn)生的桶中去振定,基于數(shù)據(jù)寫入的處理時間(機器時間,墻上時鐘)肉拓。
StreamingFileSink提供了exactly-once輸出的保證吩案。sink通過一個commit協(xié)議來達到恰好處理一次語義的保證。這個commit協(xié)議會將文件移動到不同的階段帝簇,有以下狀態(tài):in progress徘郭,pending,finished丧肴。這個協(xié)議基于Flink的檢查點機制残揉。當Flink決定roll a file時,這個文件將被關(guān)閉并移動到pending狀態(tài)芋浮,通過重命名文件來實現(xiàn)抱环。當下一個檢查點完成時壳快,pending文件將被移動到finished狀態(tài),同樣是通過重命名來實現(xiàn)镇草。
一旦任務(wù)故障眶痰,sink任務(wù)需要將處于in progress狀態(tài)的文件重置到上一次檢查點的寫偏移量。這個可以通過關(guān)閉當前in progress的文件梯啤,并將文件結(jié)尾無效的部分丟棄掉來實現(xiàn)竖伯。
3.實現(xiàn)自定義數(shù)據(jù)源函數(shù)
DataStream API
提供了兩個接口來實現(xiàn)source連接器:
SourceFunction
和RichSourceFunction
可以用來定義非并行的source連接器,source跑在單任務(wù)上因宇。
ParallelSourceFunction
和RichParallelSourceFunction
可以用來定義跑在并行實例上的source連接器七婴。
除了并行于非并行的區(qū)別,這兩種接口完全一樣察滑。就像process function的rich版本一樣打厘,RichSourceFunction
和RichParallelSourceFunction
的子類可以override open()
和close()
方法,也可以訪問RuntimeContext
贺辰,RuntimeContext
提供了并行任務(wù)實例的數(shù)量户盯,當前任務(wù)實例的索引,以及一些其他信息饲化。
SourceFunction
和ParallelSourceFunction
定義了兩種方法:
void run(SourceContext ctx)
cancel()
run()
方法用來讀取或者接收數(shù)據(jù)然后將數(shù)據(jù)攝入到Flink應(yīng)用中先舷。根據(jù)接收數(shù)據(jù)的系統(tǒng),數(shù)據(jù)可能是推送的也可能是拉取的滓侍。Flink僅僅在特定的線程調(diào)用run()方法一次,通常情況下會是一個無限循環(huán)來讀取或者接收數(shù)據(jù)并發(fā)送數(shù)據(jù)牲芋。任務(wù)可以在某個時間點被顯式的取消撩笆,或者由于流是有限流,當數(shù)據(jù)被消費完畢時缸浦,任務(wù)也會停止夕冲。
當應(yīng)用被取消或者關(guān)閉時,cancel()
方法會被Flink調(diào)用裂逐。為了優(yōu)雅的關(guān)閉Flink應(yīng)用歹鱼,run()
方法需要在cancel()
被調(diào)用以后,立即終止執(zhí)行卜高。下面的例子顯示了一個簡單的源函數(shù)的例子:從0數(shù)到Long.MaxValue
弥姻。
class CountSource extends SourceFunction[Long] {
var isRunning: Boolean = true
override def run(ctx: SourceFunction.SourceContext[Long]) = {
var cnt: Long = -1
while (isRunning && cnt < Long.MaxValue) {
cnt += 1
ctx.collect(cnt)
}
}
override def cancel() = isRunning = false
}
可重置的源函數(shù)
支持重播輸出的源函數(shù)需要和Flink的檢查點機制集成起來,還需要在檢查點被處理時掺涛,持久化當前所有的讀取位置庭敦。當應(yīng)用從一個保存點(savepoint)恢復(fù)或者從故障恢復(fù)時,F(xiàn)link會從最近一次的檢查點或者保存點中獲取讀偏移量薪缆。如果程序開始時并不存在狀態(tài)秧廉,那么讀偏移量將會被設(shè)置到一個默認值。一個可重置的源函數(shù)需要實現(xiàn)CheckpointedFunction接口,還需要能夠存儲讀偏移量和相關(guān)的元數(shù)據(jù)疼电,例如文件的路徑嚼锄,分區(qū)的ID。這些數(shù)據(jù)將被保存在list state或者union list state中
class ResettableCountSource
extends SourceFunction[Long] with CheckpointedFunction {
var isRunning: Boolean = true
var cnt: Long = _
var offsetState: ListState[Long] = _
override def run(ctx: SourceFunction.SourceContext[Long]) = {
while (isRunning && cnt < Long.MaxValue) {
// synchronize data emission and checkpoints
ctx.getCheckpointLock.synchronized {
cnt += 1
ctx.collect(cnt)
}
}
}
override def cancel() = isRunning = false
override def snapshotState(
snapshotCtx: FunctionSnapshotContext
): Unit = {
// remove previous cnt
offsetState.clear()
// add current cnt
offsetState.add(cnt)
}
override def initializeState(
initCtx: FunctionInitializationContext): Unit = {
val desc = new ListStateDescriptor[Long](
"offset", classOf[Long])
offsetState = initCtx
.getOperatorStateStore
.getListState(desc)
// initialize cnt variable
val it = offsetState.get()
cnt = if (null == it || !it.iterator().hasNext) {
-1L
} else {
it.iterator().next()
}
}
}
4.實現(xiàn)自定義數(shù)據(jù)匯函數(shù)
DataStream API中蔽豺,任何運算符或者函數(shù)都可以向外部系統(tǒng)發(fā)送數(shù)據(jù)区丑。DataStream不需要最終流向sink運算符。例如茫虽,我們可能實現(xiàn)了一個FlatMapFunction
刊苍,這個函數(shù)將每一個接收到的數(shù)據(jù)通過HTTP POST
請求發(fā)送出去,而不使用Collecto
r發(fā)送到下一個運算符濒析。DataStream API也提供了SinkFunction
接口以及對應(yīng)的rich版本RichSinkFunction
抽象類正什。SinkFunction
接口提供了一個方法:
void invode(IN value, Context ctx)
SinkFunction的Context可以訪問當前處理時間,當前水位線号杏,以及數(shù)據(jù)的時間戳婴氮。
下面的例子展示了一個簡單的SinkFunction,可以將傳感器讀數(shù)寫入到socket中去盾致。需要注意的是主经,我們需要在啟動Flink程序前啟動一個監(jiān)聽相關(guān)端口的進程。否則將會拋出ConnectException異常庭惜≌肿ぃ可以運行“nc -l localhost 9191”命令。
val readings: DataStream[SensorReading] = ...
// write the sensor readings to a socket
readings.addSink(new SimpleSocketSink("localhost", 9191))
// set parallelism to 1 because only one thread can write to a socket
.setParallelism(1)
// -----
class SimpleSocketSink(val host: String, val port: Int)
extends RichSinkFunction[SensorReading] {
var socket: Socket = _
var writer: PrintStream = _
override def open(config: Configuration): Unit = {
// open socket and writer
socket = new Socket(InetAddress.getByName(host), port)
writer = new PrintStream(socket.getOutputStream)
}
override def invoke(
value: SensorReading,
ctx: SinkFunction.Context[_]): Unit = {
// write sensor reading to socket
writer.println(value.toString)
writer.flush()
}
override def close(): Unit = {
// close writer and socket
writer.close()
socket.close()
}
}
端到端的一致性保證建立在sink連接器的屬性上面护赊。為了達到端到端的恰好處理一次語義的目的惠遏,應(yīng)用程序需要冪等性的sink連接器或者事務(wù)性的sink連接器。上面例子中的SinkFunction既不是冪等寫入也不是事務(wù)性的寫入骏啰。由于socket具有只能添加(append-only)這樣的屬性节吮,所以不可能實現(xiàn)冪等性的寫入。又因為socket不具備內(nèi)置的事務(wù)支持判耕,所以事務(wù)性寫入就只能使用Flink的WAL sink特性來實現(xiàn)了透绩。
冪等sink連接器
對于大多數(shù)應(yīng)用,SinkFunction接口足以實現(xiàn)一個冪等性寫入的sink連接器了壁熄。需要以下兩個條件:
- 結(jié)果數(shù)據(jù)必須具有確定性的key帚豪,在這個key上面冪等性更新才能實現(xiàn)。例如一個計算每分鐘每個傳感器的平均溫度值的程序草丧,確定性的key值可以是傳感器的ID和每分鐘的時間戳志鞍。確定性的key值,對于在故障恢復(fù)的場景下方仿,能夠正確的覆蓋結(jié)果非常的重要固棚。
- 外部系統(tǒng)支持針對每個key的更新统翩,例如關(guān)系型數(shù)據(jù)庫或者key-value存儲。
val readings: DataStream[SensorReading] = ...
// write the sensor readings to a Derby table
readings.addSink(new DerbyUpsertSink)
// -----
class DerbyUpsertSink extends RichSinkFunction[SensorReading] {
var conn: Connection = _
var insertStmt: PreparedStatement = _
var updateStmt: PreparedStatement = _
override def open(parameters: Configuration): Unit = {
// connect to embedded in-memory Derby
conn = DriverManager.getConnection(
"jdbc:derby:memory:flinkExample",
new Properties())
// prepare insert and update statements
insertStmt = conn.prepareStatement(
"INSERT INTO Temperatures (sensor, temp) VALUES (?, ?)")
updateStmt = conn.prepareStatement(
"UPDATE Temperatures SET temp = ? WHERE sensor = ?")
}
override def invoke(SensorReading r, context: Context[_]): Unit = {
// set parameters for update statement and execute it
updateStmt.setDouble(1, r.temperature)
updateStmt.setString(2, r.id)
updateStmt.execute()
// execute insert statement
// if update statement did not update any row
if (updateStmt.getUpdateCount == 0) {
// set parameters for insert statement
insertStmt.setString(1, r.id)
insertStmt.setDouble(2, r.temperature)
// execute insert statement
insertStmt.execute()
}
}
override def close(): Unit = {
insertStmt.close()
updateStmt.close()
conn.close()
}
}
事務(wù)性sink連接器
事務(wù)寫入sink連接器需要和Flink的檢查點機制集成此洲,因為只有在檢查點成功完成以后厂汗,事務(wù)寫入sink連接器才會向外部系統(tǒng)commit數(shù)據(jù)。
為了簡化事務(wù)性sink的實現(xiàn)呜师,F(xiàn)link提供了兩個模版用來實現(xiàn)自定義sink運算符娶桦。這兩個模版都實現(xiàn)了CheckpointListener接口。CheckpointListener接口將會從作業(yè)管理器接收到檢查點完成的通知汁汗。
GenericWriteAheadSink模版會收集檢查點之前的所有的數(shù)據(jù)衷畦,并將數(shù)據(jù)存儲到sink任務(wù)的運算符狀態(tài)中。狀態(tài)保存到了檢查點中知牌,并在任務(wù)故障的情況下恢復(fù)祈争。當任務(wù)接收到檢查點完成的通知時,任務(wù)會將所有的數(shù)據(jù)寫入到外部系統(tǒng)中角寸。
TwoPhaseCommitSinkFunction模版利用了外部系統(tǒng)的事務(wù)特性菩混。對于每一個檢查點,任務(wù)首先開始一個新的事務(wù)扁藕,并將接下來所有的數(shù)據(jù)都寫到外部系統(tǒng)的當前事務(wù)上下文中去沮峡。當任務(wù)接收到檢查點完成的通知時,sink連接器將會commit這個事務(wù)亿柑。
GENERICWRITEAHEADSINK
GenericWriteAheadSink使得sink運算符可以很方便的實現(xiàn)邢疙。這個運算符和Flink的檢查點機制集成使用,目標是將每一條數(shù)據(jù)恰好一次寫入到外部系統(tǒng)中去望薄。需要注意的是疟游,在發(fā)生故障的情況下,write-ahead log sink可能會不止一次的發(fā)送相同的數(shù)據(jù)式矫。所以GenericWriteAheadSink無法提供完美無缺的恰好處理一次語義的一致性保證,而是僅能提供at-least-once這樣的保證役耕。我們接下來詳細的討論這些場景采转。
GenericWriteAheadSink的原理是將接收到的所有數(shù)據(jù)都追加到有檢查點分割好的預(yù)寫式日志中去。每當sink運算符碰到檢查點屏障瞬痘,運算符將會開辟一個新的section故慈,并將接下來的所有數(shù)據(jù)都追加到新的section中去。WAL(預(yù)寫式日志)將會保存到運算符狀態(tài)中框全。由于log能被恢復(fù)察绷,所有不會有數(shù)據(jù)丟失。
當GenericWriteAheadSink接收到檢查點完成的通知時津辩,將會發(fā)送對應(yīng)檢查點的WAL中存儲的所有數(shù)據(jù)拆撼。當所有數(shù)據(jù)發(fā)送成功容劳,對應(yīng)的檢查點必須在內(nèi)部提交。
檢查點的提交分兩步闸度。第一步竭贩,sink持久化檢查點被提交的信息。第二步莺禁,刪除WAL中所有的數(shù)據(jù)留量。我們不能將commit信息保存在Flink應(yīng)用程序狀態(tài)中,因為狀態(tài)不是持久化的哟冬,會在故障恢復(fù)時重置狀態(tài)楼熄。相反,GenericWriteAheadSink依賴于可插拔的組件在一個外部持久化存儲中存儲和查找提交信息浩峡。這個組件就是CheckpointCommitter可岂。
繼承GenericWriteAheadSink的運算符需要提供三個構(gòu)造器函數(shù)。
- CheckpointCommitter
- TypeSerializer红符,用來序列化輸入數(shù)據(jù)青柄。
- 一個job ID,傳給CheckpointCommitter预侯,當應(yīng)用重啟時可以識別commit信息致开。
還有,write-ahead運算符需要實現(xiàn)一個單獨的方法:
boolean sendValues(Iterable<IN> values, long chkpntId, long timestamp)
val readings: DataStream[SensorReading] = ...
// write the sensor readings to the standard out via a write-ahead log
readings.transform(
"WriteAheadSink", new SocketWriteAheadSink)
class StdOutWriteAheadSink extends GenericWriteAheadSink[SensorReading](
// CheckpointCommitter that commits
// checkpoints to the local filesystem
new FileCheckpointCommitter(System.getProperty("java.io.tmpdir")),
// Serializer for records
createTypeInformation[SensorReading]
.createSerializer(new ExecutionConfig),
// Random JobID used by the CheckpointCommitter
UUID.randomUUID.toString) {
override def sendValues(
readings: Iterable[SensorReading],
checkpointId: Long,
timestamp: Long): Boolean = {
for (r <- readings.asScala) {
// write record to standard out
println(r)
}
true
}
}
GenericWriteAheadSink無法提供完美的exactly-once保證萎馅。有兩個故障狀況會導(dǎo)致數(shù)據(jù)可能被發(fā)送不止一次双戳。
當任務(wù)執(zhí)行sendValues()方法時,程序掛掉了糜芳。如果外部系統(tǒng)無法原子性的寫入所有數(shù)據(jù)(要么都寫入要么都不寫)飒货,一些數(shù)據(jù)可能會寫入,而另一些數(shù)據(jù)并沒有被寫入峭竣。由于checkpoint還沒有commit塘辅,所以在任務(wù)恢復(fù)的過程中一些數(shù)據(jù)可能會被再次寫入。
所有數(shù)據(jù)都寫入成功了皆撩,sendValues()方法也返回true了扣墩;但在CheckpointCommitter方法被調(diào)用之前程序掛了,或者CheckpointCommitter在commit檢查點時失敗了扛吞。那么在恢復(fù)的過程中呻惕,所有未被提交的檢查點將會被重新寫入。
TWOPHASECOMMITSINKFUNCTION
Flink提供了TwoPhaseCommitSinkFunction接口來簡化sink函數(shù)的實現(xiàn)滥比。這個接口保證了端到端的exactly-once語義亚脆。2PC sink函數(shù)是否提供這樣的一致性保證取決于我們的實現(xiàn)細節(jié)。我們需要討論一個問題:“2PC協(xié)議是否開銷太大盲泛?”
通常來講濒持,為了保證分布式系統(tǒng)的一致性键耕,2PC是一個非常昂貴的方法。盡管如此弥喉,在Flink的語境下郁竟,2PC協(xié)議針對每一個檢查點只運行一次。TwoPhaseCommitSinkFunction和WAL sink很相似由境,不同點在于前者不會將數(shù)據(jù)收集到state中棚亩,而是會寫入到外部系統(tǒng)事務(wù)的上下文中。
TwoPhaseCommitSinkFunction實現(xiàn)了以下協(xié)議虏杰。在sink任務(wù)發(fā)送出第一條數(shù)據(jù)之前讥蟆,任務(wù)將在外部系統(tǒng)中開始一個事務(wù),所有接下來的數(shù)據(jù)將被寫入這個事務(wù)的上下文中纺阔。當作業(yè)管理器初始化檢查點并將檢查點屏障插入到流中的時候瘸彤,2PC協(xié)議的投票階段開始。當運算符接收到檢查點屏障笛钝,運算符將保存它的狀態(tài)质况,當保存完成時,運算符將發(fā)送一個acknowledgement信息給作業(yè)管理器玻靡。當sink任務(wù)接收到檢查點屏障時结榄,運算符將會持久化它的狀態(tài),并準備提交當前的事務(wù)囤捻,以及acknowledge JobManager中的檢查點臼朗。發(fā)送給作業(yè)管理器的acknowledgement信息類似于2PC協(xié)議中的commit投票。sink任務(wù)還不能提交事務(wù)蝎土,因為它還沒有保證所有的任務(wù)都已經(jīng)完成了它們的檢查點操作视哑。sink任務(wù)也會為下一個檢查點屏障之前的所有數(shù)據(jù)開始一個新的事務(wù)。
當作業(yè)管理器成功接收到所有任務(wù)實例發(fā)出的檢查點操作成功的通知時誊涯,作業(yè)管理器將會把檢查點完成的通知發(fā)送給所有感興趣的任務(wù)挡毅。這里的通知對應(yīng)于2PC協(xié)議的提交命令。當sink任務(wù)接收到通知時暴构,它將commit所有處于開啟狀態(tài)的事務(wù)跪呈。一旦sink任務(wù)acknowledge了檢查點操作,它必須能夠commit對應(yīng)的事務(wù)丹壕,即使任務(wù)發(fā)生故障庆械。如果commit失敗薇溃,數(shù)據(jù)將會丟失菌赖。
讓我們總結(jié)一下外部系統(tǒng)需要滿足什么樣的要求:
- 外部系統(tǒng)必須提供事務(wù)支持,或者sink的實現(xiàn)能在外部系統(tǒng)上模擬事務(wù)功能沐序。
- 在檢查點操作期間琉用,事務(wù)必須處于open狀態(tài)堕绩,并接收這段時間數(shù)據(jù)的持續(xù)寫入。
- 事務(wù)必須等到檢查點操作完成的通知到來才可以提交邑时。在恢復(fù)周期中奴紧,可能需要一段時間等待。如果sink系統(tǒng)關(guān)閉了事務(wù)(例如超時了)晶丘,那么未被commit的數(shù)據(jù)將會丟失黍氮。
- sink必須在進程掛掉后能夠恢復(fù)事務(wù)。一些sink系統(tǒng)會提供事務(wù)ID浅浮,用來commit或者abort一個開始的事務(wù)沫浆。
- commit一個事務(wù)必須是一個冪等性操作。sink系統(tǒng)或者外部系統(tǒng)能夠觀察到事務(wù)已經(jīng)被提交滚秩,或者重復(fù)提交并沒有副作用专执。
class TransactionalFileSink(val targetPath: String, val tempPath: String)
extends TwoPhaseCommitSinkFunction[(String, Double), String, Void](
createTypeInformation[String].createSerializer(new ExecutionConfig),
createTypeInformation[Void].createSerializer(new ExecutionConfig)) {
var transactionWriter: BufferedWriter = _
// Creates a temporary file for a transaction into
// which the records are written.
override def beginTransaction(): String = {
// path of transaction file
// is built from current time and task index
val timeNow = LocalDateTime.now(ZoneId.of("UTC"))
.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)
val taskIdx = this.getRuntimeContext.getIndexOfThisSubtask
val transactionFile = s"$timeNow-$taskIdx"
// create transaction file and writer
val tFilePath = Paths.get(s"$tempPath/$transactionFile")
Files.createFile(tFilePath)
this.transactionWriter = Files.newBufferedWriter(tFilePath)
println(s"Creating Transaction File: $tFilePath")
// name of transaction file is returned to
// later identify the transaction
transactionFile
}
/** Write record into the current transaction file. */
override def invoke(
transaction: String,
value: (String, Double),
context: Context[_]): Unit = {
transactionWriter.write(value.toString)
transactionWriter.write('\n')
}
/** Flush and close the current transaction file. */
override def preCommit(transaction: String): Unit = {
transactionWriter.flush()
transactionWriter.close()
}
// Commit a transaction by moving
// the precommitted transaction file
// to the target directory.
override def commit(transaction: String): Unit = {
val tFilePath = Paths.get(s"$tempPath/$transaction")
// check if the file exists to ensure
// that the commit is idempotent
if (Files.exists(tFilePath)) {
val cFilePath = Paths.get(s"$targetPath/$transaction")
Files.move(tFilePath, cFilePath)
}
}
// Aborts a transaction by deleting the transaction file.
override def abort(transaction: String): Unit = {
val tFilePath = Paths.get(s"$tempPath/$transaction")
if (Files.exists(tFilePath)) {
Files.delete(tFilePath)
}
}
}
TwoPhaseCommitSinkFunction[IN, TXN, CONTEXT]包含如下三個范型參數(shù):
- IN表示輸入數(shù)據(jù)的類型。
- TXN定義了一個事務(wù)的標識符郁油,可以用來識別和恢復(fù)事務(wù)本股。
- CONTEXT定義了自定義的上下文。
TwoPhaseCommitSinkFunction的構(gòu)造器需要兩個TypeSerializer桐腌。一個是TXN的類型拄显,另一個是CONTEXT的類型。
最后哩掺,TwoPhaseCommitSinkFunction定義了五個需要實現(xiàn)的方法:
- beginTransaction(): TXN開始一個事務(wù)凿叠,并返回事務(wù)的標識符。
- invoke(txn: TXN, value: IN, context: Context[_]): Unit將值寫入到當前事務(wù)中嚼吞。
- preCommit(txn: TXN): Unit預(yù)提交一個事務(wù)盒件。一個預(yù)提交的事務(wù)不會接收新的寫入。
- commit(txn: TXN): Unit提交一個事務(wù)舱禽。這個操作必須是冪等的炒刁。
- abort(txn: TXN): Unit終止一個事務(wù)。
(自定義數(shù)據(jù)源和自定義數(shù)據(jù)匯沒學(xué)過,有時間看一遍)
5.異步訪問外部系統(tǒng)
AsyncFunction
https://blog.csdn.net/dajiangtai007/article/details/88743860