我們安裝了hadoop集群后就可以使用了. 但是生產(chǎn)環(huán)境的hadoop任務(wù)非常復(fù)雜, 必須有維護(hù)任務(wù)調(diào)度和依賴的工具. CDH官方的工具是oozie, 我們的選型是airflow.
airflow相對于其他的hadoop任務(wù)調(diào)度工具(ooziw, azkaban)具有社區(qū)活躍, 可視化強(qiáng)大等優(yōu)點. 我們使用airflow主要解決如下問題:
1. 定時驅(qū)動任務(wù); hadoop作業(yè)的典型場景是T+1, 即每天的凌晨跑昨天的數(shù)據(jù). 這樣就需要有類似crontab的定時驅(qū)動功能. 驅(qū)動的最小粒度是分鐘.
2. dag編程模型. 所謂的dag編程模型, 就是你可以將一個項目分成若干任務(wù), 每個任務(wù)都可以依賴其他任務(wù)的正確執(zhí)行. 任務(wù)之間根據(jù)依賴關(guān)系形成一個有向無環(huán)圖(dag).
3. dag之間的可以互相依賴. 這個功能是目前azkaban不具備的, airflow是通過senor完成的. 由于hadoop任務(wù)一定是依賴某些特定的數(shù)據(jù)ready后執(zhí)行的. 比如你計算DAU, 必須要等待訂單數(shù)據(jù)倉庫的ready. 但是一般的調(diào)度是不知道其他的dag執(zhí)行情況的. 另外如果把所有的互相依賴任務(wù)都放在同一個dag里面, dag顯得就特別臃腫了.
4. 可以設(shè)置全局變量.
5. 至少可以調(diào)度 sqoop, hive, spark任務(wù).
當(dāng)然, 需要提醒一句, airflow是一個離線調(diào)度工具, 它不是運行storm等流失數(shù)據(jù)的合適工具.
安裝airflow 1.10.0
我們選擇安裝1.10.0 版本, 這個版本在pip中還沒有更新, 只能通過源碼安裝. 1.10.0相對于1.9多了時區(qū)設(shè)置的功能. 這個功能對于相對于UTC差了8個時區(qū)的中國來說非常重要, 不然不得不處理調(diào)度時間和服務(wù)器時間不一致的問題.
yum install gcc-c++ -y
yum install python-devel -y
yum -y install mysql-community-devel
yum install -y krb5-devel
yum install cyrus-sasl-lib.x86_64 -y
yum install cyrus-sasl-devel.x86_64 -y
yum install libgsasl-devel.x86_64 -y
pip uninstall configparser
pip install configparser -U
cd /tmp;
git clone https://github.com/apache/incubator-airflow.git
cd incubator-airflow/
git checkout v1-10-stable
export SLUGIFY_USES_TEXT_UNIDECODE=yes
pip install celery
pip install .
pip install .[all]
安裝 rabbitmq
yum install erlang -y
rpm --import https://dl.bintray.com/rabbitmq/Keys/rabbitmq-release-signing-key.asc
yum install rabbitmq-server
systemctl start rabbitmq-server.service
systemctl enable rabbitmq-server.service
rabbitmqctl add_user airflow 123456
rabbitmqctl set_user_tags airflow administrator
rabbitmqctl set_permissions -p / airflow ".*" ".*" ".*"
配置數(shù)據(jù)庫
使用root登陸mysql并創(chuàng)建用戶和database:
grant all privileges on airflow.* to airflow@'%' identified by '123456';
flush privileges;
create database airflow DEFAULT CHARACTER SET utf8;
還要設(shè)置一下mysql的配置文件:
找到 mysql 的my.ini 的 [mysqld], 加入
explicit_defaults_for_timestamp
重啟mysqld
創(chuàng)建環(huán)境變量
mkdir -p /home/airflow
echo "export AIRFLOW_HOME=/home/airflow" >> /etc/profile
source /etc/profile
配置
airflow initdb
ls /home/airflow/airflow.cfg # 確保此文件的存在
修改 /home/airflow/airflow.cfg:
executor = CeleryExecutor
sql_alchemy_conn=mysql://123456@yun00:3306/airflow
broker_url = amqp://123456@yun00:5672/
result_backend = db+mysql://123456@yun00:3306/airflow
catchup_by_default = False
max_active_runs_per_dag = 1
這里我們設(shè)置max_active_runs_per_dag為一, 保證同一個dag只能依次運行.
初始化db
airflow initdb
注意第一次運行是為了生成airflow.cfg文件, 這次才是配置mysql.
安裝systemd啟動文件
cd /tmp
wget https://raw.githubusercontent.com/apache/incubator-airflow/master/scripts/systemd/airflow.conf
mv airflow.conf /etc/tmpfiles.d/
wget https://raw.githubusercontent.com/apache/incubator-airflow/master/scripts/systemd/airflow-webserver.service
mv airflow-webserver.service /usr/lib/systemd/system/
修改airflow-webserver.service:
ExecStart=/bin/airflow webserver -p3425 --pid /run/airflow/webserver.pid
監(jiān)聽3425端口
修改/etc/sysconfig/airflow:
AIRFLOW_HOME=/home/airflow
mkdir -p /run/airflow; chown airflow:airflow /run/airflow/
systemctl restart airflow-webserver.service
systemctl status airflow-webserver.service # 查看服務(wù)狀態(tài)
wget https://raw.githubusercontent.com/apache/incubator-airflow/master/scripts/systemd/airflow-scheduler.service
mv airflow-scheduler.service /usr/lib/systemd/system/
systemctl restart airflow-scheduler.service
systemctl status airflow-scheduler.service # 查看服務(wù)狀態(tài)
wget https://raw.githubusercontent.com/apache/incubator-airflow/master/scripts/systemd/airflow-worker.service
mv airflow-worker.service /usr/lib/systemd/system/
systemctl restart airflow-worker.service
systemctl status airflow-worker.service # 查看服務(wù)狀態(tài)
測試
啟動: example_branch_dop_operator_v3 這是一個分鐘級別的任務(wù), 如果所有配置正確的話, 可以看到這個任務(wù)每分鐘執(zhí)行一次.
airflow的時區(qū)設(shè)置
airflow默認(rèn)的調(diào)度時間為UTC. 這個非常麻煩, 因為中國的服務(wù)器時間是北京時間, 但是ariflow的調(diào)度時間是UTC, 相差了8小時.
airflow直到1.10.0 才解決這個問題. 有人會疑惑為什么這個基礎(chǔ)的功能這個就才解決. airflow的開發(fā)者由于是跨國公司, 他們的服務(wù)器時間正是UTC. 所以他們不存在這個問題, 支持本地時間也是由于社區(qū)的要求才開發(fā)的功能.
在 airflow.cfg中配置:
default_timezone = Asia/Shanghai
airflow的分布式方案
airflow的webserver和scheduler是單點的, 但是可以對worker進(jìn)行分布式部署. 只需要將相同的配置和環(huán)境變量拷貝過去, 同時啟動worker進(jìn)程. scheduler能及時發(fā)現(xiàn)新的worker.
有一點需要注意的是, worker的運行依賴于dags目錄. 因此dags文件必須在每個airflow主機(jī)上存在. 本人推薦使用掛載公共目錄的方式進(jìn)行共享.
airflow的開發(fā)機(jī)配置
為了安全, 開發(fā)人員使用的機(jī)器不能是airflow運行的機(jī)器, 需要單獨配置機(jī)器. 開發(fā)機(jī)的配置和worker機(jī)器一樣的流程, 只是不需要啟動任何服務(wù)而已.