一. 概述
在后端開發(fā)中, 會(huì)經(jīng)常與其他系統(tǒng)對(duì)接進(jìn)行數(shù)據(jù)交換, 而在這過(guò)程經(jīng)常會(huì)遇到一個(gè)問(wèn)題就是推送方說(shuō)已經(jīng)把數(shù)據(jù)推送了, 而接收方咬死說(shuō)我沒(méi)收到, 這就有點(diǎn)尷尬了, 一般最后都是只能推送方重推解決, 這就會(huì)引出重推機(jī)制, 本文介紹的就是重推Demo的實(shí)現(xiàn), 僅限學(xué)習(xí)
二. 重推Demo
本Demo分為2個(gè)步驟實(shí)現(xiàn): 1. 記錄重推參數(shù) 2. 定制重推
2.1 結(jié)構(gòu)
2.2 參數(shù)實(shí)體類
RePush,java
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class RePush implements Serializable {
/**
* ID
*/
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
public Long rePushId;
/**
* 接口名稱
*/
public String interfaceName;
/**
* 類名
*/
public String className;
/**
* 方法名稱
*/
public String methodName;
/**
* 回調(diào)方法
*/
public String callbackObj;
/**
* 參數(shù)列表
*/
public String params;
/**
* 推送狀態(tài) 0:未推送 1:已推送
*/
public Integer status;
/**
* 推送次數(shù)
*/
public Integer pushNum;
/**
* 創(chuàng)建時(shí)間
*/
public LocalDateTime createTime;
/**
* 更新時(shí)間
*/
public LocalDateTime updateTime;
}
2.3 工具類
ObjectUtil.java,用于對(duì)象的序列化
/**
* 把對(duì)象轉(zhuǎn)成字符串
*/
public static String objectToString(Object obj) {
// 對(duì)象轉(zhuǎn)字節(jié)數(shù)組
AtomicReference<String> str = new AtomicReference<>();
Optional.ofNullable(obj).ifPresent(o -> {
try {
byte[] bytes = writeObj(o);
str.set(DatatypeConverter.printBase64Binary(bytes));
} catch (Exception e) {
e.printStackTrace();
}
});
return str.get();
}
/**
* 解析字符串為對(duì)象
*/
public static Object stringToObject(String str) {
AtomicReference<Object> obj = new AtomicReference<>();
Optional.ofNullable(str).ifPresent(s -> {
try {
byte[] bytes = DatatypeConverter.parseBase64Binary(str);
obj.set(readObj(bytes));
} catch (Exception e) {
e.printStackTrace();
}
});
return obj.get();
}
/**
把對(duì)象轉(zhuǎn)為字節(jié)數(shù)組
*/
public static byte[] writeObj(Object obj) throws Exception {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
ObjectOutputStream outputStream = new ObjectOutputStream(byteArrayOutputStream);
outputStream.writeObject(obj);
outputStream.close();
return byteArrayOutputStream.toByteArray();
}
/**
把字節(jié)數(shù)組轉(zhuǎn)為對(duì)象
*/
public static Object readObj(byte[] bytes) throws Exception {
ObjectInputStream inputStream = null;
try {
inputStream = new ObjectInputStream(new ByteArrayInputStream(bytes));
return inputStream.readObject();
} finally {
inputStream.close();
}
}
2.4 service層邏輯實(shí)現(xiàn)
2.4.1 接口 IRePushService.java
public interface IRePushService {
/**
* 記錄重推參數(shù)
* @param interfaceName 接口名稱
* @param status 推送狀態(tài) 0:未推送 1:已推送
* @param callbackMethod 回調(diào)方法
* @param className 類名 Class.getName()
* @param methodName 方法名
* @param params 方法參數(shù)
*/
void saveReRushLog(String interfaceName,Integer status ,Predicate<Object> callbackMethod,String className, String methodName,Object... params) throws Exception;
/**
* 重推 (后期建立定時(shí)任務(wù),定時(shí)執(zhí)行)
* @param rePushList 為重推的數(shù)據(jù)集合
*/
void rePush() throws Exception;
/**
* 查詢要重推的數(shù)據(jù)
* @return
*/
List<RePush> queryRePushData();
}
2.4.2 實(shí)現(xiàn)類 RePushServiceImpl.java
/**
* 重推實(shí)現(xiàn)類
*
* @author 王培任
* @date 2021/9/9
*/
@Service
@Slf4j
public class RePushServiceImpl implements IRePushService {
@Resource
private RePushMapper rePushMapper;
/**
* 推送狀態(tài)(已推送)
*/
public static final Integer STATUS_SUCCESS = 1;
/**
* 推送狀態(tài)(未推送)
*/
public static final Integer STATUS_FAILURE = 0;
/**
* 分隔符
*/
public static final String DELIMITER = ",";
/**
* 最大重推次數(shù)
*/
public static final Integer MAX_RE_PUSH_NUM = 3;
@Override
public List<RePush> queryRePushData() {
Example example = Example.builder(RePush.class).build();
example.createCriteria().andEqualTo("status",STATUS_FAILURE).orLessThanOrEqualTo("pushNum",MAX_RE_PUSH_NUM);
return rePushMapper.selectByExample(example);
}
/**
* 記錄重推參數(shù)
* @param interfaceName 接口名稱
* @param status 推送狀態(tài) 0:未推送 1:已推送 (默認(rèn)成功)
* @param callbackMethod 回調(diào)方法
* @param className 類名 Class.getName()
* @param methodName 方法名
* @param params 方法參數(shù)
*/
@Override
public void saveReRushLog(String interfaceName,Integer status ,Predicate<Object> callbackMethod,String className, String methodName,Object... params) throws Exception{
// 參數(shù)校驗(yàn)
Assert.notNull(interfaceName,"接口名稱不能為空!");
Assert.notNull(className,"類名不能為空!");
Assert.notNull(methodName,"方法名不能為空!");
status = Optional.ofNullable(status).orElse(STATUS_SUCCESS);
// 校驗(yàn)類名
Class<?> aClass ;
try {
aClass = Class.forName(className);
} catch (Exception e) {
throw new RuntimeException("未找到類名:"+className);
}
// 校驗(yàn)方法名+參數(shù)
try {
if(null != params){
List<Class<?>> classList = Arrays.stream(params).map(Object::getClass).collect(Collectors.toList());
int size = classList.size();
Class<?> [] aClassList = new Class[size];
for(int i=0;i<size;i++){
aClassList[i] = classList.get(i);
}
Method method = aClass.getMethod(methodName, aClassList);
Assert.notNull(method,String.format("方法名[%s]+參數(shù)[%s]:沒(méi)有匹配的方法",methodName, JSON.toJSON(params)));
}else {
Method method = aClass.getMethod(methodName);
Assert.notNull(method,String.format("方法名[%s]:沒(méi)有匹配的方法",methodName));
}
} catch (Exception e) {
throw new RuntimeException(String.format("方法名[]+參數(shù)[]:沒(méi)有匹配的方法",methodName,JSON.toJSON(params)));
}
// 記錄回調(diào)方法
String callbackObj = null != callbackMethod ? ObjectUtil.objectToString(callbackMethod) : null;
// 參數(shù)保存
StringBuilder builder = new StringBuilder();
if(null != params){
int length = params.length;
for(int i = 0;i<length;i++){
builder.append(ObjectUtil.objectToString(params[i]));
if(i != length-1){
builder.append(DELIMITER);
}
}
}
// 記錄重推參數(shù)
LocalDateTime date = LocalDateTime.now();
RePush rePush = RePush.builder()
.interfaceName(interfaceName)
.className(className)
.methodName(methodName)
.callbackObj(callbackObj)
.params(builder.toString())
.status(status)
.createTime(date)
.updateTime(date)
.build();
// 保存重推的參數(shù)
rePushMapper.insert(rePush);
}
/**
* 重推 (后期建立定時(shí)任務(wù),定時(shí)執(zhí)行)
*/
@Override
public void rePush() throws Exception {
// 查詢要重推的數(shù)據(jù)
List<RePush> rePushes = queryRePushData();
if(CollectionUtils.isEmpty(rePushes)) return;
for(RePush rePush : rePushes){
if(Objects.equals(STATUS_SUCCESS,rePush.getStatus())) return;
try {
rePushHandler(rePush);
} catch (Exception e) {
log.error("重推記錄id[{}]異常:{}=={}",rePush.getRePushId(),e.getMessage(),Arrays.toString(e.getStackTrace()),e);
}
}
}
/**
* 執(zhí)行重推
* @param rePush
* @throws ClassNotFoundException
* @throws InstantiationException
* @throws IllegalAccessException
* @throws InvocationTargetException
*/
public void rePushHandler(RePush rePush) throws Exception {
// 待重推的類
Class<?> aClass = Class.forName(rePush.getClassName());
// 重推方法
Method method = Arrays.stream(aClass.getDeclaredMethods())
.filter(methodTemp -> Objects.equals(methodTemp.getName(), rePush.getMethodName()))
.findAny()
.orElseThrow(() -> new RuntimeException("獲取方法失敗"));
method.setAccessible(true);
// 創(chuàng)建實(shí)例
Object instance = aClass.newInstance();
// 方法執(zhí)行結(jié)果
Object result;
// 重推參數(shù)
String params = rePush.getParams();
if(StringUtils.isEmpty(params)){
result = method.invoke(instance);
}else {
// 解析參數(shù)執(zhí)行方法
String[] objStrs = params.split(DELIMITER);
int length = objStrs.length;
Object[] objParams = new Object[length];
for(int i=0;i<length;i++){
objParams[i] = ObjectUtil.stringToObject(objStrs[i]);
}
result = method.invoke(instance,objParams);
}
// 請(qǐng)求回調(diào)方法
String callbackObj = rePush.getCallbackObj();
// 回調(diào)方法執(zhí)行狀態(tài)
boolean resultStatus = true;
if(StringUtils.isNotEmpty(callbackObj)){
// 回調(diào)方法執(zhí)行
Predicate predicate = (Predicate) ObjectUtil.stringToObject(callbackObj);
resultStatus = predicate.test(result);
}
// 更新重推記錄
rePush.setPushNum(rePush.getPushNum()+1); // 重推次數(shù)+1
rePush.setUpdateTime(LocalDateTime.now());
if(resultStatus){
rePush.setStatus(STATUS_SUCCESS);
}
rePushMapper.updateByPrimaryKey(rePush);
}
}
2.5 記錄重推參數(shù)工具類
RePushUtil.java
@Slf4j
public class RePushUtil {
private static IRePushService rePushService;
/**
* 記錄重推參數(shù)
* @param interfaceName 接口名稱 (必傳)
* @param status 推送狀態(tài) 0:未推送 1:已推送 (默認(rèn)成功)
* @param callbackMethod 回調(diào)方法
* @param className 類名 Class.getName() (必傳)
* @param methodName 方法名 (必傳)
* @param params 方法參數(shù)
*/
public static void saveReRushLog(String interfaceName, Integer status , Predicate<Object> callbackMethod, String className, String methodName, Object... params){
CompletableFuture.runAsync(() -> {
IRePushService iRePushService = getIRePushService();
try {
iRePushService.saveReRushLog(interfaceName,status,callbackMethod,className,methodName,params);
} catch (Exception e) {
log.error("記錄重推參數(shù)異常:{}--{}",e.getMessage(), Arrays.toString(e.getStackTrace()));
}
});
}
public static IRePushService getIRePushService(){
if(null == rePushService){
// 從Spring容器中獲取Bean
rePushService = SpringContextUtil.getBean(IRePushService.class);
}
return rePushService;
}
}
三. 使用Demo步驟
創(chuàng)建重推定時(shí)任務(wù)->在對(duì)外接口方法中記錄重推日志
步驟一: 創(chuàng)建重推定時(shí)任務(wù)
建立定時(shí)任務(wù)定時(shí)執(zhí)行重推方法IRePushService. rePush
步驟二:在業(yè)務(wù)中記錄對(duì)外接口方法接口日志
在業(yè)務(wù)中調(diào)方法RePushUtil. saveReRushLog
記錄重推參數(shù)