問題背景描述
服務(wù)以及上線幾個月,今天在客服查詢用戶的提現(xiàn)信息的時候,發(fā)現(xiàn)有的用戶竟然出現(xiàn)了高達數(shù)千數(shù)萬的提現(xiàn)請求.由于我們的用戶體量并沒有那么大,交易流水按理說也不應(yīng)該有那么多,客服帶著疑問跟我們報出了這個問題.
于是,我便查詢了最近的交易記錄,發(fā)現(xiàn)有幾個人是定向的而且多次的大額交易記錄,然后我便查詢了一下充值記錄,發(fā)現(xiàn)用戶的充值記錄是根本沒有那么多的,也就是說用戶的賬戶余額發(fā)生了異常,那么問題出在哪了呢删顶?
發(fā)現(xiàn)問題
帶著上面的疑問,我仔細的檢查了交易記錄的數(shù)據(jù),發(fā)現(xiàn)在交易記錄中,有針對某個功能同一時間的大量請求,然后第一時間去查詢用戶的訪問日志,發(fā)現(xiàn)存在某一ip同一時間內(nèi)下針對某個接口地址的大量請求.
例如:
102.1.1.X 2016-10-23 16:66:01 /xxxxxxx/v1/money/opt/pee/ Post
102.1.1.X 2016-10-23 16:66:01 /xxxxxxx/v1/money/opt/pee/ Post
102.1.1.X 2016-10-23 16:66:01 /xxxxxxx/v1/money/opt/pee/ Post
102.1.1.X 2016-10-23 16:66:01 /xxxxxxx/v1/money/opt/pee/ Post
102.1.1.X 2016-10-23 16:66:01 /xxxxxxx/v1/money/opt/pee/ Post
102.1.1.X 2016-10-23 16:66:01 /xxxxxxx/v1/money/opt/pee/ Post
102.1.1.X 2016-10-23 16:66:01 /xxxxxxx/v1/money/opt/pee/ Post
由此猜測,是被用戶用程序盜刷了接口.
那么追究其本質(zhì)問題,就算是用戶盜刷了接口,也不應(yīng)該出現(xiàn)余額異常的問題,對應(yīng)著這個接口的代碼順藤摸瓜屢下去.
原代碼是這個樣子的(由于涉及到具體業(yè)務(wù),這里用偽代碼來代替):
// 參數(shù)校驗
if(StringUtils.isEmty(xx)){
throw new Exception(Error.param_error);
}
// 用戶余額校驗
1.余額是否大于0
2.余額是否充足
// 檢測是否查看過
boolean b = seeLogService.hasSee();
if(b) throw new Exception(Error.has_see);
// 更新查看日志
seeLogService.update(xx);
// 增加收入者用戶余額并記錄日志
OrdersService.addUserGoldNum
// 減少消費者用戶余額并記錄日志
OrdersService.subtractUserGoldNum
讓我們仔細看一看上面的代碼,會有什么問題鱼蝉?
這里我們先帶著我們想到的問題之處,去測試一下,首先我用CyclicBarrier
做了一個并發(fā)的請求工具,工具類的代碼如下:
import com.alibaba.fastjson.JSON;
import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Executors+CyclicBarrier實現(xiàn)的并發(fā)測試小例子<br>
* 例子實現(xiàn)了并發(fā)測試中使用的集合點煌贴,集合點超時時間及思考時間等技術(shù)
*
* @author 王光東
* @version 1.0
* @date 2016年06月27日16:43:49
*/
public class FlushGeneratorPost {
// 主線程停止的標志
private static volatile boolean runFlag = true;
// 欄桿
private CyclicBarrier cyclicBarrier;
// 線程數(shù)
private static int threads;
// 總數(shù)
private static AtomicInteger totalCount = new AtomicInteger();
// 已經(jīng)開啟的線程數(shù)
private static AtomicInteger startedCount = new AtomicInteger();
// 已經(jīng)完成任務(wù)的線程數(shù)
private static AtomicInteger finishCount = new AtomicInteger();
// 正在進行的線程數(shù)
private static AtomicInteger runCount = new AtomicInteger();
// 成功的線程數(shù)
private static AtomicInteger successCount = new AtomicInteger();
// 失敗的線程數(shù)
private static AtomicInteger failCount = new AtomicInteger();
// 請求的url地址
private String url;
// 集合點超時時間
private long rendzvousWaitTime = 0;
// 思考時間
private long thinkTime = 0;
// 次數(shù)
private static int iteration = 0;
/**
* 初始值設(shè)置
*
* @param url 被測url
* @param threads 總線程數(shù)
* @param iteration 每個線程迭代次數(shù)
* @param rendzvousWaitTime 集合點超時時間,如果不啟用超時時間说榆,請將此值設(shè)置為0.<br>
* 如果不啟用集合點虚吟,請將此值設(shè)置為-1<br>
* 如果不啟用超時時間,則等待所有的線程全部到達后签财,才會繼續(xù)往下執(zhí)行<br>
* @param thinkTime 思考時間串慰,如果啟用思考時間,請將此值設(shè)置為0
*/
public FlushGeneratorPost(String url, int threads, int iteration, long rendzvousWaitTime,
long thinkTime) {
totalCount.getAndSet(threads);
FlushGeneratorPost.threads = threads;
this.url = url;
this.iteration = iteration;
this.rendzvousWaitTime = rendzvousWaitTime;
this.thinkTime = thinkTime;
}
// 過得線程數(shù)的信息
public static ThreadCount getThreadCount() {
return new ThreadCount(threads, runCount.get(), startedCount.get(), finishCount.get(),
successCount.get(), failCount.get());
}
// 判斷線程是否應(yīng)該停止
public static boolean isRun() {
return finishCount.get() != threads;
}
// 優(yōu)雅的停止線程
public synchronized static void stop() {
runFlag = false;
}
// 執(zhí)行任務(wù)
public void runTask() {
List<Future<String>> resultList = new ArrayList<Future<String>>();
// 線程池構(gòu)造
ExecutorService exeService = Executors.newFixedThreadPool(threads);
cyclicBarrier = new CyclicBarrier(threads);//默認加載全部線程
for (int i = 0; i < threads; i++) {
resultList.add(
exeService.submit(new TaskThread(i, url, iteration, rendzvousWaitTime, thinkTime)));
}
exeService.shutdown();
for (int j = 0; j < resultList.size(); j++) {
try {
System.out.println(resultList.get(j).get());
} catch (Exception e) {
e.printStackTrace();
}
}
stop();
}
/**
* 不同狀態(tài)的線程數(shù)構(gòu)造類
*/
static class ThreadCount {
public final int runThreads;
public final int startedThreads;
public final int finishedThreads;
public final int totalThreads;
public final int successCount;
public final int failCount;
public ThreadCount(int totalThreads, int runThreads, int startedThreads,
int finishedThreads, int successCount, int failCount) {
this.totalThreads = totalThreads;
this.runThreads = runThreads;
this.startedThreads = startedThreads;
this.finishedThreads = finishedThreads;
this.successCount = successCount;
this.failCount = failCount;
}
}
/**
* 實際的業(yè)務(wù)線程類
*/
private class TaskThread implements Callable<String> {
private String url;
private long rendzvousWaitTime = 0;
private long thinkTime = 0;
private int iteration = 0;
private int iterCount = 0;
private int taskId;
/**
* 任務(wù)執(zhí)行者屬性設(shè)置
*
* @param taskId 任務(wù)id號
* @param url 被測url
* @param iteration 迭代次數(shù)唱蒸,如果一直執(zhí)行則需將此值設(shè)置為0
* @param rendzvousWaitTime 集合點超時時間邦鲫,如果不需要設(shè)置時間,則將此值設(shè)置為0神汹。如果不需要設(shè)置集合點庆捺,則將此值設(shè)置為-1
* @param thinkTime 思考時間,如果不需要設(shè)置思考時間屁魏,則將此值設(shè)置為0
*/
public TaskThread(int taskId, String url, int iteration, long rendzvousWaitTime,
long thinkTime) {
this.taskId = taskId;
this.url = url;
this.rendzvousWaitTime = rendzvousWaitTime;
this.thinkTime = thinkTime;
this.iteration = iteration;
}
@Override
public String call() throws Exception {
startedCount.getAndIncrement();
runCount.getAndIncrement();
while (runFlag && iterCount < iteration) {
if (iteration != 0)
iterCount++;
try {
if (rendzvousWaitTime > 0) {
try {
System.out.println("任務(wù):task-" + taskId + " 已到達集合點...等待其他線程,集合點等待超時時間為:"
+ rendzvousWaitTime);
cyclicBarrier.await(rendzvousWaitTime, TimeUnit.MICROSECONDS);
} catch (InterruptedException e) {
} catch (BrokenBarrierException e) {
System.out.println(
"task-" + taskId + " 等待時間已超過集合點超時時間:" + rendzvousWaitTime
+ " ms,將開始執(zhí)行任務(wù)....");
} catch (TimeoutException e) {
}
} else if (rendzvousWaitTime == 0) {
try {
System.out.println("任務(wù):task-" + taskId + " 已到達集合點...等待其他線程");
cyclicBarrier.await();
} catch (InterruptedException e) {
} catch (BrokenBarrierException e) {
}
}
// 發(fā)送請求返回結(jié)果
Bean result = readContent(url);
System.out.println(
"線程:task-" + taskId + " 獲取到的資源大刑弦浴:" + result.getResult().length() + ",狀態(tài)碼:"
+ result.getState());
// 增加成功的值
successCount.getAndIncrement();
// 判斷是否需要思考
if (thinkTime != 0) {
System.out.println("task-" + taskId + " 距下次啟動時間:" + thinkTime);
Thread.sleep(thinkTime);
}
} catch (Exception e) {
failCount.getAndIncrement();
}
}
// 增加完成次數(shù)
finishCount.getAndIncrement();
// 減少運行的線程數(shù)量
runCount.decrementAndGet();
return Thread.currentThread().getName() + " 執(zhí)行完成!";
}
}
public static void main(String[] args) {
final long startTime = System.currentTimeMillis();
String baseUri = "http://localhost:8080/xxx/xx/xx/xx/xx";
new Thread() {
public void run() {
new FlushGeneratorPost(
baseUri, 20, 1,
0, 0).runTask(); //開啟20個線程一次同時去請求這個接口
}
}.start();
new Thread() {
public void run() {
while (isRun()) {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("isRun:" + FlushGeneratorPost.isRun());
System.out
.println("totalThreads:" + FlushGeneratorPost.getThreadCount().totalThreads);
System.out.println(
"startedThreads:" + FlushGeneratorPost.getThreadCount().startedThreads);
System.out.println("runThreads:" + FlushGeneratorPost.getThreadCount().runThreads);
System.out.println(
"finishedThread:" + FlushGeneratorPost.getThreadCount().finishedThreads);
System.out
.println("successCount:" + FlushGeneratorPost.getThreadCount().successCount);
System.out.println("failCount:" + FlushGeneratorPost.getThreadCount().failCount);
System.out.println();
}
System.out.println("\n\n 執(zhí)行" + threads * iteration + "次請求一共花費了"
+ (System.currentTimeMillis() - startTime) / 1000 + "秒");
}
}.start();
}
/**
* httpUrlConnection的get請求
*
* @param uri
* @return
* @throws IOException
*/
private static Bean readContent(String uri) throws IOException {
String body = "xxx=111&xxx22=1&xxx33=2";
URL postUrl = new URL(uri);
// 打開連接
HttpURLConnection connection = (HttpURLConnection) postUrl.openConnection();
// 設(shè)置是否向connection輸出蚁堤,因為這個是post請求醉者,參數(shù)要放在
// http正文內(nèi),因此需要設(shè)為true
connection.setDoOutput(true);
// Read from the connection. Default is true.
connection.setDoInput(true);
// 默認是 GET方式
connection.setRequestMethod("POST");
connection.setConnectTimeout(3 * 1000);
// Post 請求不能使用緩存
connection.setUseCaches(false);
connection.setInstanceFollowRedirects(true);
// 配置本次連接的Content-type披诗,配置為application/x-www-form-urlencoded的
// 意思是正文是urlencoded編碼過的form參數(shù),下面我們可以看到我們對正文內(nèi)容使用URLEncoder.encode
// 進行編碼
connection.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
connection.setRequestProperty("Accept", "application/json;charset=UTF-8");
String Authorization = "xxxxxssss123sdffasdf"; connection.setRequestProperty("Authorization", Authorization);
// 連接立磁,從postUrl.openConnection()至此的配置必須要在connect之前完成呈队,
// 要注意的是connection.getOutputStream會隱含的進行connect。
connection.connect();
DataOutputStream out = new DataOutputStream(connection.getOutputStream());
// The URL-encoded contend
// 正文唱歧,正文內(nèi)容其實跟get的URL中 '? '后的參數(shù)字符串一致
// content = "count=" + URLEncoder.encode(String.valueOf(1), "UTF-8");
// content +="&amount="+URLEncoder.encode(String.valueOf(10), "UTF-8");
// DataOutputStream.writeBytes將字符串中的16位的unicode字符以8位的字符形式寫到流里面
out.writeBytes(body);
out.flush();
out.close();
BufferedReader reader = new BufferedReader(new InputStreamReader(connection.getInputStream()));
String line;
StringBuilder sb = new StringBuilder();
while ((line = reader.readLine()) != null) {
sb.append(line);
}
int state = connection.getResponseCode();
reader.close();
connection.disconnect();
return new Bean(state, sb.toString());
}
/**
* 結(jié)果集實體類
*/
public static class Bean {
private int state;
private String result;
public Bean(int state, String result) {
this.state = state;
this.result = result;
}
public int getState() {
return state;
}
public void setState(int state) {
this.state = state;
}
public String getResult() {
return result;
}
public void setResult(String result) {
this.result = result;
}
}
}
然后利用我的這個工具去還原請求我們被盜刷的接口.我們?nèi)ヒ稽c點的驗證我們猜想的問題所在,首先,我用了一個賬戶余額充足的賬戶去測試,發(fā)現(xiàn)沒什么問題.
這個地方為什么沒有問題呢?我在加錢的服務(wù)和減少金錢的服務(wù)中利用了數(shù)據(jù)庫的悲觀鎖保證了用戶余額的安全(這里我們先不吐槽悲觀鎖的問題哈).
然后我又換了一個余額少一點的賬戶進行了重試,發(fā)現(xiàn)這次的就出問題了,加錢的用戶正確加上了20*price的金額,而扣錢的用戶賬戶為0,也就是說,假如我只有20快,但是我想花100快給a,這時候我余額變成0了(也就是花了20快),然后a的余額增加了100快,那等于系統(tǒng)因為bug原因陪了80快.
這時候我們再回顧一下之前我們猜測的問題
- 首先我的最外層的金額檢測,沒有事務(wù)和鎖的保證,所以在并發(fā)的時候,這一層的檢測就變成了無效的檢測.
- 檢測是否查看過由于同事的疏忽,沒有做并發(fā)時候的唯一性校驗,即如果用戶看過一次就不能再繼續(xù)再看了,所以這一層的檢測在并發(fā)的時候也會出問題.
- 由于并發(fā)請求使得1和2的校驗都不成功的時候這時候就來到了第三步,這時候我們是要進行真正的金額變動和變動日志的記錄了,這個時候我們看一下具體的代碼實現(xiàn).
加錢服務(wù)的代碼
* 用戶加錢的api
*
* @param userId 用戶id
* @param amount 鉆石金額
* @param orderId 訂單id
* @param orderType 訂單類型
* @param moneyLogOrderChangeType 加錢的理由
* @param relationUserId 這個字段意味這次加錢是跟誰有關(guān)的
* @return
*/
public int addUserGoldNum(String userId, int amount, String orderId, OrderType orderType,
MoneyLogOrderChangeType moneyLogOrderChangeType, String relationUserId) {
if (userId == null)
throw new BusinessException(ErrorCode.NOT_FIND_USER_ACCOUNT);
int currentAmount = userInfoDAO.queryGoldNumByUserId(userId);
if (amount <= 0)
return currentAmount;
int newAmount = currentAmount + amount;
LOG.info(
"<Important!> add BEGIN!! [userId]= " + userId + " [current amount]= " + currentAmount
+ " [new ammount]= " + newAmount);
userInfoDAO.addGoldNumByUserId(userId, amount);
// 獲得用戶在內(nèi)存中信息
UserInfo userInfo = UserCache.getInstance().loadUserInfo(userId);
if (userInfo != null) {
userInfo.setGoldNum(newAmount);
UserCache.getInstance().updateUserInfo(userInfo);
} else
throw new BusinessException(ErrorCode.NOT_FIND_USER);
LOG.info("<Important!> add COMPLETE!! [userId]= " + userId + " [current amount]= "
+ currentAmount + " [new ammount]= " + newAmount);
userService.updateUserInfo(userInfo);
//更新data日志
DailyUserDataLogCache.incrDiamonds(userId, amount);
//記錄到money log
MoneyLog moneyLog = new MoneyLog();
moneyLog.setUserId(userId);
moneyLog.setAmountType(MoneyLogAmountType.ADD.getType());
moneyLog.setChangeAmount(amount);
moneyLog.setChangeType(moneyLogOrderChangeType.getType());
moneyLog.setChangeReason(moneyLogOrderChangeType.getReason());
moneyLog.setOrderId(orderId);
moneyLog.setRelationId(relationUserId);
moneyLog.setType(orderType.getType());
moneyLogService.save(moneyLog);
return newAmount;
}
減錢服務(wù)的代碼
public int subtractUserGoldNum(String userId, int amount, String orderId, OrderType orderType,
MoneyLogOrderChangeType moneyLogOrderChangeType, String relationUserId) {
if (userId == null)
throw new BusinessException(ErrorCode.NOT_FIND_USER_ACCOUNT);
int currentAmount = userInfoDAO.queryGoldNumByUserId(userId);
int newAmount = currentAmount - amount;
if (newAmount < 0)
throw new BusinessException(ErrorCode.INSUFFICIENT_MONEY);
LOG.info("<Important!> subtract BEGIN!! [userId]= " + userId + " [current amount]= "
+ currentAmount + " [new ammount]= " + newAmount);
userInfoDAO.subtractGoldNumByUserId(userId, amount);
// 獲得用戶在內(nèi)存中信息
UserInfo userInfo = UserCache.getInstance().loadUserInfo(userId);
if (userInfo != null) {
userInfo.setGoldNum(newAmount);
UserCache.getInstance().updateUserInfo(userInfo);
} else
throw new BusinessException(ErrorCode.NOT_FIND_USER);
LOG.info("<Important!> subtract COMPLETE!! [userId]= " + userId + " [current amount]= "
+ currentAmount + " [new ammount]= " + newAmount);
//記錄到money log
MoneyLog moneyLog = new MoneyLog();
moneyLog.setUserId(userId);
moneyLog.setAmountType(MoneyLogAmountType.SUBTRACT.getType());
moneyLog.setChangeAmount(amount);
moneyLog.setChangeType(moneyLogOrderChangeType.getType());
moneyLog.setChangeReason(moneyLogOrderChangeType.getReason());
moneyLog.setOrderId(orderId);
moneyLog.setRelationId(relationUserId);
moneyLog.setType(orderType.getType());
moneyLogService.save(moneyLog);
return newAmount;
}
我們可以看到,按照常理來說,我們的扣錢服務(wù)中有了對余額的檢測,如果余額不夠會拋出業(yè)務(wù)異常,讓數(shù)據(jù)回滾,那么案例來說我們的程序應(yīng)該是沒問題的啊?但是仔細查詢消費日志表的時候會發(fā)現(xiàn),正常我加錢和扣錢都會記錄一條記錄,也就是我加錢和扣錢的記錄數(shù)量是相等的,但是在我們并發(fā)請求了之后,我的數(shù)據(jù)庫中的記錄數(shù)是不對等的,我的扣錢記錄比加錢記錄少,這是為什么呢钠绍?
其實我們之前已經(jīng)說過,無論是加錢服務(wù)還是減錢服務(wù)都是有悲觀鎖來保證的,那么這個悲觀鎖是怎么回事呢,其實就是一個for update語句,在事務(wù)提交了之后會自動釋放鎖,但是由于我們的項目是一個編程式事務(wù),而這個服務(wù)的加錢和扣錢直接在view層調(diào)用了,所以這時候這兩個服務(wù)是兩個事物,所以即使扣錢服務(wù)發(fā)生了異常,那么我們之前的錢已經(jīng)給收入者加過了,這時候是無法回滾的.
然后再看一下我們的業(yè)務(wù),重新整理思考一下邏輯,在并發(fā)請求的調(diào)用中,給用戶a加錢服務(wù)調(diào)用完之后,我們的需要調(diào)用扣錢服務(wù)給b扣費,這時候我們發(fā)現(xiàn)用戶余額不足了,而拋出異常,但是給a加的錢并沒有還原回去,然后b的余額也只是0而已.
解決問題
其實我們在上面的原子服務(wù)中已經(jīng)做了很多的檢測,然而因為疏忽的問題造成了現(xiàn)在的問題,要解決這個問題有幾種辦法也都很簡單.
我們先去除view層的余額校驗(這里去除他是因為沒啥用,屬于一個優(yōu)化代碼)
- 在檢測用戶是否查看過的地方增加鎖和唯一性校驗,保證用戶只偷看一次(這種方法其實治標不治本,也只是針對于這個接口能保證沒有問題)
- 把扣錢服務(wù)在加錢服務(wù)之前調(diào)用,這樣扣錢服務(wù)發(fā)送異常的時候,就會熔斷,不會繼續(xù)走加錢服務(wù)(這種方式代碼改動量最小,不過保不準哪個同事繼續(xù)會出現(xiàn)這樣的疏忽).
- 抽象出一層組合的服務(wù)層,吧扣錢和加錢放在一個事務(wù)之中,如果有其他的業(yè)務(wù)就繼續(xù)組合,讓業(yè)務(wù)處于同一個事務(wù)下,既可以保證數(shù)據(jù)安全,又能保證業(yè)務(wù)正確.還可以規(guī)范化整個開發(fā)中的代碼調(diào)用(這種方式也是我比較推薦的,而且在日后做服務(wù)化和架構(gòu)梳理的時候也會比較方便,而且來了新人也不容易出問題)
總結(jié)問題
我們上面已經(jīng)把問題找到并解決了問題,不過已經(jīng)異常的數(shù)據(jù)還是讓本寶寶在10.24程序員節(jié)日的時候忙活了很久很久,可坑壞本寶寶了,于是乎樓主便整理了這個大事件中所暴漏出來的問題.
- 接口沒做簽名和加密(sign,base64沒做,客戶端未做混淆,用戶可以輕易的請求到服務(wù)接口)
- 相同的請求數(shù)據(jù)沒做過濾(例如加上接口調(diào)用會話id來過濾)
- 接口沒做組合服務(wù)(這也是整個事件過程中比較重要的一個地方,因為代碼未做規(guī)范化,所以才會暴漏了這么嚴重的問題,如果統(tǒng)一了服務(wù)調(diào)用就不會出現(xiàn)這個問題了,就像加錢和扣錢的原子服務(wù)以及處理了很多會發(fā)生的問題了)
- 沒做并發(fā)測試,測試點不足(不提了,可能很多公司都有這個通病吧,哎,以后要重點注意了)
- 用戶金錢安全性保證不夠(給不了你心愛的用戶安全感,拿什么說愛他們)
- 未做風(fēng)控檢測(考慮每天跑跑定時任務(wù),做一些業(yè)務(wù)檢測)