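Preface
We have finally reached the analysis of the Connector, which is also the most complex part of Tomcat. A connector, as the name suggests, connects things. What does it connect?
The Connector accepts requests and wraps them into Request and Response objects, then hands them to the Container for processing. When the Container has finished, it hands the result back to the Connector, which returns it to the client.
To understand the Connector, we need to ask ourselves four questions.
- (1) How does the Connector accept requests?
- (2) How does it wrap a request into Request and Response objects?
- (3) How are the wrapped Request and Response handed to the Container for processing?
- (4) After the Container has finished processing, how is the result handed back to the Connector and returned to the client?
Let's start with a diagram of the Connector's overall structure.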
[Note]: ProtocolHandler has different implementations for different protocols and different I/O modes. The class hierarchy of ProtocolHandler in Tomcat 8.5 is shown in the figure below.
A few remarks on this class hierarchy:
- ajp and http11 are two different protocols.
- nio, nio2 and apr are different I/O modes.
- Protocols and I/O modes can be combined with each other.
ProtocolHandler contains three components: Endpoint, Processor and Adapter.
- Endpoint handles the low-level socket network connections, Processor wraps the socket received by the Endpoint into a Request, and Adapter hands the Request to the Container for actual processing.
- Because Endpoint handles the low-level socket connections, it implements the TCP/IP side, whereas Processor implements the HTTP protocol, and Adapter adapts the request to the Servlet container for actual processing.
- AbstractEndpoint, the abstract implementation of Endpoint, defines two inner classes, Acceptor and AsyncTimeout, and a Handler interface. Acceptor listens for incoming requests, AsyncTimeout checks asynchronous requests for timeouts, and Handler processes the accepted socket by calling Processor internally.
At this point we have the outline of the answers to questions (1), (2) and (3). As for (4), it follows naturally once the Container is understood, which earlier chapters have already analyzed in detail.
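The division of labour between the three components can be sketched in a few lines. This is a toy model only: ConnectorSketch, SimpleRequest and the "container" function are hypothetical names invented for illustration, the "socket" is a byte array that has already been read, and none of this is Tomcat's real API.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Hypothetical, simplified stand-ins for the Endpoint / Processor / Adapter roles.
// None of these types are Tomcat's real classes.
final class ConnectorSketch {

    /** Parsed request line; stands in for the real Request object. */
    static final class SimpleRequest {
        final String method;
        final String uri;

        SimpleRequest(String method, String uri) {
            this.method = method;
            this.uri = uri;
        }
    }

    /** Processor role: turn the raw bytes read from the socket into a request object. */
    static SimpleRequest parse(byte[] rawBytes) {
        String text = new String(rawBytes, StandardCharsets.ISO_8859_1);
        String requestLine = text.split("\r\n", 2)[0];
        String[] parts = requestLine.split(" ");
        if (parts.length < 2) {
            throw new IllegalArgumentException("Malformed request line: " + requestLine);
        }
        return new SimpleRequest(parts[0], parts[1]);
    }

    /** Adapter role: hand the request to the "container" and return its response body. */
    static String adapt(SimpleRequest request, Function<SimpleRequest, String> container) {
        return container.apply(request);
    }

    /** Endpoint role: receives the bytes and drives Processor and Adapter in turn. */
    static String handle(byte[] rawBytes, Function<SimpleRequest, String> container) {
        SimpleRequest request = parse(rawBytes);
        String body = adapt(request, container);
        return "HTTP/1.1 200 OK\r\nContent-Length: " + body.length() + "\r\n\r\n" + body;
    }

    public static void main(String[] args) {
        byte[] raw = "GET /hello HTTP/1.1\r\nHost: localhost\r\n\r\n"
                .getBytes(StandardCharsets.ISO_8859_1);
        // The lambda plays the role of the Container (for example a servlet).
        System.out.println(handle(raw, req -> req.method + " " + req.uri));
    }
}
```

The point of the split is that each role can vary independently: the byte source (Endpoint) can change without touching the parsing (Processor), and the container behind the Adapter never sees raw bytes.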
Entry point for the Connector source analysis
In the source of StandardService, the standard implementation of Service, we find that its init(), start(), stop() and destroy() methods each call the method of the same name on its Connectors, and that one Service holds multiple Connectors. For reasons of space this chapter does not list that code; readers should consult the Tomcat source themselves.
[Note]: This chapter only analyzes the code for the HTTP/1.1 protocol with the NIO I/O mode.
Connector startup logic
Connector implements the Lifecycle interface, so it is a lifecycle component, and the entry points of its startup logic are therefore init() and start().
Connector constructor
Before the analysis, let's look at server.xml, which already reflects the overall structure of Tomcat's components.
<?xml version='1.0' encoding='utf-8'?>
<Server port="8005" shutdown="SHUTDOWN">
<Listener className="org.apache.catalina.startup.VersionLoggerListener" />
<Listener className="org.apache.catalina.core.AprLifecycleListener" SSLEngine="on" />
<Listener className="org.apache.catalina.core.JreMemoryLeakPreventionListener" />
<Listener className="org.apache.catalina.mbeans.GlobalResourcesLifecycleListener" />
<Listener className="org.apache.catalina.core.ThreadLocalLeakPreventionListener" />
<GlobalNamingResources>
<Resource name="UserDatabase" auth="Container"
type="org.apache.catalina.UserDatabase"
description="User database that can be updated and saved"
factory="org.apache.catalina.users.MemoryUserDatabaseFactory"
pathname="conf/tomcat-users.xml" />
</GlobalNamingResources>
<Service name="Catalina">
<Connector port="8080" protocol="HTTP/1.1" connectionTimeout="20000" redirectPort="8443" />
<Connector port="8009" protocol="AJP/1.3" redirectPort="8443" />
<Engine name="Catalina" defaultHost="localhost">
<Realm className="org.apache.catalina.realm.LockOutRealm">
<Realm className="org.apache.catalina.realm.UserDatabaseRealm"
resourceName="UserDatabase"/>
</Realm>
<Host name="localhost" appBase="webapps"
unpackWARs="true" autoDeploy="true">
<Valve className="org.apache.catalina.valves.AccessLogValve" directory="logs"
prefix="localhost_access_log" suffix=".txt"
pattern="%h %l %u %t &quot;%r&quot; %s %b" />
</Host>
</Engine>
</Service>
</Server>
In this file we see that a Connector has several key attributes, two of which are port and protocol. The default server.xml configures two protocols: HTTP/1.1 and AJP/1.3. HTTP/1.1 serves the HTTP/1.1 protocol, while AJP/1.3 is used for communication with the Apache HTTP Server.
Next, let's look at the constructor.
public Connector() {
this(null); // 1. No-arg constructor: passes a null protocol, which defaults to `HTTP/1.1`
}
public Connector(String protocol) {
setProtocol(protocol);
// Instantiate protocol handler
// 5. Instantiate the ProtocolHandler from the protocolHandler class name
ProtocolHandler p = null;
try {
Class<?> clazz = Class.forName(protocolHandlerClassName);
p = (ProtocolHandler) clazz.getConstructor().newInstance();
} catch (Exception e) {
log.error(sm.getString(
"coyoteConnector.protocolHandlerInstantiationFailed"), e);
} finally {
this.protocolHandler = p;
}
if (Globals.STRICT_SERVLET_COMPLIANCE) {
uriCharset = StandardCharsets.ISO_8859_1;
} else {
uriCharset = StandardCharsets.UTF_8;
}
}
@Deprecated
public void setProtocol(String protocol) {
boolean aprConnector = AprLifecycleListener.isAprAvailable() &&
AprLifecycleListener.getUseAprConnector();
// 2. `HTTP/1.1` or `null`: protocolHandler is `org.apache.coyote.http11.Http11NioProtocol` (leaving APR aside)
if ("HTTP/1.1".equals(protocol) || protocol == null) {
if (aprConnector) {
setProtocolHandlerClassName("org.apache.coyote.http11.Http11AprProtocol");
} else {
setProtocolHandlerClassName("org.apache.coyote.http11.Http11NioProtocol");
}
}
// 3. `AJP/1.3`: protocolHandler is `org.apache.coyote.ajp.AjpNioProtocol` (leaving APR aside)
else if ("AJP/1.3".equals(protocol)) {
if (aprConnector) {
setProtocolHandlerClassName("org.apache.coyote.ajp.AjpAprProtocol");
} else {
setProtocolHandlerClassName("org.apache.coyote.ajp.AjpNioProtocol");
}
}
// 4. Otherwise, use the given protocol string as the protocolHandler class name
else {
setProtocolHandlerClassName(protocol);
}
}
From the code above, the constructor does the following:
- The no-arg constructor passes a null protocol, which defaults to HTTP/1.1.
- For HTTP/1.1 or null, protocolHandler is org.apache.coyote.http11.Http11NioProtocol (leaving APR aside).
- For AJP/1.3, protocolHandler is org.apache.coyote.ajp.AjpNioProtocol (leaving APR aside).
- Otherwise, the given protocol string is used as the protocolHandler class name.
- A ProtocolHandler instance is constructed from the protocolHandler class name.
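The mapping from the protocol attribute to a handler class name can be isolated into a pure function, which makes the branches easy to check. The sketch below is a standalone restatement of the branching in setProtocol(); ProtocolResolver and resolveHandlerClassName are names made up for this example, and the APR availability check is reduced to a boolean parameter.

```java
// Standalone sketch of the name-to-class mapping performed by setProtocol().
// ProtocolResolver and resolveHandlerClassName are illustrative names, not Tomcat API.
final class ProtocolResolver {

    static String resolveHandlerClassName(String protocol, boolean aprConnector) {
        if (protocol == null || "HTTP/1.1".equals(protocol)) {
            return aprConnector
                    ? "org.apache.coyote.http11.Http11AprProtocol"
                    : "org.apache.coyote.http11.Http11NioProtocol";
        }
        if ("AJP/1.3".equals(protocol)) {
            return aprConnector
                    ? "org.apache.coyote.ajp.AjpAprProtocol"
                    : "org.apache.coyote.ajp.AjpNioProtocol";
        }
        // Anything else is treated as a fully qualified ProtocolHandler class name.
        return protocol;
    }

    public static void main(String[] args) {
        System.out.println(resolveHandlerClassName(null, false));
        System.out.println(resolveHandlerClassName("AJP/1.3", false));
        System.out.println(resolveHandlerClassName("com.example.MyProtocol", false));
    }
}
```

The last branch is what lets a protocol attribute in server.xml name a handler class directly; com.example.MyProtocol above is a placeholder, not a real class.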
Connector.init()
@Override
protected void initInternal() throws LifecycleException {
super.initInternal();
// Initialize adapter
// 1. Initialize the adapter
adapter = new CoyoteAdapter(this);
protocolHandler.setAdapter(adapter);
// Make sure parseBodyMethodsSet has a default
// 2. Set the list of HTTP methods whose request body is parsed; the default is POST
if (null == parseBodyMethodsSet) {
setParseBodyMethods(getParseBodyMethods());
}
if (protocolHandler.isAprRequired() && !AprLifecycleListener.isAprAvailable()) {
throw new LifecycleException(sm.getString("coyoteConnector.protocolHandlerNoApr",
getProtocolHandlerClassName()));
}
if (AprLifecycleListener.isAprAvailable() && AprLifecycleListener.getUseOpenSSL() &&
protocolHandler instanceof AbstractHttp11JsseProtocol) {
AbstractHttp11JsseProtocol<?> jsseProtocolHandler =
(AbstractHttp11JsseProtocol<?>) protocolHandler;
if (jsseProtocolHandler.isSSLEnabled() &&
jsseProtocolHandler.getSslImplementationName() == null) {
// OpenSSL is compatible with the JSSE configuration, so use it if APR is available
jsseProtocolHandler.setSslImplementationName(OpenSSLImplementation.class.getName());
}
}
// 3. Initialize the protocolHandler
try {
protocolHandler.init();
} catch (Exception e) {
throw new LifecycleException(
sm.getString("coyoteConnector.protocolHandlerInitializationFailed"), e);
}
}
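init() does three things:
- Initializes the adapter.
- Sets the list of HTTP methods whose request body is parsed; the default is POST.
- Initializes the protocolHandler.
From the ProtocolHandler class hierarchy we know that every ProtocolHandler implementation extends the abstract class AbstractProtocol, and the logic behind protocolHandler.init() lives in that abstract class. Let's analyze it.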
@Override
public void init() throws Exception {
if (getLog().isInfoEnabled()) {
getLog().info(sm.getString("abstractProtocolHandler.init", getName()));
}
if (oname == null) {
// Component not pre-registered so register it
oname = createObjectName();
if (oname != null) {
Registry.getRegistry(null, null).registerComponent(this, oname, null);
}
}
if (this.domain != null) {
rgOname = new ObjectName(domain + ":type=GlobalRequestProcessor,name=" + getName());
Registry.getRegistry(null, null).registerComponent(
getHandler().getGlobal(), rgOname, null);
}
// 1. Set the endpoint's name; by default it is http-nio-{port}
String endpointName = getName();
endpoint.setName(endpointName.substring(1, endpointName.length()-1));
endpoint.setDomain(domain);
// 2. Initialize the endpoint
endpoint.init();
}
Next, let's see what Endpoint.init() does. The method lives in the abstract class AbstractEndpoint, which is built on the template method pattern; its main job is to call the subclass's bind() method.
public abstract void bind() throws Exception;
public abstract void unbind() throws Exception;
public abstract void startInternal() throws Exception;
public abstract void stopInternal() throws Exception;
public void init() throws Exception {
// Call bind()
if (bindOnInit) {
bind();
bindState = BindState.BOUND_ON_INIT;
}
if (this.domain != null) {
// Register endpoint (as ThreadPool - historical name)
oname = new ObjectName(domain + ":type=ThreadPool,name=\"" + getName() + "\"");
Registry.getRegistry(null, null).registerComponent(this, oname, null);
ObjectName socketPropertiesOname = new ObjectName(domain +
":type=ThreadPool,name=\"" + getName() + "\",subType=SocketProperties");
socketProperties.setObjectName(socketPropertiesOname);
Registry.getRegistry(null, null).registerComponent(socketProperties, socketPropertiesOname, null);
for (SSLHostConfig sslHostConfig : findSslHostConfigs()) {
registerJmx(sslHostConfig);
}
}
}
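The template method structure of init() can be reduced to the following sketch. EndpointTemplate and RecordingEndpoint are hypothetical classes written for this example; the start() method is included on the assumption that binding is deferred to start time when bindOnInit is false, and the JMX registration is left out.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal template-method sketch; class names are hypothetical, not Tomcat's.
abstract class EndpointTemplate {

    enum BindState { UNBOUND, BOUND_ON_INIT, BOUND_ON_START }

    private final boolean bindOnInit;
    private BindState bindState = BindState.UNBOUND;

    EndpointTemplate(boolean bindOnInit) {
        this.bindOnInit = bindOnInit;
    }

    // Steps supplied by the concrete subclass.
    protected abstract void bind();
    protected abstract void startInternal();

    // The fixed skeleton: subclasses cannot change the order of the steps.
    public final void init() {
        if (bindOnInit) {
            bind();
            bindState = BindState.BOUND_ON_INIT;
        }
    }

    public final void start() {
        if (bindState == BindState.UNBOUND) {
            bind();
            bindState = BindState.BOUND_ON_START;
        }
        startInternal();
    }

    BindState getBindState() {
        return bindState;
    }
}

// A subclass that only records which steps were invoked.
final class RecordingEndpoint extends EndpointTemplate {

    final List<String> calls = new ArrayList<>();

    RecordingEndpoint(boolean bindOnInit) {
        super(bindOnInit);
    }

    @Override
    protected void bind() {
        calls.add("bind");
    }

    @Override
    protected void startInternal() {
        calls.add("startInternal");
    }

    public static void main(String[] args) {
        RecordingEndpoint endpoint = new RecordingEndpoint(true);
        endpoint.init();
        endpoint.start();
        System.out.println(endpoint.calls + " " + endpoint.getBindState());
    }
}
```

Marking init() and start() as final keeps the ordering in the base class, while each I/O mode only supplies its own bind() and startInternal().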
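Continuing with bind(), we finally see what we were looking for. The key line is serverSock.socket().bind(addr,getAcceptCount());, which binds the ServerSocket to the configured port, with getAcceptCount() as the accept backlog.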
@Override
public void bind() throws Exception {
if (!getUseInheritedChannel()) {
serverSock = ServerSocketChannel.open();
socketProperties.setProperties(serverSock.socket());
InetSocketAddress addr = (getAddress()!=null?new InetSocketAddress(getAddress(),getPort()):new InetSocketAddress(getPort()));
serverSock.socket().bind(addr,getAcceptCount());
} else {
// Retrieve the channel provided by the OS
Channel ic = System.inheritedChannel();
if (ic instanceof ServerSocketChannel) {
serverSock = (ServerSocketChannel) ic;
}
if (serverSock == null) {
throw new IllegalArgumentException(sm.getString("endpoint.init.bind.inherited"));
}
}
serverSock.configureBlocking(true); //mimic APR behavior
// Initialize thread count defaults for acceptor, poller
if (acceptorThreadCount == 0) {
// FIXME: Doesn't seem to work that well with multiple accept threads
acceptorThreadCount = 1;
}
if (pollerThreadCount <= 0) {
//minimum one poller thread
pollerThreadCount = 1;
}
setStopLatch(new CountDownLatch(pollerThreadCount));
// Initialize SSL if needed
initialiseSsl();
selectorPool.open();
}
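The core of bind() uses only standard java.nio calls, so it can be tried outside Tomcat. The sketch below is a minimal version under a few assumptions: BindSketch is a made-up helper, it binds to the loopback address, and it uses port 0 so the operating system picks a free port instead of a configured one.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

// Illustrative helper built on the JDK only; not Tomcat code.
final class BindSketch {

    /** Opens a blocking server channel on the loopback address with the given backlog. */
    static ServerSocketChannel open(int port, int backlog) throws IOException {
        ServerSocketChannel serverSock = ServerSocketChannel.open();
        InetSocketAddress addr = new InetSocketAddress(InetAddress.getLoopbackAddress(), port);
        // Second argument is the accept backlog (the role of getAcceptCount() above).
        serverSock.socket().bind(addr, backlog);
        // Blocking mode: accept() on this channel waits for a connection.
        serverSock.configureBlocking(true);
        return serverSock;
    }

    /** Binds to an ephemeral port (port 0), reports the port the OS chose, then closes. */
    static int bindEphemeralAndClose(int backlog) {
        try (ServerSocketChannel serverSock = open(0, backlog)) {
            return serverSock.socket().getLocalPort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("bound to port " + bindEphemeralAndClose(100));
    }
}
```

The server channel is kept in blocking mode here, as in the listing, because the thread calling accept() has nothing else to do while it waits.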
That completes init(). Next comes start(). The key code is a single line: the call to ProtocolHandler.start().
Connector.start()
@Override
protected void startInternal() throws LifecycleException {
// Validate settings before starting
if (getPort() < 0) {
throw new LifecycleException(sm.getString(
"coyoteConnector.invalidPort", Integer.valueOf(getPort())));
}
setState(LifecycleState.STARTING);
try {
protocolHandler.start();
} catch (Exception e) {
throw new LifecycleException(
sm.getString("coyoteConnector.protocolHandlerStartFailed"), e);
}
}
Let's dig into ProtocolHandler.start(). It does two things:
- Calls Endpoint.start().
- Starts the async timeout thread, whose Runnable is AsyncTimeout.
@Override
public void start() throws Exception {
if (getLog().isInfoEnabled()) {
getLog().info(sm.getString("abstractProtocolHandler.start", getName()));
}
// 1. Call `Endpoint.start()`
endpoint.start();
// Start async timeout thread
// 2. Start the async timeout thread, whose Runnable is `AsyncTimeout`
asyncTimeout = new AsyncTimeout();
Thread timeoutThread = new Thread(asyncTimeout, getNameInternal() + "-AsyncTimeout");
int priority = endpoint.getThreadPriority();
if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) {
priority = Thread.NORM_PRIORITY;
}
timeoutThread.setPriority(priority);
timeoutThread.setDaemon(true);
timeoutThread.start();
}
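Here we focus on Endpoint.start(), which does the following:
- Calls bind() if the endpoint is not yet bound; bind() was already covered under init().
- Creates the worker thread pool.
- Initializes the connection latch, which limits the number of concurrent connections.
- Starts the poller threads. A poller processes the messages (events) produced by the acceptor thread and ultimately calls into the Handler code.
- Starts the acceptor threads.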
public final void start() throws Exception {
// 1. `bind()` was already covered under `init()`
if (bindState == BindState.UNBOUND) {
bind();
bindState = BindState.BOUND_ON_START;
}
startInternal();
}
@Override
public void startInternal() throws Exception {
if (!running) {
running = true;
paused = false;
processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
socketProperties.getProcessorCache());
eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
socketProperties.getEventCache());
nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
socketProperties.getBufferPool());
// Create worker collection
// 2. Create the worker thread pool
if ( getExecutor() == null ) {
createExecutor();
}
// 3. Initialize the connection latch, which limits the number of concurrent connections
initializeConnectionLatch();
// Start poller threads
// 4. Start the poller threads. A poller processes the messages (events) produced by the acceptor thread and ultimately calls into the Handler code
pollers = new Poller[getPollerThreadCount()];
for (int i=0; i<pollers.length; i++) {
pollers[i] = new Poller();
Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i);
pollerThread.setPriority(threadPriority);
pollerThread.setDaemon(true);
pollerThread.start();
}
// 5. Start the acceptor threads
startAcceptorThreads();
}
}
protected final void startAcceptorThreads() {
int count = getAcceptorThreadCount();
acceptors = new Acceptor[count];
for (int i = 0; i < count; i++) {
acceptors[i] = createAcceptor();
String threadName = getName() + "-Acceptor-" + i;
acceptors[i].setThreadName(threadName);
Thread t = new Thread(acceptors[i], threadName);
t.setPriority(getAcceptorThreadPriority());
t.setDaemon(getDaemon());
t.start();
}
}
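The idea behind the connection latch from step 3 can be illustrated with a counting semaphore. This is an assumption-laden sketch: Tomcat uses its own latch class for this purpose, and the code below swaps in java.util.concurrent.Semaphore purely to show the count-up, wait and count-down behaviour. ConnectionLimiter and its method names are invented for the example.

```java
import java.util.concurrent.Semaphore;

// Sketch of a connection limit using a JDK Semaphore as a stand-in for Tomcat's latch.
final class ConnectionLimiter {

    private final Semaphore permits;

    ConnectionLimiter(int maxConnections) {
        this.permits = new Semaphore(maxConnections);
    }

    /** Blocks the calling (acceptor) thread until a connection slot is free. */
    void countUpOrAwait() throws InterruptedException {
        permits.acquire();
    }

    /** Non-blocking variant: returns false when the limit has been reached. */
    boolean tryCountUp() {
        return permits.tryAcquire();
    }

    /** Called when a connection is closed, or when accept() failed after counting up. */
    void countDown() {
        permits.release();
    }

    int available() {
        return permits.availablePermits();
    }

    public static void main(String[] args) {
        ConnectionLimiter limiter = new ConnectionLimiter(2);
        System.out.println(limiter.tryCountUp()); // first slot
        System.out.println(limiter.tryCountUp()); // second slot
        System.out.println(limiter.tryCountUp()); // limit reached
        limiter.countDown();                      // one connection closes
        System.out.println(limiter.tryCountUp()); // slot is free again
    }
}
```

Counting up before accept() and counting down on every close or failed accept is what keeps the number of open connections bounded.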
Connector request logic
Having covered the Connector's startup logic, we now need to analyze how an HTTP request is handled: once a client issues a request, which steps does it go through before it is actually executed? Tomcat's design is intricate and complex, and without a complete call-flow diagram it is hard to see the whole picture in the code.
Warning: too much detail tends to obscure the truth!
Here is the call-flow diagram first; it comes from the Tomcat website - Apache Tomcat 8 Architecture
Acceptor
In the Connector structure diagram we saw that requests enter through the Acceptor. Endpoint.start() starts the Acceptor threads to handle requests, so next we analyze what an Acceptor thread does.
protected class Acceptor extends AbstractEndpoint.Acceptor {
@Override
public void run() {
int errorDelay = 0;
// Loop until we receive a shutdown command
while (running) {
// Loop if endpoint is paused
// 1. While running, if the `Endpoint` is paused, the `Acceptor` spins (sleeping 50 ms per iteration)
while (paused && running) {
state = AcceptorState.PAUSED;
try {
Thread.sleep(50);
} catch (InterruptedException e) {
// Ignore
}
}
// 2. If the `Endpoint` has stopped running, the `Acceptor` terminates as well
if (!running) {
break;
}
state = AcceptorState.RUNNING;
try {
//if we have reached max connections, wait
// 3. If the maximum number of connections has been reached, wait until the count drops
countUpOrAwaitConnection();
SocketChannel socket = null;
try {
// Accept the next incoming connection from the server
// socket
// 4. Accept the socket of the next incoming connection
socket = serverSock.accept();
} catch (IOException ioe) {
// We didn't get a socket
countDownConnection();
if (running) {
// Introduce delay if necessary
errorDelay = handleExceptionWithDelay(errorDelay);
// re-throw
throw ioe;
} else {
break;
}
}
// Successful accept, reset the error delay
errorDelay = 0;
// Configure the socket
if (running && !paused) {
// setSocketOptions() will hand the socket off to
// an appropriate processor if successful
// 5. `setSocketOptions()` is the key step: it passes the socket to a poller as an event
if (!setSocketOptions(socket)) {
closeSocket(socket);
}
} else {
closeSocket(socket);
}
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
log.error(sm.getString("endpoint.accept.fail"), t);
}
}
state = AcceptorState.ENDED;
}
}
Acceptor.run() does the following:
- While running, if the Endpoint is paused, the Acceptor spins, sleeping 50 ms between checks.
- If the Endpoint has stopped running, the Acceptor terminates as well.
- If the maximum number of connections has been reached, it waits until the count drops.
- It accepts the socket of the next incoming connection.
- setSocketOptions() is the key step: it passes the socket to a poller as an event.
Let's analyze the key method setSocketOptions().
protected boolean setSocketOptions(SocketChannel socket) {
// Process the connection
try {
//disable blocking, APR style, we are gonna be polling it
socket.configureBlocking(false);
Socket sock = socket.socket();
socketProperties.setProperties(sock);
NioChannel channel = nioChannels.pop();
if (channel == null) {
SocketBufferHandler bufhandler = new SocketBufferHandler(
socketProperties.getAppReadBufSize(),
socketProperties.getAppWriteBufSize(),
socketProperties.getDirectBuffer());
if (isSSLEnabled()) {
channel = new SecureNioChannel(socket, bufhandler, selectorPool, this);
} else {
channel = new NioChannel(socket, bufhandler);
}
} else {
channel.setIOChannel(socket);
channel.reset();
}
// Register the channel with a poller; note the two key methods, `getPoller0()` and `Poller.register()`
getPoller0().register(channel);
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
try {
log.error("",t);
} catch (Throwable tt) {
ExceptionUtils.handleThrowable(tt);
}
// Tell to close the socket
return false;
}
return true;
}
The channel is registered with a poller; note the two key methods, getPoller0() and Poller.register(). First getPoller0(): its notable point is that it picks pollers in round-robin fashion by taking a counter modulo the number of pollers.
/**
* The socket poller.
*/
private Poller[] pollers = null;
private AtomicInteger pollerRotater = new AtomicInteger(0);
/**
* Return an available poller in true round robin fashion.
*
* @return The next poller in sequence
*/
public Poller getPoller0() {
int idx = Math.abs(pollerRotater.incrementAndGet()) % pollers.length;
return pollers[idx];
}
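One arithmetic detail of the modulo-based round robin is worth seeing in isolation: Math.abs(Integer.MIN_VALUE) is still negative, so once the counter wraps around, the abs-then-modulo form can yield a negative index for sizes that do not divide 2^31 (3, for example). It takes 2^31 increments from zero to get there, so this is a corner case rather than an everyday failure. The sketch below compares that form with Math.floorMod; RoundRobin and its methods are names made up for this example.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Round-robin index selection; RoundRobin is an illustrative class, not Tomcat code.
final class RoundRobin {

    private final AtomicInteger rotater;
    private final int size;

    /** @param start initial counter value, exposed only so the wrap-around can be shown */
    RoundRobin(int size, int start) {
        this.size = size;
        this.rotater = new AtomicInteger(start);
    }

    /** Same arithmetic as the listing above: abs, then remainder. */
    int nextWithAbs() {
        return Math.abs(rotater.incrementAndGet()) % size;
    }

    /** floorMod returns a value in [0, size) for any int counter value. */
    int nextWithFloorMod() {
        return Math.floorMod(rotater.incrementAndGet(), size);
    }

    public static void main(String[] args) {
        RoundRobin normal = new RoundRobin(3, 0);
        System.out.println(normal.nextWithFloorMod() + " "
                + normal.nextWithFloorMod() + " " + normal.nextWithFloorMod());

        // Start one step before overflow so the next increment yields Integer.MIN_VALUE.
        System.out.println(new RoundRobin(3, Integer.MAX_VALUE).nextWithAbs());
        System.out.println(new RoundRobin(3, Integer.MAX_VALUE).nextWithFloorMod());
    }
}
```

With a size that is a power of two the remainder at Integer.MIN_VALUE is zero, so the corner case is invisible for such sizes.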
Next, Poller.register(). Poller maintains a synchronized events queue, so the channel accepted by the Acceptor is placed on that queue; the line that enqueues it is events.offer(event);
public class Poller implements Runnable {
private final SynchronizedQueue<PollerEvent> events = new SynchronizedQueue<>();
/**
* Registers a newly created socket with the poller.
*
* @param socket The newly created socket
*/
public void register(final NioChannel socket) {
socket.setPoller(this);
NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this);
socket.setSocketWrapper(ka);
ka.setPoller(this);
ka.setReadTimeout(getSocketProperties().getSoTimeout());
ka.setWriteTimeout(getSocketProperties().getSoTimeout());
ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
ka.setSecure(isSSLEnabled());
ka.setReadTimeout(getConnectionTimeout());
ka.setWriteTimeout(getConnectionTimeout());
PollerEvent r = eventCache.pop();
ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
else r.reset(socket,ka,OP_REGISTER);
addEvent(r);
}
private void addEvent(PollerEvent event) {
events.offer(event);
if ( wakeupCounter.incrementAndGet() == 0 ) selector.wakeup();
}
}
Poller
The Acceptor produces PollerEvent events, so the Poller must consume them. Let's analyze Poller.run(). The place where a key is actually processed is processKey(sk, attachment);.
public class Poller implements Runnable {
@Override
public void run() {
// Loop until destroy() is called
while (true) {
boolean hasEvents = false;
try {
if (!close) {
hasEvents = events();
if (wakeupCounter.getAndSet(-1) > 0) {
//if we are here, means we have other stuff to do
//do a non blocking select
keyCount = selector.selectNow();
} else {
keyCount = selector.select(selectorTimeout);
}
wakeupCounter.set(0);
}
if (close) {
events();
timeout(0, false);
try {
selector.close();
} catch (IOException ioe) {
log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe);
}
break;
}
} catch (Throwable x) {
ExceptionUtils.handleThrowable(x);
log.error("",x);
continue;
}
//either we timed out or we woke up, process events first
if ( keyCount == 0 ) hasEvents = (hasEvents | events());
Iterator<SelectionKey> iterator =
keyCount > 0 ? selector.selectedKeys().iterator() : null;
// Walk through the collection of ready keys and dispatch
// any active event.
// Process the keys that are ready
while (iterator != null && iterator.hasNext()) {
SelectionKey sk = iterator.next();
NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment();
// Attachment may be null if another thread has called
// cancelledKey()
if (attachment == null) {
iterator.remove();
} else {
iterator.remove();
// This is where the key is actually processed
processKey(sk, attachment);
}
}//while
//process timeouts
timeout(keyCount,hasEvents);
}//while
getStopLatch().countDown();
}
}
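Next is processKey(), which handles reads and writes separately depending on the key's ready operations.
- Handle the read event, for example to build the Request object.
- Handle the write event, for example to write the generated Response back to the client through the socket.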
protected void processKey(SelectionKey sk, NioSocketWrapper attachment) {
try {
if ( close ) {
cancelledKey(sk);
} else if ( sk.isValid() && attachment != null ) {
if (sk.isReadable() || sk.isWritable() ) {
if ( attachment.getSendfileData() != null ) {
processSendfile(sk,attachment, false);
} else {
unreg(sk, attachment, sk.readyOps());
boolean closeSocket = false;
// 1. Handle the read event, for example to build the Request object
// Read goes before write
if (sk.isReadable()) {
if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) {
closeSocket = true;
}
}
// 2. Handle the write event, for example to write the generated Response back to the client through the socket
if (!closeSocket && sk.isWritable()) {
if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true)) {
closeSocket = true;
}
}
if (closeSocket) {
cancelledKey(sk);
}
}
}
} else {
//invalid key
cancelledKey(sk);
}
} catch ( CancelledKeyException ckx ) {
cancelledKey(sk);
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
log.error("",t);
}
}
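The read-before-write ordering in processKey() can be shown with the ready-operations bitmask alone. In the sketch below, ReadyOpsSketch, its Event enum and the handler predicate are hypothetical; only the SelectionKey.OP_READ and SelectionKey.OP_WRITE constants come from the JDK. The handler returns false to signal that the socket should be closed, mirroring the boolean returned by processSocket().

```java
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Sketch of "read goes before write" dispatching; names are illustrative only.
final class ReadyOpsSketch {

    enum Event { OPEN_READ, OPEN_WRITE }

    /**
     * Dispatches the events encoded in readyOps and returns them in dispatch order.
     * The write event is skipped when the read handler asked for the socket to be closed.
     */
    static List<Event> dispatch(int readyOps, Predicate<Event> handler) {
        List<Event> dispatched = new ArrayList<>();
        boolean closeSocket = false;
        if ((readyOps & SelectionKey.OP_READ) != 0) {
            dispatched.add(Event.OPEN_READ);
            if (!handler.test(Event.OPEN_READ)) {
                closeSocket = true;
            }
        }
        if (!closeSocket && (readyOps & SelectionKey.OP_WRITE) != 0) {
            dispatched.add(Event.OPEN_WRITE);
            handler.test(Event.OPEN_WRITE);
        }
        return dispatched;
    }

    public static void main(String[] args) {
        int both = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
        System.out.println(dispatch(both, event -> true));
        System.out.println(dispatch(both, event -> false));
    }
}
```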
Let's move on to processSocket().
- Take a Processor from processorCache to handle the socket; the Processor implementation is SocketProcessor.
- Submit the Processor to the worker thread pool for execution.
public boolean processSocket(SocketWrapperBase<S> socketWrapper,
SocketEvent event, boolean dispatch) {
try {
if (socketWrapper == null) {
return false;
}
// 1. Take a `Processor` from `processorCache` to handle the socket; the `Processor` implementation is `SocketProcessor`
SocketProcessorBase<S> sc = processorCache.pop();
if (sc == null) {
sc = createSocketProcessor(socketWrapper, event);
} else {
sc.reset(socketWrapper, event);
}
// 2. Submit the `Processor` to the worker thread pool for execution
Executor executor = getExecutor();
if (dispatch && executor != null) {
executor.execute(sc);
} else {
sc.run();
}
} catch (RejectedExecutionException ree) {
getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree);
return false;
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
// This means we got an OOM or similar creating a thread, or that
// the pool and its queue are full
getLog().error(sm.getString("endpoint.process.fail"), t);
return false;
}
return true;
}
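The pattern in processSocket() (reuse a cached task object if one exists, otherwise create one, then either hand it to an executor or run it inline) can be sketched independently of Tomcat. DispatchSketch and Task below are hypothetical names; the cache is a JDK ConcurrentLinkedDeque rather than Tomcat's SynchronizedStack, and the "event" is just a string.

```java
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of "pop from cache or create, then dispatch"; not Tomcat code.
final class DispatchSketch {

    final List<String> handled = new CopyOnWriteArrayList<>();
    final AtomicInteger created = new AtomicInteger();
    private final ConcurrentLinkedDeque<Task> cache = new ConcurrentLinkedDeque<>();
    private final Executor executor;

    DispatchSketch(Executor executor) {
        this.executor = executor;
    }

    /** Reusable unit of work; plays the role of SocketProcessor. */
    final class Task implements Runnable {
        private String event;

        void reset(String event) {
            this.event = event;
        }

        @Override
        public void run() {
            handled.add(event);
            event = null;
            cache.push(this); // return to the cache so the object can be reused
        }
    }

    /** Returns false when the executor refuses the task, true otherwise. */
    boolean process(String event, boolean dispatch) {
        try {
            Task task = cache.poll();
            if (task == null) {
                task = new Task();
                created.incrementAndGet();
            }
            task.reset(event);
            if (dispatch && executor != null) {
                executor.execute(task); // run on a worker thread
            } else {
                task.run();             // run on the caller's thread
            }
        } catch (RejectedExecutionException ree) {
            return false;
        }
        return true;
    }

    public static void main(String[] args) {
        // Runnable::run is a direct executor, which keeps the demo deterministic.
        DispatchSketch sketch = new DispatchSketch(Runnable::run);
        sketch.process("OPEN_READ", true);
        sketch.process("OPEN_WRITE", false);
        System.out.println(sketch.handled + " using " + sketch.created.get() + " task object(s)");
    }
}
```

Reusing task objects avoids allocating a new object per socket event, which is the stated purpose of a cache like processorCache; with a real thread pool, several tasks may be in flight at once and more than one object will be created.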
Next is SocketProcessor.doRun() (SocketProcessor.run() ends up calling it). It delegates the processing to the Handler; when event is null, the event is treated as OPEN_READ.
protected class SocketProcessor extends SocketProcessorBase<NioChannel> {
public SocketProcessor(SocketWrapperBase<NioChannel> socketWrapper, SocketEvent event) {
super(socketWrapper, event);
}
@Override
protected void doRun() {
NioChannel socket = socketWrapper.getSocket();
SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
try {
int handshake = -1;
try {
if (key != null) {
if (socket.isHandshakeComplete()) {
// No TLS handshaking required. Let the handler
// process this socket / event combination.
handshake = 0;
} else if (event == SocketEvent.STOP || event == SocketEvent.DISCONNECT ||
event == SocketEvent.ERROR) {
// Unable to complete the TLS handshake. Treat it as
// if the handshake failed.
handshake = -1;
} else {
handshake = socket.handshake(key.isReadable(), key.isWritable());
// The handshake process reads/writes from/to the
// socket. status may therefore be OPEN_WRITE once
// the handshake completes. However, the handshake
// happens when the socket is opened so the status
// must always be OPEN_READ after it completes. It
// is OK to always set this as it is only used if
// the handshake completes.
event = SocketEvent.OPEN_READ;
}
}
} catch (IOException x) {
handshake = -1;
if (log.isDebugEnabled()) log.debug("Error during SSL handshake",x);
} catch (CancelledKeyException ckx) {
handshake = -1;
}
if (handshake == 0) {
SocketState state = SocketState.OPEN;
// Process the request from this socket
// Delegate the processing to the `Handler`; a null event is treated as `OPEN_READ`
if (event == null) {
state = getHandler().process(socketWrapper, SocketEvent.OPEN_READ);
} else {
state = getHandler().process(socketWrapper, event);
}
if (state == SocketState.CLOSED) {
close(socket, key);
}
} else if (handshake == -1 ) {
close(socket, key);
} else if (handshake == SelectionKey.OP_READ){
socketWrapper.registerReadInterest();
} else if (handshake == SelectionKey.OP_WRITE){
socketWrapper.registerWriteInterest();
}
} catch (CancelledKeyException cx) {
socket.getPoller().cancelledKey(key);
} catch (VirtualMachineError vme) {
ExceptionUtils.handleThrowable(vme);
} catch (Throwable t) {
log.error("", t);
socket.getPoller().cancelledKey(key);
} finally {
socketWrapper = null;
event = null;
//return to cache
if (running && !paused) {
processorCache.push(this);
}
}
}
}
Handler implementation -- ConnectionHandler
The key method of Handler is process(). It is very long, over 200 lines, so brace yourself!
Although the method has many conditional branches, its logic is clear: it mainly calls Processor.process().
@Override
public SocketState process(SocketWrapperBase<S> wrapper, SocketEvent status) {
if (getLog().isDebugEnabled()) {
getLog().debug(sm.getString("abstractConnectionHandler.process",
wrapper.getSocket(), status));
}
if (wrapper == null) {
// Nothing to do. Socket has been closed.
return SocketState.CLOSED;
}
S socket = wrapper.getSocket();
Processor processor = connections.get(socket);
if (getLog().isDebugEnabled()) {
getLog().debug(sm.getString("abstractConnectionHandler.connectionsGet",
processor, socket));
}
// Async timeouts are calculated on a dedicated thread and then
// dispatched. Because of delays in the dispatch process, the
// timeout may no longer be required. Check here and avoid
// unnecessary processing.
if (SocketEvent.TIMEOUT == status && (processor == null ||
!processor.isAsync() || !processor.checkAsyncTimeoutGeneration())) {
// This is effectively a NO-OP
return SocketState.OPEN;
}
if (processor != null) {
// Make sure an async timeout doesn't fire
getProtocol().removeWaitingProcessor(processor);
} else if (status == SocketEvent.DISCONNECT || status == SocketEvent.ERROR) {
// Nothing to do. Endpoint requested a close and there is no
// longer a processor associated with this socket.
return SocketState.CLOSED;
}
ContainerThreadMarker.set();
try {
if (processor == null) {
String negotiatedProtocol = wrapper.getNegotiatedProtocol();
if (negotiatedProtocol != null) {
UpgradeProtocol upgradeProtocol =
getProtocol().getNegotiatedProtocol(negotiatedProtocol);
if (upgradeProtocol != null) {
processor = upgradeProtocol.getProcessor(
wrapper, getProtocol().getAdapter());
} else if (negotiatedProtocol.equals("http/1.1")) {
// Explicitly negotiated the default protocol.
// Obtain a processor below.
} else {
// TODO:
// OpenSSL 1.0.2's ALPN callback doesn't support
// failing the handshake with an error if no
// protocol can be negotiated. Therefore, we need to
// fail the connection here. Once this is fixed,
// replace the code below with the commented out
// block.
if (getLog().isDebugEnabled()) {
getLog().debug(sm.getString(
"abstractConnectionHandler.negotiatedProcessor.fail",
negotiatedProtocol));
}
return SocketState.CLOSED;
/*
* To replace the code above once OpenSSL 1.1.0 is
* used.
// Failed to create processor. This is a bug.
throw new IllegalStateException(sm.getString(
"abstractConnectionHandler.negotiatedProcessor.fail",
negotiatedProtocol));
*/
}
}
}
if (processor == null) {
processor = recycledProcessors.pop();
if (getLog().isDebugEnabled()) {
getLog().debug(sm.getString("abstractConnectionHandler.processorPop",
processor));
}
}
if (processor == null) {
processor = getProtocol().createProcessor();
register(processor);
}
processor.setSslSupport(
wrapper.getSslSupport(getProtocol().getClientCertProvider()));
// Associate the processor with the connection
connections.put(socket, processor);
SocketState state = SocketState.CLOSED;
do {
// The key line: this is the call we have been looking for
state = processor.process(wrapper, status);
if (state == SocketState.UPGRADING) {
// Get the HTTP upgrade handler
UpgradeToken upgradeToken = processor.getUpgradeToken();
// Retrieve leftover input
ByteBuffer leftOverInput = processor.getLeftoverInput();
if (upgradeToken == null) {
// Assume direct HTTP/2 connection
UpgradeProtocol upgradeProtocol = getProtocol().getUpgradeProtocol("h2c");
if (upgradeProtocol != null) {
processor = upgradeProtocol.getProcessor(
wrapper, getProtocol().getAdapter());
wrapper.unRead(leftOverInput);
// Associate with the processor with the connection
connections.put(socket, processor);
} else {
if (getLog().isDebugEnabled()) {
getLog().debug(sm.getString(
"abstractConnectionHandler.negotiatedProcessor.fail",
"h2c"));
}
return SocketState.CLOSED;
}
} else {
HttpUpgradeHandler httpUpgradeHandler = upgradeToken.getHttpUpgradeHandler();
// Release the Http11 processor to be re-used
release(processor);
// Create the upgrade processor
processor = getProtocol().createUpgradeProcessor(wrapper, upgradeToken);
if (getLog().isDebugEnabled()) {
getLog().debug(sm.getString("abstractConnectionHandler.upgradeCreate",
processor, wrapper));
}
wrapper.unRead(leftOverInput);
// Mark the connection as upgraded
wrapper.setUpgraded(true);
// Associate with the processor with the connection
connections.put(socket, processor);
// Initialise the upgrade handler (which may trigger
// some IO using the new protocol which is why the lines
// above are necessary)
// This cast should be safe. If it fails the error
// handling for the surrounding try/catch will deal with
// it.
if (upgradeToken.getInstanceManager() == null) {
httpUpgradeHandler.init((WebConnection) processor);
} else {
ClassLoader oldCL = upgradeToken.getContextBind().bind(false, null);
try {
httpUpgradeHandler.init((WebConnection) processor);
} finally {
upgradeToken.getContextBind().unbind(false, oldCL);
}
}
}
}
} while ( state == SocketState.UPGRADING);
if (state == SocketState.LONG) {
// In the middle of processing a request/response. Keep the
// socket associated with the processor. Exact requirements
// depend on type of long poll
longPoll(wrapper, processor);
if (processor.isAsync()) {
getProtocol().addWaitingProcessor(processor);
}
} else if (state == SocketState.OPEN) {
// In keep-alive but between requests. OK to recycle
// processor. Continue to poll for the next request.
connections.remove(socket);
release(processor);
wrapper.registerReadInterest();
} else if (state == SocketState.SENDFILE) {
// Sendfile in progress. If it fails, the socket will be
// closed. If it works, the socket either be added to the
// poller (or equivalent) to await more data or processed
// if there are any pipe-lined requests remaining.
} else if (state == SocketState.UPGRADED) {
// Don't add sockets back to the poller if this was a
// non-blocking write otherwise the poller may trigger
// multiple read events which may lead to thread starvation
// in the connector. The write() method will add this socket
// to the poller if necessary.
if (status != SocketEvent.OPEN_WRITE) {
longPoll(wrapper, processor);
}
} else if (state == SocketState.SUSPENDED) {
// Don't add sockets back to the poller.
// The resumeProcessing() method will add this socket
// to the poller.
} else {
// Connection closed. OK to recycle the processor. Upgrade
// processors are not recycled.
connections.remove(socket);
if (processor.isUpgrade()) {
UpgradeToken upgradeToken = processor.getUpgradeToken();
HttpUpgradeHandler httpUpgradeHandler = upgradeToken.getHttpUpgradeHandler();
InstanceManager instanceManager = upgradeToken.getInstanceManager();
if (instanceManager == null) {
httpUpgradeHandler.destroy();
} else {
ClassLoader oldCL = upgradeToken.getContextBind().bind(false, null);
try {
httpUpgradeHandler.destroy();
} finally {
try {
instanceManager.destroyInstance(httpUpgradeHandler);
} catch (Throwable e) {
ExceptionUtils.handleThrowable(e);
getLog().error(sm.getString("abstractConnectionHandler.error"), e);
}
upgradeToken.getContextBind().unbind(false, oldCL);
}
}
} else {
release(processor);
}
}
return state;
} catch(java.net.SocketException e) {
// SocketExceptions are normal
getLog().debug(sm.getString(
"abstractConnectionHandler.socketexception.debug"), e);
} catch (java.io.IOException e) {
// IOExceptions are normal
getLog().debug(sm.getString(
"abstractConnectionHandler.ioexception.debug"), e);
} catch (ProtocolException e) {
// Protocol exceptions normally mean the client sent invalid or
// incomplete data.
getLog().debug(sm.getString(
"abstractConnectionHandler.protocolexception.debug"), e);
}
// Future developers: if you discover any other
// rare-but-nonfatal exceptions, catch them here, and log as
// above.
catch (Throwable e) {
ExceptionUtils.handleThrowable(e);
// any other exception or error is odd. Here we log it
// with "ERROR" level, so it will show up even on
// less-than-verbose logs.
getLog().error(sm.getString("abstractConnectionHandler.error"), e);
} finally {
ContainerThreadMarker.clear();
}
// Make sure socket/processor is removed from the list of current
// connections
connections.remove(socket);
release(processor);
return SocketState.CLOSED;
}
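The branching at the end of the handler above can be hard to follow in flattened form. The following is a minimal sketch of three of its branches only (LONG, OPEN, and the closed fallback), assuming a plain map in place of the handler's connections field. The class name, the string labels, and the reduced SocketState enum are hypothetical stand-ins, not Tomcat's own types.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, reduced model of what the handler does once the processor returns.
final class ConnectionStateSketch {

    // Local stand-in; Tomcat's SocketState has more values than these three.
    enum SocketState { OPEN, CLOSED, LONG }

    /**
     * Applies the bookkeeping for one state and returns a label describing
     * the action. The map plays the role of the handler's connections field
     * (socket id -> processor name).
     */
    static String afterProcess(Map<String, String> connections,
                               String socket, SocketState state) {
        switch (state) {
            case LONG:
                // Mid-request: the processor stays associated with the socket.
                return "keep processor";
            case OPEN:
                // Keep-alive between requests: drop the association and
                // wait for the next read event.
                connections.remove(socket);
                return "release processor, register read interest";
            default:
                // Connection closed: drop the association and recycle.
                connections.remove(socket);
                return "release processor";
        }
    }

    public static void main(String[] args) {
        Map<String, String> connections = new HashMap<>();
        connections.put("socket-1", "processor-1");
        for (SocketState state : SocketState.values()) {
            System.out.println(state + " -> "
                    + afterProcess(connections, "socket-1", state));
        }
    }
}
```

The point of the sketch is that only the LONG state leaves the socket-to-processor association in place; the other two modeled states remove it.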
Processor
Here we focus on how Processor handles a read event, which comes down to a single line of code: a call to the service() method.
public abstract class AbstractProcessorLight implements Processor {
@Override
public SocketState process(SocketWrapperBase<?> socketWrapper, SocketEvent status)
throws IOException {
SocketState state = SocketState.CLOSED;
Iterator<DispatchType> dispatches = null;
do {
if (dispatches != null) {
DispatchType nextDispatch = dispatches.next();
state = dispatch(nextDispatch.getSocketStatus());
} else if (status == SocketEvent.DISCONNECT) {
// Do nothing here, just wait for it to get recycled
} else if (isAsync() || isUpgrade() || state == SocketState.ASYNC_END) {
state = dispatch(status);
if (state == SocketState.OPEN) {
// There may be pipe-lined data to read. If the data isn't
// processed now, execution will exit this loop and call
// release() which will recycle the processor (and input
// buffer) deleting any pipe-lined data. To avoid this,
// process it now.
state = service(socketWrapper);
}
} else if (status == SocketEvent.OPEN_WRITE) {
// Extra write event likely after async, ignore
state = SocketState.LONG;
} else if (status == SocketEvent.OPEN_READ){
// Call the `service()` method
state = service(socketWrapper);
} else {
// Default to closing the socket if the SocketEvent passed in
// is not consistent with the current state of the Processor
state = SocketState.CLOSED;
}
if (getLog().isDebugEnabled()) {
getLog().debug("Socket: [" + socketWrapper +
"], Status in: [" + status +
"], State out: [" + state + "]");
}
if (state != SocketState.CLOSED && isAsync()) {
state = asyncPostProcess();
if (getLog().isDebugEnabled()) {
getLog().debug("Socket: [" + socketWrapper +
"], State after async post processing: [" + state + "]");
}
}
if (dispatches == null || !dispatches.hasNext()) {
// Only returns non-null iterator if there are
// dispatches to process.
dispatches = getIteratorAndClearDispatches();
}
} while (state == SocketState.ASYNC_END ||
dispatches != null && state != SocketState.CLOSED);
return state;
}
}
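To isolate the read path, here is a simplified sketch of the non-async branches of process() above. It assumes the processor is neither async nor upgraded and has no pending dispatches. The enums and the stubbed service() are local stand-ins defined for illustration only, not Tomcat's classes.

```java
// Hypothetical, reduced model of the event dispatch in process().
final class ProcessDispatchSketch {

    // Local stand-ins; Tomcat's own enums contain more values.
    enum SocketEvent { OPEN_READ, OPEN_WRITE, DISCONNECT, ERROR }

    enum SocketState { OPEN, CLOSED, LONG }

    /** Stub for service(): pretend a request was handled on a keep-alive connection. */
    static SocketState service() {
        return SocketState.OPEN;
    }

    /** Mirrors only the non-async branches of process(). */
    static SocketState process(SocketEvent status) {
        switch (status) {
            case DISCONNECT:
                // Nothing to do; the initial CLOSED state is returned.
                return SocketState.CLOSED;
            case OPEN_WRITE:
                // Extra write event is ignored; the socket stays with its processor.
                return SocketState.LONG;
            case OPEN_READ:
                // The read path: a single call to service().
                return service();
            default:
                // Event is inconsistent with the processor state: close the socket.
                return SocketState.CLOSED;
        }
    }

    public static void main(String[] args) {
        for (SocketEvent event : SocketEvent.values()) {
            System.out.println(event + " -> " + process(event));
        }
    }
}
```

In this reduced form it is easy to see that OPEN_READ is the only event that reaches service(); every other event resolves to a state without touching the request.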
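Processor.service() does two things that matter here. The method is very long (more than 200 lines), so its code is not reproduced in this article.
- It builds the Request and Response objects from the data read off the socket
- It calls Adapter.service(), passing in those Request and Response objects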
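Since the real method is not shown, the two steps can be illustrated with a minimal sketch. It assumes a request that consists of a single request line and ignores headers, bodies, and keep-alive handling. Request, Response, and Adapter below are hypothetical stand-ins, not the coyote types, and the parsing is deliberately naive.

```java
// Hypothetical, reduced model of the two steps performed by Processor.service().
final class ProcessorServiceSketch {

    /** Stand-in for the coyote request. */
    static final class Request {
        String method;
        String uri;
        String protocol;
    }

    /** Stand-in for the coyote response. */
    static final class Response {
        int status = 200;
    }

    /** Stand-in for the Adapter that hands the request to the container. */
    interface Adapter {
        void service(Request req, Response res);
    }

    /** Step 1: fill the request from the request line. Step 2: call the adapter. */
    static Response service(String requestLine, Adapter adapter) {
        Request req = new Request();
        Response res = new Response();
        String[] parts = requestLine.trim().split(" ");
        if (parts.length != 3) {
            // Malformed request line: answer 400 without reaching the adapter.
            res.status = 400;
            return res;
        }
        req.method = parts[0];
        req.uri = parts[1];
        req.protocol = parts[2];
        adapter.service(req, res);
        return res;
    }

    public static void main(String[] args) {
        Response result = service("GET /index.html HTTP/1.1",
                (rq, rs) -> System.out.println(rq.method + " " + rq.uri));
        System.out.println(result.status);
    }
}
```

The adapter is passed in as a parameter so that the processor stays unaware of what sits behind it, which is the same separation the Adapter section describes.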
Adapter
Adapter connects Connector and Container, bridging the two. Processor calls Adapter.service(), which mainly does the following:
- From the coyote request and response objects, creates the connector request and response objects (the implementations of HttpServletRequest and HttpServletResponse)
- Adds supplementary headers
- Parses the request via postParseRequest(), which handles proxy settings, sets required headers, and so on
- Enters the container proper by invoking the first valve in the Engine container's pipeline
@Override
public void service(org.apache.coyote.Request req, org.apache.coyote.Response res)
throws Exception {
// 1. From the coyote request and response objects, create the connector request and response objects (the implementations of HttpServletRequest and HttpServletResponse)
Request request = (Request) req.getNote(ADAPTER_NOTES);
Response response = (Response) res.getNote(ADAPTER_NOTES);
if (request == null) {
// Create objects
request = connector.createRequest();
request.setCoyoteRequest(req);
response = connector.createResponse();
response.setCoyoteResponse(res);
// Link objects
request.setResponse(response);
response.setRequest(request);
// Set as notes
req.setNote(ADAPTER_NOTES, request);
res.setNote(ADAPTER_NOTES, response);
// Set query string encoding
req.getParameters().setQueryStringCharset(connector.getURICharset());
}
// 2. Add supplementary headers
if (connector.getXpoweredBy()) {
response.addHeader("X-Powered-By", POWERED_BY);
}
boolean async = false;
boolean postParseSuccess = false;
req.getRequestProcessor().setWorkerThreadName(THREAD_NAME.get());
try {
// Parse and set Catalina and configuration specific
// request parameters
// 3. Parse the request; this method handles proxy settings, sets required headers, etc.
postParseSuccess = postParseRequest(req, request, res, response);
if (postParseSuccess) {
//check valves if we support async
request.setAsyncSupported(
connector.getService().getContainer().getPipeline().isAsyncSupported());
// Calling the container
// 4. The real entry point into the container: invoke the first valve in the Engine container's pipeline
connector.getService().getContainer().getPipeline().getFirst().invoke(
request, response);
}
if (request.isAsync()) {
async = true;
ReadListener readListener = req.getReadListener();
if (readListener != null && request.isFinished()) {
// Possible the all data may have been read during service()
// method so this needs to be checked here
ClassLoader oldCL = null;
try {
oldCL = request.getContext().bind(false, null);
if (req.sendAllDataReadEvent()) {
req.getReadListener().onAllDataRead();
}
} finally {
request.getContext().unbind(false, oldCL);
}
}
Throwable throwable =
(Throwable) request.getAttribute(RequestDispatcher.ERROR_EXCEPTION);
// If an async request was started, is not going to end once
// this container thread finishes and an error occurred, trigger
// the async error process
if (!request.isAsyncCompleting() && throwable != null) {
request.getAsyncContextInternal().setErrorState(throwable, true);
}
} else {
request.finishRequest();
response.finishResponse();
}
} catch (IOException e) {
// Ignore
} finally {
AtomicBoolean error = new AtomicBoolean(false);
res.action(ActionCode.IS_ERROR, error);
if (request.isAsyncCompleting() && error.get()) {
// Connection will be forcibly closed which will prevent
// completion happening at the usual point. Need to trigger
// call to onComplete() here.
res.action(ActionCode.ASYNC_POST_PROCESS, null);
async = false;
}
// Access log
if (!async && postParseSuccess) {
// Log only if processing was invoked.
// If postParseRequest() failed, it has already logged it.
Context context = request.getContext();
// If the context is null, it is likely that the endpoint was
// shutdown, this connection closed and the request recycled in
// a different thread. That thread will have updated the access
// log so it is OK not to update the access log here in that
// case.
if (context != null) {
context.logAccess(request, response,
System.currentTimeMillis() - req.getStartTime(), false);
}
}
req.getRequestProcessor().setWorkerThreadName(null);
// Recycle the wrapper request and response
if (!async) {
request.recycle();
response.recycle();
}
}
}
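Steps 1 and 4 of the adapter carry the core idea: wrap the low-level request once, cache the wrapper on the low-level object, then hand it to the first valve. The sketch below shows just that, under the assumption of a single note slot and a one-method valve. CoyoteRequest, ConnectorRequest, and Valve are hypothetical stand-ins for the real classes, and header handling, async support, and recycling are left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, reduced model of the wrap-then-invoke flow in the adapter.
final class AdapterSketch {

    /** Stand-in for org.apache.coyote.Request with a single note slot. */
    static final class CoyoteRequest {
        final String uri;
        Object note; // plays the role of the ADAPTER_NOTES slot

        CoyoteRequest(String uri) {
            this.uri = uri;
        }
    }

    /** Stand-in for the connector-level request wrapper. */
    static final class ConnectorRequest {
        final CoyoteRequest coyoteRequest;
        final List<String> trace = new ArrayList<>();

        ConnectorRequest(CoyoteRequest coyoteRequest) {
            this.coyoteRequest = coyoteRequest;
        }
    }

    /** Stand-in for a pipeline valve. */
    interface Valve {
        void invoke(ConnectorRequest request);
    }

    /** Step 1: create the wrapper on first use, then reuse it from the note slot. */
    static ConnectorRequest wrap(CoyoteRequest req) {
        ConnectorRequest request = (ConnectorRequest) req.note;
        if (request == null) {
            request = new ConnectorRequest(req);
            req.note = request;
        }
        return request;
    }

    /** Step 4: enter the container by invoking the first valve of the pipeline. */
    static ConnectorRequest service(CoyoteRequest req, Valve first) {
        ConnectorRequest request = wrap(req);
        first.invoke(request);
        return request;
    }

    public static void main(String[] args) {
        Valve servletValve = r -> r.trace.add("servlet for " + r.coyoteRequest.uri);
        Valve engineValve = r -> {
            r.trace.add("engine");
            servletValve.invoke(r); // each valve passes the request onward
        };
        System.out.println(service(new CoyoteRequest("/hello"), engineValve).trace);
    }
}
```

Caching the wrapper in the note slot means a processor reused across keep-alive requests does not allocate new wrapper objects each time, which is why the real code recycles the request and response rather than discarding them.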
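Summary
This article first posed four questions that must be answered in order to understand Connector. It then presented the overall structure diagram and analyzed each component in it and the relationships between them. Finally, following that diagram, we walked through Connector's startup logic and request-handling logic, whose internals are detailed and complex.
With this source analysis complete, we now understand what problem Connector solves, how it is structured, and how it works internally.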