徒手?jǐn)]框架--高并發(fā)環(huán)境下的請(qǐng)求合并

原文地址:www.xilidou.com/2018/01/22/…

在高并發(fā)系統(tǒng)中碍遍,我們經(jīng)常遇到這樣的需求:系統(tǒng)產(chǎn)生大量的請(qǐng)求系谐,但是這些請(qǐng)求實(shí)時(shí)性要求不高度宦。我們就可以將這些請(qǐng)求合并梦裂,達(dá)到一定數(shù)量我們統(tǒng)一提交。最大化的利用系統(tǒng)性IO,提升系統(tǒng)的吞吐性能。

所以請(qǐng)求合并框架需要考慮以下兩個(gè)需求:

  1. 當(dāng)請(qǐng)求收集到一定數(shù)量時(shí)提交數(shù)據(jù)
  2. 一段時(shí)間后如果請(qǐng)求沒有達(dá)到指定的數(shù)量也進(jìn)行提交

我們就聊聊一如何實(shí)現(xiàn)這樣一個(gè)需求趣苏。

閱讀這篇文章你將會(huì)了解到:

  • ScheduledThreadPoolExecutor
  • 阻塞隊(duì)列
  • 線程安全的參數(shù)
  • LockSuppor的使用

設(shè)計(jì)思路和實(shí)現(xiàn)

我們就聊一聊實(shí)現(xiàn)這個(gè)東西的具體思路是什么狡相。希望大家能夠?qū)W習(xí)到分析問題,設(shè)計(jì)模塊的一些套路食磕。

  1. 底層使用什么數(shù)據(jù)結(jié)構(gòu)來(lái)持有需要合并的請(qǐng)求尽棕?
    • 既然我們的系統(tǒng)是在高并發(fā)的環(huán)境下使用,那我們肯定不能使用彬伦,普通的ArrayList來(lái)持有萄金。我們可以使用阻塞隊(duì)列來(lái)持有需要合并的請(qǐng)求。
    • 我們的數(shù)據(jù)結(jié)構(gòu)需要提供一個(gè) add() 的方法給外部媚朦,用于提交數(shù)據(jù)。當(dāng)外部add數(shù)據(jù)以后日戈,需要檢查隊(duì)列里面的數(shù)據(jù)的個(gè)數(shù)是否達(dá)到我們限額询张?達(dá)到數(shù)量提交數(shù)據(jù),不達(dá)到繼續(xù)等待浙炼。
    • 數(shù)據(jù)結(jié)構(gòu)還需要提供一個(gè)timeOut()的方法份氧,外部有一個(gè)計(jì)時(shí)器定時(shí)調(diào)用這個(gè)timeOut方法,如果方法被調(diào)用弯屈,則直接向遠(yuǎn)程提交數(shù)據(jù)蜗帜。
    • 條件滿足的時(shí)候線程執(zhí)行提交動(dòng)作,條件不滿足的時(shí)候線程應(yīng)當(dāng)暫停资厉,等待隊(duì)列達(dá)到提交數(shù)據(jù)的條件厅缺。所以我們可以考慮使用 LockSuppor.park()LockSuppor.unpark 來(lái)暫停和激活操作線程。

經(jīng)過上面的分析宴偿,我們就有了這樣一個(gè)數(shù)據(jù)結(jié)構(gòu):

private static class FlushThread<Item> implements Runnable{

        private final String name;

        //隊(duì)列大小
        private final int bufferSize;
        //操作間隔
        private int flushInterval;

        //上一次提交的時(shí)間湘捎。
        private volatile long lastFlushTime;
        private volatile Thread writer;

        //持有數(shù)據(jù)的阻塞隊(duì)列
        private final BlockingQueue<Item> queue;

        //達(dá)成條件后具體執(zhí)行的方法
        private final Processor<Item> processor;

        //構(gòu)造函數(shù)
        public FlushThread(String name, int bufferSize, int flushInterval,int queueSize,Processor<Item> processor) {
            this.name = name;
            this.bufferSize = bufferSize;
            this.flushInterval = flushInterval;
            this.lastFlushTime = System.currentTimeMillis();
            this.processor = processor;

            this.queue = new ArrayBlockingQueue<>(queueSize);

        }

        //外部提交數(shù)據(jù)的方法
        public boolean add(Item item){
            boolean result = queue.offer(item);
            flushOnDemand();
            return result;
        }

        //提供給外部的超時(shí)方法
        public void timeOut(){
            //超過兩次提交超過時(shí)間間隔
            if(System.currentTimeMillis() - lastFlushTime >= flushInterval){
                start();
            }
        }

        //解除線程的阻塞
        private void start(){
            LockSupport.unpark(writer);
        }

        //當(dāng)前的數(shù)據(jù)是否大于提交的條件
        private void flushOnDemand(){
            if(queue.size() >= bufferSize){
                start();
            }
        }

        //執(zhí)行提交數(shù)據(jù)的方法
        public void flush(){
            lastFlushTime = System.currentTimeMillis();
            List<Item> temp = new ArrayList<>(bufferSize);
            int size = queue.drainTo(temp,bufferSize);
            if(size > 0){
                try {
                    processor.process(temp);
                }catch (Throwable e){
                    log.error("process error",e);
                }
            }
        }

        //根據(jù)數(shù)據(jù)的尺寸和時(shí)間間隔判斷是否提交
        private boolean canFlush(){
            return queue.size() > bufferSize || System.currentTimeMillis() - lastFlushTime > flushInterval;
        }

        @Override
        public void run() {
            writer = Thread.currentThread();
            writer.setName(name);

            while (!writer.isInterrupted()){
                while (!canFlush()){
                    //如果線程沒有被打斷,且不達(dá)到執(zhí)行的條件窄刘,則阻塞線程
                    LockSupport.park(this);
                }
                flush();
            }

        }

    }
復(fù)制代碼
  1. 如何實(shí)現(xiàn)定時(shí)提交呢窥妇?

通常我們遇到定時(shí)相關(guān)的需求,首先想到的應(yīng)該是使用 ScheduledThreadPoolExecutor定時(shí)來(lái)調(diào)用FlushThread 的 timeOut 方法,如果你想到的是 Thread.sleep()...那需要再努力學(xué)習(xí)娩践,多看源碼了活翩。

  1. 怎樣進(jìn)一步的提升系統(tǒng)的吞吐量?

我們使用的FlushThread 實(shí)現(xiàn)了 Runnable 所以我們可以考慮使用線程池來(lái)持有多個(gè)FlushThread翻伺。

所以我們就有這樣的代碼:


public class Flusher<Item> {

    private final FlushThread<Item>[] flushThreads;

    private AtomicInteger index;

    //防止多個(gè)線程同時(shí)執(zhí)行材泄。增加一個(gè)隨機(jī)數(shù)間隔
    private static final Random r = new Random();

    private static final int delta = 50;

    private static ScheduledExecutorService TIMER = new ScheduledThreadPoolExecutor(1);

    private static ExecutorService POOL = Executors.newCachedThreadPool();

    public Flusher(String name,int bufferSiz,int flushInterval,int queueSize,int threads,Processor<Item> processor) {

        this.flushThreads = new FlushThread[threads];

        if(threads > 1){
            index = new AtomicInteger();
        }

        for (int i = 0; i < threads; i++) {
            final FlushThread<Item> flushThread = new FlushThread<Item>(name+ "-" + i,bufferSiz,flushInterval,queueSize,processor);
            flushThreads[i] = flushThread;
            POOL.submit(flushThread);
            //定時(shí)調(diào)用 timeOut()方法。
            TIMER.scheduleAtFixedRate(flushThread::timeOut, r.nextInt(delta), flushInterval, TimeUnit.MILLISECONDS);
        }
    }

    // 對(duì) index 取模吨岭,保證多線程都能被add
    public boolean add(Item item){
        int len = flushThreads.length;
        if(len == 1){
            return flushThreads[0].add(item);
        }

        int mod = index.incrementAndGet() % len;
        return flushThreads[mod].add(item);

    }

    //上文已經(jīng)描述
    private static class FlushThread<Item> implements Runnable{
        ...省略
    }
}

復(fù)制代碼
  1. 面向接口編程脸爱,提升系統(tǒng)擴(kuò)展性:
public interface Processor<T> {
    void process(List<T> list);
}
復(fù)制代碼

使用

我們寫個(gè)測(cè)試方法測(cè)試一下:

//實(shí)現(xiàn) Processor 將 String 全部輸出
public class PrintOutProcessor implements Processor<String>{
    @Override
    public void process(List<String> list) {

        System.out.println("start flush");

        list.forEach(System.out::println);

        System.out.println("end flush");
    }
}

復(fù)制代碼

public class Test {

    public static void main(String[] args) throws InterruptedException {

        Flusher<String> stringFlusher = new Flusher<>("test",5,1000,30,1,new PrintOutProcessor());

        int index = 1;
        while (true){
            stringFlusher.add(String.valueOf(index++));
            Thread.sleep(1000);
        }
    }
}

復(fù)制代碼

執(zhí)行的結(jié)果:


start flush
1
2
3
end flush
start flush
4
5
6
7
end flush

復(fù)制代碼

我們發(fā)現(xiàn)并沒有達(dá)到5個(gè)數(shù)字就觸發(fā)了flush。因?yàn)橛|發(fā)了超時(shí)提交未妹,雖然還沒有達(dá)到規(guī)定的5 個(gè)數(shù)據(jù)簿废,但還是執(zhí)行了 flush空入。

如果我們?nèi)コ?Thread.sleep(1000); 再看看結(jié)果:

start flush
1
2
3
4
5
end flush
start flush
6
7
8
9
10
end flush
復(fù)制代碼

每5個(gè)數(shù)一次提交。完美族檬。歪赢。。单料。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末埋凯,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子扫尖,更是在濱河造成了極大的恐慌白对,老刑警劉巖,帶你破解...
    沈念sama閱讀 212,542評(píng)論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件换怖,死亡現(xiàn)場(chǎng)離奇詭異甩恼,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)沉颂,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,596評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門条摸,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人铸屉,你說(shuō)我怎么就攤上這事钉蒲。” “怎么了彻坛?”我有些...
    開封第一講書人閱讀 158,021評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵顷啼,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我昌屉,道長(zhǎng)线梗,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,682評(píng)論 1 284
  • 正文 為了忘掉前任怠益,我火速辦了婚禮仪搔,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘蜻牢。我一直安慰自己烤咧,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,792評(píng)論 6 386
  • 文/花漫 我一把揭開白布抢呆。 她就那樣靜靜地躺著煮嫌,像睡著了一般。 火紅的嫁衣襯著肌膚如雪抱虐。 梳的紋絲不亂的頭發(fā)上昌阿,一...
    開封第一講書人閱讀 49,985評(píng)論 1 291
  • 那天,我揣著相機(jī)與錄音,去河邊找鬼懦冰。 笑死灶轰,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的刷钢。 我是一名探鬼主播笋颤,決...
    沈念sama閱讀 39,107評(píng)論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼内地!你這毒婦竟也來(lái)了伴澄?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,845評(píng)論 0 268
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤阱缓,失蹤者是張志新(化名)和其女友劉穎非凌,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體荆针,經(jīng)...
    沈念sama閱讀 44,299評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡敞嗡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,612評(píng)論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了祭犯。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,747評(píng)論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡滚停,死狀恐怖沃粗,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情键畴,我是刑警寧澤最盅,帶...
    沈念sama閱讀 34,441評(píng)論 4 333
  • 正文 年R本政府宣布,位于F島的核電站起惕,受9級(jí)特大地震影響涡贱,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜惹想,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,072評(píng)論 3 317
  • 文/蒙蒙 一问词、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧嘀粱,春花似錦激挪、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,828評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至娃磺,卻和暖如春薄湿,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,069評(píng)論 1 267
  • 我被黑心中介騙來(lái)泰國(guó)打工豺瘤, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留吆倦,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,545評(píng)論 2 362
  • 正文 我出身青樓炉奴,卻偏偏與公主長(zhǎng)得像逼庞,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子瞻赶,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,658評(píng)論 2 350

推薦閱讀更多精彩內(nèi)容

  • 關(guān)于Mongodb的全面總結(jié) MongoDB的內(nèi)部構(gòu)造《MongoDB The Definitive Guide》...
    中v中閱讀 31,914評(píng)論 2 89
  • Swift1> Swift和OC的區(qū)別1.1> Swift沒有地址/指針的概念1.2> 泛型1.3> 類型嚴(yán)謹(jǐn) 對(duì)...
    cosWriter閱讀 11,093評(píng)論 1 32
  • 在一個(gè)方法內(nèi)部定義的變量都存儲(chǔ)在棧中赛糟,當(dāng)這個(gè)函數(shù)運(yùn)行結(jié)束后,其對(duì)應(yīng)的棧就會(huì)被回收砸逊,此時(shí)璧南,在其方法體中定義的變量將不...
    Y了個(gè)J閱讀 4,413評(píng)論 1 14
  • 大家好,我是Amber师逸!今天給大家推薦一部美國(guó)驚悚電影《NO ESCAPE》中文譯名《無(wú)處可逃》司倚。 一個(gè)美國(guó)工程師...
    甜不甜加點(diǎn)鹽閱讀 410評(píng)論 0 0
  • 人在世間的長(zhǎng)河里,時(shí)代的大框架中篓像,到底能為自己活多少动知?時(shí)代和個(gè)體就像一輛大馬車,有時(shí)候顯得多么無(wú)力员辩。 我努力表現(xiàn)的...
    娃娃zy閱讀 139評(píng)論 0 0