背景
文章通過(guò)接口層表象來(lái)實(shí)現(xiàn)一個(gè)簡(jiǎn)版且穩(wěn)定的線(xiàn)程調(diào)度庫(kù)刀荒,給予一個(gè)臺(tái)階首懈,當(dāng)你讀完文章的末尾看成,希望你有一探RxJava欲望與信心华畏。
目標(biāo)
- TaskScheduler.executeMain(...); //主線(xiàn)程, 執(zhí)行任務(wù)
- TaskScheduler.executeTask(...); //子線(xiàn)程, 線(xiàn)程池執(zhí)行任務(wù)
- TaskScheduler.executeSingle(...); //子線(xiàn)程, 單線(xiàn)程執(zhí)行任務(wù)
- TaskScheduler.create(...); //任務(wù)調(diào)度
項(xiàng)目
設(shè)計(jì)
- .func(...).func(...).func(...)...順序流執(zhí)行
- .observeOn(...)線(xiàn)程切換
效果圖
TaskScheduler.create(new Task<List<String>>() {
@Override
public List<String> run() {
...do something in io thread
return new ArrayList<>();
}
}).subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.map(new Function<List<String>, String>() {
@Override
public String apply(@NonNull List<String> strings) throws Exception {
...do something in new thread, such as time-consuming, map conversion, etc.
return "";
}
})
.observeOn(Schedulers.io())
.map(new Function<String, Boolean>() {
@Override
public Boolean apply(@NonNull String s) throws Exception {
...do something in io thread, such as time-consuming, map conversion, etc.
return true;
}
})
...
.observeOn(Schedulers.mainThread())
.subscribe(new Observer<Boolean>() {
@Override
public void onNext(@NonNull Boolean result) {
...do something in main thread
}
@Override
public void onError(Throwable e) {
...do something in main thread
}
});
分析
- 線(xiàn)程
- 線(xiàn)程切換
- 任務(wù)調(diào)度
1. 線(xiàn)程
public class TaskManager {
private static TaskManager ins;
private Handler mainHandler;
private ExecutorService cachedThreadPool;
private ExecutorService singleThreadExecutor;
private TaskManager() {
mainHandler = new Handler(Looper.getMainLooper());
cachedThreadPool = Executors.newCachedThreadPool();
singleThreadExecutor = Executors.newSingleThreadExecutor();
}
static TaskManager getIns() {
if (ins == null) {
synchronized (TaskManager.class) {
if (ins == null) {
ins = new TaskManager();
}
}
}
return ins;
}
/**
* Execute sync task in main thread
*/
void executeMain(Runnable runnable) { mainHandler.post(runnable); }
/**
* Execute async task in cached thread pool
*/
void executeTask(Runnable runnable) { cachedThreadPool.execute(runnable); }
/**
* Execute async task in single thread pool
*/
void executeSingle(Runnable runnable) { singleThreadExecutor.execute(runnable); }
/**
* Execute async task in a new thread
*/
void executeNew(Runnable runnable) { new Thread(runnable).start(); }
}
線(xiàn)程切換的方法:拋runnable到相應(yīng)線(xiàn)程鹏秋,由線(xiàn)程來(lái)調(diào)度執(zhí)行runnable,runnable中的方法即在相應(yīng)線(xiàn)程中執(zhí)行亡笑。
如無(wú)這樣的顯式切換線(xiàn)程侣夷,代碼流(無(wú)論多少次方法遞歸調(diào)用)將在當(dāng)前線(xiàn)程一直執(zhí)行下去。同一線(xiàn)程仑乌,代碼總是順序的執(zhí)行百拓。
Log.d("Current Thread", Thread.currentThread().getId() + "--NAME--" + Thread.currentThread().getName());
通過(guò)這行代碼可以打印出當(dāng)前在那一個(gè)線(xiàn)程琴锭。主線(xiàn)程的getName是main。
new Thread(() -> {
// Code block 1
Log.d("Current Thread", Thread.currentThread().getId() + "--NAME--" + Thread.currentThread().getName());
...
new Handler(Looper.getMainLooper()).post(new Runnable() {
@Override
public void run() {
// Code block 2
Log.d("Current Thread", Thread.currentThread().getId() + "--NAME--" + Thread.currentThread().getName());
...
}
});
}).start();
這是一個(gè)通常的代碼形式
Code block 1處在一個(gè)子線(xiàn)程中執(zhí)行代碼耐版,通過(guò)new Handler(Looper.getMainLooper()).post(...)向主線(xiàn)程拋入一個(gè)runnable祠够,runnable進(jìn)入主線(xiàn)程消息隊(duì)列压汪,然后等主線(xiàn)程消息隊(duì)列取出該runnable執(zhí)行時(shí)粪牲,Code line 2處代碼即在主線(xiàn)程中執(zhí)行。
Code block 1與Code block 2在時(shí)間上并行執(zhí)行止剖。線(xiàn)程池同理腺阳。
public class TaskScheduler<T> {
public static void executeMain(Runnable runnable) { TaskManager.getIns().executeMain(runnable); }
public static void executeTask(Runnable runnable) { TaskManager.getIns().executeTask(runnable); }
public static void executeSingle(Runnable runnable) { TaskManager.getIns().executeSingle(runnable); }
...
}
通過(guò)單例簡(jiǎn)單包裝,實(shí)現(xiàn)目標(biāo)1穿香、2亭引、3
2. 線(xiàn)程切換
/**
* Switch thread
* scheduler 線(xiàn)程枚舉,int類(lèi)型: defaultThread皮获、newThread焙蚓、io、mainThread
*/
public static void switchThread(@Scheduler int scheduler, final Runnable runnable) {
if (scheduler == NEW_THREAD) {
new Thread(() -> {
if (runnable != null) {
runnable.run();
}
}).start();
return;
} else if (scheduler == IO) {
TaskScheduler.executeTask(() -> {
if (runnable != null) {
runnable.run();
}
});
return;
} else if (scheduler == MAIN_THREAD) {
if (!isMainThread()) {
TaskScheduler.executeMain(() -> {
if (runnable != null) {
runnable.run();
}
});
return;
}
}
if (runnable != null) {
runnable.run();
}
}
public static boolean isMainThread() {
return Looper.getMainLooper().getThread() == Thread.currentThread();
}
3. 任務(wù)調(diào)度
3.1 開(kāi)始前的準(zhǔn)備
我們先來(lái)定義3個(gè)接口
然后是2個(gè)對(duì)應(yīng)的包裝類(lèi)洒宝,后面會(huì)用到
Task -> TaskEmitter
Function -> FunctionEmitter
public class Emitter {
public int scheduler;
}
public class TaskEmitter<T> extends Emitter {
public Task<T> task;
public TaskEmitter(Task<T> task, @Schedulers.Scheduler int scheduler) {
this.task = task;
this.scheduler = scheduler;
}
}
public class FunctionEmitter<T, R> extends Emitter {
public Function<? super T, ? extends R> function;
public FunctionEmitter(Function<? super T, ? extends R> function, @Schedulers.Scheduler int scheduler) {
this.function = function;
this.scheduler = scheduler;
}
}
3.2 Create
開(kāi)始前购公,我們知道一些開(kāi)源庫(kù)如Glide,慣用.with(...)形式雁歌,這種方式實(shí)質(zhì):靜態(tài)方法 + return new Instance()宏浩,
這里我們也用這種模式來(lái)開(kāi)始create(...)。
實(shí)現(xiàn)分三步走
Step 1: Create
public static <T> TaskScheduler<T> create(final Task<T> task) {
TaskScheduler<T> schedulers = new TaskScheduler<T>();
schedulers.task = task;
return schedulers;
}
創(chuàng)建TaskScheduler實(shí)例靠瞎,持有 源任務(wù)task
public TaskObserve<T> subscribeOn(@Schedulers.Scheduler int scheduler) {
this.subscribeScheduler = scheduler;
return new TaskObserve<T>(new TaskEmitter<T>(task, subscribeScheduler));
}
指定 源任務(wù)task
執(zhí)行所在線(xiàn)程比庄,丟棄當(dāng)前TaskScheduler實(shí)例。
源任務(wù)task
乏盐、 線(xiàn)程枚舉
注入TaskEmitter后佳窑,返回新的實(shí)例TaskObserve,后續(xù)邏輯全由TaskObserve處理
Step 2: TaskObserve中間件
public static class TaskObserve<T> {
private TaskEmitter taskEmitter;
private List<FunctionEmitter> emitters;
private int observeOnScheduler = Schedulers.defaultThread();
TaskObserve(TaskEmitter<T> taskEmitter) {
this.taskEmitter = taskEmitter;
this.emitters = new ArrayList<>();
}
...
}
TaskObserve
: 中間件
父能,初始和map轉(zhuǎn)換時(shí)生成华嘹,包含以下成員
taskEmitter
: 源任務(wù)
emitters
: 轉(zhuǎn)換隊(duì)列
,map轉(zhuǎn)換時(shí)遞增
observeOnScheduler
: 線(xiàn)程枚舉
法竞,observeOn觀察者所在線(xiàn)程耙厚,可重復(fù)調(diào)用,當(dāng)然只保留最后一次指定的線(xiàn)程
TaskObserve(TaskObserve middle) {
this.taskEmitter = middle.taskEmitter;
this.observeOnScheduler = middle.observeOnScheduler;
this.emitters = middle.emitters;
}
public <TR> TaskObserve<TR> map(Function<? super T, ? extends TR> f) {
this.emitters.add(new FunctionEmitter<T, TR>(f, observeOnScheduler));
return new TaskObserve<TR>(this);
}
map轉(zhuǎn)換時(shí)岔霸,將 轉(zhuǎn)換體Function
薛躬、當(dāng)前 線(xiàn)程枚舉
observeOnScheduler注入 FunctionEmitter
,添加到 轉(zhuǎn)換隊(duì)列
呆细。
返回新的實(shí)例TaskObserve型宝,丟棄當(dāng)前TaskObserve實(shí)例八匠,新實(shí)例線(xiàn)程枚舉observeOnScheduler默認(rèn)為默認(rèn)線(xiàn)程
Step 3: Subscribe,才是開(kāi)始E亢ā@媸鳌!
核心思想
- 先執(zhí)行
源任務(wù)
岖寞,返回值- 遞歸從
轉(zhuǎn)換隊(duì)列
取出FunctionEmitter
(含有轉(zhuǎn)換體抡四、線(xiàn)程枚舉),Schedulers.switchThread(...)指定線(xiàn)程執(zhí)行仗谆,轉(zhuǎn)換返回值轉(zhuǎn)換隊(duì)列
執(zhí)行盡指巡,提交任務(wù),任務(wù)結(jié)束
public void subscribe(final Observer<T> callback) {
// 指定源任務(wù)線(xiàn)程枚舉
Schedulers.switchThread(taskEmitter.scheduler, () -> {
try {
// 執(zhí)行源任務(wù)
Object t = taskEmitter.task.run();
// 轉(zhuǎn)換隊(duì)列是否為空
if (assertInterrupt(t)) {
// 轉(zhuǎn)換隊(duì)列空隶垮,提交本次任務(wù)藻雪,任務(wù)結(jié)束
submit(t, callback);
return;
}
// 轉(zhuǎn)換隊(duì)列不為空,繼續(xù)轉(zhuǎn)換
apply(t, emitters, callback);
} catch (Throwable e) {
// 任務(wù)流拋出異常狸吞,即時(shí)中斷勉耀,任務(wù)結(jié)束
error(e, callback);
}
});
}
private boolean assertInterrupt(Object emitter) throws Exception {
if (emitter == null) {
// 轉(zhuǎn)換返回值,不能為NullLF1愠狻!
throw new RuntimeException("Apply output must not be null!");
}
return emitters.size() <= 0;
}
assertInterrupt判斷當(dāng)前轉(zhuǎn)換隊(duì)列暖侨,是否執(zhí)行盡了
Step 3 - 1: Apply轉(zhuǎn)換隊(duì)列轉(zhuǎn)換
private <E, F> void apply(final E o, final List<FunctionEmitter> emitters, final Observer<F> callback) {
// 依次從轉(zhuǎn)換隊(duì)列取出FunctionEmitter椭住,然后移除
final FunctionEmitter<E, F> f = emitters.get(0);
emitters.remove(f);
// 指定當(dāng)前轉(zhuǎn)換線(xiàn)程枚舉
Schedulers.switchThread(f.scheduler, () -> {
try {
// 轉(zhuǎn)換,返回轉(zhuǎn)換值
Object emitter = f.function.apply(o);
// 轉(zhuǎn)換隊(duì)列是否為空
if (assertInterrupt(emitter)) {
// 轉(zhuǎn)換隊(duì)列空字逗,提交本次任務(wù)京郑,任務(wù)結(jié)束
submit(emitter, callback);
return;
}
// 轉(zhuǎn)換隊(duì)列不為空,繼續(xù)轉(zhuǎn)換
apply(emitter, emitters, callback);
} catch (Throwable e) {
// 任務(wù)流拋出異常葫掉,即時(shí)中斷些举,任務(wù)結(jié)束
error(e, callback);
}
});
}
Step 3 - 2: Submit提交
private <S> void submit(final Object result, final Observer<S> callback) {
// 指定當(dāng)前轉(zhuǎn)換線(xiàn)程枚舉,即當(dāng)前中間件線(xiàn)程枚舉observeOnScheduler
Schedulers.switchThread(observeOnScheduler, () -> {
try {
if (callback != null) {
// 成功俭厚,任務(wù)結(jié)束
callback.onNext((S) result);
}
} catch (Throwable e) {
error(e, callback);
}
});
}
private <S> void error(final Throwable e, final Observer<S> callback) {
// 指定當(dāng)前轉(zhuǎn)換線(xiàn)程枚舉户魏,即當(dāng)前中間件線(xiàn)程枚舉observeOnScheduler
Schedulers.switchThread(observeOnScheduler, () -> {
if (callback != null) {
// 出錯(cuò),任務(wù)結(jié)束
callback.onError(e);
}
});
}
小結(jié):
泛型: java泛型屬于類(lèi)型擦除挪挤,無(wú)論T叼丑、F還是R...,最終都是Object扛门。
設(shè)計(jì): 這里的 任務(wù)流
實(shí)現(xiàn)方式為遞歸嵌套調(diào)用鸠信。