Livy Session 詳解(上)

本文基于 incubator-livy 0.4.0-incubating

Livy Rest Api的介紹中我們可以知道限匣,livy 共有兩種 job午笛,分別是 session 和 batch低散。然而,在源碼實現(xiàn)中狞甚,session 和 batch 都是 Session 的子類,rest api 中的 session 對應源碼中的 InteractivateSession躯喇;rest api 中的 batch 對應源碼中的 BatchSession纺座。在之后關于 livy 的所有文章中,session 或 batch 對應 rest api 中的含義捻艳,InteractivateSessionBatchSessionSession 都對應代碼中的含義驾窟。

session 和 batch 的創(chuàng)建過程也很不相同庆猫,batch 的創(chuàng)建以對應的 spark app 啟動為終點认轨;而 session 除了要啟動相應的 spark app,還要能支持共享 sparkContext 來接受一個個 statements 的提交及運行月培,我將 session 的創(chuàng)建分為兩個大步驟:

  • client 端:運行在 LivyServer 中嘁字,接受 request 直到啟動 spark app(注意,這里雖然叫 client 端杉畜,但是運行在 LivyServer 中的)
  • server 端:session 對應的 spark app driver 的啟動

這篇文章主要講講 client 端 都做了些什么

一:整體流程

create session-livy client side.png

一圖勝千言纪蜒,上圖就是創(chuàng)建一個 session 在 client 端的主要流程,我們將以注釋的方式來說明那些沒那么重要或復雜的流程此叠,而核心的流程都在下文中分小節(jié)進行剖析纯续。

二:啟動 session 對應的 spark app

接下來直搗黃龍,直接到第 (8) 步 ContextLauncher#startDriver 看看 session 對應的 spark app 是如何啟動的灭袁。ContextLauncher#startDriver 可以分為兩個大步驟:

  1. 啟動 spark app
  2. 等待 SparkSubmit 退出

2.2:啟動 spark app

startDriver.png

如上圖猬错,startDriver 無非就是 new 了一個 SparkLauncher 對象,進行了配置茸歧、資源倦炒、mainClass 等設置,然后調用 launch() 方法拿到了 SparkSubmit 進程的 對應的 Process 對象 process软瞎。
可以看到逢唤,session 對應的 spark app 的 mainClass 為 org.apache.livy.rsc.driver.RSCDriverBootstrapper

2.3:等待 SparkSubmit 退出

SparkLauncher#launch() 返回的進程是 SparkSubmit 進程,再返回 process 后涤浇,會 new 一個 ContextLauncher.ChildProcess 對象鳖藕,在過程中會新啟動一個線程來一直等待 SparkSubmit 進程退出,該線程中的邏輯如下:若 SparkSubmit 非正常退出(exitCode != 0)只锭,表示 Spark App 啟動失敗著恩,會拋異常

public void run() {
  try {
    int exitCode = child.waitFor();
    if (exitCode != 0) {
      LOG.warn("Child process exited with code {}.", exitCode);
      fail(new IOException(String.format("Child process exited with code %d.", exitCode)));
    }
  } catch (InterruptedException ie) {
    LOG.warn("Waiting thread interrupted, killing child process.");
    Thread.interrupted();
    child.destroy();
  } catch (Exception e) {
    LOG.warn("Exception while waiting for child process.", e);
  }
}

三:與 driver 建立連接

我們知道,session 最大的特點就是可以共享 SparkContext,讓用戶提交的多個代碼片段都能跑在一個 SparkContext 上页滚,這有兩個好處:

  1. 大大加速任務的啟動速度:我們知道召边,在 yarn 上啟動一個 app 是比較耗時的,一般都需要 20s 左右裹驰;而使用 session隧熙,除了啟動 session 也需要相當?shù)暮臅r外,之后提交的代碼片段都將立即執(zhí)行
  2. 共享 RDD幻林、table:持久化的 RDD贞盯、table 都可以被之后的代碼片段使用,這在不同用戶需要在相同的 RDD沪饺、table 上做計算的場景非常有用

而共享 SparkContext 就需要 client 與 driver 之間建立起連接躏敢,能讓 client 向 driver 發(fā)送代碼片段、查詢運行狀態(tài)整葡、獲取運行結果等

3.1:client 傳遞其 RpcServer 信息給 driver

時序圖中的第 (5) 步:RSCClientFactory#createClient件余,在該調用中創(chuàng)建了一個 org.apache.livy.rsc.rpc.RpcServer(后文簡稱 RpcServer)對象賦值給成員 server。該 server 會在 driver 啟動時被 driver 中的 rpc client 連接并告知 driver 中的 RpcServer 的信息遭居,以便之后 client 端可以通過該信息向 driver 中的 RpcServer 發(fā)起連接及請求啼器。由于 driver 可能被 yarn 調度到任何一個節(jié)點啟動,所以無法由 LivyServer 主動與 driver 建立連接俱萍,而是預先在 client 端建立好 RpcServer 等待 driver 來連接端壳。

另外,RpcServer 與 rpc client 是通過一個由 RpcServer 自身生成的 secret 進行匹配的枪蘑。要能讓 driver 連接到該 RpcServer损谦,還需要知道 LivyServer 的 host 和 port,這這些信息都是通過 conf 傳給 driver 的岳颇,在 ContextLauncher 構造函數(shù)中實現(xiàn):

// 生成 client id
this.clientId = UUID.randomUUID().toString();
// 由server 生成 secret
this.secret = factory.getServer().createSecret();

...

conf.set(LAUNCHER_ADDRESS, factory.getServer().getAddress());
conf.set(LAUNCHER_PORT, factory.getServer().getPort());
conf.set(CLIENT_ID, clientId);
conf.set(CLIENT_SECRET, secret);

這些配置最終也將作為啟動 driver 的 conf 的一部分傳給 driver照捡,這樣 driver 在啟動后就知道 client 中的 RpcServer 的地址和 secret 了

3.2:driver 連接 client 并傳遞其 RpcServer 信息

rscdriver-init.png

該過程在 RSCDriver#initializeServer 中實現(xiàn),是 seesion driver 的初始化步驟

3.3:client 接收 driver rpcServer 地址信息并連接

client 傳遞其 RpcServer 信息給 driver 之前已經為 RSCClientFactory 對象的成員 server: RpcServer 注冊了 client 以及相應 client 成功連接的處理函數(shù):

final RegistrationHandler handler = new RegistrationHandler();
factory.getServer().registerClient(clientId, secret, handler);

這里的 clientId赦役、secret 即 3.1 小節(jié)中傳遞給 driver 的麻敌。Registration 類用來處理 driver 端的 rpcClient 連接到 server 時的處理邏輯,即:

private void handle(ChannelHandlerContext ctx, RemoteDriverAddress msg) {
  ContextInfo info = new ContextInfo(msg.host, msg.port, clientId, secret);
  if (promise.trySuccess(info)) {
    timeout.cancel(true);
  }
}

參數(shù) RemoteDriverAddress msg 即在 3.2 小節(jié)中 driver 中的 rpcClient 發(fā)送給 server 的 driver 中 rpcServer 的地址(包括 address掂摔、host)术羔,之后再結合 clientId、serrect 來構造 ContextInfo info 來觸發(fā) promise.trySuccess(info)乙漓,info 表名了 driver 中 rpcServer 的地址已經發(fā)起連接需要的 clientId级历、secret,這與 3.2 小節(jié)中 driver 中的 rpcServer 注冊的 client 信息相符叭披。

在創(chuàng)建 RSCClient 對象時會在 promise 上 add 相應的 listener寥殖,promise.trySuccess(info) 會觸發(fā) onSuccess(ContextInfo info) 進而調用 connectToContext(info)

Utils.addListener(this.contextInfoPromise, new FutureListener<ContextInfo>() {
  @Override
  public void onSuccess(ContextInfo info) throws Exception {
    connectToContext(info);
    ...
  }

  @Override
  public void onFailure(Throwable error) {
    connectionError(error);
    ...
  }
})

connectToContext(info) 方法中會使用拿到的 driver 端 rpcServer 的連接信息發(fā)起連接得到 driverRpc玩讳,即用于向 driver 端 rpcServer 發(fā)送 rpc 調用的 client,這是 RSCClient 的成員嚼贡,之后 RSCClient 和 driver 之間的通信都通過 driverRpc 來進行熏纯。

四:Session 的創(chuàng)建與初始化

new InteractiveSession and init

在與 driver 建立連接之后,會使用 rscClient粤策、livyConf 等信息來創(chuàng)建 InteractiveSession 對象并進行初始化樟澜,流程如上。初始化過程匯總叮盘,比較關鍵的步驟是將 session 信息存儲到 state store 中以便livy server 掛掉后能進行 recovery秩贰;再就是向 driver 發(fā)送一個空的 PingJob 來確定 driver 的狀態(tài)是否 ok,若 PingJob 成功執(zhí)行柔吼,則說明 driver 狀態(tài) ok毒费,將 session 置為 running 狀態(tài);若出錯或失敗愈魏,則說明 driver 出了一些問題觅玻,則將 session 的狀態(tài)置為 error。

在成功完成 session 的創(chuàng)建及初始化后蝌戒,會將 session 添加到 SessionManager 中進行統(tǒng)一管理串塑。SessionManager 的主要職責包括:

  • 持有所有 sessions
  • 清理過期 session
  • 從 state store 中恢復 sessions
最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市北苟,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌打瘪,老刑警劉巖友鼻,帶你破解...
    沈念sama閱讀 206,378評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異闺骚,居然都是意外死亡彩扔,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,356評論 2 382
  • 文/潘曉璐 我一進店門僻爽,熙熙樓的掌柜王于貴愁眉苦臉地迎上來虫碉,“玉大人,你說我怎么就攤上這事胸梆《嘏酰” “怎么了?”我有些...
    開封第一講書人閱讀 152,702評論 0 342
  • 文/不壞的土叔 我叫張陵碰镜,是天一觀的道長兢卵。 經常有香客問我,道長绪颖,這世上最難降的妖魔是什么秽荤? 我笑而不...
    開封第一講書人閱讀 55,259評論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮,結果婚禮上窃款,老公的妹妹穿的比我還像新娘课兄。我一直安慰自己,他們只是感情好晨继,可當我...
    茶點故事閱讀 64,263評論 5 371
  • 文/花漫 我一把揭開白布第喳。 她就那樣靜靜地躺著臣缀,像睡著了一般仗处。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上防症,一...
    開封第一講書人閱讀 49,036評論 1 285
  • 那天珠月,我揣著相機與錄音扩淀,去河邊找鬼。 笑死啤挎,一個胖子當著我的面吹牛驻谆,可吹牛的內容都是我干的。 我是一名探鬼主播庆聘,決...
    沈念sama閱讀 38,349評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼胜臊,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了伙判?” 一聲冷哼從身側響起象对,我...
    開封第一講書人閱讀 36,979評論 0 259
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎宴抚,沒想到半個月后勒魔,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經...
    沈念sama閱讀 43,469評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡菇曲,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 35,938評論 2 323
  • 正文 我和宋清朗相戀三年冠绢,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片常潮。...
    茶點故事閱讀 38,059評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡弟胀,死狀恐怖,靈堂內的尸體忽然破棺而出喊式,到底是詐尸還是另有隱情孵户,我是刑警寧澤,帶...
    沈念sama閱讀 33,703評論 4 323
  • 正文 年R本政府宣布垃帅,位于F島的核電站延届,受9級特大地震影響,放射性物質發(fā)生泄漏贸诚。R本人自食惡果不足惜方庭,卻給世界環(huán)境...
    茶點故事閱讀 39,257評論 3 307
  • 文/蒙蒙 一厕吉、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧械念,春花似錦头朱、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,262評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至希停,卻和暖如春烁巫,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背宠能。 一陣腳步聲響...
    開封第一講書人閱讀 31,485評論 1 262
  • 我被黑心中介騙來泰國打工亚隙, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人违崇。 一個月前我還...
    沈念sama閱讀 45,501評論 2 354
  • 正文 我出身青樓阿弃,卻偏偏與公主長得像,于是被迫代替她去往敵國和親羞延。 傳聞我的和親對象是個殘疾皇子渣淳,可洞房花燭夜當晚...
    茶點故事閱讀 42,792評論 2 345

推薦閱讀更多精彩內容