A look at Flink's SourceFunction

This article takes a look at Flink's SourceFunction.

Example

        // set up the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> dataStreamSource = env.addSource(new RandomWordSource());

        dataStreamSource.map(new UpperCaseMapFunc()).print();

        env.execute("sourceFunctionDemo");
  • Here a custom SourceFunction is registered through the addSource method.

SourceFunction

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/functions/source/SourceFunction.java

/**
 * Base interface for all stream data sources in Flink. The contract of a stream source
 * is the following: When the source should start emitting elements, the {@link #run} method
 * is called with a {@link SourceContext} that can be used for emitting elements.
 * The run method can run for as long as necessary. The source must, however, react to an
 * invocation of {@link #cancel()} by breaking out of its main loop.
 *
 * <h3>CheckpointedFunction Sources</h3>
 *
 * <p>Sources that also implement the {@link org.apache.flink.streaming.api.checkpoint.CheckpointedFunction}
 * interface must ensure that state checkpointing, updating of internal state and emission of
 * elements are not done concurrently. This is achieved by using the provided checkpointing lock
 * object to protect update of state and emission of elements in a synchronized block.
 *
 * <p>This is the basic pattern one should follow when implementing a checkpointed source:
 *
 * <pre>{@code
 *  public class ExampleCountSource implements SourceFunction<Long>, CheckpointedFunction {
 *      private long count = 0L;
 *      private volatile boolean isRunning = true;
 *
 *      private transient ListState<Long> checkpointedCount;
 *
 *      public void run(SourceContext<T> ctx) {
 *          while (isRunning && count < 1000) {
 *              // this synchronized block ensures that state checkpointing,
 *              // internal state updates and emission of elements are an atomic operation
 *              synchronized (ctx.getCheckpointLock()) {
 *                  ctx.collect(count);
 *                  count++;
 *              }
 *          }
 *      }
 *
 *      public void cancel() {
 *          isRunning = false;
 *      }
 *
 *      public void initializeState(FunctionInitializationContext context) {
 *          this.checkpointedCount = context
 *              .getOperatorStateStore()
 *              .getListState(new ListStateDescriptor<>("count", Long.class));
 *
 *          if (context.isRestored()) {
 *              for (Long count : this.checkpointedCount.get()) {
 *                  this.count = count;
 *              }
 *          }
 *      }
 *
 *      public void snapshotState(FunctionSnapshotContext context) {
 *          this.checkpointedCount.clear();
 *          this.checkpointedCount.add(count);
 *      }
 * }
 * }</pre>
 *
 *
 * <h3>Timestamps and watermarks:</h3>
 * Sources may assign timestamps to elements and may manually emit watermarks.
 * However, these are only interpreted if the streaming program runs on
 * {@link TimeCharacteristic#EventTime}. On other time characteristics
 * ({@link TimeCharacteristic#IngestionTime} and {@link TimeCharacteristic#ProcessingTime}),
 * the watermarks from the source function are ignored.
 *
 * <h3>Gracefully Stopping Functions</h3>
 * Functions may additionally implement the {@link org.apache.flink.api.common.functions.StoppableFunction}
 * interface. "Stopping" a function, in contrast to "canceling" means a graceful exit that leaves the
 * state and the emitted elements in a consistent state.
 *
 * <p>When a source is stopped, the executing thread is not interrupted, but expected to leave the
 * {@link #run(SourceContext)} method in reasonable time on its own, preserving the atomicity
 * of state updates and element emission.
 *
 * @param <T> The type of the elements produced by this source.
 *
 * @see org.apache.flink.api.common.functions.StoppableFunction
 * @see org.apache.flink.streaming.api.TimeCharacteristic
 */
@Public
public interface SourceFunction<T> extends Function, Serializable {

    /**
     * Starts the source. Implementations can use the {@link SourceContext} emit
     * elements.
     *
     * <p>Sources that implement {@link org.apache.flink.streaming.api.checkpoint.CheckpointedFunction}
     * must lock on the checkpoint lock (using a synchronized block) before updating internal
     * state and emitting elements, to make both an atomic operation:
     *
     * <pre>{@code
     *  public class ExampleCountSource implements SourceFunction<Long>, CheckpointedFunction {
     *      private long count = 0L;
     *      private volatile boolean isRunning = true;
     *
     *      private transient ListState<Long> checkpointedCount;
     *
     *      public void run(SourceContext<T> ctx) {
     *          while (isRunning && count < 1000) {
     *              // this synchronized block ensures that state checkpointing,
     *              // internal state updates and emission of elements are an atomic operation
     *              synchronized (ctx.getCheckpointLock()) {
     *                  ctx.collect(count);
     *                  count++;
     *              }
     *          }
     *      }
     *
     *      public void cancel() {
     *          isRunning = false;
     *      }
     *
     *      public void initializeState(FunctionInitializationContext context) {
     *          this.checkpointedCount = context
     *              .getOperatorStateStore()
     *              .getListState(new ListStateDescriptor<>("count", Long.class));
     *
     *          if (context.isRestored()) {
     *              for (Long count : this.checkpointedCount.get()) {
     *                  this.count = count;
     *              }
     *          }
     *      }
     *
     *      public void snapshotState(FunctionSnapshotContext context) {
     *          this.checkpointedCount.clear();
     *          this.checkpointedCount.add(count);
     *      }
     * }
     * }</pre>
     *
     * @param ctx The context to emit elements to and for accessing locks.
     */
    void run(SourceContext<T> ctx) throws Exception;

    /**
     * Cancels the source. Most sources will have a while loop inside the
     * {@link #run(SourceContext)} method. The implementation needs to ensure that the
     * source will break out of that loop after this method is called.
     *
     * <p>A typical pattern is to have an {@code "volatile boolean isRunning"} flag that is set to
     * {@code false} in this method. That flag is checked in the loop condition.
     *
     * <p>When a source is canceled, the executing thread will also be interrupted
     * (via {@link Thread#interrupt()}). The interruption happens strictly after this
     * method has been called, so any interruption handler can rely on the fact that
     * this method has completed. It is good practice to make any flags altered by
     * this method "volatile", in order to guarantee the visibility of the effects of
     * this method to any interruption handler.
     */
    void cancel();

    // ------------------------------------------------------------------------
    //  source context
    // ------------------------------------------------------------------------

    /**
     * Interface that source functions use to emit elements, and possibly watermarks.
     *
     * @param <T> The type of the elements produced by the source.
     */
    @Public // Interface might be extended in the future with additional methods.
    interface SourceContext<T> {

        //......
    }
}
  • SourceFunction is the base interface for Flink stream data sources. It defines the run and cancel methods, as well as the SourceContext interface.
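The cancel contract described in the Javadoc can be tried out without Flink on the classpath. The sketch below is a simplified stand-alone model, not Flink code: MiniSource and CancelDemo are hypothetical names, and a plain Consumer stands in for SourceContext. It only shows the volatile flag pattern; the thread interruption that Flink performs after cancel() is left out.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Hypothetical stand-in for a SourceFunction: run() loops until cancel() flips the flag.
class MiniSource {
    // volatile so the write in cancel() is visible to the thread executing run()
    private volatile boolean isRunning = true;

    void run(Consumer<Long> out) {
        long next = 0;
        while (isRunning) {
            out.accept(next++);
            Thread.yield();
        }
    }

    void cancel() {
        isRunning = false;
    }
}

class CancelDemo {
    // Starts the source on its own thread, cancels it, and reports whether run() returned.
    static boolean runThenCancel() {
        MiniSource source = new MiniSource();
        AtomicLong emitted = new AtomicLong();
        Thread worker = new Thread(() -> source.run(v -> emitted.incrementAndGet()));
        worker.start();
        try {
            while (emitted.get() == 0) {
                Thread.sleep(1); // wait until at least one element has been emitted
            }
            source.cancel();
            worker.join(5_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !worker.isAlive() && emitted.get() > 0;
    }

    public static void main(String[] args) {
        System.out.println("source stopped after cancel: " + runThenCancel());
    }
}
```

Without the volatile modifier the loop thread would have no guarantee of ever seeing the updated flag, which is why the Javadoc recommends it.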

SourceContext

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/functions/source/SourceFunction.java

    /**
     * Interface that source functions use to emit elements, and possibly watermarks.
     *
     * @param <T> The type of the elements produced by the source.
     */
    @Public // Interface might be extended in the future with additional methods.
    interface SourceContext<T> {

        /**
         * Emits one element from the source, without attaching a timestamp. In most cases,
         * this is the default way of emitting elements.
         *
         * <p>The timestamp that the element will get assigned depends on the time characteristic of
         * the streaming program:
         * <ul>
         *     <li>On {@link TimeCharacteristic#ProcessingTime}, the element has no timestamp.</li>
         *     <li>On {@link TimeCharacteristic#IngestionTime}, the element gets the system's
         *         current time as the timestamp.</li>
         *     <li>On {@link TimeCharacteristic#EventTime}, the element will have no timestamp initially.
         *         It needs to get a timestamp (via a {@link TimestampAssigner}) before any time-dependent
         *         operation (like time windows).</li>
         * </ul>
         *
         * @param element The element to emit
         */
        void collect(T element);

        /**
         * Emits one element from the source, and attaches the given timestamp. This method
         * is relevant for programs using {@link TimeCharacteristic#EventTime}, where the
         * sources assign timestamps themselves, rather than relying on a {@link TimestampAssigner}
         * on the stream.
         *
         * <p>On certain time characteristics, this timestamp may be ignored or overwritten.
         * This allows programs to switch between the different time characteristics and behaviors
         * without changing the code of the source functions.
         * <ul>
         *     <li>On {@link TimeCharacteristic#ProcessingTime}, the timestamp will be ignored,
         *         because processing time never works with element timestamps.</li>
         *     <li>On {@link TimeCharacteristic#IngestionTime}, the timestamp is overwritten with the
         *         system's current time, to realize proper ingestion time semantics.</li>
         *     <li>On {@link TimeCharacteristic#EventTime}, the timestamp will be used.</li>
         * </ul>
         *
         * @param element The element to emit
         * @param timestamp The timestamp in milliseconds since the Epoch
         */
        @PublicEvolving
        void collectWithTimestamp(T element, long timestamp);

        /**
         * Emits the given {@link Watermark}. A Watermark of value {@code t} declares that no
         * elements with a timestamp {@code t' <= t} will occur any more. If further such
         * elements will be emitted, those elements are considered <i>late</i>.
         *
         * <p>This method is only relevant when running on {@link TimeCharacteristic#EventTime}.
         * On {@link TimeCharacteristic#ProcessingTime},Watermarks will be ignored. On
         * {@link TimeCharacteristic#IngestionTime}, the Watermarks will be replaced by the
         * automatic ingestion time watermarks.
         *
         * @param mark The Watermark to emit
         */
        @PublicEvolving
        void emitWatermark(Watermark mark);

        /**
         * Marks the source to be temporarily idle. This tells the system that this source will
         * temporarily stop emitting records and watermarks for an indefinite amount of time. This
         * is only relevant when running on {@link TimeCharacteristic#IngestionTime} and
         * {@link TimeCharacteristic#EventTime}, allowing downstream tasks to advance their
         * watermarks without the need to wait for watermarks from this source while it is idle.
         *
         * <p>Source functions should make a best effort to call this method as soon as they
         * acknowledge themselves to be idle. The system will consider the source to resume activity
         * again once {@link SourceContext#collect(T)}, {@link SourceContext#collectWithTimestamp(T, long)},
         * or {@link SourceContext#emitWatermark(Watermark)} is called to emit elements or watermarks from the source.
         */
        @PublicEvolving
        void markAsTemporarilyIdle();

        /**
         * Returns the checkpoint lock. Please refer to the class-level comment in
         * {@link SourceFunction} for details about how to write a consistent checkpointed
         * source.
         *
         * @return The object to use as the lock
         */
        Object getCheckpointLock();

        /**
         * This method is called by the system to shut down the context.
         */
        void close();
    }
  • SourceContext mainly defines the interface through which a source emits data. The basic method is collect. If the data itself carries no timestamp, then under TimeCharacteristic.EventTime a TimestampAssigner can assign one before any time-dependent operation; under TimeCharacteristic.IngestionTime nothing needs to be specified, because the system assigns the timestamp automatically. Besides collect there is collectWithTimestamp, which emits an element together with an explicit timestamp (for use with TimeCharacteristic.EventTime).
  • It also defines the emitWatermark method, which handles out-of-order data by marking the point in event time up to which elements are still considered on time. It only takes effect with TimeCharacteristic.EventTime. With TimeCharacteristic.ProcessingTime watermarks are ignored, and with TimeCharacteristic.IngestionTime they are replaced by the automatically generated ingestion time watermarks.
  • It also defines the markAsTemporarilyIdle method, which tells the system that the source will stop emitting data for a while. It is only relevant with TimeCharacteristic.IngestionTime or TimeCharacteristic.EventTime. Once SourceContext.collect(T), SourceContext.collectWithTimestamp(T, long) or SourceContext.emitWatermark(Watermark) is called, the system considers the source active again.
  • It also defines the getCheckpointLock method, which returns the checkpoint lock so that the source can handle checkpoint-related logic.
  • The close method is meant to be called by the system to release the resources held by the context.
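The timestamp and watermark rules from the Javadoc above can be condensed into a few pure functions. This is only an illustrative model of the documented behavior: TimestampRules and its Characteristic enum are hypothetical names modelled on TimeCharacteristic, and the system time is passed in as a parameter so that the result is deterministic.

```java
import java.util.OptionalLong;

class TimestampRules {
    // Hypothetical mirror of the three TimeCharacteristic values.
    enum Characteristic { PROCESSING_TIME, INGESTION_TIME, EVENT_TIME }

    // Timestamp an element carries after collectWithTimestamp(element, provided).
    static OptionalLong afterCollectWithTimestamp(Characteristic c, long provided, long systemTime) {
        switch (c) {
            case PROCESSING_TIME:
                return OptionalLong.empty();        // the timestamp is ignored
            case INGESTION_TIME:
                return OptionalLong.of(systemTime); // overwritten with the system's current time
            default:
                return OptionalLong.of(provided);   // EVENT_TIME: used as given
        }
    }

    // Timestamp an element carries after plain collect(element).
    static OptionalLong afterCollect(Characteristic c, long systemTime) {
        return c == Characteristic.INGESTION_TIME
                ? OptionalLong.of(systemTime)
                : OptionalLong.empty();
    }

    // A watermark t declares that no element with timestamp <= t will follow;
    // an element that still arrives with such a timestamp is late.
    static boolean isLate(long elementTimestamp, long watermark) {
        return elementTimestamp <= watermark;
    }

    public static void main(String[] args) {
        System.out.println(afterCollectWithTimestamp(Characteristic.EVENT_TIME, 5L, 100L));
        System.out.println(isLate(10L, 10L));
    }
}
```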

Task.run (upstream)

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/taskmanager/Task.java

/**
 * The Task represents one execution of a parallel subtask on a TaskManager.
 * A Task wraps a Flink operator (which may be a user function) and
 * runs it, providing all services necessary for example to consume input data,
 * produce its results (intermediate result partitions) and communicate
 * with the JobManager.
 *
 * <p>The Flink operators (implemented as subclasses of
 * {@link AbstractInvokable} have only data readers, -writers, and certain event callbacks.
 * The task connects those to the network stack and actor messages, and tracks the state
 * of the execution and handles exceptions.
 *
 * <p>Tasks have no knowledge about how they relate to other tasks, or whether they
 * are the first attempt to execute the task, or a repeated attempt. All of that
 * is only known to the JobManager. All the task knows are its own runnable code,
 * the task's configuration, and the IDs of the intermediate results to consume and
 * produce (if any).
 *
 * <p>Each Task is run by one dedicated thread.
 */
public class Task implements Runnable, TaskActions, CheckpointListener {
    //......

    /**
     * The core work method that bootstraps the task and executes its code.
     */
    @Override
    public void run() {
            //......
            // now load and instantiate the task's invokable code
            invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);

            // ----------------------------------------------------------------
            //  actual task core work
            // ----------------------------------------------------------------

            // we must make strictly sure that the invokable is accessible to the cancel() call
            // by the time we switched to running.
            this.invokable = invokable;

            // switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
            if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
                throw new CancelTaskException();
            }

            // notify everyone that we switched to running
            notifyObservers(ExecutionState.RUNNING, null);
            taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));

            // make sure the user code classloader is accessible thread-locally
            executingThread.setContextClassLoader(userCodeClassLoader);

            // run the invokable
            invokable.invoke();

            //......
    }
}
  • Task's run method calls invokable.invoke(); the invokable here is a StreamTask.

StreamTask.invoke

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/StreamTask.java

/**
 * Base class for all streaming tasks. A task is the unit of local processing that is deployed
 * and executed by the TaskManagers. Each task runs one or more {@link StreamOperator}s which form
 * the Task's operator chain. Operators that are chained together execute synchronously in the
 * same thread and hence on the same stream partition. A common case for these chains
 * are successive map/flatmap/filter tasks.
 *
 * <p>The task chain contains one "head" operator and multiple chained operators.
 * The StreamTask is specialized for the type of the head operator: one-input and two-input tasks,
 * as well as for sources, iteration heads and iteration tails.
 *
 * <p>The Task class deals with the setup of the streams read by the head operator, and the streams
 * produced by the operators at the ends of the operator chain. Note that the chain may fork and
 * thus have multiple ends.
 *
 * <p>The life cycle of the task is set up as follows:
 * <pre>{@code
 *  -- setInitialState -> provides state of all operators in the chain
 *
 *  -- invoke()
 *        |
 *        +----> Create basic utils (config, etc) and load the chain of operators
 *        +----> operators.setup()
 *        +----> task specific init()
 *        +----> initialize-operator-states()
 *        +----> open-operators()
 *        +----> run()
 *        +----> close-operators()
 *        +----> dispose-operators()
 *        +----> common cleanup
 *        +----> task specific cleanup()
 * }</pre>
 *
 * <p>The {@code StreamTask} has a lock object called {@code lock}. All calls to methods on a
 * {@code StreamOperator} must be synchronized on this lock object to ensure that no methods
 * are called concurrently.
 *
 * @param <OUT>
 * @param <OP>
 */
@Internal
public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
        extends AbstractInvokable
        implements AsyncExceptionHandler {

        //......

    @Override
    public final void invoke() throws Exception {

        boolean disposed = false;
        try {
            //......

            // let the task do its work
            isRunning = true;
            run();

            // if this left the run() method cleanly despite the fact that this was canceled,
            // make sure the "clean shutdown" is not attempted
            if (canceled) {
                throw new CancelTaskException();
            }

            LOG.debug("Finished task {}", getName());

            //......
        }
        finally {
            // clean up everything we initialized
            isRunning = false;

            //......
        }
    }
}
  • StreamTask's invoke method calls the subclass's run method; the subclass here is SourceStreamTask.
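The relationship between invoke() and run() is a template method: the base class fixes the lifecycle and the subclass only supplies run(). The following stand-alone sketch illustrates that shape under heavy simplification. MiniStreamTask, MiniSourceStreamTask and LifecycleDemo are hypothetical names, the lifecycle is reduced to three steps, and an IllegalStateException takes the place of Flink's CancelTaskException.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, heavily reduced model of StreamTask: invoke() is final, subclasses supply run().
abstract class MiniStreamTask {
    final List<String> log = new ArrayList<>();
    volatile boolean canceled;

    protected abstract void run() throws Exception;

    public final void invoke() throws Exception {
        try {
            log.add("open-operators");
            run();
            if (canceled) {
                // same idea as the check in StreamTask.invoke(): skip the clean shutdown path
                throw new IllegalStateException("task was canceled");
            }
            log.add("close-operators");
        } finally {
            log.add("cleanup"); // runs whether run() returned normally or threw
        }
    }
}

// Stand-in for SourceStreamTask, whose run() delegates to the head operator.
class MiniSourceStreamTask extends MiniStreamTask {
    @Override
    protected void run() {
        log.add("headOperator.run");
    }
}

class LifecycleDemo {
    // Runs one task to completion and returns the recorded lifecycle steps.
    static List<String> lifecycle() {
        MiniSourceStreamTask task = new MiniSourceStreamTask();
        try {
            task.invoke();
        } catch (Exception e) {
            task.log.add("failed: " + e.getMessage());
        }
        return task.log;
    }

    public static void main(String[] args) {
        System.out.println(lifecycle());
    }
}
```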

SourceStreamTask.run

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java

    @Override
    protected void run() throws Exception {
        headOperator.run(getCheckpointLock(), getStreamStatusMaintainer());
    }
  • SourceStreamTask's run method mainly calls headOperator's run method; the headOperator here is a StreamSource.

StreamSource.run

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/operators/StreamSource.java

    public void run(final Object lockingObject, final StreamStatusMaintainer streamStatusMaintainer) throws Exception {
        run(lockingObject, streamStatusMaintainer, output);
    }

    public void run(final Object lockingObject,
            final StreamStatusMaintainer streamStatusMaintainer,
            final Output<StreamRecord<OUT>> collector) throws Exception {

        final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic();

        final Configuration configuration = this.getContainingTask().getEnvironment().getTaskManagerInfo().getConfiguration();
        final long latencyTrackingInterval = getExecutionConfig().isLatencyTrackingConfigured()
            ? getExecutionConfig().getLatencyTrackingInterval()
            : configuration.getLong(MetricOptions.LATENCY_INTERVAL);

        LatencyMarksEmitter<OUT> latencyEmitter = null;
        if (latencyTrackingInterval > 0) {
            latencyEmitter = new LatencyMarksEmitter<>(
                getProcessingTimeService(),
                collector,
                latencyTrackingInterval,
                this.getOperatorID(),
                getRuntimeContext().getIndexOfThisSubtask());
        }

        final long watermarkInterval = getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();

        this.ctx = StreamSourceContexts.getSourceContext(
            timeCharacteristic,
            getProcessingTimeService(),
            lockingObject,
            streamStatusMaintainer,
            collector,
            watermarkInterval,
            -1);

        try {
            userFunction.run(ctx);

            // if we get here, then the user function either exited after being done (finite source)
            // or the function was canceled or stopped. For the finite source case, we should emit
            // a final watermark that indicates that we reached the end of event-time
            if (!isCanceledOrStopped()) {
                ctx.emitWatermark(Watermark.MAX_WATERMARK);
            }
        } finally {
            // make sure that the context is closed in any case
            ctx.close();
            if (latencyEmitter != null) {
                latencyEmitter.close();
            }
        }
    }
  • StreamSource's run method first builds a SourceFunction.SourceContext through StreamSourceContexts.getSourceContext and then calls userFunction's run method. The userFunction here is RandomWordSource, the user-defined SourceFunction. Note that before userFunction.run(ctx) is called, a LatencyMarksEmitter is also created if latencyTrackingInterval is greater than 0.
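The try/finally part of this method has two properties worth spelling out: a source that finishes on its own emits one last watermark, a canceled one does not, and the context is closed in both cases. Below is a minimal stand-alone model of that control flow. MiniStreamSource is a hypothetical name, events are recorded as strings, and the canceled state is passed in as a parameter rather than read from the operator. Watermark.MAX_WATERMARK carries Long.MAX_VALUE, which is what the string records here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class MiniStreamSource {
    // Runs the user function, emits a final watermark unless canceled, and always closes.
    static List<String> run(Consumer<List<String>> userFunction, boolean canceledOrStopped) {
        List<String> events = new ArrayList<>();
        try {
            userFunction.accept(events);
            if (!canceledOrStopped) {
                // finite source: signal the end of event time
                events.add("watermark:" + Long.MAX_VALUE);
            }
        } finally {
            events.add("close"); // the context is closed in any case
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(out -> out.add("a"), false));
        System.out.println(run(out -> out.add("a"), true));
    }
}
```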

RandomWordSource.run

public class RandomWordSource implements SourceFunction<String> {

    private static final Logger LOGGER = LoggerFactory.getLogger(RandomWordSource.class);

    private volatile boolean isRunning = true;

    private static final String[] words = new String[]{"The", "brown", "fox", "quick", "jump", "sucky", "5dolla"};

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (isRunning) {
            Thread.sleep(300);
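            // pick an index uniformly; Math.random() * 10 % words.length favors the first three words
            int rnd = java.util.concurrent.ThreadLocalRandom.current().nextInt(words.length);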
            LOGGER.info("emit word: {}", words[rnd]);
            ctx.collect(words[rnd]);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}
  • RandomWordSource's run method keeps emitting data in a loop until cancel is called.

StreamSource.LatencyMarksEmitter

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/operators/StreamSource.java

    private static class LatencyMarksEmitter<OUT> {
        private final ScheduledFuture<?> latencyMarkTimer;

        public LatencyMarksEmitter(
                final ProcessingTimeService processingTimeService,
                final Output<StreamRecord<OUT>> output,
                long latencyTrackingInterval,
                final OperatorID operatorId,
                final int subtaskIndex) {

            latencyMarkTimer = processingTimeService.scheduleAtFixedRate(
                new ProcessingTimeCallback() {
                    @Override
                    public void onProcessingTime(long timestamp) throws Exception {
                        try {
                            // ProcessingTimeService callbacks are executed under the checkpointing lock
                            output.emitLatencyMarker(new LatencyMarker(timestamp, operatorId, subtaskIndex));
                        } catch (Throwable t) {
                            // we catch the Throwables here so that we don't trigger the processing
                            // timer services async exception handler
                            LOG.warn("Error while emitting latency marker.", t);
                        }
                    }
                },
                0L,
                latencyTrackingInterval);
        }

        public void close() {
            latencyMarkTimer.cancel(true);
        }
    }
  • LatencyMarksEmitter is created in StreamSource's run method, before userFunction's run method is called, provided latencyTrackingInterval > 0. To determine latencyTrackingInterval, getExecutionConfig().isLatencyTrackingConfigured() first checks whether the ExecutionConfig sets the value. If it does, the value returned by getExecutionConfig().getLatencyTrackingInterval() is used; otherwise the value returned by configuration.getLong(MetricOptions.LATENCY_INTERVAL) is used, which defaults to 2000L. This example takes the latter path, so the interval is 2000.
  • LatencyMarksEmitter's constructor calls processingTimeService.scheduleAtFixedRate to register a fixed-rate timer task whose period is latencyTrackingInterval.
  • The work of the timer task lives in ProcessingTimeCallback's onProcessingTime method, which calls output.emitLatencyMarker(new LatencyMarker(timestamp, operatorId, subtaskIndex)) to send a LatencyMarker. The processingTimeService here is a SystemProcessingTimeService, and the output is an AbstractStreamOperator.CountingOutput.
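The way the interval is resolved can be written down as a small decision function. This is a sketch of the logic as described above, not Flink code: LatencyIntervalDemo is a hypothetical name, a null job-level value models isLatencyTrackingConfigured() returning false, and it is assumed that an explicitly configured value of 0 counts as configured and therefore disables the emitter.

```java
class LatencyIntervalDemo {
    // A job-level setting wins when present; otherwise the cluster-level default applies.
    static long resolve(Long jobLevelInterval, long clusterDefault) {
        return jobLevelInterval != null ? jobLevelInterval : clusterDefault;
    }

    // The emitter is only created for a strictly positive interval.
    static boolean emitterCreated(Long jobLevelInterval, long clusterDefault) {
        return resolve(jobLevelInterval, clusterDefault) > 0;
    }

    public static void main(String[] args) {
        System.out.println(resolve(null, 2000L));       // falls back to the default
        System.out.println(resolve(500L, 2000L));       // job-level value wins
        System.out.println(emitterCreated(0L, 2000L));  // explicitly disabled
    }
}
```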

SystemProcessingTimeService.scheduleAtFixedRate

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java

    @Override
    public ScheduledFuture<?> scheduleAtFixedRate(ProcessingTimeCallback callback, long initialDelay, long period) {
        long nextTimestamp = getCurrentProcessingTime() + initialDelay;

        // we directly try to register the timer and only react to the status on exception
        // that way we save unnecessary volatile accesses for each timer
        try {
            return timerService.scheduleAtFixedRate(
                new RepeatedTriggerTask(status, task, checkpointLock, callback, nextTimestamp, period),
                initialDelay,
                period,
                TimeUnit.MILLISECONDS);
        } catch (RejectedExecutionException e) {
            final int status = this.status.get();
            if (status == STATUS_QUIESCED) {
                return new NeverCompleteFuture(initialDelay);
            }
            else if (status == STATUS_SHUTDOWN) {
                throw new IllegalStateException("Timer service is shut down");
            }
            else {
                // something else happened, so propagate the exception
                throw e;
            }
        }
    }

    @Override
    public long getCurrentProcessingTime() {
        return System.currentTimeMillis();
    }
  • SystemProcessingTimeService's scheduleAtFixedRate method delegates to timerService's scheduleAtFixedRate. The timerService here is a ScheduledThreadPoolExecutor with a corePoolSize of 1, and the task it schedules is a RepeatedTriggerTask.
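The underlying JDK mechanism can be exercised on its own. The sketch below uses only java.util.concurrent: a ScheduledThreadPoolExecutor with one core thread runs a task at a fixed rate, and the returned ScheduledFuture is cancelled afterwards in the same way LatencyMarksEmitter.close() cancels its timer. FixedRateDemo and the chosen period are illustrative assumptions.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class FixedRateDemo {
    // Schedules a task at a fixed rate on a single-threaded pool and waits for `runs` executions.
    static boolean runsAtFixedRate(int runs, long periodMillis) {
        ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1);
        CountDownLatch latch = new CountDownLatch(runs);
        ScheduledFuture<?> future = timerService.scheduleAtFixedRate(
                latch::countDown, 0L, periodMillis, TimeUnit.MILLISECONDS);
        try {
            // true if the task ran `runs` times before the timeout
            return latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            future.cancel(true);        // stop further executions
            timerService.shutdownNow(); // release the timer thread
        }
    }

    public static void main(String[] args) {
        System.out.println("ran three times: " + runsAtFixedRate(3, 10L));
    }
}
```

Because the pool has a single thread, callbacks registered on it never run concurrently with each other.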

RepeatedTriggerTask

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java

    /**
     * Internal task which is repeatedly called by the processing time service.
     */
    private static final class RepeatedTriggerTask implements Runnable {

        private final AtomicInteger serviceStatus;
        private final Object lock;
        private final ProcessingTimeCallback target;
        private final long period;
        private final AsyncExceptionHandler exceptionHandler;

        private long nextTimestamp;

        private RepeatedTriggerTask(
                final AtomicInteger serviceStatus,
                final AsyncExceptionHandler exceptionHandler,
                final Object lock,
                final ProcessingTimeCallback target,
                final long nextTimestamp,
                final long period) {

            this.serviceStatus = Preconditions.checkNotNull(serviceStatus);
            this.lock = Preconditions.checkNotNull(lock);
            this.target = Preconditions.checkNotNull(target);
            this.period = period;
            this.exceptionHandler = Preconditions.checkNotNull(exceptionHandler);

            this.nextTimestamp = nextTimestamp;
        }

        @Override
        public void run() {
            synchronized (lock) {
                try {
                    if (serviceStatus.get() == STATUS_ALIVE) {
                        target.onProcessingTime(nextTimestamp);
                    }

                    nextTimestamp += period;
                } catch (Throwable t) {
                    TimerException asyncException = new TimerException(t);
                    exceptionHandler.handleAsyncException("Caught exception while processing repeated timer task.", asyncException);
                }
            }
        }
    }
  • RepeatedTriggerTask calls ProcessingTimeCallback's onProcessingTime when serviceStatus is STATUS_ALIVE. The nextTimestamp passed in initially is computed as getCurrentProcessingTime() + initialDelay, and period is added to it after every run.
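One consequence of this bookkeeping is that the value handed to onProcessingTime is the nominal schedule time, advanced by exactly one period per run, rather than a fresh reading of the clock. The reduced model below makes that visible. MiniRepeatedTrigger is a hypothetical name, a LongConsumer stands in for ProcessingTimeCallback, and the status check and exception handling of the original are omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

class MiniRepeatedTrigger implements Runnable {
    private final Object lock;
    private final LongConsumer target;
    private final long period;
    private long nextTimestamp;

    MiniRepeatedTrigger(Object lock, LongConsumer target, long firstTimestamp, long period) {
        this.lock = lock;
        this.target = target;
        this.nextTimestamp = firstTimestamp;
        this.period = period;
    }

    @Override
    public void run() {
        // the callback runs while holding the lock, like the checkpoint lock in Flink
        synchronized (lock) {
            target.accept(nextTimestamp);
            nextTimestamp += period;
        }
    }

    // Invokes the task `runs` times directly and returns the timestamps the callback saw.
    static List<Long> timestampsOf(int runs, long firstTimestamp, long period) {
        List<Long> seen = new ArrayList<>();
        MiniRepeatedTrigger task =
                new MiniRepeatedTrigger(new Object(), ts -> seen.add(ts), firstTimestamp, period);
        for (int i = 0; i < runs; i++) {
            task.run();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(timestampsOf(3, 1000L, 2000L));
    }
}
```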

AbstractStreamOperator.CountingOutput.emitLatencyMarker

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java

    /**
     * Wrapping {@link Output} that updates metrics on the number of emitted elements.
     */
    public static class CountingOutput<OUT> implements Output<StreamRecord<OUT>> {
        private final Output<StreamRecord<OUT>> output;
        private final Counter numRecordsOut;

        public CountingOutput(Output<StreamRecord<OUT>> output, Counter counter) {
            this.output = output;
            this.numRecordsOut = counter;
        }

        @Override
        public void emitWatermark(Watermark mark) {
            output.emitWatermark(mark);
        }

        @Override
        public void emitLatencyMarker(LatencyMarker latencyMarker) {
            output.emitLatencyMarker(latencyMarker);
        }

        @Override
        public void collect(StreamRecord<OUT> record) {
            numRecordsOut.inc();
            output.collect(record);
        }

        @Override
        public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
            numRecordsOut.inc();
            output.collect(outputTag, record);
        }

        @Override
        public void close() {
            output.close();
        }
    }
  • CountingOutput actually wraps a RecordWriterOutput here.
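CountingOutput is a decorator: every call is forwarded to the wrapped output, and only the collect methods touch the counter, so latency markers and watermarks do not count as records. A minimal stand-alone illustration of that pattern follows. MiniOutput, ListOutput, MiniCountingOutput and CountingDemo are hypothetical names, and a plain long replaces Flink's Counter metric.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical two-method slice of an output interface.
interface MiniOutput {
    void collect(String record);
    void emitLatencyMarker(long markedTime);
}

// Terminal output that simply remembers what it received.
class ListOutput implements MiniOutput {
    final List<String> received = new ArrayList<>();

    @Override
    public void collect(String record) {
        received.add(record);
    }

    @Override
    public void emitLatencyMarker(long markedTime) {
        received.add("latency@" + markedTime);
    }
}

// Decorator: counts records, forwards everything.
class MiniCountingOutput implements MiniOutput {
    private final MiniOutput output;
    long numRecordsOut;

    MiniCountingOutput(MiniOutput output) {
        this.output = output;
    }

    @Override
    public void collect(String record) {
        numRecordsOut++;
        output.collect(record);
    }

    @Override
    public void emitLatencyMarker(long markedTime) {
        output.emitLatencyMarker(markedTime); // forwarded without counting
    }
}

class CountingDemo {
    // Sends two records and one latency marker, then returns the record count.
    static long countAfterMixedTraffic() {
        MiniCountingOutput counting = new MiniCountingOutput(new ListOutput());
        counting.collect("a");
        counting.emitLatencyMarker(42L);
        counting.collect("b");
        return counting.numRecordsOut;
    }

    public static void main(String[] args) {
        System.out.println(countAfterMixedTraffic());
    }
}
```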

RecordWriterOutput.emitLatencyMarker

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java

/**
 * Implementation of {@link Output} that sends data using a {@link RecordWriter}.
 */
@Internal
public class RecordWriterOutput<OUT> implements OperatorChain.WatermarkGaugeExposingOutput<StreamRecord<OUT>> {

    private StreamRecordWriter<SerializationDelegate<StreamElement>> recordWriter;

    private SerializationDelegate<StreamElement> serializationDelegate;

    //......

    @Override
    public void emitLatencyMarker(LatencyMarker latencyMarker) {
        serializationDelegate.setInstance(latencyMarker);

        try {
            recordWriter.randomEmit(serializationDelegate);
        }
        catch (Exception e) {
            throw new RuntimeException(e.getMessage(), e);
        }
    }
}
  • emitLatencyMarker here mainly calls StreamRecordWriter's randomEmit to send the LatencyMarker; the actual sending is done by the parent class RecordWriter.

RecordWriter

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java

    /**
     * This is used to send LatencyMarks to a random target channel.
     */
    public void randomEmit(T record) throws IOException, InterruptedException {
        sendToTarget(record, rng.nextInt(numChannels));
    }

    private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException {
        RecordSerializer<T> serializer = serializers[targetChannel];

        SerializationResult result = serializer.addRecord(record);

        while (result.isFullBuffer()) {
            if (tryFinishCurrentBufferBuilder(targetChannel, serializer)) {
                // If this was a full record, we are done. Not breaking
                // out of the loop at this point will lead to another
                // buffer request before breaking out (that would not be
                // a problem per se, but it can lead to stalls in the
                // pipeline).
                if (result.isFullRecord()) {
                    break;
                }
            }
            BufferBuilder bufferBuilder = requestNewBufferBuilder(targetChannel);

            result = serializer.continueWritingWithNextBufferBuilder(bufferBuilder);
        }
        checkState(!serializer.hasSerializedData(), "All data should be written at once");

        if (flushAlways) {
            targetPartition.flush(targetChannel);
        }
    }
  • RecordWriter's randomEmit simply picks a targetChannel at random and sends the record to it.
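
The random channel selection described above can be illustrated with a minimal, self-contained sketch. This is not Flink code: `RandomEmitSketch`, its list-backed channels, and the seeded constructor are hypothetical stand-ins for RecordWriter and its serializers and buffers. It only shows the idea that each call picks one index in [0, numChannels) and delivers the record to that single channel.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/** Hypothetical stand-in for a writer with several output channels; not Flink's RecordWriter. */
class RandomEmitSketch<T> {
    private final List<List<T>> channels = new ArrayList<>();
    private final Random rng;

    RandomEmitSketch(int numChannels, long seed) {
        if (numChannels <= 0) {
            throw new IllegalArgumentException("numChannels must be positive");
        }
        for (int i = 0; i < numChannels; i++) {
            channels.add(new ArrayList<>());
        }
        this.rng = new Random(seed);
    }

    /** Picks one channel index in [0, numChannels) at random and sends the record only there. */
    int randomEmit(T record) {
        int target = rng.nextInt(channels.size());
        sendToTarget(record, target);
        return target;
    }

    private void sendToTarget(T record, int targetChannel) {
        channels.get(targetChannel).add(record);
    }

    /** Total number of records across all channels. */
    int totalRecords() {
        int total = 0;
        for (List<T> channel : channels) {
            total += channel.size();
        }
        return total;
    }

    public static void main(String[] args) {
        RandomEmitSketch<String> writer = new RandomEmitSketch<>(4, 42L);
        for (int i = 0; i < 10; i++) {
            String marker = "marker-" + i;
            System.out.println(marker + " -> channel " + writer.randomEmit(marker));
        }
        System.out.println("total records: " + writer.totalRecords());
    }
}
```

In this sketch every marker lands in exactly one channel, so each downstream consumer would see only a sample of the markers emitted upstream.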

Task.run (downstream)

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/taskmanager/Task.java

/**
 * The Task represents one execution of a parallel subtask on a TaskManager.
 * A Task wraps a Flink operator (which may be a user function) and
 * runs it, providing all services necessary for example to consume input data,
 * produce its results (intermediate result partitions) and communicate
 * with the JobManager.
 *
 * <p>The Flink operators (implemented as subclasses of
 * {@link AbstractInvokable} have only data readers, -writers, and certain event callbacks.
 * The task connects those to the network stack and actor messages, and tracks the state
 * of the execution and handles exceptions.
 *
 * <p>Tasks have no knowledge about how they relate to other tasks, or whether they
 * are the first attempt to execute the task, or a repeated attempt. All of that
 * is only known to the JobManager. All the task knows are its own runnable code,
 * the task's configuration, and the IDs of the intermediate results to consume and
 * produce (if any).
 *
 * <p>Each Task is run by one dedicated thread.
 */
public class Task implements Runnable, TaskActions, CheckpointListener {
    //......

    /**
     * The core work method that bootstraps the task and executes its code.
     */
    @Override
    public void run() {
            //......
            // now load and instantiate the task's invokable code
            invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);

            // ----------------------------------------------------------------
            //  actual task core work
            // ----------------------------------------------------------------

            // we must make strictly sure that the invokable is accessible to the cancel() call
            // by the time we switched to running.
            this.invokable = invokable;

            // switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
            if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
                throw new CancelTaskException();
            }

            // notify everyone that we switched to running
            notifyObservers(ExecutionState.RUNNING, null);
            taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));

            // make sure the user code classloader is accessible thread-locally
            executingThread.setContextClassLoader(userCodeClassLoader);

            // run the invokable
            invokable.invoke();

            //......
    }
}
  • The downstream Task's run method calls invokable.invoke(); here the invokable is a OneInputStreamTask.

OneInputStreamTask

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java

    @Override
    protected void run() throws Exception {
        // cache processor reference on the stack, to make the code more JIT friendly
        final StreamInputProcessor<IN> inputProcessor = this.inputProcessor;

        while (running && inputProcessor.processInput()) {
            // all the work happens in the "processInput" method
        }
    }
  • Task's run method calls StreamTask's invoke method, and invoke in turn calls OneInputStreamTask's run method. That method mainly loops, calling inputProcessor.processInput() repeatedly; the inputProcessor here is a StreamInputProcessor.

StreamInputProcessor

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java

    public boolean processInput() throws Exception {
        if (isFinished) {
            return false;
        }
        if (numRecordsIn == null) {
            try {
                numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
            } catch (Exception e) {
                LOG.warn("An exception occurred during the metrics setup.", e);
                numRecordsIn = new SimpleCounter();
            }
        }

        while (true) {
            if (currentRecordDeserializer != null) {
                DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);

                if (result.isBufferConsumed()) {
                    currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
                    currentRecordDeserializer = null;
                }

                if (result.isFullRecord()) {
                    StreamElement recordOrMark = deserializationDelegate.getInstance();

                    if (recordOrMark.isWatermark()) {
                        // handle watermark
                        statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), currentChannel);
                        continue;
                    } else if (recordOrMark.isStreamStatus()) {
                        // handle stream status
                        statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), currentChannel);
                        continue;
                    } else if (recordOrMark.isLatencyMarker()) {
                        // handle latency marker
                        synchronized (lock) {
                            streamOperator.processLatencyMarker(recordOrMark.asLatencyMarker());
                        }
                        continue;
                    } else {
                        // now we can do the actual processing
                        StreamRecord<IN> record = recordOrMark.asRecord();
                        synchronized (lock) {
                            numRecordsIn.inc();
                            streamOperator.setKeyContextElement1(record);
                            streamOperator.processElement(record);
                        }
                        return true;
                    }
                }
            }

            final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();
            if (bufferOrEvent != null) {
                if (bufferOrEvent.isBuffer()) {
                    currentChannel = bufferOrEvent.getChannelIndex();
                    currentRecordDeserializer = recordDeserializers[currentChannel];
                    currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
                }
                else {
                    // Event received
                    final AbstractEvent event = bufferOrEvent.getEvent();
                    if (event.getClass() != EndOfPartitionEvent.class) {
                        throw new IOException("Unexpected event: " + event);
                    }
                }
            }
            else {
                isFinished = true;
                if (!barrierHandler.isEmpty()) {
                    throw new IllegalStateException("Trailing data in checkpoint barrier handler.");
                }
                return false;
            }
        }
    }
  • processInput first calls currentRecordDeserializer.getNextRecord(deserializationDelegate) to fetch the next record, and processes it only when result.isFullRecord() is true.
  • Processing depends on the type of the StreamElement: watermarks, stream statuses, latency markers, and ordinary records are each handled by a separate branch.
  • For an ordinary record, it calls streamOperator.processElement(record); the streamOperator here is a StreamMap.
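
The type-based dispatch in processInput can be sketched in isolation. The example below is a simplified illustration, not Flink code: `ProcessInputSketch`, `Kind`, `Element`, and the string log are all hypothetical names, and buffers, deserializers, locking, and checkpoint barriers are left out. It mirrors one detail of the excerpt: control elements are handled and the loop continues, while the method returns true only after one user record has been processed.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

/** Hypothetical, simplified model of dispatching stream elements by type. */
class ProcessInputSketch {
    enum Kind { WATERMARK, STREAM_STATUS, LATENCY_MARKER, RECORD }

    static final class Element {
        final Kind kind;
        final String payload;

        Element(Kind kind, String payload) {
            this.kind = kind;
            this.payload = payload;
        }
    }

    private final Queue<Element> input;
    final List<String> log = new ArrayList<>();

    ProcessInputSketch(List<Element> elements) {
        this.input = new ArrayDeque<>(elements);
    }

    /** Returns true after processing one user record, false once the input is exhausted. */
    boolean processInput() {
        while (true) {
            Element e = input.poll();
            if (e == null) {
                return false; // no more input
            }
            switch (e.kind) {
                case WATERMARK:
                    log.add("watermark:" + e.payload);
                    continue; // control element: keep reading
                case STREAM_STATUS:
                    log.add("status:" + e.payload);
                    continue;
                case LATENCY_MARKER:
                    log.add("latency:" + e.payload);
                    continue;
                default:
                    log.add("record:" + e.payload);
                    return true; // one user record handled per call
            }
        }
    }

    public static void main(String[] args) {
        ProcessInputSketch processor = new ProcessInputSketch(Arrays.asList(
                new Element(Kind.WATERMARK, "100"),
                new Element(Kind.RECORD, "a"),
                new Element(Kind.LATENCY_MARKER, "m")));
        while (processor.processInput()) {
            // all the work happens inside processInput
        }
        System.out.println(processor.log);
    }
}
```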

StreamMap.processElement

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/operators/StreamMap.java

/**
 * A {@link StreamOperator} for executing {@link MapFunction MapFunctions}.
 */
@Internal
public class StreamMap<IN, OUT>
        extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>
        implements OneInputStreamOperator<IN, OUT> {

    private static final long serialVersionUID = 1L;

    public StreamMap(MapFunction<IN, OUT> mapper) {
        super(mapper);
        chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        output.collect(element.replace(userFunction.map(element.getValue())));
    }
}
  • This calls userFunction.map(element.getValue()) to perform the map operation; the userFunction here is UpperCaseMapFunc.
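
To see the shape of processElement without a Flink dependency, here is a minimal sketch. All types in it (`MapFn`, `Rec`, `MapOperator`) are hypothetical stand-ins for MapFunction, StreamRecord, and StreamMap, and the upper-casing lambda is only an assumption about what UpperCaseMapFunc does, based on its name. The sketch shows the record object being reused: replace swaps the value in place, so the timestamp travels along unchanged.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

/** Hypothetical, dependency-free model of a map operator. */
class StreamMapSketch {
    /** Stand-in for a user-supplied map function. */
    interface MapFn<I, O> {
        O map(I value);
    }

    /** Stand-in for a timestamped record whose value can be replaced in place. */
    static final class Rec<T> {
        private T value;
        final long timestamp;

        Rec(T value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }

        T getValue() {
            return value;
        }

        /** Reuses this object: only the value changes, the timestamp is kept. */
        @SuppressWarnings("unchecked")
        <X> Rec<X> replace(X newValue) {
            this.value = (T) newValue;
            return (Rec<X>) this;
        }
    }

    /** Stand-in for the operator: applies the user function and forwards the result. */
    static final class MapOperator<I, O> {
        private final MapFn<I, O> userFunction;
        final List<Rec<O>> output = new ArrayList<>();

        MapOperator(MapFn<I, O> userFunction) {
            this.userFunction = userFunction;
        }

        void processElement(Rec<I> element) {
            output.add(element.replace(userFunction.map(element.getValue())));
        }
    }

    public static void main(String[] args) {
        MapOperator<String, String> op =
                new MapOperator<String, String>(s -> s.toUpperCase(Locale.ROOT));
        op.processElement(new Rec<String>("flink", 42L));
        Rec<String> out = op.output.get(0);
        System.out.println(out.getValue() + " @ " + out.timestamp); // prints "FLINK @ 42"
    }
}
```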

Summary

  • SourceFunction is the base interface for Flink's stream data sources. It defines the run and cancel methods, along with the SourceContext interface. SourceContext mainly defines collect and collectWithTimestamp for emitting data, and also provides emitWatermark for emitting watermarks.
  • For data emission, the call order is Task.run --> StreamTask.invoke --> SourceStreamTask.run --> StreamSource.run --> userFunction.run(ctx) (RandomWordSource.run). Before calling userFunction.run, StreamSource.run checks whether latencyTrackingInterval is greater than 0. If it is, it creates a LatencyMarksEmitter, which registers a timer task that periodically invokes the onProcessingTime method of a ProcessingTimeCallback, which in turn triggers output.emitLatencyMarker(new LatencyMarker(timestamp, operatorId, subtaskIndex)).
  • In effect, the downstream receives both the user data sent by userFunction.run and the LatencyMarkers sent by the timer task. The downstream call order is Task.run --> StreamTask.invoke --> OneInputStreamTask.run --> StreamInputProcessor.processInput --> one of statusWatermarkValve.inputWatermark, statusWatermarkValve.inputStreamStatus, streamOperator.processLatencyMarker, or streamOperator.processElement. StreamInputProcessor.processInput handles each element according to its type; for user data it calls streamOperator.processElement, i.e. StreamMap.processElement --> userFunction.map (UpperCaseMapFunc.map).
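
The timer-driven marker emission summarized above can be approximated with plain JDK classes. The sketch below is an illustration under stated assumptions, not the Flink implementation: `LatencyEmitterSketch`, `Marker`, and the list used as output are hypothetical, and a `ScheduledExecutorService` stands in for Flink's processing-time service. It keeps two points from the summary: the timer is only registered when the interval is positive, and each callback emits one marker carrying the callback's timestamp.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical model of a periodic latency-marker emitter built on JDK timers. */
class LatencyEmitterSketch {
    /** Stand-in for a latency marker: creation time plus the emitting subtask. */
    static final class Marker {
        final long markedTime;
        final int subtaskIndex;

        Marker(long markedTime, int subtaskIndex) {
            this.markedTime = markedTime;
            this.subtaskIndex = subtaskIndex;
        }
    }

    final List<Marker> emitted = new CopyOnWriteArrayList<>();
    private final int subtaskIndex;
    private ScheduledExecutorService timer;

    LatencyEmitterSketch(int subtaskIndex) {
        this.subtaskIndex = subtaskIndex;
    }

    /** Registers the periodic task only for a positive interval; returns whether it started. */
    boolean start(long intervalMillis) {
        if (intervalMillis <= 0) {
            return false; // latency tracking disabled
        }
        timer = Executors.newSingleThreadScheduledExecutor();
        timer.scheduleAtFixedRate(
                () -> onProcessingTime(System.currentTimeMillis()),
                0L, intervalMillis, TimeUnit.MILLISECONDS);
        return true;
    }

    /** Timer callback: emits one marker stamped with the given time. */
    void onProcessingTime(long timestamp) {
        emitted.add(new Marker(timestamp, subtaskIndex));
    }

    /** Stops the timer thread if one was started. */
    void close() {
        if (timer != null) {
            timer.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        LatencyEmitterSketch emitter = new LatencyEmitterSketch(0);
        emitter.start(50L);
        Thread.sleep(200L);
        emitter.close();
        System.out.println("markers emitted: " + emitter.emitted.size());
    }
}
```

The exact number of markers printed depends on thread scheduling, so the example reports a count rather than promising a fixed value.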
