Flink簡單入門

Apache Flink是什么竹宋?

Apache Flink 是一個框架和分布式處理引擎敞贡,用于在無邊界和有邊界數(shù)據(jù)流上進行有狀態(tài)的計算。Flink 能在所有常見集群環(huán)境中運行哗总,并能以內(nèi)存速度和任意規(guī)模進行計算。

Flink 主要包括 DataStream API倍试、DataSet API讯屈、Table API、SQL县习、Graph API 和 FlinkML 等′棠福現(xiàn)在 Flink 也有自己的生態(tài)圈,涉及離線數(shù)據(jù)處理躁愿、實時數(shù)據(jù)處理叛本、SQL 操作、圖計算和機器學(xué)習(xí)庫等彤钟。

Flink 的重要特點

(1)事件驅(qū)動型(Event-driven)

事件驅(qū)動型應(yīng)用是一類具有狀態(tài)的應(yīng)用来候,它從一個或多個事件流提取數(shù)據(jù),并根據(jù)到來的事件觸發(fā)計算逸雹、狀態(tài)更新或其他外部動作营搅。比較典型的就是以 kafka 為代表的消息隊列幾乎都是事件驅(qū)動型應(yīng)用。

與之不同的就是 SparkStreaming 微批次梆砸,如圖:

image.png

事件驅(qū)動型:

image.png

(2)流與批的世界觀

批處理的特點是有界转质、持久、大量辫樱,非常適合需要訪問全套記錄才能完成的計算工作峭拘,一般用于離線統(tǒng)計。
流處理的特點是無界狮暑、實時, 無需針對整個數(shù)據(jù)集執(zhí)行操作鸡挠,而是對通過系統(tǒng)傳輸?shù)拿總€數(shù)據(jù)項執(zhí)行操作,一般用于實時統(tǒng)計搬男。 在 spark 的世界觀中拣展,一切都是由批次組成的,離線數(shù)據(jù)是一個大批次缔逛,而實時數(shù)據(jù)是由一個一個無限的小批次組成的备埃。
而在 flink 的世界觀中,一切都是由流組成的褐奴,離線數(shù)據(jù)是有界限的流按脚,實時數(shù)據(jù)是一個沒有界限的流,這就是所謂的有界流和無界流敦冬。
無界數(shù)據(jù)流:無界數(shù)據(jù)流有一個開始但是沒有結(jié)束辅搬,它們不會在生成時終止并 提供數(shù)據(jù),必須連續(xù)處理無界流脖旱,也就是說必須在獲取后立即處理 event堪遂。對于無界數(shù)據(jù)流我們無法等待所有數(shù)據(jù)都到達介蛉,因為輸入是無界的,并且在任何時間點都不會完成溶褪。處理無界數(shù)據(jù)通常要求以特定順序(例如事件發(fā)生的順序)獲取 event币旧,以便能夠推斷結(jié)果完整性。
有界數(shù)據(jù)流:有界數(shù)據(jù)流有明確定義的開始和結(jié)束猿妈,可以在執(zhí)行任何計算之前通過獲取所有數(shù)據(jù)來處理有界流吹菱,處理有界流不需要有序獲取,因為可以始終對有界數(shù)據(jù)集進行排序彭则,有界流的處理也稱為批處理毁葱。

image

(3) 分層 api

image.png

最底層級的抽象僅僅提供了有狀態(tài)流,它將通過過程函數(shù)(Process Function) 被嵌入到 DataStream API 中贰剥。底層過程函數(shù)(Process Function) 與 DataStream API 相集成,使其可以對某些特定的操作進行底層的抽象筷频,它允許用戶可以自由地處理 來自一個或多個數(shù)據(jù)流的事件蚌成,并使用一致的容錯的狀態(tài)。除此之外凛捏,用戶可以注冊事件時間并處理時間回調(diào)担忧,從而使程序可以處理復(fù)雜的計算。 實際上坯癣,大多數(shù)應(yīng)用并不需要上述的底層抽象瓶盛,而是針對核心 API(Core APIs) 進行編程,比如 DataStream API(有界或無界流數(shù)據(jù))以及 DataSet API(有界數(shù)據(jù) 集)示罗。這些 API 為數(shù)據(jù)處理提供了通用的構(gòu)建模塊惩猫,比如由用戶定義的多種形式的
轉(zhuǎn)換(transformations),連接(joins)蚜点,聚合(aggregations)轧房,窗口操作(windows) 等等。DataSet API 為有界數(shù)據(jù)集提供了額外的支持绍绘,例如循環(huán)與迭代奶镶。這些 API處理的數(shù)據(jù)類型以類(classes)的形式由各自的編程語言所表示。
Table API 是以表為中心的聲明式編程陪拘,其中表可能會動態(tài)變化(在表達流數(shù)據(jù)時)厂镇。Table API 遵循(擴展的)關(guān)系模型:表有二維數(shù)據(jù)結(jié)構(gòu)(schema)(類似于關(guān)系數(shù)據(jù)庫中的表),同時 API 提供可比較的操作左刽,例如 select捺信、project、join悠反、group-by残黑、aggregate 等馍佑。Table API 程序聲明式地定義了什么邏輯操作應(yīng)該執(zhí)行,而不是準確地確定這些操作代碼的看上去如何梨水。
盡管 Table API 可以通過多種類型的用戶自定義函數(shù)(UDF)進行擴展拭荤,其仍不如核心 API 更具表達能力,但是使用起來卻更加簡潔(代碼量更少)疫诽。除此之外舅世,Table API 程序在執(zhí)行之前會經(jīng)過內(nèi)置優(yōu)化器進行化。你可以在表與DataStream/DataSet 之間無縫切換奇徒,以允許程序?qū)?Table API 與DataStream 以及 DataSet 混合使用雏亚。Flink 提供的最高層級的抽象是 SQL 。這一層抽象在語法與表達能力上與Table API 類似摩钙,但是是以 SQL 查詢表達式的形式表現(xiàn)程序罢低。SQL 抽象與 Table API交互密切,同時 SQL 查詢可以直接在 Table API 定義的表上執(zhí)行胖笛。

Flink 幾大模塊

  • ? Flink Table & SQL
  • ? Flink Gelly(圖計算)
  • ? Flink CEP(復(fù)雜事件處理)

Flink應(yīng)用場景

  • 電信市場營銷
    • 數(shù)據(jù)報表网持,廣告投放,業(yè)務(wù)流程需求
  • 物聯(lián)網(wǎng)
    • 傳感器實時采集和顯示长踊、實時報警功舀、交通運輸業(yè)
  • 電信業(yè)
    • 基站流量調(diào)配
  • 銀行金融
    • 實時計算和通知推送,實時檢測異常行為

Flink部署

Flink standalone模式安裝部署身弊,首先到官網(wǎng)!下載頁面下載辟汰,解壓安裝
進入conf目錄打開flink-conf.yaml 進行編輯
進入bin目錄 ,單機模式下使用Standalone 模式

.\start-cluster.bat

打開瀏覽器訪問 http://locahost:8081 對 flink 集群和任務(wù)進行監(jiān)控管理

Flink 架構(gòu)

Flink運行時架構(gòu)主要包括四個不同的組件阱佛,它們會在運行流處理應(yīng)用程序時協(xié)同工作:作業(yè)管理器(JobManager)帖汞、資源管理器(ResourceManager)、任務(wù)管理器(TaskManager)凑术,以及分發(fā)器(Dispatcher)涨冀。因為Flink是用Java和Scala實現(xiàn)的,所以所有組件都會運行在Java虛擬機上麦萤。每個組件的職責(zé)如下:

作業(yè)管理器(JobManager)

  1. 控制一個應(yīng)用程序執(zhí)行的主進程鹿鳖,也就是說,每個應(yīng)用程序都會被一個不同的JobManager 所控制執(zhí)行壮莹。
  2. JobManager 會先接收到要執(zhí)行的應(yīng)用程序翅帜,這個應(yīng)用程序會包括:作業(yè)圖(JobGraph)、邏輯數(shù)據(jù)流圖(logical dataflow graph)和打包了所有的類命满、庫和其它資源的JAR包涝滴。
  3. JobManager 會把JobGraph轉(zhuǎn)換成一個物理層面的數(shù)據(jù)流圖,這個圖被叫做“執(zhí)行圖”(ExecutionGraph),包含了所有可以并發(fā)執(zhí)行的任務(wù)歼疮。
  4. JobManager 會向資源管理器(ResourceManager)請求執(zhí)行任務(wù)必要的資源杂抽,也就是任務(wù)管理器(TaskManager)上的插槽(slot)。一旦它獲取到了足夠的資源韩脏,就會將執(zhí)行圖分發(fā)到真正運行它們的 TaskManager上缩麸。而在運行過程中,JobManager會負責(zé)所有需要中央?yún)f(xié)調(diào)的操作,比如說檢查點(checkpoints)的協(xié)調(diào)。

任務(wù)管理器(TaskManager)

  1. Flink中的工作進程桃纯。通常在Flink中會有多個TaskManager運行,每一個TaskManager都包含了一定數(shù)量的插槽(slots)其爵。插槽的數(shù)量限制了TaskManager能夠執(zhí)行的任務(wù)數(shù)量。
  2. 啟動之后,TaskManager會向資源管理器注冊它的插槽;收到資源管理器的指令后刃唐,TaskManager就會將一個或者多個插槽提供給JobManager調(diào)用。JobManager就可以向插槽分配任務(wù)(tasks)來執(zhí)行了界轩。
  3. 在執(zhí)行過程中唁桩,一個TaskManager可以跟其它運行同一應(yīng)用程序的TaskManager交換數(shù)據(jù)。

資源管理器(ResourceManager)

  1. 主要負責(zé)管理任務(wù)管理器(TaskManager)的插槽(slot)耸棒,TaskManger 插槽是Flink中定義的處理資源單元。
  2. Flink為不同的環(huán)境和資源管理工具提供了不同資源管理器报辱,比如YARN与殃、Mesos、K8s碍现,以及standalone部署幅疼。
  3. 當JobManager申請插槽資源時,ResourceManager會將有空閑插槽的TaskManager分配給JobManager昼接。如果ResourceManager沒有足夠的插槽來滿足JobManager的請求爽篷,它還可以向資源提供平臺發(fā)起會話,以提供啟動TaskManager進程的容器慢睡。

分發(fā)器(Dispatcher)

  1. 可以跨作業(yè)運行逐工,它為應(yīng)用提交提供了REST接口。
  2. 當一個應(yīng)用被提交執(zhí)行時漂辐,分發(fā)器就會啟動并將應(yīng)用移交給一個JobManager泪喊。
  3. Dispatcher也會啟動一個Web UI,用來方便地展示和監(jiān)控作業(yè)執(zhí)行的信息髓涯。
  4. Dispatcher在架構(gòu)中可能并不是必需的袒啼,這取決于應(yīng)用提交運行的方式。

任務(wù)提交流程

這是從一個較為高層級的視角,來看應(yīng)用中各組件的交互協(xié)作蚓再。如果部署的集群環(huán)境不同(例如YARN滑肉,Mesos,Kubernetes摘仅,standalone等)靶庙,其中一些步驟可以被省略,或是有些組件會運行在同一個JVM進程中实檀。


image.png

命令行提交job
bin/flink run -c <入口類> -p <并行度> <jar包路徑> <啟動參數(shù)>

$ bin/flink run -c ** WordCount -p 3 **.jar --host localhost --port 7777
Job has been submitted with JobID 33a5d1f00688a362837830f0b85fd75e

取消任務(wù)
bin/flink cancel <Job的ID>

Yarn模式任務(wù)提交流程

  1. Flink任務(wù)提交后惶洲,Client向HDFS上傳Flink的Jar包和配置
  2. 之后客戶端向Yarn ResourceManager提交任務(wù),ResourceManager分配Container資源并通知對應(yīng)的NodeManager啟動ApplicationMaster
  3. ApplicationMaster啟動后加載Flink的Jar包和配置構(gòu)建環(huán)境膳犹,去啟動JobManager恬吕,之后JobManager向Flink自身的RM進行申請資源,自身的RM向Yarn 的ResourceManager申請資源(因為是yarn模式须床,所有資源歸yarn RM管理)啟動TaskManager
  4. Yarn ResourceManager分配Container資源后铐料,由ApplicationMaster通知資源所在節(jié)點的NodeManager啟動TaskManager
  5. NodeManager加載Flink的Jar包和配置構(gòu)建環(huán)境并啟動TaskManager,TaskManager啟動后向JobManager發(fā)送心跳包豺旬,并等待JobManager向其分配任務(wù)钠惩。


    image.png

創(chuàng)建項目

搭建maven項目

mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.9.0

創(chuàng)建一個簡單工程去坐單詞數(shù)量統(tǒng)計的工程,根據(jù)空格統(tǒng)計單詞數(shù)量

    public class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
         public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            String[] tokens = value.toLowerCase().split(" ");
            for (String token : tokens) {
                if (token.length()  > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }

WordCount

public class WordCount {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> text = env.fromElements("").name("in-memory-input");
        DataStream<Tuple2<String, Integer>> counts =
                // The text lines read from the source are split into words
                // using a user-defined function. The tokenizer, implemented below,
                // will output each word as a (2-tuple) containing (word, 1)
                text.flatMap(new Tokenizer())
                        .name("tokenizer")
                        // keyBy groups tuples based on the "0" field, the word.
                        // Using a keyBy allows performing aggregations and other
                        // stateful transformations over data on a per-key basis.
                        // This is similar to a GROUP BY clause in a SQL query.
                        .keyBy(value -> value.f0)
                        // For each key, we perform a simple sum of the "1" field, the count.
                        // If the input data stream is bounded, sum will output a final count for
                        // each word. If it is unbounded, it will continuously output updates
                        // each time it sees a new instance of each word in the stream.
                        .sum(1)
                        .name("counter");
            counts.print().name("print-sink");
        env.execute("WordCount");
    }
}

程序與數(shù)據(jù)流

1.所有的Flink程序都是由三部分組成的: Source 、Transformation 和 Sink族阅。

  1. Source 負責(zé)讀取數(shù)據(jù)源篓跛,Transformation 利用各種算子進行處理加工,Sink 負責(zé)輸出


    image.png
  2. 在運行時坦刀,F(xiàn)link上運行的程序會被映射成“邏輯數(shù)據(jù)流”(dataflows)愧沟,它包含了這三部分
  3. 每一個dataflow以一個或多個sources開始以一個或多個sinks結(jié)束。dataflow類似于任意的有向無環(huán)圖(DAG)
  4. 在大部分情況下鲤遥,程序中的轉(zhuǎn)換運算(transformations)跟dataflow中的算子(operator)是一一對應(yīng)的關(guān)系


    image.png

任務(wù)調(diào)度原理

  1. 客戶端不是運行時和程序執(zhí)行的一部分沐寺,但它用于準備并發(fā)送dataflow(JobGraph)給Master(JobManager),然后盖奈,客戶端斷開連接或者維持連接以等待接收計算結(jié)果混坞。而Job Manager會產(chǎn)生一個執(zhí)行圖(Dataflow Graph)
  2. 當 Flink 集群啟動后,首先會啟動一個 JobManger 和一個或多個的 TaskManager钢坦。由 Client 提交任務(wù)給 JobManager究孕,JobManager 再調(diào)度任務(wù)到各個 TaskManager 去執(zhí)行,然后 TaskManager 將心跳和統(tǒng)計信息匯報給 JobManager爹凹。TaskManager 之間以流的形式進行數(shù)據(jù)的傳輸蚊俺。上述三者均為獨立的 JVM 進程。
  3. Client 為提交 Job 的客戶端逛万,可以是運行在任何機器上(與 JobManager 環(huán)境連通即可)泳猬。提交 Job 后批钠,Client 可以結(jié)束進程(Streaming的任務(wù)),也可以不結(jié)束并等待結(jié)果返回得封。
  4. JobManager 主要負責(zé)調(diào)度 Job 并協(xié)調(diào) Task 做 checkpoint埋心,職責(zé)上很像 Storm 的 Nimbus。從 Client 處接收到 Job 和 JAR 包等資源后忙上,會生成優(yōu)化后的執(zhí)行計劃拷呆,并以 Task 的單元調(diào)度到各個 TaskManager 去執(zhí)行。
  5. TaskManager 在啟動的時候就設(shè)置好了槽位數(shù)(Slot)疫粥,每個 slot 能啟動一個 Task茬斧,Task 為線程。從 JobManager 處接收需要部署的 Task梗逮,部署啟動后项秉,與自己的上游建立 Netty 連接,接收數(shù)據(jù)并處理慷彤。


    image.png

TaskManger與Slots與parallelism

  1. Flink 中每一個 TaskManager 都是一個JVM進程娄蔼,它可能會在獨立的線程上執(zhí)行一個或多個子任務(wù)
  2. 為了控制一個 TaskManager 能接收多少個 task, TaskManager 通過 task slot 來進行控制(一個 TaskManager 至少有一個 slot)
  3. 圖中每個Task Manager中的Slot為3個底哗,那么兩個Task Manager一共有六個Slot, 而這6個Slot代表著Task Manager最大的并發(fā)執(zhí)行能力岁诉,一共能可以執(zhí)行6個task進行同時執(zhí)行
  4. Slot是靜態(tài)概念,代表著Task Manager具有的并發(fā)執(zhí)行能力跋选,可以通過參數(shù)taskmanager.numberOfTaskSlots進行配置
  5. 為了控制一個 TaskManager 能接收多少個 task涕癣, TaskManager 通過 task slot 來進行控制(一個 TaskManager 至少有一個 slot)
  6. 圖中Source和Map是一個Task,且并行度(我們設(shè)置的setParallelism())都為1前标,指這個task任務(wù)的并行能力為1坠韩,只占用一個Slot資源
    ————————————————
    版權(quán)聲明:本文為CSDN博主「SmallScorpion」的原創(chuàng)文章,遵循CC 4.0 BY-SA版權(quán)協(xié)議候生,轉(zhuǎn)載請附上原文出處鏈接及本聲明。
    原文鏈接:https://blog.csdn.net/qq_40180229/article/details/106321149
    image.png
  7. 在第二張圖中為Flink的共享子任務(wù)绽昼,如果一個TaskManager一個slot唯鸭,那將意味著每個task group運行在獨立的JVM中(該JVM可能是通過一個特定的容器啟動的),而一個TaskManager多個slot意味著更多的subtask可以共享同一個JVM硅确。而在同一個JVM進程中的task將共享TCP連接(基于多路復(fù)用)和心跳消息目溉。它們也可能共享數(shù)據(jù)集和數(shù)據(jù)結(jié)構(gòu),因此這減少了每個task的負載菱农。
  8. 并行度parallelism是動態(tài)概念缭付,即TaskManager運行程序時實際使用的并發(fā)能力,可以通過參數(shù)parallelism.default進行配置循未。


    image.png
  9. 也就是說陷猫,假設(shè)一共有3個TaskManager,每一個TaskManager中的分配3個TaskSlot,也就是每個TaskManager可以接收3個task绣檬,一共9個TaskSlot足陨,如果我們設(shè)置parallelism.default=1,即運行程序默認的并行度為1娇未,9個TaskSlot只用了1個墨缘,有8個空閑,因此零抬,設(shè)置合適的并行度才能提高效率镊讼。


    image.png

    image.png
  10. 一個特定算子的 子任務(wù)(subtask)的個數(shù)被稱之為其并行度(parallelism),我們可以對單獨的每個算子進行設(shè)置并行度平夜,也可以直接用env設(shè)置全局的并行度蝶棋,更可以在頁面中去指定并行度。
  11. 最后褥芒,由于并行度是實際Task Manager處理task 的能力嚼松,而一般情況下,一個 stream 的并行度锰扶,可以認為就是其所有算子中最大的并行度献酗,則可以得出在設(shè)置Slot時,在所有設(shè)置中的最大設(shè)置的并行度大小則就是所需要設(shè)置的Slot的數(shù)量坷牛。


    image.png

資料來源
https://blog.csdn.net/qq_40180229/article/details/106321149
https://zhuanlan.zhihu.com/p/138107079

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末罕偎,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子京闰,更是在濱河造成了極大的恐慌颜及,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,126評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件蹂楣,死亡現(xiàn)場離奇詭異俏站,居然都是意外死亡,警方通過查閱死者的電腦和手機痊土,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,254評論 2 382
  • 文/潘曉璐 我一進店門肄扎,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人赁酝,你說我怎么就攤上這事犯祠。” “怎么了酌呆?”我有些...
    開封第一講書人閱讀 152,445評論 0 341
  • 文/不壞的土叔 我叫張陵衡载,是天一觀的道長。 經(jīng)常有香客問我隙袁,道長痰娱,這世上最難降的妖魔是什么弃榨? 我笑而不...
    開封第一講書人閱讀 55,185評論 1 278
  • 正文 為了忘掉前任,我火速辦了婚禮猜揪,結(jié)果婚禮上惭墓,老公的妹妹穿的比我還像新娘。我一直安慰自己而姐,他們只是感情好腊凶,可當我...
    茶點故事閱讀 64,178評論 5 371
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著拴念,像睡著了一般钧萍。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上政鼠,一...
    開封第一講書人閱讀 48,970評論 1 284
  • 那天风瘦,我揣著相機與錄音,去河邊找鬼公般。 笑死万搔,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的官帘。 我是一名探鬼主播瞬雹,決...
    沈念sama閱讀 38,276評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼刽虹!你這毒婦竟也來了酗捌?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 36,927評論 0 259
  • 序言:老撾萬榮一對情侶失蹤涌哲,失蹤者是張志新(化名)和其女友劉穎胖缤,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體阀圾,經(jīng)...
    沈念sama閱讀 43,400評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡哪廓,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,883評論 2 323
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了初烘。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片涡真。...
    茶點故事閱讀 37,997評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖账月,靈堂內(nèi)的尸體忽然破棺而出综膀,到底是詐尸還是另有隱情澳迫,我是刑警寧澤局齿,帶...
    沈念sama閱讀 33,646評論 4 322
  • 正文 年R本政府宣布,位于F島的核電站橄登,受9級特大地震影響抓歼,放射性物質(zhì)發(fā)生泄漏讥此。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,213評論 3 307
  • 文/蒙蒙 一谣妻、第九天 我趴在偏房一處隱蔽的房頂上張望萄喳。 院中可真熱鬧,春花似錦蹋半、人聲如沸他巨。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,204評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽染突。三九已至,卻和暖如春辈灼,著一層夾襖步出監(jiān)牢的瞬間份企,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,423評論 1 260
  • 我被黑心中介騙來泰國打工巡莹, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留司志,地道東北人。 一個月前我還...
    沈念sama閱讀 45,423評論 2 352
  • 正文 我出身青樓降宅,卻偏偏與公主長得像骂远,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子钉鸯,可洞房花燭夜當晚...
    茶點故事閱讀 42,722評論 2 345

推薦閱讀更多精彩內(nèi)容