Python API
目录
Python API¶
您可以通过导入并创建一个不带参数的 Client
来创建一个 dask.distributed
调度器。这将覆盖之前设置的任何默认值。
from dask.distributed import Client
client = Client()
如果您安装了 Bokeh,可以导航到 http://localhost:8787/status
查看诊断仪表盘。
Client¶
您可以通过实例化一个不带参数的 Dask Client 轻松地在您的机器上设置一个本地集群
from dask.distributed import Client
client = Client()
这将在您的本地进程中设置一个调度器,以及与机器核心数相关的多个 worker 和每个 worker 的线程数。
如果您想在同一进程中运行 worker,可以传递 processes=False
关键字参数。
client = Client(processes=False)
如果您想避免 worker 间的通信并且您的计算会释放 GIL,有时更倾向于这样做。这在使用 NumPy 或 Dask Array 时很常见。
LocalCluster¶
上面描述的 Client()
调用是创建 LocalCluster 并将其传递给您的 client 的简写。
from dask.distributed import Client, LocalCluster
cluster = LocalCluster()
client = Client(cluster)
这相当于,但更显式一些。
您可能想查看 LocalCluster
上可用的关键字参数,以了解处理线程和进程混合使用的选项,例如指定显式端口等。
要创建一个所有 worker 都在专用子进程中运行的本地集群,dask.distributed
还提供了实验性的 SubprocessCluster
。
集群管理器特性¶
实例化一个像 LocalCluster
这样的集群管理器类然后将其传递给 Client
是一种常见的模式。集群管理器还提供了有用的工具来帮助您了解正在发生的事情。
例如,您可以检索仪表盘 URL。
>>> cluster.dashboard_link
'http://127.0.0.1:8787/status'
您可以检索集群组件的日志。
>>> cluster.get_logs()
{'Cluster': '',
'Scheduler': "distributed.scheduler - INFO - Clear task state\ndistributed.scheduler - INFO - S...
如果您使用的集群管理器支持扩缩容,您可以根据工作负载手动或自动修改 worker 的数量。
>>> cluster.scale(10) # Sets the number of workers to 10
>>> cluster.adapt(minimum=1, maximum=10) # Allows the cluster to auto scale to 10 when tasks are computed
参考¶
- class distributed.deploy.local.LocalCluster(name=None, n_workers=None, threads_per_worker=None, processes=None, loop=None, start=None, host=None, ip=None, scheduler_port=0, silence_logs=30, dashboard_address=':8787', worker_dashboard_address=None, diagnostics_port=None, services=None, worker_services=None, service_kwargs=None, asynchronous=False, security=None, protocol=None, blocked_handlers=None, interface=None, worker_class=None, scheduler_kwargs=None, scheduler_sync_interval=1, **worker_kwargs)[source]¶
创建本地调度器和 Worker
这会在本地机器上创建一个由一个调度器和多个 worker 组成的“集群”。
- 参数
- n_workers: int
要启动的 worker 数量
- memory_limit: str, float, int 或 None,默认值 “auto”
设置每个 worker 的内存限制。
关于参数数据类型的说明
如果为 None 或 0,则不应用限制。
如果为“auto”,则将总系统内存平均分配给 worker。
如果为 float,则将该比例的系统内存用于每个 worker。
如果为表示字节数的字符串(例如
"1GiB"
),则将该数量用于每个 worker。如果为 int,则将该字节数用于每个 worker。
请注意,该限制仅在
processes=True
时生效,并且该限制仅是尽力而为的约束 — worker 仍然可能超过此限制。- processes: bool
是否使用进程 (True) 或线程 (False)。默认为 True,除非 worker_class=Worker,此时默认为 False。
- threads_per_worker: int
每个 worker 的线程数量
- scheduler_port: int
调度器的端口。使用 0 选择随机端口(默认)。8786 是一个常用选项。
- silence_logs: 日志级别
要打印到标准输出的日志级别。默认为
logging.WARN
。使用 False 或 None 等假值表示不改变。- host: string
调度器监听的主机地址,默认为仅 localhost
- ip: string
已弃用。请参阅上面的
host
。- dashboard_address: str
Bokeh 诊断服务器监听的地址,例如 ‘localhost:8787’ 或 ‘0.0.0.0:8787’。默认为 ‘:8787’。设置为
None
以禁用仪表盘。使用 ‘:0’ 表示随机端口。仅指定端口(例如 ‘:8787’)时,仪表盘将绑定到host
参数指定的接口。如果host
为空,则绑定到所有接口 ‘0.0.0.0’。为避免在本地部署时出现防火墙问题,请将host
设置为 ‘localhost’。- worker_dashboard_address: str
Bokeh worker 诊断服务器监听的地址,例如 ‘localhost:8787’ 或 ‘0.0.0.0:8787’。默认为 None,禁用仪表盘。使用 ‘:0’ 表示随机端口。
- diagnostics_port: int
已弃用。请参阅 dashboard_address。
- asynchronous: bool (默认为 False)
如果在 async/await 函数或 Tornado gen.coroutines 中使用此集群,请设置为 True。正常使用时应保持 False。
- blocked_handlers: List[str]
一个字符串列表,指定调度器上要禁用的处理程序的黑名单,例如
['feed', 'run_function']
- service_kwargs: Dict[str, Dict]
传递给正在运行的服务 的额外关键字
- securitySecurity 或 bool,可选
配置此集群中的通信安全性。可以是 Security 对象,或 True。如果为 True,将自动创建临时的自签名凭据。
- protocol: str (可选)
要使用的协议,例如
tcp://
,tls://
,inproc://
。根据processes
和security
等其他关键字参数,这会默认为合适的选择。- interface: str (可选)
要使用的网络接口。默认为 lo/localhost
- worker_class: Worker
用于实例化 worker 的 Worker 类。如果 processes=False 则默认为 Worker,如果 processes=True 或省略则默认为 Nanny。
- **worker_kwargs
额外的 worker 参数。任何附加关键字参数将传递给
Worker
类构造函数。
示例
>>> cluster = LocalCluster() # Create a local cluster >>> cluster LocalCluster("127.0.0.1:8786", workers=8, threads=8)
>>> c = Client(cluster) # connect to local cluster
将集群扩缩容至三个 worker
>>> cluster.scale(3)
将额外的关键字参数传递给 Bokeh
>>> LocalCluster(service_kwargs={'dashboard': {'prefix': '/foo'}})