Dask DataFrame 设计

Dask DataFrame 设计

Dask DataFrames 协调沿索引排列的许多 Pandas DataFrames/Series。我们用以下组件定义一个 Dask DataFrame 对象:

  • 一个带有特殊键集(指定分区)的 Dask 图,例如 ('x', 0), ('x', 1), ...

  • 一个名称,用于标识 Dask 图中哪些键指向此 DataFrame,例如 'x'

  • 一个空的 Pandas 对象,包含适当的元数据(例如列名、数据类型等)

  • 沿索引的一系列分区边界,称为 divisions

元数据

许多 DataFrame 操作依赖于知道列的名称和数据类型。为了跟踪这些信息,所有 Dask DataFrame 对象都有一个 _meta 属性,其中包含一个具有相同数据类型和名称的空 Pandas 对象。例如:

>>> df = pd.DataFrame({'a': [1, 2, 3], 'b': ['x', 'y', 'z']})
>>> ddf = dd.from_pandas(df, npartitions=2)
>>> ddf._meta
Empty DataFrame
Columns: [a, b]
Index: []
>>> ddf._meta.dtypes
a     int64
b    object
dtype: object

在内部,Dask DataFrame 会尽力将这些信息通过所有操作进行传播,因此大多数情况下用户不必担心这个问题。通常,这是通过对少量模拟数据样本进行操作评估来实现的,这些数据可以在 _meta_nonempty 属性上找到。

>>> ddf._meta_nonempty
   a    b
0  1  foo
1  1  foo

有时此操作可能在用户定义函数(例如使用 DataFrame.apply 时)中失败,或者可能非常昂贵。对于这些情况,许多函数支持一个可选的 meta 关键字,允许直接指定元数据,避免推理步骤。为方便起见,它支持多种选项:

  1. 一个具有适当数据类型和名称的 Pandas 对象。如果非空,将取一个空切片。

>>> ddf.map_partitions(foo, meta=pd.DataFrame({'a': [1], 'b': [2]}))
  1. 对适当名称和数据类型的描述。这可以有几种形式:

  • 一个 dict{name: dtype}(name, dtype) 的可迭代对象指定一个 DataFrame。请注意,顺序很重要:meta 中名称的顺序应与列的顺序匹配。

  • 一个 (name, dtype) 的元组指定一个 Series。

此关键字可用于所有接受用户提供的可调用对象(例如 DataFrame.map_partitionsDataFrame.apply 等)的函数/方法,以及许多创建函数(例如 dd.from_delayed)。

分区

在内部,一个 Dask DataFrame 被分成许多分区,每个分区都是一个 Pandas DataFrame。这些 DataFrame 沿索引垂直分割。当我们的索引已排序并且我们知道分区的 divisions 值时,我们就可以巧妙高效地处理昂贵的算法(例如 groupby、join 等)。

例如,如果我们的索引是时间序列,那么我们的分区可以按月划分:所有一月的数据将位于一个分区,而所有二月的数据将位于下一个分区。在这些情况下,沿索引的诸如 locgroupbyjoin/merge 等操作会比并行情况下可能实现的效率高得多。您可以使用以下字段查看 DataFrame 的分区数量和 divisions 值:

>>> df.npartitions
4
>>> df.divisions
['2015-01-01', '2015-02-01', '2015-03-01', '2015-04-01', '2015-04-31']

分区数量和 division 值可能会在优化过程中发生变化。优化器将尝试创建大小合理的分区,以避免因许多小分区而给调度器带来压力。

Divisions 包含每个分区的索引最小值以及最后一个分区的索引最大值。在上面的示例中,如果用户搜索特定的日期时间范围,我们就知道需要检查哪些分区以及可以丢弃哪些分区。

>>> df.loc['2015-01-20': '2015-02-10']  # Must inspect first two partitions

通常我们没有关于分区的信息。例如,读取 CSV 文件时,如果不额外输入,我们不知道数据是如何划分的。在这种情况下,.divisions 将全部为 None

>>> df.divisions
[None, None, None, None, None]

在这些情况下,任何需要具有已知 divisions 的干净分区 DataFrame 的操作都必须执行排序。这通常可以通过调用 df.set_index(...) 来实现。

Groupby

默认情况下,groupby 会根据几个不同的因素来选择输出分区的数量。它会查看分组键的数量来猜测数据的基数。它会利用这些信息计算一个基于输入分区数量的因子。您可以通过使用 split_out 参数指定输出分区的数量来覆盖此行为。

result = df.groupby('id').value.mean()
result.npartitions  # returns 1

result = df.groupby(['id', 'id2']).value.mean()
result.npartitions  # returns 5

result = df.groupby('id').value.mean(split_out=8)
result.npartitions  # returns 8

一些 groupby 聚合函数的 split_out 默认值不同。split_out=True 会保持分区数量不变,这对于那些不会显著减少行数的操作很有用。

result = df.groupby('id').value.nunique()
result.npartitions  # returns same as df.npartitions