響應(yīng)式編程之手寫Reactor-Netty

前言

從使用到源碼,研究了很久WebFlux及Reactor

響應(yīng)式編程之Reactor

響應(yīng)式編程之Reactive streams

響應(yīng)式編程之手寫Reactor

響應(yīng)式編程之WebFlux

響應(yīng)式編程之Reactor-Netty

今天準(zhǔn)備整合一下知識(shí)浑度,自己寫出一個(gè)類似Reactor-Netty的框架千埃,可以練習(xí)一下Reactor的使用,同時(shí)回顧一下netty的知識(shí)

原材料即ReactorNetty

最終實(shí)現(xiàn)如下的效果即可,既可以像Reactor-Netty一樣寫一個(gè)接口,并支持響應(yīng)式返回夯膀,底層使用Netty進(jìn)行網(wǎng)絡(luò)通訊

DisposableServer server = HttpServer.create().port(7892) // 綁定端口
        .route( // 路由
                routes -> routes.get("/hello", (request, response) ->
                        response.sendString(Mono.just("Hello World"))
                ).get("/hello2", (request, response) ->
                        response.sendString(Mono.just("Hello World2"))
                )
        )
        .bindNow();
server.onDispose().block();

此時(shí)訪問(wèn)端口7892的"/hello"路徑就會(huì)返回“Hello World”

依賴

要實(shí)現(xiàn)出這樣的效果,首先就是要引入兩個(gè)依賴Reactor苍蔬,Netty

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.3.8.RELEASE</version>
</dependency>
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.51.Final</version>
</dependency>

netty服務(wù)

然后思路也并不復(fù)雜棍郎,不過(guò)就是定義一個(gè)類:HttpServer,然后create方法時(shí)啟動(dòng)一個(gè)Netty服務(wù)端即可银室,嘗試一下如下

public class HttpServerV1 {

    ServerBootstrap bootstrap; // netty服務(wù)構(gòu)造器

    public static HttpServerV1 create() {// 靜態(tài)創(chuàng)建
        return new HttpServerV1();
    }

    public HttpServerV1() { // 初始化涂佃,開始創(chuàng)建netty服務(wù)端構(gòu)造器
        bootstrap = new ServerBootstrap();
        bootstrap.group(new NioEventLoopGroup(1), new NioEventLoopGroup())
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_REUSEADDR, true)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) { // 用一個(gè)簡(jiǎn)單的時(shí)間處理器励翼,單純打印
                        ch.pipeline().addLast(new HttpRequestDecoder(), new HttpResponseEncoder(), new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                                if (msg instanceof DefaultHttpRequest) {
                                    DefaultHttpRequest request = (DefaultHttpRequest) msg; // 請(qǐng)求信息
                                    ByteBuf result = Unpooled.copiedBuffer("Hello World: " + request.uri(), CharsetUtil.UTF_8);
                                    DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, result);
                                    ctx.writeAndFlush(response); // 返回
                                    ctx.channel().close(); // 關(guān)閉連接
                                }
                            }
                        });
                    }
                });
    }

    public HttpServerV1 port(int port) { // 設(shè)置端口
        bootstrap.localAddress(new InetSocketAddress(port));
        return this;
    }

    public HttpServerV1 bindNow() { // 開始綁定端口
        bootstrap.bind();
        return this;
    }
}

有了netty很簡(jiǎn)單就寫完了,一個(gè)簡(jiǎn)單的web接口:請(qǐng)求后返回“hello world”+ 請(qǐng)求路徑辜荠,使用如下

public static void main(String[] args) {
    HttpServerV1.create().port(7893).bindNow();
}

此時(shí)瀏覽器訪問(wèn)7893端口汽抚,輸出“Hello world”+ 請(qǐng)求路徑

Hello world

守護(hù)線程&阻塞

此時(shí)再回頭看reactor-netty的使用例子,有一句server.onDispose().block()伯病,意思是阻塞至通道服務(wù)關(guān)閉造烁,如果去掉block()方法則運(yùn)行的服務(wù)很快結(jié)束了

去掉block()
程序直接結(jié)束

這里我當(dāng)時(shí)比較奇怪,為什么我寫的HttpServer會(huì)一直運(yùn)行不需要寫什么阻塞

調(diào)查了一下午笛,發(fā)現(xiàn)原來(lái)reactor-netty創(chuàng)建的NioEventLoop都是守護(hù)線程惭蟋,所以main線程如果結(jié)束后netty就停止了,至于為什么是守護(hù)線程药磺,可能是因?yàn)闉榱嘶厥召Y源吧

總之不管因?yàn)槭裁锤孀椋乙策@么干吧,先建一個(gè)線程工廠癌佩,生產(chǎn)的線程都是守護(hù)線程

public class ReactorNettyThreadFactory implements ThreadFactory {
    AtomicInteger threadNo = new AtomicInteger(0);
    @Override
    public Thread newThread(Runnable r) {
        Thread thread = new Thread(r, "reactor-nio-" + (threadNo.incrementAndGet()));
        thread.setDaemon(true); // 守護(hù)線程
        return thread;
    }
}

此時(shí)Netty服務(wù)初始化代碼變?yōu)?/p>

 ThreadFactory threadFactory = new ReactorNettyThreadFactory();
 bootstrap
    .group(new NioEventLoopGroup(1, threadFactory), new NioEventLoopGroup(threadFactory)

這是所有的EventLoop的線程都是守護(hù)線程木缝,如果main方法執(zhí)行完畢程序就結(jié)束了,這樣肯定不行围辙,所以main方法中一定要加上阻塞才能讓服務(wù)一直運(yùn)行

阻塞到什么時(shí)候吶我碟,我們是web服務(wù)程序,應(yīng)該阻塞到服務(wù)通道關(guān)閉姚建,而剛好Netty的bind()方法可以獲取到channel關(guān)閉的Future矫俺,此時(shí)bindNow方法變?yōu)槿缦?/p>

private ChannelFuture closeFuture; // 通道的關(guān)閉的Future
public HttpServer bindNow() {
    closeFuture = bootstrap.bind().channel().closeFuture();
    return this;
}

main方法如何阻塞到channel關(guān)閉吶,一個(gè)closeFuture.sync()其實(shí)就可以掸冤,但我們使用Reactor厘托,當(dāng)然要發(fā)揮Reactor的優(yōu)勢(shì),因?yàn)槲覀兛赡苓€會(huì)在close事件發(fā)生時(shí)訂閱一些操作贩虾,所以我們把closeFuture轉(zhuǎn)換為Reactor的Mono發(fā)布者,發(fā)布得就是通道關(guān)閉事件沥阱,取名為onDispose缎罢,即服務(wù)關(guān)閉的發(fā)布者

public Mono<Void> onDispose() { // 這里源碼實(shí)現(xiàn)更復(fù)雜,簡(jiǎn)化一下
    return Mono.create(sink->{
        closeFuture.addListener((ChannelFutureListener) future -> sink.success());
    });
}

此時(shí)回到使用考杉,使用代碼如下:

public static void main(String[] args) {
    HttpServer httpServer = HttpServer.create()
            .port(7893)
            .bindNow();
    httpServer.onDispose().block();
}

感覺(jué)上就和reactor-netty的使用很像了策精,如果不block(),程序立馬結(jié)束

但此時(shí)我們的web服務(wù)只有一個(gè)崇棠,無(wú)法根據(jù)路徑走不同的方法咽袜,所以下一步:加路由

路由

路由也好理解,就是一個(gè)path到方法的映射map枕稀,先對(duì)照reactor-netty學(xué)一下我們的方法應(yīng)該是如何抽象

首先有兩個(gè)參數(shù):request(用于獲取請(qǐng)求的參數(shù))询刹,response(用于寫回響應(yīng))

request簡(jiǎn)單一點(diǎn)直接用netty的DefaultHttpRequest

但response可不簡(jiǎn)單谜嫉,它有一個(gè)send方法用于寫回?cái)?shù)據(jù),它接受的參數(shù)是一個(gè)Publisher凹联,所以這個(gè)方法的作用是在Publisher發(fā)布時(shí)能寫回?cái)?shù)據(jù)至客戶端channel沐兰,所以send方法本質(zhì)是訂閱一個(gè)程序數(shù)據(jù)準(zhǔn)備好后,發(fā)布數(shù)據(jù)至客戶端的步驟蔽挠,由于writeAndFlush也是異步操作住闯,所以要再返回一個(gè)Publisher發(fā)布寫完事件,以便后續(xù)關(guān)閉通道的相關(guān)處理澳淑,由于這個(gè)發(fā)布者只是事件沒(méi)有數(shù)據(jù)所以是Void比原,整個(gè)過(guò)程使用flatMap即可實(shí)現(xiàn),如下

public class HttpServerResponse {

    private ChannelHandlerContext ctx;

    public HttpServerResponse(ChannelHandlerContext ctx) {
        this.ctx = ctx;
    }

    public Mono<Void> sendString(Mono<String> publisher) {
        return send(publisher.flatMap(content-> Mono.just(Unpooled.copiedBuffer(content, CharsetUtil.UTF_8))));
    }

    public Mono<Void> send(Mono<ByteBuf> publisher) {
        return publisher.flatMap(content-> Mono.create(sink-> {
            ChannelFuture channelFuture = ctx.writeAndFlush(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content));
            channelFuture.addListener(future -> {
                sink.success();
            });
        }));
    }
}

此時(shí)我們的自定義方法的結(jié)構(gòu)出來(lái)了杠巡,兩個(gè)參數(shù):netty的HttpRequest和自己封裝的HttpServerResponse量窘,一個(gè)返回結(jié)果:Publisher<Void>

可以用JDK的BiFunction代表方法的抽象

BiFunction<? super HttpRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler

我們把一個(gè)映射和方法的對(duì)應(yīng)用實(shí)體描述一下:

@AllArgsConstructor
static final class HttpRouteHandler {
    private String path; // 路徑
    private BiFunction<? super HttpRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler; // 方法

    public Publisher<Void> apply(HttpRequest request,
                                 HttpServerResponse response) { // 執(zhí)行方法
        return handler.apply(request, response);
    }

    public boolean test(HttpRequest request) { // 是否是某個(gè)請(qǐng)求
        return request.uri().equals(path);
    }
}

再用一個(gè)集合存儲(chǔ)所有path->方法的映射

public class HttpServerRoutes {

    private List<HttpRouteHandler> handlers = new ArrayList<>(); // 映射集合

    // 添加get請(qǐng)求path和方法映射
    public HttpServerRoutes get(String path,
                                BiFunction<? super HttpRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler) {
        handlers.add(new HttpRouteHandler(path, handler));
        return this;
    }

    // 選擇路由對(duì)應(yīng)的處理方法執(zhí)行
    public Publisher<Void> apply(HttpRequest request, HttpServerResponse response) {
        for (HttpRouteHandler handler : handlers) {
            if (handler.test(request)) { // 路徑對(duì)應(yīng)上
                return handler.apply(request, response); // 執(zhí)行
            }
        }
        return Mono.empty();
    }

}

最終

最后就是我們的HttpServer構(gòu)建器,要可以配置路由忽孽,并再請(qǐng)求到達(dá)時(shí)執(zhí)行路由的方法绑改,完整代碼如下

public class HttpServer {

    ServerBootstrap bootstrap; // netty服務(wù)構(gòu)造器

    ChannelFuture closeFuture; // 通道的關(guān)閉的Future

    HttpServerRoutes handler; // 路由

    public static HttpServer create() {
        return new HttpServer();
    }

    /**
     * 初始化,開始創(chuàng)建netty服務(wù)端構(gòu)造器
     */
    public HttpServer() {
        bootstrap = new ServerBootstrap();
        ThreadFactory threadFactory = new ReactorNettyThreadFactory();
        bootstrap.group(new NioEventLoopGroup(1, threadFactory), new NioEventLoopGroup(threadFactory))
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_REUSEADDR, true)
                .childHandler(new ChannelInitializer<SocketChannel>() {//創(chuàng)建通道初始化對(duì)象兄一,設(shè)置初始化參數(shù)
                    @Override
                    protected void initChannel(SocketChannel ch) { // 用一個(gè)簡(jiǎn)單的時(shí)間處理器厘线,單純打印
                        ch.pipeline().addLast(new HttpRequestDecoder(), new HttpResponseEncoder(), new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                                if (msg instanceof DefaultHttpRequest) {
                                    DefaultHttpRequest request = (DefaultHttpRequest) msg; // 請(qǐng)求
                                    HttpServerResponse response = new HttpServerResponse(ctx); // 響應(yīng)
                                    handler.apply(request, response) // 執(zhí)行方法
                                    .subscribe(new ChannelDisposeSubscriber(ctx)); // 訂閱
                                }
                            }
                        });
                    }
                });
    }

    public HttpServer port(int port) {
        bootstrap.localAddress(new InetSocketAddress(port));
        return this;
    }


    /**
     * 設(shè)置路由
     * @return
     */
    public HttpServer route(Consumer<? super HttpServerRoutes> routesBuilder) {
        handler = new HttpServerRoutes();
        routesBuilder.accept(handler);
        return this;
    }

    public HttpServer bindNow() {
        closeFuture = bootstrap.bind().channel().closeFuture();
        return this;
    }

    public Mono<Void> onDispose() {
        return Mono.create(sink->{
            closeFuture.addListener((ChannelFutureListener) future -> sink.success());
        });
    }
}

其中handler.apply方法完成了訂閱操作,訂閱的就是響應(yīng)已寫回客戶端的事件出革,所以對(duì)應(yīng)的處理就是關(guān)閉客戶端通道

@AllArgsConstructor
public class ChannelDisposeSubscriber implements Subscriber<Void> {

    private ChannelHandlerContext ctx;

    @Override
    public void onComplete() {
        ctx.close(); // 寫回響應(yīng)數(shù)據(jù)后關(guān)閉通道
    }
}

到此一個(gè)基于基于Netty的http服務(wù)就寫完了造壮,可以接受響應(yīng)式的返回結(jié)果,使用如下

public static void main(String[] args) {
    HttpServer httpServer = HttpServer.create()
            .port(7893)
            .route(routes -> routes
                    .get("/hello",
                            (request, response) -> response.sendString(Mono.just("Hello World"))
                    ).get("/hello2",
                            (request, response) -> response.send(Mono.just(Unpooled.copiedBuffer("Hello World2", CharsetUtil.UTF_8)))
                    ).get("/hello3",
                            (request, response) -> response.sendString(Mono.create(sink->{
                                try {Thread.sleep(1000);} catch (InterruptedException e) {}
                                sink.success("Hello World3");
                            }))
                    )
            )
            .bindNow();
    httpServer.onDispose().block();
}

測(cè)試結(jié)果如下

測(cè)試

小結(jié)

不得不說(shuō)骂束,初次使用Reactor寫功能耳璧,跟原命令行寫法的思維差異真的很大,總結(jié)如下

  • 服務(wù)維護(hù)一個(gè)path至方法的映射
  • 請(qǐng)求到達(dá)執(zhí)行對(duì)應(yīng)方法展箱,反回的是一個(gè)發(fā)布者旨枯,發(fā)布的事件是請(qǐng)求處理結(jié)束
  • 執(zhí)行方法后得到返回的發(fā)布者后立即訂閱,訂閱的處理是關(guān)閉連接
  • 方法內(nèi)部通過(guò)執(zhí)行response.send方法可以給執(zhí)行結(jié)果發(fā)布者(類似Mono和Flux)添加一個(gè)把結(jié)果發(fā)送到客戶端的處理過(guò)程

個(gè)人認(rèn)為response.send也應(yīng)該封裝進(jìn)框架中混驰,而不是讓用戶自己寫攀隔,因?yàn)槲覀儗懸粋€(gè)接口一定是要有返回值的,就像如果使用的是WebFlux栖榨,一般請(qǐng)求是不需要管response的昆汹,方法直接返回Mono就可以了

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市婴栽,隨后出現(xiàn)的幾起案子满粗,更是在濱河造成了極大的恐慌,老刑警劉巖愚争,帶你破解...
    沈念sama閱讀 218,451評(píng)論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件映皆,死亡現(xiàn)場(chǎng)離奇詭異挤聘,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)劫扒,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,172評(píng)論 3 394
  • 文/潘曉璐 我一進(jìn)店門檬洞,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人沟饥,你說(shuō)我怎么就攤上這事添怔。” “怎么了贤旷?”我有些...
    開封第一講書人閱讀 164,782評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵广料,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我幼驶,道長(zhǎng)艾杏,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,709評(píng)論 1 294
  • 正文 為了忘掉前任盅藻,我火速辦了婚禮购桑,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘氏淑。我一直安慰自己勃蜘,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,733評(píng)論 6 392
  • 文/花漫 我一把揭開白布假残。 她就那樣靜靜地躺著缭贡,像睡著了一般。 火紅的嫁衣襯著肌膚如雪辉懒。 梳的紋絲不亂的頭發(fā)上阳惹,一...
    開封第一講書人閱讀 51,578評(píng)論 1 305
  • 那天,我揣著相機(jī)與錄音眶俩,去河邊找鬼莹汤。 笑死,一個(gè)胖子當(dāng)著我的面吹牛颠印,可吹牛的內(nèi)容都是我干的纲岭。 我是一名探鬼主播,決...
    沈念sama閱讀 40,320評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼嗽仪,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼荒勇!你這毒婦竟也來(lái)了柒莉?” 一聲冷哼從身側(cè)響起闻坚,我...
    開封第一講書人閱讀 39,241評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎兢孝,沒(méi)想到半個(gè)月后窿凤,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體仅偎,經(jīng)...
    沈念sama閱讀 45,686評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,878評(píng)論 3 336
  • 正文 我和宋清朗相戀三年雳殊,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了橘沥。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,992評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡夯秃,死狀恐怖座咆,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情仓洼,我是刑警寧澤介陶,帶...
    沈念sama閱讀 35,715評(píng)論 5 346
  • 正文 年R本政府宣布,位于F島的核電站色建,受9級(jí)特大地震影響哺呜,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜箕戳,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,336評(píng)論 3 330
  • 文/蒙蒙 一某残、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧陵吸,春花似錦玻墅、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,912評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至旨指,卻和暖如春赏酥,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背谆构。 一陣腳步聲響...
    開封第一講書人閱讀 33,040評(píng)論 1 270
  • 我被黑心中介騙來(lái)泰國(guó)打工裸扶, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人搬素。 一個(gè)月前我還...
    沈念sama閱讀 48,173評(píng)論 3 370
  • 正文 我出身青樓呵晨,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親熬尺。 傳聞我的和親對(duì)象是個(gè)殘疾皇子摸屠,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,947評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容