RxJava->簡單的線程調度

一直覺得多線程是Android開發(fā)工程師的一個硬傷, 感覺一提到多線程就是Handler;

Example:

Observable
    .create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> e) throws Exception {
            e.onNext(1);
            LogUtils.log(Note01.class, "subscribe()->ThreadName:" + Thread.currentThread().getName());
        }
    })
    .subscribeOn(Schedulers.newThread())
    .subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            LogUtils.log(Note01.class, "onSubscribe()->ThreadName:" + Thread.currentThread().getName());
        }

        @Override
        public void onNext(Integer value) {
            LogUtils.log(Note01.class, "onNext()->ThreadName:" + Thread.currentThread().getName());
        }

        @Override
        public void onError(Throwable e) {
            LogUtils.log(Note01.class, "onError()->ThreadName:" + Thread.currentThread().getName());
        }

        @Override
        public void onComplete() {
            LogUtils.log(Note01.class, "onComplete()->ThreadName:" + Thread.currentThread().getName());
        }
    });
  • 打印結果
onSubscribe()->ThreadName:main
subscribe()->ThreadName:RxNewThreadScheduler-2
onNext()->ThreadName:RxNewThreadScheduler-2
onComplete()->ThreadName:RxNewThreadScheduler-2
  • 打印結果是onSubscribe()為主線程, subscribe, onNext, onComplete()均在子線程中調用;

目前有幾個疑問:

  • 1、如何創(chuàng)建線程;
  • 2戒劫、子線程切換到主線程時如何進行主子線程通信;
public final class Schedulers {
    public static Scheduler newThread() {
        return RxJavaPlugins.onNewThreadScheduler(NEW_THREAD);
    }
    static final Scheduler NEW_THREAD;
    static {
        NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new Callable<Scheduler>() {
            @Override
            public Scheduler call() throws Exception {
                return NewThreadHolder.DEFAULT;
            }
        });
    }
    static final class NewThreadHolder {
        static final Scheduler DEFAULT = NewThreadScheduler.instance();
    }
}
public final class NewThreadScheduler extends Scheduler {
    private static final NewThreadScheduler INSTANCE = new NewThreadScheduler();
}
  • 1、.subscribeOn(Schedulers.newThread())里面?zhèn)魅氲腟chedule實際指向NewThreadScheduler;
  • 2敲霍、其內部實現(xiàn)等待后邊onXXX系列時繼續(xù)分析;
public abstract class Observable<T> implements ObservableSource<T> {
    public final Observable<T> subscribeOn(Scheduler scheduler) {
        return new ObservableSubscribeOn<T>(this, scheduler);
    }
}
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }
}
  • 1快耿、創(chuàng)建ObservableSubscribeOn對象, 并將其引用賦給Observer;
  • 2彼水、將this即ObservableCreate引用賦給AbstractObservableWithUpstream中的ObservableSource;
  • 3条霜、ObservableSubscribeOn內部持有NewThreadScheduler的引用;
.subscribe(new Observer<Integer>() {...}
public abstract class Observable<T> implements ObservableSource<T> {
    @Override
    public final void subscribe(Observer<? super T> observer) {
        subscribeActual(observer);
    }
    protected abstract void subscribeActual(Observer<? super T> observer);
}
  • subscribeActual被子類ObservableSubscribeOn實現(xiàn):
public final class ObservableSubscribeOn<T> {
    @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
        s.onSubscribe(parent);
        parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
            @Override
            public void run() {
                source.subscribe(parent);
            }
        }));
    }
}
  • disposable指向SubscribeOnObserver;從代碼中可以看出s.onSubscribe(parent);還沒有創(chuàng)建任何線程, 印證了開始的打印結果;
  • 然后看下面代碼是如何創(chuàng)建子線程的;
parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
    @Override
    public void run() {
        source.subscribe(parent);
    }
}));
public abstract class Scheduler {
    public Disposable scheduleDirect(Runnable run) {
        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    }
    public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
        final Worker w = createWorker();
        final Runnable decoratedRun = run;
        w.schedule(new Runnable() {
            @Override
            public void run() {
                try {
                    decoratedRun.run();
                } finally {
                    w.dispose();
                }
            }
        }, delay, unit);
        return w;
    }
    public abstract Worker createWorker();
}
  • 前邊提到過Scheduler由NewThreadScheduler實現(xiàn):
public final class NewThreadScheduler extends Scheduler {
    @Override
    public Worker createWorker() {
        return new NewThreadWorker(THREAD_FACTORY);
    }
}
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
    private final ScheduledExecutorService executor;
    public NewThreadWorker(ThreadFactory threadFactory) {
        executor = SchedulerPoolFactory.create(threadFactory);
    }
}
  • 下邊看看是如何創(chuàng)建線程:
public interface ThreadFactory {
    Thread newThread(Runnable r);
}
public final class NewThreadScheduler extends Scheduler {
    private static final RxThreadFactory THREAD_FACTORY;
    static {
        THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX, priority);
    }
}
public final class RxThreadFactory extends AtomicLong implements ThreadFactory {
    @Override
    public Thread newThread(Runnable r) {
        StringBuilder nameBuilder = new StringBuilder(prefix).append('-').append(incrementAndGet());
        Thread t = new Thread(r, nameBuilder.toString());
        t.setPriority(priority);
        t.setDaemon(true);
        return t;
    }
}
  • 內部創(chuàng)建線程, 并為線程賦別名;
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
    private final ScheduledExecutorService executor;
    public NewThreadWorker(ThreadFactory threadFactory) {
        executor = SchedulerPoolFactory.create(threadFactory);
    }
    @Override
    public Disposable schedule(final Runnable run) {
        return schedule(run, 0, null);
    }

    @Override
    public Disposable schedule(final Runnable action, long delayTime, TimeUnit unit) {
        return scheduleActual(action, delayTime, unit, null);
    }

    public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, TimeUnit unit, DisposableContainer parent) {
        ScheduledRunnable sr = new ScheduledRunnable(run, parent);
        Future<?> f = executor.submit((Callable<Object>)sr);
        sr.setFuture(f);
        return sr;
    }
}

executor.submit()->sr的call()方法執(zhí)行:

public final class ScheduledRunnable extends AtomicReferenceArray<Object>
implements Runnable, Callable<Object>, Disposable {
    final Runnable actual;
    public ScheduledRunnable(Runnable actual, DisposableContainer parent) {
        super(2);
        this.actual = actual;
    }
    @Override
    public Object call() {
        // Being Callable saves an allocation in ThreadPoolExecutor
        run();
        return null;
    }
    @Override
    public void run() {
        try {
            try {
                actual.run();
            } catch (Throwable e) {
                // Exceptions.throwIfFatal(e); nowhere to go
                RxJavaPlugins.onError(e);
            }
        } finally {
            Object o = get(PARENT_INDEX);
            if (o != DISPOSED && o != null && compareAndSet(PARENT_INDEX, o, DONE)) {
                ((DisposableContainer)o).delete(this);
            }

            for (;;) {
                o = get(FUTURE_INDEX);
                if (o == DISPOSED || compareAndSet(FUTURE_INDEX, o, DONE)) {
                    break;
                }
            }
        }
    }
}
  • 此處的actual即為我們在ObservableSubscribeOn中new出來的Runnable;
  • 下一篇嘗試分析主子線程切換, 這兩篇文章分析完以后會切換回來去分析Atomic系列, Executor系列以及適配器模式, 代理模式, 裝飾模式
最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末间雀,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子镊屎,更是在濱河造成了極大的恐慌惹挟,老刑警劉巖,帶你破解...
    沈念sama閱讀 221,695評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件缝驳,死亡現(xiàn)場離奇詭異连锯,居然都是意外死亡归苍,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,569評論 3 399
  • 文/潘曉璐 我一進店門运怖,熙熙樓的掌柜王于貴愁眉苦臉地迎上來拼弃,“玉大人,你說我怎么就攤上這事摇展∥茄酰” “怎么了?”我有些...
    開封第一講書人閱讀 168,130評論 0 360
  • 文/不壞的土叔 我叫張陵咏连,是天一觀的道長盯孙。 經常有香客問我,道長祟滴,這世上最難降的妖魔是什么振惰? 我笑而不...
    開封第一講書人閱讀 59,648評論 1 297
  • 正文 為了忘掉前任,我火速辦了婚禮垄懂,結果婚禮上骑晶,老公的妹妹穿的比我還像新娘。我一直安慰自己草慧,他們只是感情好桶蛔,可當我...
    茶點故事閱讀 68,655評論 6 397
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著冠蒋,像睡著了一般羽圃。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上抖剿,一...
    開封第一講書人閱讀 52,268評論 1 309
  • 那天朽寞,我揣著相機與錄音,去河邊找鬼斩郎。 笑死脑融,一個胖子當著我的面吹牛,可吹牛的內容都是我干的缩宜。 我是一名探鬼主播肘迎,決...
    沈念sama閱讀 40,835評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼锻煌!你這毒婦竟也來了妓布?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 39,740評論 0 276
  • 序言:老撾萬榮一對情侶失蹤宋梧,失蹤者是張志新(化名)和其女友劉穎匣沼,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體捂龄,經...
    沈念sama閱讀 46,286評論 1 318
  • 正文 獨居荒郊野嶺守林人離奇死亡释涛,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 38,375評論 3 340
  • 正文 我和宋清朗相戀三年加叁,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片唇撬。...
    茶點故事閱讀 40,505評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡它匕,死狀恐怖,靈堂內的尸體忽然破棺而出窖认,到底是詐尸還是另有隱情豫柬,我是刑警寧澤,帶...
    沈念sama閱讀 36,185評論 5 350
  • 正文 年R本政府宣布耀态,位于F島的核電站轮傍,受9級特大地震影響,放射性物質發(fā)生泄漏首装。R本人自食惡果不足惜创夜,卻給世界環(huán)境...
    茶點故事閱讀 41,873評論 3 333
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望仙逻。 院中可真熱鬧驰吓,春花似錦、人聲如沸系奉。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,357評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽缺亮。三九已至翁涤,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間萌踱,已是汗流浹背葵礼。 一陣腳步聲響...
    開封第一講書人閱讀 33,466評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留并鸵,地道東北人鸳粉。 一個月前我還...
    沈念sama閱讀 48,921評論 3 376
  • 正文 我出身青樓,卻偏偏與公主長得像园担,于是被迫代替她去往敵國和親届谈。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 45,515評論 2 359

推薦閱讀更多精彩內容