reactor 本質(zhì)上是觀察者模式的使用
添加數(shù)據(jù)
List<String> list = new ArrayList<>();
list.add("1");
list.add("2");
list.add("3");
list.add("4");
list.add("5");
// 添加數(shù)據(jù) 通過(guò)流的方式
Flux.fromStream(list.stream());
// 只包含當(dāng)前的數(shù)據(jù)流
Flux.just("1","2");
// 空的數(shù)據(jù)流
Flux.empty();
// 手動(dòng)添加數(shù)據(jù)流
Flux.create((i) -> {
i.next("1");
i.next("2");
i.next("3");
i.next("4");
});
// 添加數(shù)據(jù)通過(guò)集合的方式
Flux.fromIterable(list);
// 定時(shí)產(chǎn)生一個(gè)數(shù)據(jù)哮塞,當(dāng)前方式是每
Flux.interval(Duration.ofMillis(100))轩勘;
消費(fèi)數(shù)據(jù) subscribe方法
Flux flux = Flux.create((i) -> {
i.next("1");
i.next("2");
i.next("3");
i.next("4");
});
// 消費(fèi)數(shù)據(jù)
flux.subscribe((s) -> {
System.out.println(s);
});
flux.subscribe(System.out::println)
flux.subscribe(new Consumer<String>() {
@Override
public void accept(String s) {
// System.out.println(s);
}
});
輸出結(jié)果
16:37:50.894 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
1
2
3
4
Process finished with exit code 0
數(shù)據(jù)中間處理環(huán)節(jié) do 系列接口
List<String> list = new ArrayList<>();
list.add("1");
list.add("2");
list.add("3");
list.add("4");
list.add("5");
// 添加數(shù)據(jù)源
Flux.fromIterable(list)
// 每處里一個(gè)數(shù)據(jù)前 就會(huì)執(zhí)行該方法
.doOnNext((n) -> {
//System.out.println("next");
})
// 執(zhí)行完數(shù)據(jù)流執(zhí)行該方法
.doFinally((f) -> {
// System.out.println("finally");
})
// 每處里一個(gè)數(shù)據(jù)前就會(huì)執(zhí)行該方法
.doOnEach((e) -> {
//System.out.println("each");
})
//執(zhí)行結(jié)束之后輸出
.doOnComplete(()->{
System.out.println("complete");
})
.subscribe((System.out::println));
執(zhí)行結(jié)果
17:00:42.399 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
next
each
1
next
each
2
next
each
3
next
each
4
next
each
5
each
complete
finally
Process finished with exit code 0
buffer 的使用
//buffer 可以將一個(gè)長(zhǎng)數(shù)據(jù)流切割為想要長(zhǎng)度的數(shù)據(jù)流 用途比如需要保存大量的數(shù)據(jù)的時(shí)候可以用這個(gè)方法
//對(duì)數(shù)據(jù)切割羹应,分段保存
Flux.interval(Duration.ofMillis(10)).buffer(10).subscribe(System.out::println);
Thread.sleep(100000L);
輸出結(jié)果
18:12:44.159 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
[20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
[30, 31, 32, 33, 34, 35, 36, 37, 38, 39]
[40, 41, 42, 43, 44, 45, 46, 47, 48, 49]
[50, 51, 52, 53, 54, 55, 56, 57, 58, 59]
flatmap, map 的區(qū)別
先看下圖
image.png
返回的不一樣
flatMap必須返回的是個(gè)publisher 而map 返回的是一個(gè)對(duì)象 這個(gè)對(duì)象既可以是item 也可以是封裝為Flux 倚评,Mono
List<String> list = new ArrayList<>();
list.add("1");
list.add("2");
list.add("3");
list.add("4");
list.add("5");
Flux.fromIterable(list).flatMap(s -> {
return Mono.just(s);
});
Flux.fromIterable(list).flatMap(s -> {
return Flux.just(s);
});
Flux.fromIterable(list).map(s -> {
return Flux.just(s);
});
Flux.fromIterable(list).map(s -> {
return Mono.just(s);
});
Flux.fromIterable(list).map(s -> {
return s;
});