Dask Dataframe 和 Parquet

Parquet 是一种流行的柱状文件格式,专为高效数据存储和检索而设计。Dask dataframe 包含 read_parquet()to_parquet() 函数/方法,分别用于读取和写入 parquet 文件。本文档将介绍这些方法,并提供一些技巧和最佳实践。

Parquet I/O 需要安装 pyarrow

读取 Parquet 文件

read_parquet([path, columns, filters, ...])

将 Parquet 文件读取到 Dask DataFrame 中

Dask dataframe 提供了 read_parquet() 函数用于读取一个或多个 parquet 文件。其第一个参数可以是以下之一:

  • 单个 parquet 文件的路径

  • parquet 文件目录的路径(扩展名为 .parquet.parq 的文件)

  • 扩展为一个或多个 parquet 文件路径的 glob 字符串

  • parquet 文件路径列表

这些路径可以是本地的,或者通过在路径前加上协议来指向某些远程文件系统(例如 S3GCS)。

>>> import dask.dataframe as dd

# Load a single local parquet file
>>> df = dd.read_parquet("path/to/mydata.parquet")

# Load a directory of local parquet files
>>> df = dd.read_parquet("path/to/my/parquet/")

# Load a directory of parquet files from S3
>>> df = dd.read_parquet("s3://bucket-name/my/parquet/")

请注意,对于远程文件系统,您可能需要配置凭据。如果可能,我们建议通过文件系统特定的配置文件/环境变量在 Dask 外部处理这些凭据。例如,您可能希望使用 AWS 凭据文件 存储 S3 凭据。另外,您也可以通过 storage_options 关键字参数将配置传递给 fsspec 后端。

>>> df = dd.read_parquet(
...      "s3://bucket-name/my/parquet/",
...      storage_options={"anon": True}  # passed to `s3fs.S3FileSystem`
... )

有关连接远程数据的更多信息,请参阅 连接远程数据

read_parquet() 有许多配置选项,会影响行为和性能。这里我们重点介绍一些常用选项。

元数据

当使用 read_parquet() 读取 多个文件 时,它会首先加载数据集中文件的元数据。此元数据可能包括:

  • 数据集模式 (Schema)

  • 数据集如何被分区为文件,以及这些文件如何被分区为行组 (row-groups)

一些 parquet 数据集包含一个 _metadata 文件,该文件将每个文件的元数据聚合成一个位置。对于中小型数据集,这 可能 非常有用,因为它可以在不读取数据集中 每个 文件的情况下访问行组元数据。行组元数据允许 Dask 将大文件分割成更小的内存分区,并将许多小文件合并成更大的分区,从而可能带来更高的性能。

然而,对于大型数据集,_metadata 文件可能会出现问题,因为它可能对于单个端点来说太大而无法解析!如果出现这种情况,您可以通过指定 ignore_metadata_file=True 来禁用加载 _metadata 文件。

>>> df = dd.read_parquet(
...      "s3://bucket-name/my/parquet/",
...      ignore_metadata_file=True  # don't read the _metadata file
... )

分区大小

默认情况下,Dask 将使用数据集中第一个 parquet 文件的元数据来推断是否可以安全地将每个文件作为 Dask dataframe 中的一个分区单独加载。如果 parquet 数据的未压缩字节大小超过 blocksize(默认为 256 MiB),则每个分区将对应于一个 parquet 行组范围,而不是整个文件。

为了获得最佳性能,请使用可以单独映射到良好 dataframe 分区大小的文件,并相应地设置 blocksize。如果单个文件需要分成多个行组范围,并且数据集中不包含 _metadata 文件,Dask 需要提前加载所有 footer 元数据。

我们建议目标是每个文件加载到 pandas 后内存大小为 100-300 MiB。过大的分区可能导致单个 worker 内存使用过多,而过小的分区可能因 Dask 开销占主导地位而导致性能不佳。

如果您知道您的 parquet 数据集包含过大的文件,您可以传递 split_row_groups='adaptive' 以确保 Dask 会尝试将每个分区保持在 blocksize 限制之下。请注意,如果一个或多个行组过大,分区仍然可能超过 blocksize

列选择

加载 parquet 数据时,有时您不需要数据集中所有可用的列。在这种情况下,您可能希望通过 columns 关键字参数指定您需要的列子集。这有几个好处:

  • 它允许 Dask 从后端文件系统读取更少的数据,从而降低 IO 成本

  • 它允许 Dask 加载更少的数据到内存中,从而减少内存使用

>>> dd.read_parquet(
...     "s3://path/to/myparquet/",
...     columns=["a", "b", "c"]  # Only read columns 'a', 'b', and 'c'
... )

计算分区边界

默认情况下,read_parquet() 不会 生成具有已知分区边界的集合。但是,您可以传递 calculate_divisions=True 来告诉 Dask 您希望使用 footer 元数据(或全局 _metadata 文件)中的行组统计信息在图创建时计算分区边界。如果缺少任何必要的行组统计信息,或者未检测到索引列,使用此选项将不会生成已知分区边界。使用 index 参数是确保所需字段被视为索引的最佳方法。

>>> dd.read_parquet(
...     "s3://path/to/myparquet/",
...     index="timestamp",  # Specify a specific index column
...     calculate_divisions=True,  # Calculate divisions from metadata
... )

尽管使用 calculate_divisions=True 不需要从 parquet 文件中读取任何 实际 数据,但它确实要求 Dask 加载和处理数据集中每个行组的元数据。因此,对于没有全局 _metadata 文件的大型数据集,应避免计算分区边界。对于远程存储尤其如此。

有关分区边界的更多信息,请参阅 Dask DataFrame 设计

写入

to_parquet(df, path[, compression, ...])

将 Dask.dataframe 存储到 Parquet 文件

DataFrame.to_parquet(path, **kwargs)

Dask dataframe 提供了 to_parquet() 函数和方法用于写入 parquet 文件。

在最简单的用法中,它接受一个目录路径,用于写入数据集。此路径可以是本地的,或者通过在路径前加上协议来指向某些远程文件系统(例如 S3GCS)。

# Write to a local directory
>>> df.to_parquet("path/to/my/parquet/")

# Write to S3
>>> df.to_parquet("s3://bucket-name/my/parquet/")

请注意,对于远程文件系统,您可能需要配置凭据。如果可能,我们建议通过文件系统特定的配置文件/环境变量在 Dask 外部处理这些凭据。例如,您可能希望使用 AWS 凭据文件 存储 S3 凭据。另外,您也可以通过 storage_options 关键字参数将配置传递给 fsspec 后端。

>>> df.to_parquet(
...     "s3://bucket-name/my/parquet/",
...     storage_options={"anon": True}  # passed to `s3fs.S3FileSystem`
... )

有关连接远程数据的更多信息,请参阅 连接远程数据

Dask 会将每个 Dask dataframe 分区写入到此目录下的一个文件。为了优化下游消费者的访问,我们建议目标是每个分区内存大小为 100-300 MiB。这有助于平衡 worker 内存使用与 Dask 开销。您可能会发现 DataFrame.memory_usage_per_partition() 方法对于确定您的数据是否最佳分区很有用。

to_parquet() 有许多配置选项,会影响行为和性能。这里我们重点介绍一些常用选项。

元数据

为了提高 读取 性能,Dask 可以在写入时选择性地通过聚合数据集中每个文件的行组元数据来写入一个全局 _metadata 文件。虽然在读取时可能很有用,但生成此文件可能会导致大规模时内存使用过多(并可能导致 Dask worker 被终止)。因此,建议仅对中小型数据集启用此文件的写入。

>>> df.to_parquet(
...     "s3://bucket-name/my/parquet/",
...     write_metadata_file=True  # enable writing the _metadata file
... )

文件名

除非使用 partition_on 选项(参见 使用 Dask 进行 Hive 分区),否则 to_parquet() 会将每个 Dask dataframe 分区写入到输出目录中的一个文件。默认情况下,这些文件将具有诸如 part.0.parquetpart.1.parquet 等名称。如果您希望更改此命名方案,可以使用 name_function 关键字参数。它接受一个签名为 name_function(partition: int) -> str 的函数,该函数接收每个 Dask dataframe 分区的索引并返回用作文件名的字符串。请注意,返回的名称必须与它们的分区索引按相同顺序排序。

>>> df.npartitions  # 3 partitions (0, 1, and 2)
3

>>> df.to_parquet("/path/to/output", name_function=lambda i: f"data-{i}.parquet")

>>> os.listdir("/path/to/parquet")
["data-0.parquet", "data-1.parquet", "data-2.parquet"]

Hive 分区

有时将 parquet 数据集写入具有类似 hive 的目录结构(例如 '/year=2022/month=12/day=25')很有用。当使用 partition_on 选项时,to_parquet() 会自动生成具有此类目录结构的数据集。在大多数情况下,to_parquet() 会自动处理 hive 分区。有关更多信息,请参阅 使用 Dask 进行 Hive 分区