如何實現(xiàn)異步調(diào)用
同步調(diào)用是指調(diào)用方會被一直阻塞简珠, 直到調(diào)用方收到結(jié)果外盯。異步是指調(diào)用方不回阻塞。傳統(tǒng)的socket網(wǎng)絡(luò)請求就是一個同步調(diào)用哟绊。那么如何實現(xiàn)一個異步調(diào)用因妙?
首先設(shè)計自己的異步調(diào)用的api
如果只想要進(jìn)行異步調(diào)用痰憎,那只需一個線程池或者一個線程不斷執(zhí)行從main線程添加的任務(wù)即可票髓,可以寫成類似如下代碼:
public class ThreadPoolTest2 {
static List<Event> eventList = Collections.synchronizedList(new ArrayList<>());
interface Event<V> {
V doSomething();
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
new Thread(() -> {
while (true) {
for (int i = 0; i < eventList.size(); i++) {
Event event = eventList.get(i);
Object result = event.doSomething();
eventList.remove(event);
}
}
}, "thread-1").start();
doSomethingAsync(() -> {
System.out.println(Thread.currentThread().getName());
return 1;
});
System.out.println(Thread.currentThread().getName());
}
private static <T> void doSomethingAsync(Event<T> event) {
eventList.add(event);
}
}
如此一來確實可以異步執(zhí)行部分代碼,然而只是這樣處理無法掌握異步任務(wù)的執(zhí)行結(jié)果铣耘,所以需要doSomethingAsync
函數(shù)能有一個返回值來獲得異步任務(wù)的執(zhí)行結(jié)果洽沟。其中java提供Future
接口完美契合,Future
一般用做為異步調(diào)用的返回值蜗细,他的接口設(shè)計如下:
Future
是在juc
下的一個接口裆操,功能是對于具體的Runnable
或者Callable
任務(wù)的執(zhí)行結(jié)果進(jìn)行取消、查詢是否完成炉媒、獲取結(jié)果踪区。最關(guān)鍵的是通過get
方法獲取執(zhí)行結(jié)果,該方法會阻塞直到任務(wù)返回結(jié)果吊骤。
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
Future
配合線程池的用法如下:
ExecutorService executor = Executors.newCachedThreadPool();
//一個線程池缎岗,做了一些操作后,返回結(jié)果
Future<Integer> result = executor.submit(() -> {
//do something
return 1;
});
然而線程池功能很豐富導(dǎo)致源碼量也很多且復(fù)雜白粉,如果只想要最簡單的異步功能传泊,并不需要那么多代碼,可以只考慮實現(xiàn)Future的代碼(不考慮性能)鸭巴,類似如下:
public class AsyncDemo{
static List<EventFutureImp> eventList = Collections.synchronizedList(new ArrayList<>());
interface Event<V> {
V doSomething();
}
interface EventFuture<V> extends Event<V>, Future<V> {
}
static class EventFutureImp<V> implements EventFuture<V> {
Event<V> event;
CountDownLatch countDownLatch;
V result;
public EventFutureImp(Event<V> event) {
this.event = event;
countDownLatch = new CountDownLatch(1);
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
throw new UnsupportedOperationException();
}
@Override
public boolean isCancelled() {
throw new UnsupportedOperationException();
}
@Override
public boolean isDone() {
throw new UnsupportedOperationException();
}
@Override
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
throw new UnsupportedOperationException();
}
//-----以上方法不提供實現(xiàn)眷细,只實現(xiàn)最簡單的功能-------------
@Override
public V get() throws InterruptedException, ExecutionException {
countDownLatch.await();
return result;
}
@Override
public V doSomething() {
return event.doSomething();
}
public void done(V result) {
this.result = result;
countDownLatch.countDown();
}
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
new Thread(() -> {
while (true) {
for (int i = 0; i < eventList.size(); i++) {
EventFutureImp event = eventList.get(i);
Object result = event.doSomething();
event.done(result);
eventList.remove(event);
}
}
}, "thread-1").start();
Future<Integer> future = doSomethingAsync(() -> {
System.out.println(Thread.currentThread().getName());
return 1;
});
System.out.println(Thread.currentThread().getName());
System.out.println("result :" + future.get());
}
private static <T> Future<T> doSomethingAsync(Event<T> event) {
EventFutureImp<T> eventFuture = new EventFutureImp<>(event);
eventList.add(eventFuture);
return eventFuture;
}
}
邏輯圖如下所示:
這樣設(shè)計的一個關(guān)鍵是利用CountDownLatch的特性使得future.get()在未得到結(jié)果之前是阻塞的,而得到結(jié)果后又馬上釋放鹃祖。
網(wǎng)絡(luò)的異步調(diào)用
以上的異步調(diào)用其實是單進(jìn)程內(nèi)的異步調(diào)用溪椎,如果要實現(xiàn)一個網(wǎng)絡(luò)的異步調(diào)用,那又比之前復(fù)雜了一些。
1. 自定義協(xié)議的異步調(diào)用
因為服務(wù)器的收發(fā)行為是可以自定義的校读,當(dāng)發(fā)送的請求并不是先到達(dá)的回包又或者不是每個請求都有回包奔害,這時候主要的問題在于,異步的發(fā)送一個網(wǎng)絡(luò)請求后地熄,并不知道請求的返回應(yīng)該對應(yīng)哪個請求华临。其實也可以通過自定義網(wǎng)絡(luò)協(xié)議設(shè)計來解決這個問題:
需要在雙方協(xié)議中設(shè)置requestId和responseId,由調(diào)用方(server)指定requestId并且接受方收到請求后把responseId設(shè)置為和requestId一樣的值端考,這樣異步請求的調(diào)用方也能輕松得到與請求對應(yīng)的返回包雅潭。
具體可以這樣做:
- 首先定義一個map:
Map<String, Future> map = new ConcurrentHashMap<>();
- 發(fā)送的的時候put:
EventFutureImp future= new EventFutureImp ();
map.put(requestId, future);
//發(fā)送
channel.writeAndFlush(request);
- 在得到返回的時候remove:
EventFutureImp future = map.remove(responseId);
if (future!= null) {
future .done(response);
}
關(guān)鍵在于:請求的requestId和回包responseId是一樣的,并且不同請求的requestId各自不同却特。
2. 其他網(wǎng)絡(luò)協(xié)議異步調(diào)用
要為現(xiàn)在已有的一些數(shù)據(jù)庫或者服務(wù)器實現(xiàn)一個異步調(diào)用又該如何做呢扶供。其實很多協(xié)議并不會出現(xiàn)發(fā)送的請求并不是先到達(dá)的回包又或者不是每個請求都有回包這種情況,以http服務(wù)器為例裂明,http沒有類似requestId這樣的字段但對同一個http連接http服務(wù)器總是順序返回請求椿浓,那么如果要自己實現(xiàn)一個http異步請求的話,就可以按如下步驟:
- 首先定義一個queue:
static Queue<Future> queue = new ConcurrentLinkedDeque<>();
- 發(fā)送的的時候add:
EventFutureImp future= new EventFutureImp ();
queue.add(future);
//發(fā)送
...send()
- 在得到返回的時候poll:
EventFutureImp future = queue.poll(responseId);
if (future!= null) {
future .done(response);
}
如此闽晦,由于http的順序返回請求特性扳碍,異步請求的結(jié)果也不會發(fā)生錯亂。
總結(jié)
異步的調(diào)用肯定能提升外部調(diào)用的速度仙蛉,有時候能解決性能上的瓶頸笋敞,把資源利用最大化。但也增加了更多的臨時對象以及線程切換的開銷荠瘪,同時也比同步編程模型更復(fù)雜夯巷,難調(diào)試。
參考博客: Java并發(fā)編程:Callable哀墓、Future和FutureTask