SparkStreaming? ? 維護????Kafka? ? 消息偏移量據(jù)個人了解有兩種方式
一牺堰、利用????SparkStreaming? ? 自帶的????Checkpoint? ? 方法來維護
二雇毫、自己來編寫維護????Kafka? ? 消息偏移量的代碼
首先說明下集群中的各組件版本
Hadoop-version:????2.7.1
Spark-version? ? :????1.6.2
zookeeper-version:????3.4.6
jdk-version:? ? ? ? ? ? ? 1.8
maven-version:? ? ? ? 3.3.3
kafka-version:? ? ? ? ? ?0.10.0
集群搭建方式? ? HDP(ambari)
首先說第一種????SparkStreaming? ? 自帶的????Checkpoint? ? 方法,以下為代碼示例
object Test {
def main(args: Array[String]): Unit = {
//? ? TODO:創(chuàng)建檢查點的位置可以設(shè)置為Hdfs如果程序重新啟動spark程序會到此目錄中檢查并恢復(fù)
? ? val checkpointDirectory ="";
//? ? TODO:調(diào)用getOrCreate方法,這個方法入如果是第一次運行該作業(yè)
? ? //? ? TODO:沒有checkpointDirectory該文件時
? ? //? ? TODO:將會重新創(chuàng)建一個StreamingContext搬俊,并從最新或是最老的偏移量處開始消費
? ? val ssc: StreamingContext = StreamingContext.getOrCreate(checkpointDirectory, () => {
createStreaming(checkpointDirectory)
})
ssc.start()
ssc.awaitTermination()
}
//? TODO:你的業(yè)務(wù)邏輯不應(yīng)該寫在main函數(shù)中,而是應(yīng)該寫在創(chuàng)建
? //? TODO:StreamingContext的方法中
? def createStreaming(checkpointDirectory:String): StreamingContext = {
? ??????val brokerlist:String ="192.168.1.1:6667,192.168.1.1:6667"
? ?? ???val topic =Set("test")
? ??????val kafkaParams:Map[String,String] =Map[String,String]("metadata.broker.list" -> brokerlist)
? ??????val conf: SparkConf =new SparkConf()
? ??????????????????????????????????.setMaster("local")
? ??????????????????????????????????.setAppName(Test.getClass.getSimpleName)
? ??????val ssc: StreamingContext =new StreamingContext(conf,Seconds(3))
//? ? TODO:在這里創(chuàng)建檢查點曲饱,至于檢查點在哪里創(chuàng)建悠抹,具體需要看個人的業(yè)務(wù)需求
? ??? ??ssc.checkpoint(checkpointDirectory)
? ??????val data: InputDStream[(String,String)] =
? ??????????????????????KafkaUtils.createDirectStream[String,String, StringDecoder, StringDecoder](ssc, kafkaParams, topic)
//? TODO:以下代碼就是你的業(yè)務(wù)邏輯了,這里只是循環(huán)遍歷打印了Key扩淀,Value
? ??? ??data.foreachRDD(rdd => {
? ??????????????rdd.foreachPartition(partition => {
? ??????????????????????if (partition.isEmpty) {
? ??????????????????????????????for (tuple <- partition) {
? ??????????????????????????????????println(tuple._1 + tuple._2)
? ??????????????????????????}
? ??????????????????????}
? ??????????????????})
? ??????????????})
? ??????????ssc
? ??????????}
? ??????}
說一下這個方法的缺點:
這個方法不適合總是需要迭代升級的應(yīng)用楔敌,因為這個方法會在你建立檢查點時將你的jar包信息以序列化的方式存在此目錄中,
如果你的作業(yè)掛掉重新啟動時驻谆,這時候是沒有問題的卵凑,因為什么都沒有改變。
但是在你的應(yīng)用迭代升級時你的代碼發(fā)生了變化胜臊,這是程序會發(fā)現(xiàn)其中的變化勺卢,你迭代升級后的版本將無法運行,就算是啟動成功了象对,
運行的也還是迭代升級之前的代碼黑忱。所還是以失敗而告終!勒魔!
在Spark官方文檔中給出了兩個解決辦法
第一個:老的作業(yè)不停機甫煞,新作業(yè)個老作業(yè)同時運行一段時間,這樣是不安全的9诰睢8Х汀!
會導(dǎo)致數(shù)據(jù)重復(fù)消費弟胀,也有可能會發(fā)生數(shù)據(jù)丟失等問題
第二個:就是我要講的自己維護消息偏移量
以下是自己維護消息偏移量代碼示例
object Test{
? ??????def main(args: Array[String]): Unit = {
//? ? TODO: Zookeeper 集群地址和端口
??? ??????????? val zkHost:String ="192.168.1.1:2181,192.168.1.1:2181,192.168.1.1:2181"
? ? //? ? TODO:Kafka集群地址及端口
??? ??????????? val brokerlist:String ="192.168.1.1:6667,192.168.1.1:6667,192.168.1.1:6667"
? ? //? ? TODO:指定消費 Kafka 主題
??? ??????????? val topic:String ="test"
? ??? ??????????val kafkaParams:Map[String,String] =Map[String,String]("metadata.broker.list" -> brokerlist)
? ??? ??????????val conf: SparkConf =new SparkConf()
? ??????????????????????????????.setMaster("local[8]")
? ??????????????????????????????.setAppName(NGBoss_Dcc_Analysis.getClass.getSimpleName)
//? ? TODO:獲取一個StreamingContext對象
? ?? ???????????val ssc: StreamingContext =new StreamingContext(conf,Seconds(5))
//? ? TODO:將獲取的Zookeeper客戶端
? ??? ??????????val zkClient =new ZkClient(zkHost)
? ??????????????var offsetRanges: Array[OffsetRange] =Array[OffsetRange]()
//? ? TODO:設(shè)置在zookeeper中存儲offset的路徑
? ??? ??????????val topicDirs: ZKGroupTopicDirs =new ZKGroupTopicDirs("TEST_TOPIC_spark_streaming", topic)
? ??? ??????????val children = zkClient.countChildren(s"${topicDirs.consumerOffsetDir}")
? ??? ??????????var fromOffsets:Map[TopicAndPartition, Long] =Map()
? ??????????????var kafkaStream: InputDStream[(String, Array[Byte])] =null
? ??? ? ? ? ? ? ? ? ? ? ??if (children >0) {
//TODO:如果 zookeeper 中有保存 offset楷力,我們會利用這個 offset 作為 kafkaStream 的起始位置
//TODO:如果保存過 offset喊式,這里更好的做法,還應(yīng)該和? kafka 上最小的 offset 做對比萧朝,不然會報 OutOfRange 的錯誤
? ? ?? ?????????????????????????for (i <-0 until children) {
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??val partitionOffset = zkClient.readData[String](s"${topicDirs.consumerOffsetDir}/${i}")
? ??????????????????????????????????val tp =TopicAndPartition(topic, i)
//TODO:將不同 partition 對應(yīng)的 offset 增加到 fromOffsets 中
? ? ? ?? ???????????fromOffsets += (tp -> partitionOffset.toLong)
? ??????????????????}
//TODO:這個會將 kafka 的消息進行 transform岔留,最終 kafak 的數(shù)據(jù)都會變成 (topic_name, message) 這樣的 tuple
? ? ?? ???????val messageHandler: (MessageAndMetadata[String, Array[Byte]]) => (String, Array[Byte]) = (mmd:?? ??????????????????????????????MessageAndMetadata[String, Array[Byte]]) => (mmd.topic, mmd.message())
? ????????????kafkaStream = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, ByteDecoder,?? ??????????????????????????????(String, Array[Byte])](ssc, kafkaParams, fromOffsets, messageHandler)
? ??????????}else {
//TODO:如果未保存,根據(jù) kafkaParam 的配置使用最新或者最舊的 offset
? ? ?? ???????????kafkaStream = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, ByteDecoder]? ??????????????????????????????(ssc, kafkaParams,Set(topic))
? ??????????????}
? ??????????kafkaStream.foreachRDD(rdd => {
? ??????????????????if (!rdd.isEmpty()) {
? ??????????????????????rdd.foreachPartition(partition=> {
? ??????????????????????????????partition.foreach(tuple => {
? ??????????????????????????????????println(tuple._1 + tuple._2)
? ??????????????????????????????})
? ??????????????????????})
? ??????????????????offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
? ??????????????????????????????for (o <- offsetRanges) {
? ??????????????????????????????val zkPath:String =s"${topicDirs.consumerOffsetDir}/${o.partition}"
? ? ? ? ? //TODO:將該 partition 的 offset 保存到 zookeeper
? ? ? ? ?? ??????????????????????ZkUtils.updatePersistentPath(zkClient, zkPath, o.fromOffset.toString)
? ??????????????????????????}
? ??????????????????????}
? ??????????????????})
? ??????????ssc.start()
? ??????ssc.awaitTermination()
? ??????}
? ??}
第二種方法中offset可以存儲在zookeeper中也可以存儲在數(shù)據(jù)庫中
自己維護offset是不會發(fā)生Checkpoint?中的問題剪勿,是我目前知道的最好的一個解決方案贸诚。
但是也有其中的缺點,如果從失敗中恢復(fù)運行時不能獲取到Key值厕吉,默認它的Key就是主題。
還有就是不能同時消費維護多個主題中的偏移量械念。