用户界面
目录
用户界面¶
Dask 支持多种用户界面
- 高级
数组: 并行 NumPy
包: 并行列表
DataFrames: 并行 Pandas
机器学习 : 并行 Scikit-Learn
其他来自外部项目,例如 XArray
这些用户界面都采用相同的底层并行计算机制,因此具有相同的扩展性、诊断能力、弹性等特性,但每个界面提供一组不同的并行算法和编程风格。
本文档帮助您决定哪种用户界面最适合您的需求,并提供适用于所有界面的一些通用信息。上面链接的页面提供了更深入的每个界面的更多信息。
高级集合¶
许多开始使用 Dask 的人都在明确寻找 NumPy、Pandas 或 Scikit-Learn 的可扩展版本。对于这些情况,Dask 中的起点通常相当明确。如果您需要可扩展的 NumPy 数组,请从 Dask array 开始;如果您需要可扩展的 Pandas DataFrames,请从 Dask DataFrame 开始,依此类推。
这些高级接口复制了标准接口,并进行了一些细微的变动。这些接口会自动为您在更大的数据集上并行化原始项目的大部分 API。
# Arrays
import dask.array as da
rng = da.random.default_rng()
x = rng.uniform(low=0, high=10, size=(10000, 10000), # normal numpy code
chunks=(1000, 1000)) # break into chunks of size 1000x1000
y = x + x.T - x.mean(axis=0) # Use normal syntax for high level algorithms
# DataFrames
import dask.dataframe as dd
df = dd.read_csv('2018-*-*.csv', parse_dates='timestamp', # normal Pandas code
blocksize=64000000) # break text into 64MB chunks
s = df.groupby('name').balance.mean() # Use normal syntax for high level algorithms
# Bags / lists
import dask.bag as db
b = db.read_text('*.json').map(json.loads)
total = (b.filter(lambda d: d['name'] == 'Alice')
.map(lambda d: d['balance'])
.sum())
重要的是要记住,虽然 API 可能相似,但确实存在一些差异。此外,由于并行编程的优缺点,某些算法的性能可能与其内存中对应算法不同。使用 Dask 时仍然需要一些思考和注意。
低级接口¶
在并行化现有代码库或构建自定义算法时,您经常会遇到可以并行化的代码,但它不仅仅是一个大型 DataFrame 或数组。考虑下面的循环代码
results = []
for a in A:
for b in B:
if a < b:
c = f(a, b)
else:
c = g(a, b)
results.append(c)
这段代码具有潜在的并行性(对 f
和 g
的多次调用可以并行执行),但尚不清楚如何将其重写成一个大型数组或 DataFrame 以便使用更高级别的 API。即使您可以将其重写成其中一种范式,也不清楚这是否是个好主意。很多含义很可能在转换过程中丢失,而且对于更复杂的系统来说,这个过程会变得更加困难。
相反,Dask 的低级 API 允许您在现有循环的上下文中一次一个函数调用地编写并行代码。这里一个常见的解决方案是使用 Dask delayed 将单个函数调用包装成一个惰性构建的任务图。
import dask
lazy_results = []
for a in A:
for b in B:
if a < b:
c = dask.delayed(f)(a, b) # add lazy task
else:
c = dask.delayed(g)(a, b) # add lazy task
lazy_results.append(c)
results = dask.compute(*lazy_results) # compute all in parallel
结合高级和低级接口¶
结合高级和低级接口是常见的做法。例如,您可以使用 Dask array/bag/dataframe 加载数据并进行初步预处理,然后切换到 Dask delayed 来实现特定于您领域的自定义算法,然后再切换回 Dask array/dataframe 来清理和存储结果。理解这两套用户界面以及如何在它们之间切换,可以是一种高效的组合。
# Convert to a list of delayed Pandas dataframes
delayed_values = df.to_delayed()
# Manipulate delayed values arbitrarily as you like
# Convert many delayed Pandas DataFrames back to a single Dask DataFrame
df = dd.from_delayed(delayed_values)
惰性计算¶
大多数 Dask 用户界面都是 惰性 的,这意味着它们不会执行计算,直到您使用 compute
方法明确请求结果。
# This array syntax doesn't cause computation
y = x + x.T - x.mean(axis=0)
# Trigger computation by explicitly calling the compute method
y = y.compute()
如果您有多个想要同时计算的结果,请使用 dask.compute
函数。这样可以共享中间结果,从而提高效率。
# compute multiple results at the same time with the compute function
min, max = dask.compute(y.min(), y.max())
请注意,compute()
函数返回内存中的结果。它将 Dask DataFrames 转换为 Pandas DataFrames,将 Dask arrays 转换为 NumPy arrays,将 Dask bags 转换为列表。 您应该只对可以轻松容纳在内存中的结果调用 compute 。如果您的结果不适合内存,那么您可以考虑将其写入磁盘。
# Write larger results out to disk rather than store them in memory
my_dask_dataframe.to_parquet('myfile.parquet')
my_dask_array.to_hdf5('myfile.hdf5')
my_dask_bag.to_textfiles('myfile.*.txt')
持久化到分布式内存¶
警告
persist 会将整个数据集存储在内存中。这有一个缺点,即可用内存必须实际超过数据集的大小。仅当交互式地反复迭代相同数据集时才使用 persist,并在生产用例中尽可能避免使用它。
另外,如果您在集群上,您可能希望触发计算并将结果存储在分布式内存中。在这种情况下,您不需要调用 compute
,因为它会创建一个单独的 Pandas、NumPy 或列表结果。相反,您应该调用 persist
,它返回一个新的 Dask 对象,该对象指向正在计算或已经计算完成并分布在集群内存中的结果。
# Compute returns an in-memory non-Dask object
y = y.compute()
# Persist returns an in-memory Dask object that uses distributed storage if available
y = y.persist()
这通常在数据加载和预处理步骤之后、但在快速迭代、探索或复杂算法之前可以看到。例如,我们可能会读入大量数据,过滤到一个更易于管理的数据子集,然后将数据持久化到内存中,以便我们可以快速迭代。
import dask.dataframe as dd
df = dd.read_parquet('...')
df = df[df.name == 'Alice'] # select important subset of data
df = df.persist() # trigger computation in the background
# These are all relatively fast now that the relevant data is in memory
df.groupby(df.id).balance.sum().compute() # explore data quickly
df.groupby(df.id).balance.mean().compute() # explore data quickly
df.id.nunique() # explore data quickly
惰性 vs 即时¶
如上所述,大多数 Dask 工作负载都是惰性的,也就是说,它们不会开始任何工作,直到您通过调用 compute()
明确触发它们。但是,有时您 确实 希望尽快提交工作,随着时间跟踪它,根据部分结果提交新工作或取消工作等等。这在跟踪或响应实时事件、处理流数据或构建复杂和自适应算法时非常有用。
对于这些情况,人们通常会转向 futures 接口,它是一种像 Dask delayed 一样的低级接口,但它是立即操作而不是惰性操作。
这里是使用 Dask delayed 和 Dask futures 来说明差异的相同示例。
Delayed: 惰性¶
@dask.delayed
def inc(x):
return x + 1
@dask.delayed
def add(x, y):
return x + y
a = inc(1) # no work has happened yet
b = inc(2) # no work has happened yet
c = add(a, b) # no work has happened yet
c = c.compute() # This triggers all of the above computations
Futures: 即时¶
from dask.distributed import Client
client = Client()
def inc(x):
return x + 1
def add(x, y):
return x + y
a = client.submit(inc, 1) # work starts immediately
b = client.submit(inc, 2) # work starts immediately
c = client.submit(add, a, b) # work starts immediately
c = c.result() # block until work finishes, then gather result
您还可以使用 persist
函数触发高级集合的工作。在使用分布式调度器时,这将导致工作在后台进行。
结合接口¶
有既定的方法可以结合上述接口
高级接口 (array, bag, dataframe) 有一个
to_delayed
方法,可以将它们转换为 Dask delayed 对象的序列(或网格)delayeds = df.to_delayed()
高级接口 (array, bag, dataframe) 有一个
from_delayed
方法,可以从 Delayed 或 Future 对象转换df = dd.from_delayed(delayeds) df = dd.from_delayed(futures)
Client.compute
方法将 Delayed 对象转换为 Futuresfutures = client.compute(delayeds)
dask.distributed.futures_of
函数从持久化集合中收集 futuresfrom dask.distributed import futures_of df = df.persist() # start computation in the background futures = futures_of(df)
Dask.delayed 对象将 Futures 转换为 delayed 对象
delayed_value = dask.delayed(future)
上述方法足以将任何接口转换为其他任何接口。我们经常看到一些效果不佳的反模式
在高级对象(如 Dask arrays 或 DataFrames)上调用低级 API(delayed 或 futures)。这会将这些对象降级为它们的 NumPy 或 Pandas 等效对象,这可能不是期望的结果。人们通常寻找像
dask.array.map_blocks
或dask.dataframe.map_partitions
这样的 API。在 Future 对象上调用
compute()
。人们通常想要.result()
方法。在高级 Dask 对象上调用 NumPy/Pandas 函数或在 NumPy/Pandas 对象上调用高级 Dask 函数