诊断(本地)
目录
诊断(本地)¶
并行代码的性能分析可能具有挑战性,但 dask.diagnostics
提供了功能来帮助使用 本地任务调度器 进行执行分析和检查。
本页介绍以下几个内置选项
进度条
性能分析器
资源性能分析器
缓存性能分析器
此外,本页还提供了关于如何构建您自己的自定义诊断工具的说明。
进度条¶
|
Dask 的进度条。 |
的 ProgressBar
类基于上面描述的调度器回调构建,用于在计算过程中在终端或 notebook 中显示进度条。这可以在长时间运行的图执行期间提供良好的反馈。它可以作为围绕 get
或 compute
调用的 上下文管理器 使用,以分析计算
>>> import dask.array as da
>>> from dask.diagnostics import ProgressBar
>>> a = da.random.default_rng().normal(size=(10000, 10000), chunks=(1000, 1000))
>>> res = a.dot(a.T).mean(axis=0)
>>> with ProgressBar():
... out = res.compute()
[########################################] | 100% Completed | 17.1 s
或使用 register
方法全局注册
>>> pbar = ProgressBar()
>>> pbar.register()
>>> out = res.compute()
[########################################] | 100% Completed | 17.1 s
要从全局回调中注销,请调用 unregister
方法
>>> pbar.unregister()
性能分析器¶
|
Dask 执行任务级别的性能分析器。 |
Dask 提供了一些用于分析执行的工具。与 ProgressBar
一样,它们都可以用作 上下文管理器 或全局注册。
的 Profiler
类用于分析 Dask 在任务级别的执行。在执行期间,它记录每个任务的以下信息:
键
任务
自 epoch 以来的开始时间(秒)
自 epoch 以来的结束时间(秒)
工作进程 ID
资源性能分析器¶
|
用于资源使用的性能分析器。 |
的 ResourceProfiler
类用于分析 Dask 在资源级别的执行。在执行期间,它记录每个时间步的以下信息:
自 epoch 以来时间(秒)
内存使用量(MB)
CPU 使用率 (%)
默认时间步长是 1 秒,但可以使用 dt
关键字手动设置
>>> from dask.diagnostics import ResourceProfiler
>>> rprof = ResourceProfiler(dt=0.5)
缓存性能分析器¶
|
Dask 在调度器缓存级别的性能分析器。 |
的 CacheProfiler
类用于分析 Dask 在调度器缓存级别的执行。在执行期间,它记录每个任务的以下信息:
键
任务
大小度量
缓存进入时间(自 epoch 以来秒)
缓存退出时间(自 epoch 以来秒)
此处的大小度量是对每个任务结果调用的函数的输出。默认度量是计数每个任务(对所有任务而言,metric
为 1)。也可以通过 metric
关键字使用其他函数作为度量。例如,可以使用 cachey
中的 nbytes
函数来测量调度器缓存中的字节数
>>> from dask.diagnostics import CacheProfiler
>>> from cachey import nbytes
>>> cprof = CacheProfiler(metric=nbytes)
示例¶
作为演示使用诊断工具的示例,我们将分析使用 Dask Array 完成的一些线性代数运算。我们将创建一个随机数组,进行 QR 分解,然后通过将 Q 和 R 分量相乘来重构原始数组。请注意,由于性能分析器(和所有诊断工具)只是 上下文管理器,因此可以在一个 `with` 块中使用多个性能分析器
>>> import dask.array as da
>>> from dask.diagnostics import Profiler, ResourceProfiler, CacheProfiler
>>> a = da.random.random(size=(10000, 1000), chunks=(1000, 1000))
>>> q, r = da.linalg.qr(a)
>>> a2 = q.dot(r)
>>> with Profiler() as prof, ResourceProfiler(dt=0.25) as rprof,
... CacheProfiler() as cprof:
... out = a2.compute()
每个性能分析器的结果都存储在其 results
属性中,作为 namedtuple
对象的列表
>>> prof.results[0]
TaskData(key=('tsqr-8d16e396b237bf7a731333130d310cb9_QR_st1', 5, 0),
task=(qr, (_apply_random, 'random_sample', 1060164455, (1000, 1000), (), {})),
start_time=1454368444.493292,
end_time=1454368444.902987,
worker_id=4466937856)
>>> rprof.results[0]
ResourceData(time=1454368444.078748, mem=74.100736, cpu=0.0)
>>> cprof.results[0]
CacheData(key=('tsqr-8d16e396b237bf7a731333130d310cb9_QR_st1', 7, 0),
task=(qr, (_apply_random, 'random_sample', 1310656009, (1000, 1000), (), {})),
metric=1,
cache_time=1454368444.49662,
free_time=1454368446.769452)
这些结果可以单独分析,或使用每个性能分析器上提供的 visualize
方法在 bokeh 图中查看
>>> prof.visualize()
要同时查看多个性能分析器,可以使用 dask.diagnostics.visualize()
函数。它接受一个性能分析器列表,并创建沿 x 轴对齐的垂直堆叠图。
>>> from dask.diagnostics import visualize
>>> visualize([prof, rprof, cprof])
查看上图,从上到下依次是:
的
Profiler
对象的结果:这显示了每个任务的执行时间,以矩形表示,沿 y 轴按工作进程(在此例中是线程)组织。类似的任务按颜色分组,将鼠标悬停在每个任务上,可以看到每个块代表的键和任务。的
ResourceProfiler
对象的结果:这显示两条线,一条表示所有工作进程使用的总 CPU 百分比,另一条表示总内存使用量。的
CacheProfiler
对象的结果:这为每个任务组显示一条线,绘制缓存中当前metric
的总和随时间的变化。在此例中,它是默认度量(计数),线表示缓存中每个对象在特定时间点的数量。注意,分组和着色与Profiler
图相同,并且可以通过将鼠标悬停在线上找到每条线代表的任务。
从这些图中我们可以看到,初始任务(对每个块调用 numpy.random.random
和 numpy.linalg.qr
)是并发运行的,但 CPU 使用率仅略高于 100%。这是因为目前调用 numpy.linalg.qr
不会释放全局解释器锁(GIL),因此这些调用无法真正并行执行。接下来,有一个归约步骤,其中所有块被合并。这需要将第一步的所有结果保存在内存中,如缓存中结果数量增加和内存使用量增加所示。此任务结束后,缓存中的元素数量立即减少,表明它们仅在此步骤需要。最后,有一系列 dot
和 sum
的交错调用。查看 CPU 图,显示这些调用既并发又并行运行,因为 CPU 百分比飙升至约 350%。
自定义回调¶
|
使用回调机制的基类 |
基于 dask.local.get_async
的调度器(目前包括 dask.get
、dask.threaded.get
和 dask.multiprocessing.get
)接受五种回调,允许检查调度器执行。
回调包括
1. start(dsk)
:在执行开始时运行,就在状态初始化之前。接收 Dask 图
2. start_state(dsk, state)
:在执行开始时运行,就在状态初始化之后。接收 Dask 图和调度器状态
3. pretask(key, dsk, state)
:每次新任务开始时运行。接收待运行任务的键、Dask 图和调度器状态
4. posttask(key, result, dsk, state, id)
:每次任务完成时运行。接收刚完成任务的键、结果、Dask 图、调度器状态以及运行该任务的工作进程 ID
5. finish(dsk, state, errored)
:在执行结束时运行,就在结果返回之前。接收 Dask 图、调度器状态以及一个布尔值,指示退出是否由于错误
自定义诊断工具可以通过将上述某些方法作为关键字实例化 Callback
类来创建,或通过继承 Callback
类来实现。这里我们创建一个类,该类在计算每个键时打印其名称
from dask.callbacks import Callback
class PrintKeys(Callback):
def _pretask(self, key, dask, state):
"""Print the key of every task as it's started"""
print("Computing: {0}!".format(repr(key)))
现在可以在计算期间将其用作 上下文管理器
>>> from operator import add, mul
>>> dsk = {'a': (add, 1, 2), 'b': (add, 3, 'a'), 'c': (mul, 'a', 'b')}
>>> with PrintKeys():
... get(dsk, 'c')
Computing 'a'!
Computing 'b'!
Computing 'c'!
或者,可以将函数作为关键字参数传递给 Callback
>>> def printkeys(key, dask, state):
... print("Computing: {0}!".format(repr(key)))
>>> with Callback(pretask=printkeys):
... get(dsk, 'c')
Computing 'a'!
Computing 'b'!
Computing 'c'!
API¶
|
Dask 在调度器缓存级别的性能分析器。 |
|
使用回调机制的基类 |
|
Dask 执行任务级别的性能分析器。 |
|
Dask 的进度条。 |
|
用于资源使用的性能分析器。 |
|
在 bokeh 图中可视化性能分析结果。 |
- dask.diagnostics.ProgressBar(minimum=0, width=40, dt=0.1, out=None)[source]¶
Dask 的进度条。
- 参数
- minimumint, 可选
显示进度条之前的最小时间阈值(秒)。默认值为 0(始终显示)
- widthint, 可选
进度条宽度
- dtfloat, 可选
更新分辨率(秒),默认值为 0.1 秒
- outfile object, 可选
进度条将写入的文件对象。它可以是
sys.stdout
、sys.stderr
或任何其他能够写入str
对象的文件对象。默认值为sys.stdout
示例
下面我们创建一个进度条,其最小显示阈值为 1 秒。对于廉价计算,不会显示任何内容
>>> with ProgressBar(minimum=1.0): ... out = some_fast_computation.compute()
但对于昂贵计算,会显示完整的进度条
>>> with ProgressBar(minimum=1.0): ... out = some_slow_computation.compute() [########################################] | 100% Completed | 10.4 s
上次计算的持续时间可作为属性访问
>>> pbar = ProgressBar() >>> with pbar: ... out = some_computation.compute() [########################################] | 100% Completed | 10.4 s >>> pbar.last_duration 10.4
您也可以注册一个进度条,以便它显示所有计算的进度
>>> pbar = ProgressBar() >>> pbar.register() >>> some_slow_computation.compute() [########################################] | 100% Completed | 10.4 s
- dask.diagnostics.Profiler()[source]¶
Dask 执行任务级别的性能分析器。
- 记录每个任务的以下信息:
键
任务
自 epoch 以来的开始时间(秒)
自 epoch 以来的结束时间(秒)
工作进程 ID
示例
>>> from operator import add, mul >>> from dask.threaded import get >>> from dask.diagnostics import Profiler >>> dsk = {'x': 1, 'y': (add, 'x', 10), 'z': (mul, 'y', 2)} >>> with Profiler() as prof: ... get(dsk, 'z') 22
>>> prof.results [TaskData(key='y', task=(add, 'x', 10), start_time=..., end_time=..., worker_id=...), TaskData(key='z', task=(mul, 'y', 2), start_time=..., end_time=..., worker_id=...)]
这些结果可以使用
visualize
方法在 bokeh 图中可视化。注意,这需要安装 bokeh。>>> prof.visualize()
您可以全局激活性能分析器
>>> prof.register()
如果您全局使用性能分析器,则需要手动清除旧结果。
>>> prof.clear() >>> prof.unregister()
- dask.diagnostics.ResourceProfiler(dt=1)[source]¶
用于资源使用的性能分析器。
- 记录每个时间步的以下信息:
自 epoch 以来时间(秒)
内存使用量(MB)
CPU 使用率 (%)
示例
>>> from operator import add, mul >>> from dask.threaded import get >>> dsk = {'x': 1, 'y': (add, 'x', 10), 'z': (mul, 'y', 2)} >>> with ResourceProfiler() as prof: ... get(dsk, 'z') 22
这些结果可以使用
visualize
方法在 bokeh 图中可视化。注意,这需要安装 bokeh。>>> prof.visualize()
您可以全局激活性能分析器
>>> prof.register()
如果您全局使用性能分析器,则需要手动清除旧结果。
>>> prof.clear()
注意,当用作 上下文管理器 时,数据将在包含的块的整个持续时间内收集。相反,当全局注册时,数据仅在 dask 调度器处于活动状态时收集。
>>> prof.unregister()
- dask.diagnostics.CacheProfiler(metric=None, metric_name=None)[source]¶
Dask 在调度器缓存级别的性能分析器。
- 记录每个任务的以下信息:
键
任务
大小度量
缓存进入时间(自 epoch 以来秒)
缓存退出时间(自 epoch 以来秒)
示例
>>> from operator import add, mul >>> from dask.threaded import get >>> from dask.diagnostics import CacheProfiler >>> dsk = {'x': 1, 'y': (add, 'x', 10), 'z': (mul, 'y', 2)} >>> with CacheProfiler() as prof: ... get(dsk, 'z') 22
>>> prof.results [CacheData(key='y', task=(add, 'x', 10), metric=1, cache_time=..., free_time=...), CacheData(key='z', task=(mul, 'y', 2), metric=1, cache_time=..., free_time=...)]
默认是计数每个任务(对所有任务而言,
metric
为 1)。可以通过metric
关键字使用其他函数作为度量。例如,可以使用cachey
中的nbytes
函数来测量缓存中的字节数。>>> from cachey import nbytes >>> with CacheProfiler(metric=nbytes) as prof: ... get(dsk, 'z') 22
性能分析结果可以使用
visualize
方法在 bokeh 图中可视化。注意,这需要安装 bokeh。>>> prof.visualize()
您可以全局激活性能分析器
>>> prof.register()
如果您全局使用性能分析器,则需要手动清除旧结果。
>>> prof.clear() >>> prof.unregister()
- dask.diagnostics.Callback(start=None, start_state=None, pretask=None, posttask=None, finish=None)[source]¶
使用回调机制的基类
使用具有以下签名的函数创建回调
>>> def start(dsk): ... pass >>> def start_state(dsk, state): ... pass >>> def pretask(key, dsk, state): ... pass >>> def posttask(key, result, dsk, state, worker_id): ... pass >>> def finish(dsk, state, failed): ... pass
然后您可以使用其中任意数量的函数构建一个回调对象
>>> cb = Callback(pretask=pretask, finish=finish)
并将其用作 compute/get 调用的 上下文管理器
>>> with cb: ... x.compute()
或使用
register
方法全局使用>>> cb.register() >>> cb.unregister()
或者,您可以使用自己的方法继承
Callback
类。>>> class PrintKeys(Callback): ... def _pretask(self, key, dask, state): ... print("Computing: {0}!".format(repr(key)))
>>> with PrintKeys(): ... x.compute()
- dask.diagnostics.visualize(profilers, filename='profile.html', show=True, save=None, mode=None, **kwargs)[source]¶
在 bokeh 图中可视化性能分析结果。
如果传入多个性能分析器,图将垂直堆叠。
- 参数
- profilers性能分析器或列表
性能分析器或性能分析器列表。
- filenamestring, 可选
图输出文件的名称。
- showboolean, 可选
如果为 True(默认值),则在浏览器中打开图。
- saveboolean, 可选
如果为 True(在 notebook 之外时默认),则将图保存到磁盘。
- modestr, 可选
传递给 bokeh.output_file() 的模式
- **kwargs
其他关键字参数,传递给 bokeh.figure。这些将覆盖 visualize 设置的所有默认值。
- 返回
- 完成的 bokeh 图对象。