Flink 使用介紹相關(guān)文檔目錄
Flink讀取配置文件的方式
Flink常用參數(shù)配置
注意:Flink配置項(xiàng)名稱隨版本的升級變化較大陶珠。本文的配置項(xiàng)名稱以Flink 1.19 1.20版本為準(zhǔn)。如果讀者使用其他版本的Flink,請先查詢官網(wǎng)對應(yīng)的配置項(xiàng)名稱耕腾,以免使用錯誤的配置無法生效。
端口地址
jobmanger.rpc.address: JobManager的地址棒假。Yarn模式會自動配置該選項(xiàng)麸锉。
jobmanager.rpc.port: JobManager的端口號。
jobmanager.bind-host: JobManager綁定的host扎即。在Yarn模式下如果配置為localhost會被忽略,默認(rèn)設(shè)置為0.0.0.0横缔。
taskmanager.bing-host: 同上铺遂。針對TaskManager綁定的host。
rest.port Flink rest接口和 web ui的端口號茎刚。
rest.bind-address: rest接口綁定的地址襟锐,如果要支持多網(wǎng)訪問,需要配置為0.0.0.0膛锭。
history.web.port 基于web的history server的端口號粮坞。
jobmanager.archive.fs.dir 將已完成的任務(wù)歸檔存儲的目錄。
historyserver.archive.fs.dir history server的歸檔目錄初狰。該配置必須包含jobmanager.archive.fs.dir
配置的目錄莫杈,以便history server能夠讀取到已完成的任務(wù)信息。
資源配置
JobManager內(nèi)存配置:
jobmanager.memory.process.size: JobManager的進(jìn)程總內(nèi)存大小奢入。
注意筝闹,內(nèi)存的配置建議使用進(jìn)程總內(nèi)存大小的配置方式。Flink會自動調(diào)整內(nèi)部各部分內(nèi)存的大小。
jobmanager.memory.flink.size:JobManager中可供Flink使用的內(nèi)存关顷。包含head和off-heap內(nèi)存糊秆。不包括JVM metaspace和JVM overhead占用的內(nèi)存。
jobmanager.memory.heap.size:JobManager的堆內(nèi)存大小议双。
jobmanager.memory.off-heap.size:off-heap占用的內(nèi)存大小痘番。
jobmanager.memory.jvm-metaspace.size:metaspace占用的內(nèi)存大小。
TaskManager內(nèi)存配置:
taskmanager.memory.process.size:TaskManager的進(jìn)程總內(nèi)存大小平痰。
taskmanager.memory.flink.size:TaskManager的進(jìn)程總內(nèi)存大小中可供Flink使用的內(nèi)存汞舱。包含head和off-heap內(nèi)存。不包括JVM metaspace和JVM overhead占用的內(nèi)存宗雇。
taskmanager.memory.framework.off-heap.batch-shuffle.size:批模式shuffle內(nèi)存限制昂芜。
taskmanager.memory.framework.heap.size:Flink框架的heap內(nèi)存。Task slot無法使用該部分內(nèi)存逾礁。
taskmanager.memory.framework.off-heap.size:Flink框架使用的off-heap內(nèi)存大小说铃。Task slot無法使用該部分內(nèi)存。
taskmanager.memory.jvm-metaspace.size:JVM metaspace內(nèi)存大小嘹履。
taskmanager.memory.managed.size:Memory manager管理的內(nèi)存大小腻扇。可供作業(yè)排序砾嫉,哈希表幼苛,中間結(jié)果緩存和rocksdb狀態(tài)后端使用。
taskmanager.memory.network.fraction:用于網(wǎng)絡(luò)緩存部分的內(nèi)存占比焕刮。
taskmanager.memory.process.size:用于網(wǎng)絡(luò)緩存部分的內(nèi)存大小舶沿。
taskmanager.memory.task.heap.size:作業(yè)可使用的heap內(nèi)存大小。
taskmanager.memory.task.off-heap.size:作業(yè)可使用的off-heap內(nèi)存大小配并。
taskmanager.numberOfTaskSlots: slot數(shù)量括荡。在yarn模式使用的時候會受到yarn.scheduler.maximum-allocation-vcores
值的影響。此處指定的slot數(shù)量如果超過yarn的maximum-allocation-vcores溉旋,flink啟動會報(bào)錯畸冲。在Yarn模式,flink啟動的task manager個數(shù)可以參照如下計(jì)算公式:
num_of_tm = ceil(parallelism / slot)
即并行度除以slot個數(shù)观腊,結(jié)果向上取整邑闲。
parallelism.default: 任務(wù)默認(rèn)并行度,如果任務(wù)未指定并行度梧油,將采用此設(shè)置苫耸。
Flink內(nèi)存模型圖
參見:
配置 Flink 進(jìn)程的內(nèi)存 | Apache Flink
狀態(tài)后端
state.backend.type:狀態(tài)后端的類型±茉桑可以使用hashmap
或者rocksdb
褪子。
execution.checkpointing.dir:保存檢查點(diǎn)目錄量淌。需要配置為分布式文件系統(tǒng),所有的集群節(jié)點(diǎn)都能夠訪問到褐筛。例如:hdfs:///flink-checkpoints
类少。
execution.checkpointing.savepoint-dir save point的目錄叙身。類似于execution.checkpointing.dir
渔扎。
execution.checkpointing.num-retained 保留最近檢查點(diǎn)的數(shù)量。
execution.checkpointing.incremental: 是否開啟增量checkpoint信轿。開啟可減少每次checkpoint寫入的數(shù)據(jù)量晃痴,減少checkpoint的耗時。
execution.checkpointing.interval:checkpoint時間間隔财忽。
execution.checkpointing.min-pause:兩次相鄰checkpoint的最小間隔時間倘核。該配置項(xiàng)的目的是防止系統(tǒng)checkpoint運(yùn)行時間過長,占據(jù)太多的時間比例即彪。
execution.checkpointing.externalized-checkpoint-retention:checkpoint的保留機(jī)制紧唱。有如下三個配置值:
- "DELETE_ON_CANCELLATION": Checkpoint state is only kept when the owning job fails. It is deleted if the job is cancelled.
- "RETAIN_ON_CANCELLATION": Checkpoint state is kept when the owning job is cancelled or fails.
- "NO_EXTERNALIZED_CHECKPOINTS": Externalized checkpoints are disabled.
execution.checkpointing.max-concurrent-checkpoints:最多同時有多少個checkpoint同時進(jìn)行。
execution.checkpointing.mode:checkpoint模式隶校。支持如下兩種模式: - "EXACTLY_ONCE"
- "AT_LEAST_ONCE"
execution.checkpointing.timeout:checkpoint的超時時間漏益。
execution.checkpointing.tolerable-failed-checkpoints:容許連續(xù)失敗的checkpoint操作次數(shù)。
execution.checkpointing.unaligned:是否啟用unaligned checkpoint深胳。
心跳
heartbeat.interval:心跳時間間隔绰疤。
heartbeat.timeout:心跳超時時間。
網(wǎng)絡(luò)
taskmanager.network.netty.client.connectTimeoutSec:taskmanager的客戶端連接超時的時間舞终,默認(rèn)為120s轻庆。
taskmanager.network.netty.sendReceiveBufferSize:netty的發(fā)送和接收的緩沖區(qū)大小。
taskmanager.network.netty.client.tcp.keepCount:連接保持?jǐn)?shù)量敛劝。
taskmanager.network.netty.client.tcp.keepIdleSec:連接空間多少秒之后余爆,開始發(fā)送keepalive probe。
taskmanager.network.netty.client.tcp.keepIntervalSec:keepalive probe發(fā)送的間隔時間夸盟。
Flink HA(Job Manager)的配置
Standalone模式使用Zookeeper協(xié)助HA模式的JobManager選主蛾方。
high-availability.type: zookeeper 使用zookeeper負(fù)責(zé)HA實(shí)現(xiàn)。
zookeeper.sasl.disable: true 是否使用SASL方式連接Zookeeper满俗。默認(rèn)啟用转捕。
high-availability.zookeeper.path.root: /flink flink信息在zookeeper存儲節(jié)點(diǎn)的名稱
high-availability.zookeeper.quorum: zk1,zk2,zk3 zookeeper集群節(jié)點(diǎn)的地址和端口
high-availability.storageDir: hdfs://nameservice/flink/ha/ JobManager元數(shù)據(jù)在文件系統(tǒng)儲存的位置。
注意:需要確定集群使用的Flink版本和Zookeeper版本是否兼容唆垃。例如Flink1.15.x放棄了對Zookeeper 3.4的支持五芝。
Yarn模式建議配置Yarn的Application Master重試次數(shù)。例如修改flink-conf.yaml
辕万,增加:
yarn.application-attempts: 10
注意:該配置值受到Y(jié)arn配置項(xiàng)
yarn.resourcemanager.am.max-attempts
的限制枢步,實(shí)際允許的重試次數(shù)為min(yarn.application-attempts, yarn.resourcemanager.am.max-attempts)
詳細(xì)參見:Flink HA 部署
Flink metrics 監(jiān)控相關(guān)配置
下面例子中為Prometheus監(jiān)控的相關(guān)配置沉删。Flink定期將監(jiān)控?cái)?shù)據(jù)往push gateway。
metrics.reporter.promgateway.factory.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterFactory
metrics.reporter.promgateway.hostUrl: http://localhost:9091
metrics.reporter.promgateway.jobName: myJob
metrics.reporter.promgateway.randomJobNameSuffix: true
metrics.reporter.promgateway.deleteOnShutdown: false
metrics.reporter.promgateway.groupingKey: k1=v1;k2=v2
metrics.reporter.promgateway.interval: 60 SECONDS
Prometheus具體部署和使用方式參見:Flink 配置Prometheus監(jiān)控
緩沖區(qū)超時配置
該配置項(xiàng)可以平衡吞吐量和延遲醉途。設(shè)置值的解釋如下:
- -1:直到緩沖區(qū)填滿才發(fā)送矾瑰,最大化吞吐量
- 0:一有數(shù)據(jù)就立刻發(fā)送,最小化延遲
- 其他值:在配置時間后發(fā)送隘擎,如果在配置時間內(nèi)緩沖區(qū)填滿殴穴,則立刻發(fā)送
設(shè)置方法:
env.setBufferTimeout(xxx)
提交應(yīng)用時參數(shù)配置
注意:per-job模式或者application模式可以為某個作業(yè)單獨(dú)指定JM和TM的資源消耗。資源的消耗情況應(yīng)該以能扛住高峰時段的數(shù)據(jù)處理壓力為準(zhǔn)货葬〔苫希可提前對集群進(jìn)行壓測,記錄極限情況的資源使用量震桶。
JobManager內(nèi)存
yarn-session:-jm 2048
yarn-cluster:-yjm 2048
TaskManager內(nèi)存
yarn-session:-tm 2048
yarn-cluster:-ytm 2048
每個TaskManager 的slot個數(shù)
yarn-session:-s 8
yarn-cluster:-ys 8
通過系統(tǒng)變量方式配置
還可以在提交作業(yè)的時候使用-D
參數(shù)配置休傍。支持的參數(shù)如下:
-Dyarn.application.queue=test \ 指定yarn隊(duì)列
-Djobmanager.memory.process.size=2048mb \ 指定JM的總進(jìn)程大小
-Dtaskmanager.memory.process.size=2048mb \ 指定每個TM的總進(jìn)程大小
-Dtaskmanager.numberOfTaskSlots=2 \ 指定每個TM的slot數(shù)
Kafka相關(guān)調(diào)優(yōu)配置
linger.ms/batch.size
這兩個配置項(xiàng)配合使用,可以在吞吐量和延遲中得到最佳的平衡點(diǎn)蹲姐。batch.size是kafka producer發(fā)送數(shù)據(jù)的批量大小磨取,當(dāng)數(shù)據(jù)量達(dá)到batch size的時候,會將這批數(shù)據(jù)發(fā)送出去柴墩,避免了數(shù)據(jù)一條一條的發(fā)送忙厌,頻繁建立和斷開網(wǎng)絡(luò)連接。但是如果數(shù)據(jù)量比較小拐邪,導(dǎo)致遲遲不能達(dá)到batch.size慰毅,為了保證延遲不會過大,kafka不能無限等待數(shù)據(jù)量達(dá)到batch.size的時候才發(fā)送扎阶。為了解決這個問題汹胃,引入了linger.ms
配置項(xiàng)。當(dāng)數(shù)據(jù)在緩存中的時間超過linger.ms
時东臀,無論緩存中數(shù)據(jù)是否達(dá)到批量大小着饥,都會被強(qiáng)制發(fā)送出去。
ack 數(shù)據(jù)源是否需要kafka得到確認(rèn)惰赋。all表示需要收到所有ISR節(jié)點(diǎn)的確認(rèn)信息宰掉,1表示只需要收到kafka leader的確認(rèn)信息,0表示不需要任何確認(rèn)信息赁濒。該配置項(xiàng)需要對數(shù)據(jù)精準(zhǔn)性和延遲吞吐量做出權(quán)衡轨奄。
Kafka topic分區(qū)數(shù)和Flink并行度的關(guān)系
Flink Kafka source的并行度需要和kafka topic的分區(qū)數(shù)一致。最大化利用kafka多分區(qū)topic的并行讀取能力拒炎。由于一個Kafka分區(qū)只能被一個消費(fèi)者消費(fèi)挪拟,因此一定要確保Flink Kafka source的并行度不要大于Kafka分區(qū)數(shù),否則有些計(jì)算資源會空閑击你。如果并行度和分區(qū)數(shù)相同配置后玉组,消費(fèi)數(shù)據(jù)的速度仍然跟不上生產(chǎn)數(shù)據(jù)的速度谎柄,需要加大Kafka的分區(qū)數(shù)。
同理惯雳,如果Sink端也是Kafka朝巫,sink的并行度盡量和Kafka分區(qū)數(shù)一致。
Yarn相關(guān)調(diào)優(yōu)配置
yarn.scheduler.maximum-allocation-vcores
yarn.scheduler.minimum-allocation-vcores
Flink單個task manager的slot數(shù)量必須介于這兩個值之間石景。
yarn.scheduler.maximum-allocation-mb
yarn.scheduler.minimum-allocation-mb
Flink的job manager 和task manager內(nèi)存不得超過container最大分配內(nèi)存大小劈猿。
yarn.nodemanager.resource.cpu-vcores yarn的虛擬CPU內(nèi)核數(shù),建議設(shè)置為物理CPU核心數(shù)的2-3倍鸵钝,如果設(shè)置過少糙臼,會導(dǎo)致CPU資源無法被充分利用,跑任務(wù)的時候CPU占用率不高恩商。
更多配置參見:Flink 使用之 Yarn 資源問題排查
數(shù)據(jù)傾斜問題
幾個解決套路:
- 盡量使用大的并行度
- 避免使用Map作為分區(qū)字段數(shù)據(jù)類型,同樣避免使用String
- 避免使用windowAll等無法并發(fā)操作的算子
- 使用合適的分區(qū)方法
分區(qū)方法有如下幾種:
- shuffle:隨機(jī)分區(qū)
- rebalance:輪詢
- rescale:每個源的數(shù)據(jù)分散給下游必逆,類似于一對多怠堪,這點(diǎn)和rebalance不同
- broadcast:廣播
- 自定義分區(qū),一個例子如下所示:
stream.partitionCustom(new Partitioner[String] {
override def partition(key: String, numPartitions: Int): Int = {
key.hashCode % numPartitions
}
}, s => s.charAt(0).toString)
這個例子取首字母作為key名眉,使用key的hashCode和分區(qū)數(shù)取余后的值作為分區(qū)編號粟矿。
并行度配置
并行度一般配置為CPU核心數(shù)的2-3倍。
并行度配置的幾個層次如下所示损拢。從上到下作用范圍依次增大陌粹,但是上面的配置可以覆蓋下面的配置。
- 算子層次福压。算子的
setParallelism
方法 - 執(zhí)行環(huán)境層次掏秩。
env
的setParallelism
方法 - 客戶端層次。提交任務(wù)時候的
-p
參數(shù) - 系統(tǒng)層次荆姆。
flink-conf.yaml
配置文件蒙幻,parallelism.default
配置項(xiàng)。
JVM參數(shù)配置
以增加GC日志為例胆筒,修改"conf/flink-conf.yaml
配置文件的env.java.opts.all
參數(shù)邮破,增加:
-Xloggc:<LOG_DIR>/gc_log.log
-XX:+PrintGCDetails
-XX:-OmitStackTraceInFastThrow
-XX:+PrintGCTimeStamps
-XX:+PrintGCDateStamps
-XX:+UseGCLogFileRotation
-XX:NumberOfGCLogFiles=10
-XX:GCLogFileSize=50M
其中
<LOG_DIR>
變量的值是JobManager或者TaskManager的日志路徑。如果是on Yarn 模式運(yùn)行仆救,該變量指向的是container的log目錄抒和。
除了使用env.java.opts.all
對Flink所有的進(jìn)程JVM參數(shù)統(tǒng)一配置外,還可以使用如下參數(shù)單獨(dú)配置Flink中的角色:
- env.java.opts.client: 配置Flink Client
- env.java.opts.historyserver: 配置History server
- env.java.opts.jobmanager: 配置Job manager
- env.java.opts.taskmanager: 配置Task manager
Yarn模式TaskManager駐留時間
Flink Yarn模式的TaskManager生命周期時隨著任務(wù)的彤蔽。有時候?yàn)榱苏{(diào)試摧莽,發(fā)現(xiàn)任務(wù)停止(或者異常終止)后沒多久TaskManager就被自動銷毀。來不及去頁面觀察問題和閱讀日志等铆惑。為了解決這個問題范嘱,我們可以修改TaskManager的駐留時間送膳。可修改如下配置:
- resourcemanager.taskmanager-timeout: 默認(rèn)為30000
冗余的TaskManager數(shù)量
前面章節(jié)提到TaskManager數(shù)量時根據(jù)如下公式計(jì)算的:
num_of_tm = ceil(parallelism / slot)
這個公式計(jì)算出的TaskManager是剛好夠用的丑蛤。如果任何一個TaskManager出現(xiàn)故障退出叠聋,F(xiàn)link需要重新啟動一個新的TaskManager代替。
我們可以配置冗余的TaskManager數(shù)量受裹。意思是除了上面公式計(jì)算出的最小TaskManager數(shù)量外碌补,F(xiàn)link還會額外啟動一些TaskManager。這樣當(dāng)TaskManager出現(xiàn)故障之時棉饶,不用等待新的TaskManager啟動厦章,冗余的TaskManager會立刻取代故障的TaskManager開始工作,縮短了故障恢復(fù)的等待時間照藻。配置方法如下:
- slotmanager.redundant-taskmanager-num: 冗余的task manager數(shù)量袜啃。
Checkpoint
Checkpoint周期性進(jìn)行。如果checkpoint操作耗時比checkpoint間隔時間還長幸缕,在上一個checkpoint未完成的時候群发,即便到了下一個checkpoint觸發(fā)時間,新的checkpoint操作不會立即開始发乔。只有在前一個checkpoint完成之后下一個checkpoint才能開始熟妓。這種情況下checkpoint會連續(xù)進(jìn)行,嚴(yán)重影響系統(tǒng)性能栏尚。
為了避免這種情況起愈,可以指定checkpoint之間的最小時間間隔。方法如下:
StreamExecutionEnvironment.getCheckpointConfig().setMinPauseBetweenCheckpoints(milliseconds)
注意:可以在應(yīng)用中通過配置CheckpointConfig
译仗,可以允許多個checkpoint過程同步執(zhí)行抬虽。
除此之外,checkpoint還支持更多的配置:
// 開啟Checkpoint古劲,設(shè)置間隔時間
env.enableCheckpointing(TimeUnit.MINUTES.toMillis(10));
// 配置 Checkpoint
CheckpointConfig checkpointConf = env.getCheckpointConfig();
// 啟用EXACTLY_ONCE模式斥赋,使用Unaligned Checkpoint,保證數(shù)據(jù)精準(zhǔn)一次投送产艾,但會略微增大延遲
// 啟用AT_LEAST_ONCE模式疤剑,barrier不會對齊,投送數(shù)據(jù)可能會重復(fù)闷堡,但是延遲很低
checkpointConf.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
// 最小間隔時間隘膘,上面已介紹過
checkpointConf.setMinPauseBetweenCheckpoints(TimeUnit.MINUTES.toMillis(10))
// 超時時間。如果超過這個時間checkpoint操作仍未完成杠览,checkpoint會被廢棄
checkpointConf.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(10));
// 作業(yè)取消后checkpoint仍然保留(需要人工清理)
checkpointConf.enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
使用RocksDB backend
RocksDB支持增量checkpoint弯菊,相比全量checkpoint而言耗時更短。
啟用增量checkpoint方法踱阿,在flink-conf.yaml中增加:
execution.checkpointing.incremental: true
也可以在代碼中設(shè)置:
Configuration config = new Configuration();
config.set(StateBackendOptions.STATE_BACKEND, "rocksdb");
env.configure(config);
RocksDB調(diào)優(yōu)參見: https://mp.weixin.qq.com/s/YpDi3BV8Me3Ay4hzc0nPQA
調(diào)整SSTable的block和block cache
state.backend.rocksdb.block.blocksize
state.backend.rocksdb.block.cache-size
經(jīng)過實(shí)踐這兩個參數(shù)值對checkpoint性能影響較大管钳。
使用全局參數(shù)
可以通過全局參數(shù)的方式钦铁,將參數(shù)從JobManager傳遞給各個TaskManager。
在JobManager中注冊全局參數(shù)(ParameterTool是可序列化的):
env.getConfig().setGlobalJobParameters(ParameterTool.fromArgs(args));
TaskManager中才漆,通過Rich函數(shù)中使用如下方式獲取全局參數(shù):
ParameterTool parameterTool = (ParameterTool)getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
使用分布式緩存
上一節(jié)介紹了如何為各個TaskManager傳遞參數(shù)牛曹。這里的分布式緩存則用于向各個TaskManager分發(fā)文件。
注意:
AsyncFunction
不支持分布式緩存醇滥,直接使用會拋出異常黎比。
首先,需要在JobManager注冊需要分發(fā)的文件鸳玩。注冊的文件由JobManager發(fā)送給各個TaskManager阅虫,保存在TM運(yùn)行環(huán)境的臨時目錄中。
val env = ExecutionEnvironment.getExecutionEnvironment
env.registerCachedFile("d:\\data\\file\\a.txt","b.txt")
然后TaskManager使用的時候不跟,通過Rich函數(shù)拉取文件:
getRuntimeContext.getDistributedCache.getFile("b.txt")
反壓分析
性能瓶頸測試方法
測試反壓可以快速的定位流處理系統(tǒng)的性能瓶頸所在颓帝。先在Kafka中積攢一批數(shù)據(jù),然后在使用Flink消費(fèi)躬拢,就好比水庫泄洪躲履,很容易找到下游性能薄弱的環(huán)節(jié)。
反壓的可能原因
反壓的原因可能會有:
- 短時間的負(fù)載高峰聊闯,超過流處理系統(tǒng)極限
- 下游sink負(fù)載變大,數(shù)據(jù)無法及時輸出
- GC壓力過大米诉,停頓時間太長
- 某個算子作業(yè)過于復(fù)雜菱蔬,執(zhí)行耗時較長
- 集群網(wǎng)絡(luò)波動,上游傳遞給下游的網(wǎng)絡(luò)通道受阻
定位方式
定位之前禁用掉OperatorChain史侣,這樣原本chain到一起的多個算子會分開拴泌,方便我們更精細(xì)的定位性能瓶頸。
- 看頁面:查看Flink Web UI中對應(yīng)算子的Back Pressure頁面惊橱,如果各個SubTask顯示的結(jié)果為High蚪腐,說明該算子存在反壓情況。
- 看監(jiān)控:查看算子的inPoolUsage監(jiān)控項(xiàng)税朴。如果數(shù)值過高回季,說明存在反壓。
找到反壓算子之后正林,我們可以使用Flame Graph火焰圖泡一,來分析每個方法調(diào)用的耗時,從而找到耗時較長的方法觅廓。
開啟火焰圖的方法:
在flink-conf.yaml
中配置鼻忠。
參數(shù) | 默認(rèn)值 | 含義 |
---|---|---|
rest.flamegraph.enabled | false | 是否開啟火焰圖 |
rest.flamegraph.cleanup-interval | 10min | 統(tǒng)計(jì)信息的緩存清除時間 |
rest.flamegraph.delay-between-samples | 50 ms | 構(gòu)建 FlameGraph 的單個堆棧跟蹤樣本之間的延遲 |
rest.flamegraph.num-samples | 100 | 構(gòu)建flamegraph的采樣數(shù) |
rest.flamegraph.refresh-interval | 1 min | 火焰圖刷新的時間間隔 |
rest.flamegraph.stack-depth | 100 | 創(chuàng)建FlameGraphs 的堆棧跟蹤的最大深度 |
一些通用的方法:
- 優(yōu)化反壓算子的業(yè)務(wù)邏輯代碼
- 調(diào)用外部系統(tǒng)使用AsyncFunction
- 增大TM的內(nèi)存資源
- 增大反壓算子的并行度
- 減少反壓上游算子的并行度
分區(qū)空閑檢測
Barrier對齊環(huán)節(jié)(一個算子有多個input)需要收集齊各個input的watermark才放行數(shù)據(jù),如果某一個input的數(shù)據(jù)量很少杈绸,導(dǎo)致該input遲遲收不到watermark帖蔓,則整個數(shù)據(jù)鏈路會被阻塞矮瘟。
為了解決這個問題,F(xiàn)link提供了分區(qū)空閑檢測功能塑娇,如果某個input在一段時間內(nèi)沒有收到數(shù)據(jù)澈侠,會被標(biāo)記為空閑。在barrier對齊環(huán)節(jié)钝吮,這個input上面的watermark會被忽略埋涧。
配置分區(qū)空閑判定時間的方法如下:
SourceFunction.assignTimestampsAndWatermarks(
WatermarkStrategy
.forBoundedOutOfOrderness(Duration.ofMinutes(2))
.withIdleness(Duration.ofMinutes(5))
);
作業(yè)提交方式
參考鏈接:YARN | Apache Flink
Flink作業(yè)提交的方式分為application模式,per-job模式和session模式奇瘦。
per-job模式
在Yarn創(chuàng)建一個Flink集群棘催,然后在提交任務(wù)客戶端所在機(jī)器本地運(yùn)行作業(yè)jar的main方法,提交生成的JobGraph到Flink集群的JobManager耳标。如果附帶--detached
參數(shù)醇坝,提交的作業(yè)被accept的時候,客戶端會停止運(yùn)行(命令行不用一直開著次坡,生產(chǎn)環(huán)境必須呼猪。開發(fā)測試時可不帶--detached
參數(shù),通過命令行查看運(yùn)行日志)砸琅。
flink run -t yarn-per-job --detached /path/to/job.jar
session模式
首先啟動Flink Yarn Session宋距,它是一個常駐與Yarn的Flink集群。啟動成功后症脂,無論是否有作業(yè)執(zhí)行谚赎,或者作業(yè)是否執(zhí)行完畢,該session始終保持運(yùn)行诱篷。啟動yarn session的方法如下:
export HADOOP_CLASSPATH=`hadoop classpath`
./bin/yarn-session.sh --detached
yarn-session支持的相關(guān)參數(shù)解釋:
- -d/--detached: Detach模式壶唤,session啟動成功后client停止運(yùn)行。不用保持控制臺一直開啟棕所。
- -nm: Application名稱
- -jm: Job Manager 容器的內(nèi)存
- -tm: Task Manager 容器的內(nèi)存
- -t: 傳送文件至集群闸盔,使用相對路徑。程序中讀取文件仍使用相對路徑
- -qu: 指定使用的Yarn隊(duì)列
提交作業(yè)到Y(jié)arn session:
flink run -t yarn-session \
-Dyarn.application.id=application_XXXX_YY \
/path/to/job.jar
停止Flink Yarn session可以通過Yarn UI的kill按鈕琳省。當(dāng)然也可以通過如下方式:
echo "stop" | ./bin/yarn-session.sh -id application_XXXXX_XXX
注意:一個Flink Yarn Session可以同時跑多個Flink作業(yè)迎吵。
application模式
和per-job模式類似,提交一次任務(wù)會在Yarn運(yùn)行一個Flink集群岛啸。不同之處為作業(yè)jar包的main方法在Yarn集群的JobManager上運(yùn)行钓觉,而不是提交作業(yè)的client端運(yùn)行。作業(yè)執(zhí)行完畢后坚踩,F(xiàn)link on yarn集群會被關(guān)閉荡灾。
flink run-application -t yarn-application /path/to/job.jar
application模式的好處是Flink yarn集群可以直接從HDFS上查找并下載作業(yè)jar以及所需依賴,避免了從client機(jī)器上傳。
flink run-application -t yarn-application \
-Dyarn.provided.lib.dirs="hdfs://myhdfs/my-remote-flink-dist-dir" \
hdfs://myhdfs/jars/my-application.jar
其中yarn.provided.lib.dirs
為Flink作業(yè)所需依賴包的地址批幌。
實(shí)際生產(chǎn)中推薦使用此模式础锐。每個作業(yè)都使用單獨(dú)的Flink集群,它們消耗的資源是互相隔離的荧缘,一個作業(yè)的崩潰不會影響到其他作業(yè)皆警。
Flink on Yarn 模式配置JDK
Flink 1.10版本以上支持使用JDK 11。從Flink 1.15開始逐漸放棄JDK 8的支持截粗。因JDK版本造成的問題不再提供修復(fù)方案信姓,建議升級JDK楣富。
Flink以standalone模式運(yùn)行的話配置JDK 11以上版本較為簡單蜂奸,為Flink配置java home可滿足要求。然而問題在于使用on Yarn模式吞鸭。Yarn穩(wěn)定起見本身在JDK 8環(huán)境運(yùn)行珊蟀。如何讓JDK 8版本的Yarn啟動JDK 11以上版本的Flink呢菊值?
我們可以使用如下兩個參數(shù):
- containerized.master.env.: 指定on Yarn模式下Flink JobManager的環(huán)境變量。
- containerized.taskmanager.env.: 指定on Yarn模式下Flink TaskManager的環(huán)境變量育灸。
我們可以通過上述方式腻窒,為JobManager和TaskManager指定JAVA_HOME
環(huán)境變量。例如JDK 11位于/opt/jdk-11.0.22+7
(需要每個Yarn NodeManager節(jié)點(diǎn)上面都安裝的有)磅崭,修改flink-conf.yaml
文件儿子,增加:
containerized.master.env.JAVA_HOME: /opt/jdk-11.0.22+7
containerized.taskmanager.env.JAVA_HOME: /opt/jdk-11.0.22+7
最后重啟Yarn session,或者提交新任務(wù)砸喻,即可生效典徊。
參考文獻(xiàn):配置參數(shù) | Apache Flink