flink的廣播變量臭挽,累加器,計數(shù)器以及分布式緩存

1咬腕、廣播變量
廣播變量主要分為兩種方式:dataStream當中的廣播變量以及dataSet當中的廣播變量欢峰,這兩個地方的廣播變量還有一定的不一樣的各自的特性,一句話解釋涨共,可以理解為是一個公共的共享變量纽帖,我們可以把一個dataset 數(shù)據(jù)集廣播出去,然后不同的task在節(jié)點上都能夠獲取到举反,這個數(shù)據(jù)在每個節(jié)點上只會存在一份懊直,節(jié)約內存
1、dataStream當中的廣播分區(qū)
將數(shù)據(jù)廣播給所有的分區(qū)火鼻,數(shù)據(jù)可能會被重復處理室囊,一般用于某些公共的配置信息讀取,不會涉及到更改的數(shù)據(jù)
將公共數(shù)據(jù)廣播到所有分區(qū)
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object FlinkBroadCast {
def main(args: Array[String]): Unit = {
val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
environment.setParallelism(4)
import org.apache.flink.api.scala._
val result: DataStream[String] = environment.fromElements("hello").setParallelism(1)
val resultValue: DataStream[String] = result.broadcast.map(x => {
println(x)
x
})
resultValue.print()
environment.execute()
}
}

2魁索、dataSet當中的廣播變量
廣播變量允許編程人員在每臺機器上保持1個只讀的緩存變量波俄,而不是傳送變量的副本給tasks
廣播變量創(chuàng)建后,它可以運行在集群中的任何function上蛾默,而不需要多次傳遞給集群節(jié)點懦铺。另外需要記住,不應該修改廣播變量支鸡,這樣才能確保每個節(jié)點獲取到的值都是一致的
一句話解釋冬念,可以理解為是一個公共的共享變量,我們可以把一個dataset 數(shù)據(jù)集廣播出去牧挣,然后不同的task在節(jié)點上都能夠獲取到急前,這個數(shù)據(jù)在每個節(jié)點上只會存在一份。如果不使用broadcast瀑构,則在每個節(jié)點中的每個task中都需要拷貝一份dataset數(shù)據(jù)集裆针,比較浪費內存(也就是一個節(jié)點中可能會存在多份dataset數(shù)據(jù))。
用法
1:初始化數(shù)據(jù)
DataSet<Integer> toBroadcast = env.fromElements(1, 2, 3)
2:廣播數(shù)據(jù)
.withBroadcastSet(toBroadcast, "broadcastSetName");
3:獲取數(shù)據(jù)
Collection<Integer> broadcastSet = getRuntimeContext().getBroadcastVariable("broadcastSetName");
注意:
1:廣播出去的變量存在于每個節(jié)點的內存中寺晌,所以這個數(shù)據(jù)集不能太大世吨。因為廣播出去的數(shù)據(jù),會常駐內存呻征,除非程序執(zhí)行結束
2:廣播變量在初始化廣播出去以后不支持修改耘婚,這樣才能保證每個節(jié)點的數(shù)據(jù)都是一致的。
需求:求取訂單對應的商品
將訂單和商品數(shù)據(jù)進行合并成為一條數(shù)據(jù)
注意:數(shù)據(jù)格式參見附件中的orders.txt以及product.txt陆赋,商品表當中的第1個字段表示商品id沐祷,訂單表當中的第3個字段表示商品id嚷闭,字段之間都是使用,進行切割
使用廣播變量,將商品數(shù)據(jù)廣播到每一個節(jié)點赖临,然后通過訂單數(shù)據(jù)來進行join即可
代碼實現(xiàn)

import java.util
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration
import scala.collection.mutable

object FlinkDataSetBroadCast {
def main(args: Array[String]): Unit = {
val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
val productData: DataSet[String] = environment.readTextFile("file:///D:\開課吧課程資料\Flink實時數(shù)倉\訂單與商品表\product.txt")
val productMap = new mutable.HashMapString,String

val prouctMapSet: DataSet[mutable.HashMap[String, String]] = productData.map(x => {
  val strings: Array[String] = x.split(",")
  productMap.put(strings(0), x)
  productMap
})

//獲取商品數(shù)據(jù)
val ordersDataset: DataSet[String] = environment.readTextFile("file:///D:\\開課吧課程資料\\Flink實時數(shù)倉\\訂單與商品表\\orders.txt")

//將商品數(shù)據(jù)轉換成為map結構胞锰,key為商品id,value為一行數(shù)據(jù)
val resultLine: DataSet[String] = ordersDataset.map(new RichMapFunction[String, String] {
  var listData: util.List[Map[String, String]] = null
  var allMap = Map[String, String]()

  override def open(parameters: Configuration): Unit = {
    this.listData = getRuntimeContext.getBroadcastVariable[Map[String, String]]("productBroadCast")
    val listResult: util.Iterator[Map[String, String]] = listData.iterator()
    while (listResult.hasNext) {
      allMap =  allMap.++(listResult.next())
    }
  }

  //獲取到了訂單數(shù)據(jù)兢榨,將訂單數(shù)據(jù)與商品數(shù)據(jù)進行拼接成為一整
  override def map(eachOrder: String): String = {
    val str: String = allMap.getOrElse(eachOrder.split(",")(2),"暫時沒有值")
    eachOrder + ","+str
  }
}).withBroadcastSet(prouctMapSet, "productBroadCast")
resultLine.print()
environment.execute("broadCastJoin")

}
}

2胜蛉、累加器
Accumulators(累加器)是非常簡單的,通過一個add操作累加最終的結果色乾,在job執(zhí)行后可以獲取最終結果
最簡單的累加器是counter(計數(shù)器):你可以通過Accumulator.add(V value)這個方法進行遞增誊册。在任務的最后,flink會吧所有的結果進行合并暖璧,然后把最終結果發(fā)送到client端案怯。累加器在調試或者你想更快了解你的數(shù)據(jù)的時候是非常有用的。
Flink現(xiàn)在有一下內置累加器澎办。每個累加器都實現(xiàn)了Accumulator接口嘲碱。
需求:統(tǒng)計tomcat日志當中exception關鍵字出現(xiàn)了多少次
代碼實現(xiàn):
import org.apache.flink.api.common.accumulators.LongCounter
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration

object FlinkCounterAndAccumulator {

def main(args: Array[String]): Unit = {
val env=ExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
//統(tǒng)計tomcat日志當中exception關鍵字出現(xiàn)了多少次
val sourceDataSet: DataSet[String] = env.readTextFile("file:///D:\開課吧課程資料\Flink實時數(shù)倉\catalina.out")

sourceDataSet.map(new RichMapFunction[String,String] {

  var counter=new LongCounter()

  override def open(parameters: Configuration): Unit = {
    getRuntimeContext.addAccumulator("my-accumulator",counter)
  }
  override def map(value: String): String = {
    if(value.toLowerCase().contains("exception")){
      counter.add(1)

    }
    value
  }
}).setParallelism(4).writeAsText("c:\\t4")

val job=env.execute()
//獲取累加器,并打印累加器的值
val a=job.getAccumulatorResult[Long]("my-accumulator")
println(a)

}
}

3局蚀、Flink的分布式緩存DistributedCache
Flink提供了一個分布式緩存麦锯,類似于hadoop,可以使用戶在并行函數(shù)中很方便的讀取本地文件
此緩存的工作機制如下:程序注冊一個文件或者目錄(本地或者遠程文件系統(tǒng)琅绅,例如hdfs或者s3)扶欣,通過ExecutionEnvironment注冊緩存文件并為它起一個名稱。當程序執(zhí)行千扶,F(xiàn)link自動將文件或者目錄復制到所有taskmanager節(jié)點的本地文件系統(tǒng)料祠,用戶可以通過這個指定的名稱查找文件或者目錄,然后從taskmanager節(jié)點的本地文件系統(tǒng)訪問它
用法:
1:注冊一個文件
env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile")
2:訪問數(shù)據(jù)
File myFile = getRuntimeContext().getDistributedCache().getFile("hdfsFile");
代碼實現(xiàn):
import org.apache.commons.io.FileUtils
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration

object FlinkDistributedCache {
def main(args: Array[String]): Unit = {
//將緩存文件澎羞,拿到每臺服務器的本地磁盤進行存儲髓绽,然后需要獲取的時候,直接從本地磁盤文件進行獲取
val env = ExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
//1:注冊分布式緩存文件
env.registerCachedFile("D:\開課吧課程資料\Flink實時數(shù)倉\advert.csv","advert")
val data = env.fromElements("hello","flink","spark","dataset")
val result = data.map(new RichMapFunction[String,String] {

  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    val myFile = getRuntimeContext.getDistributedCache.getFile("advert")
    val lines = FileUtils.readLines(myFile)
    val it = lines.iterator()
    while (it.hasNext){
      val line = it.next();
      println("line:"+line)
    }
  }
  override def map(value: String) = {
    value
  }
}).setParallelism(2)
result.print()
env.execute()

}
}

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末妆绞,一起剝皮案震驚了整個濱河市顺呕,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌括饶,老刑警劉巖株茶,帶你破解...
    沈念sama閱讀 206,214評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異巷帝,居然都是意外死亡忌卤,警方通過查閱死者的電腦和手機扫夜,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,307評論 2 382
  • 文/潘曉璐 我一進店門楞泼,熙熙樓的掌柜王于貴愁眉苦臉地迎上來驰徊,“玉大人,你說我怎么就攤上這事堕阔」鞒В” “怎么了?”我有些...
    開封第一講書人閱讀 152,543評論 0 341
  • 文/不壞的土叔 我叫張陵超陆,是天一觀的道長牺弹。 經常有香客問我,道長时呀,這世上最難降的妖魔是什么张漂? 我笑而不...
    開封第一講書人閱讀 55,221評論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮谨娜,結果婚禮上航攒,老公的妹妹穿的比我還像新娘。我一直安慰自己趴梢,他們只是感情好漠畜,可當我...
    茶點故事閱讀 64,224評論 5 371
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著坞靶,像睡著了一般憔狞。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上彰阴,一...
    開封第一講書人閱讀 49,007評論 1 284
  • 那天瘾敢,我揣著相機與錄音,去河邊找鬼尿这。 笑死廉丽,一個胖子當著我的面吹牛,可吹牛的內容都是我干的妻味。 我是一名探鬼主播正压,決...
    沈念sama閱讀 38,313評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼责球!你這毒婦竟也來了焦履?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 36,956評論 0 259
  • 序言:老撾萬榮一對情侶失蹤雏逾,失蹤者是張志新(化名)和其女友劉穎嘉裤,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體栖博,經...
    沈念sama閱讀 43,441評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡屑宠,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 35,925評論 2 323
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了仇让。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片典奉。...
    茶點故事閱讀 38,018評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡躺翻,死狀恐怖,靈堂內的尸體忽然破棺而出卫玖,到底是詐尸還是另有隱情公你,我是刑警寧澤,帶...
    沈念sama閱讀 33,685評論 4 322
  • 正文 年R本政府宣布假瞬,位于F島的核電站陕靠,受9級特大地震影響,放射性物質發(fā)生泄漏脱茉。R本人自食惡果不足惜剪芥,卻給世界環(huán)境...
    茶點故事閱讀 39,234評論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望琴许。 院中可真熱鬧粗俱,春花似錦、人聲如沸虚吟。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,240評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽串慰。三九已至偏塞,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間邦鲫,已是汗流浹背灸叼。 一陣腳步聲響...
    開封第一講書人閱讀 31,464評論 1 261
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留庆捺,地道東北人古今。 一個月前我還...
    沈念sama閱讀 45,467評論 2 352
  • 正文 我出身青樓,卻偏偏與公主長得像滔以,于是被迫代替她去往敵國和親捉腥。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 42,762評論 2 345