Python API

您可以通过导入并创建一个不带参数的 Client 来创建一个 dask.distributed 调度器。这将覆盖之前设置的任何默认值。

from dask.distributed import Client
client = Client()

如果您安装了 Bokeh,可以导航到 http://localhost:8787/status 查看诊断控制面板。

Client

通过实例化一个不带参数的 Dask Client,您可以轻松地在您的机器上设置一个本地集群

from dask.distributed import Client
client = Client()

这会在您的本地进程中设置一个调度器,以及与您机器中的核心数量相关的多个工作进程和每个工作进程中的线程数。

如果您想在同一个进程中运行工作进程,可以传递 processes=False 关键字参数。

client = Client(processes=False)

如果您想避免工作进程间通信并且您的计算释放 GIL,这有时是首选的。这在使用 NumPy 或 Dask Array 时很常见。

LocalCluster

上述的 Client() 调用是创建 LocalCluster 并将其传递给您的客户端的简写形式。

from dask.distributed import Client, LocalCluster
cluster = LocalCluster()
client = Client(cluster)

这与前者等效,但表达更明确一些。

您可能需要查看 LocalCluster 上可用的关键字参数,以了解处理线程和进程混合的可用选项,例如指定明确的端口等。

要创建一个所有工作进程都在专用子进程中运行的本地集群,dask.distributed 还提供了实验性的 SubprocessCluster

集群管理器功能

实例化一个像 LocalCluster 这样的集群管理器类,然后将其传递给 Client 是一种常见模式。集群管理器还提供了有用的工具来帮助您了解正在发生的事情。

例如,您可以检索控制面板 URL。

>>> cluster.dashboard_link
'http://127.0.0.1:8787/status'

您可以从集群组件中检索日志。

>>> cluster.get_logs()
{'Cluster': '',
'Scheduler': "distributed.scheduler - INFO - Clear task state\ndistributed.scheduler - INFO -   S...

如果您使用的集群管理器支持伸缩,您可以根据工作负载手动或自动修改工作进程的数量。

>>> cluster.scale(10)  # Sets the number of workers to 10

>>> cluster.adapt(minimum=1, maximum=10)  # Allows the cluster to auto scale to 10 when tasks are computed

参考

class distributed.deploy.local.LocalCluster(name=None, n_workers=None, threads_per_worker=None, processes=None, loop=None, start=None, host=None, ip=None, scheduler_port=0, silence_logs=30, dashboard_address=':8787', worker_dashboard_address=None, diagnostics_port=None, services=None, worker_services=None, service_kwargs=None, asynchronous=False, security=None, protocol=None, blocked_handlers=None, interface=None, worker_class=None, scheduler_kwargs=None, scheduler_sync_interval=1, **worker_kwargs)[source]

创建本地调度器和工作进程

这会在本地机器上创建一个包含调度器和工作进程的“集群”。

参数
n_workers: int

启动的工作进程数量

memory_limit: str, float, int, 或 None,默认为 "auto"

设置每个工作进程的内存限制。

关于参数数据类型的说明

  • 如果是 None 或 0,不应用任何限制。

  • 如果是 "auto",则将总系统内存平均分配给工作进程。

  • 如果是 float,则使用系统内存的该比例作为每个工作进程的内存。

  • 如果是指定字节数的字符串(如 "1GiB"),则使用该数量作为每个工作进程的内存。

  • 如果是 int,则使用该字节数作为每个工作进程的内存。

请注意,该限制仅在 processes=True 时生效,并且该限制是尽力而为的——工作进程仍然有可能超过此限制。

processes: bool

是否使用进程 (True) 或线程 (False)。默认为 True,除非 worker_class=Worker,此时默认为 False。

threads_per_worker: int

每个工作进程的线程数

scheduler_port: int

调度器的端口。使用 0 选择随机端口(默认)。8786 是常用端口。

silence_logs: logging level

输出到 stdout 的日志级别。默认为 logging.WARN。使用 False 或 None 等假值表示不改变。

host: string

调度器监听的主机地址,默认为仅 localhost

ip: string

已弃用。参见上面的 host

dashboard_address: str

Bokeh 诊断服务器监听的地址,例如 'localhost:8787' 或 '0.0.0.0:8787'。默认为 ':8787'。设置为 None 禁用控制面板。使用 ':0' 表示随机端口。仅指定端口(如 ':8787')时,控制面板将绑定到来自 host 参数的给定接口。如果 host 为空,将绑定到所有接口 '0.0.0.0'。在本地部署时,为避免防火墙问题,将 host 设置为 'localhost'。

worker_dashboard_address: str

Bokeh 工作进程诊断服务器监听的地址,例如 'localhost:8787' 或 '0.0.0.0:8787'。默认为 None,禁用控制面板。使用 ':0' 表示随机端口。

diagnostics_port: int

已弃用。参见 dashboard_address。

asynchronous: bool (默认为 False)

如果在 async/await 函数或 Tornado gen.coroutines 中使用此集群,请设置为 True。正常使用时应保持为 False。

blocked_handlers: List[str]

字符串列表,指定调度器上禁止使用的处理程序黑名单,例如 ['feed', 'run_function']

service_kwargs: Dict[str, Dict]

传递给运行服务的额外关键字参数

securitySecurity 或 bool,可选

配置此集群的通信安全性。可以是 security 对象或 True。如果为 True,将自动创建临时的自签名凭据。

protocol: str (可选)

要使用的协议,例如 tcp://, tls://, inproc://。默认根据其他关键字参数(如 processessecurity)选择一个合理的值。

interface: str (可选)

要使用的网络接口。默认为 lo/localhost

worker_class: Worker

用于实例化工作进程的工作进程类。如果 processes=False 则默认为 Worker,如果 processes=True 或省略则默认为 Nanny。

**worker_kwargs

额外的工作进程参数。任何额外的关键字参数都将传递给 Worker 类构造函数。

示例

>>> cluster = LocalCluster()  # Create a local cluster  
>>> cluster  
LocalCluster("127.0.0.1:8786", workers=8, threads=8)
>>> c = Client(cluster)  # connect to local cluster  

将集群伸缩到三个工作进程

>>> cluster.scale(3)  

将额外关键字参数传递给 Bokeh

>>> LocalCluster(service_kwargs={'dashboard': {'prefix': '/foo'}})