前言
從使用到源碼,研究了很久WebFlux及Reactor
今天準(zhǔn)備整合一下知識(shí)浑度,自己寫出一個(gè)類似Reactor-Netty的框架千埃,可以練習(xí)一下Reactor的使用,同時(shí)回顧一下netty的知識(shí)
原材料即Reactor
,Netty
最終實(shí)現(xiàn)如下的效果即可,既可以像Reactor-Netty一樣寫一個(gè)接口,并支持響應(yīng)式返回夯膀,底層使用Netty進(jìn)行網(wǎng)絡(luò)通訊
DisposableServer server = HttpServer.create().port(7892) // 綁定端口
.route( // 路由
routes -> routes.get("/hello", (request, response) ->
response.sendString(Mono.just("Hello World"))
).get("/hello2", (request, response) ->
response.sendString(Mono.just("Hello World2"))
)
)
.bindNow();
server.onDispose().block();
此時(shí)訪問(wèn)端口7892的"/hello"路徑就會(huì)返回“Hello World”
依賴
要實(shí)現(xiàn)出這樣的效果,首先就是要引入兩個(gè)依賴Reactor
苍蔬,Netty
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.3.8.RELEASE</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.51.Final</version>
</dependency>
netty服務(wù)
然后思路也并不復(fù)雜棍郎,不過(guò)就是定義一個(gè)類:HttpServer
,然后create方法時(shí)啟動(dòng)一個(gè)Netty服務(wù)端即可银室,嘗試一下如下
public class HttpServerV1 {
ServerBootstrap bootstrap; // netty服務(wù)構(gòu)造器
public static HttpServerV1 create() {// 靜態(tài)創(chuàng)建
return new HttpServerV1();
}
public HttpServerV1() { // 初始化涂佃,開始創(chuàng)建netty服務(wù)端構(gòu)造器
bootstrap = new ServerBootstrap();
bootstrap.group(new NioEventLoopGroup(1), new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_REUSEADDR, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) { // 用一個(gè)簡(jiǎn)單的時(shí)間處理器励翼,單純打印
ch.pipeline().addLast(new HttpRequestDecoder(), new HttpResponseEncoder(), new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof DefaultHttpRequest) {
DefaultHttpRequest request = (DefaultHttpRequest) msg; // 請(qǐng)求信息
ByteBuf result = Unpooled.copiedBuffer("Hello World: " + request.uri(), CharsetUtil.UTF_8);
DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, result);
ctx.writeAndFlush(response); // 返回
ctx.channel().close(); // 關(guān)閉連接
}
}
});
}
});
}
public HttpServerV1 port(int port) { // 設(shè)置端口
bootstrap.localAddress(new InetSocketAddress(port));
return this;
}
public HttpServerV1 bindNow() { // 開始綁定端口
bootstrap.bind();
return this;
}
}
有了netty很簡(jiǎn)單就寫完了,一個(gè)簡(jiǎn)單的web接口:請(qǐng)求后返回“hello world”+ 請(qǐng)求路徑辜荠,使用如下
public static void main(String[] args) {
HttpServerV1.create().port(7893).bindNow();
}
此時(shí)瀏覽器訪問(wèn)7893端口汽抚,輸出“Hello world”+ 請(qǐng)求路徑
守護(hù)線程&阻塞
此時(shí)再回頭看reactor-netty的使用例子,有一句server.onDispose().block()
伯病,意思是阻塞至通道服務(wù)關(guān)閉造烁,如果去掉block()方法則運(yùn)行的服務(wù)很快結(jié)束了
這里我當(dāng)時(shí)比較奇怪,為什么我寫的HttpServer會(huì)一直運(yùn)行不需要寫什么阻塞
調(diào)查了一下午笛,發(fā)現(xiàn)原來(lái)reactor-netty創(chuàng)建的NioEventLoop都是守護(hù)線程惭蟋,所以main線程如果結(jié)束后netty就停止了,至于為什么是守護(hù)線程药磺,可能是因?yàn)闉榱嘶厥召Y源吧
總之不管因?yàn)槭裁锤孀椋乙策@么干吧,先建一個(gè)線程工廠癌佩,生產(chǎn)的線程都是守護(hù)線程
public class ReactorNettyThreadFactory implements ThreadFactory {
AtomicInteger threadNo = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "reactor-nio-" + (threadNo.incrementAndGet()));
thread.setDaemon(true); // 守護(hù)線程
return thread;
}
}
此時(shí)Netty服務(wù)初始化代碼變?yōu)?/p>
ThreadFactory threadFactory = new ReactorNettyThreadFactory();
bootstrap
.group(new NioEventLoopGroup(1, threadFactory), new NioEventLoopGroup(threadFactory)
這是所有的EventLoop的線程都是守護(hù)線程木缝,如果main方法執(zhí)行完畢程序就結(jié)束了,這樣肯定不行围辙,所以main方法中一定要加上阻塞才能讓服務(wù)一直運(yùn)行
阻塞到什么時(shí)候吶我碟,我們是web服務(wù)程序,應(yīng)該阻塞到服務(wù)通道關(guān)閉姚建,而剛好Netty的bind()方法可以獲取到channel關(guān)閉的Future矫俺,此時(shí)bindNow方法變?yōu)槿缦?/p>
private ChannelFuture closeFuture; // 通道的關(guān)閉的Future
public HttpServer bindNow() {
closeFuture = bootstrap.bind().channel().closeFuture();
return this;
}
main方法如何阻塞到channel關(guān)閉吶,一個(gè)closeFuture.sync()
其實(shí)就可以掸冤,但我們使用Reactor厘托,當(dāng)然要發(fā)揮Reactor的優(yōu)勢(shì),因?yàn)槲覀兛赡苓€會(huì)在close事件發(fā)生時(shí)訂閱一些操作贩虾,所以我們把closeFuture轉(zhuǎn)換為Reactor的Mono發(fā)布者,發(fā)布得就是通道關(guān)閉事件沥阱,取名為onDispose
缎罢,即服務(wù)關(guān)閉的發(fā)布者
public Mono<Void> onDispose() { // 這里源碼實(shí)現(xiàn)更復(fù)雜,簡(jiǎn)化一下
return Mono.create(sink->{
closeFuture.addListener((ChannelFutureListener) future -> sink.success());
});
}
此時(shí)回到使用考杉,使用代碼如下:
public static void main(String[] args) {
HttpServer httpServer = HttpServer.create()
.port(7893)
.bindNow();
httpServer.onDispose().block();
}
感覺(jué)上就和reactor-netty的使用很像了策精,如果不block(),程序立馬結(jié)束
但此時(shí)我們的web服務(wù)只有一個(gè)崇棠,無(wú)法根據(jù)路徑走不同的方法咽袜,所以下一步:加路由
路由
路由也好理解,就是一個(gè)path到方法的映射map枕稀,先對(duì)照reactor-netty學(xué)一下我們的方法應(yīng)該是如何抽象
首先有兩個(gè)參數(shù):request(用于獲取請(qǐng)求的參數(shù))询刹,response(用于寫回響應(yīng))
request簡(jiǎn)單一點(diǎn)直接用netty的DefaultHttpRequest
但response可不簡(jiǎn)單谜嫉,它有一個(gè)send方法用于寫回?cái)?shù)據(jù),它接受的參數(shù)是一個(gè)Publisher凹联,所以這個(gè)方法的作用是在Publisher發(fā)布時(shí)能寫回?cái)?shù)據(jù)至客戶端channel沐兰,所以send方法本質(zhì)是訂閱一個(gè)程序數(shù)據(jù)準(zhǔn)備好后,發(fā)布數(shù)據(jù)至客戶端的步驟蔽挠,由于writeAndFlush也是異步操作住闯,所以要再返回一個(gè)Publisher發(fā)布寫完事件,以便后續(xù)關(guān)閉通道的相關(guān)處理澳淑,由于這個(gè)發(fā)布者只是事件沒(méi)有數(shù)據(jù)所以是Void
比原,整個(gè)過(guò)程使用flatMap
即可實(shí)現(xiàn),如下
public class HttpServerResponse {
private ChannelHandlerContext ctx;
public HttpServerResponse(ChannelHandlerContext ctx) {
this.ctx = ctx;
}
public Mono<Void> sendString(Mono<String> publisher) {
return send(publisher.flatMap(content-> Mono.just(Unpooled.copiedBuffer(content, CharsetUtil.UTF_8))));
}
public Mono<Void> send(Mono<ByteBuf> publisher) {
return publisher.flatMap(content-> Mono.create(sink-> {
ChannelFuture channelFuture = ctx.writeAndFlush(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content));
channelFuture.addListener(future -> {
sink.success();
});
}));
}
}
此時(shí)我們的自定義方法的結(jié)構(gòu)出來(lái)了杠巡,兩個(gè)參數(shù):netty的HttpRequest
和自己封裝的HttpServerResponse
量窘,一個(gè)返回結(jié)果:Publisher<Void>
可以用JDK的BiFunction代表方法的抽象
BiFunction<? super HttpRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler
我們把一個(gè)映射和方法的對(duì)應(yīng)用實(shí)體描述一下:
@AllArgsConstructor
static final class HttpRouteHandler {
private String path; // 路徑
private BiFunction<? super HttpRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler; // 方法
public Publisher<Void> apply(HttpRequest request,
HttpServerResponse response) { // 執(zhí)行方法
return handler.apply(request, response);
}
public boolean test(HttpRequest request) { // 是否是某個(gè)請(qǐng)求
return request.uri().equals(path);
}
}
再用一個(gè)集合存儲(chǔ)所有path->方法的映射
public class HttpServerRoutes {
private List<HttpRouteHandler> handlers = new ArrayList<>(); // 映射集合
// 添加get請(qǐng)求path和方法映射
public HttpServerRoutes get(String path,
BiFunction<? super HttpRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler) {
handlers.add(new HttpRouteHandler(path, handler));
return this;
}
// 選擇路由對(duì)應(yīng)的處理方法執(zhí)行
public Publisher<Void> apply(HttpRequest request, HttpServerResponse response) {
for (HttpRouteHandler handler : handlers) {
if (handler.test(request)) { // 路徑對(duì)應(yīng)上
return handler.apply(request, response); // 執(zhí)行
}
}
return Mono.empty();
}
}
最終
最后就是我們的HttpServer構(gòu)建器,要可以配置路由忽孽,并再請(qǐng)求到達(dá)時(shí)執(zhí)行路由的方法绑改,完整代碼如下
public class HttpServer {
ServerBootstrap bootstrap; // netty服務(wù)構(gòu)造器
ChannelFuture closeFuture; // 通道的關(guān)閉的Future
HttpServerRoutes handler; // 路由
public static HttpServer create() {
return new HttpServer();
}
/**
* 初始化,開始創(chuàng)建netty服務(wù)端構(gòu)造器
*/
public HttpServer() {
bootstrap = new ServerBootstrap();
ThreadFactory threadFactory = new ReactorNettyThreadFactory();
bootstrap.group(new NioEventLoopGroup(1, threadFactory), new NioEventLoopGroup(threadFactory))
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_REUSEADDR, true)
.childHandler(new ChannelInitializer<SocketChannel>() {//創(chuàng)建通道初始化對(duì)象兄一,設(shè)置初始化參數(shù)
@Override
protected void initChannel(SocketChannel ch) { // 用一個(gè)簡(jiǎn)單的時(shí)間處理器厘线,單純打印
ch.pipeline().addLast(new HttpRequestDecoder(), new HttpResponseEncoder(), new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof DefaultHttpRequest) {
DefaultHttpRequest request = (DefaultHttpRequest) msg; // 請(qǐng)求
HttpServerResponse response = new HttpServerResponse(ctx); // 響應(yīng)
handler.apply(request, response) // 執(zhí)行方法
.subscribe(new ChannelDisposeSubscriber(ctx)); // 訂閱
}
}
});
}
});
}
public HttpServer port(int port) {
bootstrap.localAddress(new InetSocketAddress(port));
return this;
}
/**
* 設(shè)置路由
* @return
*/
public HttpServer route(Consumer<? super HttpServerRoutes> routesBuilder) {
handler = new HttpServerRoutes();
routesBuilder.accept(handler);
return this;
}
public HttpServer bindNow() {
closeFuture = bootstrap.bind().channel().closeFuture();
return this;
}
public Mono<Void> onDispose() {
return Mono.create(sink->{
closeFuture.addListener((ChannelFutureListener) future -> sink.success());
});
}
}
其中handler.apply方法完成了訂閱操作,訂閱的就是響應(yīng)已寫回客戶端的事件出革,所以對(duì)應(yīng)的處理就是關(guān)閉客戶端通道
@AllArgsConstructor
public class ChannelDisposeSubscriber implements Subscriber<Void> {
private ChannelHandlerContext ctx;
@Override
public void onComplete() {
ctx.close(); // 寫回響應(yīng)數(shù)據(jù)后關(guān)閉通道
}
}
到此一個(gè)基于基于Netty的http服務(wù)就寫完了造壮,可以接受響應(yīng)式的返回結(jié)果,使用如下
public static void main(String[] args) {
HttpServer httpServer = HttpServer.create()
.port(7893)
.route(routes -> routes
.get("/hello",
(request, response) -> response.sendString(Mono.just("Hello World"))
).get("/hello2",
(request, response) -> response.send(Mono.just(Unpooled.copiedBuffer("Hello World2", CharsetUtil.UTF_8)))
).get("/hello3",
(request, response) -> response.sendString(Mono.create(sink->{
try {Thread.sleep(1000);} catch (InterruptedException e) {}
sink.success("Hello World3");
}))
)
)
.bindNow();
httpServer.onDispose().block();
}
測(cè)試結(jié)果如下
小結(jié)
不得不說(shuō)骂束,初次使用Reactor寫功能耳璧,跟原命令行寫法的思維差異真的很大,總結(jié)如下
- 服務(wù)維護(hù)一個(gè)path至方法的映射
- 請(qǐng)求到達(dá)執(zhí)行對(duì)應(yīng)方法展箱,反回的是一個(gè)發(fā)布者旨枯,發(fā)布的事件是請(qǐng)求處理結(jié)束
- 執(zhí)行方法后得到返回的發(fā)布者后立即訂閱,訂閱的處理是關(guān)閉連接
- 方法內(nèi)部通過(guò)執(zhí)行
response.send
方法可以給執(zhí)行結(jié)果發(fā)布者(類似Mono和Flux)添加一個(gè)把結(jié)果發(fā)送到客戶端的處理過(guò)程
個(gè)人認(rèn)為response.send
也應(yīng)該封裝進(jìn)框架中混驰,而不是讓用戶自己寫攀隔,因?yàn)槲覀儗懸粋€(gè)接口一定是要有返回值的,就像如果使用的是WebFlux栖榨,一般請(qǐng)求是不需要管response的昆汹,方法直接返回Mono就可以了