Dask DataFrames 最佳实践
目录
Dask DataFrames 最佳实践¶
Dask DataFrame 入门很容易,但要用好它需要一些经验。本页面提供了 Dask DataFrames 最佳实践建议,并包含常见问题的解决方案。
使用 Pandas¶
对于能放入内存的数据,pandas 通常比 Dask DataFrame 更快且更易于使用。虽然“大数据”工具令人兴奋,但在常规数据工具适用时,它们几乎总是更逊一筹。
先规约,然后使用 pandas¶
与上述类似,即使你拥有大型数据集,计算过程中也可能存在将数据量规约到更易于管理的程度。此时,你可能希望切换到 pandas。
df = dd.read_parquet('my-giant-file.parquet')
df = df[df.name == 'Alice'] # Select a subsection
result = df.groupby('id').value.mean() # Reduce to a smaller size
result = result.compute() # Convert to pandas dataframe
result... # Continue working with pandas
Pandas 性能技巧同样适用于 Dask DataFrame¶
常见的 pandas 性能技巧,例如避免使用 apply、使用向量化操作、使用分类数据等,都同样适用于 Dask DataFrame。关于此主题,建议阅读 Modern Pandas,作者 Tom Augspurger。
使用索引¶
Dask DataFrame 可以选择沿单个索引列进行排序。针对此列的某些操作速度非常快。例如,如果你的数据集按时间排序,你可以快速选择特定日期的数据,执行时间序列连接等。你可以通过查看 df.known_divisions
属性来检查数据是否已排序。你可以使用 .set_index(column_name)
方法设置索引列。不过,此操作开销很大,因此请谨慎使用(参见下文)。
df = df.set_index('timestamp') # set the index to make some operations fast
df.loc['2001-01-05':'2001-01-12'] # this is very fast if you have an index
df.merge(df2, left_index=True, right_index=True) # this is also very fast
更多信息,请参阅关于 DataFrame 分区 的文档。
避免全量数据重排¶
设置索引是一个重要但开销很大的操作(见上文)。你应该不频繁地执行此操作,并且之后应该进行持久化(见下文)。
某些操作,例如 set_index
和 merge/join
,在并行或分布式环境中比在单机内存中更难执行。特别是,重新排列数据的*重排操作*会变得通信密集。例如,如果你的数据按客户 ID 排列,但现在你想按时间排列,那么所有分区都必须相互通信以交换数据碎片。这可能是一个密集的过程,尤其是在集群上。
因此,务必设置索引,但尽量不频繁地进行。
df = df.set_index('column_name') # do this infrequently
此外,set_index
有一些选项可以在某些情况下加速。例如,如果你知道数据集已排序,或者你已经知道用于划分数据集的值,你可以提供这些值来加速 set_index
操作。更多信息,请参阅 set_index docstring。
df2 = df.set_index(d.timestamp, sorted=True)
智能地持久化¶
注意
本节仅与分布式系统用户相关。
警告
persist 在查询优化器方面存在一些缺点。它会阻止所有优化,并阻止我们将列投影或过滤器推送到 IO 层。仅在绝对必要时或之后需要完整数据集时才谨慎使用 persist。
通常 DataFrame 工作负载如下所示
从文件加载数据
将数据过滤到特定子集
重排数据以设置智能索引
基于此索引数据的几个复杂查询
通常,理想的做法是加载、过滤和重排数据一次,并将结果保留在内存中。之后,每个复杂的查询都可以基于这些内存中的数据,而无需每次都重复完整的加载-过滤-重排过程。为此,请使用 client.persist 方法。
df = dd.read_csv('s3://bucket/path/to/*.csv')
df = df[df.balance < 0]
df = client.persist(df)
df = df.set_index('timestamp')
df = client.persist(df)
>>> df.customer_id.nunique().compute()
18452844
>>> df.groupby(df.city).size().compute()
...
Persist 很重要,因为 Dask DataFrame 默认是惰性计算的。它是一种告诉集群开始执行你目前已定义的计算,并尝试将结果保留在内存中的方式。你会得到一个新的 DataFrame,它在语义上与旧的 DataFrame 等效,但现在指向正在运行的数据。你的旧 DataFrame 仍然指向惰性计算。
# Don't do this
client.persist(df) # persist doesn't change the input in-place
# Do this instead
df = client.persist(df) # replace your old lazy DataFrame
重新分区以减少开销¶
你的 Dask DataFrame 被分成许多 pandas DataFrames。我们有时称这些为“分区”,分区数量通常由系统决定。例如,它可能是你正在读取的 CSV 文件数量。然而,随着时间的推移,当你通过过滤或连接来减小或增大 pandas DataFrames 的大小时,重新考虑你需要多少个分区可能是明智的。分区过多或过少都会带来开销。
分区应该能舒适地放入内存(小于一千兆字节),同时数量也不宜过多。调度器处理每个分区上的每个操作需要几百微秒。如果你有几千个任务,这几乎不可察觉,但如果可能的话,最好减少数量。
常见的情况是,你将大量数据加载到合理大小的分区中(Dask 的默认设置是合理的选择),但随后你将数据集过滤到只剩下原始数据的一小部分。此时,明智的做法是将许多小分区重新组合成几个较大的分区。你可以使用 dask.dataframe.DataFrame.repartition
方法来做到这一点。
df = dd.read_csv('s3://bucket/path/to/*.csv')
df = df[df.name == 'Alice'] # only 1/100th of the data
df = df.repartition(npartitions=df.npartitions // 100)
df = df.persist() # if on a distributed system
这有助于减少开销并提高向量化 Pandas 操作的效率。你应该争取使每个分区的数据量约为 100MB。
连接¶
连接两个 DataFrame 可能开销非常大,也可能开销非常小,这取决于具体情况。在以下情况下开销很小:
连接 Dask DataFrame 和 pandas DataFrame
连接 Dask DataFrame 和另一个单分区 Dask DataFrame
沿索引连接 Dask DataFrames
在以下情况下开销很大:
沿非索引列连接 Dask DataFrames
开销大的情况需要进行重排。这没问题,Dask DataFrame 会很好地完成任务,但这比典型的线性时间操作开销更大。
dd.merge(a, pandas_df) # fast
dd.merge(a, b, left_index=True, right_index=True) # fast
dd.merge(a, b, left_index=True, right_on='id') # half-fast, half-slow
dd.merge(a, b, left_on='id', right_on='id') # slow
更多信息请参阅连接。
使用 Parquet¶
Apache Parquet 是一种列式二进制格式。它是存储大量表格数据的实际标准,也是我们推荐的基础表格数据存储解决方案。
df.to_parquet('path/to/my-results/')
df = dd.read_parquet('path/to/my-results/')
与 CSV 等格式相比,Parquet 具有以下优点:
读写速度更快,通常快 4-10 倍
存储更紧凑,通常压缩 2-5 倍
它有 schema,因此不存在列类型模糊的问题。这避免了令人困惑的错误。
它支持更高级的数据类型,例如分类数据、正确的日期时间等
它更具可移植性,可以与数据库或 Apache Spark 等其他系统一起使用
根据数据的分区方式,Dask 可以识别已排序的列,有时可以更有效地挑选出数据的子集
更多详细信息,请参阅 Dask Dataframe 和 Parquet。