1. Starting and Stopping the Scheduler
1.1 Scheduler start command
The Airflow Scheduler is usually started with a command like the following:
airflow scheduler \
--pid /data/bdetl/airflow/pids/airflow-scheduler.pid \
--stdout /data/bdetl/logs/airflow/airflow-scheduler.out \
--stderr /data/bdetl/logs/airflow/airflow-scheduler.out \
-l /data/bdetl/logs/airflow/airflow-scheduler.log \
-D
More options are listed in the Scheduler parameter table below:
Parameter | Meaning |
---|---|
-sd, --subdir | Look for DAG files under the given path. Defaults to '[AIRFLOW_HOME]/dags', where [AIRFLOW_HOME] is the value we set for 'AIRFLOW_HOME' in 'airflow.cfg'. |
-r, --run-duration | How long (in seconds) the Scheduler's execute loop runs before the program exits. |
-n, --num_runs | How many times all dag files are parsed before the Scheduler exits. |
-p, --do_pickle | Whether to send DAG objects to the worker nodes in pickled (serialized) form for execution. |
1.2 Scheduler stop command
cat /data/bdetl/airflow/pids/airflow-scheduler.pid | xargs kill -15
Running the command above kills the scheduler process and removes the airflow-scheduler.pid file.
2. Scheduler Source Code
In the rest of this article:
ti stands for task_instance, i.e. a task instance;
tis stands for task_instances;
all code is based on airflow 1.10.11.
2.1 cli.scheduler(): handling the airflow scheduler CLI command
It builds a SchedulerJob from the given arguments and then calls the job's run() method.
@cli_utils.action_logging
def scheduler(args):
    py2_deprecation_waring()
    print(settings.HEADER)
    # Build a SchedulerJob
    job = jobs.SchedulerJob(
        dag_id=args.dag_id,
        subdir=process_subdir(args.subdir),
        run_duration=args.run_duration,
        num_runs=args.num_runs,
        do_pickle=args.do_pickle)
    # Daemon mode
    if args.daemon:
        # Resolve the pid file and the stdout/stderr/log file locations
        pid, stdout, stderr, log_file = setup_locations("scheduler",
                                                        args.pid,
                                                        args.stdout,
                                                        args.stderr,
                                                        args.log_file)
        handle = setup_logging(log_file)
        stdout = open(stdout, 'w+')
        stderr = open(stderr, 'w+')
        ctx = daemon.DaemonContext(
            pidfile=TimeoutPIDLockFile(pid, -1),
            files_preserve=[handle],
            stdout=stdout,
            stderr=stderr,
        )
        # Run the SchedulerJob
        with ctx:
            job.run()
        stdout.close()
        stderr.close()
    else:
        signal.signal(signal.SIGINT, sigint_handler)
        signal.signal(signal.SIGTERM, sigint_handler)
        signal.signal(signal.SIGQUIT, sigquit_handler)
        job.run()
2.2 BaseJob.run(): insert a SchedulerJob record into the job table and call the subclass logic
Calling job.run() above invokes the run() method of BaseJob, the parent class of SchedulerJob:
def run(self):
    Stats.incr(self.__class__.__name__.lower() + '_start', 1, 1)
    # Adding an entry in the DB
    with create_session() as session:
        self.state = State.RUNNING
        # Insert a SchedulerJob record in the RUNNING state into the db
        session.add(self)
        session.commit()
        id_ = self.id
        make_transient(self)
        self.id = id_
        try:
            # Call the _execute() method implemented by the subclass
            self._execute()
            # In case of max runs or max duration
            self.state = State.SUCCESS
        except SystemExit:
            # In case of ^C or SIGTERM
            self.state = State.SUCCESS
        except Exception:
            self.state = State.FAILED
            raise
        finally:
            # After the job finishes, fill in end_date and update the record
            self.end_date = timezone.utcnow()
            session.merge(self)
            session.commit()
    Stats.incr(self.__class__.__name__.lower() + '_end', 1, 1)
As the code shows, this method mainly creates a scheduler job record in the job table and tracks its state:
- If _execute() (which contains a while loop) finishes normally, the SchedulerJob's state is set to SUCCESS;
- If the program is stopped manually while _execute() is running (ctrl-c or kill -15 pid), the SchedulerJob's state is also set to SUCCESS;
- If _execute() raises an exception, the SchedulerJob's state is set to FAILED;
- Finally, end_date is filled in on the SchedulerJob and the record in the db is updated.
2.3 SchedulerJob._execute(): the SchedulerJob's execution logic
The call to self._execute() above dispatches to the following method of the subclass:
SchedulerJob._execute()
def _execute(self):
    self.log.info("Starting the scheduler")
    # DAGs can be pickled for easier remote execution by some executors
    pickle_dags = False
    if self.do_pickle and self.executor.__class__ not in \
            (executors.LocalExecutor, executors.SequentialExecutor):
        pickle_dags = True
    self.log.info("Running execute loop for %s seconds", self.run_duration)
    self.log.info("Processing each file at most %s times", self.num_runs)
    # Build up a list of Python files that could contain DAGs
    self.log.info("Searching for files in %s", self.subdir)
    # Find the dag files under the configured self.subdir path
    known_file_paths = list_py_file_paths(self.subdir)
    self.log.info("There are %s files in %s", len(known_file_paths), self.subdir)
    # When using sqlite, we do not use async_mode
    # so the scheduler job and DAG parser don't access the DB at the same time.
    async_mode = not self.using_sqlite
    # AIRFLOW SETTINGS: timeout for a DagFileProcessor while it processes a dag file;
    # the processing subprocess is killed once it is exceeded
    processor_timeout_seconds = conf.getint('core', 'dag_file_processor_timeout')
    processor_timeout = timedelta(seconds=processor_timeout_seconds)
    """
    Create a file processor agent:
    dag_directory: the default dag folder, or the user-specified dags path self.subdir
    file_paths: the list of dag file paths found under the dags folder
    max_runs: how many times the scheduler parses the dag files; -1 (the default) means keep parsing forever
    processor_factory: used to create DagFileProcessor processes (subclasses of AbstractDagFileProcessor)
    processor_timeout: timeout for a DagFileProcessor process
    async_mode: whether to start the DagFileProcessorManager in async mode; async is used by default unless the db is sqlite
    """
    self.processor_agent = DagFileProcessorAgent(self.subdir,
                                                 known_file_paths,
                                                 self.num_runs,
                                                 type(self)._create_dag_file_processor,
                                                 processor_timeout,
                                                 self.dag_ids,
                                                 pickle_dags,
                                                 async_mode)
    try:
        self._execute_helper()
    except Exception:
        self.log.exception("Exception when executing execute_helper")
    finally:
        self.processor_agent.end()
        self.log.info("Exited execute loop")
_execute() is the Scheduler's main method and runs the main logic of the scheduling system; it consists of the following parts:
2.3.1 list_py_file_paths(self.subdir): find the dag files under the given path
# Build up a list of Python files that could contain DAGs
self.log.info("Searching for files in %s", self.subdir)
# Find the dag files under the configured self.subdir path
known_file_paths = list_py_file_paths(self.subdir)
self.log.info("There are %s files in %s", len(known_file_paths), self.subdir)
The list_py_file_paths(self.subdir) method walks the self.subdir folder looking for dag files and returns the list of file paths it finds (the original screenshot of the result is not reproduced here); a rough illustration follows:
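For illustration only (the paths below are made up to match the /data/bdetl layout used earlier in this post), the call simply collects the .py file paths under the dags folder:

from airflow.utils.dag_processing import list_py_file_paths

# Hypothetical dags folder; substitute your own [AIRFLOW_HOME]/dags path.
known_file_paths = list_py_file_paths('/data/bdetl/airflow/dags')
print(known_file_paths)
# e.g. ['/data/bdetl/airflow/dags/example_dag_a.py',
#       '/data/bdetl/airflow/dags/example_dag_b.py']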
The later steps parse the dag files that were found.
2.3.2 Create a DagFileProcessorAgent to parse the dag files that were found
# AIRFLOW SETTINGS: timeout for a DagFileProcessor while it processes a dag file;
# the processing subprocess is killed once it is exceeded
processor_timeout_seconds = conf.getint('core', 'dag_file_processor_timeout')
processor_timeout = timedelta(seconds=processor_timeout_seconds)
"""
Create a file processor agent:
dag_directory: the default dags folder, or the dags path given by the user
file_paths: the list of .py file paths found under the dags folder
max_runs: how many times the scheduler parses the .py files; -1 (the default) means keep parsing forever
processor_factory: the method used to create DagFileProcessor processes (subclasses of AbstractDagFileProcessor)
processor_timeout: timeout for a DagFileProcessor process
async_mode: whether to start the DagFileProcessorManager in async mode; async is used by default unless the db is sqlite
"""
self.processor_agent = DagFileProcessorAgent(self.subdir,
                                             known_file_paths,
                                             self.num_runs,
                                             type(self)._create_dag_file_processor,
                                             processor_timeout,
                                             self.dag_ids,
                                             pickle_dags,
                                             async_mode)
This involves an airflow configuration option: the timeout for a DagFileProcessor while it processes a dag file; the processing subprocess is killed once the timeout is exceeded. The airflow.cfg entry is:
# How long before timing out a DagFileProcessor, which processes a dag file
dag_file_processor_timeout = 50
- DagFileProcessorAgent
The agent in charge of DAG file processing; it is responsible for all DAG-parsing-related work during scheduling. The DagFileProcessorAgent creates the Scheduler's child process, the DagFileProcessorManager, and the DagFileProcessorManager in turn creates one DagFileProcessor process per dag file to process the file and collect its parsing result; while files are being parsed, it communicates between processes to report the processing results back to the scheduler main process.
The DagFileProcessorAgent, the DagFileProcessorManager, the DagFileProcessor processes and their corresponding dag files map onto each other as follows:
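As a rough textual stand-in for the figure in the original post (the dag file names and paths are made up), the process hierarchy looks like this:

SchedulerJob (scheduler main process)
└── DagFileProcessorAgent        (lives in the scheduler process, talks to the manager over a Pipe)
    └── DagFileProcessorManager  (child process, owns the parsing loop)
        ├── DagFileProcessor  →  /data/bdetl/airflow/dags/dag_a.py
        ├── DagFileProcessor  →  /data/bdetl/airflow/dags/dag_b.py
        └── ...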
2.3.3 SchedulerJob._execute_helper(): the main loop of the Scheduler
The part below is the core of the whole Scheduler; its code is as follows:
def _execute_helper(self):
    """
    The actual scheduler loop. The main steps in the loop are:
        #. Harvest DAG parsing results through DagFileProcessorAgent
        #. Find and queue executable tasks
            #. Change task instance state in DB
            #. Queue tasks in executor
        #. Heartbeat executor
            #. Execute queued tasks in executor asynchronously
            #. Sync on the states of running tasks
    Following is a graphic representation of these steps.
    .. image:: ../docs/img/scheduler_loop.jpg
    :rtype: None
    """
    # Start the configured executor
    self.executor.start()

    self.log.info("Resetting orphaned tasks for active dag runs")
    # When the scheduler starts, reset the SCHEDULED/QUEUED tis under RUNNING DagRuns whose
    # run_id prefix is not backfill back to None, so they can be scheduled again later
    self.reset_state_for_orphaned_tasks()

    # Start after resetting orphaned tasks to avoid stressing out DB.
    # Call processor_agent.start() to launch the DagFileProcessorManager it wraps
    # and begin the dag file parsing loop
    self.processor_agent.start()

    execute_start_time = timezone.utcnow()

    # Last time that self.heartbeat() was called.
    last_self_heartbeat_time = timezone.utcnow()

    # For the execute duration, parse and schedule DAGs
    # Start receiving parsing results from the DagFileProcessorManager in a loop and schedule
    # the corresponding dags and tis. The while loop ends when:
    # 1. run_duration is set and the loop has been running for run_duration seconds (default -1);
    # 2. num_runs is set and every dag file has been processed num_runs times.
    while (timezone.utcnow() - execute_start_time).total_seconds() < \
            self.run_duration or self.run_duration < 0:
        self.log.debug("Starting Loop...")
        loop_start_time = time.time()

        if self.using_sqlite:
            self.processor_agent.heartbeat()
            # For the sqlite case w/ 1 thread, wait until the processor
            # is finished to avoid concurrent access to the DB.
            self.log.debug(
                "Waiting for processors to finish since we're using sqlite")
            self.processor_agent.wait_until_finished()

        # Collect the parsed dag file information; this calls self.processor_agent.harvest_simple_dags()
        self.log.debug("Harvesting DAG parsing results")
        simple_dags = self._get_simple_dags()
        self.log.debug("Harvested {} SimpleDAGs".format(len(simple_dags)))

        # Send tasks for execution if available
        simple_dag_bag = SimpleDagBag(simple_dags)

        # Schedule the tis that were parsed:
        # 1. Process the parsing results, put the tis into the executor's queued_tasks
        #    and change their state: SCHEDULED -> QUEUED;
        # 2. Call the executor's heartbeat method to run the tis in queued_tasks
        #    asynchronously and sync their execution state;
        # 3. For tis in queued_tasks that were not run because of constraints
        #    (pool, slots, concurrency, ...), clear queued_tasks and set the tis back to SCHEDULED;
        # 4. React to the results of the tis that the executor ran asynchronously
        if not self._validate_and_run_task_instances(simple_dag_bag=simple_dag_bag):
            continue

        # Heartbeat the scheduler periodically
        # Time since the last scheduler heartbeat
        time_since_last_heartbeat = (timezone.utcnow() -
                                     last_self_heartbeat_time).total_seconds()
        # self.heartrate is the scheduler heartbeat interval from the config file
        if time_since_last_heartbeat > self.heartrate:
            self.log.debug("Heartbeating the scheduler")
            # heartbeat():
            # 1. updates the SchedulerJob's heartbeat time in the job table with the current time;
            # 2. kills the current job if its state has been changed to SHUTDOWN.
            self.heartbeat()
            # Record when this heartbeat ran
            last_self_heartbeat_time = timezone.utcnow()

        is_unit_test = conf.getboolean('core', 'unit_test_mode')
        loop_end_time = time.time()
        # Duration of this while iteration
        loop_duration = loop_end_time - loop_start_time
        self.log.debug(
            "Ran scheduling loop in %.2f seconds",
            loop_duration)

        if not is_unit_test:
            self.log.debug("Sleeping for %.2f seconds", self._processor_poll_interval)
            # Sleep for _processor_poll_interval seconds if it is set
            time.sleep(self._processor_poll_interval)

        if self.processor_agent.done:
            self.log.info("Exiting scheduler loop as all files"
                          " have been processed {} times".format(self.num_runs))
            # All the files have been processed num_runs times; exit the while loop
            break

        if loop_duration < 1 and not is_unit_test:
            sleep_length = 1 - loop_duration
            self.log.debug(
                "Sleeping for {0:.2f} seconds to prevent excessive logging"
                .format(sleep_length))
            # If the iteration took less than one second, sleep for (1 - loop_duration)
            # seconds, so each while iteration takes at least one second
            sleep(sleep_length)

    # Stop any processors
    # The loop is done; send a termination signal to the DagFileProcessorManager
    # so it stops all of its DagFileProcessor processes.
    self.processor_agent.terminate()

    # Verify that all files were processed, and if so, deactivate DAGs that
    # haven't been touched by the scheduler as they likely have been
    # deleted.
    if self.processor_agent.all_files_processed:
        self.log.info(
            "Deactivating DAGs that haven't been touched since %s",
            execute_start_time.isoformat()
        )
        # Set is_active to False for the dag records that were not touched
        models.DAG.deactivate_stale_dags(execute_start_time)

    # Call the executor's end method to shut it down
    self.executor.end()

    settings.Session.remove()
2.3.3.1 self.executor.start(): start the task executor
The executor is set in BaseJob's constructor:
self.executor = executor or executors.get_default_executor()
The executor is obtained from the configuration file:
def get_default_executor():
    """Creates a new instance of the configured executor if none exists and returns it"""
    global DEFAULT_EXECUTOR
    if DEFAULT_EXECUTOR is not None:
        return DEFAULT_EXECUTOR
    # Read the executor to use from airflow.cfg
    executor_name = conf.get('core', 'EXECUTOR')
    DEFAULT_EXECUTOR = _get_executor(executor_name)
    log.info("Using executor %s", executor_name)
    return DEFAULT_EXECUTOR
The airflow.cfg entry; production deployments usually set it to CeleryExecutor:
# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor, KubernetesExecutor
executor = CeleryExecutor
CeleryExecutor's start method only logs how many processes the executor uses to sync task metadata:
def start(self):
    self.log.debug(
        'Starting Celery Executor using %s processes for syncing',
        self._sync_parallelism
    )
2.3.3.2 self.reset_state_for_orphaned_tasks(): reset tis in the given states after the scheduler starts
When the scheduler starts, the SCHEDULED and QUEUED tis belonging to RUNNING DagRuns whose run_id does not have the backfill prefix are reset to the None state, so that the scheduler can pick them up and schedule them normally afterwards; a minimal sketch follows.
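A minimal sketch of the idea, not the actual implementation (the real method lives in BaseJob, filters on the backfill run_id prefix and is more careful about batching and commit retries); the session handling here is simplified:

from airflow import settings
from airflow.models import DagRun
from airflow.utils.state import State

session = settings.Session()
resettable_states = [State.SCHEDULED, State.QUEUED]

# Non-backfill DagRuns that are currently RUNNING ('backfill_' prefix is an assumption here).
running_runs = (session.query(DagRun)
                .filter(DagRun.state == State.RUNNING)
                .filter(~DagRun.run_id.like('backfill_%'))
                .all())
for dag_run in running_runs:
    # Reset their SCHEDULED/QUEUED tis back to None so they can be scheduled again.
    for ti in dag_run.get_task_instances(state=resettable_states, session=session):
        ti.state = State.NONE
        session.merge(ti)
session.commit()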
2.3.3.3 self.processor_agent.start(): start the DagFileProcessorManager and begin the dag file parsing loop
# Start after resetting orphaned tasks to avoid stressing out DB.
# Call processor_agent.start() to launch the DagFileProcessorManager and begin the dag file parsing loop
self.processor_agent.start()
The DagFileProcessorAgent.start() method:
def start(self):
    """
    Launch DagFileProcessorManager processor and start DAG parsing loop in manager.
    """
    if six.PY2:
        context = multiprocessing
    else:
        mp_start_method = self._get_multiprocessing_start_method()
        context = multiprocessing.get_context(mp_start_method)

    # Bidirectional communication pipe between the Scheduler and DagFileProcessorManager processes
    self._parent_signal_conn, child_signal_conn = context.Pipe()
    # Create a process; target is what the process runs, args are passed to the target
    self._process = context.Process(
        target=type(self)._run_processor_manager,
        args=(
            self._dag_directory,
            self._file_paths,
            self._max_runs,
            # getattr prevents error while pickling an instance method.
            # The processor factory, mainly used to create DagFileProcessor processes
            getattr(self, "_processor_factory"),
            self._processor_timeout,
            # The pipe the Scheduler process (DagFileProcessorAgent) uses to talk to its
            # child process, the DagFileProcessorManager
            child_signal_conn,
            self._dag_ids,
            self._pickle_dags,
            self._async_mode,
        )
    )
    # Calling process.start() makes the child process run the target
    self._process.start()
    self.log.info("Launched DagFileProcessorManager with pid: %s", self._process.pid)
This is how the DagFileProcessorAgent and the DagFileProcessorManager interact with each other.
2.3.3.4 The core part: the Scheduler's while loop
The while loop in _execute_helper is the core of the whole scheduling process; it takes the dag information produced by the parsers and schedules it as described below.
The main steps of the loop are:
- Harvest the DAG parsing results through the DagFileProcessorAgent;
- Find and queue the executable tasks:
  - change the tis' state in the DB;
  - queue the tasks in the executor;
- Heartbeat the executor:
  - execute the queued tasks asynchronously in the executor (the executor's trigger_tasks method);
  - sync the state of the running tasks (the sync method syncs task execution state).
2.3.3.4.1 self._get_simple_dags(): collect the dag file parsing results
# Collect the parsed dag file information; this calls self.processor_agent.harvest_simple_dags()
self.log.debug("Harvesting DAG parsing results")
simple_dags = self._get_simple_dags()
self.log.debug("Harvested {} SimpleDAGs".format(len(simple_dags)))
The self._get_simple_dags() method ultimately calls self.processor_agent.harvest_simple_dags():
- the DagFileProcessorAgent keeps the dag files' parsing statistics, and other code acts on the latest information stored in the agent;
- the harvested simple_dags (only dags that can actually be run are returned here, and their tis have been set to the SCHEDULED state in the db) are handed to the Scheduler for task scheduling in the following steps; a sketch of what they contain follows below.
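For reference, each harvested SimpleDag is a lightweight snapshot of a parsed DAG (the class lives in airflow.utils.dag_processing); the values in the comments below are made up:

# Continuing from the loop above, a look at what the harvested SimpleDag objects expose.
for simple_dag in simple_dags:
    print(simple_dag.dag_id)         # e.g. 'example_dag'
    print(simple_dag.full_filepath)  # e.g. '/data/bdetl/airflow/dags/example_dag.py'
    print(simple_dag.task_ids)       # e.g. ['example_task', 'another_task']
    print(simple_dag.concurrency)    # the dag-level concurrency limit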
2.3.3.4.2 SimpleDagBag(simple_dags): wrap all the collected simple_dags into a SimpleDagBag
# Send tasks for execution if available
simple_dag_bag = SimpleDagBag(simple_dags)
class SimpleDagBag(BaseDagBag):
    """
    A collection of SimpleDag objects with some convenience methods.
    """

    def __init__(self, simple_dags):
        """
        Constructor.
        :param simple_dags: SimpleDag objects that should be in this
        :type list(airflow.utils.dag_processing.SimpleDagBag)
        """
        self.simple_dags = simple_dags
        self.dag_id_to_simple_dag = {}
        for simple_dag in simple_dags:
            self.dag_id_to_simple_dag[simple_dag.dag_id] = simple_dag

    @property
    def dag_ids(self):
        """
        :return: IDs of all the DAGs in this
        :rtype: list[unicode]
        """
        return self.dag_id_to_simple_dag.keys()

    def get_dag(self, dag_id):
        """
        :param dag_id: DAG ID
        :type dag_id: unicode
        :return: if the given DAG ID exists in the bag, return the BaseDag
            corresponding to that ID. Otherwise, throw an Exception
        :rtype: airflow.utils.dag_processing.SimpleDag
        """
        if dag_id not in self.dag_id_to_simple_dag:
            raise AirflowException("Unknown DAG ID {}".format(dag_id))
        return self.dag_id_to_simple_dag[dag_id]
2.3.3.4.3 self._validate_and_run_task_instances(): validate and run tis
SchedulerJob._validate_and_run_task_instances()
def _validate_and_run_task_instances(self, simple_dag_bag):
    if len(simple_dag_bag.simple_dags) > 0:
        try:
            # 1. Process the parsing results, put the tis into the executor's queued_tasks
            #    and change their state: SCHEDULED -> QUEUED
            self._process_and_execute_tasks(simple_dag_bag)
        except Exception as e:
            self.log.error("Error queuing tasks")
            self.log.exception(e)
            return False

    # Call heartbeats
    self.log.debug("Heartbeating the executor")
    # 2. Call the executor's heartbeat:
    #    1) executor.trigger_tasks(open_slots) runs the tis in queued_tasks asynchronously;
    #    2) executor.sync() syncs the tis' metadata
    self.executor.heartbeat()

    # 3. For tis in queued_tasks that were not run, clear queued_tasks and set them back to SCHEDULED
    self._change_state_for_tasks_failed_to_execute()

    # Process events from the executor
    # 4. React to the results of the tis that the executor ran asynchronously
    self._process_executor_events(simple_dag_bag)
    return True
- self._process_and_execute_tasks(simple_dag_bag)
  - Handles the task_instances whose dag_run is not in the running state: if a ti is in up_for_retry but its dag_run is not running, the ti is set to failed and is no longer scheduled;
  - If a ti is in scheduled/queued/up_for_reschedule but its dag_run is not running, its state is set to None and it is no longer scheduled;
  - Prepares the tis that satisfy the conditions for execution:
    - finds the executable tis: SCHEDULED tis that satisfy the constraints (priority/concurrency/max_active_runs/pool limits);
    - the corresponding dag must not be paused, the dag_run must not be a backfill run, and the tis must be in the SCHEDULED state;
    - changes these executable tis' state to QUEUED in the db;
    - for each of these tis, builds an airflow run XXX command in the executor and enqueues it (puts it into the executor's queued_tasks dict). The entries of queued_tasks look like this (a sketch follows this list):
      key: (self.dag_id, self.task_id, self.execution_date, self.try_number)
      value: (command, priority, queue, simple_task_instance)
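As a rough sketch (the field values are made up; the last element of the value tuple is really a SimpleTaskInstance snapshot of the ti), one entry of executor.queued_tasks looks something like this:

from datetime import datetime

# Hypothetical illustration of one entry in executor.queued_tasks.
key = ('example_dag', 'example_task', datetime(2020, 7, 1), 1)
command = ['airflow', 'run', 'example_dag', 'example_task',
           '2020-07-01T00:00:00+00:00', '--local',
           '-sd', '/data/bdetl/airflow/dags/example_dag.py']
queued_tasks = {
    key: (command, 1, None, 'SimpleTaskInstance(...)'),
    #              ^priority ^queue ^SimpleTaskInstance snapshot of the ti
}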
- self.executor.heartbeat()
  - Based on the number of open slots, sends the airflow run xxx commands through celery to the remote workers for execution;
  - Fetches the execution state of the tasks run by the remote workers and stores it in the executor on the Scheduler node; the results are kept in the event_buffer dict (a sketch follows this list):
    key: (self.dag_id, self.task_id, self.execution_date, self.try_number)
    value: State.FAILED or State.SUCCESS
  The task state information in the event_buffer dict is consumed later by self._process_executor_events.
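A rough illustration of event_buffer (the key/value shapes follow the description above; the concrete values are made up):

from datetime import datetime
from airflow.utils.state import State

# Hypothetical contents of executor.event_buffer after one heartbeat.
event_buffer = {
    ('example_dag', 'example_task', datetime(2020, 7, 1), 1): State.SUCCESS,
    ('example_dag', 'another_task', datetime(2020, 7, 1), 2): State.FAILED,
}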
- self._change_state_for_tasks_failed_to_execute()
  - For the tis left in the executor's queued_tasks that were never executed, find the corresponding QUEUED tis in the db, set their state back to SCHEDULED, and clear the queued_tasks dict;
- self._process_executor_events(simple_dag_bag)
  - If the executor reports FAILED or SUCCESS for a task it ran, but the ti is still in the QUEUED state in the db, the task was probably killed externally, and the ti's state in the db is changed to FAILED.
  When using an external_task_sensor whose reschedule interval is very short, this can cause exactly that problem.
  The reason is that the first run of the external_task_sensor does not meet its condition and is set to up_for_reschedule; the Scheduler then parses the dag file again and enqueues the ti so quickly that its state becomes QUEUED again. When self._process_executor_events runs just afterwards, it sees SUCCESS != QUEUED and the ti's state in the db ends up as FAILED. A hypothetical sensor configured in the way described is sketched below.
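For illustration only (DAG names, dates and intervals are made up), this is the kind of reschedule-mode sensor setup under which the QUEUED-vs-SUCCESS race described above can appear:

from datetime import datetime
from airflow import DAG
from airflow.sensors.external_task_sensor import ExternalTaskSensor

# Hypothetical DAG with a reschedule-mode sensor and a very short poke interval.
dag = DAG('example_wait_dag', start_date=datetime(2020, 7, 1), schedule_interval='@daily')

wait_for_upstream = ExternalTaskSensor(
    task_id='wait_for_upstream',
    external_dag_id='upstream_dag',
    external_task_id='upstream_task',
    mode='reschedule',   # release the worker slot between pokes
    poke_interval=5,     # very short reschedule interval (seconds)
    dag=dag,
)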
2.3.3.4.4 Run the SchedulerJob's heartbeat periodically
# Heartbeat the scheduler periodically
# Time since the last scheduler heartbeat
time_since_last_heartbeat = (timezone.utcnow() -
                             last_self_heartbeat_time).total_seconds()
# self.heartrate is the scheduler heartbeat interval set by scheduler_heartbeat_sec in the config file
if time_since_last_heartbeat > self.heartrate:
    self.log.debug("Heartbeating the scheduler")
    # heartbeat():
    # 1. kills the current job if its state has been changed to SHUTDOWN;
    # 2. sleeps if the job heartbeat interval (job_heartbeat_sec) has not elapsed yet;
    # 3. updates the heartbeat time in the job table with the current time.
    self.heartbeat()
    # Record when this heartbeat ran
    last_self_heartbeat_time = timezone.utcnow()
The scheduler_heartbeat_sec option in airflow.cfg:
# The scheduler constantly tries to trigger new tasks (look at the
# scheduler section in the docs for more information). This defines
# how often the scheduler should run (in seconds).
scheduler_heartbeat_sec = 5
The job_heartbeat_sec option in airflow.cfg:
# Task instances listen for external kill signal (when you clear tasks
# from the CLI or the UI), this defines the frequency at which they should
# listen (in seconds).
job_heartbeat_sec = 5
2.3.3.4.5 self._processor_poll_interval: the polling interval
if not is_unit_test:
    self.log.debug("Sleeping for %.2f seconds", self._processor_poll_interval)
    # Sleep for _processor_poll_interval seconds if it is set
    time.sleep(self._processor_poll_interval)
The processor_poll_interval option in airflow.cfg:
# The number of seconds to wait between consecutive DAG file processing
processor_poll_interval = 1
2.3.3.4.6 self.num_runs: the while loop ends once every dag file has been processed the given number of times
if self.processor_agent.done:
    self.log.info("Exiting scheduler loop as all files"
                  " have been processed {} times".format(self.num_runs))
    # All the files have been processed num_runs times; exit the while loop
    break
The num_runs option in airflow.cfg:
# The number of times to try to schedule each DAG file
# -1 indicates unlimited number
num_runs = -1
2.3.3.4.7 Enforcing a minimum duration for each while iteration
if loop_duration < 1 and not is_unit_test:
    sleep_length = 1 - loop_duration
    self.log.debug(
        "Sleeping for {0:.2f} seconds to prevent excessive logging"
        .format(sleep_length))
    # If the iteration took less than one second, sleep for (1 - loop_duration) seconds,
    # so each while iteration takes at least one second
    sleep(sleep_length)
2.3.3.5 self.processor_agent.terminate(): send a termination signal to the DagFileProcessorManager
# Stop any processors
# Send a termination signal to the DagFileProcessorManager so it stops all of its DagFileProcessor processes.
self.processor_agent.terminate()
The DagFileProcessorAgent.terminate() method:
def terminate(self):
    """
    Send termination signal to DAG parsing processor manager
    and expect it to terminate all DAG file processors.
    """
    if self._process and self._process.is_alive():
        self.log.info("Sending termination message to manager.")
        try:
            # Send the TERMINATE_MANAGER signal to the DagFileProcessorManager
            # through the Scheduler process's communication pipe
            self._parent_signal_conn.send(DagParsingSignal.TERMINATE_MANAGER)
        except ConnectionError:
            pass
2.3.3.6 models.DAG.deactivate_stale_dags(execute_start_time): deactivate stale DAGs
# Verify that all files were processed, and if so, deactivate DAGs that
# haven't been touched by the scheduler as they likely have been
# deleted.
if self.processor_agent.all_files_processed:
    self.log.info(
        "Deactivating DAGs that haven't been touched since %s",
        execute_start_time.isoformat()
    )
    # Set is_active to False for the dag records that were not touched during this run
    models.DAG.deactivate_stale_dags(execute_start_time)
2.3.3.7 self.executor.end(): shut down the executor
# Call the executor's end method to shut it down
self.executor.end()
The CeleryExecutor.end() method:
def end(self, synchronous=False):
    if synchronous:
        while any([
                task.state not in celery_states.READY_STATES
                for task in self.tasks.values()]):
            time.sleep(5)
    self.sync()
2.3.4 self.processor_agent.end(): stop the DagFileProcessorManager
Finally, the Scheduler process stops its DagFileProcessorManager child process:
def end(self):
    """
    Terminate (and then kill) the manager process launched.
    :return:
    """
    if not self._process:
        self.log.warning('Ending without manager process.')
        return
    reap_process_group(self._process.pid, log=self.log)
    self._parent_signal_conn.close()