前言
前段時間我們講解了flink1.11中如何將流式數(shù)據(jù)寫入文件系統(tǒng)和hive [flink 1.11 使用sql將流式數(shù)據(jù)寫入hive]衰猛,今天我們來從源碼的角度深入分析一下霞揉。以便朋友們對flink流式數(shù)據(jù)寫入hive有一個深入的了解,以及在出現(xiàn)問題的時候知道該怎么調(diào)試童番。
其實我們可以想一下這個工作大概是什么流程芬失,首先要寫入hive故河,我們首先要從hive的元數(shù)據(jù)里拿到相關(guān)的hive表的信息巍虫,比如存儲的路徑是哪里彭则,以便往那個目錄寫數(shù)據(jù),還有存儲的格式是什么占遥,orc還是parquet俯抖,這樣我們需要調(diào)用對應(yīng)的實現(xiàn)類來進行寫入,其次這個表是否是分區(qū)表瓦胎,寫入數(shù)據(jù)是動態(tài)分區(qū)還是靜態(tài)分區(qū)芬萍,這些都會根據(jù)場景的不同而選擇不同的寫入策略。
寫入數(shù)據(jù)的時候肯定不會把所有數(shù)據(jù)寫入一個文件搔啊,那么文件的滾動策略是什么呢担忧?寫完了數(shù)據(jù)我們?nèi)绾胃耯ive的元數(shù)據(jù)信息,以便我們可以及時讀取到相應(yīng)的數(shù)據(jù)呢坯癣?
我畫了一個簡單的流程圖,大家可以先看下最欠,接下來我們帶著這些疑問示罗,一步步的從源碼里探索這些功能是如何實現(xiàn)的。
數(shù)據(jù)流處理
我們這次主要是分析flink如何將類似kafka的流式數(shù)據(jù)寫入到hive表芝硬,我們先來一段簡單的代碼:
//構(gòu)造hive catalog
String name = "myhive";
String defaultDatabase = "default";
String hiveConfDir = "/Users/user/work/hive/conf"; // a local path
String version = "3.1.2";
HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
tEnv.registerCatalog("myhive", hive);
tEnv.useCatalog("myhive");
tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
tEnv.useDatabase("db1");
tEnv.createTemporaryView("kafka_source_table", dataStream);
String insertSql = "insert into hive.db1.fs_table SELECT userId, amount, " +
" DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH'), DATE_FORMAT(ts, 'mm') FROM kafka_source_table";
tEnv.executeSql(insertSql);
系統(tǒng)在啟動的時候會首先解析sql蚜点,獲取相應(yīng)的屬性,然后會通過java的SPI機制加載TableFactory的所有子類拌阴,包含TableSourceFactory和TableSinkFactory绍绘,之后,會根據(jù)從sql中解析的屬性循環(huán)判斷使用哪個工廠類迟赃,具體的操作是在TableFactoryUtil類的方法里面實現(xiàn)的陪拘。
比如對于上面的sql,解析之后纤壁,發(fā)現(xiàn)是要寫入一個表名為hive.db1.fs_table的hive sink左刽。所以系統(tǒng)在調(diào)用TableFactoryUtil#findAndCreateTableSink(TableSinkFactory.Context context)方法以后,得到了TableSinkFactory的子類HiveTableFactory酌媒,然后調(diào)用相應(yīng)的createTableSink方法來創(chuàng)建相應(yīng)的sink欠痴,也就是HiveTableSink迄靠。
我們來簡單看下HiveTableSink的變量和結(jié)構(gòu)。
/**
* Table sink to write to Hive tables.
*/
public class HiveTableSink implements AppendStreamTableSink, PartitionableTableSink, OverwritableTableSink {
private static final Logger LOG = LoggerFactory.getLogger(HiveTableSink.class);
private final boolean userMrWriter;
//是否有界喇辽,用來區(qū)分是批處理還是流處理
private final boolean isBounded;
private final JobConf jobConf;
private final CatalogTable catalogTable;
private final ObjectIdentifier identifier;
private final TableSchema tableSchema;
private final String hiveVersion;
private final HiveShim hiveShim;
private LinkedHashMap<String, String> staticPartitionSpec = new LinkedHashMap<>();
private boolean overwrite = false;
private boolean dynamicGrouping = false;
我們看到它實現(xiàn)了AppendStreamTableSink, PartitionableTableSink, OverwritableTableSink三個接口掌挚,這三個接口決定了hive sink實現(xiàn)的功能,數(shù)據(jù)只能是append模式的菩咨,數(shù)據(jù)是可分區(qū)的吠式、并且數(shù)據(jù)是可以被覆蓋寫的。
類里面的這些變量旦委,看名字就大概知道是什么意思了奇徒,就不做解釋了,講一下HiveShim缨硝,我們在構(gòu)造方法里看到hiveShim是和hive 的版本有關(guān)的摩钙,所以其實這個類我們可以理解為對不同hive版本操作的一層封裝。
hiveShim = HiveShimLoader.loadHiveShim(hiveVersion);
tablesink處理數(shù)據(jù)流的方法是consumeDataStream查辩,我們來重點分析下胖笛。
hive基本信息獲取
首先會通過hive的配置連接到hive的元數(shù)據(jù)庫,得到hive表的基本信息宜岛。
String[] partitionColumns = getPartitionKeys().toArray(new String[0]);
String dbName = identifier.getDatabaseName();
String tableName = identifier.getObjectName();
try (HiveMetastoreClientWrapper client = HiveMetastoreClientFactory.create(
new HiveConf(jobConf, HiveConf.class), hiveVersion)) {
Table table = client.getTable(dbName, tableName);
StorageDescriptor sd = table.getSd();
- 獲取到hive的表的信息长踊,也就是Table對象。
- 獲取表的一些存儲信息萍倡,StorageDescriptor對象身弊,這里面包含了hive表的存儲路徑、存儲格式等等列敲。
流阱佛、批判斷
接下來判斷寫入hive是批處理還是流處理
if (isBounded){
......
//batch
} else {
......
//streaming
}
由于這次我們主要分析flink的流處理,所以對于batch就暫且跳過戴而,進入else凑术,也就是流處理。
在這里所意,定義了一些基本的配置:
- 桶分配器TableBucketAssigner淮逊,簡單來說就是如何確定數(shù)據(jù)的分區(qū),比如按時間扶踊,還是按照字段的值等等泄鹏。
- 滾動策略,如何生成下一個文件秧耗,按照時間命满,還是文件的大小等等。
- 構(gòu)造bulkFactory绣版,目前只有parquet和orc的列存儲格式使用bulkFactory
//桶分配器
TableBucketAssigner assigner = new TableBucketAssigner(partComputer);
//滾動策略
TableRollingPolicy rollingPolicy = new TableRollingPolicy(
true,
conf.get(SINK_ROLLING_POLICY_FILE_SIZE).getBytes(),
conf.get(SINK_ROLLING_POLICY_ROLLOVER_INTERVAL).toMillis());
//構(gòu)造bulkFactory
Optional<BulkWriter.Factory<RowData>> bulkFactory = createBulkWriterFactory(partitionColumns, sd);
createBulkWriterFactory方法主要是用于構(gòu)造寫入列存儲格式的工廠類胶台,目前只支持parquet和orc格式歼疮,首先定義用于構(gòu)造工廠類的一些參數(shù),比如字段的類型诈唬,名稱等等韩脏,之后根據(jù)不同類型構(gòu)造不同的工廠類。如果是parquet格式铸磅,最終構(gòu)造的是ParquetWriterFactory工廠類赡矢,如果是orc格式,根據(jù)hive的版本不同阅仔,分別構(gòu)造出OrcBulkWriterFactory或者是OrcNoHiveBulkWriterFactory吹散。
寫入格式判斷
如果是使用MR的writer或者是行格式,進入if邏輯八酒,使用HadoopPathBasedBulkFormatBuilder空民,如果是列存儲格式,進入else邏輯羞迷,使用StreamingFileSink來寫入數(shù)據(jù).
if (userMrWriter || !bulkFactory.isPresent()) {
HiveBulkWriterFactory hadoopBulkFactory = new HiveBulkWriterFactory(recordWriterFactory);
builder = new HadoopPathBasedBulkFormatBuilder<>(
new Path(sd.getLocation()), hadoopBulkFactory, jobConf, assigner)
.withRollingPolicy(rollingPolicy)
.withOutputFileConfig(outputFileConfig);
LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer.");
} else {
builder = StreamingFileSink.forBulkFormat(
new org.apache.flink.core.fs.Path(sd.getLocation()),
new FileSystemTableSink.ProjectionBulkFactory(bulkFactory.get(), partComputer))
.withBucketAssigner(assigner)
.withRollingPolicy(rollingPolicy)
.withOutputFileConfig(outputFileConfig);
LOG.info("Hive streaming sink: Use native parquet&orc writer.");
}
在大數(shù)據(jù)處理中界轩,列式存儲比行存儲有著更好的查詢效率,所以我們這次以列式存儲為主衔瓮,聊聊StreamingFileSink是如何寫入列式數(shù)據(jù)的浊猾。通過代碼我們看到在構(gòu)造buckets builder的時候,使用了前面剛生成的bucket assigner热鞍、輸出的配置葫慎、以及文件滾動的策略。
構(gòu)造分區(qū)提交算子
在HiveTableSink#consumeDataStream方法的最后薇宠,進入了FileSystemTableSink#createStreamingSink方法偷办,這個方法主要做了兩件事情,一個是創(chuàng)建了用于流寫入的算子StreamingFileWriter昼接,另一個是當(dāng)存在分區(qū)列并且在配置文件配置了分區(qū)文件提交策略的時候,構(gòu)造了一個用于提交分區(qū)文件的算子StreamingFileCommitter悴晰,這個算子固定的只有一個并發(fā)度慢睡。
StreamingFileWriter fileWriter = new StreamingFileWriter(
rollingCheckInterval,
bucketsBuilder);
DataStream<CommitMessage> writerStream = inputStream.transform(
StreamingFileWriter.class.getSimpleName(),
TypeExtractor.createTypeInfo(CommitMessage.class),
fileWriter).setParallelism(inputStream.getParallelism());
DataStream<?> returnStream = writerStream;
// save committer when we don't need it.
if (partitionKeys.size() > 0 && conf.contains(SINK_PARTITION_COMMIT_POLICY_KIND)) {
StreamingFileCommitter committer = new StreamingFileCommitter(
path, tableIdentifier, partitionKeys, msFactory, fsFactory, conf);
returnStream = writerStream
.transform(StreamingFileCommitter.class.getSimpleName(), Types.VOID, committer)
.setParallelism(1)
.setMaxParallelism(1);
}
我們看到在代碼中,inputStream經(jīng)過transform方法铡溪,最終將要提交的數(shù)據(jù)轉(zhuǎn)換成CommitMessage格式漂辐,然后發(fā)送給它的下游StreamingFileCommitter算子,也就是說StreamingFileCommitter將會接收StreamingFileWriter中收集的數(shù)據(jù)棕硫。
詳解StreamingFileWriter
這個StreamingFileWriter我們可以理解為一個算子級別的寫入文件的sink髓涯,它對StreamingFileSink進行了一些包裝,然后添加了一些其他操作哈扮,比如提交分區(qū)信息等等纬纪。我們簡單看下這個類的結(jié)構(gòu),并簡單聊聊各個方法的作用蚓再。
public class StreamingFileWriter extends AbstractStreamOperator<CommitMessage>
implements OneInputStreamOperator<RowData, CommitMessage>, BoundedOneInput{
@Override
public void initializeState(StateInitializationContext context) throws Exception {
.........................
}
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
.........................
}
@Override
public void processWatermark(Watermark mark) throws Exception {
.........................
}
@Override
public void processElement(StreamRecord<RowData> element) throws Exception {
.........................
}
/**
* Commit up to this checkpoint id, also send inactive partitions to downstream for committing.
*/
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
.........................
}
@Override
public void endInput() throws Exception {
.........................
}
@Override
public void dispose() throws Exception {
.........................
}
}
- initializeState :初始化狀態(tài)的方法,在這里構(gòu)造了要寫入文件的buckets包各,以及具體寫入文件的StreamingFileSinkHelper等等摘仅。
- snapshotState:這個方法主要是進行每次checkpoint的時候調(diào)用。
- processWatermark這個方法通過名字就能看出來问畅,是處理水印的娃属,比如往下游發(fā)送水印等等。
- processElement:處理元素最核心的方法护姆,每來一條數(shù)據(jù)矾端,都會進入這個方法進行處理。
- notifyCheckpointComplete卵皂,每次checkpoint完成的時候調(diào)用該方法秩铆。在這里,收集了一些要提交的分區(qū)的信息渐裂,用于分區(qū)提交豺旬。
- endInput:不再有更多的數(shù)據(jù)進來,也就是輸入結(jié)束的時候調(diào)用柒凉。
- dispose:算子的生命周期結(jié)束的時候調(diào)用族阅。
簡述StreamingFileSink
StreamingFileSink我們來簡單的描述下,通過名字我們就能看出來膝捞,這是一個用于將流式數(shù)據(jù)寫入文件系統(tǒng)的sink坦刀,它集成了checkpoint提供exactly once語義。
在StreamingFileSink里有一個bucket的概念蔬咬,我們可以理解為數(shù)據(jù)寫入的目錄鲤遥,每個bucket下可以寫入多個文件。它提供了一個BucketAssigner的概念用于生成bucket林艘,進來的每一個數(shù)據(jù)在寫入的時候都會判斷下要寫入哪個bucket盖奈,默認(rèn)的實現(xiàn)是DateTimeBucketAssigner,每小時生成一個bucket狐援。
它根據(jù)不同的寫入格式分別使用StreamingFileSink#forRowFormat或者StreamingFileSink#forBulkFormat來進行相應(yīng)的處理钢坦。
此外,該sink還提供了一個RollingPolicy用于決定數(shù)據(jù)的滾動策略啥酱,比如文件到達多大或者經(jīng)過多久就關(guān)閉當(dāng)前文件爹凹,開啟下一個新文件。
具體的寫入ORC格式的數(shù)據(jù)镶殷,可以參考下這個文章:
flink 1.11 流式數(shù)據(jù)ORC格式寫入file 禾酱,由于我們這次主要是講整體寫入hive的流程,這個sink就不做太具體的講解了。
分區(qū)信息提交
StreamingFileWriter#notifyCheckpointComplete 調(diào)用commitUpToCheckpoint在checkpoint完成的時候觸發(fā)了分區(qū)的提交操作颤陶。
private void commitUpToCheckpoint(long checkpointId) throws Exception {
helper.commitUpToCheckpoint(checkpointId);
CommitMessage message = new CommitMessage(
checkpointId,
getRuntimeContext().getIndexOfThisSubtask(),
getRuntimeContext().getNumberOfParallelSubtasks(),
new ArrayList<>(inactivePartitions));
output.collect(new StreamRecord<>(message));
inactivePartitions.clear();
}
在這里颗管,我們看到,使用inactivePartitions構(gòu)造了CommitMessage對象指郁,然后使用output.collect將這個提交數(shù)據(jù)收集起來忙上,也就是上文我們提到的這里收集到的這個數(shù)據(jù)將會發(fā)給StreamingFileCommitter算子來處理。
而inactivePartitions里面的數(shù)據(jù)是什么時候添加進來的呢闲坎,也就是什么時候才會生成要提交的分區(qū)呢疫粥?我們跟蹤一下代碼,發(fā)現(xiàn)是給寫入文件的buckets添加了一個監(jiān)聽器,在bucket成為非活躍狀態(tài)之后腰懂,觸發(fā)監(jiān)聽器梗逮,然后將對應(yīng)的bucket id 添加到inactivePartitions集合。
@Override
public void initializeState(StateInitializationContext context) throws Exception {
..........................
buckets.setBucketLifeCycleListener(new BucketLifeCycleListener<RowData, String>() {
@Override
public void bucketCreated(Bucket<RowData, String> bucket) {
}
@Override
public void bucketInactive(Bucket<RowData, String> bucket) {
inactivePartitions.add(bucket.getBucketId());
}
});
}
而通知bucket變?yōu)榉腔顒訝顟B(tài)又是什么情況會觸發(fā)呢绣溜?從代碼注釋我們看到慷彤,到目前為止該bucket已接收的所有記錄都已提交后,則該bucket將變?yōu)榉腔顒訝顟B(tài)怖喻。
提交分區(qū)算子
這是一個單并行度的算子底哗,用于提交寫入文件系統(tǒng)的分區(qū)信息。具體的處理步驟如下:
- 從上游收集要提交的分區(qū)信息
- 判斷某一個checkpoint下锚沸,所有的子任務(wù)是否都已經(jīng)接收了分區(qū)的數(shù)據(jù)
- 獲取分區(qū)提交觸發(fā)器跋选。(目前支持partition-time和process-time)
- 使用分區(qū)提交策略去依次提交分區(qū)信息(可以配置多個分區(qū)策略)
這里我們主要講一下 StreamingFileCommitter#processElement方法是如何對進來的每個提交數(shù)據(jù)進行處理的。
@Override
public void processElement(StreamRecord<CommitMessage> element) throws Exception {
CommitMessage message = element.getValue();
for (String partition : message.partitions) {
trigger.addPartition(partition);
}
if (taskTracker == null) {
taskTracker = new TaskTracker(message.numberOfTasks);
}
boolean needCommit = taskTracker.add(message.checkpointId, message.taskId);
if (needCommit) {
commitPartitions(message.checkpointId);
}
}
我們看到哗蜈,從上游接收到CommitMessage元素前标,然后從里面得到要提交的分區(qū),添加到PartitionCommitTrigger里(變量trigger)距潘,然后通過taskTracker來判斷一下炼列,該checkpoint每個子任務(wù)是否已經(jīng)接收到了分區(qū)數(shù)據(jù),最后通過commitPartitions方法來提交分區(qū)信息音比。
進入commitPartitions方法俭尖,看看是如何提交分區(qū)的。
private void commitPartitions(long checkpointId) throws Exception {
List<String> partitions = checkpointId == Long.MAX_VALUE ?
trigger.endInput() :
trigger.committablePartitions(checkpointId);
if (partitions.isEmpty()) {
return;
}
try (TableMetaStoreFactory.TableMetaStore metaStore = metaStoreFactory.createTableMetaStore()) {
for (String partition : partitions) {
LinkedHashMap<String, String> partSpec = extractPartitionSpecFromPath(new Path(partition));
LOG.info("Partition {} of table {} is ready to be committed", partSpec, tableIdentifier);
Path path = new Path(locationPath, generatePartitionPath(partSpec));
PartitionCommitPolicy.Context context = new PolicyContext(
new ArrayList<>(partSpec.values()), path);
for (PartitionCommitPolicy policy : policies) {
if (policy instanceof MetastoreCommitPolicy) {
((MetastoreCommitPolicy) policy).setMetastore(metaStore);
}
policy.commit(context);
}
}
}
}
從trigger中獲取該checkpoint下的所有要提交的分區(qū)洞翩,放到一個List集合partitions中稽犁,在提交的分區(qū)不為空的情況下,循環(huán)遍歷要配置的分區(qū)提交策略PartitionCommitPolicy菱农,然后提交分區(qū)缭付。
分區(qū)提交觸發(fā)器
目前系統(tǒng)提供了兩種分區(qū)提交的觸發(fā)器柿估,PartitionTimeCommitTigger和ProcTimeCommitTigger循未,分別用于處理什么時候提交分區(qū)。
- ProcTimeCommitTigger 主要依賴于分區(qū)的創(chuàng)建時間和delay,當(dāng)處理時間大于'partition creation time' + 'delay'的時候的妖,將提交這個分區(qū)
- PartitionTimeCommitTigger 依賴于水印绣檬,當(dāng)水印的值大于 partition-time + delay的時候提交這個分區(qū)。
分區(qū)提交策略
目前系統(tǒng)提供了一個接口PartitionCommitPolicy嫂粟,用于提交分區(qū)的信息娇未,目前系統(tǒng)提供了以下幾種方案,
- 一種是METASTORE星虹,主要是用于提交hive的分區(qū)零抬,比如創(chuàng)建hive分區(qū)等等
- 還有一種是SUCCESS_FILE,也就是往對應(yīng)的分區(qū)目錄下寫一個success文件宽涌。
- 此外平夜,系統(tǒng)還提供了一個對外的自定義實現(xiàn),用于用戶自定義分區(qū)提交卸亮,比如提交分區(qū)之后合并小文件等等忽妒。自定義提交策略的時候,需要實現(xiàn)PartitionCommitPolicy接口兼贸,并將提交策略置為custom段直。
我在網(wǎng)上也看到過一些實現(xiàn)該接口用于合并小文件的示例,但是我個人覺得其實有點不太完美溶诞,因為這個合并小文件可能會涉及很多的問題:
- 合并的時候如何保證事務(wù)鸯檬,保證合并的同時如何有讀操作不會發(fā)生臟讀
- 事務(wù)的一致性,如果合并出錯了怎么回滾
- 合并小文件的性能是否跟得上很澄,目前flink只提供了一個單并行度的提交算子京闰。
- 如何多并發(fā)合并寫入
所以暫時我也沒有想到一個完美的方案用于flink來合并小文件。
總結(jié)
通過上述的描述甩苛,我們簡單聊了一下flink是如何將流式數(shù)據(jù)寫入hive的蹂楣,但是可能每個人在做的過程中還是會遇到各種各種的環(huán)境問題導(dǎo)致的寫入失敗,比如window和linux系統(tǒng)的差異讯蒲,hdfs版本的差異痊土,系統(tǒng)時區(qū)的配置等等,在遇到一些個性化的問題之后墨林,就可能需要大家去針對自己的問題去個性化的debug了赁酝。
更多干貨信息,歡迎關(guān)注我的公眾號【大數(shù)據(jù)技術(shù)與應(yīng)用實戰(zhàn)】