Pros and cons
Flink 1.12 introduced the upsert-kafka connector. Its biggest difference from the plain Kafka source connector is that it brings the changelog RowKind types into play and, with the help of state, implements what looks like insert/update/delete semantics on the data. In reality, much of this has little to do with Kafka upsert itself, and below we will pull off its mysterious wolf's skin from the source-code angle. It is convenient to use, but it also has some drawbacks; for now I am mainly focused on the source side:
- Kafka messages need a key (at least the demos given by the community all carry a key), yet on the producer side many of our scenarios ignore the key and only care about the value. This limits the usefulness of upsert-kafka, because it is hardly acceptable to rework the producers just so that upsert-kafka can be used; that is not friendly (see the DDL sketch after this list for where the key format comes in).
- upsert-kafka is hard-coded to start consuming from earliest, and apparently no other startup offset options are exposed. That is simply a disaster; you may be able to live with it, but I cannot.
I will modify the source code to address both points.
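For context, here is a minimal sketch of how such a table is declared (the topic name and broker address are illustrative only; the id/name fields mirror the example used later in this post). Note the mandatory PRIMARY KEY ... NOT ENFORCED clause and the separate key.format/value.format options, which is exactly where the first drawback comes from:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class UpsertKafkaDdlDemo {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
        tEnv.executeSql(
                "CREATE TABLE shop (\n" +
                "  id   BIGINT,\n" +
                "  name STRING,\n" +
                "  PRIMARY KEY (id) NOT ENFORCED\n" +                      // a primary key is mandatory for upsert-kafka
                ") WITH (\n" +
                "  'connector' = 'upsert-kafka',\n" +
                "  'topic' = 'shop',\n" +                                  // hypothetical topic name
                "  'properties.bootstrap.servers' = 'localhost:9092',\n" + // hypothetical broker address
                "  'key.format' = 'json',\n" +                             // the message key must be deserializable, hence drawback 1
                "  'value.format' = 'json'\n" +
                ")");
    }
}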
Core source code walkthrough
A quick look at RowKind
package org.apache.flink.types;

public enum RowKind {
    INSERT("+I", (byte) 0),        // an insertion
    UPDATE_BEFORE("-U", (byte) 1), // the row before an update
    UPDATE_AFTER("+U", (byte) 2),  // the row after an update
    DELETE("-D", (byte) 3);        // a deletion
}
/*
To illustrate with upsert-kafka, take a row whose primary key id is 1 and whose name is 2.
When a record with primary key 1 is consumed for the first time, printing it gives:
| +I | 1 | 2  |   // an insertion
When another record with primary key 1 arrives whose name is now, say, 22, printing gives:
| -U | 1 | 2  |   // the row before the update
| +U | 1 | 22 |   // the row after the update
When a record that contains only primary key 1 (an empty value) is consumed, printing gives:
| -D | 1 | 22 |   // a deletion
*/
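A small sketch that builds those three changelog rows by hand with GenericRowData.ofKind; the exact printed form depends on GenericRowData#toString, so treat the expected output as approximate:

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.types.RowKind;

public class RowKindDemo {
    public static void main(String[] args) {
        // the changelog events from the example above, built manually
        GenericRowData insert       = GenericRowData.ofKind(RowKind.INSERT,        1L, StringData.fromString("2"));
        GenericRowData updateBefore = GenericRowData.ofKind(RowKind.UPDATE_BEFORE, 1L, StringData.fromString("2"));
        GenericRowData updateAfter  = GenericRowData.ofKind(RowKind.UPDATE_AFTER,  1L, StringData.fromString("22"));

        // expected to print something like +I(1,2), -U(1,2), +U(1,22)
        System.out.println(insert);
        System.out.println(updateBefore);
        System.out.println(updateAfter);
    }
}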
The deserializer
When I first started looking at upsert-kafka, the deserializer was what I most wanted to see: the record is deserialized first and then emitted downstream, so in theory this is where a lot could happen. For example, it could look up state to see whether the key already exists and, if so, emit two rows, an update-before and an update-after, describing the data before and after the change, much like MySQL's binlog structure. It would surely also need to tag each record as an insert, update or delete.
Let's first recap how CREATE TABLE leads to the creation of the source behind the scenes. If you already know this flow, skip straight to the deserializer DynamicKafkaDeserializationSchema.
1荞雏、在 flink-connector-kafka_2.11-1.12.0.jar
的 META-INF.services.org.apache.flink.table.factories.TableFactory
下有 org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory org.apache.flink.streaming.connectors.kafka.table.UpsertKafkaDynamicTableFactory
這樣 flink 就可以加載到flink-connector-kafka
提供的 TableFactory
2虐秦、在 create table 的時(shí)候會(huì)指定 connector' = 'upsert-kafka'
等參數(shù),這樣 flink 在解析 建表 sql的時(shí)候根據(jù)參數(shù)去適配到合理的 TableFactory
然后 初始化 UpsertKafkaDynamicTableFactory
3凤优、然后創(chuàng)建 TableSource org.apache.flink.streaming.connectors.kafka.table.UpsertKafkaDynamicTableFactory#createDynamicTableSource
-> new org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource()
KafkaDynamicSource 接口繼承如下
KafkaDynamicSource implements ScanTableSource, SupportsReadingMetadata, SupportsWatermarkPushDown
4悦陋、這里寫(xiě)死了
// always use earliest to keep data integrity
StartupMode earliest = StartupMode.EARLIEST;
源碼如下
public DynamicTableSource createDynamicTableSource(Context context) {
// ... other code omitted ...
// always use earliest to keep data integrity
StartupMode earliest = StartupMode.EARLIEST;
return new KafkaDynamicSource(
schema.toPhysicalRowDataType(),
keyDecodingFormat,
new DecodingFormatWrapper(valueDecodingFormat),
keyValueProjections.f0,
keyValueProjections.f1,
keyPrefix,
KafkaOptions.getSourceTopics(tableOptions),
KafkaOptions.getSourceTopicPattern(tableOptions),
properties,
earliest,
Collections.emptyMap(),
0,
true);
}
5. org.apache.flink.table.planner.sources.DynamicSourceUtils#prepareDynamicSource calls
org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource#getScanRuntimeProvider,
which creates the deserializer and the FlinkKafkaConsumer. The source is:
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
final DeserializationSchema<RowData> keyDeserialization =
createDeserialization(context, keyDecodingFormat, keyProjection, keyPrefix);
final DeserializationSchema<RowData> valueDeserialization =
createDeserialization(context, valueDecodingFormat, valueProjection, null);
final TypeInformation<RowData> producedTypeInfo =
context.createTypeInformation(producedDataType);
final FlinkKafkaConsumer<RowData> kafkaConsumer =
createKafkaConsumer(keyDeserialization, valueDeserialization, producedTypeInfo);
return SourceFunctionProvider.of(kafkaConsumer, false);
}
5别洪、創(chuàng)建 FlinkKafkaConsumer
的同時(shí) 創(chuàng)建反序列化起 DynamicKafkaDeserializationSchema
protected FlinkKafkaConsumer<RowData> createKafkaConsumer(
DeserializationSchema<RowData> keyDeserialization,
DeserializationSchema<RowData> valueDeserialization,
TypeInformation<RowData> producedTypeInfo) {
final KafkaDeserializationSchema<RowData> kafkaDeserializer = new DynamicKafkaDeserializationSchema(
adjustedPhysicalArity,
keyDeserialization,
keyProjection,
valueDeserialization,
adjustedValueProjection,
hasMetadata,
metadataConverters,
producedTypeInfo,
upsertMode);
final FlinkKafkaConsumer<RowData> kafkaConsumer;
if (topics != null) {
kafkaConsumer = new FlinkKafkaConsumer<>(topics, kafkaDeserializer, properties);
} else {
kafkaConsumer = new FlinkKafkaConsumer<>(topicPattern, kafkaDeserializer, properties);
}
switch (startupMode) {
case EARLIEST:
kafkaConsumer.setStartFromEarliest();
break;
case LATEST:
kafkaConsumer.setStartFromLatest();
break;
case GROUP_OFFSETS:
kafkaConsumer.setStartFromGroupOffsets();
break;
case SPECIFIC_OFFSETS:
kafkaConsumer.setStartFromSpecificOffsets(specificStartupOffsets);
break;
case TIMESTAMP:
kafkaConsumer.setStartFromTimestamp(startupTimestampMillis);
break;
}
kafkaConsumer.setCommitOffsetsOnCheckpoints(properties.getProperty("group.id") != null);
if (watermarkStrategy != null) {
kafkaConsumer.assignTimestampsAndWatermarks(watermarkStrategy);
}
return kafkaConsumer;
}
The deserializer DynamicKafkaDeserializationSchema
DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<RowData>
Every record the Kafka source consumes goes through its deserialize method; the logic is as follows:
public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<RowData> collector) throws Exception {
// shortcut in case no output projection is required,
// also not for a cartesian product with the keys
// the key does not need to be parsed
if (keyDeserialization == null && !hasMetadata) {
valueDeserialization.deserialize(record.value(), collector);
return;
}
// buffer key(s)
// the key needs to be parsed
if (keyDeserialization != null) {
keyDeserialization.deserialize(record.key(), keyCollector);
}
// project output while emitting values
outputCollector.inputRecord = record;
outputCollector.physicalKeyRows = keyCollector.buffer;
outputCollector.outputCollector = collector;
// in upsert mode, a record with a null value is treated as a delete (tombstone)
if (record.value() == null && upsertMode) {
// collect tombstone messages in upsert mode by hand
outputCollector.collect(null);
} else {
valueDeserialization.deserialize(record.value(), outputCollector);
}
keyCollector.buffer.clear();
}
Let's look more closely at keyDeserialization.deserialize(record.key(), keyCollector) and valueDeserialization.deserialize(record.value(), outputCollector).
For the key: keyCollector is DynamicKafkaDeserializationSchema$BufferingCollector and keyDeserialization is org.apache.flink.formats.json.JsonRowDataDeserializationSchema.
For the value: outputCollector is DynamicKafkaDeserializationSchema$OutputProjectionCollector and valueDeserialization is also org.apache.flink.formats.json.JsonRowDataDeserializationSchema.
The deserialize path is the same for key and value: first the default method of the parent interface org.apache.flink.api.common.serialization.DeserializationSchema is invoked:
default void deserialize(byte[] message, Collector<T> out) throws IOException {
T deserialize = deserialize(message);
if (deserialize != null) {
out.collect(deserialize);
}
}
and then the deserialize method of the subclass is invoked, org.apache.flink.formats.json.JsonRowDataDeserializationSchema:
public RowData deserialize(byte[] message) throws IOException {
final JsonNode root = objectMapper.readTree(message);
//重點(diǎn)看 怎么把一個(gè)jsonNode 怎么轉(zhuǎn)成一個(gè) RowData
return (RowData) runtimeConverter.convert(root);
}
The interesting part is the convert call, which eventually lands in the following method:
org.apache.flink.formats.json.JsonToRowDataConverters#createRowConverter
public JsonToRowDataConverter createRowConverter(RowType rowType) {
// part of the code omitted; this is where a JsonNode is converted into a RowData
return jsonNode -> {
ObjectNode node = (ObjectNode) jsonNode;
int arity = fieldNames.length;
// construct a GenericRowData; its constructor sets the Flink RowKind
GenericRowData row = new GenericRowData(arity);
for (int i = 0; i < arity; i++) {
String fieldName = fieldNames[i];
JsonNode field = node.get(fieldName);
Object convertedField = convertField(fieldConverters[i], fieldName, field);
row.setField(i, convertedField);
}
return row;
};
}
org.apache.flink.table.data.GenericRowData
public GenericRowData(int arity) {
this.fields = new Object[arity]; // number of fields
this.kind = RowKind.INSERT; // INSERT as default
}
So every deserialized row starts out as a plain INSERT. When, then, does it get turned into one of the other RowKind types?
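A quick sketch confirming that default (nothing upsert-specific here, just the public GenericRowData API):

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.types.RowKind;

public class DefaultRowKindDemo {
    public static void main(String[] args) {
        GenericRowData row = new GenericRowData(2);
        System.out.println(row.getRowKind()); // INSERT: the constructor default seen above
        row.setRowKind(RowKind.DELETE);       // the kind only changes when someone calls setRowKind explicitly
        System.out.println(row.getRowKind()); // DELETE
    }
}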
With deserialization done, the record is finally emitted downstream via collect.
First the key side, DynamicKafkaDeserializationSchema$BufferingCollector:
private static final class BufferingCollector implements Collector<RowData>, Serializable {
private final List<RowData> buffer = new ArrayList<>();
@Override
public void collect(RowData record) {
// nothing fancy: the record is simply appended to a buffer
buffer.add(record);
}
}
Then the value side, DynamicKafkaDeserializationSchema$OutputProjectionCollector:
private static final class OutputProjectionCollector implements Collector<RowData>, Serializable {
@Override
public void collect(RowData physicalValueRow) {
// no key defined
int length = keyProjection.length;
// no key defined: this is not the upsert-kafka scenario
if (length == 0) {
emitRow(null, (GenericRowData) physicalValueRow);
return;
}
// check whether the Kafka message value contains any field besides the key fields;
// if it does not, the value is effectively empty
Boolean hashValue = false;
if(physicalValueRow != null && length > 0){
Set<Integer> collect = Arrays.stream(keyProjection).boxed().collect(Collectors.toSet());
for(int i = 0; i < physicalValueRow.getArity() && !hashValue; i ++){
if(collect.contains(i)){
continue;
}
hashValue = !physicalValueRow.isNullAt(i);
}
}
// otherwise emit a value for each key
for (RowData physicalKeyRow : physicalKeyRows) {
if(!hashValue){
// no real value fields present: emit with a null value row
emitRow((GenericRowData) physicalKeyRow, null);
}else{
emitRow((GenericRowData) physicalKeyRow, (GenericRowData) physicalValueRow);
}
}
}
private void emitRow(@Nullable GenericRowData physicalKeyRow, @Nullable GenericRowData physicalValueRow) {
final RowKind rowKind;
if (physicalValueRow == null) {
// a null physicalValueRow in upsert mode means the rowKind is DELETE
if (upsertMode) {
rowKind = RowKind.DELETE;
} else {
throw new DeserializationException(
"Invalid null value received in non-upsert mode. Could not to set row kind for output record.");
}
} else {
// otherwise keep the row's original rowKind
rowKind = physicalValueRow.getRowKind();
}
//調(diào)用 org.apache.flink.util.Collector 下發(fā)下去
outputCollector.collect(producedRow);
}
}
So across the whole path from deserialization to emission, the code never consults state to check whether this primary key has been seen before, and the rowKind is only ever set in two ways: every row defaults to INSERT, and if the value is empty the rowKind is switched to DELETE. So where is the check for previously consumed keys actually done?
Where does the state come from?
By chance I noticed in the Flink UI that a changelogNormalize operator is attached right after every such source, as shown in the figure below. Then it clicked: the planner must be inserting a special operator when it translates the Flink SQL into an execution plan. I traced the source code; the call chain is long, so only the key pieces are shown here.
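Incidentally, you do not have to stumble on the operator in the UI; asking the planner for the plan shows it too. A sketch, assuming the tEnv and the shop table from the DDL example earlier:

// For an upsert-kafka source, the optimized plan printed here should contain a
// ChangelogNormalize node sitting between the TableSourceScan and the sink.
System.out.println(tEnv.explainSql("SELECT * FROM shop"));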
- The job's main method calls executeSql:
tEnv.executeSql("SELECT * from shop").print();
- executeInternal turns the QueryOperation into a TableResult:
public TableResult executeInternal(QueryOperation operation) {
SelectSinkOperation sinkOperation = new SelectSinkOperation(operation);
//看這個(gè) translate 方法
List<Transformation<?>> transformations = translate(Collections.singletonList(sinkOperation));
String jobName = getJobName("collect");
Pipeline pipeline = execEnv.createPipeline(transformations, tableConfig, jobName);
// ... job submission and result collection omitted ...
}
org.apache.flink.table.planner.delegation.PlannerBase#translate
override def translate(
modifyOperations: util.List[ModifyOperation]): util.List[Transformation[_]] = {
if (modifyOperations.isEmpty) {
return List.empty[Transformation[_]]
}
// prepare the execEnv before translating
getExecEnv.configure(
getTableConfig.getConfiguration,
Thread.currentThread().getContextClassLoader)
overrideEnvParallelism()
// translate each operation into its RelNode (logical plan) form
val relNodes = modifyOperations.map(translateToRel)
//對(duì)節(jié)點(diǎn)進(jìn)行優(yōu)化 重點(diǎn)看這里
val optimizedRelNodes = optimize(relNodes)
val execNodes = translateToExecNodePlan(optimizedRelNodes)
translateToPlan(execNodes)
}
- During optimize the plan is matched by a rule called StreamExecTableSourceScanRule, which converts a FlinkLogicalTableSourceScan into a StreamExecTableSourceScan; the precondition for matching is that the node is a FlinkLogicalTableSourceScan. For an upsert source, the rule additionally generates a StreamExecChangelogNormalize.
org.apache.flink.table.planner.plan.rules.physical.stream.StreamExecTableSourceScanRule#convert
def convert(rel: RelNode): RelNode = {
val scan = rel.asInstanceOf[FlinkLogicalTableSourceScan]
val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
val config = ShortcutUtils.unwrapContext(rel.getCluster).getTableConfig
val table = scan.getTable.asInstanceOf[TableSourceTable]
val newScan = new StreamExecTableSourceScan(
rel.getCluster,
traitSet,
table)
// if this is an upsert source (or the source may produce duplicate change events)
if (isUpsertSource(table.catalogTable, table.tableSource) ||
isSourceChangeEventsDuplicate(table.catalogTable, table.tableSource, config)) {
// generate changelog normalize node
// primary key has been validated in CatalogSourceTable
val primaryKey = table.catalogTable.getSchema.getPrimaryKey.get()
val keyFields = primaryKey.getColumns
val inputFieldNames = newScan.getRowType.getFieldNames
val primaryKeyIndices = ScanUtil.getPrimaryKeyIndices(inputFieldNames, keyFields)
val requiredDistribution = FlinkRelDistribution.hash(primaryKeyIndices, requireStrict = true)
val requiredTraitSet = rel.getCluster.getPlanner.emptyTraitSet()
.replace(requiredDistribution)
.replace(FlinkConventions.STREAM_PHYSICAL)
val newInput: RelNode = RelOptRule.convert(newScan, requiredTraitSet)
// generate a StreamExecChangelogNormalize node
new StreamExecChangelogNormalize(
scan.getCluster,
traitSet,
newInput,
primaryKeyIndices)
} else {
newScan
}
}
- The step above effectively adds a StreamExecChangelogNormalize node, which is later translated into the DataStream API:
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecChangelogNormalize#translateToPlanInternal
override protected def translateToPlanInternal(
planner: StreamPlanner): Transformation[RowData] = {
val inputTransform = getInputNodes.get(0).translateToPlan(planner)
.asInstanceOf[Transformation[RowData]]
val rowTypeInfo = inputTransform.getOutputType.asInstanceOf[InternalTypeInfo[RowData]]
val generateUpdateBefore = ChangelogPlanUtils.generateUpdateBefore(this)
val tableConfig = planner.getTableConfig
val isMiniBatchEnabled = tableConfig.getConfiguration.getBoolean(
ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED)
val stateIdleTime = tableConfig.getIdleStateRetention.toMillis
val operator = if (isMiniBatchEnabled) {
//省略
} else {
// this is the key part
val processFunction = new ProcTimeDeduplicateKeepLastRowFunction(
rowTypeInfo,
stateIdleTime,
generateUpdateBefore,
true, // generateInsert
false) // inputInsertOnly
// and it is wrapped in a KeyedProcessOperator
new KeyedProcessOperator[RowData, RowData, RowData](processFunction)
}
val ret = new OneInputTransformation(
inputTransform,
getRelDetailedDescription,
operator,
rowTypeInfo,
inputTransform.getParallelism)
if (inputsContainSingleton()) {
ret.setParallelism(1)
ret.setMaxParallelism(1)
}
val selector = KeySelectorUtil.getRowDataSelector(uniqueKeys, rowTypeInfo)
ret.setStateKeySelector(selector)
ret.setStateKeyType(selector.getProducedType)
ret
}
- At this point it is pretty clear: ProcTimeDeduplicateKeepLastRowFunction is the one doing the work, and it has little to do with upsert-kafka itself. A sheep in wolf's clothing, you could say; it had me digging through the upsert-kafka code for far too long.
ProcTimeDeduplicateKeepLastRowFunction source walkthrough
package org.apache.flink.table.runtime.operators.deduplicate;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.util.Collector;
import static org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.processLastRowOnChangelog;
import static org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.processLastRowOnProcTime;
/**
* This function is used to deduplicate on keys and keeps only last row.
*/
public class ProcTimeDeduplicateKeepLastRowFunction
extends DeduplicateFunctionBase<RowData, RowData, RowData, RowData> {
private static final long serialVersionUID = -291348892087180350L;
private final boolean generateUpdateBefore;
private final boolean generateInsert;
private final boolean inputIsInsertOnly;
public ProcTimeDeduplicateKeepLastRowFunction(
InternalTypeInfo<RowData> typeInfo,
long stateRetentionTime,
boolean generateUpdateBefore,
boolean generateInsert,
boolean inputInsertOnly) {
super(typeInfo, null, stateRetentionTime);
this.generateUpdateBefore = generateUpdateBefore;
this.generateInsert = generateInsert;
this.inputIsInsertOnly = inputInsertOnly;
}
@Override
public void processElement(RowData input, Context ctx, Collector<RowData> out) throws Exception {
if (inputIsInsertOnly) {
processLastRowOnProcTime(input, generateUpdateBefore, generateInsert, state, out);
} else {
// the key part: the statically imported helper DeduplicateFunctionHelper#processLastRowOnChangelog
processLastRowOnChangelog(input, generateUpdateBefore, state, out);
}
}
}
- And here it is: the place where the long-awaited RowKind changes finally appear. The comments are clear enough to follow on their own.
static void processLastRowOnChangelog(
RowData currentRow,
boolean generateUpdateBefore,
ValueState<RowData> state,
Collector<RowData> out) throws Exception {
RowData preRow = state.value();
RowKind currentKind = currentRow.getRowKind();
if (currentKind == RowKind.INSERT || currentKind == RowKind.UPDATE_AFTER) {
if (preRow == null) {
// the first row, send INSERT message
currentRow.setRowKind(RowKind.INSERT);
out.collect(currentRow);
} else {
if (generateUpdateBefore) {
preRow.setRowKind(RowKind.UPDATE_BEFORE);
out.collect(preRow);
}
currentRow.setRowKind(RowKind.UPDATE_AFTER);
out.collect(currentRow);
}
// normalize row kind
currentRow.setRowKind(RowKind.INSERT);
// save to state
state.update(currentRow);
} else {
// DELETE or UPDATE_BEFORE
if (preRow != null) {
// always set to DELETE because this row has been removed
// even the the input is UPDATE_BEFORE, there may no UPDATE_AFTER after it.
preRow.setRowKind(RowKind.DELETE);
// output the preRow instead of currentRow,
// because preRow always contains the full content.
// currentRow may only contain key parts (e.g. Kafka tombstone records).
out.collect(preRow);
// clear state as the row has been removed
state.clear();
}
// nothing to do if removing a non-existed row
}
}
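To make the state transitions concrete, here is a minimal, non-Flink sketch that replays the same branching with a HashMap standing in for the keyed ValueState (generateUpdateBefore is assumed true). Feeding it the id = 1 records from the RowKind example at the top reproduces the +I / -U / +U / -D sequence:

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.types.RowKind;

import java.util.HashMap;
import java.util.Map;

public class ChangelogNormalizeSketch {
    // key -> last seen row, standing in for the per-key ValueState<RowData>
    private final Map<Long, GenericRowData> state = new HashMap<>();

    void process(long key, GenericRowData current) {
        GenericRowData pre = state.get(key);
        RowKind kind = current.getRowKind();
        if (kind == RowKind.INSERT || kind == RowKind.UPDATE_AFTER) {
            if (pre == null) {
                current.setRowKind(RowKind.INSERT);          // first row for this key -> +I
                emit(current);
            } else {
                pre.setRowKind(RowKind.UPDATE_BEFORE);       // previous row -> -U
                emit(pre);
                current.setRowKind(RowKind.UPDATE_AFTER);    // new row -> +U
                emit(current);
            }
            current.setRowKind(RowKind.INSERT);              // normalize the kind before storing
            state.put(key, current);
        } else if (pre != null) {                            // DELETE or UPDATE_BEFORE
            pre.setRowKind(RowKind.DELETE);                  // emit the full previous row as -D
            emit(pre);
            state.remove(key);
        }
        // nothing to do when deleting a key that was never seen
    }

    private void emit(GenericRowData row) {
        System.out.println(row);
    }

    public static void main(String[] args) {
        ChangelogNormalizeSketch f = new ChangelogNormalizeSketch();
        f.process(1L, GenericRowData.ofKind(RowKind.INSERT, 1L, StringData.fromString("2")));  // +I(1,2)
        f.process(1L, GenericRowData.ofKind(RowKind.INSERT, 1L, StringData.fromString("22"))); // -U(1,2) then +U(1,22)
        f.process(1L, GenericRowData.ofKind(RowKind.DELETE, 1L, null));                        // -D(1,22)
    }
}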
- The method above reads the previous row from state and writes the new row back. One more look at what this state actually is: a ValueState inherited from the base class DeduplicateFunctionBase.
abstract class DeduplicateFunctionBase<T, K, IN, OUT> extends KeyedProcessFunction<K, IN, OUT> {
private static final long serialVersionUID = 1L;
// the TypeInformation of the values in the state.
protected final TypeInformation<T> typeInfo;
protected final long stateRetentionTime;
protected final TypeSerializer<OUT> serializer;
// state stores previous message under the key.
protected ValueState<T> state;
public DeduplicateFunctionBase(
TypeInformation<T> typeInfo,
TypeSerializer<OUT> serializer,
long stateRetentionTime) {
this.typeInfo = typeInfo;
this.stateRetentionTime = stateRetentionTime;
this.serializer = serializer;
}
@Override
public void open(Configuration configure) throws Exception {
super.open(configure);
ValueStateDescriptor<T> stateDesc = new ValueStateDescriptor<>("deduplicate-state", typeInfo);
StateTtlConfig ttlConfig = createTtlConfig(stateRetentionTime);
if (ttlConfig.isEnabled()) {
stateDesc.enableTimeToLive(ttlConfig);
}
state = getRuntimeContext().getState(stateDesc);
}
}
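A practical side note grounded in the code above: stateRetentionTime comes from the table config's idle state retention (tableConfig.getIdleStateRetention), and without it the deduplicate state keeps one row per primary key forever. A sketch of enabling it, with arbitrary durations and the tEnv from the earlier example:

import org.apache.flink.api.common.time.Time;

// Give the ChangelogNormalize / deduplicate state a TTL so keys that stop updating
// are eventually evicted; otherwise state grows with the number of distinct keys.
tEnv.getConfig().setIdleStateRetentionTime(Time.hours(12), Time.hours(24));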
That is it for now. In a follow-up I will walk through the source-code changes for the two issues raised at the beginning: supporting Kafka messages that carry no key, and making the startup offset configurable instead of hard-coded to earliest.