Installation and Deployment
Standalone Cluster
- Installation and Configuration
First, the basics: a Java environment (1.7 or later), ssh, and so on.
Download the appropriate package from the Flink website, paying attention to the Hadoop and Scala versions, then extract it and enter the extracted Flink root directory. The main configuration file is conf/flink-conf.yaml.
Set the following important parameters:
jobmanager.rpc.address: 10.0.0.1 # set to the IP address of your master node
jobmanager.rpc.port: 6123
jobmanager.heap.mb: 512
taskmanager.heap.mb: 1024
taskmanager.numberOfTaskSlots: 2
parallelism.default: 2
taskmanager.tmp.dirs: /tmp
jobmanager.web.port: 8081 # web UI port
Put the IP addresses (or hostnames) of the worker nodes in the conf/slaves file, one address per line, just as in an HDFS configuration.
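For example, a conf/slaves file for two workers might look like this (placeholder addresses):

```
10.0.0.2
10.0.0.3
```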
Set the JAVA_HOME environment variable, or set env.java.home to the JDK path.
- Startup
In the Flink root directory: bin/start-cluster.sh
- Adding a JobManager/TaskManager to a running cluster
bin/jobmanager.sh (start cluster)|stop|stop-all
bin/taskmanager.sh start|stop|stop-all
- Shutdown
bin/stop-cluster.sh
Flink on YARN
This requires a cluster where Hadoop is correctly installed and its environment is configured. Flink on YARN mode needs no additional Flink configuration; it only requires a few cluster-specific environment variables to be in effect, specifically:
export SCALA_HOME=/data/scala
export HADOOP_CONF_DIR=/etc/hadoop/conf
export YARN_CONF_DIR=/etc/hadoop/conf
export FLINK_HOME=/data/flink/latest
There are two main ways to launch Flink on YARN:
- Start a YARN session (a long-running Flink cluster on YARN)
- In the Flink root directory: bin/yarn-session.sh -n 4 -tm 1024 -s 2
The command above starts a Flink session on YARN with 4 TaskManagers, each with 1 GB of memory and 2 task slots. The physical and environment information of the launched cluster is visible in the Flink web UI.
- Within this session, a Flink job can be started with the flink run command:
flink run -m hnode4:34707 /data/flink/latest/examples/batch/WordCount.jar --input hdfs:///tmp/yarn/photo_test.csv
Normally, when a job is submitted to YARN with the run option, the client can discover the JobManager address automatically; in my experiments this failed, so I specified the address explicitly with -m.
- Run a Flink job on YARN directly
- Flink also supports launching an independent, single Flink job on YARN. The command and its parameters:
flink run -m yarn-cluster -yn 4 -yjm 1024 -ytm 1024 /data/flink/latest/examples/batch/WordCount.jar --input hdfs:///tmp/yarn/photo_test.csv --output hdfs:///tmp/yarn/result_out1
- In this mode, the session is torn down when the client disconnects. For this, Flink offers a detached YARN session; pass -d or --detached at startup.
Example Analysis
- Stream Processing I
package com.jiuyan.flink.stream
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource
object WikipediaAnalysis {
def main(args: Array[String]) {
val see = StreamExecutionEnvironment.getExecutionEnvironment
//val see = StreamExecutionEnvironment.createLocalEnvironment(2) // create a local execution environment, useful for debugging code on this machine
val edits = see.addSource(new WikipediaEditsSource()) // a stream of Wikipedia IRC log edits
val result = edits.keyBy(_.getUser)
.timeWindow(Time.seconds(5)) // define a 5-second time window
.fold(("", 0L))((acc, event) => {
val user = event.getUser
val diff = acc._2 + event.getByteDiff
(user, diff)
})
result.print
see.execute
}
}
Notes:
a. Window classification. By how the stream is sliced: timeWindow and countWindow. By window behavior: tumbling windows, sliding windows, and custom windows.
b. The fold function is a rolling fold that turns a KeyedStream into a DataStream, e.g.: val result: DataStream[String] = keyedStream.fold("start")((str, i) => { str + "-" + i }). With the initial value "start" and a sequence of Int values (1,2,3,4,5), this emits the sequence "start-1", "start-1-2", "start-1-2-3", ...
c. If you hit the exception could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent], the program is missing an implicit parameter; importing the Scala API package fixes it: import org.apache.flink.api.scala._
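The rolling behavior of fold (note b) can be illustrated without Flink: on plain Scala collections, scanLeft produces the same sequence of intermediate accumulator values that a Flink rolling fold emits one element at a time (a sketch on plain collections, not the Flink API):

```scala
object FoldDemo {
  // scanLeft emits every intermediate accumulator, mimicking the
  // per-element output of a Flink rolling fold; drop(1) removes the bare seed.
  def rollingFold(init: String, xs: Seq[Int]): Seq[String] =
    xs.scanLeft(init)((str, i) => str + "-" + i).drop(1)

  def main(args: Array[String]): Unit =
    rollingFold("start", Seq(1, 2, 3, 4, 5)).foreach(println)
  // prints "start-1", "start-1-2", "start-1-2-3", ...
}
```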
- Stream Processing II
The rough logic of this example: given a stream that produces a batch of car speed events every 100 milliseconds, compute each car's top speed over every 50 meters traveled within the last 10 seconds. In terms of implementation: events are keyed by carId, the events of the last 10 s are collected into a window, and the window computation is triggered whenever the distance difference between two events exceeds 50.
package com.jiuyan.flink.stream
import java.beans.Transient
import java.util.concurrent.TimeUnit
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.DeltaTrigger
import scala.language.postfixOps
import scala.util.Random
object TopSpeedWindowing {
case class CarEvent(carId: Int, speed: Int, distance: Double, time: Long)
val numOfCars = 2
val evictionSec = 10
val triggerMeters = 50d
def main(args: Array[String]) {
val params = ParameterTool.fromArgs(args) // utility for parsing command-line parameters
//val env = StreamExecutionEnvironment.createLocalEnvironment(1)
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getConfig.setGlobalJobParameters(params)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
val cars =
if (params.has("input")) {
env.readTextFile(params.get("input"))
.map(parseMap(_))
.map(x => CarEvent(x._1, x._2, x._3, x._4))
} else {
println("Executing TopSpeedWindowing example with default inputs data set.")
println("Use --input to specify file input.")
env.addSource(new SourceFunction[CarEvent]() {
val speeds = Array.fill[Integer](numOfCars)(50)
val distances = Array.fill[Double](numOfCars)(0d)
@Transient lazy val rand = new Random()
var isRunning:Boolean = true
override def run(ctx: SourceContext[CarEvent]) = {
while (isRunning) {
Thread.sleep(100)
for (carId <- 0 until numOfCars) {
if (rand.nextBoolean) speeds(carId) = Math.min(100, speeds(carId) + 5)
else speeds(carId) = Math.max(0, speeds(carId) - 5)
distances(carId) += speeds(carId) / 3.6d
val record = CarEvent(carId, speeds(carId),
distances(carId), System.currentTimeMillis)
ctx.collect(record)
}
}
}
override def cancel(): Unit = isRunning = false
})
}
val topSeed = cars
.assignAscendingTimestamps( _.time )
.keyBy("carId")
.window(GlobalWindows.create) // GlobalWindow is a single, all-encompassing window
.evictor(TimeEvictor.of(Time.of(evictionSec * 1000, TimeUnit.MILLISECONDS))) // evict elements older than the given keep time
.trigger(DeltaTrigger.of(triggerMeters, new DeltaFunction[CarEvent] {
def getDelta(oldSp: CarEvent, newSp: CarEvent): Double = newSp.distance - oldSp.distance
}, cars.getType().createSerializer(env.getConfig)))
// .window(Time.of(evictionSec * 1000, (car : CarEvent) => car.time))
// .every(Delta.of[CarEvent](triggerMeters,
// (oldSp,newSp) => newSp.distance-oldSp.distance, CarEvent(0,0,0,0)))
.maxBy("speed")
if (params.has("output")) {
topSeed.writeAsText(params.get("output"))
} else {
println("Printing result to stdout. Use --output to specify output path.")
topSeed.print()
}
env.execute("TopSpeedWindowing")
}
def parseMap(line : String): (Int, Int, Double, Long) = {
val record = line.substring(1, line.length - 1).split(",")
(record(0).toInt, record(1).toInt, record(2).toDouble, record(3).toLong)
}
}
Notes:
a. Defining a window generally involves the window type (GlobalWindows), the eviction condition for window elements (TimeEvictor), the trigger condition (DeltaTrigger), and a serializer. DeltaTrigger fires based on a DeltaFunction and a given threshold: it computes the delta between the last element and the current element and compares it with the threshold; if the delta is higher, the trigger fires.
b. Apply window to keyed streams and windowAll to non-keyed streams. Window computations on keyed streams can run in parallel; on non-keyed streams they cannot.
c. For more window details, see the reference documentation.
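The DeltaTrigger decision in note a can be sketched with plain Scala (no Flink API; the threshold and event fields mirror the example above):

```scala
object DeltaTriggerDemo {
  case class CarEvent(carId: Int, speed: Int, distance: Double, time: Long)

  val triggerMeters = 50d

  // Fires when the distance covered since the element that last triggered
  // exceeds the threshold, mirroring the DeltaFunction in TopSpeedWindowing.
  def shouldFire(lastFired: CarEvent, current: CarEvent): Boolean =
    (current.distance - lastFired.distance) > triggerMeters

  def main(args: Array[String]): Unit = {
    val a = CarEvent(0, 55, 10.0, 0L)
    val b = CarEvent(0, 60, 45.0, 5000L)
    val c = CarEvent(0, 65, 70.0, 9000L)
    println(shouldFire(a, b)) // 45.0 - 10.0 = 35 m: below threshold, no firing
    println(shouldFire(a, c)) // 70.0 - 10.0 = 60 m: above threshold, fires
  }
}
```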
- Batch Processing
This example implements the basic PageRank algorithm.
For background on the algorithm, see: http://www.cnblogs.com/rubinorth/p/5799848.html
For background on Markov processes, see: http://blog.csdn.net/weaponsun/article/details/50007411
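The per-iteration update applied in the code below is rank'(p) = (1 - d)/N + d * Σ rank(q)/outDegree(q), summed over pages q linking to p, with damping d = 0.85. A minimal sketch of one such iteration on plain collections (no Flink; the tiny three-page graph is made up for illustration):

```scala
object PageRankSketch {
  val Damping = 0.85

  // One PageRank iteration over an adjacency list: each page distributes its
  // rank evenly across its outgoing links; contributions are summed per target,
  // then the damping factor is applied.
  def iterate(ranks: Map[Long, Double], links: Map[Long, Seq[Long]], n: Int): Map[Long, Double] = {
    val contributions = for {
      (src, targets) <- links.toSeq
      t <- targets
    } yield t -> ranks(src) / targets.length
    val summed = contributions.groupBy(_._1).map { case (p, cs) => p -> cs.map(_._2).sum }
    ranks.keys.map { p =>
      p -> ((1 - Damping) / n + Damping * summed.getOrElse(p, 0.0))
    }.toMap
  }

  def main(args: Array[String]): Unit = {
    val links = Map(1L -> Seq(2L, 3L), 2L -> Seq(3L), 3L -> Seq(1L)) // hypothetical graph
    val init = Map(1L -> 1.0 / 3, 2L -> 1.0 / 3, 3L -> 1.0 / 3)
    println(iterate(init, links, 3))
  }
}
```

With no dangling pages, the ranks still sum to 1.0 after each iteration, which is a quick sanity check on the update formula.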
package com.jiuyan.flink.batch
import java.lang.Iterable
import org.apache.flink.api.common.functions.GroupReduceFunction
import org.apache.flink.api.java.aggregation.Aggregations.SUM
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala._
import org.apache.flink.util.Collector
import scala.collection.JavaConverters._
object PageRankBasic {
private final val DAMPENING_FACTOR: Double = 0.85
private final val EPSILON: Double = 0.0001
def main(args: Array[String]) {
val params: ParameterTool = ParameterTool.fromArgs(args)
// set up execution environment
//val env = ExecutionEnvironment.getExecutionEnvironment
val env = ExecutionEnvironment.createLocalEnvironment(2)
// make parameters available in the web interface
env.getConfig.setGlobalJobParameters(params)
// read input data
val (pages, numPages) = getPagesDataSet(env, params)
val links = getLinksDataSet(env, params)
val maxIterations = params.getInt("iterations", 10)
// assign initial ranks to pages
val pagesWithRanks = pages.map(p => Page(p, 1.0 / numPages)).withForwardedFields("*->pageId") // the whole map input is forwarded to Page's pageId field
// build adjacency list from link input
val adjacencyLists = links
.groupBy("sourceId").reduceGroup( new GroupReduceFunction[Link, AdjacencyList] {
override def reduce(values: Iterable[Link], out: Collector[AdjacencyList]): Unit = {
var outputId = -1L
val outputList = values.asScala map { t => outputId = t.sourceId; t.targetId }
out.collect(new AdjacencyList(outputId, outputList.toArray))
}
})
// start iteration
val finalRanks = pagesWithRanks.iterateWithTermination(maxIterations) {
currentRanks =>
val newRanks = currentRanks
// distribute ranks to target pages
.join(adjacencyLists).where("pageId").equalTo("sourceId") {
(page, adjacent, out: Collector[Page]) =>
val targets = adjacent.targetIds
val len = targets.length
adjacent.targetIds foreach { t => out.collect(Page(t, page.rank /len )) }
}
// collect ranks and sum them up
.groupBy("pageId").aggregate(SUM, "rank")
// apply dampening factor
.map { p =>
Page(p.pageId, (p.rank * DAMPENING_FACTOR) + ((1 - DAMPENING_FACTOR) / numPages))
}.withForwardedFields("pageId")
// terminate if no rank update was significant
val termination = currentRanks.join(newRanks).where("pageId").equalTo("pageId") {
(current, next, out: Collector[Int]) =>
// check for significant update
if (math.abs(current.rank - next.rank) > EPSILON) out.collect(1)
}
(newRanks, termination)
}
val result = finalRanks
// emit result
if (params.has("output")) {
result.writeAsCsv(params.get("output"), "\n", " ")
// execute program
env.execute("Basic PageRank Example")
} else {
println("Printing result to stdout. Use --output to specify output path.")
result.print()
}
}
case class Link(sourceId: Long, targetId: Long)
case class Page(pageId: Long, rank: Double)
case class AdjacencyList(sourceId: Long, targetIds: Array[Long])
private def getPagesDataSet(env: ExecutionEnvironment, params: ParameterTool):
(DataSet[Long], Long) = {
if (params.has("pages") && params.has("numPages")) {
val pages = env
.readCsvFile[Tuple1[Long]](params.get("pages"), fieldDelimiter = " ", lineDelimiter = "\n")
.map(x => x._1)
(pages, params.getLong("numPages"))
} else {
println("Executing PageRank example with default pages data set.")
println("Use --pages and --numPages to specify file input.")
(env.generateSequence(1, 15), PageRankData.getNumberOfPages)
}
}
private def getLinksDataSet(env: ExecutionEnvironment, params: ParameterTool):
DataSet[Link] = {
if (params.has("links")) {
env.readCsvFile[Link](params.get("links"), fieldDelimiter = " ",
includedFields = Array(0, 1))
} else {
println("Executing PageRank example with default links data set.")
println("Use --links to specify file input.")
val edges = PageRankData.EDGES.map { case Array(v1, v2) => Link(v1.asInstanceOf[Long],
v2.asInstanceOf[Long])}
env.fromCollection(edges)
}
}
}
Notes:
a. Forwarded field annotations (the withForwardedFields function):
Forwarded field annotations declare which fields of the input object a function does not modify and forwards unchanged to the same or another position in the output.
Field forwarding information is declared with field expressions. Fields forwarded to the same position in the output are identified by their position; the position must be valid in the input, and the field's data type must be the same in input and output. For example, "f2" denotes the third field of a Java input tuple, which is forwarded unchanged to the third field of the output tuple.
Fields forwarded unmodified to a different position are declared with expressions such as "f0->f2", meaning the first field of the Java input tuple is copied unchanged to the third field of the Java output tuple. "*" stands for the entire input or output type, so "f0->*" means the function's output is exactly the first field of its Java input tuple. Multiple forwardings can be declared in one string, "f0; f2->f1; f3->f2", or as several separate strings, "f0", "f2->f1", "f3->f2".
(See: Semantic Annotations)
b. iterateWithTermination: a bulk iteration where the loop terminates either when the configured maximum number of iterations is reached or when the termination set becomes empty.
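The semantics of iterateWithTermination in note b can be sketched with plain Scala (no Flink; a step function and a boolean "keep going" flag stand in for the DataSet transformations and the termination set above):

```scala
object IterateWithTerminationSketch {
  // Repeats `step` until it reports no significant update (the analogue of an
  // empty termination set) or maxIterations is reached -- the two stop
  // conditions of Flink's iterateWithTermination.
  def iterateWithTermination[A](initial: A, maxIterations: Int)(
      step: A => (A, Boolean)): A = {
    var state = initial
    var i = 0
    var terminated = false
    while (i < maxIterations && !terminated) {
      val (next, keepGoing) = step(state)
      state = next
      terminated = !keepGoing
      i += 1
    }
    state
  }

  def main(args: Array[String]): Unit = {
    // Toy example: halve a value until the update is insignificant (< 0.01),
    // analogous to the EPSILON convergence check in the PageRank example.
    val result = iterateWithTermination(1.0, 100) { x =>
      val next = x / 2
      (next, math.abs(x - next) > 0.01)
    }
    println(result)
  }
}
```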