Disruptor是一個(gè)高性能的線程間消息傳遞庫(kù)争群。它源于LMAX對(duì)并發(fā)性 传蹈、性能和非阻塞算法的研究厚脉,如今構(gòu)成了其Exchange基礎(chǔ)架構(gòu)的核心部分习寸。
理解Disruptor是什么的最好方法是將它與目前已經(jīng)的很好理解和非常相似的東西進(jìn)行比較,例如與Java的BlockingQueue進(jìn)行對(duì)比傻工。與隊(duì)列一樣霞溪,Disruptor的目的是在同一進(jìn)程內(nèi)的線程之間傳遞數(shù)據(jù)(例如消息或事件)孵滞。但是Disruptor相比傳統(tǒng)JDK中的隊(duì)列提供了一些關(guān)鍵功能,它們是:
Disruptor中的同一個(gè)消息會(huì)向所有消費(fèi)者都發(fā)送-即多播能力鸯匹。
為事件(events)預(yù)先分配內(nèi)存坊饶,避免頻繁垃圾回收與內(nèi)存分配開(kāi)銷。
可選擇無(wú)鎖(lock-free)殴蓬,基于CAS操作讓多個(gè)生產(chǎn)者不會(huì)競(jìng)爭(zhēng)同一個(gè)元素匿级,實(shí)現(xiàn)無(wú)鎖操作元素。
使用兩階段協(xié)議染厅,讓多個(gè)線程可同時(shí)修改不同元素痘绎,需要注意的是消費(fèi)元素時(shí)候只能讀取到已經(jīng)提交的元素。
緩存行填充肖粮,避免偽共享孤页。
多播能力是Java中隊(duì)列和Disruptor之間最大的行為差異。當(dāng)您有多個(gè)消費(fèi)者在同一個(gè)Disruptor上監(jiān)聽(tīng)事件時(shí)候涩馆,所有事件都會(huì)發(fā)布給所有消費(fèi)者行施,而Java隊(duì)列中的每個(gè)事件只會(huì)發(fā)送給某一個(gè)消費(fèi)者。 Disruptor的行為旨在用于需要對(duì)同一數(shù)據(jù)進(jìn)行獨(dú)立的多個(gè)并行操作的情況魂那。
Disruptor的目標(biāo)之一是在低延遲環(huán)境中使用蛾号,在低延遲系統(tǒng)中,必須減少或移除內(nèi)存分配涯雅;在基于Java的系統(tǒng)中鲜结,目的是減少由于垃圾收集導(dǎo)致的系統(tǒng)停頓;為了支持這一點(diǎn)斩芭,用戶可以預(yù)先分配Disruptor中事件所需的存儲(chǔ)空間(也就是聲明RingBuffer的大星嵯佟)。在構(gòu)造RingBuffer期間划乖,EventFactory由用戶提供,并將在Disruptor的Ring Buffer中每個(gè)事件元素創(chuàng)建時(shí)候被調(diào)用挤土。將新數(shù)據(jù)發(fā)布到Disruptor時(shí)琴庵,API將允許用戶獲取構(gòu)造的對(duì)象,以便他們可以調(diào)用方法或更新該存儲(chǔ)對(duì)象上的字段仰美,Disruptor保證這些操作只要正確實(shí)現(xiàn)就是并發(fā)安全的迷殿。
低延遲期望推動(dòng)的另一個(gè)關(guān)鍵實(shí)現(xiàn)細(xì)節(jié)是使用無(wú)鎖算法來(lái)實(shí)現(xiàn)Disruptor;所有內(nèi)存可見(jiàn)性和正確性保證都是使用內(nèi)存屏障(體現(xiàn)為volatile)或CAS操作實(shí)現(xiàn)的咖杂;在Disruptor的實(shí)現(xiàn)中只有一個(gè)情況需要實(shí)際鎖定庆寺,這就是當(dāng)使用BlockingWaitStrategy策略時(shí)候,這僅僅是為了使用條件變量诉字,以便在等待新事件到達(dá)時(shí)前parked消費(fèi)線程懦尝。許多低延遲系統(tǒng)將使用忙等待busy-wait 來(lái)避免使用條件可能引起的抖動(dòng)知纷,但是大量在系統(tǒng)繁忙等待的操作可能導(dǎo)致性能顯著下降,尤其是在CPU資源嚴(yán)重受限的情況下陵霉。
在JDK的BlockingQueue中當(dāng)我們需要添加或者取出元素時(shí)候是需要加獨(dú)占鎖的,通過(guò)鎖來(lái)保證多線程對(duì)底層共享的數(shù)據(jù)結(jié)構(gòu)進(jìn)行保護(hù),使用鎖導(dǎo)致同時(shí)只有一個(gè)線程可以向隊(duì)列添加元素或者刪除元素民镜。Disruptor則使用兩階段協(xié)議袜瞬,讓多個(gè)線程可同時(shí)修改不同元素,需要注意的是消費(fèi)元素時(shí)候只能讀取到已經(jīng)提交的元素;在Disruptor中某個(gè)線程要訪問(wèn)Ring Buffer中某個(gè)序列號(hào)下對(duì)應(yīng)的元素時(shí)候要先通過(guò)CAS操作獲取對(duì)應(yīng)元素的所有權(quán)(第一階段)效床,然后通過(guò)序列號(hào)獲取到對(duì)應(yīng)的元素對(duì)象并對(duì)其中的屬性進(jìn)行修改睹酌,最后在發(fā)布元素(第二階段),只有發(fā)布后的元素才可以被消費(fèi)者讀仁L础憋沿;當(dāng)多個(gè)線程寫(xiě)入元素時(shí)候多個(gè)線程都會(huì)先執(zhí)行CAS操作獲取到Ringbuffer中的某一個(gè)元素的所有權(quán),然后可以并發(fā)的對(duì)自己的元素進(jìn)行修改谨朝,但是需要注意的是只有序列號(hào)小的發(fā)布后卤妒,后面的才可以發(fā)布∽直遥可知使用CAS相比使用鎖大大減少了開(kāi)銷则披,提高了并發(fā)度。
計(jì)算機(jī)系統(tǒng)中為了解決主內(nèi)存與CPU運(yùn)行速度的差距洗出,在CPU與主內(nèi)存之間添加了一級(jí)或者多級(jí)高速緩沖存儲(chǔ)器(Cache)士复,這個(gè)Cache一般是集成到CPU內(nèi)部的,所以也叫 CPU Cache翩活,如下圖是兩級(jí)cache結(jié)構(gòu):
Cache內(nèi)部是按行存儲(chǔ)的阱洪,其中每一行稱為一個(gè)Cache行,Cache行是Cache與主內(nèi)存進(jìn)行數(shù)據(jù)交換的單位菠镇,Cache行的大小一般為2的冪次數(shù)字節(jié)冗荸。
當(dāng)CPU訪問(wèn)某一個(gè)變量時(shí)候,首先會(huì)去看CPU Cache內(nèi)是否有該變量利耍,如果有則直接從中獲取蚌本,否者就去主內(nèi)存里面獲取該變量,然后把該變量所在內(nèi)存區(qū)域的一個(gè)Cache行大小的內(nèi)存拷貝到Cache(Cache行是Cache與主內(nèi)存進(jìn)行數(shù)據(jù)交換的單位)隘梨。由于存放到Cache行的的是內(nèi)存塊而不是單個(gè)變量程癌,所以可能會(huì)把多個(gè)變量存放到了一個(gè)cache行。當(dāng)多個(gè)線程同時(shí)修改一個(gè)緩存行里面的多個(gè)變量時(shí)候轴猎,由于同時(shí)只能有一個(gè)線程操作緩存行嵌莉,所以相比每個(gè)變量放到一個(gè)緩存行性能會(huì)有所下降,這就是偽共享捻脖。
如上圖變量x,y同時(shí)被放到了CPU的一級(jí)和二級(jí)緩存锐峭,當(dāng)線程1使用CPU1對(duì)變量x進(jìn)行更新時(shí)候中鼠,首先會(huì)修改cpu1的一級(jí)緩存變量x所在緩存行,這時(shí)候緩存一致性協(xié)議會(huì)導(dǎo)致cpu2中變量x對(duì)應(yīng)的緩存行失效只祠,那么線程2寫(xiě)入變量x的時(shí)候就只能去二級(jí)緩存去查找兜蠕,這就破壞了一級(jí)緩存,而一級(jí)緩存比二級(jí)緩存更快抛寝,這里也說(shuō)明了多個(gè)線程不可能同時(shí)去修改自己所使用的cpu中緩存行中相同緩存行里面的變量熊杨。更壞的情況下如果cpu只有一級(jí)緩存,那么會(huì)導(dǎo)致頻繁的直接訪問(wèn)主內(nèi)存盗舰。
Disruptor中的環(huán)形緩存(Ring Buffer)底層是一個(gè)地址連續(xù)的數(shù)組晶府,則數(shù)組內(nèi)相鄰的元素很容易會(huì)被放入到同一個(gè)Cache行里面從而導(dǎo)致偽共享的出現(xiàn),Disruptor通過(guò)緩存行填充钻趋,讓數(shù)組中的每個(gè)元素獨(dú)占一個(gè)緩存行從而解決了偽共享問(wèn)題的出現(xiàn)川陆。另外為了避免環(huán)形緩存(Ring Buffer)中序列號(hào)(定位元素的游標(biāo))與其他元素共享緩存行,對(duì)其也就像了緩存行填充蛮位,以提高訪問(wèn)序列號(hào)時(shí)候緩存的命中率较沪。
在我們理解Disruptor如何工作前,我們先看看Disruptor中的核心術(shù)語(yǔ)的介紹失仁,或者說(shuō)是Disruptor中的DDD(Domain-Driven Design)域?qū)ο?/p>
- Ring Buffer: 環(huán)形緩沖區(qū)尸曼,通常被認(rèn)為是Disruptor的核心,但是從3.0版本開(kāi)始萄焦,Ring Buffer僅負(fù)責(zé)存儲(chǔ)和更新Disruptor中的數(shù)據(jù)(事件)控轿。
- Sequence: Disruptor使用Sequences作為識(shí)別特定組件所在位置的方法。每個(gè)消費(fèi)者(EventProcessor)都像Disruptor本身一樣維護(hù)一個(gè)Sequence拂封。大多數(shù)并發(fā)代碼依賴于這些Sequence值的移動(dòng)茬射,因此Sequence支持AtomicLong的許多當(dāng)前功能。事實(shí)上冒签,3版本與2之間唯一真正的區(qū)別是防止了Sequence和其他變量之間出現(xiàn)偽共享在抛。
- Sequencer: Sequencer是Disruptor的真正核心。該接口的2個(gè)實(shí)現(xiàn)(單生產(chǎn)者萧恕,多生產(chǎn)者)實(shí)現(xiàn)了所有并發(fā)算法霜定,用于在生產(chǎn)者和消費(fèi)者之間快速、正確地傳遞數(shù)據(jù)廊鸥。
- Sequence Barrier: 序列屏障(Sequence Barrier)由Sequencer產(chǎn)生,并包含對(duì)Sequencer中主要發(fā)布者的序列Sequence和任何依賴的消費(fèi)者的序列Sequence的引用辖所。它包含了確定是否有任何可供消費(fèi)者處理的事件的邏輯惰说。
- Wait Strategy: 等待策略,確定消費(fèi)者如何等待生產(chǎn)者將事件放入Disruptor缘回。
- Event: 從生產(chǎn)者傳遞給消費(fèi)者的數(shù)據(jù)單位吆视。事件沒(méi)有特定的代碼表示典挑,因?yàn)樗耆捎脩舳x。
- EventProcessor: 用于處理來(lái)自Disruptor的事件的主事件循環(huán)啦吧,并擁有消費(fèi)者序列的所有權(quán)您觉。其有一個(gè)名為BatchEventProcessor的實(shí)現(xiàn),它包含事件循環(huán)的有效實(shí)現(xiàn)授滓,并將回調(diào)使用者提供的EventHandler接口實(shí)現(xiàn)(在線程池內(nèi)運(yùn)行BatchEventProcessor的run方法)琳水。
- EventHandler: 由用戶實(shí)現(xiàn)并代表Disruptor的消費(fèi)者的接口。
- Producer: 調(diào)用Disruptor以將事件放入隊(duì)列的用戶代碼般堆。這個(gè)概念在代碼中也沒(méi)有具體表示在孝。
上面我們介紹了為了Disruptor中的核心概念,下面我們將這些元素組合在一起淮摔,如下圖是LMAX在其高性能核心服務(wù)中使用Disruptor的示例:
如上圖示例中有三個(gè)消費(fèi)者私沮,即日志記錄JournalConsumer(將輸入數(shù)據(jù)寫(xiě)入持久性日志文件),復(fù)制ReplicationConsumer(將輸入數(shù)據(jù)發(fā)送到另一臺(tái)機(jī)器以確保存在數(shù)據(jù)的遠(yuǎn)程副本)和業(yè)務(wù)邏輯ApplicationConsumer(真正的處理工作)和橙,其中JournalConsumer和ReplicationConsumer是可以并行執(zhí)行的仔燕。
Producer向Disruptor的Ring Buffer中寫(xiě)入事件,消費(fèi)者JournalConsumer和ReplicationConsumer(EventHandler)使用多播方式同時(shí)消費(fèi)Ring Buffer中的每一個(gè)元素魔招,兩者都有各自的SequenceBarrier用來(lái)控制當(dāng)前可用消費(fèi)Ring Buffer中的哪一個(gè)事件晰搀,并且當(dāng)不存在可用事件時(shí)候如何處理。消費(fèi)者ApplicationConsumer則是等JournalConsumer和ReplicationConsumer對(duì)同一個(gè)元素處理完畢后仆百,在對(duì)該元素進(jìn)行處理,這個(gè)可以使用下面這個(gè)簡(jiǎn)化圖來(lái)概括:
每個(gè)消費(fèi)者持有自己的當(dāng)前消費(fèi)序號(hào)厕隧,由于是環(huán)形buffer,所以生產(chǎn)者寫(xiě)入事件時(shí)候要看序號(hào)最小的消費(fèi)者序號(hào)俄周,以避免覆蓋還沒(méi)有被消費(fèi)的事件吁讨,另外Consumer3消費(fèi)事件時(shí)候只能消費(fèi)已經(jīng)被Consumer1,Consumer2都處理過(guò)的事件。
每個(gè)EventHandler被包裹到對(duì)應(yīng)的BatchEventProcessor中峦朗,BatchEventProcessor是一個(gè)事件處理循環(huán)建丧,類似NIOevenloop,每個(gè)BatchEventProcessor被分到線程池里面一個(gè)固定線程來(lái)執(zhí)行。BatchEventProcessor發(fā)現(xiàn)可用元素后波势,就調(diào)用EventHandler發(fā)射出元素翎朱。如上圖Consumer1,Consumer2,Consumer3共享同一個(gè)Ringbuffer。
另外如上圖Consumer1,Consumer2(EventHandler)分別被自己的BatchEventProcessor包裹尺铣,但是其共享同一個(gè)SequenceBarrier拴曲,Consumer1,Consumer2讀取元素時(shí)候要調(diào)用SequenceBarrier的waitfor來(lái)判斷是否有可以讀取的元素;Consumer3被自己的BatchEventProcessor包裹凛忿,其有自己的SequenceBarrier澈灼,并且持有其依賴的前面的所有消費(fèi)者的引用(Consumer1,Consumer2的引用),Consumer3消費(fèi)元素時(shí)候要看其依賴的所有消費(fèi)者,看其是否都消費(fèi)了某一個(gè)元素叁熔,如果是其才可以消費(fèi)該元素委乌。