背景
在公司做某個(gè)項(xiàng)目的時(shí)候女器,所有數(shù)據(jù)從第三方通過調(diào)用接口的方式獲取到。但是第三方返回的數(shù)據(jù)有新老數(shù)據(jù)之分栗竖,所謂新老數(shù)據(jù)之分,是指某一條數(shù)據(jù)渠啤,你對(duì)它進(jìn)行了修改狐肢,數(shù)據(jù)庫會(huì)存兩條,舊的一條估計(jì)是作為歷史版本數(shù)據(jù)沥曹。但是份名,第三方返給我的數(shù)據(jù)是沒有一個(gè)字段去標(biāo)識(shí)這個(gè)新舊數(shù)據(jù)的碟联。所以,這就需要去做個(gè)過濾僵腺。為了簡(jiǎn)便鲤孵,在獲取到所有數(shù)據(jù)之后,把新數(shù)據(jù)單獨(dú)存表辰如,后面的操作以這個(gè)表為主表普监,就可以避免很多麻煩∩ッ唬可能很多人看完這個(gè)第一感覺會(huì)把這個(gè)問題歸于去重鹰椒。后面我否定了,我認(rèn)為去重呕童,是指數(shù)據(jù)存在多條一模一樣的漆际,去掉重復(fù)的,隨便取一條夺饲,或者我認(rèn)為這個(gè)比去重稍稍復(fù)雜一點(diǎn)奸汇。(條條大路通羅馬,不一定非得最優(yōu)解嘛往声,視圖就不說了)
正題
第三方提供的接口很多擂找,為了提高效率,采用多線程的方式去拉去數(shù)據(jù)浩销。那么問題來了贯涎,一次同時(shí)跑多少個(gè)線程?越多越好嗎?答案肯定是否定的。至于具體多少個(gè)慢洋,網(wǎng)上給出了答案塘雳。我個(gè)人沒有測(cè)試,有興趣你們可以試試普筹。
拿來主義之網(wǎng)上說-最好起cpu核心數(shù)量x2個(gè)線程或者 cpu核心數(shù)量x2+2個(gè)線程败明。重新扯回正題:如何實(shí)現(xiàn)先把所有數(shù)據(jù)拉取完了之后,在單獨(dú)起一個(gè)線程去做數(shù)據(jù)收集太防?
這就需要對(duì)線程做一些控制了妻顶。網(wǎng)上給出了幾種方案,我選擇了使用CountDownLatch線程輔助同步類蜒车。
示例代碼
這是主測(cè)試代碼讳嘱,功能就是創(chuàng)建一個(gè)固定大小為5的線程池。用線程池去跑11線程酿愧。這里面主要是要注意設(shè)置需要等待線程的數(shù)量呢燥。CountDownLatch countDownLatch = new CountDownLatch(10);這個(gè)表示線程記數(shù)為10個(gè)寓娩。
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Mian {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(5);
List<Runnable> taskList = new ArrayList<Runnable>();
CountDownLatch countDownLatch = new CountDownLatch(10);//這個(gè)是重點(diǎn)
taskList.add(new Special(countDownLatch));
taskList.add(new Common(countDownLatch));
taskList.add(new Common(countDownLatch));
taskList.add(new Common(countDownLatch));
taskList.add(new Common(countDownLatch));
taskList.add(new Common(countDownLatch));
taskList.add(new Common(countDownLatch));
taskList.add(new Common(countDownLatch));
taskList.add(new Common(countDownLatch));
taskList.add(new Common(countDownLatch));
taskList.add(new Common(countDownLatch));
taskList.forEach(t -> {
executorService.execute(t);
});
executorService.shutdown();
}
}
下面是普通的線程代碼叛氨,這個(gè)里面需要注意的就是countDownLatch.countDown()這句話呼渣,把線程記數(shù)值減一。這步操作也是加鎖的寞埠,因?yàn)橛洈?shù)對(duì)于所有線程來說都是共享的屁置,多線程操作共享變量,你懂得不加鎖會(huì)怎樣仁连。
package thread;
import java.util.concurrent.CountDownLatch;
public class Common implements Runnable {
private CountDownLatch countDownLatch;
public Common(CountDownLatch countDownLatch) {
this.setCountDownLatch(countDownLatch);
}
@Override
public void run() {
synchronized (countDownLatch) {
System.out.println("進(jìn)入普通線程了");
countDownLatch.countDown();
System.out.println("目前的線程記數(shù)值為:" + countDownLatch.getCount());
}
}
public CountDownLatch getCountDownLatch() {
return countDownLatch;
}
public void setCountDownLatch(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}
}
下面是特定的線程蓝角,它需要等待我前面設(shè)置的線程跑完之后在運(yùn)行。這里面主要的代碼就是countDownLatch.await()饭冬;這句話的就是去判斷線程記數(shù)值是否為0使鹅。若是0就會(huì)繼續(xù)往下走,否則就會(huì)阻塞昌抠。
package thread;
import java.util.concurrent.CountDownLatch;
public class Special implements Runnable {
private CountDownLatch countDownLatch;
public Special(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
System.out.println("進(jìn)入特殊線程");
System.out.println("特殊線程患朱,此時(shí)線程記數(shù)值為:" + countDownLatch.getCount());
try {
countDownLatch.await();//這是重點(diǎn)
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("特殊線程,此時(shí)線程記數(shù)值為:" + countDownLatch.getCount());
System.out.println("特殊線程炊苫,結(jié)束特殊線程");
}
public CountDownLatch getCountDownLatch() {
return countDownLatch;
}
public void setCountDownLatch(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}
}
運(yùn)行結(jié)果:
大家可以看到雖然一開始打印了進(jìn)入特殊線程裁厅,但是卻并未輸出結(jié)束特殊線程,而是最后才執(zhí)行侨艾,說明
當(dāng)運(yùn)行到紅框的時(shí)候执虹,該線程就進(jìn)入了阻塞。直到其他所有線程運(yùn)行完畢唠梨,才重新喚醒它往后執(zhí)行袋励。到這兒可以說是已經(jīng)實(shí)現(xiàn)了我想要的效果。
下面我在截幾張運(yùn)行結(jié)果的圖
細(xì)心的小伙伴可能發(fā)現(xiàn)咋后面的數(shù)字每次都感覺不一樣当叭。線程池嘛插龄,執(zhí)行五個(gè)線程,這五個(gè)線程是有先后的嘛科展。
源碼賞析
countDownLatch.countDown()這個(gè)方法會(huì)把記數(shù)值減一(默認(rèn)的哈),同時(shí)當(dāng)值為零的時(shí)候糠雨,就會(huì)去喚醒出于阻塞的線程才睹。貼哈源碼。
我個(gè)人理解unpark是真正的喚醒機(jī)制甘邀,這個(gè)是屬于操作系統(tǒng)層面的東西了琅攘。(個(gè)人拙見)
countDownLatch.await()這個(gè)就比較簡(jiǎn)單了,當(dāng)值不為零就阻塞松邪,否則就直接運(yùn)行了坞琴。貼哈源碼
tryAcquireShared判斷是否阻塞,doAcquireSharedInterruptibly阻塞的具體操作(個(gè)人拙見)
結(jié)語
對(duì)多線程的操作呢逗抑,我也是個(gè)菜鳥剧辐,如有錯(cuò)誤之處寒亥,望不吝賜教。