RxJava源碼分析-線程切換

RxJava源碼分析-線程切換

接著上篇分析筐摘,本篇我們來揭開RxJava線程切換的神秘面試,先上一段代碼

Observable.just("hello,world!")
        .map { res->
            Log.d("Observable", "thread:" + Thread.currentThread().name)
            res+"1234"
        }
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe({ res ->
            Log.d("Observable", "thread:" + Thread.currentThread().name)
            Log.d("Observable", "length:" + res.length)
        }, { e ->
            Log.d("Observable", "thread:" + Thread.currentThread().name)
            e.printStackTrace()
        }, {
            Log.d("Observable", "thread:" + Thread.currentThread().name)
            Log.d("Observable", "onComplete")
        },{
            Log.d("Observable", "thread:" + Thread.currentThread().name)
            Log.d("Observable", "onSubscribe")
        })

這段代碼執(zhí)行玩打印的log如下

05-21 10:45:06.109 17068-17068/com.example.pandaguo.rxdemo D/Observable: thread:main
    onSubscribe
05-21 10:45:06.115 17068-17086/com.example.pandaguo.rxdemo D/Observable: thread:RxCachedThreadScheduler-1
05-21 10:45:06.165 17068-17068/com.example.pandaguo.rxdemo D/Observable: thread:main
    length:16
    thread:main
    onComplete

可以看到其中在map操作符中執(zhí)行的代碼是在RxCachedThreadScheduler-1線程中執(zhí)行船老,而其余的均是在UI線程咖熟,為什么呢?

  • 本文的重點(diǎn)不在數(shù)據(jù)流向分析柳畔,因此前面幾個(gè)函數(shù)不在仔細(xì)分析
Observable.just("hello,world!")
        .map { res->
            Log.d("Observable", "thread:" + Thread.currentThread().name)
            res+"1234"
        }

代碼執(zhí)行到這里馍管,我們可以拿到經(jīng)過封裝的數(shù)據(jù)源ObservableMap,其實(shí)就是個(gè)Observable荸镊,那么接下來調(diào)用subscribeOn(Schedulers.io())來進(jìn)行線程切換的操作了咽斧,我們來一點(diǎn)點(diǎn)的分析,首先看下Schedulers.io()
是怎么創(chuàng)建一個(gè)Scheduler對(duì)象IO的

Schedulers.io()

public final class Schedulers {
    ...
    static final Scheduler IO;
    ...
    static {
        IO = RxJavaPlugins.initIoScheduler(new IOTask());
        ...
    }
    ...
    static final class IoHolder {
        static final Scheduler DEFAULT = new IoScheduler();
    }
    ...
    static final class IOTask implements Callable<Scheduler> {
        @Override
        public Scheduler call() throws Exception {
            return IoHolder.DEFAULT;
        }
    }
    ...
    public static Scheduler io() {
        return RxJavaPlugins.onIoScheduler(IO);
    }
    ...
}
  • 一部分代碼需要結(jié)合著RxJavaPlugins來一起看
public final class RxJavaPlugins {
    ...
    public static Scheduler initIoScheduler(@NonNull Callable<Scheduler> defaultScheduler) {
        ObjectHelper.requireNonNull(defaultScheduler, "Scheduler Callable can't be null");
        //初始化時(shí)候 onInitIoHandler = null
        Function<? super Callable<Scheduler>, ? extends Scheduler> f = onInitIoHandler;
        if (f == null) {
            return callRequireNonNull(defaultScheduler);
        }
        return applyRequireNonNull(f, defaultScheduler);
    }
    ...
    static Scheduler callRequireNonNull(@NonNull Callable<Scheduler> s) {
        try {
            return ObjectHelper.requireNonNull(s.call(), "Scheduler Callable result can't be null");
        } catch (Throwable ex) {
            throw ExceptionHelper.wrapOrThrow(ex);
        }
    }
    ...
    public static Scheduler onIoScheduler(@NonNull Scheduler defaultScheduler) {
        //初始化時(shí)候onIoHandler = null
        Function<? super Scheduler, ? extends Scheduler> f = onIoHandler;
        if (f == null) {
            return defaultScheduler;
        }
        return apply(f, defaultScheduler);
    }
    ...
}
  • 上面代碼首先Schedulers.io()會(huì)調(diào)用RxJavaPlugins.onIoScheduler(IO)躬存,這里傳入的IO實(shí)際上是一早就初始化的RxJavaPlugins.initIoScheduler(new IOTask()), IOTaskSchedulers的一個(gè)靜態(tài)內(nèi)部類张惹,實(shí)現(xiàn)了Callable接口,并且在call()方法中返回了一個(gè)IoHolder.DEFAULT岭洲,這個(gè)IoHolder其實(shí)是一個(gè)Schedulers的靜態(tài)內(nèi)部類宛逗,然后默認(rèn)會(huì)持有一個(gè)IoScheduler對(duì)象DEFAULT
  • RxJavaPlugins.initIoScheduler(new IOTask())會(huì)調(diào)用到callRequireNonNull(),我們來看下這個(gè)方法回去調(diào)用s.call()盾剩,s的類型是IOTask
  • 說了那么多Schedulers.io()最終就是創(chuàng)建了一個(gè)類型為IoScheduler的對(duì)象雷激,我們先不去看IoScheduler的實(shí)現(xiàn),先來分析Observable.subscribeOn()的實(shí)現(xiàn)

Observable.subscribeOn()

public abstract class Observable<T> implements ObservableSource<T> {
    ...
     public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }
    ...
}
  • 還是要結(jié)合RxJavaPlugins來一起看
public final class RxJavaPlugins {
    ...
    public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }
    ...
}
  • 這里其實(shí)RxJavaPlugins.onAssembly()實(shí)際上就還是返回了傳入的參數(shù)告私,也就是創(chuàng)建拿到了一個(gè)ObservableSubscribeOn對(duì)象屎暇,那么線程切換的核心邏輯也就在這個(gè)類中實(shí)現(xiàn),接下來我們來分析這個(gè)類

ObservableSubscribeOn

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

        s.onSubscribe(parent);
        
        //這里是線程切換的關(guān)鍵
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

    static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {

        private static final long serialVersionUID = 8094547886072529208L;
        final Observer<? super T> actual;

        final AtomicReference<Disposable> s;

        SubscribeOnObserver(Observer<? super T> actual) {
            this.actual = actual;
            this.s = new AtomicReference<Disposable>();
        }

        @Override
        public void onSubscribe(Disposable s) {
            DisposableHelper.setOnce(this.s, s);
        }

        @Override
        public void onNext(T t) {
            actual.onNext(t);
        }

        @Override
        public void onError(Throwable t) {
            actual.onError(t);
        }

        @Override
        public void onComplete() {
            actual.onComplete();
        }

        @Override
        public void dispose() {
            DisposableHelper.dispose(s);
            DisposableHelper.dispose(this);
        }

        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }

        void setDisposable(Disposable d) {
            DisposableHelper.setOnce(this, d);
        }
    }

    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            //訂閱實(shí)際發(fā)生的位置
            source.subscribe(parent);
        }
    }
}
  • 我們來看下subscribeActual的實(shí)現(xiàn)驻粟,s.onSubscribe(parent);執(zhí)行的時(shí)候我們并沒有看到線程切換的業(yè)務(wù)根悼,所以我們可以肯定Observ.onSubscribe()一定是在UI線程回調(diào)的,那么為什么map操作符中的邏輯是在另一個(gè)線程呢?parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));parent.setDisposable()從實(shí)現(xiàn)上來看是沒有做線程切換的邏輯挤巡,new SubscribeTask(parent)從實(shí)現(xiàn)上來看僅僅是讓訂閱的操作發(fā)生在SubscribeTask執(zhí)行的線程剩彬,等等我有一個(gè)大膽的想法 ,既然有Runnable對(duì)象了矿卑,那么scheduler.scheduleDirect()會(huì)不會(huì)就是實(shí)際上去切換線程的操作呢喉恋?我們來追下代碼

Scheduler.scheduleDirect()

public abstract class Scheduler {
    ...
    public Disposable scheduleDirect(@NonNull Runnable run) {
        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    }
    ...
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        final Worker w = createWorker();

        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        DisposeTask task = new DisposeTask(decoratedRun, w);

        w.schedule(task, delay, unit);

        return task;
    }
    ...
    public abstract Worker createWorker();
    ...
}
  • Scheduler是一個(gè)抽象類!
  • 上面代碼可以看到母廷,實(shí)際上執(zhí)行的邏輯是在另一個(gè)重載方法scheduleDirect中轻黑,這里調(diào)用createWorker()創(chuàng)建了Worker對(duì)象w,然后調(diào)用了w.schedule(task, delay, unit);去實(shí)現(xiàn)了線程切換的邏輯
  • 啥徘意?你說這樣就實(shí)現(xiàn)了線程切換苔悦,我們不信!那我就證明給你看撒椎咧,還記得前面說得么Schedulers.io()最終創(chuàng)建了一個(gè)IoScheduler對(duì)象玖详,我們來看下它的定義

IOScheduler

public final class IoScheduler extends Scheduler {
    private static final String WORKER_THREAD_NAME_PREFIX = "RxCachedThreadScheduler";
    static final RxThreadFactory WORKER_THREAD_FACTORY;

    private static final String EVICTOR_THREAD_NAME_PREFIX = "RxCachedWorkerPoolEvictor";
    static final RxThreadFactory EVICTOR_THREAD_FACTORY;

    private static final long KEEP_ALIVE_TIME = 60;
    private static final TimeUnit KEEP_ALIVE_UNIT = TimeUnit.SECONDS;

    static final ThreadWorker SHUTDOWN_THREAD_WORKER;
    final ThreadFactory threadFactory;
    final AtomicReference<CachedWorkerPool> pool;
    
    /** The name of the system property for setting the thread priority for this Scheduler. */
    private static final String KEY_IO_PRIORITY = "rx2.io-priority";

    static final CachedWorkerPool NONE;
    static {
        SHUTDOWN_THREAD_WORKER = new ThreadWorker(new RxThreadFactory("RxCachedThreadSchedulerShutdown"));
        SHUTDOWN_THREAD_WORKER.dispose();

        int priority = Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY,
                Integer.getInteger(KEY_IO_PRIORITY, Thread.NORM_PRIORITY)));

        WORKER_THREAD_FACTORY = new RxThreadFactory(WORKER_THREAD_NAME_PREFIX, priority);

        EVICTOR_THREAD_FACTORY = new RxThreadFactory(EVICTOR_THREAD_NAME_PREFIX, priority);

        NONE = new CachedWorkerPool(0, null, WORKER_THREAD_FACTORY);
        NONE.shutdown();
    }
    
    static final class CachedWorkerPool implements Runnable {
        private final long keepAliveTime;
        private final ConcurrentLinkedQueue<ThreadWorker> expiringWorkerQueue;
        final CompositeDisposable allWorkers;
        private final ScheduledExecutorService evictorService;
        private final Future<?> evictorTask;
        private final ThreadFactory threadFactory;

        CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
            this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L;
            this.expiringWorkerQueue = new ConcurrentLinkedQueue<ThreadWorker>();
            this.allWorkers = new CompositeDisposable();
            this.threadFactory = threadFactory;

            ScheduledExecutorService evictor = null;
            Future<?> task = null;
            if (unit != null) {
                evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
                task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);
            }
            evictorService = evictor;
            evictorTask = task;
        }

        @Override
        public void run() {
            evictExpiredWorkers();
        }

        ThreadWorker get() {
            if (allWorkers.isDisposed()) {
                return SHUTDOWN_THREAD_WORKER;
            }
            while (!expiringWorkerQueue.isEmpty()) {
                ThreadWorker threadWorker = expiringWorkerQueue.poll();
                if (threadWorker != null) {
                    return threadWorker;
                }
            }

            // No cached worker found, so create a new one.
            ThreadWorker w = new ThreadWorker(threadFactory);
            allWorkers.add(w);
            return w;
        }

        void release(ThreadWorker threadWorker) {
            // Refresh expire time before putting worker back in pool
            threadWorker.setExpirationTime(now() + keepAliveTime);

            expiringWorkerQueue.offer(threadWorker);
        }

        void evictExpiredWorkers() {
            if (!expiringWorkerQueue.isEmpty()) {
                long currentTimestamp = now();

                for (ThreadWorker threadWorker : expiringWorkerQueue) {
                    if (threadWorker.getExpirationTime() <= currentTimestamp) {
                        if (expiringWorkerQueue.remove(threadWorker)) {
                            allWorkers.remove(threadWorker);
                        }
                    } else {
                        // Queue is ordered with the worker that will expire first in the beginning, so when we
                        // find a non-expired worker we can stop evicting.
                        break;
                    }
                }
            }
        }

        long now() {
            return System.nanoTime();
        }

        void shutdown() {
            allWorkers.dispose();
            if (evictorTask != null) {
                evictorTask.cancel(true);
            }
            if (evictorService != null) {
                evictorService.shutdownNow();
            }
        }
    }
    
    public IoScheduler() {
        this(WORKER_THREAD_FACTORY);
    }

    public IoScheduler(ThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
        this.pool = new AtomicReference<CachedWorkerPool>(NONE);
        start();
    }

    @Override
    public void start() {
        CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory);
        if (!pool.compareAndSet(NONE, update)) {
            update.shutdown();
        }
    }
    @Override
    public void shutdown() {
        for (;;) {
            CachedWorkerPool curr = pool.get();
            if (curr == NONE) {
                return;
            }
            if (pool.compareAndSet(curr, NONE)) {
                curr.shutdown();
                return;
            }
        }
    }
    @Override
    public Worker createWorker() {
        return new EventLoopWorker(pool.get());
    }
    ...
    static final class EventLoopWorker extends Scheduler.Worker {
        private final CompositeDisposable tasks;
        private final CachedWorkerPool pool;
        private final ThreadWorker threadWorker;

        final AtomicBoolean once = new AtomicBoolean();

        EventLoopWorker(CachedWorkerPool pool) {
            this.pool = pool;
            this.tasks = new CompositeDisposable();
            this.threadWorker = pool.get();
        }

        @Override
        public void dispose() {
            if (once.compareAndSet(false, true)) {
                tasks.dispose();

                // releasing the pool should be the last action
                pool.release(threadWorker);
            }
        }

        @Override
        public boolean isDisposed() {
            return once.get();
        }

        @NonNull
        @Override
        public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
            if (tasks.isDisposed()) {
                // don't schedule, we are unsubscribed
                return EmptyDisposable.INSTANCE;
            }

            return threadWorker.scheduleActual(action, delayTime, unit, tasks);
        }
    }
    ...
}
  • 代碼比較長,我們對(duì)著Scheduler.scheduleDirect()的流程來看下IOScheduler勤讽,首先我們來看createWorker()
  • 可以看到這里是通過pool.get()獲取了一個(gè)CachedWorkerPool蟋座,這個(gè)pool是IOScheduler的成員變量是在構(gòu)造方法中進(jìn)行初始化的,它是一個(gè)AtomicReference能夠保證針對(duì)持有對(duì)象的原子操作,換句話說能夠保證線程的安全
public IoScheduler(ThreadFactory threadFactory) {
    this.threadFactory = threadFactory;
    this.pool = new AtomicReference<CachedWorkerPool>(NONE);
    start();
}
  • 默認(rèn)其持有的是一個(gè)NONE脚牍,是在靜態(tài)代碼塊中初始化完成的
static {
    ...
    NONE = new CachedWorkerPool(0, null, WORKER_THREAD_FACTORY);
    NONE.shutdown();
}
  • 緊接著的start方法中會(huì)替換為新的值向臀,并且這個(gè)CachedWorkerPool處于非shutDown狀態(tài)
public void start() {
    CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory);
    if (!pool.compareAndSet(NONE, update)) {
        update.shutdown();
    }
}
  • CachedWorkerPool是什么我們后面再去分析,我們回到Scheduler.scheduleDirect()接著往下看诸狭,后續(xù)會(huì)將創(chuàng)建的Worker對(duì)象w與傳入的Runnable接口對(duì)象run封裝成一個(gè)DisposeTask對(duì)象task券膀,之后調(diào)用Worker對(duì)象w的schedule方法也就是EventLoopWorker的schedule方法
  • 這里說下DisposeTask其實(shí)就是代理了傳入的Runnable對(duì)象run,在其run()方法中會(huì)調(diào)用到傳入的Runnable對(duì)象的run方法
static final class DisposeTask implements Disposable, 
    Runnable, SchedulerRunnableIntrospection {
        final Runnable decoratedRun;
        ...
        DisposeTask(Runnable decoratedRun, Worker w) {
            this.decoratedRun = decoratedRun;
            ...
        }
        ...
        public void run() {
            runner = Thread.currentThread();
            try {
                decoratedRun.run();
            } finally {
                dispose();
                runner = null;
            }
        }
        ...
}
  • EventLoopWorker.schedule這里會(huì)調(diào)用threadWorker.scheduleActual(action, delayTime, unit, tasks)
public Disposable schedule(@NonNull Runnable action, long delayTime, 
    @NonNull TimeUnit unit) {
        ...
        return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
  • threadWorker是在EventLoopWorker的構(gòu)造方法中進(jìn)行初始化的
EventLoopWorker(CachedWorkerPool pool) {
    ...
    this.threadWorker = pool.get();
}
  • 可以看到就是從CachedWorkerPool中獲取的驯遇,我們來看下CachedWorkerPool的get()方法
static final class CachedWorkerPool implements Runnable {
    ...
    ThreadWorker get() {
        if (allWorkers.isDisposed()) {
            return SHUTDOWN_THREAD_WORKER;
        }
        while (!expiringWorkerQueue.isEmpty()) {
            ThreadWorker threadWorker = expiringWorkerQueue.poll();
            if (threadWorker != null) {
                return threadWorker;
            }
        }
        
        // No cached worker found, so create a new one.
        ThreadWorker w = new ThreadWorker(threadFactory);
        allWorkers.add(w);
        return w;
    }
    ...
}
  • 這段代碼其實(shí)就是先從緩存中看看能不能拿到一個(gè)已經(jīng)緩存下來的ThreadWorker芹彬,如果沒有就創(chuàng)建一個(gè)新的ThreadWorker對(duì)象并緩存起來,接下來我們看下ThreadWorker的實(shí)現(xiàn)

ThreadWorker

static final class ThreadWorker extends NewThreadWorker {
    private long expirationTime;

    ThreadWorker(ThreadFactory threadFactory) {
        super(threadFactory);
        this.expirationTime = 0L;
    }

    public long getExpirationTime() {
        return expirationTime;
    }

    public void setExpirationTime(long expirationTime) {
        this.expirationTime = expirationTime;
    }
}
  • 可以看到并沒有實(shí)現(xiàn)特殊的方法叉庐,所以其大部門實(shí)現(xiàn)都是在NewThreadWorker中的舒帮,回到前面分析的,我們說過EventLoopWorker.schedule會(huì)調(diào)用threadWorker.scheduleActual(action, delayTime, unit, tasks)陡叠,在NewThreadWorker中最終會(huì)調(diào)用到scheduleActual這個(gè)方法玩郊,我們來看下具體實(shí)現(xiàn)
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
    Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    //包裝一層傳入的參數(shù)run
    ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

    if (parent != null) {
        if (!parent.add(sr)) {
            return sr;
        }
    }

    Future<?> f;
    try {
        //通過線程池去執(zhí)行run
        if (delayTime <= 0) {
            f = executor.submit((Callable<Object>)sr);
        } else {
            f = executor.schedule((Callable<Object>)sr, delayTime, unit);
        }
        sr.setFuture(f);
    } catch (RejectedExecutionException ex) {
        if (parent != null) {
            parent.remove(sr);
        }
        RxJavaPlugins.onError(ex);
    }

    return sr;
}
  • 這里會(huì)將傳入的Runnable再做一層包裝,之后通過線程池去submit或者schedule執(zhí)行對(duì)應(yīng)的任務(wù)枉阵,這個(gè)Runnable對(duì)象就是前面我們分析的SubscribeTask译红,還記得SubscribeTask的run方法執(zhí)行了什么么?
    source.subscribe(parent);
  • 所以map操作符中所有的流程就是執(zhí)行在線程池之中
  • 對(duì)于ObservableOn()原理也是一樣的兴溜,只不過開源庫中通過Handler將獲取MainThread.Looper然后將其切回UI線程临庇,看客大佬們可以跟下源碼分析一波
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末反璃,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子假夺,更是在濱河造成了極大的恐慌,老刑警劉巖斋攀,帶你破解...
    沈念sama閱讀 216,372評(píng)論 6 498
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件已卷,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡淳蔼,警方通過查閱死者的電腦和手機(jī)侧蘸,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,368評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來鹉梨,“玉大人讳癌,你說我怎么就攤上這事〈嬖恚” “怎么了晌坤?”我有些...
    開封第一講書人閱讀 162,415評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長旦袋。 經(jīng)常有香客問我骤菠,道長,這世上最難降的妖魔是什么疤孕? 我笑而不...
    開封第一講書人閱讀 58,157評(píng)論 1 292
  • 正文 為了忘掉前任商乎,我火速辦了婚禮,結(jié)果婚禮上祭阀,老公的妹妹穿的比我還像新娘鹉戚。我一直安慰自己,他們只是感情好专控,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,171評(píng)論 6 388
  • 文/花漫 我一把揭開白布抹凳。 她就那樣靜靜地躺著,像睡著了一般踩官。 火紅的嫁衣襯著肌膚如雪却桶。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,125評(píng)論 1 297
  • 那天蔗牡,我揣著相機(jī)與錄音颖系,去河邊找鬼。 笑死辩越,一個(gè)胖子當(dāng)著我的面吹牛嘁扼,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播黔攒,決...
    沈念sama閱讀 40,028評(píng)論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼趁啸,長吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼强缘!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起不傅,我...
    開封第一講書人閱讀 38,887評(píng)論 0 274
  • 序言:老撾萬榮一對(duì)情侶失蹤旅掂,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后访娶,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體商虐,經(jīng)...
    沈念sama閱讀 45,310評(píng)論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,533評(píng)論 2 332
  • 正文 我和宋清朗相戀三年崖疤,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了秘车。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,690評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡劫哼,死狀恐怖叮趴,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情权烧,我是刑警寧澤眯亦,帶...
    沈念sama閱讀 35,411評(píng)論 5 343
  • 正文 年R本政府宣布,位于F島的核電站豪嚎,受9級(jí)特大地震影響搔驼,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜侈询,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,004評(píng)論 3 325
  • 文/蒙蒙 一舌涨、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧扔字,春花似錦囊嘉、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,659評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至震檩,卻和暖如春琢蛤,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背抛虏。 一陣腳步聲響...
    開封第一講書人閱讀 32,812評(píng)論 1 268
  • 我被黑心中介騙來泰國打工博其, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人迂猴。 一個(gè)月前我還...
    沈念sama閱讀 47,693評(píng)論 2 368
  • 正文 我出身青樓慕淡,卻偏偏與公主長得像,于是被迫代替她去往敵國和親沸毁。 傳聞我的和親對(duì)象是個(gè)殘疾皇子峰髓,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,577評(píng)論 2 353

推薦閱讀更多精彩內(nèi)容

  • RxJava的被觀察者在使用操作符時(shí)可以利用線程調(diào)度器--Scheduler來切換線程傻寂,例如 被觀察者(Obser...
    fengzhizi715閱讀 4,938評(píng)論 0 8
  • 我從去年開始使用 RxJava ,到現(xiàn)在一年多了携兵。今年加入了 Flipboard 后疾掰,看到 Flipboard 的...
    Jason_andy閱讀 5,468評(píng)論 7 62
  • 前言我從去年開始使用 RxJava ,到現(xiàn)在一年多了徐紧。今年加入了 Flipboard 后个绍,看到 Flipboard...
    占導(dǎo)zqq閱讀 9,164評(píng)論 6 151
  • 9月26日杭州“明月入懷-中國團(tuán)扇文化印象特展”開幕,因?yàn)橛泄蕦m博物院館藏的宋徽宗《枇杷山鳥圖》浪汪、馬遠(yuǎn)《寒山子...
    東有喬木閱讀 402評(píng)論 1 2
  • 好像打我記事以來我已經(jīng)是一個(gè)在校生了,無論當(dāng)時(shí)是有多小凛虽,是一年級(jí)還是幼兒園死遭!雖然到了現(xiàn)在記憶還是有點(diǎn)模糊,不...
    鄒戒戒閱讀 544評(píng)論 0 9