调试

调试并行程序很困难。当异常发生在远程机器、不同进程或线程中时,日志记录和使用 pdb 与追踪信息交互等常规调试工具通常会停止工作。

Dask 提供了多种机制来简化此过程。根据您的情况,其中一些方法可能比其他方法更适用。

这些方法按照从轻量级或简单到更复杂解决方案的顺序排列。

打印

调试最基本的方法之一就是简单地打印值并检查它们。然而,当在 Dask 中使用 Python 的内置函数 print() 时,这些打印通常发生在远程机器上,而不是用户的 Python 会话中,这通常不是开发人员在调试时想要获得的体验。

正因为如此,Dask 提供了一个函数 dask.distributed.print,它与 Python 的内置函数 print() 功能相似,但也会将打印输出转发到客户端 Python 会话。这使得分布式调试感觉更像是本地调试。

异常

当计算中的任务失败时,理解问题所在的标准方法是查看异常和追踪信息。人们通常使用 pdb 模块、IPython 的 %debug%pdb 魔法命令,或者仅仅通过查看追踪信息并调查代码中发生异常的位置来做到这一点。

通常,当计算在单独的线程或不同的机器中执行时,这些方法就会失效。为了解决这个问题,Dask 提供了一些机制来重新创建正常的 Python 调试体验。

检查异常和追踪信息

默认情况下,Dask 已经复制了异常和追踪信息(无论它们发生在哪里),并在本地重新引发该异常。如果您的任务在远程失败并出现 ZeroDivisionError,那么您将在交互式会话中收到一个 ZeroDivisionError。同样,您将看到此错误发生的完整追踪信息,这就像在普通 Python 中一样,可以帮助您识别代码中的问题位置。

然而,您无法使用 pdb 模块或 %debug IPython 魔法命令配合这些追踪信息来查看失败期间变量的值。您只能进行可视化检查。此外,追踪信息的顶部可能充满 Dask 特定的函数,这些函数与您的问题无关,因此您可以安全地忽略它们。

单机调度器和分布式调度器都支持这样做。

使用单线程调度器

Dask 提供了一个简单的单线程调度器。它不提供任何并行性能改进,但会忠实地在您的本地线程中运行 Dask 计算,允许您使用 pdb%debug IPython 魔法命令、cProfile 模块等性能分析工具以及 snakeviz 等常规工具。这使您能够在 Dask 计算中使用所有常规 Python 调试技巧,前提是您不需要并行性。

例如,在计算调用中设置 scheduler='single-threaded' 即可使用单线程调度器

>>> x.compute(scheduler='single-threaded')

有关配置调度器的更多方法,请参阅调度器配置文档

这仅适用于单机调度器。除非您熟悉使用 Tornado API(请查看测试基础设施文档,其中描述了如何实现这一点),否则它不适用于 dask.distributed。此外,由于这在单机上运行,因此它假设您的计算可以在单机上运行而不会超出内存限制。如果可能,明智的做法是在问题的较小版本上使用这种方法。

在本地重新运行失败的任务

如果远程任务失败,我们可以收集函数及其所有输入,将它们带到本地线程,然后重新运行该函数,希望在本地触发相同的异常,以便使用常规调试工具。

使用单机调度器时,使用关键字 rerun_exceptions_locally=True

>>> x.compute(rerun_exceptions_locally=True)

在分布式调度器上,对任何包含 Futures 的对象使用 recreate_error_locally 方法

>>> x.compute()
ZeroDivisionError(...)

>>> %pdb
>>> future = client.compute(x)
>>> client.recreate_error_locally(future)

手动移除失败的 Futures

有时您的计算只有一部分失败,例如,如果 CSV 数据集中的某些行存在问题。使用分布式调度器运行时,如果您切换到处理 Futures,则可以移除产生不良结果的数据块

>>> import dask.dataframe as dd
>>> df = ...           # create dataframe
>>> df = df.persist()  # start computing on the cluster

>>> from distributed.client import futures_of
>>> futures = futures_of(df)  # get futures behind dataframe
>>> futures
[<Future: status: finished, type: pd.DataFrame, key: load-1>
 <Future: status: finished, type: pd.DataFrame, key: load-2>
 <Future: status: error, key: load-3>
 <Future: status: pending, key: load-4>
 <Future: status: error, key: load-5>]

>>> # wait until computation is done
>>> while any(f.status == 'pending' for f in futures):
...     sleep(0.1)

>>> # pick out only the successful futures and reconstruct the dataframe
>>> good_futures = [f for f in futures if f.status == 'finished']
>>> df = dd.from_delayed(good_futures, meta=df._meta)

这有点像一种变通方法,但在首次探索混乱数据时通常很实用。如果您正在使用 concurrent.futures API (map, submit, gather),那么这种方法会更自然。

检查调度状态

并非所有错误都表现为异常。例如,在分布式系统中,worker 可能会意外死亡,您的计算可能会由于 worker 间通信或调度器开销而异常缓慢,或者存在其他一些问题。获取关于正在发生什么的反馈可以帮助识别故障和一般的性能瓶颈。

对于单机调度器,请参阅本地诊断文档。本节的其余部分将假设您正在使用分布式调度器,这些问题在该环境中更常见。

Web 诊断

首先,分布式调度器具有许多诊断工具,显示数十个记录的指标,如 CPU、内存、网络和磁盘使用情况,以前任务的历史记录,任务分配给 worker 的情况,worker 内存压力,工作窃取,打开文件句柄限制等。通过检查这些页面可以正确诊断出许多问题。默认情况下,这些信息可在 http://scheduler:8787/ 访问,其中 scheduler 应替换为调度器的地址。有关更多信息,请参阅诊断性能文档

日志

调度器、worker 和 client 都使用Python 标准日志模块发出日志。默认情况下,这些日志输出到标准错误。当 Dask 由集群作业调度器(如 SGE/SLURM/YARN/Mesos/Marathon/Kubernetes/或其他)启动时,该系统将跟踪这些日志并提供界面帮助您访问它们。如果您自行启动 Dask,除非您将标准错误重定向到文件,否则日志可能会直接打印到屏幕上。

您可以在配置中控制日志的详细程度,例如,~/.config/dask/*.yaml 文件。默认配置当前如下所示

logging:
  distributed: info
  distributed.client: warning
  bokeh: error

distributed.clientdistributed.schedulerdistributed.nannydistributed.worker 等特定组件的日志都可以独立配置。因此,例如,您可以添加一行类似 distributed.worker: debug 的配置,以便从 worker 获取非常详细的输出。

此外,您可以显式地为日志记录器分配处理程序。以下示例为调度器和 worker 分配了文件(“output.log”)和控制台输出。有关此处特定术语的含义,请参阅python 日志记录文档。

logging:
  version: 1
  handlers:
    file:
      class: logging.handlers.RotatingFileHandler
      filename: output.log
      level: INFO
    console:
      class: logging.StreamHandler
      level: INFO
  loggers:
    distributed.worker:
      level: INFO
      handlers:
        - file
        - console
    distributed.scheduler:
      level: INFO
      handlers:
        - file
        - console

LocalCluster

如果您在单机上使用分布式调度器,您可以通过命令行界面手动设置 worker,或者您可以使用 LocalCluster,这是您直接调用 Client() 时运行的内容

>>> from dask.distributed import Client, LocalCluster
>>> client = Client()  # This is actually the following two commands

>>> cluster = LocalCluster()
>>> client = Client(cluster.scheduler.address)

LocalCluster 非常有用,因为调度器和 worker 与您在同一个进程中,因此您可以在它们运行时轻松检查它们的状态(它们运行在单独的线程中)

>>> cluster.scheduler.processing
{'worker-one:59858': {'inc-123', 'add-443'},
 'worker-two:48248': {'inc-456'}}

如果您运行 worker 时不使用 nanny 进程,您也可以这样做

>>> cluster = LocalCluster(nanny=False)
>>> client = Client(cluster)

如果您想使用 Dask 分布式 API 并且仍然想直接调查 worker 内部的情况,这将非常有用。信息不像 Web 诊断中那样为您提炼,但您拥有完整的低级别访问权限。