文件名称:
Airflow使用指南【About云整理】.pdf
开发工具:
文件大小: 2mb
下载次数: 0
上传时间: 2019-06-30
详细说明:About云整理的Airflow使用指南
1. 如何安装和配置 Airflow?
2. 如何通过 Airflow UI 监控 data pipeline (管道)并对其进行故障排除
3. 什么是 Airflow Platform ?
4. Airflow 是如何进行数据分析,追踪数据,调试数据流的?
5. Airflow 命令行接口的基本操作有哪些?www.aboutyun.com活到老学到老
以下是一些将触发一些任务实例的命令。在运行以下命令时,您应该能够在 example bash operator DAG中
看到任务的状态发生变化
#运行第一个任务实例
airflow run example bash operator runme 0 2015-01-01
#运行两天的任务回填
airflow backfill example bash operator -s 2015-01-0I -e 2015-01-02
安装
1.获取 Airflow:
安装最新稳定版 Airflow的最简单方法是使用pip:
pip install apache-airflow
您还可以安装 Airflow的一些别的支持功能组件,例如 sep apl或者 postgres:
pip install apache-airflow [postgres, gcp api]
2额外的护展包:
通过PyPI的 apache- airflow命令下载的基木包只含有启动的基础部分内容。您可以根据您环境的需要
下载您的护展包ε例如,如果您不需要连接 Postgres,那么您就不需要使用yum命令安装 postgres-deve1,
或者在您使用的系统上面安装 postgre应用,并在安装中的经历一些痛苫过程
除此之外, Airflow可以按照需求导入这些扩展包来使用
如下是列举出来的子包列表和他的功能
包名
安装命令
说明
pip install
al
所有Arow功能
apache-airflow[all]
pip install
all dbs
所有集成的数据库
apache-airflow [all dbs
pip install
Unicorn的异步 worker
async
apache-airflowlasync
classes
p⊥ p ins ta11
celery
apache-airflow[celery]
CeleryEXecutor
plp install
cloud
Cloudant hook
apache-airflowIcloudantI
p1p⊥ natal1
crypto
apache-airflow[cryptol
加密元数据db中的连接密码
www.aboutyun.com活到老学到老
plp Install
devel
最小开发工具要求
apache-airf⊥ ow leve⊥」
p⊥ p insta11
deve hado
apache- airflow[ develhadoAirflow+ Hadoop stack的依赖
op]
pip install
Druid. jo相关的 operators和
ru
apache-airflow[druid]
hooks
Goge云平台 hooks和
pip install
operators(使用
gcp apI
apache-airflow [gcp api
googIe-api-python-c
⊥ient)
pip install
github enter
apache-airf1ow[ github cntGithub企业版身份认证
erprIse
pip insta⊥1
google auth apache-airf1ow[goog1 e autGoogle身份认证
h
pip Install
hdfs
apache-airflow [hdfs]
HDFS hooks和 operators
install
hive
apache-airflow [hive]
所有Hive相关的 operators
pip insta⊥
jdbc
apache-airflow[jdbc]
JDBC hooks和 operators
Kerberos集成 Kerberized
kerberos
pip install
apache-airflow[kerberos I
Hadoop
install
kubernetes
apache-airflow [kubcrnetes Kubernetes Executor uK
operator
pip install
p
apache-airflow [ldap]
用户的LDAP身份验证
pip install
Microsoft SQL Server operators
mssql
apache-airflowlmssal
和hook,作为 Airflow后端支持
MySQL operators和hook,支
持作为Afow后端。 MySQL
pip install
mysal
q
服务器的版本必须是56.4+
apache-airflow [mysqlI
确切的版本上限取决于
mysq1c1ient包的版本。例
www.aboutyun.com活到老学到老
如, mysql1ient1.3.12
只能与 MySQL服务器5.64
到57一起使用。
pip install
password
用户密码验证
apache-airflow[password]
pip install
Postgres operators和hook,作
postgres
apache- airflow[ postgres]为Afw后端支持
pip install
启用QDs( Qubole数据服务)
apache-airflow [ads I
支持
p⊥ p insta11
bimq作为 Celery后端支
rabbitmq
apache-airflowlrabbitma]
持
pip install
redis
apache-airflow l
Redis hooks和 sensors
3
pip Install
S3Keysensor,
apache-airflow[s3
S3Prefixsensor
p- p Insta⊥1
samba
Hive2Sambaoperator
apache-airflow[sambal
pip Install
SlackAPIPostoperato
apache-airflow[slack]
pip insta⊥⊥
ssh
SSH hooks及 Operator
apache-airflow [ssh]
pip install
做为Arow后端的 Vertica
vertica
apache-airflowlvertical
hook支持
3初始化Airf1ow数据库
在您运行任务之前, Airflow需要初始化数据库。如果您只是在试验和学习 Airflow,您可以坚持使用默
认的 SQLite选项。如果您不想使用SaLiτe,请査看初始化数掘库后端以设置其他数据库。
配置完成后,若您想要运行任务,需要先初始化数据庠:
airflow initdb
操作指南
设置配置选项:
第一次运行 Airflow时,它会在$ AIRFLOW HOME目录中创建一个名为 airflow.cfg的文件(默认情况下为
/ airflow)。此文件包含 Airflow的配置,您可以对其进行编辑以更改任何设置。您还可以使用以下格式
设置带有环境变量的选项:$ AIRFL0W[ SECTIONE(KEY:(注意使用双下划线)。
www.aboutyun.com活到老学到老
例如,元数据库连接字符牛可以在 airflow.cfg设置,如下所示
Core」
sql alchemy conn- my conn string
或者通过创建相应的环境变量
AIRFLOW CORE SQL ALCHEMY coNN- my conn string
您还可以通过将cmd附加到键来在运行时派生连接字符串,如下所示
Icore
sql alchemy conn cmd= bash command to run
下列配置选项支持这种cmd配置办法
[core韶分巾的 sql alchemy conn
[core]部分中的 fernet kev
[ celery]部分中的 broker ur1
[ celery]部分中的 result backend
Atlas」部分中的 password
smtp]韶分中的 smtp password
[1dap]部分中的 bind passw
[ kubernetes]部分中的 git pas sword
这背后的想法是不将密码存储在纯文本文件的框屮。
所有配置选项的优先顺序如下
1.环境变量
2. airflow.cfg中的配置
3. airflow.cfg屮的命令
默认
2初始化数据库后端:
如果您想对Airf1low进行真正的试使用,您应该考虑设置一个真正的数据库后端并切换到 Localexccutor
由于 Airflow是使用优秀的 Sqlalchemy库与其元数据进行交互而构建的,因此您可以使用任何
alchemy所支持的数据库作为后端数据库。我们准荐使用 My SQL或 Postgres
注意:我们依赖更严格的 MYSQL S劬L设置来获得合理的默认值。确保在[ mysqld]下的m.cnf中指定了
explicit defaults for timestamp=1;
注意:如果您泱定使用 Postgres,我们建议您使用 psycopg2驱动程序并在 Sqlalchemy连接字符串中指
定它。另请注意,由 Sqlalchemy没有公开在 Postgres连接URI中定位特定模式的方法,因此您可能
需要使用类似于 ALTER ROLE username SEt search path= airflow, foobar;的命令为您的角色设置默认
模式
www.aboutyun.com活到老学到老
旦您设置好管理 Airflow的数据库以后,您需要更改配置文件$ AIRFLOW HOME/ airflow.cfg中的
Sqlalchemy连接字符串。然后,您还应该将“ executor”设置更改为使用“ ocalexecutor”,这是一个
可以在不地并行化任务实例的执行程序。
#初始化数据库
airflow initdb
3.使用0 perators(执行器):
operator(执行器)代表一个理想情况卜是幂等的任务。 operator(执行器)决定了DAG运行时实际执行
的内容
(1) BashOperator
使用 BashOperator在 Bash shell中执行命令。
run this- Bashoperator(
task_id=run_after_ loop'
bash command=echo 1
dag=dag
模板
您可以使用 Jinja模板来参数化 bash command参数。
also run this= BashOperator(
skid=’ also run this
bash command-'echo "run id-lt run id))I dag run-[ dag run JI"
dag dag,
故障排除
(a)找不到 ninja模板
在使用 bash command参数直接调用Bash脚本时,需要在脚本名称后添加空格。这是因为Ai「ow尝试
将 Jinja模板应用于一个失败的脚本
t2= Bashoperator(
task_id="bash example
#这将会出现` nja template not found的错误
bash command="/ home/batcher/test. sh
#在加了空格之后,这会正常作
bash command= home/ batcher/test. sh"
www.aboutyun.com活到老学到老
dag=dag)
(2) PythonOperator
使用 PythonOperator执行 Python回调。
def print context( ds, *kk kwargs
print( kwargs
print( ds
return Whatever you return gets printed in the logs
run this= PythonOperator
isk id= print the context
provide_context-True
py thon callable print_context
dag-dag
传递参数
使用 op args和 op kwargs参数将额外参数传递给 Python的回遁函数
def my sleeping function (random base
"这是一个将在DAG执行体中运行的函数
time. sleep (random base
Gcnerate 10 sleeping tasks, sleeping from 0 to 4 seconds respectivcly
in range(5)
ask- Py thonOperator(
task id='sleep for '+ str(i)
py thon callable-my sleeping function
T' random base': float(i)/101
模板
当您将 provide context参数设置为True,Airf1ow会传入一组额外的关键字参数:一个用于每个 Jinja模
板变量和一个 templates dict参数。
templates dict参数是模板化的,因此字典中的每个值都被评估为 Jinja模板。
www.aboutyun.com活到老学到老
(3) Google云平台0 perators(执行器)
GoogleCloudStorageToBigQueryOperator
使用 GooglecloudStorageToBigQuery Operator执行 BigQuery加载作业。
GceInstanceStartOperator
允许启动一个已存在的 Google compute engine实例。
在此示例中,参数值从 Airflow变量中提取。此外, default args字典用于将公共参数传递给单个DAG中
的所有 operator(执行器)
PROJECT ID- modcls. Variable got( PROJECT ID,1')
LOCATION- models. Variable get( LOCATION
INSTANCE- models. Variable get(INSTA\CE,)
SHORT MACHINE TYPE \AME- models. Variable get( SHORT MACHINE TYPE NAME',')
SET MACHINE TYPE BODY
machine Type':'zones/0/machineTypes/0. format(LOCATION, SHORT MACHINE TYPE NAME)
default args
start date: airflow utils. dates. days ago (1)
通过将所需的参数传递给构造函数来定义 Gce Ins tanceStartOperator。
gce instance start
GceInstanceStart Operator(
project id=PRO JECT ID
zone=LOCATION
resource id=INSTANCE
task id' gcp compute start task'
Gcelns tance StopOperator
允许停止一个已存在的 Google Compute engine实例
参数定义请参阋上面的 Gce InstanceStartoperator
通过将所需的参数传递给构造函数来定义 Gce Instance StopOperator
gce instance stop= Gce Instance Stop(perator
project id=PROJE
CT
www.aboutyun.com活到老学到老
zone=LOCATION
resource id=INSTANCH
task_id-'gcp compute stop_ task'
GceSetMachine TypeOperator
允许把一个已停止实例的机器类型改变至特定的类型。
参数定义请参阅上面的 GceInstanceStartOperator
通过将所需的参数传递给构造函数米定义 GceSetMachineTypeOperator
gce set machine type- Gcc SetMachineTypcOperator(
project id=PROJECT ID
zone- LOCATION
re source id=INSTANCE
body=SET MACHINE TYPE BODY,
task id= gcp compute set machine type
GcfFunctionDeleteOperator
使用 default args字典来传递参数给 operator(执行器)。
PROJECT ID- models. Variable get( PROJECT ID
LOCATION- models. Variable get( LOCATION',')
ENTRYPOINT- models. Variable get( ENTRYPOINT,'1)
A fully-qualified name of the function to delete
FLNCTION NAME = projects/0/locations/f!/functions/I. format(PROJECT ID, LOCATION
ENTRYPOINT)
ae'start date airflow utils, dates. days ago(1)
It
使用 GcfFunctionDeleteOperator来从 Google cloud Functions删除一个函数。
t1=GcfFunctionDeleteOperator(
task id="gcf delete task
name=FLNCTION NAME
(a)故障排除
(系统自动生成,下载前可以参看下载内容)
下载文件列表
相关说明
- 本站资源为会员上传分享交流与学习,如有侵犯您的权益,请联系我们删除.
- 本站是交换下载平台,提供交流渠道,下载内容来自于网络,除下载问题外,其它问题请自行百度。
- 本站已设置防盗链,请勿用迅雷、QQ旋风等多线程下载软件下载资源,下载后用WinRAR最新版进行解压.
- 如果您发现内容无法下载,请稍后再次尝试;或者到消费记录里找到下载记录反馈给我们.
- 下载后发现下载的内容跟说明不相乎,请到消费记录里找到下载记录反馈给我们,经确认后退回积分.
- 如下载前有疑问,可以通过点击"提供者"的名字,查看对方的联系方式,联系对方咨询.