将 Hive 分区与 Dask 一起使用
目录
将 Hive 分区与 Dask 一起使用¶
有时使用类似 hive 的目录方案写入数据集会很有用。例如,如果您的 dataframe 包含 'year'
和 'semester'
列,基于 hive 的目录结构可能如下所示
output-path/
├── year=2022/
│ ├── semester=fall/
│ │ └── part.0.parquet
│ └── semester=spring/
│ ├── part.0.parquet
│ └── part.1.parquet
└── year=2023/
└── semester=fall/
└── part.1.parquet
这种自描述结构的使用意味着 'output-path/year=2022/semester=fall/'
目录中的所有行在 'year'
列中将包含值 2022
,在 'semester'
列中将包含值 'fall'
。
生成 hive 分区数据集的主要优势在于 read_parquet()
可以应用某些 IO 过滤器,而无需解析任何文件元数据。换句话说,当数据集已按 'year'
列进行 hive 分区时,以下命令通常会更快。
>>> dd.read_parquet("output-path", filters=[("year", ">", 2022)])
使用 Hive 分区写入 Parquet 数据¶
使用 partition_on
选项时,Dask 的 to_parquet()
函数会自动生成 hive 分区目录方案。
>>> df.to_parquet("output-path", partition_on=["year", "semester"])
>>> os.listdir("output-path")
["year=2022", "year=2023"]
>>> os.listdir("output-path/year=2022")
["semester=fall", "semester=spring"]
>>> os.listdir("output-path/year=2022/semester=spring")
['part.0.parquet', 'part.1.parquet']
重要的是要认识到,Dask 不会聚合写入到每个叶目录中的数据文件。这是因为在执行 to_parquet()
任务图期间,每个 DataFrame 分区都是独立写入的。为了写出分区 i 的数据,分区-i 写入任务将对列 ["year", "semester"]
执行 groupby 操作,然后每个不同的组将使用文件名 'part.{i}.parquet'
写入到相应的目录。因此,hive 分区写入可能会在每个叶目录中生成大量文件(每个 DataFrame 分区一个文件)。
如果您的应用程序要求为每个 hive 分区生成一个 Parquet 文件,一种可能的解决方案是在调用 to_parquet()
之前,对分区列进行排序或 shuffle。
>>> partition_on = ["year", "semester"]
>>> df.shuffle(on=partition_on).to_parquet(partition_on=partition_on)
像这样使用全局 shuffle 是非常昂贵的,应尽可能避免。但是,它也保证生成最少数量的文件,有时可能值得付出牺牲。
使用 Hive 分区读取 Parquet 数据¶
在大多数情况下,read_parquet()
会自动处理 hive 分区数据。默认情况下,所有 hive 分区列都将被解释为分类列。
>>> ddf = dd.read_parquet("output-path", columns=["year", "semester"])
>>> ddf
Dask DataFrame Structure:
year semester
npartitions=4
category[known] category[known]
... ...
... ...
... ...
... ...
Dask Name: read-parquet, 1 graph layer
>>> ddf.compute()
year semester
0 2022 fall
1 2022 fall
2 2022 fall
3 2022 spring
4 2022 spring
5 2022 spring
6 2023 fall
7 2023 fall
定义自定义分区模式¶
可以为 hive 分区列指定自定义模式。然后将使用指定的类型而不是 category 读取这些列。
>>> schema = pa.schema([("year", pa.int16()), ("semester", pa.string())])
>>> ddf2 = dd.read_parquet(
... path,
... columns=["year", "semester"],
... dataset={"partitioning": {"flavor": "hive", "schema": schema}}
... )
Dask DataFrame Structure:
year semester
npartitions=4
int16 object
... ...
... ...
... ...
... ...
如果您的任何 hive 分区列包含 null 值,您必须以这种方式指定分区模式。
虽然不是必需的,但如果您需要对高基数列进行分区,我们也建议您指定分区模式。这是因为默认的 'category'
dtype 会以一种显著增加 Dask 集合整体内存占用量的方式跟踪已知类别。事实上,read_parquet()
出于同样的原因已经清除了其他列的“已知类别”(参见 分类数据)。
最佳实践¶
虽然 hive 分区有时可以通过简化过滤来提高读取性能,但在其他情况下也可能导致性能下降和错误。
避免高基数¶
一个好的经验法则是避免对 float 列或任何包含许多唯一值(即高基数)的列进行分区。
使用 hive 分区时用户体验差的最常见原因是对分区列的高基数。例如,如果您尝试对具有数百万个唯一值的列进行分区,则 :func:`to_parquet` 需要生成数百万个目录。管理这些目录可能会给文件系统带来压力,而且每个目录中需要许多小文件肯定会加剧这个问题。
使用简单数据类型进行分区¶
由于 hive 分区数据是“自描述”的,我们建议您避免对复杂数据类型进行分区,并尽可能选择基于整数或字符串的数据类型。如果您的数据类型不能轻易地从用于定义目录名称的字符串值中推断出来,那么 IO 引擎可能难以解析这些值。
例如,直接对具有 datetime64
dtype 的列进行分区可能会生成如下所示的目录名称
output-path/
├── date=2022-01-01 00:00:00/
├── date=2022-02-01 00:00:00/
├── ...
└── date=2022-12-01 00:00:00/
这些目录名称不会被正确解释为 datetime64
值,甚至在 Windows 系统上被认为是无效的。为了获得更可靠的行为,我们建议将此类列分解为一个或多个“简单”列。例如,可以轻松使用 'date'
来构造 'year'
、'month'
和 'day'
列(根据需要)。
读取时聚合文件¶
警告
参数 aggregate_files
目前被列为实验性。但是,目前没有计划在未来的版本中删除该参数或改变其行为。
由于 hive 分区通常会产生大量小文件,read_parquet()
的性能通常会受益于正确使用 aggregate_files
参数。以以下数据集为例
dataset-path/
├── region=1/
│ ├── section=a/
│ │ └── 01.parquet
│ │ └── 02.parquet
│ │ └── 03.parquet
│ ├── section=b/
│ └── └── 04.parquet
│ └── └── 05.parquet
└── region=2/
├── section=a/
│ ├── 06.parquet
│ ├── 07.parquet
│ ├── 08.parquet
如果我们将 aggregate_files
在此情况下设置为 True
,我们是在告诉 Dask,任何 parquet 数据文件都可以聚合成同一个输出 DataFrame 分区。反之,如果指定分区列的名称(例如 'region'
或 'section'
),我们允许聚合共享文件路径直到(包括)相应目录名称的任意两个文件。例如,如果将 aggregate_files
设置为 'section'
,则 04.parquet
和 05.parquet
可以聚合在一起,但 03.parquet
和 04.parquet
不能。但是,如果将 aggregate_files
设置为 'region'
,则 04.parquet
可以与 05.parquet
聚合,并且 03.parquet
可以与 04.parquet
聚合。
使用 aggregate_files
通常会提高性能,因为它使 DataFrame 分区更有可能接近 blocksize
参数指定的大小。相比之下,默认行为可能会产生大量远小于 blocksize
的分区。