Object Storage¶
This tutorial shows how to use the Object Storage API to manage objects that
reside on object storage, such as S3, GCS, and Azure Blob Storage. The API was introduced
as part of Airflow 2.8.
The tutorial covers a simple pattern that is often used in data engineering and data
science workflows: accessing a web API, saving and analyzing the result.
Prerequisites¶
To complete this tutorial, you need a few things:

- DuckDB, an in-process analytical database, which can be installed by running pip install duckdb.
- An S3 bucket, along with the Amazon provider including s3fs. You can install the provider package by running pip install apache-airflow-providers-amazon[s3fs]. Alternatively, you can use a different storage provider by changing the URL in the create_object_storage_path function to the appropriate URL for your provider, for example by replacing s3:// with gs:// for Google Cloud Storage, and installing a different provider (see the sketch after this list).
- pandas, which you can install by running pip install pandas.
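For example, a minimal sketch of what a Google Cloud Storage variant of this tutorial's base path might look like. This is not part of the original tutorial: the bucket name is reused from the S3 example, google_cloud_default is Airflow's default Google connection ID, and it assumes the Google provider and gcsfs are installed.

from airflow.io.path import ObjectStoragePath

# hypothetical GCS variant of the tutorial's base path; requires
# apache-airflow-providers-google with gcsfs instead of the Amazon provider
base = ObjectStoragePath("gs://google_cloud_default@airflow-tutorial-data/")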
Creating an ObjectStoragePath¶
The ObjectStoragePath is a path-like object that represents a path on object storage.
It is the fundamental building block of the Object Storage API.
base = ObjectStoragePath("s3://aws_default@airflow-tutorial-data/")
The username part of the URL given to ObjectStoragePath should be a connection ID.
The specified connection will be used to obtain the right credentials to access
the backend. If it is omitted, the default connection for the backend will be used.
The connection ID can alternatively be passed in with a keyword argument:
ObjectStoragePath("s3://airflow-tutorial-data/", conn_id="aws_default")
This is useful when reusing a URL defined for another purpose (e.g. Dataset),
which generally does not contain a username part. The explicit keyword argument
takes precedence over the URL’s username value if both are specified.
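For illustration, a minimal sketch of how the two forms relate. aws_default is the connection used throughout this tutorial; some_other_conn is a hypothetical connection ID.

# both of these resolve to the same bucket through the aws_default connection
ObjectStoragePath("s3://aws_default@airflow-tutorial-data/")
ObjectStoragePath("s3://airflow-tutorial-data/", conn_id="aws_default")

# if both a username and a conn_id are given, the keyword argument wins,
# so this path would also use aws_default
ObjectStoragePath("s3://some_other_conn@airflow-tutorial-data/", conn_id="aws_default")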
It is safe to instantiate an ObjectStoragePath at the root of your DAG. Connections
will not be created until the path is used. This means that you can create the
path in the global scope of your DAG and use it in multiple tasks.
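A minimal sketch of that pattern, with hypothetical task names and a hypothetical raw.bin object:

from airflow.decorators import task
from airflow.io.path import ObjectStoragePath

base = ObjectStoragePath("s3://aws_default@airflow-tutorial-data/")  # no connection is made here


@task
def write_raw():
    # the connection is only created when the path is actually used, at task runtime
    with (base / "raw.bin").open("wb") as f:
        f.write(b"example payload")


@task
def read_raw():
    with (base / "raw.bin").open("rb") as f:
        print(f.read())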
Saving data to Object Storage¶
An ObjectStoragePath behaves mostly like a pathlib.Path object. You can
use it to save and load data directly to and from object storage. So, a typical
flow could look like this:
@task
def get_air_quality_data(**kwargs) -> ObjectStoragePath:
    """
    #### Get Air Quality Data
    This task gets air quality data from the Finnish Meteorological Institute's
    open data API. The data is saved as parquet.
    """
    import pandas as pd

    execution_date = kwargs["logical_date"]
    start_time = kwargs["data_interval_start"]

    params = {
        "format": "json",
        "precision": "double",
        "groupareas": "0",
        "producer": "airquality_urban",
        "area": "Uusimaa",
        "param": ",".join(aq_fields.keys()),
        "starttime": start_time.isoformat(timespec="seconds"),
        "endtime": execution_date.isoformat(timespec="seconds"),
        "tz": "UTC",
    }

    response = requests.get(API, params=params)
    response.raise_for_status()

    # ensure the bucket exists
    base.mkdir(exist_ok=True)

    formatted_date = execution_date.format("YYYYMMDD")
    path = base / f"air_quality_{formatted_date}.parquet"

    df = pd.DataFrame(response.json()).astype(aq_fields)
    with path.open("wb") as file:
        df.to_parquet(file)

    return path
The get_air_quality_data task calls the API of the Finnish Meteorological Institute
to obtain the air quality data for the region of Helsinki. It creates a
Pandas DataFrame from the resulting JSON. It then saves the data to object storage
and converts it on the fly to parquet.
The key of the object is automatically generated from the logical date of the task,
so we could run this every day and it would create a new object for each day. We
concatenate this key with the base path to create the full path to the object. Finally,
after writing the object to storage, we return the path to the object. This allows
us to use the path in the next task.
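For example, two hypothetical runs a day apart would produce two distinct objects under the same base path, and each run hands its own path to the downstream task (analyze is the task defined in the next section):

# run with logical date 2024-01-14 -> air_quality_20240114.parquet
# run with logical date 2024-01-15 -> air_quality_20240115.parquet

obj_path = get_air_quality_data()  # the ObjectStoragePath travels to the next task via XCom
analyze(obj_path)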
Analyzing the data¶
To understand the data, you typically want to analyze it. DuckDB is a great
tool for this. It is an in-process analytical database that allows you to run
SQL queries on data in memory.
Because the data is already in parquet format, we can use read_parquet, and
because both DuckDB and the ObjectStoragePath use fsspec, we can register the
backend of the ObjectStoragePath with DuckDB. ObjectStoragePath exposes the fs
property for this. We can then use the register_filesystem function from DuckDB
to register the backend with DuckDB.
In DuckDB we can then create a table from the data and run a query on it. The
query is returned as a dataframe, which could be used for further analysis or
saved to object storage.
@task
def analyze(path: ObjectStoragePath, **kwargs):
    """
    #### Analyze
    This task analyzes the air quality data, prints the results
    """
    import duckdb

    conn = duckdb.connect(database=":memory:")
    conn.register_filesystem(path.fs)
    conn.execute(f"CREATE OR REPLACE TABLE airquality_urban AS SELECT * FROM read_parquet('{path}')")

    df2 = conn.execute("SELECT * FROM airquality_urban").fetchdf()

    print(df2.head())
You might note that the analyze function does not know the original
path to the object, but that it is passed in as a parameter and obtained
through XCom. You do not need to re-instantiate the Path object. Also,
the connection details are handled transparently.
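As noted above, the resulting dataframe could also be written back to object storage. A minimal sketch of how the end of the analyze task might do that; the output name is hypothetical, and base is the module-level path defined earlier:

# hypothetical continuation at the end of analyze(): persist the query result
summary_path = base / "airquality_urban_summary.parquet"
with summary_path.open("wb") as f:
    df2.to_parquet(f)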
Putting it all together¶
The final DAG looks like this, wrapping everything up so that we can run it:
import pendulum
import requests

from airflow.decorators import dag, task
from airflow.io.path import ObjectStoragePath

API = "https://opendata.fmi.fi/timeseries"

aq_fields = {
    "fmisid": "int32",
    "time": "datetime64[ns]",
    "AQINDEX_PT1H_avg": "float64",
    "PM10_PT1H_avg": "float64",
    "PM25_PT1H_avg": "float64",
    "O3_PT1H_avg": "float64",
    "CO_PT1H_avg": "float64",
    "SO2_PT1H_avg": "float64",
    "NO2_PT1H_avg": "float64",
    "TRSC_PT1H_avg": "float64",
}

base = ObjectStoragePath("s3://aws_default@airflow-tutorial-data/")


@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
)
def tutorial_objectstorage():
    """
    ### Object Storage Tutorial Documentation
    This is a tutorial DAG to showcase the usage of the Object Storage API.
    Documentation that goes along with the Airflow Object Storage tutorial is
    located
    [here](https://airflow.apache.org/docs/apache-airflow/stable/tutorial/objectstorage.html)
    """

    @task
    def get_air_quality_data(**kwargs) -> ObjectStoragePath:
        """
        #### Get Air Quality Data
        This task gets air quality data from the Finnish Meteorological Institute's
        open data API. The data is saved as parquet.
        """
        import pandas as pd

        execution_date = kwargs["logical_date"]
        start_time = kwargs["data_interval_start"]

        params = {
            "format": "json",
            "precision": "double",
            "groupareas": "0",
            "producer": "airquality_urban",
            "area": "Uusimaa",
            "param": ",".join(aq_fields.keys()),
            "starttime": start_time.isoformat(timespec="seconds"),
            "endtime": execution_date.isoformat(timespec="seconds"),
            "tz": "UTC",
        }

        response = requests.get(API, params=params)
        response.raise_for_status()

        # ensure the bucket exists
        base.mkdir(exist_ok=True)

        formatted_date = execution_date.format("YYYYMMDD")
        path = base / f"air_quality_{formatted_date}.parquet"

        df = pd.DataFrame(response.json()).astype(aq_fields)
        with path.open("wb") as file:
            df.to_parquet(file)

        return path

    @task
    def analyze(path: ObjectStoragePath, **kwargs):
        """
        #### Analyze
        This task analyzes the air quality data, prints the results
        """
        import duckdb

        conn = duckdb.connect(database=":memory:")
        conn.register_filesystem(path.fs)
        conn.execute(f"CREATE OR REPLACE TABLE airquality_urban AS SELECT * FROM read_parquet('{path}')")

        df2 = conn.execute("SELECT * FROM airquality_urban").fetchdf()

        print(df2.head())

    obj_path = get_air_quality_data()
    analyze(obj_path)


tutorial_objectstorage()
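For quick local experimentation, the DAG can also be run in-process with dag.test() (available since Airflow 2.5). This is not part of the original tutorial and assumes the prerequisites and the aws_default connection are configured:

if __name__ == "__main__":
    # build the DAG object and execute all of its tasks in a single process
    tutorial_objectstorage().test()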