Flink流處理API

一、Environment

1.getExecutionEnvironment

創(chuàng)建一個執(zhí)行環(huán)境踢关,表示當前執(zhí)行程序的上下文谦铃。 如果程序是獨立調(diào)用的,則此方法返回本地執(zhí)行環(huán)境;如果從命令行客戶端調(diào)用程序以提交到集群撒会,則此方法返回此集群的執(zhí)行環(huán)境,也就是說诵肛,getExecutionEnvironment會根據(jù)查詢運行的方式?jīng)Q定返回什么樣的運行環(huán)境怔檩,是最常用的一種創(chuàng)建執(zhí)行環(huán)境的方式。

val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment

2.createLocalEnvironment

返回本地執(zhí)行環(huán)境媒吗,需要在調(diào)用時指定默認的并行度乙埃。

val env = StreamExecutionEnvironment.createLocalEnvironment(1)

3.createRemoteEnvironment

返回集群執(zhí)行環(huán)境介袜,將Jar提交到遠程服務器遇伞。需要在調(diào)用時指定JobManager的IP和端口號,并指定要在集群中運行的Jar包鸠珠。

val env = ExecutionEnvironment.createRemoteEnvironment("jobmanager-hostname", 6123,"C://jar//flink//wordcount.jar")

二跳芳、Source

創(chuàng)建一個kafka的工具類

object MyKafkaUtil {
  val prop = new Properties()

  prop.setProperty("bootstrap.servers","hadoop1:9092")
  prop.setProperty("group.id","test")

  def getConsumer(topic:String ):FlinkKafkaConsumer011[String]= {
     val myKafkaConsumer:FlinkKafkaConsumer011[String] = new FlinkKafkaConsumer011[String](topic, new SimpleStringSchema(), prop)
     myKafkaConsumer
  }
}

消費kafka

object StartupApp {
def main(args: Array[String]): Unit = {
       val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
       val kafkaConsumer  =MyKafkaUtil.getConsumer("GMALL_STARTUP")
       val dstream: DataStream[String] = environment.addSource(kafkaConsumer)
       dstream.print()
       environment.execute()
  }
}

Exactly-once two-phase commit
Flink通過checkpoint來保存數(shù)據(jù)是否處理完成的狀態(tài)
由JobManager協(xié)調(diào)各個TaskManager進行checkpoint存儲娄琉,checkpoint保存在 StateBackend中,默認StateBackend是內(nèi)存級的孽水,也可以改為文件級的進行持久化保存女气。
執(zhí)行過程實際上是一個兩段式提交炼鞠,每個算子執(zhí)行完成,會進行“預提交”朝扼,直到執(zhí)行完sink操作霎肯,會發(fā)起“確認提交”观游,如果執(zhí)行失敗,預提交會放棄掉允跑。
如果宕機需要通過StateBackend進行恢復吮蛹,只能恢復所有確認提交的操作。

三潮针、Transform

1.KeyBy和Reduce

spark中的reduceByKey在Flink中被分成兩個算子:KeyBy和Reduce
KeyBy:
DataStream → KeyedStream:輸入必須是Tuple類型倚喂,邏輯地將一個流拆分成不相交的分區(qū),每個分區(qū)包含具有相同key的元素焦读,在內(nèi)部以hash的形式實現(xiàn)的矗晃,KeyedStream是有狀態(tài)的宴倍。
Reduce:
KeyedStream → DataStream:一個分組數(shù)據(jù)流的聚合操作仓技,合并當前的元素和上次聚合的結(jié)果脖捻,產(chǎn)生一個新的值地沮,返回的流中包含每一次聚合的結(jié)果摩疑,而不是只返回最后一次聚合的最終結(jié)果未荒。

2.Split 和 Select

Split:
類似于Flume中的選擇器,在一個DataStream頭部加上不同的戳拆分成多個DataStream速侈。


Split.png

Select:
在splitStream中獲取一個或多個DataStream倚搬。


Select.png
val splitStream: SplitStream[StartUpLog] = startUplogDstream.split { startUplog =>
  var flags:List[String] =  null
  if ("appstore" == startUplog.ch) {
    flags = List(startUplog.ch)
  } else {
    flags = List("other" )
  }
  flags
}
val appStoreStream: DataStream[StartUpLog] = splitStream.select("appstore")
val otherStream: DataStream[StartUpLog] = splitStream.select("other")

3.Connect和 CoMap

Connect:
連接兩個數(shù)據(jù)流


Connect.png

CoMap:


CoMap.png
val connStream: ConnectedStreams[StartUpLog, StartUpLog] = appStoreStream.connect(otherStream)
val allStream: DataStream[String] = connStream.map(
  (log1: StartUpLog) => log1.ch,
  (log2: StartUpLog) => log2.ch
)

4.Union

對兩個或者兩個以上的DataStream進行union操作家卖,產(chǎn)生一個包含所有DataStream元素的新DataStream。

Connect與 Union 區(qū)別:
1.Union之前兩個流的類型必須是一樣上荡,Connect可以不一樣趴樱,在之后的coMap中再去調(diào)整成為一樣的。
2.Connect只能操作兩個流酪捡,Union可以操作多個.

Sink

1.Kafka

在kafka工具類中添加方法

def getProducer(topic:String): FlinkKafkaProducer011[String] ={
  new FlinkKafkaProducer011[String](brokerList,topic,new SimpleStringSchema())
}

主函數(shù)中添加

val myKafkaProducer: FlinkKafkaProducer011[String] = MyKafkaUtil.getProducer("channel_sum")

sumDstream.map( chCount=>chCount._1+":"+chCount._2 ).addSink(myKafkaProducer)

2.Redis

在Redis工具類中添加方法

def getRedisSink(): RedisSink[(String,String)] ={
    new RedisSink[(String,String)](conf,new MyRedisMapper)
  }

  class MyRedisMapper extends RedisMapper[(String,String)]{
    override def getCommandDescription: RedisCommandDescription = {
      new RedisCommandDescription(RedisCommand.HSET, "channel_count")
     // new RedisCommandDescription(RedisCommand.SET  )
    }
    override def getValueFromData(t: (String, String)): String = t._2
    override def getKeyFromData(t: (String, String)): String = t._1
  }

3.Elasticsearch

def  getElasticSearchSink(indexName:String):  ElasticsearchSink[String]  ={
    val esFunc = new ElasticsearchSinkFunction[String] {
      override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
        println("試圖保存:"+element)
        val jsonObj: JSONObject = JSON.parseObject(element)
        val indexRequest: IndexRequest = Requests.indexRequest().index(indexName).`type`("_doc").source(jsonObj)
        indexer.add(indexRequest)
        println("保存1條")
      }
    }

    val sinkBuilder = new ElasticsearchSink.Builder[String](httpHosts, esFunc)

    //刷新前緩沖的最大動作量
    sinkBuilder.setBulkFlushMaxActions(10)
     sinkBuilder.build()
  }

4.JDBC 自定義sink

略略略

?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末叁征,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子逛薇,更是在濱河造成了極大的恐慌捺疼,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,194評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件永罚,死亡現(xiàn)場離奇詭異啤呼,居然都是意外死亡卧秘,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,058評論 2 385
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來哼御,“玉大人,你說我怎么就攤上這事液肌∴露撸” “怎么了?”我有些...
    開封第一講書人閱讀 156,780評論 0 346
  • 文/不壞的土叔 我叫張陵,是天一觀的道長旁舰。 經(jīng)常有香客問我,道長绽快,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,388評論 1 283
  • 正文 為了忘掉前任,我火速辦了婚禮询兴,結(jié)果婚禮上诗舰,老公的妹妹穿的比我還像新娘。我一直安慰自己属百,他們只是感情好,可當我...
    茶點故事閱讀 65,430評論 5 384
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著厘肮,像睡著了一般耍属。 火紅的嫁衣襯著肌膚如雪示启。 梳的紋絲不亂的頭發(fā)上夫嗓,一...
    開封第一講書人閱讀 49,764評論 1 290
  • 那天锉桑,我揣著相機與錄音攻柠,去河邊找鬼冒滩。 笑死,一個胖子當著我的面吹牛士八,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播蝗茁,決...
    沈念sama閱讀 38,907評論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼饭寺,長吁一口氣:“原來是場噩夢啊……” “哼艰匙!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起健霹,我...
    開封第一講書人閱讀 37,679評論 0 266
  • 序言:老撾萬榮一對情侶失蹤扬跋,失蹤者是張志新(化名)和其女友劉穎倍奢,沒想到半個月后卒煞,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體乖订,經(jīng)...
    沈念sama閱讀 44,122評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡岂丘,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,459評論 2 325
  • 正文 我和宋清朗相戀三年寨蹋,在試婚紗的時候發(fā)現(xiàn)自己被綠了钥庇。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片萤晴。...
    茶點故事閱讀 38,605評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡攀芯,死狀恐怖氧秘,靈堂內(nèi)的尸體忽然破棺而出丸相,到底是詐尸還是另有隱情座硕,我是刑警寧澤华匾,帶...
    沈念sama閱讀 34,270評論 4 329
  • 正文 年R本政府宣布原杂,位于F島的核電站际看,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,867評論 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧,春花似錦阱当、人聲如沸弊添。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,734評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽飞苇。三九已至玄柠,卻和暖如春宫患,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,961評論 1 265
  • 我被黑心中介騙來泰國打工尊浓, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留基协,地道東北人泉唁。 一個月前我還...
    沈念sama閱讀 46,297評論 2 360
  • 正文 我出身青樓拴鸵,卻偏偏與公主長得像聘芜,于是被迫代替她去往敵國和親瞎饲。 傳聞我的和親對象是個殘疾皇子口叙,可洞房花燭夜當晚...
    茶點故事閱讀 43,472評論 2 348

推薦閱讀更多精彩內(nèi)容