戰(zhàn)士的最高境界,就是不拿盾牌也能開盾墻 --阿利斯塔
在前面的兩篇博文中 使用Netty+Protobuf實現(xiàn)游戲TCP通信 制作一款游戲協(xié)議聯(lián)調(diào)工具 已經(jīng)介紹了Java游戲如何使用Netty和Protobuf實現(xiàn)TCP通信沉迹,本文將實現(xiàn)Java游戲另一種比較常用的通信協(xié)議蒸其,這在當(dāng)今的H5游戲和微信小程序游戲中使用比較廣泛哺窄,即使用Netty和Protobuf實現(xiàn)WebSocket通信字柠,讀者會發(fā)現(xiàn)游戲的WebSocket通信和TCP通信在使用Netty框架后绑榴,它們的代碼結(jié)構(gòu)是不需要怎么改變的孽鸡,這也驗證了使用Netty網(wǎng)絡(luò)框架的另一個優(yōu)點蹂午,即把游戲通信協(xié)議改為另一種時(如TCP改為Websocket),它的改動是比較少的彬碱,如果自己手寫實現(xiàn)網(wǎng)絡(luò)通信豆胸,這樣的改動可能是災(zāi)難性的。此外巷疼,該WebSocket通信實例將會給游戲協(xié)議加一層安全防范機制晚胡,以讓讀者能有一些其他的收獲,即做到循序漸進嚼沿,逐步提高估盘。
在 游戲之網(wǎng)絡(luò)初篇 中已經(jīng)介紹了Websocket和Http的關(guān)系,它其實是Http協(xié)議的升級版骡尽,使用它可以實現(xiàn)web客戶端和服務(wù)器后臺的全雙工通信遣妥。一個Websocket的連接建立過程大致是如下的:客戶端(瀏覽器)首先向服務(wù)端發(fā)起HTTP連接請求,但這個請求中包含了一些與平常HTTP請求不同的附加頭信息攀细,比如會附帶“Upgrade: websocket 和 Connection: Upgrade” 以及Websocket版本信息箫踩,用以表明此HTTP協(xié)議需要升級為Websocket,隨后谭贪,服務(wù)器接收到信息后境钟,如果服務(wù)端也支持Websocket,則會返回一些附帶Websocket的升級信息給客戶端俭识,這樣慨削,客戶端和服務(wù)端的Websocket通信就建立起來了,此后,它們間的通信就可以不用HTTP協(xié)議了理盆,可以直接互發(fā)數(shù)據(jù)了痘煤。
在 使用Netty+Protobuf實現(xiàn)游戲TCP通信 中已經(jīng)介紹過,在網(wǎng)絡(luò)中猿规,數(shù)據(jù)都是以二進制字節(jié)流傳輸?shù)闹钥欤窃谝訵ebsocket通信的游戲中,客戶端(瀏覽器)通常都是處理Json格式的文本協(xié)議的姨俩,因為瀏覽器處理Json非常容易蘸拔,所以,這時客戶端和服務(wù)端在Packet數(shù)據(jù)包中byte[] bytes存儲的實際上是Json文本格式的二進制流(即將類似{"account":xs996,"password":123456,"platform":3}這樣的協(xié)議內(nèi)容編碼成二進制流环葵,而游戲TCP通信通常則是把其中的"xs996,123456,3"編碼成二進制流调窍,可見Websockt的Json格式協(xié)議占用的數(shù)據(jù)包會大點),因此张遭,以Json格式作為內(nèi)容傳輸?shù)亩M制流轉(zhuǎn)為Protobuf的過程是:
String json = new String(packet.getBytes(), Charset.forName("UTF-8"));//Packet轉(zhuǎn)json文本 Message.Builder builder = message.newBuilderForType();//message為具體協(xié)議方法 JsonFormat.merge(json, builder); Message msg = builder.build();//轉(zhuǎn)Protobuf
而TCP通信中通常就是對傳輸數(shù)據(jù)對象編碼成二進制流邓萨,即并不是先轉(zhuǎn)Json再編碼成二進制流,所以它的解碼為Protobuf的過程是:
Message msg = message.newBuilderForType().mergeFrom(packet.getBytes()).build();//轉(zhuǎn)Protobuf菊卷,其中message為具體協(xié)議方法
相應(yīng)的Json文本格式編碼過程如下:
byte[] bytes = JsonFormat.printToString(message).getBytes(Charset.forName("UTF-8")); //message為Protobuf協(xié)議 Packet packet = new Packet(Packet.HEAD_TCP, cmd, bytes);
而TCP通信中普通對象的編碼過程如下:
byte[] bytes = message.toByteArray(); Packet packet = new Packet(Packet.HEAD_TCP, cmd, bytes);
首先看客戶端和服務(wù)端如何接入Websocket的及如何處理握手缔恳,此為本文的重點之一。
先看客戶端的Websocket接入洁闰,核心代碼如下:
NettyWebsocketClient.java
private final CRC16CheckSum checkSum = new CRC16CheckSum();
public void connect(String host, int port){
EventLoopGroup client = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(client);
bootstrap.channel(NioSocketChannel.class);
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
bootstrap.option(ChannelOption.TCP_NODELAY, true);
bootstrap.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//HTTP編解碼器
pipeline.addLast("http_codec", new HttpClientCodec());
//HTTP消息聚合,使用FullHttpResponse和FullHttpRequest到ChannelPipeline中的下一個ChannelHandler歉甚,這就消除了斷裂消息,保證了消息的完整扑眉。
pipeline.addLast("http_aggregator", new HttpObjectAggregator(65536));
pipeline.addLast("protobuf_decoder", new ProtoDecoder(null, 5120));
pipeline.addLast("client_handler", new ClientHandler());
pipeline.addLast("protobuf_encoder", new ProtoEncoder(checkSum, 2048));
}
});
ChannelFuture future;
try {
URI websocketURI = new URI(String.format("ws://%s:%d/", host, port));
HttpHeaders httpHeaders = new DefaultHttpHeaders();
//進行握手
WebSocketClientHandshaker handshaker = WebSocketClientHandshakerFactory.newHandshaker(websocketURI, WebSocketVersion.V13, (String)null, true,httpHeaders);
channel = bootstrap.connect(websocketURI.getHost(), websocketURI.getPort()).sync().channel();
ClientHandler handler = (ClientHandler)channel.pipeline().get("client_handler");
handler.setHandshaker(handshaker);
// 通過它構(gòu)造握手響應(yīng)消息返回給客戶端纸泄,
// 同時將WebSocket相關(guān)的編碼和解碼類動態(tài)添加到ChannelPipeline中,用于WebSocket消息的編解碼腰素,
// 添加WebSocketEncoder和WebSocketDecoder之后聘裁,服務(wù)端就可以自動對WebSocket消息進行編解碼了
handshaker.handshake(channel);
//阻塞等待是否握手成功
future = handler.handshakeFuture().sync();
System.out.println("----channel:"+future.channel());
} catch (Exception e) {
e.printStackTrace();
}
//future.channel().closeFuture().awaitUninterruptibly();
}
public void send(Message msg) {
if (channel == null || msg == null || !channel.isWritable()) {
return;
}
int cmd = ProtoManager.getMessageID(msg);
Packet packet = new Packet(Packet.HEAD_TCP, cmd, msg.toByteArray());
channel.writeAndFlush(packet);
}
它的ClientHandler.java因為要處理與服務(wù)端的HTTP握手,及握手成功后數(shù)據(jù)處理耸弄,它的核心代碼如下:
public class ClientHandler extends SimpleChannelInboundHandler<Object> {
WebSocketClientHandshaker handshaker;
ChannelPromise handshakeFuture;
public void handlerAdded(ChannelHandlerContext ctx) {
this.handshakeFuture = ctx.newPromise();
}
public WebSocketClientHandshaker getHandshaker() {
return handshaker;
}
public void setHandshaker(WebSocketClientHandshaker handshaker) {
this.handshaker = handshaker;
}
public ChannelPromise getHandshakeFuture() {
return handshakeFuture;
}
public void setHandshakeFuture(ChannelPromise handshakeFuture) {
this.handshakeFuture = handshakeFuture;
}
public ChannelFuture handshakeFuture() {
return this.handshakeFuture;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
//System.out.println("channelRead0 " + this.handshaker.isHandshakeComplete());
Channel ch = ctx.channel();
FullHttpResponse response;
if (!this.handshaker.isHandshakeComplete()) {
try {
response = (FullHttpResponse)msg;
//握手協(xié)議返回咧虎,設(shè)置結(jié)束握手
this.handshaker.finishHandshake(ch, response);
//設(shè)置成功
this.handshakeFuture.setSuccess();
//System.out.println("WebSocket Client connected! response headers[sec-websocket-extensions]:{}"+response.headers());
} catch (WebSocketHandshakeException var7) {
FullHttpResponse res = (FullHttpResponse)msg;
String errorMsg = String.format("WebSocket Client failed to connect,status:%s,reason:%s", res.status(), res.content().toString(CharsetUtil.UTF_8));
this.handshakeFuture.setFailure(new Exception(errorMsg));
}
} else if (msg instanceof FullHttpResponse) {//1.第一次握手請求消息由HTTP協(xié)議承載,所以它是一個HTTP消息计呈,執(zhí)行handleHttpRequest方法來處理WebSocket握手請求砰诵。
response = (FullHttpResponse)msg;
//this.listener.onFail(response.status().code(), response.content().toString(CharsetUtil.UTF_8));
throw new IllegalStateException("Unexpected FullHttpResponse (getStatus=" + response.status() + ", content=" + response.content().toString(CharsetUtil.UTF_8) + ')');
} else if(msg instanceof Packet){
Packet packet = (Packet)msg;
System.out.println("\n<<<<<<<<<<<<收到服務(wù)端協(xié)議:"+packet.getCmd()+"<<<<<<<<<<<<");
Class<?> clazz = ProtoManager.getRespMap().get(packet.getCmd());
Method m = ClassUtils.findMethod(clazz, "getDefaultInstance");
Message message = (Message) m.invoke(null);
msg = message.newBuilderForType().mergeFrom(packet.getBytes()).build();
ProtoPrinter.print(msg);
}else {//2.客戶端通過socket提交請求消息給服務(wù)端,WebSocketServerHandler接收到的是已經(jīng)解碼后的WebSocketFrame消息捌显。
WebSocketFrame frame = (WebSocketFrame)msg;
if (frame instanceof TextWebSocketFrame) {
TextWebSocketFrame textFrame = (TextWebSocketFrame)frame;
//this.listener.onMessage(textFrame.text());
System.out.println("TextWebSocketFrame");
} else if (frame instanceof BinaryWebSocketFrame) {
BinaryWebSocketFrame binFrame = (BinaryWebSocketFrame)frame;
System.out.println("BinaryWebSocketFrame received------------------------");
} else if (frame instanceof PongWebSocketFrame) {
System.out.println("WebSocket Client received pong");
} else if (frame instanceof CloseWebSocketFrame) {
System.out.println("receive close frame");
//this.listener.onClose(((CloseWebSocketFrame)frame).statusCode(), ((CloseWebSocketFrame)frame).reasonText());
ch.close();
}
}
}
}
這樣茁彭,客戶端的webSocket升級就完成了,再來看服務(wù)端的扶歪。
NettyWebsocketServer.java 它的接入Websocket核心代碼如下:
public class NettyWebsocketServer {
private static final Logger log = LoggerFactory.getLogger(NettyWebsocketServer.class);
private final EventLoopGroup bossGroup;
private final EventLoopGroup workerGroup;
private final ServerBootstrap bootstrap;
private int upLimit = 2048;
private int downLimit = 5120;
//循環(huán)冗余校驗
private final CRC16CheckSum upCheckSum = new CRC16CheckSum();
public NettyWebsocketServer(){
bossGroup = new NioEventLoopGroup();
workerGroup = new NioEventLoopGroup(4);
bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 5)
.childOption(ChannelOption.TCP_NODELAY, true);
}
public void bind(String ip, int port) {
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast("http-codec", new HttpServerCodec())//HTTP編解碼器
.addLast("aggregator", new HttpObjectAggregator(65536))//HTTP消息聚合
.addLast("websocket", new WebSocketServerProtocolHandler("/", null, true))//處理http升級websocket理肺,還有心跳
.addLast("decoder", new ProtoDecoder(upCheckSum, upLimit))
.addLast("server-handler", new ServerHandler())
.addLast("encoder", new ProtoEncoder(downLimit));
}
});
InetSocketAddress address = new InetSocketAddress(ip, port);
try {
bootstrap.bind(address).sync();
} catch (InterruptedException e) {
log.error("bind {} : {} failed", ip, port, e);
shutdown();
}
}
服務(wù)端的Websocket初始化及握手是通過WebSocketServerProtocolHandler來完成的摄闸,升級后,雙端建立通信通道妹萨,客戶端與服務(wù)端的通信便和以前的《使用Netty+Protobuf實現(xiàn)游戲TCP通信》大同小異了年枕,由此也體現(xiàn)了Netty變更協(xié)議的方便性。
在 游戲如何防刷 一文中乎完,提到了游戲協(xié)議的安全防范熏兄,因為在網(wǎng)絡(luò)傳輸過程中,傳輸?shù)臄?shù)據(jù)是可能被破解和篡改的树姨,游戲中也會如此摩桶,利益的驅(qū)動能使某些人修改協(xié)議,盜刷游戲資源帽揪,這種情況還很常見硝清,因此很有必要對游戲的協(xié)議做一層加密或完整性校驗等防范措施。此為本文的重點之二转晰。
在上面的代碼中芦拿,可以看到編解碼的handler中多了一個CRC16CheckSum對象,它就是用作協(xié)議的完整性校驗的(專業(yè)名詞叫循環(huán)冗余校驗挽霉,該校驗占用兩個字節(jié)防嗡,即包含了一個16位的二進制CRC值变汪,該值由輸入數(shù)據(jù)按照一定規(guī)則計算出來侠坎,然后附加到Packet數(shù)據(jù)包中,接收端在收到數(shù)據(jù)時重新計算該CRC值裙盾,然后與Packet數(shù)據(jù)包中的CRC值進行比較实胸,如果這兩個值不相等,就表示數(shù)據(jù)傳輸發(fā)生了錯誤)番官,它的核心代碼如下:
public class CRC16CheckSum {
public byte[] checksum(byte[] bytes) {
int crc = 0xffff;
for (int i = 0; i < bytes.length; i++) {
if (bytes[i] < 0) {
crc ^= (int) bytes[i] + 256;
} else {
crc ^= (int) bytes[i];
}
for (int j = 0; j < 8; j++) {
if ((crc & 0x0001) != 0) {
crc >>= 1;
crc ^= 0xa001;
} else {
crc >>= 1;
}
}
}
byte[] result = new byte[2];
result[0] = (byte)((crc >> 8) & 0xff);
result[1] = (byte) (crc & 0xff);
return result;
}
public int length() {
return 2;
}
}
在客戶端編碼時庐完,核心代碼如下:
ProtoEncoder.java
public class ProtoEncoder extends ChannelOutboundHandlerAdapter {
private static final Logger log = LoggerFactory.getLogger(ProtoEncoder.class);
public static final AttributeKey<Short> SEND_SID = AttributeKey.valueOf("SEND_SID");
private final int limit;
private final CRC16CheckSum checkSum;
public ProtoEncoder(CRC16CheckSum checkSum, int limit) {
this.checkSum = checkSum;
this.limit = limit;
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (msg instanceof Packet) {
Packet packet = (Packet) msg;
if (packet.getBytes().length > limit && log.isWarnEnabled())
log.warn("cmd[{}], packet size[{}], is over limit[{}]", packet.getCmd(), packet.getBytes().length, limit);
if (checkSum == null) {
int size = 7 + packet.getBytes().length;
ByteBuf buf = ctx.alloc().buffer(size);
try {
buf.writeByte(packet.getHead());
buf.writeShort(packet.getBytes().length + 4);
buf.writeInt(packet.getCmd());
buf.writeBytes(packet.getBytes());
msg = new BinaryWebSocketFrame(buf);
} catch (Exception e) {
buf.release();
throw e;
}
} else {
int size = 7 + packet.getBytes().length + checkSum.length();
ByteBuf buf = ctx.alloc().buffer(size);
try {
buf.writeByte(packet.getHead());
size = 2 + 2 + 4 + packet.getBytes().length;
ByteBuf temp = Unpooled.buffer(size, size);
temp.writeShort(getSid(ctx));
temp.writeShort(packet.getBytes().length + 4);
temp.writeInt(packet.getCmd());
temp.writeBytes(packet.getBytes());
byte[] check = checkSum.checksum(temp.array());
buf.writeBytes(check);
buf.writeBytes(temp);
temp.release();
msg = new BinaryWebSocketFrame(buf);
} catch (Exception e) {
buf.release();
throw e;
}
}
}
super.write(ctx, msg, promise);
}
private short getSid(ChannelHandlerContext ctx) {
Attribute<Short> attr = ctx.channel().attr(SEND_SID);
if (attr.get() == null) {
attr.set((short)1);
return 1;
}
short sid = (short)(attr.get() + 1);
if (sid == Short.MAX_VALUE) {
attr.set((short)0);
} else {
attr.set(sid);
}
return sid;
}
}
在服務(wù)端解碼時核心代碼如下:
ProtoDecoder.java
public class ProtoDecoder extends ChannelInboundHandlerAdapter{
public static final AttributeKey<Short> RECV_SID = AttributeKey.valueOf("RECV_SID");
private final int limit;
private final CRC16CheckSum checkSum;
public ProtoDecoder(CRC16CheckSum checkSum, int limit) {
this.limit = limit;
this.checkSum = checkSum;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof BinaryWebSocketFrame) {
BinaryWebSocketFrame frame = (BinaryWebSocketFrame)msg;
try {
ByteBuf in = frame.content();
if (checkSum == null) {
if (in.readableBytes() < 7)
throw new IllegalArgumentException();
byte head = in.readByte();
short length = in.readShort();
if (length <= 0 || length > limit)
throw new IllegalArgumentException();
int cmd = in.readInt();
if (in.readableBytes() < length - 4)
throw new IllegalArgumentException();
byte[] bytes = new byte[length - 4];
in.readBytes(bytes);
ctx.fireChannelRead(new Packet(head, cmd, bytes));
} else {
if (in.readableBytes() < 7 + checkSum.length())
throw new IllegalArgumentException();
in.markReaderIndex();
byte head = in.readByte();
byte[] orig = new byte[checkSum.length()];
in.readBytes(orig);
short sid = in.readShort();
if (!checkSid(ctx, sid))
throw new IllegalArgumentException();
short length = in.readShort();
if (length <= 0 || length > limit)
throw new IllegalArgumentException();
int cmd = in.readInt();
if (in.readableBytes() < length - 4)
throw new IllegalArgumentException();
byte[] bytes = new byte[length - 4];
in.readBytes(bytes);
byte[] check = new byte[2 + 2 + length];
in.resetReaderIndex();
in.skipBytes(1 + checkSum.length());
in.readBytes(check);
//檢驗循環(huán)冗余檢驗碼是否一致,不是徘熔,則拋棄該協(xié)議
byte[] compare = checkSum.checksum(check);
for (int i = 0; i < orig.length; i++) {
if (orig[i] != compare[i]) {
throw new IllegalArgumentException();
}
}
ctx.fireChannelRead(new Packet(head, sid, cmd, bytes));
}
return;
} finally {
frame.release();
}
}
ctx.fireChannelRead(msg);
}
private boolean checkSid(ChannelHandlerContext ctx, short sid) {
Attribute<Short> attr = ctx.channel().attr(RECV_SID);
if (attr.get() == null) {
attr.set((short)1);
return sid == 1;
}
if (sid != attr.get() + 1)
return false;
if (sid == Short.MAX_VALUE)
attr.set((short)0);
else
attr.set(sid);
return true;
}
}
做了這一層校驗后门躯,在一定程度上防止了協(xié)議被篡改的可能。此外酷师,還有CRC32讶凉,MD5都可以用作協(xié)議完整性校驗,感興趣的讀者可以在網(wǎng)絡(luò)上搜索這兩種實現(xiàn)山孔,一大把懂讯,CRC16在游戲中足以夠用。
此外台颠,通常游戲給協(xié)議做安全防范還有一種方法褐望,在上面代碼中也體現(xiàn)出來了,就是客戶端和服務(wù)端的建立連接的channel可以維護一個私有的協(xié)議序號,每請求一條協(xié)議瘫里,該序號就遞增1醒串,如果兩端序號不相等芹务,說明可能是沒經(jīng)過客戶端程序發(fā)上來的協(xié)議,因此把它丟棄掉,這也是一種簡單的辦法炎功。通常來說,這兩種防范已經(jīng)夠用了畔派,在Websocket游戲通信中抖甘,通常還會做SSL認證(廣泛用于Web瀏覽器與服務(wù)器之間的身份認證和加密數(shù)據(jù)傳輸),Netty對SSL也有很好的支持闷尿,生成證書后塑径,Netty的核心代碼為:
KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance("SunX509");
KeyStore keyStore = KeyStore.getInstance("JKS");
keyStore.load(new FileInputStream(sslKey), sslPass.toCharArray());
keyManagerFactory.init(keyStore,sslPass.toCharArray());
SslContext sslContext = SslContextBuilder.forServer(keyManagerFactory).build();
SSLEngine engine = sslContext.newEngine(ch.alloc());
engine.setUseClientMode(false);
ch.pipeline().addFirst("ssl", new SslHandler(engine));
至此,游戲協(xié)議的安全防范機制就介紹完畢了填具。
該實例源碼在github的地址為:
https://github.com/zhou-hj/NettyProtobufWebsocket.git