Configuration file
spring:
  kafka:
    listener:
      # Consumption mode: single (one record at a time, the default) or batch
      type: single
    # Cluster address
    #bootstrap-servers: kafka:XXX
    bootstrap-servers: 192.168.XXX.XXX:XXX
    # Producer configuration
    producer:
      # Number of retries
      retries: 3
      # Acknowledgement level
      # acks=0: the send is considered successful as soon as the message is handed to Kafka
      # acks=1: the send is considered successful once the leader partition has received and written the message
      # acks=all: the send is considered successful only after the leader and its follower replicas have replicated the message
      acks: all
      # Maximum batch size, in bytes
      batch-size: 4096
      # Total memory (in bytes) the producer may use to buffer records waiting to be sent
      buffer-memory: 33554432
      # Client ID
      client-id: hello-kafka
      # Key serializer class
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Value serializer class
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Message compression: none, lz4, gzip, snappy (default: none)
      compression-type: gzip
      properties:
        partitioner:
          # Custom partitioner class
          class: com.rd.gateway.config.MyPartitioner
        linger:
          # Send delay: the producer flushes a batch once it reaches batch-size or once linger.ms has elapsed, whichever comes first
          ms: 1000
        max:
          block:
            # Maximum time (ms) KafkaProducer.send() and partitionsFor() may block
            ms: 6000
    # Consumer configuration
    consumer:
      # Default consumer group
      group-id: GatewayGroup
      # Auto-commit offsets (default: true)
      enable-auto-commit: false
      # Auto-commit interval, in ms
      auto-commit-interval: 1000
      # Maximum number of records returned per poll
      max-poll-records: 100
      # Key deserializer class
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Value deserializer class
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # What to do when Kafka has no initial offset, or the committed offset is out of range
      # earliest: reset to the smallest offset in the partition
      # latest: reset to the newest offset (only consume data produced after the reset)
      # none: throw an exception if any partition has no committed offset
      auto-offset-reset: latest
      # properties:
      #   interceptor:
      #     classes: com.example.demo.service.MyConsumerInterceptor
      #   session:
      #     timeout:
      #       # Session timeout: if the consumer sends no heartbeat within this time, a rebalance is triggered
      #       ms: 120000
      #   request:
      #     timeout:
      #       # Request timeout
      #       ms: 120000

# Explicitly point to the logback configuration file; because of lookup precedence it is safer to set this,
# so a logback file pulled in by another dependency does not override the custom one
logging:
  config: classpath:logback.xml
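The post only produces to Kafka; for completeness, here is a minimal sketch of a listener that could read the gateway log topic with the settings above. The topic gatewayLogs and group GatewayGroup come from this configuration; the class and method names are illustrative assumptions, not part of the original project.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class GatewayLogConsumer {

    // "gatewayLogs" is the topic used by the logback Kafka appender below;
    // "GatewayGroup" matches spring.kafka.consumer.group-id
    @KafkaListener(topics = "gatewayLogs", groupId = "GatewayGroup")
    public void onGatewayLog(ConsumerRecord<String, String> record) {
        // Each record value is one serialized log line produced by the gateway filters
        System.out.println("offset=" + record.offset() + " value=" + record.value());
    }
}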
logback.xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<!-- Log file path -->
<property name="logPath" value="C://Users//wangw//Desktop//aliyun-tts//"/>
<!-- Log file name -->
<property name="logName" value="sp-ipage-test"/>
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level --- [%thread] %logger Line:%-3L - %msg%n</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<!-- Rolling log file -->
<appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<!-- Path and name of the log file currently being written -->
<file>${logPath}${logName}.log</file>
<!-- Log output pattern -->
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level --- [%thread] %logger Line:%-3L - %msg%n</pattern>
<charset>UTF-8</charset>
</encoder>
<!-- Rolling policy: roll over by date and by size -->
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<!-- Archived log file name pattern -->
<fileNamePattern>${logPath}${logName}-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
<maxFileSize>100MB</maxFileSize>
</timeBasedFileNamingAndTriggeringPolicy>
<!-- Number of days to keep log files -->
<maxHistory>10</maxHistory>
</rollingPolicy>
</appender>
<appender name="KAFKA_APPENDER" class="com.github.danielwegener.logback.kafka.KafkaAppender">
<encoder class="com.github.danielwegener.logback.kafka.encoding.PatternLayoutKafkaMessageEncoder">
<layout class="net.logstash.logback.layout.LogstashLayout">
<!-- When enabled, the message includes logback context information such as the hostname -->
<includeContext>true</includeContext>
<!-- Whether to include caller data (where the log statement came from) -->
<includeCallerData>true</includeCallerData>
<fieldNames class="net.logstash.logback.fieldnames.ShortenedFieldNames"/>
</layout>
<charset>UTF-8</charset>
</encoder>
<!-- Kafka topic: keep it consistent with the topic configured on the consuming side, otherwise the messages will not be picked up -->
<topic>gatewayLogs</topic>
<!-- Keying (partitioning) strategy -->
<keyingStrategy class="com.github.danielwegener.logback.kafka.keying.RoundRobinKeyingStrategy"/>
<!-- Kafka delivery strategy; logback-kafka-appender provides two:
asynchronous delivery (AsynchronousDeliveryStrategy)
blocking delivery (BlockingDeliveryStrategy)
-->
<deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy"/>
<!-- bootstrap.servers is the Kafka broker address; use the server's real IP address, not localhost -->
<producerConfig>bootstrap.servers=192.168.XXX.XXX:XXX</producerConfig>
</appender>
<appender name="kafkaAppenderAsync" class="ch.qos.logback.classic.AsyncAppender">
<appender-ref ref="KAFKA_APPENDER"/>
</appender>
<!-- Send behaviour logs to Kafka -->
<logger name="KafkaPipeline" level="INFO">
<appender-ref ref="kafkaAppenderAsync"/>
</logger>
<!-- dev/test environments: set different log levels for specific packages -->
<springProfile name="dev,test">
<logger name="org.springframework.web" level="ERROR">
</logger>
<logger name="org.springboot.sample" level="ERROR">
</logger>
<logger name="com.ipage.work" level="INFO">
</logger>
</springProfile>
<!-- Production environment -->
<springProfile name="prod">
<logger name="org.springframework.web" level="ERROR">
</logger>
<logger name="org.springboot.sample" level="ERROR">
</logger>
<logger name="com.ipage.work" level="INFO">
</logger>
</springProfile>
<logger name="org.springframework.web" level="INFO"/>
<logger name="org.springboot.sample" level="TRACE"/>
<!-- Base (root) log level -->
<root level="INFO">
<appender-ref ref="FILE"/>
<appender-ref ref="CONSOLE"/>
<appender-ref ref="kafkaAppenderAsync"/>
</root>
</configuration>
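The KafkaPipeline logger defined above is looked up by name, so application code can push a specific line straight to the Kafka appender without relying on the root logger. A minimal sketch (the class and method names here are only illustrative):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaPipelineDemo {
    // Looks up the <logger name="KafkaPipeline"> declared in logback.xml
    private static final Logger KAFKA_LOG = LoggerFactory.getLogger("KafkaPipeline");

    public void record(String payload) {
        // Routed through kafkaAppenderAsync -> KAFKA_APPENDER -> topic "gatewayLogs"
        KAFKA_LOG.info(payload);
    }
}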
Common constants
//Log prefix
String GATEWAY_LOG_CS = "gatewayLog參數(shù)#:";
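The filters below reference this prefix as CommonConstant.GATEWAY_LOG_CS. The constants class itself is not shown in the original post, so the following is just a minimal sketch of such a holder:

public class CommonConstant {
    // Prefix prepended to every gateway log line so downstream consumers can filter on it
    public static final String GATEWAY_LOG_CS = "gatewayLog參數(shù)#:";
}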
Entity class
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
@Data
public class LogUserInfo {
@ApiModelProperty("用戶id")
private Long userId;
@ApiModelProperty("請求類型:request/reponse")
private String type;
@ApiModelProperty("請求類id")
private String requestId;
@ApiModelProperty("請求類型:get/post")
private String method;
@ApiModelProperty("請求路徑")
private String url;
@ApiModelProperty("傳輸數(shù)據(jù)體")
private String content;
}
HttpRequestFilter
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.rd.common.constant.CommonConstant;
import com.rd.common.constant.Constant;
import com.rd.common.constant.TokenConstants;
import com.rd.common.model.LogUserInfo;
import com.rd.common.model.ResultInfoEnum;
import com.rd.common.utils.JwtUtils;
import io.jsonwebtoken.Claims;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.core.Ordered;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.stereotype.Component;
import org.springframework.util.MultiValueMap;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
/**
* @author
* @date 2023/2/3 - 10:54
* @description Logs the request parameters passing through the gateway
*/
@Component
@Slf4j
@AllArgsConstructor
public class HttpRequestFilter implements GlobalFilter, Ordered {
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
ServerHttpRequest request = exchange.getRequest();
String token = request.getHeaders().getFirst(TokenConstants.AUTHENTICATION);
//Strip the token prefix if present and check validity
if (StringUtils.isNotEmpty(token) && token.startsWith(TokenConstants.PREFIX)) {
token = token.replaceFirst(TokenConstants.PREFIX, StringUtils.EMPTY);
}
if(StringUtils.isBlank(token)){
return unauthorizedResponse(exchange, "令牌不能為空");
}
//Parse the token
Claims claims = JwtUtils.parseToken(token);
if (claims == null) {
return unauthorizedResponse(exchange, "令牌已過期或驗證不正確!");
}
//Extract the user id from the token claims
String userid = JwtUtils.getUserId(claims);
String method = request.getMethodValue();
String contentType = request.getHeaders().getFirst("Content-Type");
String url = JSONObject.toJSONString(request.getURI());
log.info("網(wǎng)關(guān)請求路徑: "+url);// 請求路徑打印
if ("POST".equals(method)) {
return DataBufferUtils.join(exchange.getRequest().getBody())
.flatMap(dataBuffer -> {
byte[] bytes = new byte[dataBuffer.readableByteCount()];
dataBuffer.read(bytes);
try {
String bodyString = new String(bytes, "utf-8");
//log.info("請求參數(shù): "+bodyString);//打印請求參數(shù)
LogUserInfo logUserInfo = new LogUserInfo();
logUserInfo.setUserId(Long.valueOf(userid));
logUserInfo.setUrl(url);
logUserInfo.setType("request");
logUserInfo.setRequestId(request.getId());
logUserInfo.setMethod("post");
logUserInfo.setContent(bodyString);
log.info(CommonConstant.GATEWAY_LOG_CS+ JSON.toJSONString(logUserInfo));
exchange.getAttributes().put("POST_BODY", bodyString);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
DataBufferUtils.release(dataBuffer);
Flux<DataBuffer> cachedFlux = Flux.defer(() -> {
DataBuffer buffer = exchange.getResponse().bufferFactory()
.wrap(bytes);
return Mono.just(buffer);
});
ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(
exchange.getRequest()) {
@Override
public Flux<DataBuffer> getBody() {
return cachedFlux;
}
};
return chain.filter(exchange.mutate().request(mutatedRequest)
.build());
});
}else if ("GET".equals(method)) {
MultiValueMap<String, String> queryParams = request.getQueryParams();
LogUserInfo logUserInfo = new LogUserInfo();
logUserInfo.setUserId(Long.valueOf(userid));
logUserInfo.setUrl(url);
logUserInfo.setType("request");
logUserInfo.setRequestId(request.getId());
logUserInfo.setMethod("get");
logUserInfo.setContent(JSON.toJSONString(queryParams));
log.info(CommonConstant.GATEWAY_LOG_CS+JSONObject.toJSONString(logUserInfo));
//log.info("請求參數(shù):" + queryParams);
log.info("****************************************************************************\n");
return chain.filter(exchange);
}
//Requests that are neither POST nor GET fall through here; their parameters are not captured
log.info("****************************************************************************\n");
return chain.filter(exchange);
}
@Override
public int getOrder() {
return -200;
}
private Mono<Void> unauthorizedResponse(ServerWebExchange exchange, String msg) {
log.error("[Auth failure] request path: {}", exchange.getRequest().getPath());
return webFluxResponseWriter(exchange.getResponse(), msg, HttpStatus.UNAUTHORIZED);
}
public static Mono<Void> webFluxResponseWriter(ServerHttpResponse response, String msg, HttpStatus status) {
// Set the status code and content type before writing the JSON error body
response.setStatusCode(status);
response.getHeaders().setContentType(MediaType.APPLICATION_JSON);
return response.writeWith(Mono.just(getDataBuffer(response, msg)));
}
public static DataBuffer getDataBuffer(ServerHttpResponse response, String value) {
JSONObject json = new JSONObject();
json.put("code", ResultInfoEnum.USER_INFO_ERROR.getCode());
json.put("msg", value);
byte[] bytes = json.toJSONString().getBytes(StandardCharsets.UTF_8);
return response.bufferFactory().wrap(bytes);
}
}
WrapperResponseGlobalFilter
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.nacos.shaded.com.google.common.base.Joiner;
import com.alibaba.nacos.shaded.com.google.common.base.Throwables;
import com.alibaba.nacos.shaded.com.google.common.collect.Lists;
import com.rd.common.constant.CommonConstant;
import com.rd.common.constant.TokenConstants;
import com.rd.common.model.LogUserInfo;
import com.rd.common.model.ResultInfoEnum;
import com.rd.common.utils.JwtUtils;
import com.rd.common.utils.StringUtils;
import io.jsonwebtoken.Claims;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Publisher;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.core.Ordered;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.http.server.reactive.ServerHttpResponseDecorator;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.List;
import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR;
/**
* @author
* @date 2023/2/3 - 10:54
* @description Logs the response parameters returned through the gateway
*/
@Slf4j
@Component
public class WrapperResponseGlobalFilter implements GlobalFilter, Ordered {
@Override
public int getOrder() {
// -1 is response write filter, must be called before that
return -2;
}
private static Joiner joiner = Joiner.on("");
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
ServerHttpRequest request = exchange.getRequest();
ServerHttpResponse originalResponse = exchange.getResponse();
DataBufferFactory bufferFactory = originalResponse.bufferFactory();
String token = request.getHeaders().getFirst(TokenConstants.AUTHENTICATION);
String url = JSONObject.toJSONString(request.getURI());
//Strip the token prefix if present and check validity
if (org.apache.commons.lang3.StringUtils.isNotEmpty(token) && token.startsWith(TokenConstants.PREFIX)) {
token = token.replaceFirst(TokenConstants.PREFIX, org.apache.commons.lang3.StringUtils.EMPTY);
}
if(org.apache.commons.lang3.StringUtils.isBlank(token)){
return unauthorizedResponse(exchange, "令牌不能為空");
}
//Parse the token
Claims claims = JwtUtils.parseToken(token);
if (claims == null) {
return unauthorizedResponse(exchange, "令牌已過期或驗證不正確菩鲜!");
}
//Extract the user id from the token claims
String userid = JwtUtils.getUserId(claims);
ServerHttpResponseDecorator response = new ServerHttpResponseDecorator(originalResponse) {
@Override
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
if (getStatusCode().equals(HttpStatus.OK) && body instanceof Flux) {
// Check the Content-Type to decide whether the response body is JSON
String originalResponseContentType = exchange.getAttribute(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR);
if (StringUtils.isNotBlank(originalResponseContentType) && originalResponseContentType.contains("application/json")) {
Flux<? extends DataBuffer> fluxBody = Flux.from(body);
//Large response bodies are delivered in several chunks by default; buffer the flux so the segments can be joined back together
return super.writeWith(fluxBody.buffer().map(dataBuffers -> {
List<String> list = Lists.newArrayList();
dataBuffers.forEach(dataBuffer -> {
try {
byte[] content = new byte[dataBuffer.readableByteCount()];
dataBuffer.read(content);
DataBufferUtils.release(dataBuffer);
list.add(new String(content, "utf-8"));
} catch (Exception e) {
log.info("加載Response字節(jié)流異常接校,失敗原因:{}", Throwables.getStackTraceAsString(e));
}
});
String responseData = joiner.join(list);
//log.info("返回參數(shù)responseData:"+responseData);
//System.out.println("responseData:"+responseData);
LogUserInfo logUserInfo = new LogUserInfo();
logUserInfo.setUserId(Long.valueOf(userid));
logUserInfo.setUrl(url);
logUserInfo.setType("response");
logUserInfo.setRequestId(request.getId());
logUserInfo.setContent(responseData);
String jsonString = JSON.toJSONString(logUserInfo);
log.info(CommonConstant.GATEWAY_LOG_CS+ jsonString);
//jsonString = "返回參數(shù): " + jsonString;
//LogUserInfo info = JSON.parseObject(jsonString.substring(5, jsonString.length()), LogUserInfo.class);
byte[] uppedContent = responseData.getBytes(StandardCharsets.UTF_8);
originalResponse.getHeaders().setContentLength(uppedContent.length);
return bufferFactory.wrap(uppedContent);
}));
}
}
return super.writeWith(body);
}
@Override
public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) {
return writeWith(Flux.from(body).flatMapSequential(p -> p));
}
};
return chain.filter(exchange.mutate().response(response).build());
}
private Mono<Void> unauthorizedResponse(ServerWebExchange exchange, String msg) {
log.error("[Auth failure] request path: {}", exchange.getRequest().getPath());
return webFluxResponseWriter(exchange.getResponse(), msg, HttpStatus.UNAUTHORIZED);
}
public static Mono<Void> webFluxResponseWriter(ServerHttpResponse response, String msg, HttpStatus status) {
// Set the status code and content type before writing the JSON error body
response.setStatusCode(status);
response.getHeaders().setContentType(MediaType.APPLICATION_JSON);
return response.writeWith(Mono.just(getDataBuffer(response, msg)));
}
public static DataBuffer getDataBuffer(ServerHttpResponse response, String value) {
JSONObject json = new JSONObject();
json.put("code", ResultInfoEnum.USER_INFO_ERROR.getCode());
json.put("msg", value);
byte[] bytes = json.toJSONString().getBytes(StandardCharsets.UTF_8);
return response.bufferFactory().wrap(bytes);
}
}
MyPartitioner
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
/**
* Custom partitioner
*
* @author zzkk
* @create 2021/5/26 13:40
**/
public class MyPartitioner implements Partitioner {
/**
* Core partitioning method
* @param topic
* @param key
* @param keyBytes
* @param value
* @param valueBytes
* @param cluster
* @return
*/
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
//Partitioning logic: here every record is sent to partition 0
return 0;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
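MyPartitioner pins every record to partition 0, which keeps all gateway logs strictly ordered on a single partition but limits throughput. If you would rather spread keyed records across partitions, a possible alternative is sketched below; it reuses the murmur2 helpers that ship with the Kafka client, and the class name is illustrative, not part of the original project.

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;
import java.util.Map;

public class KeyHashPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionCountForTopic(topic);
        if (keyBytes == null) {
            // No key: fall back to partition 0 (round-robin would also be reasonable)
            return 0;
        }
        // Same hashing the default partitioner applies to keyed records
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}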
//Copyright notice: this is an original post by its author, released under the CC 4.0 BY-SA license; please include the original link and this notice when republishing.
//Original link: https://blog.csdn.net/qq_39091806/article/details/129956913
Dependencies
<!-- kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- logstash + logback integration -->
<dependency>
<groupId>net.logstash.logback</groupId>
<artifactId>logstash-logback-encoder</artifactId>
<version>4.11</version>
<exclusions>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- logback + kafka integration -->
<dependency>
<groupId>com.github.danielwegener</groupId>
<artifactId>logback-kafka-appender</artifactId>
<version>0.1.0</version>
</dependency>
<!-- lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!-- logback -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>1.2.11</version>
</dependency>