用户界面

Dask 支持多种用户界面

这些用户界面都采用相同的底层并行计算机制,因此具有相同的伸缩性、诊断能力、弹性等,但每个界面提供了不同的并行算法集和编程风格。

本文档帮助您决定哪个用户界面最适合您的需求,并提供适用于所有界面的一些通用信息。上面链接的页面更深入地提供了有关每个界面的更多信息。

高级集合 (Collections)

许多开始使用 Dask 的人明确地在寻找 NumPy、Pandas 或 Scikit-Learn 的可伸缩版本。对于这些情况,Dask 中的起点通常相当明确。如果您需要可伸缩的 NumPy 数组,那么从 Dask 数组开始;如果您需要可伸缩的 Pandas DataFrames,那么从 Dask DataFrame 开始,依此类推。

这些高级接口复制了标准接口,并有细微变化。对于原始项目 API 的很大一部分,这些接口会自动为您并行处理更大的数据集。

# Arrays
import dask.array as da
rng = da.random.default_rng()
x = rng.uniform(low=0, high=10, size=(10000, 10000),  # normal numpy code
                chunks=(1000, 1000))  # break into chunks of size 1000x1000

y = x + x.T - x.mean(axis=0)  # Use normal syntax for high level algorithms

# DataFrames
import dask.dataframe as dd
df = dd.read_csv('2018-*-*.csv', parse_dates='timestamp',  # normal Pandas code
                 blocksize=64000000)  # break text into 64MB chunks

s = df.groupby('name').balance.mean()  # Use normal syntax for high level algorithms

# Bags / lists
import dask.bag as db
b = db.read_text('*.json').map(json.loads)
total = (b.filter(lambda d: d['name'] == 'Alice')
          .map(lambda d: d['balance'])
          .sum())

重要的是要记住,虽然 API 可能相似,但确实存在一些差异。此外,由于并行编程的优缺点,某些算法的性能可能与其内存中对应的算法不同。使用 Dask 时仍然需要一些思考和关注。

低级接口

通常在并行化现有代码库或构建自定义算法时,您会遇到可并行化但并非仅仅是大型 DataFrame 或数组的代码。考虑下面的循环代码

results = []
for a in A:
    for b in B:
        if a < b:
            c = f(a, b)
        else:
            c = g(a, b)
        results.append(c)

此代码中存在潜在的并行性(对 fg 的多次调用可以并行完成),但尚不清楚如何将其重写为大型数组或 DataFrame 以便使用更高级别的 API。即使您可以将其重写为这些范例之一,也不清楚这是否是一个好主意。大部分意义很可能在转换中丢失,并且对于更复杂的系统,此过程将变得更加困难。

相反,Dask 的低级 API 允许您在现有循环的上下文中一次调用一个函数来编写并行代码。一个常见的解决方案是使用 Dask delayed 将单个函数调用包装到惰性构造的任务图中

import dask

lazy_results = []
for a in A:
    for b in B:
        if a < b:
            c = dask.delayed(f)(a, b)  # add lazy task
        else:
            c = dask.delayed(g)(a, b)  # add lazy task
        lazy_results.append(c)

results = dask.compute(*lazy_results)  # compute all in parallel

结合高级和低级接口

结合高级和低级接口是很常见的。例如,您可以使用 Dask 数组/包/DataFrame 加载数据并进行初步预处理,然后切换到 Dask delayed 进行特定于您领域的自定义算法,然后再切换回 Dask 数组/DataFrame 清理并存储结果。理解两套用户界面以及如何在其间切换,可以是一种高效的组合。

# Convert to a list of delayed Pandas dataframes
delayed_values = df.to_delayed()

# Manipulate delayed values arbitrarily as you like

# Convert many delayed Pandas DataFrames back to a single Dask DataFrame
df = dd.from_delayed(delayed_values)

惰性计算

大多数 Dask 用户界面是惰性的,这意味着它们直到您使用 compute 方法显式请求结果时才会进行评估

# This array syntax doesn't cause computation
y = x + x.T - x.mean(axis=0)

# Trigger computation by explicitly calling the compute method
y = y.compute()

如果您想同时计算多个结果,请使用 dask.compute 函数。这可以共享中间结果,因此更高效

# compute multiple results at the same time with the compute function
min, max = dask.compute(y.min(), y.max())

请注意,compute() 函数返回内存中的结果。它将 Dask DataFrames 转换为 Pandas DataFrames,Dask 数组转换为 NumPy 数组,Dask 包转换为列表。您只应在能够舒适地容纳在内存中的结果上调用 compute。如果您的结果无法容纳在内存中,那么您可以考虑将其写入磁盘。

# Write larger results out to disk rather than store them in memory
my_dask_dataframe.to_parquet('myfile.parquet')
my_dask_array.to_hdf5('myfile.hdf5')
my_dask_bag.to_textfiles('myfile.*.txt')

持久化到分布式内存

警告

persist 会将整个数据集存储在内存中。这样做的缺点是可用内存必须实际超过数据集的大小。仅在反复交互式处理同一数据集时使用 persist,并在生产用例中尽可能避免使用它。

另外,如果您在集群上,您可能希望触发计算并将结果存储在分布式内存中。在这种情况下,您不想调用 compute,因为这会创建一个单一的 Pandas、NumPy 或列表结果。相反,您希望调用 persist,它返回一个新的 Dask 对象,指向正在积极计算或已经计算完毕并分散在集群内存中的结果。

# Compute returns an in-memory non-Dask object
y = y.compute()

# Persist returns an in-memory Dask object that uses distributed storage if available
y = y.persist()

这通常在数据加载和预处理步骤之后,但在快速迭代、探索或复杂算法之前看到。例如,我们可能会读取大量数据,筛选出更易于管理的子集,然后将数据持久化到内存中,以便快速迭代。

import dask.dataframe as dd
df = dd.read_parquet('...')
df = df[df.name == 'Alice']  # select important subset of data
df = df.persist()  # trigger computation in the background

# These are all relatively fast now that the relevant data is in memory
df.groupby(df.id).balance.sum().compute()   # explore data quickly
df.groupby(df.id).balance.mean().compute()  # explore data quickly
df.id.nunique()                             # explore data quickly

惰性 vs 即时

如上所述,大多数 Dask 工作负载是惰性的,也就是说,它们直到您通过调用 compute() 显式触发时才会开始任何工作。然而,有时您确实希望尽快提交工作,随时间跟踪它,根据部分结果提交新工作或取消工作等等。这在跟踪或响应实时事件、处理流数据或构建复杂自适应算法时非常有用。

对于这些情况,人们通常转向 futures 接口,它是一个像 Dask delayed 一样的低级接口,但它是立即操作而不是惰性操作。

下面是使用 Dask delayed 和 Dask futures 的同一个示例,以说明它们的区别。

Delayed: 惰性

@dask.delayed
def inc(x):
    return x + 1

@dask.delayed
def add(x, y):
    return x + y

a = inc(1)       # no work has happened yet
b = inc(2)       # no work has happened yet
c = add(a, b)    # no work has happened yet

c = c.compute()  # This triggers all of the above computations

Futures: 即时

from dask.distributed import Client
client = Client()

def inc(x):
    return x + 1

def add(x, y):
    return x + y

a = client.submit(inc, 1)     # work starts immediately
b = client.submit(inc, 2)     # work starts immediately
c = client.submit(add, a, b)  # work starts immediately

c = c.result()                # block until work finishes, then gather result

您还可以使用 persist 函数触发高级集合的工作。在使用分布式调度器时,这将使工作在后台进行。

结合接口

有一些既定的方法可以组合上述接口

  1. 高级接口(数组、包、DataFrame)有一个 to_delayed 方法,可以将对象转换为 Dask delayed 对象的序列(或网格)

    delayeds = df.to_delayed()
    
  2. 高级接口(数组、包、DataFrame)有一个 from_delayed 方法,可以从 Delayed Future 对象转换而来

    df = dd.from_delayed(delayeds)
    df = dd.from_delayed(futures)
    
  3. Client.compute 方法将 Delayed 对象转换为 Futures

    futures = client.compute(delayeds)
    
  4. dask.distributed.futures_of 函数从持久化集合中收集 futures

    from dask.distributed import futures_of
    
    df = df.persist()  # start computation in the background
    futures = futures_of(df)
    
  5. Dask.delayed 对象将 Futures 转换为 delayed 对象

    delayed_value = dask.delayed(future)
    

上述方法应该足以将任何接口转换为任何其他接口。我们经常看到一些效果不佳的反模式

  1. 在高级对象(如 Dask 数组或 DataFrame)上调用低级 API(delayed 或 futures)。这会将这些对象降级为它们的 NumPy 或 Pandas 对应物,这可能不是期望的结果。人们通常会寻找像 dask.array.map_blocksdask.dataframe.map_partitions 这样的 API。

  2. 在 Future 对象上调用 compute()。人们通常想要 .result() 方法。

  3. 在高级 Dask 对象上调用 NumPy/Pandas 函数,或在 NumPy/Pandas 对象上调用高级 Dask 函数

结论

大多数使用 Dask 的人最初只使用上述接口中的一个,但最终会学习如何将几个接口结合使用。这有助于他们利用高级接口中的复杂算法,同时也能解决低级接口中的棘手问题。

有关更多信息,请参阅下面特定用户界面的文档