個(gè)人學(xué)習(xí)筆記分享反镇,當(dāng)前能力有限,請(qǐng)勿貶低送火,菜鳥互學(xué)拳话,大佬繞道
如有勘誤,歡迎指出和討論种吸,本文后期也會(huì)進(jìn)行修正和補(bǔ)充
前言
前面有一篇文章已經(jīng)介紹了訂閱/發(fā)布模式弃衍,即生產(chǎn)者和消費(fèi)者通過一個(gè)中介者來交互
- 生產(chǎn)者只負(fù)責(zé)向中介傳遞數(shù)據(jù),不關(guān)心其余步驟
- 消費(fèi)者在中介者處進(jìn)行注冊(cè)坚俗,告知中介者自己需要數(shù)據(jù)
- 中介者接受來自生產(chǎn)者的數(shù)據(jù)镜盯,并傳遞給在自己這里注冊(cè)過的消費(fèi)者
當(dāng)生產(chǎn)者只有一個(gè)的時(shí)候,可以省略掉中介者猖败,直接在生產(chǎn)者處注冊(cè)消費(fèi)者
通常滿足N-1-N或者1-N的交互模型
消費(fèi)者在中介者處或者直接向生產(chǎn)者訂閱消息速缆,而生產(chǎn)者負(fù)責(zé)發(fā)布消息,由中介者或者生產(chǎn)者
因而被稱為訂閱/發(fā)布模式
可以看到恩闻,注冊(cè)過的消費(fèi)者總是在等待消息艺糜,無論消息來自中介者,或者直接來源于生產(chǎn)者幢尚,最終目的都是觀察生產(chǎn)者
因此這種模式也被稱為觀察者模式
在實(shí)際生活中破停,最常見的就是訂閱,無論是短信訂閱尉剩,還是微信上的訂閱號(hào)真慢,我們都是在作為消費(fèi)者,被動(dòng)的接受消息(雖然很多時(shí)候都是單方面在騷擾我們理茎。黑界。管嬉。)
而在開發(fā)中,生產(chǎn)者負(fù)責(zé)生產(chǎn)消息园爷,不關(guān)心如何被消費(fèi)以及消費(fèi)者是誰宠蚂;消費(fèi)者注冊(cè)并接受消息,不關(guān)心消息的來源和時(shí)間童社;生產(chǎn)者和消費(fèi)者并不需要時(shí)刻保持聯(lián)系
其核心目的還是那個(gè)老生常談的求厕,解耦
1.介紹
適用目的:定義對(duì)象間的一種一對(duì)多的依賴關(guān)系,當(dāng)一個(gè)對(duì)象的狀態(tài)發(fā)生改變時(shí)扰楼,所有依賴于它的對(duì)象都得到通知并被自動(dòng)更新呀癣。
主要解決:一個(gè)對(duì)象狀態(tài)改變給其他對(duì)象通知的問題,而且要考慮到易用和低耦合弦赖,保證高度的協(xié)作项栏。
何時(shí)使用:一個(gè)對(duì)象(目標(biāo)對(duì)象)的狀態(tài)發(fā)生改變,所有的依賴對(duì)象(觀察者對(duì)象)都將得到通知蹬竖,進(jìn)行廣播通知沼沈。
如何解決:使用面向?qū)ο蠹夹g(shù),可以將這種依賴關(guān)系弱化币厕。
關(guān)鍵代碼:在抽象類里有一個(gè)集合存放觀察者們列另。
應(yīng)用實(shí)例:短信/公眾號(hào)推送;平臺(tái)的公告旦装;股票與股民页衙;
優(yōu)點(diǎn):
- 降低了目標(biāo)與觀察者之間的耦合關(guān)系,兩者之間是抽象耦合關(guān)系
- 目標(biāo)與觀察者之間建立了一套觸發(fā)機(jī)制阴绢。
缺點(diǎn):
- 目標(biāo)與觀察者之間的依賴關(guān)系并沒有完全解除店乐,而且有可能出現(xiàn)循環(huán)引用。
- 當(dāng)觀察者對(duì)象很多時(shí)呻袭,通知的發(fā)布會(huì)花費(fèi)很多時(shí)間眨八,影響程序的效率
使用場景:
- 一個(gè)抽象模型有兩個(gè)方面,其中一個(gè)方面依賴于另一個(gè)方面左电。將這些方面封裝在獨(dú)立的對(duì)象中使它們可以各自獨(dú)立地改變和復(fù)用踪古。
- 一個(gè)對(duì)象的改變將導(dǎo)致其他一個(gè)或多個(gè)對(duì)象也發(fā)生改變,而不知道具體有多少對(duì)象將發(fā)生改變券腔,可以降低對(duì)象之間的耦合度。
- 一個(gè)對(duì)象必須通知其他對(duì)象拘泞,而并不知道這些對(duì)象是誰纷纫。
- 需要在系統(tǒng)中創(chuàng)建一個(gè)觸發(fā)鏈,A對(duì)象的行為將影響B(tài)對(duì)象陪腌,B對(duì)象的行為將影響C對(duì)象……辱魁,可以使用觀察者模式創(chuàng)建一種鏈?zhǔn)接|發(fā)機(jī)制烟瞧。
注意事項(xiàng):
- 避免循環(huán)引用
- 異步以防止某一個(gè)觀察者出錯(cuò)導(dǎo)致整個(gè)系統(tǒng)卡殼
2.結(jié)構(gòu)
觀察者模式的主要角色
- 抽象主題(Subject):也叫抽象目標(biāo)類,它提供了一個(gè)用于保存觀察者對(duì)象的聚集類和增加染簇、刪除觀察者對(duì)象的方法参滴,以及通知所有觀察者的抽象方法。
- 具體主題(Concrete Subject):也叫具體目標(biāo)類锻弓,它實(shí)現(xiàn)抽象目標(biāo)中的通知方法砾赔,當(dāng)具體主題的內(nèi)部狀態(tài)發(fā)生改變時(shí),通知所有注冊(cè)過的觀察者對(duì)象青灼。
- 抽象觀察者(Observer):它是一個(gè)抽象類或接口暴心,它包含了一個(gè)更新自己的抽象方法,當(dāng)接到具體主題的更改通知時(shí)被調(diào)用杂拨。
- 具體觀察者(Concrete Observer) :實(shí)現(xiàn)抽象觀察者中定義的抽象方法专普,以便在得到目標(biāo)的更改通知時(shí)更新自身的狀態(tài)。
[圖片上傳失敗...(image-253ff8-1624958200920)]
3.步驟
-
創(chuàng)建抽象目標(biāo)
// 抽象目標(biāo) abstract class Subject { protected Collection<Observer> observers = new HashSet<>(); public void add(Observer observer) { observers.add(observer); } public void remove(Observer observer) { observers.remove(observer); } public abstract void notifyObserver(String msg); }
-
創(chuàng)建具體目標(biāo)弹沽,繼承抽象目標(biāo)檀夹,并實(shí)現(xiàn)其虛擬方法
// 具體目標(biāo) class ConcreteSubject extends Subject { @Override public void notifyObserver(String msg) { System.out.println("具體目標(biāo)發(fā)生改變!" + msg); observers.parallelStream().forEach(m -> m.response(msg)); } }
-
創(chuàng)建抽象觀察者
// 抽象觀察者 interface Observer { void response(String msg); }
-
創(chuàng)建具體觀察者策橘,實(shí)現(xiàn)抽象觀察者接口
// 具體觀察者A class ConcreteObserverA implements Observer { @Override public void response(String msg) { System.out.println("具體觀察者A作出反應(yīng)炸渡!" + msg); } } // 具體觀察者B class ConcreteObserverB implements Observer { @Override public void response(String msg) { System.out.println("具體觀察者B作出反應(yīng)!" + msg); } }
測試代碼
public class ObserverTest {
public static void main(String[] args) {
Subject subject = new ConcreteSubject();
Observer observerA = new ConcreteObserverA();
Observer observerB = new ConcreteObserverB();
subject.add(observerA);
subject.add(observerB);
subject.notifyObserver("hello world");
subject.remove(observerA);
subject.notifyObserver("你好");
}
}
運(yùn)行結(jié)果
[圖片上傳失敗...(image-2fd49f-1624958200920)]
4.擴(kuò)展
實(shí)際上在Java的jdk中役纹,已經(jīng)通過 java.util.Observable 類和 java.util.Observer 接口定義了觀察者模式偶摔,只要實(shí)現(xiàn)他們的子類即可編寫觀察者模式實(shí)例
但是兩個(gè)已被jdk9棄用,官方推薦的做法是使用java.util.concurrent.Flow的API
下面會(huì)對(duì)這兩種分別給出示例
4.1.Observable類 + Observer類
Observable類是抽象目標(biāo)類促脉,持有一個(gè)Vector向量辰斋,用于保存所有要通知的觀察者對(duì)象。
主要方法如下:
-
void addObserver(Observer o)
:用于將新的觀察者對(duì)象添加到向量中 -
void notifyObservers(Object arg)
:調(diào)用向量中所有觀察者的update()
方法瘸味,通知他們數(shù)據(jù)已發(fā)生改變宫仗。通常先通知后放入的觀察者;可以通過參數(shù)arg
向update()
傳遞數(shù)據(jù) -
void setChange()
:用于設(shè)置一個(gè)布爾類型的內(nèi)部標(biāo)志位旁仿,注明目標(biāo)對(duì)象已發(fā)生改變藕夫;當(dāng)它為真時(shí),notifyObservers
才會(huì)通知觀察者
完整示例如下
package com.company.designPattern.observer;
import java.util.Date;
import java.util.Observable;
import java.util.Observer;
// 被觀察者(具體目標(biāo))
class NumObservable extends Observable {
private int num = 0;
public int getNum() {
return num;
}
public void setNum(int num) {
this.num = num;
setChanged();
notifyObservers(new Date());
}
}
// 觀察者A
class ObserverA implements Observer {
@Override
public void update(Observable o, Object arg) {
NumObservable object = (NumObservable) o;
System.out.println("ObserverA: Num has changed to " + object.getNum() + "\n Message: " + arg);
}
}
// 觀察者B
class ObserverB implements Observer {
@Override
public void update(Observable o, Object arg) {
NumObservable object = (NumObservable) o;
System.out.println("ObserverB: Num has changed to " + object.getNum() + "\n Message: " + arg);
}
}
public class ObserverTest1 {
public static void main(String[] args) {
// 創(chuàng)建被觀察者和觀察者
NumObservable observable = new NumObservable();
Observer observerA = new ObserverA();
Observer observerB = new ObserverB();
// 關(guān)聯(lián)
observable.addObserver(observerA);
observable.addObserver(observerB);
// 修改數(shù)據(jù)10
observable.setNum(10);
// 修改數(shù)據(jù)20
observable.setNum(20);
// 解除observerA的觀察關(guān)聯(lián)枯冈,修改數(shù)據(jù)30
observable.deleteObserver(observerA);
observable.setNum(30);
}
}
運(yùn)行結(jié)果
[圖片上傳失敗...(image-beeca4-1624958200920)]
前兩次毅贮,按照后加入先通知的順序,分別通知了A和B
第三次尘奏,解除了A的關(guān)聯(lián)滩褥,所以只通知了B
4.2.Flow API
Flow API 是 Java 9 引入的響應(yīng)式編程的接口,其中包含4個(gè)接口:
- Publisher:發(fā)布者炫加,負(fù)責(zé)發(fā)布消息瑰煎;
- Subscriber:訂閱者铺然,負(fù)責(zé)訂閱處理消息;
- Subscription:訂閱控制類酒甸,可用于發(fā)布者和訂閱者之間通信魄健;
- Processor:處理者,同時(shí)充當(dāng)Publisher和Subscriber的角色插勤。
請(qǐng)注意Flow API僅提供接口沽瘦,并不提供具體實(shí)現(xiàn),請(qǐng)自行按照需求實(shí)現(xiàn)
[圖片上傳失敗...(image-285ca2-1624958200920)]
示例如下
-
定義一個(gè)類饮六,用于訂閱者和發(fā)布者之間傳輸數(shù)據(jù)
/** * 定義一個(gè)用于傳遞數(shù)據(jù)的類 */ class Message { public String msg = ""; public int leftCount = 0; public Message(String msg, int leftCount) { this.msg = msg; this.leftCount = leftCount; } }
可以根據(jù)自己的需求構(gòu)造類的內(nèi)容
-
定義一個(gè)發(fā)布者
/** * 自定義發(fā)布者 * 需要指定訂閱者發(fā)送給發(fā)布者的數(shù)據(jù)類型 */ class MyPublisher implements Flow.Publisher<Message> { private int count = 0; // 計(jì)數(shù)器其垄,從0開始 private final int maxCount; // 最大計(jì)數(shù)器 private int leftCount = 0; // 剩余計(jì)數(shù) private final long interval; // 發(fā)送間隔 private boolean isCanceled; // 是否被取消 /** * 構(gòu)造函數(shù),根據(jù)需要初始化數(shù)據(jù) * * @param interval 初始化發(fā)送間隔 * @param maxCount 最大計(jì)數(shù)器卤橄,達(dá)到數(shù)量后自動(dòng)停止 */ public MyPublisher(long interval, int maxCount) { this.interval = interval; this.maxCount = maxCount; } /** * 訂閱事件 * 在這里定義訂閱者訂閱后的操作绿满,通常是在某條件下傳遞一個(gè)對(duì)象給訂閱者 * 為方便演示,我們每隔一段時(shí)間向訂閱者發(fā)送當(dāng)前計(jì)數(shù)N次窟扑,N由訂閱者傳遞給我們 * * @param subscriber */ @Override public void subscribe(Flow.Subscriber<? super Message> subscriber) { // 使用線程來異步執(zhí)行每個(gè)訂閱操作 new Thread(() -> { try { // 給訂閱者分配一個(gè)控制器 subscriber.onSubscribe(new MySubscription()); // 循環(huán)執(zhí)行核心操作 while (!isCanceled && count < maxCount) { // 當(dāng)剩余數(shù)量大于0時(shí)喇颁,傳遞數(shù)據(jù)給訂閱者 if (leftCount > 0) { subscriber.onNext(new Message(new Date() + ":" + ++count, --leftCount)); Thread.sleep(interval); } } // 結(jié)束訂閱后,通知訂閱者已結(jié)束 subscriber.onComplete(); } catch (Exception e) { // 出現(xiàn)錯(cuò)誤時(shí)嚎货,通知訂閱者發(fā)生錯(cuò)誤 subscriber.onError(e); } }).start(); } /** * 自定義訂閱控制類 * 重寫request和cancel方法橘霎,提供給訂閱者使用 */ private class MySubscription implements Flow.Subscription { /** * 接受到來自訂閱者的數(shù)據(jù)請(qǐng)求 * * @param n 請(qǐng)求次數(shù) */ @Override public void request(long n) { // 將次數(shù)累加到剩余次數(shù)中 leftCount += n; } /** * 接收到來自訂閱者的取消請(qǐng)求 */ @Override public void cancel() { isCanceled = true; } } }
發(fā)布者的核心任務(wù)即
subscribe
,需要在這里定義訂閱后的操作殖属,通常異步執(zhí)行 -
定義一個(gè)訂閱者
/** * 自定義訂閱者 * 需要指定從發(fā)布者接收到的數(shù)據(jù)類型 * 模擬事件:請(qǐng)求一定數(shù)量的數(shù)據(jù)姐叁,并且根據(jù)需要分批請(qǐng)求 */ class MySubscriber implements Flow.Subscriber<Message> { private Flow.Subscription subscription; // 用于持有來自訂閱者的控制器(其實(shí)并不必要) private int perNum; // 每輪數(shù)量 private int count; // 計(jì)數(shù)器 /** * 構(gòu)造函數(shù),根據(jù)需要初始化數(shù)據(jù) * * @param perNum 每輪訂閱次數(shù) * @param count 訂閱次數(shù) */ public MySubscriber(int perNum, int count) { this.perNum = perNum; this.count = count; } /** * 發(fā)起一輪請(qǐng)求 */ private void startNewRound() { System.out.println("Start a new round"); int requestCount = Math.min(count, perNum); count -= requestCount; subscription.request(requestCount); } /** * 訂閱事件 * * @param subscription */ @Override public void onSubscribe(Flow.Subscription subscription) { this.subscription = subscription; // 發(fā)起第一輪請(qǐng)求 startNewRound(); } // 接受來自發(fā)布者的觸發(fā)指令 @Override public void onNext(Message item) { System.out.println("receive message: " + item.msg); System.out.println("now left: " + item.leftCount); // 本輪結(jié)束的時(shí)候洗显,開啟下一輪 if (item.leftCount == 0 && count > 0) { startNewRound(); } } // 接受來自發(fā)布者的錯(cuò)誤 @Override public void onError(Throwable throwable) { System.out.println("onError:" + throwable.getMessage()); } // 接受來自發(fā)布者的完成指令 @Override public void onComplete() { System.out.println("onComplete!"); } }
核心部分為
onSubscribe
和onNext
外潜,分別用于發(fā)起第一次請(qǐng)求,和發(fā)起后續(xù)請(qǐng)求
客戶端代碼
public class FlowDemo {
public static void main(String[] args) {
MyPublisher publisher = new MyPublisher(500L, 10); // 每500ms發(fā)送一次挠唆,最多20次
MySubscriber subscriber = new MySubscriber(3, 20); //每輪發(fā)送3次处窥,總共8輪
publisher.subscribe(subscriber);
try {
Thread.currentThread().join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
運(yùn)行結(jié)果
<img src="https://i.loli.net/2021/06/29/pwYSqAKDyaRWtl8.png" alt="image-20210629165110149" style="zoom:50%;" />
可以看到一共發(fā)起了4輪查詢,最后一輪僅有1個(gè)數(shù)據(jù)
完整demo:https://gitee.com/echo_ye/practice/tree/master/src/main/java/com/company/designPattern/observer
后記
在實(shí)際使用中觀察者模式相當(dāng)常見玄组,其最根本的生產(chǎn)者-消費(fèi)者模型更是成為了面試必考題滔驾。。俄讹。
Flow的做法也是令人眼前一亮哆致,提供全套的模型,但只提供接口患膛,在保證模型的功能和效率的前提下沽瞭,也盡可能的給我們開發(fā)者自由發(fā)揮的空間,可以在開發(fā)中嘗試這種模式
作者:Echo_Ye
WX:Echo_YeZ
Email :echo_yezi@qq.com
個(gè)人站點(diǎn):在搭了在搭了。驹溃。。(右鍵 - 新建文件夾)