Connection 事件處理相關類
- ConnectionEventType:定義了三種 Connection 相關事件
- ConnectionEventHandler:Connection 事件處理器,處理兩類事件
- Netty 定義的事件:例如 connect弥奸,channelActive 等
- SOFABolt 定義的事件:事件類型 ConnectionEventType
- RpcConnectionEventHandler:ConnectionEventHandler 實現(xiàn)類榨惠,重寫了其 channelInactive 方法
- ConnectionEventListener:Connection 事件監(jiān)聽器,存儲處理對應 ConnectionEventType 的 ConnectionEventProcessor 列表
- ConnectionEventProcessor:真正的 Connection 事件處理器接口
基本原理
- 繼承 ConnectionEventProcessor盛霎,編寫自定義的事件處理類
- 將自定義的事件處理類添加到 ConnectionEventListener 中
- 當觸發(fā) ConnectionEventType 相關事件時赠橙,ConnectionEventHandler 通知監(jiān)聽器 ConnectionEventListener,ConnectionEventListener 取出 ConnectionEventType 的自定義事件處理器列表愤炸,執(zhí)行其 onEvent 方法
一期揪、使用姿勢
事件處理器
=========================== 連接處理器 ===========================
public class MyCONNECTEventProcessor implements ConnectionEventProcessor {
@Override
public void onEvent(String remoteAddr, Connection conn) {
System.out.println("hello, " + remoteAddr);
}
}
=========================== 斷開處理器 ===========================
public class MyCLOSEEventProcessor implements ConnectionEventProcessor {
@Override
public void onEvent(String remoteAddr, Connection conn) {
System.out.println("bye, " + remoteAddr);
}
}
服務端
RpcServer server = new RpcServer(8888);
server.registerUserProcessor(new MyServerUserProcessor());
server.addConnectionEventProcessor(ConnectionEventType.CONNECT, new MyCONNECTEventProcessor());
server.addConnectionEventProcessor(ConnectionEventType.CLOSE, new MyCLOSEEventProcessor());
server.start();
客戶端
RpcClient client = new RpcClient();
client.addConnectionEventProcessor(ConnectionEventType.CONNECT, new MyCONNECTEventProcessor());
client.addConnectionEventProcessor(ConnectionEventType.CLOSE, new MyCLOSEEventProcessor());
client.init();
二、源碼分析
2.1 服務端
public class RpcServer extends AbstractRemotingServer implements RemotingServer {
...
/** connection event handler */
private ConnectionEventHandler connectionEventHandler;
/** connection event listener */
private ConnectionEventListener connectionEventListener = new ConnectionEventListener();
/** connection manager */
private DefaultConnectionManager connectionManager;
protected void doInit() {
...
// 服務端打開了 連接管理器 開關
if (this.switches().isOn(GlobalSwitch.SERVER_MANAGE_CONNECTION_SWITCH)) {
// 創(chuàng)建 ConnectionEventHandler 處理器
this.connectionEventHandler = new RpcConnectionEventHandler(switches());
// 創(chuàng)建 連接管理器
this.connectionManager = new DefaultConnectionManager(new RandomSelectStrategy());
// 設置 connectionManager 到 ConnectionEventHandler 中
this.connectionEventHandler.setConnectionManager(this.connectionManager);
// 設置 connectionEventListener 到 ConnectionEventHandler 中
this.connectionEventHandler.setConnectionEventListener(this.connectionEventListener);
} else {
// 創(chuàng)建 ConnectionEventHandler 處理器
this.connectionEventHandler = new ConnectionEventHandler(switches());
// 設置 connectionEventListener 到 ConnectionEventHandler 中
this.connectionEventHandler.setConnectionEventListener(this.connectionEventListener);
}
...
this.bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) {
...
// 添加 connectionEventHandler 到 netty 的 pipeline
pipeline.addLast("connectionEventHandler", connectionEventHandler);
...
createConnection(channel);
}
private void createConnection(SocketChannel channel) {
Url url = addressParser.parse(RemotingUtil.parseRemoteAddress(channel));
if (switches().isOn(GlobalSwitch.SERVER_MANAGE_CONNECTION_SWITCH)) {
connectionManager.add(new Connection(channel, url), url.getUniqueKey());
} else {
new Connection(channel, url);
}
// 發(fā)布 ConnectionEventType.CONNECT 事件
channel.pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT);
}
});
}
public void addConnectionEventProcessor(ConnectionEventType type, ConnectionEventProcessor processor) {
this.connectionEventListener.addConnectionEventProcessor(type, processor);
}
}
2.2 客戶端
public class RpcClient extends AbstractConfigurableInstance {
/** connection event handler */
private ConnectionEventHandler connectionEventHandler = new RpcConnectionEventHandler(switches());
/** reconnect manager */
private ReconnectManager reconnectManager;
/** connection event listener */
private ConnectionEventListener connectionEventListener = new ConnectionEventListener();
/** connection manager */
private DefaultConnectionManager connectionManager = new DefaultConnectionManager(connectionSelectStrategy, connectionFactory, connectionEventHandler, connectionEventListener, switches());
public void init() {
...
this.connectionManager.init();
...
// 重連開關
if (switches().isOn(GlobalSwitch.CONN_RECONNECT_SWITCH)) {
// 創(chuàng)建 ReconnectManager
reconnectManager = new ReconnectManager(connectionManager);
// 設置 ReconnectManager 到 connectionEventHandler 中规个,當 channelInactive 時凤薛,進行重連操作
connectionEventHandler.setReconnectManager(reconnectManager);
}
}
public void addConnectionEventProcessor(ConnectionEventType type,
ConnectionEventProcessor processor) {
this.connectionEventListener.addConnectionEventProcessor(type, processor);
}
}
======================== DefaultConnectionManager ==========================
public void init() {
// 將當前的 DefaultConnectionManager 設置到 connectionEventHandler 中,用于 channelInactive 時诞仓,從 DefaultConnectionManager 中移除指定 Connection
this.connectionEventHandler.setConnectionManager(this);
// 將 connectionEventListener 設置到 connectionEventHandler 中
this.connectionEventHandler.setConnectionEventListener(connectionEventListener);
this.connectionFactory.init(connectionEventHandler);
}
======================== AbstractConnectionFactory ==========================
public void init(final ConnectionEventHandler connectionEventHandler) {
...
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) {
...
pipeline.addLast("connectionEventHandler", connectionEventHandler);
...
}
});
}
不論是服務端還是客戶端缤苫,其實本質(zhì)都在做一件事情:創(chuàng)建 ConnectionEventHandler 實例并添加到 Netty 的 pipeline 中。
之后當有 ConnectionEvent 觸發(fā)時(無論是 Netty 定義的事件被觸發(fā)墅拭,還是 SOFABolt 定義的事件被觸發(fā))活玲,ConnectionEventHandler 會通過異步線程執(zhí)行器通知 ConnectionEventListener,ConnectionEventListener 將消息派發(fā)給具體的 ConnectionEventProcessor 實現(xiàn)類。具體源碼如下:
2.3 事件處理機制核心部分
======================== ConnectionEventListener ==========================
/**
* Listen and dispatch connection events.
*/
public class ConnectionEventListener {
private ConcurrentHashMap<ConnectionEventType, List<ConnectionEventProcessor>> processors = new ConcurrentHashMap<ConnectionEventType, List<ConnectionEventProcessor>>(3);
/**
* Dispatch events.
*/
public void onEvent(ConnectionEventType type, String remoteAddr, Connection conn) {
List<ConnectionEventProcessor> processorList = this.processors.get(type);
if (processorList != null) {
for (ConnectionEventProcessor processor : processorList) {
processor.onEvent(remoteAddr, conn);
}
}
}
/**
* Add event processor.
*/
public void addConnectionEventProcessor(ConnectionEventType type,
ConnectionEventProcessor processor) {
List<ConnectionEventProcessor> processorList = this.processors.get(type);
if (processorList == null) {
this.processors.putIfAbsent(type, new ArrayList<ConnectionEventProcessor>(1));
processorList = this.processors.get(type);
}
processorList.add(processor);
}
}
======================== ConnectionEventProcessor ==========================
/**
* Process connection events.
*/
public interface ConnectionEventProcessor {
/**
* Process event.
*/
public void onEvent(String remoteAddr, Connection conn);
}
======================== ConnectionEventHandler ==========================
/**
* Log the channel status event.
*/
@Sharable
public class ConnectionEventHandler extends ChannelDuplexHandler {
private ConnectionManager connectionManager;
private ConnectionEventListener eventListener;
private ConnectionEventExecutor eventExecutor;
private ReconnectManager reconnectManager;
private GlobalSwitch globalSwitch;
@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
SocketAddress localAddress, ChannelPromise promise) throws Exception {
if (logger.isInfoEnabled()) {
...
}
super.connect(ctx, remoteAddress, localAddress, promise);
}
...
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
...
super.channelInactive(ctx);
Attribute attr = ctx.channel().attr(Connection.CONNECTION);
if (null != attr) {
// 進行重連操作舒憾,這也是 ConnectionEventHandler 持有 reconnectManager 引用的原因
if (this.globalSwitch != null
&& this.globalSwitch.isOn(GlobalSwitch.CONN_RECONNECT_SWITCH)) {
Connection conn = (Connection) attr.get();
if (reconnectManager != null) {
reconnectManager.addReconnectTask(conn.getUrl());
}
}
// 調(diào)用 ConnectionEventType.CLOSE 事件
onEvent((Connection) attr.get(), remoteAddress, ConnectionEventType.CLOSE);
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object event) throws Exception {
if (event instanceof ConnectionEventType) {
switch ((ConnectionEventType) event) {
case CONNECT:
Channel channel = ctx.channel();
if (null != channel) {
Connection connection = channel.attr(Connection.CONNECTION).get();
// 調(diào)用 ConnectionEventType.CONNECT 事件
this.onEvent(connection, connection.getUrl().getOriginUrl(), ConnectionEventType.CONNECT);
}
break;
default:
return;
}
} else {
super.userEventTriggered(ctx, event);
}
}
private void onEvent(Connection conn, String remoteAddress, ConnectionEventType type) {
if (this.eventListener != null) {
// 1. 創(chuàng)建任務:該任務執(zhí)行調(diào)用 ConnectionEventListener 的 onEvent
// 2. 使用 ConnectionEventExecutor 執(zhí)行該任務
this.eventExecutor.onEvent(new Runnable() {
@Override
public void run() {
ConnectionEventHandler.this.eventListener.onEvent(type, remoteAddress, conn);
}
});
}
}
public void setConnectionEventListener(ConnectionEventListener listener) {
if (listener != null) {
// 設置 ConnectionEventListener
this.eventListener = listener;
// 創(chuàng)建 ConnectionEventExecutor镀钓,事件的異步執(zhí)行器
if (this.eventExecutor == null) {
this.eventExecutor = new ConnectionEventExecutor();
}
}
}
public class ConnectionEventExecutor {
ExecutorService executor = new ThreadPoolExecutor(1, 1, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(10000), new NamedThreadFactory("Bolt-conn-event-executor", true));
public void onEvent(Runnable event) {
executor.execute(event);
}
}
}
======================== RpcConnectionEventHandler ==========================
public class RpcConnectionEventHandler extends ConnectionEventHandler {
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Connection conn = ctx.channel().attr(Connection.CONNECTION).get();
if (conn != null) {
// 這就是 ConnectionEventHandler 持有 ConnectionManager 引用的原因
this.getConnectionManager().remove(conn);
}
super.channelInactive(ctx);
}
}
事件的處理流程
事件觸發(fā) -> RpcConnectionEventHandler -> [ ConnectionEventListener -> ConnectionEventProcessor ]
方括號內(nèi)的操作由 ConnectionEventExecutor 異步執(zhí)行
事件的觸發(fā)有兩種:Netty定義的事件(例如 channelInactive)和 SOFABolt 定義的事件,前者直接在 Netty 定義的事件觸發(fā)方法中進行(例如 channelInactive)镀迂,后者在 userEventTriggered 方法中進行觸發(fā)掸宛。
事件的觸發(fā)時機
- ConnectionEventType.CONNECT
- AbstractConnectionFactory # createConnection(客戶端)
- RpcServer # doInit # childHandler # initChannel # createConnection(服務端)
- ConnectionEventType.CLOSE
- ConnectionEventHandler # channelInactive
======================== 客戶端創(chuàng)建連接 ==========================
@Override
public Connection createConnection(Url url) throws Exception {
Channel channel = doCreateConnection(url.getIp(), url.getPort(), url.getConnectTimeout());
Connection conn = new Connection(channel, ProtocolCode.fromBytes(url.getProtocol()),
url.getVersion(), url);
// 發(fā)布 ConnectionEventType.CONNECT 事件
channel.pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT);
return conn;
}