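I have been reading a lot about message queues lately, so I tried implementing one myself. If you spot any problems, feel free to discuss them.
Design notes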
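The overall design: a dedicated thread (thread 1) takes tasks from the waiting list and inserts them into the task queue, and the threads in the thread pool (threads 2, 3, ...) take tasks from the task queue and execute them.
Thread 1 exists mainly so that the main thread is not blocked while time-consuming tasks are running. When tasks are slow, tasks are added faster than they are taken out;
once the task queue reaches its maximum length, thread 1 blocks and waits for threads 2, 3, ... to take tasks from the queue and execute them. This blocking only happens with a bounded queue: the PriorityBlockingQueue used below is unbounded (its constructor argument is only an initial capacity), so add() on it never blocks.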
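To make the difference between a bounded and an unbounded queue concrete, here is a minimal sketch. It is not part of the implementation that follows: `BackpressureDemo`, `fillBounded` and `fillUnbounded` are hypothetical names, and `ArrayBlockingQueue` is swapped in as one example of a bounded queue. With no consumer running, the bounded queue accepts only as many items as its capacity, while `PriorityBlockingQueue` accepts all of them.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original implementation.
class BackpressureDemo {
    // Offers `count` items to a queue bounded at `capacity`, waiting at most
    // 50 ms per item, and returns how many were accepted.
    static int fillBounded(int capacity, int count) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        int accepted = 0;
        try {
            for (int i = 0; i < count; i++) {
                // offer() with a timeout waits for free space; put() would wait indefinitely.
                if (queue.offer(i, 50, TimeUnit.MILLISECONDS)) {
                    accepted++;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // stop filling if interrupted
        }
        return accepted;
    }

    // Same experiment with PriorityBlockingQueue: the argument is only an
    // initial capacity, so every offer succeeds and the queue simply grows.
    static int fillUnbounded(int initialCapacity, int count) {
        BlockingQueue<Integer> queue = new PriorityBlockingQueue<>(initialCapacity);
        for (int i = 0; i < count; i++) {
            queue.offer(i);
        }
        return queue.size();
    }

    public static void main(String[] args) {
        System.out.println(fillBounded(2, 5));   // only 2 items fit
        System.out.println(fillUnbounded(2, 5)); // all 5 items are stored
    }
}
```

If the producer really should be held back while workers are busy, a bounded queue with `put()` is one way to get that; the trade-off is that `ArrayBlockingQueue` is FIFO and does not order by priority.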
Implementation
1. Write the task model
import java.io.Serializable;

public abstract class TaskBase implements Serializable, Comparable<TaskBase> {
    public long taskId;
    public int priority; // task priority; the larger the value, the higher the priority

    public TaskBase(int priority) {
        this.priority = priority;
    }

    // Called when the task is executed
    public abstract void taskExc();

    // Reversed comparison: a higher priority sorts first, so the queue hands it out first
    @Override
    public int compareTo(TaskBase other) {
        return Integer.compare(other.priority, priority);
    }
}
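As a quick check of the ordering, the sketch below puts a few tasks into a `PriorityBlockingQueue` and drains it. `PriorityOrderDemo` and `DemoTask` are hypothetical stand-ins for illustration only; `DemoTask` uses the same reversed comparison as `TaskBase`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical demo class, not part of the original implementation.
class PriorityOrderDemo {
    // Stand-in for TaskBase with the same "larger value first" ordering.
    static class DemoTask implements Comparable<DemoTask> {
        final int priority;

        DemoTask(int priority) {
            this.priority = priority;
        }

        @Override
        public int compareTo(DemoTask other) {
            return Integer.compare(other.priority, priority);
        }
    }

    // Enqueues one task per given priority and returns the order in which
    // the queue hands them back.
    static List<Integer> drainOrder(int... priorities) {
        PriorityBlockingQueue<DemoTask> queue = new PriorityBlockingQueue<>();
        for (int p : priorities) {
            queue.add(new DemoTask(p));
        }
        List<Integer> order = new ArrayList<>();
        DemoTask task;
        while ((task = queue.poll()) != null) {
            order.add(task.priority);
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(drainOrder(1, 5, 3)); // prints [5, 3, 1]
    }
}
```

Tasks with equal priority have no guaranteed relative order in a `PriorityBlockingQueue`; if submission order matters, a secondary sequence number would be needed in the comparison.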
2. Write the task queue
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;

public class TaskQueue {
    // Initial capacity of the task queue; PriorityBlockingQueue grows beyond it as needed
    private static final int QUEUE_SIZE = 20;
    private final List<TaskBase> mWaitList = new ArrayList<TaskBase>();
    private final PriorityBlockingQueue<TaskBase> mTaskQueue = new PriorityBlockingQueue<TaskBase>(QUEUE_SIZE);
    private final ExecutorService mThreadPool;
    private final ExecutorService mAddThread;
    private final int mThreadSize;

    public TaskQueue(int threadSize) {
        mThreadPool = Executors.newFixedThreadPool(threadSize);
        mAddThread = Executors.newSingleThreadExecutor();
        mThreadSize = threadSize;
    }

    public void start() {
        for (int i = 0; i < mThreadSize; i++) {
            mThreadPool.execute(new TaskDispatcher(mTaskQueue));
        }
        mAddThread.execute(new TaskAddDispatcher(mWaitList, mTaskQueue));
    }

    public void stop() {
        // shutdownNow() interrupts the dispatcher threads; plain shutdown() only rejects
        // new submissions. The dispatchers stop only if their loops react to the interrupt.
        mThreadPool.shutdownNow();
        mAddThread.shutdownNow();
    }

    public boolean addTask(TaskBase taskBase) {
        synchronized (mWaitList) {
            return mWaitList.add(taskBase);
        }
    }

    public boolean addTask(List<TaskBase> taskBases) {
        synchronized (mWaitList) {
            return mWaitList.addAll(taskBases);
        }
    }

    // Re-queues a task unless it is already waiting
    public boolean retry(TaskBase taskBase) {
        synchronized (mWaitList) {
            if (mWaitList.contains(taskBase)) {
                return false;
            }
            return mWaitList.add(taskBase);
        }
    }

    // Only removes tasks that are still in the waiting list
    public boolean remove(TaskBase taskBase) {
        synchronized (mWaitList) {
            return mWaitList.remove(taskBase);
        }
    }
}
3. Write the thread that moves tasks from the waiting list into the task queue
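import java.util.List;
import java.util.concurrent.BlockingQueue;

public class TaskAddDispatcher extends Thread {
    private final List<TaskBase> mWaitList;
    private final BlockingQueue<TaskBase> mTaskQueue;

    public TaskAddDispatcher(List<TaskBase> waitList, BlockingQueue<TaskBase> taskQueue) {
        mWaitList = waitList;
        mTaskQueue = taskQueue;
    }

    @Override
    public void run() {
        if (mWaitList == null || mTaskQueue == null) return;
        while (!Thread.currentThread().isInterrupted()) {
            TaskBase task = null;
            // Check and remove under the same lock, so a concurrent remove()
            // cannot empty the list between the two steps
            synchronized (mWaitList) {
                if (!mWaitList.isEmpty()) {
                    task = mWaitList.remove(0);
                }
            }
            if (task != null) {
                mTaskQueue.add(task);
            } else {
                try {
                    Thread.sleep(2000); // nothing is waiting; poll again later
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so the loop condition ends the thread
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}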
4. Write the task worker thread
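import java.util.concurrent.BlockingQueue;

public class TaskDispatcher extends Thread {
    private final BlockingQueue<TaskBase> mTaskQueue;

    public TaskDispatcher(BlockingQueue<TaskBase> taskQueue) {
        mTaskQueue = taskQueue;
    }

    @Override
    public void run() {
        if (mTaskQueue == null) return; // avoids spinning forever on a missing queue
        while (!Thread.currentThread().isInterrupted()) {
            try {
                TaskBase task = mTaskQueue.take(); // blocks until a task is available
                task.taskExc();
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the loop condition ends the worker
                Thread.currentThread().interrupt();
            }
        }
    }
}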
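A worker loop built around `take()` can only be stopped from outside if it reacts to interruption. The following is a minimal sketch of that idea under simplifying assumptions: `WorkerShutdownDemo` and `runAndStop` are hypothetical names, plain `Runnable` jobs stand in for `TaskBase`, and a `LinkedBlockingQueue` replaces the priority queue to keep the example short.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the original implementation.
class WorkerShutdownDemo {
    // Starts `workers` loops that take jobs from a shared queue, pushes `tasks`
    // jobs through them, then interrupts the pool.
    // Returns true if every worker exited after shutdownNow().
    static boolean runAndStop(int workers, int tasks, AtomicInteger executed) {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (int i = 0; i < workers; i++) {
            pool.execute(() -> {
                try {
                    while (true) {
                        queue.take().run(); // blocks until a job arrives
                    }
                } catch (InterruptedException e) {
                    // take() was interrupted by shutdownNow(): leave the loop
                    Thread.currentThread().interrupt();
                }
            });
        }
        CountDownLatch done = new CountDownLatch(tasks);
        for (int i = 0; i < tasks; i++) {
            queue.add(() -> {
                executed.incrementAndGet();
                done.countDown();
            });
        }
        try {
            done.await(5, TimeUnit.SECONDS); // wait until the jobs have run
            pool.shutdownNow();              // interrupt the blocked workers
            return pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            pool.shutdownNow();
            return false;
        }
    }

    public static void main(String[] args) {
        AtomicInteger executed = new AtomicInteger();
        boolean stopped = runAndStop(3, 10, executed);
        System.out.println(stopped + " " + executed.get());
    }
}
```

The key point is that `shutdown()` alone would leave the workers parked in `take()` forever, whereas `shutdownNow()` interrupts them and the `catch` block lets each loop end.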
5. Write the manager class
import java.util.List;

public class TaskManager {
    public static final int THREAD_SIZE = 3; // number of worker threads
    private static TaskManager mTaskManager;
    private final TaskQueue mTaskQueue;

    private TaskManager() {
        mTaskQueue = new TaskQueue(THREAD_SIZE);
    }

    // Lazily created singleton; synchronized so that only one instance is built
    public static synchronized TaskManager getInstance() {
        if (mTaskManager == null) {
            mTaskManager = new TaskManager();
        }
        return mTaskManager;
    }

    public boolean addTask(TaskBase taskBase) {
        return mTaskQueue.addTask(taskBase);
    }

    public boolean addTask(List<TaskBase> taskBases) {
        return mTaskQueue.addTask(taskBases);
    }

    public boolean retryTask(TaskBase taskBase) {
        return mTaskQueue.retry(taskBase);
    }

    public boolean cancelTask(TaskBase taskBase) {
        return mTaskQueue.remove(taskBase);
    }

    public void start() {
        mTaskQueue.start();
    }

    public void stop() {
        mTaskQueue.stop();
    }
}
Usage
1. Extend TaskBase and implement the taskExc() method
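import android.util.Log; // assumption: Log.d here is Android's logger

public class TestBean extends TaskBase {
    public TestBean(int priority) {
        super(priority);
    }

    public TestBean() {
        super(0);
    }

    @Override
    public void taskExc() {
        Log.d(TestBean.class.getName(), "tasksuccess,priority==>" + priority);
        excDelayTask();
    }

    // Simulates a time-consuming task
    private void excDelayTask() {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            // Keep the interrupt flag set so the calling worker can see it
            Thread.currentThread().interrupt();
        }
    }
}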
2. Start all worker threads
TaskManager.getInstance().start();
3. Add a task
TaskManager.getInstance().addTask(new TestBean());