Flink入門學(xué)習(xí)

Flink 原理架構(gòu)

下圖是官網(wǎng)的一個架構(gòu)圖,有以下特點:
1:數(shù)據(jù)源有實時數(shù)據(jù)非實時數(shù)據(jù)后专,比如數(shù)據(jù)庫褪迟、文件系統(tǒng)等
2:

image.png

用來做什么

  • 處理有邊界和無邊界的數(shù)據(jù)


    image.png

    1:無邊界數(shù)據(jù)流(Unbounded streams)
    2:有邊界數(shù)據(jù)流(bounded Streams)

部署

可以和各種流行的分布式資源管理工具整合食磕,比如: Hadoop YARN, Apache Mesos, and Kubernetes

利用內(nèi)存計算

Flink 主要使用內(nèi)存來計算尽棕。
因此延遲低,
Flink 周期性的異步的把內(nèi)存中的狀態(tài)持久化彬伦,來保證出現(xiàn)故障的時候能夠保證 exactly-onece 的效果滔悉。


image.png

Flink的一些入門操作

Flink是什么

Flink 作為新一代流式大數(shù)據(jù)處理框架,已獲得阿里单绑、美團等諸多大廠的青睞
流處理并不是一個新概念氧敢,但是要做好并不是一件容易的事情。提到流處理询张,我們最先想到的可能是金融交易孙乖、信號檢測以及地圖導(dǎo)航等領(lǐng)域的應(yīng)用。但是近年來隨著信息技術(shù)的發(fā)展份氧,除了前面提到的三個領(lǐng)域唯袄,其它方向?qū)?shù)據(jù)時效性的要求也越來越高。隨著 Hadoop 生態(tài)的崛起蜗帜,Storm恋拷、Spark Streaming、Samza厅缺、MillWheel 等一眾流處理技術(shù)開始走入大眾視野蔬顾,但是我們最熟悉的應(yīng)該還是 Storm 和 Spark Steaming宴偿。
“高吞吐”、“低延遲”和”exactly-once“是衡量一個流處理框架的重要指標诀豁。 Storm 雖然提供了低延遲的流處理窄刘,但是在高吞吐方面的表現(xiàn)并不算佳,可以說基本滿足不了日益暴漲的數(shù)據(jù)量舷胜,而且也沒辦法保證精準一次消費娩践。Spark Streaming 中通過微批次的批處理來模擬流處理,只要當批處理的批次分的足夠小烹骨,那么從宏觀上來看就是流處理翻伺,這也是 Spark Steaming 的核心思想。通過微觀批處理的方式沮焕,Spark Streaming 也實現(xiàn)了高吞吐和 exactly-once 語義吨岭,時效性也有了大幅提升,在很長一段時間里占據(jù)流處理榜首峦树。但是受限于其實現(xiàn)方式辣辫,依然存在幾秒的延遲,對于那些實時性要求較高的領(lǐng)域來說依然不夠完美空入。 在這樣的背景下络它,F(xiàn)link 應(yīng)用而生族檬。
Apache Flink 是為分布式歪赢、高性能、隨時可用以及準確的流處理應(yīng)用程序打造的開源流處理框架单料,用于對無界和有界數(shù)據(jù)流進行有狀態(tài)計算埋凯。Flink 最早起源于在 2010 ~ 2014 年,由 3 所地處柏林的大學(xué)和歐洲的一些其它大學(xué)共同進行研究的名為 Stratosphere 的項目扫尖。2014 年 4 月 Stratosphere 將其捐贈給 Apache 軟件基 金會白对, 初始成員是 Stratosphere 系統(tǒng)的核心開發(fā)人員,2014 年 12 月换怖,F(xiàn)link 一躍成為 Apache 軟件基金會的頂級項目甩恼。在 2015 年,阿里也加入到了 Flink 的開發(fā)工作中沉颂,并貢獻了至少 150 萬行代碼条摸。
Flink 一詞在德語中有著“靈巧”、“快速”的意思铸屉,它的 logo 原型也是柏林常見的一種松鼠钉蒲,以身材嬌小、靈活著稱彻坛,為該項目取這樣的名字和選定這樣的 logo 也正好符合 Flink 的特點和愿景顷啼。
注意踏枣,雖然我們說 Flink 是一個流處理框架,但是它同樣可以進行批處理钙蒙。因為在 Flink 的世界觀里茵瀑,批處理是流處理的一種特殊形式,這和 Spark 不同仪搔,在 Spark 中瘾婿,流處理是通過大批量的微批處理實現(xiàn)的。

Flink入門的一些知識點

image.png

Flink本地安裝及啟動和任務(wù)提交

下載flink烤咧,到flink官網(wǎng)下載
然后:

$ tar -xzf flink-1.13.2-bin-scala_2.11.tgz
$ cd flink-1.13.2-bin-scala_2.11
啟動flink
$ ./bin/start-cluster.sh
提交任務(wù)
$ ./bin/flink run examples/streaming/WordCount.jar
$ tail log/flink-*-taskexecutor-*.out
  (nymph,1)
  (in,3)
  (thy,1)
  (orisons,1)
  (be,4)
  (all,2)
  (my,1)
  (sins,1)
  (remember,1)
  (d,4)
關(guān)閉集群
$ ./bin/stop-cluster.sh

Flink界面
http://localhost:8081/

image.png

搭建Flink開發(fā)環(huán)境

建立IDEA maven項目


image.png

image.png

在Java同級目錄下新建scala目錄偏陪,并設(shè)置為source folder


image.png

右鍵點擊項目,增加Scala支持
image.png

接下來建立兩個 Scala的 Object:


image.png

然后配置maven以來和build內(nèi)容:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>kenian.test</groupId>
    <artifactId>FlinkLearning</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.12</artifactId>
            <version>1.11.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>1.11.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- 該插件用于將 Scala 代碼編譯成 class 文件 -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.4.6</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>   
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass></mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

注意煮嫌,高版本的flink笛谦,需要增加下面的依賴才能夠在本地啟動

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.12</artifactId>
    <version>1.11.0</version>
</dependency>

注意,如果要連接kafka昌阿,可以加上

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>1.9.1</version>
</dependency>

編寫批處理代碼并啟動

package test.kenian

import org.apache.flink.api.scala._


object BatchWordCount {
  def main(args: Array[String]): Unit = {
    // 創(chuàng)建執(zhí)行環(huán)境
    val env = ExecutionEnvironment.getExecutionEnvironment
    // 從文本讀取數(shù)據(jù)
    val inputPath = "D:/ubuntu_linux_base/words.txt"
    val inputDS: DataSet[String] = env.readTextFile(inputPath)
    // 計算邏輯
    val wordCountDS: AggregateDataSet[(String, Int)] = inputDS
      .flatMap(_.split(" "))
      .map((_, 1))
      .groupBy(0)
      .sum(1)
    // 打印輸出
    wordCountDS.print()
  }
}

直接IDEA啟動饥脑,得到下面的結(jié)果


image.png

編寫流代碼并且啟動

package test.kenian

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object StreamWordCount {
  def main(args: Array[String]): Unit = {
    // 創(chuàng)建執(zhí)行環(huán)境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //  監(jiān)控Socket數(shù)據(jù)
    val textDstream: DataStream[String] = env.socketTextStream("localhost", 9999)
    // 導(dǎo)入隱式轉(zhuǎn)換
    import org.apache.flink.api.scala._
    // 計算邏輯
    val dataStream: DataStream[(String, Int)] = textDstream
      .flatMap(_.split(" "))
      .filter(_.nonEmpty)
      .map((_, 1))
      .keyBy(0)
      .sum(1)

    // 設(shè)置并行度
    dataStream.print().setParallelism(1)

    // 執(zhí)行
    env.execute("Socket stream word count")
  }
}

STEP1
    執(zhí)行 下面命令:
    nc -l -p 9999

STEP2:
    啟動Steaming代碼

STEP3
  在nc 命令下數(shù)據(jù)數(shù)據(jù),然后查看結(jié)果
  hello world 
  hello flink 
  hello spark 
  hello java

輸出結(jié)果如下:

(hello,1)
(flink,1)
(hello,2)
(spark,1)
(java,1)
(hello,3)

Flink算子

基礎(chǔ)算子

Map
Filter
Flatmap


image.png
package test.kenian.operator

import org.apache.flink.api.scala._

object BaseOperator {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val data = env.fromCollection(List(1, 2, 3, 4, 5))
    println("--------- map --------------")
    //map 算子
    val data2 = data.map( x => x * 10)
    data2.print()

    println("--------- filter --------------")
    // filter 算子
    val data3 = data.filter( x => x >= 3)
    data3.print()

    // flatmap 算子
    println("--------- flatmap --------------")
    // flatmap 最終會導(dǎo)致數(shù)據(jù)行數(shù)的改變懦冰,但是如果是map不會導(dǎo)致這兒問題
    val dataStr = env.fromCollection(List("a b","c d","e f"))
    val dataStr2 = dataStr.flatMap(_.split(" "))
    dataStr2.print()

    println("--------- map --------------")
    val dataStr1 = dataStr.map(_.split(" "))
    dataStr1.print()



  }
}

結(jié)果如下:
--------- map --------------
10
20
30
40
50
--------- filter --------------
3
4
5
--------- flatmap --------------
a
b
c
d
e
f
--------- map --------------
[Ljava.lang.String;@5ec46cdd
[Ljava.lang.String;@2324bfe7
[Ljava.lang.String;@112d1c8e

基于Key的算子

基于Key的算子分為三個大類:
? KeyBy
? Rolling Aggregation
o sum
o min
o max
o minBy
o maxBy
? reduce

KeyBy算子

該算子和其他算子一般一起使用灶轰,比如和Sum,看下面代碼

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object KeyByOperator {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val data: DataStream[String] = env.readTextFile("D:/ubuntu_linux_base/userlog.txt")

    val userLogStream: DataStream[UserLog] = data.map(fun = log => {
      val arr: Array[String] = log.split(" ")
      UserLog(arr(0), arr(1), arr(2).toInt)
    })

    userLogStream
      .keyBy("city")
      .sum("duration")
      .print()

    env.execute("Key By Operator")
  }


  case class UserLog(name: String, city: String, duration: Int)

}

執(zhí)行結(jié)果如下:

6> UserLog(Jack,Beijing,100)
16> UserLog(Joker,Shanghai,200)
18> UserLog(William,Chengdu,600)
18> UserLog(William,Chengdu,900)
6> UserLog(Jack,Beijing,500)
16> UserLog(Joker,Shanghai,400)

原始數(shù)據(jù)如下:

Jack Beijing 100
Bob Chengdu 300
William Chengdu 600
Lily Shanghai 200
Loius Beijing 400
Joker Shanghai 200

你可能會疑惑刷钢,輸出的不應(yīng)該是有兩列笋颤,city 和 sum(duration)嗎?請注意内地,我們這里的計算是流處理伴澄,而不是離線的批處理,我們創(chuàng)建的環(huán)境是 StreamExecutionEnvironment阱缓。程序從上往下一行一行讀取文本非凌,然后按照 city 字段分組,當執(zhí)行到第一行的時候荆针,只有它自己敞嗡,所以輸出自己本身。當執(zhí)行到第二行的時候航背,city為 Chengdu 的也只有一行喉悴,所以也輸出了它自己。當程序執(zhí)行到第三行的時候沃粗,第二個 Chengdu 出現(xiàn)了粥惧,所以 sum 的結(jié)果是 900(300 + 600)。當程序執(zhí)行到第四行的時候最盅,Shanghai 第一次出現(xiàn)突雪,所以也只有它自己起惕。當程序執(zhí)行到第 5 行的時候,第二個 Beijing 出現(xiàn)了咏删,所以輸出的是 500(100 + 400)惹想。當程序執(zhí)行到第 6 行,第二個 Shanghai 出現(xiàn)了督函,所以輸出的是 400(200 + 200)嘀粱。

maxBy算子

和max的區(qū)別是,max只會展示對應(yīng)值的最大值辰狡。但是maxBy會替換整行的值锋叨。
有點類似于,max是屬于開窗宛篇,獨立出來一個值用于存儲最大值娃磺。
MaxBy 是直接找到最大的那行并保留下來。

Reduce算子

代碼如下:

userLogStream
  .keyBy("city")
  .reduce((x, y) => {
    UserLog(y.name, y.city, x.duration + y.duration)
  }).print

可以對數(shù)據(jù)做出聚合的效果叫倍。

多流轉(zhuǎn)換算子

Union 合并數(shù)據(jù)流

作用是把多個流的數(shù)據(jù)合并到一起偷卧,比如從兩個系統(tǒng)取的同一種交易信息,則可以合并到一起進行處理吆倦。
樣例代碼听诸,健康兩個端口 9999 9998 ,使用nc命令發(fā)送消息蚕泽,然后把消息展示出來晌梨。查看效果

ackage test.kenian.operator

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object UnionOperator {
  def main(args: Array[String]): Unit = {
    // 創(chuàng)建執(zhí)行環(huán)境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 接收淘寶的訂單信息
    val taobaoOrder: DataStream[String] = env.socketTextStream("localhost", 9998)
    // 接收天貓的訂單信息
    val tianmaoOrder: DataStream[String] = env.socketTextStream("localhost", 9999)

    // 把兩個環(huán)境的信息合并
    val dataStream: DataStream[String] = taobaoOrder.union(tianmaoOrder)


    // 設(shè)置并行度

    // 設(shè)置并行度
    dataStream.print().setParallelism(1)

    // 執(zhí)行
    env.execute("Socket stream word count")
  }
}

Split select 分離數(shù)據(jù)流

Split 和 Select是完全想法的一種做法,
Split 可以把現(xiàn)有的一個數(shù)據(jù)流分為多個赛糟,然后通過select 可以單獨的查詢分裂出來的某個數(shù)據(jù)流派任。
這里有個小DEMO砸逊,就是根據(jù)溫度把不同的人放到不同的數(shù)據(jù)流里面璧南,比如說檢查出高危新冠人群。

package test.kenian.operator

import org.apache.flink.streaming.api.scala._

object SelectOperator {
  def main(args: Array[String]): Unit = {
    // 創(chuàng)建執(zhí)行環(huán)境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 接收天貓的訂單信息
    val dataStream: DataStream[String] = env.socketTextStream("localhost", 9999)

    val splitStream = dataStream.map(line => {
      val arr: Array[String] = line.split(" ")
      People(arr(0), arr(1).toFloat)
    }).split(people => {
      if (people.temperature > 36) Seq("fever") else Seq("normal")
    })

    val normal: DataStream[People] = splitStream.select("normal")
    val fever: DataStream[People] = splitStream.select("fever")

//    normal.print()
    fever.print()

    // 執(zhí)行
    env.execute("Socket Stream Split and Select")


  }


  case class People(name: String, temperature: Float)

}

以上代碼师逸,使用nc發(fā)送的數(shù)據(jù)個人的溫度大于36都會被上報

Source 和 Sink

Source

Flink自帶常見source有:
? env.readTextFile()
? env.socketTextStream()
? env.fromCollection()
? env.fromElement()
? env.generateSequence()

自定義source

但是實際工作中司倚,更加常見的是使用一些第三方的工具作為source,比如kafka
Kafka其實是自定義source的一種篓像。自定義source的原理如下:

package test.kenian.source

import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._

object Source {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val data = env.addSource(new MySource())

    data.setParallelism(1).print()


    env.execute()
  }

  /**
    * 自定義source动知,該source實現(xiàn)兩個方法
    * 分別是斷開方法和搜集數(shù)據(jù)方法
    *
    */
  class MySource extends SourceFunction[Integer] {
    var flag = true

    override def run(ctx: SourceFunction.SourceContext[Integer]): Unit = {
      var count = 0
      val list = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
      while (flag) {
        for (elem <- list) {
          ctx.collect(elem)  // 如果把數(shù)據(jù)導(dǎo)入到flink中,這是個橋梁员辩。如果自定義source盒粮,需要在這個地方進行控制,如何把數(shù)據(jù)放入到 sourceContext中
          count += 1

          if (count == 300) cancel()
        }
      }
    }

    override def cancel(): Unit = {
      flag = false
    }
  }


}

下面代碼中自定義了一個source奠滑,該source被flink流式處理丹皱。一個會發(fā)送300個數(shù)字過去妒穴。
然后在代碼中使用了該source。通過該source接收數(shù)據(jù)摊崭。

Kafka source

啟動kafka

到kafka的安裝目錄

nohup bin/zookeeper-server-start.sh config/zookeeper.properties > zookeeper.log 2>&1 &
nohup bin/kafka-server-start.sh config/server.properties  > kafka.log 2>&1 &
在kafka上建立topic
bin/kafka-topics.sh --zookeeper localhost:2181 --create --replication-factor 1 --partitions 1 --topic shiyanlou
在topic中寫入數(shù)據(jù)
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic shiyanlou

然后可以手寫的方式隨便寫一些

實驗是否可以通過命令讀取數(shù)據(jù)

bin/kafka-console-consumer.sh --topic shiyanlou --from-beginning --bootstrap-server localhost:9092
通過Java代碼讀取kafka

這里需要注意一個非常重要的點讼油,因為我的本地沒有配置遠程機器的別名什么的,導(dǎo)致沒法連接上他們的環(huán)境呢簸。因此需要配置kafka的這兩個參數(shù):

advertised.host.name=127.0.0.1
advertised.port=9092

這兩個參數(shù)要根據(jù)自己的實際情況重新配置矮台,我因為是本地因此配置的就是127.0.0.1
Java代碼如下:

import java.util.Properties

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer}
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.kafka.common.serialization.StringDeserializer

object KafkaSource {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    import org.apache.flink.streaming.api.scala._

    val prop = new Properties()
    prop.setProperty("bootstrap.servers", "localhost:9092")
    prop.setProperty("zookeeper.connect", "localhost:2181")
    prop.setProperty("key.deserializer", classOf[StringDeserializer].getName)
    prop.setProperty("value.deserializer", classOf[StringDeserializer].getName)
    prop.setProperty("auto.offset.reset", "latest")

    // 添加 Kafka Source
    val data = env.addSource(new FlinkKafkaConsumer[String]("shiyanlou", new SimpleStringSchema(), prop))

    data.print()
    env.execute()
  }
}

Sink

常見flink自帶sink

? writeAsText
? writeAsCsv
? writeToSocket

注意,生成文件的個數(shù)和并行度分區(qū)數(shù)相關(guān)

自定義sink

代碼如下根时,類似source瘦赫,flink和sink對象的溝通通過的是 SinkFunction.Context

import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object FlinkSink {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val data = env.readTextFile("D:/ubuntu_linux_base/score.txt")
    data.addSink(new MySink())
    env.execute()
  }

  class MySink extends SinkFunction[String] {
    override def invoke(value: String, context: SinkFunction.Context[_]): Unit = {
      val time = context.currentProcessingTime()
      val waterMark = context.currentWatermark()
      println(s"$value : $time : $waterMark")
    }
  }

}

我們可以從該對象中取出對應(yīng)的結(jié)果值,然后打印出來

Flink狀態(tài)管理

什么叫flink的狀態(tài)

有狀態(tài)的計算是流處理框架中的重要功能之一蛤迎,因為很多復(fù)雜的業(yè)務(wù)場景都會涉及到數(shù)據(jù)前后狀態(tài)耸彪。Flink 本身也是帶狀態(tài)的流處理引擎,如果你在此之前有看過 Flink 的官方文檔忘苛,應(yīng)該有注意到Stateful這個關(guān)鍵詞蝉娜。本節(jié)實驗我們就重點學(xué)習(xí) Flink 中的狀態(tài)(State)相關(guān)知識點。

狀態(tài)相關(guān)知識點

  • State 分類
    • Keyed State
    • Operator State
  • Checkpoint
  • StateBackend

State的兩種類型:Keyed State 和 Operate State

Operator State:Operator State 可以作用在所有算子上扎唾,每個算子中并行的 Task 都可以共享一個狀態(tài)召川,或者說同?個算?中的多個 Task 的狀態(tài)是相同的。但是請注意胸遇,算子狀態(tài)不能由相同或不同算子的另一個實例訪問荧呐。Operator State 支持三種基本數(shù)據(jù)結(jié)構(gòu),分別是:
o ListState:存儲列表類型的狀態(tài)纸镊。
o UnionListState:存儲列表類型的狀態(tài)倍阐。和 ListState 的區(qū)別是,如果發(fā)生故障逗威,ListState 會將該算子的所有并發(fā)的狀態(tài)實例進行匯總峰搪,然后均分給新的 Task;而 UnionListState 只是將所有并發(fā)的狀態(tài)實例匯總起來凯旭,具體的劃分行為則由用戶進行定義概耻。
BroadcastState:用于廣播的算子狀態(tài)。如果一個算子有多項任務(wù)罐呼,并且它的每項任務(wù)狀態(tài)又都相同鞠柄,這種情況就可以使用廣播狀態(tài)。
? Keyed State:Keyed State 是作用在 KeyedStream 上的嫉柴。從名稱中就可以看出來厌杜,它的特點是和 Key 強相關(guān)的。在任務(wù)處理中计螺,F(xiàn)link 為每個 Key 維護一個狀態(tài)實例夯尽,而且相同 Key 所對應(yīng)的數(shù)據(jù)都會被分配到同一個任務(wù)中執(zhí)行侧馅。Keyed State 支持五種基本數(shù)據(jù)結(jié)構(gòu),分別是:
o ValueState:保存單個 Value呐萌,可以針對該 Value 進行 get/set 操作馁痴。
o ListState:保存一個列表,列表中可以存儲多個 Value肺孤÷拊危可以針對列表進行 add、get赠堵、update 操作小渊。
o MapState:保存 Key-Value 類型的值∶0龋可以針對其進行 get酬屉、put、remove 操作揍愁,還可以使用 contains 判斷某個 key 是否存在呐萨。
o ReducingState:保存一個單一值,該值是添加到狀態(tài)的所有值聚合的結(jié)果莽囤。
o AggregatingState:保存一個單一值谬擦,該值是添加到狀態(tài)的所有值聚合的結(jié)果。與 ReducingState 有些不同朽缎,聚合類型可能不同于添加到狀態(tài)的元素的類型惨远。接口和 ListState 相同,但是使用 add(IN)添加的元素本質(zhì)是通過使用指定的 AggregateFunction 進行聚合话肖。

Flink 架構(gòu)

主要進程

JobManager 作業(yè)管理器

控制一個應(yīng)用程序執(zhí)行的主進程北秽,也就是說,每個應(yīng)用程序都會被一個不同的 JobManager 所控制執(zhí)行最筒。JobManager 會先接收到要執(zhí)行的應(yīng)用程序贺氓,這個應(yīng)用程序會包括:作業(yè)圖(JobGraph)、邏輯數(shù)據(jù)流圖(logical dataflow graph)和打包了所有的類是钥、庫和其它資源的 JAR 包掠归。JobManager 會把 JobGraph 轉(zhuǎn)換成一個物理層面的數(shù)據(jù)流圖缅叠,這個圖被叫做“執(zhí)行圖”(ExecutionGraph)悄泥,包含了所有可以并發(fā)執(zhí)行的任務(wù)。JobManager 會向資源管理器(ResourceManager)請求執(zhí)行任務(wù)必要的資源肤粱,也就是任務(wù)管理器(TaskManager)上的插槽(slot)弹囚。一旦它獲取到了足夠的資源,就會將執(zhí)行圖分發(fā)到真正運行它們的 TaskManager 上领曼。而在運行過程中鸥鹉,JobManager 會負責(zé)所有需要中央?yún)f(xié)調(diào)的操作蛮穿,比如說檢查點(checkpoints)的協(xié)調(diào)。

ResourceManager 資源管理器

主要負責(zé)管理任務(wù)管理器(TaskManager)的插槽(slot)毁渗,TaskManger 插槽是 Flink 中定義的處理資源單元践磅。Flink 為不同的環(huán)境和資源管理工具提供了不同資源管理器,比如 YARN灸异、Mesos府适、K8s,以及 standalone 部署肺樟。當 JobManager 申請插槽資源時檐春,ResourceManager 會將有空閑插槽的 TaskManager 分配給 JobManager。如果 ResourceManager 沒有足夠的插槽來滿足 JobManager 的請求么伯,它還可以向資源提供平臺發(fā)起會話疟暖,以提供啟動 TaskManager 進程的容器。另外田柔,ResourceManager 還負責(zé)終止空閑的 TaskManager俐巴,釋放計算資源。

TaskManager 任務(wù)管理器

Flink 中的工作進程硬爆。通常在 Flink 中會有多個 TaskManager 運行窜骄,每一個 TaskManager 都包含了一定數(shù)量的插槽(slots)。插槽的數(shù)量限制了 TaskManager 能夠執(zhí)行的任務(wù)數(shù)量摆屯。啟動之后邻遏,TaskManager 會向資源管理器注冊它的插槽;收到資源管理器的指令后虐骑,TaskManager 就會將一個或者多個插槽提供給 JobManager 調(diào)用准验。JobManager 就可以向插槽分配任務(wù)(tasks)來執(zhí)行了。在執(zhí)行過程中廷没,一個 TaskManager 可以跟其它運行同一應(yīng)用程序的 TaskManager 交換數(shù)據(jù)糊饱。

Dispatcher 以及分發(fā)器

可以跨作業(yè)運行,它為應(yīng)用提交提供了 REST 接口颠黎。當一個應(yīng)用被提交執(zhí)行時另锋,分發(fā)器就會啟動并將應(yīng)用移交給一個 JobManager。由于是 REST 接口狭归,所以 Dispatcher 可以作為集群的一個 HTTP 接入點夭坪,這樣就能夠不受防火墻阻擋。Dispatcher 也會啟動一個 Web UI过椎,用來方便地展示和監(jiān)控作業(yè)執(zhí)行的信息室梅。Dispatcher 在架構(gòu)中可能并不是必需的,這取決于應(yīng)用提交運行的方式。

Slots

每?個 TaskManager(worker)是?個 JVM 進程亡鼠,它可能會在獨?的線程上執(zhí)??個或多個 subtask赏殃。為了控制?個 worker 能接收多少個 task,worker 通過 task slot 來進?控制(?個 worker ?少有?個 task slot)间涵。每個 task slot 表示 TaskManager 擁有資源的?個固定??的?集仁热。假如?個 TaskManager 有三個 slot,那么它會將其管理的內(nèi)存分成三份給各個 slot勾哩。資源 slot 化意味著?個 subtask 將不需要跟來?其 他 job 的 subtask 競爭被管理的內(nèi)存股耽,取?代之的是它將擁有?定數(shù)量的內(nèi)存儲備。需要注意的是钳幅,這? 不會涉及到 CPU 的隔離物蝙,slot ?前僅僅?來隔離 task 的受管理的內(nèi)存。
通過調(diào)整 task slot 的數(shù)量敢艰,允許?戶定義 subtask 之間如何互相隔離诬乞。如果?個 TaskManager ?個 slot,那將意味著每個 task group 運?在獨?的 JVM 中(該 JVM 可能是通過?個特定的容器啟動的)钠导,? ?個 TaskManager 多個 slot 意味著更多的 subtask 可以共享同?個 JVM震嫉。?在同?個 JVM 進程中的 task 將 共享 TCP 連接(基于多路復(fù)?)和?跳消息。它們也可能共享數(shù)據(jù)集和數(shù)據(jù)結(jié)構(gòu)牡属,因此這減少了每個 task 的負載票堵。
Task Slot 是靜態(tài)的概念,是指 TaskManager 具有的并發(fā)執(zhí)?能?逮栅,可以通過參數(shù) taskmanager.numberOfTaskSlots 進?配置悴势,?并?度 parallelism 是動態(tài)概念,即 TaskManager 運? 程序時實際使?的并發(fā)能?措伐,可以通過參數(shù) parallelism.default 進?配置特纤。也就是說,假設(shè)?共有 3 個 TaskManager侥加,每?個 TaskManager 中分配 3 個 TaskSlot捧存,也就是每個 TaskManager 可以接收 3 個 task,?共 9 個 TaskSlot担败,如果我們設(shè)置 parallelism.default=1昔穴,即運?程序默認的并?度為 1,9 個 TaskSlot 只?了 1 個提前,有 8 個空閑吗货,因此,設(shè)置合適的并?度才能提?效率岖研。

image.png

窗口

Window分類

根據(jù)上游數(shù)據(jù)集的類型可以分為:
Keyed Window
Global Window

根據(jù)業(yè)務(wù)場景來分卿操,又可以分為:
Count Window
Time Window警检。

TIME WINDOW

滾動窗口(Tumbling Window)

滾動窗口是按照固定時間進行切分孙援,而且所有窗口之間的數(shù)據(jù)不會重疊害淤,使用時只需要指定一個窗口長度即可。


image.png

代碼樣例:
滾動窗口的窗口大型厥邸(window size)是固定的窥摄,而且相鄰窗口之間是連續(xù)的。現(xiàn)在有這樣的業(yè)務(wù)場:某公司要求每 10 秒統(tǒng)計一次最近 10 秒內(nèi)各個電商平臺的訂單數(shù)量并輸出到大屏幕础淤,這時候就需要用到滾動窗口了崭放,我們只需要將窗口大小設(shè)置為 10 秒就可以。我們使用 netcat 發(fā)送 Socket 數(shù)據(jù)來模擬訂單流量鸽凶。

//  監(jiān)控Socket數(shù)據(jù)
val textDstream: DataStream[String] = env.socketTextStream("localhost", 9999)

val dataStream: DataStream[(String, Int)] = textDstream
  .filter(_.nonEmpty)
  .map((_, 1))
  .keyBy(0)
  .timeWindow(Time.seconds(10))
  .sum(1)

滑動窗口(Sliding Window)

滑動窗口(Sliding Window):滑動窗口有兩個參數(shù)桌硫,分別是窗口大小和窗口滑動時間言津,它是允許不同窗口的元素重疊的(同一個元素可以出現(xiàn)在不同的窗口中)。窗口大小指定數(shù)據(jù)統(tǒng)計的時間跨度,而滑動時間指定的是相鄰兩個窗口時間的時間偏移量融击。當滑動時間小于窗口大小的時候,數(shù)據(jù)會發(fā)生重疊闹啦;當滑動窗口大于窗口大小的時候遣蚀,窗口會出現(xiàn)不連續(xù)的情況(部分元素不會納入統(tǒng)計);當滑動時間和窗口大小相等的時候姑食,滑動窗口就是滾動窗口波岛,從這個角度來看,滾動窗口是滑動窗口的一個特殊存在音半。

image.png

簡單來說则拷,會有數(shù)據(jù)的重復(fù)統(tǒng)計。不同的窗口里面曹鸠,會有交叉的數(shù)據(jù)隔躲。
滾動窗口屬于特殊滑動窗口

代碼和滾動窗口類似:多了個滑動時間

//  監(jiān)控Socket數(shù)據(jù)
val textDstream: DataStream[String] = env.socketTextStream("localhost", 9999)

val dataStream: DataStream[(String, Int)] = textDstream
  .filter(_.nonEmpty)
  .map((_, 1))
  .keyBy(0)
  .timeWindow(Time.seconds(10), Time.seconds(5))
  .sum(1)

會話窗口(Session Window)

與前滾動窗口和滑動窗口不同的是,會話窗口沒有固定的滑動時間和窗口大小物延,而是通過一個 session gap 來指定窗口間隔宣旱。如果在 session gap 規(guī)定的時間內(nèi)沒有活躍數(shù)據(jù)進入的話,則認為當前窗口結(jié)束叛薯,下一個窗口開始浑吟。session gap 可以理解為相鄰元素的最大時間差。

簡單理解下耗溜,類似Tomcat的session组力,只要在當前session下有數(shù)據(jù)過來,則session繼續(xù)保持抖拴;超過session規(guī)定等待時間燎字,則session失效腥椒;當前session失效后,如果有新的數(shù)據(jù)進來候衍,則建立新的session笼蛛。

把一個session內(nèi)的數(shù)據(jù)進行統(tǒng)計

image.png

代碼如下:

val dataStream: DataStream[(String, Int)] = textDstream
  .filter(_.nonEmpty)
  .map((_, 1))
  .keyBy(0)
  .window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
  .sum(1)

CountWindow

基于輸入數(shù)據(jù)量定義,與時間無關(guān)蛉鹿。Count Window 也可以細分為滾動窗口和滑動窗口滨砍,邏輯和 Time Window 中的滾動窗口和滑動窗口的邏輯類似,只是窗口大小和觸發(fā)條件由時間換成了相同 Key 元素的數(shù)量妖异。窗口大小是由相同 Key 元素的數(shù)量來觸發(fā)執(zhí)行惋戏,執(zhí)行時只計算元素數(shù)量達到窗口大小的 key 對應(yīng)的結(jié)果。

簡單來說他膳,key重復(fù)次數(shù)到達閾值响逢,則觸發(fā)計算。
適合業(yè)務(wù)那種規(guī)定某個消息出現(xiàn)多少次則計算或者上報一次

val dataStream: DataStream[(String, Int)] = textDstream
      .filter(_.nonEmpty)
      .map((_, 1))
      .keyBy(0)
//      .countWindow(3)  // 一個key出現(xiàn)了三次棕孙,則會觸發(fā)一次 針對該key的 計算舔亭。且只計算這3個key
      .countWindow(3,2) // 一個key出現(xiàn)了兩次,則觸發(fā)一次針對該key的計算散罕。且會計算該key最近3個key分歇。也就是說,會有重復(fù)統(tǒng)計的情況
      .sum(1)

Time和WaterMark

在流式數(shù)據(jù)處理中欧漱,如何保證數(shù)據(jù)的全局有序和 Exactly Once(精準一次消費)是非常重要的职抡。雖然數(shù)據(jù)在上游產(chǎn)生的時候是唯一并且有序的,但是數(shù)據(jù)從產(chǎn)生到進入 Flink 的過程中误甚,中間可能會由于負載均衡缚甩、網(wǎng)絡(luò)傳輸、分區(qū)等等原因造成數(shù)據(jù)亂序窑邦,為了應(yīng)對這種情況擅威,所以我們引入了時間語義和 WaterMark 的概念

內(nèi)容范圍
? Time
o Event Time (事件生成時間)
o Ingestion Time (事件接入時間)
o Processing Time (事件處理時間)
? Watermark
o Watermark 的概念
o Watermark 的使用

TIME時間語義

image.png

? Event Time:指的是事件產(chǎn)生的時間。通常由事件中的某個時間戳字段來表示冈钦,比如用戶登錄日志中所攜帶的時間字段郊丛、天氣信號檢測系統(tǒng)中采集到的天氣數(shù)據(jù)中所攜帶的時間字段。
? Ingestion Time:指的是事件進入 Flink 中的時間瞧筛。
? Processing Time:指的是時間被處理時的當前系統(tǒng)時間厉熟。比如某條數(shù)據(jù)進入 Flink 之后,我們運行到某個算子時的系統(tǒng)時間较幌,就是 Processing Time揍瑟。Processing Time 是 Flink 中的默認時間屬性
也就是說乍炉,我們窗口默認處理的時間其實是 Processing Time绢片。
但是有時候希望是 Ingestion Time滤馍。。底循。巢株。。而且從業(yè)務(wù)角度來說此叠,我們更加想使用 Event Time 纯续。随珠。灭袁。。窗看。茸歧。。显沈。软瞎。。拉讯。涤浇,可以參考下面代碼:

// 設(shè)置使用EventTime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// 設(shè)置使用IngestionTime
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

Watermark 原理

一般業(yè)務(wù)中以Event Time作為處理時間,但是魔慷,Event 產(chǎn)生的比較早只锭,到達flink的時間有可能比較遲。這個時候需要flink進行等待院尔。比如12:00:00 的event Time蜻展,F(xiàn)link等到12:00:05

WaterMark 代碼樣例

// 指定使用waterMark的方式處理處理時間,也就是指定EventTime邀摆,該時間是事件的業(yè)務(wù)日期
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val data: DataStream[UserLog] = env.socketTextStream("localhost", 9999)
  .map(line => {
    val arr = line.split(",")

    UserLog(arr(0).toLong, arr(1), arr(2), arr(3), arr(4).toLong)
  })

// 指定DataStream的WaterMark字段纵顾,并且以該字段作為基礎(chǔ)來處理數(shù)據(jù)
// 指定WaterMark字段以后,同時指定具體的等待時間
data.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[UserLog](Time.seconds(2)) {
  override def extractTimestamp(element: UserLog): Long = {
    element.time
  }
}).print()

Table API 和 SQL

image.png

首先是TableAPI栋盹,其實就是一種聲明式編程的方式施逾,類似于Spark的DataFrame和DataSet。
這是SQL的底層例获。

其實還是要加一些POM的依賴汉额,當然具體的版本根據(jù)自己的情況來

<dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.12</artifactId>
            <version>1.11.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.12</artifactId>
            <version>1.11.0</version>
        </dependency>


         <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_2.12</artifactId>
            <version>1.11.0</version>
        </dependency>

TableAPI

看下面代碼樣例,在Flink批處理中躏敢,使用對Table的一些簡單SELECT 操作闷愤。

//初始化Table API的上下文環(huán)境
val tableEnv = BatchTableEnvironment.create(env)

tableEnv
  .connect(new FileSystem().path("D:/ubuntu_linux_base/userlog.log"))
  .withFormat(new OldCsv())
  .withSchema(new Schema()
    .field("time", DataTypes.BIGINT())
    .field("action", DataTypes.STRING())
    .field("city", DataTypes.STRING())
    .field("IP", DataTypes.STRING())
    .field("user_id", DataTypes.BIGINT())
  )
  .createTemporaryTable("temp_userlog")

tableEnv.from("temp_userlog").printSchema()
val res = tableEnv.from("temp_userlog").select("city")
tableEnv.toDataSet[Row](res).print();

這里面讀取以逗號分隔的文件,且最終定義了多個列件余。
然后讀取文件的schema信息讥脐,和展示文件的某個列的數(shù)據(jù)遭居。
如果是流式處理,其實代碼差不多旬渠,看下面代碼:

import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types._


object StreamTableTest {
  def main(args: Array[String]): Unit = {
    import org.apache.flink.streaming.api.scala._
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(streamEnv)
    val data = streamEnv.socketTextStream("localhost", 9999)
      .map(line => {
        val arr = line.split(",")
        UserLog(arr(0).toLong, arr(1), arr(2), arr(3), arr(4).toLong)
      })

    val table: Table = tableEnv.fromDataStream(data)
    table.printSchema()

    val res = table
  .where('city === "北京" || 'city === "成都")
  .groupBy('city)
  .select('city, 'user_id.count as 'cnt )

tableEnv
  .toRetractStream[Row](res)
  .print()


    streamEnv.execute()
  }

  case class UserLog(time: Long, action: String, city: String, ip: String, user_id: Long)


區(qū)別不是很大俱萍,TableAPI 聲明式編程適合批處理和流式處理兩種情況。且處理的過程中告丢,可以使用各種查詢條件等枪蘑,非常方便。

Flink SQL

創(chuàng)建 MyFlinkSql object岖免。還是針對”過濾出城市為北京和成都的用戶岳颇,并分別統(tǒng)計這兩個城市中的用戶數(shù)量“這個業(yè)務(wù)邏輯,對應(yīng)到 Flink SQL 中的語法為:

object MyFlinkSql {

  def main(args: Array[String]): Unit = {
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(streamEnv)
    val data = streamEnv.socketTextStream("localhost", 9999)
      .map(line => {
        val arr = line.split(",")

        UserLog(arr(0).toLong, arr(1), arr(2), arr(3), arr(4))
      })

    val table = tableEnv.fromDataStream(data)
    //tableEnv.registerDataStream("temp_userlog", data) // 也可以直接在SQL中使用指定臨時表的名字的方式
    val res = tableEnv.sqlQuery(
      s"""
         |select
         |        city, count(user_id) as cnt
         |from
         |        $table
         |where
         |        city = '北京' or city = '成都'
         |group by
         |   city
         |""".stripMargin)

    tableEnv
      .toRetractStream[Row](res)
      .filter(_._1 == true)
      .print()

    tableEnv.execute("Table API")
  }

  case class UserLog(time: Long, action: String, city: String, ip: String, user_id: String)

}

可以通過sql的方式對流式的數(shù)據(jù)進行統(tǒng)計颅湘。

輸入下面數(shù)據(jù):

20210301121533,login,北京,118.128.11.31,0001
20210301121536,login,上海,10.90.113.150,0002
20210301121544,login,成都,112.112.31.33,0003
20210301121559,login,成都,101.132.93.24,0004
20210301121612,login,上海,189.112.89.78,0005
20210301121638,login,北京,113.52.101.50,0006
20210301121533,login,北京,118.128.11.31,0001
20210301121536,login,上海,10.90.113.150,0002
20210301121544,login,成都,112.112.31.33,0003
20210301121559,login,成都,101.132.93.24,0004
20210301121612,login,上海,189.112.89.78,0005
20210301121638,login,北京,113.52.101.50,0006
20210301121533,login,北京,118.128.11.31,0001
20210301121536,login,上海,10.90.113.150,0002
20210301121544,login,成都,112.112.31.33,0003
20210301121559,login,成都,101.132.93.24,0004
20210301121612,login,上海,189.112.89.78,0005
20210301121638,login,北京,113.52.101.50,0006

Flink處理結(jié)果如下:

13> (true,成都,1)
16> (true,北京,1)
13> (false,成都,1)
13> (true,成都,2)
16> (false,北京,1)
16> (true,北京,2)
16> (false,北京,2)
16> (true,北京,3)
13> (false,成都,2)
13> (true,成都,3)
13> (false,成都,3)
13> (true,成都,4)
16> (false,北京,3)
16> (true,北京,4)
16> (false,北京,4)
16> (true,北京,5)
13> (false,成都,4)
13> (true,成都,5)
13> (false,成都,5)
13> (true,成都,6)
16> (false,北京,5)
16> (true,北京,6)

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末话侧,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子闯参,更是在濱河造成了極大的恐慌瞻鹏,老刑警劉巖,帶你破解...
    沈念sama閱讀 222,590評論 6 517
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件鹿寨,死亡現(xiàn)場離奇詭異新博,居然都是意外死亡,警方通過查閱死者的電腦和手機脚草,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,157評論 3 399
  • 文/潘曉璐 我一進店門赫悄,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人玩讳,你說我怎么就攤上這事涩蜘。” “怎么了熏纯?”我有些...
    開封第一講書人閱讀 169,301評論 0 362
  • 文/不壞的土叔 我叫張陵同诫,是天一觀的道長。 經(jīng)常有香客問我樟澜,道長误窖,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 60,078評論 1 300
  • 正文 為了忘掉前任秩贰,我火速辦了婚禮霹俺,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘毒费。我一直安慰自己丙唧,他們只是感情好,可當我...
    茶點故事閱讀 69,082評論 6 398
  • 文/花漫 我一把揭開白布觅玻。 她就那樣靜靜地躺著想际,像睡著了一般培漏。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上胡本,一...
    開封第一講書人閱讀 52,682評論 1 312
  • 那天牌柄,我揣著相機與錄音,去河邊找鬼侧甫。 笑死珊佣,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的披粟。 我是一名探鬼主播咒锻,決...
    沈念sama閱讀 41,155評論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼僻爽!你這毒婦竟也來了虫碉?” 一聲冷哼從身側(cè)響起贾惦,我...
    開封第一講書人閱讀 40,098評論 0 277
  • 序言:老撾萬榮一對情侶失蹤胸梆,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后须板,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體碰镜,經(jīng)...
    沈念sama閱讀 46,638評論 1 319
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,701評論 3 342
  • 正文 我和宋清朗相戀三年习瑰,在試婚紗的時候發(fā)現(xiàn)自己被綠了绪颖。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,852評論 1 353
  • 序言:一個原本活蹦亂跳的男人離奇死亡甜奄,死狀恐怖柠横,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情课兄,我是刑警寧澤牍氛,帶...
    沈念sama閱讀 36,520評論 5 351
  • 正文 年R本政府宣布,位于F島的核電站烟阐,受9級特大地震影響搬俊,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜蜒茄,卻給世界環(huán)境...
    茶點故事閱讀 42,181評論 3 335
  • 文/蒙蒙 一唉擂、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧檀葛,春花似錦玩祟、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,674評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽庆聘。三九已至,卻和暖如春勺卢,著一層夾襖步出監(jiān)牢的瞬間伙判,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,788評論 1 274
  • 我被黑心中介騙來泰國打工黑忱, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留宴抚,地道東北人。 一個月前我還...
    沈念sama閱讀 49,279評論 3 379
  • 正文 我出身青樓甫煞,卻偏偏與公主長得像菇曲,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子抚吠,可洞房花燭夜當晚...
    茶點故事閱讀 45,851評論 2 361

推薦閱讀更多精彩內(nèi)容