計(jì)算機(jī)就跟比基尼一樣懒棉,省去了人們?cè)S多的胡思亂想。
buffer(Publisher<?> other)
buffer(Publisher<?> other, Supplier<C> bufferSupplier)
根據(jù)other
信號(hào)來(lái)決定每次緩存的個(gè)數(shù), 當(dāng)other.onNext
的時(shí)候把緩存提交到消費(fèi)者的onNext
public final Flux<List<T>> buffer(Publisher<?> other)
public final <C extends Collection<? super T>> Flux<C> buffer(Publisher<?> other, Supplier<C> bufferSupplier)
public void test5() throws InterruptedException {
Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5, 6).delayElements(Duration.ofSeconds(2));
Flux<List<Integer>> buffer = flux.buffer(Duration.ofSeconds(3));
buffer.subscribe(integers -> {
System.out.println("產(chǎn)品到達(dá),進(jìn)行卸貨:");
integers.stream().forEach(integer -> {
System.out.println(integer);
});
});
Thread.sleep(20000);
}
buffer(Duration bufferingTimespan)
buffer(Duration bufferingTimespan, Scheduler timer)
這兩個(gè)方法是上面兩個(gè)方法的應(yīng)用, 傳入的other
是FluxInterval
, 周期性觸發(fā)onNext
示意圖說(shuō)明的很清楚.