Presto Source Code Analysis Summary - A SQL Query's Fantastic Journey (Part 2)

This Presto technical summary is split into two parts because of its length; this is the second part.

2.4.5 The stage scheduler starts scheduling

There are three kinds of stage schedulers, falling into two groups:

(1) Source task

  • SourcePartitionedScheduler

(2) Fixed task

  • FixedCountScheduler
  • FixedSourcePartitionedScheduler

There are two split placement policies:

(1) DynamicSplitPlacementPolicy

(2) FixedSplitPlacementPolicy

The query scheduler invokes each stage's scheduler with:

stageSchedulers.get(stage.getStageId()).schedule();
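
To see where these per-stage schedule() calls sit, here is a heavily simplified sketch of the driving loop on the coordinator side. The StageScheduler and ScheduleResult types below are pared-down stand-ins invented for illustration, not Presto's real classes; the real SqlQueryScheduler also tracks stage states, output buffers and failures.

import java.util.List;
import java.util.concurrent.CompletableFuture;

// Pared-down stand-ins, for illustration only.
interface StageScheduler {
    ScheduleResult schedule();
}

class ScheduleResult {
    final boolean finished;                 // this stage will produce no more splits
    final int splitsScheduled;              // splits placed in this round
    final CompletableFuture<?> blocked;     // completes when scheduling can make progress again

    ScheduleResult(boolean finished, int splitsScheduled, CompletableFuture<?> blocked) {
        this.finished = finished;
        this.splitsScheduled = splitsScheduled;
        this.blocked = blocked;
    }
}

class SimpleQueryScheduler {
    // Keep invoking every stage scheduler until all of them report finished;
    // when a round makes no progress, wait on a blocked future instead of spinning.
    static void schedule(List<StageScheduler> stageSchedulers) throws Exception {
        while (true) {
            boolean allFinished = true;
            boolean progress = false;
            CompletableFuture<?> blocked = CompletableFuture.completedFuture(null);
            for (StageScheduler scheduler : stageSchedulers) {
                ScheduleResult result = scheduler.schedule();
                allFinished &= result.finished;
                progress |= result.splitsScheduled > 0;
                blocked = result.blocked;
            }
            if (allFinished) {
                return;
            }
            if (!progress) {
                blocked.get();   // comparable to Presto waiting on whenAnyComplete(...) of the blocked futures
            }
        }
    }
}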

The first scheduler is SourcePartitionedScheduler:

public synchronized ScheduleResult schedule()
{
    int overallSplitAssignmentCount = 0;
    ImmutableSet.Builder<RemoteTask> overallNewTasks = ImmutableSet.builder();
    List<ListenableFuture<?>> overallBlockedFutures = new ArrayList<>();
    boolean anyBlockedOnPlacements = false;
    boolean anyBlockedOnNextSplitBatch = false;
    boolean anyNotBlocked = false;

    for (Entry<Lifespan, ScheduleGroup> entry : scheduleGroups.entrySet()) {
        Lifespan lifespan = entry.getKey();
        ScheduleGroup scheduleGroup = entry.getValue();
        Set<Split> pendingSplits = scheduleGroup.pendingSplits;

        if (scheduleGroup.state != ScheduleGroupState.DISCOVERING_SPLITS) {
            verify(scheduleGroup.nextSplitBatchFuture == null);
        }
        else if (pendingSplits.isEmpty()) {
            // try to get the next batch: if there are no pending splits, fetch the next batch of splits
            if (scheduleGroup.nextSplitBatchFuture == null) {
                scheduleGroup.nextSplitBatchFuture = splitSource.getNextBatch(scheduleGroup.partitionHandle, lifespan, splitBatchSize - pendingSplits.size());

                long start = System.nanoTime();
                Futures.addCallback(scheduleGroup.nextSplitBatchFuture, new FutureCallback<SplitBatch>()
                {
                    @Override
                    public void onSuccess(SplitBatch result)
                    {
                        stage.recordGetSplitTime(start);
                    }

                    @Override
                    public void onFailure(Throwable t)
                    {
                    }
                });
            }

            if (scheduleGroup.nextSplitBatchFuture.isDone()) {
                SplitBatch nextSplits = getFutureValue(scheduleGroup.nextSplitBatchFuture);
                scheduleGroup.nextSplitBatchFuture = null;
                pendingSplits.addAll(nextSplits.getSplits());
                if (nextSplits.isLastBatch() && scheduleGroup.state == ScheduleGroupState.DISCOVERING_SPLITS) {
                    // if this is the last batch and the schedule group is still discovering splits, mark it as having no more splits
                    scheduleGroup.state = ScheduleGroupState.NO_MORE_SPLITS;
                }
            }
            else {
                overallBlockedFutures.add(scheduleGroup.nextSplitBatchFuture);
                anyBlockedOnNextSplitBatch = true;
                continue;
            }
        }

        Multimap<Node, Split> splitAssignment = ImmutableMultimap.of();
        if (!pendingSplits.isEmpty()) {
            if (!scheduleGroup.placementFuture.isDone()) {
                continue;
            }

            if (state == State.INITIALIZED) {
                state = State.SPLITS_ADDED;
            }

            // compute split placements: based on the placement policy created earlier, this step assigns the pending splits to nodes
            SplitPlacementResult splitPlacementResult = splitPlacementPolicy.computeAssignments(pendingSplits);
            splitAssignment = splitPlacementResult.getAssignments();

            // remove splits with successful placements
            splitAssignment.values().forEach(pendingSplits::remove); // AbstractSet.removeAll performs terribly here.
            overallSplitAssignmentCount += splitAssignment.size();

            // if not completed placed, mark scheduleGroup as blocked on placement
            if (!pendingSplits.isEmpty()) {
                scheduleGroup.placementFuture = splitPlacementResult.getBlocked();
                overallBlockedFutures.add(scheduleGroup.placementFuture);
                anyBlockedOnPlacements = true;
            }
        }

        // if no new splits will be assigned, update state and attach completion event
        Multimap<Node, Lifespan> noMoreSplitsNotification = ImmutableMultimap.of();
        if (pendingSplits.isEmpty() && scheduleGroup.state == ScheduleGroupState.NO_MORE_SPLITS) {
            scheduleGroup.state = ScheduleGroupState.DONE;
            if (!lifespan.isTaskWide()) {
                Node node = ((FixedSplitPlacementPolicy) splitPlacementPolicy).getNodeForBucket(lifespan.getId());
                noMoreSplitsNotification = ImmutableMultimap.of(node, lifespan);
            }
        }

        // assign the splits to nodes for execution: for each node and its splits a RemoteTask is returned and then started
        overallNewTasks.addAll(assignSplits(splitAssignment, noMoreSplitsNotification));

        // Assert that "placement future is not done" implies "pendingSplits is not empty".
        // The other way around is not true. One obvious reason is (un)lucky timing, where the placement is unblocked between `computeAssignments` and this line.
        // However, there are other reasons that could lead to this.
        // Note that `computeAssignments` is quite broken:
        // 1. It always returns a completed future when there are no tasks, regardless of whether all nodes are blocked.
        // 2. The returned future will only be completed when a node with an assigned task becomes unblocked. Other nodes don't trigger future completion.
        // As a result, to avoid busy loops caused by 1, we check pendingSplits.isEmpty() instead of placementFuture.isDone() here.
        if (scheduleGroup.nextSplitBatchFuture == null && scheduleGroup.pendingSplits.isEmpty() && scheduleGroup.state != ScheduleGroupState.DONE) {
            anyNotBlocked = true;
        }
    }

    if (autoDropCompletedLifespans) {
        drainCompletedLifespans();
    }

    // * `splitSource.isFinished` invocation may fail after `splitSource.close` has been invoked.
    //   If state is NO_MORE_SPLITS/FINISHED, splitSource.isFinished has previously returned true, and splitSource is closed now.
    // * Even if `splitSource.isFinished()` return true, it is not necessarily safe to tear down the split source.
    //   * If anyBlockedOnNextSplitBatch is true, it means we have not checked out the recently completed nextSplitBatch futures,
    //     which may contain recently published splits. We must not ignore those.
    //   * If any scheduleGroup is still in DISCOVERING_SPLITS state, it means it hasn't realized that there will be no more splits.
        //     Next time it invokes getNextBatch, it will realize that. However, the invocation will fail if we tear down splitSource now.
    if ((state == State.NO_MORE_SPLITS || state == State.FINISHED) || (scheduleGroups.isEmpty() && splitSource.isFinished())) {
        switch (state) {
            case INITIALIZED:
                // we have not scheduled a single split so far
                state = State.SPLITS_ADDED;
                ScheduleResult emptySplitScheduleResult = scheduleEmptySplit();
                overallNewTasks.addAll(emptySplitScheduleResult.getNewTasks());
                overallSplitAssignmentCount++;
                // fall through
            case SPLITS_ADDED:
                state = State.NO_MORE_SPLITS;
                splitSource.close();
                // fall through
            case NO_MORE_SPLITS:
                if (!scheduleGroups.isEmpty()) {
                    // we are blocked on split assignment
                    break;
                }
                state = State.FINISHED;
                whenFinishedOrNewLifespanAdded.set(null);
                // fall through
            case FINISHED:
                return new ScheduleResult(
                        true,
                        overallNewTasks.build(),
                        overallSplitAssignmentCount);
            default:
                throw new IllegalStateException("Unknown state");
        }
    }

    if (anyNotBlocked) {
        return new ScheduleResult(false, overallNewTasks.build(), overallSplitAssignmentCount);
    }

    // Only try to finalize task creation when scheduling would block
    overallNewTasks.addAll(finalizeTaskCreationIfNecessary());

    ScheduleResult.BlockedReason blockedReason;
    if (anyBlockedOnNextSplitBatch) {
        blockedReason = anyBlockedOnPlacements ? MIXED_SPLIT_QUEUES_FULL_AND_WAITING_FOR_SOURCE : WAITING_FOR_SOURCE;
    }
    else {
        blockedReason = anyBlockedOnPlacements ? SPLIT_QUEUES_FULL : NO_ACTIVE_DRIVER_GROUP;
    }

    overallBlockedFutures.add(whenFinishedOrNewLifespanAdded);
    return new ScheduleResult(
            false,
            overallNewTasks.build(),
            nonCancellationPropagating(whenAnyComplete(overallBlockedFutures)),
            blockedReason,
            overallSplitAssignmentCount);
}
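
One subtlety in the method above is the switch at the end: it relies on deliberate fall-through, so a scheduler that enters in the INITIALIZED state can advance through SPLITS_ADDED and NO_MORE_SPLITS all the way to FINISHED within a single schedule() call. A compact, self-contained illustration of that fall-through pattern (a toy, not Presto's code; the real NO_MORE_SPLITS case only falls through once all schedule groups are drained):

enum State { INITIALIZED, SPLITS_ADDED, NO_MORE_SPLITS, FINISHED }

class FallThroughDemo {
    public static void main(String[] args) {
        State state = State.INITIALIZED;
        switch (state) {
            case INITIALIZED:
                System.out.println("schedule an empty split so the stage has at least one task");
                // fall through
            case SPLITS_ADDED:
                System.out.println("no more splits will arrive: close the split source");
                // fall through
            case NO_MORE_SPLITS:
                System.out.println("all assignments drained: mark the scheduler finished");
                // fall through
            case FINISHED:
                System.out.println("return a finished ScheduleResult");
                break;
        }
    }
}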

The splitPlacementPolicy.computeAssignments() method

DynamicSplitPlacementPolicy implements the dynamic placement logic:

@Override
public SplitPlacementResult computeAssignments(Set<Split> splits)
{
    // delegate to the nodeSelector to compute the assignments
    return nodeSelector.computeAssignments(splits, remoteTasks.get());
}

// As mentioned earlier, the nodeSelector interface is typically implemented by TopologyAwareNodeSelector; below is its logic for assigning splits
@Override
    public SplitPlacementResult computeAssignments(Set<Split> splits, List<RemoteTask> existingTasks)
    {
        // grab the current nodeMap from the selector
        NodeMap nodeMap = this.nodeMap.get().get();
        // the assignment map being built: node -> splits
        Multimap<Node, Split> assignment = HashMultimap.create();
        NodeAssignmentStats assignmentStats = new NodeAssignmentStats(nodeTaskMap, nodeMap, existingTasks);

        int[] topologicCounters = new int[topologicalSplitCounters.size()];
        Set<NetworkLocation> filledLocations = new HashSet<>();
        Set<Node> blockedExactNodes = new HashSet<>();
        boolean splitWaitingForAnyNode = false;
        for (Split split : splits) {
            // first check whether this split can be read remotely; if not, use the split's addresses to find matching nodes (possibly including the coordinator) as candidates, and throw an exception if none is found
            if (!split.isRemotelyAccessible()) {
                List<Node> candidateNodes = selectExactNodes(nodeMap, split.getAddresses(), includeCoordinator);
                if (candidateNodes.isEmpty()) {
                    log.debug("No nodes available to schedule %s. Available nodes %s", split, nodeMap.getNodesByHost().keys());
                    throw new PrestoException(NO_NODES_AVAILABLE, "No nodes available to run query");
                }
                // node selection strategy: weighs a minimum number of candidate nodes against the maximum pending splits per task
                Node chosenNode = bestNodeSplitCount(candidateNodes.iterator(), minCandidates, maxPendingSplitsPerTask, assignmentStats);
                // if a node was chosen
                if (chosenNode != null) {
                    // record the chosen node and the split
                    assignment.put(chosenNode, split);
                    assignmentStats.addAssignedSplit(chosenNode);
                }
                // Exact node set won't matter, if a split is waiting for any node
                else if (!splitWaitingForAnyNode) {
                    // if the strategy could not choose a node, remember all the candidates as blocked
                    blockedExactNodes.addAll(candidateNodes);
                }
                continue;
            }
            // the split is remotely accessible, so fall through to the topology-aware selection below
            Node chosenNode = null;
            // number of network location segments; the loop below walks from this depth up toward the root
            int depth = networkLocationSegmentNames.size();
            int chosenDepth = 0;
            Set<NetworkLocation> locations = new HashSet<>();
            // collect the network locations of all of the split's host addresses
            for (HostAddress host : split.getAddresses()) {
                locations.add(networkLocationCache.get(host));
            }
            // if no location could be found in the cache, fall back to the root location and set the depth to 0
            if (locations.isEmpty()) {
                // Add the root location
                locations.add(ROOT_LOCATION);
                depth = 0;
            }
            // Try each address at progressively shallower network locations
            for (int i = depth; i >= 0 && chosenNode == null; i--) {
                for (NetworkLocation location : locations) {
                    // Skip locations which are only shallower than this level
                    // For example, locations which couldn't be located will be at the "root" location
                    if (location.getSegments().size() < i) {
                        continue;
                    }
                    location = location.subLocation(0, i);
                    if (filledLocations.contains(location)) {
                        continue;
                    }
                    Set<Node> nodes = nodeMap.getWorkersByNetworkPath().get(location);
                    chosenNode = bestNodeSplitCount(new ResettableRandomizedIterator<>(nodes), minCandidates, calculateMaxPendingSplits(i, depth), assignmentStats);
                    if (chosenNode != null) {
                        chosenDepth = i;
                        break;
                    }
                    filledLocations.add(location);
                }
            }
            if (chosenNode != null) {
                // record the chosen node and the split
                assignment.put(chosenNode, split);
                assignmentStats.addAssignedSplit(chosenNode);
                topologicCounters[chosenDepth]++;
            }
            else {
                splitWaitingForAnyNode = true;
            }
        }
        for (int i = 0; i < topologicCounters.length; i++) {
            if (topologicCounters[i] > 0) {
                topologicalSplitCounters.get(i).update(topologicCounters[i]);
            }
        }

        ListenableFuture<?> blocked;
        int maxPendingForWildcardNetworkAffinity = calculateMaxPendingSplits(0, networkLocationSegmentNames.size());
        if (splitWaitingForAnyNode) {
            blocked = toWhenHasSplitQueueSpaceFuture(existingTasks, calculateLowWatermark(maxPendingForWildcardNetworkAffinity));
        }
        else {
            blocked = toWhenHasSplitQueueSpaceFuture(blockedExactNodes, existingTasks, calculateLowWatermark(maxPendingForWildcardNetworkAffinity));
        }
        return new SplitPlacementResult(blocked, assignment);
    }
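
The actual node choice above is delegated to bestNodeSplitCount, which the comments describe as weighing a minimum number of candidate nodes against the per-task pending-split limit. The following is a simplified, self-contained sketch of that idea; the NodeStats type and the exact tie-breaking are illustrative assumptions, not Presto's real NodeAssignmentStats logic:

import java.util.Iterator;
import java.util.Map;

// Illustrative stand-in for NodeAssignmentStats: how many splits are already queued per node.
class NodeStats {
    private final Map<String, Integer> queuedSplits;

    NodeStats(Map<String, Integer> queuedSplits) {
        this.queuedSplits = queuedSplits;
    }

    int queuedSplitCount(String node) {
        return queuedSplits.getOrDefault(node, 0);
    }
}

class NodeChooser {
    // Examine up to minCandidates nodes and pick the one with the fewest queued splits,
    // skipping nodes whose queue already reached maxPendingSplitsPerTask; null means every candidate is full.
    static String bestNodeSplitCount(Iterator<String> candidates, int minCandidates, int maxPendingSplitsPerTask, NodeStats stats) {
        String bestNode = null;
        int bestQueueSize = Integer.MAX_VALUE;
        int examined = 0;
        while (candidates.hasNext() && examined < minCandidates) {
            String node = candidates.next();
            examined++;
            int queued = stats.queuedSplitCount(node);
            if (queued >= maxPendingSplitsPerTask) {
                continue;   // this node's split queue is full, try another candidate
            }
            if (queued < bestQueueSize) {
                bestQueueSize = queued;
                bestNode = node;
            }
        }
        return bestNode;
    }
}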

The assignSplits method of SourcePartitionedScheduler:

// iterate over the splitAssignment parameter; for each entry (node -> splits):
// look up the tasks already running on that node; if there are none, schedule a new task, otherwise hand the splits to the task already running there
private Set<RemoteTask> assignSplits(Multimap<Node, Split> splitAssignment, Multimap<Node, Lifespan> noMoreSplitsNotification)
{
    ImmutableSet.Builder<RemoteTask> newTasks = ImmutableSet.builder();

    ImmutableSet<Node> nodes = ImmutableSet.<Node>builder()
            .addAll(splitAssignment.keySet())
            .addAll(noMoreSplitsNotification.keySet())
            .build();
    for (Node node : nodes) {
        // source partitioned tasks can only receive broadcast data; otherwise it would have a different distribution
        ImmutableMultimap<PlanNodeId, Split> splits = ImmutableMultimap.<PlanNodeId, Split>builder()
                .putAll(partitionedNode, splitAssignment.get(node))
                .build();
        ImmutableMultimap.Builder<PlanNodeId, Lifespan> noMoreSplits = ImmutableMultimap.builder();
        if (noMoreSplitsNotification.containsKey(node)) {
            noMoreSplits.putAll(partitionedNode, noMoreSplitsNotification.get(node));
        }
        newTasks.addAll(stage.scheduleSplits(
                node,
                splits,
                noMoreSplits.build()));
    }
    return newTasks.build();
}

The scheduleSplits method of SqlStageExecution:

public synchronized Set<RemoteTask> scheduleSplits(Node node, Multimap<PlanNodeId, Split> splits, Multimap<PlanNodeId, Lifespan> noMoreSplitsNotification)
{
    requireNonNull(node, "node is null");
    requireNonNull(splits, "splits is null");

    splitsScheduled.set(true);

    checkArgument(stateMachine.getFragment().getPartitionedSources().containsAll(splits.keySet()), "Invalid splits");

    ImmutableSet.Builder<RemoteTask> newTasks = ImmutableSet.builder();
    Collection<RemoteTask> tasks = this.tasks.get(node);
    RemoteTask task;
    if (tasks == null) {
        // The output buffer depends on the task id starting from 0 and being sequential, since each
        // task is assigned a private buffer based on task id.
        TaskId taskId = new TaskId(stateMachine.getStageId(), nextTaskId.getAndIncrement());
        task = scheduleTask(node, taskId, splits);
        newTasks.add(task);
    }
    else {
        task = tasks.iterator().next();
        task.addSplits(splits);
    }
    if (noMoreSplitsNotification.size() > 1) {
        // The assumption that `noMoreSplitsNotification.size() <= 1` currently holds.
        // If this assumption no longer holds, we should consider calling task.noMoreSplits with multiple entries in one shot.
        // These kind of methods can be expensive since they are grabbing locks and/or sending HTTP requests on change.
        throw new UnsupportedOperationException("This assumption no longer holds: noMoreSplitsNotification.size() < 1");
    }
    for (Entry<PlanNodeId, Lifespan> entry : noMoreSplitsNotification.entries()) {
        task.noMoreSplits(entry.getKey(), entry.getValue());
    }
    return newTasks.build();
}

The second scheduler is FixedSourcePartitionedScheduler:

public ScheduleResult schedule()
{
    // schedule a task on every node in the distribution
    List<RemoteTask> newTasks = ImmutableList.of();
    if (!scheduledTasks) {
        newTasks = partitioning.getPartitionToNode().entrySet().stream()
                .map(entry -> stage.scheduleTask(entry.getValue(), entry.getKey()))
                .collect(toImmutableList());
        scheduledTasks = true;
    }

    boolean allBlocked = true;
    List<ListenableFuture<?>> blocked = new ArrayList<>();
    BlockedReason blockedReason = BlockedReason.NO_ACTIVE_DRIVER_GROUP;
    int splitsScheduled = 0;

    Iterator<SourcePartitionedScheduler> schedulerIterator = sourcePartitionedSchedulers.iterator();
    List<Lifespan> driverGroupsToStart = ImmutableList.of();
    while (schedulerIterator.hasNext()) {
        SourcePartitionedScheduler sourcePartitionedScheduler = schedulerIterator.next();

        for (Lifespan lifespan : driverGroupsToStart) {
            sourcePartitionedScheduler.startLifespan(lifespan, partitionHandleFor(lifespan));
        }

        ScheduleResult schedule = sourcePartitionedScheduler.schedule();
        splitsScheduled += schedule.getSplitsScheduled();
        if (schedule.getBlockedReason().isPresent()) {
            blocked.add(schedule.getBlocked());
            blockedReason = blockedReason.combineWith(schedule.getBlockedReason().get());
        }
        else {
            verify(schedule.getBlocked().isDone(), "blockedReason not provided when scheduler is blocked");
            allBlocked = false;
        }

        driverGroupsToStart = sourcePartitionedScheduler.drainCompletedLifespans();

        if (schedule.isFinished()) {
            schedulerIterator.remove();
            sourcePartitionedScheduler.close();
        }
    }

    if (allBlocked) {
        return new ScheduleResult(sourcePartitionedSchedulers.isEmpty(), newTasks, whenAnyComplete(blocked), blockedReason, splitsScheduled);
    }
    else {
        return new ScheduleResult(sourcePartitionedSchedulers.isEmpty(), newTasks, splitsScheduled);
    }
}

The third scheduler is FixedCountScheduler:

public ScheduleResult schedule()
{
    List<RemoteTask> newTasks = partitionToNode.entrySet().stream()
            .map(entry -> taskScheduler.apply(entry.getValue(), entry.getKey()))
            .collect(toImmutableList());

    return new ScheduleResult(true, newTasks, 0);
}

2.5 Creating RemoteTask tasks

In Presto's architecture, stage scheduling produces tasks that are dispatched to workers for execution.

The scheduleTask method of SqlStageExecution:

private synchronized RemoteTask scheduleTask(Node node, TaskId taskId, Multimap<PlanNodeId, Split> sourceSplits)
    {
        ImmutableMultimap.Builder<PlanNodeId, Split> initialSplits = ImmutableMultimap.builder();
        // collect all sourceSplits; for a source stage this parameter actually carries splits, while for fixed and single stages it is empty
        initialSplits.putAll(sourceSplits);
        
        sourceTasks.forEach((planNodeId, task) -> {
            TaskStatus status = task.getTaskStatus();
            if (status.getState() != TaskState.FINISHED) {
                initialSplits.put(planNodeId, createRemoteSplitFor(taskId, status.getSelf()));
            }
        });
        OutputBuffers outputBuffers = this.outputBuffers.get();
        checkState(outputBuffers != null, "Initial output buffers must be set before a task can be scheduled");

        // create the remote task
        RemoteTask task = remoteTaskFactory.createRemoteTask(
                stateMachine.getSession(),
                taskId,
                node,
                stateMachine.getFragment(),
                initialSplits.build(),
                outputBuffers,
                nodeTaskMap.createPartitionedSplitCountTracker(node, taskId),
                summarizeTaskInfo);

        completeSources.forEach(task::noMoreSplits);
        allTasks.add(taskId);
        tasks.computeIfAbsent(node, key -> newConcurrentHashSet()).add(task);
        nodeTaskMap.addTask(node, task);
        task.addStateChangeListener(new StageTaskListener());
        if (!stateMachine.getState().isDone()) {
            task.start();
        }
        else {
            task.abort();
        }
        return task;
    }

The RemoteTask interface is implemented by HttpRemoteTask:

   public HttpRemoteTask(Session session,
            TaskId taskId,
            String nodeId,
            URI location,
            PlanFragment planFragment,
            Multimap<PlanNodeId, Split> initialSplits,
            OutputBuffers outputBuffers,
            HttpClient httpClient,
            Executor executor,
            ScheduledExecutorService updateScheduledExecutor,
            ScheduledExecutorService errorScheduledExecutor,
            Duration minErrorDuration,
            Duration maxErrorDuration,
            Duration taskStatusRefreshMaxWait,
            Duration taskInfoUpdateInterval,
            boolean summarizeTaskInfo,
            JsonCodec<TaskStatus> taskStatusCodec,
            JsonCodec<TaskInfo> taskInfoCodec,
            JsonCodec<TaskUpdateRequest> taskUpdateRequestCodec,
            PartitionedSplitCountTracker partitionedSplitCountTracker,
            RemoteTaskStats stats)
    {
        requireNonNull(session, "session is null");
        requireNonNull(taskId, "taskId is null");
        requireNonNull(nodeId, "nodeId is null");
        requireNonNull(location, "location is null");
        requireNonNull(planFragment, "planFragment is null");
        requireNonNull(outputBuffers, "outputBuffers is null");
        requireNonNull(httpClient, "httpClient is null");
        requireNonNull(executor, "executor is null");
        requireNonNull(taskStatusCodec, "taskStatusCodec is null");
        requireNonNull(taskInfoCodec, "taskInfoCodec is null");
        requireNonNull(taskUpdateRequestCodec, "taskUpdateRequestCodec is null");
        requireNonNull(partitionedSplitCountTracker, "partitionedSplitCountTracker is null");
        requireNonNull(stats, "stats is null");

        try (SetThreadName ignored = new SetThreadName("HttpRemoteTask-%s", taskId)) {
            this.taskId = taskId;
            this.session = session;
            this.nodeId = nodeId;
            this.planFragment = planFragment;
            this.outputBuffers.set(outputBuffers);
            this.httpClient = httpClient;
            this.executor = executor;
            this.errorScheduledExecutor = errorScheduledExecutor;
            this.summarizeTaskInfo = summarizeTaskInfo;
            this.taskInfoCodec = taskInfoCodec;
            this.taskUpdateRequestCodec = taskUpdateRequestCodec;
            this.updateErrorTracker = new RequestErrorTracker(taskId, location, minErrorDuration, maxErrorDuration, errorScheduledExecutor, "updating task");
            this.partitionedSplitCountTracker = requireNonNull(partitionedSplitCountTracker, "partitionedSplitCountTracker is null");
            this.stats = stats;

            for (Entry<PlanNodeId, Split> entry : requireNonNull(initialSplits, "initialSplits is null").entries()) {
                ScheduledSplit scheduledSplit = new ScheduledSplit(nextSplitId.getAndIncrement(), entry.getKey(), entry.getValue());
                pendingSplits.put(entry.getKey(), scheduledSplit);
            }
            pendingSourceSplitCount = planFragment.getPartitionedSources().stream()
                    .filter(initialSplits::containsKey)
                    .mapToInt(partitionedSource -> initialSplits.get(partitionedSource).size())
                    .sum();

            List<BufferInfo> bufferStates = outputBuffers.getBuffers()
                    .keySet().stream()
                    .map(outputId -> new BufferInfo(outputId, false, 0, 0, PageBufferInfo.empty()))
                    .collect(toImmutableList());

            TaskInfo initialTask = createInitialTask(taskId, location, nodeId, bufferStates, new TaskStats(DateTime.now(), null));

            this.taskStatusFetcher = new ContinuousTaskStatusFetcher(
                    this::failTask,
                    initialTask.getTaskStatus(),
                    taskStatusRefreshMaxWait,
                    taskStatusCodec,
                    executor,
                    httpClient,
                    minErrorDuration,
                    maxErrorDuration,
                    errorScheduledExecutor,
                    stats);

            this.taskInfoFetcher = new TaskInfoFetcher(
                    this::failTask,
                    initialTask,
                    httpClient,
                    taskInfoUpdateInterval,
                    taskInfoCodec,
                    minErrorDuration,
                    maxErrorDuration,
                    summarizeTaskInfo,
                    executor,
                    updateScheduledExecutor,
                    errorScheduledExecutor,
                    stats);

            taskStatusFetcher.addStateChangeListener(newStatus -> {
                TaskState state = newStatus.getState();
                if (state.isDone()) {
                    cleanUpTask();
                }
                else {
                    partitionedSplitCountTracker.setPartitionedSplitCount(getPartitionedSplitCount());
                    updateSplitQueueSpace();
                }
            });

            long timeout = minErrorDuration.toMillis() / MIN_RETRIES;
            this.requestTimeout = new Duration(timeout + taskStatusRefreshMaxWait.toMillis(), MILLISECONDS);
            partitionedSplitCountTracker.setPartitionedSplitCount(getPartitionedSplitCount());
            updateSplitQueueSpace();
        }
    }
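
Roughly speaking, the two fetchers created in this constructor form the coordinator-side monitoring loop for the task: taskStatusFetcher keeps re-requesting the lightweight task status, and taskInfoFetcher refreshes the full TaskInfo on a fixed interval, pushing new states to the registered listeners. Below is a stripped-down sketch of that periodic-refresh idea only; the class and behaviour are illustrative, and the real fetchers use asynchronous HTTP requests with error tracking:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class TaskInfoPollerSketch {
    public static void main(String[] args) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        Runnable refresh = () -> {
            // in the real fetcher this is an async HTTP GET of the task's info,
            // and the decoded result is handed to state-change listeners
            System.out.println("refreshing task info at " + System.nanoTime());
        };
        // comparable to taskInfoUpdateInterval in the constructor above
        executor.scheduleWithFixedDelay(refresh, 0, 3, TimeUnit.SECONDS);
    }
}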

The task's start method kicks off polling of the task's status:

public void start()
{
    try (SetThreadName ignored = new SetThreadName("HttpRemoteTask-%s", taskId)) {
        // to start we just need to trigger an update
        scheduleUpdate();

        taskStatusFetcher.start();
        taskInfoFetcher.start();
    }
}

The exchange pulls data from upstream stages, while the output buffer hands the current stage's output to downstream stages.
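
As a mental model only (Presto's OutputBuffer and exchange clients are HTTP-based and far more involved), the output buffer of an upstream stage and the exchange of the downstream stage behave like a bounded producer/consumer pair over pages, which is also how back-pressure arises when a consumer falls behind:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class StageDataFlowSketch {
    public static void main(String[] args) throws InterruptedException {
        // the upstream stage's "output buffer": bounded, so a slow consumer back-pressures the producer
        BlockingQueue<String> outputBuffer = new ArrayBlockingQueue<>(2);

        Thread upstream = new Thread(() -> {
            try {
                for (int i = 0; i < 5; i++) {
                    outputBuffer.put("page-" + i);   // blocks while the buffer is full
                }
                outputBuffer.put("END");
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread downstream = new Thread(() -> {
            try {
                while (true) {
                    String page = outputBuffer.take();   // the downstream "exchange" pulls pages
                    if (page.equals("END")) {
                        break;
                    }
                    System.out.println("downstream consumed " + page);
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        upstream.start();
        downstream.start();
        upstream.join();
        downstream.join();
    }
}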

2.6 Task execution

2.6.1 The worker receives the task

After the RemoteTask is created, it is pushed down to the corresponding worker through an HTTP REST request.

// the TaskResource class carries the class-level annotation @Path("/v1/task"); the method below therefore handles POST /v1/task/{taskId}
    @POST
    @Path("{taskId}")
    @Consumes(MediaType.APPLICATION_JSON)
    @Produces(MediaType.APPLICATION_JSON)
    public Response createOrUpdateTask(@PathParam("taskId") TaskId taskId, TaskUpdateRequest taskUpdateRequest, @Context UriInfo uriInfo)
    {
        requireNonNull(taskUpdateRequest, "taskUpdateRequest is null");

        Session session = taskUpdateRequest.getSession().toSession(sessionPropertyManager);
        TaskInfo taskInfo = taskManager.updateTask(session,
                taskId,
                taskUpdateRequest.getFragment(),
                taskUpdateRequest.getSources(),
                taskUpdateRequest.getOutputIds());

        if (shouldSummarize(uriInfo)) {
            taskInfo = taskInfo.summarize();
        }

        return Response.ok().entity(taskInfo).build();
    }
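
Put together with the class-level @Path("/v1/task"), this endpoint means the coordinator creates or updates a task by POSTing a JSON TaskUpdateRequest to /v1/task/{taskId} on the worker and gets the TaskInfo back. The snippet below only illustrates the shape of that call; the worker host, task id and request body are placeholders, since the real TaskUpdateRequest JSON is produced internally by HttpRemoteTask:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

class TaskUpdateCallSketch {
    public static void main(String[] args) throws Exception {
        String worker = "http://worker-1:8080";                // placeholder worker address
        String taskId = "20230101_000000_00000_abcde.1.0";     // placeholder task id
        String taskUpdateRequestJson = "{}";                   // stands in for the real TaskUpdateRequest JSON

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(worker + "/v1/task/" + taskId))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(taskUpdateRequestJson))
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());

        // on success the worker replies with the (possibly summarized) TaskInfo as JSON
        System.out.println(response.statusCode());
        System.out.println(response.body());
    }
}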

The updateTask method of SqlTaskManager:

@Override
public TaskInfo updateTask(Session session, TaskId taskId, Optional<PlanFragment> fragment, List<TaskSource> sources, OutputBuffers outputBuffers)
{
    requireNonNull(session, "session is null");
    requireNonNull(taskId, "taskId is null");
    requireNonNull(fragment, "fragment is null");
    requireNonNull(sources, "sources is null");
    requireNonNull(outputBuffers, "outputBuffers is null");

    if (resourceOvercommit(session)) {
        // TODO: This should have been done when the QueryContext was created. However, the session isn't available at that point.
        queryContexts.getUnchecked(taskId.getQueryId()).setResourceOvercommit();
    }

    SqlTask sqlTask = tasks.getUnchecked(taskId);
    sqlTask.recordHeartbeat();
    return sqlTask.updateTask(session, fragment, sources, outputBuffers);
}

The updateTask method of SqlTask:

public TaskInfo updateTask(Session session, Optional<PlanFragment> fragment, List<TaskSource> sources, OutputBuffers outputBuffers)
{
    try {
        // The LazyOutput buffer does not support write methods, so the actual
        // output buffer must be established before drivers are created (e.g.
        // a VALUES query).
        outputBuffer.setOutputBuffers(outputBuffers);

        // assure the task execution is only created once
        SqlTaskExecution taskExecution;
        synchronized (this) {
            // is task already complete?
            TaskHolder taskHolder = taskHolderReference.get();
            if (taskHolder.isFinished()) {
                return taskHolder.getFinalTaskInfo();
            }
            taskExecution = taskHolder.getTaskExecution();
            if (taskExecution == null) {
                checkState(fragment.isPresent(), "fragment must be present");
                // on the first call a new task execution is created
                taskExecution = sqlTaskExecutionFactory.create(session, queryContext, taskStateMachine, outputBuffer, fragment.get(), sources);
                taskHolderReference.compareAndSet(taskHolder, new TaskHolder(taskExecution));
                needsPlan.set(false);
            }
        }

        if (taskExecution != null) {
            taskExecution.addSources(sources);
        }
    }
    catch (Error e) {
        failed(e);
        throw e;
    }
    catch (RuntimeException e) {
        failed(e);
    }

    return getTaskInfo();
}

The create method of SqlTaskExecutionFactory:

public SqlTaskExecution create(Session session, QueryContext queryContext, TaskStateMachine taskStateMachine, OutputBuffer outputBuffer, PlanFragment fragment, List<TaskSource> sources)
{
    boolean verboseStats = getVerboseStats(session);
    TaskContext taskContext = queryContext.addTaskContext(
            taskStateMachine,
            session,
            verboseStats,
            cpuTimerEnabled);

    LocalExecutionPlan localExecutionPlan;
    try (SetThreadName ignored = new SetThreadName("Task-%s", taskStateMachine.getTaskId())) {
        try {
            localExecutionPlan = planner.plan(
                    taskContext,
                    fragment.getRoot(),
                    fragment.getSymbols(),
                    fragment.getPartitioningScheme(),
                    fragment.getPipelineExecutionStrategy() == GROUPED_EXECUTION,
                    fragment.getPartitionedSources(),
                    outputBuffer);

            for (DriverFactory driverFactory : localExecutionPlan.getDriverFactories()) {
                Optional<PlanNodeId> sourceId = driverFactory.getSourceId();
                if (sourceId.isPresent() && fragment.isPartitionedSources(sourceId.get())) {
                    checkArgument(fragment.getPipelineExecutionStrategy() == driverFactory.getPipelineExecutionStrategy(),
                            "Partitioned pipelines are expected to have the same execution strategy as the fragment");
                }
                else {
                    checkArgument(fragment.getPipelineExecutionStrategy() != UNGROUPED_EXECUTION || driverFactory.getPipelineExecutionStrategy() == UNGROUPED_EXECUTION,
                            "When fragment execution strategy is ungrouped, all pipelines should have ungrouped execution strategy");
                }
            }
        }
        catch (Throwable e) {
            // planning failed
            taskStateMachine.failed(e);
            throwIfUnchecked(e);
            throw new RuntimeException(e);
        }
    }
    return createSqlTaskExecution(
            taskStateMachine,
            taskContext,
            outputBuffer,
            sources,
            localExecutionPlan,
            taskExecutor,
            taskNotificationExecutor,
            queryMonitor);
}

The createSqlTaskExecution method of SqlTaskExecution:

static SqlTaskExecution createSqlTaskExecution(
        TaskStateMachine taskStateMachine,
        TaskContext taskContext,
        OutputBuffer outputBuffer,
        List<TaskSource> sources,
        LocalExecutionPlan localExecutionPlan,
        TaskExecutor taskExecutor,
        Executor notificationExecutor,
        QueryMonitor queryMonitor)
{
    SqlTaskExecution task = new SqlTaskExecution(
            taskStateMachine,
            taskContext,
            outputBuffer,
            localExecutionPlan,
            taskExecutor,
            queryMonitor,
            notificationExecutor);
    try (SetThreadName ignored = new SetThreadName("Task-%s", task.getTaskId())) {
        // The scheduleDriversForTaskLifeCycle method calls enqueueDriverSplitRunner, which registers a callback with access to this object.
        // The call back is accessed from another thread, so this code can not be placed in the constructor.
        task.scheduleDriversForTaskLifeCycle();
        return task;
    }
}

2.6.2 The worker starts execution

When a worker starts up, it calls the start method of TaskExecutor, whose main job is to process the splits of every task running on that worker.

@PostConstruct
public synchronized void start()
{
    // runnerThreads is configured through task.max-worker-threads and defaults to 4 times the number of CPU cores
    checkState(!closed, "TaskExecutor is closed");
    for (int i = 0; i < runnerThreads; i++) {
        addRunnerThread();
    }
    splitMonitorExecutor.scheduleWithFixedDelay(this::monitorActiveSplits, 1, 1, TimeUnit.MINUTES);
}
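
The pool size mentioned in the comment above is a normal tuning knob: task.max-worker-threads can be raised in a worker's etc/config.properties when splits are frequently waiting for a runner thread (the value below is only an example, not a recommendation):

task.max-worker-threads=64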

The addRunnerThread method of TaskExecutor:

private synchronized void addRunnerThread()
{
    try {
        // TaskRunner is an inner class of TaskExecutor
        executor.execute(new TaskRunner());
    }
    catch (RejectedExecutionException ignored) {
    }
}

The TaskRunner class:

private class TaskRunner
        implements Runnable
{
    private final long runnerId = NEXT_RUNNER_ID.getAndIncrement();

    @Override
    public void run()
    {
        try (SetThreadName runnerName = new SetThreadName("SplitRunner-%s", runnerId)) {
            while (!closed && !Thread.currentThread().isInterrupted()) {
                // select next worker
                // get the next PrioritizedSplitRunner to process; PrioritizedSplitRunner wraps all the work to be done on one split,
                // i.e. the chain of operators that will run over it, together with a scheduling priority
                final PrioritizedSplitRunner split;
                try {
                    // take a split from the waiting queue
                    split = waitingSplits.take();
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }

                String threadId = split.getTaskHandle().getTaskId() + "-" + split.getSplitId();
                try (SetThreadName splitName = new SetThreadName(threadId)) {
                    RunningSplitInfo splitInfo = new RunningSplitInfo(ticker.read(), threadId, Thread.currentThread());
                    runningSplitInfos.add(splitInfo);

                    // add the split to the running queue, which holds every split currently being processed
                    runningSplits.add(split);

                    ListenableFuture<?> blocked;
                    try {
                        // invoke the split's process() method
                        blocked = split.process();
                    }
                    finally {
                        runningSplitInfos.remove(splitInfo);
                        // once this quantum is done, remove the split from runningSplits
                        runningSplits.remove(split);
                    }
                    // finished indicates whether the whole split has been fully processed
                    if (split.isFinished()) {
                        log.debug("%s is finished", split.getInfo());
                        splitFinished(split);
                    }
                    else {
                        // blocked indicates whether this quantum of work completed
                        if (blocked.isDone()) {
                            // this quantum completed but the split is not finished yet, so put it back on the waiting queue
                            waitingSplits.offer(split);
                        }
                        else {
                            // park it in the blocked queue
                            blockedSplits.put(split, blocked);
                            blocked.addListener(() -> {
                                // once the blocking future completes, remove the split from the blocked queue
                                blockedSplits.remove(split);
                                // recompute its priority
                                split.resetLevelPriority();
                                // and put it back on the waiting queue
                                waitingSplits.offer(split);
                            }, executor);
                        }
                    }
                }
                catch (Throwable t) {
                    // ignore random errors due to driver thread interruption
                    if (!split.isDestroyed()) {
                        if (t instanceof PrestoException) {
                            PrestoException e = (PrestoException) t;
                            log.error("Error processing %s: %s: %s", split.getInfo(), e.getErrorCode().getName(), e.getMessage());
                        }
                        else {
                            log.error(t, "Error processing %s", split.getInfo());
                        }
                    }
                    splitFinished(split);
                }
            }
        }
        finally {
            // the thread was interrupted or the TaskExecutor has been closed
            if (!closed) {
                // if the thread was interrupted but the TaskExecutor has not been closed, start a replacement runner thread
                addRunnerThread();
            }
        }
    }
}

All processing of a split is done by split.process(); here split is an instance of PrioritizedSplitRunner.

public ListenableFuture<?> process()
        throws Exception
{
    try {
        long startNanos = ticker.read();
        start.compareAndSet(0, startNanos);
        lastReady.compareAndSet(0, startNanos);
        processCalls.incrementAndGet();

        waitNanos.getAndAdd(startNanos - lastReady.get());

        CpuTimer timer = new CpuTimer();

        // call the split's processFor(Duration) method to do the actual work; this split is a SplitRunner instance (in practice mostly DriverSplitRunner), and SPLIT_RUN_QUANTA is a time slice that defaults to one second
        ListenableFuture<?> blocked = split.processFor(SPLIT_RUN_QUANTA);

        CpuTimer.CpuDuration elapsed = timer.elapsedTime();

        long quantaScheduledNanos = ticker.read() - startNanos;
        scheduledNanos.addAndGet(quantaScheduledNanos);

        priority.set(taskHandle.addScheduledNanos(quantaScheduledNanos));
        lastRun.set(ticker.read());

        if (blocked == NOT_BLOCKED) {
            unblockedQuantaWallTime.add(elapsed.getWall());
        }
        else {
            blockedQuantaWallTime.add(elapsed.getWall());
        }

        long quantaCpuNanos = elapsed.getCpu().roundTo(NANOSECONDS);
        cpuTimeNanos.addAndGet(quantaCpuNanos);

        globalCpuTimeMicros.update(quantaCpuNanos / 1000);
        globalScheduledTimeMicros.update(quantaScheduledNanos / 1000);

        return blocked;
    }
    catch (Throwable e) {
        finishedFuture.setException(e);
        throw e;
    }
}
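
The contract exercised by process() is small: each processFor call does at most one quantum of work, the returned future signals whether the runner is blocked, and isFinished() reports overall completion. A minimal, self-contained illustration of that contract (the interface below is a pared-down stand-in for SplitRunner, not Presto's real one):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

// pared-down stand-in for the SplitRunner contract
interface SimpleSplitRunner {
    Future<?> processFor(long quantaNanos);   // do at most one quantum of work; an incomplete future means "blocked"
    boolean isFinished();                     // true once all work for the split is done
}

// counts up to a target in bounded slices of work, never blocking
class CountingSplitRunner implements SimpleSplitRunner {
    private static final Future<?> NOT_BLOCKED = CompletableFuture.completedFuture(null);

    private final long target;
    private long current;

    CountingSplitRunner(long target) {
        this.target = target;
    }

    @Override
    public Future<?> processFor(long quantaNanos) {
        long deadline = System.nanoTime() + quantaNanos;
        while (current < target && System.nanoTime() < deadline) {
            current++;   // one "row" of work
        }
        return NOT_BLOCKED;   // a real runner would return an incomplete future while waiting on I/O or memory
    }

    @Override
    public boolean isFinished() {
        return current >= target;
    }
}

A driver thread would keep re-enqueueing such a runner until isFinished() returns true, which is exactly what TaskRunner does above with waitingSplits and blockedSplits.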

2.6.3 Creating the Driver

The processFor method of DriverSplitRunner (an inner class of SqlTaskExecution):

@Override
public ListenableFuture<?> processFor(Duration duration)
{
    // a driver wraps the chain of operators that act on a split; the splits the driver still needs to process are stored in the newSources field
    Driver driver;
    synchronized (this) {
        // if the DriverSplitRunner was already closed before this method ran, there is nothing left to do: just return a ListenableFuture whose value is null
        if (closed) {
            return Futures.immediateFuture(null);
        }
        // if there is no driver yet, first create one for the assigned split; partitionedSplit is a field of DriverSplitRunner of type ScheduledSplit, which wraps a Split
        if (this.driver == null) {
            this.driver = driverSplitRunnerFactory.createDriver(driverContext, partitionedSplit);
        }
        // the driver (newly created or already existing) carries out the actual processing
        driver = this.driver;
    }

    return driver.processFor(duration);
}

2.6.4 Driver execution

The processFor method of the Driver class:

public ListenableFuture<?> processFor(Duration duration)
    {
        checkLockNotHeld("Can not process for a duration while holding the driver lock");

        requireNonNull(duration, "duration is null");

        // if the driver is blocked we don't need to continue
        SettableFuture<?> blockedFuture = driverBlockedFuture.get();
        if (!blockedFuture.isDone()) {
            return blockedFuture;
        }

        // the maximum amount of time this call may run
        long maxRuntime = duration.roundTo(TimeUnit.NANOSECONDS);

        // acquire the driver lock; if another thread holds it, wait at most 100 milliseconds
        Optional<ListenableFuture<?>> result = tryWithLock(100, TimeUnit.MILLISECONDS, () -> {
            driverContext.startProcessTimer();
            driverContext.getYieldSignal().setWithDelay(maxRuntime, driverContext.getYieldExecutor());
            try {
                long start = System.nanoTime();
                do {
                    // the actual processing of the split happens in processInternal
                    ListenableFuture<?> future = processInternal();
                    if (!future.isDone()) {
                        return updateDriverBlockedFuture(future);
                    }
                }
                while (System.nanoTime() - start < maxRuntime && !isFinishedInternal());
            }
            finally {
                driverContext.getYieldSignal().reset();
                driverContext.recordProcessed();
            }
            return NOT_BLOCKED;
        });
        return result.orElse(NOT_BLOCKED);
    }

The processInternal method of Driver:

@GuardedBy("exclusiveLock")
private ListenableFuture<?> processInternal()
{
    checkLockHeld("Lock must be held to call processInternal");

    handleMemoryRevoke();

    try {
        // if there are newly added splits that have not been read yet, add them to the source operator
        processNewSources();

        // special-case a pipeline with a single operator
        if (operators.size() == 1) {
            // if the driver is already done, return NOT_BLOCKED
            if (driverContext.isDone()) {
                return NOT_BLOCKED;
            }

            // get the operator
            Operator current = operators.get(0);
            // check whether the operator is blocked
            Optional<ListenableFuture<?>> blocked = getBlockedFuture(current);
            if (blocked.isPresent()) {
                current.getOperatorContext().recordBlocked(blocked.get());
                return blocked.get();
            }

            // not blocked, so just finish this single operator
            // there is only one operator so just finish it
            current.getOperatorContext().startIntervalTimer();
            current.finish();
            current.getOperatorContext().recordFinish();
            return NOT_BLOCKED;
        }

        boolean movedPage = false;
        // when there is more than one operator, run the loop below: each iteration takes two adjacent operators, gets the output page of the upstream one and feeds it to the downstream one as input
        for (int i = 0; i < operators.size() - 1 && !driverContext.isDone(); i++) {
            // take two adjacent operators
            Operator current = operators.get(i);
            Operator next = operators.get(i + 1);

            // skip blocked operator
            if (getBlockedFuture(current).isPresent()) {
                continue;
            }

            // if the current operator is not finished and the next operator needs input
            if (!current.isFinished() && !getBlockedFuture(next).isPresent() && next.needsInput()) {
                // get an output page from the current operator and hand it to the next operator as input
                current.getOperatorContext().startIntervalTimer();
                // the core of how an operator produces pages; every operator implements it differently (LimitOperator is shown later as an example)
                Page page = current.getOutput();
                current.getOperatorContext().recordGetOutput(page);

                // hand the output page to the next operator
                if (page != null && page.getPositionCount() != 0) {
                    next.getOperatorContext().startIntervalTimer();
                    // the core of how an operator consumes pages; every operator implements it differently
                    next.addInput(page);
                    next.getOperatorContext().recordAddInput(page);
                    // record that a page was moved
                    movedPage = true;
                }

                if (current instanceof SourceOperator) {
                    movedPage = true;
                }
            }

            // if the current operator is finished, tell the next operator that no more input is coming so it can finish and flush its results
            if (current.isFinished()) {
                // let next operator know there will be no more data
                next.getOperatorContext().startIntervalTimer();
                next.finish();
                next.getOperatorContext().recordFinish();
            }
        }

        // if we looped over all operators without moving a single page, check whether any operator is blocked
        if (!movedPage) {
            List<Operator> blockedOperators = new ArrayList<>();
            List<ListenableFuture<?>> blockedFutures = new ArrayList<>();
            // loop over all operators and get each one's ListenableFuture; note that even a finished operator may still be waiting for additional memory
            for (Operator operator : operators) {
                Optional<ListenableFuture<?>> blocked = getBlockedFuture(operator);
                if (blocked.isPresent()) {
                    blockedOperators.add(operator);
                    blockedFutures.add(blocked.get());
                }
            }

            // if some operators really are blocked
            if (!blockedFutures.isEmpty()) {
                // unblock when the first future is complete
                // the driver unblocks as soon as any one of the futures completes
                ListenableFuture<?> blocked = firstFinishedFuture(blockedFutures);
                // driver records serial blocked time
                // record the blocked time on the driver context
                driverContext.recordBlocked(blocked);
                // each blocked operator is responsible for blocking the execution
                // until one of the operators can continue
                // register the blocked future with each blocked operator so its blocked time is recorded too
                for (Operator operator : blockedOperators) {
                    operator.getOperatorContext().recordBlocked(blocked);
                }
                return blocked;
            }
        }

        return NOT_BLOCKED;
    }
    catch (Throwable t) {
        List<StackTraceElement> interrupterStack = exclusiveLock.getInterrupterStack();
        if (interrupterStack == null) {
            driverContext.failed(t);
            throw t;
        }

        // Driver thread was interrupted which should only happen if the task is already finished.
        // If this becomes the actual cause of a failed query there is a bug in the task state machine.
        Exception exception = new Exception("Interrupted By");
        exception.setStackTrace(interrupterStack.stream().toArray(StackTraceElement[]::new));
        PrestoException newException = new PrestoException(GENERIC_INTERNAL_ERROR, "Driver was interrupted", exception);
        newException.addSuppressed(t);
        driverContext.failed(newException);
        throw newException;
    }
}
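
Stripped of the blocking, memory and single-operator special cases, the heart of processInternal is the page hand-off between adjacent operators: pull a page out of operator i and push it into operator i+1, and propagate finish() once the upstream side is done. The toy interface and loop below illustrate only that skeleton (they are not Presto's Operator API; a String stands in for a Page):

import java.util.List;

// toy operator interface: the minimal subset of behaviour the driver loop needs
interface ToyOperator {
    boolean isFinished();
    boolean needsInput();
    String getOutput();           // stands in for Page getOutput(); null means "nothing available right now"
    void addInput(String page);   // stands in for addInput(Page)
    void finish();                // upstream promises there will be no more input
}

class ToyDriverLoop {
    // one pass over the pipeline, moving at most one page between each adjacent operator pair
    static boolean processOnce(List<ToyOperator> operators) {
        boolean movedPage = false;
        for (int i = 0; i < operators.size() - 1; i++) {
            ToyOperator current = operators.get(i);
            ToyOperator next = operators.get(i + 1);

            if (!current.isFinished() && next.needsInput()) {
                String page = current.getOutput();
                if (page != null) {
                    next.addInput(page);   // hand the upstream page to the downstream operator
                    movedPage = true;
                }
            }
            if (current.isFinished()) {
                next.finish();             // propagate "no more data" down the pipeline
            }
        }
        return movedPage;                  // false means the driver may be blocked
    }
}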

Driver's getBlockedFuture method decides whether a given operator is blocked:

private Optional<ListenableFuture<?>> getBlockedFuture(Operator operator)
{
    ListenableFuture<?> blocked = revokingOperators.get(operator);
    if (blocked != null) {
        // We mark operator as blocked regardless of blocked.isDone(), because finishMemoryRevoke has not been called yet.
        return Optional.of(blocked);
    }
    blocked = operator.isBlocked();
    if (!blocked.isDone()) {
        return Optional.of(blocked);
    }
    blocked = operator.getOperatorContext().isWaitingForMemory();
    if (!blocked.isDone()) {
        return Optional.of(blocked);
    }
    blocked = operator.getOperatorContext().isWaitingForRevocableMemory();
    if (!blocked.isDone()) {
        return Optional.of(blocked);
    }
    return Optional.empty();
}

2.6.5 Operator execution

The getOutput() and addInput() methods of the Operator interface are the core of how an operator processes pages; LimitOperator is used here as an example.

@Override
public void addInput(Page page)
{
    checkState(needsInput());

    if (page.getPositionCount() <= remainingLimit) {
        remainingLimit -= page.getPositionCount();
        nextPage = page;
    }
    else {
        Block[] blocks = new Block[page.getChannelCount()];
        for (int channel = 0; channel < page.getChannelCount(); channel++) {
            Block block = page.getBlock(channel);
            blocks[channel] = block.getRegion(0, (int) remainingLimit);
        }
        nextPage = new Page((int) remainingLimit, blocks);
        remainingLimit = 0;
    }
}

@Override
public Page getOutput()
{
    Page page = nextPage;
    nextPage = null;
    return page;
}
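
To make the limit arithmetic concrete: with remainingLimit = 3 and an incoming page of 5 rows, only a 3-row region of the page is kept and the limit drops to 0. The same computation on plain arrays (a worked example only, not Presto code):

import java.util.Arrays;

class LimitArithmeticDemo {
    public static void main(String[] args) {
        long remainingLimit = 3;
        int[] page = {10, 20, 30, 40, 50};   // an incoming "page" with 5 rows

        int[] nextPage;
        if (page.length <= remainingLimit) {
            // the whole page fits under the remaining limit
            remainingLimit -= page.length;
            nextPage = page;
        }
        else {
            // keep only the first remainingLimit rows, mirroring block.getRegion(0, (int) remainingLimit)
            nextPage = Arrays.copyOfRange(page, 0, (int) remainingLimit);
            remainingLimit = 0;
        }

        System.out.println(Arrays.toString(nextPage));   // [10, 20, 30]
        System.out.println(remainingLimit);               // 0
    }
}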

3. Engineering customizations

3.1 Hive view support

3.2 Custom connectors

3.3 Implicit type conversion

3.4 UDF support

3.5 Performance tuning

Not yet complete, to be continued…

If you spot any mistakes please point them out so we can improve together~

Updated every evening~

If you repost, please include a link to the original article; original writing is not easy, thanks~
