一琐旁、 組件介紹
-
Channel
在 Netty 中, Channel 是一個 Socket 連接的抽象, 它為用戶提供了關(guān)于底層 Socket 狀態(tài)(是否是連接還是斷開) 以及對 Socket 的讀寫等操作缭乘。
服務(wù)器連接監(jiān)聽的channel 婆廊,也叫parent channel
密强。 對應(yīng)每一個客戶端連接讀寫的channel,叫child channel
。 -
EventLoopGroup
Netty的調(diào)度模塊稱為EvenLoopGroup,它包含一組EventLoop盆佣,Channel通過注冊到EventLoop中執(zhí)行操作。默認EventLoop個數(shù)為cpu核數(shù)的兩倍
BossGroup(boss線程組):相當(dāng)于mainReactor械荷,負責(zé)建立連接并且把連接注冊到WorkGroup中共耍。
WorkerGroup(worker線程組):相當(dāng)于subReactor,WorkGroup負責(zé)處理連接對應(yīng)的讀寫事件吨瞎。
boss group和worker goup相當(dāng)于多Reactor多線程
設(shè)計模式痹兜。
EvenLoopGroup.png -
Channel類型
NioDatagramChannel
,表示異步的UDP連接颤诀;
NioSocketChannel
表示異步的TCP連接
NioServerSocketChannel
表示異步的TCP Socket連接字旭,對每一個新進來的連接都會創(chuàng)建一個SocketChannel对湃。 -
Bootstrap和ServerBootstrap
啟動器,完成Netty客戶端或服務(wù)端的初始化谐算;
因為UDP是無連接的熟尉,所有直接使用Bootstrap归露;Http則使用ServerBootstrap
啟動流程:BootstrapStart.png
二洲脂、主要邏輯
udp服務(wù)啟動相關(guān)邏輯:
public void startUdpServer(){
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group) //配置一個線程組
.channel(NioDatagramChannel.class) //設(shè)置channel類型為UPD
.option(ChannelOption.SO_BROADCAST, true) //支持廣播
.option(ChannelOption.SO_RCVBUF, 2048 * 1024)// 設(shè)置channel讀緩沖區(qū)大小
.option(ChannelOption.SO_SNDBUF, 2048 * 1024)// 設(shè)置channel寫緩沖區(qū)大小
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception { //裝配handler流水線
ChannelPipeline pipeline = ch.pipeline(); //Handler將按pipeline中添加的順序執(zhí)行
pipeline.addLast(new UdpServerHandler(serviceFactory)); //自定義的處理器
}
});
//綁定端口(默認是異步的,可以加ChannelFunture的監(jiān)聽事件)剧包,sync()同步阻塞等待連接成功恐锦;客戶端使用.connect(host, port)連接
ChannelFuture channelFuture = bootstrap.bind(port).sync();
log.info("udp服務(wù)器啟動,端口為"+port);
nettyUtil.setChannel(channelFuture.channel());
//sync()同步阻塞等待channel關(guān)閉
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally{
//關(guān)閉資源
group.shutdownGracefully();
}
}
udp自定義handler相關(guān)邏輯疆液,注意這里泛型為DatagramPacket一铅,表示接收處理UDP報文
@Slf4j
public class UdpServerHandler extends SimpleChannelInboundHandler<DatagramPacket> {
private ServiceFactory serviceFactory;
//創(chuàng)建可緩存線程池
ExecutorService executorService = Executors.newCachedThreadPool();
public ServerHandler(ServiceFactory serviceFactory){
this.serviceFactory = serviceFactory;
}
//監(jiān)聽channel的消息,注意此時的handler為單線程處理堕油,可以把請求加到線程池中提升效率
@Override
protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception {
DatagramPacket p = packet.copy();
//將請求放在線程池處理
executorService.execute(new UdpHandlerRunnable(p));
}
class UdpHandlerRunnable implements Runnable{
DatagramPacket packet;
UdpHandlerRunnable(DatagramPacket packet){
this.packet = packet;
}
public void run(){
ByteBuf byteBuf = packet.content();
byteBuf.retain(); // byteBuf引用計數(shù)加1潘飘,避免報引用為0異常
String content = new String(ByteBufUtil.getBytes(byteBuf));
log.info("得到來自 "+packet.sender()+" 的請求, content = " + content);
// 業(yè)務(wù)邏輯
// ...
byteBuf.release(); //注意釋放byteBuf和packet
packet.release();
}
}
}
http服務(wù)啟動相關(guān)邏輯
public void startHttpServer(){
EventLoopGroup bossGroup = new NioEventLoopGroup(); //boss工作組
EventLoopGroup workerGroup = new NioEventLoopGroup(); //worker工作組
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) //設(shè)置channel類型為NioServerSocketChannel
.childOption(ChannelOption.SO_RCVBUF, 2048 * 1024)// 設(shè)置channel讀緩沖區(qū)為2M
.childOption(ChannelOption.SO_SNDBUF, 1024 * 1024)// 設(shè)置channel寫緩沖區(qū)為1M
.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception { //裝配handler流水線
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new HttpResponseEncoder()); //編碼器
pipeline.addLast(new HttpRequestDecoder()); //解碼器
pipeline.addLast("aggregator", new HttpObjectAggregator(65536));//設(shè)置塊的最大字節(jié)數(shù)
pipeline.addLast(new HttpServerHandler(serviceFactory)); //自定義的處理器
}
});
ChannelFuture channelFuture = serverBootstrap.bind(port).sync(); //綁定端口,sync()同步阻塞等待連接成功
log.info("http服務(wù)器啟動掉缺,端口為"+port);
nettyUtil.setChannel(channelFuture.channel());
//sync()同步阻塞等待channel關(guān)閉
channelFuture.channel().closeFuture().sync();
}catch (InterruptedException e) {
e.printStackTrace();
}finally{
//關(guān)閉資源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
http自定義handler相關(guān)邏輯
@Slf4j
public class HttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
private ServiceFactory serviceFactory;
public HttpServerHandler(ServiceFactory serviceFactory){
this.serviceFactory = serviceFactory;
}
//監(jiān)聽channel的消息
@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
String method = request.method().name();
String uri = request.uri();
String body = fullHttpRequest.content().toString(CharsetUtil.UTF_8);
log.info("method = {}, uri = {}, body = {}", method, uri, body);
//具體業(yè)務(wù)邏輯...
}
//異常捕獲
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
cause.printStackTrace();
NettyUtil.httpResponse(ctx.channel(), Error.error("錯誤請求"));
}
}
發(fā)送請求的工具類
/**
* udp單播發(fā)送
* @param host
* @param port
* @param pushMsg 待發(fā)送的信息
*/
public void singleCast(String host, int port, String pushMsg){
InetSocketAddress remoteAddress = new InetSocketAddress(host, port); //遠程地址
ByteBuf byteBuf1 = new UnpooledByteBufAllocator(false).buffer();
byteBuf1.writeCharSequence(pushMsg, CharsetUtil.UTF_8);
DatagramPacket packet = new DatagramPacket(byteBuf1, remoteAddress);
this.channel.writeAndFlush(packet);
}
/**
* http響應(yīng)
* @param channel
* @param sendMsg 響應(yīng)的信息
*/
public void httpResponse(Channel channel, String sendMsg){
FullHttpResponse response = new DefaultFullHttpResponse(
HttpVersion.HTTP_1_1,
HttpResponseStatus.OK,
Unpooled.wrappedBuffer(sendMsg.getBytes(CharsetUtil.UTF_8)));
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain;charset=UTF-8");
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
channel.writeAndFlush(response);
}
/**
* 參數(shù)轉(zhuǎn)map
* @param fullReq
* @return
*/
public Map<String, String> parse(FullHttpRequest fullReq) {
try{
HttpMethod method = fullReq.method();
Map<String, String> parmMap = new HashMap<>();
if (HttpMethod.GET == method) {
// 是GET請求
QueryStringDecoder decoder = new QueryStringDecoder(fullReq.uri());
decoder.parameters().entrySet().forEach( entry -> {
// entry.getValue()是一個List, 只取第一個元素
parmMap.put(entry.getKey(), entry.getValue().get(0));
});
} else if (HttpMethod.POST == method) {
// 是POST請求
HttpPostRequestDecoder decoder = new HttpPostRequestDecoder(fullReq);
decoder.offer(fullReq);
List<InterfaceHttpData> parmList = decoder.getBodyHttpDatas();
for (InterfaceHttpData parm : parmList) {
Attribute data = (Attribute) parm;
parmMap.put(data.getName(), data.getValue());
}
} else {
// 不支持其它方法
log.error("暫不支持該方法");
}
return parmMap;
}catch (IOException e) {
return null;
}
}
/**
* 文件下載
* @param ctx
* @param fileName
* @param type
*/
public void responseExportFile(ChannelHandlerContext ctx, String fileName, String type) {
try {
StringBuilder path = new StringBuilder();
if( type.equals("1") ){ //apk
path.append(".").append(File.separator).append("upload").append(File.separator).append("apk").append(File.separator).append(fileName);
}
if( type.equals("2") ){ //圖片
path.append(".").append(File.separator).append("upload").append(File.separator).append("img").append(File.separator).append(fileName);
}
File file = new File(path.toString()).getCanonicalFile();
if(!file.exists()){
log.error("下載文件不存在, path = {}", path);
this.httpResponse(ctx.channel(), JSON.toJSONString(Result.failure("下載文件不存在")));
return;
}
//隨機讀取文件
final RandomAccessFile raf = new RandomAccessFile(file, "r");
long fileLength = raf.length();
//定義response對象
HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
//設(shè)置請求頭部
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, fileLength);
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/octet-stream; charset=UTF-8");
response.headers().add(HttpHeaderNames.CONTENT_DISPOSITION,
"attachment; filename=\"" + URLEncoder.encode(file.getName(), "UTF-8") + "\";");
ctx.write(response);
//設(shè)置事件通知對象
ChannelFuture sendFileFuture = ctx
.write(new DefaultFileRegion(raf.getChannel(), 0, fileLength), ctx.newProgressivePromise());
sendFileFuture.addListener(new ChannelProgressiveFutureListener() {
//文件傳輸完成執(zhí)行監(jiān)聽器
@Override
public void operationComplete(ChannelProgressiveFuture future)
throws Exception {
log.info("文件 {} 下載成功.", fileName);
}
//文件傳輸進度監(jiān)聽器
@Override
public void operationProgressed(ChannelProgressiveFuture future,
long progress, long total) throws Exception {
}
});
//刷新緩沖區(qū)數(shù)據(jù)卜录,文件結(jié)束標(biāo)志符
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}