Flink安裝部署與示例

安裝部署

Standalone Cluster

  • 安裝配置
    首先基礎(chǔ)的Java環(huán)境1.7x以上,ssh等等盖奈。
    到Flink官網(wǎng)下載好相應(yīng)的包狐援,注意hadoop和scala的版本,然后解壓等蚊俺, 并進(jìn)入解壓后的Flink根目錄逛万。然后我們對其進(jìn)行相關(guān)的配置批钠。主要涉及到的配置文件是conf/flink-conf.yaml
    設(shè)置以下比較重要的參數(shù):
jobmanager.rpc.address:10.0.0.1        # 設(shè)置成你master節(jié)點(diǎn)的IP地址
jobmanager.rpc.port:6123              
jobmanager.heap.mb:512 
taskmanager.heap.mb:1024
taskmanager.numberOfTaskSlots:2
parallelism.default:2
taskmanager.tmp.dirs:/tmp
jobmanager.web.port: 8081   # web ui端口號

將要作為worker節(jié)點(diǎn)的IP(或者是hostname)地址存放在conf/slaves文件中埋心,就像HDFS配置一樣,每個(gè)IP地址必須放在一行.
設(shè)置JAVA_HOME 環(huán)境變量,或者設(shè)置env.java.home為jdk的路徑

  • 啟動
    在flink根目錄下: bin/start-cluster.sh
  • 在已經(jīng)運(yùn)行的集群中添加JobManager/TaskManager
    bin/jobmanager.sh (start cluster)|stop|stop-all
    bin/taskmanager.sh start|stop|stop-all
  • 停止
    bin/stop-cluster.sh

Flink On YARN

要求集群環(huán)境中已正確安裝hadoop并配置好相應(yīng)的環(huán)境及變量闲坎。Flink on yarn模式不需要額外的Flink配置腰懂,只需要集群特定的一些環(huán)境變量生效即可项秉,具體:

export SCALA_HOME=/data/scala
export HADOOP_CONF_DIR=/etc/hadoop/conf
export YARN_CONF_DIR=/etc/hadoop/conf
export FLINK_HOME=/data/flink/latest

在YARN上啟動Flink主要有兩種方式:

  1. 啟動一個(gè)YARN session(Start a long-running Flink cluster on YARN)
  • 在Flink的根目錄下:bin/yarn-session.sh -n 4 -tm 1024 -s 2
    上面的命令啟動了4個(gè)TaskManager娄蔼,每個(gè)TaskManager內(nèi)存為1G,且開啟了2 TaskSlots的yarn的Flink session環(huán)境锚沸。 通過Flink的web ui可以看到啟動集群的物理和環(huán)境信息涕癣。
  • 在這個(gè)session環(huán)境下属划,可以通過flink run命令,啟動Flink任務(wù):
    flink run -m hnode4:34707 /data/flink/latest/examples/batch/WordCount.jar --input hdfs:///tmp/yarn/photo_test.csv
    一般情況下run選項(xiàng)提交作業(yè)到y(tǒng)arn绽昼,client端可以自動找到JobManager的地址须蜗,但是本人在實(shí)驗(yàn)時(shí)有問題所以通過-m 制定
  1. 直接在YARN上提交運(yùn)行Flink作業(yè)(Run a Flink job on YARN)
  • Flink同樣支持在yarn中啟動一個(gè)獨(dú)立的Flink作業(yè)。具體的命令參數(shù)如下:
    flink run -m yarn-cluster -yn 4 -yjm 1024 -ytm 1024 /data/flink/latest/examples/batch/WordCount.jar --input hdfs:///tmp/yarn/photo_test.csv --output hdfs:///tmp/yarn/result_out1
  • 這種方式如果client端斷開缭付,session是會斷開的循未。Flink提供了一種detached YARN session的妖,啟動時(shí)候加上參數(shù)-d或--detached

示例分析

  1. 流處理一
package com.jiuyan.flink.stream
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource
object WikipediaAnalysis {

  def main(args: Array[String]) {

    val see = StreamExecutionEnvironment.getExecutionEnvironment
    //val see = StreamExecutionEnvironment.createLocalEnvironment(2); //創(chuàng)建一個(gè)本地的執(zhí)行環(huán)境嫂粟,可以用于本機(jī)調(diào)試代碼

    val edits = see.addSource(new WikipediaEditsSource()) // 這是一個(gè)Wikipedia IRC log的數(shù)據(jù)流
    val result = edits.keyBy(_.getUser) 
      .timeWindow(Time.seconds(5))  //定一個(gè)5s的時(shí)間窗口
      .fold(("",0l))((acc,event) => {
        val user = event.getUser
        val diff = acc._2 + event.getByteDiff
        (user,diff)
      });
    result.print
    see.execute
  }
}

一些說明:
a. 窗口分類,按分割標(biāo)準(zhǔn)劃分:timeWindow零抬、countWindow宽涌,按窗口行為劃分:Tumbling Window护糖、Sliding Window、自定義窗口
b. flod函數(shù)锰扶,就是一個(gè)折疊操作寝受『艹危可以將KeyedStream → DataStream,如:val result: DataStream[String] = keyedStream.fold("start")((str, i) => { str + "-" + i })蹂楣。 初始為'sart'; 對于一個(gè)int值的序列 (1,2,3,4,5), 將產(chǎn)生序列 "start-1", "start-1-2", "start-1-2-3", ...
c. 如果出現(xiàn)could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent] 的異常痊土,通常是因?yàn)槌绦蛐枰粋€(gè)隱式參數(shù)(implicit parameter)墨林,導(dǎo)入scala的包信息即可,具體:import org.apache.flink.api.scala._

  1. 流處理二
    這個(gè)示例的大致邏輯:在每100毫秒產(chǎn)生一批車速事件的數(shù)據(jù)流中酌呆,計(jì)算10秒內(nèi)隙袁,汽車每行進(jìn)50米內(nèi)的最高速度。具體到實(shí)現(xiàn)邏輯猜揪,根據(jù)carId聚合坛梁,將10s的內(nèi)事件數(shù)據(jù)匯集到一個(gè)窗口划咐,當(dāng)兩個(gè)事件的距離值的差值大于50钧萍,觸發(fā)窗口計(jì)算风瘦。
package com.jiuyan.flink.stream
import java.beans.Transient
import java.util.concurrent.TimeUnit
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.DeltaTrigger
import scala.language.postfixOps
import scala.util.Random
object TopSpeedWindowing {
  case class CarEvent(carId: Int, speed: Int, distance: Double, time: Long)

  val numOfCars = 2
  val evictionSec = 10
  val triggerMeters = 50d

  def main(args: Array[String]) {
    val params = ParameterTool.fromArgs(args)//參數(shù)解析的工具

    //val env = StreamExecutionEnvironment.createLocalEnvironment(1)
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.getConfig.setGlobalJobParameters(params)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    val cars =
      if (params.has("input")) {
        env.readTextFile(params.get("input"))
          .map(parseMap(_))
          .map(x => CarEvent(x._1, x._2, x._3, x._4))
      } else {
        println("Executing TopSpeedWindowing example with default inputs data set.")
        println("Use --input to specify file input.")
        env.addSource(new SourceFunction[CarEvent]() {

          val speeds = Array.fill[Integer](numOfCars)(50)
          val distances = Array.fill[Double](numOfCars)(0d)
          @Transient lazy val rand = new Random()

          var isRunning:Boolean = true

          override def run(ctx: SourceContext[CarEvent]) = {
            while (isRunning) {
              Thread.sleep(100)

              for (carId <- 0 until numOfCars) {
                if (rand.nextBoolean) speeds(carId) = Math.min(100, speeds(carId) + 5)
                else speeds(carId) = Math.max(0, speeds(carId) - 5)

                distances(carId) += speeds(carId) / 3.6d
                val record = CarEvent(carId, speeds(carId),
                  distances(carId), System.currentTimeMillis)
                ctx.collect(record)
              }
            }
          }
          override def cancel(): Unit = isRunning = false
        })
      }

    val topSeed = cars
      .assignAscendingTimestamps( _.time )
      .keyBy("carId")
      .window(GlobalWindows.create) // GlobalWindow是一個(gè)全局窗口
      .evictor(TimeEvictor.of(Time.of(evictionSec * 1000, TimeUnit.MILLISECONDS))) //給定的保留時(shí)間(keep time)作為剔除規(guī)則
      .trigger(DeltaTrigger.of(triggerMeters, new DeltaFunction[CarEvent] {
        def getDelta(oldSp: CarEvent, newSp: CarEvent): Double = newSp.distance - oldSp.distance 
      }, cars.getType().createSerializer(env.getConfig)))
//      .window(Time.of(evictionSec * 1000, (car : CarEvent) => car.time))
//      .every(Delta.of[CarEvent](triggerMeters,
//          (oldSp,newSp) => newSp.distance-oldSp.distance, CarEvent(0,0,0,0)))
      .maxBy("speed")

    if (params.has("output")) {
      topSeed.writeAsText(params.get("output"))
    } else {
      println("Printing result to stdout. Use --output to specify output path.")
      topSeed.print()
    }
    env.execute("TopSpeedWindowing")
  }
  def parseMap(line : String): (Int, Int, Double, Long) = {
    val record = line.substring(1, line.length - 1).split(",")
    (record(0).toInt, record(1).toInt, record(2).toDouble, record(3).toLong)
  }
}

一些說明:
a. 窗口的確定,一般需要定義窗口的類型(GlobalWindows)瞬雹,窗口元素的剔除條件(TimeEvictor)酗捌,以及窗口的出發(fā)條件(DeltaTrigger),以及序列化方式尚镰。DeltaTrigger是基于DeltaFunction和一個(gè)給定的閾值觸發(fā)哪廓,該觸發(fā)器在最后一個(gè)到達(dá)元素和當(dāng)前元素之間計(jì)算一個(gè)delta值跟給定的閾值比較撩独,如果高于給定的閾值账月,則觸發(fā)局齿。
b. 在帶key的數(shù)據(jù)流上應(yīng)用window操作橄登,在無key的數(shù)據(jù)流上應(yīng)用windowAll谣妻。在帶key的數(shù)據(jù)流上以及進(jìn)行并行計(jì)算充坑,而非key的數(shù)據(jù)流不可也榄。
c. 窗口信息參考

  1. 批處理
    這個(gè)示例描述是基礎(chǔ)的page rank算法棵介。
    對于算法的一些背景信息,可以參考:http://www.cnblogs.com/rubinorth/p/5799848.html
    對于Markov(馬可夫)過程的信息可參考: http://blog.csdn.net/weaponsun/article/details/50007411
package com.jiuyan.flink.batch
import java.lang.Iterable
import org.apache.flink.api.common.functions.GroupReduceFunction
import org.apache.flink.api.java.aggregation.Aggregations.SUM
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala._
import org.apache.flink.util.Collector
import scala.collection.JavaConverters._
object PageRankBasic {

  private final val DAMPENING_FACTOR: Double = 0.85
  private final val EPSILON: Double = 0.0001

  def main(args: Array[String]) {

    val params: ParameterTool = ParameterTool.fromArgs(args)

    // set up execution environment
    //val env = ExecutionEnvironment.getExecutionEnvironment
    val env = ExecutionEnvironment.createLocalEnvironment(2)

    // make parameters available in the web interface
    env.getConfig.setGlobalJobParameters(params)

    // read input data
    val (pages, numPages) = getPagesDataSet(env, params)
    val links = getLinksDataSet(env, params)
    val maxIterations = params.getInt("iterations", 10)

    // assign initial ranks to pages
    val pagesWithRanks = pages.map(p => Page(p, 1.0 / numPages)).withForwardedFields("*->pageId")   //表示map的輸入轉(zhuǎn)發(fā)到Page的pageId字段
    // build adjacency list from link input
    val adjacencyLists = links
      .groupBy("sourceId").reduceGroup( new GroupReduceFunction[Link, AdjacencyList] {
        override def reduce(values: Iterable[Link], out: Collector[AdjacencyList]): Unit = {
          var outputId = -1L
          val outputList = values.asScala map { t => outputId = t.sourceId; t.targetId }
          out.collect(new AdjacencyList(outputId, outputList.toArray))
        }
      })

    // start iteration
    val finalRanks = pagesWithRanks.iterateWithTermination(maxIterations) {
      currentRanks =>
        val newRanks = currentRanks
          // distribute ranks to target pages
          .join(adjacencyLists).where("pageId").equalTo("sourceId") {
            (page, adjacent, out: Collector[Page]) =>
              val targets = adjacent.targetIds
              val len = targets.length
              adjacent.targetIds foreach { t => out.collect(Page(t, page.rank /len )) }
          }
          // collect ranks and sum them up
          .groupBy("pageId").aggregate(SUM, "rank")
          // apply dampening factor
          .map { p =>
            Page(p.pageId, (p.rank * DAMPENING_FACTOR) + ((1 - DAMPENING_FACTOR) / numPages))
          }.withForwardedFields("pageId")

        // terminate if no rank update was significant
        val termination = currentRanks.join(newRanks).where("pageId").equalTo("pageId") {
          (current, next, out: Collector[Int]) =>
            // check for significant update
            if (math.abs(current.rank - next.rank) > EPSILON) out.collect(1)
        }
        (newRanks, termination)
    }
    val result = finalRanks

    // emit result
    if (params.has("output")) {
      result.writeAsCsv(params.get("output"), "\n", " ")
      // execute program
      env.execute("Basic PageRank Example")
    } else {
      println("Printing result to stdout. Use --output to specify output path.")
      result.print()
    }
  }

  case class Link(sourceId: Long, targetId: Long)
  case class Page(pageId: Long, rank: Double)
  case class AdjacencyList(sourceId: Long, targetIds: Array[Long])

  private def getPagesDataSet(env: ExecutionEnvironment, params: ParameterTool):
                     (DataSet[Long], Long) = {
    if (params.has("pages") && params.has("numPages")) {
      val pages = env
        .readCsvFile[Tuple1[Long]](params.get("pages"), fieldDelimiter = " ", lineDelimiter = "\n")
        .map(x => x._1)
      (pages, params.getLong("numPages"))
    } else {
      println("Executing PageRank example with default pages data set.")
      println("Use --pages and --numPages to specify file input.")
      (env.generateSequence(1, 15), PageRankData.getNumberOfPages)
    }
  }
  private def getLinksDataSet(env: ExecutionEnvironment, params: ParameterTool):
                      DataSet[Link] = {
    if (params.has("links")) {
      env.readCsvFile[Link](params.get("links"), fieldDelimiter = " ",
        includedFields = Array(0, 1))
    } else {
      println("Executing PageRank example with default links data set.")
      println("Use --links to specify file input.")
      val edges = PageRankData.EDGES.map { case Array(v1, v2) => Link(v1.asInstanceOf[Long],
        v2.asInstanceOf[Long])}
      env.fromCollection(edges)
    }
  }
}

一些說明:
a. 轉(zhuǎn)發(fā)字段注解 withForwardedFields函數(shù):
轉(zhuǎn)發(fā)字段注解定義了輸入對象中哪些字段是在函數(shù)中不會被修改熬芜,直接轉(zhuǎn)發(fā)到output中相同位置或其他位置的圆。
用[field expressions]來確定field轉(zhuǎn)發(fā)信息叮称。 在output中轉(zhuǎn)發(fā)位置相同的filed由它們的位置來確定娱节。 確定的位置必須是input中有效和houtput 中數(shù)據(jù)類型必須相同 舉例來說稠歉, “f2”定義了java input tuple中第三個(gè)字段带饱, 它同樣等同于output tuple中第三個(gè)字段。
不做修改直接轉(zhuǎn)發(fā)到其他位置的field, 通過“filed express”來定義猿诸。 比如”f0->f2”表示 java input tuple中第一個(gè)字段將不做修改直接copy到j(luò)ava 輸出的第三個(gè)字段。 “*”可以表示整個(gè)輸入或輸出旬陡, 比如”f0->*” 表示函數(shù)的輸出就是等同于java 輸入tuple的第一個(gè)字段砰左〔迹可以在一個(gè)string中定義多個(gè)字段轉(zhuǎn)發(fā) "f0; f2->f1; f3->f2"或者多個(gè)單獨(dú)string比如"f0", "f2->f1", "f3->f2"
。 (參考:Semantic Annotations
b. iterateWithTermination: 批量迭代函數(shù)憋他,迭代的終止條件一個(gè)達(dá)設(shè)定的迭代次數(shù)举瑰,再有就是更新集合為空蔬螟。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市耸序,隨后出現(xiàn)的幾起案子坎怪,更是在濱河造成了極大的恐慌,老刑警劉巖嘁酿,帶你破解...
    沈念sama閱讀 216,372評論 6 498
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件闹司,死亡現(xiàn)場離奇詭異沐飘,居然都是意外死亡耐朴,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,368評論 3 392
  • 文/潘曉璐 我一進(jìn)店門铐刘,熙熙樓的掌柜王于貴愁眉苦臉地迎上來滨达,“玉大人俯艰,你說我怎么就攤上這事锌订×酒” “怎么了?”我有些...
    開封第一講書人閱讀 162,415評論 0 353
  • 文/不壞的土叔 我叫張陵芹关,是天一觀的道長侥衬。 經(jīng)常有香客問我,道長直颅,這世上最難降的妖魔是什么怀樟? 我笑而不...
    開封第一講書人閱讀 58,157評論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮往堡,結(jié)果婚禮上械荷,老公的妹妹穿的比我還像新娘。我一直安慰自己虑灰,他們只是感情好吨瞎,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,171評論 6 388
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著瘩缆,像睡著了一般关拒。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上庸娱,一...
    開封第一講書人閱讀 51,125評論 1 297
  • 那天着绊,我揣著相機(jī)與錄音熟尉,去河邊找鬼归露。 笑死,一個(gè)胖子當(dāng)著我的面吹牛斤儿,可吹牛的內(nèi)容都是我干的剧包。 我是一名探鬼主播,決...
    沈念sama閱讀 40,028評論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼往果,長吁一口氣:“原來是場噩夢啊……” “哼疆液!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起陕贮,我...
    開封第一講書人閱讀 38,887評論 0 274
  • 序言:老撾萬榮一對情侶失蹤堕油,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后肮之,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體掉缺,經(jīng)...
    沈念sama閱讀 45,310評論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,533評論 2 332
  • 正文 我和宋清朗相戀三年戈擒,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了眶明。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,690評論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡筐高,死狀恐怖搜囱,靈堂內(nèi)的尸體忽然破棺而出丑瞧,到底是詐尸還是另有隱情,我是刑警寧澤犬辰,帶...
    沈念sama閱讀 35,411評論 5 343
  • 正文 年R本政府宣布嗦篱,位于F島的核電站,受9級特大地震影響幌缝,放射性物質(zhì)發(fā)生泄漏灸促。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,004評論 3 325
  • 文/蒙蒙 一涵卵、第九天 我趴在偏房一處隱蔽的房頂上張望浴栽。 院中可真熱鬧,春花似錦轿偎、人聲如沸典鸡。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,659評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽萝玷。三九已至,卻和暖如春昆婿,著一層夾襖步出監(jiān)牢的瞬間球碉,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,812評論 1 268
  • 我被黑心中介騙來泰國打工仓蛆, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留睁冬,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,693評論 2 368
  • 正文 我出身青樓看疙,卻偏偏與公主長得像豆拨,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子能庆,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,577評論 2 353

推薦閱讀更多精彩內(nèi)容