spark大數(shù)據(jù)架構初學入門基礎詳解

Spark是什么

a)?是一種通用的大數(shù)據(jù)計算框架

b)?Spark Core?離線計算

Spark SQL?交互式查詢

Spark Streaming?實時流式計算

Spark MLlib?機器學習

Spark GraphX?圖計算

c)?特點:

i.?一站式:一個技術堆棧解決大數(shù)據(jù)領域的計算問題

ii.?基于內(nèi)存

d)?Spark2009年誕生于伯克利大學的AMPLab實驗室

2010年正式開源了Spark項目

2013年Spark成為Apache下的項目

2014年飛速發(fā)展苛吱,成為Apache的頂級項目

2015年在國內(nèi)興起,代替mr,hive,storm等

作者:辛湜(shi)

e)?Spark和Hive:

Spark優(yōu)點:

i.?速度快

ii.?Spark SQL支持大量不同的數(shù)據(jù)源

f)?Spark?和Storm

i.?計算模型不一樣

ii.?Spark吞吐量大

g)?特點:快援所,易用住拭,通用,兼容性

h)?spark運行模式

i.?local(本地)

ii.?standalone(集群)

iii.?on yarn(由?yarn作為資源調(diào)度Spark負責任務調(diào)度和計算)

iv.?on mesos(由mesos作為資源調(diào)度S)

v.?on cloud()

i)?配置步驟

=======================on yarn====================

【說明】

1.?spark任務運行在yarn上澈蟆,由yarn來進行資源調(diào)度和管理睹簇,spark只負責任務的調(diào)度?和計算

2.不需要配置和啟動spark集群

3.只需要在提交任務的節(jié)點上安裝并配置spark on yarn?模式

4.必須找一臺節(jié)點安裝spark

5.?步驟:

i.安裝配置JDK

ii.?vi spark-env.sh

1.?export ?JAVA_HOME=/opt/modules/jdk1.7_6.0

2.?export ?HADOOP_CONF_DIR = /opt/modules/hadoop/etc/hadoop

iii.測試spark on yarn?模式是否安裝成功

iv.網(wǎng)絡測試:http://hadoop-yarn1.beicai.com:8088

=====================sdandalone模式==============

【說明】

1.??spark運行在spark?集群上磨淌,由spark進行資源調(diào)度管理梁只,同時還負責任務的調(diào)度和?計算

2.需要配置和啟動spark集群

3.?步驟:

i.安裝配置JDK

ii.上傳并解壓Spark

iii.建立軟連接?ln -s spark spark?或者修改名稱

iv.?配置環(huán)境變量

v.安裝配置Spark,修改spark配置文件(spark-env.sh, slaves)

1.?vi spark-env.sh

a)?export ?JAVA_HOME=/opt/modules/jdk(jdk位置)

b)?export SPARK_MASTER_IP=hadoop-yarn1.beicai.com

c)?export SPARK_MASTER_PORT=7077

2.??vi slaves(用于指定在哪些節(jié)點上啟動worker)

a)?hadoop-yarn2.beicai.com

hadoop-yarn3.beicai.com

vi.將spark發(fā)送給其他主機

vii.?啟動

/opt/modules/spark/bin/start-all.sh

vii.?查看SparkUI界面:http://hadoop-yarn1.beicai.com:8080

4.?

j)?

一秋忙、Spark原理

1灰追、Spark的運行原理

i弹澎、分布式

Ii、主要基于內(nèi)存(少數(shù)情況基于磁盤)

Iii佩迟、迭代式計算

2音五、Spark?計算模式?VS ?MapReduce ?計算模式對比

Mr這種計算模型比較固定,只有兩種階段坚嗜,map階段和reduce階段苍蔬,兩個階段結束?后,任務就結束了格仲,這意味著我們的操作很有限凯肋,只能在map階段和reduce階段圈盔,?也同時意味著可能需要多個mr任務才能處理完這個job

Spark?是迭代式計算驱敲,一個階段結束后,后面可以有多個階段围辙,直至任務計算完?成姚建,也就意味著我們可以做很多的操作,這就是Spark計算模型比mr?強大的地方

三稿湿、什么是Spark RDD?

1涕俗、什么是RDD再姑?

彈性的,分布式的栖疑,數(shù)據(jù)集

(RDD在邏輯上可以看出來是代表一個HDFS上的文件,他分為多個分區(qū)瓜浸,散落?在Spark的多個節(jié)點上)

3杠巡、RDD----彈性

當RDD的某個分區(qū)的數(shù)據(jù)保存到某個節(jié)點上氢拥,當這個節(jié)點的內(nèi)存有限,保存不了這個?分區(qū)的全部數(shù)據(jù)時叁怪,Spark就會有選擇性的將部分數(shù)據(jù)保存到硬盤上,例如:當worker?的內(nèi)存只能保存20w條數(shù)據(jù)時血柳,但是RDD的這個分區(qū)有30w條數(shù)據(jù)难捌,這時候Spark就?會將多余的10w條數(shù)據(jù),保存到硬盤上去婴栽。Spark的這種有選擇性的在內(nèi)存和硬盤之間的權衡機制就是RDD的彈性特點所在

4愚争、Spark的容錯性

RDD最重要的特性就是,提供了容錯性鞍陨,可以自動的從失敗的節(jié)點上恢復過來缭裆,即如?果某個節(jié)點上的RDD partition(數(shù)據(jù)),因為節(jié)點的故障丟了缝其,那么RDD會自動的通過?自己的數(shù)據(jù)來源重新計算該partition内边,這一切對使用者來說是透明的

2、Spark的開發(fā)類型

(1)辉懒、核心開發(fā):離線批處理?/?演示性的交互式數(shù)據(jù)處理

(2)眶俩、SQL查詢:底層都是RDD和計算操作

(3)、底層都是RDD和計算操作

(4)线罕、機器學習

(5)、圖計算

3询件、Spark?核心開發(fā)(Spark-core == Spark-RDD)步驟

(1)宛琅、創(chuàng)建初始的RDD

(2)、對初始的RDD進行轉(zhuǎn)換操作形成新的RDD红伦,然后對新的RDD再進行操作,直?至操作計算完成

(3)舌缤、將最后的RDD的數(shù)據(jù)保存到某種介質(zhì)中(hive陵吸、hdfs壮虫,MySQL、hbase...)

五饶唤、Spark原理

Driver募狂,Master祸穷,Worker雷滚,Executor,Task各個節(jié)點之間的聯(lián)系

Spark中的各節(jié)點的作用:

1绊含、driver的作用:

(1)躬充、 向master進行任務的注冊

(2)、構建運行任務的基本環(huán)境

(3)盈蛮、接受該任務的executor的反向注冊

(4)抖誉、向?qū)儆谠撊蝿盏膃xecutor分配任務

2、什么是driver樊零?

我們編寫的程序打成jar包后夺艰,然后找一臺能夠連接spark集群的節(jié)點做任務的driver,具體的表現(xiàn)為SparkSubmit

3霞势、Master的作用:

(1)愕贡、監(jiān)控集群固以;

(2)、動態(tài)感知worker的上下線篙螟;

(3)遍略、接受driver端注冊請求绪杏;

(4)势似、任務資源的調(diào)度

4、Worker的作用:

(1)、定時向master匯報狀態(tài)杯拐;

(2)端逼、接受master資源調(diào)度命令,進行資源的調(diào)度

(3)、啟動任務的容器Executor

5仅醇、Executor的作用:

(1)、保存計算的RDD分區(qū)數(shù)據(jù)叶摄;

(2)蛤吓、向Driver反向注冊樊破;

(3)、接受Driver端發(fā)送來的任務Task顺少,作用在RDD上進行執(zhí)行

Spark 編程的流程:

1脆炎、我們編寫的程序打包成jar包秒裕,然后調(diào)用Spark-Submit?腳本做任務的提交

2几蜻、啟動driver做任務的初始化

3、Driver會將任務極其參數(shù)(core体斩,memory梭稚,driver相關的參數(shù))進行封裝成ApplicationDescript通過taskSchedulerImpl?提交給Master

4、Master接受到driver端注冊任務請求時絮吵,會將請求參數(shù)進行解析弧烤,并封裝成APP,然后進行持久化蹬敲,并且加入到其任務隊列中的waitingAPPs

5闹究、當輪到咱們提交的任務運行時价认,master會調(diào)用schedule()這個方法,做任務資源調(diào)度

6梅誓、Master將調(diào)度好的資源封裝成launchExecutor,發(fā)送給指定的worker

7、Worker接收到發(fā)送來的launchExecutor時著摔,會將其解析并封裝成ExecutorRunner倡鲸,然后調(diào)用start方法劝赔,啟動Executor

8赫粥、Executor啟動后,會向任務的Driver進行反向注冊

9利凑、當屬于這個任務的所有executor啟動成功并反向注冊完之后膨报,driver會結束SparkContext對象的初始化

10丈氓、當sc?初始化成功后,意味著運行任務的基本環(huán)境已經(jīng)準備好了置媳,driver會繼續(xù)運行我們編寫好的代碼

11、開始注冊初始的RDD膘掰,并且不斷的進行轉(zhuǎn)換操作拦耐,當觸發(fā)了一個action算子時冈欢,意味著觸發(fā)了一個job宴霸,此時driver就會將RDD之間的依賴關系劃分成一個一個的stage尉剩,并將stage封裝成taskset胎挎,然后將taskset中的每個task進行序列化,封裝成launchtask,發(fā)送給指定的executor執(zhí)行

12页响、Executor接受到driver發(fā)送過來的任務task贴彼,會對task進行反序列化,然后將對應的算子(flatmap,map,reduceByKey约计。。。。)作用在RDD分區(qū)上

六玄组、RDD詳解

1、什么是RDD丸升?

RDD(Resilient Disttibuted Dataset)叫做彈性的分布式的數(shù)據(jù)集,是spark中最基本的數(shù)據(jù)抽象么抗,它代表一個不可變缸沃,可分區(qū)吨枉,里面的元素可并行計算的集合

2、RDD的特點:

自動容錯

位置感知性調(diào)度

伸縮性

3拆又、RDD的屬性:

(1)制轰、一組分片(partition),即數(shù)據(jù)集的基本組成單位鳞仙。對于RDD來說寇蚊,每個分片都會被一個計算任務處理,并決定并行計算的粒度繁扎,用戶可以在創(chuàng)建RDD時指定RDD的分片個數(shù)幔荒,如果沒有指定糊闽,那么就會采用默認值梳玫,默認值就是程序所分配到的CPU Core的數(shù)目

(2)、一個計算每個分區(qū)的函數(shù)右犹。Spark中RDD的計算是以分片為單位的提澎,每個RDD都會實現(xiàn)computer函數(shù)以達到這個目的。Computer函數(shù)會對迭代器進行復合念链,不需要保存每次計算的結果盼忌。

(3)、RDD之間的依賴關系掂墓。RDD的每次轉(zhuǎn)換都會生成一個新的RDD谦纱,所以RDD之間就會形成類似于流水一樣的前后依賴關系。在部分分區(qū)數(shù)據(jù)丟失時君编,Spark可以通過這個依賴關系重新計算丟失的分區(qū)數(shù)據(jù)跨嘉,而不是對RDD的所有分區(qū)進行重新計算。

(4)吃嘿、一個partition祠乃,即RDD的分片函數(shù)。當前Spark中實現(xiàn)了兩種類型的分片函數(shù)兑燥,一個是基于hashPartitioner亮瓷,另外一個是基于范圍的RangePartitioner。只有對于key-value的RDD降瞳,才會有Partitioner嘱支,非key-value的RDD的Partitioner的值是None。Partitioner函數(shù)不但決定了RDD本身的分片數(shù)量挣饥,也決定了partition RDD Shuffle輸出時的分片數(shù)量斗塘。

(5)、一個列表亮靴,存儲存取每個Partition的優(yōu)先位置(preferred location)馍盟。對于一個HDFD文件來說。這個列表保存的就是每個Partition所在的快的位置茧吊。按照“移動數(shù)據(jù)不如移動計算”的理念贞岭。Spark在進行任務調(diào)度的時候惫叛,會盡可能的將計算任務分配到所要處理數(shù)據(jù)塊的存儲位置。

4鳖轰、RDD的創(chuàng)建:

進行Spark核心編程時洒宝,首先要做的事就是創(chuàng)建一個初始的RDD。Spark Core提供了三種創(chuàng)建RDD的方式:

(1)芯侥、使用程序中的集合創(chuàng)建RDD?(調(diào)用parallelize()方法)

(2)泊交、使用本地文件創(chuàng)建RDD(調(diào)用textFile()方法)

(3)、使用HDFD文件創(chuàng)建RDD ?(調(diào)用textFile()方法)

七柱查、算子

1廓俭、什么是算子?

是RDD中定義的作用在每一個RDD分片上的函數(shù)唉工,可以對RDD中的數(shù)據(jù)進行轉(zhuǎn)換?和操作

2研乒、RDD算子的分類

(1)、Transformation算子淋硝,這類算子變換不觸發(fā)提交作業(yè)(特點就是lazy特性)

返回的是一個RDD

(2)雹熬、Action算子,這類算子會觸發(fā)SparkContext提交作業(yè)(觸發(fā)一個spark job的運行谣膳,從而觸發(fā)這個action之前所有的transformation的執(zhí)行)

返回的是一個spark對象

3竿报、常用的Transformation算子

八、RDD分區(qū)排序

I继谚、分區(qū)

兩種實現(xiàn)方式:coalesce和?repartition(底層調(diào)用coalesce)

coalesce(numPartitons,isShuffle)

第一個參數(shù)是重分區(qū)后的數(shù)量烈菌,第二個參數(shù)是是否進行shuffle

如果原來有N個分區(qū),重分區(qū)后有M個分區(qū)

如果?M > N ,必須將第二參數(shù)設置為true(也就是進行shuffle),等價于?repartition(numPartitons) ???如果是false將不起作用 ?

如果M < N

100-->10?重分區(qū)后的分區(qū)數(shù)比原來的小的多犬庇,那么久需要使用shuffle僧界,也即是設置為true

100-->90?重分區(qū)后的分區(qū)數(shù)和原來的差不多的,那么就不需要使用shuffle臭挽,也就是設置為false

II捂襟、排序

sortBy(x => x)這個算子中帶有隱式轉(zhuǎn)換參數(shù)

x?能夠排序(比較大小)欢峰,那么這個類就必須有比較大小的功能葬荷,也就是實現(xiàn)了compareTo?或者compare

實現(xiàn)二次排序有兩種方法:

1、繼承Comparable?接口 或者?Ordered

2纽帖、隱式轉(zhuǎn)換:可以定義隱式轉(zhuǎn)換函數(shù)(Ordered)或者隱式轉(zhuǎn)換值(Ordering)

九宠漩、自定義分區(qū)

自定義分區(qū)

要求:按照key將對應的value輸出到指定的分區(qū)中

解釋:自定義一個自定義分區(qū)類,繼承partitioner懊直,實現(xiàn)他的兩個方法

1扒吁、numPartitions

2、getPartition

具體的功能根據(jù)項目的要求自定義實現(xiàn)室囊,然后調(diào)用partitionBy方法雕崩,new出自定義的類魁索,傳入?yún)?shù)即可

九、RDD持久化原理

1盼铁、持久化場景:對于一個rdd會被多次引用到粗蔚,并且這個rdd計算過程復雜,計算時間特變耗時

2饶火、如何進行持久化鹏控,調(diào)用rdd.persist方法或cache方法,cache方法底層就是調(diào)用persist方法

******************persist(StorageLevel.MEMORY_ONLY)*******************

如果對RDD做持久化肤寝,默認持久化級別是storageLevel.MEMORY_ONLY ,也就是持久化到內(nèi)存中去当辐,這種持久化級別是效率最快的,但是由于是純Java?對象醒陆,保存到內(nèi)存中瀑构,那么內(nèi)存可能保存的數(shù)量就會較少

***************persist(StorageLevel.MEMORY_ONLY_SER)****************

如果當我們集群資源有限時裆针,那么我們可以采用MEMORY_ONLY_SER刨摩,也就是將Java對象進行序列化之后持久到內(nèi)存中去,這種持久化的好處是能夠持久化更多的數(shù)據(jù)到內(nèi)存中世吨,但是由于在持久化時需要序列化澡刹,取出來之后又需要反序列化這一過程,這個過程會消耗CPU計算資源耘婚,性能相對于MEMORY_ONLY?這種持久化級別來說稍微弱點罢浇,但是還是比較高效的

3、如何選擇RDD持久化策略沐祷?

Spark提供的多種持久化級別嚷闭,主要是為了在CPU和內(nèi)存消耗之間進行取舍,下面是一些通用的持久化級別的選擇建議:

1)赖临、優(yōu)先使用MEMORY_ONLY胞锰,如果可以緩存所有數(shù)據(jù)的話,那么就使用這種策略兢榨,因為純內(nèi)存速度最快嗅榕,而且沒有序列化,不需要消耗CPU進行反序列化操作

2)吵聪、如果MEMORY_ONLY策略凌那,無法存儲所有數(shù)據(jù)的話,那么使用MEMORY_ONLY_SER吟逝,將數(shù)據(jù)進行序列化存儲帽蝶,純內(nèi)存操作還是非常快的块攒,只是要消耗CPU進行反序列化

3)励稳、如果需要進行快速的失敗恢復金砍,那么就選擇帶后綴為_2的策略,進行數(shù)據(jù)的備份麦锯,這樣在失敗時恕稠,就不需要重新計算了

4、能不使用DISK相關的策略扶欣,就不要使用鹅巍,有的時候,從磁盤讀取數(shù)據(jù)料祠,還不如重新計算一次

十一骆捧、共享變量

1、共享變量分為兩種:廣播變量?和?累加器

廣播變量(broadcast)

2髓绽、日常所遇問題

因為每個task都需要拷貝這樣的一個副本到executor去執(zhí)行敛苇,那么我們可以想象一下,如果有1000?個task在某個worker上執(zhí)行顺呕,而這個副本有100M枫攀,那么意味著我們需要拷貝100G的數(shù)據(jù)都到某個worker上執(zhí)行,這樣的話會大大消耗我們的網(wǎng)絡流量株茶,同時會加大executor的內(nèi)存消耗来涨,從而增加了我們spark作業(yè)的運行時間,大大降低了spark作業(yè)的運行效率启盛,增加了作業(yè)失敗的概率

3蹦掐、如何解決以上問題,也就是說什么時候使用廣播變量?

當RDD引用到了一個外部變量并且這個外部變量數(shù)據(jù)量不小僵闯,同時這個RDD對應的task數(shù)量特別多卧抗,那么此時使用廣播共享變量再合適不過了

我們可以將這種大的外部變量做成廣播變量,外部變量做成廣播變量的時候鳖粟,那么每個executor的內(nèi)存中只會有一個外部變量社裆,而這個副本針對所有的task都是共享的,這樣的話就減少了網(wǎng)絡流量消耗牺弹,降低了executor的內(nèi)存消耗浦马,提高了spark作業(yè)運行效率和縮短了運行時間,同時降低了作業(yè)失敗的概率

4张漂、廣播變量的使用流程:

1)晶默、某個executor的第一個task先執(zhí)行,首先會從自己的blockManager中查找外部變量航攒,如果沒有就從鄰居的executor的blockManager的內(nèi)存中獲取這個外部變量磺陡,如果還是獲取不到,就從driver端獲取,拷貝這個外部變量到本地的executor的blockManager

2)币他、當這個executor的其他task執(zhí)行時坞靶,就不需要從外面獲取這個外部變量的副本,直接從本地的blockManager中獲取即可

5蝴悉、如何獲取廣播變量的值彰阴?

可以直接調(diào)用廣播變量的value()?這個方法即可

【注意】廣播變量是只讀的,不可寫

累加器(Accumulator)

Spark提供的Accumulator?拍冠,主要用于多個節(jié)點對一個變量進行共享性的操作尿这,Accumulator只是提供了累加的功能。但是卻給我們提供了多個task對一個變量并行操作的功能庆杜,但是task只能對Accumulator進行累加操作

【注意】task只能對Accumulator進行類加操作射众,只有Driver程序可以讀取Accumulator的值

RDD分區(qū)和容錯機制講解

1、RDD?的Lineage血統(tǒng)

RDD只支持粗粒度轉(zhuǎn)換晃财,即在大量記錄上執(zhí)行的單個操作叨橱,將創(chuàng)建RDD的一系列Lineage(血統(tǒng))記錄下來。以便恢復丟失的分區(qū)

2断盛、RDD的依賴關系

RDD和它的父RDD的關系有兩種不同的類型:

1)罗洗、窄依賴(一對一,多對一)

形象的比喻:獨生子女

2)郑临、寬依賴(多對多)

形象的比喻:超生

注釋:劃分stage的依據(jù)就是寬依賴栖博,也就是RDD之間是否有shuffle屑宠,shuffle過程就是一個寬依賴過程厢洞,shuffle之前的tasks就屬于一個stage,shuffle之后的也屬于一個stage典奉,shuffle之前和之后的操作都是窄依賴

【注意】shuffle過程分為:shuffle Write過程 和?shuffle read過程

4躺翻、DAG的生成(有向無環(huán)圖)和任務的劃分

DAG(Directed Acyclic Graph)叫做有向無環(huán)圖(有方向無循環(huán)的圖)

5、一個wordCount過程會產(chǎn)生多少個RDD卫玖?

至少會產(chǎn)生五個RDD公你,

第一個,從HDFS中加載后得到一個RDD(即使用sc.textFile()算子)假瞬,即HadoopRDD

在sc.textFile()過程中還會產(chǎn)生一個RDD(調(diào)用map算子)陕靠,產(chǎn)生一個MapPartitionRDD

第二個,使用flatMap算子脱茉,得到一個MapPartitionRDD

第三個剪芥,使用map算子,得到一個MapPartitionRDD

第四個琴许,使用reduceByKey算子税肪,也就是在經(jīng)過了shuffle過程后又會得到一個shuffledRDD

第五個,使用saveAsTextFile算子,再產(chǎn)生一個MapPartitionRDD?

spark程序提交流程講解

Spark任務簡介:

Spark-submit--->SparkSubmit-->main-->submit-->doRunMain-->RunMain-->通過反射創(chuàng)建我們編寫的主類的實例對象益兄,調(diào)用main方法-->開始執(zhí)行我們編寫的代碼-->初始化SparkContext對象-->創(chuàng)建初始的RDD-->觸發(fā)action算子-->提交job-->worker執(zhí)行任務-->任務結束

Spark任務詳解:

1)锻梳、將我們編寫的程序打成jar包

2)、調(diào)用spark-submit腳本提交任務到集群上運行

3)净捅、運行sparkSubmit的main方法疑枯,在這個方法中通過反射的方式創(chuàng)建我們編寫的主類的實例對象,然后調(diào)用main方法蛔六,開始執(zhí)行我們的代碼(注意神汹,我們的spark程序中的driver就運行在sparkSubmit進程中)

4)、當代碼運行到創(chuàng)建SparkContext對象時古今,那就開始初始化SparkContext對象了

5)屁魏、在初始化SparkContext對象的時候,會創(chuàng)建兩個特別重要的對象捉腥,分別是:DAGScheduler

和TaskScheduler

【DAGScheduler的作用】將RDD的依賴切分成一個一個的stage氓拼,然后將stage作為taskSet提交給DriverActor

6)、在構建taskScheduler的同時抵碟,會創(chuàng)建兩個非常重要的對象桃漾,分別是DriverActor和ClientActor

【clientActor的作用】向master注冊用戶提交的任務

【DriverActor的作用】接受executor的反向注冊,將任務提交給executor

7)拟逮、當clientActor啟動后撬统,會將用戶提交的任務和相關的參數(shù)封裝到ApplicationDescription對象中,然后提交給master進行任務的注冊

8)敦迄、當master接受到clientActor提交的任務請求時恋追,會將請求參數(shù)進行解析,并封裝成Application罚屋,然后將其持久化苦囱,然后將其加入到任務隊列waitingApps中

9)、當輪到我們提交的任務運行時脾猛,就開始調(diào)用schedule()撕彤,進行任務資源的調(diào)度

10)、master將調(diào)度好的資源封裝到launchExecutor中發(fā)送給指定的worker

11)猛拴、worker接受到Maseter發(fā)送來的launchExecutor時羹铅,會將其解壓并封裝到ExecutorRunner中,然后調(diào)用這個對象的start(),?啟動Executor

12)愉昆、Executor啟動后會向DriverActor進行反向注冊

13)职员、driverActor會發(fā)送注冊成功的消息給Executor

14)、Executor接受到DriverActor注冊成功的消息后會創(chuàng)建一個線程池撼唾,用于執(zhí)行DriverActor發(fā)送過來的task任務

15)廉邑、當屬于這個任務的所有的Executor啟動并反向注冊成功后哥蔚,就意味著運行這個任務的環(huán)境已經(jīng)準備好了,driver會結束SparkContext對象的初始化蛛蒙,也就意味著new SparkContext這句代碼運行完成

16)糙箍、當初始化sc成功后,driver端就會繼續(xù)運行我們編寫的代碼牵祟,然后開始創(chuàng)建初始的RDD深夯,然后進行一系列轉(zhuǎn)換操作,當遇到一個action算子時诺苹,也就意味著觸發(fā)了一個job

17)咕晋、driver會將這個job提交給DAGScheduler

18)、DAGScheduler將接受到的job收奔,從最后一個算子向前推導掌呜,將DAG依據(jù)寬依賴劃分成一個一個的stage,然后將stage封裝成taskSet坪哄,并將taskSet中的task提交給DriverActor

19)质蕉、DriverActor接受到DAGScheduler發(fā)送過來的task,會拿到一個序列化器翩肌,對task進行序列化模暗,然后將序列化好的task封裝到launchTask中,然后將launchTask發(fā)送給指定的Executor

20)念祭、Executor接受到了DriverActor發(fā)送過來的launchTask時兑宇,會拿到一個反序列化器,對launchTask進行反序列化粱坤,封裝到TaskRunner中隶糕,然后從Executor這個線程池中獲取一個線程,將反序列化好的任務中的算子作用在RDD對應的分區(qū)上

【注意】

Spark的任務分為為兩種:

a比规、shuffleMapTask:shuffle之前的任務

b若厚、resultTask:shuffle之后的任務

Spark任務的本質(zhì):

將RDD的依賴關系切分成一個一個的stage,然后將stage作為TaskSet分批次的發(fā)送到Executor上執(zhí)行

十三蜒什、Checkpoint

1、使用checkpoint的場景:

某個RDD會被多次引用疤估,計算特別復雜灾常,計算特別耗時

擔心中間某些關鍵的,在后面會反復幾次使用的RDD铃拇,可能會因為節(jié)點的故障钞瀑,導致持久化數(shù)據(jù)的丟失

2、如何對RDD進行checkpoint慷荔?

1)雕什、設置還原點目錄,設置checkpoint目錄

2)、調(diào)用RDD的checkpoint的方法對該RDD進行checkpoint

3贷岸、checkpoint的原理

1)壹士、RDD調(diào)用了checkpoint方法之后,就接受RDDCheckpointData對象的管理

2)偿警、RDDCheckpointData對象會負責將調(diào)用了checkpoint的RDD?的狀態(tài)設置為MarkedForCheckpoint

3)躏救、當這個RDD所在的job運行結束后,會調(diào)用最后一個RDD的doCheckpoint螟蒸,根據(jù)其血統(tǒng)向上查找盒使,查找到被標注為MarkedForCheckpoint狀態(tài)的RDD,將其狀態(tài)改變?yōu)閏heckpointingInProgress

4)七嫌、啟動一個單獨的job少办,將血統(tǒng)中標記為checkpointingInProgress的RDD進行checkpoint,也就是將RDD的數(shù)據(jù)寫入到checkpoint的目錄中去

5)诵原、當某個節(jié)點發(fā)生故障凡泣,導致包括持久化的數(shù)據(jù)全部丟失,此時會從還原點目錄還原RDD的每個分區(qū)的數(shù)據(jù)皮假,這樣就不需要從頭開始計算一次

4鞋拟、checkpoint需要注意的地方

因為RDD在做checkpoint的時候,會單獨啟動一個job對需要進行checkpoint的RDD進行重新計算惹资,這樣就會增加spark作業(yè)運行時間贺纲,所以spark強烈建議在做checkpoint之前,應該對需要進行checkpoint的RDD進行持久化(即調(diào)用?.cache)

5褪测、checkpoint?和持久化的區(qū)別

1)猴誊、是否改變血統(tǒng):

持久化(.cache):不會改變RDD的依賴關系,也就是不會改變其血統(tǒng)

Checkpoint:會改變RDD的血統(tǒng)侮措,做了checkpoint的RDD會清除其所有的依賴關系懈叹,并將其父RDD強制設置為checkpointRDD,并且將RDD的狀態(tài)更改為checkpointed

2)分扎、RDD的數(shù)據(jù)的可靠性:

持久化:只是將RDD的數(shù)據(jù)持久化到內(nèi)存或磁盤中澄成,但是如果節(jié)點發(fā)生故障,那么持久化的數(shù)據(jù)還是會丟失

Checkpoint:checkpoint的數(shù)據(jù)保存在第三方高可靠的分布式的文件系統(tǒng)中畏吓,機試節(jié)點發(fā)生故障墨状,數(shù)據(jù)也不會丟失,所以checkpoint比持久化可靠性更高

6菲饼、后續(xù)

我們實現(xiàn)了checkpoint?之后肾砂,在某個task?又調(diào)用了該RDD的iterator()?方法時,就實現(xiàn)了高容錯機制宏悦,即使RDD的持久化數(shù)據(jù)丟失镐确,或者壓根兒就沒有持久化包吝,但是還是可以通過readCheckpointOrComputer()?方法,優(yōu)先從父RDD-----checkpointRDD中讀取源葫,HDFS(外部文件系統(tǒng))的數(shù)據(jù)

第二部分?spark-sql

一诗越、Spark-SQL前世今生

1、Spark SQL的特點

1)臼氨、支持多種數(shù)據(jù)源:Hive掺喻、RDD、Parquet储矩、JSON感耙、JDBC等。

2)持隧、多種性能優(yōu)化技術:in-memory columnar storage即硼、byte-code generation、cost model動態(tài)評估等屡拨。

3)只酥、組件擴展性:對于SQL的語法解析器、分析器以及優(yōu)化器呀狼,用戶都可以自己重新開發(fā)裂允,并且動態(tài)擴展

2、Spark SQL的性能優(yōu)化技術簡介

1)哥艇、內(nèi)存列存儲(in-memory columnar storage)

2)绝编、字節(jié)碼生成技術(byte-code generation)

3)、Scala代碼編寫的優(yōu)化

3貌踏、Spark SQL and DataFrame

Spark SQL是Spark中的一個模塊十饥,主要用于進行結構化數(shù)據(jù)的處理。它提供的最核心的編程抽象祖乳,就是DataFrame逗堵。同時Spark SQL還可以作為分布式的SQL查詢引擎。Spark SQL最重要的功能之一眷昆,就是從Hive中查詢數(shù)據(jù)蜒秤。

DataFrame,可以理解為是隙赁,以列的形式組織的垦藏,分布式的數(shù)據(jù)集合。它其實和關系型數(shù)據(jù)庫中的表非常類似伞访,但是底層做了很多的優(yōu)化。DataFrame可以通過很多來源進行構建轰驳,包括:結構化的數(shù)據(jù)文件厚掷,Hive中的表弟灼,外部的關系型數(shù)據(jù)庫,以及RDD冒黑。

二田绑、Spark-sql的使用

1、RDD轉(zhuǎn)換為DataFrame(兩種)

1)抡爹、使用反射的方式來推斷包含了特定數(shù)據(jù)類型的RDD的元數(shù)據(jù)

2)掩驱、通過編程接口來創(chuàng)建DataFrame

2、UDF自定義函數(shù)和UDAF自定義聚合函數(shù)

UDF冬竟,其實更多的是針對單行輸入欧穴,返回一個輸出

UDAF,則可以針對多行輸入泵殴,進行聚合計算涮帘,返回一個輸出,功能更加強大

3笑诅、Spark-SQL工作原理

SqlParse ?--------->解析器

Analyser ?--------->分析器

Optimizer ?--------->優(yōu)化器

SparkPlan ?--------->物理計劃

流程:

1)调缨、自己編寫的SQL語句

大家要知道,只要在數(shù)據(jù)庫類型的技術里面吆你,比如:最傳統(tǒng)的MySQL弦叶,Oracle等,包括現(xiàn)在大數(shù)據(jù)領域的數(shù)據(jù)倉庫妇多,比如hive伤哺,他的基本的SQL執(zhí)行的模型,都是類似的砌梆,首先都要生成一條SQL語句的執(zhí)行計劃

2)默责、通過SqlParser(解析器)生成未解析的邏輯計劃(unresolved LogicalPlan)

3)、通過Analyzer(分析器)生成解析后的邏輯計劃(resolved LogicalPlan)

4)咸包、通過Optimizer(優(yōu)化器)生成優(yōu)化后的邏輯計劃(optimized LogicalPlan)

實際上桃序,比如傳統(tǒng)的Oracle等數(shù)據(jù)庫,通常都會生成多個執(zhí)行計劃烂瘫,然后呢媒熊,最后有一個優(yōu)化器,針對多個計劃坟比,選擇一個最好的計劃芦鳍,而SparkSql這兒的優(yōu)化指的是,比如說葛账,剛生成的執(zhí)行計劃中柠衅,有些地方的性能是顯而易見的,不太好籍琳,舉例說明:

比如說菲宴,我們有一個SQL語句贷祈,select name from (select ... from ...) where ..=..;

此時,在執(zhí)行計劃解析出來的時候喝峦,其實就是按照他原封不動的樣子势誊,來解析成可以執(zhí)行的計劃,但是呢谣蠢,Optimizer?在這里其實就會對執(zhí)行計劃進行優(yōu)化粟耻,比如說,發(fā)現(xiàn)where?條件眉踱,其實可以放在子查詢中挤忙,這樣,子查詢的數(shù)量大大變小勋锤,可以優(yōu)化執(zhí)行速度饭玲,此時,可能就會變成如下這樣:select name from (select name from ...where ..=..)

5)叁执、通過SparkPlan茄厘,生成最后的物理計劃(PhysicalPlan)

到物理計劃這里,那么其實就是非程竿穑“接地氣”的計劃了次哈。就是說,已經(jīng)很明朗了吆录,從那幾個文件讀取什么數(shù)據(jù)窑滞,從那幾個文件中讀取,如何進行關聯(lián)等等

6)恢筝、在executor中執(zhí)行物理計劃

邏輯的執(zhí)行計劃哀卫,更多的是偏向于邏輯,比如說吧撬槽,大致就是這種樣子的此改,

From table students=>filter ... => select name ...

這里基本上,邏輯計劃都是采用Tree?侄柔,樹形結構

7)共啃、生成RDD

Select ?name ?from ?students =>?解析,從哪里去查詢暂题,students表移剪,在哪個文件里,從哪個文件中查詢哪些數(shù)據(jù)薪者,比如說是name這個列纵苛,此外,復雜的SQL,還有赶站,比如說查詢時幔虏,是否對表中的數(shù)據(jù)進行過濾和篩選纺念,更不用說贝椿,復雜時,需要有多表的JOIN(咋傳統(tǒng)數(shù)據(jù)庫中陷谱,比如MySQL烙博,執(zhí)行計劃還涉及到如何掃描和利用索引)

4、spark-SQL性能優(yōu)化

1)烟逊、設置shuffle過程的并行度:spark.sql.shuffle.partitions(SQLContext.setConf())

2)渣窜、在hive數(shù)據(jù)倉庫建設過程中,合理設置數(shù)據(jù)類型宪躯,比如能設置為int的乔宿,就不要設置為bigInt,減少數(shù)據(jù)類型導致不必要的內(nèi)存開銷

3)访雪、編寫SQL時详瑞,盡量給出明確的列名,比如select name from students臣缀。不要寫select *?的方式坝橡。

4)、并行處理查詢結果:對于spark-SQL查詢的結果精置,如果數(shù)據(jù)量比較大计寇,比如超過1000條,那么就不要一次性的collect()到driver再處理脂倦,使用foreach()算子番宁,并行處理查詢結果

5)、緩存表:對于一條SQL語句可能對此使用到的表赖阻,可以對其進行緩存蝶押,使用?sqlContext.cacheTable(tableName),或者DataFrame.cache()即可政供,spark-SQL會用內(nèi)存列存儲的格式進行表的緩存播聪,然后spark-sql就可以僅僅掃描需要使用的列,并且自動優(yōu)化壓縮布隔,來最小化內(nèi)存使用和GC開銷离陶,SQLContext.uncacheTable(tableName)可以將表從緩存中移除靶端,用SQLContext寄悯。setConf(),設置spark.sql.inMemoryColumnarStorage.batchSize參數(shù)(默認10000)灵份,可以設置列存儲的單位

6)哀军、廣播join表:spark.sql.autoBroadcastJoinThreshold沉眶,默認10485760 (10 MB)打却。在內(nèi)存夠用的情況下,可以增加其大小谎倔,參數(shù)設置了一個表在join的時候柳击,最大在多大以內(nèi),可以被廣播出去優(yōu)化性能

5片习、Hive on Spark配置

1)捌肴、安轉(zhuǎn)配置好Hive和Spark

2)、Set hive.execution.engine=spark;

3)藕咏、set spark.master=spark://mini1:7077

第三部分spark-streaming

1, ?Dstream

?

Dstream是sparkStreaming的數(shù)據(jù)模型状知,本質(zhì)就是一連串不間斷的RDD,但是它是一個時間段的RDD.這些時間段的RDD源源不斷的連接在一起孽查。

這個時間可以自己設置饥悴,時間設置的越短,實時性越高盲再,但是性能消耗也越大西设。

2, ?spark streaming從kafka獲取數(shù)據(jù),有哪幾種方式洲胖?

?

有兩種方式:

1.通過receiver的方式济榨,

2,通過direct的方式绿映,dirrect的方式需要自己來管理偏移量擒滑。

?

3, ?sparkStreaming和storm的區(qū)別

sparkStreaming是spark里面的一個做流式準實時計算的組件,它使用的數(shù)據(jù)結構是Dstream叉弦,Dstream里面是一連串時間片的rdd丐一。

相比于storm,sparkStreaming在實時性淹冰,保證數(shù)據(jù)不丟失方面都不占用優(yōu)勢库车,spark streaming在spark支持者眼中的優(yōu)勢是spark Streaming具有高吞吐性,最本質(zhì)來說樱拴,sparkStreaming相比于storm的優(yōu)勢是sparkStreaming可以和spark core柠衍,spark SQL無縫整合。

4.對于需要多次引用的晶乔,并且這個dstream計算時間特別耗時珍坊,數(shù)據(jù)特別重要,那么我們就需要對dstream進行checkpoint正罢,(只有多次引用的阵漏,進行持久化就可以了),因為即使對這個dstream進行持久化,數(shù)據(jù)也可能會丟失履怯,而checkpoint數(shù)據(jù)丟失的可能性小回还,但是這樣會影響spark-streaming的數(shù)據(jù)吞吐量,因為在做計算的同時叹洲,還需要將數(shù)據(jù)寫入到外部存儲系統(tǒng)中柠硕,會降低spark性能,影響吞吐量疹味,非必要情況下不建議使用

5.如何對dstream做checkpoint

?

首先設置還原點目錄仅叫,其次調(diào)用dstream的checkpoint方法

【注意】:dstream的checkpoint的周期一定要是產(chǎn)生batch時間的整數(shù)倍,同時spark官方建議將checkpoint的時間設置為至少10秒糙捺。通常來說,將checkpoint間隔設置為窗口操作的滑動間隔的5-10倍

6.spark程序在啟動時笙隙,會去這個checkpointPath目錄下查看是否有保存的driver的元數(shù)據(jù)(1.dstream的操作轉(zhuǎn)換關系洪灯,2.未處理完的batch)信息,當spark-streaming程序在二次啟動后就會去checkpointPath目錄下還原這個程序竟痰,加載未處理的batch元數(shù)據(jù)信息在內(nèi)存中恢復签钩,繼續(xù)進行任務處理

7.為了保證spark-streaming程序7*24小時運行,那么我們程序應該具備高可靠性坏快,怎樣具備高可靠性铅檩?

?

a.程序出現(xiàn)故障,driver死掉了莽鸿,流式程序應該具備自動重啟的功能

b.沒有計算完成的rdd在程序異常停止后昧旨,下次啟動后還會將未處理的rdd進行處理

【注意】:要在spark_submit中,添加--deploy-mode參數(shù)祥得,默認其值為client兔沃,即在提交應用的機器上啟動driver,但是要能夠自動重啟driver级及,就必須將其值設置為cluster乒疏;此外,需要添加--supervise參數(shù)饮焦,失敗后自動重啟

//spark_submit --executor-memory 1g --total-execute-cores 5 --deploy-model cluster --supervise

8.啟用預寫機制

a.預寫日志機制怕吴,簡寫為WAL,全稱為Write Ahead Log县踢,從spark1.2版本開始转绷,就引入了基于容錯的文件系統(tǒng)的WAL機制。如果啟用該機制殿雪,Receiver接收到的所有數(shù)據(jù)都會寫入配置的checkpoint目錄中的預寫日志暇咆。這中機制可以讓driver在恢復的時候,避免數(shù)據(jù)丟失,并且可以確保整個實時計算過程中零數(shù)據(jù)丟失

b.要配置該機制爸业,首先調(diào)用StreamingContext的checkpoint()方法設置一個checkpoint目錄其骄,然后需要將spark.streaming.receiver.writeAheadLog.enable參數(shù)設置為true

然而,這種極強的可靠性機制扯旷,會導致Receiver的吞吐量大幅度下降拯爽,因為單位時間內(nèi),有相當一部分時間需要將數(shù)據(jù)寫入預寫日志钧忽。如果又希望開啟預寫日志機制毯炮,確保數(shù)據(jù)零損失,又不希望影響系統(tǒng)的吞吐量耸黑,那么可以創(chuàng)建多個輸入DStream桃煎,啟動多個Receiver

此外,在啟用了預寫日志機制之后大刊,推薦將復制持久化機制禁用掉为迈,因為所有數(shù)據(jù)已經(jīng)保存在容錯的文件系統(tǒng)中,不需要在用復制機制進行持久化缺菌,保存一份副本葫辐,只要將輸入的DStream的持久化機制設置一下即可,persist(StorageLevel.MEMORY_AND_DISK_SER)伴郁。

9.spark-Streaming checkpoint概述

每一個spark-streaming應用耿战,正常來說,都是7*24小時運轉(zhuǎn)的焊傅,這就是實時計算程序的特點剂陡,因為要持續(xù)不斷的對數(shù)據(jù)進行計算,因此租冠,對實時計算應用的要求鹏倘,應該是必須要能夠?qū)εc應用程序邏輯無關的失敗,進行容錯

如果要實現(xiàn)這個目標顽爹,Spark Streaming程序就必須將足夠的信息checkpoint到容錯的存儲系統(tǒng)上纤泵,從而讓它能夠從失敗中進行恢復

有兩種數(shù)據(jù)需要被進行checkpoint

1.元數(shù)據(jù)checkpoint-將定義了流式計算邏輯的信息,保存到容錯的存儲系統(tǒng)上镜粤,比如HDFS捏题,當運行spark Streaming應用程序的Driver進程所在的節(jié)點失敗時,該信息可以用于進行恢復肉渴,元數(shù)據(jù)信息包括:

a.配置信息---創(chuàng)建spark Streaming應用程序的配置信息公荧,比如sparkConf中的信息

b.DStream的操作信息---定義了spark Stream應用程序的計算邏輯的DStream操作信息

c.未處理的batch信息---那些job正在排隊,還沒處理的batch信息

2.數(shù)據(jù)checkpoint---將實時計算過程中產(chǎn)生的RDD的數(shù)據(jù)保存到可靠的存儲系統(tǒng)中同规。對于一些將多個batch的數(shù)據(jù)進行聚合循狰,有狀態(tài)的transformation操作窟社,這是非常有用的,在這種transformation操作中绪钥,生成的RDD是依賴于之前batch的RDD的灿里,這會導致隨著時間的推移,RDD的依賴鏈條變得越來越長程腹,要避免由于依賴鏈條越來越長匣吊,導致的一起變得越來越長的失敗恢復時間,有狀態(tài)的transformation操作執(zhí)行過程中中間產(chǎn)生的RDD,會定期被checkpoint到可靠的存儲系統(tǒng)上寸潦,比如HDFS色鸳,從而削減RDD的依賴鏈條進而縮短失敗恢復時,RDD的恢復時間见转。一句話概括命雀,元數(shù)據(jù)checkpoint主要是為了從driver失敗中進行恢復;而RDD checkpoint主要是為了使用到有狀態(tài)的transformation操作時池户,能夠在其生產(chǎn)出的數(shù)據(jù)丟失時咏雌,進行快速的失敗恢復

10.何時啟用checkpoint機制?

a.使用了有狀態(tài)的transformation操作----比如updateStateByKey校焦,或者reduceByKeyAndWindow操作被使用了,那么checkpoint目錄要求必須提供的统倒,也就是必須開啟checkpoint機制寨典,從而進行周期性的RDD checkpoint

b.要保證可以從Driver失敗中進行恢復-----元數(shù)據(jù)checkpoint需要啟用,來進行這種情況的恢復

【注意】并不是說房匆,所有的spark streaming應用程序耸成,都要啟用checkpoint機制,如果既不強制要求從Driver失敗中自動進行恢復浴鸿,又沒使用有狀態(tài)的transformation操作井氢,那么就不需要啟用checkpoint,事實上岳链,這么做反而是有助于提升性能的

11.如何自動從Driver失敗中恢復過來

?

要能夠自動從Driver失敗中恢復過來運行spark Streaming應用程序的集群花竞,就必須監(jiān)控Driver運行的過程,并且在他失敗時將他重啟掸哑,對于spark自身的standalone模式约急,需要進行一些配置去supervise driver,在他失敗時將其重啟

首先苗分,要在spark-submit中厌蔽,添加--deploy-mode參數(shù),默認其值為client摔癣,即在提交應用的機器上啟動Driver奴饮,但是纬向,要能夠自動重啟Driver,就必須將其值設置為cluster戴卜,此外逾条,需要添加--supervise參數(shù)。

最后叉瘩,前面有很多朋友都想學習大數(shù)據(jù)方面的技術膳帕。最近我正好整理了一套學習資料和視頻,大家可以加我的微信:Lxiao_28 備注“領取資料” 我會免費分享給大家學習哦~

?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末薇缅,一起剝皮案震驚了整個濱河市危彩,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌泳桦,老刑警劉巖汤徽,帶你破解...
    沈念sama閱讀 206,839評論 6 482
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異灸撰,居然都是意外死亡谒府,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,543評論 2 382
  • 文/潘曉璐 我一進店門浮毯,熙熙樓的掌柜王于貴愁眉苦臉地迎上來完疫,“玉大人,你說我怎么就攤上這事债蓝】呛祝” “怎么了?”我有些...
    開封第一講書人閱讀 153,116評論 0 344
  • 文/不壞的土叔 我叫張陵饰迹,是天一觀的道長芳誓。 經(jīng)常有香客問我,道長啊鸭,這世上最難降的妖魔是什么锹淌? 我笑而不...
    開封第一講書人閱讀 55,371評論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮赠制,結果婚禮上赂摆,老公的妹妹穿的比我還像新娘。我一直安慰自己憎妙,他們只是感情好库正,可當我...
    茶點故事閱讀 64,384評論 5 374
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著厘唾,像睡著了一般褥符。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上抚垃,一...
    開封第一講書人閱讀 49,111評論 1 285
  • 那天喷楣,我揣著相機與錄音趟大,去河邊找鬼。 笑死铣焊,一個胖子當著我的面吹牛逊朽,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播曲伊,決...
    沈念sama閱讀 38,416評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼叽讳,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了坟募?” 一聲冷哼從身側響起岛蚤,我...
    開封第一講書人閱讀 37,053評論 0 259
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎懈糯,沒想到半個月后涤妒,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,558評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡赚哗,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,007評論 2 325
  • 正文 我和宋清朗相戀三年她紫,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片屿储。...
    茶點故事閱讀 38,117評論 1 334
  • 序言:一個原本活蹦亂跳的男人離奇死亡贿讹,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出够掠,到底是詐尸還是另有隱情围详,我是刑警寧澤,帶...
    沈念sama閱讀 33,756評論 4 324
  • 正文 年R本政府宣布祖屏,位于F島的核電站,受9級特大地震影響买羞,放射性物質(zhì)發(fā)生泄漏袁勺。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,324評論 3 307
  • 文/蒙蒙 一畜普、第九天 我趴在偏房一處隱蔽的房頂上張望期丰。 院中可真熱鬧,春花似錦吃挑、人聲如沸钝荡。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,315評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽埠通。三九已至,卻和暖如春逛犹,著一層夾襖步出監(jiān)牢的瞬間端辱,已是汗流浹背梁剔。 一陣腳步聲響...
    開封第一講書人閱讀 31,539評論 1 262
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留舞蔽,地道東北人荣病。 一個月前我還...
    沈念sama閱讀 45,578評論 2 355
  • 正文 我出身青樓,卻偏偏與公主長得像渗柿,于是被迫代替她去往敵國和親个盆。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 42,877評論 2 345

推薦閱讀更多精彩內(nèi)容