Presto Technical Summary (Part 2 of 2; the content was split into two parts because of its length)
2.4.5 Stage schedulers start scheduling
There are three main kinds of stage schedulers:
(1) Source tasks
- SourcePartitionedScheduler
(2) Fixed tasks
- FixedCountScheduler
- FixedSourcePartitionedScheduler
There are two main split placement policies:
(1) DynamicSplitPlacementPolicy
(2) FixedSplitPlacementPolicy
The query scheduler invokes each stage's scheduler (a simplified driving loop is sketched below) with:
stageSchedulers.get(stage.getStageId()).schedule();
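Before looking at each scheduler it helps to see how the returned ScheduleResult is consumed. The following is a minimal sketch under assumed stand-in types (a trimmed-down StageScheduler and ScheduleResult, CompletableFuture instead of Guava's ListenableFuture); the real driving loop lives in SqlQueryScheduler, and the real ScheduleResult also carries the newly created tasks and a blocked reason.

// Simplified sketch of how a query-level scheduler might drive one stage scheduler.
// The names mirror Presto's, but the types and the loop are illustrative only.
import java.util.concurrent.CompletableFuture;

interface StageScheduler {
    ScheduleResult schedule();
}

final class ScheduleResult {
    final boolean finished;              // no more splits will ever be scheduled for this stage
    final int splitsScheduled;           // splits handed out in this round
    final CompletableFuture<?> blocked;  // completes when scheduling can make progress again

    ScheduleResult(boolean finished, int splitsScheduled, CompletableFuture<?> blocked) {
        this.finished = finished;
        this.splitsScheduled = splitsScheduled;
        this.blocked = blocked;
    }
}

final class SchedulingLoop {
    // Drive one stage scheduler until it reports that it is finished.
    static void drive(StageScheduler scheduler) throws Exception {
        while (true) {
            ScheduleResult result = scheduler.schedule();
            if (result.finished) {
                return;                  // all splits assigned, all tasks created
            }
            if (result.splitsScheduled == 0) {
                result.blocked.get();    // nothing to do right now: wait until unblocked, then retry
            }
        }
    }
}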
The first is SourcePartitionedScheduler:
public synchronized ScheduleResult schedule()
{
int overallSplitAssignmentCount = 0;
ImmutableSet.Builder<RemoteTask> overallNewTasks = ImmutableSet.builder();
List<ListenableFuture<?>> overallBlockedFutures = new ArrayList<>();
boolean anyBlockedOnPlacements = false;
boolean anyBlockedOnNextSplitBatch = false;
boolean anyNotBlocked = false;
for (Entry<Lifespan, ScheduleGroup> entry : scheduleGroups.entrySet()) {
Lifespan lifespan = entry.getKey();
ScheduleGroup scheduleGroup = entry.getValue();
Set<Split> pendingSplits = scheduleGroup.pendingSplits;
if (scheduleGroup.state != ScheduleGroupState.DISCOVERING_SPLITS) {
verify(scheduleGroup.nextSplitBatchFuture == null);
}
else if (pendingSplits.isEmpty()) {
// try to get the next batch: if there are no pending splits, fetch the next batch of splits from the split source
if (scheduleGroup.nextSplitBatchFuture == null) {
scheduleGroup.nextSplitBatchFuture = splitSource.getNextBatch(scheduleGroup.partitionHandle, lifespan, splitBatchSize - pendingSplits.size());
long start = System.nanoTime();
Futures.addCallback(scheduleGroup.nextSplitBatchFuture, new FutureCallback<SplitBatch>()
{
@Override
public void onSuccess(SplitBatch result)
{
stage.recordGetSplitTime(start);
}
@Override
public void onFailure(Throwable t)
{
}
});
}
if (scheduleGroup.nextSplitBatchFuture.isDone()) {
SplitBatch nextSplits = getFutureValue(scheduleGroup.nextSplitBatchFuture);
scheduleGroup.nextSplitBatchFuture = null;
pendingSplits.addAll(nextSplits.getSplits());
if (nextSplits.isLastBatch() && scheduleGroup.state == ScheduleGroupState.DISCOVERING_SPLITS) {
//if this is the last batch and the schedule group is still discovering splits, mark the group as NO_MORE_SPLITS
scheduleGroup.state = ScheduleGroupState.NO_MORE_SPLITS;
}
}
else {
overallBlockedFutures.add(scheduleGroup.nextSplitBatchFuture);
anyBlockedOnNextSplitBatch = true;
continue;
}
}
Multimap<Node, Split> splitAssignment = ImmutableMultimap.of();
if (!pendingSplits.isEmpty()) {
if (!scheduleGroup.placementFuture.isDone()) {
continue;
}
if (state == State.INITIALIZED) {
state = State.SPLITS_ADDED;
}
// compute split placements: using the placement policy created earlier, this step assigns the pending splits to nodes
SplitPlacementResult splitPlacementResult = splitPlacementPolicy.computeAssignments(pendingSplits);
splitAssignment = splitPlacementResult.getAssignments();
// remove splits with successful placements
splitAssignment.values().forEach(pendingSplits::remove); // AbstractSet.removeAll performs terribly here.
overallSplitAssignmentCount += splitAssignment.size();
// if not completed placed, mark scheduleGroup as blocked on placement
if (!pendingSplits.isEmpty()) {
scheduleGroup.placementFuture = splitPlacementResult.getBlocked();
overallBlockedFutures.add(scheduleGroup.placementFuture);
anyBlockedOnPlacements = true;
}
}
// if no new splits will be assigned, update state and attach completion event
Multimap<Node, Lifespan> noMoreSplitsNotification = ImmutableMultimap.of();
if (pendingSplits.isEmpty() && scheduleGroup.state == ScheduleGroupState.NO_MORE_SPLITS) {
scheduleGroup.state = ScheduleGroupState.DONE;
if (!lifespan.isTaskWide()) {
Node node = ((FixedSplitPlacementPolicy) splitPlacementPolicy).getNodeForBucket(lifespan.getId());
noMoreSplitsNotification = ImmutableMultimap.of(node, lifespan);
}
}
//assign the splits to nodes for execution: given a node and its splits, a RemoteTask is created or reused and then started
overallNewTasks.addAll(assignSplits(splitAssignment, noMoreSplitsNotification));
// Assert that "placement future is not done" implies "pendingSplits is not empty".
// The other way around is not true. One obvious reason is (un)lucky timing, where the placement is unblocked between `computeAssignments` and this line.
// However, there are other reasons that could lead to this.
// Note that `computeAssignments` is quite broken:
// 1. It always returns a completed future when there are no tasks, regardless of whether all nodes are blocked.
// 2. The returned future will only be completed when a node with an assigned task becomes unblocked. Other nodes don't trigger future completion.
// As a result, to avoid busy loops caused by 1, we check pendingSplits.isEmpty() instead of placementFuture.isDone() here.
if (scheduleGroup.nextSplitBatchFuture == null && scheduleGroup.pendingSplits.isEmpty() && scheduleGroup.state != ScheduleGroupState.DONE) {
anyNotBlocked = true;
}
}
if (autoDropCompletedLifespans) {
drainCompletedLifespans();
}
// * `splitSource.isFinished` invocation may fail after `splitSource.close` has been invoked.
// If state is NO_MORE_SPLITS/FINISHED, splitSource.isFinished has previously returned true, and splitSource is closed now.
// * Even if `splitSource.isFinished()` return true, it is not necessarily safe to tear down the split source.
// * If anyBlockedOnNextSplitBatch is true, it means we have not checked out the recently completed nextSplitBatch futures,
// which may contain recently published splits. We must not ignore those.
// * If any scheduleGroup is still in DISCOVERING_SPLITS state, it means it hasn't realized that there will be no more splits.
// Next time it invokes getNextBatch, it will realize that. However, the invocation will fail if we tear down splitSource now.
if ((state == State.NO_MORE_SPLITS || state == State.FINISHED) || (scheduleGroups.isEmpty() && splitSource.isFinished())) {
switch (state) {
case INITIALIZED:
// we have not scheduled a single split so far
state = State.SPLITS_ADDED;
ScheduleResult emptySplitScheduleResult = scheduleEmptySplit();
overallNewTasks.addAll(emptySplitScheduleResult.getNewTasks());
overallSplitAssignmentCount++;
// fall through
case SPLITS_ADDED:
state = State.NO_MORE_SPLITS;
splitSource.close();
// fall through
case NO_MORE_SPLITS:
if (!scheduleGroups.isEmpty()) {
// we are blocked on split assignment
break;
}
state = State.FINISHED;
whenFinishedOrNewLifespanAdded.set(null);
// fall through
case FINISHED:
return new ScheduleResult(
true,
overallNewTasks.build(),
overallSplitAssignmentCount);
default:
throw new IllegalStateException("Unknown state");
}
}
if (anyNotBlocked) {
return new ScheduleResult(false, overallNewTasks.build(), overallSplitAssignmentCount);
}
// Only try to finalize task creation when scheduling would block
overallNewTasks.addAll(finalizeTaskCreationIfNecessary());
ScheduleResult.BlockedReason blockedReason;
if (anyBlockedOnNextSplitBatch) {
blockedReason = anyBlockedOnPlacements ? MIXED_SPLIT_QUEUES_FULL_AND_WAITING_FOR_SOURCE : WAITING_FOR_SOURCE;
}
else {
blockedReason = anyBlockedOnPlacements ? SPLIT_QUEUES_FULL : NO_ACTIVE_DRIVER_GROUP;
}
overallBlockedFutures.add(whenFinishedOrNewLifespanAdded);
return new ScheduleResult(
false,
overallNewTasks.build(),
nonCancellationPropagating(whenAnyComplete(overallBlockedFutures)),
blockedReason,
overallSplitAssignmentCount);
}
The splitPlacementPolicy.computeAssignments() method
DynamicSplitPlacementPolicy implements the dynamic placement logic:
@Override
public SplitPlacementResult computeAssignments(Set<Split> splits)
{
//delegate to the nodeSelector's assignment computation
return nodeSelector.computeAssignments(splits, remoteTasks.get());
}
//as mentioned earlier, the nodeSelector interface is in practice implemented by TopologyAwareNodeSelector; below is its split-assignment logic
@Override
public SplitPlacementResult computeAssignments(Set<Split> splits, List<RemoteTask> existingTasks)
{
//take the nodeMap held by the selector
NodeMap nodeMap = this.nodeMap.get().get();
//create the assignment multimap
Multimap<Node, Split> assignment = HashMultimap.create();
NodeAssignmentStats assignmentStats = new NodeAssignmentStats(nodeTaskMap, nodeMap, existingTasks);
int[] topologicCounters = new int[topologicalSplitCounters.size()];
Set<NetworkLocation> filledLocations = new HashSet<>();
Set<Node> blockedExactNodes = new HashSet<>();
boolean splitWaitingForAnyNode = false;
for (Split split : splits) {
//first check whether the split is remotely accessible; if not, look up the nodes (optionally including the coordinator) that match the split's addresses and treat them as candidates; if no matching node exists, throw an exception
if (!split.isRemotelyAccessible()) {
List<Node> candidateNodes = selectExactNodes(nodeMap, split.getAddresses(), includeCoordinator);
if (candidateNodes.isEmpty()) {
log.debug("No nodes available to schedule %s. Available nodes %s", split, nodeMap.getNodesByHost().keys());
throw new PrestoException(NO_NODES_AVAILABLE, "No nodes available to run query");
}
//node selection strategy: balances a minimum number of candidate nodes against the per-task limit on pending splits (see the sketch after this method)
Node chosenNode = bestNodeSplitCount(candidateNodes.iterator(), minCandidates, maxPendingSplitsPerTask, assignmentStats);
//if a node could be chosen
if (chosenNode != null) {
//record the chosen node and its split
assignment.put(chosenNode, split);
assignmentStats.addAssignedSplit(chosenNode);
}
// Exact node set won't matter, if a split is waiting for any node
else if (!splitWaitingForAnyNode) {
//if the strategy could not pick a node, remember all candidate nodes as blocked exact nodes
blockedExactNodes.addAll(candidateNodes);
}
continue;
}
//for remotely accessible splits, fall through to the topology-aware selection below
Node chosenNode = null;
//network depth (number of segments); the loop below retries at progressively shallower levels
int depth = networkLocationSegmentNames.size();
int chosenDepth = 0;
Set<NetworkLocation> locations = new HashSet<>();
//collect the network locations of all of this split's addresses
for (HostAddress host : split.getAddresses()) {
locations.add(networkLocationCache.get(host));
}
//if no location could be resolved from the cache, fall back to the root location and set the depth to 0
if (locations.isEmpty()) {
// Add the root location
locations.add(ROOT_LOCATION);
depth = 0;
}
// Try each address at progressively shallower network locations
for (int i = depth; i >= 0 && chosenNode == null; i--) {
for (NetworkLocation location : locations) {
// Skip locations which are only shallower than this level
// For example, locations which couldn't be located will be at the "root" location
if (location.getSegments().size() < i) {
continue;
}
location = location.subLocation(0, i);
if (filledLocations.contains(location)) {
continue;
}
Set<Node> nodes = nodeMap.getWorkersByNetworkPath().get(location);
chosenNode = bestNodeSplitCount(new ResettableRandomizedIterator<>(nodes), minCandidates, calculateMaxPendingSplits(i, depth), assignmentStats);
if (chosenNode != null) {
chosenDepth = i;
break;
}
filledLocations.add(location);
}
}
if (chosenNode != null) {
//record the chosen node and its split
assignment.put(chosenNode, split);
assignmentStats.addAssignedSplit(chosenNode);
topologicCounters[chosenDepth]++;
}
else {
splitWaitingForAnyNode = true;
}
}
for (int i = 0; i < topologicCounters.length; i++) {
if (topologicCounters[i] > 0) {
topologicalSplitCounters.get(i).update(topologicCounters[i]);
}
}
ListenableFuture<?> blocked;
int maxPendingForWildcardNetworkAffinity = calculateMaxPendingSplits(0, networkLocationSegmentNames.size());
if (splitWaitingForAnyNode) {
blocked = toWhenHasSplitQueueSpaceFuture(existingTasks, calculateLowWatermark(maxPendingForWildcardNetworkAffinity));
}
else {
blocked = toWhenHasSplitQueueSpaceFuture(blockedExactNodes, existingTasks, calculateLowWatermark(maxPendingForWildcardNetworkAffinity));
}
return new SplitPlacementResult(blocked, assignment);
}
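The bestNodeSplitCount call used above is not shown in this article. The following is a minimal sketch of that style of selection under assumed counters (plain Map stand-ins for NodeAssignmentStats and a hypothetical maxSplitsPerNode limit), not the exact Presto implementation: prefer the first candidate with spare overall capacity; otherwise remember the least-loaded candidate that is still below maxPendingSplitsPerTask; return null when nothing qualifies, which the caller treats as "blocked on placement".

import java.util.Iterator;
import java.util.Map;

// Simplified sketch of a bestNodeSplitCount-style node choice; the real method differs in detail.
final class NodeSelectionSketch {
    static <N> N bestNodeSplitCount(
            Iterator<N> candidates,
            int minCandidates,
            int maxPendingSplitsPerTask,
            int maxSplitsPerNode,
            Map<N, Integer> totalSplitsPerNode,      // all splits currently queued on the node
            Map<N, Integer> queuedSplitsForStage)    // splits queued on the node for this stage
    {
        N bestQueueNotFull = null;
        int min = Integer.MAX_VALUE;
        int examined = 0;

        while (candidates.hasNext() && examined < minCandidates) {
            N node = candidates.next();
            examined++;
            // a node with spare overall capacity wins immediately
            if (totalSplitsPerNode.getOrDefault(node, 0) < maxSplitsPerNode) {
                return node;
            }
            // otherwise remember the least-loaded node that can still take splits for this task
            int queued = queuedSplitsForStage.getOrDefault(node, 0);
            if (queued < min && queued < maxPendingSplitsPerTask) {
                min = queued;
                bestQueueNotFull = node;
            }
        }
        // null means "no placement possible right now"; the caller blocks until queue space frees up
        return bestQueueNotFull;
    }
}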
SourcePartitionedScheduler's assignSplits method
//iterate over the splitAssignment parameter and, for each entry:
//look up the tasks on the node; if there is none yet, schedule a new task, otherwise hand the node's splits to the task already running on that node
private Set<RemoteTask> assignSplits(Multimap<Node, Split> splitAssignment, Multimap<Node, Lifespan> noMoreSplitsNotification)
{
ImmutableSet.Builder<RemoteTask> newTasks = ImmutableSet.builder();
ImmutableSet<Node> nodes = ImmutableSet.<Node>builder()
.addAll(splitAssignment.keySet())
.addAll(noMoreSplitsNotification.keySet())
.build();
for (Node node : nodes) {
// source partitioned tasks can only receive broadcast data; otherwise it would have a different distribution
ImmutableMultimap<PlanNodeId, Split> splits = ImmutableMultimap.<PlanNodeId, Split>builder()
.putAll(partitionedNode, splitAssignment.get(node))
.build();
ImmutableMultimap.Builder<PlanNodeId, Lifespan> noMoreSplits = ImmutableMultimap.builder();
if (noMoreSplitsNotification.containsKey(node)) {
noMoreSplits.putAll(partitionedNode, noMoreSplitsNotification.get(node));
}
newTasks.addAll(stage.scheduleSplits(
node,
splits,
noMoreSplits.build()));
}
return newTasks.build();
}
SqlStageExecution's scheduleSplits method
public synchronized Set<RemoteTask> scheduleSplits(Node node, Multimap<PlanNodeId, Split> splits, Multimap<PlanNodeId, Lifespan> noMoreSplitsNotification)
{
requireNonNull(node, "node is null");
requireNonNull(splits, "splits is null");
splitsScheduled.set(true);
checkArgument(stateMachine.getFragment().getPartitionedSources().containsAll(splits.keySet()), "Invalid splits");
ImmutableSet.Builder<RemoteTask> newTasks = ImmutableSet.builder();
Collection<RemoteTask> tasks = this.tasks.get(node);
RemoteTask task;
if (tasks == null) {
// The output buffer depends on the task id starting from 0 and being sequential, since each
// task is assigned a private buffer based on task id.
TaskId taskId = new TaskId(stateMachine.getStageId(), nextTaskId.getAndIncrement());
task = scheduleTask(node, taskId, splits);
newTasks.add(task);
}
else {
task = tasks.iterator().next();
task.addSplits(splits);
}
if (noMoreSplitsNotification.size() > 1) {
// The assumption that `noMoreSplitsNotification.size() <= 1` currently holds.
// If this assumption no longer holds, we should consider calling task.noMoreSplits with multiple entries in one shot.
// These kind of methods can be expensive since they are grabbing locks and/or sending HTTP requests on change.
throw new UnsupportedOperationException("This assumption no longer holds: noMoreSplitsNotification.size() < 1");
}
for (Entry<PlanNodeId, Lifespan> entry : noMoreSplitsNotification.entries()) {
task.noMoreSplits(entry.getKey(), entry.getValue());
}
return newTasks.build();
}
The second is FixedSourcePartitionedScheduler:
public ScheduleResult schedule()
{
// schedule a task on every node in the distribution
List<RemoteTask> newTasks = ImmutableList.of();
if (!scheduledTasks) {
newTasks = partitioning.getPartitionToNode().entrySet().stream()
.map(entry -> stage.scheduleTask(entry.getValue(), entry.getKey()))
.collect(toImmutableList());
scheduledTasks = true;
}
boolean allBlocked = true;
List<ListenableFuture<?>> blocked = new ArrayList<>();
BlockedReason blockedReason = BlockedReason.NO_ACTIVE_DRIVER_GROUP;
int splitsScheduled = 0;
Iterator<SourcePartitionedScheduler> schedulerIterator = sourcePartitionedSchedulers.iterator();
List<Lifespan> driverGroupsToStart = ImmutableList.of();
while (schedulerIterator.hasNext()) {
SourcePartitionedScheduler sourcePartitionedScheduler = schedulerIterator.next();
for (Lifespan lifespan : driverGroupsToStart) {
sourcePartitionedScheduler.startLifespan(lifespan, partitionHandleFor(lifespan));
}
ScheduleResult schedule = sourcePartitionedScheduler.schedule();
splitsScheduled += schedule.getSplitsScheduled();
if (schedule.getBlockedReason().isPresent()) {
blocked.add(schedule.getBlocked());
blockedReason = blockedReason.combineWith(schedule.getBlockedReason().get());
}
else {
verify(schedule.getBlocked().isDone(), "blockedReason not provided when scheduler is blocked");
allBlocked = false;
}
driverGroupsToStart = sourcePartitionedScheduler.drainCompletedLifespans();
if (schedule.isFinished()) {
schedulerIterator.remove();
sourcePartitionedScheduler.close();
}
}
if (allBlocked) {
return new ScheduleResult(sourcePartitionedSchedulers.isEmpty(), newTasks, whenAnyComplete(blocked), blockedReason, splitsScheduled);
}
else {
return new ScheduleResult(sourcePartitionedSchedulers.isEmpty(), newTasks, splitsScheduled);
}
}
The third is FixedCountScheduler:
public ScheduleResult schedule()
{
List<RemoteTask> newTasks = partitionToNode.entrySet().stream()
.map(entry -> taskScheduler.apply(entry.getValue(), entry.getKey()))
.collect(toImmutableList());
return new ScheduleResult(true, newTasks, 0);
}
2.5 Creating RemoteTask instances
In Presto's architecture, stage scheduling produces tasks that are dispatched to workers for execution.
SqlStageExecution's scheduleTask method
private synchronized RemoteTask scheduleTask(Node node, TaskId taskId, Multimap<PlanNodeId, Split> sourceSplits)
{
ImmutableMultimap.Builder<PlanNodeId, Split> initialSplits = ImmutableMultimap.builder();
//collect all sourceSplits; for a source stage this parameter actually carries splits, whereas for fixed and single stages it is empty
initialSplits.putAll(sourceSplits);
sourceTasks.forEach((planNodeId, task) -> {
TaskStatus status = task.getTaskStatus();
if (status.getState() != TaskState.FINISHED) {
initialSplits.put(planNodeId, createRemoteSplitFor(taskId, status.getSelf()));
}
});
OutputBuffers outputBuffers = this.outputBuffers.get();
checkState(outputBuffers != null, "Initial output buffers must be set before a task can be scheduled");
//create the remote task
RemoteTask task = remoteTaskFactory.createRemoteTask(
stateMachine.getSession(),
taskId,
node,
stateMachine.getFragment(),
initialSplits.build(),
outputBuffers,
nodeTaskMap.createPartitionedSplitCountTracker(node, taskId),
summarizeTaskInfo);
completeSources.forEach(task::noMoreSplits);
allTasks.add(taskId);
tasks.computeIfAbsent(node, key -> newConcurrentHashSet()).add(task);
nodeTaskMap.addTask(node, task);
task.addStateChangeListener(new StageTaskListener());
if (!stateMachine.getState().isDone()) {
task.start();
}
else {
task.abort();
}
return task;
}
The RemoteTask interface is implemented by HttpRemoteTask:
public HttpRemoteTask(Session session,
TaskId taskId,
String nodeId,
URI location,
PlanFragment planFragment,
Multimap<PlanNodeId, Split> initialSplits,
OutputBuffers outputBuffers,
HttpClient httpClient,
Executor executor,
ScheduledExecutorService updateScheduledExecutor,
ScheduledExecutorService errorScheduledExecutor,
Duration minErrorDuration,
Duration maxErrorDuration,
Duration taskStatusRefreshMaxWait,
Duration taskInfoUpdateInterval,
boolean summarizeTaskInfo,
JsonCodec<TaskStatus> taskStatusCodec,
JsonCodec<TaskInfo> taskInfoCodec,
JsonCodec<TaskUpdateRequest> taskUpdateRequestCodec,
PartitionedSplitCountTracker partitionedSplitCountTracker,
RemoteTaskStats stats)
{
requireNonNull(session, "session is null");
requireNonNull(taskId, "taskId is null");
requireNonNull(nodeId, "nodeId is null");
requireNonNull(location, "location is null");
requireNonNull(planFragment, "planFragment is null");
requireNonNull(outputBuffers, "outputBuffers is null");
requireNonNull(httpClient, "httpClient is null");
requireNonNull(executor, "executor is null");
requireNonNull(taskStatusCodec, "taskStatusCodec is null");
requireNonNull(taskInfoCodec, "taskInfoCodec is null");
requireNonNull(taskUpdateRequestCodec, "taskUpdateRequestCodec is null");
requireNonNull(partitionedSplitCountTracker, "partitionedSplitCountTracker is null");
requireNonNull(stats, "stats is null");
try (SetThreadName ignored = new SetThreadName("HttpRemoteTask-%s", taskId)) {
this.taskId = taskId;
this.session = session;
this.nodeId = nodeId;
this.planFragment = planFragment;
this.outputBuffers.set(outputBuffers);
this.httpClient = httpClient;
this.executor = executor;
this.errorScheduledExecutor = errorScheduledExecutor;
this.summarizeTaskInfo = summarizeTaskInfo;
this.taskInfoCodec = taskInfoCodec;
this.taskUpdateRequestCodec = taskUpdateRequestCodec;
this.updateErrorTracker = new RequestErrorTracker(taskId, location, minErrorDuration, maxErrorDuration, errorScheduledExecutor, "updating task");
this.partitionedSplitCountTracker = requireNonNull(partitionedSplitCountTracker, "partitionedSplitCountTracker is null");
this.stats = stats;
for (Entry<PlanNodeId, Split> entry : requireNonNull(initialSplits, "initialSplits is null").entries()) {
ScheduledSplit scheduledSplit = new ScheduledSplit(nextSplitId.getAndIncrement(), entry.getKey(), entry.getValue());
pendingSplits.put(entry.getKey(), scheduledSplit);
}
pendingSourceSplitCount = planFragment.getPartitionedSources().stream()
.filter(initialSplits::containsKey)
.mapToInt(partitionedSource -> initialSplits.get(partitionedSource).size())
.sum();
List<BufferInfo> bufferStates = outputBuffers.getBuffers()
.keySet().stream()
.map(outputId -> new BufferInfo(outputId, false, 0, 0, PageBufferInfo.empty()))
.collect(toImmutableList());
TaskInfo initialTask = createInitialTask(taskId, location, nodeId, bufferStates, new TaskStats(DateTime.now(), null));
this.taskStatusFetcher = new ContinuousTaskStatusFetcher(
this::failTask,
initialTask.getTaskStatus(),
taskStatusRefreshMaxWait,
taskStatusCodec,
executor,
httpClient,
minErrorDuration,
maxErrorDuration,
errorScheduledExecutor,
stats);
this.taskInfoFetcher = new TaskInfoFetcher(
this::failTask,
initialTask,
httpClient,
taskInfoUpdateInterval,
taskInfoCodec,
minErrorDuration,
maxErrorDuration,
summarizeTaskInfo,
executor,
updateScheduledExecutor,
errorScheduledExecutor,
stats);
taskStatusFetcher.addStateChangeListener(newStatus -> {
TaskState state = newStatus.getState();
if (state.isDone()) {
cleanUpTask();
}
else {
partitionedSplitCountTracker.setPartitionedSplitCount(getPartitionedSplitCount());
updateSplitQueueSpace();
}
});
long timeout = minErrorDuration.toMillis() / MIN_RETRIES;
this.requestTimeout = new Duration(timeout + taskStatusRefreshMaxWait.toMillis(), MILLISECONDS);
partitionedSplitCountTracker.setPartitionedSplitCount(getPartitionedSplitCount());
updateSplitQueueSpace();
}
}
The task's start method begins polling the task's status:
public void start()
{
try (SetThreadName ignored = new SetThreadName("HttpRemoteTask-%s", taskId)) {
// to start we just need to trigger an update
scheduleUpdate();
taskStatusFetcher.start();
taskInfoFetcher.start();
}
}
The exchange fetches data from upstream stages, while the output buffer hands the current stage's data to downstream stages, as illustrated below.
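Conceptually, and only as a toy model (Page, the bounded queue, and the end-of-data marker are illustrative stand-ins, not Presto's OutputBuffer/ExchangeClient API), the hand-off behaves like a bounded queue of pages: the upstream task's output buffer enqueues pages and blocks when full, while the downstream task's exchange drains them.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Toy model of the stage-to-stage hand-off: an upstream output buffer feeds a downstream exchange.
final class Page {
    final int positionCount;
    Page(int positionCount) { this.positionCount = positionCount; }
}

final class StageHandoffSketch {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Page> buffer = new ArrayBlockingQueue<>(16);

        // upstream stage: the task's output buffer publishes pages as its operators produce them
        Thread upstream = new Thread(() -> {
            try {
                for (int i = 0; i < 3; i++) {
                    buffer.put(new Page(1024));   // blocks when the buffer is full (back pressure)
                }
                buffer.put(new Page(0));          // empty page used here as an end-of-data marker
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        upstream.start();

        // downstream stage: the exchange pulls pages and feeds them into its own operator pipeline
        while (true) {
            Page page = buffer.take();
            if (page.positionCount == 0) {
                break;
            }
            System.out.println("exchange received a page with " + page.positionCount + " rows");
        }
        upstream.join();
    }
}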
2.6 Task execution
2.6.1 The worker receives the task
After the RemoteTask is created, the task is pushed down to the target worker via an HTTP REST request.
@Path("/v1/task")   //class-level annotation on TaskResource
@POST
@Path("{taskId}")
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public Response createOrUpdateTask(@PathParam("taskId") TaskId taskId, TaskUpdateRequest taskUpdateRequest, @Context UriInfo uriInfo)
{
requireNonNull(taskUpdateRequest, "taskUpdateRequest is null");
Session session = taskUpdateRequest.getSession().toSession(sessionPropertyManager);
TaskInfo taskInfo = taskManager.updateTask(session,
taskId,
taskUpdateRequest.getFragment(),
taskUpdateRequest.getSources(),
taskUpdateRequest.getOutputIds());
if (shouldSummarize(uriInfo)) {
taskInfo = taskInfo.summarize();
}
return Response.ok().entity(taskInfo).build();
}
SqlTaskManager's updateTask method
@Override
public TaskInfo updateTask(Session session, TaskId taskId, Optional<PlanFragment> fragment, List<TaskSource> sources, OutputBuffers outputBuffers)
{
requireNonNull(session, "session is null");
requireNonNull(taskId, "taskId is null");
requireNonNull(fragment, "fragment is null");
requireNonNull(sources, "sources is null");
requireNonNull(outputBuffers, "outputBuffers is null");
if (resourceOvercommit(session)) {
// TODO: This should have been done when the QueryContext was created. However, the session isn't available at that point.
queryContexts.getUnchecked(taskId.getQueryId()).setResourceOvercommit();
}
SqlTask sqlTask = tasks.getUnchecked(taskId);
sqlTask.recordHeartbeat();
return sqlTask.updateTask(session, fragment, sources, outputBuffers);
}
SqlTask's updateTask method
public TaskInfo updateTask(Session session, Optional<PlanFragment> fragment, List<TaskSource> sources, OutputBuffers outputBuffers)
{
try {
// The LazyOutput buffer does not support write methods, so the actual
// output buffer must be established before drivers are created (e.g.
// a VALUES query).
outputBuffer.setOutputBuffers(outputBuffers);
// assure the task execution is only created once
SqlTaskExecution taskExecution;
synchronized (this) {
// is task already complete?
TaskHolder taskHolder = taskHolderReference.get();
if (taskHolder.isFinished()) {
return taskHolder.getFinalTaskInfo();
}
taskExecution = taskHolder.getTaskExecution();
if (taskExecution == null) {
checkState(fragment.isPresent(), "fragment must be present");
//on the first call, create a new task execution
taskExecution = sqlTaskExecutionFactory.create(session, queryContext, taskStateMachine, outputBuffer, fragment.get(), sources);
taskHolderReference.compareAndSet(taskHolder, new TaskHolder(taskExecution));
needsPlan.set(false);
}
}
if (taskExecution != null) {
taskExecution.addSources(sources);
}
}
catch (Error e) {
failed(e);
throw e;
}
catch (RuntimeException e) {
failed(e);
}
return getTaskInfo();
}
SqlTaskExecutionFactory's create method
public SqlTaskExecution create(Session session, QueryContext queryContext, TaskStateMachine taskStateMachine, OutputBuffer outputBuffer, PlanFragment fragment, List<TaskSource> sources)
{
boolean verboseStats = getVerboseStats(session);
TaskContext taskContext = queryContext.addTaskContext(
taskStateMachine,
session,
verboseStats,
cpuTimerEnabled);
LocalExecutionPlan localExecutionPlan;
try (SetThreadName ignored = new SetThreadName("Task-%s", taskStateMachine.getTaskId())) {
try {
localExecutionPlan = planner.plan(
taskContext,
fragment.getRoot(),
fragment.getSymbols(),
fragment.getPartitioningScheme(),
fragment.getPipelineExecutionStrategy() == GROUPED_EXECUTION,
fragment.getPartitionedSources(),
outputBuffer);
for (DriverFactory driverFactory : localExecutionPlan.getDriverFactories()) {
Optional<PlanNodeId> sourceId = driverFactory.getSourceId();
if (sourceId.isPresent() && fragment.isPartitionedSources(sourceId.get())) {
checkArgument(fragment.getPipelineExecutionStrategy() == driverFactory.getPipelineExecutionStrategy(),
"Partitioned pipelines are expected to have the same execution strategy as the fragment");
}
else {
checkArgument(fragment.getPipelineExecutionStrategy() != UNGROUPED_EXECUTION || driverFactory.getPipelineExecutionStrategy() == UNGROUPED_EXECUTION,
"When fragment execution strategy is ungrouped, all pipelines should have ungrouped execution strategy");
}
}
}
catch (Throwable e) {
// planning failed
taskStateMachine.failed(e);
throwIfUnchecked(e);
throw new RuntimeException(e);
}
}
return createSqlTaskExecution(
taskStateMachine,
taskContext,
outputBuffer,
sources,
localExecutionPlan,
taskExecutor,
taskNotificationExecutor,
queryMonitor);
}
SqlTaskExecution's createSqlTaskExecution method
static SqlTaskExecution createSqlTaskExecution(
TaskStateMachine taskStateMachine,
TaskContext taskContext,
OutputBuffer outputBuffer,
List<TaskSource> sources,
LocalExecutionPlan localExecutionPlan,
TaskExecutor taskExecutor,
Executor notificationExecutor,
QueryMonitor queryMonitor)
{
SqlTaskExecution task = new SqlTaskExecution(
taskStateMachine,
taskContext,
outputBuffer,
localExecutionPlan,
taskExecutor,
queryMonitor,
notificationExecutor);
try (SetThreadName ignored = new SetThreadName("Task-%s", task.getTaskId())) {
// The scheduleDriversForTaskLifeCycle method calls enqueueDriverSplitRunner, which registers a callback with access to this object.
// The call back is accessed from another thread, so this code can not be placed in the constructor.
//schedule the drivers for the task lifecycle and return the task execution
task.scheduleDriversForTaskLifeCycle();
return task;
}
}
2.6.2 Worker startup and execution
When a worker starts, TaskExecutor's start method is called; its job is to process the splits of all tasks running on that worker.
@PostConstruct
public synchronized void start()
{
//runnerThreads is configured via task.max-worker-threads (default: 4x the number of CPU cores); see the configuration example after this method
checkState(!closed, "TaskExecutor is closed");
for (int i = 0; i < runnerThreads; i++) {
addRunnerThread();
}
splitMonitorExecutor.scheduleWithFixedDelay(this::monitorActiveSplits, 1, 1, TimeUnit.MINUTES);
}
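The runner thread pool size mentioned in the comment above is a worker configuration property. Assuming the standard etc/config.properties layout, it could be raised explicitly as follows (the value 64 is only an example):

# etc/config.properties on a worker: overrides the default of 4x the CPU core count noted above
task.max-worker-threads=64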
TaskExecutor's addRunnerThread method
private synchronized void addRunnerThread()
{
try {
//TaskRunner is an inner class of TaskExecutor
executor.execute(new TaskRunner());
}
catch (RejectedExecutionException ignored) {
}
}
The TaskRunner class
private class TaskRunner
implements Runnable
{
private final long runnerId = NEXT_RUNNER_ID.getAndIncrement();
@Override
public void run()
{
try (SetThreadName runnerName = new SetThreadName("SplitRunner-%s", runnerId)) {
while (!closed && !Thread.currentThread().isInterrupted()) {
// select next worker
//take the next PrioritizedSplitRunner to process; a PrioritizedSplitRunner wraps all the work on one split, i.e. the chain of operators applied to that split
//prioritized split runner
final PrioritizedSplitRunner split;
try {
//take a split from the waiting queue
split = waitingSplits.take();
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
String threadId = split.getTaskHandle().getTaskId() + "-" + split.getSplitId();
try (SetThreadName splitName = new SetThreadName(threadId)) {
RunningSplitInfo splitInfo = new RunningSplitInfo(ticker.read(), threadId, Thread.currentThread());
runningSplitInfos.add(splitInfo);
//add the split to runningSplits, which holds every split currently being processed
runningSplits.add(split);
ListenableFuture<?> blocked;
try {
//call the split's process() method
blocked = split.process();
}
finally {
runningSplitInfos.remove(splitInfo);
//once processing returns, remove the split from runningSplits
runningSplits.remove(split);
}
//finished indicates whether the whole split has been fully processed
if (split.isFinished()) {
log.debug("%s is finished", split.getInfo());
splitFinished(split);
}
else {
//blocked indicates whether this round of processing has completed
if (blocked.isDone()) {
//if this round completed but the split is not done yet, put it back on the waiting queue
waitingSplits.offer(split);
}
else {
//otherwise park it in the blocked map
blockedSplits.put(split, blocked);
blocked.addListener(() -> {
//when the blocking future completes, remove the split from the blocked map
blockedSplits.remove(split);
//reset its level priority
split.resetLevelPriority();
//and put it back on the waiting queue
waitingSplits.offer(split);
}, executor);
}
}
}
catch (Throwable t) {
// ignore random errors due to driver thread interruption
if (!split.isDestroyed()) {
if (t instanceof PrestoException) {
PrestoException e = (PrestoException) t;
log.error("Error processing %s: %s: %s", split.getInfo(), e.getErrorCode().getName(), e.getMessage());
}
else {
log.error(t, "Error processing %s", split.getInfo());
}
}
splitFinished(split);
}
}
}
finally {
//the loop exits when the thread is interrupted or the TaskExecutor is closed
if (!closed) {
//if the thread was interrupted but the TaskExecutor is not closed, start a replacement runner thread
addRunnerThread();
}
}
}
}
All split processing is done by split.process(); here split is an instance of PrioritizedSplitRunner.
public ListenableFuture<?> process()
throws Exception
{
try {
long startNanos = ticker.read();
start.compareAndSet(0, startNanos);
lastReady.compareAndSet(0, startNanos);
processCalls.incrementAndGet();
waitNanos.getAndAdd(startNanos - lastReady.get());
CpuTimer timer = new CpuTimer();
//delegate the actual work to split.processFor(Duration); this split is a SplitRunner, in practice mostly a DriverSplitRunner; SPLIT_RUN_QUANTA is the time slice, one second by default
ListenableFuture<?> blocked = split.processFor(SPLIT_RUN_QUANTA);
CpuTimer.CpuDuration elapsed = timer.elapsedTime();
long quantaScheduledNanos = ticker.read() - startNanos;
scheduledNanos.addAndGet(quantaScheduledNanos);
priority.set(taskHandle.addScheduledNanos(quantaScheduledNanos));
lastRun.set(ticker.read());
if (blocked == NOT_BLOCKED) {
unblockedQuantaWallTime.add(elapsed.getWall());
}
else {
blockedQuantaWallTime.add(elapsed.getWall());
}
long quantaCpuNanos = elapsed.getCpu().roundTo(NANOSECONDS);
cpuTimeNanos.addAndGet(quantaCpuNanos);
globalCpuTimeMicros.update(quantaCpuNanos / 1000);
globalScheduledTimeMicros.update(quantaScheduledNanos / 1000);
return blocked;
}
catch (Throwable e) {
finishedFuture.setException(e);
throw e;
}
}
2.6.3 Creating the Driver
DriverSplitRunner's processFor method (DriverSplitRunner is an inner class of SqlTaskExecution):
@Override
public ListenableFuture<?> processFor(Duration duration)
{
//a Driver wraps the chain of operators that work on a split; the splits the driver still has to process are stored in its newSources field
Driver driver;
synchronized (this) {
//if the DriverSplitRunner was already closed before this call, there is nothing left to do: return an immediate ListenableFuture whose value is null
if (closed) {
return Futures.immediateFuture(null);
}
//if there is no driver yet, create one for the assigned split; partitionedSplit is a field of DriverSplitRunner of type ScheduledSplit, which is a wrapper around Split
if (this.driver == null) {
this.driver = driverSplitRunnerFactory.createDriver(driverContext, partitionedSplit);
}
//copy the driver reference so it can be used outside the synchronized block
driver = this.driver;
}
return driver.processFor(duration);
}
2.6.4 Driver execution
Driver's processFor method
public ListenableFuture<?> processFor(Duration duration)
{
checkLockNotHeld("Can not process for a duration while holding the driver lock");
requireNonNull(duration, "duration is null");
// if the driver is blocked we don't need to continue
SettableFuture<?> blockedFuture = driverBlockedFuture.get();
if (!blockedFuture.isDone()) {
return blockedFuture;
}
//maximum time this call is allowed to run
long maxRuntime = duration.roundTo(TimeUnit.NANOSECONDS);
//acquire the driver lock, waiting at most 100 milliseconds if another thread holds it
Optional<ListenableFuture<?>> result = tryWithLock(100, TimeUnit.MILLISECONDS, () -> {
driverContext.startProcessTimer();
driverContext.getYieldSignal().setWithDelay(maxRuntime, driverContext.getYieldExecutor());
try {
long start = System.nanoTime();
do {
//the actual split processing happens in processInternal
ListenableFuture<?> future = processInternal();
if (!future.isDone()) {
return updateDriverBlockedFuture(future);
}
}
while (System.nanoTime() - start < maxRuntime && !isFinishedInternal());
}
finally {
driverContext.getYieldSignal().reset();
driverContext.recordProcessed();
}
return NOT_BLOCKED;
});
return result.orElse(NOT_BLOCKED);
}
Driver's processInternal method
@GuardedBy("exclusiveLock")
private ListenableFuture<?> processInternal()
{
checkLockHeld("Lock must be held to call processInternal");
handleMemoryRevoke();
try {
//if there are newly added source splits that have not been picked up yet, add them to the source operator
processNewSources();
//a pipeline with a single operator is handled specially
if (operators.size() == 1) {
//if the driver is already done, return NOT_BLOCKED
if (driverContext.isDone()) {
return NOT_BLOCKED;
}
//get the only operator
Operator current = operators.get(0);
//check whether the operator is blocked
Optional<ListenableFuture<?>> blocked = getBlockedFuture(current);
if (blocked.isPresent()) {
current.getOperatorContext().recordBlocked(blocked.get());
return blocked.get();
}
//if it is not blocked, simply finish the operator
// there is only one operator so just finish it
current.getOperatorContext().startIntervalTimer();
current.finish();
current.getOperatorContext().recordFinish();
return NOT_BLOCKED;
}
boolean movedPage = false;
//with more than one operator, run the loop below: take each pair of adjacent operators, fetch the output of the upstream operator, and feed it as input to the downstream operator
for (int i = 0; i < operators.size() - 1 && !driverContext.isDone(); i++) {
//take two adjacent operators
Operator current = operators.get(i);
Operator next = operators.get(i + 1);
// skip blocked operator
if (getBlockedFuture(current).isPresent()) {
continue;
}
//if the current operator is not finished, the next operator is not blocked, and the next operator needs input
if (!current.isFinished() && !getBlockedFuture(next).isPresent() && next.needsInput()) {
//get an output page from the current operator and hand it to the next operator as input
current.getOperatorContext().startIntervalTimer();
//getOutput() is where each operator implements its core page-producing logic; it differs per operator (LimitOperator is shown as an example in 2.6.5)
Page page = current.getOutput();
current.getOperatorContext().recordGetOutput(page);
//pass the obtained output page to the next operator
if (page != null && page.getPositionCount() != 0) {
next.getOperatorContext().startIntervalTimer();
//addInput() is where each operator implements its core page-consuming logic; it differs per operator
next.addInput(page);
next.getOperatorContext().recordAddInput(page);
//flag that a page was moved
movedPage = true;
}
if (current instanceof SourceOperator) {
movedPage = true;
}
}
//if the current operator is finished, tell the next operator that no more input will arrive, so it can finish its processing and flush its results
if (current.isFinished()) {
// let next operator know there will be no more data
next.getOperatorContext().startIntervalTimer();
next.finish();
next.getOperatorContext().recordFinish();
}
}
//if we walked all operators without moving a single page, check whether any operator is blocked
if (!movedPage) {
List<Operator> blockedOperators = new ArrayList<>();
List<ListenableFuture<?>> blockedFutures = new ArrayList<>();
//iterate over all operators and collect each operator's blocked future; an operator that has finished may still be waiting for additional memory
for (Operator operator : operators) {
Optional<ListenableFuture<?>> blocked = getBlockedFuture(operator);
if (blocked.isPresent()) {
blockedOperators.add(operator);
blockedFutures.add(blocked.get());
}
}
//if at least one operator really is blocked
if (!blockedFutures.isEmpty()) {
// unblock when the first future is complete
//the driver becomes unblocked as soon as any one of these futures completes
ListenableFuture<?> blocked = firstFinishedFuture(blockedFutures);
// driver records serial blocked time
//record the blocked future on the driver context so the driver's blocked time is tracked until the block is lifted
driverContext.recordBlocked(blocked);
// each blocked operator is responsible for blocking the execution
// until one of the operators can continue
//register the blocked future with each blocked operator so its blocked time is tracked as well
for (Operator operator : blockedOperators) {
operator.getOperatorContext().recordBlocked(blocked);
}
return blocked;
}
}
return NOT_BLOCKED;
}
catch (Throwable t) {
List<StackTraceElement> interrupterStack = exclusiveLock.getInterrupterStack();
if (interrupterStack == null) {
driverContext.failed(t);
throw t;
}
// Driver thread was interrupted which should only happen if the task is already finished.
// If this becomes the actual cause of a failed query there is a bug in the task state machine.
Exception exception = new Exception("Interrupted By");
exception.setStackTrace(interrupterStack.stream().toArray(StackTraceElement[]::new));
PrestoException newException = new PrestoException(GENERIC_INTERNAL_ERROR, "Driver was interrupted", exception);
newException.addSuppressed(t);
driverContext.failed(newException);
throw newException;
}
}
Driver's getBlockedFuture method decides whether a given operator is blocked:
private Optional<ListenableFuture<?>> getBlockedFuture(Operator operator)
{
ListenableFuture<?> blocked = revokingOperators.get(operator);
if (blocked != null) {
// We mark operator as blocked regardless of blocked.isDone(), because finishMemoryRevoke has not been called yet.
return Optional.of(blocked);
}
blocked = operator.isBlocked();
if (!blocked.isDone()) {
return Optional.of(blocked);
}
blocked = operator.getOperatorContext().isWaitingForMemory();
if (!blocked.isDone()) {
return Optional.of(blocked);
}
blocked = operator.getOperatorContext().isWaitingForRevocableMemory();
if (!blocked.isDone()) {
return Optional.of(blocked);
}
return Optional.empty();
}
2.6.5 Operator execution
The Operator interface's getOutput() and addInput() methods are the core of page processing; LimitOperator is used as the example here. A stand-alone sketch of how a driver moves pages between two such operators follows the code.
@Override
public void addInput(Page page)
{
checkState(needsInput());
if (page.getPositionCount() <= remainingLimit) {
remainingLimit -= page.getPositionCount();
nextPage = page;
}
else {
Block[] blocks = new Block[page.getChannelCount()];
for (int channel = 0; channel < page.getChannelCount(); channel++) {
Block block = page.getBlock(channel);
blocks[channel] = block.getRegion(0, (int) remainingLimit);
}
nextPage = new Page((int) remainingLimit, blocks);
remainingLimit = 0;
}
}
@Override
public Page getOutput()
{
Page page = nextPage;
nextPage = null;
return page;
}
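To make the contract concrete, here is a minimal self-contained sketch (ToyPage, ToyOperator and the other types are toy stand-ins, not Presto classes) of the pull-then-push hand-off that processInternal performs between two adjacent operators: the upstream operator's getOutput() page becomes the downstream operator's addInput() page, and finish() is propagated once the upstream runs dry.

import java.util.ArrayDeque;
import java.util.Deque;

// Toy model of the Operator page contract, driven the way Driver.processInternal drives it.
final class ToyPage {
    final int rows;
    ToyPage(int rows) { this.rows = rows; }
}

interface ToyOperator {
    boolean isFinished();
    boolean needsInput();
    void addInput(ToyPage page);
    ToyPage getOutput();          // may return null when nothing is ready
    void finish();                // no more input will arrive
}

// Source-like operator: emits a fixed number of pages.
final class ToySource implements ToyOperator {
    private final Deque<ToyPage> pages = new ArrayDeque<>();
    ToySource(int pageCount) { for (int i = 0; i < pageCount; i++) pages.add(new ToyPage(100)); }
    public boolean isFinished() { return pages.isEmpty(); }
    public boolean needsInput() { return false; }
    public void addInput(ToyPage page) { throw new UnsupportedOperationException(); }
    public ToyPage getOutput() { return pages.poll(); }
    public void finish() { pages.clear(); }
}

// Limit-like operator: keeps rows until the limit is reached, mirroring LimitOperator above.
final class ToyLimit implements ToyOperator {
    private long remaining;
    private ToyPage next;
    private boolean finishing;
    ToyLimit(long limit) { this.remaining = limit; }
    public boolean isFinished() { return (finishing || remaining == 0) && next == null; }
    public boolean needsInput() { return !finishing && remaining > 0 && next == null; }
    public void addInput(ToyPage page) {
        int kept = (int) Math.min(page.rows, remaining);
        remaining -= kept;
        next = new ToyPage(kept);
    }
    public ToyPage getOutput() { ToyPage page = next; next = null; return page; }
    public void finish() { finishing = true; }
}

final class ToyDriver {
    public static void main(String[] args) {
        ToyOperator current = new ToySource(5);
        ToyOperator next = new ToyLimit(250);
        // same shape as processInternal: pull a page from `current`, push it into `next`
        while (!next.isFinished()) {
            if (!current.isFinished() && next.needsInput()) {
                ToyPage page = current.getOutput();
                if (page != null && page.rows != 0) {
                    next.addInput(page);
                }
            }
            if (current.isFinished()) {
                next.finish();            // let the downstream operator flush and finish
            }
            ToyPage out = next.getOutput();
            if (out != null) {
                System.out.println("output page with " + out.rows + " rows");
            }
        }
    }
}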
3. Engineering enhancements
3.1 Support for Hive views
3.2 Custom connectors
3.3 Implicit type conversion
3.4 UDF support
3.5 Performance tuning
Not yet finished, to be continued…
If you spot any mistakes, please point them out so we can all improve together~
Updated every evening~
If you repost this article, please include a link back to it; original writing takes effort, thanks~