聲明:原創(chuàng)文章蜒程,轉載請注明出處绅你。http://www.reibang.com/u/e02df63eaa87
1、從生產(chǎn)者消費者說起
在傳統(tǒng)的生產(chǎn)者消費者模型中昭躺,通常是采用BlockingQueue實現(xiàn)忌锯。其中生產(chǎn)者線程負責提交需求,消費者線程負責處理任務领炫,二者之間通過共享內(nèi)存緩沖區(qū)進行通信偶垮。由于內(nèi)存緩沖區(qū)的存在,允許生產(chǎn)者和消費者之間速度的差異,確保系統(tǒng)正常運行似舵。
下圖展示一個簡單的生產(chǎn)者消費者模型脚猾,生產(chǎn)者從文件中讀取數(shù)據(jù),將數(shù)據(jù)內(nèi)容寫入到阻塞隊列中砚哗,消費者從隊列的另一邊獲取數(shù)據(jù)龙助,進行計算并將結果輸出。其中Main負責創(chuàng)建兩類線程并初始化隊列蛛芥。
Main:
public class Main {
public static void main(String[] args) {
// 初始化阻塞隊列
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(1000);
// 創(chuàng)建生產(chǎn)者線程
Thread producer = new Thread(new Producer(blockingQueue, "temp.dat"));
producer.start();
// 創(chuàng)建消費者線程
Thread consumer = new Thread(new Consumer(blockingQueue));
consumer.start();
}
}
生產(chǎn)者:
public class Producer implements Runnable {
private BlockingQueue<String> blockingQueue;
private String fileName;
private static final String FINIDHED = "EOF";
public Producer(BlockingQueue<String> blockingQueue, String fileName) {
this.blockingQueue = blockingQueue;
this.fileName = fileName;
}
@Override
public void run() {
try {
BufferedReader reader = new BufferedReader(new FileReader(new File(fileName)));
String line;
while ((line = reader.readLine()) != null) {
blockingQueue.put(line);
}
// 結束標志
blockingQueue.put(FINIDHED);
} catch (Exception e) {
e.printStackTrace();
}
}
}
消費者:
public class Consumer implements Runnable {
private BlockingQueue<String> blockingQueue;
private static final String FINIDHED = "EOF";
public Consumer(BlockingQueue<String> blockingQueue) {
this.blockingQueue = blockingQueue;
}
@Override
public void run() {
String line;
String[] arrStr;
int ret;
try {
while (!(line = blockingQueue.take()).equals(FINIDHED)) {
// 消費
arrStr = line.split("\t");
if (arrStr.length != 2) {
continue;
}
ret = Integer.parseInt(arrStr[0]) + Integer.parseInt(arrStr[1]);
System.out.println(ret);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
生產(chǎn)者-消費者模型可以很容易地將生產(chǎn)和消費進行解耦提鸟,優(yōu)化系統(tǒng)整體結構,并且由于存在緩沖區(qū)仅淑,可以緩解兩端性能不匹配的問題称勋。
2、BlockingQueue的不足
上述使用了ArrayBlockingQueue
漓糙,通過查看其實現(xiàn)铣缠,完全是使用鎖和阻塞等待實現(xiàn)線程同步。在高并發(fā)場景下昆禽,性能不是很優(yōu)越。
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
insert(e);
} finally {
lock.unlock();
}
}
但是蝇庭,ConcurrentLinkedQueue
卻是一個高性能隊列醉鳖,這是因為其實現(xiàn)使用了無鎖的CAS操作。
3哮内、Disruptor初體驗
Disruptor是由LMAX公司開發(fā)的一款高效無鎖內(nèi)存隊列盗棵。使用無鎖方式實現(xiàn)了一個環(huán)形隊列代替線性隊列。相對于普通的線性隊列酝锅,環(huán)形隊列不需要維護頭尾兩個指針刚操,只需維護一個當前位置就可以完成出入隊操作壕曼。受限于環(huán)形結構,隊列的大小只能初始化時指定瞭恰,不能動態(tài)擴展。
如下圖所示狱庇,Disruptor的實現(xiàn)為一個循環(huán)隊列惊畏,ringbuffer擁有一個序號(Seq),這個序號指向數(shù)組中下一個可用的元素密任。
隨著不停地填充這個buffer(可能也會有相應的讀妊掌簟),這個序號會一直增長浪讳,直到超過這個環(huán)缰盏。
Disruptor要求數(shù)組大小設置為2的N次方。這樣可以通過Seq & (QueueSize - 1) 直接獲取,其效率要比取目诓拢快得多形葬。這是因為(Queue - 1)的二進制為全1等形式。例如暮的,上圖中QueueSize大小為8笙以,Seq為10,則只需要計算二進制1010 & 0111 = 2冻辩,可直接得到index=2位置的元素猖腕。
在RingBuffer中,生產(chǎn)者向數(shù)組中寫入數(shù)據(jù)恨闪,生產(chǎn)者寫入數(shù)據(jù)時倘感,使用CAS操作。消費者從中讀取數(shù)據(jù)時咙咽,為防止多個消費者同時處理一個數(shù)據(jù)老玛,也使用CAS操作進行數(shù)據(jù)保護。
這種固定大小的RingBuffer還有一個好處是钧敞,可以內(nèi)存復用蜡豹。不會有新空間需要分配或者舊的空間回收,當數(shù)組填充滿后溉苛,再寫入數(shù)據(jù)會將數(shù)據(jù)覆蓋镜廉。
4、Disruptor小試牛刀
同樣地愚战,使用Disruptor處理第一節(jié)中的生產(chǎn)者消費者的案例娇唯。
4.1 添加Maven依賴
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.3.2</version>
</dependency>
4.2 定義事件對象
由于我們只需要將文件中的數(shù)據(jù)行讀出,然后進行計算寂玲。因此塔插,定義FileData.class
來保存文件行。
public class FileData {
private String line;
public String getLine() {
return line;
}
public void setLine(String line) {
this.line = line;
}
}
4.3 定義工廠類
用于產(chǎn)生FileData
的工廠類拓哟,會在Disruptor系統(tǒng)初始化時想许,構造所有的緩沖區(qū)中的對象實例。
public class DisruptorFactory implements EventFactory<FileData> {
public FileData newInstance() {
return new FileData();
}
}
4.4 定義消費者
消費者的作用是讀取數(shù)據(jù)并進行處理彰檬。數(shù)據(jù)的讀取已經(jīng)由Disruptor封裝伸刃,onEvent()
方法為Disruptor框架的回調(diào)方法。只需要進行簡單的數(shù)據(jù)處理即可逢倍。
public class DisruptorConsumer implements WorkHandler<FileData> {
private static final String FINIDHED = "EOF";
@Override
public void onEvent(FileData event) throws Exception {
String line = event.getLine();
if (line.equals(FINIDHED)) {
return;
}
// 消費
String[] arrStr = line.split("\t");
if (arrStr.length != 2) {
return;
}
int ret = Integer.parseInt(arrStr[0]) + Integer.parseInt(arrStr[1]);
System.out.println(ret);
}
}
4.5 定義生產(chǎn)者
生產(chǎn)者需要一個Ringbuffer
的引用捧颅。其中pushData()
方法是將生產(chǎn)的數(shù)據(jù)寫入到RingBuffer中。具體的過程是较雕,首先通過next()
方法得到下一個可用的序列號碉哑;取得下一個可用的FileData
挚币,并設置該對象的值;最后扣典,進行數(shù)據(jù)發(fā)布妆毕,這個FileData
對象會傳遞給消費者。
public class DisruptorProducer {
private static final String FINIDHED = "EOF";
private final RingBuffer<FileData> ringBuffer;
public DisruptorProducer(RingBuffer<FileData> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void pushData(String line) {
long seq = ringBuffer.next();
try {
FileData event = ringBuffer.get(seq); // 獲取可用位置
event.setLine(line); // 填充可用位置
} catch (Exception e) {
e.printStackTrace();
} finally {
ringBuffer.publish(seq); // 通知消費者
}
}
public void read(String fileName) {
try {
BufferedReader reader = new BufferedReader(new FileReader(new File(fileName)));
String line;
while ((line = reader.readLine()) != null) {
// 生產(chǎn)數(shù)據(jù)
pushData(line);
}
// 結束標志
pushData(FINIDHED);
} catch (Exception e) {
e.printStackTrace();
}
}
}
4.6 定義Main函數(shù)
最后需要一個DisruptorMain()
將上述的數(shù)據(jù)贮尖、生產(chǎn)者和消費者進行整合笛粘。
public class DisruptorMain {
public static void main(String[] args) {
DisruptorFactory factory = new DisruptorFactory(); // 工廠
ExecutorService executor = Executors.newCachedThreadPool(); // 線程池
int BUFFER_SIZE = 16; // 必須為2的冪指數(shù)
// 初始化Disruptor
Disruptor<FileData> disruptor = new Disruptor<>(factory,
BUFFER_SIZE,
executor,
ProducerType.MULTI, // Create a RingBuffer supporting multiple event publishers to the one RingBuffer
new BlockingWaitStrategy() // 默認阻塞策略
);
// 啟動消費者
disruptor.handleEventsWithWorkerPool(new DisruptorConsumer(),
new DisruptorConsumer()
);
disruptor.start();
// 啟動生產(chǎn)者
RingBuffer<FileData> ringBuffer = disruptor.getRingBuffer();
DisruptorProducer producer = new DisruptorProducer(ringBuffer);
producer.read("temp.dat");
// 關閉
disruptor.shutdown();
executor.shutdown();
}
}
5、Disruptor策略
Disruptor生產(chǎn)者和消費者之間是通過什么策略進行同步呢湿硝?Disruptor提供了如下幾種策略:
- BlockingWaitStrategy:默認等待策略薪前。和BlockingQueue的實現(xiàn)很類似,通過使用鎖和條件(Condition)進行線程同步和喚醒关斜。此策略對于線程切換來說示括,最節(jié)約CPU資源,但在高并發(fā)場景下性能有限痢畜。
-
SleepingWaitStrategy:CPU友好型策略垛膝。會在循環(huán)中不斷等待數(shù)據(jù)。首先進行自旋等待丁稀,若不成功吼拥,則使用
Thread.yield()
讓出CPU,并使用LockSupport.parkNanos(1)
進行線程睡眠二驰。所以扔罪,此策略數(shù)據(jù)處理數(shù)據(jù)可能會有較高的延遲,適合用于對延遲不敏感的場景桶雀。優(yōu)點是對生產(chǎn)者線程影響小,典型應用場景是異步日志唬复。 -
YieldingWaitStrategy:低延時策略矗积。消費者線程會不斷循環(huán)監(jiān)控RingBuffer的變化,在循環(huán)內(nèi)部使用
Thread.yield()
讓出CPU給其他線程敞咧。 - BusySpinWaitStrategy:死循環(huán)策略棘捣。消費者線程會盡最大可能監(jiān)控緩沖區(qū)的變化,會占用所有CPU資源休建。
6乍恐、Disruptor解決CPU Cache偽共享問題
為了解決CPU和內(nèi)存速度不匹配的問題,CPU中有多個高速緩存Cache测砂。在Cache中茵烈,讀寫數(shù)據(jù)的基本單位是緩存行,緩存行是內(nèi)存復制到緩存的最小單位砌些。
若兩個變量放在同一個Cache Line中呜投,在多線程情況下加匈,可能會相互影響彼此的性能。如上圖所示仑荐,CPU1上的線程更新了變量X雕拼,則CPU上的緩存行會失效,同一行的Y即使沒有更新也會失效粘招,導致Cache無法命中啥寇。
同樣地,若CPU2上的線程更新了Y洒扎,則導致CPU1上的緩存行又失效辑甜。如果CPU經(jīng)常不能命中緩存,則系統(tǒng)的吞吐量則會下降逊笆。這就是偽共享問題栈戳。
解決偽共享問題,可以在變量的前后都占據(jù)一定的填充位置难裆,盡量讓變量占用一個完整的緩存行子檀。如上圖中,CPU1上的線程更新了X乃戈,則CPU2上的Y則不會失效褂痰。同樣地,CPU2上的線程更新了Y症虑,則CPU1的不會失效缩歪。
class LhsPadding
{
protected long p1, p2, p3, p4, p5, p6, p7;
}
class Value extends LhsPadding
{
protected volatile long value;
}
class RhsPadding extends Value
{
protected long p9, p10, p11, p12, p13, p14, p15;
}
/**
* <p>Concurrent sequence class used for tracking the progress of
* the ring buffer and event processors. Support a number
* of concurrent operations including CAS and order writes.
*
* <p>Also attempts to be more efficient with regards to false
* sharing by adding padding around the volatile field.
*/
public class Sequence extends RhsPadding
{
static final long INITIAL_VALUE = -1L;
private static final Unsafe UNSAFE;
private static final long VALUE_OFFSET;
static
{
UNSAFE = Util.getUnsafe();
try
{
VALUE_OFFSET = UNSAFE.objectFieldOffset(Value.class.getDeclaredField("value"));
}
catch (final Exception e)
{
throw new RuntimeException(e);
}
}
... ...
}
在Sequence
的實現(xiàn)中,主要使用的是Value谍憔,但通過LhsPadding
和RhsPadding
在Value的前后填充了一些空間匪蝙,使Value無沖突的存在于緩存行中。