Netty基礎(chǔ)(未完待續(xù))

不多BB,我們開(kāi)門(mén)見(jiàn)山
Netty是一個(gè)提供了易于使用的API的NIO框架,具有高并發(fā)互订、高性能的特點(diǎn)

Netty的Reactor線(xiàn)程模型

https://blog.csdn.net/bingxuesiyang/article/details/89888664

單線(xiàn)程模型

所有的IO操作及業(yè)務(wù)處理都是由一個(gè)NIO線(xiàn)程處理唆缴。

Reactor內(nèi)部通過(guò)selector 監(jiān)控連接事件碳却,收到事件后通過(guò)dispatch進(jìn)行分發(fā),如果是連接建立的事件奈嘿,則由Acceptor處理,Acceptor通過(guò)accept接受連接旺韭,并創(chuàng)建一個(gè)Handler來(lái)處理連接后續(xù)的各種事件氛谜,如果是讀寫(xiě)事件,直接調(diào)用連接對(duì)應(yīng)的Handler來(lái)處理区端。
Handler完成read->業(yè)務(wù)處理(decode->compute->encode)->send的全部流程。
這種模型好處是簡(jiǎn)單澳腹,壞處卻很明顯织盼,當(dāng)某個(gè)Handler阻塞時(shí),會(huì)導(dǎo)致其他客戶(hù)端的handler和accpetor都得不到執(zhí)行酱塔,無(wú)法做到高性能沥邻,只適用于業(yè)務(wù)處理非常快速的場(chǎng)景羊娃。

缺點(diǎn):?jiǎn)尉€(xiàn)程N(yùn)IO負(fù)載過(guò)重唐全,并發(fā)高時(shí)產(chǎn)生任務(wù)堆積,延遲過(guò)高蕊玷,不適合并發(fā)高的場(chǎng)景

多線(xiàn)程模型

由一個(gè)NIO線(xiàn)程處理客戶(hù)端連接和讀寫(xiě)邮利,由一組線(xiàn)程池處理業(yè)務(wù)

主線(xiàn)程中,Reactor對(duì)象通過(guò)selector監(jiān)控連接事件垃帅,收到事件后通過(guò)dispatch進(jìn)行分發(fā)延届,如果是連接建立事件,則由Acceptor處理贸诚,Acceptor通過(guò)accept接收連接方庭,并創(chuàng)建一個(gè)Handler來(lái)處理后續(xù)事件,而Handler只負(fù)責(zé)響應(yīng)事件酱固,不進(jìn)行業(yè)務(wù)操作械念,也就是只進(jìn)行read讀取數(shù)據(jù)和write寫(xiě)出數(shù)據(jù),業(yè)務(wù)處理交給一組線(xiàn)程池進(jìn)行處理运悲。
線(xiàn)程池分配一個(gè)線(xiàn)程完成真正的業(yè)務(wù)處理龄减,然后將響應(yīng)結(jié)果交給主進(jìn)程的Handler處理,Handler將結(jié)果send給client扇苞。
單Reactor承當(dāng)所有事件的監(jiān)聽(tīng)和響應(yīng)欺殿,而當(dāng)我們的服務(wù)端遇到大量的客戶(hù)端同時(shí)進(jìn)行連接,或者在請(qǐng)求連接時(shí)執(zhí)行一些耗時(shí)操作鳖敷,比如身份認(rèn)證脖苏,權(quán)限檢查等,這種瞬時(shí)的高并發(fā)就容易成為性能瓶頸定踱。

缺點(diǎn):連接處理能力有限棍潘,客戶(hù)連接并發(fā)高時(shí),會(huì)產(chǎn)生連接延遲過(guò)高

主從線(xiàn)程模型

一個(gè)NIO線(xiàn)程處理連接,一組NIO線(xiàn)程池處理IO讀寫(xiě)操作亦歉,一組線(xiàn)程池處理業(yè)務(wù)操作

存在多個(gè)Reactor恤浪,每個(gè)Reactor都有自己的selector選擇器,線(xiàn)程和dispatch肴楷。
主線(xiàn)程中的mainReactor通過(guò)自己的selector監(jiān)控連接建立事件水由,收到事件后通過(guò)Accpetor接收,將新的連接分配給某個(gè)子線(xiàn)程,子線(xiàn)程的個(gè)數(shù)和cpu的數(shù)量一致赛蔫,主線(xiàn)程通過(guò)輪詢(xún)的方式分發(fā)連接事件砂客。
子線(xiàn)程中的subReactor將mainReactor分配的連接加入連接隊(duì)列中通過(guò)自己的selector進(jìn)行監(jiān)聽(tīng),并創(chuàng)建一個(gè)Handler用于處理后續(xù)事件呵恢,而Handler只負(fù)責(zé)響應(yīng)事件鞠值,不進(jìn)行業(yè)務(wù)操作,也就是只進(jìn)行read讀取數(shù)據(jù)和write寫(xiě)出數(shù)據(jù)渗钉,業(yè)務(wù)處理交給一組線(xiàn)程池進(jìn)行處理彤恶。
線(xiàn)程池分配一個(gè)線(xiàn)程完成真正的業(yè)務(wù)處理,然后將響應(yīng)結(jié)果交給主進(jìn)程的Handler處理鳄橘,Handler將結(jié)果send給clien

缺點(diǎn):無(wú)

Netty的核心概念

BossEventLoopGroup 和 WorkerEventLoopGroup 包含一個(gè)或者多個(gè) NioEventLoop声离。BossEventLoopGroup 負(fù)責(zé)監(jiān)聽(tīng)客戶(hù)端的 Accept 事件,當(dāng)事件觸發(fā)時(shí)挥唠,將事件注冊(cè)至 WorkerEventLoopGroup 中的一個(gè) NioEventLoop 上抵恋。每新建一個(gè) Channel, 只選擇一個(gè) NioEventLoop 與其綁定宝磨。所以說(shuō) Channel 生命周期的所有事件處理都是線(xiàn)程獨(dú)立的弧关,不同的 NioEventLoop 線(xiàn)程之間不會(huì)發(fā)生任何交集。

NioEventLoop 完成數(shù)據(jù)讀取后唤锉,會(huì)調(diào)用綁定的 ChannelPipeline 進(jìn)行事件傳播世囊,ChannelPipeline 也是線(xiàn)程安全的,數(shù)據(jù)會(huì)被傳遞到 ChannelPipeline 的第一個(gè) ChannelHandler 中窿祥。一個(gè)Channel包含一個(gè)ChannelPipeline株憾,創(chuàng)建Channel時(shí)會(huì)自動(dòng)創(chuàng)建一個(gè)ChannelPipeline,每個(gè)Channel都有一個(gè)管理它的pipeline,而一個(gè)Channel綁定一個(gè)NioEventLoop晒衩,所以一個(gè)pipeline也是綁定一個(gè)NioEventLoop嗤瞎,這關(guān)聯(lián)是永久性的,而pipeline是多個(gè)ChannelHandler的容器听系,一個(gè)pipeline中的ChannelHandler是由EventLoop串行化執(zhí)行贝奇,數(shù)據(jù)處理完成后,將加工完成的數(shù)據(jù)再傳遞給下一個(gè) ChannelHandler靠胜,整個(gè)過(guò)程是串行化執(zhí)行掉瞳,不會(huì)發(fā)生線(xiàn)程上下文切換的問(wèn)題毕源。

EventLoop和EventLoopGroup

當(dāng)系統(tǒng)在運(yùn)行過(guò)程中,如果頻繁的進(jìn)行線(xiàn)程上下文切換陕习,會(huì)帶來(lái)額外的性能損耗霎褐。多線(xiàn)程并發(fā)執(zhí)行某個(gè)業(yè)務(wù)流程,業(yè)務(wù)開(kāi)發(fā)者還需要時(shí)刻對(duì)線(xiàn)程安全保持警惕该镣,哪些數(shù)據(jù)可能會(huì)被并發(fā)修改冻璃,如何保護(hù)?這不僅降低了開(kāi)發(fā)效率损合,也會(huì)帶來(lái)額外的性能損耗俱饿。
為了解決上述問(wèn)題,Netty采用了串行化設(shè)計(jì)理念塌忽,從消息的讀取、編碼以及后續(xù) ChannelHandler 的執(zhí)行失驶,始終都由 IO 線(xiàn)程 EventLoop 負(fù)責(zé)土居,這就意外著整個(gè)流程不會(huì)進(jìn)行線(xiàn)程上下文的切換,數(shù)據(jù)也不會(huì)面臨被并發(fā)修改的風(fēng)險(xiǎn)嬉探。

EventLoopGroup負(fù)責(zé)為每個(gè)新創(chuàng)建的Channel分配一個(gè)EventLoop,目前使用輪詢(xún)算法分配擦耀。一個(gè)EventLoop可以被分配給多個(gè)Channel。
一個(gè)Channel的生命周期都使用相同的EventLoop處理事件涩堤,這意味著在只涉及一個(gè)Channel狀態(tài)變化的處理不用擔(dān)心其線(xiàn)程安全問(wèn)題眷蜓。

EventLoop:一個(gè)EventLoopGroup包含一個(gè)或者多個(gè)EventLoop,一個(gè)EventLoop在它的生命周期內(nèi)只和一個(gè)Thread綁定胎围,所有由EventLoop處理的I/O事件都將在它專(zhuān)有的Thread上被處理吁系,一個(gè)Channel在它的生命周期內(nèi)只注冊(cè)于一個(gè)EventLoop,一個(gè)EventLoop可能會(huì)被分配給一個(gè)或者多個(gè)Channel白魂。每當(dāng)事件發(fā)生時(shí)汽纤,應(yīng)用程序都會(huì)將產(chǎn)生的事件放入事件隊(duì)列當(dāng)中,然后 EventLoop 會(huì)從隊(duì)列中取出事件執(zhí)行

EventLoopGroup:是一組 EventLoop 的抽象福荸,一個(gè) EventLoopGroup 當(dāng)中會(huì)包含一個(gè)或多個(gè) EventLoop蕴坪,EventLoopGroup 提供 next 接口,可以從一組 EventLoop 里面按照一定規(guī)則獲取其中一個(gè) EventLoop 來(lái)處理任務(wù)敬锐。

在 Netty 服務(wù)器端編程中我們需要 BossEventLoopGroup 和WorkerEventLoopGroup 兩個(gè) EventLoopGroup 來(lái)進(jìn)行工作

BossEventLoopGroup:對(duì)應(yīng) Reactor 模式的 mainReactor 背传,用于服務(wù)端接受客戶(hù)端的連接。BossEventLoopGroup 通常是一個(gè)單線(xiàn)程的 EventLoop台夺,EventLoop 維護(hù)著一個(gè)注冊(cè)了 ServerSocketChannel 的 Selector 實(shí)例径玖,EventLoop 的實(shí)現(xiàn)涵蓋 IO 事件的分離,和分發(fā)(Dispatcher)谒养,EventLoop 的實(shí)現(xiàn)充當(dāng) Reactor 模式中的分發(fā)(Dispatcher)的角色挺狰。

WorkerEventLoopGroup: 對(duì)應(yīng) Reactor 模式的 subReactor 明郭,用于進(jìn)行 SocketChannel 的數(shù)據(jù)讀寫(xiě)。對(duì)于 EventLoopGroup 丰泊,如果未傳遞方法參數(shù) nThreads 薯定,表示使用 CPU 個(gè)數(shù) Reactor 。workerGroup會(huì)由 next 選擇其中一個(gè) EventLoop 來(lái)將這個(gè)SocketChannel 注冊(cè)到其維護(hù)的 Selector 并對(duì)其后續(xù)的 IO 事件進(jìn)行處理瞳购。

ChannelHandler和ChannelPipeline

ChannelHandler是一個(gè)接口话侄,專(zhuān)門(mén)處理 I/O 或攔截 I/O 操作,并將其轉(zhuǎn)發(fā)到其 ChannelPipeline(業(yè)務(wù)處理鏈)中的下一個(gè)處理程序学赛。ChannelPipeline是ChannelHandler的容器年堆,而每個(gè)Channel都會(huì)有一個(gè)ChannelPipeline,負(fù)責(zé)ChannelHandler的管理和事件攔截與調(diào)度盏浇。Channel會(huì)將事件扔到ChannelPipeline中变丧,然后事件會(huì)被ChannelPipeline安排一系列的ChannelHandler攔截處理,例如編解碼事件绢掰、TCP的粘包拆包事件痒蓬、用戶(hù)自定義Handler等,經(jīng)過(guò)一系列加工后滴劲,事件的消息會(huì)被添加緩沖區(qū)中等待Channel的刷新和發(fā)送攻晒。

事件出站(Outbound)和入站(Inbound)

  • Inbound事件:通常由I/O線(xiàn)程觸發(fā),例如TCP鏈路連接班挖、鏈路關(guān)閉鲁捏、讀事件、異常通知等萧芙,它們對(duì)應(yīng)下圖的左半部分给梅,這些方法被封裝在ChannelInboundHandler里面
  • Outbound事件:通常是用戶(hù)主動(dòng)發(fā)起的網(wǎng)絡(luò)IO操作,例如發(fā)起連接操作末购、綁定操作破喻、消息發(fā)送等,它們對(duì)應(yīng)下圖的右半部分盟榴,這些方法被封裝在ChannelOutboundHandler里面

ChannelPipeline為ChannelHandler鏈提供了一個(gè)容器并定義了用于沿著鏈傳播入站和出站事件流的API曹质,當(dāng)一個(gè)數(shù)據(jù)流進(jìn)入 ChannlePipeline 時(shí),它會(huì)從 ChannelPipeline 頭部開(kāi)始傳給第一個(gè) ChannelInboundHandler 擎场,當(dāng)?shù)谝粋€(gè)處理完后再傳給下一個(gè)羽德,一直傳遞到管道的尾部。
與之相對(duì)應(yīng)的是迅办,當(dāng)數(shù)據(jù)被寫(xiě)出時(shí)宅静,它會(huì)從管道的尾部開(kāi)始,先經(jīng)過(guò)管道尾部的 “最后” 一個(gè)ChannelOutboundHandler站欺,當(dāng)它處理完成后會(huì)傳遞給前一個(gè) ChannelOutboundHandler 姨夹。

事件出站

ChannelHandlerContext

保存 Channel 相關(guān)的所有上下文信息纤垂,同時(shí)關(guān)聯(lián)一個(gè) ChannelHandler 對(duì)象。I/O 事件由 ChannelInboundHandler 或 ChannelOutboundHandler 處理磷账,并通過(guò)調(diào)用 ChannelHandlerContext 中定義的事件傳播方法峭沦。

一個(gè) Channel 包含了一個(gè) ChannelPipeline,而 ChannelPipeline 中又維護(hù)了一個(gè)由 ChannelHandlerContext 組成的雙向鏈表逃糟,并且每個(gè) ChannelHandlerContext 中又關(guān)聯(lián)著一個(gè) ChannelHandler吼鱼。入站事件和出站事件在一個(gè)雙向鏈表中,入站事件會(huì)從鏈表 head 往后傳遞到最后一個(gè)入站的 handler绰咽,出站事件會(huì)從鏈表 tail 往前傳遞到最前一個(gè)出站的 handler菇肃,兩種類(lèi)型的 handler 互不干擾。

ServerBootstrap和Bootstrap

ServerBootstrap是服務(wù)端的啟動(dòng)助手取募,Bootstrap是客戶(hù)端的啟動(dòng)助手琐谤,它們的目的主要是降低開(kāi)發(fā)的復(fù)雜度

TCP的粘包和拆包

在TCP協(xié)議中一個(gè)完整的數(shù)據(jù)包可能會(huì)被TCP拆分為多個(gè)包發(fā)送,或者將多個(gè)小的數(shù)據(jù)包封裝成大的數(shù)據(jù)包發(fā)送玩敏,這就會(huì)發(fā)生TCP的粘包和拆包的問(wèn)題笑跛。
產(chǎn)生原因:

  • 要發(fā)送的數(shù)據(jù)大于TCP發(fā)送緩沖區(qū)剩余空間大小,將會(huì)發(fā)生拆包聊品。
  • 待發(fā)送數(shù)據(jù)大于MSS(最大報(bào)文長(zhǎng)度),TCP在傳輸前將進(jìn)行拆包几苍。
  • 要發(fā)送的數(shù)據(jù)小于TCP發(fā)送緩沖區(qū)的大小翻屈,TCP將多次寫(xiě)入緩沖區(qū)的數(shù)據(jù)一次發(fā)送出去,將會(huì)發(fā)生粘包妻坝。
  • 接收數(shù)據(jù)端的應(yīng)用層沒(méi)有及時(shí)讀取接收緩沖區(qū)中的數(shù)據(jù)伸眶,將發(fā)生粘包。
TCP的粘包和拆包的解決方法

粘包問(wèn)題解決

  • FixedLengthFrameDecoder:定長(zhǎng)協(xié)議解碼器刽宪,我們可以指定固定的字節(jié)數(shù)算一個(gè)完整的報(bào)文
  • LineBasedFrameDecoder:行分隔符解碼器厘贼,遇到\n或者\(yùn)r\n,則認(rèn)為是一個(gè)完整的報(bào)文
  • DelimiterBasedFrameDecoder:分隔符解碼器圣拄,與LineBasedFrameDecoder類(lèi)似嘴秸,只不過(guò)分隔符可以自己指定
  • LengthFieldBasedFrameDecoder:長(zhǎng)度編碼解碼器,將報(bào)文劃分為報(bào)文頭/報(bào)文體庇谆,根據(jù)報(bào)文頭中的Length字段確定報(bào)文體的長(zhǎng)度岳掐,因此報(bào)文提的長(zhǎng)度是可變的
  • JsonObjectDecoder:json格式解碼器,當(dāng)檢測(cè)到匹配數(shù)量的"{" 饭耳、”}”或”[””]”時(shí),則認(rèn)為是一個(gè)完整的json對(duì)象或者json數(shù)組。

Netty高性能的原因

  • IO 線(xiàn)程模型:同步非阻塞款青,用最少的資源做更多的事。
  • 內(nèi)存零拷貝:盡量減少不必要的內(nèi)存拷貝衰腌,實(shí)現(xiàn)了更高效率的傳輸。
  • 內(nèi)存池設(shè)計(jì):申請(qǐng)的內(nèi)存可以重用觅赊,主要指直接內(nèi)存右蕊。內(nèi)部實(shí)現(xiàn)是用一顆二叉查找樹(shù)管理內(nèi)存分配情況。
  • 串形化處理讀寫(xiě):避免使用鎖帶來(lái)的性能開(kāi)銷(xiāo)茉兰。
  • 高性能序列化協(xié)議:支持 protobuf 等高性能序列化協(xié)議尤泽。

Netty的序列化協(xié)議:

  • XML,優(yōu)點(diǎn):人機(jī)可讀性好规脸,可指定元素或特性的名稱(chēng)坯约。缺點(diǎn):序列化數(shù)據(jù)只包含數(shù)據(jù)本身以及類(lèi)的結(jié)構(gòu),不包括類(lèi)型標(biāo)識(shí)和程序集信息莫鸭;只能序列化公共屬性和字段闹丐;不能序列化方法;文件龐大被因,文件格式復(fù)雜卿拴,傳輸占帶寬。適用場(chǎng)景:當(dāng)做配置文件存儲(chǔ)數(shù)據(jù)梨与,實(shí)時(shí)數(shù)據(jù)轉(zhuǎn)換堕花。

  • JSON,是一種輕量級(jí)的數(shù)據(jù)交換格式粥鞋,優(yōu)點(diǎn):兼容性高缘挽、數(shù)據(jù)格式比較簡(jiǎn)單,易于讀寫(xiě)呻粹、序列化后數(shù)據(jù)較小壕曼,可擴(kuò)展性好,兼容性好等浊、與XML相比腮郊,其協(xié)議比較簡(jiǎn)單,解析速度比較快筹燕。缺點(diǎn):數(shù)據(jù)的描述性比XML差轧飞、不適合性能要求為ms級(jí)別的情況、額外空間開(kāi)銷(xiāo)比較大撒踪。適用場(chǎng)景(可替代XML):跨防火墻訪問(wèn)踪少、可調(diào)式性要求高、基于Web browser的Ajax請(qǐng)求糠涛、傳輸數(shù)據(jù)量相對(duì)小援奢,實(shí)時(shí)性要求相對(duì)低(例如秒級(jí)別)的服務(wù)。

  • Fastjson忍捡,采用一種“假定有序快速匹配”的算法集漾。優(yōu)點(diǎn):接口簡(jiǎn)單易用切黔、目前java語(yǔ)言中最快的json庫(kù)。缺點(diǎn):過(guò)于注重快具篇,而偏離了“標(biāo)準(zhǔn)”及功能性纬霞、代碼質(zhì)量不高,文檔不全驱显。適用場(chǎng)景:協(xié)議交互诗芜、Web輸出、Android客戶(hù)端

  • Thrift埃疫,不僅是序列化協(xié)議伏恐,還是一個(gè)RPC框架。優(yōu)點(diǎn):序列化后的體積小, 速度快栓霜、支持多種語(yǔ)言和豐富的數(shù)據(jù)類(lèi)型翠桦、對(duì)于數(shù)據(jù)字段的增刪具有較強(qiáng)的兼容性、支持二進(jìn)制壓縮編碼胳蛮。缺點(diǎn):使用者較少销凑、跨防火墻訪問(wèn)時(shí),不安全仅炊、不具有可讀性斗幼,調(diào)試代碼時(shí)相對(duì)困難、不能與其他傳輸層協(xié)議共同使用(例如HTTP)抚垄、無(wú)法支持向持久層直接讀寫(xiě)數(shù)據(jù)孟岛,即不適合做數(shù)據(jù)持久化序列化協(xié)議。適用場(chǎng)景:分布式系統(tǒng)的RPC解決方案

  • Avro督勺,Hadoop的一個(gè)子項(xiàng)目,解決了JSON的冗長(zhǎng)和沒(méi)有IDL的問(wèn)題斤贰。優(yōu)點(diǎn):支持豐富的數(shù)據(jù)類(lèi)型智哀、簡(jiǎn)單的動(dòng)態(tài)語(yǔ)言結(jié)合功能、具有自我描述屬性荧恍、提高了數(shù)據(jù)解析速度瓷叫、快速可壓縮的二進(jìn)制數(shù)據(jù)形式、可以實(shí)現(xiàn)遠(yuǎn)程過(guò)程調(diào)用RPC送巡、支持跨編程語(yǔ)言實(shí)現(xiàn)摹菠。缺點(diǎn):對(duì)于習(xí)慣于靜態(tài)類(lèi)型語(yǔ)言的用戶(hù)不直觀。適用場(chǎng)景:在Hadoop中做Hive骗爆、Pig和MapReduce的持久化數(shù)據(jù)格式次氨。

  • Protobuf,將數(shù)據(jù)結(jié)構(gòu)以.proto文件進(jìn)行描述摘投,通過(guò)代碼生成工具可以生成對(duì)應(yīng)數(shù)據(jù)結(jié)構(gòu)的-POJO對(duì)象和Protobuf相關(guān)的方法和屬性煮寡。優(yōu)點(diǎn):序列化后碼流小虹蓄,性能高、結(jié)構(gòu)化數(shù)據(jù)存儲(chǔ)格式(XML JSON等)幸撕、通過(guò)標(biāo)識(shí)字段的順序薇组,可以實(shí)現(xiàn)協(xié)議的前向兼容、結(jié)構(gòu)化的文檔更容易管理和維護(hù)坐儿。缺點(diǎn):需要依賴(lài)于工具生成代碼律胀、支持的語(yǔ)言相對(duì)較少,官方只支持Java 貌矿、C++ 炭菌、python。適用場(chǎng)景:對(duì)性能要求高的RPC調(diào)用站叼、具有良好的跨防火墻的訪問(wèn)屬性娃兽、適合應(yīng)用層對(duì)象的持久化

  • protostuff 基于protobuf協(xié)議,但不需要配置proto文件尽楔,直接導(dǎo)包即可

  • Jboss marshaling 可以直接序列化java類(lèi)投储, 無(wú)須實(shí)java.io.Serializable接口

  • MessagePack 一個(gè)高效的二進(jìn)制序列化格式

  • Hessian 采用二進(jìn)制協(xié)議的輕量級(jí)remoting onhttp工具

  • kryo 基于protobuf協(xié)議,只支持java語(yǔ)言,需要注冊(cè)(Registration)阔馋,然后序列化(Output)玛荞,反序列化(Input)

Netty實(shí)戰(zhàn):1、利用Netty實(shí)現(xiàn)的聊天室 2呕寝、利用Netty實(shí)現(xiàn)Http協(xié)議 3勋眯、利用Netty實(shí)現(xiàn)Websocket協(xié)議

1、利用Netty實(shí)現(xiàn)的聊天室:

Sever端

public class ChatServer {
    private int port;

    public ChatServer(int port) {
        this.port = port;
    }

    public void start(){
        EventLoopGroup boss=new NioEventLoopGroup();
        EventLoopGroup works=new NioEventLoopGroup();
        try {
            ServerBootstrap boot=new ServerBootstrap();
            boot.group(boss,works)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel sc) throws Exception {
                            ChannelPipeline pipeline = sc.pipeline();
                            pipeline.addLast("line",new LineBasedFrameDecoder(1024));
                            pipeline.addLast("encode", new StringEncoder());
                            pipeline.addLast("decode", new StringDecoder());
                            pipeline.addLast(new ServerHander());
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG,128)
                    .option(ChannelOption.SO_KEEPALIVE,true);
            ChannelFuture future = boot.bind(this.port).sync();
            System.out.println("服務(wù)已經(jīng)啟動(dòng)...............");
            future.channel().closeFuture().sync();
            System.out.println("服務(wù)已經(jīng)關(guān)閉...............");
        }catch (Exception e){

        }finally {
            boss.shutdownGracefully();
            works.shutdownGracefully();
        }

    }

    public static void main(String[] args) {
        new ChatServer(8888).start();
    }
}

SeverHander

public class ServerHander extends SimpleChannelInboundHandler<String> {
    public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    /**
     * 每當(dāng)從客戶(hù)端有消息寫(xiě)入時(shí)
     *
     */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
        Channel inComing = channelHandlerContext.channel();
        for (Channel channel : channels) {
            if (channel != inComing) {
                channel.writeAndFlush("[用戶(hù)" + inComing.remoteAddress() + " 說(shuō):]" + s + "\n");
            } else {
                channel.writeAndFlush("[我說(shuō):]" + s + "\n");
            }
        }
    }

    /**
     * 當(dāng)有客戶(hù)端連接時(shí)下梢,handlerAdded會(huì)執(zhí)行,就把該客戶(hù)端的通道記錄下來(lái)客蹋,加入隊(duì)列
     *
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel inComing = ctx.channel();//獲得客戶(hù)端通道
        //通知其他客戶(hù)端有新人進(jìn)入
        for (Channel channel : channels) {
            if (channel != inComing) {
                channel.writeAndFlush("[歡迎: " + inComing.remoteAddress() + "] 進(jìn)入聊天室!\n");
            }
        }
        channels.add(inComing);//加入隊(duì)列
    }

    /**
     * 斷開(kāi)連接
     *
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        Channel outComing = ctx.channel();//獲得客戶(hù)端通道
        //通知其他客戶(hù)端有人離開(kāi)
        for (Channel channel : channels) {
            if (channel != outComing) {
                channel.writeAndFlush("[再見(jiàn): ]" + outComing.remoteAddress() + " 離開(kāi)聊天室孽江!\n");
            }
        }

        channels.remove(outComing);
    }


    /**
     * 當(dāng)服務(wù)器監(jiān)聽(tīng)到客戶(hù)端活動(dòng)時(shí)
     *
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Channel inComing = ctx.channel();
        System.out.println("[" + inComing.remoteAddress() + "]: 進(jìn)入聊天室");
    }

    /**
     * 離線(xiàn)
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        Channel inComing = ctx.channel();
        System.out.println("[" + inComing.remoteAddress() + "]: 離線(xiàn)");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        Channel inComing = ctx.channel();
        System.out.println(inComing.remoteAddress() + "通訊異常讶坯!");
        ctx.close();
    }
}

Client端

public class ChatClient {
    private String host;
    private int port;

    public ChatClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public void start() {
        EventLoopGroup works = new NioEventLoopGroup();
        try {
            Bootstrap boot = new Bootstrap();
            boot.group(works).channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel sc) throws Exception {
                            ChannelPipeline pipeline = sc.pipeline();
                            pipeline.addLast("line",new LineBasedFrameDecoder(1024));
                            pipeline.addLast("encode", new StringEncoder());//編碼器
                            pipeline.addLast("decode", new StringDecoder());//解碼器
                            pipeline.addLast(new ClientHander());
                        }
                    });
            ChannelFuture future = boot.connect(this.host, this.port).sync();
            System.out.println("客戶(hù)端已經(jīng)連接");
            future.channel().closeFuture().sync();
            System.out.println("客戶(hù)端已經(jīng)關(guān)閉");
        } catch (Exception e) {

        } finally {
            works.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        new ChatClient("localhost",8888).start();
    }
}

ClientHander

public class ClientHander extends SimpleChannelInboundHandler<String> {
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
        System.out.println(s);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        new Thread(()-> {
                Scanner scanner = new Scanner(System.in);
                while (scanner.hasNextLine()) {
                    String s = scanner.nextLine()+"\n";
                    ByteBuf buffer = Unpooled.buffer(s.length());
                    buffer.writeBytes(s.getBytes());
                    ctx.writeAndFlush(buffer);
                }
        }).start();
    }
}

啟動(dòng)Sever

image.png

啟動(dòng)一個(gè)Client

image.png

啟動(dòng)第二個(gè)Client

image.png
image.png
image.png
image.png

2、利用Netty實(shí)現(xiàn)的Http協(xié)議:

Server端

public class HttpServer {

    public static void main(String[] args) throws Exception {

        // 定義一對(duì)線(xiàn)程組
        // 主線(xiàn)程組, 用于接受客戶(hù)端的連接岗屏,但是不做任何處理辆琅,跟老板一樣,不做事
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        // 從線(xiàn)程組, 老板線(xiàn)程組會(huì)把任務(wù)丟給他这刷,讓手下線(xiàn)程組去做任務(wù)
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        
        try {
            // netty服務(wù)器的創(chuàng)建, ServerBootstrap 是一個(gè)啟動(dòng)類(lèi)
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)           // 設(shè)置主從線(xiàn)程組
                            .channel(NioServerSocketChannel.class)
                            .childHandler(new HttpServerInitializer()); 
            
            // 啟動(dòng)server婉烟,并且設(shè)置8088為啟動(dòng)的端口號(hào),同時(shí)啟動(dòng)方式為同步
            ChannelFuture channelFuture = serverBootstrap.bind(8088).sync();
            
            // 監(jiān)聽(tīng)關(guān)閉的channel暇屋,設(shè)置位同步方式
            channelFuture.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

}

public class HttpServerInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel channel) throws Exception {
        // 通過(guò)SocketChannel去獲得對(duì)應(yīng)的管道
        ChannelPipeline pipeline = channel.pipeline();
        
        // 通過(guò)管道似袁,添加handler
        // HttpServerCodec是由netty自己提供的助手類(lèi),可以理解為攔截器
        // 當(dāng)請(qǐng)求到服務(wù)端,我們需要做解碼叔营,響應(yīng)到客戶(hù)端做編碼
        pipeline.addLast("HttpServerCodec", new HttpServerCodec());
        
        // 添加自定義的助手類(lèi)屋彪,返回 "hello netty~"
        pipeline.addLast("customHandler", new CustomHandler());
    }

}
public class CustomHandler extends SimpleChannelInboundHandler<HttpObject> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) 
            throws Exception {
        // 獲取channel
        Channel channel = ctx.channel();
        
        if (msg instanceof HttpRequest) {
            // 顯示客戶(hù)端的遠(yuǎn)程地址
            System.out.println(channel.remoteAddress());
            
            // 定義發(fā)送的數(shù)據(jù)消息
            ByteBuf content = Unpooled.copiedBuffer("Hello netty~", CharsetUtil.UTF_8);
            
            // 構(gòu)建一個(gè)http response
            FullHttpResponse response = 
                    new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, 
                            HttpResponseStatus.OK, 
                            content);
            // 為響應(yīng)增加數(shù)據(jù)類(lèi)型和長(zhǎng)度
            response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
            response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());
            
            // 把響應(yīng)刷到客戶(hù)端
            ctx.writeAndFlush(response);
        }
        
    }
/*
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channel注冊(cè)到NioEventLoop");
        super.channelRegistered(ctx);
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channel取消和NioEventLoop的綁定");
        super.channelUnregistered(ctx);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channel準(zhǔn)備就緒");
        super.channelActive(ctx);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channel被關(guān)閉");
        super.channelInactive(ctx);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channel讀數(shù)據(jù)完成");
        super.channelReadComplete(ctx);
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        System.out.println("用戶(hù)事件觸發(fā)。绒尊。畜挥。");
        super.userEventTriggered(ctx, evt);
    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channel可寫(xiě)更改");
        super.channelWritabilityChanged(ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("捕獲到異常");
        super.exceptionCaught(ctx, cause);
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        System.out.println("新事件被添加");
        super.handlerAdded(ctx);
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        System.out.println("事件被移除");
        super.handlerRemoved(ctx);
    }
*/
}

3、利用Netty實(shí)現(xiàn)的Websocket協(xié)議:

由于Websocket的握手需要使用http,所以在pipline中需要注冊(cè)支持http的事件HttpServerCodec
Server端

public class WSServer {

    public static void main(String[] args) throws Exception {
        
        EventLoopGroup mainGroup = new NioEventLoopGroup();
        EventLoopGroup subGroup = new NioEventLoopGroup();
        
        try {
            ServerBootstrap server = new ServerBootstrap();
            server.group(mainGroup, subGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new WSServerInitialzer());
            
            ChannelFuture future = server.bind(8088).sync();
            
            future.channel().closeFuture().sync();
        } finally {
            mainGroup.shutdownGracefully();
            subGroup.shutdownGracefully();
        }
    }
    
}

public class WSServerInitialzer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        
        // websocket 基于http協(xié)議婴谱,所以要有http編解碼器
        pipeline.addLast(new HttpServerCodec());
        // 對(duì)寫(xiě)大數(shù)據(jù)流的支持 
        pipeline.addLast(new ChunkedWriteHandler());
        // 對(duì)httpMessage進(jìn)行聚合蟹但,聚合成FullHttpRequest或FullHttpResponse
        // 幾乎在netty中的網(wǎng)絡(luò)編程,都會(huì)使用到此hanler
        pipeline.addLast(new HttpObjectAggregator(1024*64));

    // 針對(duì)客戶(hù)端谭羔,如果在1分鐘時(shí)沒(méi)有向服務(wù)端發(fā)送讀寫(xiě)心跳(ALL)华糖,則主動(dòng)斷開(kāi)
        // 如果是讀空閑或者寫(xiě)空閑,不處理
        pipeline.addLast(new IdleStateHandler(8, 10, 12));
        // 自定義的空閑狀態(tài)檢測(cè)
        pipeline.addLast(new HeartBeatHandler());



        /**
         * websocket 服務(wù)器處理的協(xié)議瘟裸,用于指定給客戶(hù)端連接訪問(wèn)的路由 : /ws
         * 本handler會(huì)幫你處理一些繁重的復(fù)雜的事
         * 會(huì)幫你處理握手動(dòng)作: handshaking(close, ping, pong) ping + pong = 心跳
         * 對(duì)于websocket來(lái)講客叉,都是以frames進(jìn)行傳輸?shù)模煌臄?shù)據(jù)類(lèi)型對(duì)應(yīng)的frames也不同
         */
        pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
        
        // 自定義的handler
        pipeline.addLast(new ChatHandler());
    }

}

TextWebSocketFrame: 在netty中话告,是用于為websocket專(zhuān)門(mén)處理文本的對(duì)象兼搏,frame是消息的載體

public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    // 用于記錄和管理所有客戶(hù)端的channle
    private static ChannelGroup clients = 
            new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) 
            throws Exception {
        // 獲取客戶(hù)端傳輸過(guò)來(lái)的消息
        String content = msg.text();
        System.out.println("接受到的數(shù)據(jù):" + content);
        
//      for (Channel channel: clients) {
//          channel.writeAndFlush(
//              new TextWebSocketFrame(
//                      "[服務(wù)器在]" + LocalDateTime.now() 
//                      + "接受到消息, 消息為:" + content));
//      }
        // 下面這個(gè)方法,和上面的for循環(huán)效果是一樣的
        clients.writeAndFlush(
                new TextWebSocketFrame(
                        "[服務(wù)器在]" + LocalDateTime.now() 
                        + "接受到消息, 消息為:" + content));
        
    }

    /**
     * 當(dāng)客戶(hù)端連接服務(wù)端之后(打開(kāi)連接)
     * 獲取客戶(hù)端的channle沙郭,并且放到ChannelGroup中去進(jìn)行管理
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        clients.add(ctx.channel());
    }

      @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        
        String channelId = ctx.channel().id().asShortText();
        System.out.println("客戶(hù)端被移除佛呻,channelId為:" + channelId);
         //System.out.println("客戶(hù)端斷開(kāi),channle對(duì)應(yīng)的長(zhǎng)id為:" + ctx.channel().id().asLongText());
        //System.out.println("客戶(hù)端斷開(kāi)病线,channle對(duì)應(yīng)的短id為:" + ctx.channel().id().asShortText());
        // 當(dāng)觸發(fā)handlerRemoved吓著,ChannelGroup會(huì)自動(dòng)移除對(duì)應(yīng)客戶(hù)端的channel
        clients.remove(ctx.channel());
    }
/**
發(fā)生異常時(shí)進(jìn)行捕獲
*/
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        // 發(fā)生異常之后關(guān)閉連接(關(guān)閉channel),隨后從ChannelGroup中移除
        ctx.channel().close();
        clients.remove(ctx.channel());
    }   
    
}

心跳檢測(cè)

/**
 * @Description: 用于檢測(cè)channel的心跳handler 
 *               繼承ChannelInboundHandlerAdapter送挑,從而不需要實(shí)現(xiàn)channelRead0方法
 */
public class HeartBeatHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        
        // 判斷evt是否是IdleStateEvent(用于觸發(fā)用戶(hù)事件绑莺,包含 讀空閑/寫(xiě)空閑/讀寫(xiě)空閑 )
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent)evt;     // 強(qiáng)制類(lèi)型轉(zhuǎn)換
            
            if (event.state() == IdleState.READER_IDLE) {
                System.out.println("進(jìn)入讀空閑...");
            } else if (event.state() == IdleState.WRITER_IDLE) {
                System.out.println("進(jìn)入寫(xiě)空閑...");
            } else if (event.state() == IdleState.ALL_IDLE) {
                
                System.out.println("channel關(guān)閉前,users的數(shù)量為:" + ChatHandler.users.size());
                
                Channel channel = ctx.channel();
                // 關(guān)閉無(wú)用的channel惕耕,以防資源浪費(fèi)
                channel.close();
                
                System.out.println("channel關(guān)閉后纺裁,users的數(shù)量為:" + ChatHandler.users.size());
            }
        }
        
    }
    
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市赡突,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌区赵,老刑警劉巖惭缰,帶你破解...
    沈念sama閱讀 206,968評(píng)論 6 482
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異笼才,居然都是意外死亡漱受,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,601評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)昂羡,“玉大人絮记,你說(shuō)我怎么就攤上這事∨跋龋” “怎么了怨愤?”我有些...
    開(kāi)封第一講書(shū)人閱讀 153,220評(píng)論 0 344
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)蛹批。 經(jīng)常有香客問(wèn)我撰洗,道長(zhǎng),這世上最難降的妖魔是什么腐芍? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 55,416評(píng)論 1 279
  • 正文 為了忘掉前任差导,我火速辦了婚禮,結(jié)果婚禮上猪勇,老公的妹妹穿的比我還像新娘设褐。我一直安慰自己,他們只是感情好泣刹,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,425評(píng)論 5 374
  • 文/花漫 我一把揭開(kāi)白布助析。 她就那樣靜靜地躺著,像睡著了一般项玛。 火紅的嫁衣襯著肌膚如雪貌笨。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 49,144評(píng)論 1 285
  • 那天襟沮,我揣著相機(jī)與錄音锥惋,去河邊找鬼。 笑死开伏,一個(gè)胖子當(dāng)著我的面吹牛膀跌,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播固灵,決...
    沈念sama閱讀 38,432評(píng)論 3 401
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼捅伤,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了巫玻?” 一聲冷哼從身側(cè)響起丛忆,我...
    開(kāi)封第一講書(shū)人閱讀 37,088評(píng)論 0 261
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎仍秤,沒(méi)想到半個(gè)月后熄诡,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,586評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡诗力,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,028評(píng)論 2 325
  • 正文 我和宋清朗相戀三年凰浮,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,137評(píng)論 1 334
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡袜茧,死狀恐怖菜拓,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情笛厦,我是刑警寧澤纳鼎,帶...
    沈念sama閱讀 33,783評(píng)論 4 324
  • 正文 年R本政府宣布,位于F島的核電站递递,受9級(jí)特大地震影響喷橙,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜登舞,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,343評(píng)論 3 307
  • 文/蒙蒙 一贰逾、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧菠秒,春花似錦疙剑、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,333評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至禁灼,卻和暖如春管挟,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背弄捕。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,559評(píng)論 1 262
  • 我被黑心中介騙來(lái)泰國(guó)打工僻孝, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人守谓。 一個(gè)月前我還...
    沈念sama閱讀 45,595評(píng)論 2 355
  • 正文 我出身青樓穿铆,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親斋荞。 傳聞我的和親對(duì)象是個(gè)殘疾皇子荞雏,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,901評(píng)論 2 345