本文基于 incubator-livy 0.4.0-incubating
Livy Session 詳解(上) - 簡書 一文主要介紹了 session 整體的啟動(dòng)流程并詳細(xì)分析了 client 端(livy server 端)是如何啟動(dòng) driver 以及建立連接的睛竣。本文將進(jìn)一步分析 session server 端(即 driver 內(nèi)部)是如何啟動(dòng)淫痰、初始化的以及執(zhí)行代碼片段的凶伙。
注:如果對(duì) livy 的整體架構(gòu)以及 session client 端不了解直焙,請(qǐng)先閱讀以下兩篇相關(guān)文章:
一、整體啟動(dòng)撞反、初始化流程
如上圖所示宰睡,driver 內(nèi)部的啟動(dòng)流程可以分為以下五個(gè)步驟:
- 創(chuàng)建 ReplDriver 實(shí)例
- 初始化 server
- 初始化 SparkContext
- 創(chuàng)建 JobContextImpl 實(shí)例并執(zhí)行 jobs
- 等待退出
1.1、創(chuàng)建 ReplDriver 實(shí)例
ReplDriver 是 InteractiveSession 對(duì)應(yīng)的 Spark App driver熟妓,用來接收 livy server 的各種請(qǐng)求并進(jìn)行處理。也是 RSCDriver 的子類栏尚,RSCDriver:
- 持有等待 RSCClient 進(jìn)行連接的
RpcServer server
- 初始化 SparkContext
- 處理各種請(qǐng)求:CancelJob起愈、EndSession、JobRequest译仗、BypassJobRequest抬虽、SyncJobRequest、GetBypassJobStatus
- 處理 add file 請(qǐng)求
除了能處理 RSCDriver 支持的請(qǐng)求外纵菌,ReplDriver 還能處理:BaseProtocol.ReplJobRequest阐污、BaseProtocol.CancelReplJobRequest、BaseProtocol.GetReplJobResults 請(qǐng)求咱圆,這些請(qǐng)求對(duì)應(yīng)的是序列化的 job (GitHub - cloudera/livy: Livy is an open source REST interface for interacting with Apache Spark from anywhere)相關(guān)的請(qǐng)求笛辟。
1.2、初始化 server
這一步在 RSCDriver#initializeServer()
中調(diào)用序苏,用于連接 client 并告知 server 端 rpc 地址隘膘,client 獲知 server rpc 地址后會(huì)進(jìn)行連接并發(fā)送請(qǐng)求。
1.3杠览、初始化 SparkContext
1.3.1、創(chuàng)建解釋器
會(huì)根據(jù)不同的 kind 創(chuàng)建不同類型的解釋器纵势,kind 在創(chuàng)建 session 的 request body 中指定踱阿。這些解釋器有繼承共同的 treat Interpreter,其類圖如下:
其中的 execute 方法用來執(zhí)行代碼片段:
- pyspark 類型的解釋器用于執(zhí)行 python钦铁、pyspark 代碼片段
- pyspark3類型的解釋器用于執(zhí)行 python3软舌、 python3 spark 代碼片段
- spark 類型的解釋器用于執(zhí)行 scala、scala spark 代碼片段
- sparks 類型的解釋器用于執(zhí)行 r牛曹、r spark 代碼片段
1.3.2佛点、創(chuàng)建 repl/Session
repl/Session(用于和 sessions/Session
進(jìn)行區(qū)分,后文簡稱 Session)是 server 端中至關(guān)重要的類黎比。主要職責(zé)是:
- 啟動(dòng) interpreter超营,并獲取 SparkContext
- 持有線程池來異步執(zhí)行 statements(通過 interpreter 來執(zhí)行)
- 持有線程池來異步取消 statements
- 管理一個(gè) session 下所有的 statements
在構(gòu)造 Session 的過程中,會(huì)初始化用于執(zhí)行 statement 的 interpreterExecutor阅虫,如下:
private val interpreterExecutor = ExecutionContext.fromExecutorService(
Executors.newSingleThreadExecutor())
可以看到演闭,這個(gè)線程只有一個(gè)線程,也就是說在一個(gè) Session 中的 statement 是串行的颓帝,一個(gè) statement 執(zhí)行完才會(huì)執(zhí)行下一個(gè)米碰。這種串行的方式有明顯的弊端窝革,即當(dāng) Session 的資源足以執(zhí)行多個(gè) statement 時(shí),也只能一個(gè)接著一個(gè)執(zhí)行吕座,這既浪費(fèi)了資源虐译,有延長了任務(wù)運(yùn)行的整體時(shí)間。那為什么還要這么做呢吴趴?主要是因?yàn)槟壳?livy 中的一個(gè) Session 僅包含一個(gè) interpreter漆诽,如果一個(gè) interpreter 同時(shí)執(zhí)行多段代碼片段,很容易會(huì)出現(xiàn)穿插執(zhí)行的錯(cuò)誤史侣。要解決這一困境的思路主要有兩個(gè):
- 不使用 interpreter 來執(zhí)行代碼片段
- 一個(gè) Session 包含多個(gè) interpreter拴泌,每個(gè) interpreter 同一時(shí)間也只執(zhí)行一個(gè) statement
目前,我們正在做這方面的工作惊橱,等完工之后可以再進(jìn)一步說明下蚪腐。
1.3.3、啟動(dòng) Session
主要是調(diào)用 interpreter#start
税朴,該啟動(dòng)也是提交到 interpreterExecutor 中執(zhí)行的回季,在啟動(dòng)后就會(huì)將 Session 的 state 修改為 idle。我們來看看 Spark 類型的 Session 的 interpreter 啟動(dòng)過程:
SparkInterpreter#start()
以上正林,就是 Session server 端的詳細(xì)的啟動(dòng)過程泡一,下一篇我們將看看代碼片段是怎么執(zhí)行的。