Python API

您可以通过导入并创建一个不带参数的 Client 来创建一个 dask.distributed 调度器。这将覆盖之前设置的任何默认值。

from dask.distributed import Client
client = Client()

如果您安装了 Bokeh,可以导航到 http://localhost:8787/status 查看诊断仪表盘。

Client

您可以通过实例化一个不带参数的 Dask Client 轻松地在您的机器上设置一个本地集群

from dask.distributed import Client
client = Client()

这将在您的本地进程中设置一个调度器,以及与机器核心数相关的多个 worker 和每个 worker 的线程数。

如果您想在同一进程中运行 worker,可以传递 processes=False 关键字参数。

client = Client(processes=False)

如果您想避免 worker 间的通信并且您的计算会释放 GIL,有时更倾向于这样做。这在使用 NumPy 或 Dask Array 时很常见。

LocalCluster

上面描述的 Client() 调用是创建 LocalCluster 并将其传递给您的 client 的简写。

from dask.distributed import Client, LocalCluster
cluster = LocalCluster()
client = Client(cluster)

这相当于,但更显式一些。

您可能想查看 LocalCluster 上可用的关键字参数,以了解处理线程和进程混合使用的选项,例如指定显式端口等。

要创建一个所有 worker 都在专用子进程中运行的本地集群,dask.distributed 还提供了实验性的 SubprocessCluster

集群管理器特性

实例化一个像 LocalCluster 这样的集群管理器类然后将其传递给 Client 是一种常见的模式。集群管理器还提供了有用的工具来帮助您了解正在发生的事情。

例如,您可以检索仪表盘 URL。

>>> cluster.dashboard_link
'http://127.0.0.1:8787/status'

您可以检索集群组件的日志。

>>> cluster.get_logs()
{'Cluster': '',
'Scheduler': "distributed.scheduler - INFO - Clear task state\ndistributed.scheduler - INFO -   S...

如果您使用的集群管理器支持扩缩容,您可以根据工作负载手动或自动修改 worker 的数量。

>>> cluster.scale(10)  # Sets the number of workers to 10

>>> cluster.adapt(minimum=1, maximum=10)  # Allows the cluster to auto scale to 10 when tasks are computed

参考

class distributed.deploy.local.LocalCluster(name=None, n_workers=None, threads_per_worker=None, processes=None, loop=None, start=None, host=None, ip=None, scheduler_port=0, silence_logs=30, dashboard_address=':8787', worker_dashboard_address=None, diagnostics_port=None, services=None, worker_services=None, service_kwargs=None, asynchronous=False, security=None, protocol=None, blocked_handlers=None, interface=None, worker_class=None, scheduler_kwargs=None, scheduler_sync_interval=1, **worker_kwargs)[source]

创建本地调度器和 Worker

这会在本地机器上创建一个由一个调度器和多个 worker 组成的“集群”。

参数
n_workers: int

要启动的 worker 数量

memory_limit: str, float, int 或 None,默认值 “auto”

设置每个 worker 的内存限制。

关于参数数据类型的说明

  • 如果为 None 或 0,则不应用限制。

  • 如果为“auto”,则将总系统内存平均分配给 worker。

  • 如果为 float,则将该比例的系统内存用于每个 worker

  • 如果为表示字节数的字符串(例如 "1GiB"),则将该数量用于每个 worker

  • 如果为 int,则将该字节数用于每个 worker

请注意,该限制仅在 processes=True 时生效,并且该限制仅是尽力而为的约束 — worker 仍然可能超过此限制。

processes: bool

是否使用进程 (True) 或线程 (False)。默认为 True,除非 worker_class=Worker,此时默认为 False。

threads_per_worker: int

每个 worker 的线程数量

scheduler_port: int

调度器的端口。使用 0 选择随机端口(默认)。8786 是一个常用选项。

silence_logs: 日志级别

要打印到标准输出的日志级别。默认为 logging.WARN。使用 False 或 None 等假值表示不改变。

host: string

调度器监听的主机地址,默认为仅 localhost

ip: string

已弃用。请参阅上面的 host

dashboard_address: str

Bokeh 诊断服务器监听的地址,例如 ‘localhost:8787’ 或 ‘0.0.0.0:8787’。默认为 ‘:8787’。设置为 None 以禁用仪表盘。使用 ‘:0’ 表示随机端口。仅指定端口(例如 ‘:8787’)时,仪表盘将绑定到 host 参数指定的接口。如果 host 为空,则绑定到所有接口 ‘0.0.0.0’。为避免在本地部署时出现防火墙问题,请将 host 设置为 ‘localhost’。

worker_dashboard_address: str

Bokeh worker 诊断服务器监听的地址,例如 ‘localhost:8787’ 或 ‘0.0.0.0:8787’。默认为 None,禁用仪表盘。使用 ‘:0’ 表示随机端口。

diagnostics_port: int

已弃用。请参阅 dashboard_address。

asynchronous: bool (默认为 False)

如果在 async/await 函数或 Tornado gen.coroutines 中使用此集群,请设置为 True。正常使用时应保持 False。

blocked_handlers: List[str]

一个字符串列表,指定调度器上要禁用的处理程序的黑名单,例如 ['feed', 'run_function']

service_kwargs: Dict[str, Dict]

传递给正在运行的服务 的额外关键字

securitySecurity 或 bool,可选

配置此集群中的通信安全性。可以是 Security 对象,或 True。如果为 True,将自动创建临时的自签名凭据。

protocol: str (可选)

要使用的协议,例如 tcp://, tls://, inproc://。根据 processessecurity 等其他关键字参数,这会默认为合适的选择。

interface: str (可选)

要使用的网络接口。默认为 lo/localhost

worker_class: Worker

用于实例化 worker 的 Worker 类。如果 processes=False 则默认为 Worker,如果 processes=True 或省略则默认为 Nanny。

**worker_kwargs

额外的 worker 参数。任何附加关键字参数将传递给 Worker 类构造函数。

示例

>>> cluster = LocalCluster()  # Create a local cluster  
>>> cluster  
LocalCluster("127.0.0.1:8786", workers=8, threads=8)
>>> c = Client(cluster)  # connect to local cluster  

将集群扩缩容至三个 worker

>>> cluster.scale(3)  

将额外的关键字参数传递给 Bokeh

>>> LocalCluster(service_kwargs={'dashboard': {'prefix': '/foo'}})