一星虹、首先看到入口地方:
@Path("/v1/cluster")
public class ClusterStatsResource
{
private final InternalNodeManager nodeManager;
private final QueryManager queryManager;
private final boolean isIncludeCoordinator;
有三個(gè)狀態(tài):節(jié)點(diǎn)管理器飒房、查詢管理器、是否包含協(xié)調(diào)器
二狠毯、提供的服務(wù):
@GET
@Produces(MediaType.APPLICATION_JSON)
public ClusterStats getClusterStats()
{
long runningQueries = 0;
long blockedQueries = 0;
long queuedQueries = 0;
省略。嚼松。。献酗。
返回的是ClusterStats信息
三、ClusterStats狀態(tài)有哪些?
public static class ClusterStats
{
private final long runningQueries;
private final long blockedQueries;
private final long queuedQueries;
private final long activeWorkers;
private final long runningDrivers;
private final double reservedMemory;
private final double rowInputRate;
private final double byteInputRate;
private final double cpuTimeRate;
此處我們可以知道此接口返回的是:運(yùn)行查詢數(shù)罕偎、阻塞個(gè)數(shù)、進(jìn)入排隊(duì)數(shù)、活動(dòng)的worker數(shù)甩苛、正在運(yùn)行的drivers數(shù) 等信息。
四讯蒲、ClusterStats狀態(tài)是如何獲取的呢?
由兩個(gè)最開始ClusterStatsResource中的兩個(gè)狀態(tài)提供:
InternalNodeManager
QueryManager
五墨林、先看InternalNodeManager提供什么?
@Inject
public DiscoveryNodeManager(
@ServiceType("presto") ServiceSelector serviceSelector,
NodeInfo nodeInfo,
FailureDetector failureDetector,
NodeVersion expectedNodeVersion,
@ForNodeManager HttpClient httpClient)
{
省略旭等。酌呆。肪笋。
this.currentNode = refreshNodesInternal();
}
看最后一行月劈,refreshNodesInternal。
//獲取所有節(jié)點(diǎn)的狀態(tài)信息猜揪,通過(guò)/v1/service來(lái)獲取。包括location而姐、節(jié)點(diǎn)狀態(tài)腊凶、節(jié)點(diǎn)ID拴念、UUID等信息
Set<ServiceDescriptor> services = serviceSelector.selectAllServices().stream()
.filter(service -> !failureDetector.getFailed().contains(service))
.collect(toImmutableSet());
//獲取所有節(jié)點(diǎn)狀況
allNodes = new AllNodes(activeNodesBuilder.build(), inactiveNodesBuilder.build(), shuttingDownNodesBuilder.build());
activeNodesByConnectorId = byConnectorIdBuilder.build();
coordinators = coordinatorsBuilder.build();
每隔5秒來(lái),由協(xié)調(diào)器節(jié)點(diǎn)主動(dòng)去查詢workers狀態(tài)政鼠。而且在更新完成5s之后,就調(diào)用上面的refreshNodesInternal方法公般。通過(guò)/v1/service來(lái)獲取節(jié)點(diǎn)信息,把新節(jié)點(diǎn)加入到DiscoveryNodeManager的一個(gè)map中nodeStates官帘。
@PostConstruct
public void startPollingNodeStates()
{
// 如果是協(xié)調(diào)器節(jié)點(diǎn),就定時(shí)5s去刷新拉去worker節(jié)點(diǎn)數(shù)據(jù)
if (getCoordinators().contains(currentNode)) {
nodeStateUpdateExecutor.scheduleWithFixedDelay(() -> {
AllNodes allNodes = getAllNodes();
<b>從上面我們已經(jīng)看出來(lái)ClusterStats的activeWorkers信息可以從InternalNodeManager的nodeStates中獲取刽虹。</b>
六、再看QueryManager提供什么?
他的實(shí)現(xiàn)類是:SqlQueryManager
QueryManager提供了getAllQueryInfo方法給ClusterStatsResource來(lái)獲取ClusterStats中的狀態(tài)信息胖缤。
@Override
public List<QueryInfo> getAllQueryInfo()
{
return queries.values().stream()
.map(queryExecution -> {
try {
return queryExecution.getQueryInfo();
}
catch (RuntimeException ignored) {
return null;
}
})
.filter(Objects::nonNull)
.collect(toImmutableList());
}
這個(gè)類在創(chuàng)建query時(shí)候,加入到 queries中
private final ConcurrentMap<QueryId, QueryExecution> queries = new ConcurrentHashMap<>();
同時(shí)對(duì)每一個(gè)query添加監(jiān)聽(tīng)器草姻,一旦執(zhí)行狀態(tài)改變,就更新?tīng)顟B(tài)query狀態(tài)即:QueryInfo
七撩独、回頭看看ClusterStats中的狀態(tài)
public ClusterStats getClusterStats()
{
long runningQueries = 0;
long blockedQueries = 0;
long queuedQueries = 0;
long activeNodes = nodeManager.getNodes(NodeState.ACTIVE).size();
if (!isIncludeCoordinator) {
activeNodes -= 1;
}
long runningDrivers = 0;
double memoryReservation = 0;
double rowInputRate = 0;
double byteInputRate = 0;
double cpuTimeRate = 0;
for (QueryInfo query : queryManager.getAllQueryInfo()) {
if (query.getState() == QueryState.QUEUED) {
queuedQueries++;
}
else if (query.getState() == QueryState.RUNNING) {
if (query.getQueryStats().isFullyBlocked()) {
blockedQueries++;
}
else {
runningQueries++;
}
}
if (!query.getState().isDone()) {
double totalExecutionTimeSeconds = query.getQueryStats().getElapsedTime().getValue(SECONDS);
if (totalExecutionTimeSeconds != 0) {
byteInputRate += query.getQueryStats().getProcessedInputDataSize().toBytes() / totalExecutionTimeSeconds;
rowInputRate += query.getQueryStats().getProcessedInputPositions() / totalExecutionTimeSeconds;
cpuTimeRate += (query.getQueryStats().getTotalCpuTime().getValue(SECONDS)) / totalExecutionTimeSeconds;
}
memoryReservation += query.getQueryStats().getTotalMemoryReservation().toBytes();
runningDrivers += query.getQueryStats().getRunningDrivers();
}
}
return new ClusterStats(runningQueries, blockedQueries, queuedQueries, activeNodes, runningDrivers, memoryReservation, rowInputRate, byteInputRate, cpuTimeRate);
}
我們看到大量的信息是來(lái)自query.QueryInfo中的信息。
八澳迫、狀態(tài)監(jiān)聽(tīng)器實(shí)現(xiàn)
待續(xù)~~~