Flink 的 HDFS Connector

這個Connector提供了一個sink來寫分區(qū)文件到任何Hadoop FileSystem支持的任何文件系統(tǒng)中问潭,為了使用這個Connector筋岛,請將下面的依賴添加到你的工程中:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-filesystem_2.10</artifactId>
  <version>1.3.0</version>
</dependency>

注意:streaming connectors目前還不是二進(jìn)制發(fā)布包的一部分,請參考此處來了解如何在分布式執(zhí)行中關(guān)聯(lián)到這些connectors毛肋。

Flink Sink分桶

分桶行為跟寫行為都是可以配置的礁叔,這個后面我們會講到赋铝,你可以通過默認(rèn)配置來創(chuàng)建一個分桶的sink,將數(shù)據(jù)sink到以時間作為劃分的滾動文件中:
Java 代碼:

DataStream<String> input = ...;

input.addSink(new BucketingSink<String>("/base/path"));

Scala 代碼:

val input: DataStream[String] = ...

input.addSink(new BucketingSink[String]("/base/path"))

這里唯一需要參數(shù)是這些分桶所要存儲的目錄地址旋奢,sink還可以通過配置一個自定義的bucketer、writer和批大小來進(jìn)一步配置然痊。

默認(rèn)情況下分桶sink是通過元素到達(dá)的系統(tǒng)時間來進(jìn)行切分的至朗,并用"yyyy-MM-dd HH"的時間格式來命名桶,這個時間格式與當(dāng)前的系統(tǒng)時間傳入SimpleDateFormat來形成一個桶的路徑剧浸,當(dāng)遇到一個新的時間后就會創(chuàng)建一個新的桶锹引。例如:如果你有一個以分鐘作為最細(xì)粒度的模式矗钟,那么你將每分鐘獲得一個新的分桶。每個分桶本身是一個包含若干分區(qū)文件的目錄嫌变,每個并行的sink實例會創(chuàng)建它自己的分區(qū)文件吨艇,當(dāng)分區(qū)文件過大時,sink會緊接著其它分區(qū)文件創(chuàng)建一個新的分區(qū)文件腾啥。當(dāng)一個桶變成非活躍狀態(tài)時东涡,打開的文件會被刷新和關(guān)閉,當(dāng)一個桶不再被寫入時倘待,會被認(rèn)為是非活躍的疮跑。默認(rèn)情況下,sink會每分鐘檢查一遍是否非活躍凸舵,并關(guān)閉超過一分鐘沒有數(shù)據(jù)寫入的分桶祖娘,這種行為可以通過在BucketingSink
setInactiveBucketCheckInterval()setInactiveBucketThreshold()來配置。

你可以在BucketingSink中使用setBucketer()來指定一個自定義的bucketer,如果需要啊奄,bucketer可以使用元素或者元組的屬性來決定bucketer的目錄渐苏。

默認(rèn)的writer是StringWriter,這個writer會調(diào)用到達(dá)的元素的toString()方法菇夸,將數(shù)據(jù)以新的行作為劃分寫入到分區(qū)文件中琼富。你可以在BucketingSink中使用setWriter()來指定一個自定義的writer,如果你想寫到Hadoop SequenceFiles峻仇,你可以只用預(yù)定義的SequenceFileWriter公黑,這個writer還可以指定壓縮格式。

最后的配置項是批大小摄咆,這個配置指定了一個分區(qū)文件何時需要被關(guān)閉凡蚜、新的分區(qū)文件開始。(默認(rèn)的分區(qū)文件大小是384MB)
例如:
Java 代碼:

DataStream<Tuple2<IntWritable,Text>> input = ...;

BucketingSink<String> sink = new BucketingSink<String>("/base/path");
sink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd--HHmm"));
sink.setWriter(new SequenceFileWriter<IntWritable, Text>());
sink.setBatchSize(1024 * 1024 * 400); // this is 400 MB,

input.addSink(sink);

Scala 代碼:

val input: DataStream[Tuple2[IntWritable, Text]] = ...

val sink = new BucketingSink[String]("/base/path")
sink.setBucketer(new DateTimeBucketer[String]("yyyy-MM-dd--HHmm"))
sink.setWriter(new SequenceFileWriter[IntWritable, Text]())
sink.setBatchSize(1024 * 1024 * 400) // this is 400 MB,

input.addSink(sink)

這個例子會創(chuàng)建一個按下面的模式來寫數(shù)據(jù)到分桶文件的sink:
/base/path/{date-time}/part-{parallel-task}-{count}
這里date-time是我們從date/time模式中獲取的字符串吭从,parallel-task是并行sink實例的索引朝蜘,count是分區(qū)文件的運行編號,這個運行編號是由于分區(qū)文件的批大小導(dǎo)致的涩金。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末谱醇,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子步做,更是在濱河造成了極大的恐慌副渴,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,185評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件全度,死亡現(xiàn)場離奇詭異煮剧,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,652評論 3 393
  • 文/潘曉璐 我一進(jìn)店門勉盅,熙熙樓的掌柜王于貴愁眉苦臉地迎上來佑颇,“玉大人,你說我怎么就攤上這事草娜√粜兀” “怎么了?”我有些...
    開封第一講書人閱讀 163,524評論 0 353
  • 文/不壞的土叔 我叫張陵宰闰,是天一觀的道長茬贵。 經(jīng)常有香客問我,道長议蟆,這世上最難降的妖魔是什么闷沥? 我笑而不...
    開封第一講書人閱讀 58,339評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮咐容,結(jié)果婚禮上舆逃,老公的妹妹穿的比我還像新娘。我一直安慰自己戳粒,他們只是感情好路狮,可當(dāng)我...
    茶點故事閱讀 67,387評論 6 391
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著蔚约,像睡著了一般奄妨。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上苹祟,一...
    開封第一講書人閱讀 51,287評論 1 301
  • 那天砸抛,我揣著相機(jī)與錄音,去河邊找鬼树枫。 笑死直焙,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的砂轻。 我是一名探鬼主播奔誓,決...
    沈念sama閱讀 40,130評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼搔涝!你這毒婦竟也來了厨喂?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,985評論 0 275
  • 序言:老撾萬榮一對情侶失蹤庄呈,失蹤者是張志新(化名)和其女友劉穎蜕煌,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體诬留,經(jīng)...
    沈念sama閱讀 45,420評論 1 313
  • 正文 獨居荒郊野嶺守林人離奇死亡幌绍,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,617評論 3 334
  • 正文 我和宋清朗相戀三年颁褂,在試婚紗的時候發(fā)現(xiàn)自己被綠了故响。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片傀广。...
    茶點故事閱讀 39,779評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖彩届,靈堂內(nèi)的尸體忽然破棺而出伪冰,到底是詐尸還是另有隱情,我是刑警寧澤樟蠕,帶...
    沈念sama閱讀 35,477評論 5 345
  • 正文 年R本政府宣布贮聂,位于F島的核電站,受9級特大地震影響寨辩,放射性物質(zhì)發(fā)生泄漏吓懈。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,088評論 3 328
  • 文/蒙蒙 一靡狞、第九天 我趴在偏房一處隱蔽的房頂上張望耻警。 院中可真熱鬧,春花似錦甸怕、人聲如沸甘穿。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,716評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽温兼。三九已至,卻和暖如春武契,著一層夾襖步出監(jiān)牢的瞬間募判,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,857評論 1 269
  • 我被黑心中介騙來泰國打工咒唆, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留届垫,地道東北人。 一個月前我還...
    沈念sama閱讀 47,876評論 2 370
  • 正文 我出身青樓钧排,卻偏偏與公主長得像敦腔,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子恨溜,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,700評論 2 354

推薦閱讀更多精彩內(nèi)容