springboot整合Netty應用

什么是Netty

Netty是一個異步事件驅動的網(wǎng)絡應用程序框架,用于快速開發(fā)可維護的高性能協(xié)議服務器和客戶端,Netty是一個NIO客戶端服務器框架,可以快速輕松地開發(fā)網(wǎng)絡應用程序御板,例如協(xié)議服務器和客戶端猎莲。它極大地簡化和簡化了諸如TCP和UDP套接字服務器之類的網(wǎng)絡編程煤篙。

“快速簡便”并不意味著最終的應用程序將遭受可維護性或性能問題的困擾。Netty經過精心設計,結合了許多協(xié)議(例如FTP盔几,SMTP,HTTP以及各種基于二進制和文本的舊式協(xié)議)的實施經驗掩幢。結果逊拍,Netty成功地找到了一種無需妥協(xié)即可輕松實現(xiàn)開發(fā),性能际邻,穩(wěn)定性和靈活性的方法芯丧。

特性

  • 高性能 事件驅動
  • 異步非堵塞 基于NIO的客戶端,服務器端編程框架
  • 穩(wěn)定性和伸縮性
  • 適用于各種傳輸類型的統(tǒng)一API-阻塞和非阻塞套接字
  • 基于靈活且可擴展的事件模型,可將關注點明確分離
  • 高度可定制的線程模型-單線程,一個活多個線程池 ,例如SEDA
  • 真正的無連接數(shù)據(jù)報套接字支持 (從3.1開始)

表現(xiàn)

  • 更高的吞吐量
  • 減少資源消耗
  • 減少不必要的內存復制

使用

在pom.xml中添加依賴

        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.42.Final</version>
        </dependency>

一丶Springboot應用啟動加載Netty應用

暴露Netty端口,隨springboot應用一并啟動

/**
 * @dete: 2021/4/21 9:08 上午
 * @author: 徐子木
 */
@SpringBootApplication
public class PmSocketApplication implements CommandLineRunner {
    
// yml中指定netty端口號
    @Value("${netty.port}")
    private int nettyServerPort;

    @Autowired
    private NettyWebSocketServer nettyServer;

    public static void main(String[] args) {
        SpringApplication.run(PmSocketApplication.class, args);
    }


    @Override
    public void run(String... args) throws Exception {
        //netty 服務端啟動的端口不可和Springboot啟動類的端口號重復
        nettyServer.start(nettyServerPort);

        //關閉服務器的時候同時關閉Netty服務
         Runtime.getRuntime().addShutdownHook(new Thread(() -> nettyServer.destroy()));
    }
}

二丶Netty整合WebSocket服務端

啟動或銷毀netty服務端的過程實現(xiàn)

/**
 * Netty整合websocket 服務端
 * 運行流程:
 * 1.創(chuàng)建一個ServerBootstrap的實例引導和綁定服務器
 * 2.創(chuàng)建并分配一個NioEventLoopGroup實例以進行事件的處理,比如接收連接和讀寫數(shù)據(jù)
 * 3.指定服務器綁定的本地的InetSocketAddress
 * 4.使用一個NettyServerHandler的實例初始化每一個新的Channel
 * 5.調用ServerBootstrap.bind()方法以綁定服務器
 *
 * @description
 * @author: 徐子木
 * @create: 2020-06-02 14:23
 **/
@Component
@Slf4j
public class NettyWebSocketServer {


    /**
     * EventLoop接口
     * NioEventLoop中維護了一個線程和任務隊列,支持異步提交任務,線程啟動時會調用NioEventLoop的run方法,執(zhí)行I/O任務和非I/O任務
     * I/O任務即selectionKey中的ready的事件,如accept,connect,read,write等,由processSelectedKeys方法觸發(fā)
     * 非I/O任務添加到taskQueue中的任務,如register0,bind0等任務,由runAllTasks方法觸發(fā)
     * 兩種任務的執(zhí)行時間比由變量ioRatio控制,默認為50,則表示允許非IO任務執(zhí)行的事件與IO任務的執(zhí)行時間相等
     */
    private final EventLoopGroup boosGroup = new NioEventLoopGroup();

    private final EventLoopGroup workGroup = new NioEventLoopGroup();

    /**
     * Channel
     * Channel類似Socket,它代表一個實體(如一個硬件設備,一個網(wǎng)絡套接字) 的開放連接,如讀寫操作.通俗的講,Channel字面意思就是通道,每一個客戶端與服務端之間進行通信的一個雙向通道.
     * Channel主要工作:
     * 1.當前網(wǎng)絡連接的通道的狀態(tài)(例如是否打開?是否已連接?)
     * 2.網(wǎng)絡連接的配置參數(shù)(例如接收緩沖區(qū)的大小)
     * 3.提供異步的網(wǎng)絡I/O操作(如建立連接,讀寫,綁定端口),異步調用意味著任何I/O調用都將立即返回,并且不保證在調用結束時鎖清秋的I/O操作已完成.
     * 調用立即放回一個ChannelFuture實例,通過注冊監(jiān)聽器到ChannelFuture上,可以I/O操作成功,失敗或取消時回調通知調用方.
     * 4.支持關聯(lián)I/O操作與對應的處理程序.
     * 不同協(xié)議,不同的阻塞類型的連接都有不同的Channel類型與之對應,下面是一些常用的Channel類型
     * NioSocketChannel,異步的客戶端 TCP Socket連接
     * NioServerSocketChannel,異步的服務端 TCP Socket 連接
     * NioDatagramChannel,異步的UDP連接
     * NioSctpChannel,異步的客戶端Sctp連接
     * NioSctoServerChannel,異步的Sctp服務端連接
     * 這些通道涵蓋了UDP 和TCP網(wǎng)絡IO以及文件IO
     */
    private Channel channel;


    /**
     * 啟動服務
     *
     * @param port
     */
    public void start(int port) {
        log.info("=================Netty 端口啟動:{}==================",port);

        /**
         * Future
         * Future提供了另外一種在操作完成時通知應用程序的方式,這個對象可以看做一個異步操作的結果占位符.
         * 通俗的講,它相當于一位指揮官,發(fā)送了一個請求建立完連接,通信完畢了,你通知一聲它回來關閉各種IO通道,整個過程,它是不阻塞的,異步的.
         * 在Netty中所有的IO操作都是異步的,不能理科的值消息是否被正確處理,但是可以過一會兒等他執(zhí)行完成或者直接注冊一個監(jiān)聽,具體的實現(xiàn)就是通過Future和ChannelFutures.
         * 他們可以注冊一個監(jiān)聽,當操作執(zhí)行成功成功或者失敗時監(jiān)聽會自動觸發(fā)注冊的監(jiān)聽事件
         */
        try {
            /**
             * Bootstrap
             * Bootstrap是引導的意思,一個Netty應用通常由一個Bootstrap開始
             * 主要作用是配置整個Netty程序,串聯(lián)各個組件
             * Netty中Bootstrap類是服務端啟動引導類
             */
            ServerBootstrap server = new ServerBootstrap();
            server.group(boosGroup, workGroup)
                    //非阻塞異步服務端TCP Socket 連接
                    .channel(NioServerSocketChannel.class)
                    //設置為前端WebSocket可以連接
                    .childHandler(new ChannelInitializer<SocketChannel>() {

                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            // HttpServerCodec: 將請求和映帶消息節(jié)嗎為HTTP消息
                            pipeline.addLast("http-codec", new HttpServerCodec());
                            // 講HTTP消息的多個部分合成一條完整的HTTP消息
                            pipeline.addLast("aggregator", new HttpObjectAggregator(65536));
                            // 向客戶端發(fā)送HTML5文件
                            socketChannel.pipeline().addLast("http-chunked", new ChunkedWriteHandler());
                            // 進行設置心跳檢測
                            socketChannel.pipeline().addLast(new IdleStateHandler(60, 30, 60 * 30, TimeUnit.SECONDS));
                            // 配置通道處理 來進行業(yè)務處理
                            pipeline.addLast("handler", new WebSocketServerHandler());
                        }

                    });
            channel = server.bind(port).sync().channel();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @PreDestroy
    public void destroy() {
        log.info("=================Netty服務關閉==================");
        if (channel != null) {
            channel.close();
        }
        boosGroup.shutdownGracefully();
        workGroup.shutdownGracefully();
    }


}

三丶Socket處理注冊連接類

接收客戶端連接信息,心跳檢測,存儲于銷毀等的處理

其余部分業(yè)務類(messageService),均為自行處理連接存儲為業(yè)務所用,請忽略

/**
 * @description
 * @author: 徐子木
 * @create: 2020-06-02 14:57
 **/
@Slf4j
@Component
public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> {

    public static final byte PING_MSG = 1;
    public static final byte PONG_MSG = 2;
    public static final byte CUSTOM_MSG = 3;
    private int heartbeatCount = 0;

  // 配置客戶端是否為https的控制
    @Value("${netty.ssl-enabled:false}")
    private Boolean useSsl;

    /**
     * 這里可以引入自己業(yè)務類來處理進行的客戶端連接
     */
    @Autowired
    private MessageService messageService;
    
    public static WebSocketServerHandler webSocketServerHandler;

    /**
     * 解決啟動加載不到自己業(yè)務類
     */
    @PostConstruct
    public void init() {
        webSocketServerHandler = this;
    }


    private WebSocketServerHandshaker handshaker;


    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        //http請求和tcp請求分開處理
        if (msg instanceof HttpRequest) {
            handlerHttpRequest(ctx, (HttpRequest) msg);
        } else if (msg instanceof WebSocketFrame) {
            //踩坑: simpleChannelInboundHandler 他會進行一次釋放(引用計數(shù)器減一),參考源碼,而我們釋放的時候就變?yōu)榱?,所以必須手動進行引用計數(shù)器加1
            WebSocketFrame frame = (WebSocketFrame) msg;
            frame.retain();
            handlerWebSocketFrame(ctx, frame);
        }

    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    /**
     * WebSocket 消息處理
     *
     * @param ctx
     * @param frame
     */
    private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
        //判斷是否是關閉鏈路的指令
        if (frame instanceof CloseWebSocketFrame) {
            log.info("【" + ctx.channel().remoteAddress() + "】已關閉(服務器端)");
            //移除channel
            NioSocketChannel channel = (NioSocketChannel) ctx.channel();
            webSocketServerHandler.messageService.removeConnection(channel);
            handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame);
            return;
        }
        //判斷是否是ping消息
        if (frame instanceof PingWebSocketFrame) {
            log.info("【ping】");
            return;
        }
        //判斷實時是pong消息
        if (frame instanceof PongWebSocketFrame) {
            log.info("【pong】");
            return;
        }
        //本例子只支持文本世曾,不支持二進制
        if (!(frame instanceof TextWebSocketFrame)) {
            log.info("【不支持二進制】");
            throw new UnsupportedOperationException("不支持二進制");
        }

        // 傳送的消息 ,接收客戶端指定格式(自己與客戶端約定json格式)的消息,并進行處理
        MessageObject messageObject = JSONObject.parseObject(((TextWebSocketFrame) frame).text().toString(), MessageObject.class);
        webSocketServerHandler.messageService.sendMessage(messageObject, ctx);
    }

    /**
     * websocket第一次連接握手
     *
     * @param ctx
     */
    @SuppressWarnings("deprecation")
    private void handlerHttpRequest(ChannelHandlerContext ctx, HttpRequest req) {
// 這里接收客戶端附加連接參數(shù),根據(jù)自己業(yè)務與客戶端指定需要哪些參數(shù)來辨別連接唯一性
        String userUid = null;
        String sectionId = null;
        if ("GET".equalsIgnoreCase(req.getMethod().toString())) {
            String uri = req.getUri();
            userUid = req.getUri().substring(uri.indexOf("/", 2) + 1, uri.lastIndexOf("/"));
            sectionId = req.getUri().substring(uri.lastIndexOf("/") + 1);
            //對用戶信息進行存儲
            NioSocketChannel channel = (NioSocketChannel) ctx.channel();

            webSocketServerHandler.messageService.putConnection(userUid, sectionId, channel);
        }

        // http 解碼失敗
        if (!req.getDecoderResult().isSuccess() || (!"websocket".equalsIgnoreCase(req.headers().get("Upgrade")))) {
            sendHttpResponse(ctx, (FullHttpRequest) req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
        }
        //可以通過url獲取其他參數(shù)
        WebSocketServerHandshakerFactory factory;
// 這里主要用于 客戶端為wss連接的處理
        if (useSsl != null && useSsl) {
            factory = new WebSocketServerHandshakerFactory(
                    "wss://" + req.headers().get("Host") + "/" + req.getUri() + "", null, false
            );
        } else {
            factory = new WebSocketServerHandshakerFactory(
                    "ws://" + req.headers().get("Host") + "/" + req.getUri() + "", null, false
            );
        }
        handshaker = factory.newHandshaker(req);
        if (handshaker == null) {
            WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel());
        } else {
            //進行連接
            handshaker.handshake(ctx.channel(), (FullHttpRequest) req);
        }
    }

    @SuppressWarnings("deprecation")
    private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, DefaultFullHttpResponse res) {
        // 返回應答給客戶端
        if (res.getStatus().code() != 200) {
            ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8);
            res.content().writeBytes(buf);

            // buf.release();
        }
        // 如果是非Keep-Alive缨恒,關閉連接
        ChannelFuture f = ctx.channel().writeAndFlush(res);
        if (!HttpHeaders.isKeepAlive(req) || res.getStatus().code() != 200) {
            f.addListener(ChannelFutureListener.CLOSE);
        }
    }


    /**
     * 這里是保持服務器與客戶端長連接  進行心跳檢測 避免連接斷開
     *
     * @param ctx
     * @param evt
     * @throws Exception
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent stateEvent = (IdleStateEvent) evt;
            PingWebSocketFrame ping = new PingWebSocketFrame();
            switch (stateEvent.state()) {
                //讀空閑(服務器端)
                case READER_IDLE:
                    //log.info("【" + ctx.channel().remoteAddress() + "】讀空閑(服務器端)");
                    ctx.writeAndFlush(ping);
                    break;
                //寫空閑(客戶端)
                case WRITER_IDLE:
                    //log.info("【" + ctx.channel().remoteAddress() + "】寫空閑(客戶端)");
                    ctx.writeAndFlush(ping);
                    break;
                case ALL_IDLE:
                    //log.info("【" + ctx.channel().remoteAddress() + "】讀寫空閑");
                    break;
                default:
                    break;
            }
        }
    }


    /**
     * 出現(xiàn)異常時
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
        //移除channel
        webSocketServerHandler.messageService.removeConnection((NioSocketChannel) ctx.channel());
        ctx.close();
        log.info("【" + ctx.channel().remoteAddress() + "】已關閉(服務器端)");
    }


}

四丶與業(yè)務整合并存儲客戶連接

以下均為自己業(yè)務與socket連接對象的存儲,發(fā)送,踢出等處理,可根據(jù)自己業(yè)務自行參考

當時公司業(yè)務需要將連接存儲兩級維度,可根據(jù)自己的業(yè)務給客戶端分組,或不分組都可以

/**
 * @description
 * @author: 徐子木
 * @create: 2020-09-30 15:06
 **/
@Slf4j
@Service(value = "messageService")
public class MessageServiceImpl implements MessageService {

// 是消息通信dubbo類,請忽略
    @Autowired
    private ChatMsgService chatMsgService;
// 是db存儲類 ,請忽略
    @Autowired
    private BidPresentDao bidPresentDao;

// 這里是netty可為指定的唯一key去與連接進行分組處理并存儲
    private HashedWheelTimer hashedWheelTimer = new HashedWheelTimer();

    private final AttributeKey<String> userKey = AttributeKey.valueOf("user");

    private final AttributeKey<String> sectionKey = AttributeKey.valueOf("section");

    /**
     * 裝載標段與對應在線的用戶
     */
    private static final Map<String, ChannelGroup> SECTION_GROUPS = new ConcurrentHashMap<>();


    /**
     * 維護某標段中的socket連接
     *
     * @param sectionId
     * @param channel
     */
    @Override
    public void putConnection(String userId, String sectionId, NioSocketChannel channel) {

        channel.attr(userKey).set(userId);
        channel.attr(sectionKey).set(sectionId);

        bidPresentDao.comeOnlineByUserId(userId, sectionId);

        //存儲用戶標段對應連接
        ChannelGroup channelGroup = SECTION_GROUPS.get(sectionId);
        if (null == channelGroup) {
            //保存全局的,連接上的服務器的客戶
            channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
            channelGroup.add(channel);
            SECTION_GROUPS.put(sectionId, channelGroup);
        } else {
            channelGroup.add(channel);
        }
    }

    /**
     * 判斷一個通道是否有用戶在使用
     *
     * @param channel
     * @return
     */
    private boolean hasUser(Channel channel) {
        return ((channel.hasAttr(userKey) || channel.hasAttr(sectionKey)));
    }

    /**
     * 獲取連接對應用戶
     *
     * @param channel
     * @return
     */
    @Override
    public String getBindUserId(NioSocketChannel channel) {
        if (hasUser(channel)) {
            return channel.attr(userKey).get();
        }
        return null;
    }

    /**
     * 獲取連接對應標段Id
     *
     * @param channel
     * @return
     */
    @Override
    public String getBindSectionId(NioSocketChannel channel) {
        if (hasUser(channel)) {
            return channel.attr(sectionKey).get();
        }
        return null;
    }


    /**
     * 用戶退出標段在線連接
     *
     * @param channel
     */
    @Override
    public void removeConnection(NioSocketChannel channel) {
        String userId = getBindUserId(channel);
        String sectionId = getBindSectionId(channel);

        Iterator<Map.Entry<String, ChannelGroup>> iterator = SECTION_GROUPS.entrySet().iterator();
        while (iterator.hasNext()) {
            ChannelGroup channelGroup = iterator.next().getValue();
            if (channelGroup.contains(channel)) {
                channelGroup.remove(channel);
            }
            if (null == channelGroup || channelGroup.size() == 0) {
                iterator.remove();
            }
        }
        if (StringUtils.isNotEmpty(userId) && StringUtils.isNotEmpty(sectionId)) {
            bidPresentDao.exitOnlineByUserId(userId, sectionId);
        }
    }

    /**
     * 根據(jù)用戶Id獲取連接
     *
     * @param userId
     * @return
     */
    private NioSocketChannel getChannel(String userId) {
        Iterator<Map.Entry<String, ChannelGroup>> iterator = SECTION_GROUPS.entrySet().iterator();
        while (iterator.hasNext()) {
            ChannelGroup channelGroup = iterator.next().getValue();
            for (Channel channel : channelGroup) {
                if (userId.equalsIgnoreCase(channel.attr(userKey).get())) {
                    return (NioSocketChannel) channel;
                }
            }
        }
        return null;
    }

    /**
     * 發(fā)送純狀態(tài)碼的消息
     *
     * @param toUserId
     * @param message
     */
    @Override
    public void sendMessage(String toUserId, String message) {
        NioSocketChannel channel = getChannel(toUserId);
        if (channel != null) {
            JSONObject jsonObject = new JSONObject();
            jsonObject.put("message", message);
            channel.writeAndFlush(new TextWebSocketFrame(JSONObject.toJSONString(jsonObject)));
        }
    }

    /**
     * 向指定投標人發(fā)送狀態(tài)
     *
     * @param toUserId
     */
    @Override
    public void sendMessage(String toUserId, MessageEnum messageEnum) {
        NioSocketChannel channel = getChannel(toUserId);
        if (channel != null) {
            MessageObject messageObject = MessageObject.builder().
                    code(messageEnum.getCode())
                    .build();
            channel.writeAndFlush(new TextWebSocketFrame(JSONObject.toJSONString(messageObject)));
        }
    }

    /**
     * 向當前標段所有投標人發(fā)送消息
     *
     * @param sectionId
     */
    @Override
    public void sendMessageAll(String sectionId, MessageEnum messageEnum) {
        log.debug("標段Id: {},發(fā)送狀態(tài)碼: {}", sectionId, messageEnum);
        MessageObject messageObject = MessageObject.builder()
                .code(messageEnum.getCode())
                .build();
        sendMessageAll(sectionId, messageObject);
    }

    @Override
    public void sendMessageAll(String sectionId, MessageObject messageObject) {
        ChannelGroup channelGroup = SECTION_GROUPS.get(sectionId);
        if (channelGroup == null || channelGroup.size() == 0) {
            log.warn("暫時無客戶端在線 sectionId:{}", sectionId);
            return;
        }
        channelGroup.writeAndFlush(new TextWebSocketFrame(JSONObject.toJSONString(messageObject)));
    }


    /**
     * 根據(jù)狀態(tài)碼處理消息
     *
     * @param message
     */
    @Override
    public void sendMessage(MessageObject message, ChannelHandlerContext ctx) {
        MessageEnum messageEnum = MessageEnum.valuesOf(message.getCode());
        switch (messageEnum) {
            case CHAT_SEND_MESSAGE:
                ChatMsg msg = JSONObject.parseObject(message.getData().toString(), ChatMsg.class);
                flushSectionChat(msg, ctx);
                break;
            case HEART_CONNECT:
                //心跳連接
                break;
            case REDIRECT:
                RedirectEntity redirectEntity = JSONObject.parseObject(message.getData().toString(), RedirectEntity.class);
                flushSectionToObject(redirectEntity.getSectionId(), redirectEntity);
                break;
            case DECODE:
                String sectionId = JSONObject.parseObject(message.getData().toString(), String.class);
                sendMessageAll(sectionId, MessageEnum.DECODE);
                break;
            default:

                break;
        }

    }

    /**
     * 向該標段中發(fā)送系統(tǒng)消息
     *
     * @param sectionId
     * @param msg
     */
    @Override
    public void flushSectionSystem(String sectionId, String msg) {
        ChatMsg chatMsg = ChatMsg.builder()
                .sectionId(sectionId)
                .msgType(ChatMsgType.NOTICE.getCode())
                .content(msg)
                .build();
        flushSectionChat(chatMsg, null);
    }

    /**
     * 向當前標段的在線人員刷新一條消息
     *
     * @param chatMsg
     */
    private void flushSectionChat(ChatMsg chatMsg, ChannelHandlerContext ctx) {
        if (ObjectUtil.isNotEmpty(ctx)) {
            InetSocketAddress inetSocketAddress = (InetSocketAddress) ctx.channel().remoteAddress();
            String ip = inetSocketAddress.getAddress().getHostAddress();
            chatMsg.setIp(ip);
        }

        String sectionId = chatMsg.getSectionId();
        MessageObject messageObject = MessageObject.builder()
                .code(MessageEnum.CHAT_SEND_MESSAGE.getCode())
                .data(chatMsg)
                .build();
        sendMessageAll(sectionId, messageObject);
        chatMsgService.create(chatMsg);
    }


    /**
     * 當前標段在線人員發(fā)送自定義數(shù)據(jù)
     *
     * @param sectionId
     * @param object
     */
    private void flushSectionToObject(String sectionId, Object object) {
        MessageObject messageObject = MessageObject.builder()
                .code(MessageEnum.REDIRECT.getCode())
                .data(object)
                .build();

        sendMessageAll(sectionId, messageObject);
    }


    /**
     * 向指定用戶發(fā)送自定義數(shù)據(jù)
     *
     * @param toUserId
     */
    @Override
    public void sendMessage(String toUserId, Object object, MessageEnum messageEnum) {
        NioSocketChannel channel = getChannel(toUserId);
        if (channel != null) {
            MessageObject messageObject = MessageObject.builder().
                    code(messageEnum.getCode())
                    .data(object)
                    .build();
            if(MessageEnum.EXTRACT_PARAM_RESULT.equals(messageEnum)){
                log.debug("參數(shù)抽取發(fā)送內容:{}",JSONObject.toJSONString(messageObject));
            }
            channel.writeAndFlush(new TextWebSocketFrame(JSONObject.toJSONString(messageObject)));
        }
    }


}

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市轮听,隨后出現(xiàn)的幾起案子骗露,更是在濱河造成了極大的恐慌,老刑警劉巖血巍,帶你破解...
    沈念sama閱讀 222,000評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件萧锉,死亡現(xiàn)場離奇詭異,居然都是意外死亡述寡,警方通過查閱死者的電腦和手機柿隙,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,745評論 3 399
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來鲫凶,“玉大人禀崖,你說我怎么就攤上這事∠菩颍” “怎么了帆焕?”我有些...
    開封第一講書人閱讀 168,561評論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀的道長。 經常有香客問我叶雹,道長财饥,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,782評論 1 298
  • 正文 為了忘掉前任折晦,我火速辦了婚禮钥星,結果婚禮上,老公的妹妹穿的比我還像新娘满着。我一直安慰自己谦炒,他們只是感情好,可當我...
    茶點故事閱讀 68,798評論 6 397
  • 文/花漫 我一把揭開白布风喇。 她就那樣靜靜地躺著宁改,像睡著了一般。 火紅的嫁衣襯著肌膚如雪魂莫。 梳的紋絲不亂的頭發(fā)上还蹲,一...
    開封第一講書人閱讀 52,394評論 1 310
  • 那天,我揣著相機與錄音耙考,去河邊找鬼谜喊。 笑死,一個胖子當著我的面吹牛倦始,可吹牛的內容都是我干的斗遏。 我是一名探鬼主播,決...
    沈念sama閱讀 40,952評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼鞋邑,長吁一口氣:“原來是場噩夢啊……” “哼诵次!你這毒婦竟也來了?” 一聲冷哼從身側響起炫狱,我...
    開封第一講書人閱讀 39,852評論 0 276
  • 序言:老撾萬榮一對情侶失蹤藻懒,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后视译,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體嬉荆,經...
    沈念sama閱讀 46,409評論 1 318
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 38,483評論 3 341
  • 正文 我和宋清朗相戀三年酷含,在試婚紗的時候發(fā)現(xiàn)自己被綠了鄙早。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,615評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡椅亚,死狀恐怖限番,靈堂內的尸體忽然破棺而出,到底是詐尸還是另有隱情呀舔,我是刑警寧澤弥虐,帶...
    沈念sama閱讀 36,303評論 5 350
  • 正文 年R本政府宣布扩灯,位于F島的核電站,受9級特大地震影響霜瘪,放射性物質發(fā)生泄漏珠插。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,979評論 3 334
  • 文/蒙蒙 一颖对、第九天 我趴在偏房一處隱蔽的房頂上張望捻撑。 院中可真熱鬧,春花似錦缤底、人聲如沸顾患。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,470評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽江解。三九已至,卻和暖如春坑鱼,著一層夾襖步出監(jiān)牢的瞬間膘流,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,571評論 1 272
  • 我被黑心中介騙來泰國打工鲁沥, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人耕魄。 一個月前我還...
    沈念sama閱讀 49,041評論 3 377
  • 正文 我出身青樓画恰,卻偏偏與公主長得像,于是被迫代替她去往敵國和親吸奴。 傳聞我的和親對象是個殘疾皇子允扇,可洞房花燭夜當晚...
    茶點故事閱讀 45,630評論 2 359

推薦閱讀更多精彩內容