原始對賬系統(tǒng)拾弃,單線程
while(存在未對賬訂單){
????? pos = getPOrders();???// 查詢未對賬訂單
? ????dos = getDOrders();? ?? // 查詢派送單
? ????diff = check(pos, dos);? ?? // 執(zhí)行對賬操作
? ????save(diff);? ?? // 差異寫入差異庫
}
改為多線程:join
while(存在未對賬訂單){
? // 查詢未對賬訂單
? Thread T1 = new Thread(()->{
? ? pos = getPOrders();
? });
? T1.start();
? // 查詢派送單
? Thread T2 = new Thread(()->{
? ? dos = getDOrders();
? });
? T2.start();
? // 等待T1顽腾、T2結(jié)束
? T1.join();
? T2.join();
? diff = check(pos, dos);? ?// 執(zhí)行對賬操作
? save(diff);? ??// 差異寫入差異庫
}
創(chuàng)建線程耗時,用線程池優(yōu)化:?CountDownLatch?
Executor executor =Executors.newFixedThreadPool(2);??// 創(chuàng)建2個線程的線程池
while(存在未對賬訂單){
? CountDownLatch latch =new CountDownLatch(2);???// 計數(shù)器初始化為2
? // 查詢未對賬訂單
? executor.execute(()-> {
? ? pos = getPOrders();
? ? latch.countDown();
? });
? // 查詢派送單
? executor.execute(()-> {
? ? dos = getDOrders();
? ? latch.countDown();
? });
? latch.await();??// 等待兩個查詢操作結(jié)束
? diff = check(pos, dos);???// 執(zhí)行對賬操作
? save(diff);??? // 差異寫入差異庫
}
前面我們將 getPOrders() 和 getDOrders() 這兩個查詢操作并行了丑慎,但這兩個查詢操作和對賬操作 check()迟螺、save() 之間還是串行的拔妥。很顯然赶掖,這兩個查詢操作和對賬操作也是可以并行的,也就是說七扰,在執(zhí)行對賬操作的時候奢赂,可以同時去執(zhí)行下一輪的查詢操作,這個過程可以形象化地表述為下面這幅示意圖颈走。
那接下來我們再來思考一下如何實現(xiàn)這步優(yōu)化膳灶,兩次查詢操作能夠和對賬操作并行,對賬操作還依賴查詢操作的結(jié)果立由,這明顯有點生產(chǎn)者 - 消費者的意思轧钓,兩次查詢操作是生產(chǎn)者,對賬操作是消費者锐膜。既然是生產(chǎn)者 - 消費者模型毕箍,那就需要有個隊列,來保存生產(chǎn)者生產(chǎn)的數(shù)據(jù)道盏,而消費者則從這個隊列消費數(shù)據(jù)而柑。不過針對對賬這個項目,我設計了兩個隊列荷逞,并且兩個隊列的元素之間還有對應關系媒咳。具體如下圖所示,訂單查詢操作將訂單查詢結(jié)果插入訂單隊列种远,派送單查詢操作將派送單插入派送單隊列涩澡,這兩個隊列的元素之間是有一一對應的關系的。兩個隊列的好處是坠敷,對賬操作可以每次從訂單隊列出一個元素妙同,從派送單隊列出一個元素,然后對這兩個元素執(zhí)行對賬操作膝迎,這樣數(shù)據(jù)一定不會亂掉渐溶。
下面再來看如何用雙隊列來實現(xiàn)完全的并行。一個最直接的想法是:一個線程 T1 執(zhí)行訂單的查詢工作弄抬,一個線程 T2 執(zhí)行派送單的查詢工作茎辐,當線程 T1 和 T2 都各自生產(chǎn)完 1 條數(shù)據(jù)的時候,通知線程 T3 執(zhí)行對賬操作。這個想法雖看上去簡單拖陆,但其實還隱藏著一個條件弛槐,那就是線程 T1 和線程 T2 的工作要步調(diào)一致,不能一個跑得太快依啰,一個跑得太慢乎串,只有這樣才能做到各自生產(chǎn)完 1 條數(shù)據(jù)的時候,通知線程 T3速警。下面這幅圖形象地描述了上面的意圖:線程 T1 和線程 T2 只有都生產(chǎn)完 1 條數(shù)據(jù)的時候叹誉,才能一起向下執(zhí)行,也就是說闷旧,線程 T1 和線程 T2 要互相等待长豁,步調(diào)要一致;同時當線程 T1 和 T2 都生產(chǎn)完一條數(shù)據(jù)的時候忙灼,還要能夠通知線程 T3 執(zhí)行對賬操作匠襟。
Vector<P> pos;? ?// 訂單隊列
Vector<D> dos;? ?// 派送單隊列
Executor executor = Executors.newFixedThreadPool(1);??// 執(zhí)行回調(diào)的線程池
final CyclicBarrier barrier = new CyclicBarrier(2, ()->{
? ????? executor.execute(()->check());? ? ?//等到barrier.await()減為0才執(zhí)行
? });
void check(){
? P p = pos.remove(0);
? D d = dos.remove(0);
? diff = check(p, d);? ?// 執(zhí)行對賬操作
? save(diff);??? // 差異寫入差異庫
}
void checkAll(){
? // 循環(huán)查詢訂單庫
? Thread T1 = new Thread(()->{
? ? while(存在未對賬訂單){
? ? ????? pos.add(getPOrders());??? ? ? // 查詢訂單庫
? ? ? ????barrier.await();??? ? ? // 等待
? ? }
? });
? T1.start();?
? // 循環(huán)查詢運單庫
? Thread T2 = new Thread(()->{
? ? while(存在未對賬訂單){
? ? ????? dos.add(getDOrders());???// 查詢運單庫
? ? ? ????barrier.await();??? ? ? // 等待
? ? }
? });
? T2.start();
}
1.為啥要用線程池,而不是在回調(diào)函數(shù)中直接調(diào)用该园?
2.線程池為啥使用單線程的酸舍?
我的考慮:
1.使用線程池是為了異步操作,否則回掉函數(shù)是同步調(diào)用的里初,也就是本次對賬操作執(zhí)行完才能進行下一輪的檢查啃勉。
2.線程數(shù)量固定為1,防止了多線程并發(fā)導致的數(shù)據(jù)不一致双妨,因為訂單和派送單是兩個隊列璧亮,只有單線程去兩個隊列中取消息才不會出現(xiàn)消息不匹配的問題。