Thrift源碼分析(TThreadPoolServer)

Thrift提供的網(wǎng)絡(luò)服務(wù)模型:?jiǎn)尉€程污秆、多線程、事件驅(qū)動(dòng)墅茉,從另一個(gè)角度劃分為:阻塞服務(wù)模型命黔、非阻塞服務(wù)模型。

  • 阻塞服務(wù)模型:TSimpleServer躁锁、TThreadPoolServer纷铣。

  • 非阻塞服務(wù)模型:TNonblockingServer、THsHaServer和TThreadedSelectorServer战转。

TThreadPoolServer類圖

TThreadPoolServer模式采用阻塞socket方式工作搜立,主線程負(fù)責(zé)阻塞式監(jiān)聽是否有新socket到來(lái),每當(dāng)新的客戶端連接請(qǐng)求過(guò)來(lái)槐秧,就將其封裝起來(lái)啄踊,然后交給線程池來(lái)處理。線程池來(lái)完成具體的業(yè)務(wù)處理刁标,將結(jié)果發(fā)給客戶端颠通。流程圖如下所示。

TThreadPoolServer

我們先來(lái)看看Args 類膀懈,Args 類多了幾個(gè)陌生的參數(shù)顿锰,

public static class Args extends AbstractServerArgs<Args> {
        //用來(lái)設(shè)置線程池的時(shí)候使用
        public int minWorkerThreads = 5;
        public int maxWorkerThreads = Integer.MAX_VALUE;
        // 線程池
        public ExecutorService executorService;

        // 用來(lái)設(shè)置線程池shutdown后,主線程等待多長(zhǎng)時(shí)間
        public int stopTimeoutVal = 60;
        public TimeUnit stopTimeoutUnit = TimeUnit.SECONDS;

      /**
       * requestTimeout和beBackoffSlotLength共同用來(lái)設(shè)置這次重試和上次重試相隔時(shí)常启搂,詳細(xì)的方式下面有講解
       */
        // 用來(lái)設(shè)置總共重試時(shí)長(zhǎng)
        public int requestTimeout = 20;
        public TimeUnit requestTimeoutUnit = TimeUnit.SECONDS;

        // 這個(gè)參數(shù)用來(lái)設(shè)置重試步伐的
        public int beBackoffSlotLength = 100;
        public TimeUnit beBackoffSlotLengthUnit = TimeUnit.MILLISECONDS;

        public Args(TServerTransport transport) {
            super(transport);
        }

        public Args minWorkerThreads(int n) {
            minWorkerThreads = n;
            return this;
        }

        public Args maxWorkerThreads(int n) {
            maxWorkerThreads = n;
            return this;
        }

        public Args stopTimeoutVal(int n) {
            stopTimeoutVal = n;
            return this;
        }

        public Args stopTimeoutUnit(TimeUnit tu) {
            stopTimeoutUnit = tu;
            return this;
        }

        public Args requestTimeout(int n) {
            requestTimeout = n;
            return this;
        }

        public Args requestTimeoutUnit(TimeUnit tu) {
            requestTimeoutUnit = tu;
            return this;
        }

        //Binary exponential backoff slot length
        public Args beBackoffSlotLength(int n) {
            beBackoffSlotLength = n;
            return this;
        }

        //Binary exponential backoff slot time unit
        public Args beBackoffSlotLengthUnit(TimeUnit tu) {
            beBackoffSlotLengthUnit = tu;
            return this;
        }

        public Args executorService(ExecutorService executorService) {
            this.executorService = executorService;
            return this;
        }
    }

我們?cè)诳聪耇ThreadPoolServer的源碼硼控,這個(gè)是TThreadPoolServer的構(gòu)造函數(shù),在構(gòu)造函數(shù)里實(shí)例化線程池胳赌。

 public TThreadPoolServer(Args args) {
        super(args);

        stopTimeoutUnit = args.stopTimeoutUnit;
        stopTimeoutVal = args.stopTimeoutVal;
        requestTimeoutUnit = args.requestTimeoutUnit;
        requestTimeout = args.requestTimeout;
        beBackoffSlotInMillis = args.beBackoffSlotLengthUnit.toMillis(args.beBackoffSlotLength);

        // 實(shí)例化線程池 可以自己設(shè)計(jì)線程池后傳進(jìn)來(lái)牢撼,或者ThreadPoolServer給你創(chuàng)建
        executorService_ = args.executorService != null ?
                args.executorService : createDefaultExecutorService(args);
    }

    // 創(chuàng)建線程池代碼
    private static ExecutorService createDefaultExecutorService(Args args) {
        // 線程池等待隊(duì)列 此隊(duì)列中不緩存任何一個(gè)任務(wù)。向線程池提交任務(wù)時(shí)疑苫,如果沒有空閑線程來(lái)運(yùn)行任務(wù)熏版,
        // 則入列操作會(huì)阻塞。當(dāng)有線程來(lái)獲取任務(wù)時(shí)捍掺,出列操作會(huì)喚醒執(zhí)行入列操作的線程撼短。
        // 從這個(gè)特性來(lái)看,SynchronousQueue是一個(gè)無(wú)界隊(duì)列挺勿,因此當(dāng)使用SynchronousQueue作為線程池的阻塞隊(duì)列時(shí)阔加,
        // 參數(shù)maximumPoolSizes沒有任何作用。
        SynchronousQueue<Runnable> executorQueue =
                new SynchronousQueue<Runnable>();
        return new ThreadPoolExecutor(args.minWorkerThreads,
                args.maxWorkerThreads,
                args.stopTimeoutVal,
                args.stopTimeoutUnit,
                executorQueue);
    }

serve()函數(shù)

  • preServe()函數(shù):開啟服務(wù)器進(jìn)行監(jiān)聽
  • execute()函數(shù):將處理客戶端請(qǐng)求交給線程池
  • waitForShutdown()函數(shù):服務(wù)端停止工作后满钟,關(guān)閉線程池
 // 重頭戲胜榔,通過(guò)serve()啟動(dòng)服務(wù)端
    public void serve() {
        // 服務(wù)器進(jìn)行監(jiān)聽
        if (!preServe()) {
            return;
        }

        // 在execute()函數(shù)里面獲取新的客戶端的連接請(qǐng)求。然后交給線程池進(jìn)行相應(yīng)的業(yè)務(wù)處理
        execute();

        // 服務(wù)端停止工作后湃番,通過(guò)這個(gè)函數(shù)關(guān)閉線程池
        waitForShutdown();

        setServing(false);
    }
  • preServe()函數(shù)夭织,用戶來(lái)開啟服務(wù)端對(duì)客戶端的監(jiān)聽

 protected boolean preServe() {
        try {
            // 服務(wù)器進(jìn)行監(jiān)聽
            serverTransport_.listen();
        } catch (TTransportException ttx) {
            LOGGER.error("Error occurred during listening.", ttx);
            return false;
        }

        // Run the preServe event
        if (eventHandler_ != null) {
            eventHandler_.preServe();
        }
        stopped_ = false;
        setServing(true);

        return true;
    }

execute()函數(shù),服務(wù)端接收到客戶端連接的請(qǐng)求后吠撮,將其封裝成WorkerProcess類尊惰,丟給線程池

protected void execute() {
        int failureCount = 0;
        // stopped_ 是服務(wù)端停止的標(biāo)記
        while (!stopped_) {
            try {
                // 接收到了來(lái)自新的客戶端的連接請(qǐng)求,套接字
                TTransport client = serverTransport_.accept();
                // WorkerProcess類繼承了Runnable類泥兰,將客戶端封裝在里面弄屡,好扔給線程池處理
                // 對(duì)客戶端詳細(xì)的處理過(guò)程在WorkerProcess類的run()方法里
                WorkerProcess wp = new WorkerProcess(client);

                // 記錄加入線程池的重試次數(shù)
                int retryCount = 0;
                // 剩余的重試時(shí)間 剛開始等于requestTimeout
                long remainTimeInMillis = requestTimeoutUnit.toMillis(requestTimeout);
                while (true) {
                    try {
                        // 交給線程池處理
                        executorService_.execute(wp);
                        break;
                    } catch (Throwable t) {
                        // 如果報(bào)錯(cuò) 根據(jù)錯(cuò)誤類型進(jìn)行重試
                        if (t instanceof RejectedExecutionException) {
                            // 加入線程池被拒絕了
                            // 重試次數(shù)加一
                            retryCount++;
                            try {
                                if (remainTimeInMillis > 0) {
                                    //do a truncated 20 binary exponential backoff sleep
                                    // sleepTimeInMillis: 每次重試失敗后,休眠的時(shí)長(zhǎng)鞋诗,
                                    long sleepTimeInMillis = ((long) (random.nextDouble() *
                                            (1L << Math.min(retryCount, 20)))) * beBackoffSlotInMillis;
                                    // 獲取sleepTimeInMillis和remainTimeInMillis較小的值
                                    sleepTimeInMillis = Math.min(sleepTimeInMillis, remainTimeInMillis);
                                    TimeUnit.MILLISECONDS.sleep(sleepTimeInMillis);
                                    // 減去這次休眠的時(shí)間
                                    remainTimeInMillis = remainTimeInMillis - sleepTimeInMillis;
                                } else {
                                    client.close();
                                    wp = null;
                                    LOGGER.warn("Task has been rejected by ExecutorService " + retryCount
                                            + " times till timedout, reason: " + t);
                                    break;
                                }
                            } catch (InterruptedException e) {
                                LOGGER.warn("Interrupted while waiting to place client on executor queue.");
                                Thread.currentThread().interrupt();
                                break;
                            }
                        } else if (t instanceof Error) {
                            LOGGER.error("ExecutorService threw error: " + t, t);
                            throw (Error) t;
                        } else {
                            //for other possible runtime errors from ExecutorService, should also not kill serve
                            LOGGER.warn("ExecutorService threw error: " + t, t);
                            break;
                        }
                    }
                }
            } catch (TTransportException ttx) {
                if (!stopped_) {
                    // 加入現(xiàn)場(chǎng)失敗次數(shù)加一
                    ++failureCount;
                    LOGGER.warn("Transport error occurred during acceptance of message.", ttx);
                }
            }
        }
    }
  • waitForShutdown()函數(shù)膀捷,當(dāng)服務(wù)端線程停止后, 停止線程池
  protected void waitForShutdown() {
        // 不在接受新的線程削彬,并且等待之前提交的線程都執(zhí)行完在關(guān)閉全庸,
        executorService_.shutdown();

        // Loop until awaitTermination finally does return without a interrupted
        // exception. If we don't do this, then we'll shut down prematurely. We want
        // to let the executorService clear it's task queue, closing client sockets
        // appropriately.
        /**
         * 循環(huán)執(zhí)行知道調(diào)用awaitTermination() 后不拋出異常,如果不這樣做線程池會(huì)關(guān)閉的過(guò)早
         * 我們希望線程池可以讓自己等待隊(duì)列里的任務(wù)也執(zhí)行完畢融痛,然后再關(guān)閉于客戶端的socket連接
         * 就是再線程池執(zhí)行shutdown()方法后壶笼,留stopTimeoutVal長(zhǎng)的時(shí)間執(zhí)行完等待隊(duì)列里的任務(wù)。
         */
        long timeoutMS = stopTimeoutUnit.toMillis(stopTimeoutVal);
        long now = System.currentTimeMillis();
        while (timeoutMS >= 0) {
            try {
                // 該方法調(diào)用會(huì)被阻塞雁刷,直到所有任務(wù)執(zhí)行完畢并且shutdown請(qǐng)求被調(diào)用覆劈,
                // 或者參數(shù)中定義的timeout時(shí)間到達(dá)或者當(dāng)前線程被打斷,
                // 這幾種情況任意一個(gè)發(fā)生了就會(huì)導(dǎo)致該方法的執(zhí)行沛励。
                executorService_.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS);
                break;
            } catch (InterruptedException ix) {
                long newnow = System.currentTimeMillis();
                timeoutMS -= (newnow - now);
                now = newnow;
            }
        }
    }

WorkerProcess 類源碼责语,WorkerProcess 類繼承了Runnable ,在run()函數(shù)里進(jìn)行相應(yīng)的業(yè)務(wù)邏輯處理侯勉。

private class WorkerProcess implements Runnable {

        /**
         * Client that this services.
         */
        private TTransport client_;

        /**
         * Default constructor.
         *
         * @param client Transport to process
         */
        private WorkerProcess(TTransport client) {
            client_ = client;
        }

        /**
         * Loops on processing a client forever
         */
        public void run() {
            // 業(yè)務(wù)邏輯處理器
            TProcessor processor = null;
            // 傳輸層
            TTransport inputTransport = null;
            TTransport outputTransport = null;
            // 協(xié)議層
            TProtocol inputProtocol = null;
            TProtocol outputProtocol = null;

            TServerEventHandler eventHandler = null;
            ServerContext connectionContext = null;

            try {
                // 獲取客戶端相應(yīng)的處理器鹦筹,傳輸層,協(xié)議層
                processor = processorFactory_.getProcessor(client_);
                inputTransport = inputTransportFactory_.getTransport(client_);
                outputTransport = outputTransportFactory_.getTransport(client_);
                inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
                outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);

                eventHandler = getEventHandler();
                if (eventHandler != null) {
                    connectionContext = eventHandler.createContext(inputProtocol, outputProtocol);
                }
                // we check stopped_ first to make sure we're not supposed to be shutting
                // down. this is necessary for graceful shutdown.
                while (true) {

                    if (eventHandler != null) {
                        eventHandler.processContext(connectionContext, inputTransport, outputTransport);
                    }

                    if (stopped_) {
                        break;
                    }
                    // 進(jìn)行業(yè)務(wù)邏輯處理址貌,如果處理完一個(gè)請(qǐng)求以后铐拐,下一個(gè)請(qǐng)求還沒來(lái)
                    // 那么這個(gè)線程將會(huì)阻塞在這里
                    processor.process(inputProtocol, outputProtocol);
                }
            } catch (Exception x) {
                // We'll usually receive RuntimeException types here
                // Need to unwrap to ascertain real causing exception before we choose to ignore
                // Ignore err-logging all transport-level/type exceptions、
                // 在這里我們收到RuntimeException類型的異常练对,我們需要在忽略這個(gè)異常前拆開這個(gè)異常遍蟋,查明弄清是什么異常,螟凭、
                if (!isIgnorableException(x)) {
                    // Log the exception at error level and continue
                    LOGGER.error((x instanceof TException ? "Thrift " : "") + "Error occurred during processing of message.", x);
                }
            } finally {
                if (eventHandler != null) {
                    eventHandler.deleteContext(connectionContext, inputProtocol, outputProtocol);
                }
                if (inputTransport != null) {
                    inputTransport.close();
                }
                if (outputTransport != null) {
                    outputTransport.close();
                }
                if (client_.isOpen()) {
                    client_.close();
                }
            }
        }

        // 分析異常虚青,看看是什么導(dǎo)致的異常,對(duì)Thrift的異常還不太了解螺男,了解了以后在來(lái)研究下這
        private boolean isIgnorableException(Exception x) {
            TTransportException tTransportException = null;

            if (x instanceof TTransportException) {
                tTransportException = (TTransportException) x;
            } else if (x.getCause() instanceof TTransportException) {
                tTransportException = (TTransportException) x.getCause();
            }

            if (tTransportException != null) {
                switch (tTransportException.getType()) {
                    case TTransportException.END_OF_FILE:
                    case TTransportException.TIMED_OUT:
                        return true;
                }
            }
            return false;
        }
    }
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末棒厘,一起剝皮案震驚了整個(gè)濱河市纵穿,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌奢人,老刑警劉巖谓媒,帶你破解...
    沈念sama閱讀 206,311評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異何乎,居然都是意外死亡句惯,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,339評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門支救,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)抢野,“玉大人,你說(shuō)我怎么就攤上這事各墨≈腹拢” “怎么了?”我有些...
    開封第一講書人閱讀 152,671評(píng)論 0 342
  • 文/不壞的土叔 我叫張陵欲主,是天一觀的道長(zhǎng)邓厕。 經(jīng)常有香客問我,道長(zhǎng)扁瓢,這世上最難降的妖魔是什么详恼? 我笑而不...
    開封第一講書人閱讀 55,252評(píng)論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮引几,結(jié)果婚禮上昧互,老公的妹妹穿的比我還像新娘。我一直安慰自己伟桅,他們只是感情好敞掘,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,253評(píng)論 5 371
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著楣铁,像睡著了一般玖雁。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上盖腕,一...
    開封第一講書人閱讀 49,031評(píng)論 1 285
  • 那天赫冬,我揣著相機(jī)與錄音,去河邊找鬼溃列。 笑死劲厌,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的听隐。 我是一名探鬼主播补鼻,決...
    沈念sama閱讀 38,340評(píng)論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了风范?” 一聲冷哼從身側(cè)響起咨跌,我...
    開封第一講書人閱讀 36,973評(píng)論 0 259
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎硼婿,沒想到半個(gè)月后虑润,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,466評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡加酵,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,937評(píng)論 2 323
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了哭当。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片猪腕。...
    茶點(diǎn)故事閱讀 38,039評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖钦勘,靈堂內(nèi)的尸體忽然破棺而出陋葡,到底是詐尸還是另有隱情,我是刑警寧澤彻采,帶...
    沈念sama閱讀 33,701評(píng)論 4 323
  • 正文 年R本政府宣布腐缤,位于F島的核電站,受9級(jí)特大地震影響肛响,放射性物質(zhì)發(fā)生泄漏岭粤。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,254評(píng)論 3 307
  • 文/蒙蒙 一特笋、第九天 我趴在偏房一處隱蔽的房頂上張望剃浇。 院中可真熱鬧,春花似錦猎物、人聲如沸虎囚。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,259評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)淘讥。三九已至,卻和暖如春堤如,著一層夾襖步出監(jiān)牢的瞬間蒲列,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,485評(píng)論 1 262
  • 我被黑心中介騙來(lái)泰國(guó)打工煤惩, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留嫉嘀,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 45,497評(píng)論 2 354
  • 正文 我出身青樓魄揉,卻偏偏與公主長(zhǎng)得像剪侮,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,786評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容

  • 必備的理論基礎(chǔ) 1.操作系統(tǒng)作用: 隱藏丑陋復(fù)雜的硬件接口瓣俯,提供良好的抽象接口杰标。 管理調(diào)度進(jìn)程,并將多個(gè)進(jìn)程對(duì)硬件...
    drfung閱讀 3,525評(píng)論 0 5
  • 一彩匕、簡(jiǎn)歷準(zhǔn)備 1腔剂、個(gè)人技能 (1)自定義控件、UI設(shè)計(jì)驼仪、常用動(dòng)畫特效 自定義控件 ①為什么要自定義控件掸犬? Andr...
    lucas777閱讀 5,186評(píng)論 2 54
  • 一.基礎(chǔ)知識(shí)篇 1.1 Java基礎(chǔ)知識(shí)篇 final, finally, finalize 的區(qū)別 final修...
    xSpringCloud閱讀 1,734評(píng)論 1 46
  • 所有知識(shí)點(diǎn)已整理成app app下載地址 J2EE 部分: 1.Switch能否用string做參數(shù)? 在 Jav...
    侯蛋蛋_閱讀 2,407評(píng)論 1 4
  • 聊聊阻塞與非阻塞介褥、同步與異步、I/O 模型 來(lái)源:huangguisu 鏈接:http://blog.csdn.n...
    meng_philip123閱讀 1,637評(píng)論 1 13