Dask Dataframe 和 Parquet

Parquet 是一种流行的列式文件格式,设计用于高效的数据存储和检索。Dask dataframe 包含用于读写 parquet 文件的 read_parquet()to_parquet() 函数/方法。这里我们介绍这些方法,并提供一些提示和最佳实践。

Parquet I/O 需要安装 pyarrow

读取 Parquet 文件

read_parquet([path, columns, filters, ...])

将 Parquet 文件读入 Dask DataFrame

Dask dataframe 提供了 read_parquet() 函数用于读取一个或多个 parquet 文件。它的第一个参数可以是以下之一:

  • 单个 parquet 文件的路径

  • 包含 parquet 文件的目录路径(扩展名为 .parquet.parq 的文件)

  • 展开为一个或多个 parquet 文件路径的 glob 字符串

  • parquet 文件路径列表

这些路径可以是本地路径,也可以通过在路径前添加协议来指向某个远程文件系统(例如 S3GCS)。

>>> import dask.dataframe as dd

# Load a single local parquet file
>>> df = dd.read_parquet("path/to/mydata.parquet")

# Load a directory of local parquet files
>>> df = dd.read_parquet("path/to/my/parquet/")

# Load a directory of parquet files from S3
>>> df = dd.read_parquet("s3://bucket-name/my/parquet/")

请注意,对于远程文件系统,您可能需要配置凭据。如果可能,我们建议通过文件系统特定的配置文件/环境变量在 Dask 外部处理这些凭据。例如,您可能希望使用 AWS 凭据文件存储 S3 凭据。或者,您可以通过 storage_options 关键字参数将配置传递给 fsspec 后端。

>>> df = dd.read_parquet(
...      "s3://bucket-name/my/parquet/",
...      storage_options={"anon": True}  # passed to `s3fs.S3FileSystem`
... )

有关连接到远程数据的更多信息,请参阅连接到远程数据

read_parquet() 有许多配置选项,影响其行为和性能。这里我们重点介绍一些常见选项。

元数据

当使用 read_parquet() 读取多个文件时,它首先加载数据集中文件的元数据。这些元数据可能包括:

  • 数据集模式

  • 数据集如何分区到文件,以及这些文件如何分区到行组

一些 parquet 数据集包含一个 _metadata 文件,它将每个文件的元数据聚合到一个位置。对于中小型数据集,这可能很有用,因为它无需读取数据集中每个文件的部分内容即可访问行组元数据。行组元数据允许 Dask 将大文件拆分为更小的内存分区,并将许多小文件合并为更大的分区,从而可能带来更高的性能。

但是,对于大型数据集,_metadata 文件可能存在问题,因为它对于单个端点来说可能太大了!如果出现这种情况,您可以通过指定 ignore_metadata_file=True 来禁用加载 _metadata 文件。

>>> df = dd.read_parquet(
...      "s3://bucket-name/my/parquet/",
...      ignore_metadata_file=True  # don't read the _metadata file
... )

分区大小

默认情况下,Dask 将使用数据集中第一个 parquet 文件的元数据来推断是否可以安全地将每个文件单独加载为 Dask dataframe 的一个分区。如果 parquet 数据的未压缩字节大小超过 blocksize(默认为 256 MiB),则每个分区将对应于一系列 parquet 行组,而不是整个文件。

为了获得最佳性能,请使用可以单独映射到良好 dataframe 分区大小的文件,并相应地设置 blocksize。如果单个文件需要划分为多个行组范围,并且数据集不包含 _metadata 文件,则 Dask 需要预先加载所有页脚元数据。

我们建议每个文件加载到 pandas 后,内存大小目标是 100-300 MiB。过大的分区可能导致单个 worker 内存使用过多,而过小的分区可能导致 Dask 开销占据主导地位,从而导致性能不佳。

如果您知道您的 parquet 数据集包含过大的文件,可以传递 split_row_groups='adaptive' 来确保 Dask 会尝试将每个分区保持在 blocksize 限制以下。请注意,如果一个或多个行组过大,分区仍可能超过 blocksize

列选择

加载 parquet 数据时,有时您不需要数据集中的所有可用列。在这种情况下,您可能希望通过 columns 关键字参数指定您需要的列子集。这有几个好处:

  • 它让 Dask 从底层文件系统中读取更少的数据,从而降低 IO 成本

  • 它让 Dask 加载更少的数据到内存中,从而减少内存使用

>>> dd.read_parquet(
...     "s3://path/to/myparquet/",
...     columns=["a", "b", "c"]  # Only read columns 'a', 'b', and 'c'
... )

计算分界

默认情况下,read_parquet() 不会生成已知分界的集合。但是,您可以传递 calculate_divisions=True 来告诉 Dask,您希望使用页脚元数据(或全局 _metadata 文件)中的行组统计信息在图创建时计算分界。如果缺少任何必要的行组统计信息,或者未检测到索引列,使用此选项将不会生成已知分界。使用 index 参数是确保所需字段被视为索引的最佳方法。

>>> dd.read_parquet(
...     "s3://path/to/myparquet/",
...     index="timestamp",  # Specify a specific index column
...     calculate_divisions=True,  # Calculate divisions from metadata
... )

尽管使用 calculate_divisions=True 不需要从 parquet 文件中读取任何实际数据,但它确实需要 Dask 加载和处理数据集中每个行组的元数据。因此,对于没有全局 _metadata 文件的大型数据集,应避免计算分界。对于远程存储尤其如此。

有关分界的更多信息,请参阅Dask DataFrame 设计

写入

to_parquet(df, path[, compression, ...])

将 Dask.dataframe 存储到 Parquet 文件

DataFrame.to_parquet(path, **kwargs)

Dask dataframe 提供了 to_parquet() 函数和方法用于写入 parquet 文件。

在其最简单的用法中,这需要一个目录路径来写入数据集。该路径可以是本地路径,也可以通过在路径前添加协议来指向某个远程文件系统(例如 S3GCS)。

# Write to a local directory
>>> df.to_parquet("path/to/my/parquet/")

# Write to S3
>>> df.to_parquet("s3://bucket-name/my/parquet/")

请注意,对于远程文件系统,您可能需要配置凭据。如果可能,我们建议通过文件系统特定的配置文件/环境变量在 Dask 外部处理这些凭据。例如,您可能希望使用 AWS 凭据文件存储 S3 凭据。或者,您可以通过 storage_options 关键字参数将配置传递给 fsspec 后端。

>>> df.to_parquet(
...     "s3://bucket-name/my/parquet/",
...     storage_options={"anon": True}  # passed to `s3fs.S3FileSystem`
... )

有关连接到远程数据的更多信息,请参阅连接到远程数据

Dask 会将每个 Dask dataframe 分区写入此目录中的一个文件。为了优化下游消费者的访问,我们建议每个分区的内存大小目标为 100-300 MiB。这有助于平衡 worker 内存使用与 Dask 开销。您可能会发现 DataFrame.memory_usage_per_partition() 方法对于确定您的数据是否已最优分区非常有用。

to_parquet() 有许多配置选项,影响其行为和性能。这里我们重点介绍一些常见选项。

元数据

为了提高读取性能,Dask 可以选择在写入时通过聚合数据集中每个文件的行组元数据来写入一个全局 _metadata 文件。虽然在读取时可能有用,但在大规模情况下生成此文件可能导致过多的内存使用(并可能导致 Dask worker 被终止)。因此,建议仅对中小型数据集启用此文件的写入功能。

>>> df.to_parquet(
...     "s3://bucket-name/my/parquet/",
...     write_metadata_file=True  # enable writing the _metadata file
... )

文件名

除非使用 partition_on 选项(参见使用 Dask 进行 Hive 分区),否则 to_parquet() 会将每个 Dask dataframe 分区写入输出目录中的一个文件。默认情况下,这些文件的名称类似于 part.0.parquetpart.1.parquet 等。如果您希望更改此命名方案,可以使用 name_function 关键字参数。该参数接受一个函数,其签名为 name_function(partition: int) -> str,接受每个 Dask dataframe 分区的分区索引并返回用作文件名的字符串。请注意,返回的名称必须与它们的分区索引具有相同的排序顺序。

>>> df.npartitions  # 3 partitions (0, 1, and 2)
3

>>> df.to_parquet("/path/to/output", name_function=lambda i: f"data-{i}.parquet")

>>> os.listdir("/path/to/parquet")
["data-0.parquet", "data-1.parquet", "data-2.parquet"]

Hive 分区

有时将 parquet 数据集写入具有 hive 风格的目录结构(例如 '/year=2022/month=12/day=25')很有用。当使用 partition_on 选项时,to_parquet() 会自动生成具有此类目录结构的数据集。在大多数情况下,to_parquet() 会自动处理 hive 分区。有关更多信息,请参阅使用 Dask 进行 Hive 分区