所有示例代碼:https://github.com/cumtbzy2011/webfluxdemo
功能與api
背景
Netty作為java領(lǐng)域首屈一指的nio框架鞭盟,其以優(yōu)越的性能被眾多中間件所使用辩蛋。但到了java的web開發(fā)領(lǐng)域默终,卻很難享受到Netty的性能優(yōu)勢立膛。其原因在于傳統(tǒng)web開發(fā)基于servlet容器廷没,許多依賴和開發(fā)框架都是基于servlet實現(xiàn)的蚜枢,比如spring妇蛀。而netty為了保持代碼的簡單和高效,并沒有實現(xiàn)servlet標(biāo)準(zhǔn)凌摄,這就導(dǎo)致將web容器遷移到netty后許多框架和第三方庫不能使用羡蛾,遷移的成本過大。但spring webflux出現(xiàn)改變了這一現(xiàn)狀锨亏。她在兼容原有mvc開發(fā)方式的同時痴怨,重寫和實現(xiàn)了大量第三方庫,在提升性能的同時器予,降低了遷移的成本穴张。同時spring webflux適配多種web容器卿吐,即使仍然使用tomcat也是可以的。
接口聲明
接口聲明除了保留原有注解式聲明的方式,為了滿足reactor的編程風(fēng)格嗤锉,額外支持了函數(shù)式聲明的方式奖地。通工具類RouterFunctions過構(gòu)造RounterFunction對象矛紫,并向Spring注入實現(xiàn)函數(shù)式接口聲明南窗。
@Bean
public TestHandler testHandler() {
return new TestHandler();
}
@Bean
public RouterFunction<ServerResponse> routes(TestHandler testHandler) {
return RouterFunctions.route(RequestPredicates.POST("/route"),
testHandler::echoName);
}
@GetMapping("anno")
public String sayHello(String name) {
return "hello world! " + name;
}
class TestHandler {
public Mono<ServerResponse> echoName(ServerRequest request) {
return request.bodyToMono(Post.class)
.map(Post::getName)
.flatMap(name -> ServerResponse.ok()
.contentType(MediaType.TEXT_PLAIN)
.body(BodyInserters.fromObject("hello world!" + name)));
}
}
在WebFlux中,request和respose不再是原來的ServletRequest和ServletRequest雷则,取而代之的是ServerRequest和ServerResponse辆雾。這兩個對象是webflux新出現(xiàn)的。首先webflux底層如果使用了reactor-netty月劈,那么自然就沒有所謂的servlet一說度迂,另外ServerRequest和ServerResponse提供了對non-blocking和backpressure特性的支持藤乙,提供了將Http消息內(nèi)容轉(zhuǎn)換成Mono和Flux的方法,使響應(yīng)式編程成為了可能英岭。
過濾器Filter
過濾器的使用方法和spring mvc類似湾盒,不過與ServerRequest和ServerResponse相同的是,webflux提供了一個新的過濾器接口WebFilter以提供對Mono和Flux的支持诅妹。代碼如下:
@Component
public class DemoWebFilter implements WebFilter{
@Override
public Mono<Void> filter(ServerWebExchange serverWebExchange, WebFilterChain webFilterChain) {
if (!serverWebExchange.getRequest().getHeaders().containsKey("token")) {
serverWebExchange.getResponse().setStatusCode(HttpStatus.UNAUTHORIZED);
return Mono.empty();
}
return webFilterChain.filter(serverWebExchange);
}
}
值得注意的是Mono<Void>這個返回值罚勾,在框架的很多地方都會用到。他意味著一個空的Mono吭狡,對于任何讀取尖殃,他會立刻發(fā)出一個complete信號。相比與直接返回void划煮,Mono<Void>作為方法的返回值時送丰,可以對該方法進行鏈?zhǔn)秸{(diào)用。另外雖然Mono<Void>雖然沒有返回值弛秋,但是其本身的complete或者error狀態(tài)器躏,也可以注冊回調(diào)進行異步處理。
異常處理
在Spring Webflux中蟹略,異常分兩種登失。一是controller中方法拋出的異常,這在webflux中同樣可以像在mvc中用@ExceptionHandler聲明異常處理方法挖炬。二是在WebHandler API這種比較偏底層的api揽浙,典型的是WebFilter,異常處理使用了支持Mono的新接口:WebExceptionHandler意敛,可用于處理來自WebFilter鏈和WebHandler的異常馅巷。使用WebExceptionHandler時,只要將其聲明為Spring bean即可自動注入并使用草姻,并可選擇通過bean聲明上的@Order或通過實現(xiàn)Ordered來表示優(yōu)先級钓猬。需要注意的是webflux有默認的WebExceptionHandler-DefaultErrorWebExceptionHandler,其order為默認的-1撩独。如果我們想自定義WebExceptionHandler敞曹,那么必須將order聲明為-2以上,否則異常將不會傳遞到我們自定義的WebExceptionHandler中跌榔。
@Component
//要比DefaultErrorWebExceptionHandler優(yōu)先級-1高
//比較底層,如果異常被@ExceptionHandler處理了捶障,那么將不會由此處理
//可以處理filter和webHandler中的異常
@Order(-2)
public class ErrorLogHandler implements WebExceptionHandler {
@Override
public Mono<Void> handle(ServerWebExchange exchange, Throwable ex) {
exchange.getResponse().setStatusCode(HttpStatus.OK);
byte[] bytes = ("ErrorLogHandler: " + ex.getMessage()).getBytes(StandardCharsets.UTF_8);
DataBuffer wrap = exchange.getResponse().bufferFactory().wrap(bytes);
return exchange.getResponse().writeWith(Flux.just(wrap));
}
}
@ExceptionHandler(Exception.class)
public String test(Exception e) {
return "@ExceptionHandler: " + e.getMessage();
}
Multipart和Stream
在基礎(chǔ)框架reactor中Mono代表一個單次發(fā)送的數(shù)據(jù)源僧须,而Flux代表一個可多次發(fā)送的數(shù)據(jù)源。在spring webflux的controller中项炼,Mono很好理解担平,代表前端的一次傳參或接口的一次返回示绊。那么Flux該如何使用呢?簡單來說Flux在這兩個場景下使用:接受Multipart參數(shù)暂论、返回Stream類型數(shù)據(jù)或者用于分批返回面褐。代碼如下:
@PostMapping(value = "", consumes = MediaType.MULTIPART_FORM_DATA_VALUE)
Mono<String> requestBodyFlux(@RequestBody Flux<Part> parts) {
return parts.map(part -> part instanceof FilePart
? part.name() + ":" + ((FilePart) part).filename()
: part.name())
.collect(Collectors.joining(",", "[", "]"));
}
//如果不是application/stream json則呼叫端無法滾動得到結(jié)果,將一直阻塞等待資料流結(jié)束或超時取胎。
@GetMapping(value = "stream", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Flux<Post> getBeanStream() {
return Flux.interval(Duration.ofMillis(500))
.map(l -> new Post("bian", LocalDateTime.now()))
.log();
}
Multipart是Htp請求的一種常見的數(shù)據(jù)結(jié)構(gòu)展哭,常用于表單提交。在spring mvc中闻蛀,表單中的每個鍵值對會映射成一個個part匪傍。到了webflux,自然而然地轉(zhuǎn)換成代表多個表單字段Flux觉痛。而返回值Flux役衡,則對應(yīng)了一種新的MediaType:APPLICATION_STREAM_JSON_VALUE。他的使用需要瀏覽器或者客戶端的支持薪棒。從使用中來看手蝎,瀏覽器會對每一次返回的數(shù)據(jù)分批處理。如果簡單的get調(diào)用俐芯,會在頁面滾動打印返回值棵介,直到Flux發(fā)射完成:
而如果接口并沒有聲明produces = MediaType.APPLICATION_STREAM_JSON_VALUE的媒體類型,瀏覽器將會在Flux所有數(shù)據(jù)發(fā)射完畢后一次性打印泼各。
WebSocket
在webflux中使用WebSocket功能很簡單鞍时,只要注冊WebSocketHandlerAdapter用于websocket協(xié)議的握手,再定義對應(yīng)路徑的websocket消息處理器即可:
@Configuration
@ComponentScan
@EnableWebFlux
class WebConfig {
@Bean
public HandlerMapping handlerMapping() {
Map<String, WebSocketHandler> map = new HashMap<>();
map.put("/echo", new EchoWebSocketHandler());
SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
mapping.setUrlMap(map);
return mapping;
}
@Bean
WebSocketHandlerAdapter webSocketHandlerAdapter(){
return new WebSocketHandlerAdapter();
}
}
public class EchoWebSocketHandler implements WebSocketHandler {
public EchoWebSocketHandler() {
}
@Override
public Mono<Void> handle(WebSocketSession session) {
return session.send( //1. 向一個websocket連接發(fā)送一段消息
session.receive() //2. 獲得入站消息流
.doOnNext( //3. 對每一個websocket消息進行處理扣蜻,相當(dāng)于stream的map逆巍,返回的仍是一個流
WebSocketMessage::retain //4. 保留消息(主要針對池化內(nèi)存(內(nèi)部使用了netty的ByteBuf),使之引用計數(shù)+1莽使,避免過早回收)
)
);
}
}
需要注意的是锐极,通過webSocketSession.receive() 獲得的Flux<WebSocketMessage>,其每一次發(fā)射的數(shù)據(jù)WebSocketMessage如果是再Netty容器中芳肌,是一個對Netty中ByteBuf的保證灵再,而ByteBuf在使用中有一點要注意,就是誰使用誰釋放亿笤、retain()和release()成對出現(xiàn)翎迁。所以當(dāng)把Flux<WebSocketMessage>發(fā)射的WebSocketMessage傳遞給其他方法使用時,注意要retain()增加一次計數(shù)净薛,避免上一級方法release()使ByteBuf引用計數(shù)歸零汪榔,導(dǎo)致過早回收。關(guān)于Netty的內(nèi)存使用肃拜,下面會寫一篇簡要的介紹文章痴腌。
Mongo
MongoDB由于支持異步客戶端雌团,所以很適合在webflux項目中使用,spring-data-reactor也在第一時間做了支持士聪。配合springboot的@EnableMongoAuditing注解锦援,可以很快搭建異步mongo客戶端。相關(guān)代碼如下:
@SpringBootApplication
@EnableMongoAuditing
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
@Component
@Slf4j
class DataInitializer implements CommandLineRunner {
private final PostRepository posts;
public DataInitializer(PostRepository posts) {
this.posts = posts;
}
@Override
public void run(String[] args) {
log.info("start data initialization ...");
this.posts
.deleteAll()
.thenMany(
Flux
.just("bianzhaoyu", "xinan")
.flatMap(
name -> this.posts.save(Post.builder().name(name).age(25).build())
)
)
.log()
.subscribe(
null,
null,
() -> log.info("done initialization...")
);
}
}
@RestController()
@RequestMapping(value = "/posts")
class PostController {
private final PostRepository posts;
public PostController(PostRepository posts) {
this.posts = posts;
}
@GetMapping("")
public Flux<Post> all() {
return this.posts.findAll();
}
@PostMapping("")
public Mono<Post> create(@RequestBody Post post) {
return this.posts.save(post);
}
@GetMapping("/{id}")
public Mono<Post> get(@PathVariable("id") String id) {
return this.posts.findById(id);
}
@PutMapping("/{id}")
public Mono<Post> update(@PathVariable("id") String id, @RequestBody Post post) {
return this.posts.findById(id)
.map(p -> {
p.setName(post.getName());
p.setAge(post.getAge());
return p;
})
.flatMap(p -> this.posts.save(p));
}
@DeleteMapping("/{id}")
public Mono<Void> delete(@PathVariable("id") String id) {
return this.posts.deleteById(id);
}
}
interface PostRepository extends ReactiveMongoRepository<Post, String> {
}
@Data
@ToString
@Builder
@NoArgsConstructor
@AllArgsConstructor
class Post {
@Id
private String id;
private String name;
private Integer age;
@CreatedDate
private LocalDateTime createdDate;
}
配置如下:
spring:
data:
mongodb:
uri: mongodb://localhost:27017/blog
grid-fs-database: images
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
</dependency>
</dependencies>
Redis
異步Redis客戶端的使用和普通Redis客戶端類似剥悟,只是RedisTemplate的方法都原生支持了異步調(diào)用灵寺。使用時只要引入spring-boot-starter-data-redis-reactive依賴,并注冊ReactiveRedisTemplate即可:
@Bean
public ReactiveRedisTemplate<String, Post> reactiveJsonPostRedisTemplate(
ReactiveRedisConnectionFactory connectionFactory) {
RedisSerializationContext<String, Post> serializationContext = RedisSerializationContext
.<String, Post>newSerializationContext(new StringRedisSerializer())
.hashKey(new StringRedisSerializer())
.hashValue(new Jackson2JsonRedisSerializer<>(Post.class))
.build();
return new ReactiveRedisTemplate<>(connectionFactory, serializationContext);
}
@Component
class PostRepository {
ReactiveRedisOperations<String, Post> template;
public PostRepository(ReactiveRedisOperations<String, Post> template) {
this.template = template;
}
Flux<Post> findAll() {
return template.<String, Post>opsForHash().values("posts");
}
Mono<Post> findById(String id) {
return template.<String, Post>opsForHash().get("posts", id);
}
Mono<Post> save(Post post) {
if (post.getId() != null) {
String id = UUID.randomUUID().toString();
post.setId(id);
}
return template.<String, Post>opsForHash().put("posts", post.getId(), post)
.log()
.map(p -> post);
}
Mono<Void> deleteById(String id) {
return template.<String, Post>opsForHash().remove("posts", id)
.flatMap(p -> Mono.<Void>empty());
}
Mono<Boolean> deleteAll() {
return template.<String, Post>opsForHash().delete("posts");
}
}
MySQL
mysql作為現(xiàn)在使用最廣的數(shù)據(jù)存儲工具懦胞,可以說是選擇任何框架時必須考慮到兼容性的一點替久。但是遺憾的是,由于JDBC協(xié)議只支持同步訪問躏尉,spring目前并沒有直接對jdbc的reactor客戶端的支持蚯根。雖然可以通過引入第三方異步數(shù)據(jù)庫連接池,或者將普通jpa方法用Mono胀糜,F(xiàn)lux指定調(diào)用線程池的方式進行包裝颅拦,但是作為關(guān)系型數(shù)據(jù)庫最重要的一點:事務(wù),卻無法用@Transactional實現(xiàn)教藻。雖然可以將一個事務(wù)的代碼寫在一個異步函數(shù)中距帅,但卻無法做到像同步方法那樣,使用@Transactional各個業(yè)務(wù)方法括堤,導(dǎo)致可復(fù)用性和實用性極低碌秸。這里使用一個異步j(luò)dbc線程池rxjava2-jdbc,相比與Mono/Flux包裝的方式悄窃,rxjava2-jdbc在返回一個connection時是異步的讥电,雖然由于jdbc協(xié)議的線程,執(zhí)行sql語句的時候仍然是同步阻塞的轧抗。rxjava-jdbc內(nèi)部維護了一個線程池用于執(zhí)行阻塞代碼恩敌,這也避免了我們自定義線程池的麻煩。
pom依賴:
<dependency>
<groupId>com.github.davidmoten</groupId>
<artifactId>rxjava2-jdbc</artifactId>
<version>0.1-RC23</version>
</dependency>
代碼如下:
/**
* spring-data-jpa是同步的横媚,repository返回的結(jié)果并不是Mono或者Flux形式纠炮。
* 可以使用第三方異步j(luò)dbc連接池rxjava2-jdbc,但是由于每個方法是異步的灯蝴,
* 當(dāng)數(shù)個異步方法組合起來時恢口,并不能保證每個方法都是由一個線程按順序調(diào)用的,
* 這就使基于ThreadLocal的@Transactional無法使用
* 當(dāng)然穷躁,可以手動在一個異步方法中開啟并提交事務(wù)耕肩,但是這還是失去了@Transactional組合
* 不同方法到一個事物的便利性和可擴展性
* @author xinan
*/
@Component
public class RxJava2PostRepository {
private Database db;
RxJava2PostRepository(Database db) {
this.db = db;
}
public Observable<Post> findAll() {
return this.db.select("select * from posts")
.get(
rs -> new Post(rs.getLong("id"),
rs.getString("name"),
rs.getInt("age")
)
)
.toObservable();
}
public Single<Post> findById(Long id) {
return this.db.select("select * from posts where id=?")
.parameter(id)
.get(
rs -> new Post(rs.getLong("id"),
rs.getString("name"),
rs.getInt("age")
)
)
.firstElement()
.toSingle();
}
public Single<Integer> save(Post post) {
return this.db.update("insert into posts(name, age) values(?, ?)")
.parameters(post.getName(), post.getAge())
.returnGeneratedKeys()
.getAs(Integer.class)
.firstElement()
.toSingle();
}
String sql = "insert into posts(title, content) values(?, ?)";
//使用事務(wù)
public Single<Integer> saveTx(Post post) {
return db.connection()
.map(connection -> {
connection.setAutoCommit(false);
PreparedStatement pstmt = connection.prepareStatement(sql);
pstmt.setInt(1, post.getAge());
pstmt.setInt(2, post.getAge());
int i = pstmt.executeUpdate();
pstmt.close();
connection.commit();
return i;
});
}
}