Dask DataFrame 设计

Dask DataFrame 设计

Dask DataFrame 协调了许多沿索引排列的 Pandas DataFrame/Series。我们使用以下组件定义 Dask DataFrame 对象:

  • 具有指定分区的特殊键集合的 Dask 图,例如 ('x', 0), ('x', 1), ...

  • 用于标识 Dask 图中哪些键指向此 DataFrame 的名称,例如 'x'

  • 一个空的 Pandas 对象,包含适当的元数据(例如列名、dtypes 等)

  • 沿索引的分区边界序列,称为 divisions

元数据

许多 DataFrame 操作依赖于知道列的名称和 dtype。为了跟踪这些信息,所有 Dask DataFrame 对象都有一个 _meta 属性,该属性包含一个空的 Pandas 对象,具有相同的 dtypes 和名称。例如:

>>> df = pd.DataFrame({'a': [1, 2, 3], 'b': ['x', 'y', 'z']})
>>> ddf = dd.from_pandas(df, npartitions=2)
>>> ddf._meta
Empty DataFrame
Columns: [a, b]
Index: []
>>> ddf._meta.dtypes
a     int64
b    object
dtype: object

在内部,Dask DataFrame 尽力在所有操作中传播此信息,因此大多数时候用户不必担心这个问题。通常这是通过在少量模拟数据样本上评估操作来完成的,这些样本可以在 _meta_nonempty 属性上找到

>>> ddf._meta_nonempty
   a    b
0  1  foo
1  1  foo

有时,此操作可能在用户定义函数中失败(例如,使用 DataFrame.apply 时),或者可能过于昂贵。对于这些情况,许多函数支持一个可选的 meta 关键字,该关键字允许直接指定元数据,从而避免推断步骤。为方便起见,它支持多种选项:

  1. 具有适当 dtypes 和名称的 Pandas 对象。如果不为空,将取空切片。

>>> ddf.map_partitions(foo, meta=pd.DataFrame({'a': [1], 'b': [2]}))
  1. 对适当名称和 dtypes 的描述。这可以有几种形式:

  • 一个 dict 格式的 {name: dtype} 或一个 (name, dtype) 可迭代对象指定一个 DataFrame。请注意,顺序很重要:meta 中名称的顺序应与列的顺序匹配。

  • 一个 (name, dtype) 元组指定一个 series。

此关键字适用于所有接受用户提供的可调用对象(例如 DataFrame.map_partitions, DataFrame.apply 等)的函数/方法,以及许多创建函数(例如 dd.from_delayed)。

分区

在内部,Dask DataFrame 被分割成许多分区,每个分区是一个 Pandas DataFrame。这些 DataFrame 沿索引纵向分割。当我们的索引已排序并且我们知道分区划分的值时,我们可以在昂贵的算法(例如 groupby、join 等)方面变得更加智能和高效。

例如,如果我们有一个时间序列索引,那么我们的分区可能按月划分:所有一月份的数据将位于一个分区,而所有二月份的数据将位于下一个分区。在这些情况下,沿索引进行诸如 locgroupbyjoin/merge 的操作将比并行方式效率高 很多。您可以使用以下字段查看 DataFrame 的分区数量和 divisions 值:

>>> df.npartitions
4
>>> df.divisions
['2015-01-01', '2015-02-01', '2015-03-01', '2015-04-01', '2015-04-31']

分区数量和 divisions 值在优化期间可能会发生变化。优化器将尝试创建大小合理的分区,以避免通过许多小分区给调度器带来压力。

Divisions 包括每个分区索引的最小值和最后一个分区索引的最大值。在上面的示例中,如果用户搜索特定的日期时间范围,那么我们就知道需要检查哪些分区以及可以丢弃哪些分区。

>>> df.loc['2015-01-20': '2015-02-10']  # Must inspect first two partitions

通常我们没有关于分区的信息。例如,读取 CSV 文件时,如果我们没有额外的用户输入,就不知道数据是如何划分的。在这种情况下,.divisions 将全部为 None

>>> df.divisions
[None, None, None, None, None]

在这些情况下,任何需要具有已知 divisions 的清晰分区 DataFrame 的操作都必须执行排序。这通常可以通过调用 df.set_index(...) 来实现。

分组

默认情况下,groupby 将根据几个不同的因素选择输出分区的数量。它将查看分组键的数量来猜测数据的基数。它将利用此信息计算基于输入分区数量的因子。您可以通过使用 split_out 参数指定输出分区的数量来覆盖此行为。

result = df.groupby('id').value.mean()
result.npartitions  # returns 1

result = df.groupby(['id', 'id2']).value.mean()
result.npartitions  # returns 5

result = df.groupby('id').value.mean(split_out=8)
result.npartitions  # returns 8

一些 groupby 聚合函数具有不同的 split_out 默认值。split_out=True 将保持分区数量不变,这对于不会大幅减少行数的操作很有用。

result = df.groupby('id').value.nunique()
result.npartitions  # returns same as df.npartitions