觀察者模式
場景描述
先看一個關注微信公眾號的業(yè)務淫茵,關注完微信公眾號之后要進行如下操作:
1仍侥、記錄文本日志
2、記錄數(shù)據(jù)庫日志
3、發(fā)送短信
4谅猾、送優(yōu)惠券,積分等
5茁计、其他各類活動等
傳統(tǒng)解決方案:
在關注公眾號邏輯等類內(nèi)部增加相關代碼,完成各種邏輯怀骤。
存在問題:
1、一旦某個業(yè)務邏輯發(fā)生改變,如關注公眾號業(yè)務中增加其他業(yè)務邏輯,需要修改關注公眾號核心文件介返、甚至關注流程拴事。
2、日積月累后,文件冗長,導致后續(xù)維護困難圣蝎。
存在問題原因主要是程序的"緊密耦合",使用觀察模式可以將目前的業(yè)務邏輯優(yōu)化成"松耦合",達到易維護刃宵、易修改的目的,同時也符合面向接口編程的思想。
什么是觀察者模式
觀察者模式(Observer Pattern):定義對象間的一種一對多依賴關系徘公,使得每當一個對象狀態(tài)發(fā)生改變時组去,其相關依賴對象皆得到通知并被自動更新。觀察者模式又叫做發(fā)布-訂閱(Publish/Subscribe)模式步淹。觀察者模式是一種對象行為型模式从隆。
各個字段含義如下:
- Subject: 目標,抽象被觀察者缭裆。 抽象主題角色把所有觀察者對象保存在一個集合里键闺,每個主題都可以有任意數(shù)量的觀察者,抽象主題提供一個接口澈驼,可以增加和刪除觀察者對象辛燥。
- ConcreteSubject: 具體目標,具體的被觀察者。 該角色將有關狀態(tài)存入具體觀察者對象挎塌,在具體主題的內(nèi)部狀態(tài)發(fā)生改變時徘六,給所有注冊過的觀察者發(fā)送通知。
- Observer: 觀察者榴都。 是觀察者者的抽象類待锈,它定義了一個更新接口,使得在得到主題更改通知時更新自己嘴高。
- ConcreteObserver: 具體觀察者竿音。實現(xiàn)抽象觀察者定義的更新接口,以便在得到主題更改通知時更新自身的狀態(tài)拴驮。
觀察者模式的簡單實現(xiàn)
觀察者模式這種發(fā)布-訂閱的形式我們可以拿微信公眾號來舉例春瞬,假設微信用戶就是觀察者,微信公眾號是被觀察者套啤,有多個的微信用戶關注了程序猿這個公眾號宽气,當這個公眾號更新時就會通知這些訂閱的微信用戶∏甭伲看看用代碼如何實現(xiàn):
抽象觀察者(Observer)
//抽象觀察者
public interface Observer {
//收到訂閱消息執(zhí)行的操作
void update(String message);
}
具體觀察者(ConcrereObserver)
//記錄日志
public class LogObserver implements Observer{
@Override
public void update(String message) {
System.out.println("日志訂閱---"+message+"---開始記錄日志");
}
}
//優(yōu)惠券訂閱
public class TicketObserver implements Observer{
@Override
public void update(String message) {
System.out.println("優(yōu)惠券訂閱---"+message+"---開始發(fā)送優(yōu)惠券");
}
}
抽象被觀察者(Subject)
//抽象被觀察者
public interface Subject {
/**
- 增加訂閱者
- @param observer
*/
void attach(Observer observer);
/**
- 刪除訂閱者
- @param observer
*/
void detach(Observer observer);
/**
- 通知訂閱者
*/
void notify(String message);
}
具體被觀察者(ConcreteSubject)
//訂閱公眾號
public class SubscriptionSubject implements Subject {
//儲存訂閱公眾號的業(yè)務
private List<Observer> list= new ArrayList<Observer>();
@Override
public void attach(Observer observer) {
list.add(observer);
}
@Override
public void detach(Observer observer) {
list.remove(observer);
}
@Override
public void notify(String message) {
for (Observer observer : list) {
observer.update(message);
}
}
public void subscribe(){
System.out.println("執(zhí)行訂閱的具體操作");
notify("訂閱成功了");
}
}
客戶端調(diào)用
public class Client {
public static void main(String[] args) {
SubscriptionSubject subscriptionSubject = new SubscriptionSubject();
//添加日志訂閱
Observer logObserver = new LogObserver();
subscriptionSubject.attach(logObserver);
//添加優(yōu)惠券訂閱
TicketObserver ticketObserver = new TicketObserver();
subscriptionSubject.attach(ticketObserver);
//執(zhí)行訂閱的操作
subscriptionSubject.subscribe();
}
}
執(zhí)行結果
執(zhí)行訂閱的具體操作
日志訂閱---訂閱成功了---開始記錄日志
優(yōu)惠券訂閱---訂閱成功了---開始發(fā)送優(yōu)惠券
使用觀察者模式的場景和優(yōu)缺點
- 優(yōu)點
解除耦合抹竹,讓耦合的雙方都依賴于抽象,從而使得各自的變換都不會影響另一邊的變換止潮。
- 缺點
在應用觀察者模式時需要考慮一下開發(fā)效率和運行效率的問題窃判,程序中包括一個被觀察者、多個觀察者喇闸,開發(fā)袄琳、調(diào)試等內(nèi)容會比較復雜,而且在Java中消息的通知一般是順序執(zhí)行燃乍,那么一個觀察者卡頓唆樊,會影響整體的執(zhí)行效率,在這種情況下刻蟹,一般會采用異步實現(xiàn)逗旁。
java中的實現(xiàn)
JDK 提供了 一套 觀察者模式的實現(xiàn),在java.util包中舆瘪, java.util.Observable類和java.util.Observer接口片效。Observable是被觀察者,Observer是觀察者英古。
推模型和拉模型
在觀察者模式中淀衣,又分為推模型和拉模型兩種方式。
- 推模型
主題對象向觀察者推送主題的詳細信息召调,不管觀察者是否需要膨桥,推送的信息通常是主題對象的全部或部分數(shù)據(jù)蛮浑。
- 拉模型
主題對象在通知觀察者的時候,只傳遞少量信息只嚣。如果觀察者需要更具體的信息沮稚,由觀察者主動到主題對象中獲取,相當于是觀察者從主題對象中拉數(shù)據(jù)册舞。一般這種模型的實現(xiàn)中蕴掏,會把主題對象自身通過update()方法傳遞給觀察者,這樣在觀察者需要獲取數(shù)據(jù)的時候环础,就可以通過這個引用來獲取了囚似。
根據(jù)上面的描述剩拢,發(fā)現(xiàn)上面的例子就是典型的推模型线得,下面給出一個拉模型的實例。
拉模型的抽象主題類
拉模型的抽象主題類主要的改變是nodifyObservers()方法徐伐。在循環(huán)通知觀察者的時候贯钩,也就是循環(huán)調(diào)用觀察者的update()方法的時候,傳入的參數(shù)不同了办素。
public abstract class Subject {
/**
- 用來保存注冊的觀察者對象
*/
private List<Observer> list = new ArrayList<Observer>();
/**
- 注冊觀察者對象
@param observer 觀察者對象
*/
public void attach(Observer observer) {
list.add(observer);
System.out.println("Attached an observer");
}
/**
- 刪除觀察者對象
@param observer 觀察者對象
*/
public void detach(Observer observer) {
list.remove(observer);
}
/**
- 通知所有注冊的觀察者對象
*/
public void nodify() {
for (Observer observer : list) {
observer.update(this);
}
}
}
拉模型的具體主題類
跟推模型相比角雷,有一點變化,就是調(diào)用通知觀察者的方法的時候性穿,不需要傳入?yún)?shù)了勺三。
public class ConcreteSubject extends Subject {
private String state;
public String getState() {
return state;
}
public void change(String newState) {
state = newState;
System.out.println("主題狀態(tài)為:" + state);
//狀態(tài)發(fā)生改變,通知各個觀察者
this.nodify();
}
}
拉模型的抽象觀察者類
拉模型通常都是把主題對象當做參數(shù)傳遞需曾。
public interface Observer {
/**
- 更新接口
@param subject 傳入主題對象吗坚,方面獲取相應的主題對象的狀態(tài)
*/
void update(Subject subject);
}
拉模型的具體觀察者類
public class ConcreteObserver implements Observer {
//觀察者的狀態(tài)
private String observerState;
@Override
public void update(Subject subject) {
/**
- 更新觀察者的狀態(tài),使其與目標的狀態(tài)保持一致
*/
observerState = ((ConcreteSubject)subject).getState();
System.out.println("觀察者狀態(tài)為:"+observerState);
}
}
兩種模式的比較
推模型是假定主題對象知道觀察者需要的數(shù)據(jù)呆万;而拉模型是主題對象不知道觀察者具體需要什么數(shù)據(jù)商源,沒有辦法的情況下,干脆把自身傳遞給觀察者谋减,讓觀察者自己去按需要取值牡彻。
推模型可能會使得觀察者對象難以復用,因為觀察者的update()方法是按需要定義的參數(shù)出爹,可能無法兼顧沒有考慮到的使用情況庄吼。這就意味著出現(xiàn)新情況的時候,就可能提供新的 update()方法严就,或者是干脆重新實現(xiàn)觀察者霸褒;而拉模型就不會造成這樣的情況,因為拉模型下盈蛮,update()方法的參數(shù)是主題對象本身废菱,這基本上是主題對象能傳遞的最大數(shù)據(jù)集合了技矮,基本上可以適應各種情況的需要。
RxJava
場景描述
假設有這樣一個需求:界面上有一個自定義的視圖 imageCollectorView (圖片展示)殊轴,它的作用是顯示多張圖片衰倦,并能使用 addImage(Bitmap) 方法來任意增加顯示的圖片。現(xiàn)在需要程序將一個給出的目錄數(shù)組 File[] folders 中每個目錄下的 png 圖片都加載出來并顯示在 imageCollectorView 中旁理。由于讀取圖片的這一過程較為耗時樊零,需要放在后臺執(zhí)行,而圖片的顯示則必須在 UI 線程執(zhí)行孽文。常見的實現(xiàn)方式驻襟,在后臺開啟一個線程,執(zhí)行加載圖片的操作芋哭。代碼如下:
new Thread() {
@Override
public void run() {
super.run();
for (File folder : folders) {
File[] files = folder.listFiles();
for (File file : files) {
if (file.getName().endsWith(".png")) {
final Bitmap bitmap = getBitmapFromFile(file);
getActivity().runOnUiThread(new Runnable() {
@Override
public void run() {
imageCollectorView.addImage(bitmap);
}
});
}
}
}
}
}.start();
而如果使用 RxJava 沉衣,實現(xiàn)方式是這樣的:
Observable.from(folders)
.flatMap(new Func1<File, Observable<File>>() {
@Override
public Observable<File> call(File file) {
return Observable.from(file.listFiles());
}
})
.filter(new Func1<File, Boolean>() {
@Override
public Boolean call(File file) {
return file.getName().endsWith(".png");
}
})
.map(new Func1<File, Bitmap>() {
@Override
public Bitmap call(File file) {
return getBitmapFromFile(file);
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Bitmap>() {
@Override
public void call(Bitmap bitmap) {
imageCollectorView.addImage(bitmap);
}
});
從中可以看出rxjava代碼邏輯的簡潔。rxjava代碼簡潔不是指代碼量少减牺,而是指代碼邏輯的簡潔豌习。
什么是RxJava
原文 "a library for composing asynchronous and event-based programs using observable sequences for the Java VM"(一個在 Java VM 上使用可觀測的序列來組成異步的、基于事件的程序的庫)拔疚。這就是 RxJava 肥隆,概括得非常精準。
其實稚失, RxJava 的本質可以壓縮為異步這一個詞栋艳。說到根上,它就是一個實現(xiàn)異步操作的庫句各。
RxJava的特點
異步操作很關鍵的一點是程序的簡潔性吸占,因為在調(diào)度過程比較復雜的情況下,異步代碼經(jīng)常會既難寫也難被讀懂诫钓。 Android 創(chuàng)造的 AsyncTask 和Handler 旬昭,其實都是為了讓異步代碼更加簡潔。RxJava 的優(yōu)勢也是簡潔菌湃,但它的簡潔的與眾不同之處在于问拘,隨著程序邏輯變得越來越復雜,它依然能夠保持簡潔惧所。
RxJava的 觀察者模式
RxJava 的異步實現(xiàn)骤坐,是通過一種擴展的觀察者模式來實現(xiàn)的。
RxJava 的觀察者模式大致如下圖:
RxJava 有四個基本概念:
- Observable (被觀察者) 產(chǎn)生事件
- Observer (觀察者) 接收事件下愈,并給出響應動作
- subscribe (訂閱) 連接 被觀察者 & 觀察者
- 事件 被觀察者 & 觀察者 溝通的載體
Observable 和 Observer 通過 subscribe() 方法實現(xiàn)訂閱關系纽绍,從而 Observable 可以在需要的時候發(fā)出事件來通知Observer。
與傳統(tǒng)觀察者模式不同势似, RxJava 的事件回調(diào)方法除了普通事件 onNext() (相當于 onClick() / onEvent())之外拌夏,還定義了兩個特殊的事件:onCompleted() 和 onError()僧著。
- onCompleted(): 事件隊列完結。RxJava 不僅把每個事件單獨處理障簿,還會把它們看做一個隊列盹愚。RxJava 規(guī)定,當不會再有新的 onNext() 發(fā)出時站故,需要觸發(fā) onCompleted() 方法作為標志皆怕。
- onError(): 事件隊列異常。在事件處理過程中出異常時西篓,onError() 會被觸發(fā)愈腾,同時隊列自動終止,不允許再有事件發(fā)出岂津。
在一個正確運行的事件序列中, onCompleted() 和 onError() 有且只有一個虱黄,并且是事件序列中的最后一個。需要注意的是寸爆,onCompleted() 和 onError() 二者也是互斥的礁鲁,即在隊列中調(diào)用了其中一個盐欺,就不應該再調(diào)用另一個赁豆。
RxJava的實現(xiàn)
基于以上的概念, RxJava 的基本實現(xiàn)主要有三點:
- 創(chuàng)建 觀察者Observer
Observer 即觀察者冗美,它決定事件觸發(fā)的時候將有怎樣的行為魔种。 RxJava 中的 Observer 接口的實現(xiàn)方式:
Observer<String> observer = new Observer<String>() {
@Override
public void onNext(String s) {
Log.d(tag, "Item: " + s);
}
@Override
public void onCompleted() {
Log.d(tag, "Completed!");
}
@Override
public void onError(Throwable e) {
Log.d(tag, "Error!");
}
};
- 創(chuàng)建 被觀察者Observable
Observable 即被觀察者,它決定什么時候觸發(fā)事件以及觸發(fā)怎樣的事件粉洼。 RxJava 使用 create() 方法來創(chuàng)建一個 Observable 节预,并為它定義事件觸發(fā)規(guī)則:
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello");
subscriber.onNext("Hi");
subscriber.onNext("Aloha");
subscriber.onCompleted();
}
});
可以看到,這里傳入了一個 OnSubscribe 對象作為參數(shù)属韧。OnSubscribe 會被存儲在返回的 被觀察者Observable 對象中安拟,它的作用相當于一個計劃表,當被觀察者Observable 被訂閱的時候宵喂,OnSubscribe 的 call() 方法會自動被調(diào)用糠赦,事件序列就會依照設定依次觸發(fā)(對于上面的代碼,就是觀察者Subscriber 將會被調(diào)用三次 onNext() 和一次 onCompleted())锅棕。這樣拙泽,由被觀察者調(diào)用了觀察者的回調(diào)方法,就實現(xiàn)了由被觀察者向觀察者的事件傳遞裸燎,即觀察者模式顾瞻。
create() 方法是 RxJava 最基本的創(chuàng)造事件序列的方法荷荤⊥松基于這個方法氓辣, RxJava 還提供了一些方法用來快捷創(chuàng)建事件隊列袱蚓,例如:
- just(T...): 將傳入的參數(shù)依次發(fā)送出來。
Observable observable = Observable.just("Hello", "Hi", "Aloha");
// 將會依次調(diào)用:
// onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted();
- from(T[]) / from(Iterable<? extends T>) : 將傳入的數(shù)組或 Iterable 拆分成具體對象后喇潘,依次發(fā)送出來。
String[] words = {"Hello", "Hi", "Aloha"};
Observable observable = Observable.from(words);
// 將會依次調(diào)用:
// onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted();
- Subscribe (訂閱)
創(chuàng)建了被觀察者 Observable 和觀察者 Observer 之后絮吵,再用 subscribe() 方法將它們聯(lián)結起來忱屑,整條鏈子就可以工作了。代碼形式很簡單:
observable.subscribe(observer);
// 或者:
observable.subscribe(subscriber);
整個過程中對象間的關系如下圖:
基于事件流的鏈式調(diào)用
上述的實現(xiàn)方式是為了說明Rxjava的原理 & 使用
在實際應用中伴嗡,會將上述步驟&代碼連在一起从铲,從而更加簡潔瘪校、更加優(yōu)雅,即所謂的 RxJava基于事件流的鏈式調(diào)用
// RxJava的鏈式操作
Observable.create(new ObservableOnSubscribe<Integer>() {
// 1. 創(chuàng)建被觀察者 & 生產(chǎn)事件
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
}).subscribe(new Observer<Integer>() {
// 2. 通過通過訂閱(subscribe)連接觀察者和被觀察者
// 3. 創(chuàng)建觀察者 & 定義響應事件的行為
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "開始采用subscribe連接");
}
// 默認最先調(diào)用復寫的 onSubscribe()
@Override
public void onNext(Integer value) {
Log.d(TAG, "對Next事件"+ value +"作出響應" );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對Error事件作出響應");
}
@Override
public void onComplete() {
Log.d(TAG, "對Complete事件作出響應");
}
});
這種 基于事件流的鏈式調(diào)用名段,使得RxJava:
- 邏輯簡潔
- 實現(xiàn)優(yōu)雅
- 使用簡單
更重要的是阱扬,隨著程序邏輯的復雜性提高,它依然能夠保持簡潔 & 優(yōu)雅伸辟。所以麻惶,一般建議使用這種基于事件流的鏈式調(diào)用方式實現(xiàn)RxJava。
線程控制
在 RxJava 的默認規(guī)則中信夫,事件的發(fā)出和消費都是在同一個線程的窃蹋。也就是說,如果只用上面的方法忙迁,實現(xiàn)出來的只是一個同步的觀察者模式脐彩。觀察者模式本身的目的就是『后臺處理,前臺回調(diào)』的異步機制姊扔,因此異步對于 RxJava 是至關重要的惠奸。而要實現(xiàn)異步,則需要用到 RxJava 的另一個概念: Scheduler 恰梢。
在不指定線程的情況下佛南, RxJava 遵循的是線程不變的原則梗掰,即:在哪個線程調(diào)用 subscribe(),就在哪個線程生產(chǎn)事件嗅回;在哪個線程生產(chǎn)事件及穗,就在哪個線程消費事件。如果需要切換線程绵载,就需要用到 Scheduler (調(diào)度器)埂陆。
在RxJava 中,Scheduler ——調(diào)度器娃豹,相當于線程控制器焚虱,RxJava 通過它來指定每一段代碼應該運行在什么樣的線程懂版。
略...
感興趣可參考 給 Android 開發(fā)者的 RxJava 詳解
場景示例
-
需求場景
- 功能邏輯
- 代碼實現(xiàn)
public class RxJavaRetry {
private static final String TAG = "RxJava";
// 可重試次數(shù)
private int maxConnectCount = 10;
// 當前已重試次數(shù)
private int currentRetryCount = 0;
// 重試等待時間
private int waitRetryTime = 0;
@Override
protected void onCreate(Bundle savedInstanceState) {
//省略
// 步驟3:采用Observable<...>形式 對 網(wǎng)絡請求 進行封裝
Observable<Translation> observable = request.getCall();
// 步驟4:發(fā)送網(wǎng)絡請求 & 通過retryWhen()進行重試
// 注:主要異常才會回調(diào)retryWhen()進行重試
observable.retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(@NonNull Observable<Throwable> throwableObservable) throws Exception {
// 參數(shù)Observable<Throwable>中的泛型 = 上游操作符拋出的異常,可通過該條件來判斷異常的類型
return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(@NonNull Throwable throwable) throws Exception {
// 輸出異常信息
Log.d(TAG, "發(fā)生異常 = "+ throwable.toString());
/**
- 需求1:根據(jù)異常類型選擇是否重試
- 即,當發(fā)生的異常 = 網(wǎng)絡異常 = IO異常 才選擇重試
*/
if (throwable instanceof IOException){
Log.d(TAG, "屬于IO異常供嚎,需重試" );
/**
- 需求2:限制重試次數(shù)
- 即峭状,當已重試次數(shù) < 設置的重試次數(shù),才選擇重試
*/
if (currentRetryCount < maxConnectCount){
// 記錄重試次數(shù)
currentRetryCount++;
/**
- 需求2:實現(xiàn)重試
- 通過返回的Observable發(fā)送的事件 = Next事件胆敞,從而使得retryWhen()重訂閱移层,最終實現(xiàn)重試功能
需求3:延遲1段時間再重試
- 采用delay操作符 = 延遲一段時間發(fā)送观话,以實現(xiàn)重試間隔設置
需求4:遇到的異常越多频蛔,時間越長
- 在delay操作符的等待時間內(nèi)設置 = 每重試1次晦溪,增多延遲重試時間1s
*/
// 設置等待時間
waitRetryTime = 1000 + currentRetryCount* 1000;
return Observable.just(1).delay(waitRetryTime, TimeUnit.MILLISECONDS);
}else{
// 若重試次數(shù)已 > 設置重試次數(shù)狞换,則不重試
// 通過發(fā)送error來停止重試(可在觀察者的onError()中獲取信息)
return Observable.error(new Throwable("重試次數(shù)已超過設置次數(shù) = " +currentRetryCount + "哀澈,即 不再重試"));
}
}else{
// 若發(fā)生的異常不屬于I/O異常割按,則不重試
// 通過返回的Observable發(fā)送的事件 = Error事件 實現(xiàn)(可在觀察者的onError()中獲取信息)
return Observable.error(new Throwable("發(fā)生了非網(wǎng)絡異常(非I/O異常)"));
}
}
});
}
}).subscribeOn(Schedulers.io()) // 切換到IO線程進行網(wǎng)絡請求
.observeOn(AndroidSchedulers.mainThread()) // 切換回到主線程 處理請求結果
.subscribe(new Observer<Translation>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Translation result) {
// 接收服務器返回的數(shù)據(jù)
Log.d(TAG, "發(fā)送成功");
result.show();
}
@Override
public void onError(Throwable e) {
// 獲取停止重試的信息
Log.d(TAG, e.toString());
}
@Override
public void onComplete() {
}
});
}
}
更多場景可以參考 使用RxJava的最佳開發(fā)場景
HYSTRIX中的觀察者模式
Hystrix中的命令
Hystrix有兩個請求命令 HystrixCommand、HystrixObservableCommand院领。
HystrixCommand用在依賴服務返回單個操作結果的時候丈氓。又兩種執(zhí)行方式
- execute():同步執(zhí)行万俗。從依賴的服務返回一個單一的結果對象闰歪,或是在發(fā)生錯誤的時候拋出異常库倘。
- queue();異步執(zhí)行教翩。直接返回一個Future對象,其中包含了服務執(zhí)行結束時要返回的單一結果對象路捧。
HystrixObservableCommand 用在依賴服務返回多個操作結果的時候。它也實現(xiàn)了兩種執(zhí)行方式
- observe():返回Obervable對象队寇,他代表了操作的多個結果,他是一個HotObservable
- toObservable():同樣返回Observable對象零渐,也代表了操作多個結果诵盼,但它返回的是一個Cold Observable风宁。
在Hystrix的底層實現(xiàn)中大量使用了RxJava戒财。上面提到的Observable對象就是RxJava的核心內(nèi)容之一,可以把Observable對象理解為事件源或是被觀察者幽崩,與其對應的是Subscriber對象冈欢,可以理解為訂閱者或是觀察者凑耻。
- Observable用來向訂閱者Subscriber對象發(fā)布事件香浩,Subscriber對象在接收到事件后對其進行處理,這里所指的事件通常就是對依賴服務的調(diào)用囱晴。
- 一個Observable可以發(fā)出多個事件畸写,直到結束或是發(fā)生異常论笔。
- Observable對象每發(fā)出一個事件狂魔,就會調(diào)用對應觀察者Subscriber對象的onNext()方法最楷。
- 每一個Observable的執(zhí)行,最后一定會通過調(diào)用Subscriber.onCompleted()或是Subscriber.onError()來結束該事件的操作流蚯撩。
調(diào)用實例
public class HelloWorldHystrixObservableCommand extends HystrixObservableCommand<String> {
private final String name;
protected HelloWorldHystrixObservableCommand(String name) {
super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
this.name = name;
}
@Override
protected Observable<String> construct() {
System.out.println("in construct! thread:" + Thread.currentThread().getName());
return (Observable<String>) Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> observer) {
try {
System.out.println("in call of construct! thread:" + Thread.currentThread().getName());
if (!observer.isUnsubscribed()) {
// 直接拋異常退出,不會往下執(zhí)行
// observer.onError(getExecutionException());
observer.onNext("Hello1" + " thread:" + Thread.currentThread().getName());
observer.onNext("Hello2" + " thread:" + Thread.currentThread().getName());
observer.onNext(name + " thread:" + Thread.currentThread().getName());
System.out.println("complete before------" + " thread:" + Thread.currentThread().getName());
// 不會往下執(zhí)行observer的任何方法
observer.onCompleted();
System.out.println("complete after------" + " thread:" + Thread.currentThread().getName());
// 不會執(zhí)行到
observer.onCompleted();
// 不會執(zhí)行到
observer.onNext("abc");
}
} catch (Exception e) {
observer.onError(e);
}
}
});
}
public static void main(String[] args) {
Observable<String> observable = new HelloWorldHystrixObservableCommand("test").observe();
observable.subscribe(new Subscriber<String>() {
public void onCompleted() {
System.out.println("completed");
}
public void onError(Throwable throwable) {
System.out.println("error-----------" + throwable);
}
public void onNext(String v) {
System.out.println("next------------" + v);
}
});
}
}
執(zhí)行結果
in construct! thread:main
in call of construct! thread:main
complete before------ thread:main
complete after------ thread:main
next------------Hello1 thread:main
next------------Hello2 thread:main
next------------test thread:main
completed