大數(shù)據(jù)筆記

1.同時支持流處理和批處理的計算引擎历筝,只有兩種選擇:一個 是 Apache Spark,一個是 Apache Flink友鼻。Spark 的技術理念是基于批來模擬流的計算傻昙,F(xiàn)link基于流計算來模擬批計算
2.Flink 是一個低延遲、高吞吐彩扔、統(tǒng)一的大數(shù)據(jù)計算引擎妆档,F(xiàn)link 的計算 平臺可以實現(xiàn)毫秒級的延遲情況下,每秒鐘處理上億次的消息或者事件虫碉。同時 Flink 提供了一個 Exactly-once 的一致性語義贾惦。保證了數(shù)據(jù)的正確性


image.png

3.Flink提供了有狀態(tài)的計算,支持狀態(tài)管理敦捧,支持強一致性的數(shù)據(jù)語義须板,以及支持 Event Time,WaterMark 對消息亂序的處理

4.狀態(tài)管理

經(jīng)常要對數(shù)據(jù)進行統(tǒng)計, 如 Sum兢卵、Count习瑰、Min、Max,這些值是需要存儲的秽荤。因為要不斷更新甜奄,這些值或者變量就可以理 解為一種狀態(tài)。Flink 提供了內(nèi)置的狀態(tài)管理王滤,可以把這些狀態(tài)存儲在 Flink 內(nèi)部贺嫂,而不需要把它存儲在外部系 統(tǒng)。link 會定期將這些狀 態(tài)做 Checkpoint 持久化雁乡,把 Checkpoint 存儲到一個分布式的持久化系統(tǒng)中第喳,比如 HDFS。這樣 的話踱稍,當 Flink 的任務出現(xiàn)任何故障時曲饱,它都會從最近的一次 Checkpoint 將整個流的狀態(tài)進行 恢復悠抹,然后繼續(xù)運行它的流處理

  • 4.1 Flink 是如何做到在 Checkpoint 恢復過程中沒有任何數(shù)據(jù)的丟失和數(shù)據(jù)的冗余?來保證精準計 算的?

Flink 利用了一套非常經(jīng)典的 Chandy-Lamport 算法,它的核心思想是把這個流計 算看成一個流式的拓撲扩淀,定期從這個拓撲的頭部 Source 點開始插入特殊的 Barriers楔敌,從上游開 始不斷的向下游廣播這個 Barriers。每一個節(jié)點收到所有的 Barriers,會將 State 做一次 Snapshot驻谆, 當每個節(jié)點都做完 Snapshot 之后卵凑,整個拓撲就算完整的做完了一次 Checkpoint。接下來不管出 現(xiàn)任何故障胜臊,都會從最近的 Checkpoint 進行恢復勺卢。Flink 利用這套經(jīng)典的算法,保證了強一致性的語義


image.png
  • 4.2 Flink 是如何解決亂序問題的象对?

Flink 提供了 Event Time 和 WaterMark 的一些先進技術 來解決亂序的問題黑忱。使得用戶可以有序的處理這個消息


image.png
  • 5.Flink在滴滴的應用

1、 軌跡數(shù)據(jù):軌跡數(shù)據(jù)和訂單數(shù)據(jù)往往是業(yè)務方特別關心的勒魔。同時因為每一個用戶在打車以 后甫煞,都必須要實時的看到自己的軌跡,所以這些數(shù)據(jù)有強烈的實時需求冠绢。
2抚吠、 交易數(shù)據(jù):滴滴的交易數(shù)據(jù)
3、 埋點數(shù)據(jù):滴滴各個業(yè)務方的埋點數(shù)據(jù)唐全,包括終端以及后端的所有業(yè)務數(shù)據(jù)埃跷,
4、 日志數(shù)據(jù):整個的日志系統(tǒng)都有一些特別強烈的實時需求


image.png
  • 6.Flink 與 Storm 兩個框架對比:


    image.png
  • 7.IDEA快速搭建Flink
image.png

Flink程序的常規(guī)步驟:
1.獲取一個執(zhí)行環(huán)境(execution environment)
2.加載/創(chuàng)建初始數(shù)據(jù)邮利;
3.指定數(shù)據(jù)相關的轉換弥雹;
4.指定計算結果的存儲位置;
5.觸發(fā)程序執(zhí)行

  • Flink UML關系圖
image.png
  • Flink實操

基于兩個集群進行的: Flink Session Cluster 以及一個 Kafka 集群延届。
一個 Flink 集群總是包含一個 JobManager 以及一個或多個 Flink TaskManager剪勿。JobManager 負責處理 Job 提交、 Job 監(jiān)控以及資源管理方庭。Flink TaskManager 運行 worker 進程厕吉, 負責實際任務 Tasks 的執(zhí)行,而這些任務共同組成了一個 Flink Job械念。 在這篇文章中头朱, 我們會先運行一個 TaskManager,接下來會擴容到多個 TaskManager龄减。 另外项钮,這里我們會專門使用一個 client 容器來提交 Flink Job, 后續(xù)還會使用該容器執(zhí)行一些操作任務。需要注意的是烁巫,F(xiàn)link 集群的運行并不需要依賴 client 容器署隘, 我們這里引入只是為了使用方便。
這里的 Kafka 集群由一個 Zookeeper 服務端和一個 Kafka Broker 組成

image.png

一開始亚隙,我們會往 JobManager 提交一個名為 Flink 事件計數(shù) 的 Job磁餐,此外,我們還創(chuàng)建了兩個 Kafka Topic:input 和 output
image.png

該 Job 負責從 input topic 消費點擊事件 ClickEvent阿弃,每個點擊事件都包含一個 timestamp 和一個 page 屬性诊霹。 這些事件將按照 page 屬性進行分組,然后按照每 15s 窗口 windows 進行統(tǒng)計渣淳, 最終結果輸出到 output topic 中畅哑。
總共有 6 種不同的 page 屬性,針對特定 page水由,我們會按照每 15s 產(chǎn)生 1000 個點擊事件的速率生成數(shù)據(jù)。 因此赛蔫,針對特定 page砂客,該 Flink job 應該能在每個窗口中輸出 1000 個該 page 的點擊數(shù)據(jù)

  • Flink環(huán)境搭建

1>需要在自己的主機上提前安裝好 docker (1.12+) 和 docker-compose (2.1+)
2>使用的配置文件位于 flink-playgrounds 倉庫中, 首先檢出該倉庫并構建 docker 鏡像:

git clone https://github.com/apache/flink-playgrounds.git
cd flink-playgrounds/operations-playground
docker-compose build

接下來在開始運行之前先在 Docker 主機上創(chuàng)建檢查點和保存點目錄(這些卷由 jobmanager 和 taskmanager 掛載呵恢,如 docker-compose.yaml 中所指定的):

mkdir -p /Users/wudy.yu/flink/tmp/flink-checkpoints-directory/
mkdir -p /Users/wudy.yu/flink/tmp/flink-savepoints-directory 

然后啟動環(huán)境(docker-compose up -d):

image.png

接下來你可以執(zhí)行如下命令來查看正在運行中的 Docker 容器:

wudy.yu@wudyyudeMacBook-Pro operations-playground % docker-compose >ps
                 Name                                 Command               State                     >Ports                  
------------------------------------------------------------------------------------------------------->-----------------------
operations-playground_clickevent-           /docker-entrypoint.sh java ...   Up      >6123/tcp, 8081/tcp                       
generator_1                                                                                                                   
operations-playground_client_1              /docker-entrypoint.sh flin ...   Up      >6123/tcp, 8081/tcp                       
operations-playground_jobmanager_1          /docker-entrypoint.sh jobm ...   >Up      6123/tcp, 0.0.0.0:8081->8081/tcp         
operations-playground_kafka_1               start-kafka.sh                   Up      >0.0.0.0:9092->9092/tcp,                  
                                                                                   0.0.0.0:9094->9094/tcp                   
operations-playground_taskmanager_1         /docker-entrypoint.sh task ...   >Up      6123/tcp, 8081/tcp                       
operations-playground_zookeeper_1           /bin/sh -c /usr/sbin/sshd  ...   Up      >2181/tcp, 22/tcp, 2888/tcp, 3888/tcp     

從上面的信息可以看出 client 容器已成功提交了 Flink Job (Exit 0)鞠值, 同時包含數(shù)據(jù)生成器在內(nèi)的所有集群組件都處于運行中狀態(tài) (Up)。
你可以執(zhí)行如下命令停止 docker 環(huán)境:
docker-compose down -v

  • Flink WebUI 界面 #

觀察Flink集群首先想到的就是 Flink WebUI 界面:打開瀏覽器并訪問 http://localhost:8081渗钉,如果一切正常彤恶,你將會在界面上看到一個 TaskManager 和一個處于 “RUNNING” 狀態(tài)的名為 Click Event Count 的 Job

image.png

  • 日志 #

  • JobManager

JobManager 日志可以通過 docker-compose 命令進行查看:
docker-compose logs -f jobmanager
JobManager 剛啟動完成之時,你會看到很多關于 checkpoint completion (檢查點完成)的日志

  • TaskManager

TaskManager 日志也可以通過同樣的方式進行查看:
docker-compose logs -f taskmanager
TaskManager 剛啟動完成之時鳄橘,你同樣會看到很多關于 checkpoint completion (檢查點完成)的日志

Flink CLI #

Flink CLI 相關命令可以在 client 容器內(nèi)進行使用声离。 比如,想查看 Flink CLI 的 help 命令瘫怜,可以通過如下方式進行查看:
docker-compose run --no-deps client flink --help

Flink REST API #

Flink REST API 可以通過本機的 localhost:8081 進行訪問术徊,也可以在 client 容器中通過 jobmanager:8081 進行訪問。 比如鲸湃,通過如下命令可以獲取所有正在運行中的 Job:
curl localhost:8081/jobs

wudy.yu@wudyyudeMacBook-Pro ~ % curl  localhost:8081/jobs
{"jobs": [{"id":"23023e23c9b6539c18bad615e10419dd","status":"RUNNING"}]}%   

Kafka Topics #

可以運行如下命令查看 Kafka Topics 中的記錄:

//input topic (1000 records/s)
docker-compose exec kafka kafka-console-consumer.sh \
--bootstrap-server localhost:9092 --topic input
//output topic (24 records/min)
docker-compose exec kafka kafka-console-consumer.sh \
--bootstrap-server localhost:9092 --topic output

核心特性探索 #

CLI命令:
docker-compose run --no-deps client flink list

Creating operations-playground_client_run ... done
Waiting for response...
------------------ Running/Restarting Jobs -------------------
21.05.2023 13:04:01 : 23023e23c9b6539c18bad615e10419dd : Click Event >Count (RUNNING)
--------------------------------------------------------------
No scheduled jobs.

一旦 Job 提交赠涮,F(xiàn)link 會默認為其生成一個 JobID,后續(xù)對該 Job 的 所有操作(無論是通過 CLI 還是 REST API)都需要帶上 JobID

Job 失敗與恢復 #

在 Job (部分)失敗的情況下暗挑,F(xiàn)link 對事件處理依然能夠提供精確一次的保障笋除, 在本節(jié)中你將會觀察到并能夠在某種程度上驗證這種行為

Step 1: 觀察輸出 #

前文所述,事件以特定速率生成炸裆,剛好使得每個統(tǒng)計窗口都包含確切的 1000 條記錄垃它。 因此,你可以實時查看 output topic 的輸出,確定失敗恢復后所有的窗口依然輸出正確的統(tǒng)計數(shù)字嗤瞎, 以此來驗證 Flink 在 TaskManager 失敗時能夠成功恢復墙歪,而且不丟失數(shù)據(jù)、不產(chǎn)生數(shù)據(jù)重復贝奇。
為此虹菲,通過控制臺命令消費 output topic,保持消費直到 Job 從失敗中恢復 :

 --bootstrap-server localhost:9092 --topic output```

Step 2: 模擬失敗 #

為了模擬部分失敗故障掉瞳,你可以 kill 掉一個 TaskManager毕源,這種失敗行為在生產(chǎn)環(huán)境中就相當于 TaskManager 進程掛掉、TaskManager 機器宕機或者從框架或用戶代碼中拋出的一個臨時異常(例如陕习,由于外部資源暫時不可用)而導致的失敗霎褐。
docker-compose kill taskmanager
幾秒鐘后,JobManager 就會感知到 TaskManager 已失聯(lián)该镣,接下來它會 取消 Job 運行并且立即重新提交該 Job 以進行恢復冻璃。 當 Job 重啟后,所有的任務都會處于 SCHEDULED 狀態(tài)损合,如以下截圖中紫色方格所示

image.png

注意:雖然 Job 的所有任務都處于 SCHEDULED 狀態(tài)省艳,但整個 Job 的狀態(tài)卻顯示為 RUNNING
此時,由于 TaskManager 提供的 TaskSlots 資源不夠用嫁审,Job 的所有任務都不能成功轉為 RUNNING 狀態(tài)跋炕,直到有新的 TaskManager 可用。在此之前律适,該 Job 將經(jīng)歷一個取消和重新提交 不斷循環(huán)的過程辐烂。
與此同時,數(shù)據(jù)生成器 (data generator) 一直不斷地往 input topic 中生成 ClickEvent 事件捂贿,在生產(chǎn)環(huán)境中也經(jīng)常出現(xiàn)這種 Job 掛掉但源頭還在不斷產(chǎn)生數(shù)據(jù)的情況

Step 3: 失敗恢復 #

一旦 TaskManager 重啟成功纠修,它將會重新連接到 JobManager。
docker-compose up -d taskmanager
當 TaskManager 注冊成功后眷蜓,JobManager 就會將處于 SCHEDULED 狀態(tài)的所有任務調(diào)度到該 TaskManager 的可用 TaskSlots 中運行分瘾,此時所有的任務將會從失敗前最近一次成功的 checkpoint 進行恢復, 一旦恢復成功吁系,它們的狀態(tài)將轉變?yōu)?RUNNING德召。
接下來該 Job 將快速處理 Kafka input 事件的全部積壓(在 Job 中斷期間累積的數(shù)據(jù)), 并以更快的速度(>24 條記錄/分鐘)產(chǎn)生輸出汽纤,直到它追上 kafka 的 lag 延遲為止上岗。 此時觀察 output topic 輸出, 你會看到在每一個時間窗口中都有按 page 進行分組的記錄蕴坪,而且計數(shù)剛好是 1000肴掷。 由于我們使用的是 FlinkKafkaProducer “至少一次"模式敬锐,因此你可能會看到一些記錄重復輸出多次。
注意:在大部分生產(chǎn)環(huán)境中都需要一個資源管理器 (Kubernetes呆瞻、Yarn)對 失敗的 Job 進行自動重啟

Job 升級與擴容 #

升級 Flink 作業(yè)一般都需要兩步:第一台夺,使用 Savepoint 優(yōu)雅地停止 Flink Job。 Savepoint 是整個應用程序狀態(tài)的一次快照(類似于 checkpoint )痴脾,該快照是在一個明確定義的颤介、全局一致的時間點生成的。第二赞赖,從 Savepoint 恢復啟動待升級的 Flink Job滚朵。 在此,“升級”包含如下幾種含義:

  • 配置升級(比如 Job 并行度修改)
  • Job 拓撲升級(比如添加或者刪除算子)
  • Job 的用戶自定義函數(shù)升級
    在開始升級之前前域,你可能需要實時查看 Output topic 輸出辕近, 以便觀察在升級過程中沒有數(shù)據(jù)丟失或損壞
--bootstrap-server localhost:9092 --topic output

Step 1: 停止 Job #

要優(yōu)雅停止 Job,需要使用 JobID 通過 CLI 或 REST API 調(diào)用 “stop” 命令匿垄。 JobID 可以通過獲取所有運行中的 Job 接口或 Flink WebUI 界面獲取移宅,拿到 JobID 后就可以繼續(xù)停止作業(yè)了:
CLI命令:
docker-compose run --no-deps client flink stop <job-id>
預期輸出

Suspending job "<job-id>" with a savepoint.
Suspended job "<job-id>" with a savepoint.

Savepoint 已保存在 state.savepoints.dir 指定的路徑中,該配置在 flink-conf.yaml 中定義椿疗,flink-conf.yaml 掛載在本機的 /Users/wudy.yu/flink/tmp/flink-savepoints-directory/ 目錄下吞杭。 在下一步操作中我們會用到這個 Savepoint 路徑,如果我們是通過 REST API 操作的变丧, 那么 Savepoint 路徑會隨著響應結果一起返回,我們可以直接查看文件系統(tǒng)來確認 Savepoint 保存情況
ls -lia /tmp/flink-savepoints-directory

total 0
13957451 drwxr-xr-x  2 wudy.yu  staff   64  5 20 23:04 .
13957440 drwxr-xr-x  6 wudy.yu  staff  192  5 20 23:10 ..

Step 2a: 重啟 Job (不作任何變更) #

現(xiàn)在你可以從這個 Savepoint 重新啟動待升級的 Job绢掰,為了簡單起見痒蓬,不對該 Job 作任何變更就直接重啟
CLI命令:

docker-compose run --no-deps client flink run -s <savepoint-path> \
-d /opt/ClickCountJob.jar \
 --bootstrap.servers kafka:9092 --checkpointing --event-time

預期輸出

Starting execution of program
Job has been submitted with JobID <job-id>

一旦該 Job 再次處于 RUNNING 狀態(tài),你將從 output Topic 中看到數(shù)據(jù)在快速輸出滴劲, 因為剛啟動的 Job 正在處理停止期間積壓的大量數(shù)據(jù)攻晒。另外,你還會看到在升級期間 沒有產(chǎn)生任何數(shù)據(jù)丟失:所有窗口都在輸出 1000

Step 2b: 重啟 Job (修改并行度) #

在從 Savepoint 重啟 Job 之前班挖,你還可以通過修改并行度來達到擴容 Job 的目的

docker-compose run --no-deps client flink run -p 3 -s <savepoint-path> \
 -d /opt/ClickCountJob.jar \
--bootstrap.servers kafka:9092 --checkpointing --event-time

預期輸出:

Starting execution of program
Job has been submitted with JobID <job-id>

現(xiàn)在 Job 已重新提交鲁捏,但由于我們提高了并行度所以導致 TaskSlots 不夠用(1 個 TaskSlot 可用,總共需要 3 個)萧芙,最終 Job 會重啟失敗给梅。通過如下命令
docker-compose scale taskmanager=2
你可以向 Flink 集群添加第二個 TaskManager(為 Flink 集群提供 2 個 TaskSlots 資源), 它會自動向 JobManager 注冊双揪,TaskManager 注冊完成后动羽,Job 會再次處于 “RUNNING” 狀態(tài)。
一旦 Job 再次運行起來渔期,從 output Topic 的輸出中你會看到在擴容期間數(shù)據(jù)依然沒有丟失: 所有窗口的計數(shù)都正好是 1000

查詢 Job 指標 #

可以通過 JobManager 提供的 REST API 來獲取系統(tǒng)和用戶指標
具體請求方式取決于我們想查詢哪類指標运吓,Job 相關的指標分類可通過 jobs/<job-id>/metrics 獲得渴邦,而要想查詢某類指標的具體值則可以在請求地址后跟上 get 參數(shù)
curl "localhost:8081/jobs/<jod-id>/metrics?get=lastCheckpointSize"

wudy.yu@wudyyudeMacBook-Pro ~ % curl >"localhost:8081/jobs/23023e23c9b6539c18bad615e10419dd/metrics?>get=lastCheckpointSize"
[{"id":"lastCheckpointSize","value":"5411"}]%     

REST API 不僅可以用于查詢指標,還可以用于獲取正在運行中的 Job 詳細信息 curl localhost:8081/jobs/23023e23c9b6539c18bad615e10419dd

{
    "jid": "23023e23c9b6539c18bad615e10419dd",
    "name": "Click Event Count",
    "isStoppable": false,
    "state": "RUNNING",
    "start-time": 1684674241279,
    "end-time": -1,
    "duration": 3787413,
    "maxParallelism": -1,
    "now": 1684678028692,
    "timestamps": {
        "INITIALIZING": 1684674241279,
        "FINISHED": 0,
        "FAILING": 0,
        "CANCELLING": 0,
        "RECONCILING": 0,
        "RESTARTING": 1684676632478,
        "RUNNING": 1684676633487,
        "FAILED": 0,
        "CREATED": 1684674243053,
        "CANCELED": 0,
        "SUSPENDED": 0
    },
    "vertices": [{
        "id": "bc764cd8ddf7a0cff126f51c16239658",
        "name": "Source: ClickEvent Source",
        "maxParallelism": 128,
        "parallelism": 1,
        "status": "RUNNING",
        "start-time": 1684676634197,
        "end-time": -1,
        "duration": 1394495,
        "tasks": {
            "RUNNING": 1,
            "CANCELED": 0,
            "FAILED": 0,
            "DEPLOYING": 0,
            "SCHEDULED": 0,
            "CANCELING": 0,
            "INITIALIZING": 0,
            "CREATED": 0,
            "FINISHED": 0,
            "RECONCILING": 0
        },
        "metrics": {
            "read-bytes": 0,
            "read-bytes-complete": true,
            "write-bytes": 0,
            "write-bytes-complete": true,
            "read-records": 0,
            "read-records-complete": true,
            "write-records": 0,
            "write-records-complete": true,
            "accumulated-backpressured-time": 0,
            "accumulated-idle-time": 1313806,
            "accumulated-busy-time": 0.0
        }
    }, {
        "id": "0a448493b4782967b150582570326227",
        "name": "ClickEvent Counter",
        "maxParallelism": 128,
        "parallelism": 1,
        "status": "RUNNING",
        "start-time": 1684676634249,
        "end-time": -1,
        "duration": 1394443,
        "tasks": {
            "RUNNING": 1,
            "CANCELED": 0,
            "FAILED": 0,
            "DEPLOYING": 0,
            "SCHEDULED": 0,
            "CANCELING": 0,
            "INITIALIZING": 0,
            "CREATED": 0,
            "FINISHED": 0,
            "RECONCILING": 0
        },
        "metrics": {
            "read-bytes": 49708,
            "read-bytes-complete": true,
            "write-bytes": 0,
            "write-bytes-complete": true,
            "read-records": 0,
            "read-records-complete": true,
            "write-records": 0,
            "write-records-complete": true,
            "accumulated-backpressured-time": 0,
            "accumulated-idle-time": 1375570,
            "accumulated-busy-time": 0.0
        }
    }, {
        "id": "ea632d67b7d595e5b851708ae9ad79d6",
        "name": "ClickEventStatistics Sink: Writer",
        "maxParallelism": 128,
        "parallelism": 1,
        "status": "RUNNING",
        "start-time": 1684676634295,
        "end-time": -1,
        "duration": 1394397,
        "tasks": {
            "RUNNING": 1,
            "CANCELED": 0,
            "FAILED": 0,
            "DEPLOYING": 0,
            "SCHEDULED": 0,
            "CANCELING": 0,
            "INITIALIZING": 0,
            "CREATED": 0,
            "FINISHED": 0,
            "RECONCILING": 0
        },
        "metrics": {
            "read-bytes": 49708,
            "read-bytes-complete": true,
            "write-bytes": 54936,
            "write-bytes-complete": true,
            "read-records": 0,
            "read-records-complete": true,
            "write-records": 0,
            "write-records-complete": true,
            "accumulated-backpressured-time": 0,
            "accumulated-idle-time": 1327184,
            "accumulated-busy-time": 0.0
        }
    }, {
        "id": "6d2677a0ecc3fd8df0b72ec675edf8f4",
        "name": "ClickEventStatistics Sink: Committer",
        "maxParallelism": 128,
        "parallelism": 1,
        "status": "RUNNING",
        "start-time": 1684676634298,
        "end-time": -1,
        "duration": 1394394,
        "tasks": {
            "RUNNING": 1,
            "CANCELED": 0,
            "FAILED": 0,
            "DEPLOYING": 0,
            "SCHEDULED": 0,
            "CANCELING": 0,
            "INITIALIZING": 0,
            "CREATED": 0,
            "FINISHED": 0,
            "RECONCILING": 0
        },
        "metrics": {
            "read-bytes": 104644,
            "read-bytes-complete": true,
            "write-bytes": 0,
            "write-bytes-complete": true,
            "read-records": 1308,
            "read-records-complete": true,
            "write-records": 0,
            "write-records-complete": true,
            "accumulated-backpressured-time": 0,
            "accumulated-idle-time": 1380924,
            "accumulated-busy-time": 0.0
        }
    }],
    "status-counts": {
        "RUNNING": 4,
        "CANCELED": 0,
        "FAILED": 0,
        "DEPLOYING": 0,
        "SCHEDULED": 0,
        "CANCELING": 0,
        "INITIALIZING": 0,
        "CREATED": 0,
        "FINISHED": 0,
        "RECONCILING": 0
    },
    "plan": {
        "jid": "23023e23c9b6539c18bad615e10419dd",
        "name": "Click Event Count",
        "type": "STREAMING",
        "nodes": [{
            "id": "6d2677a0ecc3fd8df0b72ec675edf8f4",
            "parallelism": 1,
            "operator": "",
            "operator_strategy": "",
            "description": "ClickEventStatistics Sink: Committer<br/>",
            "inputs": [{
                "num": 0,
                "id": "ea632d67b7d595e5b851708ae9ad79d6",
                "ship_strategy": "FORWARD",
                "exchange": "pipelined_bounded"
            }],
            "optimizer_properties": {}
        }, {
            "id": "ea632d67b7d595e5b851708ae9ad79d6",
            "parallelism": 1,
            "operator": "",
            "operator_strategy": "",
            "description": "ClickEventStatistics Sink: Writer<br/>",
            "inputs": [{
                "num": 0,
                "id": "0a448493b4782967b150582570326227",
                "ship_strategy": "FORWARD",
                "exchange": "pipelined_bounded"
            }],
            "optimizer_properties": {}
        }, {
            "id": "0a448493b4782967b150582570326227",
            "parallelism": 1,
            "operator": "",
            "operator_strategy": "",
            "description": "Window(TumblingEventTimeWindows(15000), EventTimeTrigger, CountingAggregator, ClickEventStatisticsCollector)<br/>",
            "inputs": [{
                "num": 0,
                "id": "bc764cd8ddf7a0cff126f51c16239658",
                "ship_strategy": "HASH",
                "exchange": "pipelined_bounded"
            }],
            "optimizer_properties": {}
        }, {
            "id": "bc764cd8ddf7a0cff126f51c16239658",
            "parallelism": 1,
            "operator": "",
            "operator_strategy": "",
            "description": "Source: ClickEvent Source<br/>",
            "optimizer_properties": {}
        }]
    }
} %

請查閱 REST API 參考拘哨,該參考上有完整的指標查詢接口信息谋梭,包括如何查詢不同種類的指標(例如 TaskManager 指標)

延伸拓展 #

你可能已經(jīng)注意到了,Click Event Count 這個 Job 在啟動時總是會帶上 --checkpointing--event-time 兩個參數(shù)倦青, 如果我們?nèi)コ@兩個參數(shù)瓮床,那么 Job 的行為也會隨之改變。

  • --checkpointing 參數(shù)開啟了 checkpoint 配置姨夹,checkpoint 是 Flink 容錯機制的重要保證纤垂。 如果你沒有開啟 checkpoint,那么在 Job 失敗與恢復這一節(jié)中磷账,你將會看到數(shù)據(jù)丟失現(xiàn)象發(fā)生峭沦。
  • --event-time 參數(shù)開啟了 Job 的 事件時間 機制,該機制會使用 ClickEvent 自帶的時間戳進行統(tǒng)計逃糟。 如果不指定該參數(shù)吼鱼,F(xiàn)link 將結合當前機器時間使用事件處理時間進行統(tǒng)計。如此一來绰咽,每個窗口計數(shù)將不再是準確的 1000 了菇肃。
    Click Event Count 這個 Job 還有另外一個選項,該選項默認是關閉的取募,你可以在 client 容器的 docker-compose.yaml 文件中添加該選項從而觀察該 Job 在反壓下的表現(xiàn)琐谤,該選項描述如下:
  • --backpressure 將一個額外算子添加到 Job 中,該算子會在偶數(shù)分鐘內(nèi)產(chǎn)生嚴重的反壓(比如:10:12 期間玩敏,而 10:13 期間不會)斗忌。這種現(xiàn)象可以通過多種網(wǎng)絡指標觀察到,比如:outputQueueLengthoutPoolUsage 指標旺聚,通過 WebUI 上的反壓監(jiān)控也可以觀察到织阳。
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市砰粹,隨后出現(xiàn)的幾起案子唧躲,更是在濱河造成了極大的恐慌,老刑警劉巖碱璃,帶你破解...
    沈念sama閱讀 221,406評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件弄痹,死亡現(xiàn)場離奇詭異,居然都是意外死亡嵌器,警方通過查閱死者的電腦和手機界酒,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,395評論 3 398
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來嘴秸,“玉大人毁欣,你說我怎么就攤上這事庇谆。” “怎么了凭疮?”我有些...
    開封第一講書人閱讀 167,815評論 0 360
  • 文/不壞的土叔 我叫張陵饭耳,是天一觀的道長。 經(jīng)常有香客問我执解,道長寞肖,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,537評論 1 296
  • 正文 為了忘掉前任衰腌,我火速辦了婚禮新蟆,結果婚禮上,老公的妹妹穿的比我還像新娘右蕊。我一直安慰自己琼稻,他們只是感情好,可當我...
    茶點故事閱讀 68,536評論 6 397
  • 文/花漫 我一把揭開白布饶囚。 她就那樣靜靜地躺著帕翻,像睡著了一般褥符。 火紅的嫁衣襯著肌膚如雪淤堵。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,184評論 1 308
  • 那天实幕,我揣著相機與錄音规惰,去河邊找鬼睬塌。 笑死,一個胖子當著我的面吹牛歇万,可吹牛的內(nèi)容都是我干的衫仑。 我是一名探鬼主播,決...
    沈念sama閱讀 40,776評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼堕花,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了粥鞋?” 一聲冷哼從身側響起缘挽,我...
    開封第一講書人閱讀 39,668評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎呻粹,沒想到半個月后壕曼,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,212評論 1 319
  • 正文 獨居荒郊野嶺守林人離奇死亡等浊,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,299評論 3 340
  • 正文 我和宋清朗相戀三年腮郊,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片筹燕。...
    茶點故事閱讀 40,438評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡轧飞,死狀恐怖衅鹿,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情过咬,我是刑警寧澤大渤,帶...
    沈念sama閱讀 36,128評論 5 349
  • 正文 年R本政府宣布,位于F島的核電站掸绞,受9級特大地震影響泵三,放射性物質發(fā)生泄漏。R本人自食惡果不足惜衔掸,卻給世界環(huán)境...
    茶點故事閱讀 41,807評論 3 333
  • 文/蒙蒙 一烫幕、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧敞映,春花似錦较曼、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,279評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至埃疫,卻和暖如春伏恐,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背栓霜。 一陣腳步聲響...
    開封第一講書人閱讀 33,395評論 1 272
  • 我被黑心中介騙來泰國打工翠桦, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人胳蛮。 一個月前我還...
    沈念sama閱讀 48,827評論 3 376
  • 正文 我出身青樓销凑,卻偏偏與公主長得像,于是被迫代替她去往敵國和親仅炊。 傳聞我的和親對象是個殘疾皇子斗幼,可洞房花燭夜當晚...
    茶點故事閱讀 45,446評論 2 359

推薦閱讀更多精彩內(nèi)容