期物
目录
期物¶
Dask 支持一个实时任务框架,它扩展了 Python 的 concurrent.futures 接口。Dask 期物允许您以最少的代码更改,在 Dask 集群上扩展通用 Python 工作流。
这个接口适用于任意任务调度,例如 dask.delayed,但它是即时执行而不是惰性执行的,这在计算可能随时间演变的情况下提供了更大的灵活性。这些功能依赖于在 dask.distributed 中找到的第二代任务调度器(尽管名称如此,但在单机上运行得非常好)。
示例¶
请访问 https://examples.dask.org.cn/futures.html 查看和运行使用 Dask 期物的示例。
启动 Dask 客户端¶
您必须启动一个 Client
才能使用期物接口。它跟踪各个 worker 进程或线程之间的状态。
from dask.distributed import Client
client = Client() # start local workers as processes
# or
client = Client(processes=False) # start local workers as threads
如果您安装了 Bokeh,这将在 http://localhost:8787
启动一个诊断控制面板。
提交任务¶
|
将函数应用提交给调度器 |
|
对参数序列应用函数 |
|
等待计算完成,将结果收集到本地进程。 |
您可以使用 submit
方法提交单个任务。
def inc(x):
return x + 1
def add(x, y):
return x + y
a = client.submit(inc, 10) # calls inc(10) in background thread or process
b = client.submit(inc, 20) # calls inc(20) in background thread or process
submit
函数返回一个 Future
,它引用一个远程结果。这个结果可能尚未完成。
>>> a
<Future: status: pending, key: inc-b8aaf26b99466a7a1980efa1ade6701d>
最终它将完成。结果会保留在远程线程/进程/worker 中,直到您明确请求取回它。
>>> a
<Future: status: finished, type: int, key: inc-b8aaf26b99466a7a1980efa1ade6701d>
>>> a.result() # blocks until task completes and data arrives
11
您可以将期物作为输入传递给 submit。Dask 自动处理依赖跟踪;一旦所有输入期物完成,它们(如有必要)将被移动到一个 worker 上,然后依赖它们的计算将开始。您无需等待输入完成即可提交新任务;Dask 会自动处理。
c = client.submit(add, a, b) # calls add on the results of a and b
类似于 Python 的 map
,您可以使用 Client.map
对许多输入调用同一个函数。
futures = client.map(inc, range(1000))
但是,请注意,每个任务大约有 1 毫秒的开销。如果您想对大量输入应用一个函数,那么您可以考虑使用 dask.bag 或 dask.dataframe。
移动数据¶
|
等待计算完成,将结果收集到本地进程。 |
|
从分布式内存中收集期物 |
|
将数据分散到分布式内存中 |
对于任何期物,您可以调用 .result
方法来收集结果。这将阻塞直到期物计算完成,然后如有必要,将结果传回您的本地进程。
>>> c.result()
32
您可以使用 Client.gather
方法并发地收集多个结果。这比按顺序对每个期物调用 .result()
更高效。
>>> # results = [future.result() for future in futures]
>>> results = client.gather(futures) # this can be faster
如果您有重要的本地数据想要包含在计算中,您可以将其作为普通输入包含在 submit 或 map 调用中。
>>> df = pd.read_csv('training-data.csv')
>>> future = client.submit(my_function, df)
或者您可以显式地对其进行 scatter
操作。scatter 会将您的数据移动到一个 worker 上,并返回一个指向该数据的期物。
>>> remote_df = client.scatter(df)
>>> remote_df
<Future: status: finished, type: DataFrame, key: bbd0ca93589c56ea14af49cba470006e>
>>> future = client.submit(my_function, remote_df)
这两种方式都能达到相同的结果,但使用 scatter 有时会更快。特别是当您使用进程或分布式 worker(需要数据传输)并且想在许多计算中使用 df
时,这一点尤为突出。事先 scatter 数据可以避免过度的数据移动。
对一个列表调用 scatter 会单独 scatter 所有元素。Dask 会以轮询的方式将这些元素均匀分布到 worker 中。
>>> client.scatter([1, 2, 3])
[<Future: status: finished, type: int, key: c0a8a20f903a4915b94db8de3ea63195>,
<Future: status: finished, type: int, key: 58e78e1b34eb49a68c65b54815d1b158>,
<Future: status: finished, type: int, key: d3395e15f605bc35ab1bac6341a285e2>]
引用、取消和异常¶
|
取消运行此期物的请求 |
|
返回失败任务的异常 |
|
返回失败任务的跟踪信息 |
|
取消正在运行的期物。这会阻止尚未运行的未来任务被调度,如果它们已经运行,则会删除它们。 |
Dask 只会计算并保留有活跃期物引用的结果。通过这种方式,您的本地变量定义了 Dask 中的哪些内容是活跃的。当您的本地 Python 会话对某个期物进行垃圾回收时,Dask 可以随意删除该数据或停止正在尝试产生该数据的计算。
>>> del future # deletes remote data once future is garbage collected
您也可以使用 Future.cancel
或 Client.cancel
方法显式取消任务。
>>> future.cancel() # deletes data even if other futures point to it
如果一个期物失败,当您尝试获取结果时,Dask 将会抛出远程异常和跟踪信息。
def div(x, y):
return x / y
>>> a = client.submit(div, 1, 0) # 1 / 0 raises a ZeroDivisionError
>>> a
<Future: status: error, key: div-3601743182196fb56339e584a2bf1039>
>>> a.result()
1 def div(x, y):
----> 2 return x / y
ZeroDivisionError: division by zero
所有依赖于出错期物的期物也会以相同的异常出错。
>>> b = client.submit(inc, a)
>>> b
<Future: status: error, key: inc-15e2e4450a0227fa38ede4d6b1a952db>
您可以使用 Future.exception
或 Future.traceback
方法显式地收集异常或跟踪信息。
等待期物¶
|
按完成顺序返回期物 |
|
等待所有/任意期物完成 |
您可以使用 wait
函数等待一个或一组期物。
from dask.distributed import wait
>>> wait(futures)
这会阻塞直到所有期物完成或出错。
您也可以使用 as_completed
函数,在期物完成时对其进行迭代。
from dask.distributed import as_completed
futures = client.map(score, x_values)
best = -1
for future in as_completed(futures):
y = future.result()
if y > best:
best = y
为了提高效率,您还可以要求 as_completed
在后台收集结果。
for future, result in as_completed(futures, with_results=True):
# y = future.result() # don't need this
...
或者批量收集自上次迭代以来已到达的所有期物。
for batch in as_completed(futures, with_results=True).batches():
for future, result in batch:
...
此外,对于迭代算法,您可以在迭代期间向 as_completed
迭代器中添加更多期物。
seq = as_completed(futures)
for future in seq:
y = future.result()
if condition(y):
new_future = client.submit(...)
seq.add(new_future) # add back into the loop
或者使用 seq.update(futures)
一次性添加多个期物。
即发即忘¶
|
至少运行一次任务,即使我们释放了期物 |
有时我们不关心收集任务的结果,只关心它可能产生的副作用,例如将结果写入文件。
>>> a = client.submit(load, filename)
>>> b = client.submit(process, a)
>>> c = client.submit(write, b, out_filename)
如上所述,Dask 会停止没有任何活跃期物的引用任务。它认为因为没有人指向这些数据,所以没有人关心。您可以使用 fire_and_forget
函数告诉 Dask 无论如何都要计算一个任务,即使没有活跃的期物引用。
from dask.distributed import fire_and_forget
>>> fire_and_forget(c)
当一个期物可能超出作用域时(例如,作为函数的一部分),这尤其有用。
def process(filename):
out_filename = 'out-' + filename
a = client.submit(load, filename)
b = client.submit(process, a)
c = client.submit(write, b, out_filename)
fire_and_forget(c)
return # here we lose the reference to c, but that's now ok
for filename in filenames:
process(filename)
从不同进程提交任务并检索结果¶
有时我们关心检索结果,但不一定是从同一个进程。
from distributed import Variable
var = Variable("my-result")
fut = client.submit(...)
var.set(fut)
使用 Variable
会指示 dask 以给定的名称记住此任务的结果,以便以后可以检索它,而无需在此期间保持 Client 处于活动状态。
var = Variable("my-result")
fut = var.get()
result = fut.result()
从任务中提交任务¶
|
在任务内部获取客户端。 |
|
让这个线程重新加入 ThreadPoolExecutor |
|
让这个任务退出 worker 的线程池 |
这是一个高级功能,在通常情况下很少需要。
任务可以通过获取自己的客户端来启动其他任务。这使得复杂且高度动态的工作负载成为可能。
from dask.distributed import get_client
def my_function(x):
...
# Get locally created client
client = get_client()
# Do normal client operations, asking cluster for computation
a = client.submit(...)
b = client.submit(...)
a, b = client.gather([a, b])
return a + b
它还允许您设置长时间运行的任务,监控套接字或物理传感器等其他资源。
def monitor(device):
client = get_client()
while True:
data = device.read_data()
future = client.submit(process, data)
fire_and_forget(future)
for device in devices:
fire_and_forget(client.submit(monitor))
然而,每个正在运行的任务都会占用一个线程,因此如果您启动许多任务来启动其他任务,如果不小心,可能会导致系统死锁。您可以在任务内部调用 secede
函数,使其从专用线程池中脱离,进入一个不占用 Dask worker 内部槽位的管理线程。
from dask.distributed import get_client, secede
def monitor(device):
client = get_client()
secede() # remove this task from the thread pool
while True:
data = device.read_data()
future = client.submit(process, data)
fire_and_forget(future)
如果您打算在等待客户端工作之后在同一个线程中执行更多工作,您可能需要显式地阻塞,直到线程能够重新加入线程池。这允许一定程度上控制创建的线程数量,并防止同时活跃的线程过多,导致硬件过载。
def f(n): # assume that this runs as a task
client = get_client()
secede() # secede while we wait for results to come back
futures = client.map(func, range(n))
results = client.gather(futures)
rejoin() # block until a slot is open in the thread pool
result = analyze(results)
return result
或者,您也可以直接在任务内部使用普通的 compute
函数。这将自动适当地调用 secede
和 rejoin
。
def f(name, fn):
df = dd.read_csv(fn) # note that this is a dask collection
result = df[df.name == name].count()
# This calls secede
# Then runs the computation on the cluster (including this worker)
# Then blocks on rejoin, and finally delivers the answer
result = result.compute()
return result
协调原语¶
|
分布式队列 |
|
分布式全局变量 |
|
分布式集中式锁 |
|
分布式集中式事件,等同于 asyncio.Event |
|
这个 信号量 将跟踪调度器上的租约,该租约可以被该类的一个实例获取和释放。 |
有时会出现在任务、worker 或客户端之间需要以超出使用期物进行正常任务调度的方式互相协调的情况。在这些情况下,Dask 提供了额外的原语来帮助处理复杂情况。
Dask 提供了锁、事件、队列和全局变量等协调原语的分布式版本,这些版本在适用的情况下与其内存中的对应项相匹配。它们可以用来控制对外部资源的访问、跟踪正在进行的计算的进度,或者在多个 worker、客户端和任务之间明智地通过侧通道共享数据。
对于 Dask 的常见用法来说,这些功能很少是必需的。我们建议初学者坚持使用上面介绍的更简单的期物(如 Client.submit
和 Client.gather
),而不是采用不必要的复杂技术。
队列¶
|
分布式队列 |
Dask 队列遵循标准 Python Queue 的 API,但现在在客户端之间移动期物或小消息。队列可以可靠地序列化,并在必要时在远程客户端上重新连接自身。
from dask.distributed import Queue
def load_and_submit(filename):
data = load(filename)
client = get_client()
future = client.submit(process, data)
queue.put(future)
client = Client()
queue = Queue()
for filename in filenames:
future = client.submit(load_and_submit, filename)
fire_and_forget(future)
while True:
future = queue.get()
print(future.result())
队列还可以发送小块信息,任何可以 msgpack 编码的数据(整数、字符串、布尔值、列表、字典等)。这对于发送小分数或管理消息非常有用。
def func(x):
try:
...
except Exception as e:
error_queue.put(str(e))
error_queue = Queue()
队列由中心调度器协调,因此它们不适合发送大量数据(您发送的所有数据都会通过中心点路由)。它们非常适合移动少量元数据或期物。这些期物可以安全地指向更大的数据块。
>>> x = ... # my large numpy array
# Don't do this!
>>> q.put(x)
# Do this instead
>>> future = client.scatter(x)
>>> q.put(future)
# Or use futures for metadata
>>> q.put({'status': 'OK', 'stage=': 1234})
全局变量¶
|
分布式全局变量 |
变量与队列类似,它们在客户端之间通信期物和小数据。但是,变量只保存一个值。您可以随时获取或设置该值。
>>> var = Variable('stopping-criterion')
>>> var.set(False)
>>> var.get()
False
这通常用于在客户端之间传递停止条件或当前参数信号。
如果您想共享大量信息,请先 scatter 数据。
>>> parameters = np.array(...)
>>> future = client.scatter(parameters)
>>> var.set(future)
锁¶
|
分布式集中式锁 |
您也可以使用 Lock
对象持有集群范围的锁。Dask Lock 的 API 与正常的 threading.Lock
对象相同,只是它们可以在整个集群上工作。
from dask.distributed import Lock
lock = Lock()
with lock:
# access protected resource
您可以同时管理多个锁。Lock 可以给定一个一致的名称,或者您可以传递锁对象本身。
当您想要锁定某些已知的命名资源时,使用一致的名称很方便。
from dask.distributed import Lock
def load(fn):
with Lock('the-production-database'):
# read data from filename using some sensitive source
return ...
futures = client.map(load, filenames)
传递锁对象也可以工作,并且在您想为特定情况创建短期锁时更简单。
from dask.distributed import Lock
lock = Lock()
def load(fn, lock=None):
with lock:
# read data from filename using some sensitive source
return ...
futures = client.map(load, filenames, lock=lock)
如果您想控制对数据库或非线程安全库等外部资源的并发访问,这非常有用。
事件¶
|
分布式集中式事件,等同于 asyncio.Event |
Dask Event 模仿 asyncio.Event
对象,但在集群范围内工作。它们持有一个可以设置或清除的标志。客户端可以等待直到事件标志被设置。与 Lock
不同,每个客户端都可以设置或清除标志,并且事件没有“所有权”。
您可以使用事件,例如同步多个客户端。
# One one client
from dask.distributed import Event
event = Event("my-event-1")
event.wait()
调用 wait 会阻塞直到事件被设置,例如在另一个客户端中。
# In another client
from dask.distributed import Event
event = Event("my-event-1")
# do some work
event.set()
事件可以被多次设置、清除和等待。引用同一事件名称的每个等待者都将在事件设置时收到通知(不像锁那样只有第一个等待者)。
from dask.distributed import Event
def wait_for_event(x):
event = Event("my-event")
event.wait()
# at this point, all function calls
# are in sync once the event is set
futures = client.map(wait_for_event, range(10))
Event("my-event").set()
client.gather(futures)
信号量¶
|
这个 信号量 将跟踪调度器上的租约,该租约可以被该类的一个实例获取和释放。 |
类似于单值的 Lock
,也可以使用集群范围的信号量来协调和限制对数据库等敏感资源的访问。
from dask.distributed import Semaphore
sem = Semaphore(max_leases=2, name="database")
def access_limited(val, sem):
with sem:
# Interact with the DB
return
futures = client.map(access_limited, range(10), sem=sem)
client.gather(futures)
sem.close()
Actor¶
Actor 允许 worker 管理快速变化的状态,而无需与中心调度器协调。这样做的好处是减少延迟(worker 之间的往返延迟约为 1 毫秒),减轻中心调度器的压力(worker 之间可以完全自行协调 Actor),并且还支持需要有状态或原地内存操作的工作流。
然而,这些好处是有代价的。调度器对 actor 不知情,因此它们无法从诊断、负载均衡或弹性中受益。一旦一个 actor 在一个 worker 上运行,它就永远绑定到那个 worker。如果那个 worker 过载或死亡,那么就没有机会恢复工作负载。
由于 Actor 避免了中心调度器,它们可以实现高性能,但不具备弹性。
示例:计数器¶
Actor 是一个包含状态和方法的类,被提交到一个 worker。
class Counter:
n = 0
def __init__(self):
self.n = 0
def increment(self):
self.n += 1
return self.n
from dask.distributed import Client
if __name__ == '__main__':
client = Client()
future = client.submit(Counter, actor=True)
counter = future.result()
>>> counter
<Actor: Counter, key=Counter-afa1cdfb6b4761e616fa2cfab42398c8>
对这个对象的方法调用会产生 ActorFutures
,它类似于普通的 Future,但只与持有 Actor 的 worker 交互。
>>> future = counter.increment()
>>> future
<ActorFuture>
>>> future.result()
1
属性访问是同步且阻塞的。
>>> counter.n
1
示例:参数服务器¶
此示例将使用参数服务器执行以下最小化:
这是一个简单的最小化,将作为一个说明性示例。
Dask Actor 将充当参数服务器,持有模型。客户端将计算上述损失函数的梯度。
import numpy as np
from dask.distributed import Client
client = Client(processes=False)
class ParameterServer:
def __init__(self):
self.data = dict()
def put(self, key, value):
self.data[key] = value
def get(self, key):
return self.data[key]
def train(params, lr=0.1):
grad = 2 * (params - 1) # gradient of (params - 1)**2
new_params = params - lr * grad
return new_params
ps_future = client.submit(ParameterServer, actor=True)
ps = ps_future.result()
ps.put('parameters', np.random.default_rng().random(1000))
for k in range(20):
params = ps.get('parameters').result()
new_params = train(params)
ps.put('parameters', new_params)
print(new_params.mean())
# k=0: "0.5988202981316124"
# k=10: "0.9569236575164062"
此示例有效,并且损失函数被最小化。\(p_i\) 收敛到 1。如果需要,可以将此示例改编为机器学习,使用更复杂的函数进行最小化。
异步操作¶
所有需要与远程 worker 通信的操作都是可等待的(awaitable)。
async def f():
future = client.submit(Counter, actor=True)
counter = await future # gather actor object locally
counter.increment() # send off a request asynchronously
await counter.increment() # or wait until it was received
n = await counter.n # attribute access also must be awaited
通常,所有触发计算的 I/O 操作(例如 to_parquet
)都应该使用 compute=False
参数完成,以避免异步阻塞。
await client.compute(ddf.to_parquet('/tmp/some.parquet', compute=False))
API¶
Client
|
连接并向 Dask 集群提交计算 |
|
取消正在运行的期物。这会阻止尚未运行的未来任务被调度,如果它们已经运行,则会删除它们。 |
|
在集群上计算 dask 集合 |
|
从分布式内存中收集期物 |
|
计算 dask 图 |
|
如果存在,从调度器获取命名数据集。 |
|
返回一个 concurrent.futures Executor,用于在此 Client 上提交任务 |
|
哪些 worker 持有哪些键 |
|
列出调度器上可用的命名数据集 |
|
对参数序列应用函数 |
|
每个 worker 节点上可用的线程/核心数 |
|
在集群上持久化 dask 集合 |
|
收集有关近期工作的统计分析信息 |
|
向调度器发布命名数据集 |
|
在网络内重新平衡数据 |
|
设置网络内期物的复制数量 |
|
重启所有 worker。 |
|
在任务调度系统之外的所有 worker 上运行函数 |
|
在调度器进程上运行函数 |
|
将数据分散到分布式内存中 |
关闭连接的调度器和 worker |
|
|
集群中 worker 的基本信息 |
|
将函数应用提交给调度器 |
|
从调度器中移除命名数据集 |
|
将本地包上传到调度器和 worker |
|
存储每个期物数据的 worker |
Future
|
一个远程运行的计算 |
在期物完成后调用回调函数 |
|
|
取消运行此期物的请求 |
如果期物已被取消,返回 True |
|
返回计算是否完成。 |
|
|
返回失败任务的异常 |
|
等待计算完成,将结果收集到本地进程。 |
|
返回失败任务的跟踪信息 |
函数
|
按完成顺序返回期物 |
|
至少运行一次任务,即使我们释放了期物 |
|
在任务内部获取客户端。 |
|
让这个任务退出 worker 的线程池 |
|
让这个线程重新加入 ThreadPoolExecutor |
|
等待所有/任意期物完成 |
|
内置 |
|
内置 |
- distributed.as_completed(futures=None, loop=None, with_results=False, raise_errors=True, *, timeout=None)[source]¶
按完成顺序返回期物
这会返回一个迭代器,按照输入期物的完成顺序生成它们。对迭代器调用
next
会阻塞,直到下一个期物完成,无论顺序如何。此外,您还可以在计算过程中使用
.add
方法向此对象添加更多期物。- 参数
- futures: 期物集合
一个 Future 对象列表,将按照它们的完成顺序进行迭代。
- with_results: 布尔值 (False)
是否也等待并包含期物的结果;在这种情况下,
as_completed
会生成一个包含 (future, result) 的元组。- raise_errors: 布尔值 (True)
当期物的结果引发异常时是否应该抛出异常;仅在
with_results=True
时影响行为。- timeout: 整数 (可选)
如果调用
__next__()
或__anext__()
并且在从最初调用as_completed()
开始的 timeout 秒后结果仍不可用,返回的迭代器会引发dask.distributed.TimeoutError
。如果未指定或为None
,则等待时间没有限制。
示例
>>> x, y, z = client.map(inc, [1, 2, 3]) >>> for future in as_completed([x, y, z]): ... print(future.result()) 3 2 4
在计算过程中添加更多期物
>>> x, y, z = client.map(inc, [1, 2, 3]) >>> ac = as_completed([x, y, z]) >>> for future in ac: ... print(future.result()) ... if random.random() < 0.5: ... ac.add(c.submit(double, future)) 4 2 8 3 6 12 24
可选地等待直到结果也被收集
>>> ac = as_completed([x, y, z], with_results=True) >>> for future, result in ac: ... print(result) 2 4 3
- distributed.fire_and_forget(obj)[source]¶
至少运行一次任务,即使我们释放了期物
在正常操作下,Dask 不会运行任何没有活跃期物引用的任务(这在许多情况下避免了不必要的工作)。然而,有时您只想启动一个任务,不跟踪其期物,并期望它最终完成。您可以对期物或期物集合使用此函数,要求 Dask 完成任务,即使没有活跃的客户端在跟踪它。
任务完成后结果不会保留在内存中(除非有活跃的期物引用),因此这仅适用于依赖副作用的任务。
- 参数
- objFuture, list, dict, dask collection
您希望至少运行一次的期物。
示例
>>> fire_and_forget(client.submit(func, *args))
- distributed.get_client(address=None, timeout=None, resolve_address=True) Client [source]¶
在任务内部获取客户端。
此客户端连接到 worker 所连接的同一个调度器。
- 参数
- address字符串, 可选
要连接的调度器地址。默认为 worker 所连接的调度器。
- timeout整数或字符串
获取 Client 的超时时间(秒)。默认为
distributed.comm.timeouts.connect
配置值。- resolve_address布尔值, 默认为 True
是否将 address 解析为其规范形式。
- 返回值
- Client
示例
>>> def f(): ... client = get_client(timeout="10s") ... futures = client.map(lambda x: x + 1, range(10)) # spawn many tasks ... results = client.gather(futures) ... return sum(results)
>>> future = client.submit(f) >>> future.result() 55
- distributed.secede()[source]¶
让这个任务退出 worker 的线程池
这会为新任务打开一个新的调度槽位和一个新线程。这使得客户端可以在此节点上调度任务,在等待其他作业完成时(例如使用
client.gather
)尤其有用。另请参阅
示例
>>> def mytask(x): ... # do some work ... client = get_client() ... futures = client.map(...) # do some remote work ... secede() # while that work happens, remove ourself from the pool ... return client.gather(futures) # return gathered results
- distributed.rejoin()[source]¶
让这个线程重新加入 ThreadPoolExecutor
这将阻塞,直到 executor 中出现新的槽位。下一个完成任务的线程将离开线程池,以允许此线程加入。
另请参阅
secede
离开线程池
- distributed.wait(fs, timeout=None, return_when='ALL_COMPLETED')[source]¶
等待所有/任意期物完成
- 参数
- fsList[Future]
- timeout数字, 字符串, 可选
在此时间后将引发
dask.distributed.TimeoutError
。可以是字符串,例如"10 minutes"
,也可以是等待的秒数。- return_when字符串, 可选
ALL_COMPLETED 或 FIRST_COMPLETED 中的一个。
- 返回值
- 已完成和未完成的命名元组
- 已完成和未完成的命名元组
distributed.print(*args, sep: str | None = ' ', end: str | None = '\n', file: TextIO | None = None, flush: bool = False) None [source]¶
内置
print
函数的直接替代,用于从 worker 到客户端的远程打印。如果从 Dask worker 外部调用,其参数将直接传递给builtins.print()
。如果在 worker 上运行的代码调用此函数,除了在本地打印外,任何连接(可能远程)到管理此 worker 的调度器的客户端都将收到一个事件,指示它们将相同的输出打印到其标准输出或标准错误流。例如,用户可以通过在提交的代码中包含此print
函数的调用,并在本地 Jupyter notebook 或解释器会话中检查输出来对远程计算进行简单调试。所有参数的行为与
builtins.print()
的参数相同,但指定file
关键字参数时,它必须是sys.stdout
或sys.stderr
;不允许使用任意文件类对象。- 参数
- 所有非关键字参数都使用
str()
转换为字符串并写入流中,由sep
分隔,后跟end
。sep
和end
都必须是字符串;它们也可以是None
,表示使用默认值。如果没有给出对象,print()
将只写入end
。 sep字符串, 可选
- 值之间插入的字符串,默认为空格。
end字符串, 可选
- 追加在最后一个值之后的字符串,默认为换行符。
file
sys.stdout
或sys.stderr
, 可选- 默认为当前的 sys.stdout。
flush布尔值, 默认为 False
- 所有非关键字参数都使用
示例
>>> from dask.distributed import Client, print >>> client = distributed.Client(...) >>> def worker_function(): ... print("Hello from worker!") >>> client.submit(worker_function) <Future: finished, type: NoneType, key: worker_function-...> Hello from worker!
- 是否强制刷新流。
内置
warnings.warn()
函数的直接替代,用于从 worker 向客户端远程发出警告。distributed.warn(message: str | Warning, category: type[Warning] | None = <class 'UserWarning'>, stacklevel: int = 1, source: typing.Any = None) None [source]¶
内置
warnings.warn()
函数的直接替代,用于从 worker 向客户端远程发出警告。如果从 Dask worker 外部调用,其参数将直接传递给warnings.warn()
。如果在 worker 上运行的代码调用此函数,除了在本地发出警告外,任何连接(可能远程)到管理此 worker 的调度器的客户端都将收到一个事件,指示它们发出相同的警告(受其自己的本地过滤器等限制)。在实现可能在 worker 上运行的计算时,用户可以调用此warn
函数,以确保任何远程客户端会话都能看到其警告,例如在 Jupyter 输出单元格中。示例
>>> from dask.distributed import Client, warn >>> client = Client() >>> def do_warn(): ... warn("A warning from a worker.") >>> client.submit(do_warn).result() /path/to/distributed/client.py:678: UserWarning: A warning from a worker.
-
本地发出的警告会遵循所有参数(与
warnings.warn()
中的含义相同),但客户端会忽略stacklevel
和source
,因为它们在客户端线程中没有意义。 连接并向 Dask 集群提交计算
class distributed.Client(address=None, loop=None, timeout=_NoDefault.no_default, set_as_default=True, scheduler_file=None, security=None, asynchronous=False, name=None, heartbeat_interval=None, serializers=None, deserializers=None, extensions={}, direct_to_workers=None, connection_limit=512, **kwargs)[source]¶
Client 将用户连接到 Dask 集群。它提供了一个围绕函数和期物的异步用户界面。此类的功能类似于
concurrent.futures
中的 executor,但也在submit/map
调用中允许使用Future
对象。当 Client 实例化时,它默认会接管所有dask.compute
和dask.persist
调用。- 参数
- 通常也可以不指定调度器地址来创建 Client,例如
Client()
。在这种情况下,Client 会在后台创建一个LocalCluster
并连接到它。此时,任何额外的关键字参数都会从 Client 传递给 LocalCluster。有关更多信息,请参阅 LocalCluster 文档。 address: 字符串或 Cluster
- 这可以是
Scheduler
服务器的地址,例如字符串'127.0.0.1:8786'
,也可以是LocalCluster()
等集群对象。 loop
- 事件循环。
timeout: 整数 (默认为配置 ``distributed.comm.timeouts.connect``)
- 初始化连接到调度器的超时持续时间。
set_as_default: 布尔值 (True)
- 使用此 Client 作为全局 Dask 调度器。
scheduler_file: 字符串 (可选)
- 包含调度器信息的文件路径(如果可用)。
security: Security 或 布尔值, 可选
- 可选的安全信息。如果创建本地集群,也可以传入
True
,这种情况下会自动创建临时的自签名凭据。 asynchronous: 布尔值 (默认为 False)
- 如果在 async/await 函数或 Tornado gen.coroutines 中使用此客户端,请将其设置为 True。否则,对于正常使用应保持 False。
name: 字符串 (可选)
- 为客户端指定一个名称,该名称将包含在调度器上生成的与此客户端相关的日志中。
heartbeat_interval: 整数 (可选)
- 向调度器发送心跳之间的时间(毫秒)。
serializers
- 序列化对象时使用的可迭代方法集合。详见 序列化。
deserializers
- 反序列化对象时使用的可迭代方法集合。详见 序列化。
extensions列表
- 扩展。
direct_to_workers: 布尔值 (可选)
- 是否直接连接到 worker,或者请求调度器作为中介。
connection_limit整数
- 在连接池中同时保持的开放通信连接数。
**kwargs
- 通常也可以不指定调度器地址来创建 Client,例如
另请参阅
如果您不传递调度器地址,Client 将创建一个
LocalCluster
对象,并将任何额外的关键字参数传递给它。distributed.scheduler.Scheduler
内部调度器。
示例
distributed.LocalCluster
>>> client = Client('127.0.0.1:8786')
在初始化时提供集群的调度器节点地址。
>>> a = client.submit(add, 1, 2) >>> b = client.submit(add, 10, 20)
使用
submit
方法将单个计算发送到集群。>>> c = client.submit(add, a, b)
继续对结果使用 submit 或 map 构建更大的计算。
>>> client.gather(c) 33
使用
gather
方法收集结果。>>> client = Client() # makes your own local "cluster"
您也可以在不带参数的情况下调用 Client 来创建自己的本地集群。
>>> client = Client(n_workers=2, threads_per_worker=4)
- 额外的关键字参数将直接传递给 LocalCluster。
property amm¶
- 方便访问 Active Memory Manager。
- 线程本地、任务本地上下文管理器,使得 Client.current 类方法返回自身。在此上下文管理器中反序列化的任何 Future 对象都将自动附加到此 Client。
benchmark_hardware() dict [source]¶
- 返回值
- 在 worker 上运行内存、磁盘和网络带宽基准测试。
结果: 字典
- 一个字典,将名称“disk”、“memory”和“network”映射到另一个字典,该字典将大小映射到带宽。这些带宽是在集群上运行计算的多个 worker 的平均值。
call_stack(futures=None, keys=None)[source]¶
所有相关键的活跃调用栈。
- 参数
- 您可以通过在
futures=
关键字中提供期物或集合,或者在keys=
关键字中提供显式键列表来指定感兴趣的数据。如果两者都未提供,则返回所有调用栈。 futures列表 (可选)
- 期物列表,默认为所有数据。
keys列表 (可选)
- 您可以通过在
示例
>>> df = dd.read_parquet(...).persist() >>> client.call_stack(df) # call on collections
>>> client.call_stack() # Or call with no arguments for all activity
- 键名称列表,默认为所有数据。
cancel(futures, asynchronous=None, force=False, reason=None, msg=None)[source]¶
- 参数
- 取消正在运行的期物。这会阻止尚未运行的未来任务被调度,如果它们已经运行,则会删除它们。调用后,此结果和所有依赖的结果将不再可访问。
futuresList[Future]
- 期物列表。
asynchronous: 布尔值
- 如果为 True,则客户端处于异步模式。
force布尔值 (False)
- 即使其他客户端需要此期物也取消它。
reason: 字符串
- 取消期物的原因。
msg字符串
- 将附加到已取消期物的消息。
close(timeout=_NoDefault.no_default)[source]¶
关闭此客户端。
当您的 Python 会话结束时,客户端也会自动关闭。
- 参数
- 如果您在不带参数的情况下启动了客户端,例如
Client()
,那么这也会关闭同时启动的本地集群。 timeout数字
- 如果您在不带参数的情况下启动了客户端,例如
- Client.restart
在集群上计算 dask 集合
- 参数
- compute(collections, sync=False, optimize_graph=True, workers=None, allow_other_workers=False, resources=None, retries=0, priority=0, fifo_timeout='60s', actors=None, traverse=True, **kwargs)[source]¶
collectionsdask 对象或单个 dask 对象的可迭代对象
- dask.array 或 dataframe 或 dask.value 等集合对象。
sync布尔值 (可选)
- 如果为 False(默认),返回 Futures;如果为 True,返回具体值。
optimize_graph布尔值
- 是否优化底层图。
workers字符串或字符串的可迭代对象
- 允许执行计算的 worker 主机名集合。留空则默认为所有 worker(常见情况)。
allow_other_workers布尔值 (默认为 False)
- 与 workers 一起使用。指示计算是否可以在不在 workers 集合中的 worker 上执行。
retries整数 (默认为 0)
- 计算结果失败时允许的自动重试次数。
priority数字
- 可选的任务优先级。零为默认值。优先级越高越优先。
fifo_timeouttimedelta 字符串 (默认为 ’60s’)
- 考虑相同优先级调用之间允许的时间量。
traverse布尔值 (默认为 True)
- 默认情况下,dask 会遍历内置的 Python 集合,查找传递给
compute
的 dask 对象。对于大型集合来说,这可能很耗时。如果任何参数都不包含 dask 对象,请设置traverse=False
以避免进行此遍历。 resources字典 (默认为 {})
- 定义此映射任务的每个实例在 worker 上所需的 resources;例如
{'GPU': 2}`. 有关定义资源的详细信息,请参阅 worker resources。
actors布尔值或字典 (默认为 None)
- 在连接池中同时保持的开放通信连接数。
这些任务是否应作为有状态 Actor 存在于 worker 上。可以在全局(True/False)或按任务(
{'x': True, 'y': False}
)基础指定。有关更多详细信息,请参阅 Actor。
- 返回值
- 传递给图优化调用的选项。
另请参阅
如果输入是序列,则为 Futures 列表;否则为单个期物。
Client.get
示例
>>> from dask import delayed >>> from operator import add >>> x = delayed(add)(1, 2) >>> y = delayed(add)(x, x) >>> xx, yy = client.compute([x, y]) >>> xx <Future: status: finished, key: add-8f6e709446674bad78ea8aeecfee188e> >>> xx.result() 3 >>> yy.result() 6
普通的同步 dask.get 函数。
>>> xx = client.compute(x)
- 也支持单个参数。
classmethod current(allow_global=True)[source]¶
- 参数
- 在 as_client 上下文内运行时,返回上下文本地的当前客户端。否则,返回最新初始化的 Client。如果不存在 Client 实例,则引发 ValueError。如果 allow_global 设置为 False,并且在 as_client 上下文管理器之外运行,则引发 ValueError。
allow_global布尔值
- 返回值
- Client
如果为 True,返回默认客户端。
- 当前客户端。
- 引发
ValueError
另请参阅
如果未设置客户端,则引发 ValueError。
- default_client
property dashboard_link¶
- 返回值
- 调度器控制面板的链接。
字符串
示例
控制面板 URL。
>>> import webbrowser >>> from distributed import Client >>> client = Client() >>> webbrowser.open(client.dashboard_link)
- 在您的默认网页浏览器中打开控制面板。
dump_cluster_state(filename: str = 'dask-cluster-dump', write_from_scheduler: bool | None = None, exclude: collections.abc.Collection[str] = (), format: Literal['msgpack', 'yaml'] = 'msgpack', **storage_options)[source]¶
提取整个集群状态的转储,并持久化到磁盘或 URL。此功能仅用于调试目的。
警告:调度器(以及客户端,如果本地写入转储)的内存使用量可能很大。在大型或长时间运行的集群上,这可能需要几分钟。处理转储时,调度器可能无响应。
{ "scheduler": {...}, # scheduler state "workers": { worker_addr: {...}, # worker state ... } "versions": { "scheduler": {...}, "workers": { worker_addr: {...}, ... } } }
- 参数
- 结果将存储在一个字典中。
filename
要写入的路径或 URL。将自动附加适当的文件后缀(
.msgpack.gz
或.yaml
)。- 必须是
fsspec.open()
支持的路径(例如s3://my-bucket/cluster-dump
或cluster-dumps/dump
)。请参阅write_from_scheduler
控制转储是直接从调度器写入filename
,还是通过网络发送回客户端,然后在本地写入。 write_from_scheduler
如果为 True,则直接从调度器将集群状态写入
filename
。如果filename
是本地路径,则转储将写入到调度器文件系统上的该路径,因此如果调度器运行在临时硬件上,请务必小心。当调度器连接到网络文件系统、持久磁盘或写入到对象存储时非常有用。如果为 False,则通过网络将集群状态从调度器传回客户端,然后将其写入
filename
。对于大型转储来说,这种效率要低得多,但当调度器无法访问任何持久存储时非常有用。- exclude
要从转储中排除的属性名称集合,例如排除代码、追踪信息、日志等。
默认为排除
run_spec
,即序列化的用户代码。调试时通常不需要此项。要允许对此进行序列化,请传递一个空元组。- format
为
"msgpack"
或"yaml"
。如果使用 msgpack(默认),输出将存储为 msgpack 格式的 gzipped 文件。要读取
import gzip, msgpack with gzip.open("filename") as fd: state = msgpack.unpack(fd)
或
import yaml try: from yaml import CLoader as Loader except ImportError: from yaml import Loader with open("filename") as fd: state = yaml.load(fd, Loader=Loader)
- **storage_options
写入 URL 时传递给
fsspec.open()
的任何额外参数。
- forward_logging(logger_name=None, level=0)[source]¶
开始将给定的 logger(默认为 root)及其下面的所有 logger 从 worker 任务转发到客户端进程。无论何时命名 logger 在 worker 端处理一个 LogRecord,该记录都会被序列化,发送到客户端,并在客户端被同名 logger 处理。
请注意,只有在 worker 端 logger 的 level 设置适当的情况下,它们才会处理 LogRecords;并且只有在客户端 logger 的 level 也相应设置适当的情况下,它才会发出转发的 LogRecord。例如,如果您提交的任务将一个 DEBUG 消息记录到 logger "foo",那么为了让
forward_logging()
使该消息在您的客户端会话中发出,您必须确保 logger "foo" 在 worker 进程和客户端进程中将其 level 设置为 DEBUG(或更低)。- 参数
- logger_namestr, 可选
要开始转发的 logger 名称。
logging
模块分层命名系统的常规规则适用。例如,如果name
是"foo"
,则不仅"foo"
,还包括"foo.bar"
、"foo.baz"
等都将被转发。如果name
是None
,则表示 root logger,因此所有 logger 都将被转发。请注意,只有当 logger 的 level 足以处理给定的 LogRecord 时,它才会转发该 LogRecord。
- levelstr | int, 可选
可以选择将转发限制在此 level 或更高级别的 LogRecords,即使被转发的 logger 自身的 level 更低。
示例
出于示例目的,假设我们像用户一样配置客户端日志:将一个 StreamHandler 附加到 root logger,输出 level 为 INFO,并采用简单的输出格式。
import logging import distributed import io, yaml TYPICAL_LOGGING_CONFIG = ''' version: 1 handlers: console: class : logging.StreamHandler formatter: default level : INFO formatters: default: format: '%(asctime)s %(levelname)-8s [worker %(worker)s] %(name)-15s %(message)s' datefmt: '%Y-%m-%d %H:%M:%S' root: handlers: - console ''' config = yaml.safe_load(io.StringIO(TYPICAL_LOGGING_CONFIG)) logging.config.dictConfig(config)
现在创建一个客户端,并开始将 root logger 从 worker 转发回我们的本地客户端进程。
>>> client = distributed.Client() >>> client.forward_logging() # forward the root logger at any handled level
然后提交一个在 worker 上执行某些错误日志记录的任务。我们看到了客户端 StreamHandler 的输出。
>>> def do_error(): ... logging.getLogger("user.module").error("Hello error") ... return 42 >>> client.submit(do_error).result() 2022-11-09 03:43:25 ERROR [worker tcp://127.0.0.1:34783] user.module Hello error 42
请注意 dask 如何将属性
"worker"
添加到转发的 LogRecord 中,我们的自定义格式化程序使用了此属性。这对于准确识别哪个 worker 记录了错误非常有用。一个值得强调的细微之处:尽管我们的客户端 root logger 配置的 level 为 INFO,但 worker 端 root logger 仍保持其默认 level ERROR,因为我们在 worker 上没有进行任何明确的日志配置。因此,worker 端的 INFO 日志将不会被转发,因为它们根本从未被处理。
>>> def do_info_1(): ... # no output on the client side ... logging.getLogger("user.module").info("Hello info the first time") ... return 84 >>> client.submit(do_info_1).result() 84
在 INFO 消息被处理并转发到客户端之前,必须将客户端 logger 的 level 设置为 INFO。换句话说,客户端转发日志的“有效”level 是每个 logger 客户端 level 和 worker 端 level 的最大值。
>>> def do_info_2(): ... logger = logging.getLogger("user.module") ... logger.setLevel(logging.INFO) ... # now produces output on the client side ... logger.info("Hello info the second time") ... return 84 >>> client.submit(do_info_2).result() 2022-11-09 03:57:39 INFO [worker tcp://127.0.0.1:42815] user.module Hello info the second time 84
- gather(futures, errors='raise', direct=None, asynchronous=None)[source]¶
从分布式内存中收集期物
接受一个 future、嵌套的 future 容器、迭代器或队列。返回类型将与输入类型匹配。
- 参数
- futuresCollection of futures
这可以是一个可能嵌套的 Future 对象集合。集合可以是列表、集合或字典。
- errorsstring
如果 future 出错,应抛出异常(‘raise’)或跳过其包含在输出集合中(‘skip’)。
- directboolean
是否直接连接到 worker,还是请求调度器作为中介。这也可以在创建 Client 时设置。
- 期物列表。
asynchronous: 布尔值
- 返回值
- results: 与输入类型相同的集合,但现在包含
- 已收集的结果而非 futures
另请参阅
Client.scatter
将数据发送到集群
示例
>>> from operator import add >>> c = Client('127.0.0.1:8787') >>> x = c.submit(add, 1, 2) >>> c.gather(x) 3 >>> c.gather([x, [x], x]) # support lists and dicts [3, [3], 3]
- get(dsk, keys, workers=None, allow_other_workers=None, resources=None, sync=True, asynchronous=None, direct=None, retries=None, priority=0, fifo_timeout='60s', actors=None, **kwargs)[source]¶
计算 dask 图
- 参数
- dskdict
- keysobject, 或嵌套的对象列表
- 是否优化底层图。
可在其上执行计算的 worker 地址或主机名集合。留空则默认为所有 worker(常见情况)。
- 允许执行计算的 worker 主机名集合。留空则默认为所有 worker(常见情况)。
与
workers
一起使用。指示计算是否可以在不在 workers 集合中的 worker 上执行。- 默认情况下,dask 会遍历内置的 Python 集合,查找传递给
compute
的 dask 对象。对于大型集合来说,这可能很耗时。如果任何参数都不包含 dask 对象,请设置traverse=False
以避免进行此遍历。 定义此映射任务的每个实例在 worker 上所需的
resources
;例如{'GPU': 2}
。有关定义资源的详细信息,请参阅 worker resources。- dask.array 或 dataframe 或 dask.value 等集合对象。
如果为 False,则返回 Futures;如果为 True(默认),则返回具体值。
- 期物列表。
asynchronous: 布尔值
- directbool
是否直接连接到 worker,还是请求调度器作为中介。这也可以在创建 Client 时设置。
- 与 workers 一起使用。指示计算是否可以在不在 workers 集合中的 worker 上执行。
retries整数 (默认为 0)
- 计算结果失败时允许的自动重试次数。
priority数字
- 可选的任务优先级。零为默认值。优先级越高越优先。
fifo_timeouttimedelta 字符串 (默认为 ’60s’)
- 定义此映射任务的每个实例在 worker 上所需的 resources;例如
{'GPU': 2}`. 有关定义资源的详细信息,请参阅 worker resources。
actors布尔值或字典 (默认为 None)
- 返回值
- results
如果 'sync' 为 True,则返回结果。否则,返回已知数据 packed。如果 'sync' 为 False,则返回已知数据。否则,返回结果。
另请参阅
Client.compute
计算异步集合
示例
>>> from operator import add >>> c = Client('127.0.0.1:8787') >>> c.get({'x': (add, 1, 2)}, 'x') 3
- get_dataset(name, default=_NoDefault.no_default, **kwargs)[source]¶
如果调度器中存在命名数据集,则获取该数据集。如果不存在,则返回 default 或抛出 KeyError。
- 参数
- namestr
要检索的数据集名称
- defaultstr
可选,默认不设置。如果设置,则在 name 不存在时,不抛出 KeyError,而是返回此 default。
- kwargsdict
_get_dataset 的额外关键字参数
- 返回值
- 调度器中的数据集,如果存在
- get_events(topic: str | None = None)[source]¶
检索结构化的主题日志
- 参数
- topicstr, 可选
要检索事件的主题日志名称。如果没有提供
topic
,则返回所有主题的日志。
- get_executor(**kwargs)[source]¶
返回一个 concurrent.futures Executor,用于在此 Client 上提交任务
- 参数
- 在连接池中同时保持的开放通信连接数。
任何 submit() 或 map() 兼容的参数,例如 workers 或 resources。
- 返回值
- ClientExecutor
与 concurrent.futures API 完全兼容的 Executor 对象。
- get_metadata(keys, default=_NoDefault.no_default)[source]¶
从调度器获取任意元数据
有关完整文档字符串和示例,请参阅 set_metadata。
- 参数
- keyskey 或 list
要访问的键。如果为列表,则在嵌套集合中获取。
- default可选
如果键不存在,则返回此值。如果未提供,则在键不存在时抛出 KeyError。
另请参阅
- get_scheduler_logs(n=None)[source]¶
从调度器获取日志
- 参数
- nint
要检索的日志数量。默认为最多 10000 条,可通过
distributed.admin.log-length
配置值进行配置。
- 返回值
- 日志按倒序返回(最新在前)
- get_task_stream(start=None, stop=None, count=None, plot=False, filename='task-stream.html', bokeh_resources=None)[source]¶
从调度器获取任务流数据
这会收集仪表盘上诊断性“Task Stream”绘图中的数据。它包括特定持续时间内每个任务的开始、停止、传输和反序列化时间。
请注意,任务流诊断默认为关闭状态。您可能希望在开始工作之前调用此函数一次以确保开始记录,然后在完成后再次调用。
- 参数
- start数字或字符串
您希望何时开始记录。如果为数字,应为 time() 的结果。如果为字符串,则应为与当前时间的差异,例如 '60s' 或 '500 ms'。
- stop数字或字符串
您希望何时停止记录
- countint
所需的记录数量,如果同时指定了 start 和 stop,则忽略此参数
- plotboolean, str
如果为 True,则同时返回一个 Bokeh 图形。如果 plot == 'save',则将图形保存到文件。
- filenamestr (可选)
如果设置了
plot='save'
,则保存到的文件名- bokeh_resourcesbokeh.resources.Resources (可选)
指定资源组件是 INLINE 还是 CDN
- 返回值
- L: List[Dict]
另请参阅
get_task_stream
此方法的上下文管理器版本
示例
>>> client.get_task_stream() # prime plugin if not already connected >>> x.compute() # do some work >>> client.get_task_stream() [{'task': ..., 'type': ..., 'thread': ..., ...}]
传递关键字参数
plot=True
或plot='save'
以获取 Bokeh 图形。>>> data, figure = client.get_task_stream(plot='save', filename='myfile.html')
或者考虑使用上下文管理器
>>> from dask.distributed import get_task_stream >>> with get_task_stream() as ts: ... x.compute() >>> ts.data [...]
- get_versions(check: bool = False, packages: collections.abc.Sequence[str] | None = None) distributed.client.VersionsDict | collections.abc.Coroutine[Any, Any, distributed.client.VersionsDict] [source]¶
返回调度器、所有 worker 和客户端自身的版本信息
- 参数
- check
如果所有必需和可选软件包不匹配,则抛出 ValueError
- packages
要检查的额外软件包名称
示例
>>> c.get_versions()
>>> c.get_versions(packages=['sklearn', 'geopandas'])
- get_worker_logs(n=None, workers=None, nanny=False)[source]¶
从 worker 获取日志
- 参数
- nint
要检索的日志数量。默认为最多 10000 条,可通过
distributed.admin.log-length
配置值进行配置。- workers可迭代对象
要检索的 worker 地址列表。默认为获取所有 worker。
- nannybool, 默认 False
是获取 worker 的日志(False)还是 nanny 的日志(True)。如果指定,workers 中的地址仍应为 worker 地址,而非 nanny 地址。
- 返回值
- 将 worker 地址映射到日志的字典。
- 日志按倒序返回(最新在前)
- has_what(workers=None, **kwargs)[source]¶
哪些 worker 持有哪些键
此方法返回存储在每个 worker 内存中的数据的键。
- 参数
- workerslist (可选)
worker 地址列表,默认为所有 worker
- **kwargsdict
远程函数的可选关键字参数
示例
>>> x, y, z = c.map(inc, [1, 2, 3]) >>> wait([x, y, z]) >>> c.has_what() {'192.168.1.141:46784': ['inc-1c8dd6be1c21646c71f76c16d09304ea', 'inc-fd65c238a7ea60f6a01bf4c8a5fcf44b', 'inc-1e297fc27658d7b67b3a758f16bcf47a']}
- log_event(topic: str | collections.abc.Collection[str], msg: Any)[source]¶
在给定主题下记录事件
- 参数
- topicstr, list[str]
记录事件的主题名称。要在多个主题下记录同一事件,请传递主题名称列表。
- msg
要记录的事件消息。注意,这必须是 msgpack 可序列化的。
示例
>>> from time import time >>> client.log_event("current-time", time())
- map(func: Callable, *iterables: collections.abc.Collection, key: str | list | None = None, workers: str | collections.abc.Iterable[str] | None = None, retries: int | None = None, resources: dict[str, Any] | None = None, priority: int = 0, allow_other_workers: bool = False, fifo_timeout: str = '100 ms', actor: bool = False, actors: bool = False, pure: bool = True, batch_size=None, **kwargs)[source]¶
对参数序列应用函数
参数可以是普通对象或 Futures
- 参数
- func可调用对象
计划执行的可调用对象。如果
func
返回一个协程,它将在 worker 的主事件循环中运行。否则,func
将在 worker 的任务执行器池中运行(有关详细信息,请参阅Worker.executors
)。- iterables可迭代对象
要映射的可迭代对象列表。它们的长度应该相同。
- keystr, list
如果为字符串,则作为任务名称的前缀。如果为列表,则为显式名称。
- 是否优化底层图。
workers字符串或字符串的可迭代对象
- 与 workers 一起使用。指示计算是否可以在不在 workers 集合中的 worker 上执行。
任务失败时允许的自动重试次数
- 默认情况下,dask 会遍历内置的 Python 集合,查找传递给
compute
的 dask 对象。对于大型集合来说,这可能很耗时。如果任何参数都不包含 dask 对象,请设置traverse=False
以避免进行此遍历。 resources字典 (默认为 {})
- 计算结果失败时允许的自动重试次数。
priority数字
- 允许执行计算的 worker 主机名集合。留空则默认为所有 worker(常见情况)。
allow_other_workers布尔值 (默认为 False)
- fifo_timeoutstr timedelta (default ‘100ms’)
fifo_timeouttimedelta 字符串 (默认为 ’60s’)
- actorbool (default False)
这些任务是否应作为有状态 actor 存在于 worker 上。有关更多详细信息,请参阅 Actors。
- actorsbool (default False)
是 actor 的别名
- purebool (默认为 True)
函数是否是纯函数。对于像
np.random.random
这样的非纯函数,请设置pure=False
。请注意,如果actor
和pure
关键字参数都设置为 True,则pure
的值将恢复为 False,因为 actor 是有状态的。有关更多详细信息,请参阅 Pure Functions by Default。- batch_sizeint, 可选 (默认: 只有一个批次,其大小为整个可迭代对象)
以(最多)
batch_size
大小的批次向调度器提交任务。批次大小的权衡在于,大批次可以避免更多的每批次开销,但过大的批次可能需要很长时间才能提交,不合理地延迟集群开始处理。- **kwargsdict
发送到函数的额外关键字参数。大型值将明确包含在任务图中。
- 返回值
- Futures 的列表、迭代器或队列,具体取决于
- 输入的类型。
另请参阅
Client.submit
提交单个函数
备注
当前的任务图解析实现会搜索
key
的出现,并将其替换为相应的Future
结果。如果作为参数传递的字符串与集群上已存在的某个key
匹配,则可能导致对这些字符串进行不必要的替换。为避免这些情况,如果手动设置了key
,则需要使用唯一值。请参阅 https://github.com/dask/dask/issues/9969 以跟踪此问题的解决进度。示例
>>> L = client.map(func, sequence)
- nbytes(keys=None, summary=True, **kwargs)[source]¶
集群上每个键占用的字节数
这是通过
sys.getsizeof
衡量的,可能无法准确反映真实成本。- 参数
- 期物列表,默认为所有数据。
键列表,默认为所有键
- summaryboolean, (可选)
将键汇总为键类型
- **kwargsdict
远程函数的可选关键字参数
另请参阅
示例
>>> x, y, z = c.map(inc, [1, 2, 3]) >>> c.nbytes(summary=False) {'inc-1c8dd6be1c21646c71f76c16d09304ea': 28, 'inc-1e297fc27658d7b67b3a758f16bcf47a': 28, 'inc-fd65c238a7ea60f6a01bf4c8a5fcf44b': 28}
>>> c.nbytes(summary=True) {'inc': 84}
- ncores(workers=None, **kwargs)¶
每个 worker 节点上可用的线程/核心数
- 参数
- workerslist (可选)
我们特别关注的 worker 列表。留空则接收所有 worker 的信息。
- **kwargsdict
远程函数的可选关键字参数
示例
>>> c.nthreads() {'192.168.1.141:46784': 8, '192.167.1.142:47548': 8, '192.167.1.143:47329': 8, '192.167.1.144:37297': 8}
- normalize_collection(collection)[source]¶
如果集合的任务已存在 futures,则替换为现有 futures
这会根据调度器中的已知 futures 标准化集合任务图中的任务。它返回一个包含重叠 futures 的任务图的集合副本。
- 参数
- collectiondask 对象
dask.array、dataframe 或 dask.value 对象等集合
- 返回值
- collectiondask 对象
任务已被现有 futures 替换的集合。
另请参阅
Client.persist
触发集合任务的计算
示例
>>> len(x.__dask_graph__()) # x is a dask collection with 100 tasks 100 >>> set(client.futures).intersection(x.__dask_graph__()) # some overlap exists 10
>>> x = client.normalize_collection(x) >>> len(x.__dask_graph__()) # smaller computational graph 20
- nthreads(workers=None, **kwargs)[source]¶
每个 worker 节点上可用的线程/核心数
- 参数
- workerslist (可选)
我们特别关注的 worker 列表。留空则接收所有 worker 的信息。
- **kwargsdict
远程函数的可选关键字参数
示例
>>> c.nthreads() {'192.168.1.141:46784': 8, '192.167.1.142:47548': 8, '192.167.1.143:47329': 8, '192.167.1.144:37297': 8}
- persist(collections, optimize_graph=True, workers=None, allow_other_workers=None, resources=None, retries=None, priority=0, fifo_timeout='60s', actors=None, **kwargs)[source]¶
在集群上持久化 dask 集合
在后台开始在集群上计算集合。提供了一个新的 dask 集合,它在语义上与前一个集合相同,但现在基于当前正在执行的 futures。
- 参数
- collections序列或单个 dask 对象
collectionsdask 对象或单个 dask 对象的可迭代对象
- 如果为 False(默认),返回 Futures;如果为 True,返回具体值。
optimize_graph布尔值
- 是否优化底层图。
workers字符串或字符串的可迭代对象
- 允许执行计算的 worker 主机名集合。留空则默认为所有 worker(常见情况)。
allow_other_workers布尔值 (默认为 False)
- 与 workers 一起使用。指示计算是否可以在不在 workers 集合中的 worker 上执行。
retries整数 (默认为 0)
- 计算结果失败时允许的自动重试次数。
priority数字
- 可选的任务优先级。零为默认值。优先级越高越优先。
fifo_timeouttimedelta 字符串 (默认为 ’60s’)
- 默认情况下,dask 会遍历内置的 Python 集合,查找传递给
compute
的 dask 对象。对于大型集合来说,这可能很耗时。如果任何参数都不包含 dask 对象,请设置traverse=False
以避免进行此遍历。 resources字典 (默认为 {})
- 定义此映射任务的每个实例在 worker 上所需的 resources;例如
{'GPU': 2}`. 有关定义资源的详细信息,请参阅 worker resources。
actors布尔值或字典 (默认为 None)
- 返回值
- 集合列表或单个集合,取决于输入类型。
另请参阅
示例
>>> xx = client.persist(x) >>> xx, yy = client.persist([x, y])
- processing(workers=None)[source]¶
每个 worker 上当前正在运行的任务
- 参数
- workerslist (可选)
worker 地址列表,默认为所有 worker
示例
>>> x, y, z = c.map(inc, [1, 2, 3]) >>> c.processing() {'192.168.1.141:46784': ['inc-1c8dd6be1c21646c71f76c16d09304ea', 'inc-fd65c238a7ea60f6a01bf4c8a5fcf44b', 'inc-1e297fc27658d7b67b3a758f16bcf47a']}
- profile(key=None, start=None, stop=None, workers=None, merge_workers=True, plot=False, filename=None, server=False, scheduler=False)[source]¶
收集有关近期工作的统计分析信息
- 参数
- keystr
要选择的键前缀,通常是函数名,如 'inc'。保留为 None 以收集所有数据。
- start时间
- stop时间
- workerslist
限制分析信息的 worker 列表
- serverbool
如果为 true,则返回 worker 管理线程的 profile,而不是 worker 线程的 profile。这在对 Dask 本身进行 profile 时很有用,而不是用户代码。
- schedulerbool
如果为 true,则返回调度器管理线程的 profile 信息,而不是 worker 的。这在对 Dask 调度本身进行 profile 时很有用。
- plotboolean 或 string
是否返回一个绘图对象
- filenamestr
保存绘图的文件名
示例
>>> client.profile() # call on collections >>> client.profile(filename='dask-profile.html') # save to html file
- publish_dataset(*args, **kwargs)[source]¶
向调度器发布命名数据集
此方法将 dask 集合或 futures 列表的命名引用存储在调度器上。这些引用对其他客户端可用,这些客户端可以使用
get_dataset
下载集合或 futures。数据集不会立即计算。您可能希望在发布数据集之前调用
Client.persist
。- 参数
- args要作为 name 发布的对象列表
- kwargsdict
要在调度器上发布的命名集合
- 返回值
- None
示例
发布客户端
>>> df = dd.read_csv('s3://...') >>> df = c.persist(df) >>> c.publish_dataset(my_dataset=df)
另一种调用方式 >>> c.publish_dataset(df, name=’my_dataset’)
接收客户端
>>> c.list_datasets() ['my_dataset'] >>> df2 = c.get_dataset('my_dataset')
- rebalance(futures=None, workers=None, **kwargs)[source]¶
在网络内重新平衡数据
在 worker 之间移动数据以大致平衡内存负担。这会影响键/worker 的一个子集或整个网络,具体取决于关键字参数。
有关算法和配置选项的详细信息,请参阅调度器端的匹配方法
rebalance()
。警告
此操作通常未针对调度器的正常操作进行充分测试。不建议在等待计算时使用它。
- 参数
- futureslist, 可选
要平衡的 futures 列表,默认为所有数据
- workerslist, 可选
要平衡的 worker 列表,默认为所有 worker
- **kwargsdict
函数的可选关键字参数
- register_plugin(plugin: distributed.diagnostics.plugin.NannyPlugin | distributed.diagnostics.plugin.SchedulerPlugin | distributed.diagnostics.plugin.WorkerPlugin, name: str | None = None, idempotent: bool | None = None)[source]¶
注册一个插件。
请参阅 https://distributed.dask.org.cn/en/latest/plugins.html
- 参数
- plugin
要注册的 nanny、scheduler 或 worker 插件。
- name
插件的名称;如果为 None,则从插件实例中获取名称,或在不存在时自动生成。
- idempotent
如果已存在给定名称的插件,则不再注册。如果为 None,则如果定义了
plugin.idempotent
,则取其值,否则为 False。
- register_scheduler_plugin(plugin: distributed.diagnostics.plugin.SchedulerPlugin, name: str | None = None, idempotent: bool | None = None)[source]¶
注册调度器插件。
自版本 2023.9.2 起已废弃: 请改用
Client.register_plugin()
。请参阅 https://distributed.dask.org.cn/en/latest/plugins.html#scheduler-plugins
- 参数
- pluginSchedulerPlugin
要传递给调度器的 SchedulerPlugin 实例。
- namestr
插件的名称;如果为 None,则从插件实例中获取名称,或在不存在时自动生成。
- idempotentbool
如果给定名称的插件已存在,则不再注册。
- register_worker_callbacks(setup=None)[source]¶
注册所有当前和未来 worker 的 setup 回调函数。
这会为此集群的 worker 注册一个新的 setup 函数。该函数将立即在所有当前连接的 worker 上运行。它还将在未来添加的任何 worker 连接时运行。可以注册多个 setup 函数 - 这些函数将按照添加的顺序被调用。
如果函数接受一个名为
dask_worker
的输入参数,则该变量将填充 worker 本身。- 参数
- setupcallable(dask_worker: Worker) -> None
要在所有 worker 上注册和运行的函数
- register_worker_plugin(plugin: distributed.diagnostics.plugin.NannyPlugin | distributed.diagnostics.plugin.WorkerPlugin, name: str | None = None, nanny: bool | None = None)[source]¶
注册所有当前和未来 worker 的生命周期 worker 插件。
自版本 2023.9.2 起已废弃: 请改用
Client.register_plugin()
。这会注册一个新的对象来处理此集群中 worker 的 setup、任务状态转换和 teardown。该插件将在所有当前连接的 worker 上实例化自身。它还将在未来连接的任何 worker 上运行。
插件可能包含方法
setup
、teardown
、transition
和release_key
。请参阅dask.distributed.WorkerPlugin
类或下面的示例以了解接口和文档字符串。它必须可以使用 pickle 或 cloudpickle 模块进行序列化。如果插件具有
name
属性,或者使用了name=
关键字,则将控制幂等性。如果已经注册了具有该名称的插件,则将移除并替换为新的插件。对于插件的替代方案,您还可以查看预加载脚本。
- 参数
- pluginWorkerPlugin 或 NannyPlugin
要注册的 WorkerPlugin 或 NannyPlugin 实例。
- namestr, 可选
插件的名称。注册同名插件将无效。如果插件没有 name 属性,则使用随机名称。
- nannybool, 可选
是向 worker 还是 nanny 注册插件。
另请参阅
distributed.WorkerPlugin
unregister_worker_plugin
示例
>>> class MyPlugin(WorkerPlugin): ... def __init__(self, *args, **kwargs): ... pass # the constructor is up to you ... def setup(self, worker: dask.distributed.Worker): ... pass ... def teardown(self, worker: dask.distributed.Worker): ... pass ... def transition(self, key: str, start: str, finish: str, ... **kwargs): ... pass ... def release_key(self, key: str, state: str, cause: str | None, reason: None, report: bool): ... pass
>>> plugin = MyPlugin(1, 2, 3) >>> client.register_plugin(plugin)
您可以使用
get_worker
函数访问插件>>> client.register_plugin(other_plugin, name='my-plugin') >>> def f(): ... worker = get_worker() ... plugin = worker.plugins['my-plugin'] ... return plugin.my_state
>>> future = client.run(f)
- replicate(futures, n=None, workers=None, branching_factor=2, **kwargs)[source]¶
设置网络内期物的复制数量
将数据复制到多个 worker 上。这有助于广播频繁访问的数据,并可提高弹性。
此操作会单独对每块数据在网络中执行树形复制。此操作会阻塞直到完成。它不保证数据会复制到未来的 worker。
备注
此方法与 Active Memory Manager 的 ReduceReplicas 策略不兼容。如果您希望使用它,必须先禁用该策略或完全禁用 AMM。
- 参数
- futuresfutures 列表
我们希望复制的 futures
- nint, 可选
在集群上复制数据的进程数量。默认为所有进程。
- workersworker 地址列表
我们希望限制复制的 worker。默认为所有 worker。
- branching_factorint, 可选
每个生成中可以复制数据的 worker 数量
- **kwargsdict
远程函数的可选关键字参数
另请参阅
示例
>>> x = c.submit(func, *args) >>> c.replicate([x]) # send to all workers >>> c.replicate([x], n=3) # send to three workers >>> c.replicate([x], workers=['alice', 'bob']) # send to specific >>> c.replicate([x], n=1, workers=['alice', 'bob']) # send to one of specific workers >>> c.replicate([x], n=1) # reduce replications
- restart(timeout: typing.Union[str, int, float, typing.Literal[<no_default>]] = _NoDefault.no_default, wait_for_workers: bool = True)[source]¶
重启所有 worker。重置本地状态。可选地等待 worker 返回。
没有 nanny 的 worker 会被关闭,希望外部部署系统能重启它们。因此,如果不使用 nanny 并且您的部署系统不会自动重启 worker,
restart
将只会关闭所有 worker,然后超时!restart
后,所有连接的 worker 都是新的,无论是否抛出了TimeoutError
。任何未及时关闭的 worker 都将被移除,将来可能会或可能不会自行关闭。- 参数
- timeout
如果
wait_for_workers
为 True,则等待 worker 关闭并重新连接的时间;否则只等待 worker 关闭的时间。如果超出此时间,则抛出asyncio.TimeoutError
。- wait_for_workers
是等待所有 worker 重新连接,还是只等待它们关闭(默认为 True)。结合
Client.wait_for_workers()
使用restart(wait_for_workers=False)
,以对等待多少个 worker 进行细粒度控制。
- restart_workers(workers: list[str], timeout: typing.Union[str, int, float, typing.Literal[<no_default>]] = _NoDefault.no_default, raise_for_error: bool = True)[source]¶
重启指定的 worker 集合
备注
只有被
distributed.Nanny
监控的 worker 才能被重启。有关更多详细信息,请参阅Nanny.restart
。- 参数
- workerslist[str]
要重启的 worker。可以是 worker 地址列表、名称列表或两者都有。
- timeoutint | float | None
等待的秒数
- raise_for_error: bool (default True)
如果重启 worker 未在
timeout
内完成,或者由于重启 worker 导致其他异常,是否抛出TimeoutError
。
- 返回值
- dict[str, “OK” | “removed” | “timed out”]
worker 和重启状态的映射,键将与通过
workers
传递的原始值匹配。
备注
此方法与
Client.restart()
的区别在于,此方法仅重启指定的 worker 集合,而Client.restart
将重启所有 worker 并重置集群上的本地状态(例如,所有键都被释放)。此外,此方法无法优雅地处理在 worker 重启时正在执行的任务。这些任务可能会失败或其可疑计数会增加。
示例
您可以使用以下方法获取有关活动 worker 的信息:
>>> workers = client.scheduler_info()['workers']
您可以从该列表中选择一些 worker 进行重启。
>>> client.restart_workers(workers=['tcp://address:port', ...])
- retire_workers(workers: list[str] | None = None, close_workers: bool = True, **kwargs)[source]¶
在调度器上退役某些 worker
有关完整文档字符串,请参阅
distributed.Scheduler.retire_workers()
。- 参数
- 工作节点
- 关闭工作节点
- **kwargsdict
远程函数的可选关键字参数
另请参阅
dask.distributed.Scheduler.retire_workers
示例
您可以使用以下方法获取有关活动 worker 的信息:
>>> workers = client.scheduler_info()['workers']
您可能需要从该列表中选择一些工作节点来关闭
>>> client.retire_workers(workers=['tcp://address:port', ...])
- retry(futures, asynchronous=None)[source]¶
重试失败的 Future
- 参数
- futuresFuture 列表
futuresList[Future]
- 期物列表。
asynchronous: 布尔值
- run(function, *args, workers: list[str] | None = None, wait: bool = True, nanny: bool = False, on_error: Literal['raise', 'return', 'ignore'] = 'raise', **kwargs)[source]¶
在任务调度系统之外的所有 worker 上运行函数
此方法立即在所有已知工作节点上调用函数,阻塞直到结果返回,并以工作节点地址为键的字典形式异步返回结果。此方法通常用于副作用,例如收集诊断信息或安装库。
如果您的函数接受名为
dask_worker
的输入参数,则该变量将填充工作节点本身。- 参数
- function可调用对象
要运行的函数
- *args元组
远程函数的可选参数
- **kwargsdict
远程函数的可选关键字参数
- workerslist
运行函数的工作节点。默认为所有已知工作节点。
- wait布尔值(可选)
如果函数是异步的,是否等待该函数完成。
- nannybool, 默认 False
是否在 nanny 上运行
function
。默认情况下,函数在工作节点进程上运行。如果指定,workers
中的地址仍应是工作节点地址,而不是 nanny 地址。- on_error: “raise” | “return” | “ignore”
如果函数在工作节点上引发错误
- raise
(默认)在客户端重新引发异常。其他工作节点的输出将丢失。
- return
返回 Exception 对象而不是该工作节点的函数输出
- ignore
忽略异常并从结果字典中移除该工作节点
示例
>>> c.run(os.getpid) {'192.168.0.100:9000': 1234, '192.168.0.101:9000': 4321, '192.168.0.102:9000': 5555}
使用
workers=
关键字参数将计算限制在特定的工作节点上。>>> c.run(os.getpid, workers=['192.168.0.100:9000', ... '192.168.0.101:9000']) {'192.168.0.100:9000': 1234, '192.168.0.101:9000': 4321}
>>> def get_status(dask_worker): ... return dask_worker.status
>>> c.run(get_status) {'192.168.0.100:9000': 'running', '192.168.0.101:9000': 'running}
在后台运行异步函数
>>> async def print_state(dask_worker): ... while True: ... print(dask_worker.status) ... await asyncio.sleep(1)
>>> c.run(print_state, wait=False)
- run_on_scheduler(function, *args, **kwargs)[source]¶
在调度器进程上运行函数
这通常用于实时调试。函数应接受一个关键字参数
dask_scheduler=
,该参数将接收调度器对象本身。- 参数
- function可调用对象
在调度器进程上运行的函数
- *args元组
函数的可选参数
- **kwargsdict
函数的可选关键字参数
另请参阅
Client.run
在所有工作节点上运行函数
示例
>>> def get_number_of_tasks(dask_scheduler=None): ... return len(dask_scheduler.tasks)
>>> client.run_on_scheduler(get_number_of_tasks) 100
在后台运行异步函数
>>> async def print_state(dask_scheduler): ... while True: ... print(dask_scheduler.status) ... await asyncio.sleep(1)
>>> c.run(print_state, wait=False)
- scatter(data, workers=None, broadcast=False, direct=None, hash=True, timeout=_NoDefault.no_default, asynchronous=None)[source]¶
将数据分散到分布式内存中
这会将数据从本地客户端进程移动到分布式调度器的工作节点中。请注意,通常最好向工作节点提交加载数据的任务,而不是在本地加载数据然后再分散到它们。
- 参数
- datalist、dict 或 object
要分散到工作节点的数据。输出类型与输入类型匹配。
- workers元组列表(可选)
可选地限制数据位置。将工作节点指定为主机名/端口对,例如
('127.0.0.1', 8787)
。- broadcastbool(默认为 False)
是否将每个数据元素发送到所有工作节点。默认情况下,我们根据核心数量进行轮询。
备注
将此标志设置为 True 与活跃内存管理器的 ReduceReplicas 策略不兼容。如果您希望使用它,必须首先禁用该策略或完全禁用 AMM。
- directbool(默认为自动检查)
是否直接连接到 worker,还是请求调度器作为中介。这也可以在创建 Client 时设置。
- hashbool(可选)
是否哈希数据以确定键。如果为 False,则使用随机键
- timeout数字,可选
timeout数字
- 期物列表。
asynchronous: 布尔值
- 返回值
- 与输入类型匹配的 Future 列表、字典、迭代器或队列。
另请参阅
Client.gather
将数据收集回本地进程
备注
分散字典使用
dict
键创建Future
键。当前的任务图解析实现会搜索key
的出现,并将其替换为相应的Future
结果。如果作为参数传递给任务的字符串与集群上已存在的某个key
匹配,这可能导致意外的字符串替换。为了避免这种情况,如果手动设置key
,则需要使用唯一值。请参阅 https://github.com/dask/dask/issues/9969 以跟踪此问题的解决进展。示例
>>> c = Client('127.0.0.1:8787') >>> c.scatter(1) <Future: status: finished, key: c0a8a20f903a4915b94db8de3ea63195>
>>> c.scatter([1, 2, 3]) [<Future: status: finished, key: c0a8a20f903a4915b94db8de3ea63195>, <Future: status: finished, key: 58e78e1b34eb49a68c65b54815d1b158>, <Future: status: finished, key: d3395e15f605bc35ab1bac6341a285e2>]
>>> c.scatter({'x': 1, 'y': 2, 'z': 3}) {'x': <Future: status: finished, key: x>, 'y': <Future: status: finished, key: y>, 'z': <Future: status: finished, key: z>}
将数据位置限制在工作节点的子集
>>> c.scatter([1, 2, 3], workers=[('hostname', 8788)])
将数据广播到所有工作节点
>>> [future] = c.scatter([element], broadcast=True)
使用客户端 Future 接口将分散的数据发送到并行化函数
>>> data = c.scatter(data, broadcast=True) >>> res = [c.submit(func, data, i) for i in range(100)]
- scheduler_info(n_workers: int = 5, **kwargs: Any) distributed.objects.SchedulerInfo [source]¶
集群中 worker 的基本信息
- 参数
- n_workers: int
要获取信息的工作节点数量。获取所有工作节点请使用 -1。
- **kwargsdict
远程函数的可选关键字参数
示例
>>> c.scheduler_info() {'id': '2de2b6da-69ee-11e6-ab6a-e82aea155996', 'services': {}, 'type': 'Scheduler', 'workers': {'127.0.0.1:40575': {'active': 0, 'last-seen': 1472038237.4845693, 'name': '127.0.0.1:40575', 'services': {}, 'stored': 0, 'time-delay': 0.0061032772064208984}}}
- set_metadata(key, value)[source]¶
在调度器中设置任意元数据
这允许您在中心调度器进程上存储少量数据用于管理目的。数据应该是 msgpack 可序列化的(整数、字符串、列表、字典)
如果键对应于一个任务,则当调度器忘记该任务时,该键将被清理。
如果键是一个列表,则假定您想使用这些键索引到嵌套字典结构中。例如,如果您调用以下内容
>>> client.set_metadata(['a', 'b', 'c'], 123)
则这与设置以下内容相同
>>> scheduler.task_metadata['a']['b']['c'] = 123
较低级别的字典将按需创建。
另请参阅
示例
>>> client.set_metadata('x', 123) >>> client.get_metadata('x') 123
>>> client.set_metadata(['x', 'y'], 123) >>> client.get_metadata('x') {'y': 123}
>>> client.set_metadata(['x', 'w', 'z'], 456) >>> client.get_metadata('x') {'y': 123, 'w': {'z': 456}}
>>> client.get_metadata(['x', 'w']) {'z': 456}
- shutdown()[source]¶
关闭连接的调度器和 worker
注意,这可能会干扰使用相同调度器和工作节点的其他客户端。
另请参阅
Client.close
仅关闭此客户端
- submit(func, *args, key=None, workers=None, resources=None, retries=None, priority=0, fifo_timeout='100 ms', allow_other_workers=False, actor=False, actors=False, pure=True, **kwargs)[source]¶
将函数应用提交给调度器
- 参数
- func可调用对象
可调用对象,将安排为
func(*args **kwargs)
运行。如果func
返回一个协程,它将在工作节点的主事件循环上运行。否则,func
将在工作节点的任务执行器池中运行(有关更多信息,请参阅Worker.executors
)。- *args元组
可选位置参数
- keystr
任务的唯一标识符。默认为函数名和哈希值
- 是否优化底层图。
可在其上执行计算的 worker 地址或主机名集合。留空则默认为所有 worker(常见情况)。
- 默认情况下,dask 会遍历内置的 Python 集合,查找传递给
compute
的 dask 对象。对于大型集合来说,这可能很耗时。如果任何参数都不包含 dask 对象,请设置traverse=False
以避免进行此遍历。 定义此映射任务的每个实例在 worker 上所需的
resources
;例如{'GPU': 2}
。有关定义资源的详细信息,请参阅 worker resources。- 与 workers 一起使用。指示计算是否可以在不在 workers 集合中的 worker 上执行。
任务失败时允许的自动重试次数
- 计算结果失败时允许的自动重试次数。
priority数字
- fifo_timeoutstr timedelta (default ‘100ms’)
fifo_timeouttimedelta 字符串 (默认为 ’60s’)
- 允许执行计算的 worker 主机名集合。留空则默认为所有 worker(常见情况)。
与
workers
一起使用。指示计算是否可以在不在 workers 集合中的 worker 上执行。- actorbool (default False)
此任务是否应作为有状态 actor 存在于工作节点上。有关更多详细信息,请参阅 Actors。
- actorsbool (default False)
是 actor 的别名
- purebool (默认为 True)
函数是否是纯函数。对于像
np.random.random
这样的非纯函数,请设置pure=False
。请注意,如果actor
和pure
关键字参数都设置为 True,则pure
的值将恢复为 False,因为 actor 是有状态的。有关更多详细信息,请参阅 Pure Functions by Default。- 在连接池中同时保持的开放通信连接数。
- 返回值
- Future
如果在异步模式下运行,则返回 Future。否则返回具体值
- 当前客户端。
- TypeError
如果“func”不是可调用对象,则会引发 TypeError
- 引发
如果“allow_other_workers”为 True 且“workers”为 None,则会引发 ValueError
另请参阅
Client.map
一次提交多个参数
备注
当前的任务图解析实现会搜索
key
的出现,并将其替换为相应的Future
结果。如果作为参数传递的字符串与集群上已存在的某个key
匹配,则可能导致对这些字符串进行不必要的替换。为避免这些情况,如果手动设置了key
,则需要使用唯一值。请参阅 https://github.com/dask/dask/issues/9969 以跟踪此问题的解决进度。示例
>>> c = client.submit(add, a, b)
- subscribe_topic(topic, handler)[source]¶
订阅一个主题并为每个接收到的事件执行一个处理程序
- 参数
- topic: str
主题名称
- handler: 可调用对象或协程函数
为每个接收到的事件调用的处理程序。处理程序必须接受一个参数 event,它是一个元组 (timestamp, msg),其中 timestamp 指的是调度器上的时钟。
另请参阅
dask.distributed.Client.unsubscribe_topic
dask.distributed.Client.get_events
dask.distributed.Client.log_event
示例
>>> import logging >>> logger = logging.getLogger("myLogger") # Log config not shown >>> client.subscribe_topic("topic-name", lambda: logger.info)
- unpublish_dataset(name, **kwargs)[source]¶
从调度器中移除命名数据集
- 参数
- namestr
要取消发布的uyang数据集名称
示例
>>> c.list_datasets() ['my_dataset'] >>> c.unpublish_dataset('my_dataset') >>> c.list_datasets() []
- unregister_scheduler_plugin(name)[source]¶
取消注册调度器插件
请参阅 https://distributed.dask.org.cn/en/latest/plugins.html#scheduler-plugins
- 参数
- namestr
要取消注册的插件名称。有关更多信息,请参阅
Client.register_scheduler_plugin()
的文档字符串。
示例
>>> class MyPlugin(SchedulerPlugin): ... def __init__(self, *args, **kwargs): ... pass # the constructor is up to you ... async def start(self, scheduler: Scheduler) -> None: ... pass ... async def before_close(self) -> None: ... pass ... async def close(self) -> None: ... pass ... def restart(self, scheduler: Scheduler) -> None: ... pass
>>> plugin = MyPlugin(1, 2, 3) >>> client.register_plugin(plugin, name='foo') >>> client.unregister_scheduler_plugin(name='foo')
- unregister_worker_plugin(name, nanny=None)[source]¶
取消注册生命周期工作节点插件
这将取消注册现有的工作节点插件。作为取消注册过程的一部分,将调用插件的
teardown
方法。- 参数
- namestr
要取消注册的插件名称。有关更多信息,请参阅
Client.register_plugin()
的文档字符串。
另请参阅
示例
>>> class MyPlugin(WorkerPlugin): ... def __init__(self, *args, **kwargs): ... pass # the constructor is up to you ... def setup(self, worker: dask.distributed.Worker): ... pass ... def teardown(self, worker: dask.distributed.Worker): ... pass ... def transition(self, key: str, start: str, finish: str, **kwargs): ... pass ... def release_key(self, key: str, state: str, cause: str | None, reason: None, report: bool): ... pass
>>> plugin = MyPlugin(1, 2, 3) >>> client.register_plugin(plugin, name='foo') >>> client.unregister_worker_plugin(name='foo')
- unsubscribe_topic(topic)[source]¶
取消订阅主题并移除事件处理程序
另请参阅
dask.distributed.Client.subscribe_topic
dask.distributed.Client.get_events
dask.distributed.Client.log_event
- upload_file(filename, load: bool = True)[source]¶
将本地包上传到调度器和 worker
这会将本地文件发送到调度器和所有工作节点。此文件会放置在每个节点的工作目录中,请参阅配置选项
temporary-directory
(默认为tempfile.gettempdir()
)。此目录将添加到 Python 的系统路径中,因此任何
.py
、.egg
或.zip
文件都将可导入。- 参数
- filenamestring
要发送到工作节点的
.py
、.egg
或.zip
文件名- loadbool,可选
是否在上传过程中导入模块。默认为
True
。
示例
>>> client.upload_file('mylibrary.egg') >>> from mylibrary import myfunc >>> L = client.map(myfunc, seq) >>> >>> # Where did that file go? Use `dask_worker.local_directory`. >>> def where_is_mylibrary(dask_worker): >>> path = pathlib.Path(dask_worker.local_directory) / 'mylibrary.egg' >>> assert path.exists() >>> return str(path) >>> >>> client.run(where_is_mylibrary)
- wait_for_workers(n_workers: int = None, timeout: float | None = None) None [source]¶
阻塞调用,等待 n 个工作节点后继续
- 参数
- n_workersint
工作节点数量
- timeout数字,可选
timeout数字
- who_has(futures=None, **kwargs)[source]¶
存储每个 Future 数据的工节点
- 参数
- 您可以通过在
futures=
关键字中提供期物或集合,或者在keys=
关键字中提供显式键列表来指定感兴趣的数据。如果两者都未提供,则返回所有调用栈。 Future 列表,默认为所有数据
- **kwargsdict
远程函数的可选关键字参数
- 您可以通过在
示例
>>> x, y, z = c.map(inc, [1, 2, 3]) >>> wait([x, y, z]) >>> c.who_has() {'inc-1c8dd6be1c21646c71f76c16d09304ea': ['192.168.1.141:46784'], 'inc-1e297fc27658d7b67b3a758f16bcf47a': ['192.168.1.141:46784'], 'inc-fd65c238a7ea60f6a01bf4c8a5fcf44b': ['192.168.1.141:46784']}
>>> c.who_has([x, y]) {'inc-1c8dd6be1c21646c71f76c16d09304ea': ['192.168.1.141:46784'], 'inc-1e297fc27658d7b67b3a758f16bcf47a': ['192.168.1.141:46784']}
- write_scheduler_file(scheduler_file)[source]¶
将调度器信息写入 json 文件。
这便于使用文件系统轻松共享调度器信息。调度器文件可用于使用同一调度器实例化第二个 Client。
- 参数
- scheduler_filestr
要写入调度器文件的路径。
示例
>>> client = Client() >>> client.write_scheduler_file('scheduler.json') # connect to previous client's scheduler >>> client2 = Client(scheduler_file='scheduler.json')
- class distributed.Future(key, client=None, state=None, _id=None)[source]¶
一个远程运行的计算
Future 是远程工作节点上正在运行的结果的本地代理。用户在本地 Python 进程中管理 Future 对象,以确定在更大集群中发生的事情。
备注
用户不应手动实例化 Future。这可能导致状态损坏和集群死锁。
- 参数
- key: str 或 tuple
此 Future 引用的远程数据的键
- client: Client
拥有此 Future 的客户端。默认为 _get_global_client()
- inform: bool
我们是否通知调度器我们需要此 Future 的更新
- state: FutureState
Future 的状态
另请参阅
Client
创建 Future
示例
Future 通常产生于 Client 计算
>>> my_future = client.submit(add, 1, 2)
我们可以跟踪 Future 的进度和结果
>>> my_future <Future: status: finished, key: add-8f6e709446674bad78ea8aeecfee188e>
我们可以从 Future 获取结果或异常及回溯信息
>>> my_future.result()
- add_done_callback(fn)[source]¶
在期物完成后调用回调函数
回调函数
fn
应将 Future 作为其唯一参数。无论 Future 是成功完成、出错还是被取消,都将调用此函数。回调函数在单独的线程中执行。
- 参数
- fn可调用对象
要调用的方法或函数
- exception(timeout=None, **kwargs)[source]¶
返回失败任务的异常
- 参数
- timeout数字,可选
timeout数字
- **kwargsdict
函数的可选关键字参数
- 返回值
- Exception
引发的异常。如果在返回之前经过 timeout 秒,则会引发
dask.distributed.TimeoutError
。
另请参阅
- property executor¶
返回执行器,即客户端。
- 返回值
- Client
执行器
- result(timeout=None)[source]¶
等待计算完成,将结果收集到本地进程。
- 参数
- timeout数字,可选
timeout数字
- 返回值
- 结果
计算结果。如果客户端是异步的,则为协程。
- 当前客户端。
- dask.distributed.TimeoutError
如果在返回之前经过 timeout 秒,则会引发
dask.distributed.TimeoutError
。
- property status¶
返回状态
- 返回值
- 调度器控制面板的链接。
状态
- traceback(timeout=None, **kwargs)[source]¶
返回失败任务的跟踪信息
这会返回一个回溯对象。您可以使用
traceback
模块检查此对象。或者,如果您调用future.result()
,此回溯信息将伴随引发的异常。- 参数
- timeout数字,可选
超时后引发
dask.distributed.TimeoutError
的秒数。如果在返回之前经过 timeout 秒,则会引发dask.distributed.TimeoutError
。
- 返回值
- 回溯信息
回溯对象。如果客户端是异步的,则为协程。
另请参阅
示例
>>> import traceback >>> tb = future.traceback() >>> traceback.format_tb(tb) [...]
- property type¶
返回类型
- class distributed.Queue(name=None, client=None, maxsize=0)[source]¶
分布式队列
这允许多个客户端通过多生产者/多消费者队列相互共享 Future 或少量数据。所有元数据都通过调度器按顺序处理。
队列的元素必须是 Future 或 msgpack 可编码数据(整数、字符串、列表、字典)。所有数据都通过调度器发送,因此不建议发送大型对象。要共享大型对象,请分散数据并共享 Future。
警告
此对象是实验性的
- 参数
- 如果在 async/await 函数或 Tornado gen.coroutines 中使用此客户端,请将其设置为 True。否则,对于正常使用应保持 False。
其他客户端和调度器用于标识队列的名称。如果未给出,则会生成一个随机名称。
- client: Client(可选)
用于与调度器通信的客户端。如果未给出,将使用默认的全局客户端。
- maxsize: int(可选)
队列中允许的最大项目数。如果为 0(默认值),则队列大小不受限制。
另请参阅
变量
客户端之间共享的变量
示例
>>> from dask.distributed import Client, Queue >>> client = Client() >>> queue = Queue('x') >>> future = client.submit(f, x) >>> queue.put(future)
- get(timeout=None, batch=False, **kwargs)[source]¶
从队列获取数据
- 参数
- timeout数字、字符串或 timedelta,可选
在超时前等待的秒数。除了秒数,还可以指定字符串格式的 timedelta,例如 “200ms”。
- batch布尔值、int(可选)
如果为 True,则返回当前在队列中等待的所有元素。如果是一个整数,则返回队列中指定数量的元素。如果为 False(默认),则一次返回一个项目
- class distributed.Variable(name=None, client=None)[source]¶
分布式全局变量
这允许多个客户端通过单个可变变量相互共享 Future 和数据。所有元数据都通过调度器按顺序处理。可能发生竞态条件。
值必须是 Future 或 msgpack 可编码数据(整数、列表、字符串等)。所有数据都将保留并通过调度器发送,因此不建议发送太多数据。如果您想共享大量数据,请
scatter
它并改为共享 Future。- 参数
- 如果在 async/await 函数或 Tornado gen.coroutines 中使用此客户端,请将其设置为 True。否则,对于正常使用应保持 False。
其他客户端和调度器用于标识变量的名称。如果未给出,则会生成一个随机名称。
- client: Client(可选)
用于与调度器通信的客户端。如果未给出,将使用默认的全局客户端。
另请参阅
Queue
客户端之间共享的多生产者/多消费者队列
示例
>>> from dask.distributed import Client, Variable >>> client = Client() >>> x = Variable('x') >>> x.set(123) # docttest: +SKIP >>> x.get() # docttest: +SKIP 123 >>> future = client.submit(f, x) >>> x.set(future)
- class distributed.Lock(name=None, client=<object object>, scheduler_rpc=None, loop=None)[source]¶
分布式集中式锁
警告
这使用
distributed.Semaphore
作为后端,它容易受到租约超订的影响。对于 Lock,这意味着如果一个租约超时,两个或更多实例可能同时获取锁。要禁用租约超时,请将distributed.scheduler.locks.lease-timeout
设置为 inf,例如with dask.config.set({"distributed.scheduler.locks.lease-timeout": "inf"}): lock = Lock("x") ...
请注意,如果没有租约超时,Lock 在集群缩减或工作节点失败的情况下可能发生死锁。
- 参数
- 如果在 async/await 函数或 Tornado gen.coroutines 中使用此客户端,请将其设置为 True。否则,对于正常使用应保持 False。
要获取的锁的名称。选择相同的名称允许多个断开连接的进程协调一个锁。如果未给出,将生成一个随机名称。
- client: Client(可选)
用于与调度器通信的客户端。如果未给出,将使用默认的全局客户端。
示例
>>> lock = Lock('x') >>> lock.acquire(timeout=1) >>> # do things with protected resource >>> lock.release()
- class distributed.Event(name=None, client=None)[source]¶
分布式集中式事件,等同于 asyncio.Event
事件存储一个标志,启动时设置为 false。可以通过 set() 调用将标志设置为 true,或通过 clear() 调用将其设置回 false。每次调用 wait() 都会阻塞,直到事件标志设置为 true。
- 参数
- 如果在 async/await 函数或 Tornado gen.coroutines 中使用此客户端,请将其设置为 True。否则,对于正常使用应保持 False。
事件的名称。选择相同的名称允许两个断开连接的进程协调一个事件。如果未给出,将生成一个随机名称。
- client: Client(可选)
用于与调度器通信的客户端。如果未给出,将使用默认的全局客户端。
示例
>>> event_1 = Event('a') >>> event_1.wait(timeout=1) >>> # in another process >>> event_2 = Event('a') >>> event_2.set() >>> # now event_1 will stop waiting
- class distributed.Semaphore(max_leases=1, name=None, scheduler_rpc=None, loop=None)[source]¶
此 信号量 将跟踪调度器上的租约,这些租约可以由此类的一个实例获取和释放。如果已获取最大数量的租约,则无法再获取更多租约,并且调用者将等待直到另一个租约被释放。
租约的生命周期由超时控制。此超时由该实例的
Client
定期刷新,并在工作节点失败时提供防止死锁或资源枯竭的保护。可以使用配置选项distributed.scheduler.locks.lease-timeout
控制超时时间,验证超时的时间间隔使用选项distributed.scheduler.locks.lease-validation-interval
设置。与 Python 标准库中的 Semaphore 一个显著区别是,此实现不允许释放比获取更频繁。如果发生这种情况,会发出警告,但内部状态不会修改。
警告
在租约超时的情况下,此实现容易受到租约超订的影响。建议监视日志信息并调整上述配置选项,使其适用于用户应用程序。
- 参数
- max_leases: int(可选)
同时可以授予的最大租约数量。这有效地设置了对特定资源的并行访问数量的上限。默认为 1。
- 如果在 async/await 函数或 Tornado gen.coroutines 中使用此客户端,请将其设置为 True。否则,对于正常使用应保持 False。
要获取的信号量名称。选择相同的名称允许两个断开连接的进程进行协调。如果未给出,将生成一个随机名称。
- register: bool
如果为 True,则向调度器注册信号量。这必须在获取任何租约之前完成。如果在初始化期间未完成,也可以通过调用此类的 register 方法来完成。注册时需要等待。
- scheduler_rpc: ConnectionPool
用于连接调度器的 ConnectionPool。如果提供 None,则使用工作节点或客户端连接池。此参数主要用于测试。
- loop: IOLoop
此实例正在使用的事件循环。如果提供 None,则重用活动工作节点或客户端的循环。
备注
如果客户端尝试释放信号量但未获取租约,则会引发异常。
dask 默认假设函数是纯函数来执行它们,当在此类函数内部使用信号量获取/释放时,必须注意实际上 存在 副作用,因此,该函数不能再被视为纯函数。如果不考虑这一点,可能会导致意外行为。
示例
>>> from distributed import Semaphore ... sem = Semaphore(max_leases=2, name='my_database') ... ... def access_resource(s, sem): ... # This automatically acquires a lease from the semaphore (if available) which will be ... # released when leaving the context manager. ... with sem: ... pass ... ... futures = client.map(access_resource, range(10), sem=sem) ... client.gather(futures) ... # Once done, close the semaphore to clean up the state on scheduler side. ... sem.close()