原文地址:www.xilidou.com/2018/01/22/…
在高并發(fā)系統(tǒng)中碍遍,我們經(jīng)常遇到這樣的需求:系統(tǒng)產(chǎn)生大量的請(qǐng)求系谐,但是這些請(qǐng)求實(shí)時(shí)性要求不高度宦。我們就可以將這些請(qǐng)求合并梦裂,達(dá)到一定數(shù)量我們統(tǒng)一提交。最大化的利用系統(tǒng)性IO,提升系統(tǒng)的吞吐性能。
所以請(qǐng)求合并框架需要考慮以下兩個(gè)需求:
- 當(dāng)請(qǐng)求收集到一定數(shù)量時(shí)提交數(shù)據(jù)
- 一段時(shí)間后如果請(qǐng)求沒有達(dá)到指定的數(shù)量也進(jìn)行提交
我們就聊聊一如何實(shí)現(xiàn)這樣一個(gè)需求趣苏。
閱讀這篇文章你將會(huì)了解到:
- ScheduledThreadPoolExecutor
- 阻塞隊(duì)列
- 線程安全的參數(shù)
- LockSuppor的使用
設(shè)計(jì)思路和實(shí)現(xiàn)
我們就聊一聊實(shí)現(xiàn)這個(gè)東西的具體思路是什么狡相。希望大家能夠?qū)W習(xí)到分析問題,設(shè)計(jì)模塊的一些套路食磕。
- 底層使用什么數(shù)據(jù)結(jié)構(gòu)來(lái)持有需要合并的請(qǐng)求尽棕?
- 既然我們的系統(tǒng)是在高并發(fā)的環(huán)境下使用,那我們肯定不能使用彬伦,普通的
ArrayList
來(lái)持有萄金。我們可以使用阻塞隊(duì)列來(lái)持有需要合并的請(qǐng)求。 - 我們的數(shù)據(jù)結(jié)構(gòu)需要提供一個(gè) add() 的方法給外部媚朦,用于提交數(shù)據(jù)。當(dāng)外部add數(shù)據(jù)以后日戈,需要檢查隊(duì)列里面的數(shù)據(jù)的個(gè)數(shù)是否達(dá)到我們限額询张?達(dá)到數(shù)量提交數(shù)據(jù),不達(dá)到繼續(xù)等待浙炼。
- 數(shù)據(jù)結(jié)構(gòu)還需要提供一個(gè)timeOut()的方法份氧,外部有一個(gè)計(jì)時(shí)器定時(shí)調(diào)用這個(gè)timeOut方法,如果方法被調(diào)用弯屈,則直接向遠(yuǎn)程提交數(shù)據(jù)蜗帜。
- 條件滿足的時(shí)候線程執(zhí)行提交動(dòng)作,條件不滿足的時(shí)候線程應(yīng)當(dāng)暫停资厉,等待隊(duì)列達(dá)到提交數(shù)據(jù)的條件厅缺。所以我們可以考慮使用
LockSuppor.park()
和LockSuppor.unpark
來(lái)暫停和激活操作線程。
- 既然我們的系統(tǒng)是在高并發(fā)的環(huán)境下使用,那我們肯定不能使用彬伦,普通的
經(jīng)過上面的分析宴偿,我們就有了這樣一個(gè)數(shù)據(jù)結(jié)構(gòu):
private static class FlushThread<Item> implements Runnable{
private final String name;
//隊(duì)列大小
private final int bufferSize;
//操作間隔
private int flushInterval;
//上一次提交的時(shí)間湘捎。
private volatile long lastFlushTime;
private volatile Thread writer;
//持有數(shù)據(jù)的阻塞隊(duì)列
private final BlockingQueue<Item> queue;
//達(dá)成條件后具體執(zhí)行的方法
private final Processor<Item> processor;
//構(gòu)造函數(shù)
public FlushThread(String name, int bufferSize, int flushInterval,int queueSize,Processor<Item> processor) {
this.name = name;
this.bufferSize = bufferSize;
this.flushInterval = flushInterval;
this.lastFlushTime = System.currentTimeMillis();
this.processor = processor;
this.queue = new ArrayBlockingQueue<>(queueSize);
}
//外部提交數(shù)據(jù)的方法
public boolean add(Item item){
boolean result = queue.offer(item);
flushOnDemand();
return result;
}
//提供給外部的超時(shí)方法
public void timeOut(){
//超過兩次提交超過時(shí)間間隔
if(System.currentTimeMillis() - lastFlushTime >= flushInterval){
start();
}
}
//解除線程的阻塞
private void start(){
LockSupport.unpark(writer);
}
//當(dāng)前的數(shù)據(jù)是否大于提交的條件
private void flushOnDemand(){
if(queue.size() >= bufferSize){
start();
}
}
//執(zhí)行提交數(shù)據(jù)的方法
public void flush(){
lastFlushTime = System.currentTimeMillis();
List<Item> temp = new ArrayList<>(bufferSize);
int size = queue.drainTo(temp,bufferSize);
if(size > 0){
try {
processor.process(temp);
}catch (Throwable e){
log.error("process error",e);
}
}
}
//根據(jù)數(shù)據(jù)的尺寸和時(shí)間間隔判斷是否提交
private boolean canFlush(){
return queue.size() > bufferSize || System.currentTimeMillis() - lastFlushTime > flushInterval;
}
@Override
public void run() {
writer = Thread.currentThread();
writer.setName(name);
while (!writer.isInterrupted()){
while (!canFlush()){
//如果線程沒有被打斷,且不達(dá)到執(zhí)行的條件窄刘,則阻塞線程
LockSupport.park(this);
}
flush();
}
}
}
復(fù)制代碼
- 如何實(shí)現(xiàn)定時(shí)提交呢窥妇?
通常我們遇到定時(shí)相關(guān)的需求,首先想到的應(yīng)該是使用 ScheduledThreadPoolExecutor
定時(shí)來(lái)調(diào)用FlushThread 的 timeOut 方法,如果你想到的是 Thread.sleep()
...那需要再努力學(xué)習(xí)娩践,多看源碼了活翩。
- 怎樣進(jìn)一步的提升系統(tǒng)的吞吐量?
我們使用的FlushThread
實(shí)現(xiàn)了 Runnable
所以我們可以考慮使用線程池來(lái)持有多個(gè)FlushThread
翻伺。
所以我們就有這樣的代碼:
public class Flusher<Item> {
private final FlushThread<Item>[] flushThreads;
private AtomicInteger index;
//防止多個(gè)線程同時(shí)執(zhí)行材泄。增加一個(gè)隨機(jī)數(shù)間隔
private static final Random r = new Random();
private static final int delta = 50;
private static ScheduledExecutorService TIMER = new ScheduledThreadPoolExecutor(1);
private static ExecutorService POOL = Executors.newCachedThreadPool();
public Flusher(String name,int bufferSiz,int flushInterval,int queueSize,int threads,Processor<Item> processor) {
this.flushThreads = new FlushThread[threads];
if(threads > 1){
index = new AtomicInteger();
}
for (int i = 0; i < threads; i++) {
final FlushThread<Item> flushThread = new FlushThread<Item>(name+ "-" + i,bufferSiz,flushInterval,queueSize,processor);
flushThreads[i] = flushThread;
POOL.submit(flushThread);
//定時(shí)調(diào)用 timeOut()方法。
TIMER.scheduleAtFixedRate(flushThread::timeOut, r.nextInt(delta), flushInterval, TimeUnit.MILLISECONDS);
}
}
// 對(duì) index 取模吨岭,保證多線程都能被add
public boolean add(Item item){
int len = flushThreads.length;
if(len == 1){
return flushThreads[0].add(item);
}
int mod = index.incrementAndGet() % len;
return flushThreads[mod].add(item);
}
//上文已經(jīng)描述
private static class FlushThread<Item> implements Runnable{
...省略
}
}
復(fù)制代碼
- 面向接口編程脸爱,提升系統(tǒng)擴(kuò)展性:
public interface Processor<T> {
void process(List<T> list);
}
復(fù)制代碼
使用
我們寫個(gè)測(cè)試方法測(cè)試一下:
//實(shí)現(xiàn) Processor 將 String 全部輸出
public class PrintOutProcessor implements Processor<String>{
@Override
public void process(List<String> list) {
System.out.println("start flush");
list.forEach(System.out::println);
System.out.println("end flush");
}
}
復(fù)制代碼
public class Test {
public static void main(String[] args) throws InterruptedException {
Flusher<String> stringFlusher = new Flusher<>("test",5,1000,30,1,new PrintOutProcessor());
int index = 1;
while (true){
stringFlusher.add(String.valueOf(index++));
Thread.sleep(1000);
}
}
}
復(fù)制代碼
執(zhí)行的結(jié)果:
start flush
1
2
3
end flush
start flush
4
5
6
7
end flush
復(fù)制代碼
我們發(fā)現(xiàn)并沒有達(dá)到5個(gè)數(shù)字就觸發(fā)了flush。因?yàn)橛|發(fā)了超時(shí)提交未妹,雖然還沒有達(dá)到規(guī)定的5 個(gè)數(shù)據(jù)簿废,但還是執(zhí)行了 flush空入。
如果我們?nèi)コ?Thread.sleep(1000);
再看看結(jié)果:
start flush
1
2
3
4
5
end flush
start flush
6
7
8
9
10
end flush
復(fù)制代碼
每5個(gè)數(shù)一次提交。完美族檬。歪赢。。单料。