Rxjava操作符二

runAsync

rxjava-async模塊還包含一個runAsync操作符。它很特殊轴或,返回一個叫做StoppableObservable的特殊Observable。

傳遞一個Action和一個SchedulerrunAsync基公,它返回一個使用這個Action產(chǎn)生數(shù)據(jù)的StoppableObservable需纳。這個Action接受一個Observable和一個Subscription作為參數(shù),它使用Subscription檢查unsubscribed條件蛤织,一旦發(fā)現(xiàn)條件為真就立即停止發(fā)射數(shù)據(jù)赴叹。在任何時(shí)候你都可以使用unsubscribe方法手動停止一個StoppableObservable(這會同時(shí)取消訂閱與這個StoppableObservable關(guān)聯(lián)的Subscription)。

由于runAsync會立即調(diào)用Action并開始發(fā)射數(shù)據(jù)指蚜,在你創(chuàng)建StoppableObservable之后到你的觀察者準(zhǔn)備好接受數(shù)據(jù)之前這段時(shí)間里乞巧,可能會有一部分?jǐn)?shù)據(jù)會丟失。如果這不符合你的要求摊鸡,可以使用runAsync的一個變體绽媒,它也接受一個Subject參數(shù),傳遞一個ReplaySubject給它免猾,你可以獲取其它丟失的數(shù)據(jù)了是辕。

在RxJava中還有一個版本的From操作符可以將Future轉(zhuǎn)換為Observable,與start相似猎提。

Timer

創(chuàng)建一個Observable获三,它在一個給定的延遲后發(fā)射一個特殊的值。

timer

Timer操作符創(chuàng)建一個在給定的時(shí)間段之后返回一個特殊值的Observable。

RxJava將這個操作符實(shí)現(xiàn)為timer函數(shù)疙教。

timer返回一個Observable棺聊,它在延遲一段給定的時(shí)間后發(fā)射一個簡單的數(shù)字0。

timer操作符默認(rèn)在computation調(diào)度器上執(zhí)行贞谓。有一個變體可以通過可選參數(shù)指定Scheduler躺屁。

變換操作

這個頁面展示了可用于對Observable發(fā)射的數(shù)據(jù)執(zhí)行變換操作的各種操作符。

  • map(?) — 對序列的每一項(xiàng)都應(yīng)用一個函數(shù)來變換Observable發(fā)射的數(shù)據(jù)序列
  • flatMap(?), concatMap(?), and flatMapIterable(?) — 將Observable發(fā)射的數(shù)據(jù)集合變換為Observables集合经宏,然后將這些Observable發(fā)射的數(shù)據(jù)平坦化的放進(jìn)一個單獨(dú)的Observable
  • switchMap(?) — 將Observable發(fā)射的數(shù)據(jù)集合變換為Observables集合,然后只發(fā)射這些Observables最近發(fā)射的數(shù)據(jù)
  • scan(?) — 對Observable發(fā)射的每一項(xiàng)數(shù)據(jù)應(yīng)用一個函數(shù)驯击,然后按順序依次發(fā)射每一個值
  • groupBy(?) — 將Observable分拆為Observable集合烁兰,將原始Observable發(fā)射的數(shù)據(jù)按Key分組,每一個Observable發(fā)射一組不同的數(shù)據(jù)
  • buffer(?) — 它定期從Observable收集數(shù)據(jù)到一個集合徊都,然后把這些數(shù)據(jù)集合打包發(fā)射沪斟,而不是一次發(fā)射一個
  • window(?) — 定期將來自O(shè)bservable的數(shù)據(jù)分拆成一些Observable窗口,然后發(fā)射這些窗口暇矫,而不是每次發(fā)射一項(xiàng)
  • cast(?) — 在發(fā)射之前強(qiáng)制將Observable發(fā)射的所有數(shù)據(jù)轉(zhuǎn)換為指定類型

Buffer

定期收集Observable的數(shù)據(jù)放進(jìn)一個數(shù)據(jù)包裹主之,然后發(fā)射這些數(shù)據(jù)包裹,而不是一次發(fā)射一個值李根。

buffer

Buffer操作符將一個Observable變換為另一個槽奕,原來的Observable正常發(fā)射數(shù)據(jù),變換產(chǎn)生的Observable發(fā)射這些數(shù)據(jù)的緩存集合房轿。Buffer操作符在很多語言特定的實(shí)現(xiàn)中有很多種變體粤攒,它們在如何緩存這個問題上存在區(qū)別。

注意:如果原來的Observable發(fā)射了一個onError通知囱持,Buffer會立即傳遞這個通知夯接,而不是首先發(fā)射緩存的數(shù)據(jù),即使在這之前緩存中包含了原始Observable發(fā)射的數(shù)據(jù)纷妆。

Window操作符與Buffer類似盔几,但是它在發(fā)射之前把收集到的數(shù)據(jù)放進(jìn)單獨(dú)的Observable,而不是放進(jìn)一個數(shù)據(jù)結(jié)構(gòu)掩幢。

在RxJava中有許多Buffer的變體:

buffer(count)

buffer3

buffer(count)以列表(List)的形式發(fā)射非重疊的緩存逊拍,每一個緩存至多包含來自原始Observable的count項(xiàng)數(shù)據(jù)(最后發(fā)射的列表數(shù)據(jù)可能少于count項(xiàng))

buffer(count, skip)

buffer4

buffer(count,?skip)從原始Observable的第一項(xiàng)數(shù)據(jù)開始創(chuàng)建新的緩存,此后每當(dāng)收到skip項(xiàng)數(shù)據(jù)粒蜈,用count項(xiàng)數(shù)據(jù)填充緩存:開頭的一項(xiàng)和后續(xù)的count-1項(xiàng)顺献,它以列表(List)的形式發(fā)射緩存,取決于countskip的值枯怖,這些緩存可能會有重疊部分(比如skip < count時(shí))注整,也可能會有間隙(比如skip > count時(shí))。

buffer(bufferClosingSelector)

buffer1

當(dāng)它訂閱原來的Observable時(shí),buffer(bufferClosingSelector)開始將數(shù)據(jù)收集到一個List肿轨,然后它調(diào)用bufferClosingSelector生成第二個Observable寿冕,當(dāng)?shù)诙€Observable發(fā)射一個TClosing時(shí),buffer發(fā)射當(dāng)前的List椒袍,然后重復(fù)這個過程:開始組裝一個新的List驼唱,然后調(diào)用bufferClosingSelector創(chuàng)建一個新的Observable并監(jiān)視它。它會一直這樣做直到原來的Observable執(zhí)行完成驹暑。

buffer(boundary)

buffer8

buffer(boundary)監(jiān)視一個名叫boundary的Observable玫恳,每當(dāng)這個Observable發(fā)射了一個值,它就創(chuàng)建一個新的List開始收集來自原始Observable的數(shù)據(jù)并發(fā)射原來的List优俘。

buffer(bufferOpenings, bufferClosingSelector)

buffer2

buffer(bufferOpenings,?bufferClosingSelector)監(jiān)視這個叫bufferOpenings的Observable(它發(fā)射BufferOpening對象)京办,每當(dāng)bufferOpenings發(fā)射了一個數(shù)據(jù)時(shí),它就創(chuàng)建一個新的List開始收集原始Observable的數(shù)據(jù)帆焕,并將bufferOpenings傳遞給closingSelector函數(shù)惭婿。這個函數(shù)返回一個Observable。buffer監(jiān)視這個Observable叶雹,當(dāng)它檢測到一個來自這個Observable的數(shù)據(jù)時(shí)财饥,就關(guān)閉List并且發(fā)射它自己的數(shù)據(jù)(之前的那個List)。

buffer(timespan, unit[, scheduler])

buffer5

buffer(timespan,?unit)定期以List的形式發(fā)射新的數(shù)據(jù)折晦,每個時(shí)間段钥星,收集來自原始Observable的數(shù)據(jù)(從前面一個數(shù)據(jù)包裹之后,或者如果是第一個數(shù)據(jù)包裹筋遭,從有觀察者訂閱原來的Observale之后開始)打颤。還有另一個版本的buffer接受一個Scheduler參數(shù),默認(rèn)情況下會使用computation調(diào)度器漓滔。

buffer(timespan, unit, count[, scheduler])

buffer6

每當(dāng)收到來自原始Observable的count項(xiàng)數(shù)據(jù)编饺,或者每過了一段指定的時(shí)間后,buffer(timespan,?unit,?count)就以List的形式發(fā)射這期間的數(shù)據(jù)响驴,即使數(shù)據(jù)項(xiàng)少于count項(xiàng)透且。還有另一個版本的buffer接受一個Scheduler參數(shù),默認(rèn)情況下會使用computation調(diào)度器豁鲤。

buffer(timespan, timeshift, unit[, scheduler])

buffer7

buffer(timespan,?timeshift,?unit)在每一個timeshift時(shí)期內(nèi)都創(chuàng)建一個新的List,然后用原始Observable發(fā)射的每一項(xiàng)數(shù)據(jù)填充這個列表(在把這個List當(dāng)做自己的數(shù)據(jù)發(fā)射前秽誊,從創(chuàng)建時(shí)開始,直到過了timespan這么長的時(shí)間)琳骡。如果timespan長于timeshift锅论,它發(fā)射的數(shù)據(jù)包將會重疊,因此可能包含重復(fù)的數(shù)據(jù)項(xiàng)楣号。

還有另一個版本的buffer接受一個Scheduler參數(shù)最易,默認(rèn)情況下會使用computation調(diào)度器怒坯。

buffer-backpressure

你可以使用Buffer操作符實(shí)現(xiàn)反壓backpressure(意思是,處理這樣一個Observable:它產(chǎn)生數(shù)據(jù)的速度可能比它的觀察者消費(fèi)數(shù)據(jù)的速度快)藻懒。

bp.buffer2

Buffer操作符可以將大量的數(shù)據(jù)序列縮減為較少的數(shù)據(jù)緩存序列剔猿,讓它們更容易處理。例如嬉荆,你可以按固定的時(shí)間間隔归敬,定期關(guān)閉和發(fā)射來自一個爆發(fā)性O(shè)bservable的數(shù)據(jù)緩存。這相當(dāng)于一個緩沖區(qū)鄙早。

示例代碼

Observable<List<Integer>> burstyBuffered = bursty.buffer(500, TimeUnit.MILLISECONDS);
bp.buffer1

或者汪茧,如果你想更進(jìn)一步,可以在爆發(fā)期將數(shù)據(jù)收集到緩存限番,然后在爆發(fā)期終止時(shí)發(fā)射這些數(shù)據(jù)陆爽,使用 Debounce 操作符給buffer操作符發(fā)射一個緩存關(guān)閉指示器(buffer closing indicator)可以做到這一點(diǎn)。

代碼示例:

// we have to multicast the original bursty Observable so we can use it
// both as our source and as the source for our buffer closing selector:
Observable<Integer> burstyMulticast = bursty.publish().refCount();
// burstyDebounced will be our buffer closing selector:
Observable<Integer> burstyDebounced = burstyMulticast.debounce(10, TimeUnit.MILLISECONDS);
// and this, finally, is the Observable of buffers we're interested in:
Observable<List<Integer>> burstyBuffered = burstyMulticast.buffer(burstyDebounced);

參見

FlatMap

FlatMap將一個發(fā)射數(shù)據(jù)的Observable變換為多個Observables扳缕,然后將它們發(fā)射的數(shù)據(jù)合并后放進(jìn)一個單獨(dú)的Observable

flatMap

FlatMap操作符使用一個指定的函數(shù)對原始Observable發(fā)射的每一項(xiàng)數(shù)據(jù)執(zhí)行變換操作,這個函數(shù)返回一個本身也發(fā)射數(shù)據(jù)的Observable别威,然后FlatMap合并這些Observables發(fā)射的數(shù)據(jù)躯舔,最后將合并后的結(jié)果當(dāng)做它自己的數(shù)據(jù)序列發(fā)射。

這個方法是很有用的省古,例如粥庄,當(dāng)你有一個這樣的Observable:它發(fā)射一個數(shù)據(jù)序列,這些數(shù)據(jù)本身包含Observable成員或者可以變換為Observable豺妓,因此你可以創(chuàng)建一個新的Observable發(fā)射這些次級Observable發(fā)射的數(shù)據(jù)的完整集合惜互。

注意:FlatMap對這些Observables發(fā)射的數(shù)據(jù)做的是合并(merge)操作,因此它們可能是交錯的琳拭。

在許多語言特定的實(shí)現(xiàn)中训堆,還有一個操作符不會讓變換后的Observables發(fā)射的數(shù)據(jù)交錯,它按照嚴(yán)格的順序發(fā)射這些數(shù)據(jù)白嘁,這個操作符通常被叫作ConcatMap或者類似的名字坑鱼。

mergeMap

RxJava將這個操作符實(shí)現(xiàn)為flatMap函數(shù)。

注意:如果任何一個通過這個flatMap操作產(chǎn)生的單獨(dú)的Observable調(diào)用onError異常終止了絮缅,這個Observable自身會立即調(diào)用onError并終止懈贺。

這個操作符有一個接受額外的int參數(shù)的一個變體逊抡。這個參數(shù)設(shè)置flatMap從原來的Observable映射Observables的最大同時(shí)訂閱數(shù)。當(dāng)達(dá)到這個限制時(shí),它會等待其中一個終止然后再訂閱另一個搬泥。

mergeMap.nce

還有一個版本的flatMap為原始Observable的每一項(xiàng)數(shù)據(jù)和每一個通知創(chuàng)建一個新的Observable(并對數(shù)據(jù)平坦化)。

它也有一個接受額外int參數(shù)的變體隐绵。

mergeMap.r

還有一個版本的flatMap會使用原始Observable的數(shù)據(jù)觸發(fā)的Observable組合這些數(shù)據(jù)愧怜,然后發(fā)射這些數(shù)據(jù)組合缠局。它也有一個接受額外int參數(shù)的版本。

flatMapIterable

mergeMapIterable

flatMapIterable這個變體成對的打包數(shù)據(jù)蔼两,然后生成Iterable而不是原始數(shù)據(jù)和生成的Observables甩鳄,但是處理方式是相同的。

concatMap

concatMap

還有一個concatMap操作符额划,它類似于最簡單版本的flatMap妙啃,但是它按次序連接而不是合并那些生成的Observables,然后產(chǎn)生自己的數(shù)據(jù)序列俊戳。

switchMap

switchMap

RxJava還實(shí)現(xiàn)了switchMap操作符揖赴。它和flatMap很像,除了一點(diǎn):當(dāng)原始Observable發(fā)射一個新的數(shù)據(jù)(Observable)時(shí)抑胎,它將取消訂閱并停止監(jiān)視產(chǎn)生執(zhí)之前那個數(shù)據(jù)的Observable燥滑,只監(jiān)視當(dāng)前這一個。

split

St.split

在特殊的StringObservable類(默認(rèn)沒有包含在RxJava中)中還有一個split操作符阿逃。它將一個發(fā)射字符串的Observable轉(zhuǎn)換為另一個發(fā)射字符串的Observable铭拧,只不過,后者將原始的數(shù)據(jù)序列當(dāng)做一個數(shù)據(jù)流恃锉,使用一個正則表達(dá)式邊界分割它們搀菩,然后合并發(fā)射分割的結(jié)果。

GroupBy

將一個Observable分拆為一些Observables集合破托,它們中的每一個發(fā)射原始Observable的一個子序列

groupBy

GroupBy操作符將原始Observable分拆為一些Observables集合肪跋,它們中的每一個發(fā)射原始Observable數(shù)據(jù)序列的一個子序列。哪個數(shù)據(jù)項(xiàng)由哪一個Observable發(fā)射是由一個函數(shù)判定的土砂,這個函數(shù)給每一項(xiàng)指定一個Key州既,Key相同的數(shù)據(jù)會被同一個Observable發(fā)射。

RxJava實(shí)現(xiàn)了groupBy操作符萝映。它返回Observable的一個特殊子類GroupedObservable吴叶,實(shí)現(xiàn)了GroupedObservable接口的對象有一個額外的方法getKey,這個Key用于將數(shù)據(jù)分組到指定的Observable序臂。

有一個版本的groupBy允許你傳遞一個變換函數(shù)晤郑,這樣它可以在發(fā)射結(jié)果GroupedObservable之前改變數(shù)據(jù)項(xiàng)。

注意:groupBy將原始Observable分解為一個發(fā)射多個GroupedObservable的Observable贸宏,一旦有訂閱造寝,每個GroupedObservable就開始緩存數(shù)據(jù)。因此吭练,如果你忽略這些GroupedObservable中的任何一個诫龙,這個緩存可能形成一個潛在的內(nèi)存泄露。因此鲫咽,如果你不想觀察签赃,也不要忽略GroupedObservable谷异。你應(yīng)該使用像take(0)這樣會丟棄自己的緩存的操作符。

如果你取消訂閱一個GroupedObservable锦聊,那個Observable將會終止歹嘹。如果之后原始的Observable又發(fā)射了一個與這個Observable的Key匹配的數(shù)據(jù),groupBy將會為這個Key創(chuàng)建一個新的GroupedObservable孔庭。

groupBy默認(rèn)不在任何特定的調(diào)度器上執(zhí)行尺上。

Map

對Observable發(fā)射的每一項(xiàng)數(shù)據(jù)應(yīng)用一個函數(shù),執(zhí)行變換操作

map

Map操作符對原始Observable發(fā)射的每一項(xiàng)數(shù)據(jù)應(yīng)用一個你選擇的函數(shù)圆到,然后返回一個發(fā)射這些結(jié)果的Observable怎抛。

RxJava將這個操作符實(shí)現(xiàn)為map函數(shù)。這個操作符默認(rèn)不在任何特定的調(diào)度器上執(zhí)行芽淡。

cast

cast

cast操作符將原始Observable發(fā)射的每一項(xiàng)數(shù)據(jù)都強(qiáng)制轉(zhuǎn)換為一個指定的類型马绝,然后再發(fā)射數(shù)據(jù),它是map的一個特殊版本挣菲。

下面是常用的操作符列表:

  1. 創(chuàng)建操作 Create, Defer, Empty/Never/Throw, From, Interval, Just, Range, Repeat, Start, Timer
  2. 變換操作 Buffer, FlatMap, GroupBy, Map, Scan和Window
  3. 過濾操作 Debounce, Distinct, ElementAt, Filter, First, IgnoreElements, Last, Sample, Skip, SkipLast, Take, TakeLast
  4. 組合操作 And/Then/When, CombineLatest, Join, Merge, StartWith, Switch, Zip
  5. 錯誤處理 Catch和Retry
  6. 輔助操作 Delay, Do, Materialize/Dematerialize, ObserveOn, Serialize, Subscribe, SubscribeOn, TimeInterval, Timeout, Timestamp, Using
  7. 條件和布爾操作 All, Amb, Contains, DefaultIfEmpty, SequenceEqual, SkipUntil, SkipWhile, TakeUntil, TakeWhile
  8. 算術(shù)和集合操作 Average, Concat, Count, Max, Min, Reduce, Sum
  9. 轉(zhuǎn)換操作 To
  10. 連接操作 Connect, Publish, RefCount, Replay
  11. 反壓操作富稻,用于增加特殊的流程控制策略的操作符

這些操作符并不全都是ReactiveX的核心組成部分,有一些是語言特定的實(shí)現(xiàn)或可選的模塊白胀。

RxJava

在RxJava中唉窃,一個實(shí)現(xiàn)了Observer接口的對象可以訂閱(subscribe)一個Observable 類的實(shí)例。訂閱者(subscriber)對Observable發(fā)射(emit)的任何數(shù)據(jù)或數(shù)據(jù)序列作出響應(yīng)纹笼。這種模式簡化了并發(fā)操作,因?yàn)樗恍枰枞却齇bservable發(fā)射數(shù)據(jù)苟跪,而是創(chuàng)建了一個處于待命狀態(tài)的觀察者哨兵廷痘,哨兵在未來某個時(shí)刻響應(yīng)Observable的通知。

Single

介紹

RxJava(以及它派生出來的RxGroovy和RxScala)中有一個名為Single的Observable變種件已。

Single類似于Observable笋额,不同的是,它總是只發(fā)射一個值篷扩,或者一個錯誤通知兄猩,而不是發(fā)射一系列的值。

因此鉴未,不同于Observable需要三個方法onNext, onError, onCompleted枢冤,訂閱Single只需要兩個方法:

  • onSuccess - Single發(fā)射單個的值到這個方法
  • onError - 如果無法發(fā)射需要的值,Single發(fā)射一個Throwable對象到這個方法

Single只會調(diào)用這兩個方法中的一個铜秆,而且只會調(diào)用一次淹真,調(diào)用了任何一個方法之后,訂閱關(guān)系終止连茧。

Single的操作符

Single也可以組合使用多種操作核蘸,一些操作符讓你可以混合使用Observable和Single:

操作符 返回值 說明
compose Single 創(chuàng)建一個自定義的操作符
concat and concatWith Observable 連接多個Single和Observable發(fā)射的數(shù)據(jù)
create Single 調(diào)用觀察者的create方法創(chuàng)建一個Single
error Single 返回一個立即給訂閱者發(fā)射錯誤通知的Single
flatMap Single 返回一個Single巍糯,它發(fā)射對原Single的數(shù)據(jù)執(zhí)行flatMap操作后的結(jié)果
flatMapObservable Observable 返回一個Observable,它發(fā)射對原Single的數(shù)據(jù)執(zhí)行flatMap操作后的結(jié)果
from Single 將Future轉(zhuǎn)換成Single
just Single 返回一個發(fā)射一個指定值的Single
map Single 返回一個Single客扎,它發(fā)射對原Single的數(shù)據(jù)執(zhí)行map操作后的結(jié)果
merge Single 將一個Single(它發(fā)射的數(shù)據(jù)是另一個Single祟峦,假設(shè)為B)轉(zhuǎn)換成另一個Single(它發(fā)射來自另一個Single(B)的數(shù)據(jù))
merge and mergeWith Observable 合并發(fā)射來自多個Single的數(shù)據(jù)
observeOn Single 指示Single在指定的調(diào)度程序上調(diào)用訂閱者的方法
onErrorReturn Single 將一個發(fā)射錯誤通知的Single轉(zhuǎn)換成一個發(fā)射指定數(shù)據(jù)項(xiàng)的Single
subscribeOn Single 指示Single在指定的調(diào)度程序上執(zhí)行操作
timeout Single 它給原有的Single添加超時(shí)控制,如果超時(shí)了就發(fā)射一個錯誤通知
toSingle Single 將一個發(fā)射單個值的Observable轉(zhuǎn)換為一個Single
zip and zipWith Single 將多個Single轉(zhuǎn)換為一個徙鱼,后者發(fā)射的數(shù)據(jù)是對前者應(yīng)用一個函數(shù)后的結(jié)果

操作符圖示

詳細(xì)的圖解可以參考英文文檔:Single

Subject

Subject可以看成是一個橋梁或者代理宅楞,在某些ReactiveX實(shí)現(xiàn)中(如RxJava),它同時(shí)充當(dāng)了Observer和Observable的角色疆偿。因?yàn)樗且粋€Observer咱筛,它可以訂閱一個或多個Observable;又因?yàn)樗且粋€Observable杆故,它可以轉(zhuǎn)發(fā)它收到(Observe)的數(shù)據(jù)迅箩,也可以發(fā)射新的數(shù)據(jù)。

由于一個Subject訂閱一個Observable处铛,它可以觸發(fā)這個Observable開始發(fā)射數(shù)據(jù)(如果那個Observable是"冷"的--就是說饲趋,它等待有訂閱才開始發(fā)射數(shù)據(jù))。因此有這樣的效果撤蟆,Subject可以把原來那個"冷"的Observable變成"熱"的奕塑。

Subject的種類

針對不同的場景一共有四種類型的Subject。他們并不是在所有的實(shí)現(xiàn)中全部都存在家肯,而且一些實(shí)現(xiàn)使用其它的命名約定(例如龄砰,在RxScala中Subject被稱作PublishSubject)。

AsyncSubject

一個AsyncSubject只在原始Observable完成后讨衣,發(fā)射來自原始Observable的最后一個值换棚。(如果原始Observable沒有發(fā)射任何值,AsyncObject也不發(fā)射任何值)它會把這最后一個值發(fā)射給任何后續(xù)的觀察者反镇。
img

然而固蚤,如果原始的Observable因?yàn)榘l(fā)生了錯誤而終止,AsyncSubject將不會發(fā)射任何數(shù)據(jù)歹茶,只是簡單的向前傳遞這個錯誤通知夕玩。
img

BehaviorSubject

當(dāng)觀察者訂閱BehaviorSubject時(shí),它開始發(fā)射原始Observable最近發(fā)射的數(shù)據(jù)(如果此時(shí)還沒有收到任何數(shù)據(jù)惊豺,它會發(fā)射一個默認(rèn)值)燎孟,然后繼續(xù)發(fā)射其它任何來自原始Observable的數(shù)據(jù)。
img

然而尸昧,如果原始的Observable因?yàn)榘l(fā)生了一個錯誤而終止缤弦,BehaviorSubject將不會發(fā)射任何數(shù)據(jù),只是簡單的向前傳遞這個錯誤通知彻磁。
img

PublishSubject

PublishSubject只會把在訂閱發(fā)生的時(shí)間點(diǎn)之后來自原始Observable的數(shù)據(jù)發(fā)射給觀察者碍沐。需要注意的是狸捅,PublishSubject可能會一創(chuàng)建完成就立刻開始發(fā)射數(shù)據(jù)(除非你可以阻止它發(fā)生),因此這里有一個風(fēng)險(xiǎn):在Subject被創(chuàng)建后到有觀察者訂閱它之前這個時(shí)間段內(nèi)累提,一個或多個數(shù)據(jù)可能會丟失尘喝。如果要確保來自原始Observable的所有數(shù)據(jù)都被分發(fā),你需要這樣做:或者使用Create創(chuàng)建那個Observable以便手動給它引入"冷"Observable的行為(當(dāng)所有觀察者都已經(jīng)訂閱時(shí)才開始發(fā)射數(shù)據(jù))斋陪,或者改用ReplaySubject朽褪。
img

如果原始的Observable因?yàn)榘l(fā)生了一個錯誤而終止,PublishSubject將不會發(fā)射任何數(shù)據(jù)无虚,只是簡單的向前傳遞這個錯誤通知缔赠。
img

ReplaySubject

ReplaySubject會發(fā)射所有來自原始Observable的數(shù)據(jù)給觀察者,無論它們是何時(shí)訂閱的友题。也有其它版本的ReplaySubject嗤堰,在重放緩存增長到一定大小的時(shí)候或過了一段時(shí)間后會丟棄舊的數(shù)據(jù)(原始Observable發(fā)射的)。

如果你把ReplaySubject當(dāng)作一個觀察者使用度宦,注意不要從多個線程中調(diào)用它的onNext方法(包括其它的on系列方法)踢匣,這可能導(dǎo)致同時(shí)(非順序)調(diào)用,這會違反Observable協(xié)議戈抄,給Subject的結(jié)果增加了不確定性离唬。

img

RxJava的對應(yīng)類

假設(shè)你有一個Subject,你想把它傳遞給其它的代理或者暴露它的Subscriber接口划鸽,你可以調(diào)用它的asObservable方法输莺,這個方法返回一個Observable。具體使用方法可以參考Javadoc文檔裸诽。

串行化

如果你把 Subject 當(dāng)作一個 Subscriber 使用嫂用,注意不要從多個線程中調(diào)用它的onNext方法(包括其它的on系列方法),這可能導(dǎo)致同時(shí)(非順序)調(diào)用崭捍,這會違反Observable協(xié)議,給Subject的結(jié)果增加了不確定性啰脚。

要避免此類問題殷蛇,你可以將 Subject 轉(zhuǎn)換為一個 SerializedSubject ,類似于這樣:

mySafeSubject = new SerializedSubject( myUnsafeSubject );

調(diào)度器 Scheduler

如果你想給Observable操作符鏈添加多線程功能橄浓,你可以指定操作符(或者特定的Observable)在特定的調(diào)度器(Scheduler)上執(zhí)行粒梦。

某些ReactiveX的Observable操作符有一些變體,它們可以接受一個Scheduler參數(shù)荸实。這個參數(shù)指定操作符將它們的部分或全部任務(wù)放在一個特定的調(diào)度器上執(zhí)行匀们。

使用ObserveOn和SubscribeOn操作符,你可以讓Observable在一個特定的調(diào)度器上執(zhí)行准给,ObserveOn指示一個Observable在一個特定的調(diào)度器上調(diào)用觀察者的onNext, onError和onCompleted方法泄朴,SubscribeOn更進(jìn)一步重抖,它指示Observable將全部的處理過程(包括發(fā)射數(shù)據(jù)和通知)放在特定的調(diào)度器上執(zhí)行。

RxJava示例

調(diào)度器的種類

下表展示了RxJava中可用的調(diào)度器種類:

調(diào)度器類型 效果
Schedulers.computation(?) 用于計(jì)算任務(wù)祖灰,如事件循環(huán)或和回調(diào)處理钟沛,不要用于IO操作(IO操作請使用Schedulers.io());默認(rèn)線程數(shù)等于處理器的數(shù)量
Schedulers.from(executor) 使用指定的Executor作為調(diào)度器
Schedulers.immediate(?) 在當(dāng)前線程立即開始執(zhí)行任務(wù)
Schedulers.io(?) 用于IO密集型任務(wù)局扶,如異步阻塞IO操作恨统,這個調(diào)度器的線程池會根據(jù)需要增長;對于普通的計(jì)算任務(wù)三妈,請使用Schedulers.computation()畜埋;Schedulers.io(?)默認(rèn)是一個CachedThreadScheduler,很像一個有線程緩存的新線程調(diào)度器
Schedulers.newThread(?) 為每個任務(wù)創(chuàng)建一個新線程
Schedulers.trampoline(?) 當(dāng)其它排隊(duì)的任務(wù)完成后畴蒲,在當(dāng)前線程排隊(duì)開始執(zhí)行

默認(rèn)調(diào)度器

在RxJava中悠鞍,某些Observable操作符的變體允許你設(shè)置用于操作執(zhí)行的調(diào)度器,其它的則不在任何特定的調(diào)度器上執(zhí)行饿凛,或者在一個指定的默認(rèn)調(diào)度器上執(zhí)行狞玛。下面的表格個列出了一些操作符的默認(rèn)調(diào)度器:

操作符 調(diào)度器
buffer(timespan) computation
buffer(timespan,?count) computation
buffer(timespan,?timeshift) computation
debounce(timeout,?unit) computation
delay(delay,?unit) computation
delaySubscription(delay,?unit) computation
interval computation
repeat trampoline
replay(time,?unit) computation
replay(buffersize,?time,?unit) computation
replay(selector,?time,?unit) computation
replay(selector,?buffersize,?time,?unit) computation
retry trampoline
sample(period,?unit) computation
skip(time,?unit) computation
skipLast(time,?unit) computation
take(time,?unit) computation
takeLast(time,?unit) computation
takeLast(count,?time,?unit) computation
takeLastBuffer(time,?unit) computation
takeLastBuffer(count,?time,?unit) computation
throttleFirst computation
throttleLast computation
throttleWithTimeout computation
timeInterval immediate
timeout(timeoutSelector) immediate
timeout(firstTimeoutSelector,?timeoutSelector) immediate
timeout(timeoutSelector,?other) immediate
timeout(timeout,?timeUnit) computation
timeout(firstTimeoutSelector,?timeoutSelector,?other) immediate
timeout(timeout,?timeUnit,?other) computation
timer computation
timestamp immediate
window(timespan) computation
window(timespan,?count) computation
window(timespan,?timeshift) computation

使用調(diào)度器

除了將這些調(diào)度器傳遞給RxJava的Observable操作符,你也可以用它們調(diào)度你自己的任務(wù)涧窒。下面的示例展示了Scheduler.Worker的用法:

worker = Schedulers.newThread().createWorker();
worker.schedule(new Action0() {

    @Override
    public void call() {
        yourWork();
    }

});
// some time later...
worker.unsubscribe();

遞歸調(diào)度器

要調(diào)度遞歸的方法調(diào)用心肪,你可以使用schedule,然后再用schedule(this)纠吴,示例:

worker = Schedulers.newThread().createWorker();
worker.schedule(new Action0() {

    @Override
    public void call() {
        yourWork();
        // recurse until unsubscribed (schedule will do nothing if unsubscribed)
        worker.schedule(this);
    }

});
// some time later...
worker.unsubscribe();

檢查或設(shè)置取消訂閱狀態(tài)

Worker類的對象實(shí)現(xiàn)了Subscription接口硬鞍,使用它的isUnsubscribed和unsubscribe方法,所以你可以在訂閱取消時(shí)停止任務(wù)戴已,或者從正在調(diào)度的任務(wù)內(nèi)部取消訂閱固该,示例:

Worker worker = Schedulers.newThread().createWorker();
Subscription mySubscription = worker.schedule(new Action0() {

    @Override
    public void call() {
        while(!worker.isUnsubscribed()) {
            status = yourWork();
            if(QUIT == status) { worker.unsubscribe(); }
        }
    }

});

Worker同時(shí)是Subscription,因此你可以(通常也應(yīng)該)調(diào)用它的unsubscribe方法通知可以掛起任務(wù)和釋放資源了糖儡。

延時(shí)和周期調(diào)度器

你可以使用schedule(action,delayTime,timeUnit)在指定的調(diào)度器上延時(shí)執(zhí)行你的任務(wù)伐坏,下面例子中的任務(wù)將在500毫秒之后開始執(zhí)行:

someScheduler.schedule(someAction, 500, TimeUnit.MILLISECONDS);

使用另一個版本的schedule,schedulePeriodically(action,initialDelay,period,timeUnit)方法讓你可以安排一個定期執(zhí)行的任務(wù)握联,下面例子的任務(wù)將在500毫秒之后執(zhí)行桦沉,然后每250毫秒執(zhí)行一次:

someScheduler.schedulePeriodically(someAction, 500, 250, TimeUnit.MILLISECONDS);

測試調(diào)度器

TestScheduler讓你可以對調(diào)度器的時(shí)鐘表現(xiàn)進(jìn)行手動微調(diào)。這對依賴精確時(shí)間安排的任務(wù)的測試很有用處金闽。這個調(diào)度器有三個額外的方法:

  • advanceTimeTo(time,unit) 向前波動調(diào)度器的時(shí)鐘到一個指定的時(shí)間點(diǎn)
  • advanceTimeBy(time,unit) 將調(diào)度器的時(shí)鐘向前撥動一個指定的時(shí)間段
  • triggerActions(?) 開始執(zhí)行任何計(jì)劃中的但是未啟動的任務(wù)纯露,如果它們的計(jì)劃時(shí)間等于或者早于調(diào)度器時(shí)鐘的當(dāng)前時(shí)間

操作符分類

ReactiveX的每種編程語言的實(shí)現(xiàn)都實(shí)現(xiàn)了一組操作符的集合。不同的實(shí)現(xiàn)之間有很多重疊的部分代芜,也有一些操作符只存在特定的實(shí)現(xiàn)中埠褪。每種實(shí)現(xiàn)都傾向于用那種編程語言中他們熟悉的上下文中相似的方法給這些操作符命名。

本文首先會給出ReactiveX的核心操作符列表和對應(yīng)的文檔鏈接,后面還有一個決策樹用于幫助你根據(jù)具體的場景選擇合適的操作符钞速。最后有一個語言特定實(shí)現(xiàn)的按字母排序的操作符列表贷掖。

如果你想實(shí)現(xiàn)你自己的操作符,可以參考這里:實(shí)現(xiàn)自定義操作符

創(chuàng)建操作

用于創(chuàng)建Observable的操作符

  • Create — 通過調(diào)用觀察者的方法從頭創(chuàng)建一個Observable
  • Defer — 在觀察者訂閱之前不創(chuàng)建這個Observable玉工,為每一個觀察者創(chuàng)建一個新的Observable
  • Empty/Never/Throw — 創(chuàng)建行為受限的特殊Observable
  • From — 將其它的對象或數(shù)據(jù)結(jié)構(gòu)轉(zhuǎn)換為Observable
  • Interval — 創(chuàng)建一個定時(shí)發(fā)射整數(shù)序列的Observable
  • Just — 將對象或者對象集合轉(zhuǎn)換為一個會發(fā)射這些對象的Observable
  • Range — 創(chuàng)建發(fā)射指定范圍的整數(shù)序列的Observable
  • Repeat — 創(chuàng)建重復(fù)發(fā)射特定的數(shù)據(jù)或數(shù)據(jù)序列的Observable
  • Start — 創(chuàng)建發(fā)射一個函數(shù)的返回值的Observable
  • Timer — 創(chuàng)建在一個指定的延遲之后發(fā)射單個數(shù)據(jù)的Observable

變換操作

這些操作符可用于對Observable發(fā)射的數(shù)據(jù)進(jìn)行變換羽资,詳細(xì)解釋可以看每個操作符的文檔

  • Buffer — 緩存,可以簡單的理解為緩存遵班,它定期從Observable收集數(shù)據(jù)到一個集合屠升,然后把這些數(shù)據(jù)集合打包發(fā)射,而不是一次發(fā)射一個
  • FlatMap — 扁平映射狭郑,將Observable發(fā)射的數(shù)據(jù)變換為Observables集合腹暖,然后將這些Observable發(fā)射的數(shù)據(jù)平坦化的放進(jìn)一個單獨(dú)的Observable,可以認(rèn)為是一個將嵌套的數(shù)據(jù)結(jié)構(gòu)展開的過程翰萨。
  • GroupBy — 分組脏答,將原來的Observable分拆為Observable集合,將原始Observable發(fā)射的數(shù)據(jù)按Key分組亩鬼,每一個Observable發(fā)射一組不同的數(shù)據(jù)
  • Map — 映射殖告,通過對序列的每一項(xiàng)都應(yīng)用一個函數(shù)變換Observable發(fā)射的數(shù)據(jù),實(shí)質(zhì)是對序列中的每一項(xiàng)執(zhí)行一個函數(shù)雳锋,函數(shù)的參數(shù)就是這個數(shù)據(jù)項(xiàng)
  • Scan — 掃描黄绩,對Observable發(fā)射的每一項(xiàng)數(shù)據(jù)應(yīng)用一個函數(shù),然后按順序依次發(fā)射這些值
  • Window — 窗口玷过,定期將來自O(shè)bservable的數(shù)據(jù)分拆成一些Observable窗口爽丹,然后發(fā)射這些窗口,而不是每次發(fā)射一項(xiàng)辛蚊。類似于Buffer粤蝎,但Buffer發(fā)射的是數(shù)據(jù),Window發(fā)射的是Observable袋马,每一個Observable發(fā)射原始Observable的數(shù)據(jù)的一個子集

過濾操作

這些操作符用于從Observable發(fā)射的數(shù)據(jù)中進(jìn)行選擇

  • Debounce — 只有在空閑了一段時(shí)間后才發(fā)射數(shù)據(jù)初澎,通俗的說,就是如果一段時(shí)間沒有操作虑凛,就執(zhí)行一次操作
  • Distinct — 去重碑宴,過濾掉重復(fù)數(shù)據(jù)項(xiàng)
  • ElementAt — 取值,取特定位置的數(shù)據(jù)項(xiàng)
  • Filter — 過濾卧檐,過濾掉沒有通過謂詞測試的數(shù)據(jù)項(xiàng)墓懂,只發(fā)射通過測試的
  • First — 首項(xiàng)焰宣,只發(fā)射滿足條件的第一條數(shù)據(jù)
  • IgnoreElements — 忽略所有的數(shù)據(jù)霉囚,只保留終止通知(onError或onCompleted)
  • Last — 末項(xiàng),只發(fā)射最后一條數(shù)據(jù)
  • Sample — 取樣匕积,定期發(fā)射最新的數(shù)據(jù)盈罐,等于是數(shù)據(jù)抽樣榜跌,有的實(shí)現(xiàn)里叫ThrottleFirst
  • Skip — 跳過前面的若干項(xiàng)數(shù)據(jù)
  • SkipLast — 跳過后面的若干項(xiàng)數(shù)據(jù)
  • Take — 只保留前面的若干項(xiàng)數(shù)據(jù)
  • TakeLast — 只保留后面的若干項(xiàng)數(shù)據(jù)

組合操作

組合操作符用于將多個Observable組合成一個單一的Observable

  • And/Then/When — 通過模式(And條件)和計(jì)劃(Then次序)組合兩個或多個Observable發(fā)射的數(shù)據(jù)集
  • CombineLatest — 當(dāng)兩個Observables中的任何一個發(fā)射了一個數(shù)據(jù)時(shí),通過一個指定的函數(shù)組合每個Observable發(fā)射的最新數(shù)據(jù)(一共兩個數(shù)據(jù))盅粪,然后發(fā)射這個函數(shù)的結(jié)果
  • Join — 無論何時(shí)钓葫,如果一個Observable發(fā)射了一個數(shù)據(jù)項(xiàng),只要在另一個Observable發(fā)射的數(shù)據(jù)項(xiàng)定義的時(shí)間窗口內(nèi)票顾,就將兩個Observable發(fā)射的數(shù)據(jù)合并發(fā)射
  • Merge — 將兩個Observable發(fā)射的數(shù)據(jù)組合并成一個
  • StartWith — 在發(fā)射原來的Observable的數(shù)據(jù)序列之前础浮,先發(fā)射一個指定的數(shù)據(jù)序列或數(shù)據(jù)項(xiàng)
  • Switch — 將一個發(fā)射Observable序列的Observable轉(zhuǎn)換為這樣一個Observable:它逐個發(fā)射那些Observable最近發(fā)射的數(shù)據(jù)
  • Zip — 打包,使用一個指定的函數(shù)將多個Observable發(fā)射的數(shù)據(jù)組合在一起奠骄,然后將這個函數(shù)的結(jié)果作為單項(xiàng)數(shù)據(jù)發(fā)射

錯誤處理

這些操作符用于從錯誤通知中恢復(fù)

  • Catch — 捕獲豆同,繼續(xù)序列操作,將錯誤替換為正常的數(shù)據(jù)含鳞,從onError通知中恢復(fù)
  • Retry — 重試影锈,如果Observable發(fā)射了一個錯誤通知,重新訂閱它蝉绷,期待它正常終止

輔助操作

一組用于處理Observable的操作符

  • Delay — 延遲一段時(shí)間發(fā)射結(jié)果數(shù)據(jù)
  • Do — 注冊一個動作占用一些Observable的生命周期事件鸭廷,相當(dāng)于Mock某個操作
  • Materialize/Dematerialize — 將發(fā)射的數(shù)據(jù)和通知都當(dāng)做數(shù)據(jù)發(fā)射,或者反過來
  • ObserveOn — 指定觀察者觀察Observable的調(diào)度程序(工作線程)
  • Serialize — 強(qiáng)制Observable按次序發(fā)射數(shù)據(jù)并且功能是有效的
  • Subscribe — 收到Observable發(fā)射的數(shù)據(jù)和通知后執(zhí)行的操作
  • SubscribeOn — 指定Observable應(yīng)該在哪個調(diào)度程序上執(zhí)行
  • TimeInterval — 將一個Observable轉(zhuǎn)換為發(fā)射兩個數(shù)據(jù)之間所耗費(fèi)時(shí)間的Observable
  • Timeout — 添加超時(shí)機(jī)制熔吗,如果過了指定的一段時(shí)間沒有發(fā)射數(shù)據(jù)辆床,就發(fā)射一個錯誤通知
  • Timestamp — 給Observable發(fā)射的每個數(shù)據(jù)項(xiàng)添加一個時(shí)間戳
  • Using — 創(chuàng)建一個只在Observable的生命周期內(nèi)存在的一次性資源

條件和布爾操作

這些操作符可用于單個或多個數(shù)據(jù)項(xiàng),也可用于Observable

  • All — 判斷Observable發(fā)射的所有的數(shù)據(jù)項(xiàng)是否都滿足某個條件
  • Amb — 給定多個Observable磁滚,只讓第一個發(fā)射數(shù)據(jù)的Observable發(fā)射全部數(shù)據(jù)
  • Contains — 判斷Observable是否會發(fā)射一個指定的數(shù)據(jù)項(xiàng)
  • DefaultIfEmpty — 發(fā)射來自原始Observable的數(shù)據(jù)佛吓,如果原始Observable沒有發(fā)射數(shù)據(jù),就發(fā)射一個默認(rèn)數(shù)據(jù)
  • SequenceEqual — 判斷兩個Observable是否按相同的數(shù)據(jù)序列
  • SkipUntil — 丟棄原始Observable發(fā)射的數(shù)據(jù)垂攘,直到第二個Observable發(fā)射了一個數(shù)據(jù)维雇,然后發(fā)射原始Observable的剩余數(shù)據(jù)
  • SkipWhile — 丟棄原始Observable發(fā)射的數(shù)據(jù),直到一個特定的條件為假晒他,然后發(fā)射原始Observable剩余的數(shù)據(jù)
  • TakeUntil — 發(fā)射來自原始Observable的數(shù)據(jù)吱型,直到第二個Observable發(fā)射了一個數(shù)據(jù)或一個通知
  • TakeWhile — 發(fā)射原始Observable的數(shù)據(jù),直到一個特定的條件為真陨仅,然后跳過剩余的數(shù)據(jù)

算術(shù)和聚合操作

這些操作符可用于整個數(shù)據(jù)序列

  • Average — 計(jì)算Observable發(fā)射的數(shù)據(jù)序列的平均值津滞,然后發(fā)射這個結(jié)果
  • Concat — 不交錯的連接多個Observable的數(shù)據(jù)
  • Count — 計(jì)算Observable發(fā)射的數(shù)據(jù)個數(shù),然后發(fā)射這個結(jié)果
  • Max — 計(jì)算并發(fā)射數(shù)據(jù)序列的最大值
  • Min — 計(jì)算并發(fā)射數(shù)據(jù)序列的最小值
  • Reduce — 按順序?qū)?shù)據(jù)序列的每一個應(yīng)用某個函數(shù)灼伤,然后返回這個值
  • Sum — 計(jì)算并發(fā)射數(shù)據(jù)序列的和

連接操作

一些有精確可控的訂閱行為的特殊Observable

  • Connect — 指示一個可連接的Observable開始發(fā)射數(shù)據(jù)給訂閱者
  • Publish — 將一個普通的Observable轉(zhuǎn)換為可連接的
  • RefCount — 使一個可連接的Observable表現(xiàn)得像一個普通的Observable
  • Replay — 確保所有的觀察者收到同樣的數(shù)據(jù)序列触徐,即使他們在Observable開始發(fā)射數(shù)據(jù)之后才訂閱

轉(zhuǎn)換操作

  • To — 將Observable轉(zhuǎn)換為其它的對象或數(shù)據(jù)結(jié)構(gòu)
  • Blocking 阻塞Observable的操作符

操作符決策樹

幾種主要的需求

  • 直接創(chuàng)建一個Observable(創(chuàng)建操作)
  • 組合多個Observable(組合操作)
  • 對Observable發(fā)射的數(shù)據(jù)執(zhí)行變換操作(變換操作)
  • 從Observable發(fā)射的數(shù)據(jù)中取特定的值(過濾操作)
  • 轉(zhuǎn)發(fā)Observable的部分值(條件/布爾/過濾操作)
  • 對Observable發(fā)射的數(shù)據(jù)序列求值(算術(shù)/聚合操作)

這個頁面展示了創(chuàng)建Observable的各種方法。

  • just(?) — 將一個或多個對象轉(zhuǎn)換成發(fā)射這個或這些對象的一個Observable
  • from(?) — 將一個Iterable, 一個Future, 或者一個數(shù)組轉(zhuǎn)換成一個Observable
  • repeat(?) — 創(chuàng)建一個重復(fù)發(fā)射指定數(shù)據(jù)或數(shù)據(jù)序列的Observable
  • repeatWhen(?) — 創(chuàng)建一個重復(fù)發(fā)射指定數(shù)據(jù)或數(shù)據(jù)序列的Observable狐赡,它依賴于另一個Observable發(fā)射的數(shù)據(jù)
  • create(?) — 使用一個函數(shù)從頭創(chuàng)建一個Observable
  • defer(?) — 只有當(dāng)訂閱者訂閱才創(chuàng)建Observable撞鹉;為每個訂閱創(chuàng)建一個新的Observable
  • range(?) — 創(chuàng)建一個發(fā)射指定范圍的整數(shù)序列的Observable
  • interval(?) — 創(chuàng)建一個按照給定的時(shí)間間隔發(fā)射整數(shù)序列的Observable
  • timer(?) — 創(chuàng)建一個在給定的延時(shí)之后發(fā)射單個數(shù)據(jù)的Observable
  • empty(?) — 創(chuàng)建一個什么都不做直接通知完成的Observable
  • error(?) — 創(chuàng)建一個什么都不做直接通知錯誤的Observable
  • never(?) — 創(chuàng)建一個不發(fā)射任何數(shù)據(jù)的Observable
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
禁止轉(zhuǎn)載,如需轉(zhuǎn)載請通過簡信或評論聯(lián)系作者。
  • 序言:七十年代末鸟雏,一起剝皮案震驚了整個濱河市享郊,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌孝鹊,老刑警劉巖炊琉,帶你破解...
    沈念sama閱讀 221,888評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異又活,居然都是意外死亡苔咪,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,677評論 3 399
  • 文/潘曉璐 我一進(jìn)店門柳骄,熙熙樓的掌柜王于貴愁眉苦臉地迎上來悼泌,“玉大人,你說我怎么就攤上這事夹界」堇铮” “怎么了?”我有些...
    開封第一講書人閱讀 168,386評論 0 360
  • 文/不壞的土叔 我叫張陵可柿,是天一觀的道長鸠踪。 經(jīng)常有香客問我,道長复斥,這世上最難降的妖魔是什么营密? 我笑而不...
    開封第一講書人閱讀 59,726評論 1 297
  • 正文 為了忘掉前任,我火速辦了婚禮目锭,結(jié)果婚禮上评汰,老公的妹妹穿的比我還像新娘。我一直安慰自己痢虹,他們只是感情好被去,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,729評論 6 397
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著奖唯,像睡著了一般惨缆。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上丰捷,一...
    開封第一講書人閱讀 52,337評論 1 310
  • 那天坯墨,我揣著相機(jī)與錄音,去河邊找鬼病往。 笑死捣染,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的停巷。 我是一名探鬼主播耍攘,決...
    沈念sama閱讀 40,902評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼累贤,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了少漆?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,807評論 0 276
  • 序言:老撾萬榮一對情侶失蹤硼被,失蹤者是張志新(化名)和其女友劉穎示损,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體嚷硫,經(jīng)...
    沈念sama閱讀 46,349評論 1 318
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡检访,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,439評論 3 340
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了仔掸。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片脆贵。...
    茶點(diǎn)故事閱讀 40,567評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖起暮,靈堂內(nèi)的尸體忽然破棺而出卖氨,到底是詐尸還是另有隱情,我是刑警寧澤负懦,帶...
    沈念sama閱讀 36,242評論 5 350
  • 正文 年R本政府宣布筒捺,位于F島的核電站,受9級特大地震影響纸厉,放射性物質(zhì)發(fā)生泄漏系吭。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,933評論 3 334
  • 文/蒙蒙 一颗品、第九天 我趴在偏房一處隱蔽的房頂上張望肯尺。 院中可真熱鬧,春花似錦躯枢、人聲如沸则吟。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,420評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽逾滥。三九已至,卻和暖如春败匹,著一層夾襖步出監(jiān)牢的瞬間寨昙,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,531評論 1 272
  • 我被黑心中介騙來泰國打工掀亩, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留舔哪,地道東北人。 一個月前我還...
    沈念sama閱讀 48,995評論 3 377
  • 正文 我出身青樓槽棍,卻偏偏與公主長得像捉蚤,于是被迫代替她去往敵國和親抬驴。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,585評論 2 359