0x01flink執(zhí)行流程了解一下
流程如下:
由一個(gè)Source數(shù)據(jù)處理新蟆,結(jié)果分發(fā)到四個(gè)窗口進(jìn)行處理。
0x02表象:
flink需要優(yōu)化氨肌,最先表現(xiàn)出來(lái)的現(xiàn)狀就是:
窗口中使用metric體現(xiàn)出每秒的數(shù)據(jù)處理量很低粟瞬,或停止喷楣。
1.代碼中添加metric使用方法可參考:https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html
2.如果使用flink dashboard也可以使用metric?功能進(jìn)行統(tǒng)計(jì)
此處以flink dashboard為例。
0x03問(wèn)題點(diǎn)及優(yōu)化:
1.數(shù)據(jù)反壓
數(shù)據(jù)體現(xiàn)(背壓(Backpressure)機(jī)制):
-> 一個(gè)window中數(shù)據(jù)處理的速率慢
-> 導(dǎo)致Source數(shù)據(jù)處理過(guò)程越來(lái)越慢
-> 再導(dǎo)致所有窗口處理越來(lái)越慢丹弱。
dashboard體現(xiàn):
dashboard可以在背壓這里看到HIGH時(shí)氮帐,則存在數(shù)據(jù)反壓?jiǎn)栴}。
反壓邏輯:
若流程為A-B-C-D-E-F 扎阶,ABCDE出現(xiàn)反壓(即這里status為high)汹胃,則表示F處理流程導(dǎo)致E -> D-> C->B ->A 相繼慢。
優(yōu)化方式:
1.數(shù)據(jù)標(biāo)記分流[詳細(xì)代碼見(jiàn)通用優(yōu)化]
2.窗口優(yōu)化[詳細(xì)代碼見(jiàn)通用優(yōu)化]
2.數(shù)據(jù)傾斜
在多進(jìn)程環(huán)境下:
數(shù)據(jù)體現(xiàn):
-> 每個(gè)窗口中所有數(shù)據(jù)的分布不平均东臀,某個(gè)窗口處理數(shù)據(jù)量太大導(dǎo)致速率慢着饥。
-> 導(dǎo)致Source數(shù)據(jù)處理過(guò)程越來(lái)越慢
-> 再導(dǎo)致所有窗口處理越來(lái)越慢。
dashboard體現(xiàn):
dashboard中Subtasks中打開(kāi)每個(gè)窗口可以看到每個(gè)窗口進(jìn)程的運(yùn)行情況:
如上圖惰赋,數(shù)據(jù)分布很不均勻宰掉,導(dǎo)致部分窗口數(shù)據(jù)處理緩慢。
優(yōu)化方式:
1.數(shù)據(jù)標(biāo)記分流[詳細(xì)代碼見(jiàn)通用優(yōu)化]
2.窗口優(yōu)化[詳細(xì)代碼見(jiàn)通用優(yōu)化]
3.在不影響邏輯的前提下赁濒,keyby對(duì)數(shù)據(jù)分流時(shí)選擇較為均勻的數(shù)據(jù)贵扰。
3.消費(fèi)滯后
尚未出現(xiàn)數(shù)據(jù)反壓和數(shù)據(jù)傾斜的狀況,但是flink的watermarks追不上實(shí)時(shí)時(shí)間流部,不能實(shí)時(shí)處理戚绕。
需單進(jìn)程確認(rèn)點(diǎn):
1. flink讀取的數(shù)據(jù)是否產(chǎn)生的及時(shí)。
2. 窗口Aggregate處理是否存在死循環(huán)或較慢的點(diǎn)
(如:正則/redis/http等)
3. flink計(jì)算結(jié)果的輸出處理慢枝冀。
(如:使用.disablechain.addsink()后再在dashboard中查看窗口和輸出分別處理的速率)
可優(yōu)化點(diǎn):
- 將窗口的處理邏輯優(yōu)化的簡(jiǎn)單一些舞丛,將較長(zhǎng)時(shí)間的處理放在數(shù)據(jù)處理部分或windowFunction部分耘子。
4.需在窗口內(nèi)做大量的外連情況,如redis/es等球切,redis連接過(guò)多會(huì)慢或直接報(bào)錯(cuò)谷誓。[2019.11.17更新]
解決方案:
1.可以在窗口外面申請(qǐng)全局redis連接池作為全局變量。
class MyProcessWindowFunction extends RichWindowFunction[Accumulator,String,String,TimeWindow] {
@transient var config_redis = new JedisPoolConfig()
config_redis.setMaxTotal(300)
config_redis.setMaxWaitMillis (2*1000)
@transient var jedisPool = new JedisPool(config_redis,"10.10.10.10.",1234,0,"yourpassword")
@transient var client = Esinit() // 此處為es外聯(lián)的申明
@transient var log = LoggerFactory.getLogger(getClass)
//其他的一些全局變量也可以在這里定義吨凑,如log
LoginCheck_api.KeepSession()
//檢查保持狀態(tài)的函數(shù)也可以在這里處理捍歪,這樣不會(huì)每個(gè)窗口都處理一遍。
override def apply (key: String, window: TimeWindow, input: Iterable[Accumulator], out: Collector[String]): Unit = {
...
//窗口如果定義為null則重新做定義
if(jedisPool==null){
w_log = LoggerFactory.getLogger(getClass)
config_redis = new JedisPoolConfig()
config_redis.setMaxTotal(300)
config_redis.setMaxWaitMillis (2*1000)
jedisPool = new JedisPool(config_redis,"10.10.10.10.",1234,0,"yourpassword")
LoginCheck_api.KeepSession()
}
if(client==null){
client = Esinit()
}
...
2.網(wǎng)絡(luò)延時(shí)問(wèn)題[2019.12.4更新]
場(chǎng)景:flink反壓鸵钝,且排查redis無(wú)太多慢查日志
檢查提交集群對(duì)redis的延時(shí)情況糙臼,正常應(yīng)該在0.099ms以內(nèi)不會(huì)影響到程序的處理過(guò)程。
3.將對(duì)外操作放進(jìn)單獨(dú)多線程操作(如果上述兩個(gè)問(wèn)題都解決不了問(wèn)題)[2019.12.4更新]
以redis舉例:
import java.util.concurrent.{Callable, ExecutorService, Executors, FutureTask}
import redis.clients.jedis.{JedisPool, JedisPoolConfig}
object ThreadPool {
var config_redis = new JedisPoolConfig()
config_redis.setMaxTotal(500)
config_redis.setMaxIdle(500)
config_redis.setBlockWhenExhausted(false)
config_redis.setMaxWaitMillis (1000)
config_redis.setMinEvictableIdleTimeMillis(6000)
config_redis.setTimeBetweenEvictionRunsMillis(3000)
var jedisPool = new JedisPool(config_redis,"10.10.10.10",1234,0,"yourpassword")
val threadPool:ExecutorService=Executors.newFixedThreadPool(500)
def sadd(key:String,value:String):Int= {
var r = 1
try {
val future=new FutureTask[String](new Callable[String] {
override def call():String = {
var isexists = 1L //sadd返回1為添加成功恩商,0為已存在/添加不成功
var jedis = jedisPool.getResource
try{
isexists = jedis.sadd(bolt_url,id_str)
}catch{
case e=>
}finally {
jedis.close()
}
return isexists.toString
}
})
threadPool.execute(future)
r = future.get().toInt //導(dǎo)出結(jié)果
if(r==1){
...//邏輯操作
}else{
...//邏輯操作
}
}finally {
// threadPool.shutdown()
}
return r//可選擇是否返回結(jié)果
}
def main (args: Array[String]): Unit = {
var t =sadd("a","b")
println(t)
threadPool.shutdown()
}
}
而后在窗口中調(diào)用ThreadPool.sadd方法变逃,獲取到redis操作結(jié)果后的邏輯操作也可在窗口外進(jìn)行,窗口只負(fù)責(zé)調(diào)度怠堪。
5.通用優(yōu)化:
1.數(shù)據(jù)標(biāo)記分流:
使用數(shù)據(jù)標(biāo)記過(guò)濾進(jìn)入窗口的數(shù)據(jù)揽乱,
而非使用filter,map等方式去篩選數(shù)據(jù)。
split分流 select選擇分流.
val frequency_ = Features.split(
(s:Map[String,Any])=>
s.get("method").get.toString match{
case "a"|"b"|"c"|
=> List("str")
case "1"|"2"
=>List("int")
case _
=>List("normal")
}
)
val all = frequency_.select("str","int").assignTimestampsAndWatermarks(new TimestampExtractor())
all.keyby().aggregate()
...
Ps. https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/
2.窗口聚合計(jì)算
window apply窗口最后觸發(fā)時(shí)進(jìn)行一次性計(jì)算 aggregate來(lái)一條數(shù)據(jù)計(jì)算一次粟矿。
Ps.https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/
3.keyby關(guān)鍵詞無(wú)法自行選擇較均勻的情況下凰棉,
可以采用keyby(Random(20)+key)的形式進(jìn)行分配窗口。
最好的方式:
原有DataStream中添加專門用于分窗口的字段,但是可能會(huì)影響你窗口聚合的結(jié)果陌粹。
def dealing_input(str):(String,String){
val keyby_key = scala.util.Random.nextInt(20).toString+"-"+key
return (data,keyby_key)
}
input.keyby(_._2).window().xxx
如何在處理完將隨時(shí)數(shù)去掉請(qǐng)參考另一篇文章:
http://www.reibang.com/p/1bca3c2758c1
遇坑待更新