在 Dask 中使用 Hive 分区
目录
在 Dask 中使用 Hive 分区¶
有时将数据集写入类似 Hive 的目录方案会很有用。例如,如果您的 DataFrame 包含 'year'
和 'semester'
列,基于 Hive 的目录结构可能看起来像这样
output-path/
├── year=2022/
│ ├── semester=fall/
│ │ └── part.0.parquet
│ └── semester=spring/
│ ├── part.0.parquet
│ └── part.1.parquet
└── year=2023/
└── semester=fall/
└── part.1.parquet
这种自描述结构的使用意味着 'output-path/year=2022/semester=fall/'
目录中的所有行将在 'year'
列中包含值 2022
,并在 'semester'
列中包含值 'fall'
。
生成 Hive 分区数据集的主要优势在于,read_parquet()
可以应用某些 IO 过滤器,而无需解析任何文件元数据。换句话说,当数据集已经在 'year'
列上进行 Hive 分区时,以下命令通常会更快。
>>> dd.read_parquet("output-path", filters=[("year", ">", 2022)])
使用 Hive 分区写入 Parquet 数据¶
当使用 partition_on
选项时,Dask 的 to_parquet()
函数将自动生成 Hive 分区目录方案。
>>> df.to_parquet("output-path", partition_on=["year", "semester"])
>>> os.listdir("output-path")
["year=2022", "year=2023"]
>>> os.listdir("output-path/year=2022")
["semester=fall", "semester=spring"]
>>> os.listdir("output-path/year=2022/semester=spring")
['part.0.parquet', 'part.1.parquet']
重要的是要认识到,Dask 不会聚合每个叶目录中写入的数据文件。这是因为每个 DataFrame 分区在执行 to_parquet()
任务图期间都是独立写入的。为了写入分区 i 的数据,分区 i 写入任务将对列 ["year", "semester"]
执行 groupby 操作,然后每个不同的组将使用文件名 'part.{i}.parquet'
写入到相应的目录中。因此,一个 Hive 分区写入可能会在每个叶目录中生成大量文件(每个 DataFrame 分区一个文件)。
如果您的应用程序要求您为每个 Hive 分区生成一个单独的 Parquet 文件,一种可能的解决方案是在调用 to_parquet()
之前对分区列进行排序或洗牌。
>>> partition_on = ["year", "semester"]
>>> df.shuffle(on=partition_on).to_parquet(partition_on=partition_on)
使用像这样的全局洗牌非常昂贵,应尽可能避免。但是,它也能保证产生最少的文件数量,这有时可能值得付出牺牲。
使用 Hive 分区读取 Parquet 数据¶
在大多数情况下,read_parquet()
会自动处理 Hive 分区数据。默认情况下,所有 Hive 分区列将被解释为类别列。
>>> ddf = dd.read_parquet("output-path", columns=["year", "semester"])
>>> ddf
Dask DataFrame Structure:
year semester
npartitions=4
category[known] category[known]
... ...
... ...
... ...
... ...
Dask Name: read-parquet, 1 graph layer
>>> ddf.compute()
year semester
0 2022 fall
1 2022 fall
2 2022 fall
3 2022 spring
4 2022 spring
5 2022 spring
6 2023 fall
7 2023 fall
定义自定义分区模式¶
可以为 Hive 分区列指定自定义模式。然后将使用指定的类型而不是 category 读取这些列。
>>> schema = pa.schema([("year", pa.int16()), ("semester", pa.string())])
>>> ddf2 = dd.read_parquet(
... path,
... columns=["year", "semester"],
... dataset={"partitioning": {"flavor": "hive", "schema": schema}}
... )
Dask DataFrame Structure:
year semester
npartitions=4
int16 object
... ...
... ...
... ...
... ...
如果您的任何 Hive 分区列包含空值,您必须以这种方式指定分区模式。
虽然不是必须的,但如果您需要在高基数列上进行分区,我们仍然建议您指定分区模式。这是因为默认的 'category'
数据类型会以一种可能显著增加 Dask 集合总体内存占用的方式跟踪已知类别。事实上,read_parquet()
出于同样的原因已经清除了其他列的“已知类别”(参见类别型数据)。
最佳实践¶
虽然 Hive 分区有时可以通过简化过滤来提高读取性能,但在其他情况下也可能导致性能下降和错误。
避免高基数¶
一个好的经验法则是避免在 float 列或包含许多唯一值(即高基数)的任何列上进行分区。
使用 Hive 分区时用户体验差的最常见原因在于分区列的高基数。例如,如果您尝试在具有数百万唯一值的列上进行分区,那么 :func:`to_parquet` 将需要生成数百万个目录。这些目录的管理可能会给文件系统带来压力,而每个目录中需要大量小文件则肯定会加剧问题。
对分区使用简单数据类型¶
由于 Hive 分区数据是“自描述的”,我们建议您避免在复杂数据类型上进行分区,并尽可能选择基于整数或字符串的数据类型。如果无法轻松地从用于定义目录名称的字符串值推断出您的数据类型,那么 IO 引擎可能难以解析这些值。
例如,直接在具有 datetime64
数据类型的列上进行分区可能会产生如下所示的目录名称
output-path/
├── date=2022-01-01 00:00:00/
├── date=2022-02-01 00:00:00/
├── ...
└── date=2022-12-01 00:00:00/
这些目录名称不会被正确解释为 datetime64
值,甚至在 Windows 系统上被认为是非法的。为了获得更可靠的行为,我们建议将此类列分解为一个或多个“简单”列。例如,可以轻松地使用 'date'
来构造 'year'
、'month'
和 'day'
列(根据需要)。
读取时聚合文件¶
警告
参数 aggregate_files
当前被列为实验性。但是,目前没有计划在未来的版本中移除该参数或改变其行为。
由于 Hive 分区通常会产生大量小文件,正确使用 aggregate_files
参数通常会提高 read_parquet()
的性能。以下面这个数据集为例
dataset-path/
├── region=1/
│ ├── section=a/
│ │ └── 01.parquet
│ │ └── 02.parquet
│ │ └── 03.parquet
│ ├── section=b/
│ └── └── 04.parquet
│ └── └── 05.parquet
└── region=2/
├── section=a/
│ ├── 06.parquet
│ ├── 07.parquet
│ ├── 08.parquet
如果我们将此情况下的 aggregate_files
设置为 True
,我们是在告诉 Dask 任何 Parquet 数据文件都可以聚合成同一个输出 DataFrame 分区。如果,相反,我们指定分区列的名称(例如 'region'
或 'section'
),我们则允许聚合任何共享文件路径直至(包括)相应目录名的两个文件。例如,如果 aggregate_files
设置为 'section'
,04.parquet
和 05.parquet
可以聚合并在一起,但 03.parquet
和 04.parquet
不能。然而,如果 aggregate_files
设置为 'region'
,则 04.parquet
可以与 05.parquet
聚合并在一起,并且 03.parquet
可以与 04.parquet
聚合并在一起。
使用 aggregate_files
通常会通过使 DataFrame 分区更接近 blocksize
参数指定的大小来提高性能。相比之下,默认行为可能会产生大量远小于 blocksize
的分区。