本文使用 ThreadPoolExecutor 實(shí)現(xiàn)一個(gè) 帶優(yōu)先級(jí)的線程池
最近做一個(gè)PPT轉(zhuǎn)PDF的功能, 調(diào)用 office 的另存為, 時(shí)間較長, (大約2S轉(zhuǎn)一個(gè)文件), 而且只能單線程來跑, 項(xiàng)目要求批量轉(zhuǎn)好并發(fā)郵件, 如果用戶手動(dòng)點(diǎn)擊的生成PDF則應(yīng)該盡快生成, 不能等批量轉(zhuǎn)好后再讓用戶下載.所以就實(shí)現(xiàn)了一個(gè)有優(yōu)先級(jí)的線程池任務(wù)隊(duì)列. 其實(shí)正常的實(shí)現(xiàn)方式是使用優(yōu)先級(jí)隊(duì)列(java.util.PriorityQueue / java.util.concurrent.PriorityBlockingQueue)這種方式?jīng)]辦法同步的獲取結(jié)果, 編程上有點(diǎn)復(fù)雜, java.util.concurrent.ThreadPoolExecutor 可以 public <T> Future<T> submit(Callable<T> task); 使用Future.get(), 阻塞線程, 等待結(jié)果, 來實(shí)現(xiàn)同步調(diào)用.
public class PriorityThreadPoolExecutor extends ThreadPoolExecutor;
實(shí)現(xiàn)方法很簡單, 繼承 ThreadPoolExecutor 使用 PriorityBlockingQueue 優(yōu)先級(jí)隊(duì)列. PriorityBlockingQueue 有個(gè)坑就是.
Operations on this class make no guarantees about the ordering of elements with equal priority.
*如果優(yōu)先級(jí)相同,不能確定順序. *
實(shí)際測(cè)試下來的結(jié)果是, 如果優(yōu)先級(jí)相同則執(zhí)行順序跟插入順序相反, 這就尷尬了, 著還是FIFO隊(duì)列嗎? 官網(wǎng)給了解決方式.對(duì)每一個(gè)隊(duì)列元素編號(hào), 照抄就可以了. 限制就是隊(duì)列歷史總個(gè)數(shù)不能超過 Long 個(gè). 實(shí)現(xiàn)一個(gè)Comparable 的類
class PriorityRunnable<E extends Comparable<? super E>> implements Runnable, Comparable<PriorityRunnable<E>>;
重載線程池的添加任務(wù)的方法, 追加一個(gè)參數(shù). 如果使用基類的方法, 優(yōu)先級(jí)為 0 .
public void execute(Runnable command, int priority);
public <T> Future<T> submit(Callable<T> task, int priority);
public <T> Future<T> submit(Runnable task, T result, int priority);
public Future<?> submit(Runnable task, int priority);
最終代碼如下
package wang.lcs.sys.util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
public class PriorityThreadPoolExecutor extends ThreadPoolExecutor {
private static final Logger log = LoggerFactory.getLogger(PriorityThreadPoolExecutor.class);
private ThreadLocal<Integer> local = new ThreadLocal<Integer>() {
@Override
protected Integer initialValue() {
return 0;
}
};
public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, getWorkQueue());
}
public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, getWorkQueue(), threadFactory);
}
public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, getWorkQueue(), handler);
}
public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, getWorkQueue(), threadFactory, handler);
}
protected static PriorityBlockingQueue getWorkQueue() {
return new PriorityBlockingQueue();
}
@Override
public void execute(Runnable command) {
int priority = local.get();
try {
this.execute(command, priority);
} finally {
local.set(0);
}
}
public void execute(Runnable command, int priority) {
super.execute(new PriorityRunnable(command, priority));
}
public <T> Future<T> submit(Callable<T> task, int priority) {
local.set(priority);
return super.submit(task);
}
public <T> Future<T> submit(Runnable task, T result, int priority) {
local.set(priority);
return super.submit(task, result);
}
public Future<?> submit(Runnable task, int priority) {
local.set(priority);
return super.submit(task);
}
protected static class PriorityRunnable<E extends Comparable<? super E>> implements Runnable, Comparable<PriorityRunnable<E>> {
private final static AtomicLong seq = new AtomicLong();
private final long seqNum;
Runnable run;
private int priority;
public PriorityRunnable(Runnable run, int priority) {
seqNum = seq.getAndIncrement();
this.run = run;
this.priority = priority;
}
public int getPriority() {
return priority;
}
public void setPriority(int priority) {
this.priority = priority;
}
public Runnable getRun() {
return run;
}
@Override
public void run() {
this.run.run();
}
@Override
public int compareTo(PriorityRunnable<E> other) {
int res = 0;
if (this.priority == other.priority) {
if (other.run != this.run) {// ASC
res = (seqNum < other.seqNum ? -1 : 1);
}
} else {// DESC
res = this.priority > other.priority ? -1 : 1;
}
return res;
}
}
}
下面是測(cè)試用例
package wang.lcs.sys.util;
import org.junit.Assert;
import org.junit.Test;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.*;
public class PriorityThreadPoolExecutorTest {
@Test
public void testDefault() throws InterruptedException, ExecutionException {
PriorityThreadPoolExecutor pool = new PriorityThreadPoolExecutor(1, 1000, 1, TimeUnit.MINUTES);
Future[] futures = new Future[20];
StringBuffer buffer = new StringBuffer();
for (int i = 0; i < futures.length; i++) {
int index = i;
futures[i] = pool.submit(new Callable() {
@Override
public Object call() throws Exception {
Thread.sleep(10);
buffer.append(index + ", ");
return null;
}
});
}
// 等待所有任務(wù)結(jié)束
for (int i = 0; i < futures.length; i++) {
futures[i].get();
}
System.out.println(buffer);
assertEquals("0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, ", buffer.toString());
}
@Test
public void testSamePriority() throws InterruptedException, ExecutionException {
PriorityThreadPoolExecutor pool = new PriorityThreadPoolExecutor(1, 1000, 1, TimeUnit.MINUTES);
Future[] futures = new Future[10];
StringBuffer buffer = new StringBuffer();
for (int i = 0; i < futures.length; i++) {
futures[i] = pool.submit(new TenSecondTask(i, 1, buffer), 1);
}
// 等待所有任務(wù)結(jié)束
for (int i = 0; i < futures.length; i++) {
futures[i].get();
}
System.out.println(buffer);
assertEquals("01@00, 01@01, 01@02, 01@03, 01@04, 01@05, 01@06, 01@07, 01@08, 01@09, ", buffer.toString());
}
@Test
public void testRandomPriority() throws InterruptedException, ExecutionException {
PriorityThreadPoolExecutor pool = new PriorityThreadPoolExecutor(1, 1000, 1, TimeUnit.MINUTES);
Future[] futures = new Future[20];
StringBuffer buffer = new StringBuffer();
for (int i = 0; i < futures.length; i++) {
int r = (int) (Math.random() * 100);
futures[i] = pool.submit(new TenSecondTask(i, r, buffer), r);
}
// 等待所有任務(wù)結(jié)束
for (int i = 0; i < futures.length; i++) {
futures[i].get();
}
buffer.append("01@00");
System.out.println(buffer);
String[] split = buffer.toString().split(", ");
// 從 2 開始, 因?yàn)榍懊娴娜蝿?wù)可能已經(jīng)開始
for (int i = 2; i < split.length - 1; i++) {
String s = split[i].split("@")[0];
assertTrue(Integer.valueOf(s) >= Integer.valueOf(split[i + 1].split("@")[0]));
}
}
public static class TenSecondTask<T> implements Callable<T> {
private StringBuffer buffer;
int index;
int priority;
public TenSecondTask(int index, int priority, StringBuffer buffer) {
this.index = index;
this.priority = priority;
this.buffer = buffer;
}
@Override
public T call() throws Exception {
Thread.sleep(10);
buffer.append(String.format("%02d@%02d", this.priority, index)).append(", ");
return null;
}
}
}
需要說明的是: 使用了 ThreadLocal 類, 減少代碼的復(fù)制粘貼