SpringBoot——webflux解析

webflux介紹

Spring Boot 2.0
spring.io 官網(wǎng)有句醒目的話是:

BUILD ANYTHING WITH SPRING BOOT
Spring Boot (Boot 顧名思義躯砰,是引導(dǎo)的意思)框架是用于簡(jiǎn)化 Spring 應(yīng)用從搭建到開(kāi)發(fā)的過(guò)程亡脑。應(yīng)用開(kāi)箱即用糜烹,只要通過(guò)一個(gè)指令,包括命令行 java -jar 与殃、SpringApplication 應(yīng)用啟動(dòng)類(lèi) 、 Spring Boot Maven 插件等,就可以啟動(dòng)應(yīng)用了栅螟。另外荆秦,Spring Boot 強(qiáng)調(diào)只需要很少的配置文件,所以在開(kāi)發(fā)生產(chǎn)級(jí) Spring 應(yīng)用中力图,讓開(kāi)發(fā)變得更加高效和簡(jiǎn)易步绸。目前,Spring Boot 版本是 2.x 版本搪哪。Spring Boot 包括 WebFlux靡努。


傳統(tǒng)的以SpringMVC為代表的webmvc技術(shù)使用的是同步阻塞式IO模型


而Spring WebFlux是一個(gè)異步非阻塞式IO模型,可以用少量的容器線程支撐大量的并發(fā)訪問(wèn)晓折,所以Spring WebFlux可以提升吞吐量和伸縮性惑朦,但是接口的響應(yīng)時(shí)間并不會(huì)縮短,其處理結(jié)果還是得由worker線程處理完成之后在返回給請(qǐng)求


webflux應(yīng)用場(chǎng)景

適合IO密集型漓概、磁盤(pán)IO密集漾月、網(wǎng)絡(luò)IO密集等服務(wù)場(chǎng)景,比如微服務(wù)網(wǎng)關(guān)胃珍,就可以使用webflux技術(shù)來(lái)顯著的提升網(wǎng)關(guān)對(duì)下游服務(wù)的吞吐量梁肿,spring cloud gateway就使用了webflux這門(mén)技術(shù)


Spring Boot 2.0 WebFlux

了解 WebFlux,首先了解下什么是 Reactive Streams觅彰。Reactive Streams 是 JVM 中面向流的庫(kù)標(biāo)準(zhǔn)和規(guī)范:

  • 處理可能無(wú)限數(shù)量的元素
  • 按順序處理
  • 組件之間異步傳遞
  • 強(qiáng)制性非阻塞背壓(Backpressure)

Backpressure(背壓)

背壓是一種常用策略吩蔑,使得發(fā)布者擁有無(wú)限制的緩沖區(qū)存儲(chǔ)元素,用于確保發(fā)布者發(fā)布元素太快時(shí)填抬,不會(huì)去壓制訂閱者烛芬。

Reactive Streams(響應(yīng)式流)

一般由以下組成:

  • publisher:發(fā)布者,發(fā)布元素到訂閱者
  • subscriber:訂閱者飒责,消費(fèi)元素
  • subscription:訂閱赘娄,在發(fā)布者中,訂閱被創(chuàng)建時(shí)宏蛉,將與訂閱者共享
  • processor:處理器遣臼,發(fā)布者與訂閱者之間處理數(shù)據(jù),包含了發(fā)布者與訂閱者的共同體
publisher接口規(guī)范
public interface Publisher<T> {
    void subscribe(Subscriber<? super T> var1);
}
subscriber接口規(guī)范
public interface Subscriber<T> {
    void onSubscribe(Subscription var1);

    void onNext(T var1);

    void onError(Throwable var1);

    void onComplete();
}
subscription接口規(guī)范
public interface Subscription {
    void request(long var1);

    void cancel();
}
processor接口規(guī)范
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}

響應(yīng)式編程

有了 Reactive Streams 這種標(biāo)準(zhǔn)和規(guī)范拾并,利用規(guī)范可以進(jìn)行響應(yīng)式編程揍堰。那再了解下什么是 Reactive programming 響應(yīng)式編程。響應(yīng)式編程是基于異步和事件驅(qū)動(dòng)的非阻塞程序嗅义,只是垂直通過(guò)在 JVM 內(nèi)啟動(dòng)少量線程擴(kuò)展屏歹,而不是水平通過(guò)集群擴(kuò)展。這就是一個(gè)編程范例芥喇,具體項(xiàng)目中如何體現(xiàn)呢西采?

響應(yīng)式項(xiàng)目編程實(shí)戰(zhàn)中,通過(guò)基于 Reactive Streams 規(guī)范實(shí)現(xiàn)的框架 Reactor 去實(shí)戰(zhàn)继控。Reactor 一般提供兩種響應(yīng)式 API :

  • Mono:實(shí)現(xiàn)發(fā)布者械馆,并返回 0 或 1 個(gè)元素
  • Flux:實(shí)現(xiàn)發(fā)布者胖眷,并返回 N 個(gè)元素

Spring Webflux

Spring Boot Webflux 就是基于 Reactor 實(shí)現(xiàn)的。Spring Boot 2.0 包括一個(gè)新的 spring-webflux 模塊霹崎。該模塊包含對(duì)響應(yīng)式 HTTP 和 WebSocket 客戶端的支持珊搀,以及對(duì) REST,HTML 和 WebSocket 交互等程序的支持尾菇。一般來(lái)說(shuō)境析,Spring MVC 用于同步處理,Spring Webflux 用于異步處理派诬。

Spring Boot Webflux 有兩種編程模型實(shí)現(xiàn)劳淆,一種類(lèi)似 Spring MVC 注解方式,另一種是使用其功能性端點(diǎn)方式默赂。

Spring Boot 2.0 WebFlux 特性

常用的 Spring Boot 2.0 WebFlux 生產(chǎn)的特性如下:

  • 響應(yīng)式 API
  • 編程模型
  • 適用性
  • 內(nèi)嵌容器
  • Starter 組件

還有對(duì)日志沛鸵、Web、消息缆八、測(cè)試及擴(kuò)展等支持曲掰。

響應(yīng)式 API

Reactor 框架是 Spring Boot Webflux 響應(yīng)庫(kù)依賴(lài),通過(guò) Reactive Streams 并與其他響應(yīng)庫(kù)交互奈辰。提供了 兩種響應(yīng)式 API:Mono 和 Flux栏妖。一般是將 Publisher 作為輸入,在框架內(nèi)部轉(zhuǎn)換成 Reactor 類(lèi)型并處理邏輯奖恰,然后返回 Flux 或 Mono 作為輸出吊趾。

spring webflux和spring mvc的異同點(diǎn)


一圖就很明確了,WebFlux 和 MVC 有交集房官,方便大家遷移趾徽。但是注意:

  • MVC 能滿足場(chǎng)景的续滋,就不需要更改為 WebFlux翰守。
  • 要注意容器的支持,可以看看下面內(nèi)嵌容器的支持疲酌。
  • 微服務(wù)體系結(jié)構(gòu)蜡峰,WebFlux 和 MVC 可以混合使用。尤其開(kāi)發(fā) IO 密集型服務(wù)的時(shí)候朗恳,選擇 WebFlux 去實(shí)現(xiàn)湿颅。
  • spring mvc是一個(gè)命令式的編程方式采用同步阻塞方式,方便開(kāi)發(fā)人員編寫(xiě)代碼和調(diào)試粥诫;spring webflux調(diào)試會(huì)非常不方便
  • JDBC連接池和JPA等技術(shù)還是阻塞模型油航,傳統(tǒng)的關(guān)系型數(shù)據(jù)庫(kù)如MySQL也不支持非阻塞的方式獲取數(shù)據(jù),目前只有非關(guān)系型數(shù)據(jù)庫(kù)如Redis怀浆、Mongodb支持非阻塞方式獲取數(shù)據(jù)

編程模型

Spring 5 web 模塊包含了 Spring WebFlux 的 HTTP 抽象谊囚。類(lèi)似 Servlet API , WebFlux 提供了 WebHandler API 去定義非阻塞 API 抽象接口怕享。可以選擇以下兩種編程模型實(shí)現(xiàn):

  • 注解控制層镰踏。和 MVC 保持一致函筋,WebFlux 也支持響應(yīng)性 @RequestBody 注解。
  • 功能性端點(diǎn)奠伪〉剩基于 lambda 輕量級(jí)編程模型,用來(lái)路由和處理請(qǐng)求的小工具绊率。和上面最大的區(qū)別就是谨敛,這種模型,全程控制了請(qǐng)求 - 響應(yīng)的生命流程

內(nèi)嵌容器

跟 Spring Boot 大框架一樣啟動(dòng)應(yīng)用滤否,但 WebFlux 默認(rèn)是通過(guò) Netty 啟動(dòng)佣盒,并且自動(dòng)設(shè)置了默認(rèn)端口為 8080。另外還提供了對(duì) Jetty顽聂、Undertow 等容器的支持肥惭。開(kāi)發(fā)者自行在添加對(duì)應(yīng)的容器 Starter 組件依賴(lài),即可配置并使用對(duì)應(yīng)內(nèi)嵌容器實(shí)例紊搪。

但是要注意蜜葱,必須是 Servlet 3.1+ 容器,如 Tomcat耀石、Jetty牵囤;或者非 Servlet 容器,如 Netty 和 Undertow滞伟。

Netty優(yōu)點(diǎn)

  • API使用簡(jiǎn)單揭鳞、易上手
  • 功能強(qiáng)大、支持多種主流協(xié)議
  • 定制能力強(qiáng)梆奈、可擴(kuò)展性高
  • 性能高野崇、綜合性能最優(yōu)
  • 成熟穩(wěn)定、久經(jīng)考驗(yàn)
  • 社區(qū)活躍亩钟、學(xué)習(xí)資料多

Netty selector模型

Reactor指南

  • Reactor 框架是 Pivotal 公司(開(kāi)發(fā) Spring 等技術(shù)的公司)開(kāi)發(fā)的
  • 實(shí)現(xiàn)了 Reactive Programming 思想乓梨,符合Reactive Streams 規(guī)范(Reactive Streams 是由 Netflix、TypeSafe清酥、Pivotal 等公司發(fā)起的)的一項(xiàng)技術(shù)
  • 側(cè)重于server端的響應(yīng)式編程框架
  • Reactor 框架主要有兩個(gè)主要的模塊:reactor-core 和 reactor-ipc扶镀。前者主要負(fù)責(zé) Reactive Programming 相關(guān)的核心 API 的實(shí)現(xiàn),后者負(fù)責(zé)高性能網(wǎng)絡(luò)通信的實(shí)現(xiàn)焰轻,目前是基于 Netty 實(shí)現(xiàn)的臭觉。

Java原有的異步編程方式

  • Callback:異步方法采用一個(gè)callback作為參數(shù),當(dāng)結(jié)果出來(lái)后回調(diào)這個(gè)callback,例如swings的EventListener
  • Future:異步方法返回一個(gè)Future<T>蝠筑,此時(shí)結(jié)果并不是立刻可以拿到忆肾,需要處理結(jié)束之后才可以使用

Future局限

  • 多個(gè)Future組合不易
  • 調(diào)用Future#get時(shí)仍然會(huì)阻塞
  • 缺乏對(duì)多個(gè)值以及進(jìn)一步的出錯(cuò)處理

Reactor的Publisher

  • Mono 實(shí)現(xiàn)了 org.reactivestreams.Publisher 接口,代表0到1個(gè)元素的響應(yīng)式序列菱肖。
  • Flux 同樣實(shí)現(xiàn)了 org.reactivestreams.Publisher 接口客冈,代表0到N個(gè)元素的結(jié)果。
Flux介紹
  • Flux<T>是一個(gè)標(biāo)準(zhǔn)Publisher<T>稳强,表示0到N個(gè)發(fā)射項(xiàng)的異步序列场仲,可選地以完成信號(hào)或錯(cuò)誤終止。與Reactive Streams規(guī)范中一樣退疫,這三種類(lèi)型的信號(hào)轉(zhuǎn)換為對(duì)下游訂閱者的onNext渠缕、onComplete或onError方法的調(diào)用。
  • 在這種大范圍的可能信號(hào)中褒繁,F(xiàn)lux是通用的reactive 類(lèi)型亦鳞。注意,所有事件棒坏,甚至終止事件燕差,都是可選的:沒(méi)有onNext事件,但是onComplete事件表示一個(gè)空的有限序列坝冕,但是移除onComplete并且您有一個(gè)無(wú)限的空序列(除了關(guān)于取消的測(cè)試之外徒探,沒(méi)有特別有用)。同樣喂窟,無(wú)限序列不一定是空的测暗。例如,F(xiàn)lux.interval(Duration) 產(chǎn)生一個(gè)Flux<Long>磨澡,它是無(wú)限的碗啄,從時(shí)鐘發(fā)出規(guī)則的數(shù)據(jù)。
Mono介紹
  • Mono<T>是一個(gè)專(zhuān)門(mén)的Publisher<T>稳摄,它最多發(fā)出一個(gè)項(xiàng)稚字,然后可選地以onComplete信號(hào)或onError信號(hào)結(jié)束。
  • 它只提供了可用于Flux的操作符的子集秩命,并且一些操作符(特別是那些將Mono與另一個(gè)發(fā)布者組合的操作符)切換到Flux尉共。
  • 例如褒傅,Mono#concatWith(Publisher)返回一個(gè)Flux 弃锐,而Mono#then(Mono)則返回另一個(gè)Mono。
  • 注意殿托,Mono可以用于表示只有完成概念(類(lèi)似于Runnable)的無(wú)值異步進(jìn)程霹菊。若要?jiǎng)?chuàng)建一個(gè),請(qǐng)使用Mono<Void>。
publisher訂閱

reactor實(shí)踐

  • 首先maven工廠引入pom
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTest {

    @Test
    public void testReactor(){
        Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5, 6);
        Mono<Integer> mono = Mono.just(1);

        Integer[] arr = {1,2,3,4,5,6};
        Flux<Integer> flux1 = Flux.fromArray(arr);

        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6);
        Flux<Integer> flux2 = Flux.fromIterable(list);

        Flux<Integer> flux3 = Flux.from(flux);

        Flux<Integer> flux4 = Flux.fromStream(Stream.of(1, 2, 3, 4, 5, 6));

        flux.subscribe();

        flux1.subscribe(System.out::println);

        flux2.subscribe(System.out::println,System.err::println);

        flux3.subscribe(System.out::println,System.err::println,() -> System.out.println("complete"));

        flux4.subscribe(System.out::println,System.err::println,
                () -> System.out.println("complete"),
                subscription -> subscription.request(3));

        flux4.subscribe(new DemoSubscriber());
    }

    class DemoSubscriber extends BaseSubscriber<Integer>{
        @Override
        protected void hookOnSubscribe(Subscription subscription) {
            System.out.println("Subscribe");
            subscription.request(1);
        }

        @Override
        protected void hookOnNext(Integer value) {
            if(value == 4){
                //背壓旋廷,通知數(shù)據(jù)源鸠按,不要發(fā)送數(shù)據(jù)了
                cancel();
            }
            System.out.println(value);
            request(1);
        }
    }
}

Reactor操作符

map - 元素映射為新元素
  • map操作可以將數(shù)據(jù)元素進(jìn)行轉(zhuǎn)換/映射,得到一個(gè)新元素饶碘。


flatMap - 元素映射為流
  • flatMap操作可以將每個(gè)數(shù)據(jù)元素轉(zhuǎn)換/映射為一個(gè)流目尖,然后將這些流合并為一個(gè)大的數(shù)據(jù)流。


filter - 過(guò)濾
  • filter操作可以對(duì)數(shù)據(jù)元素進(jìn)行篩選扎运。


zip - 一對(duì)一合并

看到zip這個(gè)詞可能會(huì)聯(lián)想到拉鏈瑟曲,它能夠?qū)⒍鄠€(gè)流一對(duì)一的合并起來(lái)。zip有多個(gè)方法變體豪治,我們介紹一個(gè)最常見(jiàn)的二合一的洞拨。


更多

Reactor中提供了非常豐富的操作符,除了以上幾個(gè)常見(jiàn)的负拟,還有:

  • 用于編程方式自定義生成數(shù)據(jù)流的creategenerate等及其變體方法烦衣;
  • 用于“無(wú)副作用的peek”場(chǎng)景的doOnNextdoOnError掩浙、doOncomplete花吟、doOnSubscribedoOnCancel等及其變體方法厨姚;
  • 用于數(shù)據(jù)流轉(zhuǎn)換的when示辈、and/ormerge遣蚀、concat矾麻、collectcount芭梯、repeat等及其變體方法险耀;
  • 用于過(guò)濾/揀選的takefirst玖喘、last甩牺、sampleskip累奈、limitRequest等及其變體方法贬派;
  • 用于錯(cuò)誤處理的timeoutonErrorReturn澎媒、onErrorResume搞乏、doFinallyretryWhen等及其變體方法戒努;
  • 用于分批的window请敦、buffergroup等及其變體方法;
  • 用于線程調(diào)度的publishOnsubscribeOn方法侍筛。

使用這些操作符萤皂,你幾乎可以搭建出能夠進(jìn)行任何業(yè)務(wù)需求的數(shù)據(jù)處理管道/流水線。

抱歉以上這些暫時(shí)不能一一介紹匣椰,更多詳情請(qǐng)參考JavaDoc

reactor和java8 stream區(qū)別

  • 形似而神不似
  • reactor:push模式裆熙,服務(wù)端推送數(shù)據(jù)給客戶端
  • java8 stream:pull模式,客戶端主動(dòng)向服務(wù)端請(qǐng)求數(shù)據(jù)

Reactor線程模型

Reactor創(chuàng)建線程的方式
  • Schedulers.immediate():當(dāng)前線程
  • Schedulers.single():可重用的單線程禽笑,注意弛车,這個(gè)方法對(duì)所有調(diào)用者都提供同一個(gè)線程來(lái)使用, 直到該調(diào)度器被廢棄蒲每。如果你想使用獨(dú)占的線程纷跛,請(qǐng)使用Schedulers.newSingle();
  • Schedulers.elastic():彈性線程池邀杏,它根據(jù)需要?jiǎng)?chuàng)建一個(gè)線程池贫奠,重用空閑線程。線程池如果空閑時(shí)間過(guò)長(zhǎng) (默認(rèn)為 60s)就會(huì)被廢棄望蜡。對(duì)于 I/O 阻塞的場(chǎng)景比較適用唤崭。Schedulers.elastic()能夠方便地給一個(gè)阻塞 的任務(wù)分配它自己的線程,從而不會(huì)妨礙其他任務(wù)和資源脖律;
  • Schedulers.parallel():固定大小線程池谢肾,所創(chuàng)建線程池的大小與CPU個(gè)數(shù)等同
  • Schedulers.fromExecutorService(ExecutorService):自定義線程池,基于自定義的ExecutorService創(chuàng)建 Scheduler(雖然不太建議小泉,不過(guò)你也可以使用Executor來(lái)創(chuàng)建)
線程模型
線程切換實(shí)踐
@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTest {

    @Test
    public void testReactor() throws InterruptedException {
        Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5, 6);

        flux.map(i -> {
            System.out.println(Thread.currentThread().getName()+"-map1");
            return i * 3;
        }).publishOn(Schedulers.elastic()).map(
                i -> {
                    System.out.println(Thread.currentThread().getName()+"-map2");
                    return i / 3;
                }
        ).subscribeOn(Schedulers.parallel())
                .subscribe(i -> System.out.println(Thread.currentThread().getName()+"-" + i));

        Thread.sleep(10000);
    }
}
線程切換總結(jié)
  • publishOn:它將上游信號(hào)傳給下游芦疏,同時(shí)改變后續(xù)的操作符的執(zhí)行所在線程,直到下一個(gè)publishOn出現(xiàn)在這個(gè)鏈上
  • subscribeOn:作用于向上的訂閱鏈微姊,無(wú)論處于操作鏈的什么位置酸茴,它都會(huì)影響到源頭的線程執(zhí)行環(huán)境,但不會(huì)影響到后續(xù)的publishOn

webflux實(shí)踐

兼容spring mvc的寫(xiě)法

@RestController
public class DemoController {

    @GetMapping("/demo")
    public Mono<String> demo(){
        return Mono.just("demo");
    }
}

spring webflux函數(shù)式寫(xiě)法

@Component
public class DemoHandler {

    public Mono<ServerResponse> hello(ServerRequest request){
        return ok().contentType(MediaType.TEXT_PLAIN)
                .body(Mono.just("hello"),String.class);
    }

    public Mono<ServerResponse> world(ServerRequest request){
        return ok().contentType(MediaType.TEXT_PLAIN)
                .body(Mono.just("world"),String.class);
    }

    public Mono<ServerResponse> times(ServerRequest request){
        //每隔一秒發(fā)送當(dāng)前的時(shí)間
        return ok().contentType(MediaType.TEXT_EVENT_STREAM)
                .body(Flux.interval(Duration.ofSeconds(1))
                        .map(it -> new SimpleDateFormat("HH:mm:ss").format(new Date())),String.class);
    }
}

配置路由

@Configuration
public class RouterConfig {

    @Autowired
    private DemoHandler demoHandler;

    @Bean
    public RouterFunction<ServerResponse> demoRouter(){
        //路由函數(shù)的編寫(xiě)
        return route(GET("/hello"),demoHandler::hello)
                .andRoute(GET("/world"),demoHandler::world)
                .andRoute(GET("/times"),demoHandler::times);
    }
}

連接關(guān)系型數(shù)據(jù)庫(kù)案例

@Component
public class DemoHandler {

    @Autowired
    private PersonService personService;

    public Mono<ServerResponse> queryPerson(ServerRequest request){
        Integer id = Integer.valueOf(request.pathVariable("id"));
        return ok().contentType(MediaType.APPLICATION_JSON_UTF8)
                .body(Mono.just(personService.getPersonById(id)), Person.class);
    }
}

配置路由

@Configuration
public class RouterConfig {

    @Autowired
    private DemoHandler demoHandler;

    @Bean
    public RouterFunction<ServerResponse> demoRouter(){
        //路由函數(shù)的編寫(xiě)
        return route(GET("/hello"),demoHandler::hello)
                .andRoute(GET("/world"),demoHandler::world)
                .andRoute(GET("/times"),demoHandler::times)
                .andRoute(GET("/queryPerson/{id}"),demoHandler::queryPerson);
    }
}

連接非關(guān)系型數(shù)據(jù)庫(kù)案例

  • 引入mongodb的maven
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
</dependency>
  • 在application.properties中配置mongodb屬性
#mongodb
spring.data.mongodb.uri=mongodb://root:yibo@localhost:27017
spring.data.mongodb.database=webflux
  • 編寫(xiě)代碼
@Document(collection = "user")
@Data
public class User {

    @Id
    private String id;

    private String name;

    private int age;
}

@Repository
public interface UserRepository extends ReactiveMongoRepository<User,String> {
}

@Component
public class DemoHandler {

    @Autowired
    private UserRepository userRepository;

    public Mono<ServerResponse> listUser(ServerRequest request){
        return ok().contentType(MediaType.APPLICATION_JSON_UTF8)
                .body(userRepository.findAll(), User.class);
    }

    public Mono<ServerResponse> saveUser(ServerRequest request){
        String name = request.pathVariable("name");
        Integer age = Integer.valueOf(request.pathVariable("age"));
        User user = new User();
        user.setName(name);
        user.setAge(age);
        Mono<User> mono = Mono.just(user);
        return ok().build(userRepository.insert(mono).then());
    }
}
  • 編寫(xiě)路由
@Configuration
public class RouterConfig {

    @Autowired
    private DemoHandler demoHandler;

    @Bean
    public RouterFunction<ServerResponse> demoRouter(){
        //路由函數(shù)的編寫(xiě)
        return route(GET("/hello"),demoHandler::hello)
                .andRoute(GET("/world"),demoHandler::world)
                .andRoute(GET("/times"),demoHandler::times)
                .andRoute(GET("/queryPerson/{id}"),demoHandler::queryPerson)
                .andRoute(GET("/listUser"),demoHandler::listUser)
                .andRoute(GET("/saveUser/{name}/{age}"),demoHandler::saveUser);
    }
}

webflux解析

spring mvc處理流程



具體步驟:

  • 第一步:發(fā)起請(qǐng)求到前端控制器(DispatcherServlet)
  • 第二步:前端控制器請(qǐng)求HandlerMapping查找 Handler (可以根據(jù)xml配置兢交、注解進(jìn)行查找)
    匹配條件包括:請(qǐng)求路徑薪捍、請(qǐng)求方法、header信息等
  • 第三步:處理器映射器HandlerMapping向前端控制器返回Handler配喳,HandlerMapping會(huì)把請(qǐng)求映射為HandlerExecutionChain對(duì)象(包含一個(gè)Handler處理器(頁(yè)面控制器)對(duì)象酪穿,多個(gè)HandlerInterceptor攔截器對(duì)象),通過(guò)這種策略模式晴裹,很容易添加新的映射策略
    HandlerInterceptor是請(qǐng)求路徑上的攔截器被济,需要自己實(shí)現(xiàn)這個(gè)接口以攔截請(qǐng)求,做一些對(duì)handler的前置和后置處理工作息拜。
  • 第四步:前端控制器調(diào)用處理器適配器去執(zhí)行Handler
  • 第五步:處理器適配器HandlerAdapter將會(huì)根據(jù)適配的結(jié)果去執(zhí)行Handler
  • 第六步:Handler執(zhí)行完成給適配器返回ModelAndView
  • 第七步:處理器適配器向前端控制器返回ModelAndView (ModelAndView是springmvc框架的一個(gè)底層對(duì)象溉潭,包括 Model和view)
  • 第八步:前端控制器請(qǐng)求視圖解析器去進(jìn)行視圖解析 (根據(jù)邏輯視圖名解析成真正的視圖(jsp))净响,通過(guò)這種策略很容易更換其他視圖技術(shù)少欺,只需要更改視圖解析器即可
  • 第九步:視圖解析器向前端控制器返回View
  • 第十步:前端控制器進(jìn)行視圖渲染 (視圖渲染將模型數(shù)據(jù)(在ModelAndView對(duì)象中)填充到request域)
  • 第十一步:前端控制器向用戶響應(yīng)結(jié)果

spring webflux處理請(qǐng)求流程

核心控制器DispatcherHandler喳瓣,等同于阻塞方式的DispatcherServlet
DispatcherHandler實(shí)現(xiàn)ApplicationContextAware,那么必然會(huì)調(diào)用setApplicationContext方法

public class DispatcherHandler implements WebHandler, ApplicationContextAware {
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) {
        initStrategies(applicationContext);
    }
}
initStrategies初始化

獲取HandlerMapping赞别,HandlerAdapter畏陕,HandlerResultHandler的所有實(shí)例

protected void initStrategies(ApplicationContext context) {
    //獲取HandlerMapping及其子類(lèi)型的bean
    //HandlerMapping根據(jù)請(qǐng)求request獲取handler執(zhí)行鏈
    Map<String, HandlerMapping> mappingBeans = BeanFactoryUtils.beansOfTypeIncludingAncestors(
            context, HandlerMapping.class, true, false);

    ArrayList<HandlerMapping> mappings = new ArrayList<>(mappingBeans.values());
    //排序
    AnnotationAwareOrderComparator.sort(mappings);
    this.handlerMappings = Collections.unmodifiableList(mappings);

    //獲取HandlerAdapter及其子類(lèi)型的bean
    Map<String, HandlerAdapter> adapterBeans = BeanFactoryUtils.beansOfTypeIncludingAncestors(
            context, HandlerAdapter.class, true, false);

    this.handlerAdapters = new ArrayList<>(adapterBeans.values());
    //排序
    AnnotationAwareOrderComparator.sort(this.handlerAdapters);

    //獲取HandlerResultHandler及其子類(lèi)型的bean
    Map<String, HandlerResultHandler> beans = BeanFactoryUtils.beansOfTypeIncludingAncestors(
            context, HandlerResultHandler.class, true, false);

    this.resultHandlers = new ArrayList<>(beans.values());
    AnnotationAwareOrderComparator.sort(this.resultHandlers);
}
webflux中引入了一個(gè)新的HandlerMapping,即RouterFunctionMapping

RouterFunctionMapping實(shí)現(xiàn)了InitializingBean仿滔,因此在其實(shí)例化的時(shí)候惠毁,會(huì)調(diào)用afterPropertiesSet方法

public class RouterFunctionMapping extends AbstractHandlerMapping implements InitializingBean {

    @Nullable
    private RouterFunction<?> routerFunction;

    //讀取http傳輸數(shù)據(jù),并解碼成一個(gè)對(duì)象
    private List<HttpMessageReader<?>> messageReaders = Collections.emptyList();

    public RouterFunctionMapping(RouterFunction<?> routerFunction) {
        this.routerFunction = routerFunction;
    }

    @Nullable
    public RouterFunction<?> getRouterFunction() {
        return this.routerFunction;
    }

    public void setMessageReaders(List<HttpMessageReader<?>> messageReaders) {
        this.messageReaders = messageReaders;
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        if (CollectionUtils.isEmpty(this.messageReaders)) {
            ServerCodecConfigurer codecConfigurer = ServerCodecConfigurer.create();
            this.messageReaders = codecConfigurer.getReaders();
        }

        if (this.routerFunction == null) {
            //afterPropertiesSet方法調(diào)用的時(shí)候崎页,routerFunction為null
            initRouterFunctions();
        }
    }

    protected void initRouterFunctions() {
        //獲取routerFunctions集合
        List<RouterFunction<?>> routerFunctions = routerFunctions();
         //將一個(gè)請(qǐng)求中含有多個(gè)路由請(qǐng)求RouterFunction合并成一個(gè)RouterFunction
        this.routerFunction = routerFunctions.stream().reduce(RouterFunction::andOther).orElse(null);
        logRouterFunctions(routerFunctions);
    }

    private List<RouterFunction<?>> routerFunctions() {
        //obtainApplicationContext()獲取ApplicationContext對(duì)象
        List<RouterFunction<?>> functions = obtainApplicationContext()
                //獲取指定bean的提供者鞠绰,即上文配置的路由類(lèi)
                .getBeanProvider(RouterFunction.class)
                //排序
                .orderedStream()
                //將流里面的都強(qiáng)轉(zhuǎn)成RouterFunction對(duì)象
                .map(router -> (RouterFunction<?>)router)
                .collect(Collectors.toList());
        return (!CollectionUtils.isEmpty(functions) ? functions : Collections.emptyList());
    }

    private void logRouterFunctions(List<RouterFunction<?>> routerFunctions) {
        //判斷當(dāng)前的日志級(jí)別是否是Debug
        if (logger.isDebugEnabled()) {
            int total = routerFunctions.size();
            String message = total + " RouterFunction(s) in " + formatMappingName();
            if (logger.isTraceEnabled()) {
                if (total > 0) {
                    routerFunctions.forEach(routerFunction -> logger.trace("Mapped " + routerFunction));
                }
                else {
                    logger.trace(message);
                }
            }
            else if (total > 0) {
                logger.debug(message);
            }
        }
    }
    ......
}
webflux中引入了一個(gè)新的HandlerAdapter,即HandlerFunctionAdapter
webflux中引入了一個(gè)新的HandlerResultHandler飒焦,即ServerResponseResultHandler

ServerResponseResultHandler實(shí)現(xiàn)了InitializingBean蜈膨,因此在其實(shí)例化的時(shí)候,會(huì)調(diào)用afterPropertiesSet方法

流式處理請(qǐng)求handler()
@Override
public Mono<Void> handle(ServerWebExchange exchange) {
    //handlerMappings在initStrategies()方法中已經(jīng)構(gòu)造好了
    if (this.handlerMappings == null) {
        return createNotFoundError();
    }
    //構(gòu)造Flux牺荠,數(shù)據(jù)源為handlerMappings集合
    return Flux.fromIterable(this.handlerMappings)
            //獲取Mono<Handler>對(duì)象翁巍,通過(guò)concatMap保證順序和handlerMappings順序一致
            //嚴(yán)格保證順序是因?yàn)樵谝粋€(gè)系統(tǒng)中可能存在一個(gè)Url有多個(gè)能夠處理的HandlerMapping的情況
            .concatMap(mapping -> mapping.getHandler(exchange))
            .next()
            //如果next()娶不到值則拋出錯(cuò)誤
            .switchIfEmpty(createNotFoundError())
            //觸發(fā)HandlerApter的handle方法
            .flatMap(handler -> invokeHandler(exchange, handler))
            //觸發(fā)HandlerResultHandler 的handleResult方法
            .flatMap(result -> handleResult(exchange, result));
}
觸發(fā)HandlerApter的handle方法
private Mono<HandlerResult> invokeHandler(ServerWebExchange exchange, Object handler) {
    if (this.handlerAdapters != null) {
        for (HandlerAdapter handlerAdapter : this.handlerAdapters) {
            if (handlerAdapter.supports(handler)) {
                return handlerAdapter.handle(exchange, handler);
            }
        }
    }
    return Mono.error(new IllegalStateException("No HandlerAdapter: " + handler));
}
觸發(fā)HandlerResultHandler 的handleResult方法
private Mono<Void> handleResult(ServerWebExchange exchange, HandlerResult result) {
    return getResultHandler(result).handleResult(exchange, result)
            .onErrorResume(ex -> result.applyExceptionHandler(ex).flatMap(exceptionResult ->
                    getResultHandler(exceptionResult).handleResult(exchange, exceptionResult)));
}

private HandlerResultHandler getResultHandler(HandlerResult handlerResult) {
    if (this.resultHandlers != null) {
        for (HandlerResultHandler resultHandler : this.resultHandlers) {
            if (resultHandler.supports(handlerResult)) {
                return resultHandler;
            }
        }
    }
    throw new IllegalStateException("No HandlerResultHandler for " + handlerResult.getReturnValue());
}
總結(jié)

DispatcherHandler的流程是

  • 1、通過(guò) HandlerMapping(和DispathcherServlet中的HandlerMapping不同)獲取到HandlerAdapter放到ServerWebExchange的屬性中
  • 2休雌、獲取到HandlerAdapter后觸發(fā)handle方法灶壶,得到HandlerResult
  • 3、通過(guò)HandlerResult杈曲,觸發(fā)handleResult驰凛,針對(duì)不同的返回類(lèi)找到不同的HandlerResultHandler如視圖渲染ViewResolutionResultHandler、ServerResponseResultHandler担扑、ResponseBodyResultHandler洒嗤、ResponseEntityResultHandler不同容器有不同的實(shí)現(xiàn),如Reactor魁亦,Jetty贯吓,Tomcat等。

參考:
https://www.cnblogs.com/limuma/p/9315442.html

https://blog.csdn.net/daniel7443/article/details/80761340

http://www.reibang.com/p/5172c48cb877

https://www.meiwen.com.cn/subject/zvvtqqtx.html

https://blog.51cto.com/liukang/2090191

https://www.cnblogs.com/yjmyzz/p/reactor-tutorial-1.html

https://www.cnblogs.com/davidwang456/p/10396168.html

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末敬鬓,一起剝皮案震驚了整個(gè)濱河市栋盹,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌利术,老刑警劉巖呈野,帶你破解...
    沈念sama閱讀 221,695評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異印叁,居然都是意外死亡被冒,警方通過(guò)查閱死者的電腦和手機(jī)军掂,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,569評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)昨悼,“玉大人蝗锥,你說(shuō)我怎么就攤上這事÷蚀ィ” “怎么了终议?”我有些...
    開(kāi)封第一講書(shū)人閱讀 168,130評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)葱蝗。 經(jīng)常有香客問(wèn)我穴张,道長(zhǎng),這世上最難降的妖魔是什么两曼? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 59,648評(píng)論 1 297
  • 正文 為了忘掉前任皂甘,我火速辦了婚禮,結(jié)果婚禮上悼凑,老公的妹妹穿的比我還像新娘偿枕。我一直安慰自己,他們只是感情好佛析,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,655評(píng)論 6 397
  • 文/花漫 我一把揭開(kāi)白布益老。 她就那樣靜靜地躺著,像睡著了一般寸莫。 火紅的嫁衣襯著肌膚如雪捺萌。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 52,268評(píng)論 1 309
  • 那天膘茎,我揣著相機(jī)與錄音桃纯,去河邊找鬼。 笑死披坏,一個(gè)胖子當(dāng)著我的面吹牛态坦,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播棒拂,決...
    沈念sama閱讀 40,835評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼伞梯,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了帚屉?” 一聲冷哼從身側(cè)響起谜诫,我...
    開(kāi)封第一講書(shū)人閱讀 39,740評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎攻旦,沒(méi)想到半個(gè)月后喻旷,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,286評(píng)論 1 318
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡牢屋,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,375評(píng)論 3 340
  • 正文 我和宋清朗相戀三年且预,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了槽袄。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,505評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡锋谐,死狀恐怖遍尺,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情怀估,我是刑警寧澤狮鸭,帶...
    沈念sama閱讀 36,185評(píng)論 5 350
  • 正文 年R本政府宣布合搅,位于F島的核電站多搀,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏灾部。R本人自食惡果不足惜康铭,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,873評(píng)論 3 333
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望赌髓。 院中可真熱鬧从藤,春花似錦、人聲如沸锁蠕。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 32,357評(píng)論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)荣倾。三九已至悯搔,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間舌仍,已是汗流浹背妒貌。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,466評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留铸豁,地道東北人灌曙。 一個(gè)月前我還...
    沈念sama閱讀 48,921評(píng)論 3 376
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像节芥,于是被迫代替她去往敵國(guó)和親在刺。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,515評(píng)論 2 359

推薦閱讀更多精彩內(nèi)容