第一步 收集
上碼:
/**
 * Runs {@code stressTask} concurrently on {@code concurrencyLevel} threads and gathers the timings.
 *
 * <p>Every thread performs {@code totalRequests / concurrencyLevel} task executions (integer
 * division), so the total reported in the result is
 * {@code (totalRequests / concurrencyLevel) * concurrencyLevel} and may be smaller than
 * {@code totalRequests}.
 *
 * <p>If the calling thread is interrupted while waiting for the workers, the interruption is logged,
 * the thread's interrupt flag is restored, and a result is still built from whatever the workers have
 * recorded so far.
 *
 * @param concurrencyLevel number of worker threads; also the size of the start barrier and end latch
 * @param totalRequests requested total number of task executions
 * @param stressTask task to execute; when {@code null}, {@code this.emptyTestService} is used
 * @param warmUpTime value handed to {@code warmUp} before the measured run
 * @return the populated {@code StressResult}
 */
public StressResult test(int concurrencyLevel, int totalRequests, StressTask stressTask, int warmUpTime) {
    if (stressTask == null) {
        stressTask = this.emptyTestService;
    }
    // Pre-execute the task so that obvious errors surface before measuring.
    this.warmUp(warmUpTime, stressTask);
    int everyThreadCount = totalRequests / concurrencyLevel;

    // Two switches: the barrier releases all workers at the same moment once every one of them is
    // ready, the latch lets this thread wait until every worker has finished.
    CyclicBarrier threadStartBarrier = new CyclicBarrier(concurrencyLevel);
    CountDownLatch threadEndLatch = new CountDownLatch(concurrencyLevel);
    // Failure counter shared by all workers.
    AtomicInteger failedCounter = new AtomicInteger();

    StressContext stressContext = new StressContext();
    stressContext.setTestService(stressTask);
    stressContext.setEveryThreadCount(everyThreadCount);
    stressContext.setThreadStartBarrier(threadStartBarrier);
    stressContext.setThreadEndLatch(threadEndLatch);
    stressContext.setFailedCounter(failedCounter);

    ExecutorService executorService = Executors.newFixedThreadPool(concurrencyLevel);
    List<StressThreadWorker> workers = new ArrayList<StressThreadWorker>(concurrencyLevel);
    try {
        // Create all workers first, then submit them, so that construction cost is not interleaved
        // with already running workers.
        for (int i = 0; i < concurrencyLevel; ++i) {
            workers.add(new StressThreadWorker(stressContext, everyThreadCount));
        }
        for (StressThreadWorker worker : workers) {
            executorService.submit(worker);
        }
        // Wait here until every worker has counted the latch down.
        threadEndLatch.await();
    } catch (InterruptedException e) {
        // Restore the flag so callers further up the stack can still observe the interruption.
        Thread.currentThread().interrupt();
        log.error("InterruptedException", e);
    } finally {
        // Always release the pool threads, even when setup or waiting failed.
        executorService.shutdownNow();
    }

    // The real request count is recomputed because the division above may not be exact,
    // e.g. concurrency 99 with 1000 total requests.
    int realTotalRequests = everyThreadCount * concurrencyLevel;
    int failedRequests = failedCounter.get();

    StressTester.SortResult sortResult = this.getSortedTimes(workers);
    List<Long> allTimes = sortResult.allTimes;
    List<Long> trheadTimes = sortResult.trheadTimes;
    // The per-thread totals are sorted ascending, so the last entry is the slowest thread; it is
    // reported as the time taken by the whole test.
    long totalTime = trheadTimes.get(trheadTimes.size() - 1);

    StressResult stressResult = new StressResult();
    stressResult.setAllTimes(allTimes);
    stressResult.setTestsTakenTime(totalTime);
    stressResult.setFailedRequests(failedRequests);
    stressResult.setTotalRequests(realTotalRequests);
    stressResult.setConcurrencyLevel(concurrencyLevel);
    stressResult.setWorkers(workers);
    return stressResult;
}
下面貼一下 StressThreadWorker 的源碼:
package com.taobao.stresstester.core;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Runnable executed by one stress-test thread: waits on the shared start barrier, runs the task
 * {@code count} times while recording the duration of every execution, and finally counts down the
 * shared end latch.
 *
 * <p>Each worker owns its own timing list, so workers do not interfere with each other's samples;
 * only the failure counter is shared.
 */
class StressThreadWorker implements Runnable {
    private final StressTask service;
    private final CyclicBarrier threadStartBarrier;
    private final CountDownLatch threadEndLatch;
    private final AtomicInteger failedCounter;
    private final int count;
    protected static Logger log = LoggerFactory.getLogger(StressThreadWorker.class);
    /** Duration in nanoseconds of every task execution performed by this worker. */
    private final List<Long> everyTimes;

    /**
     * @param stressContext supplies the start barrier, end latch, failure counter and task
     * @param count number of task executions this worker performs
     */
    public StressThreadWorker(StressContext stressContext, int count) {
        this.threadStartBarrier = stressContext.getThreadStartBarrier();
        this.threadEndLatch = stressContext.getThreadEndLatch();
        this.failedCounter = stressContext.getFailedCounter();
        this.count = count;
        this.everyTimes = new ArrayList<Long>(count);
        this.service = stressContext.getTestService();
    }

    /** @return the durations (nanoseconds) recorded by this worker, in execution order */
    public List<Long> getEveryTimes() {
        return this.everyTimes;
    }

    /**
     * Waits until all workers are ready, runs the measured loop, and always counts down the end
     * latch so that the thread waiting on it cannot be left blocked when the barrier breaks or this
     * thread is interrupted.
     */
    @Override
    public void run() {
        try {
            // Wait until every worker is in position.
            this.threadStartBarrier.await();
            this.doRun();
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the executor that owns this thread.
            Thread.currentThread().interrupt();
            log.error("Test exception", e);
        } catch (Exception e) {
            log.error("Test exception", e);
        } finally {
            this.threadEndLatch.countDown();
        }
    }

    /**
     * Executes the task {@code count} times (total requests divided by the number of concurrent
     * threads) and records the elapsed time of each execution, whether it succeeded or failed.
     *
     * @throws Exception declared for subclasses; failures of the task itself are counted, not thrown
     */
    protected void doRun() throws Exception {
        for (int i = 0; i < this.count; ++i) {
            long start = System.nanoTime();
            try {
                this.service.doTask();
            } catch (Throwable failure) {
                // Failures are tallied in the counter shared by all workers.
                this.failedCounter.incrementAndGet();
            } finally {
                // Record the sample in finally so failed executions are timed as well.
                this.everyTimes.add(System.nanoTime() - start);
            }
        }
    }
}
結果整理排序的類:
/**
 * Collects the timings recorded by the given workers into two ascending lists.
 *
 * <p>{@code allTimes} holds every individual sample of every worker; {@code trheadTimes} holds one
 * entry per worker, the value {@code StatisticsUtils.getTotal} returns for that worker's samples.
 * Both lists are sorted so that later percentile lookups can index into them directly.
 *
 * @param workers workers whose recorded times are merged
 * @return a {@code SortResult} carrying the two sorted lists
 */
protected StressTester.SortResult getSortedTimes(List<StressThreadWorker> workers) {
    List<Long> perRequest = new ArrayList<Long>();
    List<Long> perThread = new ArrayList<Long>();

    for (StressThreadWorker current : workers) {
        List<Long> samples = current.getEveryTimes();
        long threadTotal = StatisticsUtils.getTotal(samples);
        perThread.add(threadTotal);
        // Merge this worker's individual samples into the global list.
        perRequest.addAll(samples);
    }

    // Ascending order: a percentile is later read simply by index.
    Collections.sort(perRequest);
    Collections.sort(perThread);

    StressTester.SortResult sorted = new StressTester.SortResult();
    sorted.allTimes = perRequest;
    sorted.trheadTimes = perThread;
    return sorted;
}
/**
 * Plain holder for the two timing lists produced when worker results are merged.
 * Both fields start out {@code null} and are assigned by the code that builds the result.
 */
class SortResult {
    /** Every individual request time. */
    List<Long> allTimes;

    /** One aggregated time per worker thread. */
    List<Long> trheadTimes;
}
第二步 格式化輸出
/**
 * Writes a textual report for {@code stressResult} to {@code writer}.
 *
 * <p>The report lists concurrency, test duration, request counts, requests per second, average
 * times, the shortest and longest request and the request time at several percentiles. Percentile
 * values are read by index from {@code stressResult.getAllTimes()}, which is expected to be sorted
 * ascending.
 *
 * <p>An {@code IOException} raised by the writer is logged and not propagated.
 *
 * @param stressResult result to render
 * @param writer destination of the report
 */
public void format(StressResult stressResult, Writer writer) {
    // Total time taken by the test (nanoseconds).
    long testsTakenTime = stressResult.getTestsTakenTime();
    // Total number of requests.
    int totalRequests = stressResult.getTotalRequests();
    // Number of concurrent threads.
    int concurrencyLevel = stressResult.getConcurrencyLevel();
    // Nanoseconds to milliseconds.
    float takes = StatisticsUtils.toMs(testsTakenTime);
    // Every individual request time.
    List<Long> allTimes = stressResult.getAllTimes();
    // Sum of all request times.
    long totaleTimes = StatisticsUtils.getTotal(allTimes);
    // Requests per second = threads * requests / summed request time, scaled from ns to s.
    float tps = 1.0E9F * (float)concurrencyLevel * ((float)totalRequests / (float)totaleTimes);
    // Average time per request (nanoseconds).
    float averageTime = StatisticsUtils.getAverage(totaleTimes, totalRequests);
    // Average time per request divided across the concurrent threads.
    float onTheadAverageTime = averageTime / (float)concurrencyLevel;

    // Bounds-safe lookups: an empty sample list yields 0 instead of an IndexOutOfBoundsException.
    long shortestRequest = allTimes.isEmpty() ? 0L : allTimes.get(0);
    long longestRequest = allTimes.isEmpty() ? 0L : allTimes.get(allTimes.size() - 1);

    StringBuilder view = new StringBuilder();
    view.append(" Concurrency Level:\t").append(concurrencyLevel).append("--并發(fā)數(shù)");
    view.append("\r\n Time taken for tests:\t").append(takes).append(" ms").append("--測(cè)試耗時(shí)");
    view.append("\r\n Complete Requests:\t").append(totalRequests).append("--完成測(cè)試次數(shù)");
    view.append("\r\n Failed Requests:\t").append(stressResult.getFailedRequests()).append("--失敗次數(shù)");
    view.append("\r\n Requests per second:\t").append(tps).append("--QPS");
    view.append("\r\n Time per request:\t").append(StatisticsUtils.toMs(averageTime)).append(" ms").append("--平均耗時(shí)");
    view.append("\r\n Time per request:\t").append(StatisticsUtils.toMs(onTheadAverageTime)).append(" ms (across all concurrent requests)").append("--平均耗時(shí),忽略并發(fā)影響");
    view.append("\r\n Shortest request:\t").append(StatisticsUtils.toMs(shortestRequest)).append(" ms").append("--最短耗時(shí)");
    view.append("\r\n Percentage of the requests served within a certain time (ms)");
    // NOTE(review): the suffix of the 50% line embeds a fixed sample value (0.005703) instead of
    // the measured one; it is kept unchanged because it is runtime output - confirm whether it
    // should be removed.
    view.append("\r\n 50%\t").append(StatisticsUtils.toMs(this.percentileTime(allTimes, totalRequests, 50))).append("--50% 的耗時(shí)在0.005703毫秒以下");
    int[] percents = {66, 75, 80, 90, 95, 98, 99};
    for (int percent : percents) {
        view.append("\r\n ").append(percent).append("%\t")
            .append(StatisticsUtils.toMs(this.percentileTime(allTimes, totalRequests, percent)));
    }
    view.append("\r\n 100%\t").append(StatisticsUtils.toMs(longestRequest)).append(" (longest request)").append("--最長(zhǎng)的耗時(shí)");
    try {
        writer.write(view.toString());
    } catch (IOException e) {
        log.error("IOException:", e);
    }
}

/**
 * Returns the entry of {@code sortedTimes} at index {@code totalRequests * percent / 100}.
 * The index is computed in {@code long} arithmetic so it cannot overflow, and is clamped to the
 * valid range of the list; an empty list yields {@code 0}.
 */
private long percentileTime(List<Long> sortedTimes, int totalRequests, int percent) {
    if (sortedTimes.isEmpty()) {
        return 0L;
    }
    long index = (long)totalRequests * percent / 100L;
    long lastIndex = sortedTimes.size() - 1;
    index = Math.max(0L, Math.min(index, lastIndex));
    return sortedTimes.get((int)index);
}
TPS和QPS的區別
TPS(transactions per second):每秒傳輸的請求總數。系統級別指標。
QPS(queries per second):每秒查詢數。單臺服務器級別指標。