聊聊flink的CheckpointScheduler

本文主要研究一下flink的CheckpointScheduler

CheckpointCoordinatorDeActivator

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java

/**
 * This actor listens to changes in the JobStatus and activates or deactivates the periodic
 * checkpoint scheduler.
 */
public class CheckpointCoordinatorDeActivator implements JobStatusListener {

    private final CheckpointCoordinator coordinator;

    public CheckpointCoordinatorDeActivator(CheckpointCoordinator coordinator) {
        this.coordinator = checkNotNull(coordinator);
    }

    @Override
    public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error) {
        if (newJobStatus == JobStatus.RUNNING) {
            // start the checkpoint scheduler
            coordinator.startCheckpointScheduler();
        } else {
            // anything else should stop the trigger for now
            coordinator.stopCheckpointScheduler();
        }
    }
}
  • CheckpointCoordinatorDeActivator實(shí)現(xiàn)了JobStatusListener接口,在jobStatusChanges的時(shí)候驹暑,根據(jù)狀態(tài)來(lái)調(diào)用coordinator.startCheckpointScheduler或者coordinator.stopCheckpointScheduler

CheckpointCoordinator.ScheduledTrigger

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java

/**
 * The checkpoint coordinator coordinates the distributed snapshots of operators and state.
 * It triggers the checkpoint by sending the messages to the relevant tasks and collects the
 * checkpoint acknowledgements. It also collects and maintains the overview of the state handles
 * reported by the tasks that acknowledge the checkpoint.
 */
public class CheckpointCoordinator {

    /** Map from checkpoint ID to the pending checkpoint */
    private final Map<Long, PendingCheckpoint> pendingCheckpoints;

    /** The number of consecutive failed trigger attempts */
    private final AtomicInteger numUnsuccessfulCheckpointsTriggers = new AtomicInteger(0);

    //......

    public void startCheckpointScheduler() {
        synchronized (lock) {
            if (shutdown) {
                throw new IllegalArgumentException("Checkpoint coordinator is shut down");
            }

            // make sure all prior timers are cancelled
            stopCheckpointScheduler();

            periodicScheduling = true;
            long initialDelay = ThreadLocalRandom.current().nextLong(
                minPauseBetweenCheckpointsNanos / 1_000_000L, baseInterval + 1L);
            currentPeriodicTrigger = timer.scheduleAtFixedRate(
                    new ScheduledTrigger(), initialDelay, baseInterval, TimeUnit.MILLISECONDS);
        }
    }

    public void stopCheckpointScheduler() {
        synchronized (lock) {
            triggerRequestQueued = false;
            periodicScheduling = false;

            if (currentPeriodicTrigger != null) {
                currentPeriodicTrigger.cancel(false);
                currentPeriodicTrigger = null;
            }

            for (PendingCheckpoint p : pendingCheckpoints.values()) {
                p.abortError(new Exception("Checkpoint Coordinator is suspending."));
            }

            pendingCheckpoints.clear();
            numUnsuccessfulCheckpointsTriggers.set(0);
        }
    }

    private final class ScheduledTrigger implements Runnable {

        @Override
        public void run() {
            try {
                triggerCheckpoint(System.currentTimeMillis(), true);
            }
            catch (Exception e) {
                LOG.error("Exception while triggering checkpoint for job {}.", job, e);
            }
        }
    }

    //......
}
  • CheckpointCoordinator的startCheckpointScheduler方法首先調(diào)用stopCheckpointScheduler取消PendingCheckpoint,之后使用timer.scheduleAtFixedRate重新調(diào)度ScheduledTrigger
  • stopCheckpointScheduler會(huì)調(diào)用PendingCheckpoint.abortError來(lái)取消pendingCheckpoints唱歧,然后清空pendingCheckpoints(Map<Long, PendingCheckpoint>)以及numUnsuccessfulCheckpointsTriggers(AtomicInteger)
  • ScheduledTrigger實(shí)現(xiàn)了Runnable接口鳖擒,其run方法主要是調(diào)用triggerCheckpoint,傳遞的isPeriodic參數(shù)為true

CheckpointCoordinator.triggerCheckpoint

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java

/**
 * The checkpoint coordinator coordinates the distributed snapshots of operators and state.
 * It triggers the checkpoint by sending the messages to the relevant tasks and collects the
 * checkpoint acknowledgements. It also collects and maintains the overview of the state handles
 * reported by the tasks that acknowledge the checkpoint.
 */
public class CheckpointCoordinator {

    /** Tasks who need to be sent a message when a checkpoint is started */
    private final ExecutionVertex[] tasksToTrigger;

    /** Tasks who need to acknowledge a checkpoint before it succeeds */
    private final ExecutionVertex[] tasksToWaitFor;

    /** Map from checkpoint ID to the pending checkpoint */
    private final Map<Long, PendingCheckpoint> pendingCheckpoints;

    /** The maximum number of checkpoints that may be in progress at the same time */
    private final int maxConcurrentCheckpointAttempts;

    /** The min time(in ns) to delay after a checkpoint could be triggered. Allows to
     * enforce minimum processing time between checkpoint attempts */
    private final long minPauseBetweenCheckpointsNanos;

    /**
     * Triggers a new standard checkpoint and uses the given timestamp as the checkpoint
     * timestamp.
     *
     * @param timestamp The timestamp for the checkpoint.
     * @param isPeriodic Flag indicating whether this triggered checkpoint is
     * periodic. If this flag is true, but the periodic scheduler is disabled,
     * the checkpoint will be declined.
     * @return <code>true</code> if triggering the checkpoint succeeded.
     */
    public boolean triggerCheckpoint(long timestamp, boolean isPeriodic) {
        return triggerCheckpoint(timestamp, checkpointProperties, null, isPeriodic).isSuccess();
    }

    @VisibleForTesting
    public CheckpointTriggerResult triggerCheckpoint(
            long timestamp,
            CheckpointProperties props,
            @Nullable String externalSavepointLocation,
            boolean isPeriodic) {

        // make some eager pre-checks
        synchronized (lock) {
            // abort if the coordinator has been shutdown in the meantime
            if (shutdown) {
                return new CheckpointTriggerResult(CheckpointDeclineReason.COORDINATOR_SHUTDOWN);
            }

            // Don't allow periodic checkpoint if scheduling has been disabled
            if (isPeriodic && !periodicScheduling) {
                return new CheckpointTriggerResult(CheckpointDeclineReason.PERIODIC_SCHEDULER_SHUTDOWN);
            }

            // validate whether the checkpoint can be triggered, with respect to the limit of
            // concurrent checkpoints, and the minimum time between checkpoints.
            // these checks are not relevant for savepoints
            if (!props.forceCheckpoint()) {
                // sanity check: there should never be more than one trigger request queued
                if (triggerRequestQueued) {
                    LOG.warn("Trying to trigger another checkpoint for job {} while one was queued already.", job);
                    return new CheckpointTriggerResult(CheckpointDeclineReason.ALREADY_QUEUED);
                }

                // if too many checkpoints are currently in progress, we need to mark that a request is queued
                if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) {
                    triggerRequestQueued = true;
                    if (currentPeriodicTrigger != null) {
                        currentPeriodicTrigger.cancel(false);
                        currentPeriodicTrigger = null;
                    }
                    return new CheckpointTriggerResult(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
                }

                // make sure the minimum interval between checkpoints has passed
                final long earliestNext = lastCheckpointCompletionNanos + minPauseBetweenCheckpointsNanos;
                final long durationTillNextMillis = (earliestNext - System.nanoTime()) / 1_000_000;

                if (durationTillNextMillis > 0) {
                    if (currentPeriodicTrigger != null) {
                        currentPeriodicTrigger.cancel(false);
                        currentPeriodicTrigger = null;
                    }
                    // Reassign the new trigger to the currentPeriodicTrigger
                    currentPeriodicTrigger = timer.scheduleAtFixedRate(
                            new ScheduledTrigger(),
                            durationTillNextMillis, baseInterval, TimeUnit.MILLISECONDS);

                    return new CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);
                }
            }
        }

        // check if all tasks that we need to trigger are running.
        // if not, abort the checkpoint
        Execution[] executions = new Execution[tasksToTrigger.length];
        for (int i = 0; i < tasksToTrigger.length; i++) {
            Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
            if (ee == null) {
                LOG.info("Checkpoint triggering task {} of job {} is not being executed at the moment. Aborting checkpoint.",
                        tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
                        job);
                return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
            } else if (ee.getState() == ExecutionState.RUNNING) {
                executions[i] = ee;
            } else {
                LOG.info("Checkpoint triggering task {} of job {} is not in state {} but {} instead. Aborting checkpoint.",
                        tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
                        job,
                        ExecutionState.RUNNING,
                        ee.getState());
                return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
            }
        }

        // next, check if all tasks that need to acknowledge the checkpoint are running.
        // if not, abort the checkpoint
        Map<ExecutionAttemptID, ExecutionVertex> ackTasks = new HashMap<>(tasksToWaitFor.length);

        for (ExecutionVertex ev : tasksToWaitFor) {
            Execution ee = ev.getCurrentExecutionAttempt();
            if (ee != null) {
                ackTasks.put(ee.getAttemptId(), ev);
            } else {
                LOG.info("Checkpoint acknowledging task {} of job {} is not being executed at the moment. Aborting checkpoint.",
                        ev.getTaskNameWithSubtaskIndex(),
                        job);
                return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
            }
        }

        // we will actually trigger this checkpoint!

        // we lock with a special lock to make sure that trigger requests do not overtake each other.
        // this is not done with the coordinator-wide lock, because the 'checkpointIdCounter'
        // may issue blocking operations. Using a different lock than the coordinator-wide lock,
        // we avoid blocking the processing of 'acknowledge/decline' messages during that time.
        synchronized (triggerLock) {

            final CheckpointStorageLocation checkpointStorageLocation;
            final long checkpointID;

            try {
                // this must happen outside the coordinator-wide lock, because it communicates
                // with external services (in HA mode) and may block for a while.
                checkpointID = checkpointIdCounter.getAndIncrement();

                checkpointStorageLocation = props.isSavepoint() ?
                        checkpointStorage.initializeLocationForSavepoint(checkpointID, externalSavepointLocation) :
                        checkpointStorage.initializeLocationForCheckpoint(checkpointID);
            }
            catch (Throwable t) {
                int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet();
                LOG.warn("Failed to trigger checkpoint for job {} ({} consecutive failed attempts so far).",
                        job,
                        numUnsuccessful,
                        t);
                return new CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION);
            }

            final PendingCheckpoint checkpoint = new PendingCheckpoint(
                job,
                checkpointID,
                timestamp,
                ackTasks,
                props,
                checkpointStorageLocation,
                executor);

            if (statsTracker != null) {
                PendingCheckpointStats callback = statsTracker.reportPendingCheckpoint(
                    checkpointID,
                    timestamp,
                    props);

                checkpoint.setStatsCallback(callback);
            }

            // schedule the timer that will clean up the expired checkpoints
            final Runnable canceller = () -> {
                synchronized (lock) {
                    // only do the work if the checkpoint is not discarded anyways
                    // note that checkpoint completion discards the pending checkpoint object
                    if (!checkpoint.isDiscarded()) {
                        LOG.info("Checkpoint {} of job {} expired before completing.", checkpointID, job);

                        checkpoint.abortExpired();
                        pendingCheckpoints.remove(checkpointID);
                        rememberRecentCheckpointId(checkpointID);

                        triggerQueuedRequests();
                    }
                }
            };

            try {
                // re-acquire the coordinator-wide lock
                synchronized (lock) {
                    // since we released the lock in the meantime, we need to re-check
                    // that the conditions still hold.
                    if (shutdown) {
                        return new CheckpointTriggerResult(CheckpointDeclineReason.COORDINATOR_SHUTDOWN);
                    }
                    else if (!props.forceCheckpoint()) {
                        if (triggerRequestQueued) {
                            LOG.warn("Trying to trigger another checkpoint for job {} while one was queued already.", job);
                            return new CheckpointTriggerResult(CheckpointDeclineReason.ALREADY_QUEUED);
                        }

                        if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) {
                            triggerRequestQueued = true;
                            if (currentPeriodicTrigger != null) {
                                currentPeriodicTrigger.cancel(false);
                                currentPeriodicTrigger = null;
                            }
                            return new CheckpointTriggerResult(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
                        }

                        // make sure the minimum interval between checkpoints has passed
                        final long earliestNext = lastCheckpointCompletionNanos + minPauseBetweenCheckpointsNanos;
                        final long durationTillNextMillis = (earliestNext - System.nanoTime()) / 1_000_000;

                        if (durationTillNextMillis > 0) {
                            if (currentPeriodicTrigger != null) {
                                currentPeriodicTrigger.cancel(false);
                                currentPeriodicTrigger = null;
                            }

                            // Reassign the new trigger to the currentPeriodicTrigger
                            currentPeriodicTrigger = timer.scheduleAtFixedRate(
                                    new ScheduledTrigger(),
                                    durationTillNextMillis, baseInterval, TimeUnit.MILLISECONDS);

                            return new CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);
                        }
                    }

                    LOG.info("Triggering checkpoint {} @ {} for job {}.", checkpointID, timestamp, job);

                    pendingCheckpoints.put(checkpointID, checkpoint);

                    ScheduledFuture<?> cancellerHandle = timer.schedule(
                            canceller,
                            checkpointTimeout, TimeUnit.MILLISECONDS);

                    if (!checkpoint.setCancellerHandle(cancellerHandle)) {
                        // checkpoint is already disposed!
                        cancellerHandle.cancel(false);
                    }

                    // trigger the master hooks for the checkpoint
                    final List<MasterState> masterStates = MasterHooks.triggerMasterHooks(masterHooks.values(),
                            checkpointID, timestamp, executor, Time.milliseconds(checkpointTimeout));
                    for (MasterState s : masterStates) {
                        checkpoint.addMasterState(s);
                    }
                }
                // end of lock scope

                final CheckpointOptions checkpointOptions = new CheckpointOptions(
                        props.getCheckpointType(),
                        checkpointStorageLocation.getLocationReference());

                // send the messages to the tasks that trigger their checkpoint
                for (Execution execution: executions) {
                    execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
                }

                numUnsuccessfulCheckpointsTriggers.set(0);
                return new CheckpointTriggerResult(checkpoint);
            }
            catch (Throwable t) {
                // guard the map against concurrent modifications
                synchronized (lock) {
                    pendingCheckpoints.remove(checkpointID);
                }

                int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet();
                LOG.warn("Failed to trigger checkpoint {} for job {}. ({} consecutive failed attempts so far)",
                        checkpointID, job, numUnsuccessful, t);

                if (!checkpoint.isDiscarded()) {
                    checkpoint.abortError(new Exception("Failed to trigger checkpoint", t));
                }

                try {
                    checkpointStorageLocation.disposeOnFailure();
                }
                catch (Throwable t2) {
                    LOG.warn("Cannot dispose failed checkpoint storage location {}", checkpointStorageLocation, t2);
                }

                return new CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION);
            }

        } // end trigger lock
    }

    //......
}
  • 首先判斷如果不是forceCheckpoint的話投放,則判斷當(dāng)前的pendingCheckpoints值是否超過(guò)maxConcurrentCheckpointAttempts,超過(guò)的話暴构,立刻fail fast跪呈,返回CheckpointTriggerResult(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS);之后判斷距離lastCheckpointCompletionNanos的時(shí)間是否大于等于minPauseBetweenCheckpointsNanos取逾,否則fail fast耗绿,返回CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS),確保checkpoint不被頻繁觸發(fā)
  • 之后檢查tasksToTrigger的任務(wù)(觸發(fā)checkpoint的時(shí)候需要通知到的task)是否都處于RUNNING狀態(tài)砾隅,不是的話則立刻fail fast误阻,返回CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING)
  • 之后檢查tasksToWaitFor的任務(wù)(需要在執(zhí)行成功的時(shí)候ack checkpoint的任務(wù))是否都處于RUNNING狀態(tài),不是的話立刻fail fast晴埂,返回CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING)
  • 前面幾步檢查通過(guò)了之后才開(kāi)始真正的checkpoint的觸發(fā)究反,它首先分配一個(gè)checkpointID,然后初始化checkpointStorageLocation儒洛,如果異常則返回CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION)精耐;之后創(chuàng)建PendingCheckpoint,同時(shí)準(zhǔn)備canceller(用于在失效的時(shí)候執(zhí)行abort操作)琅锻;之后對(duì)于不是forceCheckpoint的卦停,再重新來(lái)一輪TOO_MANY_CONCURRENT_CHECKPOINTS、MINIMUM_TIME_BETWEEN_CHECKPOINTS校驗(yàn)
  • 最后就是針對(duì)Execution恼蓬,挨個(gè)觸發(fā)execution的triggerCheckpoint操作惊完,成功返回CheckpointTriggerResult(checkpoint),異常則返回CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION)

Execution.triggerCheckpoint

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/executiongraph/Execution.java

public class Execution implements AccessExecution, Archiveable<ArchivedExecution>, LogicalSlot.Payload {

    /**
     * Trigger a new checkpoint on the task of this execution.
     *
     * @param checkpointId of th checkpoint to trigger
     * @param timestamp of the checkpoint to trigger
     * @param checkpointOptions of the checkpoint to trigger
     */
    public void triggerCheckpoint(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
        final LogicalSlot slot = assignedResource;

        if (slot != null) {
            final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();

            taskManagerGateway.triggerCheckpoint(attemptId, getVertex().getJobId(), checkpointId, timestamp, checkpointOptions);
        } else {
            LOG.debug("The execution has no slot assigned. This indicates that the execution is " +
                "no longer running.");
        }
    }

    //......
}
  • triggerCheckpoint主要是調(diào)用taskManagerGateway.triggerCheckpoint处硬,這里的taskManagerGateway為RpcTaskManagerGateway

RpcTaskManagerGateway

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java

/**
 * Implementation of the {@link TaskManagerGateway} for Flink's RPC system.
 */
public class RpcTaskManagerGateway implements TaskManagerGateway {

    private final TaskExecutorGateway taskExecutorGateway;

    public void triggerCheckpoint(ExecutionAttemptID executionAttemptID, JobID jobId, long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
        taskExecutorGateway.triggerCheckpoint(
            executionAttemptID,
            checkpointId,
            timestamp,
            checkpointOptions);
    }

    //......
}
  • RpcTaskManagerGateway的triggerCheckpoint方法調(diào)用taskExecutorGateway.triggerCheckpoint小槐,這里的taskExecutorGateway為AkkaInvocationHandler,通過(guò)rpc通知TaskExecutor

TaskExecutor.triggerCheckpoint

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/taskexecutor/TaskExecutor.java

/**
 * TaskExecutor implementation. The task executor is responsible for the execution of multiple
 * {@link Task}.
 */
public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {

    public CompletableFuture<Acknowledge> triggerCheckpoint(
            ExecutionAttemptID executionAttemptID,
            long checkpointId,
            long checkpointTimestamp,
            CheckpointOptions checkpointOptions) {
        log.debug("Trigger checkpoint {}@{} for {}.", checkpointId, checkpointTimestamp, executionAttemptID);

        final Task task = taskSlotTable.getTask(executionAttemptID);

        if (task != null) {
            task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions);

            return CompletableFuture.completedFuture(Acknowledge.get());
        } else {
            final String message = "TaskManager received a checkpoint request for unknown task " + executionAttemptID + '.';

            log.debug(message);
            return FutureUtils.completedExceptionally(new CheckpointException(message));
        }
    }

    //......
}
  • TaskExecutor的triggerCheckpoint方法這里調(diào)用task.triggerCheckpointBarrier

Task.triggerCheckpointBarrier

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/taskmanager/Task.java

public class Task implements Runnable, TaskActions, CheckpointListener {

    /** The invokable of this task, if initialized. All accesses must copy the reference and
     * check for null, as this field is cleared as part of the disposal logic. */
    @Nullable
    private volatile AbstractInvokable invokable;

    /**
     * Calls the invokable to trigger a checkpoint.
     *
     * @param checkpointID The ID identifying the checkpoint.
     * @param checkpointTimestamp The timestamp associated with the checkpoint.
     * @param checkpointOptions Options for performing this checkpoint.
     */
    public void triggerCheckpointBarrier(
            final long checkpointID,
            long checkpointTimestamp,
            final CheckpointOptions checkpointOptions) {

        final AbstractInvokable invokable = this.invokable;
        final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointID, checkpointTimestamp);

        if (executionState == ExecutionState.RUNNING && invokable != null) {

            // build a local closure
            final String taskName = taskNameWithSubtask;
            final SafetyNetCloseableRegistry safetyNetCloseableRegistry =
                FileSystemSafetyNet.getSafetyNetCloseableRegistryForThread();

            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    // set safety net from the task's context for checkpointing thread
                    LOG.debug("Creating FileSystem stream leak safety net for {}", Thread.currentThread().getName());
                    FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(safetyNetCloseableRegistry);

                    try {
                        boolean success = invokable.triggerCheckpoint(checkpointMetaData, checkpointOptions);
                        if (!success) {
                            checkpointResponder.declineCheckpoint(
                                    getJobID(), getExecutionId(), checkpointID,
                                    new CheckpointDeclineTaskNotReadyException(taskName));
                        }
                    }
                    catch (Throwable t) {
                        if (getExecutionState() == ExecutionState.RUNNING) {
                            failExternally(new Exception(
                                "Error while triggering checkpoint " + checkpointID + " for " +
                                    taskNameWithSubtask, t));
                        } else {
                            LOG.debug("Encountered error while triggering checkpoint {} for " +
                                "{} ({}) while being not in state running.", checkpointID,
                                taskNameWithSubtask, executionId, t);
                        }
                    } finally {
                        FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(null);
                    }
                }
            };
            executeAsyncCallRunnable(runnable, String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, executionId));
        }
        else {
            LOG.debug("Declining checkpoint request for non-running task {} ({}).", taskNameWithSubtask, executionId);

            // send back a message that we did not do the checkpoint
            checkpointResponder.declineCheckpoint(jobId, executionId, checkpointID,
                    new CheckpointDeclineTaskNotReadyException(taskNameWithSubtask));
        }
    }

    //......
}
  • Task的triggerCheckpointBarrier方法首先判斷executionState是否RUNNING以及invokable是否不為null荷辕,不滿足條件則執(zhí)行checkpointResponder.declineCheckpoint
  • 滿足條件則執(zhí)行executeAsyncCallRunnable(runnable, String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, executionId))
  • 這個(gè)runnable方法里頭會(huì)執(zhí)行invokable.triggerCheckpoint(checkpointMetaData, checkpointOptions)凿跳,這里的invokable為SourceStreamTask

SourceStreamTask.triggerCheckpoint

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java

@Internal
public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends StreamSource<OUT, SRC>>
    extends StreamTask<OUT, OP> {

    private volatile boolean externallyInducedCheckpoints;

    @Override
    public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception {
        if (!externallyInducedCheckpoints) {
            return super.triggerCheckpoint(checkpointMetaData, checkpointOptions);
        }
        else {
            // we do not trigger checkpoints here, we simply state whether we can trigger them
            synchronized (getCheckpointLock()) {
                return isRunning();
            }
        }
    }

    //......
}
  • SourceStreamTask的triggerCheckpoint先判斷,如果externallyInducedCheckpoints為false疮方,則調(diào)用父類StreamTask的triggerCheckpoint

StreamTask.triggerCheckpoint

@Internal
public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
        extends AbstractInvokable
        implements AsyncExceptionHandler {

    @Override
    public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception {
        try {
            // No alignment if we inject a checkpoint
            CheckpointMetrics checkpointMetrics = new CheckpointMetrics()
                    .setBytesBufferedInAlignment(0L)
                    .setAlignmentDurationNanos(0L);

            return performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics);
        }
        catch (Exception e) {
            // propagate exceptions only if the task is still in "running" state
            if (isRunning) {
                throw new Exception("Could not perform checkpoint " + checkpointMetaData.getCheckpointId() +
                    " for operator " + getName() + '.', e);
            } else {
                LOG.debug("Could not perform checkpoint {} for operator {} while the " +
                    "invokable was not in state running.", checkpointMetaData.getCheckpointId(), getName(), e);
                return false;
            }
        }
    }

    private boolean performCheckpoint(
            CheckpointMetaData checkpointMetaData,
            CheckpointOptions checkpointOptions,
            CheckpointMetrics checkpointMetrics) throws Exception {

        LOG.debug("Starting checkpoint ({}) {} on task {}",
            checkpointMetaData.getCheckpointId(), checkpointOptions.getCheckpointType(), getName());

        synchronized (lock) {
            if (isRunning) {
                // we can do a checkpoint

                // All of the following steps happen as an atomic step from the perspective of barriers and
                // records/watermarks/timers/callbacks.
                // We generally try to emit the checkpoint barrier as soon as possible to not affect downstream
                // checkpoint alignments

                // Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work.
                //           The pre-barrier work should be nothing or minimal in the common case.
                operatorChain.prepareSnapshotPreBarrier(checkpointMetaData.getCheckpointId());

                // Step (2): Send the checkpoint barrier downstream
                operatorChain.broadcastCheckpointBarrier(
                        checkpointMetaData.getCheckpointId(),
                        checkpointMetaData.getTimestamp(),
                        checkpointOptions);

                // Step (3): Take the state snapshot. This should be largely asynchronous, to not
                //           impact progress of the streaming topology
                checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);
                return true;
            }
            else {
                // we cannot perform our checkpoint - let the downstream operators know that they
                // should not wait for any input from this operator

                // we cannot broadcast the cancellation markers on the 'operator chain', because it may not
                // yet be created
                final CancelCheckpointMarker message = new CancelCheckpointMarker(checkpointMetaData.getCheckpointId());
                Exception exception = null;

                for (StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>> streamRecordWriter : streamRecordWriters) {
                    try {
                        streamRecordWriter.broadcastEvent(message);
                    } catch (Exception e) {
                        exception = ExceptionUtils.firstOrSuppressed(
                            new Exception("Could not send cancel checkpoint marker to downstream tasks.", e),
                            exception);
                    }
                }

                if (exception != null) {
                    throw exception;
                }

                return false;
            }
        }
    }

    private void checkpointState(
            CheckpointMetaData checkpointMetaData,
            CheckpointOptions checkpointOptions,
            CheckpointMetrics checkpointMetrics) throws Exception {

        CheckpointStreamFactory storage = checkpointStorage.resolveCheckpointStorageLocation(
                checkpointMetaData.getCheckpointId(),
                checkpointOptions.getTargetLocation());

        CheckpointingOperation checkpointingOperation = new CheckpointingOperation(
            this,
            checkpointMetaData,
            checkpointOptions,
            storage,
            checkpointMetrics);

        checkpointingOperation.executeCheckpointing();
    }

    //......
}
  • StreamTask的triggerCheckpoint方法的主要處理邏輯在performCheckpoint方法上拄显,該方法針對(duì)task的isRunning分別進(jìn)行不同處理
  • isRunning為true的時(shí)候,這里頭分了三步來(lái)處理案站,第一步執(zhí)行operatorChain.prepareSnapshotPreBarrier,第二步執(zhí)行operatorChain.broadcastCheckpointBarrier,第三步執(zhí)行checkpointState方法蟆盐,checkpointState里頭創(chuàng)建CheckpointingOperation承边,然后調(diào)用checkpointingOperation.executeCheckpointing()
  • 如果isRunning為false,則這里streamRecordWriter.broadcastEvent(message)石挂,這里的message為CancelCheckpointMarker

OperatorChain.prepareSnapshotPreBarrier

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/tasks/OperatorChain.java

@Internal
public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements StreamStatusMaintainer {

    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
        // go forward through the operator chain and tell each operator
        // to prepare the checkpoint
        final StreamOperator<?>[] operators = this.allOperators;
        for (int i = operators.length - 1; i >= 0; --i) {
            final StreamOperator<?> op = operators[i];
            if (op != null) {
                op.prepareSnapshotPreBarrier(checkpointId);
            }
        }
    }

    //......
}
  • OperatorChain的prepareSnapshotPreBarrier會(huì)遍歷allOperators挨個(gè)調(diào)用StreamOperator的prepareSnapshotPreBarrier方法

OperatorChain.broadcastCheckpointBarrier

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/tasks/OperatorChain.java

@Internal
public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements StreamStatusMaintainer {

    public void broadcastCheckpointBarrier(long id, long timestamp, CheckpointOptions checkpointOptions) throws IOException {
        CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp, checkpointOptions);
        for (RecordWriterOutput<?> streamOutput : streamOutputs) {
            streamOutput.broadcastEvent(barrier);
        }
    }

    //......
}
  • OperatorChain的broadcastCheckpointBarrier方法則會(huì)遍歷streamOutputs挨個(gè)調(diào)用streamOutput的broadcastEvent方法

CheckpointingOperation.executeCheckpointing

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/tasks/StreamTask.java

    private static final class CheckpointingOperation {

        private final StreamTask<?, ?> owner;

        private final CheckpointMetaData checkpointMetaData;
        private final CheckpointOptions checkpointOptions;
        private final CheckpointMetrics checkpointMetrics;
        private final CheckpointStreamFactory storageLocation;

        private final StreamOperator<?>[] allOperators;

        private long startSyncPartNano;
        private long startAsyncPartNano;

        // ------------------------

        private final Map<OperatorID, OperatorSnapshotFutures> operatorSnapshotsInProgress;

        public CheckpointingOperation(
                StreamTask<?, ?> owner,
                CheckpointMetaData checkpointMetaData,
                CheckpointOptions checkpointOptions,
                CheckpointStreamFactory checkpointStorageLocation,
                CheckpointMetrics checkpointMetrics) {

            this.owner = Preconditions.checkNotNull(owner);
            this.checkpointMetaData = Preconditions.checkNotNull(checkpointMetaData);
            this.checkpointOptions = Preconditions.checkNotNull(checkpointOptions);
            this.checkpointMetrics = Preconditions.checkNotNull(checkpointMetrics);
            this.storageLocation = Preconditions.checkNotNull(checkpointStorageLocation);
            this.allOperators = owner.operatorChain.getAllOperators();
            this.operatorSnapshotsInProgress = new HashMap<>(allOperators.length);
        }

        public void executeCheckpointing() throws Exception {
            startSyncPartNano = System.nanoTime();

            try {
                for (StreamOperator<?> op : allOperators) {
                    checkpointStreamOperator(op);
                }

                if (LOG.isDebugEnabled()) {
                    LOG.debug("Finished synchronous checkpoints for checkpoint {} on task {}",
                        checkpointMetaData.getCheckpointId(), owner.getName());
                }

                startAsyncPartNano = System.nanoTime();

                checkpointMetrics.setSyncDurationMillis((startAsyncPartNano - startSyncPartNano) / 1_000_000);

                // we are transferring ownership over snapshotInProgressList for cleanup to the thread, active on submit
                AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(
                    owner,
                    operatorSnapshotsInProgress,
                    checkpointMetaData,
                    checkpointMetrics,
                    startAsyncPartNano);

                owner.cancelables.registerCloseable(asyncCheckpointRunnable);
                owner.asyncOperationsThreadPool.submit(asyncCheckpointRunnable);

                if (LOG.isDebugEnabled()) {
                    LOG.debug("{} - finished synchronous part of checkpoint {}. " +
                            "Alignment duration: {} ms, snapshot duration {} ms",
                        owner.getName(), checkpointMetaData.getCheckpointId(),
                        checkpointMetrics.getAlignmentDurationNanos() / 1_000_000,
                        checkpointMetrics.getSyncDurationMillis());
                }
            } catch (Exception ex) {
                // Cleanup to release resources
                for (OperatorSnapshotFutures operatorSnapshotResult : operatorSnapshotsInProgress.values()) {
                    if (null != operatorSnapshotResult) {
                        try {
                            operatorSnapshotResult.cancel();
                        } catch (Exception e) {
                            LOG.warn("Could not properly cancel an operator snapshot result.", e);
                        }
                    }
                }

                if (LOG.isDebugEnabled()) {
                    LOG.debug("{} - did NOT finish synchronous part of checkpoint {}. " +
                            "Alignment duration: {} ms, snapshot duration {} ms",
                        owner.getName(), checkpointMetaData.getCheckpointId(),
                        checkpointMetrics.getAlignmentDurationNanos() / 1_000_000,
                        checkpointMetrics.getSyncDurationMillis());
                }

                owner.synchronousCheckpointExceptionHandler.tryHandleCheckpointException(checkpointMetaData, ex);
            }
        }

        @SuppressWarnings("deprecation")
        private void checkpointStreamOperator(StreamOperator<?> op) throws Exception {
            if (null != op) {

                OperatorSnapshotFutures snapshotInProgress = op.snapshotState(
                        checkpointMetaData.getCheckpointId(),
                        checkpointMetaData.getTimestamp(),
                        checkpointOptions,
                        storageLocation);
                operatorSnapshotsInProgress.put(op.getOperatorID(), snapshotInProgress);
            }
        }

        private enum AsyncCheckpointState {
            RUNNING,
            DISCARDED,
            COMPLETED
        }
    }
  • CheckpointingOperation定義在StreamTask類里頭博助,executeCheckpointing方法先對(duì)所有的StreamOperator執(zhí)行checkpointStreamOperator操作,checkpointStreamOperator方法會(huì)調(diào)用StreamOperator的snapshotState方法痹愚,之后創(chuàng)建AsyncCheckpointRunnable任務(wù)并提交異步運(yùn)行

AbstractStreamOperator.snapshotState

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java

@PublicEvolving
public abstract class AbstractStreamOperator<OUT>
        implements StreamOperator<OUT>, Serializable {

    @Override
    public final OperatorSnapshotFutures snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions,
            CheckpointStreamFactory factory) throws Exception {

        KeyGroupRange keyGroupRange = null != keyedStateBackend ?
                keyedStateBackend.getKeyGroupRange() : KeyGroupRange.EMPTY_KEY_GROUP_RANGE;

        OperatorSnapshotFutures snapshotInProgress = new OperatorSnapshotFutures();

        try (StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl(
                checkpointId,
                timestamp,
                factory,
                keyGroupRange,
                getContainingTask().getCancelables())) {

            snapshotState(snapshotContext);

            snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
            snapshotInProgress.setOperatorStateRawFuture(snapshotContext.getOperatorStateStreamFuture());

            if (null != operatorStateBackend) {
                snapshotInProgress.setOperatorStateManagedFuture(
                    operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
            }

            if (null != keyedStateBackend) {
                snapshotInProgress.setKeyedStateManagedFuture(
                    keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
            }
        } catch (Exception snapshotException) {
            try {
                snapshotInProgress.cancel();
            } catch (Exception e) {
                snapshotException.addSuppressed(e);
            }

            String snapshotFailMessage = "Could not complete snapshot " + checkpointId + " for operator " +
                getOperatorName() + ".";

            if (!getContainingTask().isCanceled()) {
                LOG.info(snapshotFailMessage, snapshotException);
            }
            throw new Exception(snapshotFailMessage, snapshotException);
        }

        return snapshotInProgress;
    }

    /**
     * Stream operators with state, which want to participate in a snapshot need to override this hook method.
     *
     * @param context context that provides information and means required for taking a snapshot
     */
    public void snapshotState(StateSnapshotContext context) throws Exception {
        final KeyedStateBackend<?> keyedStateBackend = getKeyedStateBackend();
        //TODO all of this can be removed once heap-based timers are integrated with RocksDB incremental snapshots
        if (keyedStateBackend instanceof AbstractKeyedStateBackend &&
            ((AbstractKeyedStateBackend<?>) keyedStateBackend).requiresLegacySynchronousTimerSnapshots()) {

            KeyedStateCheckpointOutputStream out;

            try {
                out = context.getRawKeyedOperatorStateOutput();
            } catch (Exception exception) {
                throw new Exception("Could not open raw keyed operator state stream for " +
                    getOperatorName() + '.', exception);
            }

            try {
                KeyGroupsList allKeyGroups = out.getKeyGroupList();
                for (int keyGroupIdx : allKeyGroups) {
                    out.startNewKeyGroup(keyGroupIdx);

                    timeServiceManager.snapshotStateForKeyGroup(
                        new DataOutputViewStreamWrapper(out), keyGroupIdx);
                }
            } catch (Exception exception) {
                throw new Exception("Could not write timer service of " + getOperatorName() +
                    " to checkpoint state stream.", exception);
            } finally {
                try {
                    out.close();
                } catch (Exception closeException) {
                    LOG.warn("Could not close raw keyed operator state stream for {}. This " +
                        "might have prevented deleting some state data.", getOperatorName(), closeException);
                }
            }
        }
    }

    //......
}
  • AbstractStreamOperator的snapshotState方法只有在keyedStateBackend是AbstractKeyedStateBackend類型富岳,而且requiresLegacySynchronousTimerSnapshots為true的條件下才會(huì)操作,具體是觸發(fā)timeServiceManager.snapshotStateForKeyGroup(new DataOutputViewStreamWrapper(out), keyGroupIdx)拯腮;不過(guò)它有不同的子類可能覆蓋了snapshotState方法窖式,比如AbstractUdfStreamOperator

AbstractUdfStreamOperator

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java

@PublicEvolving
public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
        extends AbstractStreamOperator<OUT>
        implements OutputTypeConfigurable<OUT> {

    @Override
    public void snapshotState(StateSnapshotContext context) throws Exception {
        super.snapshotState(context);
        StreamingFunctionUtils.snapshotFunctionState(context, getOperatorStateBackend(), userFunction);
    }

        //......
}
  • AbstractUdfStreamOperator覆蓋了父類AbstractStreamOperator的snapshotState方法,新增了StreamingFunctionUtils.snapshotFunctionState操作

StreamingFunctionUtils.snapshotFunctionState

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/util/functions/StreamingFunctionUtils.java

@Internal
public final class StreamingFunctionUtils {

    public static void snapshotFunctionState(
            StateSnapshotContext context,
            OperatorStateBackend backend,
            Function userFunction) throws Exception {

        Preconditions.checkNotNull(context);
        Preconditions.checkNotNull(backend);

        while (true) {

            if (trySnapshotFunctionState(context, backend, userFunction)) {
                break;
            }

            // inspect if the user function is wrapped, then unwrap and try again if we can snapshot the inner function
            if (userFunction instanceof WrappingFunction) {
                userFunction = ((WrappingFunction<?>) userFunction).getWrappedFunction();
            } else {
                break;
            }
        }
    }

    private static boolean trySnapshotFunctionState(
            StateSnapshotContext context,
            OperatorStateBackend backend,
            Function userFunction) throws Exception {

        if (userFunction instanceof CheckpointedFunction) {
            ((CheckpointedFunction) userFunction).snapshotState(context);

            return true;
        }

        if (userFunction instanceof ListCheckpointed) {
            @SuppressWarnings("unchecked")
            List<Serializable> partitionableState = ((ListCheckpointed<Serializable>) userFunction).
                    snapshotState(context.getCheckpointId(), context.getCheckpointTimestamp());

            ListState<Serializable> listState = backend.
                    getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);

            listState.clear();

            if (null != partitionableState) {
                try {
                    for (Serializable statePartition : partitionableState) {
                        listState.add(statePartition);
                    }
                } catch (Exception e) {
                    listState.clear();

                    throw new Exception("Could not write partitionable state to operator " +
                        "state backend.", e);
                }
            }

            return true;
        }

        return false;
    }

    //......
}
  • snapshotFunctionState方法动壤,這里執(zhí)行了trySnapshotFunctionState操作萝喘,這里userFunction的類型,如果實(shí)現(xiàn)了CheckpointedFunction接口琼懊,則調(diào)用CheckpointedFunction.snapshotState阁簸,如果實(shí)現(xiàn)了ListCheckpointed接口,則調(diào)用ListCheckpointed.snapshotState方法哼丈,注意這里先clear了ListState启妹,然后調(diào)用ListState.add方法將返回的List添加到ListState中

小結(jié)

  • flink的CheckpointCoordinatorDeActivator在job的status為RUNNING的時(shí)候會(huì)觸發(fā)CheckpointCoordinator的startCheckpointScheduler,非RUNNING的時(shí)候調(diào)用CheckpointCoordinator的stopCheckpointScheduler方法
  • CheckpointCoordinator的startCheckpointScheduler主要是注冊(cè)了ScheduledTrigger任務(wù)醉旦,其run方法執(zhí)行triggerCheckpoint操作饶米,triggerCheckpoint方法在真正觸發(fā)checkpoint之前會(huì)進(jìn)行一系列的校驗(yàn),不滿足則立刻fail fast髓抑,其中可能的原因有(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS咙崎、CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS、NOT_ALL_REQUIRED_TASKS_RUNNING)吨拍;滿足條件的話褪猛,就是挨個(gè)遍歷executions,調(diào)用Execution.triggerCheckpoint羹饰,它借助taskManagerGateway.triggerCheckpoint來(lái)通過(guò)rpc調(diào)用TaskExecutor的triggerCheckpoint方法
  • TaskExecutor的triggerCheckpoint主要是調(diào)用Task的triggerCheckpointBarrier方法伊滋,后者主要是異步執(zhí)行一個(gè)runnable,里頭的run方法是調(diào)用invokable.triggerCheckpoint队秩,這里的invokable為SourceStreamTask笑旺,而它主要是調(diào)用父類StreamTask的triggerCheckpoint方法,該方法的主要邏輯在performCheckpoint操作上馍资;performCheckpoint在isRunning為true的時(shí)候筒主,分了三步來(lái)處理,第一步執(zhí)行operatorChain.prepareSnapshotPreBarrier,第二步執(zhí)行operatorChain.broadcastCheckpointBarrier乌妙,第三步執(zhí)行checkpointState方法使兔,checkpointState里頭創(chuàng)建CheckpointingOperation,然后調(diào)用checkpointingOperation.executeCheckpointing()
  • CheckpointingOperation的executeCheckpointing方法會(huì)對(duì)所有的StreamOperator執(zhí)行checkpointStreamOperator操作藤韵,而checkpointStreamOperator方法會(huì)調(diào)用StreamOperator的snapshotState方法虐沥;AbstractStreamOperator的snapshotState方法只有在keyedStateBackend是AbstractKeyedStateBackend類型,而且requiresLegacySynchronousTimerSnapshots為true的條件下才會(huì)操作
  • AbstractUdfStreamOperator覆蓋了父類AbstractStreamOperator的snapshotState方法泽艘,新增了StreamingFunctionUtils.snapshotFunctionState操作欲险,該操作會(huì)根據(jù)userFunction的類型調(diào)用相應(yīng)的方法(如果實(shí)現(xiàn)了CheckpointedFunction接口,則調(diào)用CheckpointedFunction.snapshotState匹涮,如果實(shí)現(xiàn)了ListCheckpointed接口天试,則調(diào)用ListCheckpointed.snapshotState方法)

doc

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市焕盟,隨后出現(xiàn)的幾起案子秋秤,更是在濱河造成了極大的恐慌,老刑警劉巖脚翘,帶你破解...
    沈念sama閱讀 221,576評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件灼卢,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡来农,警方通過(guò)查閱死者的電腦和手機(jī)鞋真,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,515評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)沃于,“玉大人涩咖,你說(shuō)我怎么就攤上這事》庇ǎ” “怎么了檩互?”我有些...
    開(kāi)封第一講書(shū)人閱讀 168,017評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)咨演。 經(jīng)常有香客問(wèn)我闸昨,道長(zhǎng),這世上最難降的妖魔是什么薄风? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 59,626評(píng)論 1 296
  • 正文 為了忘掉前任饵较,我火速辦了婚禮,結(jié)果婚禮上遭赂,老公的妹妹穿的比我還像新娘循诉。我一直安慰自己,他們只是感情好撇他,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,625評(píng)論 6 397
  • 文/花漫 我一把揭開(kāi)白布茄猫。 她就那樣靜靜地躺著狈蚤,像睡著了一般。 火紅的嫁衣襯著肌膚如雪募疮。 梳的紋絲不亂的頭發(fā)上炫惩,一...
    開(kāi)封第一講書(shū)人閱讀 52,255評(píng)論 1 308
  • 那天,我揣著相機(jī)與錄音阿浓,去河邊找鬼。 笑死蹋绽,一個(gè)胖子當(dāng)著我的面吹牛芭毙,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播卸耘,決...
    沈念sama閱讀 40,825評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼退敦,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了蚣抗?” 一聲冷哼從身側(cè)響起侈百,我...
    開(kāi)封第一講書(shū)人閱讀 39,729評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎翰铡,沒(méi)想到半個(gè)月后钝域,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,271評(píng)論 1 320
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡锭魔,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,363評(píng)論 3 340
  • 正文 我和宋清朗相戀三年例证,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片迷捧。...
    茶點(diǎn)故事閱讀 40,498評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡织咧,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出漠秋,到底是詐尸還是另有隱情笙蒙,我是刑警寧澤,帶...
    沈念sama閱讀 36,183評(píng)論 5 350
  • 正文 年R本政府宣布庆锦,位于F島的核電站捅位,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏肥荔。R本人自食惡果不足惜绿渣,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,867評(píng)論 3 333
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望燕耿。 院中可真熱鬧中符,春花似錦、人聲如沸誉帅。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 32,338評(píng)論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至档插,卻和暖如春慢蜓,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背郭膛。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,458評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工晨抡, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人则剃。 一個(gè)月前我還...
    沈念sama閱讀 48,906評(píng)論 3 376
  • 正文 我出身青樓耘柱,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親棍现。 傳聞我的和親對(duì)象是個(gè)殘疾皇子调煎,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,507評(píng)論 2 359

推薦閱讀更多精彩內(nèi)容