- 首先創(chuàng)建原始被觀察者及觀察者接口
//被觀察者
interface ObservableSource<T> {
//訂閱
fun subscribe(observer: Observer<T>)
}
//觀察者
interface Observer<T> {
//訂閱
fun onSubscribe()
//事件發(fā)送
fun onNext(t: T)
//錯誤
fun onError(t:Throwable)
//事件完成
fun onComplete()
}
- 創(chuàng)建抽象的被觀察者
abstract class Observable<T> : ObservableSource<T> {
override fun subscribe(observer: Observer<T>) {
subscribeActual(observer);
}
//抽象訂閱方法词渤,這里會傳入觀察者的對象,交給數(shù)據(jù)發(fā)送者
protected abstract fun subscribeActual(observer: Observer<T>)
}
- 創(chuàng)建數(shù)據(jù)發(fā)送者接口
interface Emitter<T> {
fun onNext(t: T)
fun onError(t:Throwable)
fun onComplete()
}
- 創(chuàng)建數(shù)據(jù)發(fā)送者與被觀察者建立聯(lián)系的接口
interface ObservableOnSubscribe<T> {
//被訂閱者在此實現(xiàn)方法里發(fā)送數(shù)據(jù)
fun subscribe(emitter: Emitter<T>)
}
- 創(chuàng)建被觀察者實現(xiàn)類及數(shù)據(jù)發(fā)送者實現(xiàn)類
//這里構(gòu)造方法傳入了數(shù)據(jù)發(fā)送者與被觀察者建立聯(lián)系具體實現(xiàn)
class ObservableCreate<T>(var observableOnSubscribe: ObservableOnSubscribe<T>) : Observable<T>() {
//被觀察者訂閱觀察者時权纤,具體實現(xiàn)數(shù)據(jù)發(fā)送
override fun subscribeActual(observer: Observer<T>) {
val emitterCreate = EmitterCreate(observer)
observableOnSubscribe.subscribe(emitterCreate)
//觸發(fā)了onSubscribe
observer.onSubscribe()
}
//構(gòu)造方法傳入了觀察者
class EmitterCreate<T>(private val observer: Observer<T>) : Emitter<T> {
override fun onNext(t: T) {
//觀察者接收到被觀察者通過Emitter發(fā)送的數(shù)據(jù)
observer.onNext(t)
}
override fun onError(t: Throwable) {
observer.onError(t)
}
override fun onComplete() {
observer.onComplete()
}
}
}
這樣我們就完整創(chuàng)建出了:觀察者警医、被觀察者、數(shù)據(jù)發(fā)送者典徘、數(shù)據(jù)發(fā)送者與被觀察者聯(lián)系接口蟀苛。
我們在Observable里實現(xiàn)create(source: ObservableOnSubscribe<T>)創(chuàng)建一個ObservableCreate實例,并傳入一個ObservableOnSubscribe用作數(shù)據(jù)發(fā)送逮诲。
companion object {
fun <T> create(source: ObservableOnSubscribe<T>): Observable<T> {
return ObservableCreate(source)
}
}
Observable.create(object : ObservableOnSubscribe<String> {
override fun subscribe(emitter: Emitter<String>) {
emitter.onNext("HELLO")
}
}).subscribe(object : Observer<String> {
override fun onSubscribe() {
TODO("Not yet implemented")
}
override fun onNext(t: String) {
TODO("Not yet implemented")
}
override fun onError(t: Throwable) {
TODO("Not yet implemented")
}
override fun onComplete() {
TODO("Not yet implemented")
}
})
以上我們通過觀察者模式就簡易實現(xiàn)了帜平,觀察者及被觀察者數(shù)據(jù)發(fā)送的建立聯(lián)系幽告。
后續(xù)將繼續(xù)通過裝飾者模式實現(xiàn)map操作符及線程切換等操作,進一步理解Rxjava裆甩。