Flink DataStream API

Data Sources

源是程序讀取輸入數(shù)據(jù)的位置≈辉酰可以使用 StreamExecutionEnvironment.addSource(sourceFunction) 將源添加到程序。Flink 有許多預(yù)先實現(xiàn)的源函數(shù)香府,也可以通過實現(xiàn) SourceFunction 方法自定義非并行源 狈茉,或通過實現(xiàn) ParallelSourceFunction 或擴展 RichParallelSourceFunction 自定義并行源。

有幾個預(yù)定義的流數(shù)據(jù)源可從 StreamExecutionEnvironment 訪問:

基于文件:

  • readTextFile(path) 逐行讀取文本文件(文件符合 TextInputFormat 格式)劣欢,并作為字符串返回每一行。
  • readFile(fileInputFormat, path) 按指定的文件輸入格式(fileInputFormat)讀取指定路徑的文件裁良。

  • readFile(fileInputFormat, path, watchType, interval, pathFilter) 前兩個方法的內(nèi)部調(diào)用方法凿将。根據(jù)給定文件格式(fileInputFormat)讀取指定路徑的文件。根據(jù) watchType价脾,定期監(jiān)聽路徑下的新數(shù)據(jù)(FileProcessingMode.PROCESS_CONTINUOUSLY)牧抵,或者處理當前在路徑中的數(shù)據(jù)并退出(FileProcessingMode.PROCESS_ONCE)。使用 pathFilter侨把,可以進一步排除正在處理的文件犀变。

基于Socket:

  • socketTextStream 從 Socket 讀取,元素可以用分隔符分隔秋柄。

基于集合:

  • fromCollection(Seq) 用 Java.util.Collection 對象創(chuàng)建數(shù)據(jù)流获枝。集合中的所有元素必須屬于同一類型。

  • fromCollection(Iterator) 用迭代器創(chuàng)建數(shù)據(jù)流骇笔。指定迭代器返回的元素的數(shù)據(jù)類型省店。

  • fromElements(elements: _*) 從給定的對象序列創(chuàng)建數(shù)據(jù)流机隙。所有對象必須屬于同一類型。

  • fromParallelCollection(SplittableIterator) 并行地從迭代器創(chuàng)建數(shù)據(jù)流萨西。指定迭代器返回的元素的數(shù)據(jù)類型有鹿。

  • generateSequence(from, to) 并行生成給定間隔的數(shù)字序列。

自定義:

  • addSource 附加新的源函數(shù)谎脯。例如葱跋,要從 Apache Kafka 中讀取,可以使用 addSource(new FlinkKafkaConsumer08<>(...))源梭。請詳細查看 連接器娱俺。

DataStream Transformation

轉(zhuǎn)換函數(shù)

Map
DataStream -> DataStream,一個數(shù)據(jù)元生成一個新的數(shù)據(jù)元废麻。
將輸入流的元素翻倍:
dataStream.map { x => x * 2 }

FlatMap
DataStream -> DataStream荠卷,一個數(shù)據(jù)元生成多個數(shù)據(jù)元(可以為0)。將句子分割為單詞:
dataStream.flatMap { str => str.split(" ") }

Filter
DataStream -> DataStream烛愧,每個數(shù)據(jù)元執(zhí)行布爾函數(shù)油宜,只保存函數(shù)返回 true 的數(shù)據(jù)元。過濾掉零值的過濾器:
dataStream.filter { _ != 0 }

KeyBy
DataStream -> KeyedStream怜姿,將流劃分為不相交的分區(qū)慎冤。具有相同 Keys 的所有記錄在同一分區(qū)。指定 key 的取值:

dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple

Reduce
KeyedStream -> DataStream沧卢,KeyedStream 元素滾動執(zhí)行 Reduce蚁堤。將當前數(shù)據(jù)元與最新的一個 Reduce 值組合作為新值發(fā)送。創(chuàng)建 key 的值求和:

keyedStream.reduce { _ + _ }

Fold
KeyedStream -> DataStream但狭,具有初始值的 Reduce披诗。將當前數(shù)據(jù)元與最新的一個 Reduce 值組合作為新值發(fā)送。當應(yīng)用于序列(1,2,3,4,5)時立磁,發(fā)出序列"start-1"呈队,"start-1-2","start-1-2-3", ...:

keyedStream.fold("start")((str, i) => { str + "-" + i })

Aggregations
KeyedStream -> DataStream息罗,應(yīng)用于 KeyedStream 上的滾動聚合掂咒。minminBy 的區(qū)別是是 min 返回最小值才沧,minBy 具有最小值的數(shù)據(jù)元(maxmaxBy 同理):

keyedStream.sum(0)
keyedStream.sum("key")
keyedStream.min(0)
keyedStream.min("key")
keyedStream.max(0)
keyedStream.max("key")
keyedStream.minBy(0)
keyedStream.minBy("key")
keyedStream.maxBy(0)
keyedStream.maxBy("key")

Window
KeyedStream -> WindowedStream迈喉,Windows 可以在已經(jīng)分區(qū)的 KeyedStream 上定義。Windows 根據(jù)某些特征(例如温圆,在最近5秒內(nèi)到達的數(shù)據(jù))對每個Keys中的數(shù)據(jù)進行分組挨摸。更多說明參考 Windows譯版

// Last 5 seconds of data
dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))) 

WindowAll
DataStream -> AllWindowedStream岁歉,Windows 也可以在 DataStream 上定義得运。在許多情況下膝蜈,這是非并行轉(zhuǎn)換。所有記錄將收集在 windowAll 算子的一個任務(wù)中熔掺。

// Last 5 seconds of data
dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))) 

Window Apply
WindowedStream -> DataStream 或 AllWindowedStream -> DataStream饱搏,將函數(shù)應(yīng)用于整個窗口。一個對窗口數(shù)據(jù)求和:

windowedStream.apply { WindowFunction }

// applying an AllWindowFunction on non-keyed window stream
allWindowedStream.apply { AllWindowFunction }

Window Reduce
WindowedStream -> DataStream置逻,Reduce 函數(shù)應(yīng)用于窗口并返回結(jié)果值推沸。

windowedStream.reduce { _ + _ }

Window Fold
WindowedStream -> DataStream,F(xiàn)old 函數(shù)應(yīng)用于窗口并返回結(jié)果值券坞。當函數(shù)應(yīng)用于窗口的序列(1,2,3,4,5)時鬓催,發(fā)送出 "start-1-2-3-4-5":

val result: DataStream[String] =
    windowedStream.fold("start", (str, i) => { str + "-" + i })

Aggregations on windows
WindowedStream -> DataStream,聚合窗口的內(nèi)容:

windowedStream.sum(0)
windowedStream.sum("key")
windowedStream.min(0)
windowedStream.min("key")
windowedStream.max(0)
windowedStream.max("key")
windowedStream.minBy(0)
windowedStream.minBy("key")
windowedStream.maxBy(0)
windowedStream.maxBy("key")

Union
DataStream* -> DataStream恨锚,兩個或多個數(shù)據(jù)流的合并宇驾,創(chuàng)建包含來自所有流的所有數(shù)據(jù)元的新流。如果將數(shù)據(jù)流與自身聯(lián)合猴伶,則會在結(jié)果流中獲取兩次數(shù)據(jù)元课舍。

dataStream.union(otherStream1, otherStream2, ...)

Window Join
DataStream,DataStream -> DataStream,Join 連接兩個流他挎,指定 Key 和窗口布卡。

dataStream.join(otherStream)
    .where(<key selector>).equalTo(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply { ... }

Window CoGroup
DataStream,DataStream -> DataStream,CoGroup 連接兩個流雇盖,指定 Key 和窗口忿等。

dataStream.coGroup(otherStream)
    .where(0).equalTo(1)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply {}

CoGroup 與 Join 的區(qū)別:
CoGroup 會輸出未匹配的數(shù)據(jù),Join 只輸出匹配的數(shù)據(jù)

Connect
DataStream,DataStream -> ConnectedStreams崔挖,連接兩個有各自類型的數(shù)據(jù)流贸街。允許兩個流之間的狀態(tài)共享。

someStream : DataStream[Int] = ...
otherStream : DataStream[String] = ...

val connectedStreams = someStream.connect(otherStream)

可用于數(shù)據(jù)流關(guān)聯(lián)配置流

CoMap, CoFlatMap
ConnectedStreams -> DataStream狸相,作用域連接數(shù)據(jù)流(connected data stream)上的 mapflatMap

connectedStreams.map(
    (_ : Int) => true,
    (_ : String) => false
)
connectedStreams.flatMap(
    (_ : Int) => true,
    (_ : String) => false
)

Split
DataStream -> SplitStream薛匪,將數(shù)據(jù)流拆分為兩個或更多個流。

val split = someDataStream.split(
  (num: Int) =>
    (num % 2) match {
      case 0 => List("even")
      case 1 => List("odd")
    }
)

Select
SplitStream -> DataStream脓鹃,從 SpliteStream 中選擇一個流或多個流逸尖。

val even = split select "even"
val odd = split select "odd"
val all = split.select("even","odd")

Iterate
DataStream -> IterativeStream -> DataStream,將一個算子的輸出重定向到某個先前的算子瘸右,在流中創(chuàng)建 feedback 循環(huán)娇跟。這對于定義不斷更新模型的算法特別有用。以下代碼以流開頭并連續(xù)應(yīng)用迭代體太颤。大于0的數(shù)據(jù)元將被發(fā)送回 feedback苞俘,其余數(shù)據(jù)元將向下游轉(zhuǎn)發(fā)。

initialStream.iterate {
  iteration => {
    val iterationBody = iteration.map {/*do something*/}
    (iterationBody.filter(_ > 0), iterationBody.filter(_ <= 0))
  }
}

Extract Timestamps
DataStream -> DataStream龄章,從記錄中提取時間戳吃谣,以便使用事件時間窗口乞封。

stream.assignTimestamps (new TimeStampExtractor() {...});

Project
DataStream -> DataStream,作用于元組的轉(zhuǎn)換岗憋,從元組中選擇字段的子集肃晚。

DataStream<Tuple3<Integer, Double, String>> in = // [...]
DataStream<Tuple2<String, Integer>> out = in.project(2,0);

分區(qū)函數(shù)

Custom partitioning
DataStream -> DataStream,使用自定義的分區(qū)函數(shù)(Partitioner)為每個數(shù)據(jù)元選擇目標分區(qū)和所在任務(wù)仔戈。

dataStream.partitionCustom(partitioner, "someKey");
dataStream.partitionCustom(partitioner, 0);

Random partitioning
DataStream -> DataStream陷揪,隨機均勻分布分配數(shù)據(jù)元。

dataStream.shuffle();

Rebalancing (Round-robin partitioning)
DataStream -> DataStream杂穷,輪詢?yōu)閿?shù)據(jù)元分區(qū)悍缠,每個分區(qū)創(chuàng)建相等的負載。在存在數(shù)據(jù)偏斜時用于性能優(yōu)化耐量。

dataStream.rebalance()

Rescaling
DataStream -> DataStream飞蚓,根據(jù)上下游的分區(qū)數(shù)量,輪詢?yōu)閿?shù)據(jù)元分區(qū)廊蜒。

dataStream.rescale();

建議使用 rescale 替代 rebalance趴拧。
例如,上游是5個并發(fā)山叮,下游是10個并發(fā)著榴。當使用 Rebalance 時,上游每個并發(fā)會輪詢發(fā)給下游10個并發(fā)屁倔。當使用 Rescale 時脑又,上游每個并發(fā)只需輪詢發(fā)給下游2個并發(fā),能提高網(wǎng)絡(luò)效率锐借。
當上游的數(shù)據(jù)比較均勻時问麸,且上下游的并發(fā)數(shù)成比例時,可以使用 Rescale 替換 Rebalance钞翔。參數(shù):enable.rescale.shuffling=true严卖,默認關(guān)閉。

Broadcasting
DataStream -> DataStream布轿,向每個分區(qū)廣播數(shù)據(jù)元哮笆。

dataStream.broadcast()

Task chaining and resource groups

Chaining 兩個后續(xù)轉(zhuǎn)換意味著將它們定位在同一個線程中以獲得更好的性能。Flink 默認會鏈接一些算子(例如汰扭,連續(xù)兩個的 map 轉(zhuǎn)換)稠肘。API可以對鏈接進行細粒度控制:

使用 StreamExecutionEnvironment.disableOperatorChaining() 可以禁用整個工作的算子鏈接。對于更細粒度的控制东且,可以使用以下函數(shù)启具。(這些函數(shù)只能在 DataStream 轉(zhuǎn)換后立即使用本讥。例如珊泳,可以使用 someStream.map(...).startNewChain()鲁冯,但不能使用 someStream.startNewChain()

Resource group 是 Flink 中的一個 slot。如果需要色查,可以在單獨的 slot 中手動隔離算子薯演。

Start new chain
從這個算子開始,開始一個新的鏈秧了。兩個 map 將被鏈接跨扮,filter 將不會在鏈接當中。

someStream.filter(...).map(...).startNewChain().map(...)

Disable chaining
不要鏈接 map 算子

someStream.map(...).disableChaining()

Set slot sharing group
設(shè)置算子操作的 slot sharing验毡。將把具有相同 slot sharing 的算子操作放入同一個 slot衡创,同時保證其他 slot 中沒有 slot sharing 的算子操作【ǎ可用于隔離 slot璃氢。默認 slot sharing group 的名稱為"default",可以通過調(diào)用 slotSharingGroup("groupName") 將算子操作顯式放入此組中狮辽。

someStream.filter(...).slotSharingGroup("name")

Data Sinks

Data Sink 消費 DataStream 并轉(zhuǎn)發(fā)到文件一也,套接字,外部系統(tǒng)或打印到頁面喉脖。Flink 帶有各種內(nèi)置輸出格式椰苟,封裝在 DataStreams 上的算子操作后面:

  • writeAsText() / TextOutputFormat, 按字符串順序?qū)懭胛募Mㄟ^調(diào)用每個元素的 toString() 方法獲得字符串树叽。

  • writeAsCsv(...) / CsvOutputFormat舆蝴,將元組寫為逗號分隔的形式寫入文件。行和字段分隔符是可配置的题诵。每個字段的值來自對象的 toString() 方法须误。

  • print() / printToErr(),在標準輸出/標準錯誤流上打印每個元素的 toString() 值仇轻【┝。可以定義輸出前綴,這有助于區(qū)分不同的打印調(diào)用篷店。如果并行度大于1祭椰,輸出也包含生成輸出的任務(wù)的標識符。

  • writeUsingOutputFormat() / FileOutputFormat疲陕,自定義文件輸出的方法和基類方淤。支持自定義對象到字節(jié)的轉(zhuǎn)換。

  • writeToSocket蹄殃,將元素寫入 Socket携茂,使用 SerializationSchema 進行序列化。

  • addSink诅岩,調(diào)用自定義接收器函數(shù)讳苦。請詳細查看 連接器带膜。

DataStream 的 write*() 方法主要用于調(diào)試目的。他們沒有參與 Flink checkpoint鸳谜,這意味著這些函數(shù)通常具有至少一次的語義膝藕。刷新到目標系統(tǒng)的數(shù)據(jù)取決于 OutputFormat 的實現(xiàn),并非所有發(fā)送到 OutputFormat 的數(shù)據(jù)都會立即顯示在目標系統(tǒng)中咐扭。此外芭挽,在失敗的情況下,這些記錄可能會丟失蝗肪。

要將流可靠袜爪、準確地傳送到文件系統(tǒng),請使用 flink-connector-filesystem薛闪。通過 .addSink(...) 方法的自定義實現(xiàn)饿敲,可以實現(xiàn)在 checkpoint 中精確一次的語義。

Iterations

迭代流程序?qū)⒑瘮?shù)嵌入到 IterativeStream逛绵。由于 DataStream 程序可能永遠不會完成怀各,因此沒有最大迭代次數(shù)。相反术浪,需要指定流的哪個部分反饋到迭代瓢对,哪個部分使用 split 或 filter 轉(zhuǎn)發(fā)到下游。

下面是一個示例迭代胰苏,其中主體(重復(fù)的計算部分)是一個簡單的 map 轉(zhuǎn)換硕蛹,反饋的元素由使用過濾器向下游轉(zhuǎn)發(fā)的元素區(qū)分。

val iteratedStream = someDataStream.iterate(
  iteration => {
    val iterationBody = iteration.map(/* this is executed many times */)
    (iterationBody.filter(/* one part of the stream */), iterationBody.filter(/* some other part of the stream */))
})

例如硕并,從一系列整數(shù)中連續(xù)減去1直到它們達到零的程序:

val someIntegers: DataStream[Long] = env.generateSequence(0, 1000)

val iteratedStream = someIntegers.iterate(
  iteration => {
    val minusOne = iteration.map( v => v - 1)
    val stillGreaterThanZero = minusOne.filter (_ > 0)
    val lessThanZero = minusOne.filter(_ <= 0)
    (stillGreaterThanZero, lessThanZero)
  }
)

Reference:
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/datastream_api.html

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末法焰,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子倔毙,更是在濱河造成了極大的恐慌埃仪,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,544評論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件陕赃,死亡現(xiàn)場離奇詭異卵蛉,居然都是意外死亡,警方通過查閱死者的電腦和手機么库,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,430評論 3 392
  • 文/潘曉璐 我一進店門傻丝,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人诉儒,你說我怎么就攤上這事葡缰。” “怎么了?”我有些...
    開封第一講書人閱讀 162,764評論 0 353
  • 文/不壞的土叔 我叫張陵泛释,是天一觀的道長滤愕。 經(jīng)常有香客問我,道長胁澳,這世上最難降的妖魔是什么该互? 我笑而不...
    開封第一講書人閱讀 58,193評論 1 292
  • 正文 為了忘掉前任米者,我火速辦了婚禮韭畸,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘蔓搞。我一直安慰自己胰丁,他們只是感情好,可當我...
    茶點故事閱讀 67,216評論 6 388
  • 文/花漫 我一把揭開白布喂分。 她就那樣靜靜地躺著锦庸,像睡著了一般。 火紅的嫁衣襯著肌膚如雪蒲祈。 梳的紋絲不亂的頭發(fā)上甘萧,一...
    開封第一講書人閱讀 51,182評論 1 299
  • 那天,我揣著相機與錄音梆掸,去河邊找鬼扬卷。 笑死,一個胖子當著我的面吹牛酸钦,可吹牛的內(nèi)容都是我干的怪得。 我是一名探鬼主播,決...
    沈念sama閱讀 40,063評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼卑硫,長吁一口氣:“原來是場噩夢啊……” “哼徒恋!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起欢伏,我...
    開封第一講書人閱讀 38,917評論 0 274
  • 序言:老撾萬榮一對情侶失蹤入挣,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后硝拧,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體财岔,經(jīng)...
    沈念sama閱讀 45,329評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,543評論 2 332
  • 正文 我和宋清朗相戀三年河爹,在試婚紗的時候發(fā)現(xiàn)自己被綠了匠璧。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,722評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡咸这,死狀恐怖夷恍,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情,我是刑警寧澤酿雪,帶...
    沈念sama閱讀 35,425評論 5 343
  • 正文 年R本政府宣布遏暴,位于F島的核電站,受9級特大地震影響指黎,放射性物質(zhì)發(fā)生泄漏朋凉。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,019評論 3 326
  • 文/蒙蒙 一醋安、第九天 我趴在偏房一處隱蔽的房頂上張望杂彭。 院中可真熱鬧,春花似錦吓揪、人聲如沸亲怠。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,671評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽团秽。三九已至,卻和暖如春叭首,著一層夾襖步出監(jiān)牢的瞬間习勤,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,825評論 1 269
  • 我被黑心中介騙來泰國打工焙格, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留图毕,地道東北人。 一個月前我還...
    沈念sama閱讀 47,729評論 2 368
  • 正文 我出身青樓间螟,卻偏偏與公主長得像吴旋,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子厢破,可洞房花燭夜當晚...
    茶點故事閱讀 44,614評論 2 353

推薦閱讀更多精彩內(nèi)容