數(shù)據(jù)分析實(shí)踐 | flink | 流程優(yōu)化篇

0x01flink執(zhí)行流程了解一下

流程如下:

flink執(zhí)行流程
由一個(gè)Source數(shù)據(jù)處理新蟆,結(jié)果分發(fā)到四個(gè)窗口進(jìn)行處理。

0x02表象:

flink需要優(yōu)化氨肌,最先表現(xiàn)出來(lái)的現(xiàn)狀就是:
窗口中使用metric體現(xiàn)出每秒的數(shù)據(jù)處理量很低粟瞬,或停止喷楣。

1.代碼中添加metric使用方法可參考:https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html
2.如果使用flink dashboard也可以使用metric?功能進(jìn)行統(tǒng)計(jì)

此處以flink dashboard為例。

0x03問(wèn)題點(diǎn)及優(yōu)化:

1.數(shù)據(jù)反壓

數(shù)據(jù)體現(xiàn)(背壓(Backpressure)機(jī)制):
-> 一個(gè)window中數(shù)據(jù)處理的速率慢
-> 導(dǎo)致Source數(shù)據(jù)處理過(guò)程越來(lái)越慢
-> 再導(dǎo)致所有窗口處理越來(lái)越慢丹弱。
dashboard體現(xiàn):

dashboard可以在背壓這里看到HIGH時(shí)氮帐,則存在數(shù)據(jù)反壓?jiǎn)栴}。


flink數(shù)據(jù)反壓

反壓邏輯:
若流程為A-B-C-D-E-F 扎阶,ABCDE出現(xiàn)反壓(即這里status為high)汹胃,則表示F處理流程導(dǎo)致E -> D-> C->B ->A 相繼慢。

優(yōu)化方式:

1.數(shù)據(jù)標(biāo)記分流[詳細(xì)代碼見(jiàn)通用優(yōu)化]
2.窗口優(yōu)化[詳細(xì)代碼見(jiàn)通用優(yōu)化]

2.數(shù)據(jù)傾斜

在多進(jìn)程環(huán)境下:

數(shù)據(jù)體現(xiàn):
-> 每個(gè)窗口中所有數(shù)據(jù)的分布不平均东臀,某個(gè)窗口處理數(shù)據(jù)量太大導(dǎo)致速率慢着饥。
-> 導(dǎo)致Source數(shù)據(jù)處理過(guò)程越來(lái)越慢
-> 再導(dǎo)致所有窗口處理越來(lái)越慢。
dashboard體現(xiàn):
dashboard中Subtasks中打開(kāi)每個(gè)窗口可以看到每個(gè)窗口進(jìn)程的運(yùn)行情況:
flink數(shù)據(jù)傾斜

如上圖惰赋,數(shù)據(jù)分布很不均勻宰掉,導(dǎo)致部分窗口數(shù)據(jù)處理緩慢。

優(yōu)化方式:

1.數(shù)據(jù)標(biāo)記分流[詳細(xì)代碼見(jiàn)通用優(yōu)化]
2.窗口優(yōu)化[詳細(xì)代碼見(jiàn)通用優(yōu)化]
3.在不影響邏輯的前提下赁濒,keyby對(duì)數(shù)據(jù)分流時(shí)選擇較為均勻的數(shù)據(jù)贵扰。

3.消費(fèi)滯后

尚未出現(xiàn)數(shù)據(jù)反壓和數(shù)據(jù)傾斜的狀況,但是flink的watermarks追不上實(shí)時(shí)時(shí)間流部,不能實(shí)時(shí)處理戚绕。

需單進(jìn)程確認(rèn)點(diǎn):

1. flink讀取的數(shù)據(jù)是否產(chǎn)生的及時(shí)。
2. 窗口Aggregate處理是否存在死循環(huán)或較慢的點(diǎn)
    (如:正則/redis/http等)
3. flink計(jì)算結(jié)果的輸出處理慢枝冀。
    (如:使用.disablechain.addsink()后再在dashboard中查看窗口和輸出分別處理的速率)

可優(yōu)化點(diǎn):

  1. 將窗口的處理邏輯優(yōu)化的簡(jiǎn)單一些舞丛,將較長(zhǎng)時(shí)間的處理放在數(shù)據(jù)處理部分或windowFunction部分耘子。

4.需在窗口內(nèi)做大量的外連情況,如redis/es等球切,redis連接過(guò)多會(huì)慢或直接報(bào)錯(cuò)谷誓。[2019.11.17更新]

解決方案:

1.可以在窗口外面申請(qǐng)全局redis連接池作為全局變量。

class MyProcessWindowFunction extends RichWindowFunction[Accumulator,String,String,TimeWindow] {
  @transient var config_redis = new JedisPoolConfig()
  config_redis.setMaxTotal(300)
  config_redis.setMaxWaitMillis (2*1000)

  @transient var jedisPool = new JedisPool(config_redis,"10.10.10.10.",1234,0,"yourpassword")
  @transient var client = Esinit() // 此處為es外聯(lián)的申明
  @transient var log = LoggerFactory.getLogger(getClass)
  //其他的一些全局變量也可以在這里定義吨凑,如log
  LoginCheck_api.KeepSession() 
  //檢查保持狀態(tài)的函數(shù)也可以在這里處理捍歪,這樣不會(huì)每個(gè)窗口都處理一遍。

  override def apply (key: String, window: TimeWindow, input: Iterable[Accumulator], out: Collector[String]): Unit = {
    ... 
    //窗口如果定義為null則重新做定義
    if(jedisPool==null){
      w_log = LoggerFactory.getLogger(getClass)
    
      config_redis = new JedisPoolConfig()
      config_redis.setMaxTotal(300)
      config_redis.setMaxWaitMillis (2*1000)
      jedisPool = new JedisPool(config_redis,"10.10.10.10.",1234,0,"yourpassword")
      LoginCheck_api.KeepSession()
    }
    if(client==null){
      client = Esinit()
    }
    ...

2.網(wǎng)絡(luò)延時(shí)問(wèn)題[2019.12.4更新]
場(chǎng)景:flink反壓鸵钝,且排查redis無(wú)太多慢查日志
檢查提交集群對(duì)redis的延時(shí)情況糙臼,正常應(yīng)該在0.099ms以內(nèi)不會(huì)影響到程序的處理過(guò)程。

3.將對(duì)外操作放進(jìn)單獨(dú)多線程操作(如果上述兩個(gè)問(wèn)題都解決不了問(wèn)題)[2019.12.4更新]
以redis舉例:

import java.util.concurrent.{Callable, ExecutorService, Executors, FutureTask}

import redis.clients.jedis.{JedisPool, JedisPoolConfig}

object ThreadPool {
  var config_redis = new JedisPoolConfig()
  config_redis.setMaxTotal(500)
  config_redis.setMaxIdle(500)
  config_redis.setBlockWhenExhausted(false)
  config_redis.setMaxWaitMillis (1000)
  config_redis.setMinEvictableIdleTimeMillis(6000)
  config_redis.setTimeBetweenEvictionRunsMillis(3000)
  var jedisPool = new JedisPool(config_redis,"10.10.10.10",1234,0,"yourpassword")
  val threadPool:ExecutorService=Executors.newFixedThreadPool(500)
  def sadd(key:String,value:String):Int= {
    var r = 1
    try {
      val future=new FutureTask[String](new Callable[String] {
        override def call():String = {
          var isexists = 1L //sadd返回1為添加成功恩商,0為已存在/添加不成功
          var jedis = jedisPool.getResource
          try{
            isexists = jedis.sadd(bolt_url,id_str)
          }catch{
            case e=>
          }finally {
            jedis.close()
          }
          return isexists.toString
        }
      })
      threadPool.execute(future)
      r = future.get().toInt //導(dǎo)出結(jié)果
      if(r==1){
      ...//邏輯操作
      }else{
      ...//邏輯操作
      }
    }finally {
//      threadPool.shutdown()
    }
    return r//可選擇是否返回結(jié)果
  }

  def main (args: Array[String]): Unit = {
    var t =sadd("a","b")
    println(t) 
    threadPool.shutdown()
  }


}

而后在窗口中調(diào)用ThreadPool.sadd方法变逃,獲取到redis操作結(jié)果后的邏輯操作也可在窗口外進(jìn)行,窗口只負(fù)責(zé)調(diào)度怠堪。

5.通用優(yōu)化:

1.數(shù)據(jù)標(biāo)記分流:

使用數(shù)據(jù)標(biāo)記過(guò)濾進(jìn)入窗口的數(shù)據(jù)揽乱,
而非使用filter,map等方式去篩選數(shù)據(jù)。
split分流 select選擇分流. 

val frequency_ = Features.split(
        (s:Map[String,Any])=>
          s.get("method").get.toString  match{
          
            case "a"|"b"|"c"|
                  => List("str")
            case "1"|"2"
                  =>List("int")
            case _
                  =>List("normal")
          }

      )

val all = frequency_.select("str","int").assignTimestampsAndWatermarks(new TimestampExtractor())

all.keyby().aggregate()
      
      ...
      
     
    
Ps. https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/
  
2.窗口聚合計(jì)算

window apply窗口最后觸發(fā)時(shí)進(jìn)行一次性計(jì)算 aggregate來(lái)一條數(shù)據(jù)計(jì)算一次粟矿。

Ps.https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/
3.keyby關(guān)鍵詞無(wú)法自行選擇較均勻的情況下凰棉,
可以采用keyby(Random(20)+key)的形式進(jìn)行分配窗口。

最好的方式:
原有DataStream中添加專門用于分窗口的字段,但是可能會(huì)影響你窗口聚合的結(jié)果陌粹。

def dealing_input(str):(String,String){
    val keyby_key = scala.util.Random.nextInt(20).toString+"-"+key
    return (data,keyby_key)
}

input.keyby(_._2).window().xxx
如何在處理完將隨時(shí)數(shù)去掉請(qǐng)參考另一篇文章:
http://www.reibang.com/p/1bca3c2758c1


遇坑待更新

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末撒犀,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子申屹,更是在濱河造成了極大的恐慌绘证,老刑警劉巖隧膏,帶你破解...
    沈念sama閱讀 217,734評(píng)論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件哗讥,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡胞枕,警方通過(guò)查閱死者的電腦和手機(jī)杆煞,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,931評(píng)論 3 394
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)腐泻,“玉大人决乎,你說(shuō)我怎么就攤上這事∨勺” “怎么了构诚?”我有些...
    開(kāi)封第一講書人閱讀 164,133評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)铆惑。 經(jīng)常有香客問(wèn)我范嘱,道長(zhǎng)送膳,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書人閱讀 58,532評(píng)論 1 293
  • 正文 為了忘掉前任丑蛤,我火速辦了婚禮叠聋,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘受裹。我一直安慰自己碌补,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,585評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布棉饶。 她就那樣靜靜地躺著厦章,像睡著了一般。 火紅的嫁衣襯著肌膚如雪砰盐。 梳的紋絲不亂的頭發(fā)上闷袒,一...
    開(kāi)封第一講書人閱讀 51,462評(píng)論 1 302
  • 那天,我揣著相機(jī)與錄音岩梳,去河邊找鬼囊骤。 笑死,一個(gè)胖子當(dāng)著我的面吹牛冀值,可吹牛的內(nèi)容都是我干的也物。 我是一名探鬼主播,決...
    沈念sama閱讀 40,262評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼列疗,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼滑蚯!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起抵栈,我...
    開(kāi)封第一講書人閱讀 39,153評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤告材,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后古劲,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體斥赋,經(jīng)...
    沈念sama閱讀 45,587評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,792評(píng)論 3 336
  • 正文 我和宋清朗相戀三年产艾,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了疤剑。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,919評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡闷堡,死狀恐怖隘膘,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情杠览,我是刑警寧澤弯菊,帶...
    沈念sama閱讀 35,635評(píng)論 5 345
  • 正文 年R本政府宣布,位于F島的核電站踱阿,受9級(jí)特大地震影響管钳,放射性物質(zhì)發(fā)生泄漏吨悍。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,237評(píng)論 3 329
  • 文/蒙蒙 一蹋嵌、第九天 我趴在偏房一處隱蔽的房頂上張望育瓜。 院中可真熱鬧,春花似錦栽烂、人聲如沸躏仇。這莊子的主人今日做“春日...
    開(kāi)封第一講書人閱讀 31,855評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)焰手。三九已至,卻和暖如春怀喉,著一層夾襖步出監(jiān)牢的瞬間书妻,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書人閱讀 32,983評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工躬拢, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留躲履,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,048評(píng)論 3 370
  • 正文 我出身青樓聊闯,卻偏偏與公主長(zhǎng)得像工猜,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子菱蔬,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,864評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容

  • 本文是先介紹 Flink篷帅,再說(shuō) Flink的過(guò)去和現(xiàn)在 一、Flink介紹 Flink是一款分布式的計(jì)算引擎拴泌,它可...
    生活的探路者閱讀 1,282評(píng)論 0 22
  • 1.Flink架構(gòu)及特性分析 Flink是個(gè)相當(dāng)早的項(xiàng)目,開(kāi)始于2008年削茁,但只在最近才得到注意宙枷。Flink是原生...
    生活的探路者閱讀 1,664評(píng)論 0 5
  • 任何人都無(wú)法超越這個(gè)時(shí)代里的知識(shí)去對(duì)未來(lái)做出猜測(cè)掉房,最大膽的想象也只是根據(jù)已有的現(xiàn)實(shí)去揣測(cè)出來(lái)的茧跋,科技的進(jìn)步和人類的...
    末日魚言閱讀 170評(píng)論 0 0
  • 聽(tīng)到最多的就是說(shuō)看小說(shuō)的人容易移了性情瘾杭,最初,我是不懂的哪亿,為何這般描述粥烁,當(dāng)我漸漸發(fā)現(xiàn)自身多愁善感起來(lái)贤笆,才驚覺(jué),讀書...
    留盈閱讀 1,433評(píng)論 1 0
  • 馬云一直是本人的偶像埋涧,不管外界對(duì)他如何評(píng)判,我始終保持對(duì)他的敬佩奇瘦。他身上有很多東西值得我們學(xué)習(xí)的棘催。 比如在演講方面...
    簡(jiǎn)小塵閱讀 1,165評(píng)論 0 3