RxJava入門

RxJava Github地址:https://github.com/ReactiveX/RxJava

首先引入庫文件(當前最新版本為2.1.4)

dependencies {
    ...
    compile 'io.reactivex.rxjava2:rxjava:2.1.4'
}

先來一個簡單例子

例1

        //創(chuàng)建被觀察者
        Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("1");
                subscriber.onNext("2");
                subscriber.onNext("3");
                subscriber.onCompleted();
            }
        });

        //創(chuàng)建觀察者
        Observer observer = new Observer<String>() {
            @Override
            public void onCompleted() {
                System.out.println("onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                System.out.println(e);
            }

            @Override
            public void onNext(String s) {
                System.out.println(s);
            }
        };
        
        //訂閱
        observable.subscribe(observer);

 結果:
1
2
3
onCompleted
(1)我們先來看看第一步掌挚,創(chuàng)建被觀察者:

Observable.create方法(2.0后已經(jīng)Deprecated)

    @Deprecated
    public static <T> Observable<T> create(OnSubscribe<T> f) {
        return new Observable<T>(RxJavaHooks.onCreate(f));
    }

OnSubscribe是接口享甸, 繼承了Action1
注意這里的參數(shù)從轉換為了Subscriber<? super T>

    public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {

    }

Action1接口,只有一個call方法 有一個參數(shù)知举,無返回值

public interface Action1<T> extends Action {
    void call(T t);
}

這就與以上例1中的創(chuàng)建觀察者相對應了补履, Observable.create()傳入泛型為String的onSubscribe, 并實現(xiàn)接口call(T t),這里的T也變?yōu)榱薙ubscriber<? super String>類型

Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
            }
});
(2)我們再來看看Observer
public interface Observer<T> {

    //通知觀察者,被觀察者完成了基于推送的通知添坊。onCompleted()與onError()設計上只會有一個被調用
    void onCompleted();

    //通知觀察者,出錯了箫锤,一但出錯贬蛙,被觀察者將不再繼續(xù)調用接下來的步驟,如onNext, onCompleted
    void onError(Throwable e);

    //向觀察者提供新的觀察事件
    void onNext(T t);
}
(3)最后實現(xiàn)訂閱關系谚攒,以原代碼角度來分析阳准,如何將Observable與Observer相關聯(lián)

Observable.create()時,新建了一個Observable實例

    @Deprecated
    public static <T> Observable<T> create(OnSubscribe<T> f) {
        return new Observable<T>(RxJavaHooks.onCreate(f));
    }

構造方法中只是將onSubscribe對象保存起來

    protected Observable(OnSubscribe<T> f) {
        this.onSubscribe = f;
    }

我們再來看看Observable.subscribe()方法馏臭, 這里判斷observer是不是Subscriber的實例野蝇,不是的話通過新建ObserverSubscriber類來轉換成為Subscriber,所以最終都是調用subscribe(Subscriber)

Subscriber 與Observer類似括儒, 實現(xiàn)了Observer, 同時也實現(xiàn)了Subscription, 多了兩個方法:unsubscribe(), isUnsubscribed()

    public final Subscription subscribe(final Observer<? super T> observer) {
        if (observer instanceof Subscriber) {
            return subscribe((Subscriber<? super T>)observer);
        }
        if (observer == null) {
            throw new NullPointerException("observer is null");
        }
        return subscribe(new ObserverSubscriber<T>(observer));
    }


    public final class ObserverSubscriber<T> extends Subscriber<T> {
        final Observer<? super T> observer;
        public ObserverSubscriber(Observer<? super T> observer) {
            this.observer = observer;
        }
       @Override
        public void onNext(T t) {
            observer.onNext(t);
        }
        @Override
        public void onError(Throwable e) {
            observer.onError(e);
        }
        @Override
        public void onCompleted() {
            observer.onCompleted();
        }
   }

最后調用 了Observable.subscribe(subscriber, this)绕沈,注意這里關鍵的this

    public final Subscription subscribe(Subscriber<? super T> subscriber) {
        return Observable.subscribe(subscriber, this);
    }
    static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
        //......

        // Subscriber類中的onStart()為空實現(xiàn), 在Observer所有方法之前被調用帮寻。
        subscriber.onStart();

        //......
        try {
            //這里的observable實際就是當前對象,在這里調用了當前對象中onSubscribe的call.并將例一中的observer 以Subscriber封裝后的實例作為call的參數(shù)返回乍狐。
            RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
            return RxJavaHooks.onObservableReturn(subscriber);
        } catch (Throwable e) {
            //......
            subscriber.onError(RxJavaHooks.onObservableError(e));
            //最后返回未取消訂閱的Subscription對象
            return Subscriptions.unsubscribed();
        }
    }

簡單來看一看RxJavaHooks中的onObservableStart方法

    public static <T> Observable.OnSubscribe<T> onObservableStart(Observable<T> instance, Observable.OnSubscribe<T> onSubscribe) {
        Func2<Observable, Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableStart;
        if (f != null) {
            //經(jīng)過一系列調用, 最終還是返回的onSubscribe本身固逗,參考下面的分析
            return f.call(instance, onSubscribe);
        }
        //如果onObservableStart為null便將傳進來的onSubscribe原封返回
        return onSubscribe;
    }

來簡單分析一下f.call();

static void init() {
        //......
        onObservableStart = new Func2<Observable, Observable.OnSubscribe, Observable.OnSubscribe>() {
            @Override
            public Observable.OnSubscribe call(Observable t1, Observable.OnSubscribe t2) {
                return RxJavaPlugins.getInstance().getObservableExecutionHook().onSubscribeStart(t1, t2);
            }
        };
        //......
}

    @Deprecated
    public <T> OnSubscribe<T> onSubscribeStart(Observable<? extends T> observableInstance, final OnSubscribe<T> onSubscribe) {
        return onSubscribe;//將傳進來的onSubscribe原封返回
    }

至此浅蚪,例1所有代碼便分析完成。

例1步驟可以簡化成以下

        Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("1");
                subscriber.onNext("2");
                subscriber.onNext("3");
                subscriber.onCompleted();
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onCompleted() {
                System.out.println("onCompleted");
            }                            
            @Override
            public void onError(Throwable e) {
                System.out.println(e);
            }
            @Override
            public void onNext(String s) {
                System.out.println(s);
            }
        });

或lambda表達式:

        Observable.create((Observable.OnSubscribe<String>) subscriber -> {
            subscriber.onNext("1");
            subscriber.onNext("2");
            subscriber.onNext("3");
            subscriber.onCompleted();
        }).subscribe(new Observer<String>() {
            @Override
            public void onCompleted() {
                System.out.println("onCompleted");
            }
            @Override
            public void onError(Throwable e) {
                System.out.println(e);
            }
            @Override
            public void onNext(String s) {
                System.out.println(s);
            }
        });

操作符

這和java8中的Stream很相似抒蚜,可參考我的另一篇文章《函數(shù)式編程 Lambda及Stream》

下面舉幾個常用的例子

from

Paste_Image.png

接收數(shù)組掘鄙,實現(xiàn)Iterable的子類對象耘戚,及Future對象

        String[] array = new String[]{"a", "b", "c"};
        Observable.from(Arrays.asList(array))
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onCompleted() {
                        System.out.println("onCompleted");
                    }
                    @Override
                    public void onError(Throwable e) {
                    }
                    @Override
                    public void onNext(String s) {
                        System.out.println(s);
                    }
                });
結果:
a
b
c
onCompleted

just

看原碼也是調用的 from((T[])new Object[] { })

Paste_Image.png
        Observable.just("1", "2", "3")
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onCompleted() {
                        System.out.println("onCompleted");
                    }
                    @Override
                    public void onError(Throwable e) {
                    }
                    @Override
                    public void onNext(String s) {
                        System.out.println(s);
                    }
                });
結果:
1
2
3
onCompleted

defer 延遲

  • 即用普通的創(chuàng)建Observable方法
    private static String string = "0";
    public static void main(String[] args) {
        Observable observable = Observable.just(string);
        string = "1";
        observable.subscribe(System.out::println);
    }

結果:0
  • 使用defer創(chuàng)建Observable
    private static String string = "0";
    public static void main(String[] args) {
        Observable observable = Observable.defer(() -> Observable.just(string));
        string = "1";
        observable.subscribe(System.out::println);
    }
結果:1

封裝時可先使用defer,后面調用時便會根據(jù)不同的參數(shù)創(chuàng)建相應的新的Observable

map 變換

接收一個Func1類型的參數(shù)(一個參數(shù)嗡髓,一個返回值 )

Paste_Image.png
        String[] array = new String[]{"a", "b", "c"};
        Observable.from(array)
                .map(String::toUpperCase)
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onCompleted() {
                        System.out.println("onCompleted");
                    }
                    @Override
                    public void onError(Throwable e) {
                    }
                    @Override
                    public void onNext(String s) {
                        System.out.println(s);
                    }
                });
結果:
A
B
C
onCompleted

flatMap變換

以原有Observable為基礎,將發(fā)射的每一條數(shù)據(jù)再分成若干個Observable,最后合并收津,這就是flat的意思饿这,壓平
注意這里與map不同的是,返回值是一個Observable對象撞秋。


        String[] array = new String[]{"aa", "bbb", "cc"};
        Observable.from(array)
                .map(String::toUpperCase)
                .flatMap(s -> {
                    List<String> singleLetter = new ArrayList<String>();
                    for (int i = 0; i < s.length(); i++) {
                        singleLetter.add(String.valueOf(s.charAt(i)));
                    }
                    return Observable.from(singleLetter);
                })
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onCompleted() {
                        System.out.println("onCompleted");
                    }
                    @Override
                    public void onError(Throwable e) {
                    }
                    @Override
                    public void onNext(String s) {
                        System.out.println(s);
                    }
                });
結果:
A
A
B
B
B
C
C
onCompleted

scan 累加器

由初始值與累加器構成

Paste_Image.png
  • 接收一個Func2<T, T, T> 類型參數(shù)
        String[] array = new String[]{"aa", "bbb", "cc"};
        Observable.from(array)
                .map(String::toUpperCase)
                .flatMap(s -> {
                    List<String> singleLetter = new ArrayList<String>();
                    for (int i = 0; i < s.length(); i++) {
                        singleLetter.add(String.valueOf(s.charAt(i)));
                    }
                    return Observable.from(singleLetter);
                })
                .scan(((s1, s2) -> s1 + s2)) 
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onCompleted() {
                        System.out.println("onCompleted");
                    }

                    @Override
                    public void onError(Throwable e) {
                    }

                    @Override
                    public void onNext(String s) {
                        System.out.println(s);
                    }
                });
結果:
A
AA
AAB
AABB
AABBB
AABBBC
AABBBCC
onCompleted
在未賦初值的情況下长捧,是不調用該方法的(看下圖,scan方法被調用前吻贿,Console面板中已經(jīng)顯示出發(fā)射的第一條數(shù)據(jù)串结。)
發(fā)射的第一條數(shù)據(jù)會作為初始值,發(fā)射第二條數(shù)據(jù)時調用該方法并將初始值傳給s1, 將第二條數(shù)據(jù)傳給s2
并將結果作為下一條數(shù)據(jù)的初始值。
Paste_Image.png
  • 接收(R initialValue, Func2<R, ? super T, R> accumulator) 兩個參數(shù)
        String[] array = new String[]{"aa", "bbb", "cc"};
        Observable.from(array)
                .map(String::toUpperCase)
                .flatMap(s -> {
                    List<String> singleLetter = new ArrayList<String>();
                    for (int i = 0; i < s.length(); i++) {
                        singleLetter.add(String.valueOf(s.charAt(i)));
                    }
                    return Observable.from(singleLetter);
                })
                .scan("-", ((s1, s2) -> s1 + s2))
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onCompleted() {
                        System.out.println("onCompleted");
                    }

                    @Override
                    public void onError(Throwable e) {
                    }

                    @Override
                    public void onNext(String s) {
                        System.out.println(s);
                    }
                });


結果:
- //在scan被第一次調用前肌割,已經(jīng)被打印出來
-A
-AA
-AAB
-AABB
-AABBB
-AABBBC
-AABBBCC
onCompleted

filter 過濾出符合條件的

參數(shù)為Func1<? super T, Boolean>卧蜓, 返回布爾值

        Observable.from(array)
                .filter(s -> s.length()<=2)
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onCompleted() {
                        System.out.println("onCompleted");
                    }

                    @Override
                    public void onError(Throwable e) {
                    }

                    @Override
                    public void onNext(String s) {
                        System.out.println(s);
                    }
                });
結果:
aa
cc
onCompleted

take

        String[] array = new String[]{"aa", "bbb", "cc"};
        Observable.from(array)
                .map(String::toUpperCase)
                .flatMap(s -> {
                    List<String> singleLetter = new ArrayList<String>();
                    for (int i = 0; i < s.length(); i++) {
                        singleLetter.add(String.valueOf(s.charAt(i)));
                    }
                    return Observable.from(singleLetter);
                })
                .take(5)//取出5個數(shù)據(jù)
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onCompleted() {
                        System.out.println("onCompleted");
                    }
                    @Override
                    public void onError(Throwable e) {
                    }
                    @Override
                    public void onNext(String s) {
                        System.out.println(s);
                    }
                });
結果:
A
A
B
B
B
onCompleted

takeFirst

        String[] array = new String[]{"aa", "bbb", "cccc", "dddd"};
        Observable.from(array)
                .map(String::toUpperCase)
                .takeFirst(s -> s.length()==4)
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onCompleted() {
                        System.out.println("onCompleted");
                    }
                    @Override
                    public void onError(Throwable e) {
                    }
                    @Override
                    public void onNext(String s) {
                        System.out.println(s);
                    }
                });
結果:
CCCC
onCompleted

takeLast

        Observable.from(array)
                .map(String::toUpperCase)
                .flatMap(s -> {
                    List<String> singleLetter = new ArrayList<String>();
                    for (int i = 0; i < s.length(); i++) {
                        singleLetter.add(String.valueOf(s.charAt(i)));
                    }
                    return Observable.from(singleLetter);
                })
                .takeLast(2) // 取出最后2個數(shù)據(jù)
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onCompleted() {
                        System.out.println("onCompleted");
                    }
                    @Override
                    public void onError(Throwable e) {
                    }
                    @Override
                    public void onNext(String s) {
                        System.out.println(s);
                    }
                });
結果:
C
C
onCompleted

take與takeLast以時間為限制取數(shù)據(jù)

public final Observable<T> take(long time, TimeUnit unit) //只取time時間內發(fā)射的數(shù)據(jù)
public final Observable<T> takeLast(long time, TimeUnit unit) //只取time時間結束前的數(shù)據(jù)

first

  • public final Observable<T> first() {
        String[] array = new String[]{"aa", "bbb", "cccc", "dddd"};
        Observable.from(array)
                .map(String::toUpperCase)
                .first()
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onCompleted() {
                        System.out.println("onCompleted");
                    }
                    @Override
                    public void onError(Throwable e) {
                    }
                    @Override
                    public void onNext(String s) {
                        System.out.println(s);
                    }
                });
結果:
AA
onCompleted
  • public final Observable<T> first(Func1<? super T, Boolean> predicate) {
        String[] array = new String[]{"aa", "bbb", "cccc", "dddd"};
        Observable.from(array)
                .map(String::toUpperCase)
                .first(s -> s.length()==4)
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onCompleted() {
                        System.out.println("onCompleted");
                    }
                    @Override
                    public void onError(Throwable e) {
                    }
                    @Override
                    public void onNext(String s) {
                        System.out.println(s);
                    }
                });
結果:
CCCC
onCompleted
  • public final Observable<T> firstOrDefault(T defaultValue) {
        String[] array = new String[]{};
        Observable.from(array)
                .map(String::toUpperCase)
                .firstOrDefault("null")
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onCompleted() {
                        System.out.println("onCompleted");
                    }

                    @Override
                    public void onError(Throwable e) {
                    }

                    @Override
                    public void onNext(String s) {
                        System.out.println(s);
                    }
                });
結果:
null
onCompleted
  • public final Observable<T> firstOrDefault(T defaultValue, Func1<? super T, Boolean> predicate) {
        String[] array = new String[]{"aa", "bbb", "cccc", "dddd"};
        Observable.from(array)
                .map(String::toUpperCase)
                .firstOrDefault("null", s -> s.equals("fuck"))
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onCompleted() {
                        System.out.println("onCompleted");
                    }

                    @Override
                    public void onError(Throwable e) {
                    }

                    @Override
                    public void onNext(String s) {
                        System.out.println(s);
                    }
                });
結果:
null
onCompleted

skip

        String[] array = new String[]{"aa", "bbb", "cccc", "dddd"};
        Observable.from(array)
                .map(String::toUpperCase)
                .skip(2)
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onCompleted() {
                        System.out.println("onCompleted");
                    }

                    @Override
                    public void onError(Throwable e) {
                    }

                    @Override
                    public void onNext(String s) {
                        System.out.println(s);
                    }
                });
結果:
CCCC
DDDD
onCompleted

skipLast

        String[] array = new String[]{"aa", "bbb", "cccc", "dddd"};
        Observable.from(array)
                .map(String::toUpperCase)
                .skipLast(2)
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onCompleted() {
                        System.out.println("onCompleted");
                    }
                    @Override
                    public void onError(Throwable e) {
                    }
                    @Override
                    public void onNext(String s) {
                        System.out.println(s);
                    }
                });
結果:
AA
BBB
onCompleted

skipWhile

        String[] array = new String[]{"aa", "bbb", "cccc", "dddd"};
        Observable.from(array)
                .map(String::toUpperCase)
                .skipWhile(s -> s.length()<4)
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onCompleted() {
                        System.out.println("onCompleted");
                    }
                    @Override
                    public void onError(Throwable e) {
                    }
                    @Override
                    public void onNext(String s) {
                        System.out.println(s);
                    }
                });
結果: 
CCCC
DDDD
onCompleted

distinct 去重

        String[] array = new String[]{"aa", "aa", "cccc", "dddd"};
        Observable.from(array)
                .map(String::toUpperCase)
                .distinct()
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onCompleted() {
                        System.out.println("onCompleted");
                    }
                    @Override
                    public void onError(Throwable e) {
                    }
                    @Override
                    public void onNext(String s) {
                        System.out.println(s);
                    }
                });
結果:
AA
CCCC
DDDD
onCompleted

elementAt

        String[] array = new String[]{"a", "bb", "ccc", "dddd"};
        Observable.from(array)
                .map(String::toUpperCase)
                .elementAt(2)
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onCompleted() {
                        System.out.println("onCompleted");
                    }
                    @Override
                    public void onError(Throwable e) {
                    }
                    @Override
                    public void onNext(String s) {
                        System.out.println(s);
                    }
                });
結果:
CCC
onCompleted

merge 合并數(shù)據(jù)源

        Observable<String> justA = Observable.just("a1", "a2", "a3");
        Observable<String> justB = Observable.just("b1", "b2", "b3");
        Observable.merge(justB,justA)
                .map(String::toUpperCase)
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onCompleted() {
                        System.out.println("onCompleted");
                    }
                    @Override
                    public void onError(Throwable e) {
                    }
                    @Override
                    public void onNext(String s) {
                        System.out.println(s);
                    }
                });
結果:
B1
B2
B3
A1
A2
A3
onCompleted


zip 合并數(shù)據(jù)

        Observable<String> justA = Observable.just("a1", "a2", "a3");
        Observable<String> justB = Observable.just("b1", "b2", "b3", "b4");
        Observable.zip(justA,justB,(s1, s2) -> s1+s2)
                .map(String::toUpperCase)
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onCompleted() {
                        System.out.println("onCompleted");
                    }
                    @Override
                    public void onError(Throwable e) {
                    }
                    @Override
                    public void onNext(String s) {
                        System.out.println(s);
                    }
                });
結果:
A1B1
A2B2
A3B3
onCompleted

startWith

        Observable<String> justA = Observable.just("a1", "a2", "a3");
        Observable<String> justB = Observable.just("b1", "b2", "b3", "b4");
        Observable.zip(justA,justB,(s1, s2) -> s1+s2)
                .map(String::toUpperCase)
                .startWith("fuck")
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onCompleted() {
                        System.out.println("onCompleted");
                    }
                    @Override
                    public void onError(Throwable e) {
                    }
                    @Override
                    public void onNext(String s) {
                        System.out.println(s);
                    }
                });
結果:
fuck
A1B1
A2B2
A3B3
onCompleted

empty

創(chuàng)建一個不發(fā)射任何數(shù)據(jù)但是正常終止的Observable

        Observable.empty().subscribe(new Subscriber<Object>() {
            @Override
            public void onCompleted() {
                System.out.println("onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                System.out.println(e);
            }

            @Override
            public void onNext(Object o) {
                System.out.println(o);
            }
        });
結果:
onCompleted

簡單分析一下原碼:

public static <T> Observable<T> empty() {
    return EmptyObservableHolder.instance();
}

public enum EmptyObservableHolder implements OnSubscribe<Object> {
    INSTANCE
    ;

    static final Observable<Object> EMPTY = Observable.unsafeCreate(INSTANCE);

    public static <T> Observable<T> instance() {
        return (Observable<T>)EMPTY;
    }

    @Override
    public void call(Subscriber<? super Object> child) {
        child.onCompleted();//只調用了onCompleted()方法
    }
}

never

        Observable.never().subscribe(new Subscriber<Object>() {
            @Override
            public void onCompleted() {
                System.out.println("onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                System.out.println(e);
            }

            @Override
            public void onNext(Object o) {
                System.out.println(o);
            }
        });
結果什么都沒有
原碼分析:
public static <T> Observable<T> never() {
    return NeverObservableHolder.instance();
}

public enum NeverObservableHolder implements OnSubscribe<Object> {
    INSTANCE
    ;

    static final Observable<Object> NEVER = Observable.unsafeCreate(INSTANCE);

    @SuppressWarnings("unchecked")
    public static <T> Observable<T> instance() {
        return (Observable<T>)NEVER;
    }

    @Override
    public void call(Subscriber<? super Object> child) {
        // deliberately no op //什么也沒做
    }
}

range

        Observable.range(10, 4).subscribe(System.out::println);
結果:
10
11
12
13

interval

        Observable.interval(1, TimeUnit.SECONDS, Schedulers.trampoline()).subscribe(System.out::println);
結果:從0開始每一秒按順序輸出
注意,這里需要設置Schedulers.trampoline(),在當前線程將任務插入隊列中把敞。

timer

        Observable.timer(3, TimeUnit.SECONDS, Schedulers.trampoline()).subscribe(System.out::println);
結果:三秒后輸出0后結束

delay

        String[] arrays = new String[]{"1", "2", "3", "4"};
        Observable.from(arrays).delay(1, TimeUnit.SECONDS, Schedulers.trampoline()).subscribe(System.out::println);
結果:運行后弥奸,一秒后輸出1, 每一少輸出下一個奋早,輸出4再一秒后結束

contains

        String[] arrays = new String[]{"1", "2", "3", "4"};
        Observable.from(arrays).contains("1").subscribe(System.out::println);
結果:true

all

        Integer[] ints = new Integer[]{1, 2, 3, 4};
        Observable.from(ints).all(integer -> integer < 3).subscribe(System.out::println);
結果:false

isEmpty

        String[] arrays = new String[]{};
        Observable.from(arrays).isEmpty().subscribe(System.out::println);
結果:true

exists

        Integer[] ints = new Integer[]{1, 2, 3, 4};
        Observable.from(ints).exists(integer -> integer > 3).subscribe(System.out::println);
結果:true

count

        Integer[] ints = new Integer[]{1, 2, 3, 4, 1};
        Observable.from(ints).count().subscribe(System.out::println);
結果:5

reduce 與scan相似盛霎,不同的是直接輸出結果。

        Integer[] ints = new Integer[]{1, 2, 3, 4};
        Observable.from(ints).reduce((integer, integer2) -> integer + integer2).subscribe(System.out::println);
        Observable.from(ints).reduce(10, (integer, integer2) -> integer + integer2).subscribe(System.out::println);
結果:
10
20

collect

與reduce相似, 但collect是用來將源Observable發(fā)射的數(shù)據(jù)給收集到一個數(shù)據(jù)結構里面

        Integer[] ints = new Integer[]{1, 2, 3, 4};
        Observable.from(ints)
//                .collect((Func0<ArrayList<Integer>>) ArrayList::new, (list, integer) -> list.add(integer))
                .collect((Func0<ArrayList<Integer>>) ArrayList::new, ArrayList::add)
                .subscribe(System.out::println);

buffer

Paste_Image.png
        Observable.interval(1, TimeUnit.SECONDS, Schedulers.trampoline())
                .buffer(3)
                .subscribe(System.out::println);

結果:每三個放入緩存再輸出
[0, 1, 2]
[3, 4, 5]
[6, 7, 8]
[9, 10, 11]
[12, 13, 14]
[15, 16, 17]
耽装。愤炸。。
        Observable.interval(1, TimeUnit.SECONDS, Schedulers.trampoline())
                .buffer(3, 2) //(多少為一組剂邮, 生成新的buffer的間隔)
                .subscribe(System.out::println);
結果:
[0, 1, 2]
[2, 3, 4]
[4, 5, 6]
[6, 7, 8]
[8, 9, 10]
[10, 11, 12]
摇幻。。挥萌。

RxJava1.0 與RxJava2.0區(qū)別:

  1. Rxjava1.0資源庫都在rx包下,而RxJava2.0都在io.reactivex包下
import rx.Observable;
import rx.Subscriber;
---------------------
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.annotations.NonNull;

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末绰姻,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子引瀑,更是在濱河造成了極大的恐慌狂芋,老刑警劉巖,帶你破解...
    沈念sama閱讀 219,589評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件憨栽,死亡現(xiàn)場離奇詭異帜矾,居然都是意外死亡,警方通過查閱死者的電腦和手機屑柔,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,615評論 3 396
  • 文/潘曉璐 我一進店門屡萤,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人掸宛,你說我怎么就攤上這事死陆。” “怎么了唧瘾?”我有些...
    開封第一講書人閱讀 165,933評論 0 356
  • 文/不壞的土叔 我叫張陵措译,是天一觀的道長。 經(jīng)常有香客問我饰序,道長领虹,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,976評論 1 295
  • 正文 為了忘掉前任求豫,我火速辦了婚禮塌衰,結果婚禮上诉稍,老公的妹妹穿的比我還像新娘。我一直安慰自己最疆,他們只是感情好均唉,可當我...
    茶點故事閱讀 67,999評論 6 393
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著肚菠,像睡著了一般舔箭。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上蚊逢,一...
    開封第一講書人閱讀 51,775評論 1 307
  • 那天层扶,我揣著相機與錄音,去河邊找鬼烙荷。 笑死镜会,一個胖子當著我的面吹牛,可吹牛的內容都是我干的终抽。 我是一名探鬼主播戳表,決...
    沈念sama閱讀 40,474評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼昼伴!你這毒婦竟也來了匾旭?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 39,359評論 0 276
  • 序言:老撾萬榮一對情侶失蹤圃郊,失蹤者是張志新(化名)和其女友劉穎价涝,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體持舆,經(jīng)...
    沈念sama閱讀 45,854評論 1 317
  • 正文 獨居荒郊野嶺守林人離奇死亡色瘩,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 38,007評論 3 338
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了逸寓。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片居兆。...
    茶點故事閱讀 40,146評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖竹伸,靈堂內的尸體忽然破棺而出泥栖,到底是詐尸還是另有隱情,我是刑警寧澤佩伤,帶...
    沈念sama閱讀 35,826評論 5 346
  • 正文 年R本政府宣布聊倔,位于F島的核電站晦毙,受9級特大地震影響生巡,放射性物質發(fā)生泄漏。R本人自食惡果不足惜见妒,卻給世界環(huán)境...
    茶點故事閱讀 41,484評論 3 331
  • 文/蒙蒙 一孤荣、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧,春花似錦盐股、人聲如沸钱豁。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,029評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽牲尺。三九已至,卻和暖如春幌蚊,著一層夾襖步出監(jiān)牢的瞬間谤碳,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,153評論 1 272
  • 我被黑心中介騙來泰國打工溢豆, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留蜒简,地道東北人。 一個月前我還...
    沈念sama閱讀 48,420評論 3 373
  • 正文 我出身青樓漩仙,卻偏偏與公主長得像搓茬,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子队他,可洞房花燭夜當晚...
    茶點故事閱讀 45,107評論 2 356

推薦閱讀更多精彩內容