在學(xué)習(xí)Java 多線程并發(fā)開發(fā)過程中翠霍,了解到DelayQueue類的主要作用:是一個無界的BlockingQueue,用于放置實現(xiàn)了Delayed接口的對象蠢莺,其中的對象只能在其到期時才能從隊列中取走寒匙。這種隊列是有序的,即隊頭對象的延遲到期時間最長躏将。注意:不能將null元素放置到這種隊列中锄弱。
Delayed,一種混合風(fēng)格的接口祸憋,用來標(biāo)記那些應(yīng)該在給定延遲時間之后執(zhí)行的對象会宪。此接口的實現(xiàn)必須定義一個compareTo方法,該方法提供與此接口的getDelay方法一致的排序夺衍。
在網(wǎng)上也看到兩個示例狈谊,但這兩個示例個人在實際運行時均沒有達(dá)到滿足業(yè)務(wù)場景的效果,因而對其進(jìn)行了修改沟沙,供大家參考討論河劝。
業(yè)務(wù)場景一:多考生考試
該場景來自于http://ideasforjava.iteye.com/blog/657384,模擬一個考試的日子矛紫,考試時間為120分鐘赎瞎,30分鐘后才可交卷,當(dāng)時間到了颊咬,或?qū)W生都交完卷了考試結(jié)束务甥。
這個場景中幾個點需要注意:
考試時間為120分鐘,30分鐘后才可交卷喳篇,初始化考生完成試卷時間最小應(yīng)為30分鐘
對于能夠在120分鐘內(nèi)交卷的考生敞临,如何實現(xiàn)這些考生交卷
對于120分鐘內(nèi)沒有完成考試的考生,在120分鐘考試時間到后需要讓他們強制交卷
在所有的考生都交完卷后麸澜,需要將控制線程關(guān)閉
實現(xiàn)思想:用DelayQueue存儲考生(Student類)挺尿,每一個考生都有自己的名字和完成試卷的時間,Teacher線程對DelayQueue進(jìn)行監(jiān)控,收取完成試卷小于120分鐘的學(xué)生的試卷编矾。當(dāng)考試時間120分鐘到時熟史,先關(guān)閉Teacher線程,然后強制DelayQueue中還存在的考生交卷窄俏。每一個考生交卷都會進(jìn)行一次countDownLatch.countDown()蹂匹,當(dāng)countDownLatch.await()不再阻塞說明所有考生都交完卷了,而后結(jié)束考試凹蜈。
packagecom.my.base.concurrent.delayQueue;importjava.util.Iterator;importjava.util.Random;importjava.util.concurrent.CountDownLatch;importjava.util.concurrent.DelayQueue;importjava.util.concurrent.Delayed;importjava.util.concurrent.TimeUnit;/***this project is created for my partactice.
*In the? project I will write the mybatis by myself
*
*2014-1-10? 下午9:43:48
*@author孫振超? mychaoyue2011@163.com*/publicclassExam {/***
*2014-1-10 下午9:43:48 by 孫振超
*
*@paramargs
*void
*@throwsInterruptedException*/publicstaticvoidmain(String[] args)throwsInterruptedException {//TODO Auto-generated method stubintstudentNumber = 20;
CountDownLatch countDownLatch=newCountDownLatch(studentNumber+1);
DelayQueue< Student> students =newDelayQueue();
Random random=newRandom();for(inti = 0; i < studentNumber; i++) {
students.put(newStudent("student"+(i+1), 30+random.nextInt(120),countDownLatch));
}
Thread teacherThread=newThread(newTeacher(students));
students.put(newEndExam(students, 120,countDownLatch,teacherThread));
teacherThread.start();
countDownLatch.await();
System.out.println(" 考試時間到限寞,全部交卷!");
}
}classStudentimplementsRunnable,Delayed{privateString name;privatelongworkTime;privatelongsubmitTime;privatebooleanisForce =false;privateCountDownLatch countDownLatch;publicStudent(){}publicStudent(String name,longworkTime,CountDownLatch countDownLatch){this.name =name;this.workTime =workTime;this.submitTime = TimeUnit.NANOSECONDS.convert(workTime, TimeUnit.NANOSECONDS)+System.nanoTime();this.countDownLatch =countDownLatch;
}
@OverridepublicintcompareTo(Delayed o) {//TODO Auto-generated method stubif(o ==null|| ! (oinstanceofStudent))return1;if(o ==this)return0;
Student s=(Student)o;if(this.workTime >s.workTime) {return1;
}elseif(this.workTime ==s.workTime) {return0;
}else{return-1;
}
}
@OverridepubliclonggetDelay(TimeUnit unit) {//TODO Auto-generated method stubreturnunit.convert(submitTime -System.nanoTime(),? TimeUnit.NANOSECONDS);
}
@Overridepublicvoidrun() {//TODO Auto-generated method stubif(isForce) {
System.out.println(name+ " 交卷, 希望用時" + workTime + "分鐘"+" ,實際用時 120分鐘");
}else{
System.out.println(name+ " 交卷, 希望用時" + workTime + "分鐘"+" ,實際用時 "+workTime +" 分鐘");
}
countDownLatch.countDown();
}publicbooleanisForce() {returnisForce;
}publicvoidsetForce(booleanisForce) {this.isForce =isForce;
}
}classEndExamextendsStudent{privateDelayQueuestudents;privateCountDownLatch countDownLatch;privateThread teacherThread;publicEndExam(DelayQueue students,longworkTime, CountDownLatch countDownLatch,Thread teacherThread) {super("強制收卷", workTime,countDownLatch);this.students =students;this.countDownLatch =countDownLatch;this.teacherThread =teacherThread;
}
@Overridepublicvoidrun() {//TODO Auto-generated method stubteacherThread.interrupt();
Student tmpStudent;for(Iterator iterator2 =students.iterator(); iterator2.hasNext();) {
tmpStudent=iterator2.next();
tmpStudent.setForce(true);
tmpStudent.run();
}
countDownLatch.countDown();
}
}classTeacherimplementsRunnable{privateDelayQueuestudents;publicTeacher(DelayQueuestudents){this.students =students;
}
@Overridepublicvoidrun() {//TODO Auto-generated method stubtry{
System.out.println(" test start");while(!Thread.interrupted()){
students.take().run();
}
}catch(Exception e) {//TODO: handle exceptione.printStackTrace();
}
}
}
業(yè)務(wù)場景二:具有過期時間的緩存
該場景來自于http://www.cnblogs.com/jobs/archive/2007/04/27/730255.html踪区,向緩存添加內(nèi)容時昆烁,給每一個key設(shè)定過期時間,系統(tǒng)自動將超過過期時間的key清除缎岗。
這個場景中幾個點需要注意:
當(dāng)向緩存中添加key-value對時静尼,如果這個key在緩存中存在并且還沒有過期,需要用這個key對應(yīng)的新過期時間
為了能夠讓DelayQueue將其已保存的key刪除传泊,需要重寫實現(xiàn)Delayed接口添加到DelayQueue的DelayedItem的hashCode函數(shù)和equals函數(shù)
當(dāng)緩存關(guān)閉鼠渺,監(jiān)控程序也應(yīng)關(guān)閉,因而監(jiān)控線程應(yīng)當(dāng)用守護(hù)線程
具體實現(xiàn)如下:
packagecom.my.base.concurrent.delayQueue;importjava.util.Random;importjava.util.concurrent.ConcurrentHashMap;importjava.util.concurrent.DelayQueue;importjava.util.concurrent.Delayed;importjava.util.concurrent.TimeUnit;/***Cache.java
*
* Created on 2014-1-11 上午11:30:36 by sunzhenchao mychaoyue2011@163.com*/publicclassCache{publicConcurrentHashMap map =newConcurrentHashMap();publicDelayQueue> queue =newDelayQueue>();publicvoidput(K k,V v,longliveTime){
V v2=map.put(k, v);
DelayedItem tmpItem =newDelayedItem(k, liveTime);if(v2 !=null) {
queue.remove(tmpItem);
}
queue.put(tmpItem);
}publicCache(){
Thread t=newThread(){
@Overridepublicvoidrun(){
dameonCheckOverdueKey();
}
};
t.setDaemon(true);
t.start();
}publicvoiddameonCheckOverdueKey(){while(true) {
DelayedItem delayedItem =queue.poll();if(delayedItem !=null) {
map.remove(delayedItem.getT());
System.out.println(System.nanoTime()+" remove "+delayedItem.getT() +" from cache");
}try{
Thread.sleep(300);
}catch(Exception e) {//TODO: handle exception}
}
}/*** TODO
*@paramargs
* 2014-1-11 上午11:30:36
*@author:孫振超
*@throwsInterruptedException*/publicstaticvoidmain(String[] args)throwsInterruptedException {
Random random=newRandom();intcacheNumber = 10;intliveTime = 0;
Cache cache =newCache();for(inti = 0; i < cacheNumber; i++) {
liveTime= random.nextInt(3000);
System.out.println(i+"? "+liveTime);
cache.put(i+"", i, random.nextInt(liveTime));if(random.nextInt(cacheNumber) > 7) {
liveTime= random.nextInt(3000);
System.out.println(i+"? "+liveTime);
cache.put(i+"", i, random.nextInt(liveTime));
}
}
Thread.sleep(3000);
System.out.println();
}
}classDelayedItemimplementsDelayed{privateT t;privatelongliveTime ;privatelongremoveTime;publicDelayedItem(T t,longliveTime){this.setT(t);this.liveTime =liveTime;this.removeTime = TimeUnit.NANOSECONDS.convert(liveTime, TimeUnit.NANOSECONDS) +System.nanoTime();
}
@OverridepublicintcompareTo(Delayed o) {if(o ==null)return1;if(o ==this)return0;if(oinstanceofDelayedItem){
DelayedItem tmpDelayedItem = (DelayedItem)o;if(liveTime >tmpDelayedItem.liveTime ) {return1;
}elseif(liveTime ==tmpDelayedItem.liveTime) {return0;
}else{return-1;
}
}longdiff = getDelay(TimeUnit.NANOSECONDS) -o.getDelay(TimeUnit.NANOSECONDS);returndiff > 0 ? 1:diff == 0? 0:-1;
}
@OverridepubliclonggetDelay(TimeUnit unit) {returnunit.convert(removeTime -System.nanoTime(), unit);
}publicT getT() {returnt;
}publicvoidsetT(T t) {this.t =t;
}
@OverridepublicinthashCode(){returnt.hashCode();
}
@Overridepublicbooleanequals(Object object){if(objectinstanceofDelayedItem) {returnobject.hashCode() == hashCode() ?true:false;
}returnfalse;
}
}