Dask DataFrames 最佳实践

Dask DataFrame 很容易上手,但要用好它确实需要一些经验。本页提供了 Dask DataFrames 的最佳实践建议,并包含了常见问题的解决方案。

使用 Pandas

对于能放入内存的数据,pandas 通常比 Dask DataFrame 更快、更容易使用。虽然“大数据”工具可能令人兴奋,但在适合使用常规数据工具的情况下,它们几乎总是更差的选择。

先进行归约,再使用 pandas

与上述类似,即使你有一个大型数据集,在计算过程中也可能有一点,你已经将数据归约到了更容易管理的程度。此时,你可能想切换到 pandas。

df = dd.read_parquet('my-giant-file.parquet')
df = df[df.name == 'Alice']              # Select a subsection
result = df.groupby('id').value.mean()   # Reduce to a smaller size
result = result.compute()                # Convert to pandas dataframe
result...                                # Continue working with pandas

Pandas 性能技巧也适用于 Dask DataFrame

常规的 pandas 性能技巧,如避免使用 apply、使用矢量化操作、使用分类数据 (categoricals) 等,同样适用于 Dask DataFrame。关于此主题,请参阅 Tom AugspurgerModern Pandas 一文。

使用索引

Dask DataFrame 可以选择沿着单个索引列进行排序。对该列进行某些操作会非常快。例如,如果你的数据集按时间排序,你可以快速选择特定日期的数据、执行时间序列连接等。你可以通过查看 df.known_divisions 属性来检查数据是否已排序。你可以使用 .set_index(column_name) 方法设置索引列。然而,此操作成本较高,因此请谨慎使用(见下文)

df = df.set_index('timestamp')  # set the index to make some operations fast

df.loc['2001-01-05':'2001-01-12']  # this is very fast if you have an index
df.merge(df2, left_index=True, right_index=True)  # this is also very fast

更多信息,请参阅 dataframe 分区 的文档。

避免全量数据重排 (Shuffling)

设置索引是一个重要但成本较高的操作(见上文)。你应该少做,并在之后进行持久化(见下文)。

一些操作,如 set_indexmerge/join,在并行或分布式环境中比在单台机器内存中更难执行。特别是,重新排列数据的重排操作会变得通信更加密集。例如,如果你的数据按客户 ID 排列,但现在你想按时间排列,你的所有分区都必须相互通信以交换数据片段。这可能是一个密集的过程,尤其是在集群上。

因此,一定要设置索引,但尽量减少设置的频率。

df = df.set_index('column_name')  # do this infrequently

此外,set_index 有一些选项可以在某些情况下加速它。例如,如果你知道你的数据集已经排序,或者你已经知道用于分割数据的值,你可以提供这些信息来加速 set_index 操作。更多信息,请参阅 set_index 文档字符串

df2 = df.set_index(d.timestamp, sorted=True)

明智地使用 persist

注意

本节仅与分布式系统上的用户相关。

警告

persist 对查询优化器有一些缺点。它会阻止所有优化,并阻止我们将列投影或过滤器推送到 IO 层。仅在绝对必要或之后需要完整数据集时,才谨慎使用 persist。

DataFrame 工作负载通常如下所示:

  1. 从文件中加载数据

  2. 将数据过滤到特定子集

  3. 重排数据以设置智能索引

  4. 基于此索引数据执行多个复杂查询

通常理想的做法是加载、过滤和重排数据一次,并将结果保留在内存中。之后,每个复杂查询都可以基于这些内存中的数据,而无需每次都重复完整的加载-过滤-重排过程。要做到这一点,请使用 client.persist 方法

df = dd.read_csv('s3://bucket/path/to/*.csv')
df = df[df.balance < 0]
df = client.persist(df)

df = df.set_index('timestamp')
df = client.persist(df)

>>> df.customer_id.nunique().compute()
18452844

>>> df.groupby(df.city).size().compute()
...

persist 之所以重要,是因为 Dask DataFrame 默认是延迟执行的。persist 是告诉集群开始执行你迄今为止定义的计算,并尝试将这些结果保留在内存中的一种方式。你将获得一个新的 DataFrame,它在语义上等同于你的旧 DataFrame,但现在指向正在运行的数据。你的旧 DataFrame 仍然指向延迟计算

# Don't do this
client.persist(df)  # persist doesn't change the input in-place

# Do this instead
df = client.persist(df)  # replace your old lazy DataFrame

重新分区以减少开销

你的 Dask DataFrame 被分割成许多 pandas DataFrames。我们有时称这些为“分区”,分区数量通常是为你决定的。例如,它可能是你正在读取的 CSV 文件数量。然而,随着时间的推移,当你通过过滤或连接减少或增加 pandas DataFrames 的大小时,重新考虑你需要多少分区可能是明智的。分区过多或过少都有成本。

Individual partitions of a Dask DataFrame are pandas DataFrames. One tip from Dask DataFrames Best Practices is to repartition these partitions.

分区应该能舒适地放入内存(小于一 GB),但数量也不宜过多。中心调度器处理每个分区上的每个操作都需要几百微秒。如果你有几千个任务,这几乎不易察觉,但如果可能,减少数量是很好的。

常见的情况是,你将大量数据加载到大小合理的分区中(Dask 的默认设置做得不错),但随后你将数据集过滤到原始数据的一小部分。此时,将许多小分区重新分组为一些较大的分区是明智的。你可以通过使用 dask.dataframe.DataFrame.repartition 方法来做到这一点

df = dd.read_csv('s3://bucket/path/to/*.csv')
df = df[df.name == 'Alice']  # only 1/100th of the data
df = df.repartition(npartitions=df.npartitions // 100)

df = df.persist()  # if on a distributed system

这有助于减少开销并提高矢量化 Pandas 操作的效率。你应该争取每个分区的数据量在 100MB 左右。

连接 (Joins)

连接两个 DataFrame 根据情况可能非常昂贵或非常便宜。在以下情况下是便宜的:

  1. 连接一个 Dask DataFrame 与一个 pandas DataFrame

  2. 连接一个 Dask DataFrame 与另一个只有单个分区的 Dask DataFrame

  3. 沿它们的索引连接 Dask DataFrames

在以下情况下是昂贵的:

  1. 沿非索引列连接 Dask DataFrames

昂贵的情况需要进行重排。这没问题,Dask DataFrame 会很好地完成工作,但它会比典型的线性时间操作更昂贵

dd.merge(a, pandas_df)  # fast
dd.merge(a, b, left_index=True, right_index=True)  # fast
dd.merge(a, b, left_index=True, right_on='id')  # half-fast, half-slow
dd.merge(a, b, left_on='id', right_on='id')  # slow

更多信息请参阅 连接 (Joins)

使用 Parquet

Apache Parquet 是一种列式二进制格式。它是存储大量表格数据的实际标准,也是我们推荐的基础表格数据存储解决方案。

df.to_parquet('path/to/my-results/')
df = dd.read_parquet('path/to/my-results/')

与 CSV 等格式相比,Parquet 带来了以下优势:

  1. 读写速度更快,通常快 4-10 倍

  2. 存储更紧凑,通常是 2-5 倍

  3. 它有 schema (模式),因此不会对列的类型产生歧义。这避免了令人困惑的错误。

  4. 它支持更高级的数据类型,如分类数据、精确的日期时间等

  5. 它更具可移植性,可以与其他系统(如数据库或 Apache Spark)一起使用

  6. 根据数据的分区方式,Dask 可以识别已排序的列,有时可以更有效地选取数据子集

有关更多详细信息,请参阅 Dask Dataframe 和 Parquet