spark streaming checkpoint 遇上NotSerializableException

需求:spark streaming消費(fèi)kafka锨匆,offset維護(hù)到kafka topic屯仗,開(kāi)啟checkpoint

環(huán)境:Java鱼填、spark2.3擒贸、spark-streaming-kafka-0-10_2. 11

開(kāi)啟checkpoint后程序遇到序列化異常,解決方案如下:

① 程序用到了自定義ConsumerStrategy窗轩,需要添加序列化

② 異步提交offset到kafka夯秃,commitAsync方法的回調(diào)函數(shù)不能用lambda表達(dá)式。需要實(shí)現(xiàn)OffsetCommitCallback回調(diào)函數(shù)接口并序列化


import com.lazyge.sprk.stream.util.KfkConsumerStrategy;

import org.apache.kafka.clients.consumer.ConsumerConfig;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.apache.kafka.common.serialization.StringDeserializer;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.function.FlatMapFunction;

import org.apache.spark.api.java.function.Function;

import org.apache.spark.api.java.function.Function2;

import org.apache.spark.api.java.function.VoidFunction2;

import org.apache.spark.streaming.Durations;

import org.apache.spark.streaming.Time;

import org.apache.spark.streaming.api.java.JavaDStream;

import org.apache.spark.streaming.api.java.JavaInputDStream;

import org.apache.spark.streaming.api.java.JavaStreamingContext;

import org.apache.spark.streaming.kafka010.*;

import java.util.*;

import java.util.concurrent.atomic.AtomicReference;

public class CkptTestSpark {

public static void main(String[] args)throws InterruptedException {

new CkptTestSpark().start();

}

public void start()throws InterruptedException {

JavaStreamingContext ssc = getContext();

JavaInputDStream> stream = getStream(ssc);

AtomicReference offsetRanges =new AtomicReference<>();

JavaDStream wordDstream = stream.transform((Function2>, Time, JavaRDD>) (rdd, time) -> {

if( rdd.rdd()instanceof HasOffsetRanges ){

offsetRanges.set(((HasOffsetRanges) rdd.rdd()).offsetRanges());

}

JavaRDD lines = rdd.map((Function, String>) (record) -> {

return (String)record.value();

});

JavaRDD words = lines.flatMap((FlatMapFunction) (line) -> {

return Arrays.asList(line.split(" ")).iterator();

});

return words;

});

wordDstream.foreachRDD((VoidFunction2, Time>)(rdd, time) -> {

rdd.collect().forEach(System.out::println);

/**

* 開(kāi)啟checkpoint的時(shí)候,OffsetCommitCallback 需要實(shí)現(xiàn)序列化才能使用

* 否則報(bào)錯(cuò):NotSerializableException

*/

if (stream.dstream()instanceof CanCommitOffsets) {

((CanCommitOffsets)stream.dstream()).commitAsync(offsetRanges.get()/*, (OffsetCommitCallback)(ofs, ex) -> {

System.err.println("開(kāi)啟checkpoint的時(shí)候仓洼,OffsetCommitCallback 需要實(shí)現(xiàn)序列化才能使用");

                }*/);

}

});

ssc.start();

System.out.println("beginninignininiinini");

ssc.awaitTermination();

}

private JavaStreamingContext getContext(){

SparkConf conf =new SparkConf()

.setIfMissing("spark.master","local[*]")

.setIfMissing("spark.app.name","ckptTestSpark");

JavaStreamingContext ssc =new JavaStreamingContext(conf, Durations.seconds(5));

ssc.checkpoint("E:\\tmp\\ckptspark");

return ssc;

}

private JavaInputDStream> getStream(JavaStreamingContext ssc){

String brokers ="192.168.99.128:9092,192.168.99.128:9093,192.168.99.128:9094";

String groupId ="testCkpt";

String topics ="first";

Set topicsSet =new HashSet<>(Arrays.asList(topics.split(",")));

Map kafkaParams =new HashMap<>();

kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);

kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);

kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");

/**

* 開(kāi)啟checkpoint的時(shí)候介陶,ConsumerStrategy 需要實(shí)現(xiàn)序列化才能使用

* 否則報(bào)錯(cuò):NotSerializableException

*/
JavaInputDStream> stream = KafkaUtils.createDirectStream(ssc,LocationStrategies.PreferConsistent(),
new KfkConsumerStrategy(kafkaParams, Arrays.asList(topics)));

/*JavaInputDStream> stream = KafkaUtils.createDirectStream(ssc,LocationStrategies.PreferConsistent(),ConsumerStrategies.Subscribe(topicsSet, kafkaParams));*/

return stream;

}

}

總結(jié):用scala不會(huì)有那么多問(wèn)題

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市色建,隨后出現(xiàn)的幾起案子哺呜,更是在濱河造成了極大的恐慌,老刑警劉巖箕戳,帶你破解...
    沈念sama閱讀 219,270評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件某残,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡陵吸,警方通過(guò)查閱死者的電腦和手機(jī)驾锰,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,489評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)走越,“玉大人,你說(shuō)我怎么就攤上這事耻瑟≈贾福” “怎么了?”我有些...
    開(kāi)封第一講書人閱讀 165,630評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵喳整,是天一觀的道長(zhǎng)谆构。 經(jīng)常有香客問(wèn)我,道長(zhǎng)框都,這世上最難降的妖魔是什么搬素? 我笑而不...
    開(kāi)封第一講書人閱讀 58,906評(píng)論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮魏保,結(jié)果婚禮上熬尺,老公的妹妹穿的比我還像新娘。我一直安慰自己谓罗,他們只是感情好粱哼,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,928評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著檩咱,像睡著了一般揭措。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上刻蚯,一...
    開(kāi)封第一講書人閱讀 51,718評(píng)論 1 305
  • 那天绊含,我揣著相機(jī)與錄音,去河邊找鬼炊汹。 笑死躬充,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播麻裳,決...
    沈念sama閱讀 40,442評(píng)論 3 420
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼口蝠,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了津坑?” 一聲冷哼從身側(cè)響起妙蔗,我...
    開(kāi)封第一講書人閱讀 39,345評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎疆瑰,沒(méi)想到半個(gè)月后眉反,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,802評(píng)論 1 317
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡穆役,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,984評(píng)論 3 337
  • 正文 我和宋清朗相戀三年寸五,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片耿币。...
    茶點(diǎn)故事閱讀 40,117評(píng)論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡梳杏,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出淹接,到底是詐尸還是另有隱情十性,我是刑警寧澤,帶...
    沈念sama閱讀 35,810評(píng)論 5 346
  • 正文 年R本政府宣布塑悼,位于F島的核電站劲适,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏厢蒜。R本人自食惡果不足惜霞势,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,462評(píng)論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望斑鸦。 院中可真熱鬧愕贡,春花似錦、人聲如沸巷屿。這莊子的主人今日做“春日...
    開(kāi)封第一講書人閱讀 32,011評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)攒庵。三九已至嘴纺,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間浓冒,已是汗流浹背栽渴。 一陣腳步聲響...
    開(kāi)封第一講書人閱讀 33,139評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留稳懒,地道東北人闲擦。 一個(gè)月前我還...
    沈念sama閱讀 48,377評(píng)論 3 373
  • 正文 我出身青樓慢味,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親墅冷。 傳聞我的和親對(duì)象是個(gè)殘疾皇子纯路,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,060評(píng)論 2 355