Flink 使用之配置與調(diào)優(yōu)

Flink 使用介紹相關(guān)文檔目錄

Flink 使用介紹相關(guān)文檔目錄

Flink讀取配置文件的方式

參見:Flink 獲取配置的途徑

Flink常用參數(shù)配置

注意:Flink配置項(xiàng)名稱隨版本的升級變化較大陶珠。本文的配置項(xiàng)名稱以Flink 1.19 1.20版本為準(zhǔn)。如果讀者使用其他版本的Flink,請先查詢官網(wǎng)對應(yīng)的配置項(xiàng)名稱耕腾,以免使用錯誤的配置無法生效。

端口地址

jobmanger.rpc.address: JobManager的地址棒假。Yarn模式會自動配置該選項(xiàng)麸锉。
jobmanager.rpc.port: JobManager的端口號。
jobmanager.bind-host: JobManager綁定的host扎即。在Yarn模式下如果配置為localhost會被忽略,默認(rèn)設(shè)置為0.0.0.0横缔。
taskmanager.bing-host: 同上铺遂。針對TaskManager綁定的host。
rest.port Flink rest接口和 web ui的端口號茎刚。
rest.bind-address: rest接口綁定的地址襟锐,如果要支持多網(wǎng)訪問,需要配置為0.0.0.0膛锭。
history.web.port 基于web的history server的端口號粮坞。
jobmanager.archive.fs.dir 將已完成的任務(wù)歸檔存儲的目錄。
historyserver.archive.fs.dir history server的歸檔目錄初狰。該配置必須包含jobmanager.archive.fs.dir配置的目錄莫杈,以便history server能夠讀取到已完成的任務(wù)信息。

資源配置

JobManager內(nèi)存配置:

jobmanager.memory.process.size: JobManager的進(jìn)程總內(nèi)存大小奢入。

注意筝闹,內(nèi)存的配置建議使用進(jìn)程總內(nèi)存大小的配置方式。Flink會自動調(diào)整內(nèi)部各部分內(nèi)存的大小。

jobmanager.memory.flink.size:JobManager中可供Flink使用的內(nèi)存关顷。包含head和off-heap內(nèi)存糊秆。不包括JVM metaspace和JVM overhead占用的內(nèi)存。
jobmanager.memory.heap.size:JobManager的堆內(nèi)存大小议双。
jobmanager.memory.off-heap.size:off-heap占用的內(nèi)存大小痘番。
jobmanager.memory.jvm-metaspace.size:metaspace占用的內(nèi)存大小。

TaskManager內(nèi)存配置:

taskmanager.memory.process.size:TaskManager的進(jìn)程總內(nèi)存大小平痰。
taskmanager.memory.flink.size:TaskManager的進(jìn)程總內(nèi)存大小中可供Flink使用的內(nèi)存汞舱。包含head和off-heap內(nèi)存。不包括JVM metaspace和JVM overhead占用的內(nèi)存宗雇。
taskmanager.memory.framework.off-heap.batch-shuffle.size:批模式shuffle內(nèi)存限制昂芜。
taskmanager.memory.framework.heap.size:Flink框架的heap內(nèi)存。Task slot無法使用該部分內(nèi)存逾礁。
taskmanager.memory.framework.off-heap.size:Flink框架使用的off-heap內(nèi)存大小说铃。Task slot無法使用該部分內(nèi)存。
taskmanager.memory.jvm-metaspace.size:JVM metaspace內(nèi)存大小嘹履。
taskmanager.memory.managed.size:Memory manager管理的內(nèi)存大小腻扇。可供作業(yè)排序砾嫉,哈希表幼苛,中間結(jié)果緩存和rocksdb狀態(tài)后端使用。
taskmanager.memory.network.fraction:用于網(wǎng)絡(luò)緩存部分的內(nèi)存占比焕刮。
taskmanager.memory.process.size:用于網(wǎng)絡(luò)緩存部分的內(nèi)存大小舶沿。
taskmanager.memory.task.heap.size:作業(yè)可使用的heap內(nèi)存大小。
taskmanager.memory.task.off-heap.size:作業(yè)可使用的off-heap內(nèi)存大小配并。

taskmanager.numberOfTaskSlots: slot數(shù)量括荡。在yarn模式使用的時候會受到yarn.scheduler.maximum-allocation-vcores值的影響。此處指定的slot數(shù)量如果超過yarn的maximum-allocation-vcores溉旋,flink啟動會報(bào)錯畸冲。在Yarn模式,flink啟動的task manager個數(shù)可以參照如下計(jì)算公式:

num_of_tm = ceil(parallelism / slot)

即并行度除以slot個數(shù)观腊,結(jié)果向上取整邑闲。

parallelism.default: 任務(wù)默認(rèn)并行度,如果任務(wù)未指定并行度梧油,將采用此設(shè)置苫耸。

Flink內(nèi)存模型圖

Flink內(nèi)存模型.png

參見:
配置 Flink 進(jìn)程的內(nèi)存 | Apache Flink

狀態(tài)后端

state.backend.type:狀態(tài)后端的類型±茉桑可以使用hashmap或者rocksdb褪子。
execution.checkpointing.dir:保存檢查點(diǎn)目錄量淌。需要配置為分布式文件系統(tǒng),所有的集群節(jié)點(diǎn)都能夠訪問到褐筛。例如:hdfs:///flink-checkpoints类少。
execution.checkpointing.savepoint-dir save point的目錄叙身。類似于execution.checkpointing.dir渔扎。
execution.checkpointing.num-retained 保留最近檢查點(diǎn)的數(shù)量。
execution.checkpointing.incremental: 是否開啟增量checkpoint信轿。開啟可減少每次checkpoint寫入的數(shù)據(jù)量晃痴,減少checkpoint的耗時。
execution.checkpointing.interval:checkpoint時間間隔财忽。
execution.checkpointing.min-pause:兩次相鄰checkpoint的最小間隔時間倘核。該配置項(xiàng)的目的是防止系統(tǒng)checkpoint運(yùn)行時間過長,占據(jù)太多的時間比例即彪。
execution.checkpointing.externalized-checkpoint-retention:checkpoint的保留機(jī)制紧唱。有如下三個配置值:

  • "DELETE_ON_CANCELLATION": Checkpoint state is only kept when the owning job fails. It is deleted if the job is cancelled.
  • "RETAIN_ON_CANCELLATION": Checkpoint state is kept when the owning job is cancelled or fails.
  • "NO_EXTERNALIZED_CHECKPOINTS": Externalized checkpoints are disabled.
    execution.checkpointing.max-concurrent-checkpoints:最多同時有多少個checkpoint同時進(jìn)行。
    execution.checkpointing.mode:checkpoint模式隶校。支持如下兩種模式:
  • "EXACTLY_ONCE"
  • "AT_LEAST_ONCE"

execution.checkpointing.timeout:checkpoint的超時時間漏益。
execution.checkpointing.tolerable-failed-checkpoints:容許連續(xù)失敗的checkpoint操作次數(shù)。
execution.checkpointing.unaligned:是否啟用unaligned checkpoint深胳。

心跳

heartbeat.interval:心跳時間間隔绰疤。
heartbeat.timeout:心跳超時時間。

網(wǎng)絡(luò)

taskmanager.network.netty.client.connectTimeoutSec:taskmanager的客戶端連接超時的時間舞终,默認(rèn)為120s轻庆。
taskmanager.network.netty.sendReceiveBufferSize:netty的發(fā)送和接收的緩沖區(qū)大小。
taskmanager.network.netty.client.tcp.keepCount:連接保持?jǐn)?shù)量敛劝。
taskmanager.network.netty.client.tcp.keepIdleSec:連接空間多少秒之后余爆,開始發(fā)送keepalive probe。
taskmanager.network.netty.client.tcp.keepIntervalSec:keepalive probe發(fā)送的間隔時間夸盟。

Flink HA(Job Manager)的配置

Standalone模式使用Zookeeper協(xié)助HA模式的JobManager選主蛾方。

high-availability.type: zookeeper 使用zookeeper負(fù)責(zé)HA實(shí)現(xiàn)。
zookeeper.sasl.disable: true 是否使用SASL方式連接Zookeeper满俗。默認(rèn)啟用转捕。
high-availability.zookeeper.path.root: /flink flink信息在zookeeper存儲節(jié)點(diǎn)的名稱
high-availability.zookeeper.quorum: zk1,zk2,zk3 zookeeper集群節(jié)點(diǎn)的地址和端口
high-availability.storageDir: hdfs://nameservice/flink/ha/ JobManager元數(shù)據(jù)在文件系統(tǒng)儲存的位置。

注意:需要確定集群使用的Flink版本和Zookeeper版本是否兼容唆垃。例如Flink1.15.x放棄了對Zookeeper 3.4的支持五芝。

Yarn模式建議配置Yarn的Application Master重試次數(shù)。例如修改flink-conf.yaml辕万,增加:

yarn.application-attempts: 10

注意:該配置值受到Y(jié)arn配置項(xiàng)yarn.resourcemanager.am.max-attempts的限制枢步,實(shí)際允許的重試次數(shù)為min(yarn.application-attempts, yarn.resourcemanager.am.max-attempts)

詳細(xì)參見:Flink HA 部署

Flink metrics 監(jiān)控相關(guān)配置

下面例子中為Prometheus監(jiān)控的相關(guān)配置沉删。Flink定期將監(jiān)控?cái)?shù)據(jù)往push gateway。

metrics.reporter.promgateway.factory.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterFactory
metrics.reporter.promgateway.hostUrl: http://localhost:9091
metrics.reporter.promgateway.jobName: myJob
metrics.reporter.promgateway.randomJobNameSuffix: true
metrics.reporter.promgateway.deleteOnShutdown: false
metrics.reporter.promgateway.groupingKey: k1=v1;k2=v2
metrics.reporter.promgateway.interval: 60 SECONDS

Prometheus具體部署和使用方式參見:Flink 配置Prometheus監(jiān)控

緩沖區(qū)超時配置

該配置項(xiàng)可以平衡吞吐量和延遲醉途。設(shè)置值的解釋如下:

  • -1:直到緩沖區(qū)填滿才發(fā)送矾瑰,最大化吞吐量
  • 0:一有數(shù)據(jù)就立刻發(fā)送,最小化延遲
  • 其他值:在配置時間后發(fā)送隘擎,如果在配置時間內(nèi)緩沖區(qū)填滿殴穴,則立刻發(fā)送

設(shè)置方法:

env.setBufferTimeout(xxx)

提交應(yīng)用時參數(shù)配置

注意:per-job模式或者application模式可以為某個作業(yè)單獨(dú)指定JM和TM的資源消耗。資源的消耗情況應(yīng)該以能扛住高峰時段的數(shù)據(jù)處理壓力為準(zhǔn)货葬〔苫希可提前對集群進(jìn)行壓測,記錄極限情況的資源使用量震桶。

JobManager內(nèi)存

yarn-session:-jm 2048
yarn-cluster:-yjm 2048

TaskManager內(nèi)存

yarn-session:-tm 2048
yarn-cluster:-ytm 2048

每個TaskManager 的slot個數(shù)

yarn-session:-s 8
yarn-cluster:-ys 8

通過系統(tǒng)變量方式配置

還可以在提交作業(yè)的時候使用-D參數(shù)配置休傍。支持的參數(shù)如下:

-Dyarn.application.queue=test \ 指定yarn隊(duì)列
-Djobmanager.memory.process.size=2048mb \ 指定JM的總進(jìn)程大小
-Dtaskmanager.memory.process.size=2048mb \ 指定每個TM的總進(jìn)程大小
-Dtaskmanager.numberOfTaskSlots=2 \ 指定每個TM的slot數(shù)

Kafka相關(guān)調(diào)優(yōu)配置

linger.ms/batch.size 這兩個配置項(xiàng)配合使用,可以在吞吐量和延遲中得到最佳的平衡點(diǎn)蹲姐。batch.size是kafka producer發(fā)送數(shù)據(jù)的批量大小磨取,當(dāng)數(shù)據(jù)量達(dá)到batch size的時候,會將這批數(shù)據(jù)發(fā)送出去柴墩,避免了數(shù)據(jù)一條一條的發(fā)送忙厌,頻繁建立和斷開網(wǎng)絡(luò)連接。但是如果數(shù)據(jù)量比較小拐邪,導(dǎo)致遲遲不能達(dá)到batch.size慰毅,為了保證延遲不會過大,kafka不能無限等待數(shù)據(jù)量達(dá)到batch.size的時候才發(fā)送扎阶。為了解決這個問題汹胃,引入了linger.ms配置項(xiàng)。當(dāng)數(shù)據(jù)在緩存中的時間超過linger.ms時东臀,無論緩存中數(shù)據(jù)是否達(dá)到批量大小着饥,都會被強(qiáng)制發(fā)送出去。
ack 數(shù)據(jù)源是否需要kafka得到確認(rèn)惰赋。all表示需要收到所有ISR節(jié)點(diǎn)的確認(rèn)信息宰掉,1表示只需要收到kafka leader的確認(rèn)信息,0表示不需要任何確認(rèn)信息赁濒。該配置項(xiàng)需要對數(shù)據(jù)精準(zhǔn)性和延遲吞吐量做出權(quán)衡轨奄。

Kafka topic分區(qū)數(shù)和Flink并行度的關(guān)系

Flink Kafka source的并行度需要和kafka topic的分區(qū)數(shù)一致。最大化利用kafka多分區(qū)topic的并行讀取能力拒炎。由于一個Kafka分區(qū)只能被一個消費(fèi)者消費(fèi)挪拟,因此一定要確保Flink Kafka source的并行度不要大于Kafka分區(qū)數(shù),否則有些計(jì)算資源會空閑击你。如果并行度和分區(qū)數(shù)相同配置后玉组,消費(fèi)數(shù)據(jù)的速度仍然跟不上生產(chǎn)數(shù)據(jù)的速度谎柄,需要加大Kafka的分區(qū)數(shù)。

同理惯雳,如果Sink端也是Kafka朝巫,sink的并行度盡量和Kafka分區(qū)數(shù)一致。

Yarn相關(guān)調(diào)優(yōu)配置

yarn.scheduler.maximum-allocation-vcores
yarn.scheduler.minimum-allocation-vcores

Flink單個task manager的slot數(shù)量必須介于這兩個值之間石景。

yarn.scheduler.maximum-allocation-mb
yarn.scheduler.minimum-allocation-mb

Flink的job manager 和task manager內(nèi)存不得超過container最大分配內(nèi)存大小劈猿。

yarn.nodemanager.resource.cpu-vcores yarn的虛擬CPU內(nèi)核數(shù),建議設(shè)置為物理CPU核心數(shù)的2-3倍鸵钝,如果設(shè)置過少糙臼,會導(dǎo)致CPU資源無法被充分利用,跑任務(wù)的時候CPU占用率不高恩商。

更多配置參見:Flink 使用之 Yarn 資源問題排查

數(shù)據(jù)傾斜問題

幾個解決套路:

  • 盡量使用大的并行度
  • 避免使用Map作為分區(qū)字段數(shù)據(jù)類型,同樣避免使用String
  • 避免使用windowAll等無法并發(fā)操作的算子
  • 使用合適的分區(qū)方法

分區(qū)方法有如下幾種:

  • shuffle:隨機(jī)分區(qū)
  • rebalance:輪詢
  • rescale:每個源的數(shù)據(jù)分散給下游必逆,類似于一對多怠堪,這點(diǎn)和rebalance不同
  • broadcast:廣播
  • 自定義分區(qū),一個例子如下所示:
stream.partitionCustom(new Partitioner[String] {
  override def partition(key: String, numPartitions: Int): Int = {
    key.hashCode % numPartitions
  }
}, s => s.charAt(0).toString)

這個例子取首字母作為key名眉,使用key的hashCode和分區(qū)數(shù)取余后的值作為分區(qū)編號粟矿。

并行度配置

并行度一般配置為CPU核心數(shù)的2-3倍。

并行度配置的幾個層次如下所示损拢。從上到下作用范圍依次增大陌粹,但是上面的配置可以覆蓋下面的配置。

  • 算子層次福压。算子的setParallelism方法
  • 執(zhí)行環(huán)境層次掏秩。envsetParallelism方法
  • 客戶端層次。提交任務(wù)時候的-p參數(shù)
  • 系統(tǒng)層次荆姆。flink-conf.yaml配置文件蒙幻,parallelism.default配置項(xiàng)。

JVM參數(shù)配置

以增加GC日志為例胆筒,修改"conf/flink-conf.yaml配置文件的env.java.opts.all參數(shù)邮破,增加:

-Xloggc:<LOG_DIR>/gc_log.log
-XX:+PrintGCDetails 
-XX:-OmitStackTraceInFastThrow 
-XX:+PrintGCTimeStamps 
-XX:+PrintGCDateStamps 
-XX:+UseGCLogFileRotation 
-XX:NumberOfGCLogFiles=10 
-XX:GCLogFileSize=50M

其中<LOG_DIR>變量的值是JobManager或者TaskManager的日志路徑。如果是on Yarn 模式運(yùn)行仆救,該變量指向的是container的log目錄抒和。

除了使用env.java.opts.all對Flink所有的進(jìn)程JVM參數(shù)統(tǒng)一配置外,還可以使用如下參數(shù)單獨(dú)配置Flink中的角色:

  • env.java.opts.client: 配置Flink Client
  • env.java.opts.historyserver: 配置History server
  • env.java.opts.jobmanager: 配置Job manager
  • env.java.opts.taskmanager: 配置Task manager

參考鏈接:配置參數(shù) | Apache Flink

Yarn模式TaskManager駐留時間

Flink Yarn模式的TaskManager生命周期時隨著任務(wù)的彤蔽。有時候?yàn)榱苏{(diào)試摧莽,發(fā)現(xiàn)任務(wù)停止(或者異常終止)后沒多久TaskManager就被自動銷毀。來不及去頁面觀察問題和閱讀日志等铆惑。為了解決這個問題范嘱,我們可以修改TaskManager的駐留時間送膳。可修改如下配置:

  • resourcemanager.taskmanager-timeout: 默認(rèn)為30000

參考鏈接:https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/config/#resourcemanager-taskmanager-timeout

冗余的TaskManager數(shù)量

前面章節(jié)提到TaskManager數(shù)量時根據(jù)如下公式計(jì)算的:

num_of_tm = ceil(parallelism / slot)

這個公式計(jì)算出的TaskManager是剛好夠用的丑蛤。如果任何一個TaskManager出現(xiàn)故障退出叠聋,F(xiàn)link需要重新啟動一個新的TaskManager代替。

我們可以配置冗余的TaskManager數(shù)量受裹。意思是除了上面公式計(jì)算出的最小TaskManager數(shù)量外碌补,F(xiàn)link還會額外啟動一些TaskManager。這樣當(dāng)TaskManager出現(xiàn)故障之時棉饶,不用等待新的TaskManager啟動厦章,冗余的TaskManager會立刻取代故障的TaskManager開始工作,縮短了故障恢復(fù)的等待時間照藻。配置方法如下:

  • slotmanager.redundant-taskmanager-num: 冗余的task manager數(shù)量袜啃。

Checkpoint

Checkpoint周期性進(jìn)行。如果checkpoint操作耗時比checkpoint間隔時間還長幸缕,在上一個checkpoint未完成的時候群发,即便到了下一個checkpoint觸發(fā)時間,新的checkpoint操作不會立即開始发乔。只有在前一個checkpoint完成之后下一個checkpoint才能開始熟妓。這種情況下checkpoint會連續(xù)進(jìn)行,嚴(yán)重影響系統(tǒng)性能栏尚。

為了避免這種情況起愈,可以指定checkpoint之間的最小時間間隔。方法如下:

StreamExecutionEnvironment.getCheckpointConfig().setMinPauseBetweenCheckpoints(milliseconds)

注意:可以在應(yīng)用中通過配置CheckpointConfig译仗,可以允許多個checkpoint過程同步執(zhí)行抬虽。

除此之外,checkpoint還支持更多的配置:

// 開啟Checkpoint古劲,設(shè)置間隔時間
env.enableCheckpointing(TimeUnit.MINUTES.toMillis(10));
// 配置 Checkpoint
CheckpointConfig checkpointConf = env.getCheckpointConfig();
// 啟用EXACTLY_ONCE模式斥赋,使用Unaligned Checkpoint,保證數(shù)據(jù)精準(zhǔn)一次投送产艾,但會略微增大延遲
// 啟用AT_LEAST_ONCE模式疤剑,barrier不會對齊,投送數(shù)據(jù)可能會重復(fù)闷堡,但是延遲很低
checkpointConf.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
// 最小間隔時間隘膘,上面已介紹過
checkpointConf.setMinPauseBetweenCheckpoints(TimeUnit.MINUTES.toMillis(10))
// 超時時間。如果超過這個時間checkpoint操作仍未完成杠览,checkpoint會被廢棄
checkpointConf.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(10));
// 作業(yè)取消后checkpoint仍然保留(需要人工清理)
checkpointConf.enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

使用RocksDB backend

RocksDB支持增量checkpoint弯菊,相比全量checkpoint而言耗時更短。

啟用增量checkpoint方法踱阿,在flink-conf.yaml中增加:

execution.checkpointing.incremental: true

也可以在代碼中設(shè)置:

Configuration config = new Configuration();
config.set(StateBackendOptions.STATE_BACKEND, "rocksdb");
env.configure(config);

RocksDB調(diào)優(yōu)參見: https://mp.weixin.qq.com/s/YpDi3BV8Me3Ay4hzc0nPQA

調(diào)整SSTable的block和block cache

state.backend.rocksdb.block.blocksize
state.backend.rocksdb.block.cache-size

經(jīng)過實(shí)踐這兩個參數(shù)值對checkpoint性能影響較大管钳。

使用全局參數(shù)

可以通過全局參數(shù)的方式钦铁,將參數(shù)從JobManager傳遞給各個TaskManager。

在JobManager中注冊全局參數(shù)(ParameterTool是可序列化的):

env.getConfig().setGlobalJobParameters(ParameterTool.fromArgs(args));

TaskManager中才漆,通過Rich函數(shù)中使用如下方式獲取全局參數(shù):

ParameterTool parameterTool = (ParameterTool)getRuntimeContext().getExecutionConfig().getGlobalJobParameters();

使用分布式緩存

上一節(jié)介紹了如何為各個TaskManager傳遞參數(shù)牛曹。這里的分布式緩存則用于向各個TaskManager分發(fā)文件。

注意:AsyncFunction不支持分布式緩存醇滥,直接使用會拋出異常黎比。

首先,需要在JobManager注冊需要分發(fā)的文件鸳玩。注冊的文件由JobManager發(fā)送給各個TaskManager阅虫,保存在TM運(yùn)行環(huán)境的臨時目錄中。

val env = ExecutionEnvironment.getExecutionEnvironment

env.registerCachedFile("d:\\data\\file\\a.txt","b.txt")

然后TaskManager使用的時候不跟,通過Rich函數(shù)拉取文件:

getRuntimeContext.getDistributedCache.getFile("b.txt")

反壓分析

性能瓶頸測試方法

測試反壓可以快速的定位流處理系統(tǒng)的性能瓶頸所在颓帝。先在Kafka中積攢一批數(shù)據(jù),然后在使用Flink消費(fèi)躬拢,就好比水庫泄洪躲履,很容易找到下游性能薄弱的環(huán)節(jié)。

反壓的可能原因

反壓的原因可能會有:

  • 短時間的負(fù)載高峰聊闯,超過流處理系統(tǒng)極限
  • 下游sink負(fù)載變大,數(shù)據(jù)無法及時輸出
  • GC壓力過大米诉,停頓時間太長
  • 某個算子作業(yè)過于復(fù)雜菱蔬,執(zhí)行耗時較長
  • 集群網(wǎng)絡(luò)波動,上游傳遞給下游的網(wǎng)絡(luò)通道受阻

定位方式

定位之前禁用掉OperatorChain史侣,這樣原本chain到一起的多個算子會分開拴泌,方便我們更精細(xì)的定位性能瓶頸。

  • 看頁面:查看Flink Web UI中對應(yīng)算子的Back Pressure頁面惊橱,如果各個SubTask顯示的結(jié)果為High蚪腐,說明該算子存在反壓情況。
  • 看監(jiān)控:查看算子的inPoolUsage監(jiān)控項(xiàng)税朴。如果數(shù)值過高回季,說明存在反壓。

找到反壓算子之后正林,我們可以使用Flame Graph火焰圖泡一,來分析每個方法調(diào)用的耗時,從而找到耗時較長的方法觅廓。

開啟火焰圖的方法:

flink-conf.yaml中配置鼻忠。

參數(shù) 默認(rèn)值 含義
rest.flamegraph.enabled false 是否開啟火焰圖
rest.flamegraph.cleanup-interval 10min 統(tǒng)計(jì)信息的緩存清除時間
rest.flamegraph.delay-between-samples 50 ms 構(gòu)建 FlameGraph 的單個堆棧跟蹤樣本之間的延遲
rest.flamegraph.num-samples 100 構(gòu)建flamegraph的采樣數(shù)
rest.flamegraph.refresh-interval 1 min 火焰圖刷新的時間間隔
rest.flamegraph.stack-depth 100 創(chuàng)建FlameGraphs 的堆棧跟蹤的最大深度

一些通用的方法:

  • 優(yōu)化反壓算子的業(yè)務(wù)邏輯代碼
  • 調(diào)用外部系統(tǒng)使用AsyncFunction
  • 增大TM的內(nèi)存資源
  • 增大反壓算子的并行度
  • 減少反壓上游算子的并行度

分區(qū)空閑檢測

Barrier對齊環(huán)節(jié)(一個算子有多個input)需要收集齊各個input的watermark才放行數(shù)據(jù),如果某一個input的數(shù)據(jù)量很少杈绸,導(dǎo)致該input遲遲收不到watermark帖蔓,則整個數(shù)據(jù)鏈路會被阻塞矮瘟。

為了解決這個問題,F(xiàn)link提供了分區(qū)空閑檢測功能塑娇,如果某個input在一段時間內(nèi)沒有收到數(shù)據(jù)澈侠,會被標(biāo)記為空閑。在barrier對齊環(huán)節(jié)钝吮,這個input上面的watermark會被忽略埋涧。

配置分區(qū)空閑判定時間的方法如下:

SourceFunction.assignTimestampsAndWatermarks(
                WatermarkStrategy
                        .forBoundedOutOfOrderness(Duration.ofMinutes(2))
                        .withIdleness(Duration.ofMinutes(5))
);

作業(yè)提交方式

參考鏈接:YARN | Apache Flink

Flink作業(yè)提交的方式分為application模式,per-job模式和session模式奇瘦。

per-job模式

在Yarn創(chuàng)建一個Flink集群棘催,然后在提交任務(wù)客戶端所在機(jī)器本地運(yùn)行作業(yè)jar的main方法,提交生成的JobGraph到Flink集群的JobManager耳标。如果附帶--detached參數(shù)醇坝,提交的作業(yè)被accept的時候,客戶端會停止運(yùn)行(命令行不用一直開著次坡,生產(chǎn)環(huán)境必須呼猪。開發(fā)測試時可不帶--detached參數(shù),通過命令行查看運(yùn)行日志)砸琅。

flink run -t yarn-per-job --detached /path/to/job.jar

session模式

首先啟動Flink Yarn Session宋距,它是一個常駐與Yarn的Flink集群。啟動成功后症脂,無論是否有作業(yè)執(zhí)行谚赎,或者作業(yè)是否執(zhí)行完畢,該session始終保持運(yùn)行诱篷。啟動yarn session的方法如下:

export HADOOP_CLASSPATH=`hadoop classpath`
./bin/yarn-session.sh --detached

yarn-session支持的相關(guān)參數(shù)解釋:

  • -d/--detached: Detach模式壶唤,session啟動成功后client停止運(yùn)行。不用保持控制臺一直開啟棕所。
  • -nm: Application名稱
  • -jm: Job Manager 容器的內(nèi)存
  • -tm: Task Manager 容器的內(nèi)存
  • -t: 傳送文件至集群闸盔,使用相對路徑。程序中讀取文件仍使用相對路徑
  • -qu: 指定使用的Yarn隊(duì)列

提交作業(yè)到Y(jié)arn session:

flink run -t yarn-session \
  -Dyarn.application.id=application_XXXX_YY \
  /path/to/job.jar

停止Flink Yarn session可以通過Yarn UI的kill按鈕琳省。當(dāng)然也可以通過如下方式:

echo "stop" | ./bin/yarn-session.sh -id application_XXXXX_XXX

注意:一個Flink Yarn Session可以同時跑多個Flink作業(yè)迎吵。

application模式

和per-job模式類似,提交一次任務(wù)會在Yarn運(yùn)行一個Flink集群岛啸。不同之處為作業(yè)jar包的main方法在Yarn集群的JobManager上運(yùn)行钓觉,而不是提交作業(yè)的client端運(yùn)行。作業(yè)執(zhí)行完畢后坚踩,F(xiàn)link on yarn集群會被關(guān)閉荡灾。

flink run-application -t yarn-application /path/to/job.jar

application模式的好處是Flink yarn集群可以直接從HDFS上查找并下載作業(yè)jar以及所需依賴,避免了從client機(jī)器上傳。

flink run-application -t yarn-application \
    -Dyarn.provided.lib.dirs="hdfs://myhdfs/my-remote-flink-dist-dir" \
    hdfs://myhdfs/jars/my-application.jar

其中yarn.provided.lib.dirs為Flink作業(yè)所需依賴包的地址批幌。

實(shí)際生產(chǎn)中推薦使用此模式础锐。每個作業(yè)都使用單獨(dú)的Flink集群,它們消耗的資源是互相隔離的荧缘,一個作業(yè)的崩潰不會影響到其他作業(yè)皆警。

Flink on Yarn 模式配置JDK

Flink 1.10版本以上支持使用JDK 11。從Flink 1.15開始逐漸放棄JDK 8的支持截粗。因JDK版本造成的問題不再提供修復(fù)方案信姓,建議升級JDK楣富。

Flink以standalone模式運(yùn)行的話配置JDK 11以上版本較為簡單蜂奸,為Flink配置java home可滿足要求。然而問題在于使用on Yarn模式吞鸭。Yarn穩(wěn)定起見本身在JDK 8環(huán)境運(yùn)行珊蟀。如何讓JDK 8版本的Yarn啟動JDK 11以上版本的Flink呢菊值?

我們可以使用如下兩個參數(shù):

  • containerized.master.env.: 指定on Yarn模式下Flink JobManager的環(huán)境變量。
  • containerized.taskmanager.env.: 指定on Yarn模式下Flink TaskManager的環(huán)境變量育灸。

我們可以通過上述方式腻窒,為JobManager和TaskManager指定JAVA_HOME環(huán)境變量。例如JDK 11位于/opt/jdk-11.0.22+7(需要每個Yarn NodeManager節(jié)點(diǎn)上面都安裝的有)磅崭,修改flink-conf.yaml文件儿子,增加:

containerized.master.env.JAVA_HOME: /opt/jdk-11.0.22+7
containerized.taskmanager.env.JAVA_HOME: /opt/jdk-11.0.22+7

最后重啟Yarn session,或者提交新任務(wù)砸喻,即可生效典徊。

參考文獻(xiàn):配置參數(shù) | Apache Flink

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市恩够,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌羡铲,老刑警劉巖蜂桶,帶你破解...
    沈念sama閱讀 218,755評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異也切,居然都是意外死亡扑媚,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,305評論 3 395
  • 文/潘曉璐 我一進(jìn)店門雷恃,熙熙樓的掌柜王于貴愁眉苦臉地迎上來疆股,“玉大人,你說我怎么就攤上這事倒槐⊙裕” “怎么了?”我有些...
    開封第一講書人閱讀 165,138評論 0 355
  • 文/不壞的土叔 我叫張陵,是天一觀的道長两残。 經(jīng)常有香客問我永毅,道長,這世上最難降的妖魔是什么人弓? 我笑而不...
    開封第一講書人閱讀 58,791評論 1 295
  • 正文 為了忘掉前任沼死,我火速辦了婚禮,結(jié)果婚禮上崔赌,老公的妹妹穿的比我還像新娘意蛀。我一直安慰自己,他們只是感情好健芭,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,794評論 6 392
  • 文/花漫 我一把揭開白布县钥。 她就那樣靜靜地躺著,像睡著了一般吟榴。 火紅的嫁衣襯著肌膚如雪魁蒜。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,631評論 1 305
  • 那天吩翻,我揣著相機(jī)與錄音兜看,去河邊找鬼。 笑死狭瞎,一個胖子當(dāng)著我的面吹牛细移,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播熊锭,決...
    沈念sama閱讀 40,362評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼弧轧,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了碗殷?” 一聲冷哼從身側(cè)響起精绎,我...
    開封第一講書人閱讀 39,264評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎锌妻,沒想到半個月后代乃,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,724評論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡仿粹,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,900評論 3 336
  • 正文 我和宋清朗相戀三年搁吓,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片吭历。...
    茶點(diǎn)故事閱讀 40,040評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡堕仔,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出晌区,到底是詐尸還是另有隱情摩骨,我是刑警寧澤通贞,帶...
    沈念sama閱讀 35,742評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站仿吞,受9級特大地震影響滑频,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜唤冈,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,364評論 3 330
  • 文/蒙蒙 一峡迷、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧你虹,春花似錦绘搞、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,944評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至董饰,卻和暖如春蒿褂,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背卒暂。 一陣腳步聲響...
    開封第一講書人閱讀 33,060評論 1 270
  • 我被黑心中介騙來泰國打工啄栓, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人也祠。 一個月前我還...
    沈念sama閱讀 48,247評論 3 371
  • 正文 我出身青樓昙楚,卻偏偏與公主長得像,于是被迫代替她去往敵國和親诈嘿。 傳聞我的和親對象是個殘疾皇子堪旧,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,979評論 2 355