現(xiàn)象:
當期的操作流程如下圖:
這樣的處理流程會造成如下問題:
1衔瓮、當binlog解析出的批次數(shù)據(jù)中挥萌,數(shù)據(jù)包含了對同一條數(shù)據(jù)的刪除和修改操作時悍引,無法保證操作執(zhí)行的順序家肯。
解決方案(針對kudu的Destination):
Kudu的Destination中有個設(shè)置Default Operation ,這個設(shè)置的說明是:
default operation to perform if sdc.operation.type is not set in record header.
所以我們可以通過Record的Header Attribute 中的sdc.opeation.type來直接控制數(shù)據(jù)在kudu的Destination中執(zhí)行的操作畸颅。
對數(shù)據(jù)的操作不進行分離担巩,通過sdc.operation.type加入的Record的Header Attributes中進行控制。
sdc.operation.type的值的說明如下:
- INSERT_CODE = 1;
- DELETE_CODE = 2;
- UPDATE_CODE = 3;
- UPSERT_CODE = 4;
在Javascript Evaluator 中的JS中增加如下的代碼:
for(var i = 0; i < records.length; i++) {
try {
var newRecord = sdcFunctions.createRecord(true);
var attributes = records[i].attributes
if(records[i].value['Type'] =='DELETE'){
newRecord.attributes['sdc.operation.type']='2';
newRecord.value = records[i].value['OldData'];
}else{
newRecord.attributes['sdc.operation.type']='4'
newRecord.value = records[i].value['Data'];
}
newRecord.value.Type = records[i].value['Type'];
newRecord.value.Database = records[i].value['Database'];
newRecord.value.Table = records[i].value['Table'];
output.write(newRecord);
} catch (e) {
// Send record to error
error.write(records[i], e);
}
}
``