Flink實時處理之DataStream

Flink的API概覽

<v:shapetype id="_x0000_t75" stroked="f" filled="f" path="m@4@5l@4@11@9@11@9@5xe" o:preferrelative="t" o:spt="75" coordsize="21600,21600"><v:stroke joinstyle="miter"><v:formulas></v:formulas><v:path o:connecttype="rect" gradientshapeok="t" o:extrusionok="f"></v:path></v:stroke></v:shapetype><v:shape id="內(nèi)容占位符_x0020_1" style="width:415.2pt;height:222pt;visibility:visible;mso-wrap-style:square" type="#_x0000_t75" o:spid="_x0000_i1027"><v:imagedata o:title="" src="file:///C:\Users\chenning\AppData\Local\Temp\msohtmlclip1\01\clip_image001.png"></v:imagedata></v:shape>

1桐磁、dataStream的數(shù)據(jù)源

1、socket數(shù)據(jù)源

從socket當中接收數(shù)據(jù)讲岁,并統(tǒng)計最近5秒鐘每個單詞出現(xiàn)的次數(shù)

第一步:node01開發(fā)socket服務(wù)

node01執(zhí)行以下命令開啟socket服務(wù)

nc -lk 9000

第二步:開發(fā)代碼實現(xiàn)

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} import org.apache.flink.streaming.api.windowing.time.Time object FlinkSource1 { def main(args: Array[String]): Unit = { //獲取程序入口類val streamExecution: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment val socketText: DataStream[String] = streamExecution.socketTextStream("node01",9000) //注意:必須要添加這一行隱式轉(zhuǎn)行我擂,否則下面的flatmap方法執(zhí)行會報錯import org.apache.flink.api.scala._ val result: DataStream[(String, Int)] = socketText.flatMap(x => x.split(" "))
.map(x => (x, 1))
.keyBy(0)
.timeWindow(Time.seconds(5), Time.seconds(5)) //統(tǒng)計最近5秒鐘的數(shù)據(jù).sum(1) //打印結(jié)果數(shù)據(jù)result.print().setParallelism(1) //執(zhí)行程序streamExecution.execute()
}
}

2、文件數(shù)據(jù)源

讀取hdfs路徑下面所有的文件數(shù)據(jù)進行處理

第一步:添加maven依賴

<repositories> <repository> <id>cloudera</id> <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url> </repository> </repositories>

<dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.6.0-mr1-cdh5.14.2</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>2.6.0-cdh5.14.2</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>2.6.0-cdh5.14.2</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-core</artifactId> <version>2.6.0-cdh5.14.2</version> </dependency>

第二步:代碼實現(xiàn)

object FlinkSource2 { def main(args: Array[String]): Unit = { val executionEnvironment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment import org.apache.flink.api.scala._ //從文本讀取數(shù)據(jù)val hdfStream: DataStream[String] = executionEnvironment.readTextFile("hdfs://node01:8020/flink_input/") val result: DataStream[(String, Int)] = hdfStream.flatMap(x => x.split(" ")).map(x =>(x,1)).keyBy(0).sum(1)

result.print().setParallelism(1)

executionEnvironment.execute("hdfsSource")
}
}

3缓艳、從一個已經(jīng)存在的集合當中獲取數(shù)據(jù)

代碼實現(xiàn)

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} object FlinkSource3 { def main(args: Array[String]): Unit = { val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment import org.apache.flink.api.scala._ val value: DataStream[String] = environment.fromElements[String]("hello world","spark flink") val result2: DataStream[(String, Int)] = value.flatMap(x => x.split(" ")).map(x =>(x,1)).keyBy(0).sum(1)
result2.print().setParallelism(1)
environment.execute()
}
}

4校摩、自定義數(shù)據(jù)源

如果flink自帶的一些數(shù)據(jù)源還不夠的工作使用的話,我們還可以自定義數(shù)據(jù)源

flink提供了大量的已經(jīng)實現(xiàn)好的source方法阶淘,你也可以自定義source

通過實現(xiàn)sourceFunction接口來自定義source衙吩,

或者你也可以通過實現(xiàn)ParallelSourceFunction 接口 or 繼承RichParallelSourceFunction 來自定義source。

1溪窒、通過ParallelSourceFunction 來實現(xiàn)自定義數(shù)據(jù)源

如果需要實現(xiàn)一個多并行度的數(shù)據(jù)源坤塞,那么我們可以通過實現(xiàn)ParallelSourceFunction 接口或者繼承RichParallelSourceFunction 來自定義有并行度的source。

第一步:使用scala代碼實現(xiàn)ParallelSourceFunction接口

import org.apache.flink.streaming.api.functions.source.{ParallelSourceFunction, SourceFunction} class MyParalleSource extends ParallelSourceFunction[String] { var isRunning:Boolean = **true

override def** run(sourceContext: SourceFunction.SourceContext[String]): Unit = { while (true){
sourceContext.collect("hello world")
}
} override def cancel(): Unit = { isRunning = false }
}

第二步:使用自定義數(shù)據(jù)源

object FlinkSource5 { def main(args: Array[String]): Unit = { val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment import org.apache.flink.api.scala._ val sourceStream: DataStream[String] = environment.addSource(new MyParalleSource) val result: DataStream[(String, Int)] = sourceStream.flatMap(x => x.split(" ")).map(x => (x, 1))
.keyBy(0)
.sum(1)
result.print().setParallelism(2)
environment.execute("paralleSource")
}
}

2澈蚌、dataStream的算子介紹

官網(wǎng)算子介紹:

https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/index.html

flink當中對于實時處理摹芙,有很多的算子,我們可以來看看常用的算子主要有哪些宛瞄,dataStream當中的算子主要分為三大類浮禾,

Transformations****:轉(zhuǎn)換的算子,都是懶執(zhí)行的份汗,只有真正碰到sink****的算子才會真正加載執(zhí)行

partition****:對數(shù)據(jù)進行重新分區(qū)等操作

Sink****:數(shù)據(jù)下沉目的地

<v:shape id="圖片_x0020_14" style="width:415.2pt;height:235.2pt;visibility:visible;mso-wrap-style:square" type="#_x0000_t75" alt="說明: C:\Users\admin\Desktop\20190214114209629.png" o:spid="_x0000_i1026"><v:imagedata o:title="20190214114209629" src="file:///C:\Users\chenning\AppData\Local\Temp\msohtmlclip1\01\clip_image002.png"></v:imagedata></v:shape>

DataStream的Transformations算子

l map:輸入一個元素盈电,然后返回一個元素,中間可以做一些清洗轉(zhuǎn)換等操作

l flatmap:輸入一個元素杯活,可以返回零個匆帚,一個或者多個元素

l filter:過濾函數(shù),對傳入的數(shù)據(jù)進行判斷轩猩,符合條件的數(shù)據(jù)會被留下

l keyBy:根據(jù)指定的key進行分組卷扮,相同key的數(shù)據(jù)會進入同一個分區(qū)【典型用法見備注】

l reduce:對數(shù)據(jù)進行聚合操作,結(jié)合當前元素和上一次reduce返回的值進行聚合操作均践,然后返回一個新的值

l aggregations:sum(),min(),max()等

l window:在后面單獨詳解

l Union:合并多個流晤锹,新的流會包含所有流中的數(shù)據(jù),但是union是一個限制彤委,就是所有合并的流類型必須是一致的鞭铆。

l Connect:和union類似,但是只能連接兩個流,兩個流的數(shù)據(jù)類型可以不同车遂,會對兩個流中的數(shù)據(jù)應(yīng)用不同的處理方法封断。

l CoMap, CoFlatMap:在ConnectedStreams中需要使用這種函數(shù),類似于map和flatmap

l Split:根據(jù)規(guī)則把一個數(shù)據(jù)流切分為多個流

l Select:和split配合使用舶担,選擇切分后的流

案例一:使用union算子來合并多個DataStream

獲取兩個dataStream坡疼,然后使用union將兩個dataStream進行合并

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} object FlinkUnion { def main(args: Array[String]): Unit = { val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment import org.apache.flink.api.scala._ //獲取第一個dataStream val firstStream: DataStream[String] = environment.fromElements("hello world","test scala") //獲取第二個dataStream val secondStream: DataStream[String] = environment.fromElements("second test","spark flink") //將兩個流進行合并起來val unionAll: DataStream[String] = firstStream.union(secondStream) //結(jié)果不做任何處理val unionResult: DataStream[String] = unionAll.map(x => { // println(x) x
}) //調(diào)用sink算子,打印輸出結(jié)果unionResult.print().setParallelism(1) //開始運行environment.execute()
}
}

案例二:使用connect實現(xiàn)不同類型的DataStream進行連接

import org.apache.flink.streaming.api.scala.{ConnectedStreams, DataStream, StreamExecutionEnvironment} object FlinkConnect { def main(args: Array[String]): Unit = { //獲取程序入口類val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment //導入隱式轉(zhuǎn)換的包import org.apache.flink.api.scala._ //定義string類型的dataStream val strStream: DataStream[String] = environment.fromElements("hello world","abc test") //定義int類型的dataStream val intStream: DataStream[Int] = environment.fromElements(1,2,3,4,5) //兩個流進行connect操作val connectedStream: ConnectedStreams[String, Int] = strStream.connect(intStream) //通過map對數(shù)據(jù)進行處理衣陶,傳入兩個函數(shù)val connectResult: DataStream[Any] = connectedStream.map(x =>{ x + "abc"},y =>{ y * 2 })
connectResult.print().setParallelism(1)
environment.execute("connect stream")
}
}

案例三:使用split將一個DataStream切成多個DataStream

import java.{lang, util} import org.apache.flink.streaming.api.collector.selector.OutputSelector import org.apache.flink.streaming.api.scala.{DataStream, SplitStream, StreamExecutionEnvironment} object FlinkSplit { def main(args: Array[String]): Unit = { val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment import org.apache.flink.api.scala._ //獲取第一個dataStream val resultDataStream: DataStream[String] = environment.fromElements("hello world","test spark","spark flink") //通過split來對我們的流進行切分操作val splitStream: SplitStream[String] = resultDataStream.split(new OutputSelector[String] { override def select(out: String): lang.Iterable[String] = { val strings = new util.ArrayListString if (out.contains("hello")) { //如果包含hello柄瑰,那么我們就給這個流起名字叫做hello strings.add("hello")
} else {
strings.add("other")
}
strings
}
}) //對我么的stream進行選擇val helloStream: DataStream[String] = splitStream.select("hello") //打印包含hello的所有的字符串helloStream.print().setParallelism(1)
environment.execute()
}
}

DataStream的Partition算子

https://blog.csdn.net/lmalds/article/details/60575205 flink的各種算子介紹

partition算子允許我們對數(shù)據(jù)進行重新分區(qū),或者解決數(shù)據(jù)傾斜等問題

l Random partitioning:隨機分區(qū)

? dataStream.shuffle()

l Rebalancing:對數(shù)據(jù)集進行再平衡剪况,重分區(qū)教沾,消除數(shù)據(jù)傾斜

? dataStream.rebalance()

l Rescaling:Rescaling是通過執(zhí)行oepration算子來實現(xiàn)的。由于這種方式僅發(fā)生在一個單一的節(jié)點译断,因此沒有跨網(wǎng)絡(luò)的數(shù)據(jù)傳輸授翻。

? dataStream.rescale()

<v:shape id="圖片_x0020_34" style="width:279pt;height:196.2pt;visibility:visible;mso-wrap-style:square" type="#_x0000_t75" alt="說明: C:\Users\admin\Desktop\1642492-20190329155510739-1792670965.png" o:spid="_x0000_i1025"><v:imagedata o:title="1642492-20190329155510739-1792670965" src="file:///C:\Users\chenning\AppData\Local\Temp\msohtmlclip1\01\clip_image003.png"></v:imagedata></v:shape>

l Custom partitioning:自定義分區(qū)

? 自定義分區(qū)需要實現(xiàn)Partitioner接口

? dataStream.partitionCustom(partitioner, "someKey")

? 或者dataStream.partitionCustom(partitioner, 0);

l Broadcasting:廣播變量,后面詳細講解

需求:對我們filter過后的數(shù)據(jù)進行重新分區(qū)

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} object FlinkPartition { def main(args: Array[String]): Unit = { val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment import org.apache.flink.api.scala._ val dataStream: DataStream[String] = environment.fromElements("hello world","test spark","abc hello","hello flink") val resultStream: DataStream[(String, Int)] = dataStream.filter(x => x.contains("hello")) // .shuffle //隨機的重新分發(fā)數(shù)據(jù),上游的數(shù)據(jù)孙咪,隨機的發(fā)送到下游的分區(qū)里面去 // .rescale .rebalance //對數(shù)據(jù)重新進行分區(qū)堪唐,涉及到shuffle的過程.flatMap(x => x.split(" "))
.map(x => (x, 1))
.keyBy(0)
.sum(1)

resultStream.print().setParallelism(1)
environment.execute()
}
}

案例實戰(zhàn):自定義分區(qū)策略

如果以上的幾種分區(qū)方式還沒法滿足我們的需求,我們還可以自定義分區(qū)策略來實現(xiàn)數(shù)據(jù)的分區(qū)

需求:自定義分區(qū)策略该贾,實現(xiàn)不同分區(qū)的數(shù)據(jù)發(fā)送到不同分區(qū)里面去進行處理羔杨,將包含hello的字符串發(fā)送到一個分區(qū)里面去,其他的發(fā)送到另外一個分區(qū)里面去

第一步:自定義分區(qū)類

import org.apache.flink.api.common.functions.Partitioner class MyPartitioner extends Partitioner[String]{ override def partition(word: String, num: Int): Int = {
println("****分區(qū)個數(shù)為****" + num) if(word.contains("hello")){ 0 }else{ 1 }
}
}

第二步:代碼實現(xiàn)進行分區(qū)

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} object FlinkCustomerPartition { def main(args: Array[String]): Unit = { val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment //設(shè)置我們的分區(qū)數(shù)杨蛋,如果不設(shè)置,默認使用CPU核數(shù)作為分區(qū)個數(shù)

environment.setParallelism(2) import org.apache.flink.api.scala._ //獲取dataStream val sourceStream: DataStream[String] = environment.fromElements("hello world","spark flink","hello world","hive hadoop") val rePartition: DataStream[String] = sourceStream.partitionCustom(new MyPartitioner,x => x +"")
rePartition.map(x =>{
println("****數(shù)據(jù)的****key****為****" + x + "****線程為****" + Thread.currentThread().getId)
x
})
rePartition.print()
environment.execute()

}
}

DataStream的sink算子

https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/

l writeAsText():將元素以字符串形式逐行寫入理澎,這些字符串通過調(diào)用每個元素的toString()方法來獲取

l print() / printToErr():打印每個元素的toString()方法的值到標準輸出或者標準錯誤輸出流中

l 自定義輸出addSink【kafka逞力、redis】

我們可以通過sink算子,將我們的數(shù)據(jù)發(fā)送到指定的地方去糠爬,例如kafka或者redis或者hbase等等寇荧,前面我們已經(jīng)使用過將數(shù)據(jù)打印出來調(diào)用print()方法,接下來我們來實現(xiàn)自定義sink將我們的數(shù)據(jù)發(fā)送到redis里面去

第一步:導入flink整合redis的jar包

** <dependency> <groupId>org.apache.bahir</groupId> <artifactId>flink-connector-redis_2.11</artifactId> <version>1.0</version> </dependency>

第二步:代碼開發(fā)

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} import org.apache.flink.streaming.connectors.redis.RedisSink import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper} object Stream2Redis { def main(args: Array[String]): Unit = { //獲取程序入口類val executionEnvironment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment import org.apache.flink.api.scala._ //組織數(shù)據(jù)val streamSource: DataStream[String] = executionEnvironment.fromElements("hello world","key value") //將數(shù)據(jù)包裝成為key,value對形式的tuple val tupleValue: DataStream[(String, String)] = streamSource.map(x =>(x.split(" ")(0),x.split(" ")(1))) val builder = new FlinkJedisPoolConfig.Builder

builder.setHost("node03")
builder.setPort(6379)

builder.setTimeout(5000)
builder.setMaxTotal(50)
builder.setMaxIdle(10)
builder.setMinIdle(5) val config: FlinkJedisPoolConfig = builder.build() //獲取redis sink val redisSink = new RedisSink[Tuple2[String,String]](config,new MyRedisMapper) //使用我們自定義的sink tupleValue.addSink(redisSink) //執(zhí)行程序executionEnvironment.execute("redisSink")
}
} class MyRedisMapper extends RedisMapper[Tuple2[String,String]]{ override def getCommandDescription: RedisCommandDescription = { new RedisCommandDescription(RedisCommand.SET)

} override def getKeyFromData(data: (String, String)): String = {
data._1

} override def getValueFromData(data: (String, String)): String = {
data._2

}
}

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末执隧,一起剝皮案震驚了整個濱河市揩抡,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌镀琉,老刑警劉巖峦嗤,帶你破解...
    沈念sama閱讀 221,273評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異屋摔,居然都是意外死亡烁设,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,349評論 3 398
  • 文/潘曉璐 我一進店門钓试,熙熙樓的掌柜王于貴愁眉苦臉地迎上來装黑,“玉大人副瀑,你說我怎么就攤上這事×堤罚” “怎么了糠睡?”我有些...
    開封第一講書人閱讀 167,709評論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀的道長疚颊。 經(jīng)常有香客問我铜幽,道長,這世上最難降的妖魔是什么串稀? 我笑而不...
    開封第一講書人閱讀 59,520評論 1 296
  • 正文 為了忘掉前任除抛,我火速辦了婚禮,結(jié)果婚禮上母截,老公的妹妹穿的比我還像新娘到忽。我一直安慰自己,他們只是感情好清寇,可當我...
    茶點故事閱讀 68,515評論 6 397
  • 文/花漫 我一把揭開白布喘漏。 她就那樣靜靜地躺著,像睡著了一般华烟。 火紅的嫁衣襯著肌膚如雪翩迈。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,158評論 1 308
  • 那天盔夜,我揣著相機與錄音负饲,去河邊找鬼。 笑死喂链,一個胖子當著我的面吹牛返十,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播椭微,決...
    沈念sama閱讀 40,755評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼洞坑,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了蝇率?” 一聲冷哼從身側(cè)響起迟杂,我...
    開封第一講書人閱讀 39,660評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎本慕,沒想到半個月后排拷,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,203評論 1 319
  • 正文 獨居荒郊野嶺守林人離奇死亡间狂,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,287評論 3 340
  • 正文 我和宋清朗相戀三年攻泼,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,427評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡忙菠,死狀恐怖何鸡,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情牛欢,我是刑警寧澤骡男,帶...
    沈念sama閱讀 36,122評論 5 349
  • 正文 年R本政府宣布,位于F島的核電站傍睹,受9級特大地震影響隔盛,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜拾稳,卻給世界環(huán)境...
    茶點故事閱讀 41,801評論 3 333
  • 文/蒙蒙 一吮炕、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧访得,春花似錦龙亲、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,272評論 0 23
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至搜骡,卻和暖如春拂盯,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背记靡。 一陣腳步聲響...
    開封第一講書人閱讀 33,393評論 1 272
  • 我被黑心中介騙來泰國打工谈竿, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人簸呈。 一個月前我還...
    沈念sama閱讀 48,808評論 3 376
  • 正文 我出身青樓榕订,卻偏偏與公主長得像,于是被迫代替她去往敵國和親蜕便。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 45,440評論 2 359

推薦閱讀更多精彩內(nèi)容

  • 實時處理代碼開發(fā)開發(fā)flink代碼贩幻,實現(xiàn)統(tǒng)計socket當中的單詞數(shù)量第一步:創(chuàng)建maven工程轿腺,導入jar包<d...
    我還不夠強閱讀 1,874評論 0 0
  • 對于流式處理,如果我們需要求取總和丛楚,平均值族壳,或者最大值,最小值等趣些,是做不到的仿荆,因為數(shù)據(jù)一直在源源不斷的產(chǎn)生,即數(shù)據(jù)...
    我還不夠強閱讀 2,032評論 0 1
  • Spark學習筆記 Data Source->Kafka->Spark Streaming->Parquet->S...
    哎喲喂嘍閱讀 6,631評論 0 51
  • 1 注意import的StreamExecutionEnvironment // java 的頭是 import ...
    君劍閱讀 9,235評論 3 3
  • 1. Overview: Structured Streaming是基于Spark SQL引擎的可擴展、具有容錯性...
    奉先閱讀 2,891評論 0 1