這個Connector提供了一個sink來寫分區(qū)文件到任何Hadoop FileSystem支持的任何文件系統(tǒng)中问潭,為了使用這個Connector筋岛,請將下面的依賴添加到你的工程中:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-filesystem_2.10</artifactId>
<version>1.3.0</version>
</dependency>
注意:streaming connectors目前還不是二進(jìn)制發(fā)布包的一部分,請參考此處來了解如何在分布式執(zhí)行中關(guān)聯(lián)到這些connectors毛肋。
Flink Sink分桶
分桶行為跟寫行為都是可以配置的礁叔,這個后面我們會講到赋铝,你可以通過默認(rèn)配置來創(chuàng)建一個分桶的sink,將數(shù)據(jù)sink到以時間作為劃分的滾動文件中:
Java 代碼:
DataStream<String> input = ...;
input.addSink(new BucketingSink<String>("/base/path"));
Scala 代碼:
val input: DataStream[String] = ...
input.addSink(new BucketingSink[String]("/base/path"))
這里唯一需要參數(shù)是這些分桶所要存儲的目錄地址旋奢,sink還可以通過配置一個自定義的bucketer、writer和批大小來進(jìn)一步配置然痊。
默認(rèn)情況下分桶sink是通過元素到達(dá)的系統(tǒng)時間來進(jìn)行切分的至朗,并用"yyyy-MM-dd HH"的時間格式來命名桶,這個時間格式與當(dāng)前的系統(tǒng)時間傳入SimpleDateFormat
來形成一個桶的路徑剧浸,當(dāng)遇到一個新的時間后就會創(chuàng)建一個新的桶锹引。例如:如果你有一個以分鐘作為最細(xì)粒度的模式矗钟,那么你將每分鐘獲得一個新的分桶。每個分桶本身是一個包含若干分區(qū)文件的目錄嫌变,每個并行的sink實例會創(chuàng)建它自己的分區(qū)文件吨艇,當(dāng)分區(qū)文件過大時,sink會緊接著其它分區(qū)文件創(chuàng)建一個新的分區(qū)文件腾啥。當(dāng)一個桶變成非活躍狀態(tài)時东涡,打開的文件會被刷新和關(guān)閉,當(dāng)一個桶不再被寫入時倘待,會被認(rèn)為是非活躍的疮跑。默認(rèn)情況下,sink會每分鐘檢查一遍是否非活躍凸舵,并關(guān)閉超過一分鐘沒有數(shù)據(jù)寫入的分桶祖娘,這種行為可以通過在BucketingSink
的
setInactiveBucketCheckInterval()
和 setInactiveBucketThreshold()
來配置。
你可以在BucketingSink
中使用setBucketer()
來指定一個自定義的bucketer,如果需要啊奄,bucketer可以使用元素或者元組的屬性來決定bucketer的目錄渐苏。
默認(rèn)的writer是StringWriter
,這個writer會調(diào)用到達(dá)的元素的toString()
方法菇夸,將數(shù)據(jù)以新的行作為劃分寫入到分區(qū)文件中琼富。你可以在BucketingSink
中使用setWriter()
來指定一個自定義的writer,如果你想寫到Hadoop SequenceFiles
峻仇,你可以只用預(yù)定義的SequenceFileWriter
公黑,這個writer還可以指定壓縮格式。
最后的配置項是批大小摄咆,這個配置指定了一個分區(qū)文件何時需要被關(guān)閉凡蚜、新的分區(qū)文件開始。(默認(rèn)的分區(qū)文件大小是384MB)
例如:
Java 代碼:
DataStream<Tuple2<IntWritable,Text>> input = ...;
BucketingSink<String> sink = new BucketingSink<String>("/base/path");
sink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd--HHmm"));
sink.setWriter(new SequenceFileWriter<IntWritable, Text>());
sink.setBatchSize(1024 * 1024 * 400); // this is 400 MB,
input.addSink(sink);
Scala 代碼:
val input: DataStream[Tuple2[IntWritable, Text]] = ...
val sink = new BucketingSink[String]("/base/path")
sink.setBucketer(new DateTimeBucketer[String]("yyyy-MM-dd--HHmm"))
sink.setWriter(new SequenceFileWriter[IntWritable, Text]())
sink.setBatchSize(1024 * 1024 * 400) // this is 400 MB,
input.addSink(sink)
這個例子會創(chuàng)建一個按下面的模式來寫數(shù)據(jù)到分桶文件的sink:
/base/path/{date-time}/part-{parallel-task}-{count}
這里date-time
是我們從date/time模式中獲取的字符串吭从,parallel-task
是并行sink實例的索引朝蜘,count是分區(qū)文件的運行編號,這個運行編號是由于分區(qū)文件的批大小導(dǎo)致的涩金。