精通并發(fā)與 Netty
Netty 是一個異步的蔗衡,事件驅(qū)動的網(wǎng)絡(luò)通信框架渤闷,用于高性能的基于協(xié)議的客戶端和服務(wù)端的開發(fā)狐血。
異步指的是會立即返回,并不知道到底發(fā)送過去沒有潘懊,成功沒有姚糊,一般都會使用監(jiān)聽器來監(jiān)聽返回。
事件驅(qū)動是指開發(fā)者只需要關(guān)注事件對應(yīng)的回調(diào)方法即可授舟,比如 channel active救恨,inactive,read 等等释树。
網(wǎng)絡(luò)通信框架就不用解釋了肠槽,很多你非常熟悉的組件都使用了 netty擎淤,比如 spark,dubbo 等等秸仙。
初步了解 Netty
第一個簡單的例子嘴拢,使用 Netty 實現(xiàn)一個 http 服務(wù)器,客戶端調(diào)用一個沒有參數(shù)的方法寂纪,服務(wù)端返回一個 hello world席吴。
Netty 里面大量的代碼都是對線程的處理和 IO 的異步的操作。
package com.paul;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
public class Server {
public static void main(String[] args) throws InterruptedException {
//定義兩個線程組弊攘,事件循環(huán)組,可以類比與 Tomcat 就是死循環(huán)抢腐,不斷接收客戶端的連接
// boss 線程組不斷從客戶端接受連接,但不處理襟交,由 worker 線程組對連接進行真正的處理
// 一個線程組其實也能完成迈倍,推薦使用兩個
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// 服務(wù)端啟動器,可以輕松的啟動服務(wù)端的 channel
ServerBootstrap serverBootstrap = new ServerBootstrap();
//group 方法有兩個捣域,一個接收一個參數(shù)啼染,另一個接收兩個參數(shù)
// childhandler 是我們自己寫的請求處理器
serverBootstrap.group(bossGroup, workerGroup).channel(NioSocketChannel.class)
.childHandler(new ServerInitializer());
//綁定端口
ChannelFuture future = serverBootstrap.bind(8011).sync();
//channel 關(guān)閉的監(jiān)聽
future.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
package com.paul;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpServerCodec;
public class ServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//管道,管道里面可以有很多 handler焕梅,一層層過濾的柑橘
ChannelPipeline pipeline = socketChannel.pipeline();
//HttpServerCodec 是 HttpRequestDecoder 和 HttpReponseEncoder 的組合迹鹅,編碼和解碼的 h handler
pipeline.addLast("httpServerCodec", new HttpServerCodec());
pipeline.addLast("handler", new ServerHandler());
}
}
package com.paul;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import io.netty.util.CharsetUtil;
public class ServerHandler extends SimpleChannelInboundHandler<HttpObject> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, HttpObject httpObject) throws Exception {
if(httpObject instanceof HttpRequest) {
ByteBuf content = Unpooled.copiedBuffer("hello world", CharsetUtil.UTF_8);
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content);
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());
//單純的調(diào)用 write 只會放到緩存區(qū),不會真的發(fā)送
channelHandlerContext.writeAndFlush(response);
}
}
}
我們在 SimpleChannelInboundHandler 里分析一下贞言,先看它繼承的 ChannelInboundHandlerAdapter 里面的事件回調(diào)方法斜棚,包括通道注冊,解除注冊该窗,Active弟蚀,InActive等等。
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelRegistered();
}
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelUnregistered();
}
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
}
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelInactive();
}
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.fireChannelRead(msg);
}
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelReadComplete();
}
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
ctx.fireUserEventTriggered(evt);
}
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelWritabilityChanged();
}
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.fireExceptionCaught(cause);
}
執(zhí)行順序為 handler added->channel registered->channel active->channelRead0->channel inactive->channel unregistered酗失。
Netty 本身并不是遵循 servlet 規(guī)范的义钉。Http 是基于請求和響應(yīng)的無狀態(tài)協(xié)議。Http 1.1 是有 keep-alived 參數(shù)的规肴,如果3秒沒有返回捶闸,則服務(wù)端主動關(guān)閉了解,Http 1.0 則是請求完成直接返回拖刃。
Netty 的連接會被一直保持删壮,我們需要自己去處理這個功能。
在服務(wù)端發(fā)送完畢數(shù)據(jù)后兑牡,可以在服務(wù)端關(guān)閉 Channel央碟。
ctx.channel.close();
Netty 能做什么
- 可以當(dāng)作一個 http 服務(wù)器,但是他并沒有實現(xiàn) servelt 規(guī)范发绢。雖然 Tomcat 底層本身也使用 NIO硬耍,但是 Netty 本身的特點決定了它比 Tomcat 的吞吐量更高。相比于 SpringMVC 等框架边酒,Netty 沒提供路由等功能经柴,這也契合和 Netty 的設(shè)計思路,它更貼近底層墩朦。
- Socket 開發(fā)坯认,也是應(yīng)用最為廣泛的領(lǐng)域,底層傳輸?shù)淖罨A(chǔ)框架氓涣,RPC 框架底層多數(shù)采用 Netty牛哺。直接采用 Http 當(dāng)然也可以,但是效率就低了很多了劳吠。
- 支持長連接的開發(fā)引润,消息推送,聊天痒玩,服務(wù)端向客戶端推送等等都會采用 WebSocket 協(xié)議淳附,就是長連接。
Netty 對 Socket 的實現(xiàn)
對于 Http 編程來說蠢古,我們實現(xiàn)了服務(wù)端就可以了奴曙,客戶端完全可以使用瀏覽器或者 CURL 工具來充當(dāng)。但是對于 Socket 編程來說草讶,客戶端也得我們自己實現(xiàn)洽糟。
服務(wù)器端:
Server 類于上面 Http 服務(wù)器那個一樣,在 ServerInitoalizer 有一些變化
public class ServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//管道堕战,管道里面可以有很多 handler坤溃,一層層過濾的柑橘
ChannelPipeline pipeline = socketChannel.pipeline();
// TCP 粘包 拆包
pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4,0,4));
pipeline.addLast(new LengthFieldPrepender(4));
// 字符串編碼,解碼
pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
pipeline.addLast(new ServerHandler());
}
}
public class ServerHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println(ctx.channel().remoteAddress()+","+msg);
ctx.channel().writeAndFlush("from server:" + UUID.randomUUID());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
客戶端:
public class Client {
public static void main(String[] args) throws InterruptedException {
//客戶端不需要兩個 group践啄,只需要一個就夠了浇雹,直接連接服務(wù)端發(fā)送數(shù)據(jù)就可以了
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try{
Bootstrap bootstrap = new Bootstrap();
//服務(wù)器端既可以使用 handler 也可以使用 childhandler, 客戶端一般使用 handler
//對于 服務(wù)端屿讽,handler 是針對 bossgroup的昭灵,childhandler 是針對 workergorup 的
bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
.handler(new ClientInitializer());
ChannelFuture channelFuture = bootstrap.connect("localhost",8899).sync();
channelFuture.channel().closeFuture().sync();
}finally {
eventLoopGroup.shutdownGracefully();
}
}
}
public class ClientInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//管道,管道里面可以有很多 handler伐谈,一層層過濾的柑橘
ChannelPipeline pipeline = socketChannel.pipeline();
// TCP 粘包 拆包
pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4,0,4));
pipeline.addLast(new LengthFieldPrepender(4));
// 字符串編碼烂完,解碼
pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
pipeline.addLast(new ClientHandler());
}
}
public class ClientHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println(ctx.channel().remoteAddress()+","+msg);
System.out.println("client output:"+ msg);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.channel().writeAndFlush("23123");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
Netty 長連接實現(xiàn)一個聊天室
Server 端:
public class ServerHandler extends SimpleChannelInboundHandler<String> {
//定義 channel group 來管理所有 channel
private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
channelGroup.writeAndFlush("[服務(wù)器]-" + channel.remoteAddress() + "加入\n");
channelGroup.add(channel);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
channelGroup.writeAndFlush("[服務(wù)器]-" + channel.remoteAddress() + "離開\n");
//這個 channel 會被自動從 channelGroup 里移除
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
System.out.println(channel.remoteAddress() + "上線");
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
System.out.println(channel.remoteAddress() + "離開");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
Client 端:
BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
for(;;){
channel.writeAndFlush(br.readLine() + "\r\n");
}
Netty 心跳
集群之間各個節(jié)點的通信,主從節(jié)點之間需要進行數(shù)據(jù)同步诵棵,每當(dāng)主節(jié)點的數(shù)據(jù)發(fā)生變化時抠蚣,通過異步的方式將數(shù)據(jù)同步到從節(jié)點,同步方式可以用日志等等履澳,因此主從節(jié)點之間不是實時一致性而是最終一致性嘶窄。
節(jié)點與節(jié)點之間如何進行通信那怀跛?這種主從模式是需要互相之間有長連接的,這樣來確定對方還活著柄冲,實現(xiàn)方式是互相之間定時發(fā)送心跳數(shù)據(jù)包吻谋。如果發(fā)送幾次后對方還是沒有響應(yīng)的話,就可以認(rèn)為對方已經(jīng)掛掉了现横。
回到客戶端與服務(wù)端的模式漓拾,有人可能會想,客戶端斷開連接后服務(wù)端的 handlerRemoved 等方法不是能感知嗎戒祠?還要心跳干什么哪骇两?
真實情況其實非常復(fù)雜,比如手機客戶端和服務(wù)端進行一個長連接姜盈,客戶端沒有退出應(yīng)用低千,客戶端開了飛行模型,或者強制關(guān)機馏颂,此時雙方是感知不到連接已經(jīng)斷掉了栋操,或者說需要非常長的時間才能感知到,這是我們不想看到的饱亮,這時就需要心跳了矾芙。
來看一個示例:
其他的代碼還是和上面的一樣,我們就不列出來了近上,直接進入主題剔宪,看不同的地方:
服務(wù)端
// Netty 為了支持心跳的 IdleStateHandler,空閑狀態(tài)監(jiān)測處理器。
pipeline.addLast(new IdleStateHandler(5壹无,7葱绒,10,TimeUnit.SECONDS));
來看看 IdleStateHandler 的說明
/*
* Triggers an IdleStateEvent when a Channel has not performed read, write, or both
* operation for a while
* 當(dāng)一個 channel 一斷時間沒有進行 read斗锭,write 就觸發(fā)一個 IdleStateEvent
*/
public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
this((long)readerIdleTimeSeconds, (long)writerIdleTimeSeconds, (long)allIdleTimeSeconds, TimeUnit.SECONDS);
//三個參數(shù)分別為多長時間沒進行讀躏鱼,寫或者讀寫操作則觸發(fā) event同窘。
}
觸發(fā) event 后我們編寫這個 event 對應(yīng)的處理器。
public class MyHandler extends ChannelInboundHandlerAdapter{
//觸發(fā)某個事件后這個方法就會被調(diào)用
//一個 channelhandlerContext 上下文對象,另一個是事件
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception{
if(evt instanceof IdleStateEvent){
IdleStateEvent event = (IdleStateEvent)evt;
String eventType = null;
switch(event.state()){
case READER_IDLE:
eventType = "讀空閑";
case WRITER_IDLE:
eventType = "寫空閑";
case ALL_IDLE:
eventType = "讀寫空閑";
}
}else{
//繼續(xù)將事件向下一個 handler 傳遞
ctx.
}
}
}
WebSocket 實現(xiàn)與原理分析
WebSocket 是一種規(guī)范娶靡,是 HTML5 規(guī)范的一部分缭受,主要是解決 Http 協(xié)議本身存在的問題娃循∈嗜啵可以實現(xiàn)瀏覽器和服務(wù)端的長連接,連接頭信息只在建立連接時發(fā)送一次聪轿。是在 Http 協(xié)議之上構(gòu)建的爷肝,比如請求連接其實是一個 Http 請求,只不過里面加了一些 WebSocket 信息。也可以用在非瀏覽器場合灯抛,比如 app 上金赦。
Http 是一種無狀態(tài)的基于請求和響應(yīng)的協(xié)議,意思是一定是客戶端想服務(wù)端發(fā)送一個請求对嚼,服務(wù)端給客戶端一個響應(yīng)素邪。Http 1.0 在服務(wù)端給客戶端響應(yīng)后連接就斷了。Http 1.1 增加可 keep-alive猪半,服務(wù)端可以和客戶端在短時間之內(nèi)保持一個連接,某個事件之內(nèi)服務(wù)端和客戶端可以復(fù)用這個鏈接偷线。在這種情況下磨确,網(wǎng)頁聊天就是實現(xiàn)不了的,服務(wù)端的數(shù)據(jù)推送是無法實現(xiàn)的声邦。
以前有一些假的長連接技術(shù)乏奥,比如輪詢,缺點和明顯亥曹,這里就不細說了邓了。
Http 2.0 實現(xiàn)了長連接,但是這不在我們討論范圍之內(nèi)媳瞪。
針對服務(wù)端骗炉,Tomcat 新版本,Spring蛇受,和 Netty 都實現(xiàn)了對 Websocket 的支持句葵。
使用 Netty 對 WebSocket 的支持來實現(xiàn)長連接
其他的部分還是一樣的,先來看服務(wù)端的 WebSocketChannelInitializer兢仰。
public class WebSocketChannelInitializer extends ChannelInitializer<SocketChannel>{
//需要支持 websocket乍丈,我們在 initChannel 是做一點改動
@Override
protected void initChannel(SocketChannel ch) throws Exception{
ChannelPipeline pipeline = ch.pipeline();
//因為 websocket 是基于 http 的,所以要加入 http 相應(yīng)的編解碼器
pipeline.addLast(new HttpServerCodec());
//以塊的方式進行寫的處理器
pipeline.addLast(new ChunkedWriteHandler());
// 進行 http 聚合的處理器把将,將 HttpMessage 和 HttpContent 聚合到 FullHttpRequest 或者
// FullHttpResponse
//HttpObjectAggregator 在基于 netty 的 http 編程使用的非常多轻专,粘包拆包。
pipeline.addLast(new HttpObjectAggregator(8192));
// 針對 websocket 的類,完成 websocket 構(gòu)建的所有繁重工作察蹲,負(fù)責(zé)握手请垛,以及心跳(close,ping洽议,
// pong)的處理叼屠, websocket 通過 frame 幀來傳遞數(shù)據(jù)。
// BinaryWebSocketFrame绞铃,CloseWebSocketFrame镜雨,ContinuationWebSocketFrame,
// PingWebSocketFrame,PongWebSocketFrame荚坞,TextWebSocketFrame挑宠。
// /ws 是 context_path,websocket 協(xié)議標(biāo)準(zhǔn)颓影,ws://server:port/context_path
pipeline.addLast(new WebSocketServerProcotolHandler("/ws"));
pipeline.addLast(new TextWebSocketFrameHandler());
}
}
// websocket 協(xié)議需要用幀來傳遞參數(shù)
public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame>{
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception{
System.out.println("收到消息:"+ msg.text());
ctx.channel().writeAndFlush(new TextWebSocketFrame("服務(wù)器返回"));
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception{
System.out.println("handlerAdded" + ctx.channel().id.asLongText());
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception{
System.out.println("handlerRemoved" + ctx.channel().id.asLongText());
}
}
客戶端我們直接通過瀏覽器的原聲 JS 來寫
<script type="text/javascript">
var socket;
if(window.WebSocket){
socket = new WebSocket("ws://localhost:8899/ws");
socket.onmessage = function(event){
alert(event.data);
}
socket.onopen = function(event){
alert("連接開啟")各淀;
}
socket.onclose = function(event){
alert("連接關(guān)閉");
}
}else{
alert("瀏覽器不支持 WebSocket")诡挂;
}
function send(message){
if(!window.WebSocket){
return;
}
if(socket.readyState == WebSocket.OPEN){
socket.send(message);
}
}
</script>
我們在瀏覽器中通過 F12 看看 Http 協(xié)議升級為 WebSocket 協(xié)議的過程碎浇。