spring webflux(一)

所有示例代碼:https://github.com/cumtbzy2011/webfluxdemo

功能與api

背景

Netty作為java領(lǐng)域首屈一指的nio框架鞭盟,其以優(yōu)越的性能被眾多中間件所使用辩蛋。但到了java的web開發(fā)領(lǐng)域默终,卻很難享受到Netty的性能優(yōu)勢立膛。其原因在于傳統(tǒng)web開發(fā)基于servlet容器廷没,許多依賴和開發(fā)框架都是基于servlet實現(xiàn)的蚜枢,比如spring妇蛀。而netty為了保持代碼的簡單和高效,并沒有實現(xiàn)servlet標(biāo)準(zhǔn)凌摄,這就導(dǎo)致將web容器遷移到netty后許多框架和第三方庫不能使用羡蛾,遷移的成本過大。但spring webflux出現(xiàn)改變了這一現(xiàn)狀锨亏。她在兼容原有mvc開發(fā)方式的同時痴怨,重寫和實現(xiàn)了大量第三方庫,在提升性能的同時器予,降低了遷移的成本穴张。同時spring webflux適配多種web容器卿吐,即使仍然使用tomcat也是可以的。

接口聲明

接口聲明除了保留原有注解式聲明的方式,為了滿足reactor的編程風(fēng)格嗤锉,額外支持了函數(shù)式聲明的方式奖地。通工具類RouterFunctions過構(gòu)造RounterFunction對象矛紫,并向Spring注入實現(xiàn)函數(shù)式接口聲明南窗。

    @Bean
    public TestHandler testHandler() {
        return new TestHandler();
    }

    @Bean
    public RouterFunction<ServerResponse> routes(TestHandler testHandler) {
        return RouterFunctions.route(RequestPredicates.POST("/route"),
            testHandler::echoName);
    }

    @GetMapping("anno")
    public String sayHello(String name) {
        return "hello world! " + name;
    }

    class TestHandler {
        public Mono<ServerResponse> echoName(ServerRequest request) {
            return request.bodyToMono(Post.class)
              .map(Post::getName)
              .flatMap(name -> ServerResponse.ok()
                .contentType(MediaType.TEXT_PLAIN)
                .body(BodyInserters.fromObject("hello world!" + name)));
        }
    }

在WebFlux中,request和respose不再是原來的ServletRequest和ServletRequest雷则,取而代之的是ServerRequest和ServerResponse辆雾。這兩個對象是webflux新出現(xiàn)的。首先webflux底層如果使用了reactor-netty月劈,那么自然就沒有所謂的servlet一說度迂,另外ServerRequest和ServerResponse提供了對non-blocking和backpressure特性的支持藤乙,提供了將Http消息內(nèi)容轉(zhuǎn)換成Mono和Flux的方法,使響應(yīng)式編程成為了可能英岭。

過濾器Filter

過濾器的使用方法和spring mvc類似湾盒,不過與ServerRequest和ServerResponse相同的是,webflux提供了一個新的過濾器接口WebFilter以提供對Mono和Flux的支持诅妹。代碼如下:

@Component
public class DemoWebFilter implements WebFilter{

    @Override
    public Mono<Void> filter(ServerWebExchange serverWebExchange, WebFilterChain webFilterChain) {
        if (!serverWebExchange.getRequest().getHeaders().containsKey("token")) {
            serverWebExchange.getResponse().setStatusCode(HttpStatus.UNAUTHORIZED);
            return Mono.empty();
        }
        return webFilterChain.filter(serverWebExchange);
    }
}

值得注意的是Mono<Void>這個返回值罚勾,在框架的很多地方都會用到。他意味著一個空的Mono吭狡,對于任何讀取尖殃,他會立刻發(fā)出一個complete信號。相比與直接返回void划煮,Mono<Void>作為方法的返回值時送丰,可以對該方法進行鏈?zhǔn)秸{(diào)用。另外雖然Mono<Void>雖然沒有返回值弛秋,但是其本身的complete或者error狀態(tài)器躏,也可以注冊回調(diào)進行異步處理。

異常處理

在Spring Webflux中蟹略,異常分兩種登失。一是controller中方法拋出的異常,這在webflux中同樣可以像在mvc中用@ExceptionHandler聲明異常處理方法挖炬。二是在WebHandler API這種比較偏底層的api揽浙,典型的是WebFilter,異常處理使用了支持Mono的新接口:WebExceptionHandler意敛,可用于處理來自WebFilter鏈和WebHandler的異常馅巷。使用WebExceptionHandler時,只要將其聲明為Spring bean即可自動注入并使用草姻,并可選擇通過bean聲明上的@Order或通過實現(xiàn)Ordered來表示優(yōu)先級钓猬。需要注意的是webflux有默認的WebExceptionHandler-DefaultErrorWebExceptionHandler,其order為默認的-1撩独。如果我們想自定義WebExceptionHandler敞曹,那么必須將order聲明為-2以上,否則異常將不會傳遞到我們自定義的WebExceptionHandler中跌榔。

@Component
//要比DefaultErrorWebExceptionHandler優(yōu)先級-1高
//比較底層,如果異常被@ExceptionHandler處理了捶障,那么將不會由此處理
//可以處理filter和webHandler中的異常
@Order(-2)
public class ErrorLogHandler implements WebExceptionHandler {
    @Override
    public Mono<Void> handle(ServerWebExchange exchange, Throwable ex) {
        exchange.getResponse().setStatusCode(HttpStatus.OK);
        byte[] bytes = ("ErrorLogHandler: " + ex.getMessage()).getBytes(StandardCharsets.UTF_8);
        DataBuffer wrap = exchange.getResponse().bufferFactory().wrap(bytes);
        return exchange.getResponse().writeWith(Flux.just(wrap));
    }
}
@ExceptionHandler(Exception.class)
    public String test(Exception e) {
        return "@ExceptionHandler: " + e.getMessage();
}

Multipart和Stream

在基礎(chǔ)框架reactor中Mono代表一個單次發(fā)送的數(shù)據(jù)源僧须,而Flux代表一個可多次發(fā)送的數(shù)據(jù)源。在spring webflux的controller中项炼,Mono很好理解担平,代表前端的一次傳參或接口的一次返回示绊。那么Flux該如何使用呢?簡單來說Flux在這兩個場景下使用:接受Multipart參數(shù)暂论、返回Stream類型數(shù)據(jù)或者用于分批返回面褐。代碼如下:

@PostMapping(value = "", consumes = MediaType.MULTIPART_FORM_DATA_VALUE)
    Mono<String> requestBodyFlux(@RequestBody Flux<Part> parts) {
        return parts.map(part -> part instanceof FilePart
              ? part.name() + ":" + ((FilePart) part).filename()
              : part.name())
          .collect(Collectors.joining(",", "[", "]"));
    }

    //如果不是application/stream json則呼叫端無法滾動得到結(jié)果,將一直阻塞等待資料流結(jié)束或超時取胎。
    @GetMapping(value = "stream", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
    public Flux<Post> getBeanStream() {
        return Flux.interval(Duration.ofMillis(500))
          .map(l -> new Post("bian", LocalDateTime.now()))
          .log();
    }

Multipart是Htp請求的一種常見的數(shù)據(jù)結(jié)構(gòu)展哭,常用于表單提交。在spring mvc中闻蛀,表單中的每個鍵值對會映射成一個個part匪傍。到了webflux,自然而然地轉(zhuǎn)換成代表多個表單字段Flux觉痛。而返回值Flux役衡,則對應(yīng)了一種新的MediaType:APPLICATION_STREAM_JSON_VALUE。他的使用需要瀏覽器或者客戶端的支持薪棒。從使用中來看手蝎,瀏覽器會對每一次返回的數(shù)據(jù)分批處理。如果簡單的get調(diào)用俐芯,會在頁面滾動打印返回值棵介,直到Flux發(fā)射完成:


image.png

而如果接口并沒有聲明produces = MediaType.APPLICATION_STREAM_JSON_VALUE的媒體類型,瀏覽器將會在Flux所有數(shù)據(jù)發(fā)射完畢后一次性打印泼各。

WebSocket

在webflux中使用WebSocket功能很簡單鞍时,只要注冊WebSocketHandlerAdapter用于websocket協(xié)議的握手,再定義對應(yīng)路徑的websocket消息處理器即可:

@Configuration
@ComponentScan
@EnableWebFlux
class WebConfig {

    @Bean
    public HandlerMapping handlerMapping() {
        Map<String, WebSocketHandler> map = new HashMap<>();
        map.put("/echo", new EchoWebSocketHandler());
        SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
        mapping.setUrlMap(map);
        return mapping;
    }
    
    @Bean
    WebSocketHandlerAdapter webSocketHandlerAdapter(){
        return new WebSocketHandlerAdapter();
    }
}
public class EchoWebSocketHandler implements WebSocketHandler {

    public EchoWebSocketHandler() {
    }

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        return session.send(    //1. 向一個websocket連接發(fā)送一段消息
          session.receive()     //2. 獲得入站消息流
            .doOnNext(          //3. 對每一個websocket消息進行處理扣蜻,相當(dāng)于stream的map逆巍,返回的仍是一個流
              WebSocketMessage::retain  //4. 保留消息(主要針對池化內(nèi)存(內(nèi)部使用了netty的ByteBuf),使之引用計數(shù)+1莽使,避免過早回收)
            )
        );
    }
}

需要注意的是锐极,通過webSocketSession.receive() 獲得的Flux<WebSocketMessage>,其每一次發(fā)射的數(shù)據(jù)WebSocketMessage如果是再Netty容器中芳肌,是一個對Netty中ByteBuf的保證灵再,而ByteBuf在使用中有一點要注意,就是誰使用誰釋放亿笤、retain()和release()成對出現(xiàn)翎迁。所以當(dāng)把Flux<WebSocketMessage>發(fā)射的WebSocketMessage傳遞給其他方法使用時,注意要retain()增加一次計數(shù)净薛,避免上一級方法release()使ByteBuf引用計數(shù)歸零汪榔,導(dǎo)致過早回收。關(guān)于Netty的內(nèi)存使用肃拜,下面會寫一篇簡要的介紹文章痴腌。

Mongo

MongoDB由于支持異步客戶端雌团,所以很適合在webflux項目中使用,spring-data-reactor也在第一時間做了支持士聪。配合springboot的@EnableMongoAuditing注解锦援,可以很快搭建異步mongo客戶端。相關(guān)代碼如下:

@SpringBootApplication
@EnableMongoAuditing
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

}


@Component
@Slf4j
class DataInitializer implements CommandLineRunner {

    private final PostRepository posts;

    public DataInitializer(PostRepository posts) {
        this.posts = posts;
    }

    @Override
    public void run(String[] args) {
        log.info("start data initialization  ...");
        this.posts
          .deleteAll()
          .thenMany(
            Flux
              .just("bianzhaoyu", "xinan")
              .flatMap(
                name -> this.posts.save(Post.builder().name(name).age(25).build())
              )
          )
          .log()
          .subscribe(
            null,
            null,
            () -> log.info("done initialization...")
          );

    }

}

@RestController()
@RequestMapping(value = "/posts")
class PostController {

    private final PostRepository posts;

    public PostController(PostRepository posts) {
        this.posts = posts;
    }

    @GetMapping("")
    public Flux<Post> all() {
        return this.posts.findAll();
    }

    @PostMapping("")
    public Mono<Post> create(@RequestBody Post post) {
        return this.posts.save(post);
    }

    @GetMapping("/{id}")
    public Mono<Post> get(@PathVariable("id") String id) {
        return this.posts.findById(id);
    }

    @PutMapping("/{id}")
    public Mono<Post> update(@PathVariable("id") String id, @RequestBody Post post) {
        return this.posts.findById(id)
          .map(p -> {
              p.setName(post.getName());
              p.setAge(post.getAge());

              return p;
          })
          .flatMap(p -> this.posts.save(p));
    }

    @DeleteMapping("/{id}")
    public Mono<Void> delete(@PathVariable("id") String id) {
        return this.posts.deleteById(id);
    }

}

interface PostRepository extends ReactiveMongoRepository<Post, String> {
}

@Data
@ToString
@Builder
@NoArgsConstructor
@AllArgsConstructor
class Post {

    @Id
    private String id;
    private String name;
    private Integer age;

    @CreatedDate
    private LocalDateTime createdDate;
}

配置如下:

spring:
  data:
    mongodb:
      uri: mongodb://localhost:27017/blog
      grid-fs-database: images


<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
        </dependency>
</dependencies>

Redis

異步Redis客戶端的使用和普通Redis客戶端類似剥悟,只是RedisTemplate的方法都原生支持了異步調(diào)用灵寺。使用時只要引入spring-boot-starter-data-redis-reactive依賴,并注冊ReactiveRedisTemplate即可:

@Bean
    public ReactiveRedisTemplate<String, Post> reactiveJsonPostRedisTemplate(
      ReactiveRedisConnectionFactory connectionFactory) {
        
        RedisSerializationContext<String, Post> serializationContext = RedisSerializationContext
          .<String, Post>newSerializationContext(new StringRedisSerializer())
          .hashKey(new StringRedisSerializer())
          .hashValue(new Jackson2JsonRedisSerializer<>(Post.class))
          .build();
        return new ReactiveRedisTemplate<>(connectionFactory, serializationContext);
    }

@Component
class PostRepository {

    ReactiveRedisOperations<String, Post> template;

    public PostRepository(ReactiveRedisOperations<String, Post> template) {
        this.template = template;
    }

    Flux<Post> findAll() {
        return template.<String, Post>opsForHash().values("posts");
    }

    Mono<Post> findById(String id) {
        return template.<String, Post>opsForHash().get("posts", id);
    }

    Mono<Post> save(Post post) {
        if (post.getId() != null) {
            String id = UUID.randomUUID().toString();
            post.setId(id);
        }
        return template.<String, Post>opsForHash().put("posts", post.getId(), post)
          .log()
          .map(p -> post);

    }

    Mono<Void> deleteById(String id) {
        return template.<String, Post>opsForHash().remove("posts", id)
          .flatMap(p -> Mono.<Void>empty());
    }

    Mono<Boolean> deleteAll() {
        return template.<String, Post>opsForHash().delete("posts");
    }

}

MySQL

mysql作為現(xiàn)在使用最廣的數(shù)據(jù)存儲工具懦胞,可以說是選擇任何框架時必須考慮到兼容性的一點替久。但是遺憾的是,由于JDBC協(xié)議只支持同步訪問躏尉,spring目前并沒有直接對jdbc的reactor客戶端的支持蚯根。雖然可以通過引入第三方異步數(shù)據(jù)庫連接池,或者將普通jpa方法用Mono胀糜,F(xiàn)lux指定調(diào)用線程池的方式進行包裝颅拦,但是作為關(guān)系型數(shù)據(jù)庫最重要的一點:事務(wù),卻無法用@Transactional實現(xiàn)教藻。雖然可以將一個事務(wù)的代碼寫在一個異步函數(shù)中距帅,但卻無法做到像同步方法那樣,使用@Transactional各個業(yè)務(wù)方法括堤,導(dǎo)致可復(fù)用性和實用性極低碌秸。這里使用一個異步j(luò)dbc線程池rxjava2-jdbc,相比與Mono/Flux包裝的方式悄窃,rxjava2-jdbc在返回一個connection時是異步的讥电,雖然由于jdbc協(xié)議的線程,執(zhí)行sql語句的時候仍然是同步阻塞的轧抗。rxjava-jdbc內(nèi)部維護了一個線程池用于執(zhí)行阻塞代碼恩敌,這也避免了我們自定義線程池的麻煩。
pom依賴:

        <dependency>
            <groupId>com.github.davidmoten</groupId>
            <artifactId>rxjava2-jdbc</artifactId>
            <version>0.1-RC23</version>
        </dependency>

代碼如下:

/**
 * spring-data-jpa是同步的横媚,repository返回的結(jié)果并不是Mono或者Flux形式纠炮。
 *     可以使用第三方異步j(luò)dbc連接池rxjava2-jdbc,但是由于每個方法是異步的灯蝴,
 * 當(dāng)數(shù)個異步方法組合起來時恢口,并不能保證每個方法都是由一個線程按順序調(diào)用的,
 * 這就使基于ThreadLocal的@Transactional無法使用
 *     當(dāng)然穷躁,可以手動在一個異步方法中開啟并提交事務(wù)耕肩,但是這還是失去了@Transactional組合
 * 不同方法到一個事物的便利性和可擴展性
 * @author xinan
 */
@Component
public class RxJava2PostRepository {
    private Database db;

    RxJava2PostRepository(Database db) {
        this.db = db;
    }

    public Observable<Post> findAll() {
        return this.db.select("select * from posts")
            .get(
                rs -> new Post(rs.getLong("id"),
                    rs.getString("name"),
                    rs.getInt("age")
                )
            )
            .toObservable();
    }

    public Single<Post> findById(Long id) {
        return this.db.select("select * from posts where id=?")
            .parameter(id)
            .get(
                rs -> new Post(rs.getLong("id"),
                    rs.getString("name"),
                    rs.getInt("age")
                )
            )
            .firstElement()
            .toSingle();
    }

    public Single<Integer> save(Post post) {
        return this.db.update("insert into posts(name, age) values(?, ?)")
            .parameters(post.getName(), post.getAge())
            .returnGeneratedKeys()
            .getAs(Integer.class)
            .firstElement()
            .toSingle();
    }

    String sql = "insert into posts(title, content) values(?, ?)";

    //使用事務(wù)
    public Single<Integer> saveTx(Post post) {
        return db.connection()
          .map(connection -> {
              connection.setAutoCommit(false);
              PreparedStatement pstmt = connection.prepareStatement(sql);
              pstmt.setInt(1, post.getAge());
              pstmt.setInt(2, post.getAge());
              int i = pstmt.executeUpdate();
              pstmt.close();
              connection.commit();
              return i;
          });
    }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子看疗,更是在濱河造成了極大的恐慌,老刑警劉巖睦授,帶你破解...
    沈念sama閱讀 222,378評論 6 516
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件两芳,死亡現(xiàn)場離奇詭異,居然都是意外死亡去枷,警方通過查閱死者的電腦和手機怖辆,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,970評論 3 399
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來删顶,“玉大人竖螃,你說我怎么就攤上這事《河啵” “怎么了特咆?”我有些...
    開封第一講書人閱讀 168,983評論 0 362
  • 文/不壞的土叔 我叫張陵,是天一觀的道長录粱。 經(jīng)常有香客問我腻格,道長,這世上最難降的妖魔是什么啥繁? 我笑而不...
    開封第一講書人閱讀 59,938評論 1 299
  • 正文 為了忘掉前任菜职,我火速辦了婚禮,結(jié)果婚禮上旗闽,老公的妹妹穿的比我還像新娘酬核。我一直安慰自己,他們只是感情好适室,可當(dāng)我...
    茶點故事閱讀 68,955評論 6 398
  • 文/花漫 我一把揭開白布嫡意。 她就那樣靜靜地躺著,像睡著了一般亭病。 火紅的嫁衣襯著肌膚如雪鹅很。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,549評論 1 312
  • 那天罪帖,我揣著相機與錄音促煮,去河邊找鬼。 笑死整袁,一個胖子當(dāng)著我的面吹牛菠齿,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播坐昙,決...
    沈念sama閱讀 41,063評論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼绳匀,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起疾棵,我...
    開封第一講書人閱讀 39,991評論 0 277
  • 序言:老撾萬榮一對情侶失蹤戈钢,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后是尔,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體殉了,經(jīng)...
    沈念sama閱讀 46,522評論 1 319
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,604評論 3 342
  • 正文 我和宋清朗相戀三年拟枚,在試婚紗的時候發(fā)現(xiàn)自己被綠了薪铜。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,742評論 1 353
  • 序言:一個原本活蹦亂跳的男人離奇死亡恩溅,死狀恐怖隔箍,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情脚乡,我是刑警寧澤蜒滩,帶...
    沈念sama閱讀 36,413評論 5 351
  • 正文 年R本政府宣布,位于F島的核電站奶稠,受9級特大地震影響帮掉,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜窒典,卻給世界環(huán)境...
    茶點故事閱讀 42,094評論 3 335
  • 文/蒙蒙 一蟆炊、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧瀑志,春花似錦涩搓、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,572評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至战得,卻和暖如春充边,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背常侦。 一陣腳步聲響...
    開封第一講書人閱讀 33,671評論 1 274
  • 我被黑心中介騙來泰國打工浇冰, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人聋亡。 一個月前我還...
    沈念sama閱讀 49,159評論 3 378
  • 正文 我出身青樓肘习,卻偏偏與公主長得像,于是被迫代替她去往敵國和親坡倔。 傳聞我的和親對象是個殘疾皇子漂佩,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,747評論 2 361

推薦閱讀更多精彩內(nèi)容