Snapshot of https://airflow.apache.org/docs/apache-airflow/stable/tutorial/objectstorage.html, saved on 2024-08-15 16:30.

Object Storage

This tutorial shows how to use the Object Storage API to manage objects that reside on object storage, such as S3, GCS, and Azure Blob Storage. The API was introduced in Airflow 2.8.

The tutorial covers a simple pattern that is often used in data engineering and data science workflows: calling a web API, then saving and analyzing the result.

Prerequisites

To complete this tutorial, you need a few things:

  • DuckDB, an in-process analytical database, which can be installed by running pip install duckdb.

  • An S3 bucket, along with the Amazon provider including s3fs. You can install the provider package by running pip install apache-airflow-providers-amazon[s3fs]. Alternatively, you can use a different storage provider by changing the URL passed to ObjectStoragePath (the base variable in the example DAG) to the appropriate URL for your provider, for example by replacing s3:// with gs:// for Google Cloud Storage, and installing the matching provider package.

  • pandas, which you can install by running pip install pandas.
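Switching backends as described in the last bullet mostly amounts to changing the URL scheme (and usually the connection ID in the username part). The helper below is a hypothetical, stdlib-only sketch of that string edit; it is not part of Airflow, and google_cloud_default is assumed here as the Google provider's default connection ID.

```python
from typing import Optional
from urllib.parse import urlsplit, urlunsplit


def switch_backend(url: str, scheme: str, conn_id: Optional[str] = None) -> str:
    """Return `url` with a new scheme and, optionally, a new connection ID.

    Hypothetical helper for illustration; Airflow does not provide it.
    """
    parts = urlsplit(url)
    netloc = parts.netloc  # e.g. "aws_default@airflow-tutorial-data"
    if conn_id is not None:
        bucket = netloc.rpartition("@")[2]  # drop any existing username part
        netloc = f"{conn_id}@{bucket}"
    return urlunsplit(parts._replace(scheme=scheme, netloc=netloc))


s3_url = "s3://aws_default@airflow-tutorial-data/"
# Assumed default connection ID for Google Cloud; check your provider's docs.
print(switch_backend(s3_url, "gs", conn_id="google_cloud_default"))
```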

Creating an ObjectStoragePath

The ObjectStoragePath is a path-like object that represents a path on object storage. It is the fundamental building block of the Object Storage API.

airflow/example_dags/tutorial_objectstorage.py

base = ObjectStoragePath("s3://aws_default@airflow-tutorial-data/")

The username part of the URL given to ObjectStoragePath should be a connection ID. The specified connection will be used to obtain the right credentials to access the backend. If it is omitted, the default connection for the backend will be used.
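To make the URL anatomy concrete, here is a minimal sketch that splits such a URL with Python's urllib.parse. The StorageURL and parse_storage_url names are hypothetical and only illustrate which part is read as the connection ID; Airflow does its own parsing internally.

```python
from typing import NamedTuple, Optional
from urllib.parse import urlsplit


class StorageURL(NamedTuple):
    scheme: str             # selects the backend, e.g. "s3" or "gs"
    conn_id: Optional[str]  # username part of the URL, if present
    bucket: str             # host part of the URL
    key: str                # object key inside the bucket


def parse_storage_url(url: str) -> StorageURL:
    """Hypothetical helper: split an object storage URL into its parts."""
    parts = urlsplit(url)
    return StorageURL(
        scheme=parts.scheme,
        conn_id=parts.username,
        bucket=parts.hostname or "",
        key=parts.path.lstrip("/"),
    )


print(parse_storage_url("s3://aws_default@airflow-tutorial-data/"))
```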

The connection ID can alternatively be passed in with a keyword argument:

ObjectStoragePath("s3://airflow-tutorial-data/", conn_id="aws_default")

This is useful when reusing a URL defined for another purpose (e.g. a Dataset), which generally does not contain a username part. If both are specified, the explicit keyword argument takes precedence over the username in the URL.
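The lookup order described above (keyword argument, then URL username, then backend default) can be sketched as a small function. resolve_conn_id and BACKEND_DEFAULTS are hypothetical names, and mapping s3 to aws_default is an assumption for illustration rather than a statement of how every provider picks its default.

```python
from typing import Optional
from urllib.parse import urlsplit

# Assumed fallback connection IDs per scheme (illustrative only).
BACKEND_DEFAULTS = {"s3": "aws_default"}


def resolve_conn_id(url: str, conn_id: Optional[str] = None) -> Optional[str]:
    """Hypothetical helper mirroring the precedence rules in the text."""
    if conn_id is not None:  # 1. explicit keyword argument wins
        return conn_id
    parts = urlsplit(url)
    if parts.username:  # 2. otherwise the URL's username part
        return parts.username
    return BACKEND_DEFAULTS.get(parts.scheme)  # 3. otherwise the backend default


print(resolve_conn_id("s3://airflow-tutorial-data/", conn_id="aws_default"))
```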

It is safe to instantiate an ObjectStoragePath at the top level of your DAG file. Connections are not created until the path is actually used, so you can create the path in the global scope of your DAG file and use it in multiple tasks.
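The lazy behaviour can be illustrated with functools.cached_property: constructing the object is cheap, and the expensive "connection" happens only on first access. LazyStoragePath is a hypothetical toy, not Airflow's implementation; its counter exists only to make the deferral observable.

```python
from functools import cached_property


class LazyStoragePath:
    """Hypothetical stand-in that defers connecting until first use."""

    connections_made = 0  # class-level counter, for demonstration only

    def __init__(self, url: str) -> None:
        self.url = url  # cheap: nothing is contacted here

    @cached_property
    def fs(self) -> dict:
        # A real backend would look up credentials and build a client here.
        # cached_property runs this body once per instance, on first access.
        type(self).connections_made += 1
        return {"url": self.url}

    def exists(self) -> bool:
        return bool(self.fs)  # first call triggers the "connection"


# Safe at module level: creating the object does not connect.
base = LazyStoragePath("s3://aws_default@airflow-tutorial-data/")
```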

Saving data to Object Storage

An ObjectStoragePath behaves mostly like a pathlib.Path object. You can use it to save and load data directly to and from object storage. A typical flow could look like this:

airflow/example_dags/tutorial_objectstorage.py

    @task
    def get_air_quality_data(**kwargs) -> ObjectStoragePath:
        """
        #### Get Air Quality Data
        This task gets air quality data from the Finnish Meteorological Institute's
        open data API. The data is saved as parquet.
        """
        import pandas as pd

        execution_date = kwargs["logical_date"]
        start_time = kwargs["data_interval_start"]

        params = {
            "format": "json",
            "precision": "double",
            "groupareas": "0",
            "producer": "airquality_urban",
            "area": "Uusimaa",
            "param": ",".join(aq_fields.keys()),
            "starttime": start_time.isoformat(timespec="seconds"),
            "endtime": execution_date.isoformat(timespec="seconds"),
            "tz": "UTC",
        }

        response = requests.get(API, params=params)
        response.raise_for_status()

        # ensure the bucket exists
        base.mkdir(exist_ok=True)

        formatted_date = execution_date.format("YYYYMMDD")
        path = base / f"air_quality_{formatted_date}.parquet"

        df = pd.DataFrame(response.json()).astype(aq_fields)
        with path.open("wb") as file:
            df.to_parquet(file)

        return path

The get_air_quality_data task calls the open data API of the Finnish Meteorological Institute to obtain air quality data for the Helsinki region. It creates a pandas DataFrame from the resulting JSON, then saves the data to object storage, converting it to Parquet on the fly.
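For reference, this is roughly the query string that the params dictionary turns into; requests encodes params with urllib's urlencode, so the standard library is enough to preview it. The sketch assumes a one-day interval, uses plain datetime objects instead of the pendulum values Airflow passes in, and trims aq_fields to three columns.

```python
from datetime import datetime, timedelta, timezone
from urllib.parse import urlencode

API = "https://opendata.fmi.fi/timeseries"
# Trimmed copy of the tutorial's aq_fields, enough to show the "param" value.
aq_fields = {"fmisid": "int32", "time": "datetime64[ns]", "AQINDEX_PT1H_avg": "float64"}


def build_query_url(start: datetime, end: datetime) -> str:
    """Return the full GET URL for the same params the task sends."""
    params = {
        "format": "json",
        "precision": "double",
        "groupareas": "0",
        "producer": "airquality_urban",
        "area": "Uusimaa",
        "param": ",".join(aq_fields.keys()),
        "starttime": start.isoformat(timespec="seconds"),
        "endtime": end.isoformat(timespec="seconds"),
        "tz": "UTC",
    }
    return f"{API}?{urlencode(params)}"


end = datetime(2024, 8, 15, tzinfo=timezone.utc)
print(build_query_url(end - timedelta(days=1), end))
```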

The object key is derived automatically from the task's logical date, so we could run this every day and it would create a new object for each day. We append this key to the base path to form the full path to the object. Finally, after writing the object to storage, we return its path, which allows the next task to use it.
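The key derivation behaves like ordinary path joining. As an analogy, the sketch below uses pathlib.PurePosixPath for the key part only (PurePosixPath would collapse the // in s3:// if given the full URL). daily_key is a hypothetical helper; strftime("%Y%m%d") produces the same digits as pendulum's format("YYYYMMDD"). Because the key depends only on the logical date, a rerun for the same date targets the same object.

```python
from datetime import date
from pathlib import PurePosixPath


def daily_key(prefix: str, logical_date: date) -> str:
    """Hypothetical helper: build the object key for one logical date."""
    filename = f"air_quality_{logical_date.strftime('%Y%m%d')}.parquet"
    return str(PurePosixPath(prefix) / filename)  # "/" joins like pathlib


key = PurePosixPath(daily_key("raw", date(2024, 8, 15)))
print(key, key.name, key.suffix, key.parent)
```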

Analyzing the data

To understand the data, you typically want to analyze it. DuckDB is a great tool for this: it is an in-process analytical database that allows you to run SQL queries on data in memory.

Because the data is already in Parquet format, we can use DuckDB's read_parquet function. And because both DuckDB and ObjectStoragePath use fsspec, we can register the backend of the ObjectStoragePath with DuckDB: ObjectStoragePath exposes the backend through its fs property, and DuckDB's register_filesystem function registers it with DuckDB.
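Registering a filesystem essentially tells the query engine which implementation to use for URLs with a given protocol such as s3. The toy registry below models only that dispatch idea; FilesystemRegistry and the in-memory fake_bucket are hypothetical and do not reflect DuckDB's internals or fsspec's API.

```python
from typing import Callable, Dict
from urllib.parse import urlsplit

Opener = Callable[[str], bytes]  # takes a URL, returns the object's bytes


class FilesystemRegistry:
    """Toy model of protocol-based filesystem lookup (hypothetical)."""

    def __init__(self) -> None:
        self._by_protocol: Dict[str, Opener] = {}

    def register(self, protocol: str, opener: Opener) -> None:
        self._by_protocol[protocol] = opener

    def read(self, url: str) -> bytes:
        protocol = urlsplit(url).scheme or "file"
        try:
            opener = self._by_protocol[protocol]
        except KeyError:
            raise ValueError(f"no filesystem registered for {protocol!r}") from None
        return opener(url)


# An in-memory dict plays the role of the bucket (placeholder bytes).
fake_bucket = {"s3://airflow-tutorial-data/air_quality_20240815.parquet": b"PAR1..."}
registry = FilesystemRegistry()
registry.register("s3", fake_bucket.__getitem__)
```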

In DuckDB we can then create a table from the data and run a query on it. The query result is returned as a DataFrame, which can be used for further analysis or saved to object storage.

airflow/example_dags/tutorial_objectstorage.py

    @task
    def analyze(path: ObjectStoragePath, **kwargs):
        """
        #### Analyze
        This task analyzes the air quality data, prints the results
        """
        import duckdb

        conn = duckdb.connect(database=":memory:")
        conn.register_filesystem(path.fs)
        conn.execute(f"CREATE OR REPLACE TABLE airquality_urban AS SELECT * FROM read_parquet('{path}')")

        df2 = conn.execute("SELECT * FROM airquality_urban").fetchdf()

        print(df2.head())
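If you want to try the create-a-table-then-query pattern without DuckDB, Python's built-in sqlite3 module offers a comparable in-process database. This is a swapped-in technique: SQLite has no read_parquet and no CREATE OR REPLACE TABLE, so the sketch loads a few made-up placeholder rows (not real measurements) and drops the table before recreating it.

```python
import sqlite3

# Made-up placeholder rows shaped like a few aq_fields columns.
rows = [
    (1, "2024-08-15T00:00:00", 1.0, 4.0),
    (1, "2024-08-15T01:00:00", 2.0, 8.0),
    (2, "2024-08-15T00:00:00", 1.0, 3.0),
]

conn = sqlite3.connect(":memory:")  # in-process, in-memory database
conn.execute("CREATE TABLE raw (fmisid INTEGER, time TEXT, AQINDEX_PT1H_avg REAL, PM10_PT1H_avg REAL)")
conn.executemany("INSERT INTO raw VALUES (?, ?, ?, ?)", rows)

# SQLite lacks CREATE OR REPLACE TABLE, so drop first, then create from a query.
conn.execute("DROP TABLE IF EXISTS airquality_urban")
conn.execute("CREATE TABLE airquality_urban AS SELECT * FROM raw")

per_station = conn.execute(
    "SELECT fmisid, AVG(PM10_PT1H_avg) FROM airquality_urban GROUP BY fmisid ORDER BY fmisid"
).fetchall()
print(per_station)
```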

Note that the analyze function does not know the original path to the object: the path is passed in as a parameter and obtained through XCom. You do not need to re-instantiate the ObjectStoragePath, and the connection details are handled transparently.
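No re-instantiation is needed because only a small reference (the URL plus connection information) travels through XCom, not the data itself. The sketch below models that round trip with JSON; PathRef, to_xcom, and from_xcom are hypothetical, and Airflow's actual serialization format for ObjectStoragePath differs.

```python
import json
from dataclasses import asdict, dataclass
from typing import Optional


@dataclass(frozen=True)
class PathRef:
    """Hypothetical, minimal stand-in for what must survive an XCom hop."""

    url: str
    conn_id: Optional[str] = None


def to_xcom(ref: PathRef) -> str:
    return json.dumps(asdict(ref))  # a few bytes, regardless of object size


def from_xcom(payload: str) -> PathRef:
    return PathRef(**json.loads(payload))


sent = PathRef("s3://airflow-tutorial-data/air_quality_20240815.parquet", "aws_default")
received = from_xcom(to_xcom(sent))
```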

Putting it all together

The final DAG, which wraps everything up so that we can run it, looks like this:

airflow/example_dags/tutorial_objectstorage.py


import pendulum
import requests

from airflow.decorators import dag, task
from airflow.io.path import ObjectStoragePath

API = "https://opendata.fmi.fi/timeseries"

aq_fields = {
    "fmisid": "int32",
    "time": "datetime64[ns]",
    "AQINDEX_PT1H_avg": "float64",
    "PM10_PT1H_avg": "float64",
    "PM25_PT1H_avg": "float64",
    "O3_PT1H_avg": "float64",
    "CO_PT1H_avg": "float64",
    "SO2_PT1H_avg": "float64",
    "NO2_PT1H_avg": "float64",
    "TRSC_PT1H_avg": "float64",
}
base = ObjectStoragePath("s3://aws_default@airflow-tutorial-data/")


@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
)
def tutorial_objectstorage():
    """
    ### Object Storage Tutorial Documentation
    This is a tutorial DAG to showcase the usage of the Object Storage API.
    Documentation that goes along with the Airflow Object Storage tutorial is
    located
    [here](https://airflow.apache.org/docs/apache-airflow/stable/tutorial/objectstorage.html)
    """

    @task
    def get_air_quality_data(**kwargs) -> ObjectStoragePath:
        """
        #### Get Air Quality Data
        This task gets air quality data from the Finnish Meteorological Institute's
        open data API. The data is saved as parquet.
        """
        import pandas as pd

        execution_date = kwargs["logical_date"]
        start_time = kwargs["data_interval_start"]

        params = {
            "format": "json",
            "precision": "double",
            "groupareas": "0",
            "producer": "airquality_urban",
            "area": "Uusimaa",
            "param": ",".join(aq_fields.keys()),
            "starttime": start_time.isoformat(timespec="seconds"),
            "endtime": execution_date.isoformat(timespec="seconds"),
            "tz": "UTC",
        }

        response = requests.get(API, params=params)
        response.raise_for_status()

        # ensure the bucket exists
        base.mkdir(exist_ok=True)

        formatted_date = execution_date.format("YYYYMMDD")
        path = base / f"air_quality_{formatted_date}.parquet"

        df = pd.DataFrame(response.json()).astype(aq_fields)
        with path.open("wb") as file:
            df.to_parquet(file)

        return path

    @task
    def analyze(path: ObjectStoragePath, **kwargs):
        """
        #### Analyze
        This task analyzes the air quality data, prints the results
        """
        import duckdb

        conn = duckdb.connect(database=":memory:")
        conn.register_filesystem(path.fs)
        conn.execute(f"CREATE OR REPLACE TABLE airquality_urban AS SELECT * FROM read_parquet('{path}')")

        df2 = conn.execute("SELECT * FROM airquality_urban").fetchdf()

        print(df2.head())

    obj_path = get_air_quality_data()
    analyze(obj_path)


tutorial_objectstorage()
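To see the shape of the DAG without Airflow, pandas, or a bucket, the two tasks can be dry-run as plain functions. This sketch assumes a temporary local directory in place of the bucket and JSON in place of Parquet; get_data_local, analyze_local, and the sample values are made up, and only the pattern (the upstream step returns a path, the downstream step reads from it) mirrors the tutorial.

```python
import json
import tempfile
from pathlib import Path

# A temporary local directory stands in for the bucket (assumption).
bucket = Path(tempfile.mkdtemp())


def get_data_local(day: str) -> Path:
    """Upstream step: write the payload and hand back only its location."""
    bucket.mkdir(exist_ok=True)  # mirrors base.mkdir(exist_ok=True)
    path = bucket / f"air_quality_{day}.json"
    sample = [{"fmisid": 1, "PM10_PT1H_avg": 4.0}, {"fmisid": 1, "PM10_PT1H_avg": 8.0}]
    with path.open("w") as f:
        json.dump(sample, f)
    return path


def analyze_local(path: Path) -> float:
    """Downstream step: receives the path and reads the data itself."""
    with path.open() as f:
        records = json.load(f)
    return sum(r["PM10_PT1H_avg"] for r in records) / len(records)


obj_path = get_data_local("20240815")
print(analyze_local(obj_path))
```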
