Airflow Scheduler Source Code Walkthrough

1. Scheduler start and stop commands

1.1 Scheduler start command

We usually start the Airflow scheduler with a command like this:

airflow scheduler \
--pid /data/bdetl/airflow/pids/airflow-scheduler.pid \
--stdout /data/bdetl/logs/airflow/airflow-scheduler.out \
--stderr /data/bdetl/logs/airflow/airflow-scheduler.out \
-l /data/bdetl/logs/airflow/airflow-scheduler.log \
-D

More options are listed in the scheduler parameter table below:

Parameter            Meaning
-sd, --subdir        Look for DAG files under the given path. Defaults to '[AIRFLOW_HOME]/dags', where [AIRFLOW_HOME] is the value we set for 'AIRFLOW_HOME' in 'airflow.cfg'.
-r, --run-duration   How long (in seconds) the scheduler's main loop runs before the process exits.
-n, --num_runs       How many times every DAG file is parsed before the scheduler exits.
-p, --do_pickle      Whether to serialize (pickle) DAG objects and send them to the worker nodes for execution.

1.2 Scheduler stop command

cat /data/bdetl/airflow/pids/airflow-scheduler.pid | xargs kill -15

Running the command above kills the scheduler process and removes the airflow-scheduler.pid file.
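For reference, the same stop logic as a minimal Python sketch (same pid path as above; SIGTERM is signal 15):

import os
import signal

# Read the pid written via --pid and send SIGTERM (15); BaseJob.run() catches the
# resulting SystemExit and marks the SchedulerJob as SUCCESS (see section 2.2).
with open("/data/bdetl/airflow/pids/airflow-scheduler.pid") as f:
    pid = int(f.read().strip())
os.kill(pid, signal.SIGTERM)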

2. Scheduler source code

In the sections below:

ti stands for task_instance, i.e. a task instance;

tis stands for task_instances;

the code is based on Airflow 1.10.11.

2.1 cli.scheduler(): handle the airflow scheduler command line

It builds a SchedulerJob from the given arguments and then calls the job's run method.

cli.scheduler()

@cli_utils.action_logging
def scheduler(args):
    py2_deprecation_waring()
    print(settings.HEADER)
    # Build a SchedulerJob from the CLI arguments
    job = jobs.SchedulerJob(
        dag_id=args.dag_id,
        subdir=process_subdir(args.subdir),
        run_duration=args.run_duration,
        num_runs=args.num_runs,
        do_pickle=args.do_pickle)
    # Daemon mode
    if args.daemon:
        # Resolve the pid file and the log file locations
        pid, stdout, stderr, log_file = setup_locations("scheduler",
                                                        args.pid,
                                                        args.stdout,
                                                        args.stderr,
                                                        args.log_file)
        handle = setup_logging(log_file)
        stdout = open(stdout, 'w+')
        stderr = open(stderr, 'w+')

        ctx = daemon.DaemonContext(
            pidfile=TimeoutPIDLockFile(pid, -1),
            files_preserve=[handle],
            stdout=stdout,
            stderr=stderr,
        )
        # Run the SchedulerJob inside the daemon context
        with ctx:
            job.run()

        stdout.close()
        stderr.close()
    else:
        signal.signal(signal.SIGINT, sigint_handler)
        signal.signal(signal.SIGTERM, sigint_handler)
        signal.signal(signal.SIGQUIT, sigquit_handler)
        job.run()

2.2 BaseJob.run(): insert a SchedulerJob record into the job table and invoke the subclass logic

The job.run() call above executes the run method of BaseJob, the parent class of SchedulerJob:

BaseJob.run()

    def run(self):
        Stats.incr(self.__class__.__name__.lower() + '_start', 1, 1)
        # Adding an entry in the DB
        with create_session() as session:
            self.state = State.RUNNING
            # Insert a RUNNING SchedulerJob record into the DB
            session.add(self)
            session.commit()
            id_ = self.id
            make_transient(self)
            self.id = id_

            try:
                # Run the _execute() method implemented by the subclass
                self._execute()
                # In case of max runs or max duration
                self.state = State.SUCCESS
            except SystemExit:
                # In case of ^C or SIGTERM
                self.state = State.SUCCESS
            except Exception:
                self.state = State.FAILED
                raise
            finally:
                # After the job finishes, fill in end_date and update the record
                self.end_date = timezone.utcnow()
                session.merge(self)
                session.commit()

        Stats.incr(self.__class__.__name__.lower() + '_end', 1, 1)

As the code shows, this method first creates a scheduler job record in the job table (make_transient detaches the job object from the session while its id is kept, so the final state is written back explicitly via session.merge):

The SchedulerJob record in the job table:
  • if the _execute() method (which contains a while loop) finishes normally, the SchedulerJob's state is SUCCESS;
  • if the program is stopped manually while _execute() is running (ctrl-c or kill -15 pid), the state is also SUCCESS;
  • if _execute() raises an exception, the state is FAILED;
  • finally, the SchedulerJob's end_date is filled in and the DB record is updated.
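To look at those records, here is a minimal inspection sketch against the job table (illustrative only, not part of the scheduler code; assumes it runs on a host with access to the Airflow metadata DB):

from airflow.jobs import SchedulerJob
from airflow.utils.db import create_session

with create_session() as session:
    recent_jobs = (session.query(SchedulerJob)
                   .order_by(SchedulerJob.start_date.desc())
                   .limit(3))
    for job in recent_jobs:
        # state / end_date / latest_heartbeat are the columns discussed above
        print(job.id, job.state, job.start_date, job.end_date, job.latest_heartbeat)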

2.3 SchedulerJob._execute(): the SchedulerJob's actual execution logic

The self._execute() call above dispatches to the following method of the subclass:


SchedulerJob._execute()

    def _execute(self):
        self.log.info("Starting the scheduler")

        # DAGs can be pickled for easier remote execution by some executors
        pickle_dags = False
        if self.do_pickle and self.executor.__class__ not in \
                (executors.LocalExecutor, executors.SequentialExecutor):
            pickle_dags = True

        self.log.info("Running execute loop for %s seconds", self.run_duration)
        self.log.info("Processing each file at most %s times", self.num_runs)

        # Build up a list of Python files that could contain DAGs
        self.log.info("Searching for files in %s", self.subdir)
        # Find the DAG files under the given self.subdir path
        known_file_paths = list_py_file_paths(self.subdir)
        self.log.info("There are %s files in %s", len(known_file_paths), self.subdir)

        # When using sqlite, we do not use async_mode
        # so the scheduler job and DAG parser don't access the DB at the same time.
        async_mode = not self.using_sqlite

        # AIRFLOW SETTING: timeout for a DagFileProcessor parsing a DAG file; the process is killed on timeout
        processor_timeout_seconds = conf.getint('core', 'dag_file_processor_timeout')
        processor_timeout = timedelta(seconds=processor_timeout_seconds)

        """
        新建一個file processor agent:
          dag_directory:默認的dag文件路徑或用戶指定的dags文件路徑self.subdir
          file_paths:dags文件夾下的dag文件路徑list
          max_runs:scheduler解析dag文件的次數(shù)昔案,默認為-1尿贫,表示一直解析
          processor_factory:用于創(chuàng)建DagFileProcessor進程(AbstractDagFileProcessor子類)
          processor_timeout:DagFileProcessor進程超時時間
          async_mode:是否使用異步模式啟動DagFileProcessorManager,如果db不是sqlite,則默認使用異步模式
        """
        self.processor_agent = DagFileProcessorAgent(self.subdir,
                                                     known_file_paths,
                                                     self.num_runs,
                                                     type(self)._create_dag_file_processor,
                                                     processor_timeout,
                                                     self.dag_ids,
                                                     pickle_dags,
                                                     async_mode)

        try:
            self._execute_helper()
        except Exception:
            self.log.exception("Exception when executing execute_helper")
        finally:
            self.processor_agent.end()
            self.log.info("Exited execute loop")

_execute() is the scheduler's main method and implements the main logic of the scheduling system. It consists of the following parts:

2.3.1 list_py_file_paths(self.subdir): find the DAG files under the given path

# Build up a list of Python files that could contain DAGs
self.log.info("Searching for files in %s", self.subdir)
# Find the DAG files under the given self.subdir path
known_file_paths = list_py_file_paths(self.subdir)
self.log.info("There are %s files in %s", len(known_file_paths), self.subdir)

The list_py_file_paths(self.subdir) method walks the self.subdir folder looking for DAG files, and returns a list of candidate .py file paths.



The following steps parse the DAG files that were found.
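A condensed sketch of that lookup (simplified: the real list_py_file_paths also handles .zip archives and .airflowignore patterns):

import os

def list_py_file_paths_sketch(directory, safe_mode=True):
    """Simplified stand-in for airflow.utils.dag_processing.list_py_file_paths."""
    file_paths = []
    for root, _, files in os.walk(directory):
        for name in files:
            path = os.path.join(root, name)
            if not path.endswith('.py'):
                continue
            if safe_mode:
                # Heuristic: only keep .py files whose content mentions both
                # "DAG" and "airflow", so unrelated scripts are not parsed.
                with open(path, 'rb') as f:
                    content = f.read()
                if not all(s in content for s in (b'DAG', b'airflow')):
                    continue
            file_paths.append(path)
    return file_paths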

2.3.2 Create a DagFileProcessorAgent to parse the discovered DAG files

# AIRFLOW SETTING: timeout for a DagFileProcessor parsing a DAG file; the process is killed on timeout
processor_timeout_seconds = conf.getint('core', 'dag_file_processor_timeout')
processor_timeout = timedelta(seconds=processor_timeout_seconds)

"""
新建一個file processor agent:
  dag_directory:默認的dags文件路徑或用戶指定的dags文件路徑
  file_paths:dags文件夾下的py文件路徑list
  max_runs:scheduler解析py文件的次數(shù),默認為-1珍促,表示一致解析
  processor_factory:用于創(chuàng)建DagFileProcessor進程(AbstractDagFileProcessor子類)的方法
  processor_timeout:DagFileProcessor進程超時時間
  async_mode:是否使用異步模式啟動DagFileProcessorManager,如果db不是sqlite,則默認使用異步模式
"""
self.processor_agent = DagFileProcessorAgent(self.subdir,
                                             known_file_paths,
                                             self.num_runs,
                                             type(self)._create_dag_file_processor,
                                             processor_timeout,
                                             self.dag_ids,
                                             pickle_dags,
                                             async_mode)

This involves one Airflow config option: the timeout for a DagFileProcessor while it processes a DAG file (the process is killed on timeout). The airflow.cfg entry:

# How long before timing out a DagFileProcessor, which processes a dag file
dag_file_processor_timeout = 50
  • DagFileProcessorAgent

The agent that fronts DAG file processing; it is responsible for all DAG-parsing work during scheduling. DagFileProcessorAgent creates DagFileProcessorManager as a child process of the scheduler, and DagFileProcessorManager in turn creates one DagFileProcessor process per DAG file to process the file and collect its parse results. While files are being parsed, it reports the processing results back to the scheduler main process via inter-process communication.

(Figure: the DagFileProcessorManager process.)

(Figure: the correspondence between DagFileProcessorAgent, DagFileProcessorManager, the DagFileProcessor processes, and their DAG files.)

2.3.3 SchedulerJob._execute_helper(): the main loop of the scheduler

The code below is the core of the entire scheduler program:

_execute_helper()

def _execute_helper(self):
    """
    The actual scheduler loop. The main steps in the loop are:
        #. Harvest DAG parsing results through DagFileProcessorAgent
        #. Find and queue executable tasks
            #. Change task instance state in DB
            #. Queue tasks in executor
        #. Heartbeat executor
            #. Execute queued tasks in executor asynchronously
            #. Sync on the states of running tasks
    Following is a graphic representation of these steps.
    .. image:: ../docs/img/scheduler_loop.jpg
    :rtype: None
    """
    # Start whichever executor is configured
    self.executor.start()
    self.log.info("Resetting orphaned tasks for active dag runs")
    # On scheduler startup, reset the SCHEDULED/QUEUED tis under RUNNING, non-backfill DagRuns back to None, so they can be scheduled later
    self.reset_state_for_orphaned_tasks()
    
    # Start after resetting orphaned tasks to avoid stressing out DB.
    # Call processor_agent.start() to launch the fronted DagFileProcessorManager and begin the loop that parses the DAG files
    self.processor_agent.start()
    
    execute_start_time = timezone.utcnow()
    # Last time that self.heartbeat() was called.
    last_self_heartbeat_time = timezone.utcnow()
    
    # For the execute duration, parse and schedule DAGs
    # Start receiving parse results from DagFileProcessorManager and scheduling the corresponding dags and tis. The while loop ends when:
    # 1. run_duration is set and the loop has been running for run_duration seconds (default -1, i.e. no limit);
    # 2. num_runs is set and every DAG file has been parsed num_runs times.
    while (timezone.utcnow() - execute_start_time).total_seconds() < \
            self.run_duration or self.run_duration < 0:
        self.log.debug("Starting Loop...")
        loop_start_time = time.time()
        if self.using_sqlite:
            self.processor_agent.heartbeat()
            # For the sqlite case w/ 1 thread, wait until the processor
            # is finished to avoid concurrent access to the DB.
            self.log.debug(
                "Waiting for processors to finish since we're using sqlite")
            self.processor_agent.wait_until_finished()
            
        # Harvest the parse results of the DAG files; this calls self.processor_agent.harvest_simple_dags()
        self.log.debug("Harvesting DAG parsing results")
        simple_dags = self._get_simple_dags()
        self.log.debug("Harvested {} SimpleDAGs".format(len(simple_dags)))
        # Send tasks for execution if available
        simple_dag_bag = SimpleDagBag(simple_dags)
        
        # Schedule the tis that were parsed:
        # 1. process the parse results, put the tis into the executor's queued_tasks, and flip their state SCHEDULED->QUEUED;
        # 2. call executor.heartbeat() to run the tis in queued_tasks asynchronously and sync their states;
        # 3. for tis left unexecuted in queued_tasks because of constraints (pool, slots, concurrency, ...), clear queued_tasks and set them back to SCHEDULED;
        # 4. react to the results of the tis the executor ran asynchronously
        if not self._validate_and_run_task_instances(simple_dag_bag=simple_dag_bag):
            continue
            
        # Heartbeat the scheduler periodically
        # Seconds since the scheduler's last heartbeat
        time_since_last_heartbeat = (timezone.utcnow() -
                                     last_self_heartbeat_time).total_seconds()
        # self.heartrate is the scheduler heartbeat rate from the config file
        if time_since_last_heartbeat > self.heartrate:
            self.log.debug("Heartbeating the scheduler")
            # Run the heartbeat() method:
            # 1. update the SchedulerJob's heartbeat time in the job table with the current time;
            # 2. if the job's state has been changed to SHUTDOWN, kill the current job.
            self.heartbeat()
            # Record when this heartbeat ran
            last_self_heartbeat_time = timezone.utcnow()
            
        is_unit_test = conf.getboolean('core', 'unit_test_mode')
        loop_end_time = time.time()
        # Duration of this while iteration
        loop_duration = loop_end_time - loop_start_time
        self.log.debug(
            "Ran scheduling loop in %.2f seconds",
            loop_duration)
        
        if not is_unit_test:
            self.log.debug("Sleeping for %.2f seconds", self._processor_poll_interval)
            # Sleep for _processor_poll_interval seconds if it is set
            time.sleep(self._processor_poll_interval)
            
        if self.processor_agent.done:
            self.log.info("Exiting scheduler loop as all files"
                          " have been processed {} times".format(self.num_runs))
            # Every file has been processed num_runs times; leave the while loop
            break
            
        if loop_duration < 1 and not is_unit_test:
            sleep_length = 1 - loop_duration
            self.log.debug(
                "Sleeping for {0:.2f} seconds to prevent excessive logging"
                .format(sleep_length))
            # If the iteration took less than one second, sleep for (1 - loop_duration) seconds, i.e. the minimum loop interval is one second
            sleep(sleep_length)
            
    # Stop any processors
    # The loop is over; signal DagFileProcessorManager to stop all of its DagFileProcessor processes.
    self.processor_agent.terminate()
    
    # Verify that all files were processed, and if so, deactivate DAGs that
    # haven't been touched by the scheduler as they likely have been
    # deleted.
    if self.processor_agent.all_files_processed:
        self.log.info(
            "Deactivating DAGs that haven't been touched since %s",
            execute_start_time.isoformat()
        )
        # Set is_active=False for dag records the scheduler did not touch
        models.DAG.deactivate_stale_dags(execute_start_time)
    
    # Call executor.end() to shut down the executor
    self.executor.end()
    settings.Session.remove()

2.3.3.1 self.executor.start(): start the task executor

The executor is chosen in BaseJob's constructor:

self.executor = executor or executors.get_default_executor()

Get the executor according to the config file:

def get_default_executor():
    """Creates a new instance of the configured executor if none exists and returns it"""
    global DEFAULT_EXECUTOR

    if DEFAULT_EXECUTOR is not None:
        return DEFAULT_EXECUTOR

    # Read the executor name from the config in airflow.cfg
    executor_name = conf.get('core', 'EXECUTOR')

    DEFAULT_EXECUTOR = _get_executor(executor_name)

    log.info("Using executor %s", executor_name)

    return DEFAULT_EXECUTOR

The airflow.cfg entry; production deployments usually set this to CeleryExecutor:

# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor, KubernetesExecutor
executor = CeleryExecutor
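_get_executor then maps that name to an executor instance. A simplified sketch of its shape in 1.10.x (names as in the airflow.executors package; Dask/Kubernetes branches and plugin-provided executors omitted):

def _get_executor(executor_name):
    # Sketch: Executors, LocalExecutor, SequentialExecutor come from airflow.executors
    if executor_name == Executors.LocalExecutor:
        return LocalExecutor()
    elif executor_name == Executors.SequentialExecutor:
        return SequentialExecutor()
    elif executor_name == Executors.CeleryExecutor:
        from airflow.executors.celery_executor import CeleryExecutor
        return CeleryExecutor()
    else:
        raise AirflowException("Executor {} not supported.".format(executor_name))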

CeleryExecutor's start method only logs how many processes the executor uses to sync task metadata:

def start(self):
    self.log.debug(
        'Starting Celery Executor using %s processes for syncing',
        self._sync_parallelism
    )

2.3.3.2 self.reset_state_for_orphaned_tasks(): reset tis in the given states after the scheduler starts

When the scheduler starts, the SCHEDULED and QUEUED tis that belong to RUNNING DagRuns whose run_id does not carry the backfill prefix are reset to None, so that the scheduler can schedule these tis normally later on.


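A minimal sketch of the idea (an assumption-laden simplification; the real reset_state_for_orphaned_tasks uses more careful filtering and batching):

from airflow.jobs import BackfillJob
from airflow.models import DagRun
from airflow.utils.db import create_session
from airflow.utils.state import State

with create_session() as session:
    # RUNNING dag runs whose run_id does not carry the backfill prefix
    runs = (session.query(DagRun)
            .filter(DagRun.state == State.RUNNING)
            .filter(~DagRun.run_id.like(BackfillJob.ID_PREFIX + '%')))
    for run in runs:
        tis = run.get_task_instances(
            state=[State.SCHEDULED, State.QUEUED], session=session)
        for ti in tis:
            ti.state = State.NONE  # back to None, so the scheduler can pick it up again
    session.commit()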

2.3.3.3 self.processor_agent.start(): launch DagFileProcessorManager and start the DAG-file parsing loop

# Start after resetting orphaned tasks to avoid stressing out DB.
# Call processor_agent.start() to launch the DagFileProcessorManager and start the loop that parses the DAG files
self.processor_agent.start()

The DagFileProcessorAgent.start() method:

def start(self):
    """
    Launch DagFileProcessorManager processor and start DAG parsing loop in manager.
    """
    if six.PY2:
        context = multiprocessing
    else:
        mp_start_method = self._get_multiprocessing_start_method()
        context = multiprocessing.get_context(mp_start_method)
        
    # Two-way communication pipe between the Scheduler process and the DagFileProcessorManager process
    self._parent_signal_conn, child_signal_conn = context.Pipe()
    
    # Create a process: target is the callable to run, args are the arguments passed to target
    self._process = context.Process(
        target=type(self)._run_processor_manager,
        args=(
            self._dag_directory,
            self._file_paths,
            self._max_runs,
            # getattr prevents error while pickling an instance method.
            # Get the processor factory, mainly used to create DagFileProcessor instances
            getattr(self, "_processor_factory"),
            self._processor_timeout,
            # The pipe the scheduler process (DagFileProcessorAgent) uses to talk to its DagFileProcessorManager child
            child_signal_conn,
            self._dag_ids,
            self._pickle_dags,
            self._async_mode,
        )
    )
    
    # After process.start(), the child process invokes the given target
    self._process.start()
    self.log.info("Launched DagFileProcessorManager with pid: %s", self._process.pid)

DagFileProcessorAgent and DagFileProcessorManager interact over that pipe.


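As a toy illustration of that wiring (not Airflow code, just the multiprocessing pattern it relies on; the hypothetical manager_loop stands in for DagFileProcessorManager):

import multiprocessing

def manager_loop(signal_conn):
    # Stand-in for DagFileProcessorManager: report readiness, then obey signals.
    signal_conn.send("manager_started")
    while True:
        if signal_conn.recv() == "TERMINATE_MANAGER":
            signal_conn.send("terminated")
            break

if __name__ == "__main__":
    parent_conn, child_conn = multiprocessing.Pipe()  # two-way channel
    process = multiprocessing.Process(target=manager_loop, args=(child_conn,))
    process.start()
    print(parent_conn.recv())              # -> "manager_started"
    parent_conn.send("TERMINATE_MANAGER")  # what terminate() does with a real signal
    print(parent_conn.recv())              # -> "terminated"
    process.join()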

2.3.3.4 Core code: the scheduler's while loop

The while loop in _execute_helper is the core of the whole scheduling process; it turns the parsed DAG information into scheduling actions.



The main steps of the loop:

  • Harvest the DAG parsing results through DagFileProcessorAgent

  • Find and queue the executable tasks

    • Change the ti states in the DB;
    • Queue the tasks in the executor
  • Heartbeat the executor

    • Execute the queued tasks asynchronously (via the executor's trigger_tasks method)
    • Sync the states of the running tasks (via the sync method)
2.3.3.4.1 self._get_simple_dags(): harvest the DAG-file parsing results

# Harvest the DAG file info; this calls self.processor_agent.harvest_simple_dags()
self.log.debug("Harvesting DAG parsing results")
simple_dags = self._get_simple_dags()
self.log.debug("Harvested {} SimpleDAGs".format(len(simple_dags)))

self._get_simple_dags() ultimately calls self.processor_agent.harvest_simple_dags():


  • DagFileProcessorAgent keeps the parsing statistics of the DAG files; the rest of the code acts on the latest information stored in the agent;
  • the harvested simple_dags (only runnable DAGs are returned here, and their tis have already been set to SCHEDULED in the DB) are handed over to the scheduler for task scheduling afterwards.
2.3.3.4.2 SimpleDagBag(simple_dags): wrap the harvested simple_dags in a SimpleDagBag

# Send tasks for execution if available
simple_dag_bag = SimpleDagBag(simple_dags)

SimpleDagBag

class SimpleDagBag(BaseDagBag):
    """
    A collection of SimpleDag objects with some convenience methods.
    """

    def __init__(self, simple_dags):
        """
        Constructor.

        :param simple_dags: SimpleDag objects that should be in this
        :type list(airflow.utils.dag_processing.SimpleDagBag)
        """
        self.simple_dags = simple_dags
        self.dag_id_to_simple_dag = {}

        for simple_dag in simple_dags:
            self.dag_id_to_simple_dag[simple_dag.dag_id] = simple_dag

    @property
    def dag_ids(self):
        """
        :return: IDs of all the DAGs in this
        :rtype: list[unicode]
        """
        return self.dag_id_to_simple_dag.keys()

    def get_dag(self, dag_id):
        """
        :param dag_id: DAG ID
        :type dag_id: unicode
        :return: if the given DAG ID exists in the bag, return the BaseDag
        corresponding to that ID. Otherwise, throw an Exception
        :rtype: airflow.utils.dag_processing.SimpleDag
        """
        if dag_id not in self.dag_id_to_simple_dag:
            raise AirflowException("Unknown DAG ID {}".format(dag_id))
        return self.dag_id_to_simple_dag[dag_id]
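
A hypothetical usage sketch ("example_dag" is a made-up id; simple_dags is the list harvested above):

# simple_dags: the list harvested from the agent
simple_dag_bag = SimpleDagBag(simple_dags)

if "example_dag" in simple_dag_bag.dag_ids:
    simple_dag = simple_dag_bag.get_dag("example_dag")
else:
    pass  # get_dag("example_dag") would raise AirflowException here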

2.3.3.4.3 self._validate_and_run_task_instances: validate and run the tis

SchedulerJob._validate_and_run_task_instances()

def _validate_and_run_task_instances(self, simple_dag_bag):
    if len(simple_dag_bag.simple_dags) > 0:
        try:
            # 1. Process the parse results, put the tis into the executor's queued_tasks, and flip their state SCHEDULED->QUEUED;
            self._process_and_execute_tasks(simple_dag_bag)
        except Exception as e:
            self.log.error("Error queuing tasks")
            self.log.exception(e)
            return False

    # Call heartbeats
    self.log.debug("Heartbeating the executor")
    # 2. Call executor.heartbeat():
    # 1) executor.trigger_tasks(open_slots) runs the tis in queued_tasks asynchronously;
    # 2) executor.sync() syncs the tis' metadata
    self.executor.heartbeat()

    # 3. For the tis in executor.queued_tasks that never ran, clear queued_tasks and set them back to SCHEDULED
    self._change_state_for_tasks_failed_to_execute()

    # Process events from the executor
    # 4. React to the results of the tis the executor ran asynchronously
    self._process_executor_events(simple_dag_bag)
    return True
  • self._process_and_execute_tasks(simple_dag_bag)

    • Handle task_instances whose dag_run is not RUNNING: if a ti is in up_for_retry but its dag_run is not in the RUNNING state, set the ti to FAILED so it is not scheduled again;


    • If a ti is in scheduled/queued/up_for_reschedule but its dag_run is not RUNNING, set the ti's state to None so it is not scheduled again;


    • Prepare to execute the task_instances that qualify:

      • Query the executable tis: task_instances in SCHEDULED state that satisfy the constraints (priority/concurrency/max_active_runs/pool limits);

        • the owning dag is not paused, the dag_run is not a backfill, and the tis are in SCHEDULED state;


      • Set those executable task_instances to QUEUED in the DB;

      • In the executor, build an airflow run XXX command for each of those task_instances and enqueue the commands (into the executor's queued_tasks dict);

        The entries of the queued_tasks dict look like this (see the sketch after this list):

        key: (self.dag_id, self.task_id, self.execution_date, self.try_number)

        value: (command, priority, queue, simple_task_instance)

  • self.executor.heartbeat()

    • Based on the number of open slots, send the airflow run xxx commands to the remote workers through Celery;

    • Fetch the execution states of the tasks from the remote workers and keep them in the executor on the scheduler node; the results are stored in the event_buffer dict:

      key: (self.dag_id, self.task_id, self.execution_date, self.try_number)

      value: State.FAILED or State.SUCCESS

      The task states in event_buffer are consumed later by self._process_executor_events (see the sketch after this list).

  • self._change_state_for_tasks_failed_to_execute()

    • For the tis in executor.queued_tasks that never ran, find the corresponding QUEUED tis in the DB, set their state back to SCHEDULED, and clear the queued_tasks dict;
  • self._process_executor_events(simple_dag_bag)

    • If the executor reports FAILED or SUCCESS for a task it ran, but the ti is still QUEUED in the DB, the task was probably killed externally; the ti's state in the DB is set to FAILED.

      We can run into this with external_task_sensor when its reschedule interval is set very short.

      The root cause: on its first poke the external_task_sensor does not meet its condition and is set to up_for_reschedule, but the scheduler re-parses the DAG file and re-queues it so quickly that the ti is flipped back to QUEUED. When self._process_executor_events then runs, the state reported by the executor no longer matches (SUCCESS != QUEUED), and the ti in the DB ends up FAILED.

      References:

      1. AIRFLOW-5071
      2. Thousands of Executor reports task instance X finished (success) although the task says its queued. Was the task killed externally?
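For illustration, the shapes of the two executor-side dicts described above (all values are made-up examples; simple_ti stands in for a SimpleTaskInstance):

from datetime import datetime
from airflow.utils.state import State

simple_ti = object()  # placeholder for a SimpleTaskInstance

# The shared key layout: (dag_id, task_id, execution_date, try_number)
key = ("example_dag", "task_a", datetime(2020, 8, 1), 1)

# What the executor's queued_tasks holds after _process_and_execute_tasks:
queued_tasks = {
    key: (
        ["airflow", "run", "example_dag", "task_a", "2020-08-01T00:00:00"],  # command
        1,            # priority
        "default",    # queue
        simple_ti,    # SimpleTaskInstance
    ),
}

# What event_buffer holds after the workers report back:
event_buffer = {
    key: State.SUCCESS,  # or State.FAILED
}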

2.3.3.4.4 Heartbeat the SchedulerJob periodically

# Heartbeat the scheduler periodically
# Seconds since the scheduler's last heartbeat
time_since_last_heartbeat = (timezone.utcnow() -
                             last_self_heartbeat_time).total_seconds()
# self.heartrate is the scheduler heartbeat rate set by scheduler_heartbeat_sec in the config file
if time_since_last_heartbeat > self.heartrate:
    self.log.debug("Heartbeating the scheduler")
    # Run the heartbeat() method:
    # 1. if the job's state has been changed to SHUTDOWN, kill the current job;
    # 2. if the job heartbeat interval (job_heartbeat_sec) has not elapsed yet, sleep;
    # 3. update the heartbeat time in the job table with the current time.
    self.heartbeat()
    # Record when this heartbeat ran
    last_self_heartbeat_time = timezone.utcnow()

The scheduler_heartbeat_sec entry in airflow.cfg:

# The scheduler constantly tries to trigger new tasks (look at the
# scheduler section in the docs for more information). This defines
# how often the scheduler should run (in seconds).
scheduler_heartbeat_sec = 5

The job_heartbeat_sec entry in airflow.cfg:

# Task instances listen for external kill signal (when you clear tasks
# from the CLI or the UI), this defines the frequency at which they should
# listen (in seconds).
job_heartbeat_sec = 5
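
For orientation, BaseJob.heartbeat() roughly works as below (a simplified sketch of the 1.10.x logic; error handling and the heartbeat callback are omitted):

from time import sleep

from airflow.jobs import BaseJob
from airflow.utils import timezone
from airflow.utils.db import create_session
from airflow.utils.state import State


def heartbeat_sketch(job):
    """Simplified stand-in for BaseJob.heartbeat()."""
    with create_session() as session:
        db_job = session.query(BaseJob).filter_by(id=job.id).one()

    # 1. an external actor (CLI/UI clearing tasks, etc.) may have set the state to SHUTDOWN
    if db_job.state == State.SHUTDOWN:
        job.kill()

    # 2. honor the heartbeat interval: sleep away whatever remains of it
    if db_job.latest_heartbeat:
        remaining = job.heartrate - (timezone.utcnow() - db_job.latest_heartbeat).total_seconds()
        sleep(max(0, remaining))

    # 3. record the new heartbeat time in the job table
    with create_session() as session:
        db_job = session.query(BaseJob).filter_by(id=job.id).one()
        db_job.latest_heartbeat = timezone.utcnow()
        session.merge(db_job)
        session.commit()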

2.3.3.4.5 self._processor_poll_interval: the polling interval

if not is_unit_test:
    self.log.debug("Sleeping for %.2f seconds", self._processor_poll_interval)
    # Sleep for _processor_poll_interval seconds if it is set
    time.sleep(self._processor_poll_interval)

The processor_poll_interval entry in airflow.cfg:

# The number of seconds to wait between consecutive DAG file processing
processor_poll_interval = 1

2.3.3.4.6 self.num_runs: the loop ends once every DAG file has been processed the given number of times

if self.processor_agent.done:
    self.log.info("Exiting scheduler loop as all files"
                  " have been processed {} times".format(self.num_runs))
    # Every file has been processed num_runs times; leave the while loop
    break

The num_runs entry in airflow.cfg:

# The number of times to try to schedule each DAG file
# -1 indicates unlimited number
num_runs = -1

2.3.3.4.7 Enforce a minimum interval for the while loop

if loop_duration < 1 and not is_unit_test:
    sleep_length = 1 - loop_duration
    self.log.debug(
        "Sleeping for {0:.2f} seconds to prevent excessive logging"
        .format(sleep_length))
    # If the iteration took less than one second, sleep for (1 - loop_duration) seconds, i.e. the minimum loop interval is one second
    sleep(sleep_length)

2.3.3.5 self.processor_agent.terminate(): send the termination signal to DagFileProcessorManager

# Stop any processors
# Signal DagFileProcessorManager to stop all of its DagFileProcessor processes.
self.processor_agent.terminate()

The DagFileProcessorAgent.terminate() method:

def terminate(self):
    """
    Send termination signal to DAG parsing processor manager
    and expect it to terminate all DAG file processors.
    """
    if self._process and self._process.is_alive():
        self.log.info("Sending termination message to manager.")
        try:
            # Send the TERMINATE_MANAGER signal to DagFileProcessorManager over the scheduler-side end of the pipe
            self._parent_signal_conn.send(DagParsingSignal.TERMINATE_MANAGER)
        except ConnectionError:
            pass

2.3.3.6 models.DAG.deactivate_stale_dags(execute_start_time): deactivate stale DAGs

# Verify that all files were processed, and if so, deactivate DAGs that
# haven't been touched by the scheduler as they likely have been
# deleted.
if self.processor_agent.all_files_processed:
    self.log.info(
        "Deactivating DAGs that haven't been touched since %s",
        execute_start_time.isoformat()
    )
    # Set is_active=False for dag records the scheduler did not touch
    models.DAG.deactivate_stale_dags(execute_start_time)
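
A condensed sketch of what deactivate_stale_dags boils down to (an assumption-laden simplification; the real classmethod also logs each DAG it deactivates):

from airflow.models import DagModel
from airflow.utils.db import create_session


def deactivate_stale_dags_sketch(expiration_date):
    with create_session() as session:
        stale = (session.query(DagModel)
                 .filter(DagModel.last_scheduler_run < expiration_date,
                         DagModel.is_active))
        for dag in stale:
            dag.is_active = False  # the row stays in the dag table, only the flag flips
        session.commit()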

2.3.3.7 self.executor.end(): shut down the executor

# Call executor.end() to shut down the executor
self.executor.end()

The CeleryExecutor.end() method:

def end(self, synchronous=False):
    if synchronous:
        while any([
                task.state not in celery_states.READY_STATES
                for task in self.tasks.values()]):
            time.sleep(5)
    self.sync()

2.3.4 self.processor_agent.end(): shut down DagFileProcessorManager

Terminate DagFileProcessorManager, the scheduler's child process:

def end(self):
    """
    Terminate (and then kill) the manager process launched.
    :return:
    """
    if not self._process:
        self.log.warning('Ending without manager process.')
        return
    reap_process_group(self._process.pid, log=self.log)
    self._parent_signal_conn.close()