Spark原理簡(jiǎn)述


Spark 學(xué)習(xí): spark 原理簡(jiǎn)述與 shuffle 過程介紹

簡(jiǎn)述總結(jié)

Spark 是使用 scala 實(shí)現(xiàn)的基于內(nèi)存計(jì)算的大數(shù)據(jù)開源集群計(jì)算環(huán)境.提供了 java,scala, python,R 等語言的調(diào)用接口仇穗。

1 引言

1.1 Hadoop 和 Spark 的關(guān)系

   Google 在 2003 年和 2004 年先后發(fā)表了 Google 文件系統(tǒng) GFS 和 MapReduce 編程模型兩篇文章,. 基于這兩篇開源文檔,06 年 Nutch 項(xiàng)目子項(xiàng)目之一的 Hadoop 實(shí)現(xiàn)了兩個(gè)強(qiáng)有力的開源產(chǎn)品:HDFS 和 MapReduce. Hadoop 成為了典型的大數(shù)據(jù)批量處理架構(gòu),由 HDFS 負(fù)責(zé)靜態(tài)數(shù)據(jù)的存儲(chǔ),并通過 MapReduce 將計(jì)算邏輯分配到各數(shù)據(jù)節(jié)點(diǎn)進(jìn)行數(shù)據(jù)計(jì)算和價(jià)值發(fā)現(xiàn).之后以 HDFS 和 MapReduce 為基礎(chǔ)建立了很多項(xiàng)目,形成了 Hadoop 生態(tài)圈.

而 Spark 則是UC Berkeley AMP lab (加州大學(xué)伯克利分校AMP實(shí)驗(yàn)室)所開源的類Hadoop MapReduce的通用并行框架, 專門用于大數(shù)據(jù)量下的迭代式計(jì)算.是為了跟?Hadoop 配合而開發(fā)出來的,不是為了取代 Hadoop, Spark 運(yùn)算比 Hadoop 的 MapReduce 框架快的原因是因?yàn)?Hadoop 在一次 MapReduce 運(yùn)算之后,會(huì)將數(shù)據(jù)的運(yùn)算結(jié)果從內(nèi)存寫入到磁盤中,第二次 Mapredue 運(yùn)算時(shí)在從磁盤中讀取數(shù)據(jù),所以其瓶頸在2次運(yùn)算間的多余 IO 消耗. Spark 則是將數(shù)據(jù)一直緩存在內(nèi)存中,直到計(jì)算得到最后的結(jié)果,再將結(jié)果寫入到磁盤,所以多次運(yùn)算的情況下, Spark 是比較快的. 其優(yōu)化了迭代式工作負(fù)載1.?

具體區(qū)別如下:?


伯克利大學(xué)將 Spark 的整個(gè)生態(tài)系統(tǒng)成為 伯克利數(shù)據(jù)分析棧(BDAS),在核心框架 Spark 的基礎(chǔ)上,主要提供四個(gè)范疇的計(jì)算框架:?

Spark SQL: 提供了類 SQL 的查詢,返回 Spark-DataFrame 的數(shù)據(jù)結(jié)構(gòu)

Spark Streaming: 流式計(jì)算,主要用于處理線上實(shí)時(shí)時(shí)序數(shù)據(jù)

MLlib: 提供機(jī)器學(xué)習(xí)的各種模型和調(diào)優(yōu)

GraphX: 提供基于圖的算法,如 PageRank

關(guān)于四個(gè)模塊更詳細(xì)的可以參見2這篇博文. 后面介紹的內(nèi)容主要是關(guān)于 MLlib 模塊方面的.?


Spark 的主要特點(diǎn)還包括:

(1)提供 Cache 機(jī)制來支持需要反復(fù)迭代計(jì)算或者多次數(shù)據(jù)共享,減少數(shù)據(jù)讀取的 IO 開銷;

(2)提供了一套支持 DAG 圖的分布式并行計(jì)算的編程框架,減少多次計(jì)算之間中間結(jié)果寫到 Hdfs 的開銷;

(3)使用多線程池模型減少 Task 啟動(dòng)開稍, shuffle 過程中避免不必要的 sort 操作并減少磁盤 IO 操作翎猛。(Hadoop 的 Map 和 reduce 之間的 shuffle 需要 sort)

2 Spark 系統(tǒng)架構(gòu)

首先明確相關(guān)術(shù)語3:

應(yīng)用程序(Application): 基于Spark的用戶程序等缀,包含了一個(gè)Driver Program 和集群中多個(gè)的Executor;

驅(qū)動(dòng)(Driver): 運(yùn)行Application的main()函數(shù)并且創(chuàng)建SparkContext;

執(zhí)行單元(Executor): 是為某Application運(yùn)行在Worker Node上的一個(gè)進(jìn)程掸绞,該進(jìn)程負(fù)責(zé)運(yùn)行Task髓废,并且負(fù)責(zé)將數(shù)據(jù)存在內(nèi)存或者磁盤上震缭,每個(gè)Application都有各自獨(dú)立的Executors;

集群管理程序(Cluster Manager): 在集群上獲取資源的外部服務(wù)(例如:Local嘶窄、Standalone、Mesos或Yarn等集群管理系統(tǒng))脊僚;

操作(Operation): 作用于RDD的各種操作分為Transformation和Action.

整個(gè) Spark 集群中,分為 Master 節(jié)點(diǎn)與 worker 節(jié)點(diǎn),,其中 Master 節(jié)點(diǎn)上常駐 Master 守護(hù)進(jìn)程和 Driver 進(jìn)程, Master 負(fù)責(zé)將串行任務(wù)變成可并行執(zhí)行的任務(wù)集Tasks, 同時(shí)還負(fù)責(zé)出錯(cuò)問題處理等,而 Worker 節(jié)點(diǎn)上常駐 Worker 守護(hù)進(jìn)程, Master 節(jié)點(diǎn)與 Worker 節(jié)點(diǎn)分工不同, Master 負(fù)載管理全部的 Worker 節(jié)點(diǎn),而 Worker 節(jié)點(diǎn)負(fù)責(zé)執(zhí)行任務(wù).?

Driver 的功能是創(chuàng)建 SparkContext, 負(fù)責(zé)執(zhí)行用戶寫的 Application 的 main 函數(shù)進(jìn)程,Application 就是用戶寫的程序.?

Spark 支持不同的運(yùn)行模式,包括Local, Standalone,Mesoses,Yarn 模式.不同的模式可能會(huì)將 Driver 調(diào)度到不同的節(jié)點(diǎn)上執(zhí)行.集群管理模式里, local 一般用于本地調(diào)試.?

每個(gè) Worker 上存在一個(gè)或多個(gè) Executor 進(jìn)程,該對(duì)象擁有一個(gè)線程池,每個(gè)線程負(fù)責(zé)一個(gè) Task 任務(wù)的執(zhí)行.根據(jù) Executor 上 CPU-core 的數(shù)量,其每個(gè)時(shí)間可以并行多個(gè) 跟 core 一樣數(shù)量的 Task4.Task 任務(wù)即為具體執(zhí)行的 Spark 程序的任務(wù).?

5

2.1 spark 運(yùn)行原理

一開始看不懂的話可以看完第三和第四章再回來看.?

底層詳細(xì)細(xì)節(jié)介紹6:?

我們使用spark-submit提交一個(gè)Spark作業(yè)之后相叁,這個(gè)作業(yè)就會(huì)啟動(dòng)一個(gè)對(duì)應(yīng)的Driver進(jìn)程。根據(jù)你使用的部署模式(deploy-mode)不同辽幌,Driver進(jìn)程可能在本地啟動(dòng)增淹,也可能在集群中某個(gè)工作節(jié)點(diǎn)上啟動(dòng)。而Driver進(jìn)程要做的第一件事情舶衬,就是向集群管理器(可以是Spark Standalone集群,也可以是其他的資源管理集群赎离,美團(tuán)?大眾點(diǎn)評(píng)使用的是YARN作為資源管理集群)申請(qǐng)運(yùn)行Spark作業(yè)需要使用的資源逛犹,這里的資源指的就是Executor進(jìn)程。YARN集群管理器會(huì)根據(jù)我們?yōu)镾park作業(yè)設(shè)置的資源參數(shù)梁剔,在各個(gè)工作節(jié)點(diǎn)上虽画,啟動(dòng)一定數(shù)量的Executor進(jìn)程,每個(gè)Executor進(jìn)程都占有一定數(shù)量的內(nèi)存和CPU core荣病。?

在申請(qǐng)到了作業(yè)執(zhí)行所需的資源之后码撰,Driver進(jìn)程就會(huì)開始調(diào)度和執(zhí)行我們編寫的作業(yè)代碼了。Driver進(jìn)程會(huì)將我們編寫的Spark作業(yè)代碼分拆為多個(gè)stage个盆,每個(gè)stage執(zhí)行一部分代碼片段脖岛,并為每個(gè)stage創(chuàng)建一批Task朵栖,然后將這些Task分配到各個(gè)Executor進(jìn)程中執(zhí)行。Task是最小的計(jì)算單元柴梆,負(fù)責(zé)執(zhí)行一模一樣的計(jì)算邏輯(也就是我們自己編寫的某個(gè)代碼片段)陨溅,只是每個(gè)Task處理的數(shù)據(jù)不同而已。一個(gè)stage的所有Task都執(zhí)行完畢之后绍在,會(huì)在各個(gè)節(jié)點(diǎn)本地的磁盤文件中寫入計(jì)算中間結(jié)果门扇,然后Driver就會(huì)調(diào)度運(yùn)行下一個(gè)stage。下一個(gè)stage的Task的輸入數(shù)據(jù)就是上一個(gè)stage輸出的中間結(jié)果偿渡。如此循環(huán)往復(fù)臼寄,直到將我們自己編寫的代碼邏輯全部執(zhí)行完,并且計(jì)算完所有的數(shù)據(jù)溜宽,得到我們想要的結(jié)果為止吉拳。?

Spark是根據(jù)shuffle類算子來進(jìn)行stage的劃分。如果我們的代碼中執(zhí)行了某個(gè)shuffle類算子(比如reduceByKey坑质、join等)合武,那么就會(huì)在該算子處,劃分出一個(gè)stage界限來涡扼〖谔可以大致理解為,shuffle算子執(zhí)行之前的代碼會(huì)被劃分為一個(gè)stage吃沪,shuffle算子執(zhí)行以及之后的代碼會(huì)被劃分為下一個(gè)stage汤善。因此一個(gè)stage剛開始執(zhí)行的時(shí)候,它的每個(gè)Task可能都會(huì)從上一個(gè)stage的Task所在的節(jié)點(diǎn)票彪,去通過網(wǎng)絡(luò)傳輸拉取需要自己處理的所有key红淡,然后對(duì)拉取到的所有相同的key使用我們自己編寫的算子函數(shù)執(zhí)行聚合操作(比如reduceByKey()算子接收的函數(shù))。這個(gè)過程就是shuffle降铸。?

當(dāng)我們?cè)诖a中執(zhí)行了cache/persist等持久化操作時(shí)在旱,根據(jù)我們選擇的持久化級(jí)別的不同,每個(gè)Task計(jì)算出來的數(shù)據(jù)也會(huì)保存到Executor進(jìn)程的內(nèi)存或者所在節(jié)點(diǎn)的磁盤文件中推掸。?

因此Executor的內(nèi)存主要分為三塊:第一塊是讓Task執(zhí)行我們自己編寫的代碼時(shí)使用桶蝎,默認(rèn)是占Executor總內(nèi)存的20%;第二塊是讓Task通過shuffle過程拉取了上一個(gè)stage的Task的輸出后谅畅,進(jìn)行聚合等操作時(shí)使用登渣,默認(rèn)也是占Executor總內(nèi)存的20%;第三塊是讓RDD持久化時(shí)使用毡泻,默認(rèn)占Executor總內(nèi)存的60%胜茧。?

Task的執(zhí)行速度是跟每個(gè)Executor進(jìn)程的CPU core數(shù)量有直接關(guān)系的。一個(gè)CPU core同一時(shí)間只能執(zhí)行一個(gè)線程仇味。而每個(gè)Executor進(jìn)程上分配到的多個(gè)Task呻顽,都是以每個(gè)Task一條線程的方式雹顺,多線程并發(fā)運(yùn)行的。如果CPU core數(shù)量比較充足芬位,而且分配到的Task數(shù)量比較合理无拗,那么通常來說,可以比較快速和高效地執(zhí)行完這些Task線程昧碉。?

以上就是Spark作業(yè)的基本運(yùn)行原理的說明.

在實(shí)際編程中,我們不需關(guān)心以上調(diào)度細(xì)節(jié).只需使用 Spark 提供的指定語言的編程接口調(diào)用相應(yīng)的 API 即可.?

在 Spark API 中, 一個(gè) 應(yīng)用(Application) 對(duì)應(yīng)一個(gè) SparkContext 的實(shí)例英染。一個(gè) 應(yīng)用 可以用于單個(gè) Job,或者分開的多個(gè) Job 的 session被饿,或者響應(yīng)請(qǐng)求的長(zhǎng)時(shí)間生存的服務(wù)器四康。與 MapReduce 不同的是,一個(gè) 應(yīng)用 的進(jìn)程(我們稱之為 Executor)狭握,會(huì)一直在集群上運(yùn)行闪金,即使當(dāng)時(shí)沒有 Job 在上面運(yùn)行。?

而調(diào)用一個(gè)Spark內(nèi)部的 Action 會(huì)產(chǎn)生一個(gè) Spark job 來完成它论颅。 為了確定這些job實(shí)際的內(nèi)容哎垦,Spark 檢查 RDD 的DAG再計(jì)算出執(zhí)行 plan 。這個(gè) plan 以最遠(yuǎn)端的 RDD 為起點(diǎn)(最遠(yuǎn)端指的是對(duì)外沒有依賴的 RDD 或者 數(shù)據(jù)已經(jīng)緩存下來的 RDD)恃疯,產(chǎn)生結(jié)果 RDD 的 Action 為結(jié)束 漏设。并根據(jù)是否發(fā)生 shuffle 劃分 DAG 的 stage.

// parameter

val appName ="RetailLocAdjust"

?valmaster ="local" // 選擇模式

val conf =newSparkConf().setMaster(master).setAppName(appName)

// 啟動(dòng)一個(gè) SparkContext Application

val sc =newSparkContext(conf)

val rdd = sc.textFile("path/...")

  要啟動(dòng) Spark 運(yùn)行程序主要有兩種方式:一種是使用 spark-submit 將腳本文件提交,一種是打開 Spark 跟某種特定語言的解釋器,如:

spark-shell: 啟動(dòng)了 Spark 的 scala 解釋器.

pyspark: 啟動(dòng)了 Spark 的 python 解釋器.

sparkR: 啟動(dòng)了 Spark 的 R 解釋器.?

(以上解釋器位于spark 的 bin 目錄下)

3 RDD 初識(shí)

RDD(Resilent Distributed Datasets)俗稱彈性分布式數(shù)據(jù)集,是 Spark 底層的分布式存儲(chǔ)的數(shù)據(jù)結(jié)構(gòu),可以說是 Spark 的核心,?Spark API 的所有操作都是基于 RDD 的. 數(shù)據(jù)不只存儲(chǔ)在一臺(tái)機(jī)器上,而是分布在多臺(tái)機(jī)器上,實(shí)現(xiàn)數(shù)據(jù)計(jì)算的并行化.彈性表明數(shù)據(jù)丟失時(shí),可以進(jìn)行重建.在Spark 1.5版以后,新增了數(shù)據(jù)結(jié)構(gòu) Spark-DataFrame,仿造的 R 和 python 的類 SQL 結(jié)構(gòu)-DataFrame, 底層為 RDD, 能夠讓數(shù)據(jù)從業(yè)人員更好的操作 RDD.?

在Spark 的設(shè)計(jì)思想中,為了減少網(wǎng)絡(luò)及磁盤 IO 開銷,需要設(shè)計(jì)出一種新的容錯(cuò)方式,于是才誕生了新的數(shù)據(jù)結(jié)構(gòu) RDD. RDD 是一種只讀的數(shù)據(jù)塊,可以從外部數(shù)據(jù)轉(zhuǎn)換而來,你可以對(duì)RDD 進(jìn)行函數(shù)操作(Operation),包括 Transformation 和 Action. 在這里只讀表示當(dāng)你對(duì)一個(gè) RDD 進(jìn)行了操作,那么結(jié)果將會(huì)是一個(gè)新的 RDD, 這種情況放在代碼里,假設(shè)變換前后都是使用同一個(gè)變量表示這一 RDD,RDD 里面的數(shù)據(jù)并不是真實(shí)的數(shù)據(jù),而是一些元數(shù)據(jù)信息,記錄了該 RDD 是通過哪些 Transformation 得到的,在計(jì)算機(jī)中使用 lineage 來表示這種血緣結(jié)構(gòu),lineage 形成一個(gè)有向無環(huán)圖 DAG, 整個(gè)計(jì)算過程中,將不需要將中間結(jié)果落地到 HDFS 進(jìn)行容錯(cuò),加入某個(gè)節(jié)點(diǎn)出錯(cuò),則只需要通過 lineage 關(guān)系重新計(jì)算即可.

1).?RDD 主要具有如下特點(diǎn):

1.它是在集群節(jié)點(diǎn)上的不可變的、已分區(qū)的集合對(duì)象;

2.通過并行轉(zhuǎn)換的方式來創(chuàng)建(如 Map今妄、 filter郑口、join 等);

3.失敗自動(dòng)重建;

4.可以控制存儲(chǔ)級(jí)別(內(nèi)存、磁盤等)來進(jìn)行重用;

5.必須是可序列化的;

6.是靜態(tài)類型的(只讀)盾鳞。

2).?RDD 的創(chuàng)建方式主要有2種:?

- 并行化(Parallelizing)一個(gè)已經(jīng)存在與驅(qū)動(dòng)程序(Driver Program)中的集合如set犬性、list;?

- 讀取外部存儲(chǔ)系統(tǒng)上的一個(gè)數(shù)據(jù)集,比如HDFS腾仅、Hive乒裆、HBase,或者任何提供了Hadoop InputFormat的數(shù)據(jù)源.也可以從本地讀取 txt、csv 等數(shù)據(jù)集

3).?RDD 的操作函數(shù)(operation)主要分為2種類型 Transformation 和 Action.

類別函數(shù)區(qū)別

TransformationMap,filter,groupBy,join, union,reduce,sort,partitionBy返回值還是 RDD,不會(huì)馬上 提交 Spark 集群運(yùn)行

Actioncount,collect,take,save, show返回值不是 RDD,會(huì)形成 DAG 圖,提交 Spark 集群運(yùn)行 并立即返回結(jié)果

Transformation 操作不是馬上提交 Spark 集群執(zhí)行的,Spark 在遇到 Transformation 操作時(shí)只會(huì)記錄需要這樣的操作,并不會(huì)去執(zhí)行,需要等到有 Action 操作的時(shí)候才會(huì)真正啟動(dòng)計(jì)算過程進(jìn)行計(jì)算.針對(duì)每個(gè) Action,Spark 會(huì)生成一個(gè) Job, 從數(shù)據(jù)的創(chuàng)建開始,經(jīng)過 Transformation, 結(jié)尾是 Action 操作.這些操作對(duì)應(yīng)形成一個(gè)有向無環(huán)圖(DAG),形成 DAG 的先決條件是最后的函數(shù)操作是一個(gè)Action.?

如下例子:

val arr = Array("cat", "dog", "lion", "monkey", "mouse")

// create RDD by collection

val rdd = sc.parallize(arr)? ?

// Map: "cat" -> c, cat

val rdd1 = rdd.Map(x => (x.charAt(0), x))

// groupby same key and count

val rdd2 = rdd1.groupBy(x => x._1).Map(x => (x._1, x._2.toList.length))

val result = rdd2.collect()? ? ? ? ? ?

print(result)

// output:Array((d,1), (l,1), (m,2))

首先,當(dāng)你在解釋器里一行行輸入的時(shí)候,實(shí)際上 Spark 并不會(huì)立即執(zhí)行函數(shù),而是當(dāng)你輸入了val result = rdd2.collect()的時(shí)候, Spark 才會(huì)開始計(jì)算,從sc.parallize(arr)?到最后的?collect,形成一個(gè) Job.

4.shuffle 和 stage

shuffle 是劃分 DAG 中 stage 的標(biāo)識(shí),同時(shí)影響 Spark 執(zhí)行速度的關(guān)鍵步驟.?

RDD 的 Transformation 函數(shù)中,又分為窄依賴(narrow dependency)和寬依賴(wide dependency)的操作.窄依賴跟寬依賴的區(qū)別是是否發(fā)生 shuffle(洗牌) 操作.寬依賴會(huì)發(fā)生 shuffle 操作. 窄依賴是子 RDD的各個(gè)分片(partition)不依賴于其他分片,能夠獨(dú)立計(jì)算得到結(jié)果,寬依賴指子 RDD 的各個(gè)分片會(huì)依賴于父RDD 的多個(gè)分片,所以會(huì)造成父 RDD 的各個(gè)分片在集群中重新分片, 看如下兩個(gè)示例:

// Map: "cat" -> c, cat

val rdd1 = rdd.Map(x => (x.charAt(0), x))

// groupby same key and count

val rdd2 = rdd1.groupBy(x => x._1).Map(x => (x._1, x._2.toList.length))

第一個(gè) Map 操作將 RDD 里的各個(gè)元素進(jìn)行映射, RDD 的各個(gè)數(shù)據(jù)元素之間不存在依賴,可以在集群的各個(gè)內(nèi)存中獨(dú)立計(jì)算,也就是并行化,第二個(gè) groupby 之后的 Map 操作,為了計(jì)算相同 key 下的元素個(gè)數(shù),需要把相同 key 的元素聚集到同一個(gè) partition 下,所以造成了數(shù)據(jù)在內(nèi)存中的重新分布,即 shuffle 操作.shuffle 操作是 spark 中最耗時(shí)的操作,應(yīng)盡量避免不必要的 shuffle.?

寬依賴主要有兩個(gè)過程: shuffle write 和 shuffle fetch. 類似 Hadoop 的 Map 和 Reduce 階段.shuffle write 將 ShuffleMapTask 任務(wù)產(chǎn)生的中間結(jié)果緩存到內(nèi)存中, shuffle fetch 獲得 ShuffleMapTask 緩存的中間結(jié)果進(jìn)行 ShuffleReduceTask 計(jì)算,這個(gè)過程容易造成OutOfMemory.?

shuffle 過程內(nèi)存分配使用 ShuffleMemoryManager 類管理,會(huì)針對(duì)每個(gè) Task 分配內(nèi)存,Task 任務(wù)完成后通過 Executor 釋放空間.這里可以把 Task 理解成不同 key 的數(shù)據(jù)對(duì)應(yīng)一個(gè) Task.?早期的內(nèi)存分配機(jī)制使用公平分配,即不同 Task 分配的內(nèi)存是一樣的,但是這樣容易造成內(nèi)存需求過多的 Task 的 OutOfMemory, 從而造成多余的 磁盤 IO 過程,影響整體的效率.(例:某一個(gè) key 下的數(shù)據(jù)明顯偏多,但因?yàn)榇蠹覂?nèi)存都一樣,這一個(gè) key 的數(shù)據(jù)就容易 OutOfMemory).1.5版以后?Task 共用一個(gè)內(nèi)存池,內(nèi)存池的大小默認(rèn)為 JVM 最大運(yùn)行時(shí)內(nèi)存容量的16%,分配機(jī)制如下:假如有 N 個(gè) Task,ShuffleMemoryManager 保證每個(gè) Task 溢出之前至少可以申請(qǐng)到1/2N 內(nèi)存,且至多申請(qǐng)到1/N,N 為當(dāng)前活動(dòng)的 shuffle Task 數(shù),因?yàn)镹 是一直變化的,所以 manager 會(huì)一直追蹤 Task 數(shù)的變化,重新計(jì)算隊(duì)列中的1/N 和1/2N.但是這樣仍然容易造成內(nèi)存需要多的 Task 任務(wù)溢出,所以最近有很多相關(guān)的研究是針對(duì) shuffle 過程內(nèi)存優(yōu)化的.?

如下 DAG 流程圖中,分別讀取數(shù)據(jù),經(jīng)過處理后 join 2個(gè) RDD 得到結(jié)果:?


在這個(gè)圖中,根據(jù)是否發(fā)生 shuffle 操作能夠?qū)⑵浞殖扇缦碌?stage 類型:?


(join 需要針對(duì)同一個(gè) key 合并,所以需要 shuffle)?

運(yùn)行到每個(gè) stage 的邊界時(shí)推励,數(shù)據(jù)在父 stage 中按照 Task 寫到磁盤上鹤耍,而在子 stage 中通過網(wǎng)絡(luò)按照 Task 去讀取數(shù)據(jù)。這些操作會(huì)導(dǎo)致很重的網(wǎng)絡(luò)以及磁盤的I/O吹艇,所以?stage 的邊界是非常占資源的惰蜜,在編寫 Spark 程序的時(shí)候需要盡量避免的?昂拂。父 stage 中 partition 個(gè)數(shù)與子 stage 的 partition 個(gè)數(shù)可能不同受神,所以那些產(chǎn)生 stage 邊界的 Transformation 常常需要接受一個(gè) numPartition 的參數(shù)來覺得子 stage 中的數(shù)據(jù)將被切分為多少個(gè) partition7。?

PS:shuffle 操作的時(shí)候可以用 combiner 壓縮數(shù)據(jù),減少 IO 的消耗

5.性能優(yōu)化

主要是我之前寫腳本的時(shí)候踩過的一些坑和在網(wǎng)上看到的比較好的調(diào)優(yōu)的方法.

5.1 緩存機(jī)制和 cache 的意義

Spark中對(duì)于一個(gè)RDD執(zhí)行多次算子(函數(shù)操作)的默認(rèn)原理是這樣的:每次你對(duì)一個(gè)RDD執(zhí)行一個(gè)算子操作時(shí)格侯,都會(huì)重新從源頭處計(jì)算一遍鼻听,計(jì)算出那個(gè)RDD來财著,然后再對(duì)這個(gè)RDD執(zhí)行你的算子操作。這種方式的性能是很差的撑碴。?

因此對(duì)于這種情況撑教,我們的建議是:對(duì)多次使用的RDD進(jìn)行持久化。?

首先要認(rèn)識(shí)到的是, .Spark 本身就是一個(gè)基于內(nèi)存的迭代式計(jì)算,所以如果程序從頭到尾只有一個(gè) Action 操作且子 RDD 只依賴于一個(gè)父RDD 的話,就不需要使用 cache 這個(gè)機(jī)制, RDD 會(huì)在內(nèi)存中一直從頭計(jì)算到尾,最后才根據(jù)你的 Action 操作返回一個(gè)值或者保存到相應(yīng)的磁盤中.需要 cache 的是當(dāng)存在多個(gè) Action 操作或者依賴于多個(gè) RDD 的時(shí)候, 可以在那之前緩存RDD. 如下:

val rdd = sc.textFile("path/to/file").Map(...).filter(...)

val rdd1 = rdd.Map(x => x+1)

val rdd2 = rdd.Map(x => x+100)

val rdd3 = rdd1.join(rdd2)

rdd3.count()

在這里 有2個(gè) RDD 依賴于 rdd, 會(huì)形成如下的 DAG 圖:?


所以可以在 rdd 生成之后使用 cache 函數(shù)對(duì) rdd 進(jìn)行緩存,這次就不用再從頭開始計(jì)算了.緩存之后過程如下:?

除了 cache 函數(shù)外,緩存還可以使用 persist, cache 是使用的默認(rèn)緩存選項(xiàng),一般默認(rèn)為Memory_only(內(nèi)存中緩存), persist 則可以在緩存的時(shí)候選擇任意一種緩存類型.事實(shí)上, cache 內(nèi)部調(diào)用的是默認(rèn)的 persist.?

持久化的類型8如下:

持久化級(jí)別含義解釋

MEMORY_ONLY使用未序列化的Java對(duì)象格式醉拓,將數(shù)據(jù)保存在內(nèi)存中伟姐。如果內(nèi)存不夠存放所有的數(shù)據(jù),則數(shù)據(jù)可能就不會(huì)進(jìn)行持久化亿卤。那么下次對(duì)這個(gè)RDD執(zhí)行算子操作時(shí)愤兵,那些沒有被持久化的數(shù)據(jù),需要從源頭處重新計(jì)算一遍排吴。這是默認(rèn)的持久化策略秆乳,使用cache()方法時(shí),實(shí)際就是使用的這種持久化策略钻哩。

MEMORY_AND_DISK使用未序列化的Java對(duì)象格式屹堰,優(yōu)先嘗試將數(shù)據(jù)保存在內(nèi)存中。如果內(nèi)存不夠存放所有的數(shù)據(jù)街氢,會(huì)將數(shù)據(jù)寫入磁盤文件中扯键,下次對(duì)這個(gè)RDD執(zhí)行算子時(shí),持久化在磁盤文件中的數(shù)據(jù)會(huì)被讀取出來使用阳仔。

MEMORY_ONLY_SER基本含義同MEMORY_ONLY忧陪。唯一的區(qū)別是,會(huì)將RDD中的數(shù)據(jù)進(jìn)行序列化近范,RDD的每個(gè)partition會(huì)被序列化成一個(gè)字節(jié)數(shù)組嘶摊。這種方式更加節(jié)省內(nèi)存邑跪,從而可以避免持久化的數(shù)據(jù)占用過多內(nèi)存導(dǎo)致頻繁GC族铆。

MEMORY_AND_DISK_SER基本含義同MEMORY_AND_DISK。唯一的區(qū)別是摘昌,會(huì)將RDD中的數(shù)據(jù)進(jìn)行序列化斥杜,RDD的每個(gè)partition會(huì)被序列化成一個(gè)字節(jié)數(shù)組虱颗。這種方式更加節(jié)省內(nèi)存,從而可以避免持久化的數(shù)據(jù)占用過多內(nèi)存導(dǎo)致頻繁GC蔗喂。

DISK_ONLY使用未序列化的Java對(duì)象格式忘渔,將數(shù)據(jù)全部寫入磁盤文件中。

MEMORY_ONLY_2, MEMORY_AND_DISK_2, 等等.對(duì)于上述任意一種持久化策略缰儿,如果加上后綴_2畦粮,代表的是將每個(gè)持久化的數(shù)據(jù),都復(fù)制一份副本,并將副本保存到其他節(jié)點(diǎn)上宣赔。這種基于副本的持久化機(jī)制主要用于進(jìn)行容錯(cuò)预麸。假如某個(gè)節(jié)點(diǎn)掛掉,節(jié)點(diǎn)的內(nèi)存或磁盤中的持久化數(shù)據(jù)丟失了儒将,那么后續(xù)對(duì)RDD計(jì)算時(shí)還可以使用該數(shù)據(jù)在其他節(jié)點(diǎn)上的副本吏祸。如果沒有副本的話,就只能將這些數(shù)據(jù)從源頭處重新計(jì)算一遍了钩蚊。

是否進(jìn)行序列化和磁盤寫入,需要充分考慮所分配到的內(nèi)存資源和可接受的計(jì)算時(shí)間長(zhǎng)短,序列化會(huì)減少內(nèi)存占用,但是反序列化會(huì)延長(zhǎng)時(shí)間,磁盤寫入會(huì)延長(zhǎng)時(shí)間,但是會(huì)減少內(nèi)存占用,也許能提高計(jì)算速度.此外要認(rèn)識(shí)到:cache 的 RDD 會(huì)一直占用內(nèi)存,當(dāng)后期不需要再依賴于他的反復(fù)計(jì)算的時(shí)候,可以使用 unpersist 釋放掉.

5.2 shuffle 的優(yōu)化

我們前面說過,進(jìn)行 shuffle 操作的是是很消耗系統(tǒng)資源的,需要寫入到磁盤并通過網(wǎng)絡(luò)傳輸,有時(shí)還需要對(duì)數(shù)據(jù)進(jìn)行排序.常見的 Transformation 操作如:repartition贡翘,join,cogroup砰逻,以及任何 *By 或者 *ByKey 的 Transformation 都需要 shuffle 數(shù)據(jù)9,合理的選用操作將降低 shuffle 操作的成本,提高運(yùn)算速度.具體如下:?

- 當(dāng)進(jìn)行聯(lián)合的規(guī)約操作時(shí)床估,避免使用 groupByKey。舉個(gè)例子诱渤,rdd.groupByKey().mapValues(_ .sum) 與 rdd.reduceByKey(_ + _) 執(zhí)行的結(jié)果是一樣的丐巫,但是前者需要把全部的數(shù)據(jù)通過網(wǎng)絡(luò)傳遞一遍,而后者只需要根據(jù)每個(gè) key 局部的 partition 累積結(jié)果勺美,在 shuffle 的之后把局部的累積值相加后得到結(jié)果.?

- 當(dāng)輸入和輸入的類型不一致時(shí)递胧,避免使用 reduceByKey。舉個(gè)例子赡茸,我們需要實(shí)現(xiàn)為每一個(gè)key查找所有不相同的 string缎脾。一個(gè)方法是利用 map 把每個(gè)元素的轉(zhuǎn)換成一個(gè) Set,再使用 reduceByKey 將這些 Set 合并起來10.?

- 生成新列的時(shí)候,避免使用單獨(dú)生成一列再 join 回來的方式,而是直接在數(shù)據(jù)上生成.?

- 當(dāng)需要對(duì)兩個(gè) RDD 使用 join 的時(shí)候,如果其中一個(gè)數(shù)據(jù)集特別小,小到能塞到每個(gè) Executor 單獨(dú)的內(nèi)存中的時(shí)候,可以不使用 join, 使用 broadcast 操作將小 RDD 復(fù)制廣播到每個(gè) Executor 的內(nèi)存里 join.(broadcast 的用法可以查看官方 API 文檔)

關(guān)于 shuffle 更多的介紹可以查看11這篇博文.

5.3 資源參數(shù)調(diào)優(yōu)

這些參數(shù)主要在 spark-submit 提交的時(shí)候指定,或者寫在配置文件中啟動(dòng).可以通過 spark-submit –help 查看.?

具體如下12:

參數(shù)說明調(diào)優(yōu)建議

num-Executors該參數(shù)用于設(shè)置Spark作業(yè)總共要用多少個(gè)Executor進(jìn)程來執(zhí)行占卧。這個(gè)參數(shù)非常重要遗菠,如果不設(shè)置的話,默認(rèn)只會(huì)給你啟動(dòng)少量的Executor進(jìn)程华蜒,此時(shí)你的Spark作業(yè)的運(yùn)行速度是非常慢的辙纬。每個(gè)Spark作業(yè)的運(yùn)行一般設(shè)置50~100個(gè)左右的Executor進(jìn)程比較合適。設(shè)置的太少叭喜,無法充分利用集群資源贺拣;設(shè)置的太多的話,大部分隊(duì)列可能無法給予充分的資源捂蕴。

Executor-memory該參數(shù)用于設(shè)置每個(gè)Executor進(jìn)程的內(nèi)存譬涡。Executor內(nèi)存的大小,很多時(shí)候直接決定了Spark作業(yè)的性能啥辨,而且跟常見的JVM OOM異常涡匀,也有直接的關(guān)聯(lián)。每個(gè)Executor進(jìn)程的內(nèi)存設(shè)置4G~8G較為合適溉知。具體的設(shè)置還是得根據(jù)不同部門的資源隊(duì)列來定陨瘩±拔耍可以看看自己團(tuán)隊(duì)的資源隊(duì)列的最大內(nèi)存限制是多少,num-Executors乘以Executor-memory拾酝,就代表了你的Spark作業(yè)申請(qǐng)到的總內(nèi)存量。此外卡者,如果你是跟團(tuán)隊(duì)里其他人共享這個(gè)資源隊(duì)列蒿囤,那么申請(qǐng)的總內(nèi)存量最好不要超過資源隊(duì)列最大總內(nèi)存的1/3~1/2,避免你自己的Spark作業(yè)占用了隊(duì)列所有的資源崇决,導(dǎo)致別的同學(xué)的作業(yè)無法運(yùn)行材诽。

Executor-cores用于設(shè)置每個(gè)Executor進(jìn)程的CPU core數(shù)量。這個(gè)參數(shù)決定了每個(gè)Executor并行執(zhí)行Task線程的能力恒傻。每個(gè)core同一時(shí)間只能執(zhí)行一個(gè)Task線程脸侥,因此每個(gè)Executor的core越多,越能夠快速地執(zhí)行完分配給自己的所有Task線程盈厘。Executor的CPU core數(shù)量設(shè)置為2~4個(gè)較為合適睁枕。同樣得根據(jù)不同部門的資源隊(duì)列來定,可以看看自己的資源隊(duì)列的最大CPU core限制是多少沸手,再依據(jù)設(shè)置的Executor數(shù)量外遇,來決定每個(gè)Executor進(jìn)程可以分配到幾個(gè)CPU core。同樣建議契吉,如果是跟他人共享這個(gè)隊(duì)列跳仿,那么num-Executors * Executor-cores不要超過隊(duì)列總CPU core的1/3~1/2左右比較合適

driver-memory該參數(shù)用于設(shè)置Driver進(jìn)程的內(nèi)存。Driver的內(nèi)存通常來說不設(shè)置捐晶,或者設(shè)置1G左右應(yīng)該就夠了菲语。唯一需要注意的一點(diǎn)是,如果需要使用collect算子將RDD的數(shù)據(jù)全部拉取到Driver上進(jìn)行處理惑灵,那么必須確保Driver的內(nèi)存足夠大山上,否則會(huì)出現(xiàn)OOM內(nèi)存溢出的問題。

spark.default. parallelism該參數(shù)用于設(shè)置每個(gè)stage的默認(rèn)Task數(shù)量英支。這個(gè)參數(shù)極為重要胶哲,如果不設(shè)置可能會(huì)直接影響你的Spark作業(yè)性能。Spark作業(yè)的默認(rèn)Task數(shù)量為500~1000個(gè)較合適潭辈。如果不去設(shè)置這個(gè)參數(shù)鸯屿,那么就會(huì)導(dǎo)致Spark自己根據(jù)底層HDFS的block數(shù)量來設(shè)置Task的數(shù)量,默認(rèn)是一個(gè)HDFS block對(duì)應(yīng)一個(gè)Task把敢。通常來說寄摆,Spark默認(rèn)設(shè)置的數(shù)量是偏少的(比如幾十個(gè)Task),如果Task數(shù)量偏少的話修赞,就會(huì)導(dǎo)致你前面設(shè)置好的Executor的參數(shù)都前功盡棄婶恼。即無論你的Executor進(jìn)程/內(nèi)存/CPU有多大桑阶,但是Task只有幾個(gè),那么90%的Executor進(jìn)程可能根本就沒有Task執(zhí)行勾邦,也就白白浪費(fèi)了資源此Spark官網(wǎng)建議的設(shè)置原則是蚣录,設(shè)置該參數(shù)為num-Executors * Executor-cores的2~3倍較為合適,比如Executor的總CPU core數(shù)量為300個(gè)眷篇,那么設(shè)置1000個(gè)Task是可以的萎河,可以充分地利用Spark集群的資源。

spark.storage. memoryFrAction該參數(shù)用于設(shè)置RDD持久化數(shù)據(jù)在Executor內(nèi)存中能占的比例蕉饼,默認(rèn)是0.6虐杯。也就是說,默認(rèn)Executor 60%的內(nèi)存昧港,可以用來保存持久化的RDD數(shù)據(jù)擎椰。根據(jù)你選擇的不同的持久化策略,如果內(nèi)存不夠時(shí)创肥,可能數(shù)據(jù)就不會(huì)持久化达舒,或者數(shù)據(jù)會(huì)寫入磁盤。如果Spark作業(yè)中叹侄,有較多的RDD持久化操作休弃,該參數(shù)的值可以適當(dāng)提高一些,保證持久化的數(shù)據(jù)能夠容納在內(nèi)存中圈膏。避免內(nèi)存不夠緩存所有的數(shù)據(jù)塔猾,導(dǎo)致數(shù)據(jù)只能寫入磁盤中,降低了性能稽坤。但是如果Spark作業(yè)中的shuffle類操作比較多丈甸,而持久化操作比較少,那么這個(gè)參數(shù)的值適當(dāng)降低一些比較合適尿褪。此外睦擂,如果發(fā)現(xiàn)作業(yè)由于頻繁的gc導(dǎo)致運(yùn)行緩慢(通過Spark web ui可以觀察到作業(yè)的gc耗時(shí)),意味著Task執(zhí)行用戶代碼的內(nèi)存不夠用杖玲,那么同樣建議調(diào)低這個(gè)參數(shù)的值顿仇。

spark.shuffle. memoryFrAction該參數(shù)用于設(shè)置shuffle過程中一個(gè)Task拉取到上個(gè)stage的Task的輸出后,進(jìn)行聚合操作時(shí)能夠使用的Executor內(nèi)存的比例摆马,默認(rèn)20%臼闻。shuffle操作在進(jìn)行聚合時(shí),如果使用的內(nèi)存超出20%的限制囤采,多余的數(shù)據(jù)就會(huì)溢寫到磁盤述呐,此時(shí)會(huì)極大地降低性能。如果Spark作業(yè)中的RDD持久化操作較少蕉毯,shuffle操作較多時(shí)乓搬,建議降低持久化操作的內(nèi)存占比思犁,提高shuffle操作的內(nèi)存占比比例,避免shuffle過程中數(shù)據(jù)過多時(shí)內(nèi)存不夠用进肯,必須溢寫到磁盤上激蹲,降低了性能。此外江掩,如果發(fā)現(xiàn)作業(yè)由于頻繁的gc導(dǎo)致運(yùn)行緩慢学辱,意味著Task執(zhí)行用戶代碼的內(nèi)存不夠用,那么同樣建議調(diào)低這個(gè)參數(shù)的值频敛。

  資源參數(shù)的調(diào)優(yōu),沒有一個(gè)固定的值馅扣,需要根據(jù)自己的實(shí)際情況(包括Spark作業(yè)中的shuffle操作數(shù)量斟赚、RDD持久化操作數(shù)量以及Spark web ui中顯示的作業(yè)gc情況),同時(shí)參考本篇文章中給出的原理以及調(diào)優(yōu)建議差油,合理地設(shè)置上述參數(shù)拗军。

5.4 小結(jié)

對(duì)需要重復(fù)計(jì)算的才使用 cache, 同時(shí)及時(shí)釋放掉(unpersist)不再需要使用的 RDD.

避免使用 shuffle 運(yùn)算.需要的時(shí)候盡量選取較優(yōu)方案.

合理配置 Executor/Task/core 的參數(shù),合理分配持久化/ shuffle的內(nèi)存占比,?

driver-memory: 1G

executor-memory: 4~8G(根據(jù)實(shí)際需求來)

num-executors: 50~100

executor-cores: 2~4

Tasks: 500~1000

6.本地搭建 Spark 開發(fā)環(huán)境

6.1 Spark-Scala-IntelliJ

本地搭建 Spark-scala開發(fā)環(huán)境, 并使用 IntelliJ idea 作為 IDE 的方法,參見博客另一篇文章:?

Spark學(xué)習(xí): Spark-Scala-IntelliJ開發(fā)環(huán)境搭建和編譯Jar包流程

6.2 Spark-Notebook 開發(fā)環(huán)境

本地搭建 Spark-Notebook(python or scala) 開發(fā)環(huán)境, 參見博客另一篇文章(還沒發(fā)出來):?

Spark學(xué)習(xí): Spark-Notebook 開發(fā)環(huán)境

2016.10.08?

databatman

參考文獻(xiàn)

文獻(xiàn):大數(shù)據(jù)分析平臺(tái)建設(shè)與應(yīng)用綜述??

Spark學(xué)習(xí)手冊(cè)(三):Spark模塊摘讀??

Spark入門實(shí)戰(zhàn)系列–3.Spark編程模型(上)–編程模型及SparkShell實(shí)戰(zhàn)??

文獻(xiàn): 基于 spark 平臺(tái)推薦系統(tǒng)研究.??

Apache Spark源碼走讀之7 – Standalone部署方式分析??

Spark性能優(yōu)化指南——基礎(chǔ)篇??

Apache Spark Jobs 性能調(diào)優(yōu)(一)??

Spark性能優(yōu)化指南——基礎(chǔ)篇??

Apache Spark Jobs 性能調(diào)優(yōu)(一)??

Apache Spark Jobs 性能調(diào)優(yōu)(一)??

Apache Spark Jobs 性能調(diào)優(yōu)(一)??

Spark性能優(yōu)化指南——基礎(chǔ)篇??

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市蓄喇,隨后出現(xiàn)的幾起案子发侵,更是在濱河造成了極大的恐慌,老刑警劉巖妆偏,帶你破解...
    沈念sama閱讀 222,590評(píng)論 6 517
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件刃鳄,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡钱骂,警方通過查閱死者的電腦和手機(jī)叔锐,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,157評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來见秽,“玉大人愉烙,你說我怎么就攤上這事〗馊。” “怎么了步责?”我有些...
    開封第一講書人閱讀 169,301評(píng)論 0 362
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)禀苦。 經(jīng)常有香客問我蔓肯,道長(zhǎng),這世上最難降的妖魔是什么振乏? 我笑而不...
    開封第一講書人閱讀 60,078評(píng)論 1 300
  • 正文 為了忘掉前任省核,我火速辦了婚禮,結(jié)果婚禮上昆码,老公的妹妹穿的比我還像新娘气忠。我一直安慰自己邻储,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 69,082評(píng)論 6 398
  • 文/花漫 我一把揭開白布旧噪。 她就那樣靜靜地躺著吨娜,像睡著了一般。 火紅的嫁衣襯著肌膚如雪淘钟。 梳的紋絲不亂的頭發(fā)上宦赠,一...
    開封第一講書人閱讀 52,682評(píng)論 1 312
  • 那天,我揣著相機(jī)與錄音米母,去河邊找鬼勾扭。 笑死,一個(gè)胖子當(dāng)著我的面吹牛铁瞒,可吹牛的內(nèi)容都是我干的妙色。 我是一名探鬼主播,決...
    沈念sama閱讀 41,155評(píng)論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼慧耍,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼身辨!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起芍碧,我...
    開封第一講書人閱讀 40,098評(píng)論 0 277
  • 序言:老撾萬榮一對(duì)情侶失蹤煌珊,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后泌豆,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體定庵,經(jīng)...
    沈念sama閱讀 46,638評(píng)論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,701評(píng)論 3 342
  • 正文 我和宋清朗相戀三年踪危,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了洗贰。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,852評(píng)論 1 353
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡陨倡,死狀恐怖敛滋,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情兴革,我是刑警寧澤绎晃,帶...
    沈念sama閱讀 36,520評(píng)論 5 351
  • 正文 年R本政府宣布,位于F島的核電站杂曲,受9級(jí)特大地震影響庶艾,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜擎勘,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,181評(píng)論 3 335
  • 文/蒙蒙 一咱揍、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧棚饵,春花似錦煤裙、人聲如沸掩完。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,674評(píng)論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽且蓬。三九已至,卻和暖如春题翰,著一層夾襖步出監(jiān)牢的瞬間恶阴,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,788評(píng)論 1 274
  • 我被黑心中介騙來泰國(guó)打工豹障, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留冯事,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 49,279評(píng)論 3 379
  • 正文 我出身青樓血公,卻偏偏與公主長(zhǎng)得像昵仅,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子坞笙,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,851評(píng)論 2 361

推薦閱讀更多精彩內(nèi)容