flink sql upsert kafka 源碼解讀 你就是一只披著狼皮的羊

利弊

在 flink 1.12 社區(qū)推出了 upsert kafka羞迷,他與普通 kafka source connect 的最大區(qū)別就是引入了 changelog 中的 RowKind 類(lèi)型。借助 state 實(shí)現(xiàn)了所謂的 數(shù)據(jù)增刪改食侮,但其實(shí)很多事情和 kafka upsert 本身沒(méi)有一毛錢(qián)關(guān)系诈胜,下面我們會(huì)從源碼角度撕開(kāi)他神秘的狼皮贼涩。與此同時(shí)它雖然好用描睦,但是也存在一些弊端言缤,我個(gè)人目前主要關(guān)注端在 source .

  • kafka 消息需要有 key. (至少社區(qū)大佬給的 demo 就是要攜帶 key)然而我們 kafka 生產(chǎn)端很多場(chǎng)景都是忽略了 key 關(guān)注的是 value. 這感覺(jué)限制了 upsert kafka 的使用嚼蚀,因?yàn)椴惶赡転榱耸褂?upsert kafka 而讓 kafka 生產(chǎn)端進(jìn)行消息改造,這很不友好.
  • upsert kafka 默認(rèn)寫(xiě)死從 earilest 開(kāi)始消費(fèi)管挟,并且貌似沒(méi)有開(kāi)放其他的 消費(fèi)位置設(shè)置轿曙,這簡(jiǎn)直就是災(zāi)難,你能忍我不能忍
    我會(huì)對(duì)以上兩點(diǎn)做出源碼改造

核心源碼解析

介紹下 RowKind

package org.apache.flink.types;
public enum RowKind {
    INSERT("+I", (byte) 0), //代表新增
    UPDATE_BEFORE("-U", (byte) 1), //代表更新前的數(shù)據(jù)
    UPDATE_AFTER("+U", (byte) 2), //代表更新后的數(shù)據(jù)
    DELETE("-D", (byte) 3); //代表刪除
}
/*
結(jié)合 upsert kafka 打個(gè)比方 主鍵為id 1,name 為 2的 的數(shù)據(jù)
如果消費(fèi)到一條主鍵為1 的數(shù)據(jù) print 出來(lái)是這樣的
| +I |                              1 |                              2 |  //代表新增
如果再消費(fèi)到一條主鍵為1 的name為其他比如是22的數(shù)據(jù) print 出來(lái)是這樣的
| -U |                              1 |                              2 |  //代表更新前的數(shù)據(jù)
| +U |                              1 |                              22 | //代表更新后的數(shù)據(jù)
如果再消費(fèi)到只含有主鍵為1 的數(shù)據(jù) print 出來(lái)是這樣的
| -D |                              1 |                              22 | //代表刪除
*/

反序列化器

在開(kāi)始接觸 upsert kafka 最想看的就是反序列化僻孝,因?yàn)橄葘?shù)據(jù)反序列化 然后下發(fā)到下游导帝,此時(shí)他應(yīng)該可以做很多事情 比如先查 state 看看數(shù)據(jù)是否存在,如果存在就下發(fā)兩條一條是 update before穿铆,一條 update after.代表更新前和更新后的數(shù)據(jù)類(lèi)似于mysql 的binlog數(shù)據(jù)結(jié)構(gòu)您单。另外肯定會(huì)對(duì)數(shù)據(jù)進(jìn)行一些標(biāo)記來(lái)標(biāo)識(shí)這條數(shù)據(jù)是 update 還是 insert 還是 delete.

回顧下create table 的邏輯到后臺(tái)創(chuàng)建 source 的邏輯,如果你熟悉了這個(gè)流程就可以直接跳過(guò)直接看反序列化器 DynamicKafkaDeserializationSchema

1荞雏、在 flink-connector-kafka_2.11-1.12.0.jarMETA-INF.services.org.apache.flink.table.factories.TableFactory 下有 org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory org.apache.flink.streaming.connectors.kafka.table.UpsertKafkaDynamicTableFactory 這樣 flink 就可以加載到flink-connector-kafka 提供的 TableFactory
2虐秦、在 create table 的時(shí)候會(huì)指定 connector' = 'upsert-kafka'等參數(shù),這樣 flink 在解析 建表 sql的時(shí)候根據(jù)參數(shù)去適配到合理的 TableFactory 然后 初始化 UpsertKafkaDynamicTableFactory
3凤优、然后創(chuàng)建 TableSource org.apache.flink.streaming.connectors.kafka.table.UpsertKafkaDynamicTableFactory#createDynamicTableSource -> new org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource()
KafkaDynamicSource 接口繼承如下
KafkaDynamicSource implements ScanTableSource, SupportsReadingMetadata, SupportsWatermarkPushDown
4悦陋、這里寫(xiě)死了

// always use earliest to keep data integrity
StartupMode earliest = StartupMode.EARLIEST; 

源碼如下

public DynamicTableSource createDynamicTableSource(Context context) {
    // 其他代碼省略
    // always use earliest to keep data integrity
    StartupMode earliest = StartupMode.EARLIEST;

    return new KafkaDynamicSource(
            schema.toPhysicalRowDataType(),
            keyDecodingFormat,
            new DecodingFormatWrapper(valueDecodingFormat),
            keyValueProjections.f0,
            keyValueProjections.f1,
            keyPrefix,
            KafkaOptions.getSourceTopics(tableOptions),
            KafkaOptions.getSourceTopicPattern(tableOptions),
            properties,
            earliest,
            Collections.emptyMap(),
            0,
            true);
}

4、在方法
org.apache.flink.table.planner.sources.DynamicSourceUtils#prepareDynamicSource 中會(huì)調(diào)用org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource#getScanRuntimeProvider 方法里面會(huì)創(chuàng)建反序列化器和 FlinkKafkaConsumer 源碼如下

public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
    final DeserializationSchema<RowData> keyDeserialization =
            createDeserialization(context, keyDecodingFormat, keyProjection, keyPrefix);

    final DeserializationSchema<RowData> valueDeserialization =
            createDeserialization(context, valueDecodingFormat, valueProjection, null);

    final TypeInformation<RowData> producedTypeInfo =
            context.createTypeInformation(producedDataType);

    final FlinkKafkaConsumer<RowData> kafkaConsumer =
            createKafkaConsumer(keyDeserialization, valueDeserialization, producedTypeInfo);

    return SourceFunctionProvider.of(kafkaConsumer, false);
}

5别洪、創(chuàng)建 FlinkKafkaConsumer 的同時(shí) 創(chuàng)建反序列化起 DynamicKafkaDeserializationSchema

protected FlinkKafkaConsumer<RowData> createKafkaConsumer(
        DeserializationSchema<RowData> keyDeserialization,
        DeserializationSchema<RowData> valueDeserialization,
        TypeInformation<RowData> producedTypeInfo) {

    final KafkaDeserializationSchema<RowData> kafkaDeserializer = new DynamicKafkaDeserializationSchema(
            adjustedPhysicalArity,
            keyDeserialization,
            keyProjection,
            valueDeserialization,
            adjustedValueProjection,
            hasMetadata,
            metadataConverters,
            producedTypeInfo,
            upsertMode);

    final FlinkKafkaConsumer<RowData> kafkaConsumer;
    if (topics != null) {
        kafkaConsumer = new FlinkKafkaConsumer<>(topics, kafkaDeserializer, properties);
    } else {
        kafkaConsumer = new FlinkKafkaConsumer<>(topicPattern, kafkaDeserializer, properties);
    }

    switch (startupMode) {
        case EARLIEST:
            kafkaConsumer.setStartFromEarliest();
            break;
        case LATEST:
            kafkaConsumer.setStartFromLatest();
            break;
        case GROUP_OFFSETS:
            kafkaConsumer.setStartFromGroupOffsets();
            break;
        case SPECIFIC_OFFSETS:
            kafkaConsumer.setStartFromSpecificOffsets(specificStartupOffsets);
            break;
        case TIMESTAMP:
            kafkaConsumer.setStartFromTimestamp(startupTimestampMillis);
            break;
    }

    kafkaConsumer.setCommitOffsetsOnCheckpoints(properties.getProperty("group.id") != null);

    if (watermarkStrategy != null) {
        kafkaConsumer.assignTimestampsAndWatermarks(watermarkStrategy);
    }
    return kafkaConsumer;
}

反序列化器 DynamicKafkaDeserializationSchema

DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<RowData>
kafka source 消費(fèi)到數(shù)據(jù)就會(huì)進(jìn)行反序列化 調(diào)用 deserialize 方法叨恨,deserialize 代碼邏輯如下

public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<RowData> collector) throws Exception {
    // shortcut in case no output projection is required,
    // also not for a cartesian product with the keys
    // 消息不需要解析 key
    if (keyDeserialization == null && !hasMetadata) {
        valueDeserialization.deserialize(record.value(), collector);
        return;
    }

    // buffer key(s)
    // 消息需要解析key
    if (keyDeserialization != null) {
        keyDeserialization.deserialize(record.key(), keyCollector);
    }

    // project output while emitting values
    outputCollector.inputRecord = record;
    outputCollector.physicalKeyRows = keyCollector.buffer;
    outputCollector.outputCollector = collector;
    // 在 upsert 模式下如果 消息的 value 為空當(dāng)做一條 刪除消息
    if (record.value() == null && upsertMode) {
        // collect tombstone messages in upsert mode by hand
        outputCollector.collect(null);
    } else {
        valueDeserialization.deserialize(record.value(), outputCollector);
    }
    keyCollector.buffer.clear();
}

詳細(xì)看下 keyDeserialization.deserialize(record.key(), keyCollector);valueDeserialization.deserialize(record.value(), outputCollector); 方法

先看 key 的keyDeserialization.deserialize(record.key(), keyCollector);

keyCollectorDynamicKafkaDeserializationSchema$BufferingCollector
keyDeserializationorg.apache.flink.formats.json.JsonRowDataDeserializationSchema

再看 value 的 valueDeserialization.deserialize(record.value(), outputCollector);

outputCollectorDynamicKafkaDeserializationSchema$OutputProjectionCollector
valueDeserializationorg.apache.flink.formats.json.JsonRowDataDeserializationSchema

key 和 value 的deserialize 鏈路是一樣的都是先調(diào)用父類(lèi) org.apache.flink.api.common.serialization.DeserializationSchemadeserialize方法

default void deserialize(byte[] message, Collector<T> out) throws IOException {
    T deserialize = deserialize(message);
    if (deserialize != null) {
        out.collect(deserialize);
    }
}

然后調(diào)用子類(lèi)的 org.apache.flink.formats.json.JsonRowDataDeserializationSchemadeserialize方法

org.apache.flink.formats.json.JsonRowDataDeserializationSchema
public RowData deserialize(byte[] message) throws IOException {
    final JsonNode root = objectMapper.readTree(message);
    //重點(diǎn)看 怎么把一個(gè)jsonNode 怎么轉(zhuǎn)成一個(gè) RowData
    return (RowData) runtimeConverter.convert(root);
}

重點(diǎn)看 convert 方法 最后會(huì)調(diào)用如下方法

org.apache.flink.formats.json.JsonToRowDataConverters#createRowConverter
public JsonToRowDataConverter createRowConverter(RowType rowType) {
    // 省略了部分代碼,把一個(gè)jsonNode 怎么轉(zhuǎn)成一個(gè) RowData
    return jsonNode -> {
        ObjectNode node = (ObjectNode) jsonNode;
        int arity = fieldNames.length;
        // 初始化 GenericRowData 這個(gè)里面會(huì)會(huì)設(shè)置 FLink RowKind
        GenericRowData row = new GenericRowData(arity);
        for (int i = 0; i < arity; i++) {
            String fieldName = fieldNames[i];
            JsonNode field = node.get(fieldName);
            Object convertedField = convertField(fieldConverters[i], fieldName, field);
            row.setField(i, convertedField);
        }
        return row;
    };
}

org.apache.flink.table.data.GenericRowData
public GenericRowData(int arity) {
    this.fields = new Object[arity]; //字段個(gè)數(shù)
    this.kind = RowKind.INSERT; // INSERT as default
}
// 所以可以看到這個(gè)都是默認(rèn)的  INSERT 挖垛,那在什么時(shí)候 會(huì)把他變成其他類(lèi)型的 RowKind 呢?

反序列化結(jié)束 最后看 collect 下發(fā)到下游

先看 key 的 collect DynamicKafkaDeserializationSchema$BufferingCollector

private static final class BufferingCollector implements Collector<RowData>, Serializable {
    private final List<RowData> buffer = new ArrayList<>();
    @Override
    public void collect(RowData record) {
        // 貌似很簡(jiǎn)單就是放入到了一個(gè) buffer
        buffer.add(record);
    }
}

先看 value 的 collect DynamicKafkaDeserializationSchema$OutputProjectionCollector

private static final class OutputProjectionCollector implements Collector<RowData>, Serializable {
    @Override
    public void collect(RowData physicalValueRow) {
        // no key defined
        int length = keyProjection.length;
        // 沒(méi)有key 這不是 upsert kafka 的場(chǎng)景
        if (length == 0) {
            emitRow(null, (GenericRowData) physicalValueRow);
            return;
        }
        // 這里對(duì) kafka mesg value  進(jìn)行解析看看是否包含 除key 字段之外還有其他字段秉颗。如果其他字段都不存在相當(dāng)于 kafka mesg value 為空
        Boolean hashValue = false;
        if(physicalValueRow != null && length > 0){
            Set<Integer> collect = Arrays.stream(keyProjection).boxed().collect(Collectors.toSet());
            for(int i = 0; i < physicalValueRow.getArity() && !hashValue; i ++){
                if(collect.contains(i)){
                    continue;
                }
                hashValue = !physicalValueRow.isNullAt(i);
            }
        }
        // otherwise emit a value for each key
        for (RowData physicalKeyRow : physicalKeyRows) {
            if(!hashValue){
                // 如果 hashValue 為空則為 null
                emitRow((GenericRowData) physicalKeyRow,  null);
            }else{
                emitRow((GenericRowData) physicalKeyRow, (GenericRowData) physicalValueRow);
            }
        }
    }

    

    private void emitRow(@Nullable GenericRowData physicalKeyRow, @Nullable GenericRowData physicalValueRow) {
        final RowKind rowKind;
        if (physicalValueRow == null) {
            // 如果 physicalValueRow 為 null 且是 upsertMode 模式下 則 rowKind 為刪除
            if (upsertMode) {
                rowKind = RowKind.DELETE;
            } else {
                throw new DeserializationException(
                        "Invalid null value received in non-upsert mode. Could not to set row kind for output record.");
            }
        } else {
            // 否則就取原本的 rowKind 類(lèi)型
            rowKind = physicalValueRow.getRowKind();
        }

        //調(diào)用 org.apache.flink.util.Collector 下發(fā)下去
        outputCollector.collect(producedRow);
    }
}

所以可以出在整個(gè)反序列化到數(shù)據(jù)下發(fā)的過(guò)程 并沒(méi)有找 sate 看主鍵數(shù)據(jù)是否之前到過(guò) flink. 并且在這個(gè)過(guò)程中 rowKind 的設(shè)置就 2種 痢毒,首先默認(rèn)都是 insert 然后根據(jù) value 是否為空進(jìn)行判斷如果 value 為空則將 rowKind 設(shè)為 delete . 所以到底在哪里做判斷是否之前消費(fèi)過(guò)這個(gè)主鍵數(shù)據(jù)呢?

哪來(lái)的 state

偶然在 flink UI 上發(fā)現(xiàn)到 fink 在每一個(gè)source 下都接了一個(gè)changelogNormalize 的算子如下圖


image.png

恍然明白 這個(gè)應(yīng)該 flink 框架在將 flink sql 解析成 執(zhí)行計(jì)劃的時(shí)候加了一個(gè)特殊的算子蚕甥。跟蹤源碼如下哪替,這個(gè)鏈路太長(zhǎng)這里只說(shuō)下核心的幾個(gè)源碼

  • 任務(wù)main 方法 executeSql tEnv.executeSql("SELECT * from shop").print();
  • 根據(jù) QueryOperation 轉(zhuǎn)化成一個(gè) TableResult
public TableResult executeInternal(QueryOperation operation) {
    SelectSinkOperation sinkOperation = new SelectSinkOperation(operation);
    //看這個(gè) translate 方法
    List<Transformation<?>> transformations = translate(Collections.singletonList(sinkOperation));
    String jobName = getJobName("collect");
    Pipeline pipeline = execEnv.createPipeline(transformations, tableConfig, jobName);
}
  • org.apache.flink.table.planner.delegation.PlannerBase#translate
override def translate(
      modifyOperations: util.List[ModifyOperation]): util.List[Transformation[_]] = {
    if (modifyOperations.isEmpty) {
      return List.empty[Transformation[_]]
    }
    // prepare the execEnv before translating
    getExecEnv.configure(
      getTableConfig.getConfiguration,
      Thread.currentThread().getContextClassLoader)
    overrideEnvParallelism()
    // 這個(gè)方法也許是獲取上游依賴(lài)節(jié)點(diǎn)的意思
    val relNodes = modifyOperations.map(translateToRel)
   //對(duì)節(jié)點(diǎn)進(jìn)行優(yōu)化 重點(diǎn)看這里
    val optimizedRelNodes = optimize(relNodes)
    val execNodes = translateToExecNodePlan(optimizedRelNodes)
    translateToPlan(execNodes)
  }
  • 在會(huì)optimize 的時(shí)候被一個(gè) 叫StreamExecTableSourceScanRule 的規(guī)則匹配上
    他會(huì)將 FlinkLogicalTableSourceScan 轉(zhuǎn)換為 StreamExecTableSourceSource,能匹配的前提是它是一個(gè) FlinkLogicalTableSourceScan 并且對(duì)于 upsert source 來(lái)說(shuō)他還會(huì)優(yōu)化生成一個(gè) StreamExecChangelogNormalize.
org.apache.flink.table.planner.plan.rules.physical.stream.StreamExecTableSourceScanRule#convert
def convert(rel: RelNode): RelNode = {
    val scan = rel.asInstanceOf[FlinkLogicalTableSourceScan]
    val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
    val config = ShortcutUtils.unwrapContext(rel.getCluster).getTableConfig
    val table = scan.getTable.asInstanceOf[TableSourceTable]

    val newScan = new StreamExecTableSourceScan(
      rel.getCluster,
      traitSet,
      table)
    // 如果是一個(gè) upsert source
    if (isUpsertSource(table.catalogTable, table.tableSource) ||
        isSourceChangeEventsDuplicate(table.catalogTable, table.tableSource, config)) {
      // generate changelog normalize node
      // primary key has been validated in CatalogSourceTable
      val primaryKey = table.catalogTable.getSchema.getPrimaryKey.get()
      val keyFields = primaryKey.getColumns
      val inputFieldNames = newScan.getRowType.getFieldNames
      val primaryKeyIndices = ScanUtil.getPrimaryKeyIndices(inputFieldNames, keyFields)
      val requiredDistribution = FlinkRelDistribution.hash(primaryKeyIndices, requireStrict = true)
      val requiredTraitSet = rel.getCluster.getPlanner.emptyTraitSet()
        .replace(requiredDistribution)
        .replace(FlinkConventions.STREAM_PHYSICAL)
      val newInput: RelNode = RelOptRule.convert(newScan, requiredTraitSet)
      // 產(chǎn)生一個(gè) StreamExecChangelogNormalize
      new StreamExecChangelogNormalize(
        scan.getCluster,
        traitSet,
        newInput,
        primaryKeyIndices)
    } else {
      newScan
    }
  }
  • 上面相當(dāng)于加了一個(gè) StreamExecChangelogNormalize 節(jié)點(diǎn)后面會(huì)對(duì)這個(gè)節(jié)點(diǎn)轉(zhuǎn)成成 stream api
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecChangelogNormalize#translateToPlanInternal
override protected def translateToPlanInternal(
      planner: StreamPlanner): Transformation[RowData] = {

    val inputTransform = getInputNodes.get(0).translateToPlan(planner)
      .asInstanceOf[Transformation[RowData]]

    val rowTypeInfo = inputTransform.getOutputType.asInstanceOf[InternalTypeInfo[RowData]]
    val generateUpdateBefore = ChangelogPlanUtils.generateUpdateBefore(this)
    val tableConfig = planner.getTableConfig
    val isMiniBatchEnabled = tableConfig.getConfiguration.getBoolean(
      ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED)
    val stateIdleTime = tableConfig.getIdleStateRetention.toMillis
    val operator = if (isMiniBatchEnabled) {
      //省略
    } else {
     // 這是重點(diǎn)
      val processFunction = new ProcTimeDeduplicateKeepLastRowFunction(
        rowTypeInfo,
        stateIdleTime,
        generateUpdateBefore,
        true,   // generateInsert
        false)  // inputInsertOnly
     // 并且是一個(gè) KeyedProcessOperator
      new KeyedProcessOperator[RowData, RowData, RowData](processFunction)
    }

    val ret = new OneInputTransformation(
      inputTransform,
      getRelDetailedDescription,
      operator,
      rowTypeInfo,
      inputTransform.getParallelism)

    if (inputsContainSingleton()) {
      ret.setParallelism(1)
      ret.setMaxParallelism(1)
    }

    val selector = KeySelectorUtil.getRowDataSelector(uniqueKeys, rowTypeInfo)
    ret.setStateKeySelector(selector)
    ret.setStateKeyType(selector.getProducedType)
    ret
  }
  • 到這里已經(jīng)很明朗了菇怀,就是 ProcTimeDeduplicateKeepLastRowFunction 搞的名堂 感覺(jué)這個(gè) upsert 本身沒(méi)有太大的關(guān)系嘍凭舶,你說(shuō)你是不是披著狼皮的羊,害我在upsert kafka 代碼找了好久爱沟。

ProcTimeDeduplicateKeepLastRowFunction 源碼解析

package org.apache.flink.table.runtime.operators.deduplicate;

import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.util.Collector;

import static org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.processLastRowOnChangelog;
import static org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.processLastRowOnProcTime;

/**
 * This function is used to deduplicate on keys and keeps only last row.
 */
public class ProcTimeDeduplicateKeepLastRowFunction
        extends DeduplicateFunctionBase<RowData, RowData, RowData, RowData> {

    private static final long serialVersionUID = -291348892087180350L;
    private final boolean generateUpdateBefore;
    private final boolean generateInsert;
    private final boolean inputIsInsertOnly;

    public ProcTimeDeduplicateKeepLastRowFunction(
            InternalTypeInfo<RowData> typeInfo,
            long stateRetentionTime,
            boolean generateUpdateBefore,
            boolean generateInsert,
            boolean inputInsertOnly) {
        super(typeInfo, null, stateRetentionTime);
        this.generateUpdateBefore = generateUpdateBefore;
        this.generateInsert = generateInsert;
        this.inputIsInsertOnly = inputInsertOnly;
    }

    @Override
    public void processElement(RowData input, Context ctx, Collector<RowData> out) throws Exception {
        if (inputIsInsertOnly) {
            processLastRowOnProcTime(input, generateUpdateBefore, generateInsert, state, out);
        } else {
            // 重點(diǎn)看這里 父類(lèi)方法
            processLastRowOnChangelog(input, generateUpdateBefore, state, out);
        }
    }
}
  • 好激動(dòng)直接上源碼 RowKind 的變化千呼萬(wàn)喚始出來(lái) 注釋等很清晰了自己看吧
static void processLastRowOnChangelog(
            RowData currentRow,
            boolean generateUpdateBefore,
            ValueState<RowData> state,
            Collector<RowData> out) throws Exception {
        RowData preRow = state.value();
        RowKind currentKind = currentRow.getRowKind();
        if (currentKind == RowKind.INSERT || currentKind == RowKind.UPDATE_AFTER) {
            if (preRow == null) {
                // the first row, send INSERT message
                currentRow.setRowKind(RowKind.INSERT);
                out.collect(currentRow);
            } else {
                if (generateUpdateBefore) {
                    preRow.setRowKind(RowKind.UPDATE_BEFORE);
                    out.collect(preRow);
                }
                currentRow.setRowKind(RowKind.UPDATE_AFTER);
                out.collect(currentRow);
            }
            // normalize row kind
            currentRow.setRowKind(RowKind.INSERT);
            // save to state
            state.update(currentRow);
        } else {
            // DELETE or UPDATER_BEFORE
            if (preRow != null) {
                // always set to DELETE because this row has been removed
                // even the the input is UPDATE_BEFORE, there may no UPDATE_AFTER after it.
                preRow.setRowKind(RowKind.DELETE);
                // output the preRow instead of currentRow,
                // because preRow always contains the full content.
                // currentRow may only contain key parts (e.g. Kafka tombstone records).
                out.collect(preRow);
                // clear state as the row has been removed
                state.clear();
            }
            // nothing to do if removing a non-existed row
        }
    }
  • 上面方法會(huì) 從 state 中拿舊數(shù)據(jù)帅霜,還會(huì)更新回新數(shù)據(jù),再看一眼 state 是個(gè)啥玩意
    他是一個(gè)來(lái)自父類(lèi) DeduplicateFunctionBase 的 ValueState
abstract class DeduplicateFunctionBase<T, K, IN, OUT> extends KeyedProcessFunction<K, IN, OUT> {

    private static final long serialVersionUID = 1L;

    // the TypeInformation of the values in the state.
    protected final TypeInformation<T> typeInfo;
    protected final long stateRetentionTime;
    protected final TypeSerializer<OUT> serializer;
    // state stores previous message under the key.
    protected ValueState<T> state;

    public DeduplicateFunctionBase(
            TypeInformation<T> typeInfo,
            TypeSerializer<OUT> serializer,
            long stateRetentionTime) {
        this.typeInfo = typeInfo;
        this.stateRetentionTime = stateRetentionTime;
        this.serializer = serializer;
    }

    @Override
    public void open(Configuration configure) throws Exception {
        super.open(configure);
        ValueStateDescriptor<T> stateDesc = new ValueStateDescriptor<>("deduplicate-state", typeInfo);
        StateTtlConfig ttlConfig = createTtlConfig(stateRetentionTime);
        if (ttlConfig.isEnabled()) {
            stateDesc.enableTimeToLive(ttlConfig);
        }
        state = getRuntimeContext().getState(stateDesc);
    }
}

先到此結(jié)束,后面再談源碼改造

  • kafka 消息需要有 key. (至少社區(qū)大佬給的 demo 就是要攜帶 key)然而我們 kafka 生產(chǎn)端很多場(chǎng)景都是忽略了 key 關(guān)注的是 value. 這感覺(jué)限制了 upsert kafka 的使用呼伸,因?yàn)椴惶赡転榱耸褂?upsert kafka 而讓 kafka 生產(chǎn)端進(jìn)行消息改造身冀,這很不友好.
  • upsert kafka 默認(rèn)寫(xiě)死從 earilest 開(kāi)始消費(fèi),并且貌似沒(méi)有開(kāi)放其他的 消費(fèi)位置設(shè)置,這簡(jiǎn)直就是災(zāi)難搂根,你能忍我不能忍
    我會(huì)對(duì)以上兩點(diǎn)做出源碼改造
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末珍促,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子剩愧,更是在濱河造成了極大的恐慌猪叙,老刑警劉巖,帶你破解...
    沈念sama閱讀 219,589評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件仁卷,死亡現(xiàn)場(chǎng)離奇詭異沐悦,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)五督,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,615評(píng)論 3 396
  • 文/潘曉璐 我一進(jìn)店門(mén)藏否,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人充包,你說(shuō)我怎么就攤上這事副签。” “怎么了基矮?”我有些...
    開(kāi)封第一講書(shū)人閱讀 165,933評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵淆储,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我家浇,道長(zhǎng)本砰,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,976評(píng)論 1 295
  • 正文 為了忘掉前任钢悲,我火速辦了婚禮点额,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘莺琳。我一直安慰自己还棱,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,999評(píng)論 6 393
  • 文/花漫 我一把揭開(kāi)白布惭等。 她就那樣靜靜地躺著珍手,像睡著了一般。 火紅的嫁衣襯著肌膚如雪辞做。 梳的紋絲不亂的頭發(fā)上琳要,一...
    開(kāi)封第一講書(shū)人閱讀 51,775評(píng)論 1 307
  • 那天,我揣著相機(jī)與錄音秤茅,去河邊找鬼稚补。 笑死,一個(gè)胖子當(dāng)著我的面吹牛嫂伞,可吹牛的內(nèi)容都是我干的孔厉。 我是一名探鬼主播拯钻,決...
    沈念sama閱讀 40,474評(píng)論 3 420
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼撰豺!你這毒婦竟也來(lái)了粪般?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 39,359評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤污桦,失蹤者是張志新(化名)和其女友劉穎亩歹,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體凡橱,經(jīng)...
    沈念sama閱讀 45,854評(píng)論 1 317
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡小作,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,007評(píng)論 3 338
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了稼钩。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片顾稀。...
    茶點(diǎn)故事閱讀 40,146評(píng)論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖坝撑,靈堂內(nèi)的尸體忽然破棺而出静秆,到底是詐尸還是另有隱情,我是刑警寧澤巡李,帶...
    沈念sama閱讀 35,826評(píng)論 5 346
  • 正文 年R本政府宣布抚笔,位于F島的核電站,受9級(jí)特大地震影響侨拦,放射性物質(zhì)發(fā)生泄漏殊橙。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,484評(píng)論 3 331
  • 文/蒙蒙 一狱从、第九天 我趴在偏房一處隱蔽的房頂上張望膨蛮。 院中可真熱鬧,春花似錦矫夯、人聲如沸鸽疾。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 32,029評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至冒窍,卻和暖如春递沪,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背综液。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,153評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工款慨, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人谬莹。 一個(gè)月前我還...
    沈念sama閱讀 48,420評(píng)論 3 373
  • 正文 我出身青樓檩奠,卻偏偏與公主長(zhǎng)得像桩了,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子埠戳,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,107評(píng)論 2 356

推薦閱讀更多精彩內(nèi)容