1.什么是RxJava?
1.1什么是響應(yīng)式編程?
是一種基于異步數(shù)據(jù)流概念的編程模式(異步數(shù)據(jù)流編程)
數(shù)據(jù)流 ->河流(被觀測鹦肿、被過濾、被操作)
1.2響應(yīng)式編程的設(shè)計原則是:
保持數(shù)據(jù)的不變性
沒有共享
阻塞是有害的
1.3在我們的Java里面提供了解決方案 - RxJava辅柴?
RxJava:Reactive Extensions Java(Java響應(yīng)式編程)
響應(yīng)式編程最初誕生.Net里面
iOS開發(fā)中也有響應(yīng)式編程(block)
// 傳統(tǒng)寫法:加載文件
// new Thread() {
// @Override
// public void run() {
// super.run();
// for (File folder : folders) {
// File[] files = folder.listFiles();
// for (File file : files) {
// if (file.getName().endsWith(".png")) {
// final Bitmap bitmap = getBitmapFromFile(file);
// // 更新UI線程
// runOnUiThread(new Runnable() {
// @Override
// public void run() {
// imageCollectorView.addImage(bitmap);
// }
// });
// }
// }
// }
// }
// }.start();
RxJava寫法
File[] folders = new File[10];
Observable.from(folders)
//便利
.flatMap(new Func1<File, Observable<File>>() {
@Override
public Observable<File> call(File file) {
return Observable.from(file.listFiles());
}
})
//過濾
.filter(new Func1<File, Boolean>() {
@Override
public Boolean call(File file) {
//條件
return file.getName().endsWith(".png");
}
})
//加載圖片
.map(new Func1<File, Bitmap>() {
@Override
public Bitmap call(File file) {
return getBitmapFromFile(file);
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
//更新UI
.subscribe(new Action1<Bitmap>() {
@Override
public void call(Bitmap bitmap) {
imageCollectorView.addImage(bitmap);
}
});
文件數(shù)組
flatMap:相當于我們手動的起嵌套循環(huán)
隊列數(shù)據(jù)結(jié)構(gòu)
你會發(fā)現(xiàn)以下這個簡單的案例有哪些優(yōu)勢
第一點:你不需要考慮線程問題
第二點:你不要關(guān)心如何更新UI線程箩溃,如何調(diào)用
2.RxJava整體架構(gòu)設(shè)計?
整體架構(gòu)設(shè)計 -> 主要觀察者模式
同時里面還采用其他的設(shè)計模式 代理模式碌嘀、迭代器模式涣旨、Builder設(shè)計模式(構(gòu)建者模式)
整體RxJava框架,角色劃分:
Observable :被觀察者
Observer : 觀察者
Subscrible : 訂閱
Subjects : 科目
Observable 和 Subjects 是兩個“生產(chǎn)“實體股冗,Observer和Subscrible是兩個“消費”實體
熱Observables 和冷Observables
從發(fā)射物的角度來看霹陡,有兩種不同的Observables:熱的和冷的。一個“熱”的Observable典型的只要一創(chuàng)建完就開始發(fā)射數(shù)據(jù)止状。因此所有后續(xù)訂閱它的觀察者可能從序列中間得某個位置開始接收數(shù)據(jù)(有一些數(shù)據(jù)錯過了)烹棉。一個“冷”的Observable會一直等待,知道由觀察者訂閱它才開始發(fā)射數(shù)據(jù)怯疤,因此這個觀察者可以確保會收到整個數(shù)據(jù)序列浆洗。
熱和冷
熱:主動
場景:容器中目前只有一個觀察者,向所有的觀察者發(fā)送3條數(shù)據(jù)集峦,因為熱Observables一旦創(chuàng)建就立馬發(fā)送消息伏社,假設(shè)我現(xiàn)在發(fā)送到了第二條數(shù)據(jù),突然之后增加了一個觀察者塔淤,這個時候摘昌,第二個觀察者就收不到之前的消息。
冷:被動
場景:容器中目前只有1個觀察者高蜂,因為冷Observables一旦創(chuàng)建就會等待觀察者訂閱聪黎,一定有觀察者訂閱了,我立馬將所有的消息發(fā)送給這個觀察者(訂閱人)
3.RxJava基本API备恤?
第一個案例:如何創(chuàng)建Observables稿饰?
subscribe 相關(guān)源碼:
public final Subscription subscribe(final Observer<? super T> observer) {
if (observer instanceof Subscriber) {
return subscribe((Subscriber<? super T>)observer);
}
if (observer == null) {
throw new NullPointerException("observer is null");
}
return subscribe(new ObserverSubscriber<T>(observer));
}
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
// validate and proceed
if (subscriber == null) {
throw new IllegalArgumentException("subscriber can not be null");
}
if (observable.onSubscribe == null) {
throw new IllegalStateException("onSubscribe function can not be null.");
/*
* the subscribe function can also be overridden but generally that's not the appropriate approach
* so I won't mention that in the exception
*/
}
// new Subscriber so onStart it
subscriber.onStart();
/*
* See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls
* to user code from within an Observer"
*/
// if not already wrapped
if (!(subscriber instanceof SafeSubscriber)) {
// assign to `observer` so we return the protected version
subscriber = new SafeSubscriber<T>(subscriber);
}
// The code below is exactly the same an unsafeSubscribe but not used because it would
// add a significant depth to already huge call stacks.
try {
// allow the hook to intercept and/or decorate
RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
return RxJavaHooks.onObservableReturn(subscriber);
} catch (Throwable e) {
// special handling for certain Throwable/Error/Exception types
Exceptions.throwIfFatal(e);
// in case the subscriber can't listen to exceptions anymore
if (subscriber.isUnsubscribed()) {
RxJavaHooks.onError(RxJavaHooks.onObservableError(e));
} else {
// if an unhandled error occurs executing the onSubscribe we will propagate it
try {
subscriber.onError(RxJavaHooks.onObservableError(e));
} catch (Throwable e2) {
Exceptions.throwIfFatal(e2);
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
// so we are unable to propagate the error correctly and will just throw
RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
// TODO could the hook be the cause of the error in the on error handling.
RxJavaHooks.onObservableError(r);
// TODO why aren't we throwing the hook's return value.
throw r; // NOPMD
}
}
return Subscriptions.unsubscribed();
}
}
public class SafeSubscriber<T> extends Subscriber<T> {
private final Subscriber<? super T> actual;
boolean done;
public SafeSubscriber(Subscriber<? super T> actual) {
super(actual);
this.actual = actual;
}
/**
* Notifies the Subscriber that the {@code Observable} has finished sending push-based notifications.
* <p>
* The {@code Observable} will not call this method if it calls {@link #onError}.
*/
@Override
public void onCompleted() {
if (!done) {
done = true;
try {
actual.onCompleted();
} catch (Throwable e) {
// we handle here instead of another method so we don't add stacks to the frame
// which can prevent it from being able to handle StackOverflow
Exceptions.throwIfFatal(e);
RxJavaHooks.onError(e);
throw new OnCompletedFailedException(e.getMessage(), e);
} finally { // NOPMD
try {
// Similarly to onError if failure occurs in unsubscribe then Rx contract is broken
// and we throw an UnsubscribeFailureException.
unsubscribe();
} catch (Throwable e) {
RxJavaHooks.onError(e);
throw new UnsubscribeFailedException(e.getMessage(), e);
}
}
}
}
/**
* Notifies the Subscriber that the {@code Observable} has experienced an error condition.
* <p>
* If the {@code Observable} calls this method, it will not thereafter call {@link #onNext} or
* {@link #onCompleted}.
*
* @param e
* the exception encountered by the Observable
*/
@Override
public void onError(Throwable e) {
// we handle here instead of another method so we don't add stacks to the frame
// which can prevent it from being able to handle StackOverflow
Exceptions.throwIfFatal(e);
if (!done) {
done = true;
_onError(e);
}
}
/**
* Provides the Subscriber with a new item to observe.
* <p>
* The {@code Observable} may call this method 0 or more times.
* <p>
* The {@code Observable} will not call this method again after it calls either {@link #onCompleted} or
* {@link #onError}.
*
* @param t
* the item emitted by the Observable
*/
@Override
public void onNext(T t) {
try {
if (!done) {
actual.onNext(t);
}
} catch (Throwable e) {
// we handle here instead of another method so we don't add stacks to the frame
// which can prevent it from being able to handle StackOverflow
Exceptions.throwOrReport(e, this);
}
}
/**
* The logic for {@code onError} without the {@code isFinished} check so it can be called from within
* {@code onCompleted}.
*
* @see <a >the report of this bug</a>
*/
@SuppressWarnings("deprecation")
protected void _onError(Throwable e) { // NOPMD
RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
try {
actual.onError(e);
} catch (OnErrorNotImplementedException e2) { // NOPMD
/*
* onError isn't implemented so throw
*
* https://github.com/ReactiveX/RxJava/issues/198
*
* Rx Design Guidelines 5.2
*
* "when calling the Subscribe method that only has an onNext argument, the OnError behavior
* will be to rethrow the exception on the thread that the message comes out from the observable
* sequence. The OnCompleted behavior in this case is to do nothing."
*/
try {
unsubscribe();
} catch (Throwable unsubscribeException) {
RxJavaHooks.onError(unsubscribeException);
throw new OnErrorNotImplementedException("Observer.onError not implemented and error while unsubscribing.", new CompositeException(Arrays.asList(e, unsubscribeException))); // NOPMD
}
throw e2;
} catch (Throwable e2) {
/*
* throw since the Rx contract is broken if onError failed
*
* https://github.com/ReactiveX/RxJava/issues/198
*/
RxJavaHooks.onError(e2);
try {
unsubscribe();
} catch (Throwable unsubscribeException) {
RxJavaHooks.onError(unsubscribeException);
throw new OnErrorFailedException("Error occurred when trying to propagate error to Observer.onError and during unsubscription.", new CompositeException(Arrays.asList(e, e2, unsubscribeException)));
}
throw new OnErrorFailedException("Error occurred when trying to propagate error to Observer.onError", new CompositeException(Arrays.asList(e, e2)));
}
// if we did not throw above we will unsubscribe here, if onError failed then unsubscribe happens in the catch
try {
unsubscribe();
} catch (Throwable unsubscribeException) {
RxJavaHooks.onError(unsubscribeException);
throw new OnErrorFailedException(unsubscribeException);
}
}
/**
* Returns the {@link Subscriber} underlying this {@code SafeSubscriber}.
*
* @return the {@link Subscriber} that was used to create this {@code SafeSubscriber}
*/
public Subscriber<? super T> getActual() {
return actual;
}
}
subscriber 實際上就是Observer
RxJava基本使用 源碼分析
Observable創(chuàng)建原理分析:
第一步:調(diào)用Observable.create()方法
第二步:添加觀察者訂閱監(jiān)聽Observable.OnSubscrible
第三步:在Observable.create方法中創(chuàng)建被觀察者new Observable<T>(hook.onCreate(f))锦秒;
第四步:在Observable類構(gòu)造方法中保存了觀察者訂閱監(jiān)聽
訂閱觀察者原理分析:
第一步:注冊觀察者監(jiān)聽observable.subscribe(new Observer<String>())
第二步:在Observable類中調(diào)用了
public final Subscription subscribe(final Observer<? super T> observer) {
if (observer instanceof Subscriber) {
return subscribe((Subscriber<? super T>)observer);
}
if (observer == null) {
throw new NullPointerException("observer is null");
}
return subscribe(new ObserverSubscriber<T>(observer));
} 方法中注冊觀察者
第三步:在Observable類中調(diào)用了
public final Subscription subscribe(Subscriber<? super T> subscriber) {
return Observable.subscribe(subscriber, this);
}方法
第四步:調(diào)用了Observable.subscribe(subscriber, this);方法
第五步:在 Observable.subscribe方法中調(diào)用了監(jiān)聽觀察者訂閱的回調(diào)接口
RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
private Observable<String> observableString;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_simple2);
// 創(chuàng)建一個被觀察者
// 配置回調(diào)接口---OnSubscribe
// 為什么要配置?
// 監(jiān)聽觀察者訂閱湘纵,一旦有觀察者訂閱了脂崔,立馬回調(diào)改接口
observableString = Observable
.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> observer) {
Log.i("main", "回到了");
//訪問請求
// 所以在這個方法里面我們可以干一些事情
// 進行數(shù)據(jù)通信(說白了就是通知觀察者)
for (int i = 0; i < 5; i++) {
observer.onNext("第" + i + "個數(shù)據(jù)");
}
//訪問完成
// 當我們的數(shù)據(jù)傳遞完成
observer.onCompleted();
}
});
}
public void click(View v) {
// 觀察者訂閱
// 回調(diào)原理:
// 核心代碼:
// hook.onSubscribeStart(observable,
// observable.onSubscribe).call(subscriber);
observableString.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
Log.i("main", "---onCompleted---");
}
@Override
public void onError(Throwable e) {
System.out.println("Oh,no! Something wrong happened滤淳!");
}
@Override
public void onNext(String item) {
// 接受數(shù)據(jù)
Log.i("main", "觀察者接收到了數(shù)據(jù): " + item);
}
});
}
結(jié)果輸出
08-02 09:53:45.057 16613-16613/com.haocai.architect.rxjava I/main: 回到了
08-02 09:53:45.057 16613-16613/com.haocai.architect.rxjava I/main: 觀察者接收到了數(shù)據(jù): 第0個數(shù)據(jù)
08-02 09:53:45.057 16613-16613/com.haocai.architect.rxjava I/main: 觀察者接收到了數(shù)據(jù): 第1個數(shù)據(jù)
08-02 09:53:45.057 16613-16613/com.haocai.architect.rxjava I/main: 觀察者接收到了數(shù)據(jù): 第2個數(shù)據(jù)
08-02 09:53:45.057 16613-16613/com.haocai.architect.rxjava I/main: 觀察者接收到了數(shù)據(jù): 第3個數(shù)據(jù)
08-02 09:53:45.057 16613-16613/com.haocai.architect.rxjava I/main: 觀察者接收到了數(shù)據(jù): 第4個數(shù)據(jù)
08-02 09:53:45.057 16613-16613/com.haocai.architect.rxjava I/main: ---onCompleted---
observableString.subscribe 中 RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber); 調(diào)用call方法
另一種方式自動發(fā)送
private Observable<String> observableString;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_simple2);
List<String> items = new ArrayList<String>();
items.add("Kpioneer");
items.add("Xpioneer");
items.add("haocai");
items.add("Huhu");
// 框架本身提供了這樣的API
// from: 一旦當你有觀察者注冊梧喷,立馬發(fā)送消息序列
// 框架內(nèi)部實現(xiàn)
// 框架內(nèi)部調(diào)用create方法
// 迭代器模式
// OnSubscribeFromIterable類專門用于遍歷集合
// OnSubscribeFromArray類專門用于遍歷數(shù)組
observableString = Observable.from(items);
}
public void click(View v) {
observableString.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
Log.i("main", "---onCompleted---");
}
@Override
public void onError(Throwable e) {
System.out.println("Oh,no! Something wrong happened!");
}
@Override
public void onNext(String item) {
// 接受數(shù)據(jù)
Log.i("main", "觀察者接收到了數(shù)據(jù): " + item);
}
});
}
結(jié)果輸出
08-02 14:38:14.517 32289-32289/com.haocai.architect.rxjava I/main: 觀察者接收到了數(shù)據(jù): Kpioneer
08-02 14:38:14.517 32289-32289/com.haocai.architect.rxjava I/main: 觀察者接收到了數(shù)據(jù): Xpioneer
08-02 14:38:14.517 32289-32289/com.haocai.architect.rxjava I/main: 觀察者接收到了數(shù)據(jù): haocai
08-02 14:38:14.517 32289-32289/com.haocai.architect.rxjava I/main: 觀察者接收到了數(shù)據(jù): Huhu
08-02 14:38:14.517 32289-32289/com.haocai.architect.rxjava I/main: ---onCompleted---
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.internal.operators;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicLong;
import rx.*;
import rx.Observable.OnSubscribe;
import rx.exceptions.Exceptions;
/**
* Converts an {@code Iterable} sequence into an {@code Observable}.
* <p>
* 
* <p>
* You can convert any object that supports the Iterable interface into an Observable that emits each item in
* the object, with the {@code toObservable} operation.
* @param <T> the value type of the items
*/
public final class OnSubscribeFromIterable<T> implements OnSubscribe<T> {
final Iterable<? extends T> is;
public OnSubscribeFromIterable(Iterable<? extends T> iterable) {
if (iterable == null) {
throw new NullPointerException("iterable must not be null");
}
this.is = iterable;
}
@Override
public void call(final Subscriber<? super T> o) {
Iterator<? extends T> it;
boolean b;
try {
it = is.iterator();
b = it.hasNext();
} catch (Throwable ex) {
Exceptions.throwOrReport(ex, o);
return;
}
if (!o.isUnsubscribed()) {
if (!b) {
o.onCompleted();
} else {
o.setProducer(new IterableProducer<T>(o, it));
}
}
}
static final class IterableProducer<T> extends AtomicLong implements Producer {
/** */
private static final long serialVersionUID = -8730475647105475802L;
// 具體的觀察者
private final Subscriber<? super T> o;
// 具體的數(shù)據(jù)
private final Iterator<? extends T> it;
IterableProducer(Subscriber<? super T> o, Iterator<? extends T> it) {
this.o = o;
this.it = it;
}
@Override
public void request(long n) {
if (get() == Long.MAX_VALUE) {
// already started with fast-path
return;
}
if (n == Long.MAX_VALUE && compareAndSet(0, Long.MAX_VALUE)) {
fastPath();
} else
if (n > 0 && BackpressureUtils.getAndAddRequest(this, n) == 0L) {
slowPath(n);
}
}
void slowPath(long n) {
// backpressure is requested
final Subscriber<? super T> o = this.o;
final Iterator<? extends T> it = this.it;
long r = n;
long e = 0;
for (;;) {
while (e != r) {
if (o.isUnsubscribed()) {
return;
}
T value;
try {
value = it.next();
} catch (Throwable ex) {
Exceptions.throwOrReport(ex, o);
return;
}
o.onNext(value);
if (o.isUnsubscribed()) {
return;
}
boolean b;
try {
b = it.hasNext();
} catch (Throwable ex) {
Exceptions.throwOrReport(ex, o);
return;
}
if (!b) {
if (!o.isUnsubscribed()) {
o.onCompleted();
}
return;
}
e++;
}
r = get();
if (e == r) {
r = BackpressureUtils.produced(this, e);
if (r == 0L) {
break;
}
e = 0L;
}
}
}
void fastPath() {
// fast-path without backpressure
final Subscriber<? super T> o = this.o;
final Iterator<? extends T> it = this.it;
for (;;) {
if (o.isUnsubscribed()) {
return;
}
T value;
try {
value = it.next();
} catch (Throwable ex) {
Exceptions.throwOrReport(ex, o);
return;
}
o.onNext(value);
if (o.isUnsubscribed()) {
return;
}
boolean b;
try {
b = it.hasNext();
} catch (Throwable ex) {
Exceptions.throwOrReport(ex, o);
return;
}
if (!b) {
if (!o.isUnsubscribed()) {
o.onCompleted();
}
return;
}
}
}
}
}