How to Use Incremental Result Collection with Kyuubi


1. Introduction

Kyuubi is an enhanced edition of Apache Spark's built-in Thrift JDBC/ODBC server. It is mainly designed for running SQL directly against a cluster in which all components, including HDFS, YARN, the Hive MetaStore, and Kyuubi itself, are secured.
Recently, Kyuubi added support for incrementally receiving partial result data from the executor side. The main purpose of this feature is to reduce the risk of an OutOfMemoryError on the Kyuubi server itself. Kyuubi is inherently more exposed to this risk than the Spark Thrift Server because it supports multiple SparkContexts, so this feature matters.

2. Configurations

Name                                         Default   Description
spark.kyuubi.operation.incremental.collect   false     Whether to use incremental result collection from the Spark executor side to the Kyuubi server side.

As the table above shows, there is only one configuration for this feature, and it is disabled by default.

3. How to Configure

3.1 Server level scope

spark.kyuubi.operation.incremental.collect is a Kyuubi configuration that can simply be treated like any Spark configuration, which means that:

  1. it can be set via --conf spark.kyuubi.operation.incremental.collect=true with the $KYUUBI_HOME/bin/start-kyuubi.sh script, for example:
$KYUUBI_HOME/bin/start-kyuubi.sh \
    --master yarn \
    --deploy-mode client \
    --driver-memory 10g \
    --conf spark.kyuubi.operation.incremental.collect=true
  2. it can also be put in the properties file (spark-defaults.conf) used by the Spark that launches Kyuubi, which can usually be found in the $SPARK_HOME/conf directory; see the sketch after this list.
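For example, a minimal spark-defaults.conf sketch (the spark.master entry is illustrative, not required):

# $SPARK_HOME/conf/spark-defaults.conf
spark.master                                  yarn
spark.kyuubi.operation.incremental.collect    true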

Configuring Kyuubi in either of these ways applies the setting server-wide, affecting all KyuubiSessions, a.k.a. HiveConnections.

3.2 Session level scope

Kyuubi also treats it as a session-level configuration, so we can change it within one session without affecting others. This makes Kyuubi more flexible. We will walk through this below.

Let's say we already have a running Kyuubi server. We use the beeline CLI to connect and test.

~/data/apache-spark/spark-2.1.2-bin-hadoop2.7$ bin/beeline -u "jdbc:hive2://kyuubi.server.163.org:10009/;principal=hive/kyuubi.server.163.org@SERVER.163.ORG;hive.server2.proxy.user=hzyaoqin#spark.yarn.queue=default;spark.sql.haha=hehe;spark.scheduler.mode=FAIR;spark.kyuubi.operation.incremental.collect=true"
Connecting to jdbc:hive2://kyuubi.server.163.org:10009/;principal=hive/kyuubi.server.163.org@SERVER.163.ORG;hive.server2.proxy.user=hzyaoqin#spark.yarn.queue=default;spark.sql.haha=hehe;spark.scheduler.mode=FAIR;spark.kyuubi.operation.incremental.collect=true
18/05/25 14:49:08 INFO Utils: Supplied authorities: kyuubi.server.163.org:10009
18/05/25 14:49:08 INFO Utils: Resolved authority:kyuubi.server.163.org:10009
18/05/25 14:49:08 INFO HiveConnection: Will try to open client transport with JDBC Uri: jdbc:hive2://kyuubi.server.163.org:10009/;principal=hive/kyuubi.server.163.org@SERVER.163.ORG;hive.server2.proxy.user=hzyaoqin#spark.yarn.queue=default;spark.sql.haha=hehe;spark.scheduler.mode=FAIR;spark.kyuubi.operation.incremental.collect=true
Connected to: Spark SQL (version 2.1.2)
Driver: Hive JDBC (version 1.2.1.spark2)
Transaction isolation: TRANSACTION_REPEATABLE_READ
Beeline version 1.2.1.spark2 by Apache Hive
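The same session-level setting works from any JDBC client, not only beeline. Below is a minimal Scala sketch, assuming the Hive JDBC driver shown above (Hive JDBC version 1.2.1.spark2) is on the classpath; the host, principal, and proxy user are the illustrative values from this walkthrough.

import java.sql.DriverManager

object IncrementalCollectDemo {
  def main(args: Array[String]): Unit = {
    Class.forName("org.apache.hive.jdbc.HiveDriver")
    // Session configurations follow the '#' in the JDBC URL.
    val url = "jdbc:hive2://kyuubi.server.163.org:10009/;" +
      "principal=hive/kyuubi.server.163.org@SERVER.163.ORG;" +
      "hive.server2.proxy.user=hzyaoqin" +
      "#spark.kyuubi.operation.incremental.collect=true"
    val conn = DriverManager.getConnection(url)
    val stmt = conn.createStatement()
    val rs = stmt.executeQuery("show tables")
    while (rs.next()) {
      println(rs.getString("tableName"))
    }
    rs.close(); stmt.close(); conn.close()
  }
}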

Like other Spark/Kyuubi configurations, we can simply put it in the connection string. Now let's test with a show tables statement.

0: jdbc:hive2://kyuubi.server.163.org:> show tables;
18/05/25 14:49:54 INFO KyuubiOperation: Running query 'show tables' with 3a1f0ca4-8495-4e23-aa1e-bffb202b49a5
18/05/25 14:49:54 INFO SparkSqlParser: Parsing command: show tables
18/05/25 14:49:54 INFO KyuubiOperation: Executing query in incremental collection mode
18/05/25 14:49:54 INFO DAGScheduler: Asked to cancel job group 3a1f0ca4-8495-4e23-aa1e-bffb202b49a5
+-----------+-----------------+--------------+--+
| database  |    tableName    | isTemporary  |
+-----------+-----------------+--------------+--+
| default   | src             | false        |
| default   | src2            | false        |
| default   | src3            | false        |
| default   | src_parquet_30  | false        |
+-----------+-----------------+--------------+--+
4 rows selected (0.835 seconds)

"KyuubiOperation: Executing query in incremental collection mode" tells us that we collect results incrementally...

Then, let's test a select * from src2 statement.

0: jdbc:hive2://kyuubi.server.163.org:> select * from src2;
18/05/25 14:50:00 INFO KyuubiOperation: Running query 'select * from src2' with ca9f76f5-72f7-4c22-86cd-f31c892070c8
18/05/25 14:50:00 INFO SparkSqlParser: Parsing command: select * from src2
18/05/25 14:50:01 INFO CatalystSqlParser: Parsing command: int
18/05/25 14:50:01 INFO CatalystSqlParser: Parsing command: string
18/05/25 14:50:01 INFO deprecation: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
18/05/25 14:50:01 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 339.6 KB, free 15.8 GB)
18/05/25 14:50:01 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 31.2 KB, free 15.8 GB)
18/05/25 14:50:01 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 10.201.168.184:25356 (size: 31.2 KB, free: 15.8 GB)
18/05/25 14:50:01 INFO SparkContext: Created broadcast 0 from toString at KyuubiOperation.scala:319
18/05/25 14:50:01 INFO KyuubiOperation: Executing query in incremental collection mode
18/05/25 14:50:02 INFO GPLNativeCodeLoader: Loaded native gpl library
18/05/25 14:50:02 WARN LzoCompressor: java.lang.UnsatisfiedLinkError: Cannot load liblzo2.so.2 (liblzo2.so.2: cannot open shared object file: No such file or directory)!
18/05/25 14:50:02 ERROR LzoCodec: Failed to load/initialize native-lzo library
18/05/25 14:50:02 INFO FileInputFormat: Total input paths to process : 1
18/05/25 14:50:02 INFO DAGScheduler: Asked to cancel job group ca9f76f5-72f7-4c22-86cd-f31c892070c8
+------+----------+--+
| key  |  value   |
+------+----------+--+
| 238  | val_238  |
| 86   | val_86   |
497 lines are omitted here......
| 97   | val_97   |
+------+----------+--+
500 rows selected (4.938 seconds)
18/05/25 14:50:02 INFO DAGScheduler: Got job 0 (toSeq at KyuubiOperation.scala:233) with 1 output partitions
18/05/25 14:50:02 INFO DAGScheduler: Final stage: ResultStage 0 (toSeq at KyuubiOperation.scala:233)
18/05/25 14:50:02 INFO DAGScheduler: Parents of final stage: List()
18/05/25 14:50:02 INFO DAGScheduler: Missing parents: List()
18/05/25 14:50:02 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[6] at toLocalIterator at KyuubiOperation.scala:324), which has no missing parents
18/05/25 14:50:02 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 8.1 KB, free 15.8 GB)
18/05/25 14:50:02 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 4.4 KB, free 15.8 GB)
18/05/25 14:50:02 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 10.201.168.184:25356 (size: 4.4 KB, free: 15.8 GB)
18/05/25 14:50:02 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:996
18/05/25 14:50:02 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (MapPartitionsRDD[6] at toLocalIterator at KyuubiOperation.scala:324)
18/05/25 14:50:02 INFO YarnScheduler: Adding task set 0.0 with 1 tasks
18/05/25 14:50:02 INFO FairSchedulableBuilder: Added task set TaskSet_0.0 tasks to pool default
18/05/25 14:50:02 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, hadoop3020.jd.163.org, executor 1, partition 0, NODE_LOCAL, 6005 bytes)
18/05/25 14:50:02 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on hadoop3020.jd.163.org:57229 (size: 4.4 KB, free: 10.5 GB)
18/05/25 14:50:03 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on hadoop3020.jd.163.org:57229 (size: 31.2 KB, free: 10.5 GB)
18/05/25 14:50:05 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 3396 ms on hadoop3020.jd.163.org (executor 1) (1/1)
18/05/25 14:50:05 INFO YarnScheduler: Removed TaskSet 0.0, whose tasks have all completed, from pool default
18/05/25 14:50:05 INFO DAGScheduler: ResultStage 0 (toSeq at KyuubiOperation.scala:233) finished in 3.401 s
18/05/25 14:50:05 INFO DAGScheduler: Got job 1 (indices at ColumnBasedSet.scala:60) with 1 output partitions
18/05/25 14:50:05 INFO DAGScheduler: Final stage: ResultStage 1 (indices at ColumnBasedSet.scala:60)
18/05/25 14:50:05 INFO DAGScheduler: Parents of final stage: List()
18/05/25 14:50:05 INFO DAGScheduler: Missing parents: List()
18/05/25 14:50:05 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[6] at toLocalIterator at KyuubiOperation.scala:324), which has no missing parents
18/05/25 14:50:05 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 8.1 KB, free 15.8 GB)
18/05/25 14:50:05 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 4.4 KB, free 15.8 GB)
18/05/25 14:50:05 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 10.201.168.184:25356 (size: 4.4 KB, free: 15.8 GB)
18/05/25 14:50:05 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:996
18/05/25 14:50:05 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[6] at toLocalIterator at KyuubiOperation.scala:324)
18/05/25 14:50:05 INFO YarnScheduler: Adding task set 1.0 with 1 tasks
18/05/25 14:50:05 INFO FairSchedulableBuilder: Added task set TaskSet_1.0 tasks to pool default
18/05/25 14:50:05 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, hadoop3020.jd.163.org, executor 1, partition 1, NODE_LOCAL, 6005 bytes)
18/05/25 14:50:05 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on hadoop3020.jd.163.org:57229 (size: 4.4 KB, free: 10.5 GB)
18/05/25 14:50:05 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 98 ms on hadoop3020.jd.163.org (executor 1) (1/1)
18/05/25 14:50:05 INFO YarnScheduler: Removed TaskSet 1.0, whose tasks have all completed, from pool default
18/05/25 14:50:05 INFO DAGScheduler: ResultStage 1 (indices at ColumnBasedSet.scala:60) finished in 0.099 s

"KyuubiOperation: Executing query in incremental collection mode" shows..

3.3 By the set command

As a runtime configuration, it can also be changed with a SetCommand, as follows:

set spark.kyuubi.operation.incremental.collect=false;
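The same statement can of course be issued programmatically. A minimal Scala sketch with a hypothetical helper follows; conn is any open Kyuubi JDBC connection, such as the one from the earlier sketch.

import java.sql.Connection

object SessionToggle {
  // Toggles incremental collection for the current session only;
  // other sessions on the same Kyuubi server are unaffected.
  def setIncrementalCollect(conn: Connection, enabled: Boolean): Unit = {
    val stmt = conn.createStatement()
    try stmt.execute(s"set spark.kyuubi.operation.incremental.collect=$enabled")
    finally stmt.close()
  }
}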

Let's test this mode in the same beeline client.

0: jdbc:hive2://hzadg-jenkins.server.163.org:> set spark.kyuubi.operation.incremental.collect=false;
18/05/25 15:58:50 INFO KyuubiOperation: Running query 'set spark.kyuubi.operation.incremental.collect=false' with b63966a7-731c-48d6-9b99-f1a738232bb5
18/05/25 15:58:50 INFO SparkSqlParser: Parsing command: set spark.kyuubi.operation.incremental.collect=false
18/05/25 15:58:50 INFO KyuubiOperation: Executing query in incremental collection mode
18/05/25 15:58:50 INFO DAGScheduler: Asked to cancel job group b63966a7-731c-48d6-9b99-f1a738232bb5
+---------------------------------------------+--------+--+
|                     key                     | value  |
+---------------------------------------------+--------+--+
| spark.kyuubi.operation.incremental.collect  | false  |
+---------------------------------------------+--------+--+

As the logs above show, the set command itself still goes the incremental way; the new value only takes effect for subsequent operations. Then let's execute select * from src2 again.

0: jdbc:hive2://kyuubi.server.163.org:> select * from src2;
18/05/25 15:58:54 INFO KyuubiOperation: Running query 'select * from src2' with 01e00534-c927-4a45-be3b-9c0491897322
18/05/25 15:58:54 INFO SparkSqlParser: Parsing command: select * from src2
18/05/25 15:58:54 INFO CatalystSqlParser: Parsing command: int
18/05/25 15:58:54 INFO CatalystSqlParser: Parsing command: string
18/05/25 15:58:54 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 339.6 KB, free 15.8 GB)
18/05/25 15:58:54 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 31.2 KB, free 15.8 GB)
18/05/25 15:58:54 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on 10.201.168.184:25356 (size: 31.2 KB, free: 15.8 GB)
18/05/25 15:58:54 INFO SparkContext: Created broadcast 3 from toString at KyuubiOperation.scala:319
18/05/25 15:58:54 INFO FileInputFormat: Total input paths to process : 1
18/05/25 15:58:54 INFO SparkContext: Starting job: collect at KyuubiOperation.scala:326
18/05/25 15:58:54 INFO DAGScheduler: Got job 2 (collect at KyuubiOperation.scala:326) with 2 output partitions
18/05/25 15:58:54 INFO DAGScheduler: Final stage: ResultStage 2 (collect at KyuubiOperation.scala:326)
18/05/25 15:58:54 INFO DAGScheduler: Parents of final stage: List()
18/05/25 15:58:54 INFO DAGScheduler: Missing parents: List()
18/05/25 15:58:54 INFO DAGScheduler: Submitting ResultStage 2 (MapPartitionsRDD[12] at collect at KyuubiOperation.scala:326), which has no missing parents
18/05/25 15:58:54 INFO MemoryStore: Block broadcast_4 stored as values in memory (estimated size 8.0 KB, free 15.8 GB)
18/05/25 15:58:54 INFO MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 4.4 KB, free 15.8 GB)
18/05/25 15:58:54 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on 10.201.168.184:25356 (size: 4.4 KB, free: 15.8 GB)
18/05/25 15:58:54 INFO SparkContext: Created broadcast 4 from broadcast at DAGScheduler.scala:996
18/05/25 15:58:54 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 2 (MapPartitionsRDD[12] at collect at KyuubiOperation.scala:326)
18/05/25 15:58:54 INFO YarnScheduler: Adding task set 2.0 with 2 tasks
18/05/25 15:58:54 INFO FairSchedulableBuilder: Added task set TaskSet_2.0 tasks to pool default
18/05/25 15:58:54 INFO TaskSetManager: Starting task 0.0 in stage 2.0 (TID 2, hadoop3020.jd.163.org, executor 1, partition 0, NODE_LOCAL, 6261 bytes)
18/05/25 15:58:54 INFO TaskSetManager: Starting task 1.0 in stage 2.0 (TID 3, hadoop3020.jd.163.org, executor 1, partition 1, NODE_LOCAL, 6261 bytes)
18/05/25 15:58:54 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on hadoop3020.jd.163.org:57229 (size: 4.4 KB, free: 10.5 GB)
18/05/25 15:58:54 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on hadoop3020.jd.163.org:57229 (size: 31.2 KB, free: 10.5 GB)
18/05/25 15:58:54 INFO TaskSetManager: Finished task 1.0 in stage 2.0 (TID 3) in 219 ms on hadoop3020.jd.163.org (executor 1) (1/2)
18/05/25 15:58:54 INFO TaskSetManager: Finished task 0.0 in stage 2.0 (TID 2) in 223 ms on hadoop3020.jd.163.org (executor 1) (2/2)
18/05/25 15:58:54 INFO YarnScheduler: Removed TaskSet 2.0, whose tasks have all completed, from pool default
18/05/25 15:58:54 INFO DAGScheduler: ResultStage 2 (collect at KyuubiOperation.scala:326) finished in 0.219 s
18/05/25 15:58:54 INFO DAGScheduler: Job 2 finished: collect at KyuubiOperation.scala:326, took 0.240928 s
18/05/25 15:58:54 INFO DAGScheduler: Asked to cancel job group 01e00534-c927-4a45-be3b-9c0491897322
+------+----------+--+
| key  |  value   |
+------+----------+--+
| 238  | val_238  |
| 86   | val_86   |
497 lines are omitted here......
| 97   | val_97   |
+------+----------+--+
500 rows selected (0.553 seconds)

"KyuubiOperation: Executing query in incremental collection mode" has gone away. we set back to collect data as a whole part.

4. Conclusions

Kyuubi now supports enabling incremental result collection in three ways: server-wide, per session via the JDBC connection string, and at runtime via a set command. Hopefully this feature helps you put Spark SQL into production.
