诊断 (本地)

对并行代码进行性能分析可能具有挑战性,但 dask.diagnostics 提供了功能来帮助使用 本地任务调度器 进行性能分析和检查执行。

本页介绍以下内置选项

  1. 进度条

  2. 性能分析器

  3. 资源性能分析器

  4. 缓存性能分析器

此外,本页还提供了如何构建自己的自定义诊断工具的说明。

进度条

ProgressBar([minimum, width, dt, out])

Dask 的进度条。

The ProgressBar 类基于上述调度器回调构建,用于在计算过程中在终端或笔记本中显示进度条。这可以在长时间运行的图执行期间提供良好的反馈。它可以用作围绕 getcompute 调用的上下文管理器来对计算进行性能分析

>>> import dask.array as da
>>> from dask.diagnostics import ProgressBar
>>> a = da.random.default_rng().normal(size=(10000, 10000), chunks=(1000, 1000))
>>> res = a.dot(a.T).mean(axis=0)

>>> with ProgressBar():
...     out = res.compute()
[########################################] | 100% Completed | 17.1 s

或者使用 register 方法全局注册

>>> pbar = ProgressBar()
>>> pbar.register()
>>> out = res.compute()
[########################################] | 100% Completed | 17.1 s

要从全局回调中注销,请调用 unregister 方法

>>> pbar.unregister()

性能分析器

性能分析器()

在任务级别对 Dask 执行进行性能分析的工具。

Dask 提供了一些用于性能分析执行的工具。与 ProgressBar 一样,它们都可以用作上下文管理器或全局注册。

The Profiler 类用于在任务级别对 Dask 的执行进行性能分析。在执行期间,它记录每个任务的以下信息

  1. Key

  2. Task

  3. 开始时间(自 epoch 以来的秒数)

  4. 结束时间(自 epoch 以来的秒数)

  5. Worker ID

资源性能分析器

ResourceProfiler([dt])

资源使用性能分析器。

The ResourceProfiler 类用于在资源级别对 Dask 的执行进行性能分析。在执行期间,它记录每个时间步长的以下信息

  1. 时间(自 epoch 以来的秒数)

  2. 内存使用量(MB)

  3. CPU 使用百分比 (%)

默认时间步长为 1 秒,但可以使用 dt 关键字手动设置

>>> from dask.diagnostics import ResourceProfiler
>>> rprof = ResourceProfiler(dt=0.5)

缓存性能分析器

CacheProfiler([metric, metric_name])

在调度器缓存级别对 Dask 执行进行性能分析的工具。

The CacheProfiler 类用于在调度器缓存级别对 Dask 的执行进行性能分析。在执行期间,它记录每个任务的以下信息

  1. Key

  2. Task

  3. 大小指标

  4. 缓存进入时间(自 epoch 以来的秒数)

  5. 缓存退出时间(自 epoch 以来的秒数)

这里的大小指标是对每个任务结果调用的函数的输出。默认指标是计算每个任务(对所有任务来说,metric 为 1)。也可以通过 metric 关键字使用其他函数作为指标。例如,cachey 中找到的 nbytes 函数可用于测量调度器缓存中的字节数

>>> from dask.diagnostics import CacheProfiler
>>> from cachey import nbytes
>>> cprof = CacheProfiler(metric=nbytes)

示例

为了演示如何使用这些诊断工具,我们将对使用 Dask Array 完成的一些线性代数运算进行性能分析。我们将创建一个随机数组,对其进行 QR 分解,然后通过将 Q 和 R 分量相乘来重建初始数组。请注意,由于性能分析器(和所有诊断工具)都只是上下文管理器,因此可以在一个 with 块中使用多个性能分析器

>>> import dask.array as da
>>> from dask.diagnostics import Profiler, ResourceProfiler, CacheProfiler
>>> a = da.random.random(size=(10000, 1000), chunks=(1000, 1000))
>>> q, r = da.linalg.qr(a)
>>> a2 = q.dot(r)

>>> with Profiler() as prof, ResourceProfiler(dt=0.25) as rprof,
...         CacheProfiler() as cprof:
...     out = a2.compute()

每个性能分析器的结果都存储在其 results 属性中,เป็น列表形式,其中包含 namedtuple 对象

>>> prof.results[0]
TaskData(key=('tsqr-8d16e396b237bf7a731333130d310cb9_QR_st1', 5, 0),
         task=(qr, (_apply_random, 'random_sample', 1060164455, (1000, 1000), (), {})),
         start_time=1454368444.493292,
         end_time=1454368444.902987,
         worker_id=4466937856)

>>> rprof.results[0]
ResourceData(time=1454368444.078748, mem=74.100736, cpu=0.0)

>>> cprof.results[0]
CacheData(key=('tsqr-8d16e396b237bf7a731333130d310cb9_QR_st1', 7, 0),
          task=(qr, (_apply_random, 'random_sample', 1310656009, (1000, 1000), (), {})),
          metric=1,
          cache_time=1454368444.49662,
          free_time=1454368446.769452)

这些结果可以单独分析,或者使用每个性能分析器提供的 visualize 方法在 bokeh 图中查看

>>> prof.visualize()

要同时查看多个性能分析器,可以使用 dask.diagnostics.visualize() 函数。该函数接受一个性能分析器列表,并创建一个沿 x 轴对齐的垂直堆叠图

>>> from dask.diagnostics import visualize
>>> visualize([prof, rprof, cprof])

查看上图,从上到下

  1. The results from the Profiler object: This shows the execution time for each task as a rectangle, organized along the y-axis by worker (in this case threads). Similar tasks are grouped by color and, by hovering over each task, one can see the key and task that each block represents. -> Profiler 对象的结果:这显示了每个任务的执行时间,以矩形表示,沿 y 轴按 worker(本例中是线程)组织。相似的任务按颜色分组,通过悬停在每个任务上,可以看到每个块代表的 key 和 task。

  2. The results from the ResourceProfiler object: This shows two lines, one for total CPU percentage used by all the workers, and one for total memory usage. -> ResourceProfiler 对象的结果:这显示了两条线,一条代表所有 workers 使用的总 CPU 百分比,另一条代表总内存使用量。

  3. The results from the CacheProfiler object: This shows a line for each task group, plotting the sum of the current metric in the cache against time. In this case it’s the default metric (count) and the lines represent the number of each object in the cache at time. Note that the grouping and coloring is the same as for the Profiler plot, and that the task represented by each line can be found by hovering over the line. -> CacheProfiler 对象的结果:这显示了每个任务组的一条线,绘制了缓存中当前 metric 的总和随时间的变化。在本例中,它是默认指标(计数),线条代表缓存中每个对象在某个时间点的数量。注意,分组和颜色与 Profiler 图相同,并且可以通过悬停在线上找到每条线代表的任务。

From these plots we can see that the initial tasks (calls to numpy.random.random and numpy.linalg.qr for each chunk) are run concurrently, but only use slightly more than 100% CPU. This is because the call to numpy.linalg.qr currently doesn’t release the Global Interpreter Lock (GIL), so those calls can’t truly be done in parallel. Next, there’s a reduction step where all the blocks are combined. This requires all the results from the first step to be held in memory, as shown by the increased number of results in the cache, and increase in memory usage. Immediately after this task ends, the number of elements in the cache decreases, showing that they were only needed for this step. Finally, there’s an interleaved set of calls to dot and sum. Looking at the CPU plot, it shows that these run both concurrently and in parallel, as the CPU percentage spikes up to around 350%. -> 从这些图中我们可以看到,初始任务(对每个块调用 numpy.random.randomnumpy.linalg.qr)并发运行,但 CPU 使用率仅略高于 100%。这是因为对 numpy.linalg.qr 的调用目前没有释放全局解释器锁 (GIL),因此这些调用无法真正并行执行。接下来,有一个归约步骤,所有块被组合在一起。这需要将第一步的所有结果保留在内存中,如缓存中结果数量增加和内存使用量增加所示。此任务结束后,缓存中的元素数量立即减少,表明它们仅在此步骤中需要。最后,交错执行了一系列对 dotsum 的调用。查看 CPU 图,它显示这些调用既并发又并行运行,因为 CPU 百分比峰值达到约 350%。

自定义回调

Callback([start, start_state, pretask, ...])

使用回调机制的基类

基于 dask.local.get_async 的调度器(目前包括 dask.getdask.threaded.getdask.multiprocessing.get)接受五种回调,允许检查调度器执行。

回调函数包括

1. start(dsk): 在执行开始时,状态初始化之前运行。接收 Dask 图

2. start_state(dsk, state): 在执行开始时,状态初始化之后运行。接收 Dask 图和调度器状态

3. pretask(key, dsk, state): 每次新任务启动时运行。接收要运行的任务的 key、Dask 图和调度器状态

4. posttask(key, result, dsk, state, id): 每次任务完成时运行。接收刚刚完成的任务的 key、结果、Dask 图、调度器状态以及运行该任务的 worker ID

5. finish(dsk, state, errored): 在执行结束时,结果返回之前运行。接收 Dask 图、调度器状态以及一个布尔值,指示退出是否由于错误

可以通过将上述部分方法作为关键字实例化 Callback 类,或者通过继承 Callback 类来创建自定义诊断工具。这里我们创建一个类,该类在计算每个 key 时打印其名称

from dask.callbacks import Callback
class PrintKeys(Callback):
    def _pretask(self, key, dask, state):
        """Print the key of every task as it's started"""
        print("Computing: {0}!".format(repr(key)))

这现在可以在计算期间用作上下文管理器

>>> from operator import add, mul
>>> dsk = {'a': (add, 1, 2), 'b': (add, 3, 'a'), 'c': (mul, 'a', 'b')}

>>> with PrintKeys():
...     get(dsk, 'c')
Computing 'a'!
Computing 'b'!
Computing 'c'!

另外,也可以将函数作为关键字参数传递给 Callback

>>> def printkeys(key, dask, state):
...    print("Computing: {0}!".format(repr(key)))

>>> with Callback(pretask=printkeys):
...     get(dsk, 'c')
Computing 'a'!
Computing 'b'!
Computing 'c'!

API

CacheProfiler([metric, metric_name])

在调度器缓存级别对 Dask 执行进行性能分析的工具。

Callback([start, start_state, pretask, ...])

使用回调机制的基类

性能分析器()

在任务级别对 Dask 执行进行性能分析的工具。

ProgressBar([minimum, width, dt, out])

Dask 的进度条。

ResourceProfiler([dt])

资源使用性能分析器。

visualize(profilers[, filename, show, save, ...])

在 bokeh 图中可视化性能分析结果。

dask.diagnostics.ProgressBar(minimum=0, width=40, dt=0.1, out=None)[source]

Dask 的进度条。

参数
minimum整数,可选

在显示进度条之前的最小时间阈值(秒)。默认值为 0(始终显示)

width整数,可选

进度条的宽度

dt浮点数,可选

更新分辨率(秒),默认为 0.1 秒

out文件对象,可选

写入进度条的文件对象。可以是 sys.stdout, sys.stderr 或任何其他能够写入 str 对象的 文件对象。默认值为 sys.stdout

示例

下面我们创建一个进度条,设置了在显示之前有 1 秒的最小阈值。对于廉价计算,不显示任何内容

>>> with ProgressBar(minimum=1.0):      
...     out = some_fast_computation.compute()

但对于昂贵的计算,会显示一个完整的进度条

>>> with ProgressBar(minimum=1.0):      
...     out = some_slow_computation.compute()
[########################################] | 100% Completed | 10.4 s

上次计算的持续时间可作为属性获取

>>> pbar = ProgressBar()                
>>> with pbar:                          
...     out = some_computation.compute()
[########################################] | 100% Completed | 10.4 s
>>> pbar.last_duration                  
10.4

您也可以注册一个进度条,使其显示所有计算的进度

>>> pbar = ProgressBar()                
>>> pbar.register()                     
>>> some_slow_computation.compute()     
[########################################] | 100% Completed | 10.4 s
dask.diagnostics.Profiler()[source]

在任务级别对 Dask 执行进行性能分析的工具。

记录每个任务的以下信息
  1. Key

  2. Task

  3. 开始时间(自 epoch 以来的秒数)

  4. 结束时间(自 epoch 以来的秒数)

  5. Worker ID

示例

>>> from operator import add, mul
>>> from dask.threaded import get
>>> from dask.diagnostics import Profiler
>>> dsk = {'x': 1, 'y': (add, 'x', 10), 'z': (mul, 'y', 2)}
>>> with Profiler() as prof:
...     get(dsk, 'z')
22
>>> prof.results        
[TaskData(key='y', task=(add, 'x', 10), start_time=..., end_time=..., worker_id=...),
 TaskData(key='z', task=(mul, 'y', 2), start_time=..., end_time=..., worker_id=...)]

这些结果可以使用 visualize 方法在 bokeh 图中可视化。注意,这需要安装 bokeh。

>>> prof.visualize()    

您可以全局激活性能分析器

>>> prof.register()

如果您全局使用性能分析器,您需要手动清除旧结果。

>>> prof.clear()
>>> prof.unregister()
dask.diagnostics.ResourceProfiler(dt=1)[source]

资源使用性能分析器。

记录每个时间步长的以下信息
  1. 时间(自 epoch 以来的秒数)

  2. 内存使用量(MB)

  3. CPU 使用百分比 (%)

示例

>>> from operator import add, mul
>>> from dask.threaded import get
>>> dsk = {'x': 1, 'y': (add, 'x', 10), 'z': (mul, 'y', 2)}
>>> with ResourceProfiler() as prof:
...     get(dsk, 'z')
22

这些结果可以使用 visualize 方法在 bokeh 图中可视化。注意,这需要安装 bokeh。

>>> prof.visualize() 

您可以全局激活性能分析器

>>> prof.register()

如果您全局使用性能分析器,您需要手动清除旧结果。

>>> prof.clear()

请注意,当用作上下文管理器时,数据将在包含的块的整个持续时间内收集。相比之下,当全局注册时,数据仅在 Dask 调度器处于活动状态时收集。

>>> prof.unregister()
dask.diagnostics.CacheProfiler(metric=None, metric_name=None)[source]

在调度器缓存级别对 Dask 执行进行性能分析的工具。

记录每个任务的以下信息
  1. Key

  2. Task

  3. 大小指标

  4. 缓存进入时间(自 epoch 以来的秒数)

  5. 缓存退出时间(自 epoch 以来的秒数)

示例

>>> from operator import add, mul
>>> from dask.threaded import get
>>> from dask.diagnostics import CacheProfiler
>>> dsk = {'x': 1, 'y': (add, 'x', 10), 'z': (mul, 'y', 2)}
>>> with CacheProfiler() as prof:
...     get(dsk, 'z')
22
>>> prof.results    
[CacheData(key='y', task=(add, 'x', 10), metric=1, cache_time=..., free_time=...),
 CacheData(key='z', task=(mul, 'y', 2), metric=1, cache_time=..., free_time=...)]

默认是计算每个任务(对所有任务来说,metric 为 1)。也可以通过 metric 关键字使用其他函数作为指标。例如,cachey 中找到的 nbytes 函数可用于测量缓存中的字节数。

>>> from cachey import nbytes                   
>>> with CacheProfiler(metric=nbytes) as prof:  
...     get(dsk, 'z')
22

性能分析结果可以使用 visualize 方法在 bokeh 图中可视化。注意,这需要安装 bokeh。

>>> prof.visualize() 

您可以全局激活性能分析器

>>> prof.register()

如果您全局使用性能分析器,您需要手动清除旧结果。

>>> prof.clear()
>>> prof.unregister()
dask.diagnostics.Callback(start=None, start_state=None, pretask=None, posttask=None, finish=None)[source]

使用回调机制的基类

使用具有以下签名的函数创建回调

>>> def start(dsk):
...     pass
>>> def start_state(dsk, state):
...     pass
>>> def pretask(key, dsk, state):
...     pass
>>> def posttask(key, result, dsk, state, worker_id):
...     pass
>>> def finish(dsk, state, failed):
...     pass

然后您可以构建一个包含任意数量这些函数的回调对象

>>> cb = Callback(pretask=pretask, finish=finish)

并将其用作 compute/get 调用的上下文管理器

>>> with cb:            
...     x.compute()

或者使用 register 方法全局使用

>>> cb.register()
>>> cb.unregister()

另外,也可以通过使用自己的方法继承 Callback 类。

>>> class PrintKeys(Callback):
...     def _pretask(self, key, dask, state):
...         print("Computing: {0}!".format(repr(key)))
>>> with PrintKeys():   
...     x.compute()
dask.diagnostics.visualize(profilers, filename='profile.html', show=True, save=None, mode=None, **kwargs)[source]

在 bokeh 图中可视化性能分析结果。

如果传入多个性能分析器,则图将垂直堆叠。

参数
profilers性能分析器或列表

性能分析器或性能分析器列表。

filename字符串,可选

图输出文件的名称。

show布尔值,可选

如果为 True(默认值),将在浏览器中打开图。

save布尔值,可选

如果为 True(不在笔记本中时的默认值),图将保存到磁盘。

mode字符串,可选

传递给 bokeh.output_file() 的模式

**kwargs

其他关键字参数,传递给 bokeh.figure。这些将覆盖 visualize 设置的所有默认值。

返回值
完成的 bokeh 图对象。