Spark Streaming Source Code Walkthrough: A Thorough Study of the Receiver's Full Lifecycle

I. How Receivers Are Started: Design Considerations

1. Spark Streaming continuously receives data from external data sources through Receivers and reports it to the Driver; each batch duration then generates Jobs from the reported data, which run on different machines. Each receiver effectively corresponds to one partition, but because Spark Core sees nothing special about it, it is scheduled like any ordinary task. Several Receivers may therefore be launched on the same Executor, leading to load imbalance; and if the Executor itself fails, the task may fail to start, the whole job fails to start, and with it the receiver fails to start.

Starting the Receiver

1. From Spark Core's point of view, Receiver startup is invisible: a Receiver is launched by way of a Job, runs on an Executor, and is driven by a task.

2. In the common case there is only one Receiver, but InputDStreams can be created for different data sources (a minimal driver sketch showing this follows the list of guarantees below).

3. When a Receiver is started, it is in effect one partition, launched by one Job that contains RDD transformations and an action. As the timer fires, data is received continuously, and the data received in each interval in effect forms one partition.

4. This design raises the following problems:

(1) If there are multiple InputDStreams, multiple Receivers must be started, each one equivalent to a partition. Ideally the Receivers would be started on different machines, but from Spark Core's point of view this is just an application; it cannot see anything special about Receivers, so it handles them like any normal Job launch, and it is quite possible that several Receivers end up on one Executor, which leads to load imbalance.

(2) A Receiver may fail to start; yet as long as the cluster is alive, a Receiver should never fail to start.

(3) Looking at the runtime path: if one Receiver is one partition, it is launched by a single Task, and if that Task fails to start, the corresponding Receiver fails with it. The consequences of a failed Receiver are therefore severe. So how does Spark Streaming prevent this? Analyzing the Spark Streaming source shows that the following guarantees are built in:

First, Spark uses one Job to start each Receiver, which maximizes load balancing.

Second, Spark Streaming itself specifies which Executors each Receiver runs on; the placement is decided before the Receiver ever runs!

Third, if a Receiver fails to start, the Job is not considered failed; internally the Receiver is restarted.
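Before walking through the source, here is a minimal driver sketch of the starting point: each call that creates a ReceiverInputDStream yields exactly one Receiver at runtime. The hosts and ports are placeholders, and the two socket streams are only there to illustrate multiple receivers:

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ReceiverDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ReceiverDemo")
    val ssc = new StreamingContext(conf, Seconds(5))
    // Each of these creates one ReceiverInputDStream, hence one Receiver at runtime.
    val lines1 = ssc.socketTextStream("host1", 9999, StorageLevel.MEMORY_AND_DISK_SER)
    val lines2 = ssc.socketTextStream("host2", 9999, StorageLevel.MEMORY_AND_DISK_SER)
    lines1.union(lines2).count().print()
    ssc.start()            // triggers JobScheduler.start(), which starts ReceiverTracker
    ssc.awaitTermination()
  }
}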

1. When StreamingContext's start method is called, JobScheduler's start method gets invoked:

def start(): Unit = synchronized {
  state match {
    case INITIALIZED =>
      startSite.set(DStream.getCreationSite())
      StreamingContext.ACTIVATION_LOCK.synchronized {
        StreamingContext.assertNoOtherContextIsActive()
        try {
          validate()

          // Start the streaming scheduler in a new thread, so that thread local properties
          // like call sites and job groups can be reset without affecting those of the
          // current thread.
          ThreadUtils.runInNewThread("streaming-start") {
            sparkContext.setCallSite(startSite.get)
            sparkContext.clearJobGroup()
            sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
            // Start in a child thread: it performs the local initialization work
            // and avoids blocking the main thread.
            scheduler.start()
          }
          state = StreamingContextState.ACTIVE
        } catch {
          case NonFatal(e) =>
            logError("Error starting the context, marking it as stopped", e)
            scheduler.stop(false)
            state = StreamingContextState.STOPPED
            throw e
        }
        StreamingContext.setActiveContext(this)
      }
      shutdownHookRef = ShutdownHookManager.addShutdownHook(
        StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
      // Registering Streaming Metrics at the start of the StreamingContext
      assert(env.metricsSystem != null)
      env.metricsSystem.registerSource(streamingSource)
      uiTab.foreach(_.attach())
      logInfo("StreamingContext started")
    case ACTIVE =>
      logWarning("StreamingContext has already been started")
    case STOPPED =>
      throw new IllegalStateException("StreamingContext has already been stopped")
  }
}

2. In JobScheduler's start method, ReceiverTracker's start method is called, and that is where Receivers get started.

def start(): Unit = synchronized {
  if (eventLoop != null) return // scheduler has already been started

  logDebug("Starting JobScheduler")
  eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)
    override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
  }
  eventLoop.start()

  // attach rate controllers of input streams to receive batch completion updates
  for {
    inputDStream <- ssc.graph.getInputStreams
    rateController <- inputDStream.rateController
  } ssc.addStreamingListener(rateController)

  listenerBus.start(ssc.sparkContext)
  receiverTracker = new ReceiverTracker(ssc)
  inputInfoTracker = new InputInfoTracker(ssc)
  // Start the receiverTracker
  receiverTracker.start()
  jobGenerator.start()
  logInfo("Started JobScheduler")
}

3. ReceiverTracker's start method sets up an RPC endpoint for message passing. Why? Because ReceiverTracker monitors all the Receivers in the cluster, and the Receivers in turn report their status to the ReceiverTrackerEndpoint: the data they receive, lifecycle events, and so on.

def start(): Unit = synchronized {
  if (isTrackerStarted) {
    throw new SparkException("ReceiverTracker already started")
  }

  // Receivers are launched based on the input streams.
  if (!receiverInputStreams.isEmpty) {
    endpoint = ssc.env.rpcEnv.setupEndpoint(
      "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
    if (!skipReceiverLaunch) launchReceivers()
    logInfo("ReceiverTracker started")
    trackerState = Started
  }
}

4. The concrete Receiver instances are obtained from the ReceiverInputDStreams (on the Driver side) and then distributed to the Worker nodes. One ReceiverInputDStream produces exactly one Receiver.

private def launchReceivers(): Unit = {
  val receivers = receiverInputStreams.map(nis => {
    // One input source (ReceiverInputDStream) produces exactly one Receiver
    val rcvr = nis.getReceiver()
    rcvr.setReceiverId(nis.id)
    rcvr
  })

  runDummySparkJob()

  logInfo("Starting " + receivers.length + " receivers")
  // endpoint here is the ReceiverTrackerEndpoint constructed in ReceiverTracker's start method above
  endpoint.send(StartAllReceivers(receivers))
}

5. runDummySparkJob() makes sure that all the slaves are alive, and helps avoid all the receivers being clustered on a single node.

private def runDummySparkJob(): Unit = {
  if (!ssc.sparkContext.isLocal) {
    ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
  }
  assert(getExecutors.nonEmpty)
}

The getReceiver() method of ReceiverInputDStream obtains the Receiver object; that object is then shipped to a worker node, instantiated there, and begins receiving data.

def getReceiver(): Receiver[T] // returns the Receiver object
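User code plugs into the same abstraction: StreamingContext.receiverStream wraps a custom Receiver in an input DStream whose getReceiver returns it. Below is a minimal sketch using the public receiver API; the ConstantReceiver class and its one-line payload are invented for illustration:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// A toy receiver that emits the same line once per second until stopped.
class ConstantReceiver(line: String)
  extends Receiver[String](StorageLevel.MEMORY_ONLY) {

  def onStart(): Unit = {
    new Thread("Constant Receiver") {
      setDaemon(true)
      override def run(): Unit = {
        while (!isStopped()) {
          store(line)        // hand the record to the supervisor, just as SocketReceiver does
          Thread.sleep(1000)
        }
      }
    }.start()
  }

  def onStop(): Unit = { }   // the daemon thread exits once isStopped() becomes true
}

// Usage: val stream = ssc.receiverStream(new ConstantReceiver("ping"))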

6. Following the inheritance hierarchy, let's look at the getReceiver method of SocketInputDStream.

def getReceiver(): Receiver[T] = {
  new SocketReceiver(host, port, bytesToObjects, storageLevel)
}

onStart launches a background thread that calls the receive method.

private[streaming]
class SocketReceiver[T: ClassTag](
    host: String,
    port: Int,
    bytesToObjects: InputStream => Iterator[T],
    storageLevel: StorageLevel
  ) extends Receiver[T](storageLevel) with Logging {

  def onStart() {
    // Start the thread that receives data over a connection
    new Thread("Socket Receiver") {
      setDaemon(true)
      override def run() { receive() }
    }.start()
  }

receive opens a socket and starts receiving data.

  /** Create a socket connection and receive data until receiver is stopped */
  def receive() {
    var socket: Socket = null
    try {
      logInfo("Connecting to " + host + ":" + port)
      socket = new Socket(host, port)
      logInfo("Connected to " + host + ":" + port)
      val iterator = bytesToObjects(socket.getInputStream())
      while (!isStopped && iterator.hasNext) {
        store(iterator.next)
      }
      if (!isStopped()) {
        restart("Socket data stream had no more data")
      } else {
        logInfo("Stopped receiving")
      }
    } catch {
      case e: java.net.ConnectException =>
        restart("Error connecting to " + host + ":" + port, e)
      case NonFatal(e) =>
        logWarning("Error receiving data", e)
        restart("Error receiving data", e)
    } finally {
      if (socket != null) {
        socket.close()
        logInfo("Closed socket to " + host + ":" + port)
      }
    }
  }
}
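To exercise SocketReceiver locally, something must be listening on the other end, e.g. nc -lk 9999, or a tiny line server like the sketch below (a made-up helper, not part of Spark):

import java.io.PrintWriter
import java.net.ServerSocket

// Serves numbered lines to whoever connects, so a socket stream has data to read.
object ToyLineServer {
  def main(args: Array[String]): Unit = {
    val server = new ServerSocket(9999)
    while (true) {
      val socket = server.accept()      // handle one client at a time
      val out = new PrintWriter(socket.getOutputStream, true)
      (1 to 1000).foreach { i =>
        out.println(s"event $i")
        Thread.sleep(100)
      }
      socket.close()
    }
  }
}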

7. The ReceiverTrackerEndpoint source is as follows:

/** RpcEndpoint to receive messages from the receivers. */
private class ReceiverTrackerEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint {

  // TODO Remove this thread pool after https://github.com/apache/spark/issues/7385 is merged
  private val submitJobThreadPool = ExecutionContext.fromExecutorService(
    ThreadUtils.newDaemonCachedThreadPool("submit-job-thread-pool"))

  private val walBatchingThreadPool = ExecutionContext.fromExecutorService(
    ThreadUtils.newDaemonCachedThreadPool("wal-batching-thread-pool"))

  @volatile private var active: Boolean = true

  override def receive: PartialFunction[Any, Unit] = {
    // Local messages
    case StartAllReceivers(receivers) =>
      // schedulingPolicy is the scheduling policy; receivers are the receivers to start;
      // getExecutors returns the list of Executors in the cluster;
      // scheduleReceivers determines which Executors each receiver may run on.
      val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
      for (receiver <- receivers) {
        // scheduledLocations, looked up by receiver id, gives the Executors
        // that can currently run this Receiver
        val executors = scheduledLocations(receiver.streamId)
        updateReceiverScheduledExecutors(receiver.streamId, executors)
        receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
        // At this point both the Receivers to start and the Executors they run on are determined.
        // Loop over the receivers, passing in one receiver at a time.
        startReceiver(receiver, executors)
      }
    // Handles the RestartReceiver message, restarting a Receiver.
    case RestartReceiver(receiver) =>
      // Old scheduled executors minus the ones that are not active any more
      // (if the Receiver failed, they drop out of the candidate list).
      // This is the candidate Executor list computed when the Receiver was first scheduled.
      val oldScheduledExecutors = getStoredScheduledExecutors(receiver.streamId)
      // Obtain Executors afresh
      val scheduledLocations = if (oldScheduledExecutors.nonEmpty) {
          // Try global scheduling again
          oldScheduledExecutors
        } else {
          // If the candidate Executors are used up, rerun rescheduleReceiver to get new ones.
          val oldReceiverInfo = receiverTrackingInfos(receiver.streamId)
          // Clear "scheduledLocations" to indicate we are going to do local scheduling
          val newReceiverInfo = oldReceiverInfo.copy(
            state = ReceiverState.INACTIVE, scheduledLocations = None)
          receiverTrackingInfos(receiver.streamId) = newReceiverInfo
          schedulingPolicy.rescheduleReceiver(
            receiver.streamId,
            receiver.preferredLocation,
            receiverTrackingInfos,
            getExecutors)
        }
      // Assume there is one receiver restarting at one time, so we don't need to update
      // receiverTrackingInfos
      // Call startReceiver again
      startReceiver(receiver, scheduledLocations)
    case c: CleanupOldBlocks =>
      receiverTrackingInfos.values.flatMap(_.endpoint).foreach(_.send(c))
    case UpdateReceiverRateLimit(streamUID, newRate) =>
      for (info <- receiverTrackingInfos.get(streamUID); eP <- info.endpoint) {
        eP.send(UpdateRateLimit(newRate))
      }
    // Remote messages
    case ReportError(streamId, message, error) =>
      reportError(streamId, message, error)
  }

8. As the comments show, Spark Streaming itself decides which Executors the receivers run on; placement is not left to Spark Core's Task scheduling.

Spark starts Receivers via submitJob. Since an application may have many Receivers, the question is whether one Job starts one Receiver, or all the Receivers are started through a single Job.

In ReceiverTrackerEndpoint's receive method, the first parameter of startReceiver is a single receiver; the implementation's for loop takes the receivers out one by one and calls startReceiver for each. From this we can conclude that one Job starts exactly one Receiver.

If a Receiver fails to start, this is not treated as a job failure; a message is sent back to ReceiverTrackerEndpoint to restart the Receiver. This guarantees that Receivers will be started, and unlike launching Receivers through Tasks, a failure is not subject to the Task retry limit.

private def startReceiver(
    receiver: Receiver[_],
    // scheduledLocations specifies the concrete physical machines to run on.
    scheduledLocations: Seq[TaskLocation]): Unit = {
  // Check whether the tracker's state still allows starting this Receiver.
  def shouldStartReceiver: Boolean = {
    // It's okay to start when trackerState is Initialized or Started
    !(isTrackerStopping || isTrackerStopped)
  }

  val receiverId = receiver.streamId
  // If the Receiver should not be started, call onReceiverJobFinish()
  if (!shouldStartReceiver) {
    onReceiverJobFinish(receiverId)
    return
  }

  val checkpointDirOption = Option(ssc.checkpointDir)
  val serializableHadoopConf =
    new SerializableConfiguration(ssc.sparkContext.hadoopConfiguration)

  // startReceiverFunc encapsulates the action of starting the receiver on the worker node.
  // Function to start the receiver on the worker node
  val startReceiverFunc: Iterator[Receiver[_]] => Unit =
    (iterator: Iterator[Receiver[_]]) => {
      if (!iterator.hasNext) {
        throw new SparkException(
          "Could not start receiver as object not found.")
      }
      if (TaskContext.get().attemptNumber() == 0) {
        val receiver = iterator.next()
        assert(iterator.hasNext == false)
        // ReceiverSupervisorImpl is the Receiver's supervisor and is also
        // responsible for writing out the received data.
        val supervisor = new ReceiverSupervisorImpl(
          receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
        supervisor.start()
        supervisor.awaitTermination()
      } else {
        // To restart a receiver, the scheduling above must be redone (a fresh
        // schedule), not a Task retry.
        // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
      }
    }

  // Create the RDD using the scheduledLocations to run the receiver in a Spark job
  val receiverRDD: RDD[Receiver[_]] =
    if (scheduledLocations.isEmpty) {
      ssc.sc.makeRDD(Seq(receiver), 1)
    } else {
      val preferredLocations = scheduledLocations.map(_.toString).distinct
      ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
    }
  // receiverId shows that the RDD holds a single receiver
  receiverRDD.setName(s"Receiver $receiverId")
  ssc.sparkContext.setJobDescription(s"Streaming job running receiver $receiverId")
  ssc.sparkContext.setCallSite(Option(ssc.getStartSite()).getOrElse(Utils.getCallSite()))

  // Each Receiver's startup triggers its own Job; it is not one job's Tasks starting
  // all the Receivers. An application usually has many Receivers.
  // SparkContext.submitJob is called here: a whole Spark job is launched just to start this Receiver.
  val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
    receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())
  // We will keep restarting the receiver job until ReceiverTracker is stopped
  future.onComplete {
    case Success(_) =>
      // shouldStartReceiver is true by default
      if (!shouldStartReceiver) {
        onReceiverJobFinish(receiverId)
      } else {
        logInfo(s"Restarting Receiver $receiverId")
        self.send(RestartReceiver(receiver))
      }
    case Failure(e) =>
      if (!shouldStartReceiver) {
        onReceiverJobFinish(receiverId)
      } else {
        logError("Receiver has been stopped. Try to restart it.", e)
        logInfo(s"Restarting Receiver $receiverId")
        // Send the RestartReceiver message
        self.send(RestartReceiver(receiver))
      }
  // Submitting the job on a thread pool lets Receivers be started concurrently.
  }(submitJobThreadPool)
  logInfo(s"Receiver ${receiver.streamId} started")
}
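To see this "one long-lived task per job, resubmitted forever" pattern in isolation, here is a toy sketch. It is deliberately not Spark source: KeepAliveJob and everything in it are invented, and the payload task just sleeps instead of receiving data.

import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success}
import org.apache.spark.{SparkConf, SparkContext}

object KeepAliveJob {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("KeepAliveJob"))
    // Analogous to submitJobThreadPool: completion callbacks run off the main thread.
    val pool = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool())

    def launch(): Unit = {
      val rdd = sc.makeRDD(Seq("payload"), 1)          // one partition => one task
      val future = sc.submitJob[String, Unit, Unit](
        rdd,
        (_: Iterator[String]) => while (true) Thread.sleep(1000), // long-lived "receiver"
        Seq(0),
        (_, _) => (),
        ())
      future.onComplete {
        case Success(_) => launch()                    // task exited: resubmit a fresh job
        case Failure(_) => launch()                    // task failed: resubmit, no retry limit
      }(pool)
    }

    launch()
    Thread.sleep(60000)                                // let the toy run for a minute
    sc.stop()
  }
}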

9. If a Receiver fails to start, ReceiverTrackerEndpoint is asked to launch a new Spark Job to restart the Receiver.

/**
 * This message will trigger ReceiverTrackerEndpoint to restart a Spark job for the receiver.
 */
private[streaming] case class RestartReceiver(receiver: Receiver[_])
  extends ReceiverTrackerLocalMessage

10. When a Receiver is shut down, its Spark Job does not need to be restarted.

/**
 * Call when a receiver is terminated. It means we won't restart its Spark job.
 */
private def onReceiverJobFinish(receiverId: Int): Unit = {
  receiverJobExitLatch.countDown()
  // Use foreach to remove the receiver from receiverTrackingInfos.
  receiverTrackingInfos.remove(receiverId).foreach { receiverTrackingInfo =>
    if (receiverTrackingInfo.state == ReceiverState.ACTIVE) {
      logWarning(s"Receiver $receiverId exited but didn't deregister")
    }
  }
}

11. supervisor.start(): the subclass ReceiverSupervisorImpl does not define a start method of its own, so the parent class ReceiverSupervisor's start method is the one that runs.

/** Start the supervisor */
def start() {
  onStart() // the concrete implementation lives in the subclass.
  startReceiver()
}

The onStart method's source is as follows:

/**
 * Called when supervisor is started.
 * Note that this must be called before the receiver.onStart() is called to ensure
 * things like [[BlockGenerator]]s are started before the receiver starts sending data.
 */
protected def onStart() { }

Its concrete implementation is the onStart method of the subclass ReceiverSupervisorImpl:

override protected def onStart() {
  registeredBlockGenerators.foreach { _.start() }
}

The start method called here is BlockGenerator's start method.

/** Start block generating and pushing threads. */
def start(): Unit = synchronized {
  if (state == Initialized) {
    state = Active
    blockIntervalTimer.start()
    blockPushingThread.start()
    logInfo("Started BlockGenerator")
  } else {
    throw new SparkException(
      s"Cannot start BlockGenerator as its not in the Initialized state [state = $state]")
  }
}
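For intuition about what those two threads do, here is a stripped-down analogue of the block-generation loop. It is not the Spark implementation; ToyBlockGenerator and its names are invented. A timer seals the in-flight buffer into a block every interval (the role of blockIntervalTimer), and a pusher thread drains the sealed blocks (the role of blockPushingThread):

import java.util.concurrent.{ArrayBlockingQueue, Executors, TimeUnit}
import scala.collection.mutable.ArrayBuffer

class ToyBlockGenerator[T](intervalMs: Long, pushBlock: Seq[T] => Unit) {
  private var buffer = new ArrayBuffer[T]
  private val blocks = new ArrayBlockingQueue[Seq[T]](10)
  private val timer = Executors.newSingleThreadScheduledExecutor()

  private val pusher = new Thread("toy-block-pushing-thread") {
    override def run(): Unit = {
      while (true) pushBlock(blocks.take())   // block until a sealed block arrives
    }
  }

  def addData(data: T): Unit = synchronized { buffer += data }

  def start(): Unit = {
    timer.scheduleAtFixedRate(new Runnable {
      def run(): Unit = {
        val block = ToyBlockGenerator.this.synchronized {
          val old = buffer                    // seal the current buffer into a block
          buffer = new ArrayBuffer[T]
          old
        }
        if (block.nonEmpty) blocks.put(block.toSeq)  // queue the block for pushing
      }
    }, intervalMs, intervalMs, TimeUnit.MILLISECONDS)
    pusher.setDaemon(true)
    pusher.start()
  }
}

// Usage: val gen = new ToyBlockGenerator[String](200, b => println(s"block of ${b.size}"))
//        gen.start(); gen.addData("x")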

Note:

Material sourced from: DT_大數(shù)據(jù)夢工廠 (Spark release customization).

For more exclusive content, follow the WeChat public account: DT_Spark.

If you are interested in big data and Spark, you can join the free public Spark class given by teacher Wang Jialin every evening at 20:00, YY room number: 68917580.
