<返回更多

Apache Airflow的完整介绍

2020-06-04    
加入收藏

关键概念,安装和DAG实际示例

Apache Airflow的完整介绍

> Photo by Şahin Yeşilyaprak on Unsplash

 

Airflow是用于自动化和安排任务和工作流程的工具。 如果您想以数据科学家,数据分析师或数据工程师的身份高效工作,那么拥有一个可以自动执行要定期重复的流程的工具至关重要。 从提取,转换和加载常规分析报告的数据到自动重新训练机器学习模型,这可以是任何事情。

Airflow使您可以轻松地自动化主要由Python和SQL编写的简单到复杂的流程,并具有丰富的Web UI来可视化,监视和修复可能出现的任何问题。

下面的文章是对该工具的完整介绍。 我已经包含了从虚拟环境中安装到以简单易懂的步骤运行第一个dag的所有内容。

我将本教程分为6个部分,以使其易于理解,以便您可以跳过已经熟悉的部分。 包括以下步骤:

· 基本气流概念。

· 如何在虚拟环境中设置Airflow安装。

· 运行Airflow Web UI和调度程序。

· 常见的CLI命令列表。

· Web UI导览。

· 创建一个真实的示例DAG。

1.基本概念

在讨论Airflow的安装和使用之前,我将简要介绍该工具至关重要的几个概念。

DAG

该工具的核心是DAG(有向无环图)的概念。 DAG是您要在工作流中运行的一系列任务。 这可能包括通过SQL查询提取数据,使用Python执行一些计算,然后将转换后的数据加载到新表中。 在Airflow中,每个步骤都将作为DAG中的单独任务编写。

通过Airflow,您还可以指定任务之间的关系,任何依赖关系(例如,在运行任务之前已将数据加载到表中)以及应按顺序运行任务。

DAG用Python编写,并另存为.py文件。 该工具广泛使用DAG_ID来协调DAG的运行。

DAG运行

我们具体说明了DAG应何时通过execute_date自动运行。 DAG按照指定的时间表运行(由CRON表达式定义),该时间表可以是每天,每周,每分钟或几乎任何其他时间间隔

Operator

Operator将要在每个任务中执行的操作封装在DAG中。 Airflow有大量的内置操作员,它们可以执行特定任务,其中一些特定于平台。 此外,可以创建自己的自定义运算符。

2.安装

我将为您提供在隔离的Pipenv环境中进行Airflow设置的个人设置。 如果您使用其他虚拟环境工具,则步骤可能会有所不同。 此设置的大部分灵感来自于出色的Stackoverflow线程。

对您的Airflow项目使用版本控制是一个好主意,因此第一步是在Github上创建一个存储库。 我叫我airflow_sandbox。 使用git clone" git web url"创建到本地环境的存储库克隆后。

从终端导航到目录,例如 cd / path / to / my_airflow_directory。

进入正确的目录后,我们将安装pipenv环境以及特定版本的Python,Airflow本身和Flask,这是运行Airflow所必需的依赖项。 为了使一切正常工作,最好为所有安装指定特定版本。

pipenv install --python=3.7 Flask==1.0.3 Apache-airflow==1.10.3

气流需要在本地系统上运行一个称为AIRFLOW_HOME的位置。 如果未指定,则默认为您的路线目录。 我更喜欢通过在.env文件中指定它来在我正在工作的项目目录的路径中设置Airflow。 为此,只需运行以下命令。

echo "AIRFLOW_HOME=${PWD}/airflow" >> .env

接下来,我们初始化pipenv环境。

pipenv shell

气流需要运行数据库后端。 默认设置为此使用SQLite数据库,这对于学习和实验来说是很好的选择。 如果您想建立自己的数据库后端,那么气流文档会提供很好的指导。 初始化数据库类型。

airflow initdb

最后,我们创建一个目录来存储我们的dag。

mkdir -p ${AIRFLOW_HOME}/dags/

就是说初始基本设置完成了。 您现在应该具有一个如下所示的项目结构。

Apache Airflow的完整介绍

 

3.运行Airflow

Airflow具有出色的Web UI,您可以在其中查看和监视您的问题。 要启动Web服务器以查看UI,只需运行以下CLI命令。 默认情况下,Airflow将使用端口8080,因为我已经使用该端口运行我指定8081的其他端口。

airflow webserver -p 8081

我们还需要启动调度程序。

airflow scheduler

现在,如果我们导航到http:// localhost:8081 / admin /?showPaused = True。 我们将看到以下屏幕。

Apache Airflow的完整介绍

 

Airflow有一些显示在用户界面中的示例dag。 一旦开始创建自己的窗口,您可以通过单击屏幕底部的"隐藏暂停的DAG"来隐藏它们。

4.基本的CLI命令

让我们使用这些示例dag来浏览一些常见的Airflow CLI命令。

让我们从教程dag中运行sleep任务。

airflow run tutorial sleep 2020-05-31

我们可以在教程DAG中列出任务。

bash-3.2$ airflow list_tasks tutorial

暂停此DAG。

airflow pause tutorial

取消暂停教程。

airflow unpause tutorial

回填(在过去的日期执行任务)。 指定dag_id(教程),开始日期(-s)和结束日期(-e)。

airflow backfill tutorial -s 2020-05-28 -e 2020-05-30

有关CLI命令的完整列表,请参见文档中的此页面。

5. Web UI

我们可以从Web UI监视,检查和运行任务。 如果返回到Web服务器,我们可以看到我们在教程DAG上运行的CLI命令的效果。 为了便于查看,我隐藏了暂停的dag。

Apache Airflow的完整介绍

 

我们可以通过多种方法来检查DAGS的运行情况。

如果我们选择树状视图。

Apache Airflow的完整介绍

 

我们可以轻松查看哪些任务已运行,正在运行或已失败。

Apache Airflow的完整介绍

 

我们还可以通过单击小方块从此处运行,清除或标记特定任务。

Apache Airflow的完整介绍

 

如果单击"渲染"按钮,我们可以查看已运行的代码或命令。

Apache Airflow的完整介绍

 

通过"代码"视图,我们可以查看组成DAG的代码。

Apache Airflow的完整介绍

 

Graph View是一种很好的方式可视化任务的排序或相关方式。

Apache Airflow的完整介绍

 

Web UI中的另一个重要区域是Admin。 在这里,您可以定义与其他平台(如数据库)的连接,并定义可重用的变量。

Apache Airflow的完整介绍

 

6.第一个DAG

我将在此处尝试提供一个接近实际的DAG示例,以说明至少一种使用Airflow的方法,并介绍随之而来的一些复杂性。

我将编写一个Airflow DAG,它首先检查BigQuery公共数据集中感兴趣日期的数据是否存在,然后按日程将数据加载到我自己的私有项目中的表中。

BigQuery有一个免费的使用层,该层允许您每月查询1TB数据,因此,如果您想自己尝试一下,则可以以零成本进行。

BigQuery设置

为了同时使用Airflow和BigQuery,我们需要首先完成一些附加的设置步骤。

为了能够通过Airflow查询和加载BigQuery中的数据,我们需要首先授予Airflow所需的安全权限。

为此,您需要在google Cloud Platform上创建一个服务帐户。 这有点像创建一个有权访问您的帐户的用户,但其目的是允许其他平台访问。

首先,从Google Cloud Console导航到服务帐户。 然后单击创建服务帐户按钮。

接下来填写显示的表格。

Apache Airflow的完整介绍

 

在下一页上,您需要选择要授予的访问级别。 我已选择所有资源的编辑器,因为这是我的个人帐户,并且此处没有存储任何敏感数据。 如果我更担心潜在的安全问题,那么我将授予更多的细化权限。

接下来,您需要创建一个私钥,可以通过选择创建密钥来完成。 选择JSON,因为这是Airflow所需要的。 私钥将被下载到您需要安全存储的本地系统。

现在,我们需要返回Airflow Web UI并使用此JSON文件的输出更新bigquery_default连接。 您还需要添加一个默认的项目ID,如下所示。

Apache Airflow的完整介绍

 

我们还需要在Google Pipenv环境中安装一些Google Cloud依赖项。 我已经安装了以下内容。

pipenv install google-cloud-storage httplib2 google-api-python-client google-cloud-bigquery pandas_gbq

创建DAG

以下是将执行上述步骤的DAG的代码。 应将其另存为.py文件在我们之前创建的dags目录中。

DAG的顶部是必需的进口。 Airflow提供了一系列运营商,可在Google Cloud Platform上执行大多数功能。 我已经导入了BigQueryOperator(用于运行查询和加载数据)和BigQueryCheckOperator(用于检查特定日期的数据是否存在)。

在DAG的下一部分中,我们定义dag_args,然后创建DAG,该DAG提供诸如dag_id,start_date和应该多长时间运行一次任务等信息。 Airflow使用CRON表达式定义时间表,有关这些表达式的更多信息,请访问此页面。

然后,我们将每个步骤定义为一项任务,我将其定义为变量t1和t2。 这些任务均在工作流程中执行特定步骤。 这些命令的运行顺序位于DAG的最底部。

现在,我们可以转到Web UI并运行DAG。

Apache Airflow的完整介绍

 

如果我们转到BigQuery控制台,我们还将看到Airflow已创建并加载了数据的表。

Apache Airflow的完整介绍

 

本文旨在作为一个完整的介绍,让您开始使用Airflow创建第一个DAG并开始运行。 有关更详细的使用指南,请在此处找到Airflow文档。

可以在此Github存储库中找到本文详细介绍的完整项目的链接。

谢谢阅读!

如果您想加入,我会每月发送一次通讯,请通过此链接注册。 期待成为您学习之旅的一部分!

(本文翻译自Rebecca Vickery的文章《A Complete Introduction to Apache Airflow》,参考:https://towardsdatascience.com/a-complete-introduction-to-apache-airflow-b7e238a33df)

声明:本站部分内容来自互联网,如有版权侵犯或其他问题请与我们联系,我们将立即删除或处理。
▍相关推荐
更多资讯 >>>