對于Android開發(fā)來說, RxJava應(yīng)該是一個(gè)非常流行的框架了拓提。配合Retrofit處理網(wǎng)絡(luò)請求非常方便。最近正好有時(shí)間隧膘,學(xué)習(xí)了一下RxJava的原理代态,在此總結(jié)一下。
我理解的RxJava原理
RxJava的核心原理其實(shí)就是典型的響應(yīng)者模式疹吃,這并不難蹦疑,但是它很巧妙的把這種響應(yīng)者模式進(jìn)行了鏈?zhǔn)降倪B接,這應(yīng)該就是一種響應(yīng)式編程的方式了吧萨驶。我把整個(gè)學(xué)習(xí)過程分成了三步歉摧,一步一步解密RxJava(代碼盡量貼近RxJava源碼)。同時(shí)為了練習(xí)Kotlin,因此決定用Kotlin來進(jìn)行書寫叁温。
第一步:基礎(chǔ)
首先聲明一個(gè)觀察者接口Observer:
interface Observer <T>{
fun onComplete()
fun onError(t:Throwable)
fun onNext(t:T)
}
再聲明一個(gè)訂閱者Subscriber實(shí)現(xiàn)觀察者接口對觀察者進(jìn)行擴(kuò)展:
abstract class Subscriber<T>:Observer<T>{
/*省略了一大堆其它的方法豆挽,用這一個(gè)方法來代替它們*/
fun onStart() {}
}
觀察者齊全了,我們需要被觀察者Observable了(該類也是整個(gè)RxJava的核心所在券盅,所有的操作符,線程調(diào)度的邏輯其實(shí)都在這里)
class Observable <T> constructor(val onSubscribe: OnSubscribe<T>){
companion object{
/**
* 返回一個(gè)帶有OnSubscribe接口實(shí)例的Observable對象
* */
fun <T> create(onSubscribe: OnSubscribe<T>):Observable<T>{
return Observable(onSubscribe)
}
}
/**
*訂閱方法:實(shí)例化觀察者并且可以向該觀察者發(fā)送消息了
* in T表示泛型中的消費(fèi)者 相當(dāng)于Java中的 ? super T(T以及T的父類)
* out T表示泛型中的生產(chǎn)者 相當(dāng)于Java中的 ? extends T(T以及T的子類)
* */
fun subscribe(subscriber: Subscriber<in T>){
subscriber.onStart()
onSubscribe.call(subscriber)
}
//訂閱接口
interface OnSubscribe <T>{
/**
* 把觀察者傳入被觀察者中
* */
fun call(subscriber:Subscriber<in T>)
}
}
OK,RxJava的基礎(chǔ)分析結(jié)束了膛檀,就是這么多锰镀,現(xiàn)在可以進(jìn)行最基本的RxJava功能了
Observable.create(object : Observable.OnSubscribe<String>{//記住這種匿名內(nèi)部類寫法
override fun call(subscriber: Subscriber<in String>) {
subscriber.onNext("123456")
}
}).subscribe(object : Subscriber<String>(){
override fun onComplete() {
}
override fun onError(t: Throwable) {
}
override fun onNext(t: String) {
Toast.makeText(this@MainActivity,t,Toast.LENGTH_SHORT).show()
}
})
第二步:操作符 (第二步開始主要說明邏輯,齊全的項(xiàng)目代碼會在文后的鏈接中)
RxJava真正的魅力就在于它豐富的操作符和線程調(diào)度咖刃,現(xiàn)在我們來用map操作符舉例泳炉,看它是如何實(shí)現(xiàn)的:
interface Transformer<T,R>{//把T類型轉(zhuǎn)換成R類型的接口
fun transform(t:T):R
}
/*
* 每次的操作符都會默認(rèn)內(nèi)部生成一個(gè)觀察者
* 在call方法里去訂閱調(diào)用操作符的被觀察者,
* 并且返回一個(gè)新的被觀察者以讓下一個(gè)觀察者可以訂閱嚎杨,
* 這樣整條邏輯鏈就被連接起來了
*/
fun <R> map(transformer: Transformer<in T, out R>):Observable<R>{
return create(object :OnSubscribe<R>{
override fun call(subscriber: Subscriber<in R>) {
this@Observable.subscribe(object : Subscriber<T>(){//這行是操作符能全部連接起來的關(guān)鍵
override fun onComplete() {
}
override fun onError(t: Throwable) {
}
override fun onNext(t: T) {
subscriber.onNext(transformer.transform(t))
}
})
}
})
}
map操作符實(shí)現(xiàn)了花鹅。看枫浙!其實(shí)它不難刨肃,但是里面的思想很精華。接下來就要思考如果所有的操作符邏輯都寫在這個(gè)Observable類里箩帚,那這耦合性簡直了...所以我們需要解耦
/*MapOnSubscribe(代替OnSubscribe接口的功能) MapSubscriber(代替Map中
默認(rèn)生成的Subscriber對象)
*/
/**
* 解耦后的Map方法
* */
fun <R> map_(transformer: Transformer<T, R>):Observable<R>{
return create(MapOnSubscribe(this@Observable,transformer))
}
好的真友,Map分析完畢。接下來是RxJava的線程調(diào)度了......
第三步:線程調(diào)度
RxJava中的線程調(diào)度分為subscribeOn方法和observeOn兩種方法紧帕,subscribeOn作用于它前面的代碼邏輯盔然,且只有第一個(gè)有效,observeOn作用于它后面的代碼邏輯是嗜,每次的都有效愈案,當(dāng)然說了算的也肯定是最后一個(gè)observeOn方法嘛。
/**
* subscribeOn方法調(diào)度函數(shù)線程
* */
fun subscribeOn(scheduler: Scheduler):Observable<T>{
return create(object : OnSubscribe<T>{
override fun call(subscriber: Subscriber<in T>) {
/*這里this@Observable.subscribe()方法的subscriber參數(shù)就是最后的觀察者對象鹅搪。調(diào)用的任何一個(gè)subscribeOn方法
*都會把該對象往前(上)傳站绪,所以最終決定該對象到底在哪個(gè)線程執(zhí)行的subscribeOn
* 方法就是第一個(gè),因此其它的subscribeOn方法無效
* */
scheduler.createWorker().schedule(Runnable {this@Observable.subscribe(subscriber)})
}
})
}
/**
* observeOn方法調(diào)度函數(shù)線程
* */
fun observeOn(scheduler: Scheduler):Observable<T>{
return create(object : OnSubscribe<T>{
override fun call(subscriber: Subscriber<in T>) {
val worker = scheduler.createWorker()
/*注意這里this@Observable.subscribe的方法參數(shù)則是我們自己建立的Subscriber對象涩嚣,
實(shí)際的最后觀察者會在這里被切換到不同的線程中操作崇众。當(dāng)然有最后決定權(quán)的observeOn
就是最后一個(gè)observeOn方法。
* */
this@Observable.subscribe(object : Subscriber<T>(){
override fun onComplete() {
worker.schedule(java.lang.Runnable { subscriber.onComplete() })
}
override fun onError(t: Throwable) {
worker.schedule(java.lang.Runnable { subscriber.onError(t) })
}
override fun onNext(t: T) {
worker.schedule(java.lang.Runnable { subscriber.onNext(t) })
}
})
}
})
}
貼出的代碼中都有注釋說明航厚,所以也就不廢話了顷歌。這是最后帶有操作符和線程調(diào)度的調(diào)用方法:
Observable.create(object : Observable.OnSubscribe<Int>{
override fun call(subscriber: Subscriber<in Int>) {
subscriber.onNext(123456)
}
}).subscribeOn(Schedulers.io()).map_(object : Observable.Transformer<Int,String>{
override fun transform(t: Int): String {
return "發(fā)射消息"+t
}
}).observeOn(Schedulers.main()).subscribe(object : Subscriber<String>(){
override fun onComplete() {
}
override fun onError(t: Throwable) {
}
override fun onNext(t: String) {
Toast.makeText(this@MainActivity,t,Toast.LENGTH_SHORT).show()
}
})
最后附上項(xiàng)目的下載地址(項(xiàng)目不大,僅供學(xué)習(xí)):
https://github.com/hong890823/HongRxKotlin