網絡模型
前后端都采用epoll/select模型
協議
測試用json簡單測試，發送信息包含2字節消息長度，最大包含65535長度數據，用下面協議定制
image.png
協議代碼
msgbase
using Newtonsoft.Json;
using System;
/// <summary>
/// Base class of every protocol message. Provides the JSON body codec and the
/// length-prefixed protocol-name codec used on the wire.
/// </summary>
public class MsgBase
{
    /// <summary>Protocol name; <see cref="Decode"/> resolves it to a CLR type via <c>Type.GetType</c>.</summary>
    public string protoName = "null";

    /// <summary>Serializes the message to JSON and returns its UTF-8 bytes.</summary>
    /// <param name="msgBase">Message to serialize.</param>
    /// <returns>UTF-8 encoded JSON text.</returns>
    public static byte[] Encode(MsgBase msgBase)
    {
        string json = JsonConvert.SerializeObject(msgBase);
        return System.Text.Encoding.UTF8.GetBytes(json);
    }

    /// <summary>Deserializes a JSON body into the message type named by <paramref name="protoName"/>.</summary>
    /// <param name="protoName">Type name of the message, as received from the peer.</param>
    /// <param name="bytes">Buffer holding the UTF-8 JSON body.</param>
    /// <param name="offset">Index of the first body byte.</param>
    /// <param name="count">Number of body bytes.</param>
    /// <returns>
    /// The decoded message, or <c>null</c> when <paramref name="protoName"/> does not
    /// resolve to a type derived from <see cref="MsgBase"/>.
    /// </returns>
    public static MsgBase Decode(string protoName, byte[] bytes, int offset, int count)
    {
        // The name comes from the network: only accept types that really are messages,
        // so a peer can neither crash the cast nor pick an arbitrary type to instantiate.
        Type msgType = Type.GetType(protoName);
        if (msgType == null || !typeof(MsgBase).IsAssignableFrom(msgType))
        {
            return null;
        }
        string json = System.Text.Encoding.UTF8.GetString(bytes, offset, count);
        return (MsgBase)JsonConvert.DeserializeObject(json, msgType);
    }

    /// <summary>Encodes the protocol name as a 2-byte little-endian length followed by the UTF-8 name.</summary>
    /// <param name="msgBase">Message whose <see cref="protoName"/> is encoded.</param>
    /// <returns>Length prefix plus name bytes.</returns>
    /// <exception cref="ArgumentException">The encoded name does not fit the 2-byte length prefix.</exception>
    public static byte[] EncodeName(MsgBase msgBase)
    {
        byte[] nameBytes = System.Text.Encoding.UTF8.GetBytes(msgBase.protoName);
        // Keep the length in an int: a signed 16-bit value would turn negative above 32767.
        int len = nameBytes.Length;
        if (len > ushort.MaxValue)
        {
            throw new ArgumentException("protoName is longer than 65535 bytes", "msgBase");
        }
        byte[] bytes = new byte[2 + len];
        // Low byte first (little-endian).
        bytes[0] = (byte)(len & 0xFF);
        bytes[1] = (byte)(len >> 8);
        Array.Copy(nameBytes, 0, bytes, 2, len);
        return bytes;
    }

    /// <summary>Decodes a protocol name written by <see cref="EncodeName"/>.</summary>
    /// <param name="bytes">Buffer to read from.</param>
    /// <param name="offset">Index of the 2-byte length prefix.</param>
    /// <param name="count">Receives the bytes consumed (prefix + name), or 0 on failure.</param>
    /// <returns>The protocol name, or an empty string when the buffer does not hold a complete, non-empty name.</returns>
    public static string DecodeName(byte[] bytes, int offset, out int count)
    {
        count = 0;
        // Need at least the 2-byte length prefix.
        if (offset < 0 || offset + 2 > bytes.Length)
        {
            return "";
        }
        // Unsigned little-endian length (0..65535).
        int len = (bytes[offset + 1] << 8) | bytes[offset];
        if (len <= 0)
        {
            return "";
        }
        // The whole name must be present in the buffer.
        if (offset + 2 + len > bytes.Length)
        {
            return "";
        }
        count = 2 + len;
        return System.Text.Encoding.UTF8.GetString(bytes, offset + 2, len);
    }
}
c#客戶端
封裝一個好用的byteArr
/// <summary>
/// Growable byte buffer with separate read and write cursors.
/// Unread data lives in <c>bytes[readIdx .. writeIdx)</c>.
/// </summary>
public class ByteArray
{
    // Default capacity used by the size-based constructor.
    const int DEFAULT_SIZE = 1024;
    // Capacity the buffer was created with; ReSize ignores requests below it.
    int initSize = 0;
    // Backing storage.
    public byte[] bytes;
    // Read and write cursors into bytes.
    public int readIdx = 0;
    public int writeIdx = 0;
    // Current size of the backing storage as tracked by this class.
    private int capacity = 0;
    // Free bytes after the write cursor.
    public int remain { get { return capacity - writeIdx; } }
    // Unread bytes between the two cursors.
    public int length { get { return writeIdx - readIdx; } }

    /// <summary>Creates an empty buffer with the given capacity.</summary>
    /// <param name="size">Initial capacity in bytes.</param>
    public ByteArray(int size = DEFAULT_SIZE)
    {
        bytes = new byte[size];
        capacity = size;
        initSize = size;
        readIdx = 0;
        writeIdx = 0;
    }

    /// <summary>Wraps an existing array (no copy); all of it counts as unread data.</summary>
    /// <param name="defaultBytes">Array to wrap.</param>
    public ByteArray(byte[] defaultBytes)
    {
        bytes = defaultBytes;
        capacity = defaultBytes.Length;
        initSize = defaultBytes.Length;
        readIdx = 0;
        writeIdx = defaultBytes.Length;
    }

    /// <summary>
    /// Reallocates the storage to the smallest power of two that is at least
    /// <paramref name="size"/> and moves the unread data to index 0.
    /// Does nothing when <paramref name="size"/> is below the unread length or the initial size.
    /// </summary>
    /// <param name="size">Requested minimum capacity.</param>
    public void ReSize(int size)
    {
        if (size < length) return;
        if (size < initSize) return;
        int n = 1;
        while (n < size) n *= 2;
        int unread = length;
        byte[] newBytes = new byte[n];
        Array.Copy(bytes, readIdx, newBytes, 0, unread);
        bytes = newBytes;
        capacity = n;
        writeIdx = unread;
        readIdx = 0;
    }

    /// <summary>Appends <paramref name="count"/> bytes from <paramref name="bs"/>, growing the buffer if needed.</summary>
    /// <param name="bs">Source array.</param>
    /// <param name="offset">Index of the first byte to copy from <paramref name="bs"/>.</param>
    /// <param name="count">Number of bytes to append.</param>
    /// <returns>The number of bytes written (always <paramref name="count"/>).</returns>
    public int Write(byte[] bs, int offset, int count)
    {
        if (remain < count)
        {
            // First reclaim the already-consumed space in front of readIdx. ReSize alone
            // is not enough: it refuses sizes below initSize, which used to leave the
            // tail full and made the copy below run past the end of the array.
            MoveBytes();
            if (remain < count)
            {
                ReSize(length + count);
            }
        }
        Array.Copy(bs, offset, bytes, writeIdx, count);
        writeIdx += count;
        return count;
    }

    /// <summary>Copies up to <paramref name="count"/> unread bytes into <paramref name="bs"/> and consumes them.</summary>
    /// <param name="bs">Destination array.</param>
    /// <param name="offset">Index in <paramref name="bs"/> to start writing at.</param>
    /// <param name="count">Maximum number of bytes to read.</param>
    /// <returns>The number of bytes actually read.</returns>
    public int Read(byte[] bs, int offset, int count)
    {
        count = Math.Min(count, length);
        // Unread data starts at readIdx, not at index 0.
        Array.Copy(bytes, readIdx, bs, offset, count);
        readIdx += count;
        CheckAndMoveBytes();
        return count;
    }

    /// <summary>Compacts the buffer when fewer than 8 unread bytes remain, so the move stays cheap.</summary>
    public void CheckAndMoveBytes()
    {
        if (length < 8)
        {
            MoveBytes();
        }
    }

    /// <summary>Moves the unread data to the start of the storage and resets the cursors accordingly.</summary>
    public void MoveBytes()
    {
        int unread = length;
        if (unread > 0)
        {
            Array.Copy(bytes, readIdx, bytes, 0, unread);
        }
        writeIdx = unread;
        readIdx = 0;
    }

    /// <summary>Returns the unread bytes as hyphen-separated hex pairs.</summary>
    public override string ToString()
    {
        // BitConverter.ToString rejects a start index at the end of the array even for
        // zero bytes, which happens once a wrapped array has been fully consumed.
        if (length == 0)
        {
            return string.Empty;
        }
        return BitConverter.ToString(bytes, readIdx, length);
    }

    /// <summary>Returns both cursors and a hex dump of the whole storage, for debugging.</summary>
    public string Debug()
    {
        return string.Format("readIdx({0}) writeIdx({1}) bytes({2})",
            readIdx,
            writeIdx,
            BitConverter.ToString(bytes, 0, capacity)
        );
    }
}
客戶端核心代碼
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Net.Sockets;
namespace NetWorkUtils.client
{
/// <summary>
/// Static TCP client built on the asynchronous Begin/End socket API.
/// Frame layout, as produced by <see cref="Send"/>: a 2-byte little-endian total frame
/// length (the 2 header bytes included), then the length-prefixed protocol name written by
/// <c>MsgBase.EncodeName</c>, then the JSON body written by <c>MsgBase.Encode</c>.
/// Received messages are queued by the socket callbacks and dispatched from <see cref="Update"/>.
/// </summary>
public static class NetManager
{
    // Client socket; recreated by InitState on every Connect.
    static Socket socket;
    // Receive buffer.
    static ByteArray readBuff;
    // Outgoing frames; the head is the frame currently being sent.
    static Queue<ByteArray> writeQueue;
    // True between BeginConnect and its callback.
    static bool isConnecting = false;
    // True when Close was requested while frames were still queued.
    static bool isClosing = false;
    // Decoded messages waiting to be dispatched by MsgUpdate.
    static Queue<MsgBase> msgList = new Queue<MsgBase>();
    // Number of queued messages; read without the lock as a cheap early-out.
    static int msgCount = 0;
    // Maximum number of messages dispatched per Update call.
    static readonly int MAX_MESSAGE_FIRE = 10;
    // Whether the heartbeat is enabled.
    public static bool isUsePing = true;
    // Heartbeat interval, in seconds (compared against GetTimeStamp values).
    public static int pingInterval = 30;
    // Timestamp of the last PING sent.
    static long lastPingTime = 0;
    // Timestamp of the last PONG received.
    static long lastPongTime = 0;

    /// <summary>Connection life-cycle events.</summary>
    public enum NetEvent
    {
        ConnectSucc = 1,
        ConnectFail = 2,
        Close = 3,
    }

    /// <summary>Handler for a <see cref="NetEvent"/>; <paramref name="err"/> carries the error text or "".</summary>
    public delegate void EventListener(String err);

    // Registered event handlers, combined per event.
    private static Dictionary<NetEvent, EventListener> eventListeners = new Dictionary<NetEvent, EventListener>();

    /// <summary>Registers <paramref name="listener"/> for <paramref name="netEvent"/>.</summary>
    public static void AddEventListener(NetEvent netEvent, EventListener listener)
    {
        EventListener existing;
        eventListeners.TryGetValue(netEvent, out existing);
        // Combining with a null delegate yields just the new listener.
        eventListeners[netEvent] = existing + listener;
    }

    /// <summary>Unregisters <paramref name="listener"/> from <paramref name="netEvent"/>.</summary>
    public static void RemoveEventListener(NetEvent netEvent, EventListener listener)
    {
        EventListener existing;
        if (!eventListeners.TryGetValue(netEvent, out existing))
        {
            return;
        }
        existing -= listener;
        if (existing == null)
        {
            // Drop the entry so FireEvent never invokes a null delegate.
            eventListeners.Remove(netEvent);
        }
        else
        {
            eventListeners[netEvent] = existing;
        }
    }

    // Invokes the handlers registered for netEvent, if any.
    private static void FireEvent(NetEvent netEvent, String err)
    {
        EventListener handlers;
        if (eventListeners.TryGetValue(netEvent, out handlers) && handlers != null)
        {
            handlers(err);
        }
    }

    /// <summary>Handler for a decoded message.</summary>
    public delegate void MsgListener(MsgBase msgBase);

    // Registered message handlers, keyed by protocol name.
    private static Dictionary<string, MsgListener> msgListeners = new Dictionary<string, MsgListener>();

    /// <summary>Registers <paramref name="listener"/> for messages named <paramref name="msgName"/>.</summary>
    public static void AddMsgListener(string msgName, MsgListener listener)
    {
        MsgListener existing;
        msgListeners.TryGetValue(msgName, out existing);
        msgListeners[msgName] = existing + listener;
    }

    /// <summary>Unregisters <paramref name="listener"/> from messages named <paramref name="msgName"/>.</summary>
    public static void RemoveMsgListener(string msgName, MsgListener listener)
    {
        MsgListener existing;
        if (!msgListeners.TryGetValue(msgName, out existing))
        {
            return;
        }
        existing -= listener;
        if (existing == null)
        {
            // Drop the entry so FireMsg never invokes a null delegate.
            msgListeners.Remove(msgName);
        }
        else
        {
            msgListeners[msgName] = existing;
        }
    }

    // Invokes the handlers registered for msgName, if any.
    private static void FireMsg(string msgName, MsgBase msgBase)
    {
        MsgListener handlers;
        if (msgListeners.TryGetValue(msgName, out handlers) && handlers != null)
        {
            handlers(msgBase);
        }
    }

    /// <summary>Starts an asynchronous connect; the result is reported through ConnectSucc / ConnectFail.</summary>
    /// <param name="ip">Server host.</param>
    /// <param name="port">Server port.</param>
    public static void Connect(string ip, int port)
    {
        if (socket != null && socket.Connected)
        {
            Debug.WriteLine("Connect fail, already connected!");
            return;
        }
        if (isConnecting)
        {
            Debug.WriteLine("Connect fail, isConnecting");
            return;
        }
        InitState();
        socket.NoDelay = true;
        isConnecting = true;
        socket.BeginConnect(ip, port, ConnectCallback, socket);
    }

    // Resets all per-connection state and creates a fresh socket.
    private static void InitState()
    {
        socket = new Socket(AddressFamily.InterNetwork,
            SocketType.Stream, ProtocolType.Tcp);
        readBuff = new ByteArray();
        writeQueue = new Queue<ByteArray>();
        isConnecting = false;
        isClosing = false;
        msgList = new Queue<MsgBase>();
        msgCount = 0;
        long now = GetTimeStamp();
        lastPingTime = now;
        lastPongTime = now;
        // Register the PONG handler once.
        if (!msgListeners.ContainsKey("MsgPong"))
        {
            AddMsgListener("MsgPong", OnMsgPong);
        }
    }

    // Completion callback of BeginConnect.
    private static void ConnectCallback(IAsyncResult ar)
    {
        try
        {
            Socket sock = (Socket)ar.AsyncState;
            sock.EndConnect(ar);
            Debug.WriteLine("Socket Connect Succ ");
            // Clear the flag before notifying, so a listener may call Send right away.
            isConnecting = false;
            FireEvent(NetEvent.ConnectSucc, "");
            sock.BeginReceive(readBuff.bytes, readBuff.writeIdx,
                readBuff.remain, 0, ReceiveCallback, sock);
        }
        catch (SocketException ex)
        {
            Debug.WriteLine("Socket Connect fail " + ex.ToString());
            isConnecting = false;
            FireEvent(NetEvent.ConnectFail, ex.ToString());
        }
    }

    /// <summary>
    /// Closes the connection. When frames are still queued, the close is deferred
    /// until <see cref="SendCallback"/> has drained the queue.
    /// </summary>
    public static void Close()
    {
        if (socket == null || !socket.Connected)
        {
            return;
        }
        if (isConnecting)
        {
            return;
        }
        bool hasPending;
        lock (writeQueue)
        {
            hasPending = writeQueue.Count > 0;
        }
        if (hasPending)
        {
            isClosing = true;
        }
        else
        {
            socket.Close();
            FireEvent(NetEvent.Close, "");
        }
    }

    /// <summary>Encodes <paramref name="msg"/> into one frame and queues it for sending.</summary>
    /// <remarks>Silently returns when not connected, connecting or closing. Frames over 65535 bytes are logged and dropped.</remarks>
    public static void Send(MsgBase msg)
    {
        if (socket == null || !socket.Connected)
        {
            return;
        }
        if (isConnecting)
        {
            return;
        }
        if (isClosing)
        {
            return;
        }
        byte[] nameBytes = MsgBase.EncodeName(msg);
        byte[] bodyBytes = MsgBase.Encode(msg);
        int total = 2 + nameBytes.Length + bodyBytes.Length;
        // The header is 2 bytes: a longer frame would be sent with a truncated length
        // and desynchronize the receiver.
        if (total > ushort.MaxValue)
        {
            Debug.WriteLine("Send fail, message too long: " + total);
            return;
        }
        byte[] sendBytes = new byte[total];
        // Total frame length, low byte first.
        sendBytes[0] = (byte)(total % 256);
        sendBytes[1] = (byte)(total / 256);
        Array.Copy(nameBytes, 0, sendBytes, 2, nameBytes.Length);
        Array.Copy(bodyBytes, 0, sendBytes, 2 + nameBytes.Length, bodyBytes.Length);
        ByteArray ba = new ByteArray(sendBytes);
        int queued;
        lock (writeQueue)
        {
            writeQueue.Enqueue(ba);
            queued = writeQueue.Count;
        }
        // Only start a send when the queue was empty; otherwise SendCallback chains to this frame.
        if (queued == 1)
        {
            socket.BeginSend(sendBytes, 0, sendBytes.Length,
                0, SendCallback, socket);
        }
    }

    /// <summary>Completion callback of BeginSend: advances the head frame and sends whatever is left.</summary>
    public static void SendCallback(IAsyncResult ar)
    {
        Socket sock = (Socket)ar.AsyncState;
        if (sock == null || !sock.Connected)
        {
            return;
        }
        int count;
        try
        {
            count = sock.EndSend(ar);
        }
        catch (SocketException ex)
        {
            Debug.WriteLine("Socket Send fail" + ex.ToString());
            return;
        }
        catch (ObjectDisposedException)
        {
            // The socket was closed while the send was in flight.
            return;
        }
        ByteArray ba;
        lock (writeQueue)
        {
            ba = writeQueue.Count > 0 ? writeQueue.Peek() : null;
        }
        if (ba == null)
        {
            return;
        }
        ba.readIdx += count;
        // Head frame fully sent: drop it and move on to the next one.
        if (ba.length == 0)
        {
            lock (writeQueue)
            {
                writeQueue.Dequeue();
                ba = writeQueue.Count > 0 ? writeQueue.Peek() : null;
            }
        }
        if (ba != null)
        {
            // Either the rest of a partially sent frame or the next queued frame.
            sock.BeginSend(ba.bytes, ba.readIdx, ba.length,
                0, SendCallback, sock);
        }
        else if (isClosing)
        {
            // Deferred Close(): the queue is drained now, so finish closing and notify.
            sock.Close();
            FireEvent(NetEvent.Close, "");
        }
    }

    /// <summary>Completion callback of BeginReceive: parses buffered frames and re-arms the receive.</summary>
    public static void ReceiveCallback(IAsyncResult ar)
    {
        try
        {
            Socket sock = (Socket)ar.AsyncState;
            int count = sock.EndReceive(ar);
            // Zero bytes means the remote side closed the connection; re-arming the
            // receive would only complete with zero again.
            if (count == 0)
            {
                Close();
                return;
            }
            readBuff.writeIdx += count;
            OnReceiveData();
            // Make room when the tail of the buffer is nearly full.
            if (readBuff.remain < 8)
            {
                readBuff.MoveBytes();
                readBuff.ReSize(readBuff.length * 2);
            }
            sock.BeginReceive(readBuff.bytes, readBuff.writeIdx,
                readBuff.remain, 0, ReceiveCallback, sock);
        }
        catch (SocketException ex)
        {
            Debug.WriteLine("Socket Receive fail" + ex.ToString());
        }
        catch (ObjectDisposedException)
        {
            // The socket was closed locally while a receive was pending.
        }
    }

    /// <summary>Parses every complete frame currently in the receive buffer and queues the decoded messages.</summary>
    public static void OnReceiveData()
    {
        // More than the 2-byte header must be buffered.
        while (readBuff.length > 2)
        {
            int start = readBuff.readIdx;
            byte[] bytes = readBuff.bytes;
            // Unsigned little-endian total frame length, header included.
            int frameLength = (bytes[start + 1] << 8) | bytes[start];
            if (frameLength < 2)
            {
                // A frame cannot be shorter than its own header; the stream is out of
                // sync and cannot be recovered.
                Debug.WriteLine("OnReceiveData invalid frame length " + frameLength);
                Close();
                return;
            }
            // Wait for the rest of the frame.
            if (readBuff.length < frameLength)
            {
                return;
            }
            int nameCount = 0;
            string protoName = MsgBase.DecodeName(bytes, start + 2, out nameCount);
            int bodyCount = frameLength - 2 - nameCount;
            MsgBase msgBase = null;
            if (protoName == "" || bodyCount < 0)
            {
                Debug.WriteLine("OnReceiveData MsgBase.DecodeName fail");
            }
            else
            {
                try
                {
                    msgBase = MsgBase.Decode(protoName, bytes, start + 2 + nameCount, bodyCount);
                }
                catch (Exception ex)
                {
                    // Malformed data from the peer must not take down the receive callback.
                    Debug.WriteLine("OnReceiveData MsgBase.Decode fail " + ex.ToString());
                }
            }
            // Consume the whole frame even when it could not be decoded, so the
            // next frame is read from the right position.
            readBuff.readIdx = start + frameLength;
            readBuff.CheckAndMoveBytes();
            if (msgBase != null)
            {
                lock (msgList)
                {
                    msgList.Enqueue(msgBase);
                    msgCount++;
                }
            }
        }
    }

    /// <summary>Per-tick entry point: dispatches queued messages and drives the heartbeat.</summary>
    public static void Update()
    {
        MsgUpdate();
        PingUpdate();
    }

    /// <summary>Dispatches up to MAX_MESSAGE_FIRE queued messages to their listeners.</summary>
    public static void MsgUpdate()
    {
        // Cheap early-out without taking the lock.
        if (msgCount == 0)
        {
            return;
        }
        for (int i = 0; i < MAX_MESSAGE_FIRE; i++)
        {
            MsgBase msgBase = null;
            lock (msgList)
            {
                if (msgList.Count > 0)
                {
                    msgBase = msgList.Dequeue();
                    msgCount--;
                }
            }
            // Queue drained.
            if (msgBase == null)
            {
                break;
            }
            FireMsg(msgBase.protoName, msgBase);
        }
    }

    // Sends a PING every pingInterval and closes the connection when no PONG
    // arrived for four intervals.
    private static void PingUpdate()
    {
        if (!isUsePing)
        {
            return;
        }
        long now = GetTimeStamp();
        if (now - lastPingTime > pingInterval)
        {
            MsgPing msgPing = new MsgPing();
            Send(msgPing);
            lastPingTime = now;
        }
        if (now - lastPongTime > pingInterval * 4)
        {
            Close();
        }
    }

    // Records the arrival time of a PONG.
    private static void OnMsgPong(MsgBase msgBase)
    {
        lastPongTime = GetTimeStamp();
    }

    /// <summary>Returns the current UTC time as whole seconds since 1970-01-01 00:00:00.</summary>
    public static long GetTimeStamp()
    {
        TimeSpan ts = DateTime.UtcNow - new DateTime(1970, 1, 1, 0, 0, 0, 0);
        return Convert.ToInt64(ts.TotalSeconds);
    }
}
}
java netty服務端
測試期間，簡單將收到的信息返回pong
maven pom
<!-- Spring Boot starter. No <version> is given here; presumably it is managed by a
     Spring Boot parent/BOM that this fragment does not show (TODO confirm). -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!-- Netty, pinned to 4.1.53.Final. -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId> <!-- Use 'netty-all' for 4.0 or above -->
<version>4.1.53.Final</version>
<scope>compile</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
<!-- fastjson, pinned to 1.2.74.
     NOTE(review): check this version against published fastjson security advisories
     before using it outside a local test. -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.74</version>
</dependency>
nettyserver
public class NettyServer {
// 通過nio方式來接收連接和處理連接
private EventLoopGroup bg =
new NioEventLoopGroup();
private EventLoopGroup wg =
new NioEventLoopGroup();
// 啟動引導器
private ServerBootstrap b =
new ServerBootstrap();
public void run() {
//1 設置reactor 線程
b.group(bg, wg);
//2 設置nio類型的channel
b.channel(NioServerSocketChannel.class);
//3 設置監(jiān)聽端口
String ip = "127.0.0.1";
b.localAddress(new InetSocketAddress(ip, 8888));
//4 設置通道選項
b.option(ChannelOption.SO_KEEPALIVE, true);
b.option(ChannelOption.ALLOCATOR,
PooledByteBufAllocator.DEFAULT);
//5 裝配流水線
b.childHandler(new ChannelInitializer<SocketChannel>() {
//有連接到達時會創(chuàng)建一個channel
protected void initChannel(SocketChannel ch) throws Exception {
// 管理pipeline中的Handler
ch.pipeline().addLast(new MyDecode());
ch.pipeline().addLast(new MyDecoderName());
}
});
// 6 開始綁定server
// 通過調用sync同步方法阻塞直到綁定成功
ChannelFuture channelFuture = null;
boolean isStart = false;
while (!isStart) {
try {
channelFuture = b.bind().sync();
System.out.println("server啟動, 端口為: " +
channelFuture.channel().localAddress());
isStart = true;
} catch (Exception e) {
e.printStackTrace();
}
}
try {
// 7 監(jiān)聽通道關閉事件
// 應用程序會一直等待膝昆,直到channel關閉
ChannelFuture closeFuture =
channelFuture.channel().closeFuture();
closeFuture.sync();
} catch (
Exception e) {
e.printStackTrace();
} finally {
// 8 優(yōu)雅關閉EventLoopGroup娃弓,
// 釋放掉所有資源包括創(chuàng)建的線程
wg.shutdownGracefully();
bg.shutdownGracefully();
}
}
}
mydecode
/**
 * Splits the inbound byte stream into frames. Each frame starts with a 2-byte
 * little-endian length that counts the whole frame, the 2 header bytes included.
 * A complete frame (header included) is passed on as a {@code byte[]}.
 */
public class MyDecode extends ByteToMessageDecoder {
    // Size of the length header in bytes.
    private static final int HEADER_LENGTH = 2;

    /**
     * Emits at most one frame per call.
     *
     * @param channelHandlerContext context of this handler (unused)
     * @param byteBuf               accumulated inbound bytes
     * @param list                  receives the complete frame as a {@code byte[]}
     */
    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        if (byteBuf.readableBytes() < HEADER_LENGTH) {
            return;
        }
        // Peek at the header relative to the reader index without consuming it.
        // The unsigned read avoids sign extension for length bytes >= 0x80.
        int pos = byteBuf.readerIndex();
        int msgLen = byteBuf.getUnsignedShortLE(pos);
        // A frame cannot be shorter than its own header: drop the bogus header so
        // the decoder keeps making progress.
        if (msgLen < HEADER_LENGTH) {
            byteBuf.skipBytes(HEADER_LENGTH);
            return;
        }
        // Frame not complete yet; wait for more data.
        if (byteBuf.readableBytes() < msgLen) {
            return;
        }
        // msgLen already includes the header, so the frame is exactly msgLen bytes.
        byte[] msgContent = new byte[msgLen];
        byteBuf.readBytes(msgContent);
        list.add(msgContent);
    }
}
handle
/**
 * Test handler: prints every received frame and answers it with a MsgPong frame.
 */
public class MyDecoderName extends ChannelInboundHandlerAdapter {
    /**
     * Prints the frame and writes back an encoded {@link Pong}.
     *
     * @param ctx handler context used to write the reply
     * @param msg the frame as a {@code byte[]}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("receive");
        // The client encodes text as UTF-8; do not depend on the platform default charset.
        String s = new String((byte[]) msg, java.nio.charset.StandardCharsets.UTF_8);
        System.out.println(s);
        Pong pong = new Pong();
        // The reply is wrapped in a ByteBuf; the original author noted that a
        // non-ByteBuf message cannot be sent.
        ctx.writeAndFlush(Unpooled.wrappedBuffer(pong.decode()));
    }

    /** PONG reply message. */
    static class Pong {
        private String protoName = "MsgPong";

        /**
         * @return the protocol name
         */
        public String getProtoName() {
            return protoName;
        }

        /**
         * @param protoName the protocol name to set
         */
        public void setProtoName(String protoName) {
            this.protoName = protoName;
        }

        /**
         * Builds the frame for this message (despite its name, this method encodes):
         * 2-byte little-endian total frame length (header included), 2-byte little-endian
         * name length, the UTF-8 protocol name, then the UTF-8 JSON body.
         *
         * @return the complete frame
         * @throws IllegalStateException if the name or the frame exceeds 65535 bytes
         */
        byte[] decode() {
            java.nio.charset.Charset utf8 = java.nio.charset.StandardCharsets.UTF_8;
            byte[] bodyBytes = JSON.toJSONString(this).getBytes(utf8);
            byte[] rawName = this.protoName.getBytes(utf8);
            int nameLen = rawName.length;
            int total = 2 + 2 + nameLen + bodyBytes.length;
            // Both lengths are written as 2 bytes.
            if (nameLen > 0xFFFF || total > 0xFFFF) {
                throw new IllegalStateException("frame too long: " + total);
            }
            byte[] bytes = new byte[total];
            // Total frame length, low byte first.
            bytes[0] = (byte) (total & 0xFF);
            bytes[1] = (byte) (total >>> 8);
            // Name length, low byte first.
            bytes[2] = (byte) (nameLen & 0xFF);
            bytes[3] = (byte) (nameLen >>> 8);
            System.arraycopy(rawName, 0, bytes, 4, nameLen);
            // JSON body.
            System.arraycopy(bodyBytes, 0, bytes, 4 + nameLen, bodyBytes.length);
            return bytes;
        }
    }
}
最終測試
服務端啟動netty
客戶端發送任意消息后收到pong信息
參考書籍
《Unity3D網絡游戲實戰(第二版)》
《Netty、Redis、Zookeeper高并發實戰》