概念介紹
響應式編程是一種面向數(shù)據(jù)流和變化傳播的編程范式
1.基礎(chǔ)概念定義
- 數(shù)據(jù)流:只能以事先規(guī)定好的順序被讀取一次的數(shù)據(jù)的一個序列
- 變化傳播:類似觀察者模式裕寨,變化了要通知別人。
- 編程范式:計算機編程的基本風格或典范模式
1.1竹观、數(shù)據(jù)流
- 在計算機中是數(shù)據(jù)
- 在現(xiàn)實中可以任意對象組成的有順序的隊列
- 就象水管里的水流,在水管的一端一點一點地供水,而在水管的另一端看到的是一股連續(xù)不斷的水流。
1.2、變化傳播
前一個程序的執(zhí)行的結(jié)果影響接下來程序的執(zhí)行與結(jié)果
1.3讹语、編程范式
編程范式(Programming Paradigm)是某種編程語言的典型編程風格或者說是編程方式搞糕。
??隨著編程方法學和軟件工程學的深入勇吊,特別是OO思想的普及,范式(Paradigm)以及編程范式等術(shù)語漸漸出現(xiàn)在人們面前窍仰。面向?qū)ο缶幊?OOP)常常被譽為是一種革命性的的思想汉规,正因為它不同于其他的各種編程范式。編程范式也許是學習任何一門編程語言時要理解的最重要的術(shù)語驹吮。
托馬斯.庫爾提出“科學的革命”的范式論后针史,Robert Floyd在1979年圖靈獎的頒獎演說中使用了編程范式一詞。編程范式一般包括三個方面钥屈,以O(shè)OP為例:
??1悟民,學科的邏輯體系——規(guī)則范式:如 類/對象、繼承篷就、動態(tài)綁定射亏、方法改寫、對象替換等等機制竭业。
??2智润,心理認知因素——心理范式:按照面向?qū)ο缶幊讨窤lan Kay的觀點,“計算就是模擬”未辆。OO范式極其重視隱喻(metaphor)的價值窟绷,通過擬人化,按照自然的方式模擬自然咐柜。
??3兼蜈,自然觀/世界觀——觀念范式:強調(diào)程序的組織技術(shù),視程序為松散耦合的對象/類的組合拙友,以繼承機制將類組織成一個層次結(jié)構(gòu)为狸,把程序運行視為相互服務(wù)的對象之間的對話。
簡單來說遗契,編程范式是程序員看待程序應該具有的觀點辐棒。
??為了進一步加深對編程范式的認識,這里介紹幾種最常見的編程范式。
??需要再次提醒的是:編程范式是編程語言的一種分類方式漾根,它并不針對某種編程語言泰涂。就編程語言而言,一種語言可以適用多種編程范式辐怕。
編程范式示例
自省編程或稱反射編程
2.RxJava簡介
A library for composing asynchronous and event-based programs by using observable sequences.
Asynchronous:
1.異步的,RxJava是一個異步的庫
2.基于回調(diào)的
Event-based:
1.基于事件的
2.事件分發(fā)的庫逼蒙,消息傳遞
2.1RxJava五大元素
1.Observable
2.Observer
3.Subscription
4.OnSubscribe
5.Subscriber
3 RxJava2五大元素
背壓概念
1.異步環(huán)境下產(chǎn)生的問題
2.發(fā)送和處理速度不統(tǒng)一
3.是一種流速控制解決策略
從RxJava一個比較常見的工作場景說起。
RxJava是一個觀察者模式的架構(gòu)寄疏,當這個架構(gòu)中被觀察者(Observable)和觀察者(Subscriber)處在不同的線程環(huán)境中時其做,由于者各自的工作量不一樣,導致它們產(chǎn)生事件和處理事件的速度不一樣赁还,這就會出現(xiàn)兩種情況:
被觀察者產(chǎn)生事件慢一些,觀察者處理事件很快驹沿。那么觀察者就會等著被觀察者發(fā)送事件艘策,(好比觀察者在等米下鍋,程序等待渊季,這沒有問題)朋蔫。
被觀察者產(chǎn)生事件的速度很快,而觀察者處理很慢却汉。那就出問題了驯妄,如果不作處理的話,事件會堆積起來合砂,最終擠爆你的內(nèi)存青扔,導致程序崩潰。(好比被觀察者生產(chǎn)的大米沒人吃翩伪,堆積最后就會爛掉)微猖。
下面我們用代碼演示一下這種崩潰的場景:
//被觀察者在主線程中,每1ms發(fā)送一個事件
Observable.interval(1, TimeUnit.MILLISECONDS)
//.subscribeOn(Schedulers.newThread())
//將觀察者的工作放在新線程環(huán)境中
.observeOn(Schedulers.newThread())
//觀察者處理每1000ms才處理一個事件
.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Log.w("TAG","---->"+aLong);
}
});
在上面的代碼中缘屹,被觀察者發(fā)送事件的速度是觀察者處理速度的1000倍
這段代碼運行之后:
Caused by: rx.exceptions.MissingBackpressureException
拋出MissingBackpressureException往往就是因為凛剥,被觀察者發(fā)送事件的速度太快,而觀察者處理太慢轻姿,而且你還沒有做相應措施犁珠,所以報異常。
而這個MissingBackpressureException異常里面就包含了Backpressure這個單詞互亮,看來背壓肯定和這種異常情況有關(guān)系犁享。
關(guān)于背壓(Backpressure)
我這兩天翻閱了大量的中文和英文資料,我發(fā)現(xiàn)中文資料中胳挎,很多人對于背壓(Backpressure)的理解是有很大問題的饼疙,有的人把它看作一個需要避免的問題,或者程序的異常,有的人則干脆避而不談窑眯,模棱兩可屏积,著實讓人尷尬。
通過參考和對比大量的相關(guān)資料磅甩,我在這里先對背壓(Backpressure)做一個明確的定義:背壓是指在異步場景中炊林,被觀察者發(fā)送事件速度遠快于觀察者的處理速度的情況下,一種告訴上游的被觀察者降低發(fā)送速度的策略
簡而言之卷要,背壓是流速控制的一種策略渣聚。
需要強調(diào)兩點:
背壓策略的一個前提是異步環(huán)境,也就是說僧叉,被觀察者和觀察者處在不同的線程環(huán)境中奕枝。
背壓(Backpressure)并不是一個像flatMap一樣可以在程序中直接使用的操作符,他只是一種控制事件流速的策略瓶堕。
那么我們再回看上面的程序異常就很好理解了隘道,就是當被觀察者發(fā)送事件速度過快的情況下,我們沒有做流速控制郎笆,導致了異常谭梗。
3.1RxJava2基本元素
非背壓
1.Observable
- 1.觀察得到的-被觀察者,不支持背壓
- 2.通過Observable創(chuàng)建一個可觀察的序列(create方法)
- 3.通過subscribe去注冊一個觀察者
2.Observer
- 1.用于接收數(shù)據(jù)-觀察者
- 2.作為Observable的subscribe方法的參數(shù)
3.Disposable
- 1.和RxJava1的Subscription的作用相當
- 2.用于取消訂閱和獲取當前的訂閱狀態(tài)
4.OnSubscrible
- 1.當訂閱時會觸發(fā)此接口調(diào)用
- 2.在Observable內(nèi)部,實際作用是向觀察者發(fā)射數(shù)據(jù)
5.Emitter
- 1.一個發(fā)射數(shù)據(jù)的接口宛蚓,和Observer的方法類似
- 2.本質(zhì)是對Observer和Subscriber的包裝
背壓
1.Flowable
- 1.易流動的---被觀察者激捏,支持背壓
- 2.通過Flowable創(chuàng)建一個可觀察的序列(create方法)
- 3.通過subscribe去注冊一個觀察者
2.Subscriber
- 1.一個單獨接口,和Observer的方法類似
- 2.作為Flowable的subscribe方法的參數(shù)
3.Subscription
- 1.訂閱,和RxJava1的有所不同
- 2.支持背壓凄吏,有用于背壓的request方法
4.FlowableOnSubscribe
- 1.當訂閱時會觸發(fā)此接口調(diào)用
- 2.在Flowable內(nèi)部远舅,實際作用是向觀察者發(fā)射數(shù)據(jù)
5.Emitter
- 1.一個發(fā)射數(shù)據(jù)的接口,和Observer的方法類似
- 2.本質(zhì)是對Observer的Subscriber的包裝
3.2自定義實現(xiàn)RxJava1基本功能
自定義映射關(guān)系類(仿寫相應功能)痕钢,如下:
- Observable -> Caller
- Observer -> Callee
- Subscription -> Calling
- OnSubscribe -> OnCall
- Subscriber -> Receiver
/**
* Created by Xionghu on 2018/6/6.
* Desc: 簡單的調(diào)用
*/
public interface Action1<T>{
void call(T t);
}
public class Caller<T> {
final OnCall<T> onCall;
public Caller(OnCall<T> onCall) {
this.onCall = onCall;
}
public static <T> Caller<T> create(OnCall<T> onCall) {
return new Caller<>(onCall);
}
public Calling call(Receiver<T> receiver) {
this.onCall.call(receiver);
return receiver;
}
public interface OnCall<T> extends Action1<Receiver<T>> {
}
}
public interface Callee<T>{
void onCompleted();
void onError(Throwable t);
void onReceive(T t);
}
/**
* Created by Xionghu on 2018/6/6.
* Desc: 描述打電話
*/
public interface Calling {
void unCall();
boolean isUnCalled();
}
/**
* Created by Xionghu on 2018/6/6.
* Desc: 接收信息的人
*/
public abstract class Receiver<T> implements Callee<T>, Calling {
private volatile boolean unCalled;
@Override
public void unCall() {
unCalled = true;
}
@Override
public boolean isUnCalled() {
return unCalled;
}
}
調(diào)用:
@OnClick(R.id.async)
public void onViewClicked() {
Calling tCalling = Caller.create(new Caller.OnCall<String>() {
@Override
public void call(Receiver<String> stringReceiver) {
if (!stringReceiver.isUnCalled()) {
stringReceiver.onReceive("test");
stringReceiver.onCompleted();
}
}
}).call(new Receiver<String>() {
@Override
public void onCompleted() {
Log.d("kpioneer", "onCompleted");
}
@Override
public void onError(Throwable t) {
}
@Override
public void onReceive(String s) {
Log.d("kpioneer", "onReceive:" + s);
}
});
Caller caller = Caller.create(new Caller.OnCall<String>() {
@Override
public void call(Receiver<String> stringReceiver) {
if (!stringReceiver.isUnCalled()) {
stringReceiver.onReceive("test");
stringReceiver.onCompleted();
}
}
});
caller.call(new Receiver<String>() {
@Override
public void onCompleted() {
Log.d("kpioneer", "onCompleted");
}
@Override
public void onError(Throwable t) {
}
@Override
public void onReceive(String s) {
Log.d("kpioneer", "onReceive:" + s);
}
});
}
}
Log 打印
06-06 18:35:55.210 28537-28537/com.haocai.rxjavademo D/kpioneer: onReceive:test
06-06 18:35:55.210 28537-28537/com.haocai.rxjavademo D/kpioneer: onCompleted
06-06 18:35:55.210 28537-28537/com.haocai.rxjavademo D/kpioneer: onReceive:test
06-06 18:35:55.210 28537-28537/com.haocai.rxjavademo D/kpioneer: onCompleted
3.3自定義實現(xiàn)RxJava2(無背壓)基本功能
自定義映射關(guān)系類(仿寫相應功能)表谊,如下:
1.Observable ->Caller
2.Observer ->Callee
3.Disposable ->Release
4.OnSubscribe -> OnCall
5.Emitter -> Emitter
public abstract class Caller<T> {
public static <T> Caller<T> create(CallerOnCall<T> callerOnCall) {
return new CallerCreate<>(callerOnCall);
}
public void call(Callee<T> callee) {
callActual(callee);
}
protected abstract void callActual(Callee<T> callee);
}
/**
* Created by Xionghu on 2018/6/7.
* Desc: 接電話的人
*/
public interface Callee<T> {
void onCall(Release release);
void onReceive(T t);
void onCompleted();
void onError(Throwable t);
}
public class CallerCreate<T> extends Caller<T> {
private CallerOnCall<T> mCallerOnCall;
public CallerCreate(CallerOnCall<T> mCallerOnCall) {
this.mCallerOnCall = mCallerOnCall;
}
@Override
protected void callActual(Callee<T> callee) {
CreateEmitter<T> tCallerEmitter = new CreateEmitter<>(callee);
callee.onCall(tCallerEmitter);
mCallerOnCall.call(tCallerEmitter);
}
public static final class CreateEmitter<T> extends AtomicReference<Release> implements CallerEmitter<T>, Release {
private Callee<T> mCallee;
public CreateEmitter(Callee<T> mCallee) {
this.mCallee = mCallee;
}
@Override
public void onReceive(T t) {
if (!isReleased()) {
mCallee.onReceive(t);
}
}
@Override
public void onCompleted() {
if (!isReleased()) {
mCallee.onCompleted();
}
}
@Override
public void onError(Throwable t) {
if (!isReleased()) {
mCallee.onError(t);
}
}
@Override
public boolean isReleased() {
return ReleaseHelper.isReleased(get());
}
@Override
public void release() {
ReleaseHelper.release(this);
}
}
}
public interface CallerEmitter<T> extends Emitter<T> {
}
/**
* Created by Xionghu on 2018/6/7.
* Desc: 當打電話的時候
*/
public interface CallerOnCall<T> {
void call(CallerEmitter<T> callerEmitter);
}
/**
* Created by Xionghu on 2018/6/7.
* Desc:
*/
public interface Emitter<T> {
void onReceive(T t);
void onCompleted();
void onError(Throwable t);
}
/**
* Created by Xionghu on 2018/6/7.
* Desc: 掛斷電話
*/
public interface Release {
boolean isReleased();
void release();
}
import java.util.concurrent.atomic.AtomicReference;
/**
* Created by Xionghu on 2018/6/7.
* Desc:
*/
public enum ReleaseHelper implements Release {
RELEASED;
public static boolean isReleased(Release release) {
return release == RELEASED;
}
public static boolean release(AtomicReference<Release> releaseAtomicReference) {
Release current = releaseAtomicReference.get();
Release d = RELEASED;
if (current != d) {
current = releaseAtomicReference.getAndSet(d);
if (current != d) {
if (current != null) {
current.release();
}
return true;
}
}
return false;
}
@Override
public boolean isReleased() {
return true;
}
@Override
public void release() {
}
}
主程序調(diào)用:
@OnClick(R.id.async)
public void onViewClicked() {
Caller.create(new CallerOnCall<String>() {
@Override
public void call(CallerEmitter<String> callerEmitter) {
callerEmitter.onReceive("test");
callerEmitter.onCompleted();
}
}).call(new Callee<String>() {
@Override
public void onCall(Release release) {
Log.d("kpioneer", "onCall");
}
@Override
public void onReceive(String s) {
Log.d("kpioneer", "onReceive:" + s);
}
@Override
public void onCompleted() {
Log.d("kpioneer", "onCompleted");
}
@Override
public void onError(Throwable t) {
}
});
}
06-08 13:24:20.536 20720-20720/com.haocai.rxjavademo D/kpioneer: onCall
06-08 13:24:20.536 20720-20720/com.haocai.rxjavademo D/kpioneer: onReceive:test
06-08 13:24:20.536 20720-20720/com.haocai.rxjavademo D/kpioneer: onCompleted
3.4自定義實現(xiàn)RxJava2(有背壓)基本功能
1.Flowable -> Telephoner
2.Subscriber -> Receiver
3.Subscription -> Drop
4.OnSubscribe -> OnCall
5.Emitter -> Emitter
核心代碼:
public class TelephonerCreate<T> extends Telephoner<T> {
private TelephonerOnCall<T> mTelephonerOnCall;
public TelephonerCreate(TelephonerOnCall<T> telephonerOnCall) {
mTelephonerOnCall = telephonerOnCall;
}
@Override
protected void callActual(Receiver<T> receiver) {
DropEmitter<T> tDropEmitter = new DropEmitter<>(receiver);
receiver.onCall(tDropEmitter);
mTelephonerOnCall.call(tDropEmitter);
}
private static class DropEmitter<T>
extends AtomicLong
implements TelephonerEmitter<T>, Drop {
private Receiver<T> mReceiver;
private volatile boolean mIsDropped;
private DropEmitter(Receiver<T> receiver) {
mReceiver = receiver;
}
@Override
public void onReceive(T t) {
if (get() != 0) {
mReceiver.onReceive(t);
BackpressureHelper.produced(this, 1); //消耗事件 直到為0
}
}
@Override
public void request(long n) {
BackpressureHelper.add(this, n); //n表示數(shù)據(jù)的數(shù)量 添加事件
}
@Override
public void onCompleted() {
mReceiver.onCompleted();
}
@Override
public void drop() {
mIsDropped = true;
}
@Override
public void onError(Throwable t) {
mReceiver.onError(t);
}
}
}
public final class BackpressureHelper {
public static void add(AtomicLong requested, long n) {
long r = requested.get();
if (r == Long.MAX_VALUE) {
return;
}
long u = r + n;
if (u < 0L) {
u = Long.MAX_VALUE;
}
// compareAndSet:如果當前值 == 預期值,則以原子方式將該值設(shè)置為給定的更新值盖喷。這里需要注意的是這個方法的返回值實際上是是否成功修改爆办,而與之前的值無關(guān)。
requested.compareAndSet(r, u);//把requested中的值設(shè)置為u
}
public static void produced(AtomicLong requested, long n) {
long current = requested.get();
if (current == Long.MAX_VALUE) {
return;
}
long update = current - n;
if (update < 0L) {
update = 0L;
}
requested.compareAndSet(current, update);
}
}
public interface Drop {
void request(long n);
void drop();
}
@OnClick(R.id.async)
public void onViewClicked() {
Telephoner.create(new TelephonerOnCall<String>() {
@Override
public void call(TelephonerEmitter<String> telephonerEmitter) {
telephonerEmitter.onReceive("test");
telephonerEmitter.onCompleted();
}
}).call(new Receiver<String>() {
@Override
public void onCall(Drop d) {
d.request(Long.MAX_VALUE); //注釋帶調(diào) 該句 話 就不會打印onReceive:test
Log.d("kpioneer", "onCall");
}
@Override
public void onReceive(String s) {
Log.d("kpioneer", "onReceive:" + s);
}
@Override
public void onError(Throwable t) {
}
@Override
public void onCompleted() {
Log.d("kpioneer", "onCompleted");
}
});
}
···
06-08 15:06:25.686 11378-11378/com.haocai.rxjavademo D/kpioneer: onCall
06-08 15:06:25.686 11378-11378/com.haocai.rxjavademo D/kpioneer: onReceive:test
06-08 15:06:25.686 11378-11378/com.haocai.rxjavademo D/kpioneer: onCompleted
···
注意:
d.request(Long.MAX_VALUE); //注釋帶調(diào) 該句 話 就不會打印onReceive:test