本文首先進(jìn)行 Flink Kafka Consumer 原理分析,結(jié)合 SourceFunction 和 Kafka Client API 詳解源碼刃麸。
1.Flink Kafka Consumer 原理
本文基于 flink-1.11 分析 Kafka Consumer 原理。
FlinkKafkaConsumer 主要是繼承基類(lèi) RichParallelSourceFunction司浪,不但可以執(zhí)行 run(...) 方法讀取數(shù)據(jù)泊业,而且擁有狀態(tài)把沼、metric 和多并發(fā)等功能。
1.1 RichParallelSourceFunction 分析
RichParallelSourceFunction 與父類(lèi)的繼承關(guān)系吁伺,如下圖所示饮睬。一方面,RichParallelSourceFunction 間接實(shí)現(xiàn)接口 SourceFunction篮奄,可以執(zhí)行 run(...) 方法讀取數(shù)據(jù)捆愁;另一方面,RichParallelSourceFunction 間接實(shí)現(xiàn)接口 RichFunction窟却,擁有狀態(tài)昼丑、metric 和多并發(fā)等功能。因此夸赫,RichParallelSourceFunction 是有狀態(tài)的和多并發(fā)的 Source 基類(lèi)菩帝。
① ParallelSourceFunction 是接口 SourceFunction 的子類(lèi)。共同點(diǎn)是 Source 的基類(lèi)茬腿,需要實(shí)現(xiàn) run() 讀取數(shù)據(jù)呼奢。不同點(diǎn)是前者提供多并發(fā)的能力,后者的并發(fā)度只能為 1切平;
② AbstractRichFunction 是接口 RichFunction 的實(shí)現(xiàn)類(lèi)握础,可以提供 open() 方法獲取 RuntimeContext,而 RuntimeContext 擁有 metric揭绑、subtasks 信息弓候、accumulator、state 等功能他匪;
1.2 Flink Kafka Consumer 流程分析
如下圖所示菇存,F(xiàn)link Kafka Consumer 流程主要分為 ①主線(xiàn)程循環(huán)獲取緩存數(shù)據(jù),發(fā)送到下游邦蜜;②消費(fèi)線(xiàn)程循環(huán)消費(fèi) Kafka 數(shù)據(jù)依鸥,保存到緩存。
Handover.next:Handover 類(lèi)的 next 屬性悼沈,即 ConsumerRecords 類(lèi)型的緩存數(shù)據(jù)贱迟。Handover 的主要作用是協(xié)調(diào)主線(xiàn)程和消費(fèi)線(xiàn)程,有序地消費(fèi) Kafka 和發(fā)送數(shù)據(jù)到下游算子絮供。
(1)主線(xiàn)程
主線(xiàn)程獲取緩存的 Handover.next 對(duì)象即 ConsumerRecords衣吠,發(fā)送到下游算子。首先創(chuàng)建 KafkaFetcher壤靶,同時(shí)內(nèi)部創(chuàng)建消費(fèi)線(xiàn)程 KafkaConsumerThread缚俏。然后,調(diào)用 KafkaFetcher.runFetchLoop() 方法,啟動(dòng)消費(fèi)線(xiàn)程忧换、循環(huán)獲取緩存數(shù)據(jù)恬惯;最后,根據(jù)分區(qū)往下游發(fā)送數(shù)據(jù)亚茬。
(2)消費(fèi)線(xiàn)程
消費(fèi)線(xiàn)程 KafkaConsumerThread 主要循環(huán)消費(fèi) Kafka 數(shù)據(jù)酪耳,保存到緩存。首先刹缝,主線(xiàn)程啟動(dòng)消費(fèi)線(xiàn)程碗暗。接著,KafkaConsumer 從 Kafka Broker 循環(huán) poll 數(shù)據(jù)赞草,同時(shí)保持到緩存中讹堤。
2.Flink Kafka Consumer 源碼詳解
問(wèn)題1:如何使用 FlinkKafkaConsumer ?如何直接使用 KafkaClient API 厨疙?
/**
* 示例1: Flink DataStream API 使用 FlinkKafkaConsumer
**/
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//SimpleStringSchema為數(shù)據(jù)字段解析類(lèi)
env.addSource(new FlinkKafkaConsumer<>("eventTopic", new SimpleStringSchema(), properties)
/**
* 示例2: KafkaClient API 直接使用 KafkaConsumer
**/
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.poll(Duration.ofMillis(100));
問(wèn)題2:FlinkKafkaConsumer 內(nèi)部是如何使用 KafkaClient API 洲守?
① 初始化
執(zhí)行 env.addSource 的時(shí)候會(huì)創(chuàng)建 StreamSource 算子對(duì)象;StreamSource 構(gòu)造函數(shù)中將 function 即 FlinkKafkaConsumer 對(duì)象傳給父類(lèi) AbstractUdfStreamOperator 的 userFunction 變量沾凄;
StreamExecutionEnvironment源碼:
public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName, TypeInformation<OUT> typeInfo) {
// 省略...
// function 即 FlinkKafkaConsumer
final StreamSource<OUT, ?> sourceOperator = new StreamSource<>(function);
// 省略...
}
AbstractUdfStreamOperator源碼:
// userFunction 即 FlinkKafkaConsumer
public AbstractUdfStreamOperator(F userFunction) {
this.userFunction = requireNonNull(userFunction);
checkUdfCheckpointingPreconditions();
}
② Task 啟動(dòng)和運(yùn)行
Task 實(shí)現(xiàn) Java多線(xiàn)程接口 Runnable梗醇。Task 啟動(dòng)后,函數(shù)調(diào)用鏈如下 Task.run() -> Task.doRun() -> StreamTask.invoke() -> StreamTask.runMailboxLoop() -> MailboxProcessor.runMailboxLoop() -> MailboxProcessor.runMailboxStep() -> SourceStreamTask .processInput()撒蟀。processInput() 方法里面啟動(dòng)線(xiàn)程 sourceThread.start()叙谨。上述的關(guān)鍵源碼,如下所示保屯。
StreamTask 源碼如下:
@Override
public final void invoke() throws Exception {
// 省略...
// 調(diào)用 MailboxProcessor.runMailboxLoop()
runMailboxLoop();
// 省略...
}
MailboxProcessor 源碼如下:
public void runMailboxLoop() throws Exception {
// 省略...
// 循環(huán)執(zhí)行 runMailboxStep
while (runMailboxStep(localMailbox, defaultActionContext)) {
}
}
private boolean runMailboxStep(TaskMailbox localMailbox, MailboxController defaultActionContext) throws Exception {
if (processMail(localMailbox)) {
// 執(zhí)行 mailboxDefaultAction.runDefaultAction手负,即執(zhí)行 SourceStreamTask .processInput()
mailboxDefaultAction.runDefaultAction(defaultActionContext); // lock is acquired inside default action as needed
return true;
}
return false;
}
SourceStreamTask 源碼如下:
@Override
protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
// 由于目前沒(méi)有輸入,TaskMailbox 先暫停 loop 主線(xiàn)程
controller.suspendDefaultAction();
sourceThread.setTaskDescription(getName());
sourceThread.start();
// 省略...
}
private class LegacySourceFunctionThread extends Thread {
// 省略...
@Override
public void run() {
try {
// 執(zhí)行 source function 的 run() 方法
mainOperator.run(lock, getStreamStatusMaintainer(), operatorChain);
completionFuture.complete(null);
} catch (Throwable t) {
completionFuture.completeExceptionally(t);
}
}
// 省略...
}
③ 消費(fèi) Kafka
FlinkKafkaConsumerBase 間接實(shí)現(xiàn)了 SourceFunction 接口姑尺,主要實(shí)現(xiàn) run() 方法竟终。然后,在 run() 方法創(chuàng)建了一個(gè) KafkaFetcher 對(duì)象切蟋,并主要調(diào)用 KafkaFetcher.runFetchLoop()统捶。最終,運(yùn)行消費(fèi)線(xiàn)程 KafkaConsumerThread柄粹,并 while 循環(huán)地 poll Kafka 數(shù)據(jù)喘鸟。上述的關(guān)鍵源碼,如下所示驻右。
FlinkKafkaConsumerBase 源碼如下:
@Override
public void run(SourceContext<T> sourceContext) throws Exception {
// 省略...
// 創(chuàng)建 KafkaFetcher 對(duì)象
this.kafkaFetcher = createFetcher(
sourceContext,
subscribedPartitionsToStartOffsets,
watermarkStrategy,
(StreamingRuntimeContext) getRuntimeContext(),
offsetCommitMode,
getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP),
useMetrics);
// 省略...
// kafkaFetcher 執(zhí)行 runFetchLoop()什黑,即循環(huán)消費(fèi)數(shù)據(jù)
kafkaFetcher.runFetchLoop();
// 省略...
}
KafkaFetcher 源碼如下:
@Override
public void runFetchLoop() throws Exception {
try {
// 啟動(dòng)消費(fèi)線(xiàn)程 KafkaConsumerThread
consumerThread.start();
while (running) {
// 獲取協(xié)調(diào)者 Handover 的 next 緩存值
final ConsumerRecords<byte[], byte[]> records = handover.pollNext();
// 從partition 獲取 數(shù)據(jù)
for (KafkaTopicPartitionState<T, TopicPartition> partition : subscribedPartitionStates()) {
List<ConsumerRecord<byte[], byte[]>> partitionRecords =
records.records(partition.getKafkaPartitionHandle());
// 向下游發(fā)送數(shù)據(jù)
partitionConsumerRecordsHandler(partitionRecords, partition);
}
}
}
finally {
consumerThread.shutdown();
}
KafkaConsumerThread 源碼如下,run() 方法中創(chuàng)建 KafkaClient API 的 KafkaConsumer堪夭,并使用 KafkaConsumer.poll() 消費(fèi)數(shù)據(jù)兑凿。
@Override
public void run() {
// 省略...
// 從主線(xiàn)程獲取的 handover 賦值給本地變量...
final Handover handover = this.handover;
// 省略...
try {
// 創(chuàng)建 KafkaConsumer
this.consumer = getConsumer(kafkaProperties);
}
catch (Throwable t) {
handover.reportError(t);
return;
}
// 省略...
ConsumerRecords<byte[], byte[]> records = null;
// while 循環(huán)消費(fèi) Kafka
while (running) {
// 省略...
if (records == null) {
try {
// KafkaConsumer poll 數(shù)據(jù)凯力,即使用 KafkaClient API 的 KafkaConsumer 消費(fèi)數(shù)據(jù)
records = consumer.poll(pollTimeout);
}
catch (WakeupException we) {
continue;
}
}
try {
// 把 Kafka 的數(shù)據(jù)保存在 Handover 的緩存中
handover.produce(records);
records = null;
}
// 省略...
}
}
問(wèn)題3:Handover 是如何協(xié)調(diào)消費(fèi)線(xiàn)程和主線(xiàn)程,使得前者可以及時(shí)消費(fèi)和保存數(shù)據(jù)礼华,而后者也可以及時(shí)獲取數(shù)據(jù) ?
Handover 的關(guān)鍵方法是 produce() 保存緩存數(shù)據(jù) next拗秘、pollNext() 獲取緩存數(shù)據(jù) next圣絮,主要作用是在消費(fèi)線(xiàn)程和主線(xiàn)程下,保證同一個(gè)緩存數(shù)據(jù) next 雕旨,在同一時(shí)間內(nèi)是不能既更新(寫(xiě))扮匠,也輸出(讀),即保證原子性操作 next凡涩。
Handover 源碼如下:
/**
* consumer 線(xiàn)程把 Kafka 數(shù)據(jù)保存到 next
**/
public void produce(final ConsumerRecords<byte[], byte[]> element)
throws InterruptedException, WakeupException, ClosedException {
checkNotNull(element);
synchronized (lock) {
// 循環(huán)判斷 next 是否為 null
while (next != null && !wakeupProducer) {
// lock 會(huì)釋放當(dāng)前的鎖棒搜,該 consumer 線(xiàn)程進(jìn)入 waiting 狀態(tài)
lock.wait();
}
// 省略...
else if (error == null) {
// 寫(xiě) next
next = element;
// 喚醒 lock(使得處于 waiting 狀態(tài)的 main 線(xiàn)程能夠繼續(xù)執(zhí)行)
lock.notifyAll();
}
// 省略...
}
}
/**
* main 線(xiàn)程讀取 next
**/
public ConsumerRecords<byte[], byte[]> pollNext() throws Exception {
synchronized (lock) {
// 循環(huán)判斷 next 是否為 null
while (next == null && error == null) {
// lock 會(huì)釋放當(dāng)前的鎖,該 main 線(xiàn)程進(jìn)入 waiting 狀態(tài)
lock.wait();
}
// 讀取 next
ConsumerRecords<byte[], byte[]> n = next;
if (n != null) {
next = null;
// 喚醒 lock(使得處于 waiting 狀態(tài)的 consumer 線(xiàn)程能夠繼續(xù)執(zhí)行)
lock.notifyAll();
return n;
}
// 省略...
}
}
Java 多線(xiàn)程的等待/通知機(jī)制:Object 的 wait()活箕、notify/notifyAll()
① 當(dāng)線(xiàn)程執(zhí)行 wait() 方法的時(shí)候力麸,會(huì)釋放當(dāng)前的鎖,然后讓出CPU育韩,進(jìn)入等待狀態(tài)克蚂。
② 當(dāng)線(xiàn)程執(zhí)行 notify/notifyAll() 方法的時(shí)候,會(huì)喚醒一個(gè)或多個(gè)正處于等待狀態(tài)的線(xiàn)程筋讨,然后繼續(xù)往下執(zhí)行埃叭,直到執(zhí)行完synchronized 代碼塊的代碼或是中途遇到 wait() ,再次釋放鎖悉罕。