netty實(shí)現(xiàn)
一:實(shí)現(xiàn)原理
實(shí)現(xiàn)原理:
??長(zhǎng)連接的維持是要客戶端程序定時(shí)向服務(wù)端程序發(fā)送一個(gè)維持連接包的。如果長(zhǎng)時(shí)間未發(fā)送維持連接包,服務(wù)端程序?qū)嚅_(kāi)連接牲尺。
客戶端:
??Client通過(guò)持有Socket的對(duì)象甥啄,可以隨時(shí)(使用sendObject方法)發(fā)送Massage Object(消息)給服務(wù)端咒精。如果keepAliveDelay毫秒(程序中是2秒)內(nèi)未發(fā)送任何數(shù)據(jù)锌钮,則自動(dòng)發(fā)送一個(gè)KeepAlive Object(心跳)給服務(wù)端陨闹,用于維持連接樟凄。
??由于我們向服務(wù)端可以發(fā)送很多不同的消息對(duì)象聘芜,服務(wù)端也可以返回不同的對(duì)象。所以對(duì)于返回對(duì)象的處理缝龄,要編寫具體的ObjectAction實(shí)現(xiàn)類進(jìn)行處理汰现。通過(guò)Client.addActionMap方法進(jìn)行添加。這樣叔壤,程序會(huì)回調(diào)處理瞎饲。
服務(wù)端:
??由于客戶端會(huì)定時(shí)(keepAliveDelay毫秒)發(fā)送維持連接的信息過(guò)來(lái),所以服務(wù)端要有一個(gè)檢測(cè)機(jī)制炼绘。即當(dāng)服務(wù)端receiveTimeDelay毫秒(程序中是3秒)內(nèi)未接收任何數(shù)據(jù)嗅战,則自動(dòng)斷開(kāi)與客戶端的連接(直接調(diào)用socket的close方法,關(guān)閉掉socket即可)俺亮。ActionMapping的原理與客戶端相似(相同)驮捍。通過(guò)添加相應(yīng)的ObjectAction實(shí)現(xiàn)類,可以實(shí)現(xiàn)不同對(duì)象的響應(yīng)脚曾、應(yīng)答過(guò)程东且。
二:實(shí)現(xiàn)代碼示例
1:定義傳輸對(duì)象
public class KeepAlive implements Serializable {
private static final long serialVersionUID = 5731607955571883638L;
@Override
public String toString() {
return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + "維持連接包";
}
}
2:定義server端
/**
* <p>
* 服務(wù)端接口請(qǐng)求叛甫,并響應(yīng)争便。
* 心跳方式:超過(guò)3s未收到client響應(yīng),則認(rèn)為client端已經(jīng)斷開(kāi)审姓,則此時(shí)斷開(kāi)server端囤踩。
* </p>
*/
public class Server {
public static void main(String[] args) {
int port = 65432;
Server server = new Server(port);
server.start();
}
public interface ObjectAction {
Object action(Object rev, Server server);
}
public static final class DefaultObjectAction implements ObjectAction {
@Override
public Object action(Object rev, Server server) {
System.out.println("處理并返回:" + rev);
return rev;
}
}
private int port;
private volatile boolean running = false;
private long receiveTimeDelay = 3000;
/**
* 存儲(chǔ)接收到對(duì)象和對(duì)象類型的映射旨椒。
* 對(duì)象類型 <==> 對(duì)象。
* 因此client和server可以發(fā)送不同類型的對(duì)象消息
*/
private ConcurrentHashMap<Class, ObjectAction> actionMapping = new ConcurrentHashMap<Class, ObjectAction>();
/**
* 監(jiān)聽(tīng)線程堵漱,用于接口client端的心跳連接
*/
private Thread connWatchDog;
public Server(int port) {
this.port = port;
}
public void start() {
if (running) {
return;
}
running = true;
connWatchDog = new Thread(new ConnWatchDog());
connWatchDog.start();
}
@SuppressWarnings("deprecation")
public void stop() {
if (running) {
running = false;
}
if (connWatchDog != null) {
connWatchDog.stop();
}
}
public void addActionMap(Class<Object> cls, ObjectAction action) {
actionMapping.put(cls, action);
}
class ConnWatchDog implements Runnable {
@Override
public void run() {
try {
ServerSocket ss = new ServerSocket(port, 5);
while (running) {
// 阻塞等待client端發(fā)生的socket综慎,消息均會(huì)封裝為socket
Socket s = ss.accept();
new Thread(new SocketAction(s)).start();
}
} catch (IOException e) {
Server.this.stop();
}
}
}
class SocketAction implements Runnable {
Socket s;
boolean run = true;
long lastReceiveTime = System.currentTimeMillis();
public SocketAction(Socket s) {
this.s = s;
}
@Override
public void run() {
while (running && run) {
// 若超過(guò)3s未收到客戶端發(fā)送的數(shù)據(jù),則認(rèn)為客戶端已經(jīng)斷開(kāi)勤庐,那么斷開(kāi)服務(wù)端示惊。
if (System.currentTimeMillis() - lastReceiveTime > receiveTimeDelay) {
overThis();
} else {
try {
InputStream in = s.getInputStream();
if (in.available() > 0) {
ObjectInputStream ois = new ObjectInputStream(in);
Object obj = ois.readObject();
lastReceiveTime = System.currentTimeMillis();
System.out.println("接收:" + obj);
ObjectAction oa = actionMapping.get(obj.getClass());
oa = oa == null ? new DefaultObjectAction() : oa;
Object out = oa.action(obj, Server.this);
if (out != null) {
ObjectOutputStream oos = new ObjectOutputStream(s.getOutputStream());
oos.writeObject(out);
oos.flush();
}
} else {
Thread.sleep(10);
}
} catch (Exception e) {
e.printStackTrace();
overThis();
}
}
}
}
private void overThis() {
if (run) {
run = false;
}
if (s != null) {
try {
s.close();
} catch (IOException e) {
e.printStackTrace();
}
}
System.out.println("關(guān)閉:" + s.getRemoteSocketAddress());
}
}
}
3:定義client端
/**
* <p>
* 客戶端接口請(qǐng)求,并接口愉镰。
* 心跳方式:2s/次
* </p>
*/
public class Client {
public static void main(String[] args) throws UnknownHostException, IOException {
String serverIp = "127.0.0.1";
int port = 65432;
Client client = new Client(serverIp, port);
client.start();
}
public interface ObjectAction {
void action(Object obj, Client client);
}
public static final class DefaultObjectAction implements ObjectAction {
@Override
public void action(Object obj, Client client) {
System.out.println("處理:" + obj.toString());
}
}
private String serverIp;
private int port;
private Socket socket;
/**
* 連接狀態(tài)
*/
private boolean running = false;
/**
* 最后一次發(fā)送數(shù)據(jù)的時(shí)間
*/
private long lastSendTime;
/**
* 用于保存接收消息對(duì)象類型及該類型消息處理的對(duì)象
*/
private ConcurrentHashMap<Class, ObjectAction> actionMapping = new ConcurrentHashMap<Class, ObjectAction>();
public Client(String serverIp, int port) {
this.serverIp = serverIp;
this.port = port;
}
public void start() throws IOException {
if (running) {
return;
}
socket = new Socket(serverIp, port);
System.out.println("本地端口:" + socket.getLocalPort());
lastSendTime = System.currentTimeMillis();
running = true;
/**
* 保持長(zhǎng)連接的線程米罚,每隔2秒項(xiàng)服務(wù)器發(fā)一個(gè)一個(gè)保持連接的心跳消息
*/
new Thread(new KeepAliveWatchDog()).start();
/**
* 接受服務(wù)端消息的線程,處理消息
*/
new Thread(new ReceiveWatchDog()).start();
}
public void stop() {
if (running) {
running = false;
}
}
/**
* 添加接收對(duì)象的處理對(duì)象丈探。
*
* @param cls 待處理的對(duì)象录择,其所屬的類。
* @param action 處理過(guò)程對(duì)象碗降。
*/
public void addActionMap(Class<Object> cls, ObjectAction action) {
actionMapping.put(cls, action);
}
public void sendObject(Object obj) throws IOException {
ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());
oos.writeObject(obj);
System.out.println("發(fā)送:\t" + obj);
oos.flush();
}
class KeepAliveWatchDog implements Runnable {
long checkDelay = 10;
long keepAliveDelay = 2000;
@Override
public void run() {
while (running) {
if (System.currentTimeMillis() - lastSendTime > keepAliveDelay) {
try {
Client.this.sendObject(new KeepAlive());
} catch (IOException e) {
e.printStackTrace();
Client.this.stop();
}
lastSendTime = System.currentTimeMillis();
} else {
try {
Thread.sleep(checkDelay);
} catch (InterruptedException e) {
e.printStackTrace();
Client.this.stop();
}
}
}
}
}
class ReceiveWatchDog implements Runnable {
@Override
public void run() {
while (running) {
try {
InputStream in = socket.getInputStream();
if (in.available() > 0) {
ObjectInputStream ois = new ObjectInputStream(in);
Object obj = ois.readObject();
System.out.println("接收:" + obj);
ObjectAction oa = actionMapping.get(obj.getClass());
oa = oa == null ? new DefaultObjectAction() : oa;
oa.action(obj, Client.this);
} else {
Thread.sleep(10);
}
} catch (Exception e) {
e.printStackTrace();
Client.this.stop();
}
}
}
}
}