主要內(nèi)容
1. 什么是異步
2. 軟件設(shè)計(jì)中如何實(shí)現(xiàn)異步操作
2.1 Callback機(jī)制
2.1.1 asynchronous callback
2.1.2 Event-Listener
2.2 Future機(jī)制
2.2.1 Future
2.2.2 Promise
3. 異步編程的優(yōu)勢和不足
4. 總結(jié)
1.什么是異步
程序或系統(tǒng)中關(guān)于異步的概念使用的比較多,那么什么是異步呢携冤?下面舉個(gè)生活中最常見的情景來進(jìn)行說明:
叫外賣
這個(gè)例子中盒刚,訂餐者作為Caller细诸,一份蛋炒飯的制作以及配送作為一個(gè)Task惕它,Caller只需要打個(gè)電話給餐廳怕午,說明需求即可,然后就可以繼續(xù)執(zhí)行自己的任務(wù)"學(xué)習(xí)"淹魄。而待完成的Task交由對方來完成郁惜,Caller只需要在Task完成后得到通知即可。這就是一個(gè)典型的異步過程甲锡。
2.軟件設(shè)計(jì)中如何實(shí)現(xiàn)異步操作
軟件開發(fā)過程中我們經(jīng)常會(huì)遇到異步的情況兆蕉,比如:網(wǎng)絡(luò)編程中的異步IO,Web開發(fā)中的異步Servlet缤沦,Ajax等等虎韵。從程序設(shè)計(jì)的角度來說,異步操作的實(shí)現(xiàn)主要可以通過以下兩種方式實(shí)現(xiàn):
- 異步回調(diào)機(jī)制
- Future機(jī)制
2.1 異步Callback機(jī)制
Callback指的就是回調(diào)機(jī)制缸废,回調(diào)機(jī)制通常指的是將可執(zhí)行的code作為參數(shù)傳遞給其它的code包蓝,并在合適的時(shí)機(jī)執(zhí)行。當(dāng)然企量,這里的“合適的時(shí)機(jī)”可能是異步的测萎,也可能是同步的裸诽。前者就是我們要討論的異步Callback機(jī)制早龟。
異步Callback機(jī)制在具體實(shí)現(xiàn)上也會(huì)有不同的方案慢哈,比如:普通的回調(diào)函數(shù)或事件監(jiān)聽模式上面所有的方法均是基于回調(diào)函數(shù)來完成異步操作的易核,無非是對回調(diào)函數(shù)進(jìn)行封裝而已函似。
2.1.1 asynchronous callback
在c語言中身诺,可以以函數(shù)指針的形式來實(shí)現(xiàn)回調(diào)函數(shù)的傳遞届垫,但是我們知道Java中是不支持函數(shù)指針的驮肉,不過別忘了拇勃!我們還有接口呢四苇!
情景描述
模擬ZooKeeper中client端異步對server端進(jìn)行操作,這里就只模擬create node的操作方咆。
上圖是整個(gè)創(chuàng)建流程的時(shí)序圖月腋,步驟如下:
- 實(shí)例化Client客戶端
- 啟動(dòng)Client內(nèi)部的Worker線程
- User調(diào)用asyncCreate方法向RemoteServer發(fā)起創(chuàng)建Node節(jié)點(diǎn)請求。
- 將請求信息封裝成Packet對象瓣赂,加入Client內(nèi)部的BlockQueue中榆骚。
- Worker不斷輪詢BlockQueue,通過take()方法取出隊(duì)列中的待發(fā)送Packet煌集。
- 將請求發(fā)送到RemoteServer
- 接受從RemoteServer返回的響應(yīng)妓肢。
- 回調(diào)Callback接口的process方法。
Test Code
@Test
public void testCreateNode() throws InterruptedException{
final CountDownLatch latch = new CountDownLatch(1);
final AtomicInteger count = new AtomicInteger(0);
Client client = new Client("localhost",8888);
long begin = System.currentTimeMillis();
client.asyncCreate("exist NodeInfo", new CallBack(){
@Override
public void process(int rc, Object response, Object ctx) {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {}
count.incrementAndGet();
latch.countDown();
Assert.assertEquals("I'm context", ctx);
}
}, "I'm context");
//asyncCreate return immediately
Assert.assertTrue((System.currentTimeMillis() - begin) < 1000);
latch.await();
Assert.assertEquals(1, count.intValue());
}
CallBack接口實(shí)現(xiàn)如下:
/**
* 模擬異步回調(diào)接口
*
* @author wqx
*
*/
public interface CallBack{
/**
* 回調(diào)函數(shù)
*
* @param rc: result code
* @param response
* @param ctx : context
*/
void process(int rc, Object response,Object ctx);
}
回調(diào)函數(shù)process中有三個(gè)參數(shù):
- rc: 返回碼
- response:返回值對象
- ctx:上下文對象context
信息載體Packet實(shí)現(xiàn)如下:
/**
* Packet對象:封裝request和response對象
*
* @author wqx
*
*/
public class Packet{
private Object request;
private CallBack cb;
private Object ctx;
private Object response;
private int errorCode;
//苫纤。碉钠。纲缓。
}
Client端實(shí)現(xiàn)
Client的設(shè)計(jì)很簡單,既然需要實(shí)現(xiàn)異步喊废,那么就只要將任務(wù)交給別人做嘍祝高。這里的別人就是一個(gè)叫Worker的工作線程。Client設(shè)計(jì)如下:
public class Client {
private final BlockingQueue<Packet> outgoingQueue = new LinkedBlockingQueue<Packet>();
private Worker worker;
public Client(String host, int port){
worker = new Worker(host,port);
worker.start();
}
/**
*
* @param nodeInfo:模擬節(jié)點(diǎn)信息
* @param cb:回調(diào)函數(shù)
* @param ctx:上下文信息context
*/
public void asyncCreate(String nodeInfo,CallBack cb, Object ctx){
Packet packet = new Packet();
packet.setRequest(nodeInfo);
packet.setCb(cb);
packet.setCtx(ctx);
outgoingQueue.offer(packet);
}
class Worker extends Thread {//...}
}
當(dāng)?shù)谌秸{(diào)用Client的asyncCreate方法后污筷,asyncCreate做的只是將參數(shù)封裝如Packet對象中工闺,并添加入outgoingQueue發(fā)送隊(duì)列,然后立即返回瓣蛀÷襟。可見,這一過程并不會(huì)發(fā)生阻塞揪惦。那么Worker的任務(wù)也就很明確了:不多的從outgoingQueue中取出Packet對象遍搞,然后發(fā)送到Server端,然后接收Server端返回的信息器腋。
class Worker extends Thread {
private String host;
private int port;
public Worker(String host, int port){
this.host = host;
this.port = port;
setDaemon(true);
}
public void run(){
Packet packet = null;
try {
packet = outgoingQueue.take();
} catch (InterruptedException e) {}
Object resp = sendPacket(packet);
Packet p = (Packet)resp;
//執(zhí)行回調(diào)函數(shù)
packet.getCb().process(p.getErrorCode(), p.getResponse(), p.getCtx());
}
public Object sendPacket(Packet packet){
Object resp = null;
try{
Socket socket = new Socket(host,port);
OutputStream oos = socket.getOutputStream();
InputStream ois = socket.getInputStream();
try{
oos.write(JSON.toJSONString(packet).getBytes());
oos.flush();
byte[] buf = new byte[1024];
int recvSize = ois.read(buf);
String text = new String(buf,0,recvSize);
resp = JSON.parseObject(text, Packet.class);
}finally{
if(oos != null){
oos.close();
}
if(ois != null){
ois.close();
}
socket.close();
}
}catch(Exception e){}
return resp;
}
}
2.1.2 Event-Listener
監(jiān)聽器模式:事件源經(jīng)過事件的封裝傳給監(jiān)聽器溪猿,當(dāng)事件源觸發(fā)事件后,監(jiān)聽器接收到事件對象可以回調(diào)事件的方法.這一處理方法我們平時(shí)接觸的非常多了纫塌,Servlet中HttpSessionListener诊县、ServletContextListener等。
這一思路體現(xiàn)的軟件設(shè)計(jì)基本原則是:重要的狀態(tài)變更需要發(fā)送事件并留出監(jiān)聽接口措左。
情景描述
系統(tǒng)每晚定時(shí)進(jìn)行批處理任務(wù)依痊,如果任務(wù)失敗則需要進(jìn)行報(bào)警操作。
Test Code
測試用的批處理任務(wù)類DummyBatchTask怎披,繼承了抽象類BatchTask胸嘁,根據(jù)傳輸入的type值,執(zhí)行不同的操作凉逛,這主要方便測試性宏。
public class DummyBatchTask extends BatchTask {
/*
* 測試類型
*/
private int type;
public DummyBatchTask(String taskName, int type){
super(taskName);
this.type = type;
}
@Override
void process() {
System.out.println(taskName + " begin....");
switch(type){
case 1:
//process normally
break;
case 2:
//exception case
throw new NullPointerException();
}
System.out.println(taskName + " completed....");
}
}
用戶自定義監(jiān)聽類BatchTaskListener:
public class BatchTaskListener implements Listener<BatchTask> {
private DummyWarningService warningService;
public BatchTaskListener(DummyWarningService warningService){
this.warningService = warningService;
}
@Override
public void onSuccess(Event<BatchTask> event) {
warningService.dummyWarning(event.getElement(),"Success");
}
@Override
public void onException(Event<BatchTask> event, Throwable t) {
warningService.dummyWarning(event.getElement(),t);
}
}
下面是測試類EventListenerTest的單元測試用例:
@Test
public void testTaskSuccess() {
DummyWarningService warningService = new DummyWarningService();
BatchTaskListener listener = new BatchTaskListener(warningService);
DummyBatchTask task = new DummyBatchTask("DummyTask-01",1);
BatchTaskManager manager = new BatchTaskManager();
manager.task(task)
.listener(listener)
.process();
Assert.assertEquals("DummyTask-01", warningService.taskName);
Assert.assertEquals("Success", warningService.warningMsg);
}
@Test
public void testTaskException() {
DummyWarningService warningService = new DummyWarningService();
BatchTaskListener listener = new BatchTaskListener(warningService);
DummyBatchTask task = new DummyBatchTask("DummyTask-02",2);
BatchTaskManager manager = new BatchTaskManager();
manager.task(task)
.listener(listener)
.process();
Assert.assertEquals("DummyTask-02", warningService.taskName);
Assert.assertEquals(NullPointerException.class, warningService.warningMsg.getClass());
}
設(shè)計(jì)
涉及到的主要組件如下:
- BatchTask:批處理任務(wù)抽象類,用戶只要繼承該類状飞,實(shí)現(xiàn)其process方法即可毫胜。
- BatchTaskManager:BatchTask實(shí)例化后提交給BatchTaskManager,BatchTaskManager負(fù)責(zé)BatchTask的執(zhí)行和監(jiān)控诬辈。
- Event:事件接口
- EventType:事件類型
- BatchTaskEvent:具體的事件
- Listener:監(jiān)聽接口
(1)首先看下BatchTask的實(shí)現(xiàn):
public abstract class BatchTask {
public String taskName;
public BatchTask(String taskName){
this.taskName = taskName;
}
public String getTaskName(){
return taskName;
}
abstract void process() throws TimeoutException;
}
BatchTask結(jié)構(gòu)很簡單酵使,有一個(gè)成員屬性taskName,具體的任務(wù)類需要繼承BatchTask焙糟,并實(shí)現(xiàn)父類中的process方法口渔。
(2) Listener接口是一個(gè)泛型回調(diào)接口,當(dāng)被監(jiān)聽的實(shí)體對象的狀態(tài)發(fā)生變化的時(shí)候穿撮,就需要觸發(fā)監(jiān)聽器的相應(yīng)方法缺脉。這里的Listener接口中只有兩個(gè)方法瞧哟,監(jiān)聽兩種狀態(tài):成功或異常。這其實(shí)需要更具具體業(yè)務(wù)具體分析枪向。
/**
* Listener接口
*
* @author wqx
*
* @param <T>
*/
public interface Listener<T> {
/**
* success
*
* @param event
*/
public void onSuccess(Event<T> event);
/**
* failure
*
* @param event
* @param t
*/
void onException(Event<T> event, Throwable t);
}
(3) 這里最重要的就是BatchTaskManager,其作為供Client端使用的API咧党,提供了一系列操作BatchTask和listener的接口秘蛔。下面是code:
public class BatchTaskManager {
private BatchTask task;
private Listener<BatchTask> listener;
public BatchTaskManager task(BatchTask task){
this.task = task;
return this;
}
public BatchTaskManager listener(Listener<BatchTask> listener){
this.listener = listener;
return this;
}
public void process(){
if(task == null)
throw new IllegalArgumentException("Task is null");
boolean success = false;
try{
task.process();
success = true;
}catch(Throwable t){
listener.onException(new BatchTaskEvent(task,EventType.ERROR), t);
}
if(success){
listener.onSuccess(new BatchTaskEvent(task,EventType.SUCCESS));
}
}
}
在BatchTaskManager的process方法中實(shí)現(xiàn)了批處理任務(wù)task的執(zhí)行操作,同時(shí)對于任務(wù)狀態(tài)的變更留出了監(jiān)聽接口傍衡。
2.2 Future機(jī)制
Future表示一個(gè)異步計(jì)算的結(jié)果深员,并提供相應(yīng)方法來判斷任務(wù)是否已經(jīng)完成或者取消,以及獲取任務(wù)結(jié)果或取消任務(wù)蛙埂。最熟悉的莫過于java.util.concurrent.Future倦畅。而Promise可以認(rèn)為是一個(gè)可寫的Future,調(diào)用者可以通過Promise標(biāo)記任務(wù)是失敗或成功绣的。下面介紹下這兩種組件叠赐。
2.2.1 Future
Future既然是異步任務(wù)的抽象,那么任務(wù)提交后我們就可以做別的事情了屡江,提交后的任何時(shí)刻都可能詢問任務(wù)是否完成isDone芭概?是否被取消isCancelled?可能隨時(shí)取消任務(wù)cancel惩嘉,也能通過get()方法來獲取任務(wù)結(jié)果罢洲。juc中的FutureTask是Future唯一實(shí)現(xiàn),表示一種抽象的可生產(chǎn)結(jié)果的計(jì)算文黎。FutureTask表示的計(jì)算通過Callable實(shí)現(xiàn)惹苗,Callable相當(dāng)于有返回值的Runnable。這里以Future的實(shí)現(xiàn)類FutureTask為例:
private static ExecutorService exec;
private static int nThreads = Runtime.getRuntime().availableProcessors() * 2;
@Test
public void testFuture() throws InterruptedException, ExecutionException{
FutureTask<String> f = new FutureTask<String>(new Callable<String>(){
@Override
public String call() throws Exception {
String result = "Hello World";
return result;
}
});
exec.submit(f);
Assert.assertTrue(!f.isDone());
Assert.assertEquals("Hello World", f.get());
}
java.util.concurrent.Future的不足
首先從上面的例子中可以看到耸峭,如果想要獲取異步任務(wù)的結(jié)果桩蓉,我們需要調(diào)用Future的get()方法,這個(gè)操作會(huì)阻塞到異步任務(wù)完成為止抓艳。這其實(shí)和異步編程思想是違背的触机。
通常在異步編程中,我們只要明確任務(wù)完成后做什么操作玷或,而不是等待任務(wù)的結(jié)果儡首。
這也是juc中的Future在功能上的很明顯的缺陷。不過偏友,幸運(yùn)的是它的改進(jìn)方案很多蔬胯,比如:Guava中的ListenableFuture;Netty中的自定義的Future位他。兩者的實(shí)現(xiàn)方式類似氛濒,都是通過向Future注冊一個(gè)callback函數(shù)产场,只要異步任務(wù)一完成,則直接調(diào)用該回調(diào)函數(shù)舞竿。以Netty中的Future為例京景,使用過程如下:
//submit a task to thread pool
Future<?> f = exec.submit(new Runnable() { ... });
f.addListener(new FutureListener<?> {
public void operationComplete(Future<?> f) {
//operationComplete to be executed once the task is complete
}
});
2.2.2 Promise
Promise實(shí)際上就是一個(gè)可寫的Future,它是Future的一種改進(jìn)骗奖。什么是可寫的Future确徙?對于2.2.1小節(jié)中介紹的Future來說,從主流程的角度执桌,只有通過cancel一種方式來改變其狀態(tài)鄙皇。那么Promise作為可寫的Future,其對于Future的改變可以通過多個(gè)操作實(shí)現(xiàn)仰挣。一般Promise都會(huì)實(shí)現(xiàn)一系列setXXX方法用以改變Future狀態(tài)伴逸。下面以Netty中的Promise為例進(jìn)行說明,接口如下:
/**
* Special {@link Future} which is writable.
*/
public interface Promise<V> extends Future<V> {
Promise<V> setSuccess(V result);
Promise<V> setFailure(Throwable cause);
@Override
Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
@Override
Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
@Override
Promise<V> await() throws InterruptedException;
@Override
Promise<V> awaitUninterruptibly();
}
可以看到Promise中的setSuccess和setFailure兩個(gè)方法,通過這兩個(gè)方法就可以從外部改變其狀態(tài)膘壶。此外還可以看到添加和移除Listener的操作接口错蝴,這其實(shí)就是對java.util.concurrent.Future的不足的改進(jìn),正如上一小節(jié)所說的那樣香椎。下面截取Netty中Bootstrap中的connect(SocketAddress remoteAddress)方法的一部分實(shí)現(xiàn):
if (regFuture.isDone()) {
doConnect0(regFuture, channel, remoteAddress, localAddress, promise);
} else {
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
doConnect0(regFuture, channel, remoteAddress, localAddress, promise);
}
});
}
Bootstrap發(fā)起連接涉及到兩個(gè)操作:(1)register漱竖,(2)connect。在Netty中所有的操作都是異步的畜伐,上面代碼中的regFuture代表一個(gè)register異步操作結(jié)果馍惹。如果注冊操作完成了,即:regFuture.isDone() = true玛界,那么進(jìn)行連接操作万矾,具體由doConnect0實(shí)現(xiàn)。否則慎框,向regFuture添加一個(gè)監(jiān)聽器良狈,只有當(dāng)register操作完成后,才會(huì)進(jìn)行doConnect0操作笨枯。
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
doConnect0(regFuture, channel, remoteAddress, localAddress, promise);
}
});
Netty中出現(xiàn)大量的利用GenericFutureListener機(jī)制代替Future的get方法薪丁。原因很簡單:異步操作的時(shí)間是無法預(yù)測的,如果不設(shè)置超時(shí)時(shí)間馅精,即用get()方法严嗜,那么會(huì)導(dǎo)致該線程被長時(shí)間阻塞。而設(shè)置了超時(shí)時(shí)間洲敢,即用get(timeout漫玄,TimeUnit)方法,這個(gè)超時(shí)的時(shí)間由無法精準(zhǔn)預(yù)測。此時(shí)睦优,利用異步通知機(jī)制回調(diào)GenericFutureListener是最佳方案渗常。
3 異步編程的優(yōu)勢和不足
異步編程的優(yōu)勢
異步編程的優(yōu)勢是顯而易見的,當(dāng)我們的業(yè)務(wù)邏輯中有一部分的任務(wù)屬于耗時(shí)型Task的時(shí)候汗盘,可以將這樣的任務(wù)分發(fā)給別的線程進(jìn)行處理皱碘,當(dāng)前線程可以繼續(xù)進(jìn)行工作。這可以極大提升了程序的性能隐孽。此外尸执,當(dāng)我們的業(yè)務(wù)中存在對外界的依賴(這里的“對外界的依賴”指的是:比如網(wǎng)絡(luò)連接的建立、SQL連接的建立缓醋、和外部系統(tǒng)的通信等等),異步實(shí)現(xiàn)的方案可以有效绊诲、便捷的處理各種失敗送粱、異常的情況,增強(qiáng)程序的健壯性掂之。
異步編程的不足
首先抗俄,對于習(xí)慣于順序編程的人來說,任務(wù)的順序執(zhí)行更加有條理性世舰,Task1到TaskN一步一步執(zhí)行动雹,每一步的結(jié)果作為下面步驟的輸入,最終得到我們想要的結(jié)果跟压。而異步編程模型中胰蝠,如何在主流程中獲取異步結(jié)果是一個(gè)問題。此外震蒋,異步編程通常涉及到多線程的并發(fā)情況茸塞,線程安全方面需要做保證,這無疑增加了編程的復(fù)雜度查剖。
4 總結(jié)
本文介紹了幾種常見的異步編程模型钾虐,通過簡要的代碼實(shí)現(xiàn)了其主要原理特性。并對異步編程模型的優(yōu)缺點(diǎn)進(jìn)行了簡單概括笋庄。
- Callback機(jī)制:Callback的實(shí)現(xiàn)相對簡單效扫,但是需要額外注意多線程環(huán)境中的安全性問題。適用于回調(diào)函數(shù)中僅需要完成簡單任務(wù)的情況直砂。
- Future機(jī)制:Future機(jī)制及其衍生的Promise可以實(shí)現(xiàn)在主流程中獲取異步結(jié)果菌仁,對于復(fù)雜的異步任務(wù)有更加良好的可控性,這點(diǎn)優(yōu)于Callback哆键。