诊断(本地)

并行代码的性能分析可能具有挑战性,但 dask.diagnostics 提供了功能来帮助使用 本地任务调度器 进行执行分析和检查。

本页介绍以下几个内置选项

  1. 进度条

  2. 性能分析器

  3. 资源性能分析器

  4. 缓存性能分析器

此外,本页还提供了关于如何构建您自己的自定义诊断工具的说明。

进度条

ProgressBar([minimum, width, dt, out])

Dask 的进度条。

ProgressBar 类基于上面描述的调度器回调构建,用于在计算过程中在终端或 notebook 中显示进度条。这可以在长时间运行的图执行期间提供良好的反馈。它可以作为围绕 getcompute 调用的 上下文管理器 使用,以分析计算

>>> import dask.array as da
>>> from dask.diagnostics import ProgressBar
>>> a = da.random.default_rng().normal(size=(10000, 10000), chunks=(1000, 1000))
>>> res = a.dot(a.T).mean(axis=0)

>>> with ProgressBar():
...     out = res.compute()
[########################################] | 100% Completed | 17.1 s

或使用 register 方法全局注册

>>> pbar = ProgressBar()
>>> pbar.register()
>>> out = res.compute()
[########################################] | 100% Completed | 17.1 s

要从全局回调中注销,请调用 unregister 方法

>>> pbar.unregister()

性能分析器

性能分析器()

Dask 执行任务级别的性能分析器。

Dask 提供了一些用于分析执行的工具。与 ProgressBar 一样,它们都可以用作 上下文管理器 或全局注册。

Profiler 类用于分析 Dask 在任务级别的执行。在执行期间,它记录每个任务的以下信息:

  1. 任务

  2. 自 epoch 以来的开始时间(秒)

  3. 自 epoch 以来的结束时间(秒)

  4. 工作进程 ID

资源性能分析器

ResourceProfiler([dt])

用于资源使用的性能分析器。

ResourceProfiler 类用于分析 Dask 在资源级别的执行。在执行期间,它记录每个时间步的以下信息:

  1. 自 epoch 以来时间(秒)

  2. 内存使用量(MB)

  3. CPU 使用率 (%)

默认时间步长是 1 秒,但可以使用 dt 关键字手动设置

>>> from dask.diagnostics import ResourceProfiler
>>> rprof = ResourceProfiler(dt=0.5)

缓存性能分析器

CacheProfiler([metric, metric_name])

Dask 在调度器缓存级别的性能分析器。

CacheProfiler 类用于分析 Dask 在调度器缓存级别的执行。在执行期间,它记录每个任务的以下信息:

  1. 任务

  2. 大小度量

  3. 缓存进入时间(自 epoch 以来秒)

  4. 缓存退出时间(自 epoch 以来秒)

此处的大小度量是对每个任务结果调用的函数的输出。默认度量是计数每个任务(对所有任务而言,metric 为 1)。也可以通过 metric 关键字使用其他函数作为度量。例如,可以使用 cachey 中的 nbytes 函数来测量调度器缓存中的字节数

>>> from dask.diagnostics import CacheProfiler
>>> from cachey import nbytes
>>> cprof = CacheProfiler(metric=nbytes)

示例

作为演示使用诊断工具的示例,我们将分析使用 Dask Array 完成的一些线性代数运算。我们将创建一个随机数组,进行 QR 分解,然后通过将 Q 和 R 分量相乘来重构原始数组。请注意,由于性能分析器(和所有诊断工具)只是 上下文管理器,因此可以在一个 `with` 块中使用多个性能分析器

>>> import dask.array as da
>>> from dask.diagnostics import Profiler, ResourceProfiler, CacheProfiler
>>> a = da.random.random(size=(10000, 1000), chunks=(1000, 1000))
>>> q, r = da.linalg.qr(a)
>>> a2 = q.dot(r)

>>> with Profiler() as prof, ResourceProfiler(dt=0.25) as rprof,
...         CacheProfiler() as cprof:
...     out = a2.compute()

每个性能分析器的结果都存储在其 results 属性中,作为 namedtuple 对象的列表

>>> prof.results[0]
TaskData(key=('tsqr-8d16e396b237bf7a731333130d310cb9_QR_st1', 5, 0),
         task=(qr, (_apply_random, 'random_sample', 1060164455, (1000, 1000), (), {})),
         start_time=1454368444.493292,
         end_time=1454368444.902987,
         worker_id=4466937856)

>>> rprof.results[0]
ResourceData(time=1454368444.078748, mem=74.100736, cpu=0.0)

>>> cprof.results[0]
CacheData(key=('tsqr-8d16e396b237bf7a731333130d310cb9_QR_st1', 7, 0),
          task=(qr, (_apply_random, 'random_sample', 1310656009, (1000, 1000), (), {})),
          metric=1,
          cache_time=1454368444.49662,
          free_time=1454368446.769452)

这些结果可以单独分析,或使用每个性能分析器上提供的 visualize 方法在 bokeh 图中查看

>>> prof.visualize()

要同时查看多个性能分析器,可以使用 dask.diagnostics.visualize() 函数。它接受一个性能分析器列表,并创建沿 x 轴对齐的垂直堆叠图。

>>> from dask.diagnostics import visualize
>>> visualize([prof, rprof, cprof])

查看上图,从上到下依次是:

  1. Profiler 对象的结果:这显示了每个任务的执行时间,以矩形表示,沿 y 轴按工作进程(在此例中是线程)组织。类似的任务按颜色分组,将鼠标悬停在每个任务上,可以看到每个块代表的键和任务。

  2. ResourceProfiler 对象的结果:这显示两条线,一条表示所有工作进程使用的总 CPU 百分比,另一条表示总内存使用量。

  3. CacheProfiler 对象的结果:这为每个任务组显示一条线,绘制缓存中当前 metric 的总和随时间的变化。在此例中,它是默认度量(计数),线表示缓存中每个对象在特定时间点的数量。注意,分组和着色与 Profiler 图相同,并且可以通过将鼠标悬停在线上找到每条线代表的任务。

从这些图中我们可以看到,初始任务(对每个块调用 numpy.random.randomnumpy.linalg.qr)是并发运行的,但 CPU 使用率仅略高于 100%。这是因为目前调用 numpy.linalg.qr 不会释放全局解释器锁(GIL),因此这些调用无法真正并行执行。接下来,有一个归约步骤,其中所有块被合并。这需要将第一步的所有结果保存在内存中,如缓存中结果数量增加和内存使用量增加所示。此任务结束后,缓存中的元素数量立即减少,表明它们仅在此步骤需要。最后,有一系列 dotsum 的交错调用。查看 CPU 图,显示这些调用既并发又并行运行,因为 CPU 百分比飙升至约 350%。

自定义回调

Callback([start, start_state, pretask, ...])

使用回调机制的基类

基于 dask.local.get_async 的调度器(目前包括 dask.getdask.threaded.getdask.multiprocessing.get)接受五种回调,允许检查调度器执行。

回调包括

1. start(dsk):在执行开始时运行,就在状态初始化之前。接收 Dask 图

2. start_state(dsk, state):在执行开始时运行,就在状态初始化之后。接收 Dask 图和调度器状态

3. pretask(key, dsk, state):每次新任务开始时运行。接收待运行任务的键、Dask 图和调度器状态

4. posttask(key, result, dsk, state, id):每次任务完成时运行。接收刚完成任务的键、结果、Dask 图、调度器状态以及运行该任务的工作进程 ID

5. finish(dsk, state, errored):在执行结束时运行,就在结果返回之前。接收 Dask 图、调度器状态以及一个布尔值,指示退出是否由于错误

自定义诊断工具可以通过将上述某些方法作为关键字实例化 Callback 类来创建,或通过继承 Callback 类来实现。这里我们创建一个类,该类在计算每个键时打印其名称

from dask.callbacks import Callback
class PrintKeys(Callback):
    def _pretask(self, key, dask, state):
        """Print the key of every task as it's started"""
        print("Computing: {0}!".format(repr(key)))

现在可以在计算期间将其用作 上下文管理器

>>> from operator import add, mul
>>> dsk = {'a': (add, 1, 2), 'b': (add, 3, 'a'), 'c': (mul, 'a', 'b')}

>>> with PrintKeys():
...     get(dsk, 'c')
Computing 'a'!
Computing 'b'!
Computing 'c'!

或者,可以将函数作为关键字参数传递给 Callback

>>> def printkeys(key, dask, state):
...    print("Computing: {0}!".format(repr(key)))

>>> with Callback(pretask=printkeys):
...     get(dsk, 'c')
Computing 'a'!
Computing 'b'!
Computing 'c'!

API

CacheProfiler([metric, metric_name])

Dask 在调度器缓存级别的性能分析器。

Callback([start, start_state, pretask, ...])

使用回调机制的基类

性能分析器()

Dask 执行任务级别的性能分析器。

ProgressBar([minimum, width, dt, out])

Dask 的进度条。

ResourceProfiler([dt])

用于资源使用的性能分析器。

visualize(profilers[, filename, show, save, ...])

在 bokeh 图中可视化性能分析结果。

dask.diagnostics.ProgressBar(minimum=0, width=40, dt=0.1, out=None)[source]

Dask 的进度条。

参数
minimumint, 可选

显示进度条之前的最小时间阈值(秒)。默认值为 0(始终显示)

widthint, 可选

进度条宽度

dtfloat, 可选

更新分辨率(秒),默认值为 0.1 秒

outfile object, 可选

进度条将写入的文件对象。它可以是 sys.stdoutsys.stderr 或任何其他能够写入 str 对象的文件对象。默认值为 sys.stdout

示例

下面我们创建一个进度条,其最小显示阈值为 1 秒。对于廉价计算,不会显示任何内容

>>> with ProgressBar(minimum=1.0):      
...     out = some_fast_computation.compute()

但对于昂贵计算,会显示完整的进度条

>>> with ProgressBar(minimum=1.0):      
...     out = some_slow_computation.compute()
[########################################] | 100% Completed | 10.4 s

上次计算的持续时间可作为属性访问

>>> pbar = ProgressBar()                
>>> with pbar:                          
...     out = some_computation.compute()
[########################################] | 100% Completed | 10.4 s
>>> pbar.last_duration                  
10.4

您也可以注册一个进度条,以便它显示所有计算的进度

>>> pbar = ProgressBar()                
>>> pbar.register()                     
>>> some_slow_computation.compute()     
[########################################] | 100% Completed | 10.4 s
dask.diagnostics.Profiler()[source]

Dask 执行任务级别的性能分析器。

记录每个任务的以下信息:
  1. 任务

  2. 自 epoch 以来的开始时间(秒)

  3. 自 epoch 以来的结束时间(秒)

  4. 工作进程 ID

示例

>>> from operator import add, mul
>>> from dask.threaded import get
>>> from dask.diagnostics import Profiler
>>> dsk = {'x': 1, 'y': (add, 'x', 10), 'z': (mul, 'y', 2)}
>>> with Profiler() as prof:
...     get(dsk, 'z')
22
>>> prof.results        
[TaskData(key='y', task=(add, 'x', 10), start_time=..., end_time=..., worker_id=...),
 TaskData(key='z', task=(mul, 'y', 2), start_time=..., end_time=..., worker_id=...)]

这些结果可以使用 visualize 方法在 bokeh 图中可视化。注意,这需要安装 bokeh。

>>> prof.visualize()    

您可以全局激活性能分析器

>>> prof.register()

如果您全局使用性能分析器,则需要手动清除旧结果。

>>> prof.clear()
>>> prof.unregister()
dask.diagnostics.ResourceProfiler(dt=1)[source]

用于资源使用的性能分析器。

记录每个时间步的以下信息:
  1. 自 epoch 以来时间(秒)

  2. 内存使用量(MB)

  3. CPU 使用率 (%)

示例

>>> from operator import add, mul
>>> from dask.threaded import get
>>> dsk = {'x': 1, 'y': (add, 'x', 10), 'z': (mul, 'y', 2)}
>>> with ResourceProfiler() as prof:
...     get(dsk, 'z')
22

这些结果可以使用 visualize 方法在 bokeh 图中可视化。注意,这需要安装 bokeh。

>>> prof.visualize() 

您可以全局激活性能分析器

>>> prof.register()

如果您全局使用性能分析器,则需要手动清除旧结果。

>>> prof.clear()

注意,当用作 上下文管理器 时,数据将在包含的块的整个持续时间内收集。相反,当全局注册时,数据仅在 dask 调度器处于活动状态时收集。

>>> prof.unregister()
dask.diagnostics.CacheProfiler(metric=None, metric_name=None)[source]

Dask 在调度器缓存级别的性能分析器。

记录每个任务的以下信息:
  1. 任务

  2. 大小度量

  3. 缓存进入时间(自 epoch 以来秒)

  4. 缓存退出时间(自 epoch 以来秒)

示例

>>> from operator import add, mul
>>> from dask.threaded import get
>>> from dask.diagnostics import CacheProfiler
>>> dsk = {'x': 1, 'y': (add, 'x', 10), 'z': (mul, 'y', 2)}
>>> with CacheProfiler() as prof:
...     get(dsk, 'z')
22
>>> prof.results    
[CacheData(key='y', task=(add, 'x', 10), metric=1, cache_time=..., free_time=...),
 CacheData(key='z', task=(mul, 'y', 2), metric=1, cache_time=..., free_time=...)]

默认是计数每个任务(对所有任务而言,metric 为 1)。可以通过 metric 关键字使用其他函数作为度量。例如,可以使用 cachey 中的 nbytes 函数来测量缓存中的字节数。

>>> from cachey import nbytes                   
>>> with CacheProfiler(metric=nbytes) as prof:  
...     get(dsk, 'z')
22

性能分析结果可以使用 visualize 方法在 bokeh 图中可视化。注意,这需要安装 bokeh。

>>> prof.visualize() 

您可以全局激活性能分析器

>>> prof.register()

如果您全局使用性能分析器,则需要手动清除旧结果。

>>> prof.clear()
>>> prof.unregister()
dask.diagnostics.Callback(start=None, start_state=None, pretask=None, posttask=None, finish=None)[source]

使用回调机制的基类

使用具有以下签名的函数创建回调

>>> def start(dsk):
...     pass
>>> def start_state(dsk, state):
...     pass
>>> def pretask(key, dsk, state):
...     pass
>>> def posttask(key, result, dsk, state, worker_id):
...     pass
>>> def finish(dsk, state, failed):
...     pass

然后您可以使用其中任意数量的函数构建一个回调对象

>>> cb = Callback(pretask=pretask, finish=finish)

并将其用作 compute/get 调用的 上下文管理器

>>> with cb:            
...     x.compute()

或使用 register 方法全局使用

>>> cb.register()
>>> cb.unregister()

或者,您可以使用自己的方法继承 Callback 类。

>>> class PrintKeys(Callback):
...     def _pretask(self, key, dask, state):
...         print("Computing: {0}!".format(repr(key)))
>>> with PrintKeys():   
...     x.compute()
dask.diagnostics.visualize(profilers, filename='profile.html', show=True, save=None, mode=None, **kwargs)[source]

在 bokeh 图中可视化性能分析结果。

如果传入多个性能分析器,图将垂直堆叠。

参数
profilers性能分析器或列表

性能分析器或性能分析器列表。

filenamestring, 可选

图输出文件的名称。

showboolean, 可选

如果为 True(默认值),则在浏览器中打开图。

saveboolean, 可选

如果为 True(在 notebook 之外时默认),则将图保存到磁盘。

modestr, 可选

传递给 bokeh.output_file() 的模式

**kwargs

其他关键字参数,传递给 bokeh.figure。这些将覆盖 visualize 设置的所有默认值。

返回
完成的 bokeh 图对象。