调试
目录
调试¶
调试并行程序很困难。当异常发生在远程机器、不同进程或线程中时,日志记录、使用 pdb
与回溯交互等常规调试工具将无法正常工作。
Dask 提供了多种机制来简化此过程。根据您的情况,其中一些方法可能比其他方法更适用。
这些方法按从轻量级或简单解决方案到更复杂解决方案的顺序列出。
打印¶
最基本的调试方法之一是简单地打印值并进行检查。然而,在使用 Python 内置的 print()
函数配合 Dask 时,这些打印通常发生在远程机器上,而不是用户的 Python 会话中,这通常不是开发人员在调试时期望的体验。
因此,Dask 提供了一个 dask.distributed.print
函数,它的行为与 Python 内置的 print()
函数类似,但也会将打印输出转发到客户端的 Python 会话中。这使得分布式调试感觉更像本地调试。
异常¶
当计算中的任务失败时,了解问题所在的标准方法是查看异常和回溯。人们通常使用 pdb
模块、IPython 的 %debug
或 %pdb
magic 命令来做到这一点,或者仅仅通过查看回溯来调查异常发生在代码的哪个位置。
通常,当计算在单独的线程或不同的机器上执行时,这些方法就会失效。为了解决这个问题,Dask 提供了一些机制来重现正常的 Python 调试体验。
检查异常和回溯¶
默认情况下,Dask 会复制异常和回溯,无论它们发生在哪里,并在本地重新引发该异常。如果您的任务在远程因 ZeroDivisionError
失败,那么您将在交互式会话中收到一个 ZeroDivisionError
。同样,您将看到此错误发生位置的完整回溯,就像在正常的 Python 中一样,这可以帮助您识别代码中的问题所在。
然而,您无法使用 pdb
模块或 %debug
IPython magic 命令配合这些回溯来查看失败期间变量的值。您只能通过视觉方式检查。此外,回溯的顶部可能填充了 Dask 特定的且与您的问题无关的函数,因此您可以安全地忽略这些。
单机调度器和分布式调度器都具有此功能。
使用单线程调度器¶
Dask 提供了一个简单的单线程调度器。它不提供任何并行性能改进,但在本地线程中忠实地运行您的 Dask 计算,允许您使用 pdb
、%debug
IPython magic 命令、cProfile
模块等性能分析工具以及 snakeviz 等常规工具。这使您可以在 Dask 计算中使用所有常用的 Python 调试技巧,只要您不需要并行性。
例如,可以通过在计算调用中设置 scheduler='single-threaded'
来使用单线程调度器
>>> x.compute(scheduler='single-threaded')
有关配置调度器的更多方法,请参阅调度器配置文档。
这仅适用于单机调度器。它不适用于 dask.distributed
,除非您熟悉使用 Tornado API(请参阅测试基础设施文档,其中实现了这一点)。此外,由于它在单机上运行,它假设您的计算可以在不超过内存限制的情况下在单机上运行。如果可能,明智的做法是在问题的较小版本上使用此方法。
在本地重新运行失败的任务¶
如果远程任务失败,我们可以收集函数和所有输入,将它们带到本地线程,然后重新运行该函数,希望在本地触发相同的异常,以便使用常规调试工具。
对于单机调度器,使用 rerun_exceptions_locally=True
关键字
>>> x.compute(rerun_exceptions_locally=True)
在分布式调度器上,对任何包含 Futures
的对象使用 recreate_error_locally
方法
>>> x.compute()
ZeroDivisionError(...)
>>> %pdb
>>> future = client.compute(x)
>>> client.recreate_error_locally(future)
手动移除失败的 Futures¶
有时,您的计算只有部分失败,例如,如果 CSV 数据集中的某些行存在某种故障。在使用分布式调度器运行时,如果您转而处理 Futures,则可以移除产生错误结果的数据块
>>> import dask.dataframe as dd
>>> df = ... # create dataframe
>>> df = df.persist() # start computing on the cluster
>>> from distributed.client import futures_of
>>> futures = futures_of(df) # get futures behind dataframe
>>> futures
[<Future: status: finished, type: pd.DataFrame, key: load-1>
<Future: status: finished, type: pd.DataFrame, key: load-2>
<Future: status: error, key: load-3>
<Future: status: pending, key: load-4>
<Future: status: error, key: load-5>]
>>> # wait until computation is done
>>> while any(f.status == 'pending' for f in futures):
... sleep(0.1)
>>> # pick out only the successful futures and reconstruct the dataframe
>>> good_futures = [f for f in futures if f.status == 'finished']
>>> df = dd.from_delayed(good_futures, meta=df._meta)
这有点像一种权宜之计,但在初次探索混乱数据时通常很实用。如果您正在使用 concurrent.futures API(map, submit, gather),那么这种方法会更自然。
检查调度状态¶
并非所有错误都表现为异常。例如,在分布式系统中,工作进程可能意外终止,您的计算可能由于工作进程间通信或调度器开销而异常缓慢,或者存在其他一些问题。获取有关正在发生情况的反馈可以帮助识别故障和一般的性能瓶颈。
对于单机调度器,请参阅本地诊断文档。本节的其余部分将假设您正在使用分布式调度器,这些问题在分布式调度器中更常见。
Web 诊断¶
首先,分布式调度器有许多诊断工具,显示数十种记录的指标,例如 CPU、内存、网络和磁盘使用情况,先前任务的历史记录,任务到工作进程的分配,工作进程内存压力,工作窃取,打开文件句柄限制等。通过检查这些页面可以正确诊断许多问题。默认情况下,这些信息可在 http://scheduler:8787/
访问,其中 scheduler
应替换为调度器的地址。有关更多信息,请参阅诊断性能文档。
日志¶
调度器、工作进程和客户端都使用Python 的标准日志模块发出日志。默认情况下,这些日志输出到标准错误。当 Dask 由集群作业调度器(SGE/SLURM/YARN/Mesos/Marathon/Kubernetes/等等)启动时,该系统将跟踪这些日志并提供一个接口来帮助您访问它们。如果您自行启动 Dask,它们很可能会直接输出到屏幕,除非您将标准错误重定向到文件。
您可以在配置中控制日志记录的详细程度,例如,~/.config/dask/*.yaml
文件。当前的默认设置如下
logging:
distributed: info
distributed.client: warning
bokeh: error
可以独立配置特定组件的日志记录,例如 distributed.client
、distributed.scheduler
、distributed.nanny
、distributed.worker
等。因此,例如,您可以添加一行 distributed.worker: debug
来获取工作进程的非常详细的输出。
此外,您可以显式地将处理程序分配给日志记录器。以下示例将文件(“output.log”)和控制台输出都分配给调度器和工作进程。有关此处特定术语含义的信息,请参阅python logging文档。
logging:
version: 1
handlers:
file:
class: logging.handlers.RotatingFileHandler
filename: output.log
level: INFO
console:
class: logging.StreamHandler
level: INFO
loggers:
distributed.worker:
level: INFO
handlers:
- file
- console
distributed.scheduler:
level: INFO
handlers:
- file
- console
LocalCluster¶
如果您从单机使用分布式调度器,您可以通过命令行界面手动设置工作进程,或者您可以使用 LocalCluster,当您只调用 Client()
时,它就会运行。
>>> from dask.distributed import Client, LocalCluster
>>> client = Client() # This is actually the following two commands
>>> cluster = LocalCluster()
>>> client = Client(cluster.scheduler.address)
LocalCluster 非常有用,因为调度器和工作进程与您在同一进程中,因此您可以在它们运行时轻松检查它们的状态(它们运行在单独的线程中)。
>>> cluster.scheduler.processing
{'worker-one:59858': {'inc-123', 'add-443'},
'worker-two:48248': {'inc-456'}}
如果您在没有 nanny 进程的情况下运行工作进程,您也可以对工作进程执行此操作
>>> cluster = LocalCluster(nanny=False)
>>> client = Client(cluster)
如果您想使用 Dask 分布式 API,并且仍然想直接调查工作进程内部正在发生的事情,这会非常有帮助。信息不像 Web 诊断那样为您提炼,但您可以完全访问底层细节。