Dask DataFrame 设计
目录
Dask DataFrame 设计¶
Dask DataFrame 协调了许多沿索引排列的 Pandas DataFrame/Series。我们使用以下组件定义 Dask DataFrame 对象:
具有指定分区的特殊键集合的 Dask 图,例如
('x', 0), ('x', 1), ...
用于标识 Dask 图中哪些键指向此 DataFrame 的名称,例如
'x'
一个空的 Pandas 对象,包含适当的元数据(例如列名、dtypes 等)
沿索引的分区边界序列,称为
divisions
元数据¶
许多 DataFrame 操作依赖于知道列的名称和 dtype。为了跟踪这些信息,所有 Dask DataFrame 对象都有一个 _meta
属性,该属性包含一个空的 Pandas 对象,具有相同的 dtypes 和名称。例如:
>>> df = pd.DataFrame({'a': [1, 2, 3], 'b': ['x', 'y', 'z']})
>>> ddf = dd.from_pandas(df, npartitions=2)
>>> ddf._meta
Empty DataFrame
Columns: [a, b]
Index: []
>>> ddf._meta.dtypes
a int64
b object
dtype: object
在内部,Dask DataFrame 尽力在所有操作中传播此信息,因此大多数时候用户不必担心这个问题。通常这是通过在少量模拟数据样本上评估操作来完成的,这些样本可以在 _meta_nonempty
属性上找到
>>> ddf._meta_nonempty
a b
0 1 foo
1 1 foo
有时,此操作可能在用户定义函数中失败(例如,使用 DataFrame.apply
时),或者可能过于昂贵。对于这些情况,许多函数支持一个可选的 meta
关键字,该关键字允许直接指定元数据,从而避免推断步骤。为方便起见,它支持多种选项:
具有适当 dtypes 和名称的 Pandas 对象。如果不为空,将取空切片。
>>> ddf.map_partitions(foo, meta=pd.DataFrame({'a': [1], 'b': [2]}))
对适当名称和 dtypes 的描述。这可以有几种形式:
一个
dict
格式的{name: dtype}
或一个(name, dtype)
可迭代对象指定一个 DataFrame。请注意,顺序很重要:meta
中名称的顺序应与列的顺序匹配。一个
(name, dtype)
元组指定一个 series。
此关键字适用于所有接受用户提供的可调用对象(例如 DataFrame.map_partitions
, DataFrame.apply
等)的函数/方法,以及许多创建函数(例如 dd.from_delayed
)。
分区¶
在内部,Dask DataFrame 被分割成许多分区,每个分区是一个 Pandas DataFrame。这些 DataFrame 沿索引纵向分割。当我们的索引已排序并且我们知道分区划分的值时,我们可以在昂贵的算法(例如 groupby、join 等)方面变得更加智能和高效。
例如,如果我们有一个时间序列索引,那么我们的分区可能按月划分:所有一月份的数据将位于一个分区,而所有二月份的数据将位于下一个分区。在这些情况下,沿索引进行诸如 loc
、groupby
和 join/merge
的操作将比并行方式效率高 很多。您可以使用以下字段查看 DataFrame 的分区数量和 divisions 值:
>>> df.npartitions
4
>>> df.divisions
['2015-01-01', '2015-02-01', '2015-03-01', '2015-04-01', '2015-04-31']
分区数量和 divisions 值在优化期间可能会发生变化。优化器将尝试创建大小合理的分区,以避免通过许多小分区给调度器带来压力。
Divisions
包括每个分区索引的最小值和最后一个分区索引的最大值。在上面的示例中,如果用户搜索特定的日期时间范围,那么我们就知道需要检查哪些分区以及可以丢弃哪些分区。
>>> df.loc['2015-01-20': '2015-02-10'] # Must inspect first two partitions
通常我们没有关于分区的信息。例如,读取 CSV 文件时,如果我们没有额外的用户输入,就不知道数据是如何划分的。在这种情况下,.divisions
将全部为 None
。
>>> df.divisions
[None, None, None, None, None]
在这些情况下,任何需要具有已知 divisions 的清晰分区 DataFrame 的操作都必须执行排序。这通常可以通过调用 df.set_index(...)
来实现。
分组¶
默认情况下,groupby 将根据几个不同的因素选择输出分区的数量。它将查看分组键的数量来猜测数据的基数。它将利用此信息计算基于输入分区数量的因子。您可以通过使用 split_out 参数指定输出分区的数量来覆盖此行为。
result = df.groupby('id').value.mean()
result.npartitions # returns 1
result = df.groupby(['id', 'id2']).value.mean()
result.npartitions # returns 5
result = df.groupby('id').value.mean(split_out=8)
result.npartitions # returns 8
一些 groupby 聚合函数具有不同的 split_out 默认值。split_out=True 将保持分区数量不变,这对于不会大幅减少行数的操作很有用。
result = df.groupby('id').value.nunique()
result.npartitions # returns same as df.npartitions