一、Environment
1.getExecutionEnvironment
創(chuàng)建一個執(zhí)行環(huán)境踢关,表示當前執(zhí)行程序的上下文谦铃。 如果程序是獨立調(diào)用的,則此方法返回本地執(zhí)行環(huán)境;如果從命令行客戶端調(diào)用程序以提交到集群撒会,則此方法返回此集群的執(zhí)行環(huán)境,也就是說诵肛,getExecutionEnvironment會根據(jù)查詢運行的方式?jīng)Q定返回什么樣的運行環(huán)境怔檩,是最常用的一種創(chuàng)建執(zhí)行環(huán)境的方式。
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
2.createLocalEnvironment
返回本地執(zhí)行環(huán)境媒吗,需要在調(diào)用時指定默認的并行度乙埃。
val env = StreamExecutionEnvironment.createLocalEnvironment(1)
3.createRemoteEnvironment
返回集群執(zhí)行環(huán)境介袜,將Jar提交到遠程服務器遇伞。需要在調(diào)用時指定JobManager的IP和端口號,并指定要在集群中運行的Jar包鸠珠。
val env = ExecutionEnvironment.createRemoteEnvironment("jobmanager-hostname", 6123,"C://jar//flink//wordcount.jar")
二跳芳、Source
創(chuàng)建一個kafka的工具類
object MyKafkaUtil {
val prop = new Properties()
prop.setProperty("bootstrap.servers","hadoop1:9092")
prop.setProperty("group.id","test")
def getConsumer(topic:String ):FlinkKafkaConsumer011[String]= {
val myKafkaConsumer:FlinkKafkaConsumer011[String] = new FlinkKafkaConsumer011[String](topic, new SimpleStringSchema(), prop)
myKafkaConsumer
}
}
消費kafka
object StartupApp {
def main(args: Array[String]): Unit = {
val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val kafkaConsumer =MyKafkaUtil.getConsumer("GMALL_STARTUP")
val dstream: DataStream[String] = environment.addSource(kafkaConsumer)
dstream.print()
environment.execute()
}
}
Exactly-once two-phase commit
Flink通過checkpoint來保存數(shù)據(jù)是否處理完成的狀態(tài)
由JobManager協(xié)調(diào)各個TaskManager進行checkpoint存儲娄琉,checkpoint保存在 StateBackend中,默認StateBackend是內(nèi)存級的孽水,也可以改為文件級的進行持久化保存女气。
執(zhí)行過程實際上是一個兩段式提交炼鞠,每個算子執(zhí)行完成,會進行“預提交”朝扼,直到執(zhí)行完sink操作霎肯,會發(fā)起“確認提交”观游,如果執(zhí)行失敗,預提交會放棄掉允跑。
如果宕機需要通過StateBackend進行恢復吮蛹,只能恢復所有確認提交的操作。
三潮针、Transform
1.KeyBy和Reduce
spark中的reduceByKey在Flink中被分成兩個算子:KeyBy和Reduce
KeyBy:
DataStream → KeyedStream:輸入必須是Tuple類型倚喂,邏輯地將一個流拆分成不相交的分區(qū),每個分區(qū)包含具有相同key的元素焦读,在內(nèi)部以hash的形式實現(xiàn)的矗晃,KeyedStream是有狀態(tài)的宴倍。
Reduce:
KeyedStream → DataStream:一個分組數(shù)據(jù)流的聚合操作仓技,合并當前的元素和上次聚合的結(jié)果脖捻,產(chǎn)生一個新的值地沮,返回的流中包含每一次聚合的結(jié)果摩疑,而不是只返回最后一次聚合的最終結(jié)果未荒。
2.Split 和 Select
Split:
類似于Flume中的選擇器,在一個DataStream頭部加上不同的戳拆分成多個DataStream速侈。
Select:
在splitStream中獲取一個或多個DataStream倚搬。
val splitStream: SplitStream[StartUpLog] = startUplogDstream.split { startUplog =>
var flags:List[String] = null
if ("appstore" == startUplog.ch) {
flags = List(startUplog.ch)
} else {
flags = List("other" )
}
flags
}
val appStoreStream: DataStream[StartUpLog] = splitStream.select("appstore")
val otherStream: DataStream[StartUpLog] = splitStream.select("other")
3.Connect和 CoMap
Connect:
連接兩個數(shù)據(jù)流
CoMap:
val connStream: ConnectedStreams[StartUpLog, StartUpLog] = appStoreStream.connect(otherStream)
val allStream: DataStream[String] = connStream.map(
(log1: StartUpLog) => log1.ch,
(log2: StartUpLog) => log2.ch
)
4.Union
對兩個或者兩個以上的DataStream進行union操作家卖,產(chǎn)生一個包含所有DataStream元素的新DataStream。
Connect與 Union 區(qū)別:
1.Union之前兩個流的類型必須是一樣上荡,Connect可以不一樣趴樱,在之后的coMap中再去調(diào)整成為一樣的。
2.Connect只能操作兩個流酪捡,Union可以操作多個.
Sink
1.Kafka
在kafka工具類中添加方法
def getProducer(topic:String): FlinkKafkaProducer011[String] ={
new FlinkKafkaProducer011[String](brokerList,topic,new SimpleStringSchema())
}
主函數(shù)中添加
val myKafkaProducer: FlinkKafkaProducer011[String] = MyKafkaUtil.getProducer("channel_sum")
sumDstream.map( chCount=>chCount._1+":"+chCount._2 ).addSink(myKafkaProducer)
2.Redis
在Redis工具類中添加方法
def getRedisSink(): RedisSink[(String,String)] ={
new RedisSink[(String,String)](conf,new MyRedisMapper)
}
class MyRedisMapper extends RedisMapper[(String,String)]{
override def getCommandDescription: RedisCommandDescription = {
new RedisCommandDescription(RedisCommand.HSET, "channel_count")
// new RedisCommandDescription(RedisCommand.SET )
}
override def getValueFromData(t: (String, String)): String = t._2
override def getKeyFromData(t: (String, String)): String = t._1
}
3.Elasticsearch
def getElasticSearchSink(indexName:String): ElasticsearchSink[String] ={
val esFunc = new ElasticsearchSinkFunction[String] {
override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
println("試圖保存:"+element)
val jsonObj: JSONObject = JSON.parseObject(element)
val indexRequest: IndexRequest = Requests.indexRequest().index(indexName).`type`("_doc").source(jsonObj)
indexer.add(indexRequest)
println("保存1條")
}
}
val sinkBuilder = new ElasticsearchSink.Builder[String](httpHosts, esFunc)
//刷新前緩沖的最大動作量
sinkBuilder.setBulkFlushMaxActions(10)
sinkBuilder.build()
}
4.JDBC 自定義sink
略略略