一、啟動方式
// 創建 RpcClient 實例:主要是初始化心跳處理器 HeartbeatHandler 和相應的消息分發器 RpcHandler
client =new RpcClient();
// 初始化 netty 客戶端:主要是初始化 SOFA 的相關組件
client.init();
二、源碼分析
client.init();
==this.connectionManager.setAddressParser(this.addressParser);//設置地址編解碼器
==this.connectionManager.init();
===this.connectionEventHandler.setConnectionManager(this);
===this.connectionEventHandler.setConnectionEventListener(connectionEventListener);//添加鏈接監聽器
===this.connectionFactory.init(connectionEventHandler);//初始化鏈接工廠
====bootstrap = new Bootstrap();
    bootstrap.group(workerGroup).channel(NettyEventLoopUtil.getClientSocketChannelClass())
        .option(ChannelOption.TCP_NODELAY, ConfigManager.tcp_nodelay())
        .option(ChannelOption.SO_REUSEADDR, ConfigManager.tcp_so_reuseaddr())
        .option(ChannelOption.SO_KEEPALIVE, ConfigManager.tcp_so_keepalive());
    // init netty write buffer water mark
    initWriteBufferWaterMark();
    // init byte buf allocator
    if (ConfigManager.netty_buffer_pooled()) {
        this.bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
    } else {
        this.bootstrap.option(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT);
    }
    bootstrap.handler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel channel) {
            ChannelPipeline pipeline = channel.pipeline();
            pipeline.addLast("decoder", codec.newDecoder());
            pipeline.addLast("encoder", codec.newEncoder());
            boolean idleSwitch = ConfigManager.tcp_idle_switch();
            if (idleSwitch) {
                pipeline.addLast("idleStateHandler", new IdleStateHandler(ConfigManager.tcp_idle(), ConfigManager.tcp_idle(), 0, TimeUnit.MILLISECONDS));
                pipeline.addLast("heartbeatHandler", heartbeatHandler);//添加心跳鏈接處理器
            }
            pipeline.addLast("connectionEventHandler", connectionEventHandler);
            pipeline.addLast("handler", handler);
        }
    });
==this.rpcRemoting =new RpcClientRemoting(new RpcCommandFactory(),this.addressParser,
this.connectionManager);//初始化客戶端遠程調用
==this.taskScanner.add(this.connectionManager);//使用scanner定時掃描完成的鏈接,并移除連接池
==this.taskScanner.start();
==if (monitorStrategy == null) { ScheduledDisconnectStrategy strategy = new ScheduledDisconnectStrategy(); connectionMonitor = new DefaultConnectionMonitor(strategy, this.connectionManager);} else { connectionMonitor = new DefaultConnectionMonitor(monitorStrategy, this.connectionManager);}connectionMonitor.start();//Monitor定時監聽已經關閉的鏈接
==if (switches().isOn(GlobalSwitch.CONN_RECONNECT_SWITCH)) { reconnectManager = new ReconnectManager(connectionManager); connectionEventHandler.setReconnectManager(reconnectManager); logger.warn("Switch on reconnect manager");}//設置重連管理器,內部通過HealConnectionRunner子線程循環處理需要重連的任務