spark任務(wù)提交有三種方式
1:通過local方式提交
2:通過spark-submit腳本提交到集群
3:通過spark提交的API SparkLauncher提交到集群,這種方式可以將提交過程集成到我們的spring工程中恬涧,更加靈活
先來看一下spark架構(gòu)崭篡,可以幫助理解任務(wù)的提交
驅(qū)動程序:執(zhí)行應(yīng)用程序main方法的進(jìn)程
集群管理器:啟動執(zhí)行器節(jié)點(diǎn)胎挎,有Mesos掘而、YARN(Hadoop)配深、獨(dú)立集群管理器(spark自帶的集群管理器)谆趾,在standalone模式中即為Master主節(jié)點(diǎn)躁愿。
執(zhí)行器節(jié)點(diǎn):工作進(jìn)程,負(fù)責(zé)在spark作業(yè)中運(yùn)行任務(wù)
過程大概如下
①:執(zhí)行器節(jié)點(diǎn)(工作節(jié)點(diǎn))在啟動時(shí)會向驅(qū)動器注冊自己
②:用戶提交任務(wù)沪蓬,驅(qū)動器調(diào)用main方法彤钟,驅(qū)動器與集群管理器通信申請執(zhí)行器資源
③:集群管理器為驅(qū)動器程序啟動執(zhí)行器節(jié)點(diǎn)
④:驅(qū)動器程序執(zhí)行應(yīng)用程序,將任務(wù)發(fā)送到工作節(jié)點(diǎn)
⑤:工作節(jié)點(diǎn)進(jìn)行計(jì)算并保存結(jié)果
⑥:驅(qū)動器main方法退出跷叉,通過集群管理器釋放資源
注意:在客戶端模式下逸雹,spark-submit 會將驅(qū)動器程序運(yùn)行 在 spark-submit 被調(diào)用的這臺機(jī)器上。在集群模式下云挟,驅(qū)動器程序會被傳輸并執(zhí)行 于集群的一個(gè)工作節(jié)點(diǎn)上
一梆砸、本地方式提交
//該代碼是對企業(yè)架構(gòu)下的不同級別的海量告警信息進(jìn)行離線統(tǒng)計(jì),按部門园欣、日期帖世、級別進(jìn)行分組統(tǒng)計(jì)
public static void main(String[] args) {
logger.info("開始執(zhí)行spark任務(wù)");
SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");
SparkConf conf = new SparkConf()
.setAppName("離線統(tǒng)計(jì)")
//.setMaster("spark://" + SparkConfig.SPARK_MASTER_HOST + ":" + SparkConfig.SPARK_MASTER_PORT)
.setMaster("local")
.set("spark.mongodb.input.uri", MongoConfig.SPARK_MONGODB_URL_PREFIX + BaseConstant.ALARM_SOURCE_TABLE)
.set("spark.mongodb.output.uri", MongoConfig.SPARK_MONGODB_URL_PREFIX + BaseConstant
.ALARM_TARGET_TABLE);
JavaSparkContext jsc = new JavaSparkContext(conf);
JavaMongoRDD<Document> lines = MongoSpark.load(jsc).withPipeline(
singletonList(
getCondition()
)
);
//按部門時(shí)間分組計(jì)算
JavaPairRDD<String, AlarmStatisticBean> pairs =
lines.filter((Function<Document, Boolean>) line -> {
//代碼略掉
//過濾函數(shù),true:不過濾沸枯,false:過濾
return true;
}).mapToPair(
//對RDD中的每個(gè)元素調(diào)用指定函數(shù)日矫,并返回<String, AlarmStatisticBean>類型的對象
//鍵值對, key: orgId_day value: level[] 例:1_20190101, [0,1,0]
(PairFunction<Document, String, AlarmStatisticBean>) line -> {
Long orgId = line.getLong("orgId");
String statisticDate = sdf.format(line.getLong("createTimestamp") * 1000);
AlarmStatisticBean bean = new AlarmStatisticBean();
Long level = line.getLong("levelDictId");
if (level == AlarmTypeEnum.LEVEL1.getType()) {
bean.setLevel1Count(1);
}
if (level == AlarmTypeEnum.LEVEL2.getType()) {
bean.setLevel2Count(1);
}
if (level == AlarmTypeEnum.LEVEL3.getType()) {
bean.setLevel3Count(1);
}
bean.setOrgId(orgId.intValue());
bean.setDay(statisticDate);
String code = orgId + "_" + statisticDate;
return new Tuple2<>(code, bean);
}
//分組多列求和 1_20190101, [x,y,z]
).reduceByKey((Function2<AlarmStatisticBean, AlarmStatisticBean, AlarmStatisticBean>) (v1, v2) -> {
//reduceByKey的作用是合并具有相同鍵的值
v1.setLevel1Count(v1.getLevel1Count() + v2.getLevel1Count());
v1.setLevel2Count(v1.getLevel2Count() + v2.getLevel2Count());
v1.setLevel3Count(v1.getLevel3Count() + v2.getLevel3Count());
return v1;
});
logger.info("------------------------->>>>>" + pairs.count());
List<Document> documents = new ArrayList<>();
//類型轉(zhuǎn)換辉饱,以便持久化到DB(mongo或mysql)
for (Tuple2<String, AlarmStatisticBean> tuple2 : pairs.collect()) {
Document document = new Document("day", tuple2._2.getDay())
.append("level1Count", tuple2._2.getLevel1Count())
.append("level2Count", tuple2._2.getLevel2Count())
.append("level3Count", tuple2._2.getLevel3Count())
.append("orgId", tuple2._2.getOrgId());
documents.add(document);
}
MongoManager.saveToMongo(documents, BaseConstant.ALARM_TARGET_TABLE);
}
直接運(yùn)行上述main方法會將AlarmStatisticBean對象保存到MongoDB中
二搬男、spark-submit腳本
提交到集群時(shí),需要注釋掉local
SparkConf conf = new SparkConf()
.setAppName("離線統(tǒng)計(jì)")
.setMaster("spark://" + SparkConfig.SPARK_MASTER_HOST + ":" + SparkConfig.SPARK_MASTER_PORT)
//.setMaster("local")
...
//帶*的是公司名或項(xiàng)目名稱彭沼,不影響閱讀
spark-submit --class com.*.*.meter.alarm.AlarmStatisticService --master spark://master:7077 /opt/middleware/*-alarm-task-1.0-jar-with-dependencies.ja
--master spark:// 表示會使用獨(dú)立模式缔逛,也就是使用spark自帶的獨(dú)立集群管理器。提交時(shí)使用的主機(jī)名和端口精確匹配用戶頁面中的URL,這里建議直接從http://172...6:8080頁面上復(fù)制URL褐奴,避免不必要的麻煩按脚。
三、SparkLauncher提交
SparkLauncher也提供了兩種方式提交任務(wù)
3.1敦冬、launch
SparkLauncher實(shí)際上是根據(jù)JDK自帶的ProcessBuilder構(gòu)造了一個(gè)UNIXProcess子進(jìn)程提交任務(wù)辅搬,提交的形式跟spark-submit一樣。這個(gè)子進(jìn)程會以阻塞的方式等待程序的運(yùn)行結(jié)果脖旱。簡單來看就是拼接spark-submit命令堪遂,并以子進(jìn)程的方式啟動。
代碼中的process.getInputStream()實(shí)際上對應(yīng)linux進(jìn)程的標(biāo)準(zhǔn)輸出stdout
process.getErrorStream()實(shí)際上對應(yīng)linux進(jìn)程的錯(cuò)誤信息stderr
process.getOutputStream()實(shí)際上對應(yīng)linux進(jìn)程的輸入信息stdin
@Scheduled(fixedRate = 5000 * 60)
public void alarmStatistic() {
logger.info("=====>>>>>離線統(tǒng)計(jì)定時(shí)任務(wù)!", System.currentTimeMillis());
try {
HashMap env = new HashMap();
//這兩個(gè)屬性必須設(shè)置
env.put("HADOOP_CONF_DIR", CommonConfig.HADOOP_CONF_DIR);
env.put("JAVA_HOME", CommonConfig.JAVA_HOME);
SparkLauncher handle = new SparkLauncher(env)
.setSparkHome(SparkConfig.SPARK_HOME)
.setAppResource(CommonConfig.ALARM_JAR_PATH)
.setMainClass(CommonConfig.ALARM_JAR_MAIN_CLASS)
.setMaster("spark://" + SparkConfig.SPARK_MASTER_HOST + ":" + SparkConfig.SPARK_MASTER_PORT)
.setDeployMode(SparkConfig.SPARK_DEPLOY_MODE)
.setVerbose(SparkConfig.SPARK_VERBOSE)
.setConf("spark.app.id", CommonConfig.ALARM_APP_ID)
.setConf("spark.driver.memory", SparkConfig.SPARK_DRIVER_MEMORY)
.setConf("spark.rpc.message.maxSize", SparkConfig.SPARK_RPC_MESSAGE_MAXSIZE)
.setConf("spark.executor.memory", SparkConfig.SPARK_EXECUTOR_MEMORY)
.setConf("spark.executor.instances", SparkConfig.SPARK_EXECUTOR_INSTANCES)
.setConf("spark.executor.cores", SparkConfig.SPARK_EXECUTOR_CORES)
.setConf("spark.default.parallelism", SparkConfig.SPARK_DEFAULT_PARALLELISM)
.setConf("spark.driver.allowMultipleContexts", SparkConfig.SPARK_DRIVER_ALLOWMULTIPLECONTEXTS)
.setVerbose(true);
Process process = handle.launch();
InputStreamRunnable inputStream = new InputStreamRunnable(process.getInputStream(), "alarm task input");
ExecutorUtils.getExecutorService().submit(inputStream);
InputStreamRunnable errorStream = new InputStreamRunnable(process.getErrorStream(), "alarm task error");
ExecutorUtils.getExecutorService().submit(errorStream);
logger.info("Waiting for finish...");
int exitCode = process.waitFor();
logger.info("Finished! Exit code:" + exitCode);
} catch (Exception e) {
logger.error("submit spark task error", e);
}
}
運(yùn)行過程示意圖
1:用戶程序啟動(SparkLauncher萌庆,非驅(qū)動程序)時(shí)會在當(dāng)前節(jié)點(diǎn)上啟動一個(gè)SparkSubmit進(jìn)程溶褪,并將驅(qū)動程序(即spark任務(wù))發(fā)送到任意一個(gè)工作節(jié)點(diǎn)上,在工作節(jié)點(diǎn)上啟動DriverWrapper進(jìn)程
2:驅(qū)動程序會從集群管理器(standalone模式下是master服務(wù)器)申請執(zhí)行器資源
3:集群管理器反饋執(zhí)行器資源給驅(qū)動器
4:驅(qū)動器Driver將任務(wù)發(fā)送到執(zhí)行器節(jié)點(diǎn)執(zhí)行
spark首頁監(jiān)控
可以看到啟動的Driver
進(jìn)一步可以查看到執(zhí)行器情況
也可通過服務(wù)器進(jìn)程查看各進(jìn)程之間的關(guān)系
3.2践险、startApplication()方式
@Scheduled(fixedRate = 5000 * 60)
public void alarmStatistic() {
logger.info("=====>>>>>告警離線統(tǒng)計(jì)定時(shí)任務(wù)!", System.currentTimeMillis());
try {
HashMap env = new HashMap();
//這兩個(gè)屬性必須設(shè)置
env.put("HADOOP_CONF_DIR", CommonConfig.HADOOP_CONF_DIR);
env.put("JAVA_HOME", CommonConfig.JAVA_HOME);
CountDownLatch countDownLatch = new CountDownLatch(1);
SparkAppHandle handle = new SparkLauncher(env)
.setSparkHome(SparkConfig.SPARK_HOME)
.setAppResource(CommonConfig.ALARM_JAR_PATH)
.setMainClass(CommonConfig.ALARM_JAR_MAIN_CLASS)
.setMaster("spark://" + SparkConfig.SPARK_MASTER_HOST + ":" + SparkConfig.SPARK_MASTER_PORT)
// .setMaster("yarn")
.setDeployMode(SparkConfig.SPARK_DEPLOY_MODE)
.setVerbose(SparkConfig.SPARK_VERBOSE)
.setConf("spark.app.id", CommonConfig.ALARM_APP_ID)
.setConf("spark.driver.memory", SparkConfig.SPARK_DRIVER_MEMORY)
.setConf("spark.rpc.message.maxSize", SparkConfig.SPARK_RPC_MESSAGE_MAXSIZE)
.setConf("spark.executor.memory", SparkConfig.SPARK_EXECUTOR_MEMORY)
.setConf("spark.executor.instances", SparkConfig.SPARK_EXECUTOR_INSTANCES)
.setConf("spark.executor.cores", SparkConfig.SPARK_EXECUTOR_CORES)
.setConf("spark.default.parallelism", SparkConfig.SPARK_DEFAULT_PARALLELISM)
.setConf("spark.driver.allowMultipleContexts", SparkConfig.SPARK_DRIVER_ALLOWMULTIPLECONTEXTS)
.setVerbose(true).startApplication(new SparkAppHandle.Listener() {
//這里監(jiān)聽任務(wù)狀態(tài)猿妈,當(dāng)任務(wù)結(jié)束時(shí)(不管是什么原因結(jié)束),isFinal()方法會返回true,否則返回false
@Override
public void stateChanged(SparkAppHandle sparkAppHandle) {
if (sparkAppHandle.getState().isFinal()) {
countDownLatch.countDown();
}
System.out.println("state:" + sparkAppHandle.getState().toString());
}
@Override
public void infoChanged(SparkAppHandle sparkAppHandle) {
System.out.println("Info:" + sparkAppHandle.getState().toString());
}
});
logger.info("The task is executing, please wait ....");
//線程等待任務(wù)結(jié)束
countDownLatch.await();
logger.info("The task is finished!");
} catch (Exception e) {
logger.error("submit spark task error", e);
}
}
這種模式下,據(jù)本人親自測試巍虫,只有在yarn工作模式下可以提交成功彭则,在standalone模式下總是提交失敗,如果有人知道的可以留言告訴我
yarn模式需要安裝hadoop集群占遥,提交任務(wù)的流程基本和上面是一樣的俯抖,不同的是集群管理器不在是spark自帶的集群管理器,而是由yarn來管理瓦胎,這也是官方推薦的提交方式蚌成,比較麻煩的就是需要安裝hadoop集群,hadoop的安裝參加另一篇
Hadoop集群搭建
從監(jiān)控頁面可以看到application的執(zhí)行情況