客戶端斷線重連機(jī)制讨衣。
客戶端數(shù)量多,且需要傳遞的數(shù)據(jù)量級(jí)較大式镐》凑颍可以周期性的發(fā)送數(shù)據(jù)的時(shí)候,使用娘汞。要求對(duì)數(shù)據(jù)的即時(shí)性不高的時(shí)候歹茶,才可使用。
優(yōu)點(diǎn): 可以使用數(shù)據(jù)緩存。不是每條數(shù)據(jù)進(jìn)行一次數(shù)據(jù)交互惊豺×敲希可以定時(shí)回收資源,對(duì)資源利用率高尸昧。相對(duì)來(lái)說(shuō)揩页,即時(shí)性可以通過(guò)其他方式保證。如: 120秒自動(dòng)斷線烹俗。數(shù)據(jù)變化1000次請(qǐng)求服務(wù)器一次爆侣。300秒中自動(dòng)發(fā)送不足1000次的變化數(shù)據(jù)。
image.png
/**
* 1. 雙線程組
* 2. Bootstrap配置啟動(dòng)信息
* 3. 注冊(cè)業(yè)務(wù)處理Handler
* 4. 綁定服務(wù)監(jiān)聽(tīng)端口并啟動(dòng)服務(wù)
*/
package com.bjsxt.socket.netty.timer;
import com.bjsxt.socket.utils.SerializableFactory4Marshalling;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
public class Server4Timer {
// 監(jiān)聽(tīng)線程組幢妄,監(jiān)聽(tīng)客戶端請(qǐng)求
private EventLoopGroup acceptorGroup = null;
// 處理客戶端相關(guān)操作線程組兔仰,負(fù)責(zé)處理與客戶端的數(shù)據(jù)通訊
private EventLoopGroup clientGroup = null;
// 服務(wù)啟動(dòng)相關(guān)配置信息
private ServerBootstrap bootstrap = null;
public Server4Timer(){
init();
}
private void init(){
acceptorGroup = new NioEventLoopGroup();
clientGroup = new NioEventLoopGroup();
bootstrap = new ServerBootstrap();
// 綁定線程組
bootstrap.group(acceptorGroup, clientGroup);
// 設(shè)定通訊模式為NIO
bootstrap.channel(NioServerSocketChannel.class);
// 設(shè)定緩沖區(qū)大小
bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
// SO_SNDBUF發(fā)送緩沖區(qū),SO_RCVBUF接收緩沖區(qū)蕉鸳,SO_KEEPALIVE開啟心跳監(jiān)測(cè)(保證連接有效)
bootstrap.option(ChannelOption.SO_SNDBUF, 16*1024)
.option(ChannelOption.SO_RCVBUF, 16*1024)
.option(ChannelOption.SO_KEEPALIVE, true);
// 增加日志Handler乎赴,日志級(jí)別為info
// bootstrap.handler(new LoggingHandler(LogLevel.INFO));
}
public ChannelFuture doAccept(int port) throws InterruptedException{
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingDecoder());
ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingEncoder());
// 定義一個(gè)定時(shí)斷線處理器,當(dāng)多長(zhǎng)時(shí)間內(nèi)潮尝,沒(méi)有任何的可讀取數(shù)據(jù)榕吼,自動(dòng)斷開連接。
// 構(gòu)造參數(shù)勉失,就是間隔時(shí)長(zhǎng)友题。 默認(rèn)的單位是秒。
// 自定義間隔時(shí)長(zhǎng)單位戴质。 new ReadTimeoutHandler(long times, TimeUnit unit);
ch.pipeline().addLast(new ReadTimeoutHandler(3));
ch.pipeline().addLast(new Server4TimerHandler());
}
});
ChannelFuture future = bootstrap.bind(port).sync();
return future;
}
public void release(){
this.acceptorGroup.shutdownGracefully();
this.clientGroup.shutdownGracefully();
}
public static void main(String[] args){
ChannelFuture future = null;
Server4Timer server = null;
try{
server = new Server4Timer();
future = server.doAccept(9999);
System.out.println("server started.");
future.channel().closeFuture().sync();
}catch(InterruptedException e){
e.printStackTrace();
}finally{
if(null != future){
try {
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if(null != server){
server.release();
}
}
}
}
/**
* @Sharable注解 -
* 代表當(dāng)前Handler是一個(gè)可以分享的處理器度宦。也就意味著,服務(wù)器注冊(cè)此Handler后告匠,可以分享給多個(gè)客戶端同時(shí)使用戈抄。
* 如果不使用注解描述類型,則每次客戶端請(qǐng)求時(shí)后专,必須為客戶端重新創(chuàng)建一個(gè)新的Handler對(duì)象划鸽。
*
*/
package com.bjsxt.socket.netty.timer;
import com.bjsxt.socket.utils.ResponseMessage;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
@Sharable
public class Server4TimerHandler extends ChannelHandlerAdapter {
// 業(yè)務(wù)處理邏輯
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("from client : ClassName - " + msg.getClass().getName()
+ " ; message : " + msg.toString());
ResponseMessage response = new ResponseMessage(0L, "test response");
ctx.writeAndFlush(response);
}
// 異常處理邏輯
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("server exceptionCaught method run...");
// cause.printStackTrace();
ctx.close();
}
}
/**
* 1. 單線程組
* 2. Bootstrap配置啟動(dòng)信息
* 3. 注冊(cè)業(yè)務(wù)處理Handler
* 4. connect連接服務(wù),并發(fā)起請(qǐng)求
*/
package com.bjsxt.socket.netty.timer;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import com.bjsxt.socket.utils.RequestMessage;
import com.bjsxt.socket.utils.SerializableFactory4Marshalling;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.WriteTimeoutHandler;
public class Client4Timer {
// 處理請(qǐng)求和處理服務(wù)端響應(yīng)的線程組
private EventLoopGroup group = null;
// 服務(wù)啟動(dòng)相關(guān)配置信息
private Bootstrap bootstrap = null;
private ChannelFuture future = null;
public Client4Timer(){
init();
}
private void init(){
group = new NioEventLoopGroup();
bootstrap = new Bootstrap();
// 綁定線程組
bootstrap.group(group);
// 設(shè)定通訊模式為NIO
bootstrap.channel(NioSocketChannel.class);
// bootstrap.handler(new LoggingHandler(LogLevel.INFO));
}
public void setHandlers() throws InterruptedException{
this.bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingDecoder());
ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingEncoder());
// 寫操作自定斷線戚哎。 在指定時(shí)間內(nèi)裸诽,沒(méi)有寫操作,自動(dòng)斷線型凳。
ch.pipeline().addLast(new WriteTimeoutHandler(3));
ch.pipeline().addLast(new Client4TimerHandler());
}
});
}
public ChannelFuture getChannelFuture(String host, int port) throws InterruptedException{
if(future == null){
future = this.bootstrap.connect(host, port).sync();
}
if(!future.channel().isActive()){
future = this.bootstrap.connect(host, port).sync();
}
return future;
}
public void release(){
this.group.shutdownGracefully();
}
public static void main(String[] args) {
Client4Timer client = null;
ChannelFuture future = null;
try{
client = new Client4Timer();
client.setHandlers();
future = client.getChannelFuture("localhost", 9999);
for(int i = 0; i < 3; i++){
RequestMessage msg = new RequestMessage(new Random().nextLong(),
"test"+i, new byte[0]);
future.channel().writeAndFlush(msg);
TimeUnit.SECONDS.sleep(2);
}
TimeUnit.SECONDS.sleep(5);
future = client.getChannelFuture("localhost", 9999);
RequestMessage msg = new RequestMessage(new Random().nextLong(),
"test", new byte[0]);
future.channel().writeAndFlush(msg);
}catch(Exception e){
e.printStackTrace();
}finally{
if(null != future){
try {
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if(null != client){
client.release();
}
}
}
}
package com.bjsxt.socket.netty.timer;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
public class Client4TimerHandler extends ChannelHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("from server : ClassName - " + msg.getClass().getName()
+ " ; message : " + msg.toString());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("client exceptionCaught method run...");
cause.printStackTrace();
ctx.close();
}
/**
* 當(dāng)連接建立成功后丈冬,出發(fā)的代碼邏輯。
* 在一次連接中只運(yùn)行唯一一次甘畅。
* 通常用于實(shí)現(xiàn)連接確認(rèn)和資源初始化的埂蕊。
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("client channel active");
}
}