Flink內(nèi)部Exactly Once三板斧:狀態(tài)隐岛、狀態(tài)后端與檢查點

Flink是一個分布式的流處理引擎史飞,而流處理的其中一個特點就是7X24尖昏。那么,如何保障Flink作業(yè)的持續(xù)運行呢构资?Flink的內(nèi)部會將應用狀態(tài)(state)存儲到本地內(nèi)存或者嵌入式的kv數(shù)據(jù)庫(RocksDB)中抽诉,由于采用的是分布式架構,F(xiàn)link需要對本地生成的狀態(tài)進行持久化存儲吐绵,以避免因應用或者節(jié)點機器故障等原因導致數(shù)據(jù)的丟失迹淌,F(xiàn)link是通過checkpoint(檢查點)的方式將狀態(tài)寫入到遠程的持久化存儲,從而就可以實現(xiàn)不同語義的結果保障己单。通過本文唉窃,你可以了解到什么是Flink的狀態(tài),F(xiàn)link的狀態(tài)是怎么存儲的荷鼠,F(xiàn)link可選擇的狀態(tài)后端(statebackend)有哪些句携,什么是全局一致性檢查點,F(xiàn)link內(nèi)部如何通過檢查點實現(xiàn)Exactly Once的結果保障允乐。另外矮嫉,本文內(nèi)容較長削咆,建議關注加收藏。

什么是狀態(tài)


引子

關于什么是狀態(tài)蠢笋,我們先不做過多的分析拨齐。首先看一個代碼案例,其中案例1是Spark的WordCount代碼昨寞,案例2是Flink的WorkCount代碼瞻惋。

案例1:Spark WC

object WordCount {

? def main(args:Array[String]){

? val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")

? val ssc = new StreamingContext(conf, Seconds(5))

? val lines = ssc.socketTextStream("localhost", 9999)

? val words = lines.flatMap(_.split(" "))

? val pairs = words.map(word => (word, 1))

? val wordCounts = pairs.reduceByKey(_ + _)

? wordCounts.print()

? ssc.start()

? ssc.awaitTermination()}}

輸入:

C:\WINDOWS\system32>nc -lp 9999

hello spark

hello spark

輸出:

案例2:Flink WC

public class WordCount {

? ? public static void main(String[] args) throws Exception {

? ? ? ? StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

? ? ? ? DataStreamSource<String> streamSource = env.socketTextStream("localhost", 9999);

? ? ? ? SingleOutputStreamOperator<Tuple2<String,Integer>> words = streamSource.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {

? ? ? ? ? ? @Override

? ? ? ? ? ? public void flatMap(String value, Collector<Tuple2<String,Integer>> out) throws Exception {

? ? ? ? ? ? ? ? String[] splits = value.split("\\s");

? ? ? ? ? ? ? ? for (String word : splits) {

? ? ? ? ? ? ? ? ? ? out.collect(Tuple2.of(word, 1));

? ? ? ? ? ? ? ? }

? ? ? ? ? ? }

? ? ? ? });

? ? ? ? words.keyBy(0).sum(1).print();

? ? ? ? env.execute("WC");

? ? }}

輸入:

C:\WINDOWS\system32>nc -lp 9999

hello

Flink

hello

Flink

輸出:

從上面的兩個例子可以看出,在使用Spark進行詞頻統(tǒng)計時援岩,當前的統(tǒng)計結果不受歷史統(tǒng)計結果的影響歼狼,只計算接收的當前數(shù)據(jù)的結果,這個就可以理解為無狀態(tài)的計算享怀。再來看一下Flink的例子羽峰,可以看出當?shù)诙卧~頻統(tǒng)計時,把第一次的結果值也統(tǒng)計在了一起添瓷,即Flink把上一次的計算結果保存在了狀態(tài)里梅屉,第二次計算的時候會先拿到上一次的結果狀態(tài)隧土,然后結合新到來的數(shù)據(jù)再進行計算秽褒,這就可以理解成有狀態(tài)的計算喝峦,如下圖所示郑藏。



狀態(tài)的類別

Flink提供了兩種基本類型的狀態(tài):分別是Keyed State和Operator State。根據(jù)不同的狀態(tài)管理方式醒串,每種狀態(tài)又有兩種存在形式帖鸦,分別為:managed(托管狀態(tài))和raw(原生狀態(tài))易遣。具體如下表格所示咱筛。需要注意的是庶近,由于Flink推薦使用managed state,所以下文主要討論managed state眷蚓,對于raw state,本文不會做過多的討論反番。

managed state & raw state區(qū)別

Keyed State & Operator State

Keyed State

Keyed State只能由作用在KeyedStream上面的函數(shù)使用沙热,該狀態(tài)與某個key進行綁定,即每一個key對應一個state罢缸。Keyed State按照key進行維護和訪問的篙贸,F(xiàn)link會為每一個Key都維護一個狀態(tài)實例,該狀態(tài)實例總是位于處理該key記錄的算子任務上枫疆,因此同一個key的記錄可以訪問到一樣的狀態(tài)爵川。如下圖所示,可以通過在一條流上使用keyBy()方法來生成一個KeyedStream息楔。Flink提供了很多種keyed state寝贡,具體如下:

ValueState<T>

用于保存類型為T的單個值扒披。用戶可以通過ValueState.value()來獲取該狀態(tài)值,通過ValueState.update()來更新該狀態(tài)圃泡。使用ValueStateDescriptor來獲取狀態(tài)句柄碟案。

ListState<T>

用于保存類型為T的元素列表,即key的狀態(tài)值是一個列表颇蜡。用戶可以使用ListState.add()或者ListState.addAll()將新元素添加到列表中价说,通過ListState.get()訪問狀態(tài)元素,該方法會返回一個可遍歷所有元素的Iterable對象风秤,注意ListState不支持刪除單個元素鳖目,但是用戶可以使用update(List values)來更新整個列表。使用ListStateDescriptor來獲取狀態(tài)句柄缤弦。

ReducingState<T>

調用add()方法添加值時领迈,會立即返回一個使用ReduceFunction聚合后的值,用戶可以使用ReducingState.get()來獲取該狀態(tài)值甸鸟。使用ReducingStateDescriptor來獲取狀態(tài)句柄惦费。

AggregatingState<IN, OUT>

與ReducingState類似,不同的是它使用的是AggregateFunction來聚合內(nèi)部的值抢韭,AggregatingState.get()方法會計算最終的結果并將其返回薪贫。使用AggregatingStateDescriptor來獲取狀態(tài)句柄

MapState<UK, UV>

用于保存一組key、value的映射刻恭,類似于java的Map集合瞧省。用戶可以通過get(UK key)方法獲取key對應的狀態(tài),可以通過put(UK k,UV value)方法添加一個鍵值鳍贾,可以通過remove(UK key)刪除給定key的值鞍匾,可以通過contains(UK key)判斷是否存在對應的key。使用MapStateDescriptor來獲取狀態(tài)句柄骑科。

FoldingState<T, ACC>

在Flink 1.4的版本中標記過時橡淑,在未來的版本中會被移除,使用AggregatingState進行代替咆爽。

值得注意的是梁棠,上面的狀態(tài)原語都支持通過State.clear()方法來進行清除狀態(tài)。另外斗埂,上述的狀態(tài)原語僅用于與狀態(tài)進行交互符糊,真正的狀態(tài)是存儲在狀態(tài)后端(后面會介紹狀態(tài)后端)的,通過該狀態(tài)原語相當于持有了狀態(tài)的句柄(handle)呛凶。

keyed State使用案例

下面給出一個MapState的使用案例男娄,關于ValueState的使用情況可以參考官網(wǎng),具體如下:

public class MapStateExample {

? ? //統(tǒng)計每個用戶每種行為的個數(shù)

? ? public static class UserBehaviorCnt extends RichFlatMapFunction<Tuple3<Long, String, String>, Tuple3<Long, String, Integer>> {

? ? ? ? //定義一個MapState句柄

? ? ? ? private transient MapState<String, Integer> behaviorCntState;

? ? ? ? // 初始化狀態(tài)

? ? ? ? @Override

? ? ? ? public void open(Configuration parameters) throws Exception {

? ? ? ? ? ? super.open(parameters);

? ? ? ? ? ? MapStateDescriptor<String, Integer> userBehaviorMapStateDesc = new MapStateDescriptor<>(

? ? ? ? ? ? ? ? ? ? "userBehavior",? // 狀態(tài)描述符的名稱

? ? ? ? ? ? ? ? ? ? TypeInformation.of(new TypeHint<String>() {}),? // MapState狀態(tài)的key的數(shù)據(jù)類型

? ? ? ? ? ? ? ? ? ? TypeInformation.of(new TypeHint<Integer>() {})? // MapState狀態(tài)的value的數(shù)據(jù)類型

? ? ? ? ? ? );

? ? ? ? ? ? behaviorCntState = getRuntimeContext().getMapState(userBehaviorMapStateDesc); // 獲取狀態(tài)

? ? ? ? }

? ? ? ? @Override

? ? ? ? public void flatMap(Tuple3<Long, String, String> value, Collector<Tuple3<Long, String, Integer>> out) throws Exception {

? ? ? ? ? ? Integer behaviorCnt = 1;

? ? ? ? ? ? // 如果當前狀態(tài)包括該行為,則+1

? ? ? ? ? ? if (behaviorCntState.contains(value.f1)) {

? ? ? ? ? ? ? ? behaviorCnt = behaviorCntState.get(value.f1) + 1;

? ? ? ? ? ? }

? ? ? ? ? ? // 更新狀態(tài)

? ? ? ? ? ? behaviorCntState.put(value.f1, behaviorCnt);

? ? ? ? ? ? out.collect(Tuple3.of(value.f0, value.f1, behaviorCnt));

? ? ? ? }

? ? }

? ? public static void main(String[] args) throws Exception {

? ? ? ? StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

? ? ? ? // 模擬數(shù)據(jù)源[userId,behavior,product]

? ? ? ? DataStreamSource<Tuple3<Long, String, String>> userBehaviors = env.fromElements(

? ? ? ? ? ? ? ? Tuple3.of(1L, "buy", "iphone"),

? ? ? ? ? ? ? ? Tuple3.of(1L, "cart", "huawei"),

? ? ? ? ? ? ? ? Tuple3.of(1L, "buy", "logi"),

? ? ? ? ? ? ? ? Tuple3.of(1L, "fav", "oppo"),

? ? ? ? ? ? ? ? Tuple3.of(2L, "buy", "huawei"),

? ? ? ? ? ? ? ? Tuple3.of(2L, "buy", "onemore"),

? ? ? ? ? ? ? ? Tuple3.of(2L, "fav", "iphone"));

? ? ? ? userBehaviors

? ? ? ? ? ? ? ? .keyBy(0)

? ? ? ? ? ? ? ? .flatMap(new UserBehaviorCnt())

? ? ? ? ? ? ? ? .print();

? ? ? ? env.execute("MapStateExample");

? ? }}

結果輸出:


狀態(tài)的生命周期管理(TTL)

對于任何類型Keyed State都可以設定狀態(tài)的生命周期(TTL),即狀態(tài)的存活時間模闲,以確保能夠在規(guī)定時間內(nèi)及時地清理狀態(tài)數(shù)據(jù)建瘫。如果配置了狀態(tài)的TTL,那么當狀態(tài)過期時围橡,存儲的狀態(tài)會被清除暖混。狀態(tài)生命周期功能可以通過StateTtlConfig配置,然后將StateTtlConfig配置傳入StateDescriptor中的enableTimeToLive方法中即可翁授。代碼示例如下:

StateTtlConfig ttlConfig = StateTtlConfig

? ? ? ? ? ? ? ? // 指定TTL時長為10S

? ? ? ? ? ? ? ? .newBuilder(Time.seconds(10))

? ? ? ? ? ? ? ? // 只對創(chuàng)建和寫入操作有效

? ? ? ? ? ? ? ? .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)

? ? ? ? ? ? ? ? // 不返回過期的數(shù)據(jù)

? ? ? ? ? ? ? ? .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)

? ? ? ? ? ? ? ? .build();

? ? ? ? // 初始化狀態(tài)

? ? ? ? @Override

? ? ? ? public void open(Configuration parameters) throws Exception {

? ? ? ? ? ? super.open(parameters);

? ? ? ? ? ? MapStateDescriptor<String, Integer> userBehaviorMapStateDesc = new MapStateDescriptor<>(

? ? ? ? ? ? ? ? ? ? "userBehavior",? // 狀態(tài)描述符的名稱

? ? ? ? ? ? ? ? ? ? TypeInformation.of(new TypeHint<String>() {}),? // MapState狀態(tài)的key的數(shù)據(jù)類型

? ? ? ? ? ? ? ? ? ? TypeInformation.of(new TypeHint<Integer>() {})? // MapState狀態(tài)的value的數(shù)據(jù)類型

? ? ? ? ? ? );

? ? ? ? ? ? // 設置stateTtlConfig

? ? ? ? ? ? userBehaviorMapStateDesc.enableTimeToLive(ttlConfig);

? ? ? ? ? ? behaviorCntState = getRuntimeContext().getMapState(userBehaviorMapStateDesc); // 獲取狀態(tài)

? ? ? ? }

在StateTtlConfig創(chuàng)建時拣播,newBuilder方法是必須要指定的,newBuilder中設定過期時間的參數(shù)收擦。對于其他參數(shù)都是可選的或使用默認值贮配。其中setUpdateType方法中傳入的類型有三種:

public enum UpdateType {

? ? ? ? //禁用TTL,永遠不會過期

? ? ? ? Disabled,

? ? ? ? // 創(chuàng)建和寫入時更新TTL

? ? ? ? OnCreateAndWrite,

? ? ? ? // 與OnCreateAndWrite類似,但是在讀操作時也會更新TTL

? ? ? ? OnReadAndWrite

? ? }

值得注意的是塞赂,過期的狀態(tài)數(shù)據(jù)根據(jù)UpdateType參數(shù)進行配置泪勒,只有被寫入或者讀取的時間才會更新TTL,也就是說如果某個狀態(tài)指標一直不被使用或者更新宴猾,則永遠不會觸發(fā)對該狀態(tài)數(shù)據(jù)的清理操作圆存,這種情況可能會導致系統(tǒng)中的狀態(tài)數(shù)據(jù)越來越大。目前用戶可以使用StateTtlConfig.cleanupFullSnapshot設定當觸發(fā)State Snapshot的時候清理狀態(tài)數(shù)據(jù)仇哆,但是改配置不適合用于RocksDB做增量Checkpointing的操作沦辙。

上面的StateTtlConfig創(chuàng)建時,可以指定setStateVisibility讹剔,用于狀態(tài)的可見性配置油讯,根據(jù)過期數(shù)據(jù)是否被清理來確定是否返回狀態(tài)數(shù)據(jù)。

/**

? ? * 是否返回過期的數(shù)據(jù)

? ? */

? ? public enum StateVisibility {

? ? ? ? //如果數(shù)據(jù)沒有被清理延欠,就可以返回

? ? ? ? ReturnExpiredIfNotCleanedUp,

? ? ? ? //永遠不返回過期的數(shù)據(jù),默認值

? ? ? ? NeverReturnExpired

? ? }



Operator State

Operator State的作用于是某個算子任務陌兑,這意味著所有在同一個并行任務之內(nèi)的記錄都能訪問到相同的狀態(tài) 。算子狀態(tài)不能通過其他任務訪問由捎,無論該任務是相同的算子兔综。如下圖所示。

Operator State是一種non-keyed state狞玛,與并行的操作算子實例相關聯(lián)邻奠,例如在Kafka Connector中,每個Kafka消費端算子實例都對應到Kafka的一個分區(qū)中为居,維護Topic分區(qū)和Offsets偏移量作為算子的Operator State。在Flink中可以實現(xiàn)ListCheckpointed<T extends Serializable>接口或者CheckpointedFunction 接口來實現(xiàn)一個Operator State杀狡。

首先蒙畴,我們先看一下這兩個接口的具體實現(xiàn),然后再給出這兩種接口的具體使用案例。先看一下ListCheckpointed接口的源碼膳凝,如下:

public interface ListCheckpointed<T extends Serializable> {

/**

* 獲取某個算子實例的當前狀態(tài)碑隆,該狀態(tài)包括該算子實例之前被調用時的所有結果

* 以列表的形式返回一個函數(shù)狀態(tài)的快照

* Flink觸發(fā)生成檢查點時調用該方法

* @param checkpointId checkpoint的ID,是一個唯一的、單調遞增的值

* @param timestamp Job Manager觸發(fā)checkpoint時的時間戳

* @return? 返回一個operator state list,如果為null時,返回空list

* @throws Exception

*/

List<T> snapshotState(long checkpointId, long timestamp) throws Exception;/**

* 初始化函數(shù)狀態(tài)時調用蹬音,可能是在作業(yè)啟動時或者故障恢復時

* 根據(jù)提供的列表恢復函數(shù)狀態(tài)

* 注意:當實現(xiàn)該方法時上煤,需要在RichFunction#open()方法之前調用該方法

* @param state 被恢復算子實例的state列表 ,可能為空

* @throws Exception

*/

void restoreState(List<T> state) throws Exception;}

使用Operator ListState時著淆,在進行擴縮容時劫狠,重分布的策略(狀態(tài)恢復的模式)如下圖所示:

上面的重分布策略為Even-split Redistribution,即每個算子實例中含有部分狀態(tài)元素的List列表永部,整個狀態(tài)數(shù)據(jù)是所有List列表的合集独泞。當觸發(fā)restore/redistribution動作時,通過將狀態(tài)數(shù)據(jù)平均分配成與算子并行度相同數(shù)量的List列表苔埋,每個task實例中有一個List懦砂,其可以為空或者含有多個元素。

我們再來看一下CheckpointedFunction接口组橄,源碼如下:

public interface CheckpointedFunction {

/**

* 會在生成檢查點之前調用

* 該方法的目的是確保檢查點開始之前所有狀態(tài)對象都已經(jīng)更新完畢

* @param context 使用FunctionSnapshotContext作為參數(shù)

*? ? ? ? ? ? ? ? 從FunctionSnapshotContext可以獲取checkpoint的元數(shù)據(jù)信息荞膘,

*? ? ? ? ? ? ? ? 比如checkpoint編號,JobManager在初始化checkpoint時的時間戳

* @throws Exception

*/

void snapshotState(FunctionSnapshotContext context) throws Exception;

/**

* 在創(chuàng)建checkpointedFunction的并行實例時被調用玉工,

* 在應用啟動或者故障重啟時觸發(fā)該方法的調用

* @param context 傳入FunctionInitializationContext對象羽资,

*? ? ? ? ? ? ? ? ? 可以使用該對象訪問OperatorStateStore和 KeyedStateStore對象,

*? ? ? ? ? ? ? ? ? 這兩個對象可以獲取狀態(tài)的句柄瓮栗,即通過Flink runtime來注冊函數(shù)狀態(tài)并返回state對象

*? ? ? ? ? ? ? ? ? 比如:ValueState削罩、ListState等

* @throws Exception

*/

void initializeState(FunctionInitializationContext context) throws Exception;}

CheckpointedFunction接口是用于指定有狀態(tài)函數(shù)的最底層的接口,該接口提供了用于注冊和維護keyed state 與operator state的hook(即可以同時使用keyed state 和operator state)费奸,另外也是唯一支持使用list union state弥激。關于Union List State,使用的是Flink為Operator state提供的另一種重分布的策略:Union Redistribution,即每個算子實例中含有所有狀態(tài)元素的List列表愿阐,當觸發(fā)restore/redistribution動作時微服,每個算子都能夠獲取到完整的狀態(tài)元素列表。具體如下圖所示:

ListCheckpointed

ListCheckpointed接口和CheckpointedFunction接口相比在靈活性上相對弱一些缨历,只能支持List類型的狀態(tài)以蕴,并且在數(shù)據(jù)恢復的時候僅支持even-redistribution策略。該接口不像Flink提供的Keyed State(比如Value State辛孵、ListState)那樣直接在狀態(tài)后端(state backend)注冊丛肮,需要將operator state實現(xiàn)為成員變量,然后通過接口提供的回調函數(shù)與狀態(tài)后端進行交互魄缚。使用代碼案例如下

public class ListCheckpointedExample {

? ? private static class UserBehaviorCnt extends RichFlatMapFunction<Tuple3<Long, String, String>, Tuple2<String, Long>> implements ListCheckpointed<Long> {

? ? ? ? private Long userBuyBehaviorCnt = 0L;

? ? ? ? @Override

? ? ? ? public void flatMap(Tuple3<Long, String, String> value, Collector<Tuple2<String, Long>> out) throws Exception {

? ? ? ? ? ? if(value.f1.equals("buy")){

? ? ? ? ? ? ? ? userBuyBehaviorCnt ++;

? ? ? ? ? ? ? ? out.collect(Tuple2.of("buy",userBuyBehaviorCnt));

? ? ? ? ? ? }

? ? ? ? }

? ? ? ? @Override

? ? ? ? public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception {

? ? ? ? ? ? //返回單個元素的List集合宝与,該集合元素是用戶購買行為的數(shù)量

? ? ? ? ? ? return Collections.singletonList(userBuyBehaviorCnt);

? ? ? ? }

? ? ? ? @Override

? ? ? ? public void restoreState(List<Long> state) throws Exception {

? ? ? ? ? ? // 在進行擴縮容之后焚廊,進行狀態(tài)恢復,需要把其他subtask的狀態(tài)加在一起

? ? ? ? ? ? for (Long cnt : state) {

? ? ? ? ? ? ? ? userBuyBehaviorCnt += 1;

? ? ? ? ? ? }

? ? ? ? }

? ? }

? ? public static void main(String[] args) throws Exception {

? ? ? ? StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

? ? ? ? // 模擬數(shù)據(jù)源[userId,behavior,product]

? ? ? ? DataStreamSource<Tuple3<Long, String, String>> userBehaviors = env.fromElements(

? ? ? ? ? ? ? ? Tuple3.of(1L, "buy", "iphone"),

? ? ? ? ? ? ? ? Tuple3.of(1L, "cart", "huawei"),

? ? ? ? ? ? ? ? Tuple3.of(1L, "buy", "logi"),

? ? ? ? ? ? ? ? Tuple3.of(1L, "fav", "oppo"),

? ? ? ? ? ? ? ? Tuple3.of(2L, "buy", "huawei"),

? ? ? ? ? ? ? ? Tuple3.of(2L, "buy", "onemore"),

? ? ? ? ? ? ? ? Tuple3.of(2L, "fav", "iphone"));

? ? ? ? userBehaviors

? ? ? ? ? ? ? ? .flatMap(new UserBehaviorCnt())

? ? ? ? ? ? ? ? .print();

? ? ? ? env.execute("ListCheckpointedExample");

? ? }

}


CheckpointedFunction

CheckpointedFunction接口提供了更加豐富的操作习劫,比如支持Union list state咆瘟,可以訪問keyedState,關于重分布策略诽里,如果使用Even-split Redistribution策略袒餐,則通過context. getListState(descriptor)獲取Operator State;如果使用UnionRedistribution策略谤狡,則通過context. getUnionList State(descriptor)來獲取灸眼。使用案例如下:

public class CheckpointFunctionExample {

? ? private static class UserBehaviorCnt implements CheckpointedFunction, FlatMapFunction<Tuple3<Long, String, String>, Tuple3<Long, Long, Long>> {

? ? ? ? // 統(tǒng)計每個operator實例的用戶行為數(shù)量的本地變量

? ? ? ? private Long opUserBehaviorCnt = 0L;

? ? ? ? // 每個key的state,存儲key對應的相關狀態(tài)

? ? ? ? private ValueState<Long> keyedCntState;

? ? ? ? // 定義operator state,存儲算子的狀態(tài)

? ? ? ? private ListState<Long> opCntState;

? ? ? ? @Override

? ? ? ? public void flatMap(Tuple3<Long, String, String> value, Collector<Tuple3<Long, Long, Long>> out) throws Exception {

? ? ? ? ? ? if (value.f1.equals("buy")) {

? ? ? ? ? ? ? ? // 更新算子狀態(tài)本地變量值

? ? ? ? ? ? ? ? opUserBehaviorCnt += 1;

? ? ? ? ? ? ? ? Long keyedCount = keyedCntState.value();

? ? ? ? ? ? ? ? // 更新keyedstate的狀態(tài) ,判斷狀態(tài)是否為null豌汇,否則空指針異常

? ? ? ? ? ? ? ? keyedCntState.update(keyedCount == null ? 1L : keyedCount + 1 );

? ? ? ? ? ? ? ? // 結果輸出

? ? ? ? ? ? ? ? out.collect(Tuple3.of(value.f0, keyedCntState.value(), opUserBehaviorCnt));

? ? ? ? ? ? }

? ? ? ? }

? ? ? ? @Override

? ? ? ? public void snapshotState(FunctionSnapshotContext context) throws Exception {

? ? ? ? ? ? // 使用opUserBehaviorCnt本地變量更新operator state

? ? ? ? ? ? opCntState.clear();

? ? ? ? ? ? opCntState.add(opUserBehaviorCnt);

? ? ? ? }

? ? ? ? @Override

? ? ? ? public void initializeState(FunctionInitializationContext context) throws Exception {

? ? ? ? ? ? // 通過KeyedStateStore,定義keyedState的StateDescriptor描述符

? ? ? ? ? ? ValueStateDescriptor valueStateDescriptor = new ValueStateDescriptor("keyedCnt", TypeInformation.of(new TypeHint<Long>() {

? ? ? ? ? ? }));

? ? ? ? ? ? // 通過OperatorStateStore,定義OperatorState的StateDescriptor描述符

? ? ? ? ? ? ListStateDescriptor opStateDescriptor = new ListStateDescriptor("opCnt", TypeInformation.of(new TypeHint<Long>() {

? ? ? ? ? ? }));

? ? ? ? ? ? // 初始化keyed state狀態(tài)值

? ? ? ? ? ? keyedCntState = context.getKeyedStateStore().getState(valueStateDescriptor);

? ? ? ? ? ? // 初始化operator state狀態(tài)

? ? ? ? ? ? opCntState = context.getOperatorStateStore().getListState(opStateDescriptor);

? ? ? ? ? ? // 初始化本地變量operator state

? ? ? ? ? ? for (Long state : opCntState.get()) {

? ? ? ? ? ? ? ? opUserBehaviorCnt += state;

? ? ? ? ? ? }

? ? ? ? }

? ? }

? ? public static void main(String[] args) throws Exception {

? ? ? ? StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

? ? ? ? // 模擬數(shù)據(jù)源[userId,behavior,product]

? ? ? ? DataStreamSource<Tuple3<Long, String, String>> userBehaviors = env.fromElements(

? ? ? ? ? ? ? ? Tuple3.of(1L, "buy", "iphone"),

? ? ? ? ? ? ? ? Tuple3.of(1L, "cart", "huawei"),

? ? ? ? ? ? ? ? Tuple3.of(1L, "buy", "logi"),

? ? ? ? ? ? ? ? Tuple3.of(1L, "fav", "oppo"),

? ? ? ? ? ? ? ? Tuple3.of(2L, "buy", "huawei"),

? ? ? ? ? ? ? ? Tuple3.of(2L, "buy", "onemore"),

? ? ? ? ? ? ? ? Tuple3.of(2L, "fav", "iphone"));

? ? ? ? userBehaviors

? ? ? ? ? ? ? ? .keyBy(0)

? ? ? ? ? ? ? ? .flatMap(new UserBehaviorCnt())

? ? ? ? ? ? ? ? .print();

? ? ? ? env.execute("CheckpointFunctionExample");

? ? }

}


什么是狀態(tài)后端


上面使用的狀態(tài)都需要存儲到狀態(tài)后端(StateBackend)幢炸,然后在checkpoint觸發(fā)時,將狀態(tài)持久化到外部存儲系統(tǒng)拒贱。Flink提供了三種類型的狀態(tài)后端宛徊,分別是基于內(nèi)存的狀態(tài)后端(MemoryStateBackend、基于文件系統(tǒng)的狀態(tài)后端(FsStateBackend)以及基于RockDB作為存儲介質的RocksDB StateBackend逻澳。這三種類型的StateBackend都能夠有效地存儲Flink流式計算過程中產(chǎn)生的狀態(tài)數(shù)據(jù)闸天,在默認情況下Flink使用的是MemoryStateBackend,區(qū)別見下表斜做。下面分別對每種狀態(tài)后端的特點進行說明苞氮。

狀態(tài)后端的類別

MemoryStateBackend

MemoryStateBackend將狀態(tài)數(shù)據(jù)全部存儲在JVM堆內(nèi)存中,包括用戶在使用DataStream API中創(chuàng)建的Key/Value State瓤逼,窗口中緩存的狀態(tài)數(shù)據(jù)笼吟,以及觸發(fā)器等數(shù)據(jù)。MemoryStateBackend具有非嘲云欤快速和高效的特點贷帮,但也具有非常多的限制,最主要的就是內(nèi)存的容量限制诱告,一旦存儲的狀態(tài)數(shù)據(jù)過多就會導致系統(tǒng)內(nèi)存溢出等問題撵枢,從而影響整個應用的正常運行。同時如果機器出現(xiàn)問題精居,整個主機內(nèi)存中的狀態(tài)數(shù)據(jù)都會丟失锄禽,進而無法恢復任務中的狀態(tài)數(shù)據(jù)。因此從數(shù)據(jù)安全的角度建議用戶盡可能地避免在生產(chǎn)環(huán)境中使用MemoryStateBackend靴姿。Flink將MemoryStateBackend作為默認狀態(tài)后端沃但。

MemoryStateBackend比較適合用于測試環(huán)境中,并用于本地調試和驗證佛吓,不建議在生產(chǎn)環(huán)境中使用宵晚。但如果應用狀態(tài)數(shù)據(jù)量不是很大恨旱,例如使用了大量的非狀態(tài)計算算子,也可以在生產(chǎn)環(huán)境中使MemoryStateBackend.

FsStateBackend

FsStateBackend是基于文件系統(tǒng)的一種狀態(tài)后端坝疼,這里的文件系統(tǒng)可以是本地文件系統(tǒng),也可以是HDFS分布式文件系統(tǒng)谆沃。創(chuàng)建FsStateBackend的構造函數(shù)如下:

FsStateBackend(Path checkpointDataUri, boolean asynchronousSnapshots)

其中path如果為本地路徑钝凶,其格式為“file:///data/flink/checkpoints”,如果path為HDFS路徑唁影,其格式為“hdfs://nameservice/flink/checkpoints”耕陷。FsStateBackend中第二個Boolean類型的參數(shù)指定是否以同步的方式進行狀態(tài)數(shù)據(jù)記錄,默認采用異步的方式將狀態(tài)數(shù)據(jù)同步到文件系統(tǒng)中据沈,異步方式能夠盡可能避免在Checkpoint的過程中影響流式計算任務哟沫。如果用戶想采用同步的方式進行狀態(tài)數(shù)據(jù)的檢查點數(shù)據(jù),則將第二個參數(shù)指定為True即可锌介。

相比于MemoryStateBackend, FsStateBackend更適合任務狀態(tài)非常大的情況嗜诀,例如應用中含有時間范圍非常長的窗口計算,或Key/value State狀態(tài)數(shù)據(jù)量非常大的場景孔祸,這時系統(tǒng)內(nèi)存不足以支撐狀態(tài)數(shù)據(jù)的存儲隆敢。同時FsStateBackend最大的好處是相對比較穩(wěn)定,在checkpoint時崔慧,將狀態(tài)持久化到像HDFS分布式文件系統(tǒng)中拂蝎,能最大程度保證狀態(tài)數(shù)據(jù)的安全性。

RocksDBStateBackend

與前面的狀態(tài)后端不同惶室,RocksDBStateBackend需要單獨引入相關的依賴包温自。RocksDB 是一個 key/value 的內(nèi)存存儲系統(tǒng),類似于HBase皇钞,是一種內(nèi)存磁盤混合的 LSM DB悼泌。當寫數(shù)據(jù)時會先寫進write buffer(類似于HBase的memstore),然后在flush到磁盤文件鹅士,當讀取數(shù)據(jù)時會現(xiàn)在block cache(類似于HBase的block cache)券躁,所以速度會很快。

RocksDBStateBackend在性能上要比FsStateBackend高一些掉盅,主要是因為借助于RocksDB存儲了最新熱數(shù)據(jù)也拜,然后通過異步的方式再同步到文件系統(tǒng)中,但RocksDBStateBackend和MemoryStateBackend相比性能就會較弱一些趾痘。

需要注意 RocksDB 不支持同步的 Checkpoint慢哈,構造方法中沒有同步快照這個選項。不過 RocksDB 支持增量的 Checkpoint永票,也是目前唯一增量 Checkpoint 的 Backend卵贱,意味著并不需要把所有 sst 文件上傳到 Checkpoint 目錄滥沫,僅需要上傳新生成的 sst 文件即可。它的 Checkpoint 存儲在外部文件系統(tǒng)(本地或HDFS)键俱,其容量限制只要單個 TaskManager 上 State 總量不超過它的內(nèi)存+磁盤兰绣,單 Key最大 2G,總大小不超過配置的文件系統(tǒng)容量即可编振。對于超大狀態(tài)的作業(yè)缀辩,例如天級窗口聚合等場景下可以使會用該狀態(tài)后端。

配置狀態(tài)后端


Flink默認使用的狀態(tài)后端是MemoryStateBackend踪央,所以不需要顯示配置臀玄。對于其他的狀態(tài)后端,都需要進行顯性配置畅蹂。在Flink中包含了兩種級別的StateBackend配置:一種是在程序中進行配置健无,該配置只對當前應用有效;另外一種是通過flink-conf.yaml進行全局配置液斜,一旦配置就會對整個Flink集群上的所有應用有效累贤。

應用級別配置

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));

如果使用RocksDBStateBackend則需要單獨引入rockdb依賴庫,如下:

<dependency>

? ? <groupId>org.apache.flink</groupId>

? ? <artifactId>flink-statebackend-rocksdb_2.11</artifactId>

? ? <version>1.10.0</version>

? ? <scope>provided</scope></dependency>

使用方式與FsStateBackend類似,如下:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setStateBackend(new RocksDBStateBackend("hdfs://namenode:40010/flink/checkpoints"));

集群級別配置

具體的配置項在flink-conf.yaml文件中旗唁,如下代碼所示畦浓,參數(shù)state.backend指明StateBackend類型,state.checkpoints.dir配置具體的狀態(tài)存儲路徑检疫,代碼中使用filesystem作為StateBackend讶请,然后指定相應的HDFS文件路徑作為state的checkpoint文件夾。

# 使用filesystem存儲

state.backend:filesystem

# checkpoint存儲路徑

state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints

如果想用RocksDBStateBackend配置集群級別的狀態(tài)后端屎媳,可以使用下面的配置:

# 操作RocksDBStateBackend的線程數(shù)量夺溢,默認值為1

state.backend.rocksdb.checkpoint.transfer.thread.num: 1# 指定RocksDB存儲狀態(tài)數(shù)據(jù)的本地文件路徑

state.backend.rocksdb.localdir: /var/rockdb/checkpoints

# 用于指定定時器服務的工廠類實現(xiàn)類,默認為“HEAP”烛谊,也可以指定為“RocksDB”

state.backend.rocksdb.timer-service.factory: HEAP

什么是Checkpoint(檢查點)

上面講解了Flink的狀態(tài)以及狀態(tài)后端风响,狀態(tài)是存儲在狀態(tài)后端。為了保證state容錯丹禀,F(xiàn)link提供了處理故障的措施状勤,這種措施稱之為checkpoint(一致性檢查點)。checkpoint是Flink實現(xiàn)容錯的核心功能双泪,主要是周期性地觸發(fā)checkpoint持搜,將state生成快照持久化到外部存儲系統(tǒng)(比如HDFS)。這樣一來焙矛,如果Flink程序出現(xiàn)故障葫盼,那么就可以從上一次checkpoint中進行狀態(tài)恢復,從而提供容錯保障村斟。另外贫导,通過checkpoint機制抛猫,F(xiàn)link可以實現(xiàn)Exactly-once語義(Flink內(nèi)部的Exactly-once,關于端到端的exactly_once,Flink是通過兩階段提交協(xié)議實現(xiàn)的)。下面將會詳細分析Flink的checkpoint機制孩灯。

檢查點的生成

如上圖闺金,輸入流是用戶行為數(shù)據(jù),包括購買(buy)和加入購物車(cart)兩種峰档,每種行為數(shù)據(jù)都有一個偏移量掖看,統(tǒng)計每種行為的個數(shù)。

第一步:JobManager checkpoint coordinator 觸發(fā)checkpoint面哥。

第二步:假設當消費到[cart,3]這條數(shù)據(jù)時毅待,觸發(fā)了checkpoint尚卫。那么此時數(shù)據(jù)源會把消費的偏移量3寫入持久化存儲。

第三步:當寫入結束后尸红,source會將state handle(狀態(tài)存儲路徑)反饋給JobManager的checkpoint coordinator吱涉。

第四步:接著算子count buy與count cart也會進行同樣的步驟

第五步:等所有的算子都完成了上述步驟之后,即當 Checkpoint coordinator 收集齊所有 task 的 state handle外里,就認為這一次的 Checkpoint 全局完成了怎爵,向持久化存儲中再備份一個 Checkpoint meta 文件,那么整個checkpoint也就完成了盅蝗,如果中間有一個不成功鳖链,那么本次checkpoin就宣告失敗。

檢查點的恢復

通過上面的分析墩莫,或許你已經(jīng)對Flink的checkpoint有了初步的認識芙委。那么接下來,我們看一下是如何從檢查點恢復的狂秦。

任務失敗

重啟作業(yè)

恢復檢查點

繼續(xù)處理數(shù)據(jù)

上述過程具體總結如下:

第一步:重啟作業(yè)

第二步:從上一次檢查點恢復狀態(tài)數(shù)據(jù)

第三步:繼續(xù)處理新的數(shù)據(jù)


Flink內(nèi)部Exactly-Once實現(xiàn)


Flink提供了精確一次的處理語義灌侣,精確一次的處理語義可以理解為:數(shù)據(jù)可能會重復計算,但是結果狀態(tài)只有一個裂问。Flink通過Checkpoint機制實現(xiàn)了精確一次的處理語義侧啼,F(xiàn)link在觸發(fā)Checkpoint時會向Source端插入checkpoint barrier,checkpoint barriers是從source端插入的堪簿,并且會向下游算子進行傳遞痊乾。checkpoint barriers攜帶一個checkpoint ID,用于標識屬于哪一個checkpoint戴甩,checkpoint barriers將流邏輯是哪個分為了兩部分符喝。對于雙流的情況,通過barrier對齊的方式實現(xiàn)精確一次的處理語義甜孤。

關于什么是checkpoint barrier协饲,可以看一下CheckpointBarrier類的源碼描述畏腕,如下:

/**

* Checkpoint barriers用來在數(shù)據(jù)流中實現(xiàn)checkpoint對齊的.

* Checkpoint barrier由JobManager的checkpoint coordinator插入到Source中,

* Source會把barrier廣播發(fā)送到下游算子,當一個算子接收到了其中一個輸入流的Checkpoint barrier時,

* 它就會知道已經(jīng)處理完了本次checkpoint與上次checkpoint之間的數(shù)據(jù).

*

* 一旦某個算子接收到了所有輸入流的checkpoint barrier時,

* 意味著該算子的已經(jīng)處理完了截止到當前checkpoint的數(shù)據(jù)茉稠,

* 可以觸發(fā)checkpoint描馅,并將barrier向下游傳遞

*

* 根據(jù)用戶選擇的處理語義,在checkpoint完成之前會緩存后一次checkpoint的數(shù)據(jù)而线,

* 直到本次checkpoint完成(exactly once)

*

* checkpoint barrier的id是嚴格單調遞增的

*

*/

public class CheckpointBarrier extends RuntimeEvent {...}

可以看出checkpoint barrier主要功能是實現(xiàn)checkpoint對齊的铭污,從而可以實現(xiàn)Exactly-Once處理語義。

下面將會對checkpoint過程進行分解膀篮,具體如下:

圖1嘹狞,包括兩個流,每個任務都會消費一條用戶行為數(shù)據(jù)(包括購買(buy)和加購(cart))誓竿,數(shù)字代表該數(shù)據(jù)的偏移量磅网,count buy任務統(tǒng)計購買行為的個數(shù),coun cart統(tǒng)計加購行為的個數(shù)筷屡。

圖2涧偷,觸發(fā)checkpoint,JobManager會向每個數(shù)據(jù)源發(fā)送一個新的checkpoint編號毙死,以此來啟動檢查點生成流程燎潮。

圖3,當Source任務收到消息后扼倘,會停止發(fā)出數(shù)據(jù)确封,然后利用狀態(tài)后端觸發(fā)生成本地狀態(tài)檢查點,并把該checkpoint barrier以及checkpoint id廣播至所有傳出的數(shù)據(jù)流分區(qū)再菊。狀態(tài)后端會在checkpoint完成之后通知任務隅肥,隨后任務會向Job Manager發(fā)送確認消息。在將checkpoint barrier發(fā)出之后袄简,Source任務恢復正常工作腥放。

圖4,Source任務發(fā)出的checkpoint barrier會發(fā)送到與之相連的下游算子任務绿语,當任務收到一個新的checkpoint barrier時秃症,會繼續(xù)等待其他輸入分區(qū)的checkpoint barrier到來,這個過程稱之為barrier 對齊吕粹,checkpoint barrier到來之前會把到來的數(shù)據(jù)線緩存起來种柑。

圖5,任務收齊了全部輸入分區(qū)的checkpoint barrier之后匹耕,會通知狀態(tài)后端開始生成checkpoint聚请,同時會把checkpoint barrier廣播至下游算子。

圖6,任務在發(fā)出checkpoint barrier之后驶赏,開始處理因barrier對齊產(chǎn)生的緩存數(shù)據(jù)炸卑,在緩存的數(shù)據(jù)處理完之后,就會繼續(xù)處理輸入流數(shù)據(jù)煤傍。

圖7盖文,最終checkpoint barrier會被傳送到sink端,sink任務接收到checkpoint barrier之后蚯姆,會向其他算子任務一樣五续,將自身的狀態(tài)寫入checkpoint,之后向Job Manager發(fā)送確認消息龄恋。Job Manager接收到所有任務返回的確認消息之后疙驾,就會將此次檢查點標記為完成。

使用案例

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// checkpoint的時間間隔郭毕,如果狀態(tài)比較大荆萤,可以適當調大該值

env.enableCheckpointing(1000);

// 配置處理語義,默認是exactly-once

env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// 兩個checkpoint之間的最小時間間隔铣卡,防止因checkpoint時間過長,導致checkpoint積壓

env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

// checkpoint執(zhí)行的上限時間偏竟,如果超過該閾值煮落,則會中斷checkpoint

env.getCheckpointConfig().setCheckpointTimeout(60000);

// 最大并行執(zhí)行的檢查點數(shù)量,默認為1踊谋,可以指定多個蝉仇,從而同時出發(fā)多個checkpoint,提升效率

env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

// 設定周期性外部檢查點殖蚕,將狀態(tài)數(shù)據(jù)持久化到外部系統(tǒng)中轿衔,

// 使用該方式不會在任務正常停止的過程中清理掉檢查點數(shù)據(jù)

env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

總結

本文首先從Flink的狀態(tài)入手,通過Spark的WordCount和Flink的Work Count進行說明什么是狀態(tài)睦疫。接著對狀態(tài)的分類以及狀態(tài)的使用進行了詳細說明害驹。然后對Flink提供的三種狀態(tài)后端進行討論,并給出了狀態(tài)后端的使用說明蛤育。最后,以圖解加文字的形式詳細解釋了Flink的checkpoint機制,并給出了使用Checkpoint時的程序配置使碾。

最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末款筑,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子咕娄,更是在濱河造成了極大的恐慌亥揖,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,546評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件圣勒,死亡現(xiàn)場離奇詭異费变,居然都是意外死亡摧扇,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,224評論 3 395
  • 文/潘曉璐 我一進店門胡控,熙熙樓的掌柜王于貴愁眉苦臉地迎上來扳剿,“玉大人,你說我怎么就攤上這事昼激”诱溃” “怎么了?”我有些...
    開封第一講書人閱讀 164,911評論 0 354
  • 文/不壞的土叔 我叫張陵橙困,是天一觀的道長瞧掺。 經(jīng)常有香客問我,道長凡傅,這世上最難降的妖魔是什么辟狈? 我笑而不...
    開封第一講書人閱讀 58,737評論 1 294
  • 正文 為了忘掉前任,我火速辦了婚禮夏跷,結果婚禮上哼转,老公的妹妹穿的比我還像新娘。我一直安慰自己槽华,他們只是感情好壹蔓,可當我...
    茶點故事閱讀 67,753評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著猫态,像睡著了一般佣蓉。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上亲雪,一...
    開封第一講書人閱讀 51,598評論 1 305
  • 那天勇凭,我揣著相機與錄音,去河邊找鬼义辕。 笑死虾标,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的灌砖。 我是一名探鬼主播夺巩,決...
    沈念sama閱讀 40,338評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼周崭!你這毒婦竟也來了柳譬?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 39,249評論 0 276
  • 序言:老撾萬榮一對情侶失蹤续镇,失蹤者是張志新(化名)和其女友劉穎美澳,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,696評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡制跟,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,888評論 3 336
  • 正文 我和宋清朗相戀三年舅桩,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片雨膨。...
    茶點故事閱讀 40,013評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡擂涛,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出聊记,到底是詐尸還是另有隱情撒妈,我是刑警寧澤,帶...
    沈念sama閱讀 35,731評論 5 346
  • 正文 年R本政府宣布排监,位于F島的核電站狰右,受9級特大地震影響,放射性物質發(fā)生泄漏舆床。R本人自食惡果不足惜棋蚌,卻給世界環(huán)境...
    茶點故事閱讀 41,348評論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望挨队。 院中可真熱鬧谷暮,春花似錦、人聲如沸盛垦。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,929評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽情臭。三九已至,卻和暖如春赌蔑,著一層夾襖步出監(jiān)牢的瞬間俯在,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,048評論 1 270
  • 我被黑心中介騙來泰國打工娃惯, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留跷乐,地道東北人。 一個月前我還...
    沈念sama閱讀 48,203評論 3 370
  • 正文 我出身青樓趾浅,卻偏偏與公主長得像愕提,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子皿哨,可洞房花燭夜當晚...
    茶點故事閱讀 44,960評論 2 355

推薦閱讀更多精彩內(nèi)容