用户界面
目录
用户界面¶
Dask 支持多种用户界面
- 高级
数组: 并行 NumPy
包 (Bag): 并行列表
DataFrames: 并行 Pandas
机器学习 : 并行 Scikit-Learn
其他外部项目,例如 XArray
这些用户界面都采用相同的底层并行计算机制,因此具有相同的伸缩性、诊断能力、弹性等,但每个界面提供了不同的并行算法集和编程风格。
本文档帮助您决定哪个用户界面最适合您的需求,并提供适用于所有界面的一些通用信息。上面链接的页面更深入地提供了有关每个界面的更多信息。
高级集合 (Collections)¶
许多开始使用 Dask 的人明确地在寻找 NumPy、Pandas 或 Scikit-Learn 的可伸缩版本。对于这些情况,Dask 中的起点通常相当明确。如果您需要可伸缩的 NumPy 数组,那么从 Dask 数组开始;如果您需要可伸缩的 Pandas DataFrames,那么从 Dask DataFrame 开始,依此类推。
这些高级接口复制了标准接口,并有细微变化。对于原始项目 API 的很大一部分,这些接口会自动为您并行处理更大的数据集。
# Arrays
import dask.array as da
rng = da.random.default_rng()
x = rng.uniform(low=0, high=10, size=(10000, 10000), # normal numpy code
chunks=(1000, 1000)) # break into chunks of size 1000x1000
y = x + x.T - x.mean(axis=0) # Use normal syntax for high level algorithms
# DataFrames
import dask.dataframe as dd
df = dd.read_csv('2018-*-*.csv', parse_dates='timestamp', # normal Pandas code
blocksize=64000000) # break text into 64MB chunks
s = df.groupby('name').balance.mean() # Use normal syntax for high level algorithms
# Bags / lists
import dask.bag as db
b = db.read_text('*.json').map(json.loads)
total = (b.filter(lambda d: d['name'] == 'Alice')
.map(lambda d: d['balance'])
.sum())
重要的是要记住,虽然 API 可能相似,但确实存在一些差异。此外,由于并行编程的优缺点,某些算法的性能可能与其内存中对应的算法不同。使用 Dask 时仍然需要一些思考和关注。
低级接口¶
通常在并行化现有代码库或构建自定义算法时,您会遇到可并行化但并非仅仅是大型 DataFrame 或数组的代码。考虑下面的循环代码
results = []
for a in A:
for b in B:
if a < b:
c = f(a, b)
else:
c = g(a, b)
results.append(c)
此代码中存在潜在的并行性(对 f
和 g
的多次调用可以并行完成),但尚不清楚如何将其重写为大型数组或 DataFrame 以便使用更高级别的 API。即使您可以将其重写为这些范例之一,也不清楚这是否是一个好主意。大部分意义很可能在转换中丢失,并且对于更复杂的系统,此过程将变得更加困难。
相反,Dask 的低级 API 允许您在现有循环的上下文中一次调用一个函数来编写并行代码。一个常见的解决方案是使用 Dask delayed 将单个函数调用包装到惰性构造的任务图中
import dask
lazy_results = []
for a in A:
for b in B:
if a < b:
c = dask.delayed(f)(a, b) # add lazy task
else:
c = dask.delayed(g)(a, b) # add lazy task
lazy_results.append(c)
results = dask.compute(*lazy_results) # compute all in parallel
结合高级和低级接口¶
结合高级和低级接口是很常见的。例如,您可以使用 Dask 数组/包/DataFrame 加载数据并进行初步预处理,然后切换到 Dask delayed 进行特定于您领域的自定义算法,然后再切换回 Dask 数组/DataFrame 清理并存储结果。理解两套用户界面以及如何在其间切换,可以是一种高效的组合。
# Convert to a list of delayed Pandas dataframes
delayed_values = df.to_delayed()
# Manipulate delayed values arbitrarily as you like
# Convert many delayed Pandas DataFrames back to a single Dask DataFrame
df = dd.from_delayed(delayed_values)
惰性计算¶
大多数 Dask 用户界面是惰性的,这意味着它们直到您使用 compute
方法显式请求结果时才会进行评估
# This array syntax doesn't cause computation
y = x + x.T - x.mean(axis=0)
# Trigger computation by explicitly calling the compute method
y = y.compute()
如果您想同时计算多个结果,请使用 dask.compute
函数。这可以共享中间结果,因此更高效
# compute multiple results at the same time with the compute function
min, max = dask.compute(y.min(), y.max())
请注意,compute()
函数返回内存中的结果。它将 Dask DataFrames 转换为 Pandas DataFrames,Dask 数组转换为 NumPy 数组,Dask 包转换为列表。您只应在能够舒适地容纳在内存中的结果上调用 compute。如果您的结果无法容纳在内存中,那么您可以考虑将其写入磁盘。
# Write larger results out to disk rather than store them in memory
my_dask_dataframe.to_parquet('myfile.parquet')
my_dask_array.to_hdf5('myfile.hdf5')
my_dask_bag.to_textfiles('myfile.*.txt')
持久化到分布式内存¶
警告
persist 会将整个数据集存储在内存中。这样做的缺点是可用内存必须实际超过数据集的大小。仅在反复交互式处理同一数据集时使用 persist,并在生产用例中尽可能避免使用它。
另外,如果您在集群上,您可能希望触发计算并将结果存储在分布式内存中。在这种情况下,您不想调用 compute
,因为这会创建一个单一的 Pandas、NumPy 或列表结果。相反,您希望调用 persist
,它返回一个新的 Dask 对象,指向正在积极计算或已经计算完毕并分散在集群内存中的结果。
# Compute returns an in-memory non-Dask object
y = y.compute()
# Persist returns an in-memory Dask object that uses distributed storage if available
y = y.persist()
这通常在数据加载和预处理步骤之后,但在快速迭代、探索或复杂算法之前看到。例如,我们可能会读取大量数据,筛选出更易于管理的子集,然后将数据持久化到内存中,以便快速迭代。
import dask.dataframe as dd
df = dd.read_parquet('...')
df = df[df.name == 'Alice'] # select important subset of data
df = df.persist() # trigger computation in the background
# These are all relatively fast now that the relevant data is in memory
df.groupby(df.id).balance.sum().compute() # explore data quickly
df.groupby(df.id).balance.mean().compute() # explore data quickly
df.id.nunique() # explore data quickly
惰性 vs 即时¶
如上所述,大多数 Dask 工作负载是惰性的,也就是说,它们直到您通过调用 compute()
显式触发时才会开始任何工作。然而,有时您确实希望尽快提交工作,随时间跟踪它,根据部分结果提交新工作或取消工作等等。这在跟踪或响应实时事件、处理流数据或构建复杂自适应算法时非常有用。
对于这些情况,人们通常转向 futures 接口,它是一个像 Dask delayed 一样的低级接口,但它是立即操作而不是惰性操作。
下面是使用 Dask delayed 和 Dask futures 的同一个示例,以说明它们的区别。
Delayed: 惰性¶
@dask.delayed
def inc(x):
return x + 1
@dask.delayed
def add(x, y):
return x + y
a = inc(1) # no work has happened yet
b = inc(2) # no work has happened yet
c = add(a, b) # no work has happened yet
c = c.compute() # This triggers all of the above computations
Futures: 即时¶
from dask.distributed import Client
client = Client()
def inc(x):
return x + 1
def add(x, y):
return x + y
a = client.submit(inc, 1) # work starts immediately
b = client.submit(inc, 2) # work starts immediately
c = client.submit(add, a, b) # work starts immediately
c = c.result() # block until work finishes, then gather result
您还可以使用 persist
函数触发高级集合的工作。在使用分布式调度器时,这将使工作在后台进行。
结合接口¶
有一些既定的方法可以组合上述接口
高级接口(数组、包、DataFrame)有一个
to_delayed
方法,可以将对象转换为 Dask delayed 对象的序列(或网格)delayeds = df.to_delayed()
高级接口(数组、包、DataFrame)有一个
from_delayed
方法,可以从 Delayed 或 Future 对象转换而来df = dd.from_delayed(delayeds) df = dd.from_delayed(futures)
Client.compute
方法将 Delayed 对象转换为 Futuresfutures = client.compute(delayeds)
dask.distributed.futures_of
函数从持久化集合中收集 futuresfrom dask.distributed import futures_of df = df.persist() # start computation in the background futures = futures_of(df)
Dask.delayed 对象将 Futures 转换为 delayed 对象
delayed_value = dask.delayed(future)
上述方法应该足以将任何接口转换为任何其他接口。我们经常看到一些效果不佳的反模式
在高级对象(如 Dask 数组或 DataFrame)上调用低级 API(delayed 或 futures)。这会将这些对象降级为它们的 NumPy 或 Pandas 对应物,这可能不是期望的结果。人们通常会寻找像
dask.array.map_blocks
或dask.dataframe.map_partitions
这样的 API。在 Future 对象上调用
compute()
。人们通常想要.result()
方法。在高级 Dask 对象上调用 NumPy/Pandas 函数,或在 NumPy/Pandas 对象上调用高级 Dask 函数