Flink API Overview
1. DataStream data sources
1. Socket source
Receive data from a socket and count how many times each word appears within the last 5 seconds.
Step 1: start a socket service on node01
Run the following command on node01 to start the socket service:
nc -lk 9000
Step 2: implement the code
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Time

object FlinkSource1 {
  def main(args: Array[String]): Unit = {
    // Get the execution environment
    val streamExecution: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val socketText: DataStream[String] = streamExecution.socketTextStream("node01", 9000)
    // Note: this implicit import is required, otherwise the flatMap call below will not compile
    import org.apache.flink.api.scala._
    val result: DataStream[(String, Int)] = socketText.flatMap(x => x.split(" "))
      .map(x => (x, 1))
      .keyBy(0)
      .timeWindow(Time.seconds(5), Time.seconds(5)) // count the data of the last 5 seconds
      .sum(1)
    // Print the result
    result.print().setParallelism(1)
    // Run the job
    streamExecution.execute()
  }
}
2. File source
Read and process all files under an HDFS path.
Step 1: add the Maven dependencies
<repositories>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
</repositories>

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.6.0-mr1-cdh5.14.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.6.0-cdh5.14.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>2.6.0-cdh5.14.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
    <version>2.6.0-cdh5.14.2</version>
</dependency>
Step 2: implement the code
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object FlinkSource2 {
  def main(args: Array[String]): Unit = {
    val executionEnvironment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    // Read data from the text files
    val hdfStream: DataStream[String] = executionEnvironment.readTextFile("hdfs://node01:8020/flink_input/")
    val result: DataStream[(String, Int)] = hdfStream.flatMap(x => x.split(" ")).map(x => (x, 1)).keyBy(0).sum(1)
    result.print().setParallelism(1)
    executionEnvironment.execute("hdfsSource")
  }
}
3缓艳、從一個已經(jīng)存在的集合當中獲取數(shù)據(jù)
代碼實現(xiàn)
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object FlinkSource3 {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    val value: DataStream[String] = environment.fromElements[String]("hello world", "spark flink")
    val result2: DataStream[(String, Int)] = value.flatMap(x => x.split(" ")).map(x => (x, 1)).keyBy(0).sum(1)
    result2.print().setParallelism(1)
    environment.execute()
  }
}
4校摩、自定義數(shù)據(jù)源
如果flink自帶的一些數(shù)據(jù)源還不夠的工作使用的話,我們還可以自定義數(shù)據(jù)源
flink提供了大量的已經(jīng)實現(xiàn)好的source方法阶淘,你也可以自定義source
通過實現(xiàn)sourceFunction接口來自定義source衙吩,
或者你也可以通過實現(xiàn)ParallelSourceFunction 接口 or 繼承RichParallelSourceFunction 來自定義source。
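For the non-parallel case, a minimal sketch of a source implementing SourceFunction could look like the following (the class name MySingleSource and the emitted data are illustrative, not part of the original example):

import org.apache.flink.streaming.api.functions.source.SourceFunction

// A minimal non-parallel source: emits a fixed string until cancel() is called.
// MySingleSource and the emitted value are placeholders for illustration.
class MySingleSource extends SourceFunction[String] {
  @volatile private var isRunning: Boolean = true

  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    while (isRunning) {
      ctx.collect("hello flink")
      Thread.sleep(1000) // throttle the output a little
    }
  }

  override def cancel(): Unit = {
    isRunning = false
  }
}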
1溪窒、通過ParallelSourceFunction 來實現(xiàn)自定義數(shù)據(jù)源
如果需要實現(xiàn)一個多并行度的數(shù)據(jù)源坤塞,那么我們可以通過實現(xiàn)ParallelSourceFunction 接口或者繼承RichParallelSourceFunction 來自定義有并行度的source。
第一步:使用scala代碼實現(xiàn)ParallelSourceFunction接口
import org.apache.flink.streaming.api.functions.source.{ParallelSourceFunction, SourceFunction}

class MyParalleSource extends ParallelSourceFunction[String] {
  var isRunning: Boolean = true

  override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = {
    // Keep emitting data until the source is cancelled
    while (isRunning) {
      sourceContext.collect("hello world")
    }
  }

  override def cancel(): Unit = {
    isRunning = false
  }
}
Step 2: use the custom source
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object FlinkSource5 {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    val sourceStream: DataStream[String] = environment.addSource(new MyParalleSource)
    val result: DataStream[(String, Int)] = sourceStream.flatMap(x => x.split(" "))
      .map(x => (x, 1))
      .keyBy(0)
      .sum(1)
    result.print().setParallelism(2)
    environment.execute("paralleSource")
  }
}
2澈蚌、dataStream的算子介紹
官網(wǎng)算子介紹:
https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/index.html
Flink ships with many operators for stream processing. The commonly used DataStream operators fall into three categories:
Transformations: conversion operators. They are lazy: they only describe the dataflow, which is executed once the data reaches a sink and the job is submitted.
Partition: operators that repartition the data.
Sink: the destination the data is written to.
DataStream transformation operators
- map: takes one element and returns one element; useful for cleaning and transforming data.
- flatMap: takes one element and returns zero, one, or more elements.
- filter: evaluates a predicate on each element; only elements that satisfy it are kept.
- keyBy: groups the stream by the given key; records with the same key go to the same partition (see the remarks for typical usage).
- reduce: aggregates the data by combining the current element with the previously reduced value and returning a new value (filter and reduce are combined in the short sketch after this list).
- aggregations: sum(), min(), max(), and so on.
- window: covered separately later.
- Union: merges multiple streams; the new stream contains all elements of the input streams, but all merged streams must have the same type.
- Connect: similar to union, but only connects two streams; the two streams may have different types and are processed with different functions.
- CoMap, CoFlatMap: the map/flatMap counterparts used on ConnectedStreams.
- Split: splits one stream into multiple streams according to a rule.
- Select: used together with split to pick one of the split streams.
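Most of these operators appear in the examples below; since filter and reduce do not, here is a minimal sketch that combines them (the object name FilterReduceDemo and the sample elements are illustrative only):

import org.apache.flink.streaming.api.scala._

object FilterReduceDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val words: DataStream[String] = env.fromElements("flink", "spark", "flink", "flink")
    val result: DataStream[(String, Int)] = words
      .filter(x => x.contains("flink"))        // filter: keep only matching elements
      .map(x => (x, 1))
      .keyBy(0)
      .reduce((a, b) => (a._1, a._2 + b._2))   // reduce: combine current element with the previous result
    result.print().setParallelism(1)
    env.execute("filterReduceDemo")
  }
}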
Example 1: merging multiple DataStreams with union
Create two DataStreams and merge them with union.
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object FlinkUnion {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    // Create the first DataStream
    val firstStream: DataStream[String] = environment.fromElements("hello world", "test scala")
    // Create the second DataStream
    val secondStream: DataStream[String] = environment.fromElements("second test", "spark flink")
    // Merge the two streams
    val unionAll: DataStream[String] = firstStream.union(secondStream)
    // Pass the data through unchanged
    val unionResult: DataStream[String] = unionAll.map(x => {
      // println(x)
      x
    })
    // Call a sink operator to print the result
    unionResult.print().setParallelism(1)
    // Run the job
    environment.execute()
  }
}
Example 2: connecting DataStreams of different types with connect
import org.apache.flink.streaming.api.scala.{ConnectedStreams, DataStream, StreamExecutionEnvironment}

object FlinkConnect {
  def main(args: Array[String]): Unit = {
    // Get the execution environment
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Import the implicit conversions
    import org.apache.flink.api.scala._
    // A DataStream of String
    val strStream: DataStream[String] = environment.fromElements("hello world", "abc test")
    // A DataStream of Int
    val intStream: DataStream[Int] = environment.fromElements(1, 2, 3, 4, 5)
    // Connect the two streams
    val connectedStream: ConnectedStreams[String, Int] = strStream.connect(intStream)
    // Process the connected streams with map, passing one function per stream
    val connectResult: DataStream[Any] = connectedStream.map(x => { x + "abc" }, y => { y * 2 })
    connectResult.print().setParallelism(1)
    environment.execute("connect stream")
  }
}
Example 3: splitting one DataStream into multiple DataStreams with split
import java.{lang, util}
import org.apache.flink.streaming.api.collector.selector.OutputSelector
import org.apache.flink.streaming.api.scala.{DataStream, SplitStream, StreamExecutionEnvironment}

object FlinkSplit {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    // Create a DataStream
    val resultDataStream: DataStream[String] = environment.fromElements("hello world", "test spark", "spark flink")
    // Split the stream with an OutputSelector
    val splitStream: SplitStream[String] = resultDataStream.split(new OutputSelector[String] {
      override def select(out: String): lang.Iterable[String] = {
        val strings = new util.ArrayList[String]()
        if (out.contains("hello")) {
          // Name this stream "hello" when the element contains "hello"
          strings.add("hello")
        } else {
          strings.add("other")
        }
        strings
      }
    })
    // Select the split stream named "hello"
    val helloStream: DataStream[String] = splitStream.select("hello")
    // Print all strings that contain "hello"
    helloStream.print().setParallelism(1)
    environment.execute()
  }
}
DataStream partition operators
https://blog.csdn.net/lmalds/article/details/60575205 (an introduction to the various Flink operators)
Partition operators let us repartition the data, for example to deal with data skew.
- Random partitioning: randomly redistributes the elements
  dataStream.shuffle()
- Rebalancing: round-robin repartitioning that rebalances the data and removes skew
  dataStream.rebalance()
- Rescaling: redistributes elements only among a subset of upstream/downstream operator instances; because the exchange stays local to a node, there is no data transfer across the network
  dataStream.rescale()
- Custom partitioning: user-defined partitioning
  A custom partitioner must implement the Partitioner interface
  dataStream.partitionCustom(partitioner, "someKey")
  or dataStream.partitionCustom(partitioner, 0)
- Broadcasting: broadcast variables, covered in detail later
Requirement: repartition the data after filtering it.
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object FlinkPartition {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    val dataStream: DataStream[String] = environment.fromElements("hello world", "test spark", "abc hello", "hello flink")
    val resultStream: DataStream[(String, Int)] = dataStream.filter(x => x.contains("hello"))
      // .shuffle  // randomly redistribute the upstream data to the downstream partitions
      // .rescale
      .rebalance  // repartition the data, which involves a shuffle
      .flatMap(x => x.split(" "))
      .map(x => (x, 1))
      .keyBy(0)
      .sum(1)
    resultStream.print().setParallelism(1)
    environment.execute()
  }
}
Hands-on example: a custom partitioning strategy
If none of the partitioning schemes above meets our needs, we can also implement a custom partitioning strategy.
Requirement: with a custom partitioning strategy, route data to different partitions for processing: strings containing "hello" go to one partition, and everything else goes to another.
Step 1: the custom partitioner class
import org.apache.flink.api.common.functions.Partitioner

class MyPartitioner extends Partitioner[String] {
  override def partition(word: String, num: Int): Int = {
    println("number of partitions: " + num)
    if (word.contains("hello")) {
      0
    } else {
      1
    }
  }
}
Step 2: use the partitioner
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object FlinkCustomerPartition {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Set the parallelism; if not set, the number of CPU cores is used by default
    environment.setParallelism(2)
    import org.apache.flink.api.scala._
    // Create the DataStream
    val sourceStream: DataStream[String] = environment.fromElements("hello world", "spark flink", "hello world", "hive hadoop")
    val rePartition: DataStream[String] = sourceStream.partitionCustom(new MyPartitioner, x => x + "")
    rePartition.map(x => {
      println("data key: " + x + ", thread id: " + Thread.currentThread().getId)
      x
    })
    rePartition.print()
    environment.execute()
  }
}
DataStream sink operators
https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/
- writeAsText(): writes the elements line by line as strings, obtained by calling each element's toString() method (a small sketch follows this list)
- print() / printToErr(): prints each element's toString() value to standard output or standard error
- Custom sinks via addSink (e.g., Kafka, Redis)
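For reference, a minimal sketch of writeAsText(); the object name WriteAsTextDemo, the output path, and the sample elements are assumptions for illustration only:

import org.apache.flink.streaming.api.scala._

object WriteAsTextDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val data: DataStream[String] = env.fromElements("hello world", "spark flink")
    // Each element is written out as one line, using its toString() value.
    // The target path below is a placeholder; point it at any writable path.
    data.writeAsText("hdfs://node01:8020/flink_output").setParallelism(1)
    env.execute("writeAsTextDemo")
  }
}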
With a sink operator we can send our data to an external system such as Kafka, Redis, or HBase. So far we have only printed data with print(); next we implement a custom sink that writes the data to Redis.
Step 1: add the flink-redis connector dependency
<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>
Step 2: the code
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

object Stream2Redis {
  def main(args: Array[String]): Unit = {
    // Get the execution environment
    val executionEnvironment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    // Prepare the data
    val streamSource: DataStream[String] = executionEnvironment.fromElements("hello world", "key value")
    // Turn each line into a (key, value) tuple
    val tupleValue: DataStream[(String, String)] = streamSource.map(x => (x.split(" ")(0), x.split(" ")(1)))
    // Configure the Redis connection pool
    val builder = new FlinkJedisPoolConfig.Builder()
    builder.setHost("node03")
    builder.setPort(6379)
    builder.setTimeout(5000)
    builder.setMaxTotal(50)
    builder.setMaxIdle(10)
    builder.setMinIdle(5)
    val config: FlinkJedisPoolConfig = builder.build()
    // Create the Redis sink
    val redisSink = new RedisSink[Tuple2[String, String]](config, new MyRedisMapper)
    // Attach our custom sink
    tupleValue.addSink(redisSink)
    // Run the job
    executionEnvironment.execute("redisSink")
  }
}

class MyRedisMapper extends RedisMapper[Tuple2[String, String]] {
  override def getCommandDescription: RedisCommandDescription = {
    new RedisCommandDescription(RedisCommand.SET)
  }

  override def getKeyFromData(data: (String, String)): String = {
    data._1
  }

  override def getValueFromData(data: (String, String)): String = {
    data._2
  }
}