RxJava2筆記(三须揣、訂閱線程切換)

在前面兩篇文章RxJava2筆記(一盐股、事件訂閱流程)RxJava2筆記(二、事件取消流程)中耻卡,我們分別了解了事件的訂閱以及取消是如何進行的疯汁,接下來我們將要介紹RxJava的線程切換。

對RxJava有過了解的肯定知道卵酪,前面的代碼都是運行在主線程當(dāng)中幌蚊,讓我們來確認(rèn)下,繼續(xù)對代碼做一些改動凛澎,添加些打印日志霹肝,打印各個方法的當(dāng)前運行線程:

Observer<Integer> observer = new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.i(TAG, "onSubscribe--運行線程:" + Thread.currentThread().getName());
        disposable = d;
    }

    @Override
    public void onNext(Integer integer) {
        Log.i(TAG, "onNext: " + integer + " --運行線程:" + Thread.currentThread().getName());
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError: " + e.getMessage());
        e.printStackTrace();
    }

    @Override
    public void onComplete() {
        Log.i(TAG, "onComplete--運行線程:" + Thread.currentThread().getName());
    }
};

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) {
        Log.i(TAG, "subscribe--運行線程:" + Thread.currentThread().getName());
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onNext(3);
        emitter.onComplete();
    }
}).subscribe(observer);

輸出結(jié)果如下:

I/MainActivity: onSubscribe--運行線程:main
    subscribe--運行線程:main
I/MainActivity: onNext: 1 --運行線程:main
    onNext: 2 --運行線程:main
    onNext: 3 --運行線程:main
    onComplete--運行線程:main

可以看到,所有的線程都運行在主線程當(dāng)中塑煎。

而在實際開發(fā)中沫换,我們需要為網(wǎng)絡(luò)請求相關(guān)代碼單獨開啟一個子線程,將網(wǎng)絡(luò)請求代碼運行在這個子線程中最铁,當(dāng)網(wǎng)絡(luò)請求結(jié)束返回數(shù)據(jù)并開始更新UI界面時讯赏,我們需要將線程切換回主線程后才能去更新UI界面,否則就會報錯(android.view.ViewRootImpl$CalledFromWrongThreadException: Only the original thread that created a view hierarchy can touch its views. at android.view.ViewRootImpl.checkThread(ViewRootImpl.java:6838))
這段話的意思其實就是我們只可以在主線程中更新UI冷尉,當(dāng)我們在其他線程中更新UI時漱挎,就會拋出這個異常。

而使用RxJava切換線程也非常簡單雀哨,只需要添加兩個調(diào)用方法即可:


image.png

我們在訂閱線程添加了一行代碼TimeUnit.MILLISECONDS.sleep(1000)來模擬網(wǎng)絡(luò)請求磕谅。如果在此處不添加這行代碼,當(dāng)我們把observeOn(AndroidSchedulers.mainThread())這行代碼去掉時雾棺,程序仍能正常運行膊夹,有興趣的讀者可以試一下。至于原因捌浩,參考這篇文章Android中子線程真的不能更新UI嗎放刨?

好了尸饺,按照上圖的代碼进统,我們再打印下結(jié)果(打印結(jié)果中間亂入了兩行助币,這是因為訂閱線程睡眠了1s的緣故,我們直接無視掉就行):

I/MainActivity: onSubscribe--運行線程:main
I/MainActivity: subscribe--運行線程:RxCachedThreadScheduler-1
I/MainActivity: onNext: 1 --運行線程:main
I/MainActivity: onNext: 2 --運行線程:main
I/OpenGLRenderer: Initialized EGL, version 1.4
W/art: Before Android 4.1, method int android.support.v7.widget.DropDownListView.lookForSelectablePosition(int, boolean) would have incorrectly overridden the package-private method in android.widget.ListView
I/MainActivity: onNext: 3 --運行線程:main
I/MainActivity: onComplete--運行線程:main

從上面的結(jié)果可以看到螟碎,我們的訂閱線程(即發(fā)射數(shù)據(jù)事件所在的線程眉菱,也就是被觀察者)運行在了名為RxCachedThreadScheduler-1這個線程中,而observer的四個方法(觀察者)均運行在main線程中(也就是UI線程)抚芦。接下來我們就開始分析這兩個線程切換代碼做了哪些工作倍谜。


我們先從訂閱線程開始:

將訂閱線程切換到子線程運行

將訂閱線程切換到子線程只需要調(diào)用方法subscribeOn(Schedulers.io())就能完成切換。那么這個方法做了哪些工作呢叉抡?我們點進去看看:

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}

該方法接收一個Scheduler類型的參數(shù),這個Scheduler類就是負(fù)責(zé)線程調(diào)度的答毫,關(guān)于這個類我們稍后再討論褥民。沿著上面的代碼接著分析下去,subscribeOn(Scheduler scheduler)方法將傳入的scheduler參數(shù)又傳遞到了new ObservableSubscribeOn<T>(this, scheduler)這個構(gòu)造方法中洗搂,然后通過RxJavaPlugins.onAssembly將這個生成的ObservableSubscribeOn對象作為鉤子返回生成一個新的Observable對象消返。我們接著來看ObservableSubscribeOn這個類:

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T>{
}

發(fā)現(xiàn)它繼承了AbstractObservableWithUpstream這個類

/**
 * Base class for operators with a source consumable.
 *
 * @param <T> the input source type
 * @param <U> the output type
 */
abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {

    /** The source consumable Observable. */
    protected final ObservableSource<T> source;

    /**
     * Constructs the ObservableSource with the given consumable.
     * @param source the consumable Observable
     */
    AbstractObservableWithUpstream(ObservableSource<T> source) {
        this.source = source;
    }

    @Override
    public final ObservableSource<T> source() {
        return source;
    }
}

通過源碼可以發(fā)現(xiàn),AbstractObservableWithUpstream繼承了Observable耘拇,其內(nèi)部有個ObservableSource類型成員變量source撵颊,這個ObservableSource又是什么?我們點進去看下

/**
 * Represents a basic, non-backpressured {@link Observable} source base interface,
 * consumable via an {@link Observer}.
 *
 * @param <T> the element type
 * @since 2.0
 */
public interface ObservableSource<T> {

    /**
     * Subscribes the given Observer to this ObservableSource instance.
     * @param observer the Observer, not null
     * @throws NullPointerException if {@code observer} is null
     */
    void subscribe(@NonNull Observer<? super T> observer);
}

可以看出惫叛,ObservableSource是一個接口倡勇,Observable實現(xiàn)了這個接口,而這個接口和前面的ObservableOnSubscribe接口如出一轍嘉涌。因此AbstractObservableWithUpstream中的source成員變量就是用來保存上游傳遞下來的observable妻熊,也就是Observable.create方法生成的數(shù)據(jù)源。
我們來看下ObservableSubscribeOn的源碼:

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    //1仑最、線程調(diào)度器扔役,這里是負(fù)責(zé)訂閱線程切換
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        //2、保存上游傳遞的observable
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        //3警医、封裝外部傳遞進來的觀察者對象(將observer包裝起來亿胸,這是一個裝飾器模式)
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
        //4、建立訂閱關(guān)系(觀察者調(diào)用自己的onSubscribe方法)
        s.onSubscribe(parent);
        //5预皇、將訂閱線程中的disposable賦值給parent(也就是SubscribeOnObserver對象)
        //這里使用scheduler開始線程調(diào)度侈玄,將外部observer的包裝對象parent用SubscribeTask構(gòu)造方法包裝起來并使這個SubscribeTask運行在我們指定的線程中
        //這個SubscribeTask是一個Runnable,實際上真正的訂閱是發(fā)生在它的run()方法里面深啤,而這個run()方法正是運行在我們前面指定的線程中(比如Schedulers.io()指定的IO線程)拗馒。
        //這樣我們就完成了線程切換
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

    static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
    //......代碼省略
    }

    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            //6、訂閱事件發(fā)生
            source.subscribe(parent);
        }
    }
}

在步驟3中溯街,這個外部observer的包裝類SubscribeOnObserver跟前面所講的CreateEmitter相比較起來诱桂,雖然它們都是observer包裝類洋丐,但還是有一定的區(qū)別的,我們來分析下這個類:

static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
    private static final long serialVersionUID = 8094547886072529208L;
    //7挥等、保存外部傳進來的observer對象
    //(這里的actual用于保存下游傳遞過來的observer友绝,
    //當(dāng)前這個類SubscribeOnObserver對象可以看做是介于上游observable和下游observer之間的一個中間observer,主要用于輔助訂閱線程切換)
    final Observer<? super T> actual;
    //8肝劲、保存訂閱發(fā)生時生成的disposable迁客,用于后續(xù)的解除訂閱
    final AtomicReference<Disposable> s;

    SubscribeOnObserver(Observer<? super T> actual) {
        this.actual = actual;
        this.s = new AtomicReference<Disposable>();
    }
        
    @Override
    public void onSubscribe(Disposable s) {
        //9、該方法在調(diào)用source.subscribe(parent)時最先調(diào)用辞槐,
        //將當(dāng)前訂閱事件產(chǎn)生的disposable保存到AtomicReference<Disposable> s這個成員變量中掷漱,用于之后取消訂閱
        //(從上面的代碼可以看出這里傳入的參數(shù)實際上就是SubscribeOnObserver自身對象)
        DisposableHelper.setOnce(this.s, s);
    }

    @Override
    public void onNext(T t) {
        //10、這里調(diào)用下游observer的onNext方法榄檬,actual保存的就是下游傳遞過來的observer
        //后面的onComplete和onError方法同理
        actual.onNext(t);
    }

    @Override
    public void onError(Throwable t) {
        actual.onError(t);
    }

    @Override
    public void onComplete() {
        actual.onComplete();
    }

    @Override
    public void dispose() {
        //11卜范、取消訂閱時除了要終止當(dāng)前訂閱事件(onSubscribe方法被調(diào)用時保存的disposable(保存在其成員變量s中)),
        //還要終止scheduler線程調(diào)度時保存其返回的task(保存在其自身)鹿榜,
        //因為網(wǎng)絡(luò)請求任務(wù)是運行在獨立的線程中海雪,終止訂閱事件時,我們也需要終止相應(yīng)運行的線程
        DisposableHelper.dispose(s);
        DisposableHelper.dispose(this);
    }

    @Override
    public boolean isDisposed() {
        return DisposableHelper.isDisposed(get());
    }

    //12舱殿、這個方法是在ObservableSubscribeOn類中的subscribeActual方法內(nèi)部調(diào)用的奥裸,
    //用于保存scheduler進行線程調(diào)度時返回的task(實際上返回的是DisposeTask類型的對象,它實現(xiàn)了Disposable接口沪袭,這個后面再討論)
    //這里是將scheduler線程調(diào)度時返回的task保存到自身
    //(SubscribeOnObserver繼承了AtomicReference<Disposable>類(繼承該類保證了線程安全)和Disposable接口)
    void setDisposable(Disposable d) {
        DisposableHelper.setOnce(this, d);
    }
}

我們來梳理下思路:

  • 1湾宙、當(dāng)我們調(diào)用subscribeOn(Schedulers.io())方法時,調(diào)用這個方法的對象實際上是Observable.create方法生成的一個新的observable對象(即ObservableCreate對象枝恋,該對象有個source成員變量创倔,用于保存上游傳遞過來的observable被觀察者,也就是事件源)焚碌;調(diào)用subscribeOn這個方法又會產(chǎn)生一個新的observable對象(通過new ObservableSubscribeOn<T>(this, scheduler)得到)畦攘,用于進一步的事件操作。
  • 2十电、步驟1中提到的ObservableSubscribeOn構(gòu)造方法第一個參數(shù)是ObservableSource接口類型(Observable實現(xiàn)了這個接口)的參數(shù)知押,該參數(shù)實際上接收的是步驟1中的ObservableCreate對象,并保存在其成員變量source中(類型為ObservableSource鹃骂,通過繼承AbstractObservableWithUpstream得到)台盯。
  • 3、在ObservableSubscribeOn類的subscribeActual(final Observer<? super T> s)方法中(這個方法接收的observer參數(shù)是從下游傳遞過來的)畏线,將外部傳進來的observer對象用SubscribeOnObserver內(nèi)部類包裝起來(保存在其成員變量actual中)得到該內(nèi)部類的一個對象parent(通過SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s)静盅,這個內(nèi)部類主要用于輔助訂閱線程切換)。
  • 4寝殴、然后通過s.onSubscribe(parent)建立訂閱關(guān)系蒿叠,最后執(zhí)行parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)))明垢,通過scheduler.scheduleDirect(new SubscribeTask(parent))完成線程切換。(備注:SubscribeTask是一個Runnable市咽,這里是將parent傳入到SubscribeTask的構(gòu)造方法中痊银,并在SubscribeTask的run()方法中執(zhí)行訂閱(source.subscribe(parent)))。

至此施绎,訂閱線程的切換流程就介紹完了溯革,本文也就告一段落。下面谷醉,還有幾個細(xì)節(jié)致稀,如果只關(guān)注訂閱線程切換流程的話,可以結(jié)束本文的閱讀了俱尼。


留下的幾個問題:

調(diào)用subscribeOn(Schedulers.io())這個傳入的Schedulers.io()到底是啥豺裆?

我們點進去看下代碼:

/**
 * Returns a default, shared {@link Scheduler} instance intended for IO-bound work.
 * ......已省略
 * @return a {@link Scheduler} meant for IO-bound work
 */
@NonNull
public static Scheduler io() {
     return RxJavaPlugins.onIoScheduler(IO);
}

看注釋,我們了解到這個方法是返回一個綁定在IO操作的默認(rèn)共享實例号显,返回類型是Scheduler類型。RxJavaPlugins我們前面提到過躺酒,它的作用是返回一個鉤子押蚤,這里返回的是一個常量IO,這個IO主要負(fù)責(zé)網(wǎng)絡(luò)通信任務(wù)羹应。其實Schedulers類內(nèi)部聲明了很多個常量揽碘,我們大致看下:

public final class Schedulers {
    @NonNull
    static final Scheduler SINGLE;

    @NonNull
    static final Scheduler COMPUTATION;

    @NonNull
    static final Scheduler IO;

    @NonNull
    static final Scheduler TRAMPOLINE;

    @NonNull
    static final Scheduler NEW_THREAD;

    static final class SingleHolder {
        static final Scheduler DEFAULT = new SingleScheduler();
    }

    static final class ComputationHolder {
        static final Scheduler DEFAULT = new ComputationScheduler();
    }

    static final class IoHolder {
        static final Scheduler DEFAULT = new IoScheduler();
    }

    static final class NewThreadHolder {
        static final Scheduler DEFAULT = new NewThreadScheduler();
    }

    static {
        SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());

        COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());

        IO = RxJavaPlugins.initIoScheduler(new IOTask());

        TRAMPOLINE = TrampolineScheduler.instance();

        NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
    }
    //......代碼省略
}

Schedulers類一共內(nèi)置了5個Scheduler類型的常量,除了IO之外园匹,其他幾個也簡單介紹下:

  • SINGLE:創(chuàng)建一個共享的單一線程雳刺,所有工作均在這個線程里面執(zhí)行
  • COMPUTATION:創(chuàng)建一個指定數(shù)量線程的線程池,主要適用于計算密集型的任務(wù)
  • IO:創(chuàng)建一個預(yù)置一定數(shù)量線程的線程池裸违,主要用于IO密集型的任務(wù)
  • TRAMPOLINE:使用該種方式的任務(wù)會在當(dāng)前線程上運行掖桦,但并不會馬上執(zhí)行,任務(wù)會被保存在一個隊列中供汛,等當(dāng)前任務(wù)執(zhí)行完后再從隊列中把該任務(wù)取出來并執(zhí)行枪汪。
  • NEW_THREAD:直接啟動一個新線程執(zhí)行指定任務(wù)。
  • 還有一種創(chuàng)建線程的方式:Scheduler from(@NonNull Executor executor)怔昨,這是由我們自己指定線程創(chuàng)建以及調(diào)度方式雀久。

上面六種方式常用的為IO和COMPUTATION,本文我們就分析下這個IO趁舀,COMPUTATION其實和IO是類似的赖捌,讀者可以自行分析。
我們看到這個常量IO執(zhí)行了靜態(tài)代碼初始化矮烹,初始化為一個IOTask對象:

static final class IOTask implements Callable<Scheduler> {
    @Override
    public Scheduler call() throws Exception {
        return IoHolder.DEFAULT;
    }
}

Callable和Runnable類似越庇,區(qū)別是使用Callable可以得到返回值罩锐,而使用Runnable則沒有返回值。
上面的代碼在call()方法內(nèi)返回了IoHolder.DEFAULT悦荒,最終返回一個IoScheduler類型的對象:

IO = RxJavaPlugins.initIoScheduler(new IOTask());

static final class IoHolder {
    static final Scheduler DEFAULT = new IoScheduler();
}

有人在這里可能就有疑問唯欣,Schedulers.io()返回的是一個Scheduler對象啊,這個IOTask的call()方法是什么時候執(zhí)行的呢搬味?
我們可以看到境氢,IO這個變量在聲明的時候它的類型就是Scheduler類型,因此可以想到它在初始化的時候必然是經(jīng)過了一些處理碰纬。我們看下IO的初始化代碼:

IO = RxJavaPlugins.initIoScheduler(new IOTask());

這里直接將IOTask對象作為參數(shù)傳遞給 RxJavaPlugins.initIoScheduler(@NonNull Callable<Scheduler> defaultScheduler)方法萍聊,這個方法正好接收一個Callable對象,看來call()方法應(yīng)該就是在這里面執(zhí)行的悦析,點進去看下:

@NonNull
public static Scheduler initIoScheduler(@NonNull Callable<Scheduler> defaultScheduler) {
    ObjectHelper.requireNonNull(defaultScheduler, "Scheduler Callable can't be null");
    Function<? super Callable<Scheduler>, ? extends Scheduler> f = onInitIoHandler;
    if (f == null) {
        return callRequireNonNull(defaultScheduler);
    }
    return applyRequireNonNull(f, defaultScheduler);
}

我們看下這個方法寿桨,首先驗證傳入的參數(shù)不能為空,然后定義一個Function類型的臨時變量f强戴,并用同是類型為Function的成員變量onInitIoHandler為其賦值亭螟,然后判斷f==null是否成立,也就是onInitIoHandler==null是否成立骑歹。這里f==null是成立的预烙,原因是RxJavaPlugins內(nèi)部并沒有onInitIoHandler預(yù)初始化方法,只有與其相關(guān)的Getter和Setter方法道媚,而在本文開始到目前為止扁掸,onInitIoHandler的Setter方法也并沒有被調(diào)用過,因此onInitIoHandler為null最域,自然f==null也就為true谴分,因此調(diào)用callRequireNonNull(defaultScheduler)方法,我們點進這個方法看下:

static Scheduler callRequireNonNull(@NonNull Callable<Scheduler> s) {
    try {
        return ObjectHelper.requireNonNull(s.call(), "Scheduler Callable result can't be null");
    } catch (Throwable ex) {
        throw ExceptionHelper.wrapOrThrow(ex);
    }
}

終于镀脂,我們看到了call()方法的調(diào)用牺蹄。上面說到,在IOTask類中狗热,call()方法返回的是一個IoScheduler對象钞馁,因此結(jié)合前面的分析,這個IO在靜態(tài)初始化的時候就被初始化為一個IoScheduler對象匿刮。

這里還有一個問題僧凰,f==null這個條件什么時候不成立呢?當(dāng)然是我們想要自己處理傳入的Callable對象了熟丸。那要如何自己處理呢训措?我們自己聲明一個繼承自Function接口的處理類,并實現(xiàn)接口中的apply方法(當(dāng)然了這里面最終肯定是要調(diào)用call()方法返回Scheduler對象的,我們可以在返回之前做一些自己的處理)绩鸣,然后調(diào)用onInitIoHandler的靜態(tài)Setter方法(RxJavaPlugins類中的成員變量都是靜態(tài)的怀大,因此其Setter和Getter方法也都是靜態(tài)的)為onInitIoHandler賦值,此時由于onInitIoHandler是不為null的呀闻,因此f==null不成立化借,自然就執(zhí)行下面的方法,我們簡要的看下代碼:

@NonNull
static Scheduler applyRequireNonNull(@NonNull Function<? super Callable<Scheduler>, ? extends Scheduler> f, Callable<Scheduler> s) {
    //這里調(diào)用apply(f, s)捡多,f為我們自定義的處理類對象
    return ObjectHelper.requireNonNull(apply(f, s), "Scheduler Callable result can't be null");
}

@NonNull
static <T, R> R apply(@NonNull Function<T, R> f, @NonNull T t) {
    try {
        //調(diào)用f中的apply方法蓖康,也就是我們自己定義的處理方法,t為Callable對象
        return f.apply(t);
    } catch (Throwable ex) {
        throw ExceptionHelper.wrapOrThrow(ex);
    }
}

至此垒手,我們可以回答前面的問題了蒜焊,Schedulers.io()返回的就是一個IoScheduler對象,這個IoScheduler實際上就是在IO線程調(diào)度時用來管理IO線程的科贬。

scheduler.scheduleDirect(new SubscribeTask(parent))具體是如何完成線程調(diào)度的泳梆?

我們點進這個方法看一下:

@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
    return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}

直接無延遲調(diào)用了scheduleDirect方法

@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    //1、創(chuàng)建一個worker榜掌,其實現(xiàn)了Disposable接口优妙,是一個工作者類
    final Worker w = createWorker();
    //2、這里我們可能要對run參數(shù)做一些處理憎账,也可能不處理鳞溉,取決為我們是否為RxJavaPlugins類的成員變量onScheduleHandler賦值(自定義繼承Function的實現(xiàn)類處理run)。
    //一般情況下我們可以認(rèn)為decoratedRun=run鼠哥。
    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    //3、包裝類看政。DisposeTask實現(xiàn)了Disposable, Runnable, SchedulerRunnableIntrospection這三個接口朴恳,        
    //主要用于管理任務(wù)Runnable及其工作Worker
    DisposeTask task = new DisposeTask(decoratedRun, w);
    //4、開始執(zhí)行任務(wù)
    w.schedule(task, delay, unit);
    //5允蚣、返回這個管理者
    return task;
}

上面的方法將外面?zhèn)魅脒M來的run傳遞給了DisposeTask于颖,然后調(diào)用worker來進行任務(wù)調(diào)度,看起來主要的任務(wù)是在DisposeTask里面執(zhí)行的嚷兔,我看下DisposeTask這個類的代碼:

static final class DisposeTask implements Disposable, Runnable, SchedulerRunnableIntrospection {
    //6森渐、外部傳進來的Runnable任務(wù)
    final Runnable decoratedRun;
    //7、調(diào)度工作者worker
    final Worker w;
    //8冒晰、當(dāng)前線程
    Thread runner;

    DisposeTask(Runnable decoratedRun, Worker w) {
        this.decoratedRun = decoratedRun;
        this.w = w;
    }

    @Override
    public void run() {
        //9同衣、保存decoratedRun執(zhí)行時候所在的線程
        runner = Thread.currentThread();
        try {
            //10、執(zhí)行decoratedRun的run()方法
            decoratedRun.run();
        } finally {
            dispose();
            runner = null;
        }
    }

    @Override
    public void dispose() {
        if (runner == Thread.currentThread() && w instanceof NewThreadWorker) {
            //11壶运、當(dāng)DisposeTask被取消執(zhí)行任務(wù)時耐齐,如果取消任務(wù)時所在的線程和decoratedRun任務(wù)執(zhí)行時所在的線程為同一個線程,
            //并且調(diào)度工作者worker類型為NewThreadWorker類型,直接結(jié)束任務(wù)埠况,關(guān)閉底層執(zhí)行程序(這里是同步任務(wù)場景)
            ((NewThreadWorker)w).shutdown();
        } else {
            //12耸携、當(dāng)DisposeTask被取消執(zhí)行任務(wù)時,告訴worker工作者取消調(diào)度任務(wù)(IO異步任務(wù)場景)
            w.dispose();
        }
    }

    @Override
    public boolean isDisposed() {
        return w.isDisposed();
    }

    @Override
    public Runnable getWrappedRunnable() {
        return this.decoratedRun;
    }
}

具體的步驟都在上面的代碼中標(biāo)注了出來辕翰,我們可以看到夺衍,在DisposeTask的run方法中最終執(zhí)行了decoratedRun.run(),而這個decoratedRun也就是我們在ObservableSubscribeOn類的subscribeActual(final Observer<? super T> s)方法里面所執(zhí)行的代碼parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)))傳進來的new SubscribeTask(parent))對象喜命,通過本文前面的分析我們知道沟沙,SubscribeTask也是一個Runnable,實際上的訂閱source.subscribe(parent)正是在其run()方法中渊抄。

至此我們明白了scheduler.scheduleDirect(new SubscribeTask(parent))都做了哪些工作尝胆,還有一個問題,我們通過查看Worker類的源碼會發(fā)現(xiàn)這是一個抽象類护桦,而createWorker方法也是一個抽象方法含衔,我們在上面說到在IO線程調(diào)度時實際上管理IO線程的是IoScheduler,這個類也是繼承自Scheduler二庵,我們?nèi)oScheduler類看下createWorker方法:

public Worker createWorker() {
    return new EventLoopWorker(pool.get());
}

該方法直接返回了EventLoopWorker對象(管理Worker--負(fù)責(zé)任務(wù)調(diào)度的工作者贪染,CachedWorkerPool--管理Worker的一個隊列陪每,CompositeDisposable--批量管理訂閱狀態(tài)的disposable容器 ):

static final class EventLoopWorker extends Scheduler.Worker {
    private final CompositeDisposable tasks;
    private final CachedWorkerPool pool;
    private final ThreadWorker threadWorker;
    //保存訂閱狀態(tài)
    final AtomicBoolean once = new AtomicBoolean();

    EventLoopWorker(CachedWorkerPool pool) {
        this.pool = pool;
        this.tasks = new CompositeDisposable();
        this.threadWorker = pool.get();
    }

    //取消訂閱時械筛,釋放當(dāng)前的threadWorker 
    @Override
    public void dispose() {
        //如果成功的將disposed狀態(tài)設(shè)置為true,則取消訂閱铜跑,并將釋放當(dāng)前的threadWorker資源因妙,并將其添加到Worker管理池中
        if (once.compareAndSet(false, true)) {
            tasks.dispose();

            // releasing the pool should be the last action
            pool.release(threadWorker);
        }
    }

    @Override
    public boolean isDisposed() {
        return once.get();
    }

    @NonNull
    @Override
    public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
        if (tasks.isDisposed()) {
            // don't schedule, we are unsubscribed
            return EmptyDisposable.INSTANCE;
        }
        //實際線程調(diào)度工作
        return threadWorker.scheduleActual(action, delayTime, unit, tasks);
    }
}

雖然EventLoopWorker也繼承了Scheduler的內(nèi)部類Worker痰憎,但在scheduler方法中,把實際的線程調(diào)度工作轉(zhuǎn)發(fā)給了ThreadWorker去進行攀涵,ThreadWorker又繼承了NewThreadWorker類铣耘,我們點進scheduleActual(action, delayTime, unit, tasks)方法看下(該方法位于NewThreadWorker類中):

@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
    Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    //1、包裝類以故,管理任務(wù)線程蜗细,內(nèi)部實現(xiàn)了一系列方法,包括線程任務(wù)的執(zhí)行怒详,設(shè)置任務(wù)標(biāo)志位炉媒,任務(wù)的取消等。
    //這個parent參數(shù)就是我們平常使用的CompositeDisposable對象昆烁,負(fù)責(zé)訂閱批量管理
    ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

    if (parent != null) {
        if (!parent.add(sr)) {
            return sr;
        }
    }

    Future<?> f;
    try {
        //2吊骤、將任務(wù)線程ScheduledRunnable放入線程池執(zhí)行任務(wù),根據(jù)是否有延遲時間調(diào)用相應(yīng)的方法
        if (delayTime <= 0) {
            f = executor.submit((Callable<Object>)sr);
        } else {
            f = executor.schedule((Callable<Object>)sr, delayTime, unit);
        }
        //3静尼、設(shè)置ScheduledRunnable在線程池中的執(zhí)行狀態(tài)
        sr.setFuture(f);
    } catch (RejectedExecutionException ex) {
        if (parent != null) {
            //4水援、如果ScheduledRunnable在執(zhí)行過程中出錯密强,并且有設(shè)置訂閱狀態(tài)管理容器,將ScheduledRunnable的訂閱狀態(tài)從該容器中移除
            parent.remove(sr);
        }
        RxJavaPlugins.onError(ex);
    }
    //5蜗元、返回任務(wù)線程包裝類給外部或渤,用于該任務(wù)的管理
    return sr;
}

既然返回的是ScheduledRunnable對象,我們就來看下這個類:

//這里即實現(xiàn)了Runnable接口也實現(xiàn)了Callable接口奕扣,用于應(yīng)對需要返回值和不需要返回值的情況
public final class ScheduledRunnable extends AtomicReferenceArray<Object> implements Runnable, Callable<Object>, Disposable {

    private static final long serialVersionUID = -6120223772001106981L;
    final Runnable actual;

    /** Indicates that the parent tracking this task has been notified about its completion. */
    static final Object PARENT_DISPOSED = new Object();
    /** Indicates the dispose() was called from within the run/call method. */
    static final Object SYNC_DISPOSED = new Object();
    /** Indicates the dispose() was called from another thread. */
    static final Object ASYNC_DISPOSED = new Object();

    static final Object DONE = new Object();
    //保存追蹤該任務(wù)的外部任務(wù)狀態(tài)的標(biāo)識
    static final int PARENT_INDEX = 0;
    //保存該任務(wù)運行狀態(tài)的標(biāo)識
    static final int FUTURE_INDEX = 1;
    //保存其他線程像該任務(wù)發(fā)出的指令標(biāo)識
    static final int THREAD_INDEX = 2;

    /**
     * Creates a ScheduledRunnable by wrapping the given action and setting
     * up the optional parent.
     * @param actual the runnable to wrap, not-null (not verified)
     * @param parent the parent tracking container or null if none
     */
    public ScheduledRunnable(Runnable actual, DisposableContainer parent) {
        super(3);
        this.actual = actual;
        //如果我們在外部通過一個disposable容器來管理當(dāng)前任務(wù)薪鹦,就保存這個disposable容器
        this.lazySet(0, parent);
    }

    @Override
    public Object call() {
        // Being Callable saves an allocation in ThreadPoolExecutor
        run();
        return null;
    }

    @Override
    public void run() {
        //設(shè)置任務(wù)執(zhí)行線程為當(dāng)前所在線程
        lazySet(THREAD_INDEX, Thread.currentThread());
        try {
            try {
                //執(zhí)行實際IO任務(wù)
                actual.run();
            } catch (Throwable e) {
                // Exceptions.throwIfFatal(e); nowhere to go
                RxJavaPlugins.onError(e);
            }
        } finally {
            //任務(wù)執(zhí)行期間出現(xiàn)錯誤,重置當(dāng)前執(zhí)行線程為null
            lazySet(THREAD_INDEX, null);
            //獲取追蹤次任務(wù)的外部任務(wù)狀態(tài)
            Object o = get(PARENT_INDEX);
            if (o != PARENT_DISPOSED && compareAndSet(PARENT_INDEX, o, DONE) && o != null) {
                //如果追蹤該任務(wù)狀態(tài)的外部任務(wù)還沒有被終止惯豆,將它設(shè)置為DONE狀態(tài)池磁,并將當(dāng)前任務(wù)從訂閱狀態(tài)管理容器中刪除
                ((DisposableContainer)o).delete(this);
            }

            //任務(wù)在執(zhí)行出錯時,確保任務(wù)的執(zhí)行結(jié)果狀態(tài)為SYNC_DISPOSED楷兽,ASYNC_DISPOSED以及DONE中的任意一個即可
            for (;;) {
                o = get(FUTURE_INDEX);
                if (o == SYNC_DISPOSED || o == ASYNC_DISPOSED || compareAndSet(FUTURE_INDEX, o, DONE)) {
                    break;
                }
            }
        }
    }

    //設(shè)置Runnable任務(wù)的運行狀態(tài)
    public void setFuture(Future<?> f) {
        for (;;) {
            Object o = get(FUTURE_INDEX);
            if (o == DONE) {
                return;
            }
            if (o == SYNC_DISPOSED) {
                f.cancel(false);
                return;
            }
            if (o == ASYNC_DISPOSED) {
                f.cancel(true);
                return;
            }
            if (compareAndSet(FUTURE_INDEX, o, f)) {
                return;
            }
        }
    }

    //結(jié)束Runnable任務(wù)運行
    @Override
    public void dispose() {
        for (;;) {
            //獲取任務(wù)運行狀態(tài)
            Object o = get(FUTURE_INDEX);
            //如果任務(wù)執(zhí)行結(jié)果狀態(tài)為以下三個狀態(tài)中的任意一個地熄,表示任務(wù)已被結(jié)束,什么也不做
            if (o == DONE || o == SYNC_DISPOSED || o == ASYNC_DISPOSED) {
                break;
            }
            //判斷任務(wù)在執(zhí)行期間所在的線程以及執(zhí)行終止后所在的線程是否一致
            boolean async = get(THREAD_INDEX) != Thread.currentThread();
            //設(shè)置該任務(wù)是在同步環(huán)境下結(jié)束的還是在異步環(huán)境下結(jié)束的
            if (compareAndSet(FUTURE_INDEX, o, async ? ASYNC_DISPOSED : SYNC_DISPOSED)) {
                if (o != null) {
                    //取消任務(wù)的執(zhí)行
                    ((Future<?>)o).cancel(async);
                }
                break;
            }
        }
       
        for (;;) {
            //獲取追蹤該任務(wù)的外部任務(wù)所處狀態(tài)
            Object o = get(PARENT_INDEX);
            if (o == DONE || o == PARENT_DISPOSED || o == null) {
                //如果外部任務(wù)狀態(tài)是終止?fàn)顟B(tài)芯杀,則什么也不做
                return;
            }
            if (compareAndSet(PARENT_INDEX, o, PARENT_DISPOSED)) {
                //如果外部任務(wù)狀態(tài)不是終止?fàn)顟B(tài)端考,將其設(shè)置為終止?fàn)顟B(tài),并將當(dāng)前任務(wù)從狀態(tài)容器中移除
                ((DisposableContainer)o).delete(this);
                return;
            }
        }
    }

    //根據(jù)追蹤該任務(wù)的父任務(wù)狀態(tài)并根據(jù)父任務(wù)是否結(jié)束來判斷該任務(wù)是否結(jié)束
    @Override
    public boolean isDisposed() {
        Object o = get(PARENT_INDEX);
        return o == PARENT_DISPOSED || o == DONE;
    }
}

通過上面代碼我們可以看到揭厚,NewThreadWorker類內(nèi)部確實是通過線程池來執(zhí)行我們提交的IO任務(wù)却特,這個線程池是什么時候創(chuàng)建的呢?答案是在createWorker()方法調(diào)用的時候筛圆,即我們創(chuàng)建worker工作者對象時裂明。通過調(diào)用EventLoopWorker的構(gòu)造方法生成該對象并返回。

EventLoopWorker(CachedWorkerPool pool) {
    this.pool = pool;
    this.tasks = new CompositeDisposable();
    this.threadWorker = pool.get();
}

這個構(gòu)造方法接收一個CachedWorkerPool類型的參數(shù)太援,這個CachedWorkerPool并不是線程池闽晦,而是用來管理我們創(chuàng)建的Worker工作者。

static final class CachedWorkerPool implements Runnable {
    //空閑Worker在隊列中的存活時間
    private final long keepAliveTime;
    //線程安全的任務(wù)隊列提岔,用于保存處于空閑狀態(tài)的Worker工作者
    private final ConcurrentLinkedQueue<ThreadWorker> expiringWorkerQueue;
    //Disposable容器尼荆,可以用于統(tǒng)一管理多個訂閱任務(wù)狀態(tài)
    final CompositeDisposable allWorkers;
    //可以實現(xiàn)循環(huán)或延時執(zhí)行任務(wù)的線程池
    private final ScheduledExecutorService evictorService;
    //保存延遲周期任務(wù)
    private final Future<?> evictorTask;
    //線程創(chuàng)建工廠方法
    private final ThreadFactory threadFactory;

    CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
        this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L;
        this.expiringWorkerQueue = new ConcurrentLinkedQueue<ThreadWorker>();
        this.allWorkers = new CompositeDisposable();
        this.threadFactory = threadFactory;

        ScheduledExecutorService evictor = null;
        Future<?> task = null;
        //如果空閑隊列中的Worker有存活時間,則創(chuàng)建線程池和相應(yīng)的延遲周期任務(wù)
        //該類實現(xiàn)了Runnable接口唧垦,并在run()方法中調(diào)用方法evictExpiredWorkers(),用于移除空閑隊列中超過設(shè)置的存活時間的Worker
        if (unit != null) {
            //創(chuàng)建線程池
            evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
            //并將當(dāng)前Runnable任務(wù)對象(即CachedWorkerPool)添加到周期延遲執(zhí)行任務(wù)中液样,然后啟動線程池執(zhí)行該周期延遲任務(wù)振亮,最后返回執(zhí)行結(jié)果
            //由于run()方法中主要是為了在周期時間內(nèi)檢查Worker空閑隊列中緩存的Worker是否超過存活時間,因此此處周期延遲任務(wù)的延遲時間與Worker的存活時間一致
            task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);
        }
        evictorService = evictor;
        evictorTask = task;
    }

    //CachedWorkerPool內(nèi)部的線程池啟動后開始執(zhí)行
    @Override
    public void run() {
        evictExpiredWorkers();
    }

    //從空閑隊列中取出一個緩存的工作者Worker
    ThreadWorker get() {
        if (allWorkers.isDisposed()) {
            return SHUTDOWN_THREAD_WORKER;
        }
        while (!expiringWorkerQueue.isEmpty()) {
            ThreadWorker threadWorker = expiringWorkerQueue.poll();
            if (threadWorker != null) {
                return threadWorker;
            }
        }

        //如果隊列中沒有緩存的Worker鞭莽,則新建一個坊秸,并將其添加到disposable容器中,便于訂閱任務(wù)的統(tǒng)一管理
        ThreadWorker w = new ThreadWorker(threadFactory);
        allWorkers.add(w);
        return w;
    }

    //釋放一個ThreadWorker澎怒,并為其設(shè)置存活時間褒搔,將其添加到空閑隊列中
    void release(ThreadWorker threadWorker) {
        // Refresh expire time before putting worker back in pool
        threadWorker.setExpirationTime(now() + keepAliveTime);
        expiringWorkerQueue.offer(threadWorker);
    }

    //移除空閑隊列中所有超過存活時間(keepAliveTime)的Worker
    void evictExpiredWorkers() {
        if (!expiringWorkerQueue.isEmpty()) {
            long currentTimestamp = now();

            for (ThreadWorker threadWorker : expiringWorkerQueue) {
                if (threadWorker.getExpirationTime() <= currentTimestamp) {
                    if (expiringWorkerQueue.remove(threadWorker)) {
                        allWorkers.remove(threadWorker);
                    }
                } else {
                    // Queue is ordered with the worker that will expire first in the beginning, so when we
                    // find a non-expired worker we can stop evicting.
                    break;
                }
            }
        }
    }
    //......代碼省略
}

在get()方法中,先檢查空閑隊列有沒有緩存的Worker,如果沒有則創(chuàng)建一個星瘾。

static final class ThreadWorker extends NewThreadWorker {
        private long expirationTime;

        ThreadWorker(ThreadFactory threadFactory) {
            super(threadFactory);
            this.expirationTime = 0L;
        }
        //......代碼省略
    }

ThreadWorker繼承自NewThreadWorker走孽,雖然它們的名字都帶了Thread,但它們并不是線程琳状,而是一個調(diào)度訂閱任務(wù)的工作者磕瓷,NewThreadWorker繼承自Scheduler內(nèi)的一個內(nèi)部類--Worker。

public class NewThreadWorker extends Scheduler.Worker implements Disposable {
    private final ScheduledExecutorService executor;

    volatile boolean disposed;

    public NewThreadWorker(ThreadFactory threadFactory) {
        executor = SchedulerPoolFactory.create(threadFactory);
    }
    //......代碼省略
}

我們看到創(chuàng)建線程池的代碼了念逞,繼續(xù)點進去看下:

//SchedulerPoolFactory.create
public static ScheduledExecutorService create(ThreadFactory factory) {
    final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
    if (PURGE_ENABLED && exec instanceof ScheduledThreadPoolExecutor) {
        ScheduledThreadPoolExecutor e = (ScheduledThreadPoolExecutor) exec;
        POOLS.put(e, exec);
    }
    return exec;
}

//Executors.newScheduledThreadPool
public static ScheduledExecutorService newScheduledThreadPool(
        int corePoolSize, ThreadFactory threadFactory) {
    return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}

//new ScheduledThreadPoolExecutor
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory) {
    super(corePoolSize, Integer.MAX_VALUE,
            DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
            new DelayedWorkQueue(), threadFactory);
}

這里創(chuàng)建了一個線程池困食,其核心線程數(shù)為1,可創(chuàng)建的最大線程為Integer.MAX_VALUE翎承,非核心線程在線程池中的存活時間為10硕盹,存活時間單位為MILLISECONDS,線程管理隊列為DelayedWorkQueue叨咖,線程創(chuàng)建的工廠類為我們外面?zhèn)魅氲墓S類(這里是RxJava默認(rèn)實現(xiàn)的工廠類==>RxThreadFactory)瘩例。
最后NewThreadWorker就是通過這里創(chuàng)建的線程池來執(zhí)行具體的IO任務(wù)==>SubscribeTask。

由上面代碼可以看到芒澜,每個Worker內(nèi)部都自己創(chuàng)建了一個獨立的線程池來執(zhí)行IO任務(wù)仰剿,這些Worker又都是通過CachedWorkerPool來統(tǒng)一管理的,那么這個CachedWorkerPool又是什么時候創(chuàng)建的呢痴晦?答案是在我們調(diào)用Schedulers.io()時南吮。
前面我們提到過,當(dāng)調(diào)用Schedulers.io()時誊酌,最終會調(diào)用IoScheduler類的構(gòu)造方法產(chǎn)生一個IoScheduler對象部凑。

public final class IoScheduler extends Scheduler {
    //......代碼省略
    private static final String WORKER_THREAD_NAME_PREFIX = "RxCachedThreadScheduler";
    static final RxThreadFactory WORKER_THREAD_FACTORY;
    static final CachedWorkerPool NONE;
    
static {
        //......代碼省略
        //創(chuàng)建線程的工廠類,為每個創(chuàng)建的線程名添加前綴(RxCachedThreadScheduler)
        WORKER_THREAD_FACTORY = new RxThreadFactory(WORKER_THREAD_NAME_PREFIX, priority);
        //初始化一個沒有線程池的CachedWorkerPool碧浊,該Worker管理池內(nèi)空閑隊列中的Worker存活時間為0涂邀,Worker內(nèi)部線程池創(chuàng)建線程的工廠類為RxThreadFactory
        NONE = new CachedWorkerPool(0, null, WORKER_THREAD_FACTORY);
        //清空CachedWorkerPool內(nèi)部狀態(tài)
        NONE.shutdown();
    }

    public IoScheduler() {
        this(WORKER_THREAD_FACTORY);
    }
    
    public IoScheduler(ThreadFactory threadFactory) {
        //創(chuàng)建線程的工廠類對象
        this.threadFactory = threadFactory;
        //包裝類,以保證多線程環(huán)境下CachedWorkerPool可以正常工作
        this.pool = new AtomicReference<CachedWorkerPool>(NONE);
        start();
    }

    public void start() {
        //新建一個有線程池以及Worker存活時間的CachedWorkerPool
        CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory);
        //這里通過AtomicReference的CAS方法來確保NONE能被正常更新為update
        if (!pool.compareAndSet(NONE, update)) {
            update.shutdown();
        }
    }

    //......代碼省略
}

至此:訂閱線程切換也就告一段落了箱锐,文章的后面還扯了很多題外話比勉,下一篇文章RxJava2筆記(四、觀察者線程切換)我們繼續(xù)介紹觀察者線程切換(即如何將線程由子線程切換回主線程從而進行UI更新)驹止。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末浩聋,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子臊恋,更是在濱河造成了極大的恐慌衣洁,老刑警劉巖,帶你破解...
    沈念sama閱讀 219,427評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件抖仅,死亡現(xiàn)場離奇詭異坊夫,居然都是意外死亡砖第,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,551評論 3 395
  • 文/潘曉璐 我一進店門环凿,熙熙樓的掌柜王于貴愁眉苦臉地迎上來梧兼,“玉大人,你說我怎么就攤上這事拷邢「ぴ海” “怎么了?”我有些...
    開封第一講書人閱讀 165,747評論 0 356
  • 文/不壞的土叔 我叫張陵瞭稼,是天一觀的道長忽洛。 經(jīng)常有香客問我,道長环肘,這世上最難降的妖魔是什么欲虚? 我笑而不...
    開封第一講書人閱讀 58,939評論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮悔雹,結(jié)果婚禮上复哆,老公的妹妹穿的比我還像新娘。我一直安慰自己腌零,他們只是感情好梯找,可當(dāng)我...
    茶點故事閱讀 67,955評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著益涧,像睡著了一般锈锤。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上闲询,一...
    開封第一講書人閱讀 51,737評論 1 305
  • 那天久免,我揣著相機與錄音,去河邊找鬼扭弧。 笑死阎姥,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的鸽捻。 我是一名探鬼主播呼巴,決...
    沈念sama閱讀 40,448評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼御蒲!你這毒婦竟也來了衣赶?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,352評論 0 276
  • 序言:老撾萬榮一對情侶失蹤删咱,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后豪筝,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體痰滋,經(jīng)...
    沈念sama閱讀 45,834評論 1 317
  • 正文 獨居荒郊野嶺守林人離奇死亡摘能,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,992評論 3 338
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了敲街。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片团搞。...
    茶點故事閱讀 40,133評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖多艇,靈堂內(nèi)的尸體忽然破棺而出逻恐,到底是詐尸還是另有隱情,我是刑警寧澤峻黍,帶...
    沈念sama閱讀 35,815評論 5 346
  • 正文 年R本政府宣布复隆,位于F島的核電站,受9級特大地震影響姆涩,放射性物質(zhì)發(fā)生泄漏挽拂。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,477評論 3 331
  • 文/蒙蒙 一骨饿、第九天 我趴在偏房一處隱蔽的房頂上張望亏栈。 院中可真熱鬧,春花似錦宏赘、人聲如沸绒北。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,022評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽闷游。三九已至,卻和暖如春箕母,著一層夾襖步出監(jiān)牢的瞬間储藐,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,147評論 1 272
  • 我被黑心中介騙來泰國打工嘶是, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留钙勃,地道東北人。 一個月前我還...
    沈念sama閱讀 48,398評論 3 373
  • 正文 我出身青樓聂喇,卻偏偏與公主長得像辖源,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子希太,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,077評論 2 355