自定义初始化
目录
自定义初始化¶
我们经常希望在启动或关闭调度器或工作节点时运行自定义代码。我们可以使用 Client.run
或 Client.run_on_scheduler
之类的函数手动执行此操作,但这容易出错且难以自动化。
为了解决这个问题,Dask 包含了一些机制,可以在调度器、工作节点、Nanny 或客户端的生命周期内运行任意代码。
预加载脚本¶
dask-scheduler
和 dask-worker
都支持 --preload
选项,允许分别对每个调度器/工作节点进行自定义初始化。作为 --preload
值传递的模块或 Python 文件保证在建立任何连接之前导入。如果找到 dask_setup(service)
函数,则会调用该函数,并将 Scheduler
、Worker
、Nanny
或 Client
实例作为参数。当服务停止时,如果存在 dask_teardown(service)
,则会调用该函数。
为了支持附加配置,单个 --preload
模块可以通过将 dask_setup
公开为 Click 命令来注册附加的命令行参数。此命令将用于解析提供给 dask-worker
或 dask-scheduler
的附加参数,并在服务初始化之前调用。
示例¶
举例来说,请看下面的文件,它创建了一个 调度器插件 并将其注册到调度器中
# scheduler-setup.py
import click
from distributed.diagnostics.plugin import SchedulerPlugin
class MyPlugin(SchedulerPlugin):
def __init__(self, print_count):
self.print_count = print_count
super().__init__()
def add_worker(self, scheduler=None, worker=None, **kwargs):
print("Added a new worker at:", worker)
if self.print_count and scheduler is not None:
print("Total workers:", len(scheduler.workers))
@click.command()
@click.option("--print-count/--no-print-count", default=False)
def dask_setup(scheduler, print_count):
plugin = MyPlugin(print_count)
scheduler.add_plugin(plugin)
然后我们可以在启动调度器时通过引用其文件名(如果在路径中,也可以是模块名)来运行此预加载脚本
dask-scheduler --preload scheduler-setup.py --print-count
类型¶
预加载可以按以下任一形式指定
脚本的路径,例如
/path/to/myfile.py
路径中的模块名,例如
my_module.initialize
Python 脚本文本,例如
import os; os.environ["A"] = "value"
配置¶
预加载也可以通过以下配置值进行注册
distributed:
scheduler:
preload:
- "import os; os.environ['A'] = 'b'" # use Python text
- /path/to/myfile.py # or a filename
- my_module # or a module name
preload-argv:
- [] # Pass optional keywords
- ["--option", "value"]
- []
worker:
preload: []
preload-argv: []
nanny:
preload: []
preload-argv: []
client:
preload: []
preload-argv: []
注意
因为 dask-worker
命令需要接受 Worker 和 Nanny (如果使用了 Nanny) 的关键字,所以它同时有 --preload
和 --preload-nanny
关键字。所有额外的关键字 (例如上面的 --print-count
) 将发送给工作节点而不是 Nanny。目前无法在命令行上为 Nanny 预加载脚本指定额外的关键字。如果需要,我们建议使用更灵活的配置。
Worker 生命周期插件¶
您还可以创建一个包含 setup
、teardown
和 transition
方法的类,并使用 Client.register_worker_plugin
方法将其注册到调度器,以便提供给每个工作节点。
|
为所有当前和未来的工作节点注册一个生命周期工作节点插件。 |
- Client.register_worker_plugin(plugin: distributed.diagnostics.plugin.NannyPlugin | distributed.diagnostics.plugin.WorkerPlugin, name: str | None = None, nanny: bool | None = None)[source]
为所有当前和未来的工作节点注册一个生命周期工作节点插件。
自版本 2023.9.2 起已弃用:请改用
Client.register_plugin()
。这将注册一个新对象来处理此集群中工作节点的设置、任务状态转换和拆卸。该插件将在所有当前连接的工作节点上实例化。它也将在将来连接的任何工作节点上运行。
该插件可以包含
setup
、teardown
、transition
和release_key
方法。有关接口和文档字符串,请参阅dask.distributed.WorkerPlugin
类或下面的示例。它必须可以使用 pickle 或 cloudpickle 模块进行序列化。如果插件有
name
属性,或者使用了name=
关键字,则这将控制幂等性。如果已注册同名插件,则将移除并替换为新插件。对于插件的替代方案,您可能还希望查看预加载脚本。
- 参数
- pluginWorkerPlugin 或 NannyPlugin
要注册的 WorkerPlugin 或 NannyPlugin 实例。
- namestr,可选
插件的名称。注册同名插件将无效。如果插件没有 name 属性,则使用随机名称。
- nannybool,可选
是向工作节点还是向 Nanny 注册插件。
另请参阅
distributed.WorkerPlugin
unregister_worker_plugin
示例
>>> class MyPlugin(WorkerPlugin): ... def __init__(self, *args, **kwargs): ... pass # the constructor is up to you ... def setup(self, worker: dask.distributed.Worker): ... pass ... def teardown(self, worker: dask.distributed.Worker): ... pass ... def transition(self, key: str, start: str, finish: str, ... **kwargs): ... pass ... def release_key(self, key: str, state: str, cause: str | None, reason: None, report: bool): ... pass
>>> plugin = MyPlugin(1, 2, 3) >>> client.register_plugin(plugin)
您可以使用
get_worker
函数访问插件>>> client.register_plugin(other_plugin, name='my-plugin') >>> def f(): ... worker = get_worker() ... plugin = worker.plugins['my-plugin'] ... return plugin.my_state
>>> future = client.run(f)