用户界面

Dask 支持多种用户界面

这些用户界面都采用相同的底层并行计算机制,因此具有相同的扩展性、诊断能力、弹性等特性,但每个界面提供一组不同的并行算法和编程风格。

本文档帮助您决定哪种用户界面最适合您的需求,并提供适用于所有界面的一些通用信息。上面链接的页面提供了更深入的每个界面的更多信息。

高级集合

许多开始使用 Dask 的人都在明确寻找 NumPy、Pandas 或 Scikit-Learn 的可扩展版本。对于这些情况,Dask 中的起点通常相当明确。如果您需要可扩展的 NumPy 数组,请从 Dask array 开始;如果您需要可扩展的 Pandas DataFrames,请从 Dask DataFrame 开始,依此类推。

这些高级接口复制了标准接口,并进行了一些细微的变动。这些接口会自动为您在更大的数据集上并行化原始项目的大部分 API。

# Arrays
import dask.array as da
rng = da.random.default_rng()
x = rng.uniform(low=0, high=10, size=(10000, 10000),  # normal numpy code
                chunks=(1000, 1000))  # break into chunks of size 1000x1000

y = x + x.T - x.mean(axis=0)  # Use normal syntax for high level algorithms

# DataFrames
import dask.dataframe as dd
df = dd.read_csv('2018-*-*.csv', parse_dates='timestamp',  # normal Pandas code
                 blocksize=64000000)  # break text into 64MB chunks

s = df.groupby('name').balance.mean()  # Use normal syntax for high level algorithms

# Bags / lists
import dask.bag as db
b = db.read_text('*.json').map(json.loads)
total = (b.filter(lambda d: d['name'] == 'Alice')
          .map(lambda d: d['balance'])
          .sum())

重要的是要记住,虽然 API 可能相似,但确实存在一些差异。此外,由于并行编程的优缺点,某些算法的性能可能与其内存中对应算法不同。使用 Dask 时仍然需要一些思考和注意。

低级接口

在并行化现有代码库或构建自定义算法时,您经常会遇到可以并行化的代码,但它不仅仅是一个大型 DataFrame 或数组。考虑下面的循环代码

results = []
for a in A:
    for b in B:
        if a < b:
            c = f(a, b)
        else:
            c = g(a, b)
        results.append(c)

这段代码具有潜在的并行性(对 fg 的多次调用可以并行执行),但尚不清楚如何将其重写成一个大型数组或 DataFrame 以便使用更高级别的 API。即使您可以将其重写成其中一种范式,也不清楚这是否是个好主意。很多含义很可能在转换过程中丢失,而且对于更复杂的系统来说,这个过程会变得更加困难。

相反,Dask 的低级 API 允许您在现有循环的上下文中一次一个函数调用地编写并行代码。这里一个常见的解决方案是使用 Dask delayed 将单个函数调用包装成一个惰性构建的任务图。

import dask

lazy_results = []
for a in A:
    for b in B:
        if a < b:
            c = dask.delayed(f)(a, b)  # add lazy task
        else:
            c = dask.delayed(g)(a, b)  # add lazy task
        lazy_results.append(c)

results = dask.compute(*lazy_results)  # compute all in parallel

结合高级和低级接口

结合高级和低级接口是常见的做法。例如,您可以使用 Dask array/bag/dataframe 加载数据并进行初步预处理,然后切换到 Dask delayed 来实现特定于您领域的自定义算法,然后再切换回 Dask array/dataframe 来清理和存储结果。理解这两套用户界面以及如何在它们之间切换,可以是一种高效的组合。

# Convert to a list of delayed Pandas dataframes
delayed_values = df.to_delayed()

# Manipulate delayed values arbitrarily as you like

# Convert many delayed Pandas DataFrames back to a single Dask DataFrame
df = dd.from_delayed(delayed_values)

惰性计算

大多数 Dask 用户界面都是 惰性 的,这意味着它们不会执行计算,直到您使用 compute 方法明确请求结果。

# This array syntax doesn't cause computation
y = x + x.T - x.mean(axis=0)

# Trigger computation by explicitly calling the compute method
y = y.compute()

如果您有多个想要同时计算的结果,请使用 dask.compute 函数。这样可以共享中间结果,从而提高效率。

# compute multiple results at the same time with the compute function
min, max = dask.compute(y.min(), y.max())

请注意,compute() 函数返回内存中的结果。它将 Dask DataFrames 转换为 Pandas DataFrames,将 Dask arrays 转换为 NumPy arrays,将 Dask bags 转换为列表。 您应该只对可以轻松容纳在内存中的结果调用 compute 。如果您的结果不适合内存,那么您可以考虑将其写入磁盘。

# Write larger results out to disk rather than store them in memory
my_dask_dataframe.to_parquet('myfile.parquet')
my_dask_array.to_hdf5('myfile.hdf5')
my_dask_bag.to_textfiles('myfile.*.txt')

持久化到分布式内存

警告

persist 会将整个数据集存储在内存中。这有一个缺点,即可用内存必须实际超过数据集的大小。仅当交互式地反复迭代相同数据集时才使用 persist,并在生产用例中尽可能避免使用它。

另外,如果您在集群上,您可能希望触发计算并将结果存储在分布式内存中。在这种情况下,您不需要调用 compute,因为它会创建一个单独的 Pandas、NumPy 或列表结果。相反,您应该调用 persist,它返回一个新的 Dask 对象,该对象指向正在计算或已经计算完成并分布在集群内存中的结果。

# Compute returns an in-memory non-Dask object
y = y.compute()

# Persist returns an in-memory Dask object that uses distributed storage if available
y = y.persist()

这通常在数据加载和预处理步骤之后、但在快速迭代、探索或复杂算法之前可以看到。例如,我们可能会读入大量数据,过滤到一个更易于管理的数据子集,然后将数据持久化到内存中,以便我们可以快速迭代。

import dask.dataframe as dd
df = dd.read_parquet('...')
df = df[df.name == 'Alice']  # select important subset of data
df = df.persist()  # trigger computation in the background

# These are all relatively fast now that the relevant data is in memory
df.groupby(df.id).balance.sum().compute()   # explore data quickly
df.groupby(df.id).balance.mean().compute()  # explore data quickly
df.id.nunique()                             # explore data quickly

惰性 vs 即时

如上所述,大多数 Dask 工作负载都是惰性的,也就是说,它们不会开始任何工作,直到您通过调用 compute() 明确触发它们。但是,有时您 确实 希望尽快提交工作,随着时间跟踪它,根据部分结果提交新工作或取消工作等等。这在跟踪或响应实时事件、处理流数据或构建复杂和自适应算法时非常有用。

对于这些情况,人们通常会转向 futures 接口,它是一种像 Dask delayed 一样的低级接口,但它是立即操作而不是惰性操作。

这里是使用 Dask delayed 和 Dask futures 来说明差异的相同示例。

Delayed: 惰性

@dask.delayed
def inc(x):
    return x + 1

@dask.delayed
def add(x, y):
    return x + y

a = inc(1)       # no work has happened yet
b = inc(2)       # no work has happened yet
c = add(a, b)    # no work has happened yet

c = c.compute()  # This triggers all of the above computations

Futures: 即时

from dask.distributed import Client
client = Client()

def inc(x):
    return x + 1

def add(x, y):
    return x + y

a = client.submit(inc, 1)     # work starts immediately
b = client.submit(inc, 2)     # work starts immediately
c = client.submit(add, a, b)  # work starts immediately

c = c.result()                # block until work finishes, then gather result

您还可以使用 persist 函数触发高级集合的工作。在使用分布式调度器时,这将导致工作在后台进行。

结合接口

有既定的方法可以结合上述接口

  1. 高级接口 (array, bag, dataframe) 有一个 to_delayed 方法,可以将它们转换为 Dask delayed 对象的序列(或网格)

    delayeds = df.to_delayed()
    
  2. 高级接口 (array, bag, dataframe) 有一个 from_delayed 方法,可以从 Delayed Future 对象转换

    df = dd.from_delayed(delayeds)
    df = dd.from_delayed(futures)
    
  3. Client.compute 方法将 Delayed 对象转换为 Futures

    futures = client.compute(delayeds)
    
  4. dask.distributed.futures_of 函数从持久化集合中收集 futures

    from dask.distributed import futures_of
    
    df = df.persist()  # start computation in the background
    futures = futures_of(df)
    
  5. Dask.delayed 对象将 Futures 转换为 delayed 对象

    delayed_value = dask.delayed(future)
    

上述方法足以将任何接口转换为其他任何接口。我们经常看到一些效果不佳的反模式

  1. 在高级对象(如 Dask arrays 或 DataFrames)上调用低级 API(delayed 或 futures)。这会将这些对象降级为它们的 NumPy 或 Pandas 等效对象,这可能不是期望的结果。人们通常寻找像 dask.array.map_blocksdask.dataframe.map_partitions 这样的 API。

  2. 在 Future 对象上调用 compute()。人们通常想要 .result() 方法。

  3. 在高级 Dask 对象上调用 NumPy/Pandas 函数或在 NumPy/Pandas 对象上调用高级 Dask 函数

结论

大多数使用 Dask 的人刚开始只使用上述接口中的一个,但最终学会了如何将几个接口结合使用。这有助于他们利用高级接口中复杂的算法,同时通过低级接口解决棘手的问题。

有关更多信息,请参阅下面特定用户界面的文档