- Apache Kylin with Apache Livy(incubator)
- 前言
- Livy介紹
- 為什么使用Livy
- 當前Spark存在的問題
- Livy優(yōu)勢
- Kylin with Livy
- 引入Livy之前Kyin是如何使用Spark的
- Livy for Kylin詳細解析
- Livy在Kylin中的應(yīng)用
- 引入Livy對kylin的好處
- 如何在Kylin中啟用Livy
- 常見問題
- 總結(jié)
- 參考文章
一天通、前言
為了更方便地向Spark提交仗岸、管理和監(jiān)控任務(wù)甥捺,有些用戶會使用Livy作為Spark的交互接口,在最新的Apache Kylin3.0版本中,Kylin加入了通過Apache Livy遞交Spark任務(wù)的新功能[KYLIN-3795],感謝滴滴靳國衛(wèi)同學的貢獻。
二莉撇、Livy介紹
Apache Livy是一個基于Spark的開源REST 服務(wù),是Apache基金會的一個孵化項目惶傻,它能夠通過 REST 的方式將代碼片段或是序列化的二進制代碼提交到 Spark 集群中去執(zhí)行棍郎。它提供了如下基本功能:
- 提交 Scala、Python 或是 R 代碼片段到遠端的 Spark 集群上執(zhí)行
- 提交 Java达罗、Scala坝撑、Python 所編寫的 Spark 作業(yè)到遠端的 Spark 集群上執(zhí)行
三、為什么使用Livy
1. 當前Spark存在的問題
Spark當前支持兩種交互方式:
- 交互式處理
用戶使用spark-shell或pyspark腳本啟動Spark應(yīng)用程序粮揉,伴隨應(yīng)用程序啟動的同時Spark會在當前終端啟動REPL(Read–Eval–Print Loop)來接收用戶的代碼輸入巡李,并將其編譯成Spark作業(yè) - 批處理
批處理的程序邏輯由用戶實現(xiàn)并編譯打包成jar包,spark-submit腳本啟動Spark應(yīng)用程序來執(zhí)行用戶所編寫的邏輯扶认,與交互式處理不同的是批處理程序在執(zhí)行過程中用戶沒有與Spark進行任何的交互侨拦。
兩種方式都需要用戶登錄到Gateway節(jié)點上通過腳本啟動Spark進程,但是會出現(xiàn)以下問題:
- 增加Gateway節(jié)點的資源使用負擔和故障發(fā)生的可能性
- 同時Gateway節(jié)點的故障會帶來單點問題辐宾,造成Spark程序的失敗狱从。
- 難以管理、審計以及與已有的權(quán)限管理工具的集成叠纹。由于Spark采用腳本的方式啟動應(yīng)用程序季研,因此相比于WEB方式少了許多管理、審計的便利性誉察,同時也難以與已有的工具結(jié)合与涡,如Apache Knox等。
- 將Gateway節(jié)點上的部署細節(jié)以及配置不可避免地暴露給了登陸用戶。
2. Livy優(yōu)勢
一方面接受并解析用戶的REST請求驼卖,轉(zhuǎn)換成相應(yīng)的操作氨肌;另一方面它管理著用戶所啟動的所有的Spark集群。
Livy具有如下功能:
- 通過Livy session實時提交代碼片段與Spark的REPL進行交互
- 通過Livy batch提交Scala, Java, Python編寫的二進制包來提交批處理任務(wù)
- 多用戶能夠使用同一個服務(wù)器(支持用戶模擬)
- 能夠通過REST接口在任何設(shè)備上提交任務(wù)酌畜、查看任務(wù)執(zhí)行狀態(tài)和結(jié)果
四怎囚、Kylin with Livy
1. 引入Livy之前Kyin是如何使用Spark的
Spark是在Kylin v2.0引入的,主要應(yīng)用于Cube構(gòu)建桥胞,構(gòu)建過程介紹可以查看這篇博客
下面是SparkExecutable類的doWork方法關(guān)于提交Spark job的一段代碼,我們可以看到Kylin會從配置中獲取Spark job包的路徑(默認為$KYIN_HOME/lib)恳守,通過本地指令的形式提交Spark job,然后循環(huán)獲取Spark job的執(zhí)行狀態(tài)和結(jié)果埠戳。我們可以看到Kylin單獨開了一個線程在本地向Spark客戶端發(fā)送來job請求并且循環(huán)獲取結(jié)果井誉,額外增加了節(jié)點系統(tǒng)壓力。
@Override
protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
//略...
String jobJar = config.getKylinJobJarPath(); //獲取job jar的路徑
//略...
final String cmd = String.format(Locale.ROOT, stringBuilder.toString(), hadoopConf,KylinConfig.getSparkHome(), jars, jobJar, formatArgs()); //構(gòu)建本地command
//略...
//創(chuàng)建指令執(zhí)行線程
Callable callable = new Callable<Pair<Integer, String>>() {
@Override
public Pair<Integer, String> call() throws Exception {
Pair<Integer, String> result;
try {
result = exec.execute(cmd, patternedLogger);
} catch (Exception e) {
logger.error("error run spark job:", e);
result = new Pair<>(-1, e.getMessage());
}
return result;
}
};
//略...
try {
Future<Pair<Integer, String>> future = executorService.submit(callable);
Pair<Integer, String> result = null;
while (!isDiscarded() && !isPaused()) {
if (future.isDone()) {
result = future.get(); //循環(huán)獲取指令執(zhí)行結(jié)果
break;
} else {
Thread.sleep(5000); //每隔5秒檢查一次job執(zhí)行狀態(tài)
}
}
//略...
} catch (Exception e) {
logger.error("Error run spark job:", e);
return ExecuteResult.createError(e);
}
//略...
}
2. Livy for Kylin詳細解析
Livy向Spark提交job一共有兩種整胃,分別是Session和Batch,Kyin是通過Batch的方式提交job的喳钟,需要提前構(gòu)建好Spark job對應(yīng)的jar包并上傳到HDFS中屁使,并且將配置項
kylin.engine.livy-conf.livy-key.file=hdfs:///path-to-kylin-job-jar加入到kyiln.properties中。
Batch一共具有如下九種狀態(tài):
public enum LivyStateEnum {
starting, running, success, dead, error, not_started, idle, busy, shutting_down;
}
下面是SparkExecutableLivy類的doWork方法和LivyRestExecutor類的execute方法關(guān)于提交Spark job的一段代碼奔则,Kylin通過livyRestBuilder讀取配置文件獲取Spark job的包路徑蛮寂,然后通過restClient向Livy發(fā)送Http請求。在提交job之后會每隔10秒查詢一次job執(zhí)行的結(jié)果易茬,直到j(luò)ob的狀態(tài)變?yōu)閟hutting_down, error, dead, success中的一種酬蹋。每一次都是通過Http的方式發(fā)送請求,相比較于通過本地Spark客戶端提交任務(wù)抽莱,更加穩(wěn)定而且減少了Kylin節(jié)點系統(tǒng)壓力范抓。
@Override
protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
//略...
livyRestBuilder.setLivyTypeEnum(LivyTypeEnum.job);
executor.execute(livyRestBuilder, patternedLogger); //調(diào)用LivyRestExecutor類的execute方法
if (isDiscarded()) {
return new ExecuteResult(ExecuteResult.State.DISCARDED, "Discarded");
}
if (isPaused()) {
return new ExecuteResult(ExecuteResult.State.STOPPED, "Stopped");
}
//略...
}
public void execute(LivyRestBuilder livyRestBuilder, Logger logAppender) {
LivyRestClient restClient = new LivyRestClient();
String result = restClient.livySubmitJobBatches(dataJson); //向Livy發(fā)送http請求
JSONObject resultJson = new JSONObject(result);
String state = resultJson.getString("state"); //得到Livy請求結(jié)果
final String livyTaskId = resultJson.getString("id");
while (!LivyStateEnum.shutting_down.toString().equalsIgnoreCase(state)
&& !LivyStateEnum.error.toString().equalsIgnoreCase(state)
&& !LivyStateEnum.dead.toString().equalsIgnoreCase(state)
&& !LivyStateEnum.success.toString().equalsIgnoreCase(state)) {
String statusResult = restClient.livyGetJobStatusBatches(livyTaskId); //獲取Spark job執(zhí)行狀態(tài)
JSONObject stateJson = new JSONObject(statusResult);
if (!state.equalsIgnoreCase(stateJson.getString("state"))) {
logAppender.log("Livy status Result: " + stateJson.getString("state"));
}
state = stateJson.getString("state");
Thread.sleep(10*1000); //每10秒檢查一次結(jié)果
}
}
3. Livy在Kylin中的應(yīng)用
構(gòu)建Intermediate Flat Hive Table和Redistribute Flat Hive Table原本都是通過Hive客戶端(Cli或Beeline)進行構(gòu)建的,引入Livy之后是Kylin通過Livy來調(diào)用SparkSQL進行構(gòu)建食铐,提高了平表的構(gòu)建速度匕垫。在引入Livy之后,Cube的構(gòu)建主要改變的是以下幾個步驟虐呻,對應(yīng)的任務(wù)日志輸出如下:
-
構(gòu)建Intermediate Flat Hive Table
-
構(gòu)建Redistribute Flat Hive Table
-
使用Spark-Submit的地方都用Livy的Batch API進行替換
-
構(gòu)建Cube
-
轉(zhuǎn)換Cuboid為HFile
-
4. 引入Livy對kylin的好處
- 無需準備Spark的客戶端配置象泵,Kylin部署更加輕量化
- Kylin節(jié)點系統(tǒng)壓力更低,無需在Kylin節(jié)點啟動Spark客戶端
- 構(gòu)建Flat Hive Table更快斟叼,通過Livy可以使用Spark SQL構(gòu)建平表偶惠,而Spark SQL要快于Hive
- 提交job更快,job狀態(tài)獲取更方便
5. 如何在Kylin中啟用Livy
在Kylin啟用Livy前朗涩,請先確保Livy能夠正常工作
-
在Kylin.properties中忽孽,加入如下配置,并重啟使之生效
//此處為CDH5.7環(huán)境下的配置 kylin.engine.livy-conf.livy-enabled=true kylin.engine.livy-conf.livy-url=http://cdh-client:8998 kylin.engine.livy-conf.livy-key.file=hdfs:///path/kylin-job-3.0.0-SNAPSHOT.jar //請根據(jù)個人環(huán)境替換對應(yīng)版本的包 kylin.engine.livy-conf.livy-arr.jars=hdfs:///path/hbase-client-1.2.0-cdh5.7.5.jar,hdfs:///path/hbase-common-1.2.0-cdh5.7.5.jar,hdfs:///path/hbase-hadoop-compat-1.2.0-cdh5.7.5.jar,hdfs:///path/hbase-hadoop2-compat-1.2.0-cdh5.7.5.jar,hdfs:///path/hbase-server-1.2.0-cdh5.7.5.jar,hdfs:///path/htrace-core-3.2.0-incubating.jar,hdfs:///path/metrics-core-2.2.0.jar
其中l(wèi)ivy-key.file和livy-arr.jars地址之間不要有空格,否則可能會出不可預知的錯誤扒腕。
Cube構(gòu)建引擎選用Spark
五绢淀、常見問題
以下問題往往為使用不當和配置錯誤的原因,非Kylin本身存在的問題瘾腰,此處僅為友情提示皆的。
-
Table or view not found
輸出日志:Exception in thread "main" org.apache.spark.sql.AnalysisException: Table or view not found: `DEFAULT`.`KYLIN_SALES`; line 21 pos 6;
解決方法:
//將hive-site.xml拷貝到spark的配置文件目錄中 ln -s /etc/hive/conf/hive-site.xml $SPARK_CONF_DIR
-
livy request 400 error
解決方法://kylin.properties Livy配置項jar包地址之間不要留空格 //此處為CDH5.7環(huán)境下的依賴包,請根據(jù)個人環(huán)境替換對應(yīng)版本的包 kylin.engine.livy-conf.livy-arr.jars=hdfs:///path/hbase-client-1.2.0-cdh5.7.5.jar,hdfs:///path/hbase-common-1.2.0-cdh5.7.5.jar,hdfs:///path/hbase-hadoop-compat-1.2.0-cdh5.7.5.jar,hdfs:///path/hbase-hadoop2-compat-1.2.0-cdh5.7.5.jar,hdfs:///path/
-
NoClassDefFoundError
輸出日志:NoClassDefFoundError: org/apache/hadoop/hbase/protobuf/generated/HFileProtos
解決方法:
find /opt -type f -name "hbase-protocol*.jar" cp /path/to/hbase-protocol-1.2.0-cdh5.7.5.jar $SPARK_HOME/jars
-
livy sql執(zhí)行錯誤
解決方法://kylin.properties中添加如下配置 kylin.source.hive.quote-enabled=false
六蹋盆、總結(jié)
Livy本質(zhì)上是在Spark上的REST服務(wù)费薄,對于Kylin cube的構(gòu)建沒有本質(zhì)上的性能提升,但是通過引入Livy栖雾,Kyin能夠直接通過Spark SQL代替Hive構(gòu)建Flat Table楞抡,而且管理Spark job也更加方便。但是Livy當前也存在一些問題析藕,比如使用較低或較高版本的Spark無法正常工作以及單點故障等問題召廷,用戶可以考慮自身的實際場景選擇是否需要在Kylin中使用Livy。