网站自适应框架,用织梦建设网站的步骤,下模板做网站,产品发布网站模板Apache Airflow已经成为Python生态系统中管道编排的事实上的库。与类似的解决方案相反#xff0c;由于它的简单性和可扩展性#xff0c;它已经获得了普及。在本文中#xff0c;我将尝试概述它的主要概念#xff0c;并让您清楚地了解何时以及如何使用它。
Airflow应用场景 …Apache Airflow已经成为Python生态系统中管道编排的事实上的库。与类似的解决方案相反由于它的简单性和可扩展性它已经获得了普及。在本文中我将尝试概述它的主要概念并让您清楚地了解何时以及如何使用它。
Airflow应用场景
想象一下你想要构建一个机器学习管道它由以下几个步骤组成
从基于云的存储中读取图像数据集处理图像使用下载的图像训练深度学习模型将训练好的模型上传到云端部署模型
你将如何安排和自动化这个工作流程Cron作业是一个简单的解决方案但它也带来了许多问题。最重要的是它们不允许你有效地扩展。Airflow提供了轻松调度和扩展复杂数据流程编排的能力另一方面它还能够在故障后自动重新运行它们管理它们的依赖关系并使用日志和仪表板监视它们。
在构建上述数据流之前让我们先了解Apache Airflow 的基本概念。
Airflow 简介
Apache Airflow 是一个开源的平台用于编排、调度和监控工作流工作流是由一系列任务Tasks组成的这些任务可以是数据处理、数据分析、机器学习模型训练、文件传输等各种操作。因此它是ETL和MLOps用例的理想解决方案。示例用例包括
从多个数据源提取数据对其进行聚合、转换并将其存储在数据仓库中。从数据中提取见解并将其显示在分析仪表板中训练、验证和部署机器学习模型
核心组件
在默认版本中安装Apache Airflow 时你将看到四个不同的组件。
Webserver: Webserver是Airflow的用户界面UI它允许您在不需要CLI或API的情况下与之交互。从那里可以执行和监视管道创建与外部系统的连接检查它们的数据集等等。执行器执行器是管道运行的机制。有许多不同类型的管道在本地运行在单个机器中运行或者以分布式方式运行。一些例子是LocalExecutor SequentialExecutor CeleryExecutor和KubernetesExecutor调度器调度器负责在正确的时间执行不同的任务重新运行管道回填数据确保任务完成等。PostgreSQL存储所有管道元数据的数据库。这通常是Postgres但也支持其他SQL数据库。
安装Airflow最简单的方法是使用docker compose。你可以从这里下载官方的docker撰写文件
$ curl -LfO https://airflow.apache.org/docs/apache-airflow/2.5.1/docker-compose.yaml基本概念
要学习Apache Airflow必须熟悉它的主要概念这些概念可能有点难理解让我们试着揭开它们的神秘面纱。
DAGs
所有管道都定义为有向无环图dag。每次执行DAG时都会创建一个单独的运行。每个DAG运行都是独立的并且包含一个关于DAG执行阶段的状态。这意味着相同的dag可以并行执行多次。
要实例化DAG可以使用DAG函数或与上下文管理器一起使用如下所示
from airflow import DAG
with DAG(mlops,default_args{retries: 1,},scheduletimedelta(days1),start_datedatetime(2023, 1, 1)
) as dag:# dag code goes here上下文管理器接受一些关于DAG的全局变量和一些默认参数。默认参数被传递到所有任务中并且可以在每个任务的基础上重写。完整的参数列表可以在官方文档中找到。
在本例中我们定义DAG将从2023年1月1日开始并且每天执行一次。retries参数确保在可能出现故障后重新运行一次。
task任务
DAG的每个节点表示一个Task即一段单独的代码。每个任务可能有一些上游和下游依赖项。这些依赖关系表示任务如何相互关联以及它们应该以何种顺序执行。每当初始化一个新的DAG运行时所有任务都初始化为Task实例。这意味着每个Task实例都是给定任务的特定运行。 operator(任务模板)
操作符可以被视为预定义任务的模板因为它们封装了样板代码并抽象了它们的大部分逻辑。常见的操作符有BashOperator、PythonOperator、MySqlOperator、S3FileTransformOperator。我们看到操作符可以定义遵循特定模式的任务。例如MySqlOperator创建任务来执行SQL查询而BashOperator执行bash脚本。
操作符在DAG上下文管理器中定义如下所示。下面的代码创建了两个任务一个执行bash命令另一个执行MySQL查询。
with DAG(tutorial
) as dag:task1 BashOperator(task_idprint_date,bash_commanddate,)task2 MySqlOperator(task_idload_table,sql/scripts/load_table.sql)任务依赖
为了形成DAG的结构我们需要定义每个任务之间的依赖关系。一种方法是使用符号如下所示
task1 task2 task3
# 一个任务有多个依赖
task1 [task2, task3]
# 也可以使用set_downstream, set_upstream
t1.set_downstream([t2, t3])xcom
xcom或相互通信负责任务之间的通信。xcom对象可以在任务之间推拉数据。更具体地说它们将数据推入元数据数据库其他任务可以从中提取数据。这就是为什么可以通过它们传递的数据量是有限的。但是如果需要传输大数据则可以使用合适的外部数据存储例如对象存储或NoSQL数据库。
看看下面的代码。这两个任务使用ti参数任务实例的缩写通过xcom进行通信。train_model任务将model_path推入元数据数据库元数据由deploy_model任务拉出。
dag DAG(mlops_dag,
)def train_model(ti):model_path train_and_save_model()ti.xcom_push(keymodel_path, valuemodel_path)def deploy_model(ti):model_path ti.xcom_pull(keymodel_path, task_idstrain_model)deploy_trained_model(model_path)train_model_task PythonOperator(task_idtrain_model,python_callabletrain_model,dagdag
)deploy_model_task PythonOperator(task_iddeploy_model,python_callabledeploy_model,dagdag
)train_model_task deploy_model_taskTaskflow
Taskflow API是一种使用Python装饰器task来定义任务的简单方法。如果所有任务的逻辑都可以用Python编写那么一个简单的注释就可以定义一个新任务。Taskflow自动管理其他任务之间的依赖关系和通信。
使用Taskflow API我们可以用dag装饰器初始化DAG。下面是使用Tashflow示例
dag(start_datedatetime(2023, 1, 1),schedule_intervaldaily
)
def mlops():taskdef load_data():. . .return dftaskdef preprocessing(data):. . .return datataskdef fit(data): return Nonedf load_data()data preprocessing(df)model fit(data)dag mlops()注意任务之间的依赖关系是通过每个函数参数隐含的。这里我们是简单的连接顺序但实际可以变得复杂得多。Taskflow API还解决了任务之间的通信问题因此使用xcom的需求有限。
调度
作业调度是Airflow的核心功能之一。这可以使用schedule_interval参数完成该参数接收cron表达式表示日期时间对象或预定义变量如hour daily等。更灵活的方法是使用最近添加的时间表它支持使用Python定义自定义时间表。
下面是如何使用schedule_interval参数的示例。以下DAG将每天执行。
dag(start_datedatetime(2023,1,1),schedule_interval daily,catchup False
)
def my_dag():pass关于调度需要了解两个非常重要的概念回填(backfill)和追赶(catchup)。
一旦我们定义了DAG我们就设置了开始日期和计划间隔。如果catchupTrue则Airflow 将为从开始日期到当前日期的所有计划间隔创建DAG运行。如果catchupFalse气流将只从当前日期调度运行。
回填扩展了这个想法使我们能够在CLI中创建过去的运行而不管catchup参数的值
$ airflow backfill -s START_DATE -e END_DATE DAG_NAME连接
Airflow 提供了一种简单的方法来配置与外部系统或服务的连接。可以使用UI、作为环境变量或通过配置文件创建连接。它们通常需要URL、身份验证信息和唯一id。钩子(Hooks )是一种API它抽象了与这些外部系统的通信。例如我们可以通过如下的UI定义一个PostgreSQL连接 然后使用PostgresHook来建立连接并执行我们的查询
pg_hook PostgresHook(postgres_conn_idpostgres_custom)conn pg_hook.get_conn()
cursor conn.cursor()
cursor.execute(create table _mytable (ModelID int, ModelName varchar(255))
cursor.close()
conn.close()高级概念
为了使本教程尽可能完整我需要提到一些更高级的概念。我不会详细介绍每一个但我强烈建议你看看他们如果你想深入掌握Airflow 。
分支分支允许你将任务划分为许多不同的任务如支持条件处理不同任务的工作流。最常见的方法是BranchPythonOperator。任务组任务组可以在单个组中组织多个任务。它是简化图形视图和重复模式的好工具。动态包包和任务也可以以动态的方式构造。从Airflow 2.3开始可以在运行时创建包和任务这对于并行和依赖输入的任务来说是理想的。气流也支持Jinja模板并且是对动态包非常有用的补充。单元测试和日志记录气流具有运行单元测试和记录信息的专用功能.
Airflow最佳实践
在我们看到实际操作的示例之前让我们讨论一下大多数从业者使用的一些最佳实践。
幂等性dag和任务应该是幂等的。使用相同的输入重新执行相同的DAG运行应该始终具有与执行一次相同的效果。原子性任务应该是原子性的。每个任务应该负责一个操作并且独立于其他任务增量过滤每个DAG运行应该只处理一批支持增量提取和加载的数据。这样可能出现的故障就不会影响整个数据集。顶级代码如果不是用于创建操作符或标记则应避免使用顶级代码因为它会影响性能和加载时间。所有代码都应该在任务内部包括导入包、数据库访问和繁重的计算。复杂性dag应尽可能保持简单因为高复杂性可能会影响性能或调度。