好久沒有寫技術(shù)文章了吗货,工作上了解到RxJava框架適合我們的某些業(yè)務(wù)場景维雇,所以嘗試著RxJava整合到我們的業(yè)務(wù)代碼肥橙,以解決部分問題栽燕。
很粗糙很狂野的整合進(jìn)來暂氯,沒有細(xì)致的琢磨框架潮模,以觀察者模式實現(xiàn)的框架配合以鏈?zhǔn)秸{(diào)用的代碼風(fēng)格,看起來確實稍微‘高大上’一些痴施。
業(yè)務(wù)場景
當(dāng)下主流的微服務(wù)架構(gòu)體系中擎厢,各個微服務(wù)的業(yè)務(wù)之間相互獨立,比如商品晾剖、訂單锉矢、客戶等等,它們即可獨立提供服務(wù)齿尽,也可與其他服務(wù)協(xié)同沽损,于是便涉及到業(yè)務(wù)聚合,比如前端展示一個商品詳情頁會涉及到商品信息循头,價格绵估,評價等等的數(shù)據(jù)炎疆,如下。
比如用戶進(jìn)入商品詳情頁国裳,可能會發(fā)生什么形入?用戶需要看到商品的詳細(xì)信息,包括價格缝左,屬性等等亿遂,同時還需要看到優(yōu)惠券信息、以及相關(guān)的評價渺杉。(這里是假設(shè)一個請求需要處理這么多事)
// 偽代碼如下
GoodsBean goodsBean = goodsFeignService.queryGoods();
List<CouponBean> couponBeans = couponFeignService.queryCoupon();
List<EvaluationBean> evaluations = evaluationFeignService.queryEvaluation();
// 真實的業(yè)務(wù)場景往往更加復(fù)雜
return join.map(goodsBean, couponBeans, evaluations);
這么寫毫無疑問也是可以把業(yè)務(wù)跑起來蛇数,但是作為一名優(yōu)秀的互聯(lián)網(wǎng)底層搬磚工作者,除了完成必要的工作是越,盡可能希望把代碼寫的優(yōu)雅一些耳舅,就像砌墻的時候砌地更光滑,同時還能稍微提高一些性能倚评?
解決方案
RxJava可以很好地解決該業(yè)務(wù)訴求浦徊,利用異步并發(fā)提升一點性能,將應(yīng)用中的無依賴操作轉(zhuǎn)為異步并發(fā)處理天梧。無依賴(或已完成依賴)操作比如:
- 數(shù)據(jù)庫讀取
- feign遠(yuǎn)程RPC調(diào)用
- 第三方開放平臺接口調(diào)用
- ……
public abstract class AbstractObserver implements Observer<JoinQueryParameter> {
/**
* 日志
*/
public final static Logger logger = LoggerFactory.getLogger(AbstractObserver.class);
private ResultMap resultMap;
private JoinQueryParameter joinQueryParameter;
@Override
public void onSubscribe(Disposable disposable) {}
@Override
public void onNext(JoinQueryParameter joinQueryParameter) {
resultMap = this.queryOriginalData(joinQueryParameter);
update(joinQueryParameter);
}
@Override
public void onError(Throwable throwable) {
logger.error("查詢信息發(fā)生異常", throwable);
}
@Override
public void onComplete() {
this.onCompleteEvent(joinQueryParameter, resultMap);
}
/**
* 查詢初始化信息
* @param JoinQueryParameter
* @return
*/
protected abstract ResultMap queryOriginalData(JoinQueryParameter joinQueryParameter);
/**
* 加載數(shù)據(jù)完成后的事件
*
* @param JoinQueryParameter
* @param resultMap
*/
protected abstract void onCompleteEvent(JoinQueryParameter joinQueryParameter, ResultMap resultMap);
}
public abstract class AbstractObservableStrategy extends AbstractObserver {
/**
*
* @param joinQueryParameter
* @return
*/
public AbstractObservableStrategy create(final JoinQueryParameter joinQueryParameter) {
Observable.create(new ObservableOnSubscribe<JoinQueryParameter>(){
@Override
public void subscribe(ObservableEmitter<JoinQueryParameter> e) throws Exception {
e.onNext(joinQueryParameter);
e.onComplete();
}
}).subscribe(this);
return this;
}
/**
* 查詢初始化依賴數(shù)據(jù)
* @param joinQueryParameter
* @return
*/
protected abstract ResultMap queryOriginalData(JoinQueryParameter joinQueryParameter);
/**
* 加載數(shù)據(jù)完成后的事件
*
* @param joinQueryParameter
* @param resultMap
*/
protected abstract void onCompleteEvent(JoinQueryParameter joinQueryParameter, ResultMap resultMap);
/**
* 增加聚合查詢方法
*
* @param joinQueryParameter
* @param iQueryData
* @return
*/
protected Observable addObservable(JoinQueryParameter joinQueryParameter, final IQueryData iQueryData) {
return Observable.just(joinQueryParameter)
.flatMap(new Function<JoinQueryParameter, ObservableSource<ResultMap>>() {
@Override
public ObservableSource apply(JoinQueryParameter joinQueryParameter) throws Exception {
return Observable.just(iQueryData.queryData(joinQueryParameter));
}
}).subscribeOn(Schedulers.io())
;
}
}
public interface IQueryData {
/**
* 查詢列表
* @param joinQueryParameter
* @return
*/
ResultMap queryData(JoinQueryParameter joinQueryParameter);
}
public class DefaultObservbleStrategy extends AbstractObservableStrategy {
/**
* 日志
*/
public final static Logger logger = LoggerFactory.getLogger(DefaultObservbleStrategy.class);
/**
* 查詢初始化信息
* @param joinQueryParameter
* @return
*/
@Override
protected ResultMap queryOriginalData(JoinQueryParameter joinQueryParameter) {
return new GoodsQueryDataImpl().queryData(joinQueryParameter);
}
/**
* 加載數(shù)據(jù)完成后的事件
*
* @param joinQueryParameter
* @param originalData
*/
@Override
protected void onCompleteEvent(JoinQueryParameter joinQueryParameter, ResultMap originalData) {
ResultMap resultMap = (ResultMap)Observable.zip(
Observable.just(originalData),
addObservable(joinQueryParameter, new QueryCouponImpl()),
addObservable(joinQueryParameter, new QueryEvaluationImpl()),
new Function5<ResultMap, ResultMap, ResultMap, ResultMap, ResultMap, ResultMap>() {
@Override
public ResultMap apply(ResultMap goodsMap, ResultMap couponMap, ResultMap couponEvaluation) throws Exception {
return join.map(goodsMap, couponMap, couponEvaluation);
}
}
).blockingLast();
DefaultObservbleStrategy.this.setResultMap(resultMap);
}
}
以上是實現(xiàn)時寫的大致骨架盔性,由于具體業(yè)務(wù)遠(yuǎn)遠(yuǎn)比此文列舉的需求要復(fù)雜,故而整的略微復(fù)雜了呢岗,但是找到關(guān)鍵的節(jié)點代碼即可纯出。
Observable.zip(
Observable.just(joinQueryParameter)
.flatMap(new Function<JoinQueryParameter, ObservableSource<ResultMap>>() {
@Override
public ObservableSource apply(JoinQueryParameter joinQueryParameter) throws Exception {
return Observable.just(iQueryData.queryData(joinQueryParameter));
}
}).subscribeOn(Schedulers.io()),
Observable.just(joinQueryParameter)
.flatMap(new Function<JoinQueryParameter, ObservableSource<ResultMap>>() {
@Override
public ObservableSource apply(JoinQueryParameter joinQueryParameter) throws Exception {
return Observable.just(iQueryData.queryData(joinQueryParameter));
}
}).subscribeOn(Schedulers.io())敷燎,
new Function<ResultMap, ResultMap, ResultMap>() {
@Override
public ResultMap apply(ResultMap goodsMap, ResultMap couponMap, ResultMap couponEvaluation) throws Exception {
return join.map(goodsMap, couponMap, couponEvaluation);
}
}
).blockingLast();
總而言之,微服務(wù)盛行后箩言,服務(wù)與服務(wù)之間的數(shù)據(jù)聚合愈發(fā)復(fù)雜硬贯,讓人有點頭疼。