今天看到一個(gè) Observable.fromEmitter 的函數(shù)析苫,這里是這個(gè)函數(shù)的 javadoc
Provides an API (via a cold Observable) that bridges the reactive world with the callback-style, generally non-backpressured world.
Example:
You should call the AsyncEmitter's onNext, onError and onCompleted methods in a serialized fashion. The rest of its methods are thread-safe.
Observable.<Event>fromEmitter(emitter -> {
Callback listener = new Callback() {
@Override
public void onEvent(Event e) {
emitter.onNext(e);
if (e.isLast()) {
emitter.onCompleted();
}
}
@Override
public void onFailure(Exception e) {
emitter.onError(e);
}
};
AutoCloseable c = api.someMethod(listener);
emitter.setCancellation(c::close);
}, BackpressureMode.BUFFER);
這是一個(gè)實(shí)驗(yàn)性的功能,用于和傳統(tǒng)回調(diào)模式的程序?qū)臃看摇榱死斫膺@個(gè)機(jī)制赶舆,我寫了一個(gè)完整的例子龄坪。
package org.wcy123.rxjava1;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;
import org.junit.Test;
import lombok.extern.slf4j.Slf4j;
import rx.AsyncEmitter;
import rx.Observable;
@Slf4j
public class FromEmitterTest {
@Test
public void main1() throws Exception {
final ExecutorService service = Executors.newCachedThreadPool();
final CountDownLatch latch = new CountDownLatch(3 * 4);
Observable.fromEmitter(
emitter -> IntStream.range(0, 3).boxed().forEach(
threadIndex -> service.submit(
() -> {
for (int i = 0; i < 4; ++i) {
emitter.onNext("thread + " + threadIndex
+ " i = " + i);
Utils.sleep(1000);
latch.countDown();
}
if (threadIndex == 2) {
emitter.onCompleted();
}
})),
AsyncEmitter.BackpressureMode.BUFFER)
.subscribe(s -> log.info("item {}", s));
log.info("提前打印這里, subscribe 沒有阻塞住");
log.info("開始等待解鎖");
latch.await();
log.info("解鎖完畢");
}
}
這個(gè)例子的執(zhí)行結(jié)果是
02:12:24.244 [main] INFO org.wcy123.rxjava1.FromEmitterTest - 提前打印這里, subscribe 沒有阻塞住
02:12:24.244 [pool-1-thread-1] INFO org.wcy123.rxjava1.FromEmitterTest - item thread + 0 i = 0
02:12:24.250 [main] INFO org.wcy123.rxjava1.FromEmitterTest - 開始等待解鎖
02:12:24.251 [pool-1-thread-1] INFO org.wcy123.rxjava1.FromEmitterTest - item thread + 1 i = 0
02:12:24.251 [pool-1-thread-1] INFO org.wcy123.rxjava1.FromEmitterTest - item thread + 2 i = 0
02:12:25.245 [pool-1-thread-2] INFO org.wcy123.rxjava1.FromEmitterTest - item thread + 2 i = 1
02:12:25.255 [pool-1-thread-1] INFO org.wcy123.rxjava1.FromEmitterTest - item thread + 0 i = 1
02:12:26.248 [pool-1-thread-3] INFO org.wcy123.rxjava1.FromEmitterTest - item thread + 2 i = 2
02:12:26.249 [pool-1-thread-3] INFO org.wcy123.rxjava1.FromEmitterTest - item thread + 1 i = 2
02:12:26.257 [pool-1-thread-1] INFO org.wcy123.rxjava1.FromEmitterTest - item thread + 0 i = 2
02:12:27.252 [pool-1-thread-2] INFO org.wcy123.rxjava1.FromEmitterTest - item thread + 2 i = 3
02:12:27.258 [pool-1-thread-1] INFO org.wcy123.rxjava1.FromEmitterTest - item thread + 0 i = 3
02:12:28.264 [main] INFO org.wcy123.rxjava1.FromEmitterTest - 解鎖完畢
注意到 log.info("item")
是運(yùn)行在三個(gè)不同的線程中叙身。fromEmitter 的第一個(gè)參數(shù)是一個(gè)函數(shù)封拧,即 f志鹃, 該函數(shù)的第一個(gè)參數(shù)是 emitter ,類型是 AsyncEmitter泽西。fromEmitter 返回一個(gè) Observable , 這個(gè)Obverable 被訂閱的時(shí)候弄跌,就會(huì)運(yùn)行函數(shù) f 。f 運(yùn)行時(shí)尝苇,創(chuàng)建了 3 個(gè)線程,每個(gè)線程里面埠胖,都會(huì)調(diào)用 emitter
來發(fā)布數(shù)據(jù)糠溜,emitter.onNext(...) ,一旦調(diào)用這個(gè)函數(shù)直撤,會(huì)觸發(fā)后面所有的 Observable 定義的行為非竿,觸發(fā) s->log.info("iterm{}", s)
的執(zhí)行。
注意到 thread 0 并沒有機(jī)會(huì)打印出來最后一個(gè) i = 3谋竖, 因?yàn)?thread 2 提前調(diào)用了 emitter.onComplet()
latch.await()
等待所有線程結(jié)束红柱。