Airflow 是一个编排、调度和监控workflow的平台,由Airbnb开源,现在在Apache Software Foundation 孵化。
Airflow 将workflow编排为由tasks组成的DAGs(有向无环图),调度器在一组workers上按照指定的依赖关系执行tasks。同时,Airflow 提供了丰富的命令行工具和简单易用的用户界面以便用户查看和操作,并且Airflow提供了监控和报警系统。Airflow的调度依赖于crontab命令,与crontab相比Airflow可以直观的看到任务执行情况、任务之间的逻辑依赖关系、可以设定任务出错时邮件提醒、可以查看任务执行日志。
本次主要聊一下Airflow的搭建与简单使用,这应该是目前网上最全的配置了。这次搭建是在Centos7系统,Airflow版本是2.2.4。
安装
1 | airflow需要home目录,默认是~/airflow, |
更改DB
Airflow默认使用的是sqlite,正式环境需要改为MySQL。
安装MySQL
Centos下先安装MySQL,强烈建议直接安装8.0,否则各种异常情况。
1 | # 安装 |
创建Airflow使用的DB
1 | CREATE DATABASE airflow CHARACTER SET utf8; |
更改airflow.cfg
1 | sql_alchemy_conn = mysql://{USERNAME}:{PASSWORD}@{MYSQL_HOST}:3306/airflow |
启动服务
1 | 初始化数据库,如果sqlite版本过低,需要升级。执行完后出现cfg文件 |
创建dags
查看airflow.cfg中的配置dags_folder,在该目录下添加python文件。
start_date表示从2022-3-21开始,使用的时区为UTC。schedule_interval表示每隔三分钟需要执行一次,Airflow会根据开始时间和间隔时间自动计算出下一次的执行时间。
1 | import datetime |
代码中的A>>B,表示A先执行,后执行B。
BashOperator可以执行Bash命令,除了BashOperator外,还有其它Operator可以使用,具体可以查看资料。
过一会在Web页面上便能看到这个Dag:
关闭与重启
关闭airflow方案:
ps -ef | grep -Ei ‘airflow’ | grep -v ‘grep’ | grep -v ‘%s’ | awk ‘{print $2}’ | xargs -i kill -9 {}
删除~/airflow下的airflow-webserver和airflow-scheduler
airflow webserver -p 8080 -D
airflow scheduler -D
特殊情况可重新设置db:airflow db reset
总结
Airflow国内用的比较少,相关资料不全。如果想在实际业务中使用,对于Dag的基本组成部分、Operator等也要有所了解。
资料
sqlit版本太低
wget –no-check-certificat https://www.sqlite.org/2022/sqlite-autoconf-3380000.tar.gz