1咬腕、廣播變量
廣播變量主要分為兩種方式:dataStream當中的廣播變量以及dataSet當中的廣播變量欢峰,這兩個地方的廣播變量還有一定的不一樣的各自的特性,一句話解釋涨共,可以理解為是一個公共的共享變量纽帖,我們可以把一個dataset 數(shù)據(jù)集廣播出去,然后不同的task在節(jié)點上都能夠獲取到举反,這個數(shù)據(jù)在每個節(jié)點上只會存在一份懊直,節(jié)約內存
1、dataStream當中的廣播分區(qū)
將數(shù)據(jù)廣播給所有的分區(qū)火鼻,數(shù)據(jù)可能會被重復處理室囊,一般用于某些公共的配置信息讀取,不會涉及到更改的數(shù)據(jù)
將公共數(shù)據(jù)廣播到所有分區(qū)
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
object FlinkBroadCast {
def main(args: Array[String]): Unit = {
val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
environment.setParallelism(4)
import org.apache.flink.api.scala._
val result: DataStream[String] = environment.fromElements("hello").setParallelism(1)
val resultValue: DataStream[String] = result.broadcast.map(x => {
println(x)
x
})
resultValue.print()
environment.execute()
}
}
2魁索、dataSet當中的廣播變量
廣播變量允許編程人員在每臺機器上保持1個只讀的緩存變量波俄,而不是傳送變量的副本給tasks
廣播變量創(chuàng)建后,它可以運行在集群中的任何function上蛾默,而不需要多次傳遞給集群節(jié)點懦铺。另外需要記住,不應該修改廣播變量支鸡,這樣才能確保每個節(jié)點獲取到的值都是一致的
一句話解釋冬念,可以理解為是一個公共的共享變量,我們可以把一個dataset 數(shù)據(jù)集廣播出去牧挣,然后不同的task在節(jié)點上都能夠獲取到急前,這個數(shù)據(jù)在每個節(jié)點上只會存在一份。如果不使用broadcast瀑构,則在每個節(jié)點中的每個task中都需要拷貝一份dataset數(shù)據(jù)集裆针,比較浪費內存(也就是一個節(jié)點中可能會存在多份dataset數(shù)據(jù))。
用法
1:初始化數(shù)據(jù)
DataSet<Integer> toBroadcast = env.fromElements(1, 2, 3)
2:廣播數(shù)據(jù)
.withBroadcastSet(toBroadcast, "broadcastSetName");
3:獲取數(shù)據(jù)
Collection<Integer> broadcastSet = getRuntimeContext().getBroadcastVariable("broadcastSetName");
注意:
1:廣播出去的變量存在于每個節(jié)點的內存中寺晌,所以這個數(shù)據(jù)集不能太大世吨。因為廣播出去的數(shù)據(jù),會常駐內存呻征,除非程序執(zhí)行結束
2:廣播變量在初始化廣播出去以后不支持修改耘婚,這樣才能保證每個節(jié)點的數(shù)據(jù)都是一致的。
需求:求取訂單對應的商品
將訂單和商品數(shù)據(jù)進行合并成為一條數(shù)據(jù)
注意:數(shù)據(jù)格式參見附件中的orders.txt以及product.txt陆赋,商品表當中的第1個字段表示商品id沐祷,訂單表當中的第3個字段表示商品id嚷闭,字段之間都是使用,進行切割
使用廣播變量,將商品數(shù)據(jù)廣播到每一個節(jié)點赖临,然后通過訂單數(shù)據(jù)來進行join即可
代碼實現(xiàn)
import java.util
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration
import scala.collection.mutable
object FlinkDataSetBroadCast {
def main(args: Array[String]): Unit = {
val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
val productData: DataSet[String] = environment.readTextFile("file:///D:\開課吧課程資料\Flink實時數(shù)倉\訂單與商品表\product.txt")
val productMap = new mutable.HashMapString,String
val prouctMapSet: DataSet[mutable.HashMap[String, String]] = productData.map(x => {
val strings: Array[String] = x.split(",")
productMap.put(strings(0), x)
productMap
})
//獲取商品數(shù)據(jù)
val ordersDataset: DataSet[String] = environment.readTextFile("file:///D:\\開課吧課程資料\\Flink實時數(shù)倉\\訂單與商品表\\orders.txt")
//將商品數(shù)據(jù)轉換成為map結構胞锰,key為商品id,value為一行數(shù)據(jù)
val resultLine: DataSet[String] = ordersDataset.map(new RichMapFunction[String, String] {
var listData: util.List[Map[String, String]] = null
var allMap = Map[String, String]()
override def open(parameters: Configuration): Unit = {
this.listData = getRuntimeContext.getBroadcastVariable[Map[String, String]]("productBroadCast")
val listResult: util.Iterator[Map[String, String]] = listData.iterator()
while (listResult.hasNext) {
allMap = allMap.++(listResult.next())
}
}
//獲取到了訂單數(shù)據(jù)兢榨,將訂單數(shù)據(jù)與商品數(shù)據(jù)進行拼接成為一整
override def map(eachOrder: String): String = {
val str: String = allMap.getOrElse(eachOrder.split(",")(2),"暫時沒有值")
eachOrder + ","+str
}
}).withBroadcastSet(prouctMapSet, "productBroadCast")
resultLine.print()
environment.execute("broadCastJoin")
}
}
2胜蛉、累加器
Accumulators(累加器)是非常簡單的,通過一個add操作累加最終的結果色乾,在job執(zhí)行后可以獲取最終結果
最簡單的累加器是counter(計數(shù)器):你可以通過Accumulator.add(V value)這個方法進行遞增誊册。在任務的最后,flink會吧所有的結果進行合并暖璧,然后把最終結果發(fā)送到client端案怯。累加器在調試或者你想更快了解你的數(shù)據(jù)的時候是非常有用的。
Flink現(xiàn)在有一下內置累加器澎办。每個累加器都實現(xiàn)了Accumulator接口嘲碱。
需求:統(tǒng)計tomcat日志當中exception關鍵字出現(xiàn)了多少次
代碼實現(xiàn):
import org.apache.flink.api.common.accumulators.LongCounter
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration
object FlinkCounterAndAccumulator {
def main(args: Array[String]): Unit = {
val env=ExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
//統(tǒng)計tomcat日志當中exception關鍵字出現(xiàn)了多少次
val sourceDataSet: DataSet[String] = env.readTextFile("file:///D:\開課吧課程資料\Flink實時數(shù)倉\catalina.out")
sourceDataSet.map(new RichMapFunction[String,String] {
var counter=new LongCounter()
override def open(parameters: Configuration): Unit = {
getRuntimeContext.addAccumulator("my-accumulator",counter)
}
override def map(value: String): String = {
if(value.toLowerCase().contains("exception")){
counter.add(1)
}
value
}
}).setParallelism(4).writeAsText("c:\\t4")
val job=env.execute()
//獲取累加器,并打印累加器的值
val a=job.getAccumulatorResult[Long]("my-accumulator")
println(a)
}
}
3局蚀、Flink的分布式緩存DistributedCache
Flink提供了一個分布式緩存麦锯,類似于hadoop,可以使用戶在并行函數(shù)中很方便的讀取本地文件
此緩存的工作機制如下:程序注冊一個文件或者目錄(本地或者遠程文件系統(tǒng)琅绅,例如hdfs或者s3)扶欣,通過ExecutionEnvironment注冊緩存文件并為它起一個名稱。當程序執(zhí)行千扶,F(xiàn)link自動將文件或者目錄復制到所有taskmanager節(jié)點的本地文件系統(tǒng)料祠,用戶可以通過這個指定的名稱查找文件或者目錄,然后從taskmanager節(jié)點的本地文件系統(tǒng)訪問它
用法:
1:注冊一個文件
env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile")
2:訪問數(shù)據(jù)
File myFile = getRuntimeContext().getDistributedCache().getFile("hdfsFile");
代碼實現(xiàn):
import org.apache.commons.io.FileUtils
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration
object FlinkDistributedCache {
def main(args: Array[String]): Unit = {
//將緩存文件澎羞,拿到每臺服務器的本地磁盤進行存儲髓绽,然后需要獲取的時候,直接從本地磁盤文件進行獲取
val env = ExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
//1:注冊分布式緩存文件
env.registerCachedFile("D:\開課吧課程資料\Flink實時數(shù)倉\advert.csv","advert")
val data = env.fromElements("hello","flink","spark","dataset")
val result = data.map(new RichMapFunction[String,String] {
override def open(parameters: Configuration): Unit = {
super.open(parameters)
val myFile = getRuntimeContext.getDistributedCache.getFile("advert")
val lines = FileUtils.readLines(myFile)
val it = lines.iterator()
while (it.hasNext){
val line = it.next();
println("line:"+line)
}
}
override def map(value: String) = {
value
}
}).setParallelism(2)
result.print()
env.execute()
}
}