深入理解Tomcat(十)Connector

前言

終于進(jìn)行到Connector的分析階段了,這也是Tomcat里面最復(fù)雜的一塊功能了。Connector中文名為連接器辛藻,既然是連接器,它肯定會(huì)連接某些東西,連接些什么呢?

Connector用于接受請(qǐng)求并將請(qǐng)求封裝成Request和Response猛铅,然后交給Container進(jìn)行處理,Container處理完之后再交給Connector返回給客戶(hù)端。

要理解Connector,我們需要問(wèn)自己4個(gè)問(wèn)題腌歉。

  • (1)Connector如何接受請(qǐng)求的桂塞?
  • (2)如何將請(qǐng)求封裝成Request和Response的汰瘫?
  • (3)封裝完之后的Request和Response如何交給Container進(jìn)行處理的趴乡?
  • (4)Container處理完之后如何交給Connector并返回給客戶(hù)端的?

先來(lái)一張Connector的整體結(jié)構(gòu)圖

Connector整體結(jié)構(gòu)圖

【注意】:不同的協(xié)議、不同的通信方式,ProtocolHandler會(huì)有不同的實(shí)現(xiàn)。在Tomcat8.5中区匠,ProtocolHandler的類(lèi)繼承層級(jí)如下圖所示速客。

ProtocolHandler類(lèi)繼承層級(jí)

針對(duì)上述的類(lèi)繼承層級(jí)圖位喂,我們做如下說(shuō)明:

  1. ajp和http11是兩種不同的協(xié)議
  2. nio、nio2和apr是不同的通信方式
  3. 協(xié)議和通信方式可以相互組合规婆。

ProtocolHandler包含三個(gè)部件:Endpoint耘戚、ProcessorAdapter

  1. Endpoint用來(lái)處理底層Socket的網(wǎng)絡(luò)連接部服,Processor用于將Endpoint接收到的Socket封裝成Request赵抢,Adapter用于將Request交給Container進(jìn)行具體的處理宠叼。
  2. Endpoint由于是處理底層的Socket網(wǎng)絡(luò)連接摩渺,因此Endpoint是用來(lái)實(shí)現(xiàn)TCP/IP協(xié)議的横侦,而Processor用來(lái)實(shí)現(xiàn)HTTP協(xié)議的,Adapter將請(qǐng)求適配到Servlet容器進(jìn)行具體的處理伤疙。
  3. Endpoint的抽象實(shí)現(xiàn)類(lèi)AbstractEndpoint里面定義了AcceptorAsyncTimeout兩個(gè)內(nèi)部類(lèi)和一個(gè)Handler接口蛙讥。Acceptor用于監(jiān)聽(tīng)請(qǐng)求旁涤,AsyncTimeout用于檢查異步Request的超時(shí)闻妓,Handler用于處理接收到的Socket注祖,在內(nèi)部調(diào)用Processor進(jìn)行處理。

至此均唉,我們已經(jīng)明白了問(wèn)題(1)是晨、(2)和(3)。至于(4)舔箭,當(dāng)我們了解了Container自然就明白了,前面章節(jié)內(nèi)容已經(jīng)詳細(xì)分析過(guò)了限嫌。

Connector源碼分析入口

我們?cè)?code>Service標(biāo)準(zhǔn)實(shí)現(xiàn)StandardService的源碼中發(fā)現(xiàn)靴庆,其init()start()怒医、stop()destroy()方法分別會(huì)對(duì)Connectors的同名方法進(jìn)行調(diào)用炉抒。而一個(gè)Service對(duì)應(yīng)著多個(gè)Connector。限于篇幅稚叹,本章不再羅列這部分代碼焰薄,需要讀者自行閱讀tomcat源碼拿诸。

【注】:本章我們僅對(duì)http1.1協(xié)議且nio通信方式的相關(guān)代碼進(jìn)行分析。

Connector啟動(dòng)邏輯

我們知道Connector實(shí)現(xiàn)了Lifecycle接口塞茅,所以它是一個(gè)生命周期組件亩码。所以Connector的啟動(dòng)邏輯入口在于init()start()

Connector構(gòu)造方法

在分析之前野瘦,我們看看server.xml描沟,該文件已經(jīng)體現(xiàn)出了tomcat中各個(gè)組件的大體結(jié)構(gòu)。

<?xml version='1.0' encoding='utf-8'?>
<Server port="8005" shutdown="SHUTDOWN">
  <Listener className="org.apache.catalina.startup.VersionLoggerListener" />
  <Listener className="org.apache.catalina.core.AprLifecycleListener" SSLEngine="on" />
  <Listener className="org.apache.catalina.core.JreMemoryLeakPreventionListener" />
  <Listener className="org.apache.catalina.mbeans.GlobalResourcesLifecycleListener" />
  <Listener className="org.apache.catalina.core.ThreadLocalLeakPreventionListener" />

  <GlobalNamingResources>
    <Resource name="UserDatabase" auth="Container"
              type="org.apache.catalina.UserDatabase"
              description="User database that can be updated and saved"
              factory="org.apache.catalina.users.MemoryUserDatabaseFactory"
              pathname="conf/tomcat-users.xml" />
  </GlobalNamingResources>

  <Service name="Catalina">
    <Connector port="8080" protocol="HTTP/1.1" connectionTimeout="20000" redirectPort="8443" />
    <Connector port="8009" protocol="AJP/1.3" redirectPort="8443" />

    <Engine name="Catalina" defaultHost="localhost">
      <Realm className="org.apache.catalina.realm.LockOutRealm">
        <Realm className="org.apache.catalina.realm.UserDatabaseRealm"
               resourceName="UserDatabase"/>
      </Realm>

      <Host name="localhost"  appBase="webapps"
            unpackWARs="true" autoDeploy="true">
        <Valve className="org.apache.catalina.valves.AccessLogValve" directory="logs"
               prefix="localhost_access_log" suffix=".txt"
               pattern="%h %l %u %t &quot;%r&quot; %s %b" />
      </Host>
    </Engine>
  </Service>
</Server>

在這個(gè)文件中鞭光,我們看到一個(gè)Connector有幾個(gè)關(guān)鍵屬性吏廉,portprotocol是其中的兩個(gè)。server.xml默認(rèn)支持兩種協(xié)議:HTTP/1.1AJP/1.3惰许。其中HTTP/1.1用于支持http1.1協(xié)議席覆,而AJP/1.3用于支持對(duì)apache服務(wù)器的通信。

接下來(lái)我們看看構(gòu)造方法汹买。

public Connector() {
    this(null); // 1. 無(wú)參構(gòu)造方法佩伤,傳入?yún)?shù)為空協(xié)議,會(huì)默認(rèn)使用`HTTP/1.1`
}

public Connector(String protocol) {
    setProtocol(protocol);
    // Instantiate protocol handler
    // 5. 使用protocolHandler的類(lèi)名構(gòu)造ProtocolHandler的實(shí)例
    ProtocolHandler p = null;
    try {
        Class<?> clazz = Class.forName(protocolHandlerClassName);
        p = (ProtocolHandler) clazz.getConstructor().newInstance();
    } catch (Exception e) {
        log.error(sm.getString(
                "coyoteConnector.protocolHandlerInstantiationFailed"), e);
    } finally {
        this.protocolHandler = p;
    }

    if (Globals.STRICT_SERVLET_COMPLIANCE) {
        uriCharset = StandardCharsets.ISO_8859_1;
    } else {
        uriCharset = StandardCharsets.UTF_8;
    }
}

@Deprecated
public void setProtocol(String protocol) {
    boolean aprConnector = AprLifecycleListener.isAprAvailable() &&
            AprLifecycleListener.getUseAprConnector();

    // 2. `HTTP/1.1`或`null`晦毙,protocolHandler使用`org.apache.coyote.http11.Http11NioProtocol`生巡,不考慮apr
    if ("HTTP/1.1".equals(protocol) || protocol == null) {
        if (aprConnector) {
            setProtocolHandlerClassName("org.apache.coyote.http11.Http11AprProtocol");
        } else {
            setProtocolHandlerClassName("org.apache.coyote.http11.Http11NioProtocol");
        }
    }
    // 3. `AJP/1.3`,protocolHandler使用`org.apache.coyote.ajp.AjpNioProtocol`结序,不考慮apr
    else if ("AJP/1.3".equals(protocol)) {
        if (aprConnector) {
            setProtocolHandlerClassName("org.apache.coyote.ajp.AjpAprProtocol");
        } else {
            setProtocolHandlerClassName("org.apache.coyote.ajp.AjpNioProtocol");
        }
    }
    // 4. 其他情況障斋,使用傳入的protocol作為protocolHandler的類(lèi)名
    else {
        setProtocolHandlerClassName(protocol);
    }
}

從上面的代碼我們看到構(gòu)造方法主要做了下面幾件事情:

  1. 無(wú)參構(gòu)造方法纵潦,傳入?yún)?shù)為空協(xié)議徐鹤,會(huì)默認(rèn)使用HTTP/1.1
  2. HTTP/1.1null,protocolHandler使用org.apache.coyote.http11.Http11NioProtocol邀层,不考慮apr
  3. AJP/1.3返敬,protocolHandler使用org.apache.coyote.ajp.AjpNioProtocol,不考慮apr
  4. 其他情況寥院,使用傳入的protocol作為protocolHandler的類(lèi)名
  5. 使用protocolHandler的類(lèi)名構(gòu)造ProtocolHandler的實(shí)例

Connector.init()

@Override
protected void initInternal() throws LifecycleException {
    super.initInternal();

    // Initialize adapter
    // 1. 初始化adapter
    adapter = new CoyoteAdapter(this);
    protocolHandler.setAdapter(adapter);

    // Make sure parseBodyMethodsSet has a default
    // 2. 設(shè)置接受body的method列表劲赠,默認(rèn)為POST
    if (null == parseBodyMethodsSet) {
        setParseBodyMethods(getParseBodyMethods());
    }

    if (protocolHandler.isAprRequired() && !AprLifecycleListener.isAprAvailable()) {
        throw new LifecycleException(sm.getString("coyoteConnector.protocolHandlerNoApr",
                getProtocolHandlerClassName()));
    }
    if (AprLifecycleListener.isAprAvailable() && AprLifecycleListener.getUseOpenSSL() &&
            protocolHandler instanceof AbstractHttp11JsseProtocol) {
        AbstractHttp11JsseProtocol<?> jsseProtocolHandler =
                (AbstractHttp11JsseProtocol<?>) protocolHandler;
        if (jsseProtocolHandler.isSSLEnabled() &&
                jsseProtocolHandler.getSslImplementationName() == null) {
            // OpenSSL is compatible with the JSSE configuration, so use it if APR is available
            jsseProtocolHandler.setSslImplementationName(OpenSSLImplementation.class.getName());
        }
    }

    // 3. 初始化protocolHandler
    try {
        protocolHandler.init();
    } catch (Exception e) {
        throw new LifecycleException(
                sm.getString("coyoteConnector.protocolHandlerInitializationFailed"), e);
    }
}

init()方法做了3件事情

  1. 初始化adapter
  2. 設(shè)置接受body的method列表,默認(rèn)為POST
  3. 初始化protocolHandler

ProtocolHandler類(lèi)繼承層級(jí)我們知道ProtocolHandler的子類(lèi)都必須實(shí)現(xiàn)AbstractProtocol抽象類(lèi)秸谢,而protocolHandler.init();的邏輯代碼正是在這個(gè)抽象類(lèi)里面凛澎。我們來(lái)分析一下。

@Override
public void init() throws Exception {
    if (getLog().isInfoEnabled()) {
        getLog().info(sm.getString("abstractProtocolHandler.init", getName()));
    }

    if (oname == null) {
        // Component not pre-registered so register it
        oname = createObjectName();
        if (oname != null) {
            Registry.getRegistry(null, null).registerComponent(this, oname, null);
        }
    }

    if (this.domain != null) {
        rgOname = new ObjectName(domain + ":type=GlobalRequestProcessor,name=" + getName());
        Registry.getRegistry(null, null).registerComponent(
                getHandler().getGlobal(), rgOname, null);
    }

    // 1. 設(shè)置endpoint的名字估蹄,默認(rèn)為:http-nio-{port}
    String endpointName = getName();
    endpoint.setName(endpointName.substring(1, endpointName.length()-1));
    endpoint.setDomain(domain);
    
    // 2. 初始化endpoint
    endpoint.init();
}

我們接著分析一下Endpoint.init()里面又做了什么塑煎。該方法位于AbstactEndpoint抽象類(lèi),該類(lèi)是基于模板方法模式實(shí)現(xiàn)的臭蚁,主要調(diào)用了子類(lèi)的bind()方法最铁。

public abstract void bind() throws Exception;
public abstract void unbind() throws Exception;
public abstract void startInternal() throws Exception;
public abstract void stopInternal() throws Exception;

public void init() throws Exception {
    // 執(zhí)行bind()方法
    if (bindOnInit) {
        bind();
        bindState = BindState.BOUND_ON_INIT;
    }
    if (this.domain != null) {
        // Register endpoint (as ThreadPool - historical name)
        oname = new ObjectName(domain + ":type=ThreadPool,name=\"" + getName() + "\"");
        Registry.getRegistry(null, null).registerComponent(this, oname, null);

        ObjectName socketPropertiesOname = new ObjectName(domain +
                ":type=ThreadPool,name=\"" + getName() + "\",subType=SocketProperties");
        socketProperties.setObjectName(socketPropertiesOname);
        Registry.getRegistry(null, null).registerComponent(socketProperties, socketPropertiesOname, null);

        for (SSLHostConfig sslHostConfig : findSslHostConfigs()) {
            registerJmx(sslHostConfig);
        }
    }
}

繼續(xù)分析bind()方法讯赏,我們終于看到了我們想要看的東西了。關(guān)鍵的代碼在于serverSock.socket().bind(addr,getAcceptCount());冷尉,用于綁定ServerSocket到指定的端口漱挎。

@Override
public void bind() throws Exception {

    if (!getUseInheritedChannel()) {
        serverSock = ServerSocketChannel.open();
        socketProperties.setProperties(serverSock.socket());
        InetSocketAddress addr = (getAddress()!=null?new InetSocketAddress(getAddress(),getPort()):new InetSocketAddress(getPort()));
        serverSock.socket().bind(addr,getAcceptCount());
    } else {
        // Retrieve the channel provided by the OS
        Channel ic = System.inheritedChannel();
        if (ic instanceof ServerSocketChannel) {
            serverSock = (ServerSocketChannel) ic;
        }
        if (serverSock == null) {
            throw new IllegalArgumentException(sm.getString("endpoint.init.bind.inherited"));
        }
    }
    serverSock.configureBlocking(true); //mimic APR behavior

    // Initialize thread count defaults for acceptor, poller
    if (acceptorThreadCount == 0) {
        // FIXME: Doesn't seem to work that well with multiple accept threads
        acceptorThreadCount = 1;
    }
    if (pollerThreadCount <= 0) {
        //minimum one poller thread
        pollerThreadCount = 1;
    }
    setStopLatch(new CountDownLatch(pollerThreadCount));

    // Initialize SSL if needed
    initialiseSsl();

    selectorPool.open();
}

好了,我們已經(jīng)分析完了init()方法雀哨,接下來(lái)我們分析start()方法磕谅。關(guān)鍵代碼就一行,調(diào)用ProtocolHandler.start()方法震束。

Connector.start()

@Override
protected void startInternal() throws LifecycleException {

    // Validate settings before starting
    if (getPort() < 0) {
        throw new LifecycleException(sm.getString(
                "coyoteConnector.invalidPort", Integer.valueOf(getPort())));
    }

    setState(LifecycleState.STARTING);

    try {
        protocolHandler.start();
    } catch (Exception e) {
        throw new LifecycleException(
                sm.getString("coyoteConnector.protocolHandlerStartFailed"), e);
    }
}

我們深入ProtocolHandler.start()方法怜庸。

  1. 調(diào)用Endpoint.start()方法
  2. 開(kāi)啟異步超時(shí)線(xiàn)程,線(xiàn)程執(zhí)行單元為Asynctimeout
@Override
public void start() throws Exception {
    if (getLog().isInfoEnabled()) {
        getLog().info(sm.getString("abstractProtocolHandler.start", getName()));
    }

    // 1. 調(diào)用`Endpoint.start()`方法
    endpoint.start();

    // Start async timeout thread
    // 2. 開(kāi)啟異步超時(shí)線(xiàn)程垢村,線(xiàn)程執(zhí)行單元為`Asynctimeout`
    asyncTimeout = new AsyncTimeout();
    Thread timeoutThread = new Thread(asyncTimeout, getNameInternal() + "-AsyncTimeout");
    int priority = endpoint.getThreadPriority();
    if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) {
        priority = Thread.NORM_PRIORITY;
    }
    timeoutThread.setPriority(priority);
    timeoutThread.setDaemon(true);
    timeoutThread.start();
}

這兒我們重點(diǎn)關(guān)注Endpoint.start()方法割疾,主要做的事情如下:

  1. bind()已經(jīng)在init()中分析過(guò)了
  2. 創(chuàng)建工作者線(xiàn)程池
  3. 初始化連接latch,用于限制請(qǐng)求的并發(fā)量
  4. 開(kāi)啟poller線(xiàn)程嘉栓。poller用于對(duì)接受者線(xiàn)程生產(chǎn)的消息(或事件)進(jìn)行處理宏榕,poller最終調(diào)用的是Handler的代碼
  5. 開(kāi)啟acceptor線(xiàn)程
public final void start() throws Exception {
    // 1. `bind()`已經(jīng)在`init()`中分析過(guò)了
    if (bindState == BindState.UNBOUND) {
        bind();
        bindState = BindState.BOUND_ON_START;
    }
    startInternal();
}

@Override
public void startInternal() throws Exception {
    if (!running) {
        running = true;
        paused = false;

        processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                socketProperties.getProcessorCache());
        eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                        socketProperties.getEventCache());
        nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                socketProperties.getBufferPool());

        // Create worker collection
        // 2. 創(chuàng)建工作者線(xiàn)程池
        if ( getExecutor() == null ) {
            createExecutor();
        }
        
        // 3. 初始化連接latch,用于限制請(qǐng)求的并發(fā)量
        initializeConnectionLatch();

        // Start poller threads
        // 4. 開(kāi)啟poller線(xiàn)程侵佃。poller用于對(duì)接受者線(xiàn)程生產(chǎn)的消息(或事件)進(jìn)行處理麻昼,poller最終調(diào)用的是Handler的代碼
        pollers = new Poller[getPollerThreadCount()];
        for (int i=0; i<pollers.length; i++) {
            pollers[i] = new Poller();
            Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i);
            pollerThread.setPriority(threadPriority);
            pollerThread.setDaemon(true);
            pollerThread.start();
        }
        // 5. 開(kāi)啟acceptor線(xiàn)程
        startAcceptorThreads();
    }
}

protected final void startAcceptorThreads() {
    int count = getAcceptorThreadCount();
    acceptors = new Acceptor[count];

    for (int i = 0; i < count; i++) {
        acceptors[i] = createAcceptor();
        String threadName = getName() + "-Acceptor-" + i;
        acceptors[i].setThreadName(threadName);
        Thread t = new Thread(acceptors[i], threadName);
        t.setPriority(getAcceptorThreadPriority());
        t.setDaemon(getDaemon());
        t.start();
    }
}

Connector請(qǐng)求邏輯

分析完了Connector的啟動(dòng)邏輯之后,我們就需要進(jìn)一步分析一下http的請(qǐng)求邏輯馋辈,當(dāng)請(qǐng)求從客戶(hù)端發(fā)起之后抚芦,需要經(jīng)過(guò)哪些操作才能真正地得到執(zhí)行?tomcat設(shè)計(jì)得非常得精巧和復(fù)雜迈螟,如果沒(méi)有一個(gè)整的調(diào)用邏輯圖叉抡,我們很難在復(fù)雜的代碼中一窺全貌。

警告:過(guò)多的細(xì)節(jié)往往會(huì)掩蓋真相答毫!

先給出調(diào)用鏈路圖~褥民,該圖位于tomcat官網(wǎng) - Apache Tomcat 8 Architecture

調(diào)用鏈路圖

Acceptor

Connector整體結(jié)構(gòu)圖里面我們看到請(qǐng)求的入口是在AcceptorEndpoint.start()方法會(huì)開(kāi)啟Acceptor線(xiàn)程來(lái)處理請(qǐng)求洗搂。那么我們接下來(lái)就要分析一下Acceptor線(xiàn)程中的執(zhí)行邏輯消返。

protected class Acceptor extends AbstractEndpoint.Acceptor {
    @Override
    public void run() {
        int errorDelay = 0;

        // Loop until we receive a shutdown command
        while (running) {

            // Loop if endpoint is paused
            // 1. 運(yùn)行過(guò)程中,如果`Endpoint`暫停了耘拇,則`Acceptor`進(jìn)行自旋(間隔50毫秒) `       
            while (paused && running) {
                state = AcceptorState.PAUSED;
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    // Ignore
                }
            }
            // 2. 如果`Endpoint`終止運(yùn)行了撵颊,則`Acceptor`也會(huì)終止
            if (!running) {
                break;
            }
            state = AcceptorState.RUNNING;

            try {
                //if we have reached max connections, wait
                // 3. 如果請(qǐng)求達(dá)到了最大連接數(shù),則wait直到連接數(shù)降下來(lái)
                countUpOrAwaitConnection();

                SocketChannel socket = null;
                try {
                    // Accept the next incoming connection from the server
                    // socket
                    // 4. 接受下一次連接的socket
                    socket = serverSock.accept();
                } catch (IOException ioe) {
                    // We didn't get a socket
                    countDownConnection();
                    if (running) {
                        // Introduce delay if necessary
                        errorDelay = handleExceptionWithDelay(errorDelay);
                        // re-throw
                        throw ioe;
                    } else {
                        break;
                    }
                }
                // Successful accept, reset the error delay
                errorDelay = 0;

                // Configure the socket
                if (running && !paused) {
                    // setSocketOptions() will hand the socket off to
                    // an appropriate processor if successful
                    // 5. `setSocketOptions()`這兒是關(guān)鍵惫叛,會(huì)將socket以事件的方式傳遞給poller
                    if (!setSocketOptions(socket)) {
                        closeSocket(socket);
                    }
                } else {
                    closeSocket(socket);
                }
            } catch (Throwable t) {
                ExceptionUtils.handleThrowable(t);
                log.error(sm.getString("endpoint.accept.fail"), t);
            }
        }
        state = AcceptorState.ENDED;
    }
}

Acceptor.run()方法會(huì)做下面幾件事情

  1. 運(yùn)行過(guò)程中倡勇,如果Endpoint暫停了,則Acceptor進(jìn)行自旋(間隔50毫秒)
  2. 如果Endpoint終止運(yùn)行了挣棕,則Acceptor也會(huì)終止
  3. 如果請(qǐng)求達(dá)到了最大連接數(shù)译隘,則wait直到連接數(shù)降下來(lái)
  4. 接受下一次連接的socket
  5. setSocketOptions()這兒是關(guān)鍵亲桥,會(huì)將socket以事件的方式傳遞給poller

我們來(lái)分析一下關(guān)鍵的方法setSocketOptions()

protected boolean setSocketOptions(SocketChannel socket) {
    // Process the connection
    try {
        //disable blocking, APR style, we are gonna be polling it
        socket.configureBlocking(false);
        Socket sock = socket.socket();
        socketProperties.setProperties(sock);

        NioChannel channel = nioChannels.pop();
        if (channel == null) {
            SocketBufferHandler bufhandler = new SocketBufferHandler(
                    socketProperties.getAppReadBufSize(),
                    socketProperties.getAppWriteBufSize(),
                    socketProperties.getDirectBuffer());
            if (isSSLEnabled()) {
                channel = new SecureNioChannel(socket, bufhandler, selectorPool, this);
            } else {
                channel = new NioChannel(socket, bufhandler);
            }
        } else {
            channel.setIOChannel(socket);
            channel.reset();
        }
        // 將channel注冊(cè)到poller固耘,注意關(guān)鍵的兩個(gè)方法题篷,`getPoller0()`和`Poller.register()`
        getPoller0().register(channel);
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        try {
            log.error("",t);
        } catch (Throwable tt) {
            ExceptionUtils.handleThrowable(tt);
        }
        // Tell to close the socket
        return false;
    }
    return true;
}

將channel注冊(cè)到poller,注意關(guān)鍵的兩個(gè)方法厅目,getPoller0()Poller.register()番枚。先來(lái)分析一下getPoller0(),該方法比較關(guān)鍵的一個(gè)地方就是以取模的方式對(duì)poller數(shù)量進(jìn)行輪詢(xún)獲取损敷。

/**
 * The socket poller.
 */
private Poller[] pollers = null;
private AtomicInteger pollerRotater = new AtomicInteger(0);
/**
 * Return an available poller in true round robin fashion.
 *
 * @return The next poller in sequence
 */
public Poller getPoller0() {
    int idx = Math.abs(pollerRotater.incrementAndGet()) % pollers.length;
    return pollers[idx];
}

接下來(lái)我們分析一下Poller.register()方法葫笼。因?yàn)?code>Poller維持了一個(gè)events同步隊(duì)列,所以Acceptor接受到的channel會(huì)放在這個(gè)隊(duì)列里面拗馒,放置的代碼為events.offer(event);

public class Poller implements Runnable {

    private final SynchronizedQueue<PollerEvent> events = new SynchronizedQueue<>();

    /**
     * Registers a newly created socket with the poller.
     *
     * @param socket    The newly created socket
     */
    public void register(final NioChannel socket) {
        socket.setPoller(this);
        NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this);
        socket.setSocketWrapper(ka);
        ka.setPoller(this);
        ka.setReadTimeout(getSocketProperties().getSoTimeout());
        ka.setWriteTimeout(getSocketProperties().getSoTimeout());
        ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
        ka.setSecure(isSSLEnabled());
        ka.setReadTimeout(getConnectionTimeout());
        ka.setWriteTimeout(getConnectionTimeout());
        PollerEvent r = eventCache.pop();
        ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
        if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
        else r.reset(socket,ka,OP_REGISTER);
        addEvent(r);
    }

    private void addEvent(PollerEvent event) {
        events.offer(event);
        if ( wakeupCounter.incrementAndGet() == 0 ) selector.wakeup();
    }
}

Poller

Acceptor生成了事件PollerEvent路星,那么Poller必然會(huì)對(duì)這些事件進(jìn)行消費(fèi)。我們來(lái)分析一下Poller.run()方法诱桂。真正處理key的地方在于processKey(sk, attachment);洋丐。

public class Poller implements Runnable {
    @Override
    public void run() {
        // Loop until destroy() is called
        while (true) {

            boolean hasEvents = false;

            try {
                if (!close) {
                    hasEvents = events();
                    if (wakeupCounter.getAndSet(-1) > 0) {
                        //if we are here, means we have other stuff to do
                        //do a non blocking select
                        keyCount = selector.selectNow();
                    } else {
                        keyCount = selector.select(selectorTimeout);
                    }
                    wakeupCounter.set(0);
                }
                if (close) {
                    events();
                    timeout(0, false);
                    try {
                        selector.close();
                    } catch (IOException ioe) {
                        log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe);
                    }
                    break;
                }
            } catch (Throwable x) {
                ExceptionUtils.handleThrowable(x);
                log.error("",x);
                continue;
            }
            //either we timed out or we woke up, process events first
            if ( keyCount == 0 ) hasEvents = (hasEvents | events());

            Iterator<SelectionKey> iterator =
                keyCount > 0 ? selector.selectedKeys().iterator() : null;
            // Walk through the collection of ready keys and dispatch
            // any active event.
            // 對(duì)已經(jīng)準(zhǔn)備好的key進(jìn)行處理
            while (iterator != null && iterator.hasNext()) {
                SelectionKey sk = iterator.next();
                NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment();
                // Attachment may be null if another thread has called
                // cancelledKey()
                if (attachment == null) {
                    iterator.remove();
                } else {
                    iterator.remove();
                    // 真正處理key的地方
                    processKey(sk, attachment);
                }
            }//while

            //process timeouts
            timeout(keyCount,hasEvents);
        }//while

        getStopLatch().countDown();
    }
}

我們接著分析processKey(),該方法又會(huì)根據(jù)key的類(lèi)型挥等,來(lái)分別處理讀和寫(xiě)友绝。

  1. 處理讀事件,比如生成Request對(duì)象
  2. 處理寫(xiě)事件肝劲,比如將生成的Response對(duì)象通過(guò)socket寫(xiě)回客戶(hù)端
protected void processKey(SelectionKey sk, NioSocketWrapper attachment) {
    try {
        if ( close ) {
            cancelledKey(sk);
        } else if ( sk.isValid() && attachment != null ) {
            if (sk.isReadable() || sk.isWritable() ) {
                if ( attachment.getSendfileData() != null ) {
                    processSendfile(sk,attachment, false);
                } else {
                    unreg(sk, attachment, sk.readyOps());
                    boolean closeSocket = false;
                    // 1. 處理讀事件迁客,比如生成Request對(duì)象
                    // Read goes before write
                    if (sk.isReadable()) {
                        if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) {
                            closeSocket = true;
                        }
                    }
                    // 2. 處理寫(xiě)事件,比如將生成的Response對(duì)象通過(guò)socket寫(xiě)回客戶(hù)端
                    if (!closeSocket && sk.isWritable()) {
                        if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true)) {
                            closeSocket = true;
                        }
                    }
                    if (closeSocket) {
                        cancelledKey(sk);
                    }
                }
            }
        } else {
            //invalid key
            cancelledKey(sk);
        }
    } catch ( CancelledKeyException ckx ) {
        cancelledKey(sk);
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        log.error("",t);
    }
}

我們繼續(xù)來(lái)分析方法processSocket()辞槐。

  1. processorCache里面拿一個(gè)Processor來(lái)處理socket掷漱,Processor的實(shí)現(xiàn)為SocketProcessor
  2. Processor放到工作線(xiàn)程池中執(zhí)行
public boolean processSocket(SocketWrapperBase<S> socketWrapper,
        SocketEvent event, boolean dispatch) {
    try {
        if (socketWrapper == null) {
            return false;
        }
        // 1. 從`processorCache`里面拿一個(gè)`Processor`來(lái)處理socket,`Processor`的實(shí)現(xiàn)為`SocketProcessor`
        SocketProcessorBase<S> sc = processorCache.pop();
        if (sc == null) {
            sc = createSocketProcessor(socketWrapper, event);
        } else {
            sc.reset(socketWrapper, event);
        }
        // 2. 將`Processor`放到工作線(xiàn)程池中執(zhí)行
        Executor executor = getExecutor();
        if (dispatch && executor != null) {
            executor.execute(sc);
        } else {
            sc.run();
        }
    } catch (RejectedExecutionException ree) {
        getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree);
        return false;
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        // This means we got an OOM or similar creating a thread, or that
        // the pool and its queue are full
        getLog().error(sm.getString("endpoint.process.fail"), t);
        return false;
    }
    return true;
}

接著我們分析SocketProcessor.doRun()方法(SocketProcessor.run()方法最終調(diào)用此方法)催蝗。該方法將處理邏輯交給Handler處理切威,當(dāng)event為null時(shí)育特,則表明是一個(gè)OPEN_READ事件丙号。

protected class SocketProcessor extends SocketProcessorBase<NioChannel> {

    public SocketProcessor(SocketWrapperBase<NioChannel> socketWrapper, SocketEvent event) {
        super(socketWrapper, event);
    }

    @Override
    protected void doRun() {
        NioChannel socket = socketWrapper.getSocket();
        SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());

        try {
            int handshake = -1;

            try {
                if (key != null) {
                    if (socket.isHandshakeComplete()) {
                        // No TLS handshaking required. Let the handler
                        // process this socket / event combination.
                        handshake = 0;
                    } else if (event == SocketEvent.STOP || event == SocketEvent.DISCONNECT ||
                            event == SocketEvent.ERROR) {
                        // Unable to complete the TLS handshake. Treat it as
                        // if the handshake failed.
                        handshake = -1;
                    } else {
                        handshake = socket.handshake(key.isReadable(), key.isWritable());
                        // The handshake process reads/writes from/to the
                        // socket. status may therefore be OPEN_WRITE once
                        // the handshake completes. However, the handshake
                        // happens when the socket is opened so the status
                        // must always be OPEN_READ after it completes. It
                        // is OK to always set this as it is only used if
                        // the handshake completes.
                        event = SocketEvent.OPEN_READ;
                    }
                }
            } catch (IOException x) {
                handshake = -1;
                if (log.isDebugEnabled()) log.debug("Error during SSL handshake",x);
            } catch (CancelledKeyException ckx) {
                handshake = -1;
            }
            if (handshake == 0) {
                SocketState state = SocketState.OPEN;
                // Process the request from this socket
                // 將處理邏輯交給`Handler`處理,當(dāng)event為null時(shí)缰冤,則表明是一個(gè)`OPEN_READ`事件
                if (event == null) {
                    state = getHandler().process(socketWrapper, SocketEvent.OPEN_READ);
                } else {
                    state = getHandler().process(socketWrapper, event);
                }
                if (state == SocketState.CLOSED) {
                    close(socket, key);
                }
            } else if (handshake == -1 ) {
                close(socket, key);
            } else if (handshake == SelectionKey.OP_READ){
                socketWrapper.registerReadInterest();
            } else if (handshake == SelectionKey.OP_WRITE){
                socketWrapper.registerWriteInterest();
            }
        } catch (CancelledKeyException cx) {
            socket.getPoller().cancelledKey(key);
        } catch (VirtualMachineError vme) {
            ExceptionUtils.handleThrowable(vme);
        } catch (Throwable t) {
            log.error("", t);
            socket.getPoller().cancelledKey(key);
        } finally {
            socketWrapper = null;
            event = null;
            //return to cache
            if (running && !paused) {
                processorCache.push(this);
            }
        }
    }
}

Handler的實(shí)現(xiàn) -- ConnectionHandler

Handler的關(guān)鍵方法是process()犬缨,該方法非常地長(zhǎng),超過(guò)了200行棉浸,前方高能怀薛!
雖然這個(gè)方法有很多條件分支,但是邏輯卻非常清楚迷郑,主要是調(diào)用Processor.process()方法枝恋。

@Override
public SocketState process(SocketWrapperBase<S> wrapper, SocketEvent status) {
    if (getLog().isDebugEnabled()) {
        getLog().debug(sm.getString("abstractConnectionHandler.process",
                wrapper.getSocket(), status));
    }
    if (wrapper == null) {
        // Nothing to do. Socket has been closed.
        return SocketState.CLOSED;
    }

    S socket = wrapper.getSocket();

    Processor processor = connections.get(socket);
    if (getLog().isDebugEnabled()) {
        getLog().debug(sm.getString("abstractConnectionHandler.connectionsGet",
                processor, socket));
    }

    // Async timeouts are calculated on a dedicated thread and then
    // dispatched. Because of delays in the dispatch process, the
    // timeout may no longer be required. Check here and avoid
    // unnecessary processing.
    if (SocketEvent.TIMEOUT == status && (processor == null ||
            !processor.isAsync() || !processor.checkAsyncTimeoutGeneration())) {
        // This is effectively a NO-OP
        return SocketState.OPEN;
    }

    if (processor != null) {
        // Make sure an async timeout doesn't fire
        getProtocol().removeWaitingProcessor(processor);
    } else if (status == SocketEvent.DISCONNECT || status == SocketEvent.ERROR) {
        // Nothing to do. Endpoint requested a close and there is no
        // longer a processor associated with this socket.
        return SocketState.CLOSED;
    }

    ContainerThreadMarker.set();

    try {
        if (processor == null) {
            String negotiatedProtocol = wrapper.getNegotiatedProtocol();
            if (negotiatedProtocol != null) {
                UpgradeProtocol upgradeProtocol =
                        getProtocol().getNegotiatedProtocol(negotiatedProtocol);
                if (upgradeProtocol != null) {
                    processor = upgradeProtocol.getProcessor(
                            wrapper, getProtocol().getAdapter());
                } else if (negotiatedProtocol.equals("http/1.1")) {
                    // Explicitly negotiated the default protocol.
                    // Obtain a processor below.
                } else {
                    // TODO:
                    // OpenSSL 1.0.2's ALPN callback doesn't support
                    // failing the handshake with an error if no
                    // protocol can be negotiated. Therefore, we need to
                    // fail the connection here. Once this is fixed,
                    // replace the code below with the commented out
                    // block.
                    if (getLog().isDebugEnabled()) {
                        getLog().debug(sm.getString(
                            "abstractConnectionHandler.negotiatedProcessor.fail",
                            negotiatedProtocol));
                    }
                    return SocketState.CLOSED;
                    /*
                     * To replace the code above once OpenSSL 1.1.0 is
                     * used.
                    // Failed to create processor. This is a bug.
                    throw new IllegalStateException(sm.getString(
                            "abstractConnectionHandler.negotiatedProcessor.fail",
                            negotiatedProtocol));
                    */
                }
            }
        }
        if (processor == null) {
            processor = recycledProcessors.pop();
            if (getLog().isDebugEnabled()) {
                getLog().debug(sm.getString("abstractConnectionHandler.processorPop",
                        processor));
            }
        }
        if (processor == null) {
            processor = getProtocol().createProcessor();
            register(processor);
        }

        processor.setSslSupport(
                wrapper.getSslSupport(getProtocol().getClientCertProvider()));

        // Associate the processor with the connection
        connections.put(socket, processor);

        SocketState state = SocketState.CLOSED;
        do {
            // 關(guān)鍵的代碼创倔,終于找到你了
            state = processor.process(wrapper, status);

            if (state == SocketState.UPGRADING) {
                // Get the HTTP upgrade handler
                UpgradeToken upgradeToken = processor.getUpgradeToken();
                // Retrieve leftover input
                ByteBuffer leftOverInput = processor.getLeftoverInput();
                if (upgradeToken == null) {
                    // Assume direct HTTP/2 connection
                    UpgradeProtocol upgradeProtocol = getProtocol().getUpgradeProtocol("h2c");
                    if (upgradeProtocol != null) {
                        processor = upgradeProtocol.getProcessor(
                                wrapper, getProtocol().getAdapter());
                        wrapper.unRead(leftOverInput);
                        // Associate with the processor with the connection
                        connections.put(socket, processor);
                    } else {
                        if (getLog().isDebugEnabled()) {
                            getLog().debug(sm.getString(
                                "abstractConnectionHandler.negotiatedProcessor.fail",
                                "h2c"));
                        }
                        return SocketState.CLOSED;
                    }
                } else {
                    HttpUpgradeHandler httpUpgradeHandler = upgradeToken.getHttpUpgradeHandler();
                    // Release the Http11 processor to be re-used
                    release(processor);
                    // Create the upgrade processor
                    processor = getProtocol().createUpgradeProcessor(wrapper, upgradeToken);
                    if (getLog().isDebugEnabled()) {
                        getLog().debug(sm.getString("abstractConnectionHandler.upgradeCreate",
                                processor, wrapper));
                    }
                    wrapper.unRead(leftOverInput);
                    // Mark the connection as upgraded
                    wrapper.setUpgraded(true);
                    // Associate with the processor with the connection
                    connections.put(socket, processor);
                    // Initialise the upgrade handler (which may trigger
                    // some IO using the new protocol which is why the lines
                    // above are necessary)
                    // This cast should be safe. If it fails the error
                    // handling for the surrounding try/catch will deal with
                    // it.
                    if (upgradeToken.getInstanceManager() == null) {
                        httpUpgradeHandler.init((WebConnection) processor);
                    } else {
                        ClassLoader oldCL = upgradeToken.getContextBind().bind(false, null);
                        try {
                            httpUpgradeHandler.init((WebConnection) processor);
                        } finally {
                            upgradeToken.getContextBind().unbind(false, oldCL);
                        }
                    }
                }
            }
        } while ( state == SocketState.UPGRADING);

        if (state == SocketState.LONG) {
            // In the middle of processing a request/response. Keep the
            // socket associated with the processor. Exact requirements
            // depend on type of long poll
            longPoll(wrapper, processor);
            if (processor.isAsync()) {
                getProtocol().addWaitingProcessor(processor);
            }
        } else if (state == SocketState.OPEN) {
            // In keep-alive but between requests. OK to recycle
            // processor. Continue to poll for the next request.
            connections.remove(socket);
            release(processor);
            wrapper.registerReadInterest();
        } else if (state == SocketState.SENDFILE) {
            // Sendfile in progress. If it fails, the socket will be
            // closed. If it works, the socket either be added to the
            // poller (or equivalent) to await more data or processed
            // if there are any pipe-lined requests remaining.
        } else if (state == SocketState.UPGRADED) {
            // Don't add sockets back to the poller if this was a
            // non-blocking write otherwise the poller may trigger
            // multiple read events which may lead to thread starvation
            // in the connector. The write() method will add this socket
            // to the poller if necessary.
            if (status != SocketEvent.OPEN_WRITE) {
                longPoll(wrapper, processor);
            }
        } else if (state == SocketState.SUSPENDED) {
            // Don't add sockets back to the poller.
            // The resumeProcessing() method will add this socket
            // to the poller.
        } else {
            // Connection closed. OK to recycle the processor. Upgrade
            // processors are not recycled.
            connections.remove(socket);
            if (processor.isUpgrade()) {
                UpgradeToken upgradeToken = processor.getUpgradeToken();
                HttpUpgradeHandler httpUpgradeHandler = upgradeToken.getHttpUpgradeHandler();
                InstanceManager instanceManager = upgradeToken.getInstanceManager();
                if (instanceManager == null) {
                    httpUpgradeHandler.destroy();
                } else {
                    ClassLoader oldCL = upgradeToken.getContextBind().bind(false, null);
                    try {
                        httpUpgradeHandler.destroy();
                    } finally {
                        try {
                            instanceManager.destroyInstance(httpUpgradeHandler);
                        } catch (Throwable e) {
                            ExceptionUtils.handleThrowable(e);
                            getLog().error(sm.getString("abstractConnectionHandler.error"), e);
                        }
                        upgradeToken.getContextBind().unbind(false, oldCL);
                    }
                }
            } else {
                release(processor);
            }
        }
        return state;
    } catch(java.net.SocketException e) {
        // SocketExceptions are normal
        getLog().debug(sm.getString(
                "abstractConnectionHandler.socketexception.debug"), e);
    } catch (java.io.IOException e) {
        // IOExceptions are normal
        getLog().debug(sm.getString(
                "abstractConnectionHandler.ioexception.debug"), e);
    } catch (ProtocolException e) {
        // Protocol exceptions normally mean the client sent invalid or
        // incomplete data.
        getLog().debug(sm.getString(
                "abstractConnectionHandler.protocolexception.debug"), e);
    }
    // Future developers: if you discover any other
    // rare-but-nonfatal exceptions, catch them here, and log as
    // above.
    catch (Throwable e) {
        ExceptionUtils.handleThrowable(e);
        // any other exception or error is odd. Here we log it
        // with "ERROR" level, so it will show up even on
        // less-than-verbose logs.
        getLog().error(sm.getString("abstractConnectionHandler.error"), e);
    } finally {
        ContainerThreadMarker.clear();
    }

    // Make sure socket/processor is removed from the list of current
    // connections
    connections.remove(socket);
    release(processor);
    return SocketState.CLOSED;
}

Processor

這兒我們主要關(guān)注的是Processor對(duì)于讀的操作挖炬,也只有一行代碼拣挪。調(diào)用service()方法。

public abstract class AbstractProcessorLight implements Processor {

    @Override
    public SocketState process(SocketWrapperBase<?> socketWrapper, SocketEvent status)
            throws IOException {

        SocketState state = SocketState.CLOSED;
        Iterator<DispatchType> dispatches = null;
        do {
            if (dispatches != null) {
                DispatchType nextDispatch = dispatches.next();
                state = dispatch(nextDispatch.getSocketStatus());
            } else if (status == SocketEvent.DISCONNECT) {
                // Do nothing here, just wait for it to get recycled
            } else if (isAsync() || isUpgrade() || state == SocketState.ASYNC_END) {
                state = dispatch(status);
                if (state == SocketState.OPEN) {
                    // There may be pipe-lined data to read. If the data isn't
                    // processed now, execution will exit this loop and call
                    // release() which will recycle the processor (and input
                    // buffer) deleting any pipe-lined data. To avoid this,
                    // process it now.
                    state = service(socketWrapper);
                }
            } else if (status == SocketEvent.OPEN_WRITE) {
                // Extra write event likely after async, ignore
                state = SocketState.LONG;
            } else if (status == SocketEvent.OPEN_READ){
                // 調(diào)用`service()`方法
                state = service(socketWrapper);
            } else {
                // Default to closing the socket if the SocketEvent passed in
                // is not consistent with the current state of the Processor
                state = SocketState.CLOSED;
            }

            if (getLog().isDebugEnabled()) {
                getLog().debug("Socket: [" + socketWrapper +
                        "], Status in: [" + status +
                        "], State out: [" + state + "]");
            }

            if (state != SocketState.CLOSED && isAsync()) {
                state = asyncPostProcess();
                if (getLog().isDebugEnabled()) {
                    getLog().debug("Socket: [" + socketWrapper +
                            "], State after async post processing: [" + state + "]");
                }
            }

            if (dispatches == null || !dispatches.hasNext()) {
                // Only returns non-null iterator if there are
                // dispatches to process.
                dispatches = getIteratorAndClearDispatches();
            }
        } while (state == SocketState.ASYNC_END ||
                dispatches != null && state != SocketState.CLOSED);

        return state;
    }
}

Processor.service()方法比較重要的地方就兩點(diǎn)李茫。該方法非常得長(zhǎng)十电,也超過(guò)了200行知押,在此我們不再拷貝此方法的代碼。

  1. 生成Request和Response對(duì)象
  2. 調(diào)用Adapter.service()方法鹃骂,將生成的Request和Response對(duì)象傳進(jìn)去

Adapter

Adapter用于連接ConnectorContainer台盯,起到承上啟下的作用。Processor會(huì)調(diào)用Adapter.service()方法畏线。我們來(lái)分析一下静盅,主要做了下面幾件事情:

  1. 根據(jù)coyote框架的request和response對(duì)象,生成connector的request和response對(duì)象(是HttpServletRequest和HttpServletResponse的封裝)
  2. 補(bǔ)充header
  3. 解析請(qǐng)求寝殴,該方法會(huì)出現(xiàn)代理服務(wù)器温亲、設(shè)置必要的header等操作
  4. 真正進(jìn)入容器的地方,調(diào)用Engine容器下pipeline的閥門(mén)
@Override
public void service(org.apache.coyote.Request req, org.apache.coyote.Response res)
        throws Exception {

    // 1. 根據(jù)coyote框架的request和response對(duì)象杯矩,生成connector的request和response對(duì)象(是HttpServletRequest和HttpServletResponse的封裝)
    Request request = (Request) req.getNote(ADAPTER_NOTES);
    Response response = (Response) res.getNote(ADAPTER_NOTES);

    if (request == null) {
        // Create objects
        request = connector.createRequest();
        request.setCoyoteRequest(req);
        response = connector.createResponse();
        response.setCoyoteResponse(res);

        // Link objects
        request.setResponse(response);
        response.setRequest(request);

        // Set as notes
        req.setNote(ADAPTER_NOTES, request);
        res.setNote(ADAPTER_NOTES, response);

        // Set query string encoding
        req.getParameters().setQueryStringCharset(connector.getURICharset());
    }

    // 2. 補(bǔ)充header
    if (connector.getXpoweredBy()) {
        response.addHeader("X-Powered-By", POWERED_BY);
    }

    boolean async = false;
    boolean postParseSuccess = false;

    req.getRequestProcessor().setWorkerThreadName(THREAD_NAME.get());

    try {
        // Parse and set Catalina and configuration specific
        // request parameters
        // 3. 解析請(qǐng)求栈虚,該方法會(huì)出現(xiàn)代理服務(wù)器、設(shè)置必要的header等操作
        postParseSuccess = postParseRequest(req, request, res, response);
        if (postParseSuccess) {
            //check valves if we support async
            request.setAsyncSupported(
                    connector.getService().getContainer().getPipeline().isAsyncSupported());
            // Calling the container
            // 4. 真正進(jìn)入容器的地方史隆,調(diào)用Engine容器下pipeline的閥門(mén)
            connector.getService().getContainer().getPipeline().getFirst().invoke(
                    request, response);
        }
        if (request.isAsync()) {
            async = true;
            ReadListener readListener = req.getReadListener();
            if (readListener != null && request.isFinished()) {
                // Possible the all data may have been read during service()
                // method so this needs to be checked here
                ClassLoader oldCL = null;
                try {
                    oldCL = request.getContext().bind(false, null);
                    if (req.sendAllDataReadEvent()) {
                        req.getReadListener().onAllDataRead();
                    }
                } finally {
                    request.getContext().unbind(false, oldCL);
                }
            }

            Throwable throwable =
                    (Throwable) request.getAttribute(RequestDispatcher.ERROR_EXCEPTION);

            // If an async request was started, is not going to end once
            // this container thread finishes and an error occurred, trigger
            // the async error process
            if (!request.isAsyncCompleting() && throwable != null) {
                request.getAsyncContextInternal().setErrorState(throwable, true);
            }
        } else {
            request.finishRequest();
            response.finishResponse();
        }

    } catch (IOException e) {
        // Ignore
    } finally {
        AtomicBoolean error = new AtomicBoolean(false);
        res.action(ActionCode.IS_ERROR, error);

        if (request.isAsyncCompleting() && error.get()) {
            // Connection will be forcibly closed which will prevent
            // completion happening at the usual point. Need to trigger
            // call to onComplete() here.
            res.action(ActionCode.ASYNC_POST_PROCESS,  null);
            async = false;
        }

        // Access log
        if (!async && postParseSuccess) {
            // Log only if processing was invoked.
            // If postParseRequest() failed, it has already logged it.
            Context context = request.getContext();
            // If the context is null, it is likely that the endpoint was
            // shutdown, this connection closed and the request recycled in
            // a different thread. That thread will have updated the access
            // log so it is OK not to update the access log here in that
            // case.
            if (context != null) {
                context.logAccess(request, response,
                        System.currentTimeMillis() - req.getStartTime(), false);
            }
        }

        req.getRequestProcessor().setWorkerThreadName(null);

        // Recycle the wrapper request and response
        if (!async) {
            request.recycle();
            response.recycle();
        }
    }
}

總結(jié)

本文我們首先拋出了理解Connector前需要解答的4個(gè)問(wèn)題魂务。然后給出了整體結(jié)構(gòu)圖,并分析結(jié)構(gòu)圖中的各個(gè)組件及其關(guān)聯(lián)關(guān)系泌射。最后粘姜,我們根據(jù)整體結(jié)構(gòu)圖分析了Connector的啟動(dòng)邏輯和請(qǐng)求邏輯(內(nèi)部邏輯可謂是非常細(xì)節(jié)和復(fù)雜)。

通過(guò)上面的源碼分析熔酷,我們終于清楚了Connector解決了什么問(wèn)題孤紧,結(jié)構(gòu)是怎樣的內(nèi)部又是如何工作的拒秘。

參考鏈接

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末号显,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子躺酒,更是在濱河造成了極大的恐慌押蚤,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,324評(píng)論 6 498
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件羹应,死亡現(xiàn)場(chǎng)離奇詭異揽碘,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,356評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門(mén)雳刺,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)劫灶,“玉大人,你說(shuō)我怎么就攤上這事掖桦』氪耍” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 162,328評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵滞详,是天一觀的道長(zhǎng)凛俱。 經(jīng)常有香客問(wèn)我,道長(zhǎng)料饥,這世上最難降的妖魔是什么蒲犬? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,147評(píng)論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮岸啡,結(jié)果婚禮上原叮,老公的妹妹穿的比我還像新娘。我一直安慰自己巡蘸,他們只是感情好奋隶,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,160評(píng)論 6 388
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著悦荒,像睡著了一般唯欣。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上搬味,一...
    開(kāi)封第一講書(shū)人閱讀 51,115評(píng)論 1 296
  • 那天境氢,我揣著相機(jī)與錄音,去河邊找鬼碰纬。 笑死萍聊,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的悦析。 我是一名探鬼主播寿桨,決...
    沈念sama閱讀 40,025評(píng)論 3 417
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼强戴!你這毒婦竟也來(lái)了亭螟?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 38,867評(píng)論 0 274
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤酌泰,失蹤者是張志新(化名)和其女友劉穎媒佣,沒(méi)想到半個(gè)月后匕累,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體陵刹,經(jīng)...
    沈念sama閱讀 45,307評(píng)論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,528評(píng)論 2 332
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了衰琐。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片也糊。...
    茶點(diǎn)故事閱讀 39,688評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖羡宙,靈堂內(nèi)的尸體忽然破棺而出狸剃,到底是詐尸還是另有隱情,我是刑警寧澤狗热,帶...
    沈念sama閱讀 35,409評(píng)論 5 343
  • 正文 年R本政府宣布钞馁,位于F島的核電站,受9級(jí)特大地震影響匿刮,放射性物質(zhì)發(fā)生泄漏僧凰。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,001評(píng)論 3 325
  • 文/蒙蒙 一熟丸、第九天 我趴在偏房一處隱蔽的房頂上張望训措。 院中可真熱鬧,春花似錦光羞、人聲如沸绩鸣。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,657評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)呀闻。三九已至,卻和暖如春潜慎,著一層夾襖步出監(jiān)牢的瞬間总珠,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,811評(píng)論 1 268
  • 我被黑心中介騙來(lái)泰國(guó)打工勘纯, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留局服,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,685評(píng)論 2 368
  • 正文 我出身青樓驳遵,卻偏偏與公主長(zhǎng)得像淫奔,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子堤结,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,573評(píng)論 2 353

推薦閱讀更多精彩內(nèi)容