前言
在我們擴(kuò)展scg時(shí),獲取requestbody也是一個(gè)挺常見(jiàn)的需求了吞杭,比如記錄日志盏浇,我們要獲取請(qǐng)求體里面的內(nèi)容。在HTTP協(xié)議中芽狗,服務(wù)器接收到客戶端的請(qǐng)求時(shí)绢掰,請(qǐng)求體(RequestBody)通常是以流的形式傳輸?shù)摹_@個(gè)流在設(shè)計(jì)上是只讀且不可重復(fù)讀取的译蒂。即request body只能讀取一次曼月,但我們很多時(shí)候是更希望這個(gè)requestbody可以被多次讀取,那我們今天就來(lái)聊下這個(gè)話題
實(shí)現(xiàn)思路
通常我們會(huì)實(shí)現(xiàn)一個(gè)全局過(guò)濾器柔昼,并將過(guò)濾器的優(yōu)先級(jí)調(diào)到最高哑芹。
該過(guò)濾器調(diào)到最高的原因是防止一些內(nèi)置過(guò)濾器優(yōu)先讀取到requestbody,會(huì)導(dǎo)致我們這個(gè)過(guò)濾器讀取到requestbody捕透,就已經(jīng)報(bào)body只能讀取一次的異常聪姿。
異常如下
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: Only one connection receive subscriber allowed.
Caused by: java.lang.IllegalStateException: Only one connection receive subscriber allowed.
在這個(gè)過(guò)濾器里面我們要實(shí)現(xiàn)的功能如下
- 將原有的request請(qǐng)求中的body內(nèi)容讀出來(lái)
- 使用ServerHttpRequestDecorator這個(gè)請(qǐng)求裝飾器對(duì)request進(jìn)行包裝,重寫getBody方法
- 將包裝后的請(qǐng)求放到過(guò)濾器鏈中傳遞下去
示例
@RequiredArgsConstructor
public class RequestBodyParamsFetchGlobalFilter implements Ordered, GlobalFilter {
private final GwCommonProperty gwCommonProperty;
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
if (isSkipFetchRequestBodyParams(exchange)) {
return chain.filter(exchange);
} else {
return DataBufferUtils.join(exchange.getRequest().getBody())
.flatMap(dataBuffer -> {
DataBufferUtils.retain(dataBuffer);
Flux<DataBuffer> cachedFlux = Flux
.defer(() -> Flux.just(dataBuffer.slice(0, dataBuffer.readableByteCount())));
exchange.getAttributes().put(REQUEST_BODY_PARAMS_ATRR_NAME, RouteUtil.getRequestBodyParams(exchange));
ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(
exchange.getRequest()) {
@Override
public Flux<DataBuffer> getBody() {
return cachedFlux;
}
};
return chain.filter(exchange.mutate().request(mutatedRequest).build());
});
}
}
private boolean isSkipFetchRequestBodyParams(ServerWebExchange exchange){
if(!gwCommonProperty.isFetchRequestBodyParams()){
return true;
}
if(exchange.getRequest().getHeaders().getContentType() == null && !HttpMethod.POST.name().equalsIgnoreCase(Objects.requireNonNull(exchange.getRequest().getMethod()).name())){
return true;
}else{
return false;
}
}
@Override
public int getOrder() {
return Ordered.HIGHEST_PRECEDENCE;
}
}
大家如果搜索一下乙嘀,scg獲取請(qǐng)求體末购,有很大一部分都是這種寫法。這種寫法基本上是可以滿足我們的需求虎谢。但是在請(qǐng)求壓力比較大的情況下盟榴,可能會(huì)堆外內(nèi)存溢出問(wèn)題
reactor.netty.ReactorNetty$InternalNettyException: io.netty.util.internal.OutOfDirectMemoryError:failed to allocate
有沒(méi)有更好的實(shí)現(xiàn)方式
我這邊使用的springcloud版本是Hoxton.SR3,在這個(gè)版本我發(fā)現(xiàn)了一個(gè)挺好玩的過(guò)濾器
org.springframework.cloud.gateway.filter.AdaptCachedBodyGlobalFilter
見(jiàn)名之意婴噩,這就是一個(gè)自適應(yīng)的緩存body全局過(guò)濾器擎场。這個(gè)過(guò)濾器的代碼如下
public class AdaptCachedBodyGlobalFilter
implements GlobalFilter, Ordered, ApplicationListener<EnableBodyCachingEvent> {
private ConcurrentMap<String, Boolean> routesToCache = new ConcurrentHashMap<>();
/**
* Cached request body key.
*/
@Deprecated
public static final String CACHED_REQUEST_BODY_KEY = CACHED_REQUEST_BODY_ATTR;
@Override
public void onApplicationEvent(EnableBodyCachingEvent event) {
this.routesToCache.putIfAbsent(event.getRouteId(), true);
}
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
// the cached ServerHttpRequest is used when the ServerWebExchange can not be
// mutated, for example, during a predicate where the body is read, but still
// needs to be cached.
ServerHttpRequest cachedRequest = exchange
.getAttributeOrDefault(CACHED_SERVER_HTTP_REQUEST_DECORATOR_ATTR, null);
if (cachedRequest != null) {
exchange.getAttributes().remove(CACHED_SERVER_HTTP_REQUEST_DECORATOR_ATTR);
return chain.filter(exchange.mutate().request(cachedRequest).build());
}
//
DataBuffer body = exchange.getAttributeOrDefault(CACHED_REQUEST_BODY_ATTR, null);
Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);
if (body != null || !this.routesToCache.containsKey(route.getId())) {
return chain.filter(exchange);
}
return ServerWebExchangeUtils.cacheRequestBody(exchange, (serverHttpRequest) -> {
// don't mutate and build if same request object
if (serverHttpRequest == exchange.getRequest()) {
return chain.filter(exchange);
}
return chain.filter(exchange.mutate().request(serverHttpRequest).build());
});
}
@Override
public int getOrder() {
return Ordered.HIGHEST_PRECEDENCE + 1000;
}
}
看到這個(gè)源碼,是不是有種豁然開(kāi)朗的感覺(jué)几莽,它的實(shí)現(xiàn)套路不就是我們上文說(shuō)的實(shí)現(xiàn)思路嗎迅办,根據(jù)源碼,我們僅需發(fā)布EnableBodyCachingEvent事件章蚣,并將要監(jiān)聽(tīng)的routeId送入EnableBodyCachingEvent站欺,剩下緩存requestbody的事情,就交給AdaptCachedBodyGlobalFilter來(lái)幫我們處理
示例
**
* @see AdaptCachedBodyGlobalFilter
*/
@Configuration
@AutoConfigureAfter(GatewayAutoConfiguration.class)
@RequiredArgsConstructor
public class RequestBodyCacheConfig implements ApplicationContextAware, CommandLineRunner {
private final RouteLocator routeDefinitionRouteLocator;
private ApplicationContext applicationContext;
@Override
public void run(String... args) throws Exception {
List<Signal<Route>> routes = routeDefinitionRouteLocator.getRoutes().materialize()
.collect(Collectors.toList()).block();
assert routes != null;
routes.forEach(routeSignal -> {
if (routeSignal.get() != null) {
Route route = routeSignal.get();
System.out.println(route.getId());
publishEnableBodyCachingEvent(route.getId());
}
});
}
@EventListener
public void refreshRoutesEvent(RefreshRoutesEvent refreshRoutesEvent){
if(refreshRoutesEvent.getSource() instanceof NewRouteId){
publishEnableBodyCachingEvent(((NewRouteId) refreshRoutesEvent.getSource()).getRouteId());
}else{
routeDefinitionRouteLocator.getRoutes().subscribe(route -> {
publishEnableBodyCachingEvent(route.getId());
});
}
}
private void publishEnableBodyCachingEvent(String routeId){
EnableBodyCachingEvent enableBodyCachingEvent = new EnableBodyCachingEvent(this, routeId);
applicationContext.publishEvent(enableBodyCachingEvent);
}
public void addRouteRouteDefinition(RouteDefinition routeDefinition){
NewRouteId source = NewRouteId.builder().routeId(routeDefinition.getId()).source(this).build();
applicationContext.publishEvent(new RefreshRoutesEvent(source));
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
}
這個(gè)代碼的意思就是在項(xiàng)目啟動(dòng)時(shí),遍歷一下路由矾策,發(fā)送EnableBodyCachingEvent磷账。并再監(jiān)聽(tīng)RefreshRoutesEvent 事件,當(dāng)有路由新增時(shí)蝴韭,再次發(fā)送EnableBodyCachingEvent事件够颠。其業(yè)務(wù)語(yǔ)義是讓每個(gè)route都能被AdaptCachedBodyGlobalFilter處理熙侍,并緩存requestbody
發(fā)布EnableBodyCachingEvent事件的核心代碼如下
private void publishEnableBodyCachingEvent(String routeId){
EnableBodyCachingEvent enableBodyCachingEvent = new EnableBodyCachingEvent(this, routeId);
applicationContext.publishEvent(enableBodyCachingEvent);
}
做完上述的事情后榄鉴,我們僅需在我們需要獲取requestbody的地方,寫下如下代碼即可
String bodyContent = null;
DataBuffer body = exchange.getAttributeOrDefault(CACHED_REQUEST_BODY_ATTR, null);
if(body != null){
bodyContent = body.toString(StandardCharsets.UTF_8);
}
總結(jié)
框架也是不斷在演進(jìn)蛉抓,因此對(duì)于我們?nèi)粘J褂玫目蚣芮斐荆喽嚓P(guān)注下,有現(xiàn)成的輪子巷送,就使用現(xiàn)成的輪子驶忌,現(xiàn)成輪子滿不足不了,先看下該輪子是否有預(yù)留擴(kuò)展點(diǎn)笑跛,如果沒(méi)有付魔,我們?cè)倏紤]自己制造輪子