服務降級熔斷 - 觀察者模式

觀察者模式

場景描述

先看一個關注微信公眾號的業(yè)務淫茵,關注完微信公眾號之后要進行如下操作:

1仍侥、記錄文本日志

2、記錄數(shù)據(jù)庫日志

3、發(fā)送短信

4谅猾、送優(yōu)惠券,積分等

5茁计、其他各類活動等

傳統(tǒng)解決方案:

在關注公眾號邏輯等類內(nèi)部增加相關代碼,完成各種邏輯怀骤。

存在問題:

1、一旦某個業(yè)務邏輯發(fā)生改變,如關注公眾號業(yè)務中增加其他業(yè)務邏輯,需要修改關注公眾號核心文件介返、甚至關注流程拴事。

2、日積月累后,文件冗長,導致后續(xù)維護困難圣蝎。

存在問題原因主要是程序的"緊密耦合",使用觀察模式可以將目前的業(yè)務邏輯優(yōu)化成"松耦合",達到易維護刃宵、易修改的目的,同時也符合面向接口編程的思想。

什么是觀察者模式

觀察者模式(Observer Pattern):定義對象間的一種一對多依賴關系徘公,使得每當一個對象狀態(tài)發(fā)生改變時组去,其相關依賴對象皆得到通知并被自動更新。觀察者模式又叫做發(fā)布-訂閱(Publish/Subscribe)模式步淹。觀察者模式是一種對象行為型模式从隆。

UML圖

各個字段含義如下:

  • Subject: 目標,抽象被觀察者缭裆。 抽象主題角色把所有觀察者對象保存在一個集合里键闺,每個主題都可以有任意數(shù)量的觀察者,抽象主題提供一個接口澈驼,可以增加和刪除觀察者對象辛燥。
  • ConcreteSubject: 具體目標,具體的被觀察者。 該角色將有關狀態(tài)存入具體觀察者對象挎塌,在具體主題的內(nèi)部狀態(tài)發(fā)生改變時徘六,給所有注冊過的觀察者發(fā)送通知。
  • Observer: 觀察者榴都。 是觀察者者的抽象類待锈,它定義了一個更新接口,使得在得到主題更改通知時更新自己嘴高。
  • ConcreteObserver: 具體觀察者竿音。實現(xiàn)抽象觀察者定義的更新接口,以便在得到主題更改通知時更新自身的狀態(tài)拴驮。

觀察者模式的簡單實現(xiàn)

觀察者模式這種發(fā)布-訂閱的形式我們可以拿微信公眾號來舉例春瞬,假設微信用戶就是觀察者,微信公眾號是被觀察者套啤,有多個的微信用戶關注了程序猿這個公眾號宽气,當這個公眾號更新時就會通知這些訂閱的微信用戶∏甭伲看看用代碼如何實現(xiàn):

抽象觀察者(Observer)

//抽象觀察者

public interface Observer {

//收到訂閱消息執(zhí)行的操作

void update(String message);

}

具體觀察者(ConcrereObserver)

//記錄日志

public class LogObserver implements Observer{

@Override

public void update(String message) {

System.out.println("日志訂閱---"+message+"---開始記錄日志");

}

}

//優(yōu)惠券訂閱

public class TicketObserver implements Observer{

@Override

public void update(String message) {

System.out.println("優(yōu)惠券訂閱---"+message+"---開始發(fā)送優(yōu)惠券");

}

}

抽象被觀察者(Subject)

//抽象被觀察者

public interface Subject {

/**

  • 增加訂閱者
  • @param observer

*/

void attach(Observer observer);

/**

  • 刪除訂閱者
  • @param observer

*/

void detach(Observer observer);

/**

  • 通知訂閱者

*/

void notify(String message);

}

具體被觀察者(ConcreteSubject)

//訂閱公眾號

public class SubscriptionSubject implements Subject {

//儲存訂閱公眾號的業(yè)務

private List<Observer> list= new ArrayList<Observer>();

@Override

public void attach(Observer observer) {

list.add(observer);

}

@Override

public void detach(Observer observer) {

list.remove(observer);

}

@Override

public void notify(String message) {

for (Observer observer : list) {

observer.update(message);

}

}

public void subscribe(){

System.out.println("執(zhí)行訂閱的具體操作");

notify("訂閱成功了");

}

}

客戶端調(diào)用

public class Client {

public static void main(String[] args) {

SubscriptionSubject subscriptionSubject = new SubscriptionSubject();

//添加日志訂閱

Observer logObserver = new LogObserver();

subscriptionSubject.attach(logObserver);

//添加優(yōu)惠券訂閱

TicketObserver ticketObserver = new TicketObserver();

subscriptionSubject.attach(ticketObserver);

//執(zhí)行訂閱的操作

subscriptionSubject.subscribe();

}

}

執(zhí)行結果

執(zhí)行訂閱的具體操作

日志訂閱---訂閱成功了---開始記錄日志

優(yōu)惠券訂閱---訂閱成功了---開始發(fā)送優(yōu)惠券

使用觀察者模式的場景和優(yōu)缺點

  • 優(yōu)點

解除耦合抹竹,讓耦合的雙方都依賴于抽象,從而使得各自的變換都不會影響另一邊的變換止潮。

  • 缺點

在應用觀察者模式時需要考慮一下開發(fā)效率和運行效率的問題窃判,程序中包括一個被觀察者、多個觀察者喇闸,開發(fā)袄琳、調(diào)試等內(nèi)容會比較復雜,而且在Java中消息的通知一般是順序執(zhí)行燃乍,那么一個觀察者卡頓唆樊,會影響整體的執(zhí)行效率,在這種情況下刻蟹,一般會采用異步實現(xiàn)逗旁。

java中的實現(xiàn)

JDK 提供了 一套 觀察者模式的實現(xiàn),在java.util包中舆瘪, java.util.Observable類和java.util.Observer接口片效。Observable是被觀察者,Observer是觀察者英古。

推模型和拉模型

在觀察者模式中淀衣,又分為推模型和拉模型兩種方式。

  • 推模型

主題對象向觀察者推送主題的詳細信息召调,不管觀察者是否需要膨桥,推送的信息通常是主題對象的全部或部分數(shù)據(jù)蛮浑。

  • 拉模型

主題對象在通知觀察者的時候,只傳遞少量信息只嚣。如果觀察者需要更具體的信息沮稚,由觀察者主動到主題對象中獲取,相當于是觀察者從主題對象中拉數(shù)據(jù)册舞。一般這種模型的實現(xiàn)中蕴掏,會把主題對象自身通過update()方法傳遞給觀察者,這樣在觀察者需要獲取數(shù)據(jù)的時候环础,就可以通過這個引用來獲取了囚似。

根據(jù)上面的描述剩拢,發(fā)現(xiàn)上面的例子就是典型的推模型线得,下面給出一個拉模型的實例。

拉模型的抽象主題類

拉模型的抽象主題類主要的改變是nodifyObservers()方法徐伐。在循環(huán)通知觀察者的時候贯钩,也就是循環(huán)調(diào)用觀察者的update()方法的時候,傳入的參數(shù)不同了办素。

public abstract class Subject {

/**

  • 用來保存注冊的觀察者對象

*/

private List<Observer> list = new ArrayList<Observer>();

/**

  • 注冊觀察者對象
  • @param observer 觀察者對象

*/

public void attach(Observer observer) {

list.add(observer);

System.out.println("Attached an observer");

}

/**

  • 刪除觀察者對象
  • @param observer 觀察者對象

*/

public void detach(Observer observer) {

list.remove(observer);

}

/**

  • 通知所有注冊的觀察者對象

*/

public void nodify() {

for (Observer observer : list) {

observer.update(this);

}

}

}

拉模型的具體主題類

跟推模型相比角雷,有一點變化,就是調(diào)用通知觀察者的方法的時候性穿,不需要傳入?yún)?shù)了勺三。

public class ConcreteSubject extends Subject {

private String state;

public String getState() {

return state;

}

public void change(String newState) {

state = newState;

System.out.println("主題狀態(tài)為:" + state);

//狀態(tài)發(fā)生改變,通知各個觀察者

this.nodify();

}

}

拉模型的抽象觀察者類

拉模型通常都是把主題對象當做參數(shù)傳遞需曾。

public interface Observer {

/**

  • 更新接口
  • @param subject 傳入主題對象吗坚,方面獲取相應的主題對象的狀態(tài)

*/

void update(Subject subject);

}

拉模型的具體觀察者類

public class ConcreteObserver implements Observer {

//觀察者的狀態(tài)

private String observerState;

@Override

public void update(Subject subject) {

/**

  • 更新觀察者的狀態(tài),使其與目標的狀態(tài)保持一致

*/

observerState = ((ConcreteSubject)subject).getState();

System.out.println("觀察者狀態(tài)為:"+observerState);

}

}

兩種模式的比較

  • 推模型是假定主題對象知道觀察者需要的數(shù)據(jù)呆万;而拉模型是主題對象不知道觀察者具體需要什么數(shù)據(jù)商源,沒有辦法的情況下,干脆把自身傳遞給觀察者谋减,讓觀察者自己去按需要取值牡彻。

  • 推模型可能會使得觀察者對象難以復用,因為觀察者的update()方法是按需要定義的參數(shù)出爹,可能無法兼顧沒有考慮到的使用情況庄吼。這就意味著出現(xiàn)新情況的時候,就可能提供新的 update()方法严就,或者是干脆重新實現(xiàn)觀察者霸褒;而拉模型就不會造成這樣的情況,因為拉模型下盈蛮,update()方法的參數(shù)是主題對象本身废菱,這基本上是主題對象能傳遞的最大數(shù)據(jù)集合了技矮,基本上可以適應各種情況的需要。

RxJava

場景描述

假設有這樣一個需求:界面上有一個自定義的視圖 imageCollectorView (圖片展示)殊轴,它的作用是顯示多張圖片衰倦,并能使用 addImage(Bitmap) 方法來任意增加顯示的圖片。現(xiàn)在需要程序將一個給出的目錄數(shù)組 File[] folders 中每個目錄下的 png 圖片都加載出來并顯示在 imageCollectorView 中旁理。由于讀取圖片的這一過程較為耗時樊零,需要放在后臺執(zhí)行,而圖片的顯示則必須在 UI 線程執(zhí)行孽文。常見的實現(xiàn)方式驻襟,在后臺開啟一個線程,執(zhí)行加載圖片的操作芋哭。代碼如下:

new Thread() {

@Override

public void run() {

super.run();

for (File folder : folders) {

File[] files = folder.listFiles();

for (File file : files) {

if (file.getName().endsWith(".png")) {

final Bitmap bitmap = getBitmapFromFile(file);

getActivity().runOnUiThread(new Runnable() {

@Override

public void run() {

imageCollectorView.addImage(bitmap);

}

});

}

}

}

}

}.start();

而如果使用 RxJava 沉衣,實現(xiàn)方式是這樣的:

Observable.from(folders)

.flatMap(new Func1<File, Observable<File>>() {

@Override

public Observable<File> call(File file) {

return Observable.from(file.listFiles());

}

})

.filter(new Func1<File, Boolean>() {

@Override

public Boolean call(File file) {

return file.getName().endsWith(".png");

}

})

.map(new Func1<File, Bitmap>() {

@Override

public Bitmap call(File file) {

return getBitmapFromFile(file);

}

})

.subscribeOn(Schedulers.io())

.observeOn(AndroidSchedulers.mainThread())

.subscribe(new Action1<Bitmap>() {

@Override

public void call(Bitmap bitmap) {

imageCollectorView.addImage(bitmap);

}

});

從中可以看出rxjava代碼邏輯的簡潔。rxjava代碼簡潔不是指代碼量少减牺,而是指代碼邏輯的簡潔豌习。

什么是RxJava

原文 "a library for composing asynchronous and event-based programs using observable sequences for the Java VM"(一個在 Java VM 上使用可觀測的序列來組成異步的、基于事件的程序的庫)拔疚。這就是 RxJava 肥隆,概括得非常精準。

其實稚失, RxJava 的本質可以壓縮為異步這一個詞栋艳。說到根上,它就是一個實現(xiàn)異步操作的庫句各。

RxJava的特點

異步操作很關鍵的一點是程序的簡潔性吸占,因為在調(diào)度過程比較復雜的情況下,異步代碼經(jīng)常會既難寫也難被讀懂诫钓。 Android 創(chuàng)造的 AsyncTask 和Handler 旬昭,其實都是為了讓異步代碼更加簡潔。RxJava 的優(yōu)勢也是簡潔菌湃,但它的簡潔的與眾不同之處在于问拘,隨著程序邏輯變得越來越復雜,它依然能夠保持簡潔惧所。

RxJava的 觀察者模式

RxJava 的異步實現(xiàn)骤坐,是通過一種擴展的觀察者模式來實現(xiàn)的。

RxJava 的觀察者模式大致如下圖:

RxJava 有四個基本概念:

  1. Observable (被觀察者) 產(chǎn)生事件
  2. Observer (觀察者) 接收事件下愈,并給出響應動作
  3. subscribe (訂閱) 連接 被觀察者 & 觀察者
  4. 事件 被觀察者 & 觀察者 溝通的載體

Observable 和 Observer 通過 subscribe() 方法實現(xiàn)訂閱關系纽绍,從而 Observable 可以在需要的時候發(fā)出事件來通知Observer。

與傳統(tǒng)觀察者模式不同势似, RxJava 的事件回調(diào)方法除了普通事件 onNext() (相當于 onClick() / onEvent())之外拌夏,還定義了兩個特殊的事件:onCompleted() 和 onError()僧著。

  • onCompleted(): 事件隊列完結。RxJava 不僅把每個事件單獨處理障簿,還會把它們看做一個隊列盹愚。RxJava 規(guī)定,當不會再有新的 onNext() 發(fā)出時站故,需要觸發(fā) onCompleted() 方法作為標志皆怕。
  • onError(): 事件隊列異常。在事件處理過程中出異常時西篓,onError() 會被觸發(fā)愈腾,同時隊列自動終止,不允許再有事件發(fā)出岂津。

在一個正確運行的事件序列中, onCompleted() 和 onError() 有且只有一個虱黄,并且是事件序列中的最后一個。需要注意的是寸爆,onCompleted() 和 onError() 二者也是互斥的礁鲁,即在隊列中調(diào)用了其中一個盐欺,就不應該再調(diào)用另一個赁豆。

RxJava的實現(xiàn)

基于以上的概念, RxJava 的基本實現(xiàn)主要有三點:

  1. 創(chuàng)建 觀察者Observer

Observer 即觀察者冗美,它決定事件觸發(fā)的時候將有怎樣的行為魔种。 RxJava 中的 Observer 接口的實現(xiàn)方式:

Observer<String> observer = new Observer<String>() {

@Override

public void onNext(String s) {

Log.d(tag, "Item: " + s);

}

@Override

public void onCompleted() {

Log.d(tag, "Completed!");

}

@Override

public void onError(Throwable e) {

Log.d(tag, "Error!");

}

};

  1. 創(chuàng)建 被觀察者Observable

Observable 即被觀察者,它決定什么時候觸發(fā)事件以及觸發(fā)怎樣的事件粉洼。 RxJava 使用 create() 方法來創(chuàng)建一個 Observable 节预,并為它定義事件觸發(fā)規(guī)則:

Observable observable = Observable.create(new Observable.OnSubscribe<String>() {

@Override

public void call(Subscriber<? super String> subscriber) {

subscriber.onNext("Hello");

subscriber.onNext("Hi");

subscriber.onNext("Aloha");

subscriber.onCompleted();

}

});

可以看到,這里傳入了一個 OnSubscribe 對象作為參數(shù)属韧。OnSubscribe 會被存儲在返回的 被觀察者Observable 對象中安拟,它的作用相當于一個計劃表,當被觀察者Observable 被訂閱的時候宵喂,OnSubscribe 的 call() 方法會自動被調(diào)用糠赦,事件序列就會依照設定依次觸發(fā)(對于上面的代碼,就是觀察者Subscriber 將會被調(diào)用三次 onNext() 和一次 onCompleted())锅棕。這樣拙泽,由被觀察者調(diào)用了觀察者的回調(diào)方法,就實現(xiàn)了由被觀察者向觀察者的事件傳遞裸燎,即觀察者模式顾瞻。

create() 方法是 RxJava 最基本的創(chuàng)造事件序列的方法荷荤⊥松基于這個方法氓辣, RxJava 還提供了一些方法用來快捷創(chuàng)建事件隊列袱蚓,例如:

  • just(T...): 將傳入的參數(shù)依次發(fā)送出來。

Observable observable = Observable.just("Hello", "Hi", "Aloha");

// 將會依次調(diào)用:

// onNext("Hello");

// onNext("Hi");

// onNext("Aloha");

// onCompleted();

  • from(T[]) / from(Iterable<? extends T>) : 將傳入的數(shù)組或 Iterable 拆分成具體對象后喇潘,依次發(fā)送出來。

String[] words = {"Hello", "Hi", "Aloha"};

Observable observable = Observable.from(words);

// 將會依次調(diào)用:

// onNext("Hello");

// onNext("Hi");

// onNext("Aloha");

// onCompleted();

  1. Subscribe (訂閱)

創(chuàng)建了被觀察者 Observable 和觀察者 Observer 之后絮吵,再用 subscribe() 方法將它們聯(lián)結起來忱屑,整條鏈子就可以工作了。代碼形式很簡單:

observable.subscribe(observer);

// 或者:

observable.subscribe(subscriber);

整個過程中對象間的關系如下圖:

基于事件流的鏈式調(diào)用

上述的實現(xiàn)方式是為了說明Rxjava的原理 & 使用

在實際應用中伴嗡,會將上述步驟&代碼連在一起从铲,從而更加簡潔瘪校、更加優(yōu)雅,即所謂的 RxJava基于事件流的鏈式調(diào)用

// RxJava的鏈式操作

Observable.create(new ObservableOnSubscribe<Integer>() {

// 1. 創(chuàng)建被觀察者 & 生產(chǎn)事件

@Override

public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {

emitter.onNext(1);

emitter.onNext(2);

emitter.onNext(3);

emitter.onComplete();

}

}).subscribe(new Observer<Integer>() {

// 2. 通過通過訂閱(subscribe)連接觀察者和被觀察者

// 3. 創(chuàng)建觀察者 & 定義響應事件的行為

@Override

public void onSubscribe(Disposable d) {

Log.d(TAG, "開始采用subscribe連接");

}

// 默認最先調(diào)用復寫的 onSubscribe()

@Override

public void onNext(Integer value) {

Log.d(TAG, "對Next事件"+ value +"作出響應" );

}

@Override

public void onError(Throwable e) {

Log.d(TAG, "對Error事件作出響應");

}

@Override

public void onComplete() {

Log.d(TAG, "對Complete事件作出響應");

}

});

這種 基于事件流的鏈式調(diào)用名段,使得RxJava:

  • 邏輯簡潔
  • 實現(xiàn)優(yōu)雅
  • 使用簡單

更重要的是阱扬,隨著程序邏輯的復雜性提高,它依然能夠保持簡潔 & 優(yōu)雅伸辟。所以麻惶,一般建議使用這種基于事件流的鏈式調(diào)用方式實現(xiàn)RxJava。

線程控制

在 RxJava 的默認規(guī)則中信夫,事件的發(fā)出和消費都是在同一個線程的窃蹋。也就是說,如果只用上面的方法忙迁,實現(xiàn)出來的只是一個同步的觀察者模式脐彩。觀察者模式本身的目的就是『后臺處理,前臺回調(diào)』的異步機制姊扔,因此異步對于 RxJava 是至關重要的惠奸。而要實現(xiàn)異步,則需要用到 RxJava 的另一個概念: Scheduler 恰梢。

在不指定線程的情況下佛南, RxJava 遵循的是線程不變的原則梗掰,即:在哪個線程調(diào)用 subscribe(),就在哪個線程生產(chǎn)事件嗅回;在哪個線程生產(chǎn)事件及穗,就在哪個線程消費事件。如果需要切換線程绵载,就需要用到 Scheduler (調(diào)度器)埂陆。

在RxJava 中,Scheduler ——調(diào)度器娃豹,相當于線程控制器焚虱,RxJava 通過它來指定每一段代碼應該運行在什么樣的線程懂版。

略...

感興趣可參考 給 Android 開發(fā)者的 RxJava 詳解

場景示例

  • 需求場景


  • 功能邏輯
  • 代碼實現(xiàn)

public class RxJavaRetry {

private static final String TAG = "RxJava";

// 可重試次數(shù)

private int maxConnectCount = 10;

// 當前已重試次數(shù)

private int currentRetryCount = 0;

// 重試等待時間

private int waitRetryTime = 0;

@Override

protected void onCreate(Bundle savedInstanceState) {

//省略

// 步驟3:采用Observable<...>形式 對 網(wǎng)絡請求 進行封裝

Observable<Translation> observable = request.getCall();

// 步驟4:發(fā)送網(wǎng)絡請求 & 通過retryWhen()進行重試

// 注:主要異常才會回調(diào)retryWhen()進行重試

observable.retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {

@Override

public ObservableSource<?> apply(@NonNull Observable<Throwable> throwableObservable) throws Exception {

// 參數(shù)Observable<Throwable>中的泛型 = 上游操作符拋出的異常,可通過該條件來判斷異常的類型

return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {

@Override

public ObservableSource<?> apply(@NonNull Throwable throwable) throws Exception {

// 輸出異常信息

Log.d(TAG, "發(fā)生異常 = "+ throwable.toString());

/**

  • 需求1:根據(jù)異常類型選擇是否重試
  • 即,當發(fā)生的異常 = 網(wǎng)絡異常 = IO異常 才選擇重試

*/

if (throwable instanceof IOException){

Log.d(TAG, "屬于IO異常供嚎,需重試" );

/**

  • 需求2:限制重試次數(shù)
  • 即峭状,當已重試次數(shù) < 設置的重試次數(shù),才選擇重試

*/

if (currentRetryCount < maxConnectCount){

// 記錄重試次數(shù)

currentRetryCount++;

/**

  • 需求2:實現(xiàn)重試
  • 通過返回的Observable發(fā)送的事件 = Next事件胆敞,從而使得retryWhen()重訂閱移层,最終實現(xiàn)重試功能
  • 需求3:延遲1段時間再重試

  • 采用delay操作符 = 延遲一段時間發(fā)送观话,以實現(xiàn)重試間隔設置
  • 需求4:遇到的異常越多频蛔,時間越長

  • 在delay操作符的等待時間內(nèi)設置 = 每重試1次晦溪,增多延遲重試時間1s

*/

// 設置等待時間

waitRetryTime = 1000 + currentRetryCount* 1000;

return Observable.just(1).delay(waitRetryTime, TimeUnit.MILLISECONDS);

}else{

// 若重試次數(shù)已 > 設置重試次數(shù)狞换,則不重試

// 通過發(fā)送error來停止重試(可在觀察者的onError()中獲取信息)

return Observable.error(new Throwable("重試次數(shù)已超過設置次數(shù) = " +currentRetryCount + "哀澈,即 不再重試"));

}

}else{

// 若發(fā)生的異常不屬于I/O異常割按,則不重試

// 通過返回的Observable發(fā)送的事件 = Error事件 實現(xiàn)(可在觀察者的onError()中獲取信息)

return Observable.error(new Throwable("發(fā)生了非網(wǎng)絡異常(非I/O異常)"));

}

}

});

}

}).subscribeOn(Schedulers.io()) // 切換到IO線程進行網(wǎng)絡請求

.observeOn(AndroidSchedulers.mainThread()) // 切換回到主線程 處理請求結果

.subscribe(new Observer<Translation>() {

@Override

public void onSubscribe(Disposable d) {

}

@Override

public void onNext(Translation result) {

// 接收服務器返回的數(shù)據(jù)

Log.d(TAG, "發(fā)送成功");

result.show();

}

@Override

public void onError(Throwable e) {

// 獲取停止重試的信息

Log.d(TAG, e.toString());

}

@Override

public void onComplete() {

}

});

}

}

更多場景可以參考 使用RxJava的最佳開發(fā)場景

HYSTRIX中的觀察者模式

Hystrix中的命令

Hystrix有兩個請求命令 HystrixCommand、HystrixObservableCommand院领。

HystrixCommand用在依賴服務返回單個操作結果的時候丈氓。又兩種執(zhí)行方式

  • execute():同步執(zhí)行万俗。從依賴的服務返回一個單一的結果對象闰歪,或是在發(fā)生錯誤的時候拋出異常库倘。
  • queue();異步執(zhí)行教翩。直接返回一個Future對象,其中包含了服務執(zhí)行結束時要返回的單一結果對象路捧。

HystrixObservableCommand 用在依賴服務返回多個操作結果的時候。它也實現(xiàn)了兩種執(zhí)行方式

  • observe():返回Obervable對象队寇,他代表了操作的多個結果,他是一個HotObservable
  • toObservable():同樣返回Observable對象零渐,也代表了操作多個結果诵盼,但它返回的是一個Cold Observable风宁。

在Hystrix的底層實現(xiàn)中大量使用了RxJava戒财。上面提到的Observable對象就是RxJava的核心內(nèi)容之一,可以把Observable對象理解為事件源或是被觀察者幽崩,與其對應的是Subscriber對象冈欢,可以理解為訂閱者或是觀察者凑耻。

  1. Observable用來向訂閱者Subscriber對象發(fā)布事件香浩,Subscriber對象在接收到事件后對其進行處理,這里所指的事件通常就是對依賴服務的調(diào)用囱晴。
  2. 一個Observable可以發(fā)出多個事件畸写,直到結束或是發(fā)生異常论笔。
  3. Observable對象每發(fā)出一個事件狂魔,就會調(diào)用對應觀察者Subscriber對象的onNext()方法最楷。
  4. 每一個Observable的執(zhí)行,最后一定會通過調(diào)用Subscriber.onCompleted()或是Subscriber.onError()來結束該事件的操作流蚯撩。

調(diào)用實例

public class HelloWorldHystrixObservableCommand extends HystrixObservableCommand<String> {

private final String name;

protected HelloWorldHystrixObservableCommand(String name) {

super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));

this.name = name;

}

@Override

protected Observable<String> construct() {

System.out.println("in construct! thread:" + Thread.currentThread().getName());

return (Observable<String>) Observable.create(new Observable.OnSubscribe<String>() {

@Override

public void call(Subscriber<? super String> observer) {

try {

System.out.println("in call of construct! thread:" + Thread.currentThread().getName());

if (!observer.isUnsubscribed()) {

// 直接拋異常退出,不會往下執(zhí)行

// observer.onError(getExecutionException());

observer.onNext("Hello1" + " thread:" + Thread.currentThread().getName());

observer.onNext("Hello2" + " thread:" + Thread.currentThread().getName());

observer.onNext(name + " thread:" + Thread.currentThread().getName());

System.out.println("complete before------" + " thread:" + Thread.currentThread().getName());

// 不會往下執(zhí)行observer的任何方法

observer.onCompleted();

System.out.println("complete after------" + " thread:" + Thread.currentThread().getName());

// 不會執(zhí)行到

observer.onCompleted();

// 不會執(zhí)行到

observer.onNext("abc");

}

} catch (Exception e) {

observer.onError(e);

}

}

});

}

public static void main(String[] args) {

Observable<String> observable = new HelloWorldHystrixObservableCommand("test").observe();

observable.subscribe(new Subscriber<String>() {

public void onCompleted() {

System.out.println("completed");

}

public void onError(Throwable throwable) {

System.out.println("error-----------" + throwable);

}

public void onNext(String v) {

System.out.println("next------------" + v);

}

});

}

}

執(zhí)行結果

in construct! thread:main

in call of construct! thread:main

complete before------ thread:main

complete after------ thread:main

next------------Hello1 thread:main

next------------Hello2 thread:main

next------------test thread:main

completed

最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末芽卿,一起剝皮案震驚了整個濱河市卸例,隨后出現(xiàn)的幾起案子姑原,更是在濱河造成了極大的恐慌呜舒,老刑警劉巖唤殴,帶你破解...
    沈念sama閱讀 219,490評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件蔚袍,死亡現(xiàn)場離奇詭異,居然都是意外死亡闰蚕,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,581評論 3 395
  • 文/潘曉璐 我一進店門盼玄,熙熙樓的掌柜王于貴愁眉苦臉地迎上來埃儿,“玉大人,你說我怎么就攤上這事剃斧。” “怎么了根蟹?”我有些...
    開封第一講書人閱讀 165,830評論 0 356
  • 文/不壞的土叔 我叫張陵蚌堵,是天一觀的道長督赤。 經(jīng)常有香客問我,道長泻蚊,這世上最難降的妖魔是什么躲舌? 我笑而不...
    開封第一講書人閱讀 58,957評論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮性雄,結果婚禮上没卸,老公的妹妹穿的比我還像新娘。我一直安慰自己秒旋,他們只是感情好约计,可當我...
    茶點故事閱讀 67,974評論 6 393
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著煤蚌,像睡著了一般。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,754評論 1 307
  • 那天并村,我揣著相機與錄音膝昆,去河邊找鬼骄呼。 笑死嫉沽,一個胖子當著我的面吹牛漱牵,可吹牛的內(nèi)容都是我干的闻镶。 我是一名探鬼主播墩剖,決...
    沈念sama閱讀 40,464評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼进倍,長吁一口氣:“原來是場噩夢啊……” “哼烙如!你這毒婦竟也來了?” 一聲冷哼從身側響起黍图,我...
    開封第一講書人閱讀 39,357評論 0 276
  • 序言:老撾萬榮一對情侶失蹤顾犹,失蹤者是張志新(化名)和其女友劉穎锄奢,沒想到半個月后灰伟,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體涣雕,經(jīng)...
    沈念sama閱讀 45,847評論 1 317
  • 正文 獨居荒郊野嶺守林人離奇死亡先蒋,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,995評論 3 338
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了借笙。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片君编。...
    茶點故事閱讀 40,137評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡琴拧,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出风罩,到底是詐尸還是另有隱情淋硝,我是刑警寧澤,帶...
    沈念sama閱讀 35,819評論 5 346
  • 正文 年R本政府宣布宽菜,位于F島的核電站谣膳,受9級特大地震影響,放射性物質發(fā)生泄漏铅乡。R本人自食惡果不足惜继谚,卻給世界環(huán)境...
    茶點故事閱讀 41,482評論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望阵幸。 院中可真熱鬧花履,春花似錦、人聲如沸挚赊。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,023評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽荠割。三九已至妹卿,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間蔑鹦,已是汗流浹背夺克。 一陣腳步聲響...
    開封第一講書人閱讀 33,149評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留举反,地道東北人懊直。 一個月前我還...
    沈念sama閱讀 48,409評論 3 373
  • 正文 我出身青樓,卻偏偏與公主長得像火鼻,于是被迫代替她去往敵國和親室囊。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 45,086評論 2 355

推薦閱讀更多精彩內(nèi)容