Spark從入門到入土(二):任務(wù)提交

spark任務(wù)提交有三種方式

1:通過local方式提交
2:通過spark-submit腳本提交到集群
3:通過spark提交的API SparkLauncher提交到集群,這種方式可以將提交過程集成到我們的spring工程中恬涧,更加靈活

先來看一下spark架構(gòu)崭篡,可以幫助理解任務(wù)的提交
任務(wù)提交

驅(qū)動程序:執(zhí)行應(yīng)用程序main方法的進(jìn)程
集群管理器:啟動執(zhí)行器節(jié)點(diǎn)胎挎,有Mesos掘而、YARN(Hadoop)配深、獨(dú)立集群管理器(spark自帶的集群管理器)谆趾,在standalone模式中即為Master主節(jié)點(diǎn)躁愿。
執(zhí)行器節(jié)點(diǎn):工作進(jìn)程,負(fù)責(zé)在spark作業(yè)中運(yùn)行任務(wù)
過程大概如下
①:執(zhí)行器節(jié)點(diǎn)(工作節(jié)點(diǎn))在啟動時(shí)會向驅(qū)動器注冊自己
②:用戶提交任務(wù)沪蓬,驅(qū)動器調(diào)用main方法彤钟,驅(qū)動器與集群管理器通信申請執(zhí)行器資源
③:集群管理器為驅(qū)動器程序啟動執(zhí)行器節(jié)點(diǎn)
④:驅(qū)動器程序執(zhí)行應(yīng)用程序,將任務(wù)發(fā)送到工作節(jié)點(diǎn)
⑤:工作節(jié)點(diǎn)進(jìn)行計(jì)算并保存結(jié)果
⑥:驅(qū)動器main方法退出跷叉,通過集群管理器釋放資源

注意:在客戶端模式下逸雹,spark-submit 會將驅(qū)動器程序運(yùn)行 在 spark-submit 被調(diào)用的這臺機(jī)器上。在集群模式下云挟,驅(qū)動器程序會被傳輸并執(zhí)行 于集群的一個(gè)工作節(jié)點(diǎn)上

一梆砸、本地方式提交
//該代碼是對企業(yè)架構(gòu)下的不同級別的海量告警信息進(jìn)行離線統(tǒng)計(jì),按部門园欣、日期帖世、級別進(jìn)行分組統(tǒng)計(jì)
public static void main(String[] args) {
        logger.info("開始執(zhí)行spark任務(wù)");
        SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");

        SparkConf conf = new SparkConf()
                .setAppName("離線統(tǒng)計(jì)")
                //.setMaster("spark://" + SparkConfig.SPARK_MASTER_HOST + ":" + SparkConfig.SPARK_MASTER_PORT)
                .setMaster("local")
                .set("spark.mongodb.input.uri", MongoConfig.SPARK_MONGODB_URL_PREFIX + BaseConstant.ALARM_SOURCE_TABLE)
                .set("spark.mongodb.output.uri", MongoConfig.SPARK_MONGODB_URL_PREFIX + BaseConstant
                        .ALARM_TARGET_TABLE);
        JavaSparkContext jsc = new JavaSparkContext(conf);
        JavaMongoRDD<Document> lines = MongoSpark.load(jsc).withPipeline(
                singletonList(
                        getCondition()
                )
        );

        //按部門時(shí)間分組計(jì)算
        JavaPairRDD<String, AlarmStatisticBean> pairs =
                lines.filter((Function<Document, Boolean>) line -> {
                    //代碼略掉
                    //過濾函數(shù),true:不過濾沸枯,false:過濾
                    return true;
                }).mapToPair( 
                        //對RDD中的每個(gè)元素調(diào)用指定函數(shù)日矫,并返回<String, AlarmStatisticBean>類型的對象
                        //鍵值對, key: orgId_day  value: level[]   例:1_20190101, [0,1,0]
                        (PairFunction<Document, String, AlarmStatisticBean>) line -> {
                            Long orgId = line.getLong("orgId");
                            String statisticDate = sdf.format(line.getLong("createTimestamp") * 1000);
                            AlarmStatisticBean bean = new AlarmStatisticBean();
                            Long level = line.getLong("levelDictId");
                            if (level == AlarmTypeEnum.LEVEL1.getType()) {
                                bean.setLevel1Count(1);
                            }
                            if (level == AlarmTypeEnum.LEVEL2.getType()) {
                                bean.setLevel2Count(1);
                            }
                            if (level == AlarmTypeEnum.LEVEL3.getType()) {
                                bean.setLevel3Count(1);
                            }
                            bean.setOrgId(orgId.intValue());
                            bean.setDay(statisticDate);
                            String code = orgId + "_" + statisticDate;
                            return new Tuple2<>(code, bean);
                        }
                        //分組多列求和 1_20190101, [x,y,z]
                ).reduceByKey((Function2<AlarmStatisticBean, AlarmStatisticBean, AlarmStatisticBean>) (v1, v2) -> {
                   //reduceByKey的作用是合并具有相同鍵的值
                    v1.setLevel1Count(v1.getLevel1Count() + v2.getLevel1Count());
                    v1.setLevel2Count(v1.getLevel2Count() + v2.getLevel2Count());
                    v1.setLevel3Count(v1.getLevel3Count() + v2.getLevel3Count());
                    return v1;
                });
        logger.info("------------------------->>>>>" + pairs.count());
        List<Document> documents = new ArrayList<>();

        //類型轉(zhuǎn)換辉饱,以便持久化到DB(mongo或mysql)
        for (Tuple2<String, AlarmStatisticBean> tuple2 : pairs.collect()) {
            Document document = new Document("day", tuple2._2.getDay())
                    .append("level1Count", tuple2._2.getLevel1Count())
                    .append("level2Count", tuple2._2.getLevel2Count())
                    .append("level3Count", tuple2._2.getLevel3Count())
                    .append("orgId", tuple2._2.getOrgId());
            documents.add(document);
        }

        MongoManager.saveToMongo(documents, BaseConstant.ALARM_TARGET_TABLE);
    }

直接運(yùn)行上述main方法會將AlarmStatisticBean對象保存到MongoDB中

二搬男、spark-submit腳本

提交到集群時(shí),需要注釋掉local

SparkConf conf = new SparkConf()
                .setAppName("離線統(tǒng)計(jì)")
                .setMaster("spark://" + SparkConfig.SPARK_MASTER_HOST + ":" + SparkConfig.SPARK_MASTER_PORT)
                //.setMaster("local")
...
//帶*的是公司名或項(xiàng)目名稱彭沼,不影響閱讀
spark-submit --class com.*.*.meter.alarm.AlarmStatisticService --master spark://master:7077 /opt/middleware/*-alarm-task-1.0-jar-with-dependencies.ja

--master spark:// 表示會使用獨(dú)立模式缔逛,也就是使用spark自帶的獨(dú)立集群管理器。提交時(shí)使用的主機(jī)名和端口精確匹配用戶頁面中的URL,這里建議直接從http://172...6:8080頁面上復(fù)制URL褐奴,避免不必要的麻煩按脚。

三、SparkLauncher提交

SparkLauncher也提供了兩種方式提交任務(wù)

3.1敦冬、launch

SparkLauncher實(shí)際上是根據(jù)JDK自帶的ProcessBuilder構(gòu)造了一個(gè)UNIXProcess子進(jìn)程提交任務(wù)辅搬,提交的形式跟spark-submit一樣。這個(gè)子進(jìn)程會以阻塞的方式等待程序的運(yùn)行結(jié)果脖旱。簡單來看就是拼接spark-submit命令堪遂,并以子進(jìn)程的方式啟動。
代碼中的process.getInputStream()實(shí)際上對應(yīng)linux進(jìn)程的標(biāo)準(zhǔn)輸出stdout
process.getErrorStream()實(shí)際上對應(yīng)linux進(jìn)程的錯(cuò)誤信息stderr
process.getOutputStream()實(shí)際上對應(yīng)linux進(jìn)程的輸入信息stdin

@Scheduled(fixedRate = 5000 * 60)
    public void alarmStatistic() {
        logger.info("=====>>>>>離線統(tǒng)計(jì)定時(shí)任務(wù)!", System.currentTimeMillis());

        try {
            HashMap env = new HashMap();
            //這兩個(gè)屬性必須設(shè)置
            env.put("HADOOP_CONF_DIR", CommonConfig.HADOOP_CONF_DIR);
            env.put("JAVA_HOME", CommonConfig.JAVA_HOME);

            SparkLauncher handle = new SparkLauncher(env)
                    .setSparkHome(SparkConfig.SPARK_HOME)
                    .setAppResource(CommonConfig.ALARM_JAR_PATH)
                    .setMainClass(CommonConfig.ALARM_JAR_MAIN_CLASS)
                    .setMaster("spark://" + SparkConfig.SPARK_MASTER_HOST + ":" + SparkConfig.SPARK_MASTER_PORT)
                    .setDeployMode(SparkConfig.SPARK_DEPLOY_MODE)
                    .setVerbose(SparkConfig.SPARK_VERBOSE)
                    .setConf("spark.app.id", CommonConfig.ALARM_APP_ID)
                    .setConf("spark.driver.memory", SparkConfig.SPARK_DRIVER_MEMORY)
                    .setConf("spark.rpc.message.maxSize", SparkConfig.SPARK_RPC_MESSAGE_MAXSIZE)
                    .setConf("spark.executor.memory", SparkConfig.SPARK_EXECUTOR_MEMORY)
                    .setConf("spark.executor.instances", SparkConfig.SPARK_EXECUTOR_INSTANCES)
                    .setConf("spark.executor.cores", SparkConfig.SPARK_EXECUTOR_CORES)
                    .setConf("spark.default.parallelism", SparkConfig.SPARK_DEFAULT_PARALLELISM)
                    .setConf("spark.driver.allowMultipleContexts", SparkConfig.SPARK_DRIVER_ALLOWMULTIPLECONTEXTS)
                    .setVerbose(true);

            Process process = handle.launch();
            InputStreamRunnable inputStream = new InputStreamRunnable(process.getInputStream(), "alarm task input");
            ExecutorUtils.getExecutorService().submit(inputStream);
            InputStreamRunnable errorStream = new InputStreamRunnable(process.getErrorStream(), "alarm task error");
            ExecutorUtils.getExecutorService().submit(errorStream);

            logger.info("Waiting for finish...");
            int exitCode = process.waitFor();
            logger.info("Finished! Exit code:" + exitCode);
        } catch (Exception e) {
            logger.error("submit spark task error", e);
        }
    }
運(yùn)行過程示意圖
運(yùn)行過程

1:用戶程序啟動(SparkLauncher萌庆,非驅(qū)動程序)時(shí)會在當(dāng)前節(jié)點(diǎn)上啟動一個(gè)SparkSubmit進(jìn)程溶褪,并將驅(qū)動程序(即spark任務(wù))發(fā)送到任意一個(gè)工作節(jié)點(diǎn)上,在工作節(jié)點(diǎn)上啟動DriverWrapper進(jìn)程
2:驅(qū)動程序會從集群管理器(standalone模式下是master服務(wù)器)申請執(zhí)行器資源
3:集群管理器反饋執(zhí)行器資源給驅(qū)動器
4:驅(qū)動器Driver將任務(wù)發(fā)送到執(zhí)行器節(jié)點(diǎn)執(zhí)行

spark首頁監(jiān)控

可以看到啟動的Driver


首頁監(jiān)控-驅(qū)動器

進(jìn)一步可以查看到執(zhí)行器情況


首頁監(jiān)控-執(zhí)行器
也可通過服務(wù)器進(jìn)程查看各進(jìn)程之間的關(guān)系
Master節(jié)點(diǎn)

工作節(jié)點(diǎn)-驅(qū)動器

工作節(jié)點(diǎn)-執(zhí)行器1

工作節(jié)點(diǎn)-執(zhí)行器2
3.2践险、startApplication()方式
    @Scheduled(fixedRate = 5000 * 60)
    public void alarmStatistic() {
        logger.info("=====>>>>>告警離線統(tǒng)計(jì)定時(shí)任務(wù)!", System.currentTimeMillis());

        try {
            HashMap env = new HashMap();
            //這兩個(gè)屬性必須設(shè)置
            env.put("HADOOP_CONF_DIR", CommonConfig.HADOOP_CONF_DIR);
            env.put("JAVA_HOME", CommonConfig.JAVA_HOME);

            CountDownLatch countDownLatch = new CountDownLatch(1);
            SparkAppHandle handle = new SparkLauncher(env)
                    .setSparkHome(SparkConfig.SPARK_HOME)
                    .setAppResource(CommonConfig.ALARM_JAR_PATH)
                    .setMainClass(CommonConfig.ALARM_JAR_MAIN_CLASS)
                    .setMaster("spark://" + SparkConfig.SPARK_MASTER_HOST + ":" + SparkConfig.SPARK_MASTER_PORT)
//                    .setMaster("yarn")
                    .setDeployMode(SparkConfig.SPARK_DEPLOY_MODE)
                    .setVerbose(SparkConfig.SPARK_VERBOSE)
                    .setConf("spark.app.id", CommonConfig.ALARM_APP_ID)
                    .setConf("spark.driver.memory", SparkConfig.SPARK_DRIVER_MEMORY)
                    .setConf("spark.rpc.message.maxSize", SparkConfig.SPARK_RPC_MESSAGE_MAXSIZE)
                    .setConf("spark.executor.memory", SparkConfig.SPARK_EXECUTOR_MEMORY)
                    .setConf("spark.executor.instances", SparkConfig.SPARK_EXECUTOR_INSTANCES)
                    .setConf("spark.executor.cores", SparkConfig.SPARK_EXECUTOR_CORES)
                    .setConf("spark.default.parallelism", SparkConfig.SPARK_DEFAULT_PARALLELISM)
                    .setConf("spark.driver.allowMultipleContexts", SparkConfig.SPARK_DRIVER_ALLOWMULTIPLECONTEXTS)
                    .setVerbose(true).startApplication(new SparkAppHandle.Listener() {
                        //這里監(jiān)聽任務(wù)狀態(tài)猿妈,當(dāng)任務(wù)結(jié)束時(shí)(不管是什么原因結(jié)束),isFinal()方法會返回true,否則返回false
                        @Override
                        public void stateChanged(SparkAppHandle sparkAppHandle) {
                            if (sparkAppHandle.getState().isFinal()) {
                                countDownLatch.countDown();
                            }
                            System.out.println("state:" + sparkAppHandle.getState().toString());
                        }

                        @Override
                        public void infoChanged(SparkAppHandle sparkAppHandle) {
                            System.out.println("Info:" + sparkAppHandle.getState().toString());
                        }
                    });
            logger.info("The task is executing, please wait ....");
            //線程等待任務(wù)結(jié)束
            countDownLatch.await();
            logger.info("The task is finished!");
        } catch (Exception e) {
            logger.error("submit spark task error", e);
        }
    }

這種模式下,據(jù)本人親自測試巍虫,只有在yarn工作模式下可以提交成功彭则,在standalone模式下總是提交失敗,如果有人知道的可以留言告訴我
yarn模式需要安裝hadoop集群占遥,提交任務(wù)的流程基本和上面是一樣的俯抖,不同的是集群管理器不在是spark自帶的集群管理器,而是由yarn來管理瓦胎,這也是官方推薦的提交方式蚌成,比較麻煩的就是需要安裝hadoop集群,hadoop的安裝參加另一篇
Hadoop集群搭建

從監(jiān)控頁面可以看到application的執(zhí)行情況

yarn監(jiān)控頁面

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末奕坟,一起剝皮案震驚了整個(gè)濱河市请唱,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌,老刑警劉巖霍比,帶你破解...
    沈念sama閱讀 218,858評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異翠勉,居然都是意外死亡积担,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,372評論 3 395
  • 文/潘曉璐 我一進(jìn)店門芝硬,熙熙樓的掌柜王于貴愁眉苦臉地迎上來蚜点,“玉大人,你說我怎么就攤上這事拌阴∩芑妫” “怎么了?”我有些...
    開封第一講書人閱讀 165,282評論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長陪拘。 經(jīng)常有香客問我厂镇,道長,這世上最難降的妖魔是什么左刽? 我笑而不...
    開封第一講書人閱讀 58,842評論 1 295
  • 正文 為了忘掉前任捺信,我火速辦了婚禮,結(jié)果婚禮上欠痴,老公的妹妹穿的比我還像新娘迄靠。我一直安慰自己,他們只是感情好喇辽,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,857評論 6 392
  • 文/花漫 我一把揭開白布掌挚。 她就那樣靜靜地躺著,像睡著了一般茵臭。 火紅的嫁衣襯著肌膚如雪疫诽。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,679評論 1 305
  • 那天旦委,我揣著相機(jī)與錄音奇徒,去河邊找鬼。 笑死缨硝,一個(gè)胖子當(dāng)著我的面吹牛摩钙,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播查辩,決...
    沈念sama閱讀 40,406評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼胖笛,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了宜岛?” 一聲冷哼從身側(cè)響起长踊,我...
    開封第一講書人閱讀 39,311評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎萍倡,沒想到半個(gè)月后身弊,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,767評論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡列敲,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,945評論 3 336
  • 正文 我和宋清朗相戀三年阱佛,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片戴而。...
    茶點(diǎn)故事閱讀 40,090評論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡凑术,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出所意,到底是詐尸還是另有隱情淮逊,我是刑警寧澤催首,帶...
    沈念sama閱讀 35,785評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站壮莹,受9級特大地震影響翅帜,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜命满,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,420評論 3 331
  • 文/蒙蒙 一涝滴、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧胶台,春花似錦歼疮、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,988評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至铸磅,卻和暖如春赡矢,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背阅仔。 一陣腳步聲響...
    開封第一講書人閱讀 33,101評論 1 271
  • 我被黑心中介騙來泰國打工吹散, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人八酒。 一個(gè)月前我還...
    沈念sama閱讀 48,298評論 3 372
  • 正文 我出身青樓空民,卻偏偏與公主長得像,于是被迫代替她去往敵國和親羞迷。 傳聞我的和親對象是個(gè)殘疾皇子界轩,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,033評論 2 355

推薦閱讀更多精彩內(nèi)容