不多BB,我們開(kāi)門(mén)見(jiàn)山
Netty是一個(gè)提供了易于使用的API的NIO框架,具有高并發(fā)互订、高性能的特點(diǎn)
Netty的Reactor線(xiàn)程模型
https://blog.csdn.net/bingxuesiyang/article/details/89888664
單線(xiàn)程模型
所有的IO操作及業(yè)務(wù)處理都是由一個(gè)NIO線(xiàn)程處理唆缴。
Reactor內(nèi)部通過(guò)selector 監(jiān)控連接事件碳却,收到事件后通過(guò)dispatch進(jìn)行分發(fā),如果是連接建立的事件奈嘿,則由Acceptor處理,Acceptor通過(guò)accept接受連接旺韭,并創(chuàng)建一個(gè)Handler來(lái)處理連接后續(xù)的各種事件氛谜,如果是讀寫(xiě)事件,直接調(diào)用連接對(duì)應(yīng)的Handler來(lái)處理区端。
Handler完成read->業(yè)務(wù)處理(decode->compute->encode)->send的全部流程。
這種模型好處是簡(jiǎn)單澳腹,壞處卻很明顯织盼,當(dāng)某個(gè)Handler阻塞時(shí),會(huì)導(dǎo)致其他客戶(hù)端的handler和accpetor都得不到執(zhí)行酱塔,無(wú)法做到高性能沥邻,只適用于業(yè)務(wù)處理非常快速的場(chǎng)景羊娃。
多線(xiàn)程模型
由一個(gè)NIO線(xiàn)程處理客戶(hù)端連接和讀寫(xiě)邮利,由一組線(xiàn)程池處理業(yè)務(wù)
主線(xiàn)程中,Reactor對(duì)象通過(guò)selector監(jiān)控連接事件垃帅,收到事件后通過(guò)dispatch進(jìn)行分發(fā)延届,如果是連接建立事件,則由Acceptor處理贸诚,Acceptor通過(guò)accept接收連接方庭,并創(chuàng)建一個(gè)Handler來(lái)處理后續(xù)事件,而Handler只負(fù)責(zé)響應(yīng)事件酱固,不進(jìn)行業(yè)務(wù)操作械念,也就是只進(jìn)行read讀取數(shù)據(jù)和write寫(xiě)出數(shù)據(jù),業(yè)務(wù)處理交給一組線(xiàn)程池進(jìn)行處理运悲。
線(xiàn)程池分配一個(gè)線(xiàn)程完成真正的業(yè)務(wù)處理龄减,然后將響應(yīng)結(jié)果交給主進(jìn)程的Handler處理,Handler將結(jié)果send給client扇苞。
單Reactor承當(dāng)所有事件的監(jiān)聽(tīng)和響應(yīng)欺殿,而當(dāng)我們的服務(wù)端遇到大量的客戶(hù)端同時(shí)進(jìn)行連接,或者在請(qǐng)求連接時(shí)執(zhí)行一些耗時(shí)操作鳖敷,比如身份認(rèn)證脖苏,權(quán)限檢查等,這種瞬時(shí)的高并發(fā)就容易成為性能瓶頸定踱。
主從線(xiàn)程模型
一個(gè)NIO線(xiàn)程處理連接,一組NIO線(xiàn)程池處理IO讀寫(xiě)操作亦歉,一組線(xiàn)程池處理業(yè)務(wù)操作
存在多個(gè)Reactor恤浪,每個(gè)Reactor都有自己的selector選擇器,線(xiàn)程和dispatch肴楷。
主線(xiàn)程中的mainReactor通過(guò)自己的selector監(jiān)控連接建立事件水由,收到事件后通過(guò)Accpetor接收,將新的連接分配給某個(gè)子線(xiàn)程,子線(xiàn)程的個(gè)數(shù)和cpu的數(shù)量一致赛蔫,主線(xiàn)程通過(guò)輪詢(xún)的方式分發(fā)連接事件砂客。
子線(xiàn)程中的subReactor將mainReactor分配的連接加入連接隊(duì)列中通過(guò)自己的selector進(jìn)行監(jiān)聽(tīng),并創(chuàng)建一個(gè)Handler用于處理后續(xù)事件呵恢,而Handler只負(fù)責(zé)響應(yīng)事件鞠值,不進(jìn)行業(yè)務(wù)操作,也就是只進(jìn)行read讀取數(shù)據(jù)和write寫(xiě)出數(shù)據(jù)渗钉,業(yè)務(wù)處理交給一組線(xiàn)程池進(jìn)行處理彤恶。
線(xiàn)程池分配一個(gè)線(xiàn)程完成真正的業(yè)務(wù)處理,然后將響應(yīng)結(jié)果交給主進(jìn)程的Handler處理鳄橘,Handler將結(jié)果send給clien
缺點(diǎn):無(wú)
Netty的核心概念
BossEventLoopGroup 和 WorkerEventLoopGroup 包含一個(gè)或者多個(gè) NioEventLoop声离。BossEventLoopGroup 負(fù)責(zé)監(jiān)聽(tīng)客戶(hù)端的 Accept 事件,當(dāng)事件觸發(fā)時(shí)挥唠,將事件注冊(cè)至 WorkerEventLoopGroup 中的一個(gè) NioEventLoop 上抵恋。每新建一個(gè) Channel, 只選擇一個(gè) NioEventLoop 與其綁定宝磨。所以說(shuō) Channel 生命周期的所有事件處理都是線(xiàn)程獨(dú)立的弧关,不同的 NioEventLoop 線(xiàn)程之間不會(huì)發(fā)生任何交集。
NioEventLoop 完成數(shù)據(jù)讀取后唤锉,會(huì)調(diào)用綁定的 ChannelPipeline 進(jìn)行事件傳播世囊,ChannelPipeline 也是線(xiàn)程安全的,數(shù)據(jù)會(huì)被傳遞到 ChannelPipeline 的第一個(gè) ChannelHandler 中窿祥。一個(gè)Channel包含一個(gè)ChannelPipeline株憾,創(chuàng)建Channel時(shí)會(huì)自動(dòng)創(chuàng)建一個(gè)ChannelPipeline,每個(gè)Channel都有一個(gè)管理它的pipeline,而一個(gè)Channel綁定一個(gè)NioEventLoop晒衩,所以一個(gè)pipeline也是綁定一個(gè)NioEventLoop嗤瞎,這關(guān)聯(lián)是永久性的,而pipeline是多個(gè)ChannelHandler的容器听系,一個(gè)pipeline中的ChannelHandler是由EventLoop串行化執(zhí)行贝奇,數(shù)據(jù)處理完成后,將加工完成的數(shù)據(jù)再傳遞給下一個(gè) ChannelHandler靠胜,整個(gè)過(guò)程是串行化執(zhí)行掉瞳,不會(huì)發(fā)生線(xiàn)程上下文切換的問(wèn)題毕源。
EventLoop和EventLoopGroup
當(dāng)系統(tǒng)在運(yùn)行過(guò)程中,如果頻繁的進(jìn)行線(xiàn)程上下文切換陕习,會(huì)帶來(lái)額外的性能損耗霎褐。多線(xiàn)程并發(fā)執(zhí)行某個(gè)業(yè)務(wù)流程,業(yè)務(wù)開(kāi)發(fā)者還需要時(shí)刻對(duì)線(xiàn)程安全保持警惕该镣,哪些數(shù)據(jù)可能會(huì)被并發(fā)修改冻璃,如何保護(hù)?這不僅降低了開(kāi)發(fā)效率损合,也會(huì)帶來(lái)額外的性能損耗俱饿。
為了解決上述問(wèn)題,Netty采用了串行化
設(shè)計(jì)理念塌忽,從消息的讀取、編碼以及后續(xù) ChannelHandler 的執(zhí)行失驶,始終都由 IO 線(xiàn)程 EventLoop 負(fù)責(zé)土居,這就意外著整個(gè)流程不會(huì)進(jìn)行線(xiàn)程上下文的切換,數(shù)據(jù)也不會(huì)面臨被并發(fā)修改的風(fēng)險(xiǎn)嬉探。
EventLoopGroup負(fù)責(zé)為每個(gè)新創(chuàng)建的Channel分配一個(gè)EventLoop,目前使用輪詢(xún)算法分配擦耀。一個(gè)EventLoop可以被分配給多個(gè)Channel。
一個(gè)Channel的生命周期都使用相同的EventLoop處理事件涩堤,這意味著在只涉及一個(gè)Channel狀態(tài)變化的處理不用擔(dān)心其線(xiàn)程安全問(wèn)題眷蜓。
EventLoop:一個(gè)EventLoopGroup包含一個(gè)或者多個(gè)EventLoop,一個(gè)EventLoop在它的生命周期內(nèi)只和一個(gè)Thread綁定胎围,所有由EventLoop處理的I/O事件都將在它專(zhuān)有的Thread上被處理吁系,一個(gè)Channel在它的生命周期內(nèi)只注冊(cè)于一個(gè)EventLoop,一個(gè)EventLoop可能會(huì)被分配給一個(gè)或者多個(gè)Channel白魂。每當(dāng)事件發(fā)生時(shí)汽纤,應(yīng)用程序都會(huì)將產(chǎn)生的事件放入事件隊(duì)列當(dāng)中,然后 EventLoop 會(huì)從隊(duì)列中取出事件執(zhí)行
EventLoopGroup:是一組 EventLoop 的抽象福荸,一個(gè) EventLoopGroup 當(dāng)中會(huì)包含一個(gè)或多個(gè) EventLoop蕴坪,EventLoopGroup 提供 next 接口,可以從一組 EventLoop 里面按照一定規(guī)則獲取其中一個(gè) EventLoop 來(lái)處理任務(wù)敬锐。
在 Netty 服務(wù)器端編程中我們需要 BossEventLoopGroup 和WorkerEventLoopGroup 兩個(gè) EventLoopGroup 來(lái)進(jìn)行工作
BossEventLoopGroup:對(duì)應(yīng) Reactor 模式的 mainReactor 背传,用于服務(wù)端接受客戶(hù)端的連接。BossEventLoopGroup 通常是一個(gè)單線(xiàn)程的 EventLoop台夺,EventLoop 維護(hù)著一個(gè)注冊(cè)了 ServerSocketChannel 的 Selector 實(shí)例径玖,EventLoop 的實(shí)現(xiàn)涵蓋 IO 事件的分離,和分發(fā)(Dispatcher)谒养,EventLoop 的實(shí)現(xiàn)充當(dāng) Reactor 模式中的分發(fā)(Dispatcher)的角色挺狰。
WorkerEventLoopGroup: 對(duì)應(yīng) Reactor 模式的 subReactor 明郭,用于進(jìn)行 SocketChannel 的數(shù)據(jù)讀寫(xiě)。對(duì)于 EventLoopGroup 丰泊,如果未傳遞方法參數(shù) nThreads 薯定,表示使用 CPU 個(gè)數(shù) Reactor 。workerGroup會(huì)由 next 選擇其中一個(gè) EventLoop 來(lái)將這個(gè)SocketChannel 注冊(cè)到其維護(hù)的 Selector 并對(duì)其后續(xù)的 IO 事件進(jìn)行處理瞳购。
ChannelHandler和ChannelPipeline
ChannelHandler是一個(gè)接口话侄,專(zhuān)門(mén)處理 I/O 或攔截 I/O 操作,并將其轉(zhuǎn)發(fā)到其 ChannelPipeline(業(yè)務(wù)處理鏈)中的下一個(gè)處理程序学赛。ChannelPipeline是ChannelHandler的容器年堆,而每個(gè)Channel都會(huì)有一個(gè)ChannelPipeline,負(fù)責(zé)ChannelHandler的管理和事件攔截與調(diào)度盏浇。Channel會(huì)將事件扔到ChannelPipeline中变丧,然后事件會(huì)被ChannelPipeline安排一系列的ChannelHandler攔截處理,例如編解碼事件绢掰、TCP的粘包拆包事件痒蓬、用戶(hù)自定義Handler等,經(jīng)過(guò)一系列加工后滴劲,事件的消息會(huì)被添加緩沖區(qū)中等待Channel的刷新和發(fā)送攻晒。
事件出站(Outbound)和入站(Inbound)
- Inbound事件:通常由I/O線(xiàn)程觸發(fā),例如TCP鏈路連接班挖、鏈路關(guān)閉鲁捏、讀事件、異常通知等萧芙,它們對(duì)應(yīng)下圖的左半部分给梅,這些方法被封裝在ChannelInboundHandler里面
- Outbound事件:通常是用戶(hù)主動(dòng)發(fā)起的網(wǎng)絡(luò)IO操作,例如發(fā)起連接操作末购、綁定操作破喻、消息發(fā)送等,它們對(duì)應(yīng)下圖的右半部分盟榴,這些方法被封裝在ChannelOutboundHandler里面
ChannelPipeline為ChannelHandler鏈提供了一個(gè)容器并定義了用于沿著鏈傳播入站和出站事件流的API曹质,當(dāng)一個(gè)數(shù)據(jù)流進(jìn)入 ChannlePipeline 時(shí),它會(huì)從 ChannelPipeline 頭部開(kāi)始傳給第一個(gè) ChannelInboundHandler 擎场,當(dāng)?shù)谝粋€(gè)處理完后再傳給下一個(gè)羽德,一直傳遞到管道的尾部。
與之相對(duì)應(yīng)的是迅办,當(dāng)數(shù)據(jù)被寫(xiě)出時(shí)宅静,它會(huì)從管道的尾部開(kāi)始,先經(jīng)過(guò)管道尾部的 “最后” 一個(gè)ChannelOutboundHandler站欺,當(dāng)它處理完成后會(huì)傳遞給前一個(gè) ChannelOutboundHandler 姨夹。
ChannelHandlerContext
保存 Channel 相關(guān)的所有上下文信息纤垂,同時(shí)關(guān)聯(lián)一個(gè) ChannelHandler 對(duì)象。I/O 事件由 ChannelInboundHandler 或 ChannelOutboundHandler 處理磷账,并通過(guò)調(diào)用 ChannelHandlerContext 中定義的事件傳播方法峭沦。
ServerBootstrap和Bootstrap
ServerBootstrap是服務(wù)端的啟動(dòng)助手取募,Bootstrap是客戶(hù)端的啟動(dòng)助手琐谤,它們的目的主要是降低開(kāi)發(fā)的復(fù)雜度
TCP的粘包和拆包
在TCP協(xié)議中一個(gè)完整的數(shù)據(jù)包可能會(huì)被TCP拆分為多個(gè)包發(fā)送,或者將多個(gè)小的數(shù)據(jù)包封裝成大的數(shù)據(jù)包發(fā)送玩敏,這就會(huì)發(fā)生TCP的粘包和拆包的問(wèn)題笑跛。
產(chǎn)生原因:
- 要發(fā)送的數(shù)據(jù)大于TCP發(fā)送緩沖區(qū)剩余空間大小,將會(huì)發(fā)生拆包聊品。
- 待發(fā)送數(shù)據(jù)大于MSS(最大報(bào)文長(zhǎng)度),TCP在傳輸前將進(jìn)行拆包几苍。
- 要發(fā)送的數(shù)據(jù)小于TCP發(fā)送緩沖區(qū)的大小翻屈,TCP將多次寫(xiě)入緩沖區(qū)的數(shù)據(jù)一次發(fā)送出去,將會(huì)發(fā)生粘包妻坝。
- 接收數(shù)據(jù)端的應(yīng)用層沒(méi)有及時(shí)讀取接收緩沖區(qū)中的數(shù)據(jù)伸眶,將發(fā)生粘包。
TCP的粘包和拆包的解決方法
粘包問(wèn)題解決
- FixedLengthFrameDecoder:定長(zhǎng)協(xié)議解碼器刽宪,我們可以指定固定的字節(jié)數(shù)算一個(gè)完整的報(bào)文
- LineBasedFrameDecoder:行分隔符解碼器厘贼,遇到\n或者\(yùn)r\n,則認(rèn)為是一個(gè)完整的報(bào)文
- DelimiterBasedFrameDecoder:分隔符解碼器圣拄,與LineBasedFrameDecoder類(lèi)似嘴秸,只不過(guò)分隔符可以自己指定
- LengthFieldBasedFrameDecoder:長(zhǎng)度編碼解碼器,將報(bào)文劃分為報(bào)文頭/報(bào)文體庇谆,根據(jù)報(bào)文頭中的Length字段確定報(bào)文體的長(zhǎng)度岳掐,因此報(bào)文提的長(zhǎng)度是可變的
- JsonObjectDecoder:json格式解碼器,當(dāng)檢測(cè)到匹配數(shù)量的"{" 饭耳、”}”或”[””]”時(shí),則認(rèn)為是一個(gè)完整的json對(duì)象或者json數(shù)組。
Netty高性能的原因
- IO 線(xiàn)程模型:同步非阻塞款青,用最少的資源做更多的事。
- 內(nèi)存零拷貝:盡量減少不必要的內(nèi)存拷貝衰腌,實(shí)現(xiàn)了更高效率的傳輸。
- 內(nèi)存池設(shè)計(jì):申請(qǐng)的內(nèi)存可以重用觅赊,主要指直接內(nèi)存右蕊。內(nèi)部實(shí)現(xiàn)是用一顆二叉查找樹(shù)管理內(nèi)存分配情況。
- 串形化處理讀寫(xiě):避免使用鎖帶來(lái)的性能開(kāi)銷(xiāo)茉兰。
- 高性能序列化協(xié)議:支持 protobuf 等高性能序列化協(xié)議尤泽。
Netty的序列化協(xié)議:
XML,優(yōu)點(diǎn):人機(jī)可讀性好规脸,可指定元素或特性的名稱(chēng)坯约。缺點(diǎn):序列化數(shù)據(jù)只包含數(shù)據(jù)本身以及類(lèi)的結(jié)構(gòu),不包括類(lèi)型標(biāo)識(shí)和程序集信息莫鸭;只能序列化公共屬性和字段闹丐;不能序列化方法;文件龐大被因,文件格式復(fù)雜卿拴,傳輸占帶寬。適用場(chǎng)景:當(dāng)做配置文件存儲(chǔ)數(shù)據(jù)梨与,實(shí)時(shí)數(shù)據(jù)轉(zhuǎn)換堕花。
JSON,是一種輕量級(jí)的數(shù)據(jù)交換格式粥鞋,優(yōu)點(diǎn):兼容性高缘挽、數(shù)據(jù)格式比較簡(jiǎn)單,易于讀寫(xiě)呻粹、序列化后數(shù)據(jù)較小壕曼,可擴(kuò)展性好,兼容性好等浊、與XML相比腮郊,其協(xié)議比較簡(jiǎn)單,解析速度比較快筹燕。缺點(diǎn):數(shù)據(jù)的描述性比XML差轧飞、不適合性能要求為ms級(jí)別的情況、額外空間開(kāi)銷(xiāo)比較大撒踪。適用場(chǎng)景(可替代XML):跨防火墻訪問(wèn)踪少、可調(diào)式性要求高、基于Web browser的Ajax請(qǐng)求糠涛、傳輸數(shù)據(jù)量相對(duì)小援奢,實(shí)時(shí)性要求相對(duì)低(例如秒級(jí)別)的服務(wù)。
Fastjson忍捡,采用一種“假定有序快速匹配”的算法集漾。優(yōu)點(diǎn):接口簡(jiǎn)單易用切黔、目前java語(yǔ)言中最快的json庫(kù)。缺點(diǎn):過(guò)于注重快具篇,而偏離了“標(biāo)準(zhǔn)”及功能性纬霞、代碼質(zhì)量不高,文檔不全驱显。適用場(chǎng)景:協(xié)議交互诗芜、Web輸出、Android客戶(hù)端
Thrift埃疫,不僅是序列化協(xié)議伏恐,還是一個(gè)RPC框架。優(yōu)點(diǎn):序列化后的體積小, 速度快栓霜、支持多種語(yǔ)言和豐富的數(shù)據(jù)類(lèi)型翠桦、對(duì)于數(shù)據(jù)字段的增刪具有較強(qiáng)的兼容性、支持二進(jìn)制壓縮編碼胳蛮。缺點(diǎn):使用者較少销凑、跨防火墻訪問(wèn)時(shí),不安全仅炊、不具有可讀性斗幼,調(diào)試代碼時(shí)相對(duì)困難、不能與其他傳輸層協(xié)議共同使用(例如HTTP)抚垄、無(wú)法支持向持久層直接讀寫(xiě)數(shù)據(jù)孟岛,即不適合做數(shù)據(jù)持久化序列化協(xié)議。適用場(chǎng)景:分布式系統(tǒng)的RPC解決方案
Avro督勺,Hadoop的一個(gè)子項(xiàng)目,解決了JSON的冗長(zhǎng)和沒(méi)有IDL的問(wèn)題斤贰。優(yōu)點(diǎn):支持豐富的數(shù)據(jù)類(lèi)型智哀、簡(jiǎn)單的動(dòng)態(tài)語(yǔ)言結(jié)合功能、具有自我描述屬性荧恍、提高了數(shù)據(jù)解析速度瓷叫、快速可壓縮的二進(jìn)制數(shù)據(jù)形式、可以實(shí)現(xiàn)遠(yuǎn)程過(guò)程調(diào)用RPC送巡、支持跨編程語(yǔ)言實(shí)現(xiàn)摹菠。缺點(diǎn):對(duì)于習(xí)慣于靜態(tài)類(lèi)型語(yǔ)言的用戶(hù)不直觀。適用場(chǎng)景:在Hadoop中做Hive骗爆、Pig和MapReduce的持久化數(shù)據(jù)格式次氨。
Protobuf,將數(shù)據(jù)結(jié)構(gòu)以.proto文件進(jìn)行描述摘投,通過(guò)代碼生成工具可以生成對(duì)應(yīng)數(shù)據(jù)結(jié)構(gòu)的-POJO對(duì)象和Protobuf相關(guān)的方法和屬性煮寡。優(yōu)點(diǎn):序列化后碼流小虹蓄,性能高、結(jié)構(gòu)化數(shù)據(jù)存儲(chǔ)格式(XML JSON等)幸撕、通過(guò)標(biāo)識(shí)字段的順序薇组,可以實(shí)現(xiàn)協(xié)議的前向兼容、結(jié)構(gòu)化的文檔更容易管理和維護(hù)坐儿。缺點(diǎn):需要依賴(lài)于工具生成代碼律胀、支持的語(yǔ)言相對(duì)較少,官方只支持Java 貌矿、C++ 炭菌、python。適用場(chǎng)景:對(duì)性能要求高的RPC調(diào)用站叼、具有良好的跨防火墻的訪問(wèn)屬性娃兽、適合應(yīng)用層對(duì)象的持久化
protostuff 基于protobuf協(xié)議,但不需要配置proto文件尽楔,直接導(dǎo)包即可
Jboss marshaling 可以直接序列化java類(lèi)投储, 無(wú)須實(shí)java.io.Serializable接口
MessagePack 一個(gè)高效的二進(jìn)制序列化格式
Hessian 采用二進(jìn)制協(xié)議的輕量級(jí)remoting onhttp工具
kryo 基于protobuf協(xié)議,只支持java語(yǔ)言,需要注冊(cè)(Registration)阔馋,然后序列化(Output)玛荞,反序列化(Input)
Netty實(shí)戰(zhàn):1、利用Netty實(shí)現(xiàn)的聊天室 2呕寝、利用Netty實(shí)現(xiàn)Http協(xié)議 3勋眯、利用Netty實(shí)現(xiàn)Websocket協(xié)議
1、利用Netty實(shí)現(xiàn)的聊天室:
Sever端
public class ChatServer {
private int port;
public ChatServer(int port) {
this.port = port;
}
public void start(){
EventLoopGroup boss=new NioEventLoopGroup();
EventLoopGroup works=new NioEventLoopGroup();
try {
ServerBootstrap boot=new ServerBootstrap();
boot.group(boss,works)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
ChannelPipeline pipeline = sc.pipeline();
pipeline.addLast("line",new LineBasedFrameDecoder(1024));
pipeline.addLast("encode", new StringEncoder());
pipeline.addLast("decode", new StringDecoder());
pipeline.addLast(new ServerHander());
}
})
.option(ChannelOption.SO_BACKLOG,128)
.option(ChannelOption.SO_KEEPALIVE,true);
ChannelFuture future = boot.bind(this.port).sync();
System.out.println("服務(wù)已經(jīng)啟動(dòng)...............");
future.channel().closeFuture().sync();
System.out.println("服務(wù)已經(jīng)關(guān)閉...............");
}catch (Exception e){
}finally {
boss.shutdownGracefully();
works.shutdownGracefully();
}
}
public static void main(String[] args) {
new ChatServer(8888).start();
}
}
SeverHander
public class ServerHander extends SimpleChannelInboundHandler<String> {
public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
/**
* 每當(dāng)從客戶(hù)端有消息寫(xiě)入時(shí)
*
*/
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
Channel inComing = channelHandlerContext.channel();
for (Channel channel : channels) {
if (channel != inComing) {
channel.writeAndFlush("[用戶(hù)" + inComing.remoteAddress() + " 說(shuō):]" + s + "\n");
} else {
channel.writeAndFlush("[我說(shuō):]" + s + "\n");
}
}
}
/**
* 當(dāng)有客戶(hù)端連接時(shí)下梢,handlerAdded會(huì)執(zhí)行,就把該客戶(hù)端的通道記錄下來(lái)客蹋,加入隊(duì)列
*
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
Channel inComing = ctx.channel();//獲得客戶(hù)端通道
//通知其他客戶(hù)端有新人進(jìn)入
for (Channel channel : channels) {
if (channel != inComing) {
channel.writeAndFlush("[歡迎: " + inComing.remoteAddress() + "] 進(jìn)入聊天室!\n");
}
}
channels.add(inComing);//加入隊(duì)列
}
/**
* 斷開(kāi)連接
*
*/
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
Channel outComing = ctx.channel();//獲得客戶(hù)端通道
//通知其他客戶(hù)端有人離開(kāi)
for (Channel channel : channels) {
if (channel != outComing) {
channel.writeAndFlush("[再見(jiàn): ]" + outComing.remoteAddress() + " 離開(kāi)聊天室孽江!\n");
}
}
channels.remove(outComing);
}
/**
* 當(dāng)服務(wù)器監(jiān)聽(tīng)到客戶(hù)端活動(dòng)時(shí)
*
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Channel inComing = ctx.channel();
System.out.println("[" + inComing.remoteAddress() + "]: 進(jìn)入聊天室");
}
/**
* 離線(xiàn)
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Channel inComing = ctx.channel();
System.out.println("[" + inComing.remoteAddress() + "]: 離線(xiàn)");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
Channel inComing = ctx.channel();
System.out.println(inComing.remoteAddress() + "通訊異常讶坯!");
ctx.close();
}
}
Client端
public class ChatClient {
private String host;
private int port;
public ChatClient(String host, int port) {
this.host = host;
this.port = port;
}
public void start() {
EventLoopGroup works = new NioEventLoopGroup();
try {
Bootstrap boot = new Bootstrap();
boot.group(works).channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
ChannelPipeline pipeline = sc.pipeline();
pipeline.addLast("line",new LineBasedFrameDecoder(1024));
pipeline.addLast("encode", new StringEncoder());//編碼器
pipeline.addLast("decode", new StringDecoder());//解碼器
pipeline.addLast(new ClientHander());
}
});
ChannelFuture future = boot.connect(this.host, this.port).sync();
System.out.println("客戶(hù)端已經(jīng)連接");
future.channel().closeFuture().sync();
System.out.println("客戶(hù)端已經(jīng)關(guān)閉");
} catch (Exception e) {
} finally {
works.shutdownGracefully();
}
}
public static void main(String[] args) {
new ChatClient("localhost",8888).start();
}
}
ClientHander
public class ClientHander extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
System.out.println(s);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
new Thread(()-> {
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLine()) {
String s = scanner.nextLine()+"\n";
ByteBuf buffer = Unpooled.buffer(s.length());
buffer.writeBytes(s.getBytes());
ctx.writeAndFlush(buffer);
}
}).start();
}
}
啟動(dòng)Sever
啟動(dòng)一個(gè)Client
啟動(dòng)第二個(gè)Client
2、利用Netty實(shí)現(xiàn)的Http協(xié)議:
Server端
public class HttpServer {
public static void main(String[] args) throws Exception {
// 定義一對(duì)線(xiàn)程組
// 主線(xiàn)程組, 用于接受客戶(hù)端的連接岗屏,但是不做任何處理辆琅,跟老板一樣,不做事
EventLoopGroup bossGroup = new NioEventLoopGroup();
// 從線(xiàn)程組, 老板線(xiàn)程組會(huì)把任務(wù)丟給他这刷,讓手下線(xiàn)程組去做任務(wù)
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// netty服務(wù)器的創(chuàng)建, ServerBootstrap 是一個(gè)啟動(dòng)類(lèi)
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup) // 設(shè)置主從線(xiàn)程組
.channel(NioServerSocketChannel.class)
.childHandler(new HttpServerInitializer());
// 啟動(dòng)server婉烟,并且設(shè)置8088為啟動(dòng)的端口號(hào),同時(shí)啟動(dòng)方式為同步
ChannelFuture channelFuture = serverBootstrap.bind(8088).sync();
// 監(jiān)聽(tīng)關(guān)閉的channel暇屋,設(shè)置位同步方式
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
public class HttpServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
// 通過(guò)SocketChannel去獲得對(duì)應(yīng)的管道
ChannelPipeline pipeline = channel.pipeline();
// 通過(guò)管道似袁,添加handler
// HttpServerCodec是由netty自己提供的助手類(lèi),可以理解為攔截器
// 當(dāng)請(qǐng)求到服務(wù)端,我們需要做解碼叔营,響應(yīng)到客戶(hù)端做編碼
pipeline.addLast("HttpServerCodec", new HttpServerCodec());
// 添加自定義的助手類(lèi)屋彪,返回 "hello netty~"
pipeline.addLast("customHandler", new CustomHandler());
}
}
public class CustomHandler extends SimpleChannelInboundHandler<HttpObject> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg)
throws Exception {
// 獲取channel
Channel channel = ctx.channel();
if (msg instanceof HttpRequest) {
// 顯示客戶(hù)端的遠(yuǎn)程地址
System.out.println(channel.remoteAddress());
// 定義發(fā)送的數(shù)據(jù)消息
ByteBuf content = Unpooled.copiedBuffer("Hello netty~", CharsetUtil.UTF_8);
// 構(gòu)建一個(gè)http response
FullHttpResponse response =
new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
HttpResponseStatus.OK,
content);
// 為響應(yīng)增加數(shù)據(jù)類(lèi)型和長(zhǎng)度
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());
// 把響應(yīng)刷到客戶(hù)端
ctx.writeAndFlush(response);
}
}
/*
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("channel注冊(cè)到NioEventLoop");
super.channelRegistered(ctx);
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("channel取消和NioEventLoop的綁定");
super.channelUnregistered(ctx);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channel準(zhǔn)備就緒");
super.channelActive(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channel被關(guān)閉");
super.channelInactive(ctx);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("channel讀數(shù)據(jù)完成");
super.channelReadComplete(ctx);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
System.out.println("用戶(hù)事件觸發(fā)。绒尊。畜挥。");
super.userEventTriggered(ctx, evt);
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
System.out.println("channel可寫(xiě)更改");
super.channelWritabilityChanged(ctx);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("捕獲到異常");
super.exceptionCaught(ctx, cause);
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
System.out.println("新事件被添加");
super.handlerAdded(ctx);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
System.out.println("事件被移除");
super.handlerRemoved(ctx);
}
*/
}
3、利用Netty實(shí)現(xiàn)的Websocket協(xié)議:
由于Websocket的握手需要使用http,所以在pipline中需要注冊(cè)支持http的事件HttpServerCodec
Server端
public class WSServer {
public static void main(String[] args) throws Exception {
EventLoopGroup mainGroup = new NioEventLoopGroup();
EventLoopGroup subGroup = new NioEventLoopGroup();
try {
ServerBootstrap server = new ServerBootstrap();
server.group(mainGroup, subGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new WSServerInitialzer());
ChannelFuture future = server.bind(8088).sync();
future.channel().closeFuture().sync();
} finally {
mainGroup.shutdownGracefully();
subGroup.shutdownGracefully();
}
}
}
public class WSServerInitialzer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// websocket 基于http協(xié)議婴谱,所以要有http編解碼器
pipeline.addLast(new HttpServerCodec());
// 對(duì)寫(xiě)大數(shù)據(jù)流的支持
pipeline.addLast(new ChunkedWriteHandler());
// 對(duì)httpMessage進(jìn)行聚合蟹但,聚合成FullHttpRequest或FullHttpResponse
// 幾乎在netty中的網(wǎng)絡(luò)編程,都會(huì)使用到此hanler
pipeline.addLast(new HttpObjectAggregator(1024*64));
// 針對(duì)客戶(hù)端谭羔,如果在1分鐘時(shí)沒(méi)有向服務(wù)端發(fā)送讀寫(xiě)心跳(ALL)华糖,則主動(dòng)斷開(kāi)
// 如果是讀空閑或者寫(xiě)空閑,不處理
pipeline.addLast(new IdleStateHandler(8, 10, 12));
// 自定義的空閑狀態(tài)檢測(cè)
pipeline.addLast(new HeartBeatHandler());
/**
* websocket 服務(wù)器處理的協(xié)議瘟裸,用于指定給客戶(hù)端連接訪問(wèn)的路由 : /ws
* 本handler會(huì)幫你處理一些繁重的復(fù)雜的事
* 會(huì)幫你處理握手動(dòng)作: handshaking(close, ping, pong) ping + pong = 心跳
* 對(duì)于websocket來(lái)講客叉,都是以frames進(jìn)行傳輸?shù)模煌臄?shù)據(jù)類(lèi)型對(duì)應(yīng)的frames也不同
*/
pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
// 自定義的handler
pipeline.addLast(new ChatHandler());
}
}
TextWebSocketFrame: 在netty中话告,是用于為websocket專(zhuān)門(mén)處理文本的對(duì)象兼搏,frame是消息的載體
public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
// 用于記錄和管理所有客戶(hù)端的channle
private static ChannelGroup clients =
new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg)
throws Exception {
// 獲取客戶(hù)端傳輸過(guò)來(lái)的消息
String content = msg.text();
System.out.println("接受到的數(shù)據(jù):" + content);
// for (Channel channel: clients) {
// channel.writeAndFlush(
// new TextWebSocketFrame(
// "[服務(wù)器在]" + LocalDateTime.now()
// + "接受到消息, 消息為:" + content));
// }
// 下面這個(gè)方法,和上面的for循環(huán)效果是一樣的
clients.writeAndFlush(
new TextWebSocketFrame(
"[服務(wù)器在]" + LocalDateTime.now()
+ "接受到消息, 消息為:" + content));
}
/**
* 當(dāng)客戶(hù)端連接服務(wù)端之后(打開(kāi)連接)
* 獲取客戶(hù)端的channle沙郭,并且放到ChannelGroup中去進(jìn)行管理
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
clients.add(ctx.channel());
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
String channelId = ctx.channel().id().asShortText();
System.out.println("客戶(hù)端被移除佛呻,channelId為:" + channelId);
//System.out.println("客戶(hù)端斷開(kāi),channle對(duì)應(yīng)的長(zhǎng)id為:" + ctx.channel().id().asLongText());
//System.out.println("客戶(hù)端斷開(kāi)病线,channle對(duì)應(yīng)的短id為:" + ctx.channel().id().asShortText());
// 當(dāng)觸發(fā)handlerRemoved吓著,ChannelGroup會(huì)自動(dòng)移除對(duì)應(yīng)客戶(hù)端的channel
clients.remove(ctx.channel());
}
/**
發(fā)生異常時(shí)進(jìn)行捕獲
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
// 發(fā)生異常之后關(guān)閉連接(關(guān)閉channel),隨后從ChannelGroup中移除
ctx.channel().close();
clients.remove(ctx.channel());
}
}
心跳檢測(cè)
/**
* @Description: 用于檢測(cè)channel的心跳handler
* 繼承ChannelInboundHandlerAdapter送挑,從而不需要實(shí)現(xiàn)channelRead0方法
*/
public class HeartBeatHandler extends ChannelInboundHandlerAdapter {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
// 判斷evt是否是IdleStateEvent(用于觸發(fā)用戶(hù)事件绑莺,包含 讀空閑/寫(xiě)空閑/讀寫(xiě)空閑 )
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent)evt; // 強(qiáng)制類(lèi)型轉(zhuǎn)換
if (event.state() == IdleState.READER_IDLE) {
System.out.println("進(jìn)入讀空閑...");
} else if (event.state() == IdleState.WRITER_IDLE) {
System.out.println("進(jìn)入寫(xiě)空閑...");
} else if (event.state() == IdleState.ALL_IDLE) {
System.out.println("channel關(guān)閉前,users的數(shù)量為:" + ChatHandler.users.size());
Channel channel = ctx.channel();
// 關(guān)閉無(wú)用的channel惕耕,以防資源浪費(fèi)
channel.close();
System.out.println("channel關(guān)閉后纺裁,users的數(shù)量為:" + ChatHandler.users.size());
}
}
}
}