Operator differences
Most operators are still there in 2.x and practically all of them have the same behavior as they had in 1.x. The following subsections list each base reactive type and the difference between 1.x and 2.x.
Generally, many operators gained overloads that now allow specifying the internal buffer size or prefetch amount they should use when running their upstream (or inner sources).
Some operator overloads have been renamed with a postfix, such as fromArray, fromIterable, etc. The reason is that when the library is compiled with Java 8, javac often can't disambiguate between functional interface types.
Operators marked as @Beta or @Experimental in 1.x are promoted to standard.
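As a rough illustration of the two points above (postfixed names and prefetch overloads), the following sketch assumes RxJava 2.x (the io.reactivex package) is on the classpath. The class and method names are hypothetical and exist only for this example.

```java
import io.reactivex.Flowable;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper class, not part of RxJava.
public class FactoryNamesSketch {

    // 1.x: Observable.from(array); 2.x names the source type explicitly.
    static List<Integer> fromArrayValues() {
        return Flowable.fromArray(1, 2, 3).toList().blockingGet();
    }

    // 1.x: Observable.from(iterable); 2.x uses fromIterable.
    static List<String> fromIterableValues() {
        return Flowable.fromIterable(Arrays.asList("a", "b")).toList().blockingGet();
    }

    // concatMap overload with an explicit prefetch amount (2 here, chosen arbitrarily).
    static List<Integer> concatMapWithPrefetch() {
        return Flowable.range(1, 3)
            .concatMap(v -> Flowable.just(v, v * 10), 2)
            .toList()
            .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(fromArrayValues());
        System.out.println(fromIterableValues());
        System.out.println(concatMapWithPrefetch());
        // Default internal buffer size used by operators without an explicit argument.
        System.out.println(Flowable.bufferSize());
    }
}
```

The prefetch argument only changes how many items are requested from upstream ahead of time; the emitted sequence is the same as with the single-argument concatMap.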
1.x Observable to 2.x Flowable
Factory methods:
1.x | 2.x |
---|---|
amb | added amb(ObservableSource...) overload, 2-9 argument versions dropped |
RxRingBuffer.SIZE | bufferSize() |
combineLatest | added varargs overload, added overloads with bufferSize argument, combineLatest(List) dropped |
concat | added overload with prefetch argument, 5-9 source overloads dropped, use concatArray instead |
N/A | added concatArray and concatArrayDelayError |
N/A | added concatArrayEager and concatArrayEagerDelayError |
concatDelayError | added overloads with option to delay till the current ends or till the very end |
concatEagerDelayError | added overloads with option to delay till the current ends or till the very end |
create(SyncOnSubscribe) | replaced with generate + overloads (distinct interfaces, you can implement them all at once) |
create(AsyncOnSubscribe) | not present |
create(OnSubscribe) | repurposed with safe create(FlowableOnSubscribe, BackpressureStrategy), raw support via unsafeCreate() |
from | disambiguated into fromArray, fromIterable, fromFuture |
N/A | added fromPublisher |
fromAsync | renamed to create() |
N/A | added intervalRange() |
limit | dropped, use take |
merge | added overloads with prefetch |
mergeDelayError | added overloads with prefetch |
sequenceEqual | added overload with bufferSize |
switchOnNext | added overload with prefetch |
switchOnNextDelayError | added overload with prefetch |
timer | deprecated overloads dropped |
zip | added overloads with bufferSize and delayErrors capabilities, disambiguated to zipArray and zipIterable |
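The create and generate rows in the table can be sketched as follows. This is a minimal example assuming RxJava 2.x; the class name and the choice of BackpressureStrategy.BUFFER are illustrative, not a recommendation.

```java
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import java.util.List;

// Hypothetical helper class, not part of RxJava.
public class CreateAndGenerateSketch {

    // Safe create: the emitter is push-style, and the strategy decides
    // what happens when the downstream cannot keep up.
    static List<Integer> viaCreate() {
        return Flowable.<Integer>create(emitter -> {
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onComplete();
        }, BackpressureStrategy.BUFFER).toList().blockingGet();
    }

    // generate replaces create(SyncOnSubscribe): the state is created per
    // subscriber and at most one onNext is emitted per generator call.
    static List<Integer> viaGenerate() {
        return Flowable.<Integer, Integer>generate(
                () -> 1,
                (state, emitter) -> {
                    emitter.onNext(state);
                    if (state == 3) {
                        emitter.onComplete();
                    }
                    return state + 1;
                })
            .toList()
            .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(viaCreate());
        System.out.println(viaGenerate());
    }
}
```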
Instance methods:
1.x | 2.x |
---|---|
all | RC3 returns Single<Boolean> now |
any | RC3 returns Single<Boolean> now |
asObservable | renamed to hide(), hides all identities now |
buffer | overloads with custom Collection supplier |
cache(int) | deprecated and dropped |
collect | RC3 returns Single<U> |
collect(U, Action2<U, T>) | disambiguated to collectInto and RC3 returns Single<U> |
concatMap | added overloads with prefetch |
concatMapDelayError | added overloads with prefetch, option to delay till the current ends or till the very end |
concatMapEager | added overloads with prefetch |
concatMapEagerDelayError | added overloads with prefetch, option to delay till the current ends or till the very end |
count | RC3 returns Single<Long> now |
countLong | dropped, use count |
distinct | overload with custom Collection supplier. |
doOnCompleted | renamed to doOnComplete, note the missing d! |
doOnUnsubscribe | renamed to Flowable.doOnCancel and doOnDispose for the others, see the doOnCancel/doOnDispose/unsubscribeOn section below |
N/A | added doOnLifecycle to handle onSubscribe, request and cancel peeking |
elementAt(int) | RC3 no longer signals NoSuchElementException if the source is shorter than the index |
elementAt(Func1, int) | dropped, use filter(predicate).elementAt(int) |
elementAtOrDefault(int, T) | renamed to elementAt(int, T) and RC3 returns Single<T> |
elementAtOrDefault(Func1, int, T) | dropped, use filter(predicate).elementAt(int, T) |
first() | RC3 renamed to firstElement and returns Maybe<T> |
first(Func1) | dropped, use filter(predicate).first() |
firstOrDefault(T) | renamed to first(T) and RC3 returns Single<T> |
firstOrDefault(Func1, T) | dropped, use filter(predicate).first(T) |
flatMap | added overloads with prefetch |
N/A | added forEachWhile(Predicate<T>, [Consumer<Throwable>, [Action]]) for conditionally stopping consumption |
groupBy | added overload with bufferSize and delayError option, the custom internal map version didn't make it into RC1 |
ignoreElements | RC3 returns Completable |
isEmpty | RC3 returns Single<Boolean> |
last() | RC3 renamed to lastElement and returns Maybe<T> |
last(Func1) | dropped, use filter(predicate).last() |
lastOrDefault(T) | renamed to last(T) and RC3 returns Single<T> |
lastOrDefault(Func1, T) | dropped, use filter(predicate).last(T) |
nest | dropped, use manual just |
publish(Func1) | added overload with prefetch |
reduce(Func2) | RC3 returns Maybe<T> |
N/A | added reduceWith(Callable, BiFunction) to reduce in a Subscriber-individual manner, returns Single<T> |
N/A | added repeatUntil(BooleanSupplier) |
repeatWhen(Func1, Scheduler) | dropped the overload, use subscribeOn(Scheduler).repeatWhen(Function) instead |
retry | added retry(Predicate), retry(int, Predicate) |
N/A | added retryUntil(BooleanSupplier) |
retryWhen(Func1, Scheduler) | dropped the overload, use subscribeOn(Scheduler).retryWhen(Function) instead |
sample | doesn't emit the very last item if the upstream completes within the period, added overloads with emitLast parameter |
N/A | added scanWith(Callable, BiFunction) to scan in a Subscriber-individual manner |
single() | RC3 renamed to singleElement and returns Maybe<T> |
single(Func1) | dropped, use filter(predicate).single() |
singleOrDefault(T) | renamed to single(T) and RC3 returns Single<T> |
singleOrDefault(Func1, T) | dropped, use filter(predicate).single(T) |
skipLast | added overloads with bufferSize and delayError options |
startWith | 2-9 argument version dropped, use startWithArray instead |
N/A | added startWithArray to disambiguate |
N/A | added subscribeWith that returns its input after subscription |
switchMap | added overload with prefetch argument |
switchMapDelayError | added overload with prefetch argument |
takeLastBuffer | dropped |
N/A | added test() (returns TestSubscriber subscribed to this) with overloads to fluently test |
throttleLast | doesn't emit the very last item if the upstream completes within the period, use sample with the emitLast parameter |
timeout(Func0<Observable>, ...) | signature changed to timeout(Publisher, ...) and dropped the function, use defer(Callable<Publisher>) if necessary |
toBlocking().y | inlined as blockingY() operators, except toFuture |
toCompletable | RC3 dropped, use ignoreElements |
toList | RC3 returns Single<List<T>> |
toMap | RC3 returns Single<Map<K, V>> |
toMultimap | RC3 returns Single<Map<K, Collection<V>>> |
N/A | added toFuture |
N/A | added toObservable |
toSingle | RC3 dropped, use single(T) |
toSortedList | RC3 returns Single<List<T>> |
withLatestFrom | 5-9 source overloads dropped |
zipWith | added overloads with prefetch and delayErrors options |
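A few of the instance-method changes in the table can be sketched like this, assuming RxJava 2.x. The class and method names are hypothetical; only the Flowable and TestSubscriber calls come from the library.

```java
import io.reactivex.Flowable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper class, not part of RxJava.
public class InstanceMethodSketch {

    // 1.x: firstOrDefault(predicate, -1); 2.x: filter(predicate).first(-1)
    static int firstEvenOrDefault() {
        return Flowable.range(1, 5).filter(v -> v % 2 == 0).first(-1).blockingGet();
    }

    // reduceWith creates a fresh accumulator for every subscriber.
    static List<Integer> collected() {
        return Flowable.range(1, 3)
            .<List<Integer>>reduceWith(ArrayList::new, (list, v) -> {
                list.add(v);
                return list;
            })
            .blockingGet();
    }

    // 1.x: toBlocking().first(); 2.x: blockingFirst()
    static int blockingValue() {
        return Flowable.just(7).blockingFirst();
    }

    // test() returns a TestSubscriber already subscribed to the source;
    // assertResult throws an AssertionError on a mismatch.
    static void fluentTest() {
        Flowable.range(1, 3).test().assertResult(1, 2, 3);
    }

    public static void main(String[] args) {
        System.out.println(firstEvenOrDefault());
        System.out.println(collected());
        System.out.println(blockingValue());
        fluentTest();
    }
}
```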
Different return types
- Some operators that produced exactly one value or an error now return Single in 2.x (or Maybe if an empty source is allowed).
- (Remark: this is "experimental" in RC2 and RC3 to see how it feels to program with such mixed-type sequences and whether or not there has to be too much toObservable/toFlowable back-conversion.)
Operator | Old return type | New return type | Remark |
---|---|---|---|
all(Predicate) | Observable<Boolean> | Single<Boolean> | Emits true if all elements match the predicate |
any(Predicate) | Observable<Boolean> | Single<Boolean> | Emits true if any elements match the predicate |
count() | Observable<Long> | Single<Long> | Counts the number of elements in the sequence |
elementAt(int) | Observable<T> | Maybe<T> | Emits the element at the given index or completes |
elementAt(int, T) | Observable<T> | Single<T> | Emits the element at the given index or the default |
elementAtOrError(int) | Observable<T> | Single<T> | Emits the element at the given index or a NoSuchElementException |
first(T) | Observable<T> | Single<T> | Emits the very first element or the default item |
firstElement() | Observable<T> | Maybe<T> | Emits the very first element or completes |
firstOrError() | Observable<T> | Single<T> | Emits the first element or a NoSuchElementException if the source is empty |
ignoreElements() | Observable<T> | Completable | Ignore all but the terminal events |
isEmpty() | Observable<Boolean> | Single<Boolean> | Emits true if the source is empty |
last(T) | Observable<T> | Single<T> | Emits the very last element or the default item |
lastElement() | Observable<T> | Maybe<T> | Emits the very last element or completes |
lastOrError() | Observable<T> | Single<T> | Emits the last element or a NoSuchElementException if the source is empty |
reduce(BiFunction) | Observable<T> | Maybe<T> | Emits the reduced value or completes |
reduce(U, BiFunction) | Observable<U> | Single<U> | Emits the reduced value (or the initial value) |
reduceWith(Callable, BiFunction) | Observable<U> | Single<U> | Emits the reduced value (or the initial value) |
single(T) | Observable<T> | Single<T> | Emits the only element or the default item |
singleElement() | Observable<T> | Maybe<T> | Emits the only element or completes |
singleOrError() | Observable<T> | Single<T> | Emits the one and only element, IllegalArgumentException if the source is longer than 1 item or a NoSuchElementException if the source is empty |
toList() | Observable<List<T>> | Single<List<T>> | Collects all elements into a List |
toMap() | Observable<Map<K, V>> | Single<Map<K, V>> | Collects all elements into a Map |
toMultimap() | Observable<Map<K, Collection<V>>> | Single<Map<K, Collection<V>>> | Collects all elements into a Map with collection |
toSortedList() | Observable<List<T>> | Single<List<T>> | Collects all elements into a List and sorts it |
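To give a feel for programming with these mixed return types, here is a small sketch assuming RxJava 2.x. The class and method names are hypothetical; the last method shows the kind of toFlowable back-conversion the remark above refers to.

```java
import io.reactivex.Completable;
import io.reactivex.Flowable;
import io.reactivex.Maybe;
import io.reactivex.Single;

// Hypothetical helper class, not part of RxJava.
public class ReturnTypeSketch {

    // count() now yields exactly one value, hence Single<Long>.
    static long countThree() {
        Single<Long> count = Flowable.just("a", "b", "c").count();
        return count.blockingGet();
    }

    // firstElement() may complete without a value, hence Maybe<T>;
    // blockingGet(defaultValue) supplies a fallback for the empty case.
    static int firstOfEmpty() {
        Maybe<Integer> first = Flowable.<Integer>empty().firstElement();
        return first.blockingGet(-1);
    }

    // last(T) always produces a value, hence Single<T>.
    static int lastOfEmpty() {
        return Flowable.<Integer>empty().last(0).blockingGet();
    }

    // ignoreElements() only relays the terminal event, hence Completable;
    // Completable.blockingGet() returns the error, or null on completion.
    static boolean completesNormally() {
        Completable done = Flowable.range(1, 3).ignoreElements();
        return done.blockingGet() == null;
    }

    // Converting back when the rest of the chain needs a Flowable.
    static long doubledCount() {
        return Flowable.just(1, 2).count().toFlowable().map(c -> c * 2).blockingFirst();
    }

    public static void main(String[] args) {
        System.out.println(countThree());
        System.out.println(firstOfEmpty());
        System.out.println(lastOfEmpty());
        System.out.println(completesNormally());
        System.out.println(doubledCount());
    }
}
```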
Removals
- To make sure the final API of 2.0 is as clean as possible, we remove methods and other components between release candidates without deprecating them.
Removed in version | Component | Remark |
---|---|---|
RC3 | Flowable.toCompletable() | use Flowable.ignoreElements() |
RC3 | Flowable.toSingle() | use Flowable.single(T) |
RC3 | Flowable.toMaybe() | use Flowable.singleElement() |
RC3 | Observable.toCompletable() | use Observable.ignoreElements() |
RC3 | Observable.toSingle() | use Observable.single(T) |
RC3 | Observable.toMaybe() | use Observable.singleElement() |
Miscellaneous changes
doOnCancel/doOnDispose/unsubscribeOn
In 1.x, doOnUnsubscribe was always executed on a terminal event because 1.x's SafeSubscriber called unsubscribe on itself. This was practically unnecessary: the Reactive-Streams specification states that when a terminal event arrives at a Subscriber, the upstream Subscription should be considered cancelled, and thus calling cancel() is a no-op.
For the same reason, unsubscribeOn is not called on the regular termination path but only when there is an actual cancel (or dispose) call on the chain.
Therefore, the following sequence won't call doOnCancel:
Flowable.just(1, 2, 3)
.doOnCancel(() -> System.out.println("Cancelled!"))
.subscribe(System.out::println);
However, the following will call it, since the take operator cancels the upstream after the set number of onNext events has been delivered:
Flowable.just(1, 2, 3)
.doOnCancel(() -> System.out.println("Cancelled!"))
.take(2)
.subscribe(System.out::println);
If you need to perform cleanup on both regular termination and cancellation, consider the using operator instead.
Alternatively, the doFinally operator (introduced in 2.0.1 and standardized in 2.1) calls a developer-specified Action after a source has completed, failed with an error, or been cancelled/disposed:
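A minimal sketch of the using approach, assuming RxJava 2.x. The string "resource" stands in for a real resource, and the class name and log list are hypothetical helpers that only record what happened.

```java
import io.reactivex.Flowable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper class, not part of RxJava.
public class UsingCleanupSketch {

    static List<String> run(boolean cancelEarly) {
        List<String> log = new ArrayList<>();
        Flowable<Integer> source = Flowable.using(
            () -> "resource",                  // placeholder resource, created per subscriber
            res -> Flowable.just(1, 2, 3),     // sequence that depends on the resource
            res -> log.add("cleanup"));        // runs on completion, error or cancellation
        if (cancelEarly) {
            source = source.take(2);           // take cancels the upstream after 2 items
        }
        source.subscribe(v -> log.add("item " + v));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(false));
        System.out.println(run(true));
    }
}
```

In both runs the cleanup callback is expected to fire exactly once, which is the property doOnCancel alone does not give you.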
Flowable.just(1, 2, 3)
.doFinally(() -> System.out.println("Finally"))
.subscribe(System.out::println);
Flowable.just(1, 2, 3)
.doFinally(() -> System.out.println("Finally"))
.take(2) // cancels the above after 2 elements
.subscribe(System.out::println);
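To compare the two callbacks side by side, the following sketch records which of them fired for a regular completion and for an early cancellation. It assumes RxJava 2.0.1 or later (for doFinally); the class name and the fired list are hypothetical.

```java
import io.reactivex.Flowable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper class, not part of RxJava.
public class CallbackComparisonSketch {

    static List<String> callbacks(boolean cancelEarly) {
        List<String> fired = new ArrayList<>();
        Flowable<Integer> source = Flowable.just(1, 2, 3)
            .doOnCancel(() -> fired.add("doOnCancel"))
            .doFinally(() -> fired.add("doFinally"));
        if (cancelEarly) {
            source = source.take(2);   // cancels the chain above after 2 elements
        }
        source.subscribe(v -> { });    // values are not needed for this comparison
        return fired;
    }

    public static void main(String[] args) {
        System.out.println("completed: " + callbacks(false));
        System.out.println("cancelled: " + callbacks(true));
    }
}
```

On regular completion only doFinally should appear in the list; with take(2) both callbacks should appear.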