Future

Dask 支持一个实时任务框架,它扩展了 Python 的 concurrent.futures 接口。Dask future 允许您以最少的代码修改,在 Dask 集群上扩展通用 Python 工作流。

这个接口对于任意任务调度(如 dask.delayed)很有用,但它是立即执行而不是惰性执行的,这在计算可能随时间演变的情况下提供了更大的灵活性。这些特性依赖于 dask.distributed 中的第二代任务调度器(尽管名字是 distributed,但它在单机上运行得非常好)。

示例

请访问 https://examples.dask.org.cn/futures.html 查看并运行使用 Dask future 的示例。

启动 Dask Client

您必须启动一个 Client 才能使用 future 接口。它会跟踪各种 worker 进程或线程之间的状态。

from dask.distributed import Client

client = Client()  # start local workers as processes
# or
client = Client(processes=False)  # start local workers as threads

如果您安装了 Bokeh,这会在 http://localhost:8787 启动一个诊断 Dashboard。

提交任务

Client.submit(func, *args[, key, workers, ...])

向调度器提交函数应用

Client.map(func, *iterables[, key, workers, ...])

对参数序列映射应用函数

Future.result([timeout])

等待计算完成,并将结果收集到本地进程。

您可以使用 submit 方法提交单个任务。

def inc(x):
    return x + 1

def add(x, y):
    return x + y

a = client.submit(inc, 10)  # calls inc(10) in background thread or process
b = client.submit(inc, 20)  # calls inc(20) in background thread or process

submit 函数返回一个 Future 对象,它引用一个远程结果。这个结果可能尚未完成。

>>> a
<Future: status: pending, key: inc-b8aaf26b99466a7a1980efa1ade6701d>

最终它会完成。结果会保留在远程线程/进程/worker 中,直到您明确请求将其取回。

>>> a
<Future: status: finished, type: int, key: inc-b8aaf26b99466a7a1980efa1ade6701d>

>>> a.result()  # blocks until task completes and data arrives
11

您可以将 future 作为输入传递给 submit。Dask 会自动处理依赖跟踪;一旦所有输入 future 完成,它们将被移动到一个 worker 上(如果需要),然后开始依赖于它们的计算。您无需等待输入完成即可提交新任务;Dask 会自动处理。

c = client.submit(add, a, b)  # calls add on the results of a and b

与 Python 的 map 类似,您可以使用 Client.map 对多个输入调用同一个函数。

futures = client.map(inc, range(1000))

然而,请注意,每个任务都会产生大约 1ms 的开销。如果您想对大量输入映射函数,那么可以考虑使用 dask.bagdask.dataframe

移动数据

Future.result([timeout])

等待计算完成,并将结果收集到本地进程。

Client.gather(futures[, errors, direct, ...])

从分布式内存中收集 future

Client.scatter(data[, workers, broadcast, ...])

将数据分散到分布式内存中

对于任何 future,您可以调用 .result 方法来收集结果。这将阻塞直到 future 计算完成,并在必要时将结果传回您的本地进程。

>>> c.result()
32

您可以使用 Client.gather 方法同时收集多个结果。这比按顺序对每个 future 调用 .result() 更高效。

>>> # results = [future.result() for future in futures]
>>> results = client.gather(futures)  # this can be faster

如果您有重要的本地数据想要包含在计算中,您可以将其作为普通输入包含在 submit 或 map 调用中

>>> df = pd.read_csv('training-data.csv')
>>> future = client.submit(my_function, df)

或者您可以明确地 scatter(分散)它。Scattering 会将您的数据移动到 worker,并返回一个指向该数据的 future。

>>> remote_df = client.scatter(df)
>>> remote_df
<Future: status: finished, type: DataFrame, key: bbd0ca93589c56ea14af49cba470006e>

>>> future = client.submit(my_function, remote_df)

这两种方式都能达到同样的结果,但使用 scatter 有时会更快。如果您使用进程或分布式 worker(需要数据传输),并且希望在许多计算中使用 df,这一点尤其重要。提前 scatter 数据可以避免不必要的数据移动。

对列表调用 scatter 会分散所有元素。Dask 会以轮询(round-robin)方式将这些元素均匀地分布到各个 worker 上。

>>> client.scatter([1, 2, 3])
[<Future: status: finished, type: int, key: c0a8a20f903a4915b94db8de3ea63195>,
 <Future: status: finished, type: int, key: 58e78e1b34eb49a68c65b54815d1b158>,
 <Future: status: finished, type: int, key: d3395e15f605bc35ab1bac6341a285e2>]

引用、取消和异常

Future.cancel([reason, msg])

取消运行此 future 的请求

Future.exception([timeout])

返回失败任务的异常

Future.traceback([timeout])

返回失败任务的追溯信息 (traceback)

Client.cancel(futures[, asynchronous, ...])

取消正在运行的 future 这会阻止尚未运行的 future 任务被调度,并删除已经运行的任务。

Dask 只会计算并保留存在活动 future 的结果。通过这种方式,您的本地变量定义了 Dask 中的活动内容。当您的本地 Python 会话垃圾回收一个 future 时,Dask 可以自由地删除该数据或停止正在尝试生成它的计算。

>>> del future  # deletes remote data once future is garbage collected

您也可以使用 Future.cancelClient.cancel 方法显式取消任务。

>>> future.cancel()  # deletes data even if other futures point to it

如果 future 失败,当您尝试获取结果时,Dask 会抛出远程异常和追溯信息。

def div(x, y):
    return x / y

>>> a = client.submit(div, 1, 0)  # 1 / 0 raises a ZeroDivisionError
>>> a
<Future: status: error, key: div-3601743182196fb56339e584a2bf1039>

>>> a.result()
      1 def div(x, y):
----> 2     return x / y

ZeroDivisionError: division by zero

所有依赖于出错 future 的 future 也会因相同的异常而出错。

>>> b = client.submit(inc, a)
>>> b
<Future: status: error, key: inc-15e2e4450a0227fa38ede4d6b1a952db>

您可以使用 Future.exceptionFuture.traceback 方法显式收集异常或追溯信息。

等待 Future

as_completed([futures, loop, with_results, ...])

按照 future 完成的顺序返回它们

wait(fs[, timeout, return_when])

等待直到所有/任一 future 完成

您可以使用 wait 函数等待一个或一组 future。

from dask.distributed import wait

>>> wait(futures)

这会阻塞直到所有 future 完成或出错。

您还可以使用 as_completed 函数在 future 完成时对其进行迭代。

from dask.distributed import as_completed

futures = client.map(score, x_values)

best = -1
for future in as_completed(futures):
   y = future.result()
   if y > best:
       best = y

为了提高效率,您还可以让 as_completed 在后台收集结果。

for future, result in as_completed(futures, with_results=True):
    # y = future.result()  # don't need this
   ...

或者分批收集自上次迭代以来已到达的所有 future。

for batch in as_completed(futures, with_results=True).batches():
   for future, result in batch:
       ...

此外,对于迭代算法,您可以在迭代*期间*向 as_completed 迭代器中添加更多 future。

seq = as_completed(futures)

for future in seq:
    y = future.result()
    if condition(y):
        new_future = client.submit(...)
        seq.add(new_future)  # add back into the loop

或使用 seq.update(futures) 一次添加多个 future。

即发即忘

fire_and_forget(obj)

至少运行任务一次,即使我们释放了 future

有时我们不关心收集任务的结果,只关心它可能产生的副作用,例如将结果写入文件。

>>> a = client.submit(load, filename)
>>> b = client.submit(process, a)
>>> c = client.submit(write, b, out_filename)

如上所述,Dask 会停止没有任何活动 future 的工作。它认为由于没有人持有指向此数据的指针,因此没有人关心。您可以使用 fire_and_forget 函数告诉 Dask 无论如何都要计算任务,即使没有活动 future。

from dask.distributed import fire_and_forget

>>> fire_and_forget(c)

当 future 可能会超出作用域时(例如,作为函数的一部分),这特别有用。

def process(filename):
    out_filename = 'out-' + filename
    a = client.submit(load, filename)
    b = client.submit(process, a)
    c = client.submit(write, b, out_filename)
    fire_and_forget(c)
    return  # here we lose the reference to c, but that's now ok

for filename in filenames:
    process(filename)

从另一个进程提交任务并检索结果

有时我们关心检索结果,但不一定是从同一个进程。

from distributed import Variable

var = Variable("my-result")
fut = client.submit(...)
var.set(fut)

使用 Variable 指示 Dask 在给定名称下记住此任务的结果,以便稍后可以检索它,而无需在此期间保持 Client 存活。

var = Variable("my-result")
fut = var.get()
result = fut.result()

从任务中提交任务

get_client([address, timeout, resolve_address])

在任务内部获取 client。

rejoin()

让此线程重新加入 ThreadPoolExecutor

secede()

让此任务脱离 worker 的线程池

这是一个高级功能,在通常情况下很少需要。

任务可以通过获取自己的 client 来启动其他任务。这使得复杂的、高度动态的工作负载成为可能。

from dask.distributed import get_client

def my_function(x):
    ...

    # Get locally created client
    client = get_client()

    # Do normal client operations, asking cluster for computation
    a = client.submit(...)
    b = client.submit(...)
    a, b = client.gather([a, b])

    return a + b

它还允许您设置监视其他资源(如 socket 或物理传感器)的长时间运行任务。

def monitor(device):
   client = get_client()
   while True:
       data = device.read_data()
       future = client.submit(process, data)
       fire_and_forget(future)

for device in devices:
    fire_and_forget(client.submit(monitor))

然而,每个正在运行的任务都占用一个线程,因此如果您启动许多会启动其他任务的任务,如果不小心,可能会导致系统死锁。您可以在任务内部调用 secede 函数,使其从专用线程池中脱离到不会占用 Dask worker 中槽位的管理线程。

from dask.distributed import get_client, secede

def monitor(device):
   client = get_client()
   secede()  # remove this task from the thread pool
   while True:
       data = device.read_data()
       future = client.submit(process, data)
       fire_and_forget(future)

如果您打算在等待 client 工作完成后在同一个线程中进行更多工作,您可能需要显式阻塞,直到线程能够重新加入线程池。这允许对创建的线程数量进行一定的控制,并防止同时激活过多线程,从而使您的硬件过载。

def f(n):  # assume that this runs as a task
   client = get_client()

   secede()  # secede while we wait for results to come back
   futures = client.map(func, range(n))
   results = client.gather(futures)

   rejoin()  # block until a slot is open in the thread pool
   result = analyze(results)
   return result

或者,您可以在任务内部使用正常的 compute 函数。这会自动适当地调用 secederejoin

def f(name, fn):
    df = dd.read_csv(fn)  # note that this is a dask collection
    result = df[df.name == name].count()

    # This calls secede
    # Then runs the computation on the cluster (including this worker)
    # Then blocks on rejoin, and finally delivers the answer
    result = result.compute()

    return result

协调原语

Queue([name, client, maxsize])

分布式队列

Variable([name, client])

分布式全局变量

Lock([name, client, scheduler_rpc, loop])

分布式集中锁

Event([name, client])

分布式集中事件,等同于 asyncio.Event

Semaphore([max_leases, name, scheduler_rpc, ...])

这个 semaphore 将跟踪调度器上的租约,这些租约可以被此类的实例获取和释放。

有时会出现任务、worker 或 client 需要以超出使用 future 进行正常任务调度的方式彼此协调的情况。在这些情况下,Dask 提供了额外的原语来帮助处理复杂情况。

Dask 提供了锁、事件、队列和全局变量等协调原语的分布式版本,在适当的情况下,它们与其内存中的对应物匹配。这些可以用来控制对外部资源的访问、跟踪正在进行的计算的进度,或者在多个 worker、client 和任务之间合理地通过侧通道共享数据。

这些功能对于 Dask 的常规使用来说很少是必需的。我们建议初级用户坚持使用上面介绍的更简单的 future(如 Client.submitClient.gather),而不是采用不必要的复杂技术。

队列

Queue([name, client, maxsize])

分布式队列

Dask 队列遵循标准 Python Queue 的 API,但现在在 client 之间移动 future 或小消息。队列会合理地序列化,并在必要时在远程 client 上重新连接自身。

from dask.distributed import Queue

def load_and_submit(filename):
    data = load(filename)
    client = get_client()
    future = client.submit(process, data)
    queue.put(future)

client = Client()

queue = Queue()

for filename in filenames:
    future = client.submit(load_and_submit, filename)
    fire_and_forget(future)

while True:
    future = queue.get()
    print(future.result())

队列还可以发送少量信息,任何 msgpack 可编码的内容(整数、字符串、布尔值、列表、字典等)。这对于发送小分数或管理消息很有用。

def func(x):
    try:
       ...
    except Exception as e:
        error_queue.put(str(e))

error_queue = Queue()

队列由中央调度器中介,因此不适合发送大量数据(您发送的所有内容都将通过一个中心点路由)。它们非常适合传输少量元数据或 future。这些 future 可以安全地指向更大的数据块。

>>> x = ... # my large numpy array

# Don't do this!
>>> q.put(x)

# Do this instead
>>> future = client.scatter(x)
>>> q.put(future)

# Or use futures for metadata
>>> q.put({'status': 'OK', 'stage=': 1234})

全局变量

Variable([name, client])

分布式全局变量

Variables 类似于 Queues,它们在 client 之间通信 future 和小数据。然而,Variables 只持有一个值。您可以随时获取或设置该值。

>>> var = Variable('stopping-criterion')
>>> var.set(False)

>>> var.get()
False

这通常用于在 client 之间传递停止标准或当前参数的信号。

如果您想共享大量信息,则应先 scatter 数据。

>>> parameters = np.array(...)
>>> future = client.scatter(parameters)
>>> var.set(future)

Lock([name, client, scheduler_rpc, loop])

分布式集中锁

您还可以使用 Lock 对象保持集群范围的锁。Dask Lock 具有与普通 threading.Lock 对象相同的 API,不同之处在于它们在集群范围内工作。

from dask.distributed import Lock
lock = Lock()

with lock:
    # access protected resource

您可以同时管理多个锁。Lock 可以被赋予一个一致的名称,或者您可以直接传递 lock 对象。

当您想要锁定某个已知的命名资源时,使用一致的名称很方便。

from dask.distributed import Lock

def load(fn):
    with Lock('the-production-database'):
        # read data from filename using some sensitive source
        return ...

futures = client.map(load, filenames)

传递 lock 对象也有效,当您想为特定情况创建短期锁时更简单。

from dask.distributed import Lock
lock = Lock()

def load(fn, lock=None):
    with lock:
        # read data from filename using some sensitive source
        return ...

futures = client.map(load, filenames, lock=lock)

如果您想控制对某些外部资源(如数据库或非线程安全库)的并发访问,这会很有用。

事件

Event([name, client])

分布式集中事件,等同于 asyncio.Event

Dask Event 模仿 asyncio.Event 对象,但在集群范围内工作。它们持有一个可以设置或清除的单一标志。Client 可以等待直到事件标志被设置。与 Lock 不同,每个 client 都可以设置或清除标志,并且事件没有“所有权”。

您可以使用事件来同步多个 client。

# One one client
from dask.distributed import Event

event = Event("my-event-1")
event.wait()

wait 调用会阻塞,直到事件被设置(例如,在另一个 client 中)。

# In another client
from dask.distributed import Event

event = Event("my-event-1")

# do some work

event.set()

事件可以多次设置、清除和等待。每个引用相同事件名称的等待者都会在事件设置时收到通知(不像锁那样只有第一个)。

from dask.distributed import Event

def wait_for_event(x):
   event = Event("my-event")

   event.wait()
   # at this point, all function calls
   # are in sync once the event is set

futures = client.map(wait_for_event, range(10))

Event("my-event").set()
client.gather(futures)

信号量

Semaphore([max_leases, name, scheduler_rpc, ...])

这个 semaphore 将跟踪调度器上的租约,这些租约可以被此类的实例获取和释放。

类似于单值的 Lock,也可以使用集群范围的信号量来协调和限制对敏感资源(如数据库)的访问。

from dask.distributed import Semaphore

sem = Semaphore(max_leases=2, name="database")

def access_limited(val, sem):
   with sem:
      # Interact with the DB
      return

futures = client.map(access_limited, range(10), sem=sem)
client.gather(futures)
sem.close()

Actors

Actors 允许 worker 管理快速变化的状态,而无需与中央调度器协调。这样做的好处是降低延迟(worker 到 worker 的往返延迟约为 1ms),减轻中央调度器的压力(worker 可以在彼此之间完全协调 Actors),并且还能够实现需要有状态或原地内存操作的工作流。

然而,这些好处是有代价的。调度器不了解 Actors,因此它们无法从诊断、负载均衡或弹性中受益。一旦一个 Actor 在 worker 上运行,它就永远绑定到那个 worker。如果该 worker 过载或死亡,则没有机会恢复工作负载。

由于 Actors 绕过中央调度器,它们可以实现高性能,但缺乏弹性。

示例:计数器

Actor 是一个包含状态和方法,并被提交到 worker 的类。

class Counter:
    n = 0

    def __init__(self):
        self.n = 0

    def increment(self):
        self.n += 1
        return self.n

from dask.distributed import Client

if __name__ == '__main__':
    client = Client()

    future = client.submit(Counter, actor=True)
    counter = future.result()

>>> counter
<Actor: Counter, key=Counter-afa1cdfb6b4761e616fa2cfab42398c8>

对该对象的方法调用会产生 ActorFutures,这类似于普通 Future,但只与持有 Actor 的 worker 进行交互。

>>> future = counter.increment()
>>> future
<ActorFuture>

>>> future.result()
1

属性访问是同步且阻塞的。

>>> counter.n
1

示例:参数服务器

本示例将使用参数服务器执行以下最小化:

\[\min_{p\in\mathbb{R}^{1000}} \sum_{i=1}^{1000} (p_i - 1)^2\]

这是一个简单的最小化问题,将作为说明性示例。

Dask Actor 将充当参数服务器,用于保存模型。client 将计算上述损失函数的梯度。

import numpy as np

from dask.distributed import Client
client = Client(processes=False)

class ParameterServer:
    def __init__(self):
        self.data = dict()

    def put(self, key, value):
        self.data[key] = value

    def get(self, key):
        return self.data[key]

def train(params, lr=0.1):
    grad = 2 * (params - 1)  # gradient of (params - 1)**2
    new_params = params - lr * grad
    return new_params

ps_future = client.submit(ParameterServer, actor=True)
ps = ps_future.result()

ps.put('parameters', np.random.default_rng().random(1000))
for k in range(20):
    params = ps.get('parameters').result()
    new_params = train(params)
    ps.put('parameters', new_params)
    print(new_params.mean())
    # k=0: "0.5988202981316124"
    # k=10: "0.9569236575164062"

此示例有效,并且损失函数被最小化。上面的(简单)方程被最小化,因此每个 \(p_i\) 都收敛到 1。如果需要,可以将此示例修改为使用更复杂的函数进行最小化的机器学习问题。

异步操作

所有需要与远程 worker 通信的操作都是可等待的 (awaitable)。

async def f():
    future = client.submit(Counter, actor=True)
    counter = await future  # gather actor object locally

    counter.increment()  # send off a request asynchronously
    await counter.increment()  # or wait until it was received

    n = await counter.n  # attribute access also must be awaited

通常,所有触发计算的 I/O 操作(例如 to_parquet)都应使用 compute=False 参数来避免异步阻塞。

await client.compute(ddf.to_parquet('/tmp/some.parquet', compute=False))

API

Client

Client([address, loop, timeout, ...])

连接到 Dask 集群并提交计算

Client.cancel(futures[, asynchronous, ...])

取消正在运行的 future 这会阻止尚未运行的 future 任务被调度,并删除已经运行的任务。

Client.compute(collections[, sync, ...])

在集群上计算 dask 集合

Client.gather(futures[, errors, direct, ...])

从分布式内存中收集 future

Client.get(dsk, keys[, workers, ...])

计算 dask 图

Client.get_dataset(name[, default])

如果存在,从调度器获取命名数据集。

Client.get_executor(**kwargs)

返回一个 concurrent.futures Executor,用于在此 Client 上提交任务

Client.has_what([workers])

哪些 worker 持有哪些键

Client.list_datasets(**kwargs)

列出调度器上可用的命名数据集

Client.map(func, *iterables[, key, workers, ...])

对参数序列映射应用函数

Client.ncores([workers])

每个 worker 节点上可用的线程/核心数

Client.persist(collections[, ...])

在集群上持久化 dask 集合

Client.profile([key, start, stop, workers, ...])

收集关于最近工作的统计分析信息 (profiling)

Client.publish_dataset(*args, **kwargs)

向调度器发布命名数据集

Client.rebalance([futures, workers])

在网络内重新平衡数据

Client.replicate(futures[, n, workers, ...])

设置 future 在网络内的复制数量

Client.restart([timeout, wait_for_workers])

重启所有 worker。

Client.run(function, *args[, workers, wait, ...])

在所有 worker 上运行函数,不通过任务调度系统

Client.run_on_scheduler(function, *args, ...)

在调度器进程上运行函数

Client.scatter(data[, workers, broadcast, ...])

将数据分散到分布式内存中

Client.shutdown()

关闭连接的调度器和 worker

Client.scheduler_info([n_workers])

关于集群中 worker 的基本信息

Client.submit(func, *args[, key, workers, ...])

向调度器提交函数应用

Client.unpublish_dataset(name, **kwargs)

从调度器中移除命名数据集

Client.upload_file(filename[, load])

将本地包上传到调度器和 worker

Client.who_has([futures])

存储每个 future 数据的 worker

Future

Future(key[, client, state, _id])

一个远程运行的计算

Future.add_done_callback(fn)

在 future 完成后调用其回调函数

Future.cancel([reason, msg])

取消运行此 future 的请求

Future.cancelled()

如果 future 已被取消,则返回 True

Future.done()

返回计算是否完成。

Future.exception([timeout])

返回失败任务的异常

Future.result([timeout])

等待计算完成,并将结果收集到本地进程。

Future.traceback([timeout])

返回失败任务的追溯信息 (traceback)

函数

as_completed([futures, loop, with_results, ...])

按照 future 完成的顺序返回它们

fire_and_forget(obj)

至少运行任务一次,即使我们释放了 future

get_client([address, timeout, resolve_address])

在任务内部获取 client。

secede()

让此任务脱离 worker 的线程池

rejoin()

让此线程重新加入 ThreadPoolExecutor

wait(fs[, timeout, return_when])

等待直到所有/任一 future 完成

print(*args[, sep, end, file, flush])

用于从 worker 向 client 进行远程打印的内置 print 函数的替代品。

warn(message[, category, stacklevel, source])

用于从 worker 向 client 进行远程发出警告的内置 warnings.warn() 函数的替代品。

distributed.as_completed(futures=None, loop=None, with_results=False, raise_errors=True, *, timeout=None)[source]

按照 future 完成的顺序返回它们

这会返回一个迭代器,它按照输入 future 对象完成的顺序产生它们。对迭代器调用 next 会阻塞,直到下一个 future 完成,无论顺序如何。

此外,您还可以在计算期间使用 .add 方法向此对象添加更多 future。

参数
futures: future 集合

要按照完成顺序迭代的 Future 对象列表

with_results: 布尔值 (False)

是否也等待并包含 future 的结果;在这种情况下,as_completed 会生成一个 (future, result) 元组

raise_errors: 布尔值 (True)

当 future 的结果引发异常时是否应该抛出异常;仅在 with_results=True 时影响行为。

timeout: 整数 (可选)

如果调用 __next__()__anext__(),并且从原始调用 as_completed() 起经过超时秒数后结果仍然不可用,则返回的迭代器会引发 dask.distributed.TimeoutError。如果未指定 timeout 或其值为 None,则等待时间没有限制。

示例

>>> x, y, z = client.map(inc, [1, 2, 3])  
>>> for future in as_completed([x, y, z]):  
...     print(future.result())  
3
2
4

在计算期间添加更多 future

>>> x, y, z = client.map(inc, [1, 2, 3])  
>>> ac = as_completed([x, y, z])  
>>> for future in ac:  
...     print(future.result())  
...     if random.random() < 0.5:  
...         ac.add(c.submit(double, future))  
4
2
8
3
6
12
24

可选地等待直到结果也被收集

>>> ac = as_completed([x, y, z], with_results=True)  
>>> for future, result in ac:  
...     print(result)  
2
4
3
distributed.fire_and_forget(obj)[source]

至少运行任务一次,即使我们释放了 future

在正常操作下,Dask 不会运行任何没有活动 future 的任务(这在许多情况下避免了不必要的工作)。然而,有时您只想“即发即忘”一个任务,不跟踪其 future,并期望它最终完成。您可以在一个 future 或 future 集合上使用此函数,要求 Dask 完成该任务,即使没有活动的 client 正在跟踪它。

任务完成后,结果将不会保留在内存中(除非有活动的 future),因此这仅对依赖于副作用的任务有用。

参数
objFuture, list, dict, dask 集合

您希望至少运行一次的 future

示例

>>> fire_and_forget(client.submit(func, *args))  
distributed.get_client(address=None, timeout=None, resolve_address=True) Client[source]

在任务内部获取 client。

此 client 连接到 worker 连接到的同一个调度器。

参数
address字符串, 可选

要连接的调度器的地址。默认为 worker 连接到的调度器。

timeout整数或字符串

获取 Client 的超时时间(秒)。默认为 distributed.comm.timeouts.connect 配置值。

resolve_address布尔值, 默认 True

是否将 address 解析为其规范形式。

返回值
Client

示例

>>> def f():
...     client = get_client(timeout="10s")
...     futures = client.map(lambda x: x + 1, range(10))  # spawn many tasks
...     results = client.gather(futures)
...     return sum(results)
>>> future = client.submit(f)  
>>> future.result()  
55
distributed.secede()[source]

让此任务脱离 worker 的线程池

这会为新任务打开一个新的调度槽位和一个新线程。这使得 client 能够在该节点上调度任务,这在等待其他作业完成时(例如,使用 client.gather)特别有用。

另请参阅

get_client
get_worker

示例

>>> def mytask(x):
...     # do some work
...     client = get_client()
...     futures = client.map(...)  # do some remote work
...     secede()  # while that work happens, remove ourself from the pool
...     return client.gather(futures)  # return gathered results
distributed.rejoin()[source]

让此线程重新加入 ThreadPoolExecutor

这将阻塞直到执行器中出现新的槽位。下一个完成任务的线程将离开池,以便允许此线程加入。

另请参阅

secede

离开线程池

distributed.wait(fs, timeout=None, return_when='ALL_COMPLETED')[source]

等待直到所有/任一 future 完成

参数
fsList[Future]
timeout数字, 字符串, 可选

经过多少时间后引发 dask.distributed.TimeoutError。可以是像 "10 minutes" 这样的字符串,也可以是等待的秒数。

return_when字符串, 可选

ALL_COMPLETEDFIRST_COMPLETED 之一

返回值
已完成、未完成的命名元组
distributed.print(*args, sep: str | None = ' ', end: str | None = '\n', file: TextIO | None = None, flush: bool = False) None[source]

用于从 worker 向 client 进行远程打印的内置 print 函数的替代品。如果从 Dask worker 之外调用,其参数将直接传递给 builtins.print()。如果由在 worker 上运行的代码调用,则除了本地打印外,连接到(可能远程)管理此 worker 的调度器的任何 client 都将收到一个事件,指示它们将相同的输出打印到其自己的标准输出或标准错误流。例如,用户可以通过在提交的代码中包含对此 print 函数的调用,并在本地 Jupyter notebook 或解释器会话中检查输出来对远程计算进行简单调试。

所有参数的行为与 builtins.print() 相同,但有一个例外:如果指定了 file 关键字参数,它必须是 sys.stdoutsys.stderr;不允许任意的文件类对象。

所有非关键字参数都使用 str() 转换为字符串,并写入流中,用 sep 分隔,后面跟着 endsepend 都必须是字符串;它们也可以是 None,这意味着使用默认值。如果没有给出对象,print() 将只写入 end

参数
sep字符串, 可选

插入值之间的字符串,默认为一个空格。

end字符串, 可选

追加在最后一个值后面的字符串,默认为一个换行符。

filesys.stdoutsys.stderr, 可选

默认为当前的 sys.stdout。

flush布尔值, 默认 False

是否强制刷新流。

示例

>>> from dask.distributed import Client, print
>>> client = distributed.Client(...)
>>> def worker_function():
...     print("Hello from worker!")
>>> client.submit(worker_function)
<Future: finished, type: NoneType, key: worker_function-...>
Hello from worker!
distributed.warn(message: str | Warning, category: type[Warning] | None = <class 'UserWarning'>, stacklevel: int = 1, source: typing.Any = None) None[source]

用于从 worker 向 client 进行远程发出警告的内置 warnings.warn() 函数的替代品。

如果从 Dask worker 之外调用,其参数将直接传递给 warnings.warn()。如果由在 worker 上运行的代码调用,则除了本地发出警告外,连接到(可能远程)管理此 worker 的调度器的任何 client 都将收到一个事件,指示它们发出相同的警告(取决于其自身的本地过滤器等)。当实现可能在 worker 上运行的计算时,用户可以调用此 warn 函数,以确保任何远程 client 会话都能看到其警告,例如在 Jupyter 输出单元中。

尽管所有参数都受到本地发出的警告的尊重(与 warnings.warn() 中的含义相同),但 stacklevelsource 会被 client 忽略,因为它们在 client 的线程中没有意义。

示例

>>> from dask.distributed import Client, warn
>>> client = Client()
>>> def do_warn():
...    warn("A warning from a worker.")
>>> client.submit(do_warn).result()
/path/to/distributed/client.py:678: UserWarning: A warning from a worker.
class distributed.Client(address=None, loop=None, timeout=_NoDefault.no_default, set_as_default=True, scheduler_file=None, security=None, asynchronous=False, name=None, heartbeat_interval=None, serializers=None, deserializers=None, extensions={}, direct_to_workers=None, connection_limit=512, **kwargs)[source]

连接到 Dask 集群并提交计算

Client 将用户连接到 Dask 集群。它围绕函数和 future 提供了一个异步用户界面。此类类似于 concurrent.futures 中的执行器,但也允许在 submit/map 调用中使用 Future 对象。实例化 Client 时,默认会接管所有 dask.computedask.persist 调用。

通常也可以在不指定调度器地址的情况下创建 Client,例如 Client()。在这种情况下,Client 会在后台创建一个 LocalCluster 并连接到它。在这种情况下,任何额外的关键字都将从 Client 传递到 LocalCluster。有关更多信息,请参阅 LocalCluster 文档。

参数
address: 字符串,或 Cluster

这可以是 Scheduler 服务器的地址,例如字符串 '127.0.0.1:8786',或像 LocalCluster() 这样的 cluster 对象。

loop

事件循环

timeout: 整数 (默认为配置 ``distributed.comm.timeouts.connect``)

初次连接调度器的超时时长

set_as_default: 布尔值 (True)

使用此 Client 作为全局 Dask 调度器

scheduler_file: 字符串 (可选)

指向包含调度器信息的文件路径(如果可用)

security: Security 或 布尔值, 可选

可选的安全信息。如果创建本地集群,也可以传入 True,在这种情况下将自动创建临时的自签名凭证。

asynchronous: 布尔值 (默认为 False)

如果在 async/await 函数或 Tornado gen.coroutines 中使用此 client,则设置为 True。否则对于普通使用应保持 False。

name: 字符串 (可选)

为 client 指定一个名称,该名称将包含在调度器生成的与此 client 相关的日志中。

heartbeat_interval: 整数 (可选)

向调度器发送心跳的时间间隔(毫秒)

serializers

序列化对象时要使用的方法迭代器。更多信息请参阅 序列化

deserializers

反序列化对象时要使用的方法迭代器。更多信息请参阅 序列化

extensions列表

扩展

direct_to_workers: 布尔值 (可选)

是否直接连接到 worker,或者请求调度器充当中间人。

connection_limit整数

在连接池中同时维护的打开通信通道数量

**kwargs

如果您不传递调度器地址,Client 将创建一个 LocalCluster 对象,并传递任何额外的关键字参数。

示例

在初始化时提供集群调度器节点的地址

>>> client = Client('127.0.0.1:8786')  

使用 submit 方法将单个计算发送到集群

>>> a = client.submit(add, 1, 2)  
>>> b = client.submit(add, 10, 20)  

继续对结果使用 submit 或 map 来构建更大的计算

>>> c = client.submit(add, a, b)  

使用 gather 方法收集结果。

>>> client.gather(c)  
33

您也可以不带参数调用 Client 来创建自己的本地集群。

>>> client = Client()  # makes your own local "cluster" 

额外的关键字将直接传递给 LocalCluster

>>> client = Client(n_workers=2, threads_per_worker=4)  
property amm

Active Memory Manager 的便捷访问器

as_current()[source]

线程局部、任务局部的上下文管理器,它使 Client.current 类方法返回自身。在此上下文管理器中反序列化的任何 Future 对象将自动附加到此 Client。

benchmark_hardware() dict[source]

在 worker 上运行内存、磁盘和网络带宽的基准测试

返回值
result: 字典

一个字典,将名称“disk”、“memory”和“network”映射到将大小映射到带宽的字典。这些带宽是在集群中运行计算的许多 worker 上的平均值。

call_stack(futures=None, keys=None)[source]

所有相关键的正在运行的调用栈

您可以通过在 futures= 关键字中提供 future 或集合,或者在 keys= 关键字中提供显式键列表来指定感兴趣的数据。如果两者都没有提供,将返回所有调用栈。

参数
futures列表 (可选)

future 列表,默认为所有数据

keys列表 (可选)

键名列表,默认为所有数据

示例

>>> df = dd.read_parquet(...).persist()  
>>> client.call_stack(df)  # call on collections
>>> client.call_stack()  # Or call with no arguments for all activity  
cancel(futures, asynchronous=None, force=False, reason=None, msg=None)[source]

取消正在运行的 future 这会阻止尚未运行的 future 任务被调度,并删除已经运行的任务。调用后,此结果和所有依赖的结果将不再可访问。

参数
futuresList[Future]

Future 列表

asynchronous: 布尔值

如果为 True,则 client 处于异步模式

force布尔值 (False)

即使其他 client 需要,也取消此 future

reason: 字符串

取消 future 的原因

msg字符串

将附加到已取消 future 的消息

close(timeout=_NoDefault.no_default)[source]

关闭此 client

当您的 Python 会话结束时,Client 也会自动关闭。

如果您没有带参数启动 client,例如 Client(),这也会同时关闭启动的本地集群。

参数
timeout数字

经过多少秒后引发 dask.distributed.TimeoutError

另请参阅

Client.restart
compute(collections, sync=False, optimize_graph=True, workers=None, allow_other_workers=False, resources=None, retries=0, priority=0, fifo_timeout='60s', actors=None, traverse=True, **kwargs)[source]

在集群上计算 dask 集合

参数
collectionsdask 对象的可迭代对象或单个 dask 对象

dask.array 或 dataframe 或 dask.value 对象等集合

sync布尔值 (可选)

如果为 False(默认),则返回 Future;如果为 True,则返回具体值

optimize_graph布尔值

是否优化底层图

workers字符串或字符串可迭代对象

可以执行计算的 worker 主机名集合。留空表示默认使用所有 worker(常见情况)

allow_other_workers布尔值 (默认为 False)

workers 一起使用。指示计算是否可以在不属于 workers 集合中的 worker 上执行。

retries整数 (默认为 0)

如果计算结果失败,允许的自动重试次数

priority数字

可选的任务优先级。零为默认值。优先级越高,越优先执行。

fifo_timeouttimedelta 字符串 (默认为 ’60s’)

两次调用之间允许的时间间隔,用于考虑相同的优先级

traverse布尔值 (默认为 True)

默认情况下,Dask 会遍历内置 Python 集合,寻找传递给 compute 的 Dask 对象。对于大型集合,这可能会很耗时。如果没有任何参数包含 Dask 对象,请将 traverse=False 以避免进行此遍历。

resources字典 (默认为 {})

定义此映射任务的每个实例在 worker 上所需的 resources;例如 {'GPU': 2}。有关定义资源的详细信息,请参阅 worker resources

actors布尔值或字典 (默认为 None)

这些任务是否应作为有状态 Actors 存在于 worker 上。可以在全局级别 (True/False) 或按任务级别 ({'x': True, 'y': False}) 指定。有关更多详细信息,请参阅 Actors

**kwargs

传递给图优化调用的选项

返回值
如果输入是序列,则为 Future 列表;否则为单个 future

另请参阅

Client.get

普通的同步 dask.get 函数

示例

>>> from dask import delayed
>>> from operator import add
>>> x = delayed(add)(1, 2)
>>> y = delayed(add)(x, x)
>>> xx, yy = client.compute([x, y])  
>>> xx  
<Future: status: finished, key: add-8f6e709446674bad78ea8aeecfee188e>
>>> xx.result()  
3
>>> yy.result()  
6

也支持单个参数

>>> xx = client.compute(x)  
classmethod current(allow_global=True)[source]

当在 as_client 的上下文中运行时,返回上下文本地的当前客户端。否则,返回最新初始化的 Client。如果不存在 Client 实例,则引发 ValueError。如果 allow_global 设置为 False,并且在 as_client 上下文管理器之外运行,则引发 ValueError。

参数
allow_globalbool

如果为 True,则返回默认客户端

返回值
Client

当前客户端

Raises
ValueError

如果没有设置客户端,则引发 ValueError

另请参阅

default_client

调度器仪表板的链接。

返回值
str

仪表板 URL。

示例

在您的默认网络浏览器中打开仪表板

>>> import webbrowser
>>> from distributed import Client
>>> client = Client()
>>> webbrowser.open(client.dashboard_link)
dump_cluster_state(filename: str = 'dask-cluster-dump', write_from_scheduler: bool | None = None, exclude: collections.abc.Collection[str] = (), format: Literal['msgpack', 'yaml'] = 'msgpack', **storage_options)[source]

提取整个集群状态的转储并持久化到磁盘或 URL。这仅用于调试目的。

警告:调度器(以及客户端,如果本地写入转储)上的内存使用量可能很大。在大型或长时间运行的集群上,这可能需要几分钟。在处理转储时,调度器可能无响应。

结果将存储在字典中

{
    "scheduler": {...},  # scheduler state
    "workers": {
        worker_addr: {...},  # worker state
        ...
    }
    "versions": {
        "scheduler": {...},
        "workers": {
            worker_addr: {...},
            ...
        }
    }
}
参数
filename

要写入的路径或 URL。适当的文件后缀(.msgpack.gz.yaml)将自动附加。

必须是由 fsspec.open() 支持的路径(例如 s3://my-bucket/cluster-dumpcluster-dumps/dump)。请参阅 write_from_scheduler 以控制转储是直接从调度器写入到 filename,还是通过网络发送回客户端,然后再在本地写入。

write_from_scheduler

如果为 None(默认),则根据 filename 是 URL 还是本地路径来推断:如果文件名包含 ://(例如 s3://my-bucket/cluster-dump),则为 True;否则(例如 local_dir/cluster-dump),则为 False。

如果为 True,则直接从调度器将集群状态写入 filename。如果 filename 是本地路径,则转储将写入到调度器文件系统上的该路径,因此如果调度器在临时硬件上运行,请务必小心。当调度器连接到网络文件系统或持久磁盘时,或者用于写入存储桶时非常有用。

如果为 False,则通过网络将集群状态从调度器传输回客户端,然后将其写入 filename。对于大型转储,这种效率要低得多,但当调度器无法访问任何持久存储时非常有用。

exclude

要从转储中排除的属性名称集合,例如排除代码、回溯、日志等。

默认为排除 run_spec,即序列化的用户代码。这通常不是调试所必需的。要允许对此进行序列化,请传入一个空元组。

format

可以是 "msgpack""yaml"。如果使用 msgpack(默认),则输出将存储在 gzipped 文件中作为 msgpack。

要读取

import gzip, msgpack
with gzip.open("filename") as fd:
    state = msgpack.unpack(fd)

or

import yaml
try:
    from yaml import CLoader as Loader
except ImportError:
    from yaml import Loader
with open("filename") as fd:
    state = yaml.load(fd, Loader=Loader)
**storage_options

写入 URL 时传递给 fsspec.open() 的任何附加参数。

forward_logging(logger_name=None, level=0)[source]

开始将给定记录器(默认为根记录器)及其下的所有记录器从工作任务转发到客户端进程。每当指定名称的记录器在工作端处理 LogRecord 时,该记录器将被序列化,发送到客户端,并由客户端上具有相同名称的记录器处理。

请注意,仅当工作端记录器的级别设置得当时,它们才会处理 LogRecord,并且仅当客户端记录器自己的级别设置得同样适当时,它才会发出转发的 LogRecord。例如,如果您提交的任务向记录器“foo”记录了一条 DEBUG 消息,那么为了让 forward_logging() 在客户端会话中发出该消息,您必须确保记录器“foo”在工作进程中将其级别设置为 DEBUG(或更低),并且在客户端进程中也是如此。

参数
logger_namestr, optional

要开始转发的记录器的名称。logging 模块分层命名系统的常规规则适用。例如,如果 name"foo",则不仅 "foo",而且 "foo.bar""foo.baz" 等都将被转发。如果 nameNone,则表示根记录器,因此所有记录器都将被转发。

请注意,记录器仅当记录器的级别足以处理 LogRecord 时,才会转发给定的 LogRecord。

levelstr | int, optional

可选地限制转发到此级别或更高级别的 LogRecord,即使转发的记录器自己的级别较低。

示例

为了示例的目的,假设我们像用户那样配置客户端日志:将一个 StreamHandler 附加到根记录器,输出级别为 INFO,输出格式简单。

import logging
import distributed
import io, yaml

TYPICAL_LOGGING_CONFIG = '''
version: 1
handlers:
  console:
    class : logging.StreamHandler
    formatter: default
    level   : INFO
formatters:
  default:
    format: '%(asctime)s %(levelname)-8s [worker %(worker)s] %(name)-15s %(message)s'
    datefmt: '%Y-%m-%d %H:%M:%S'
root:
  handlers:
    - console
'''
config = yaml.safe_load(io.StringIO(TYPICAL_LOGGING_CONFIG))
logging.config.dictConfig(config)

现在创建一个客户端并开始将根记录器从工作器转发回我们的本地客户端进程。

>>> client = distributed.Client()
>>> client.forward_logging()  # forward the root logger at any handled level

然后提交一个在工作器上进行错误日志记录的任务。我们看到客户端 StreamHandler 的输出。

>>> def do_error():
...     logging.getLogger("user.module").error("Hello error")
...     return 42
>>> client.submit(do_error).result()
2022-11-09 03:43:25 ERROR    [worker tcp://127.0.0.1:34783] user.module     Hello error
42

请注意,dask 还向转发的 LogRecord 添加了一个属性 "worker",我们的自定义格式化程序使用了该属性。这对于识别哪个工作器记录了错误非常有用。

一个值得强调的细微之处是:即使我们的客户端根记录器配置为 INFO 级别,工作端根记录器仍然具有其默认的 ERROR 级别,因为我们尚未在工作器上进行任何显式的日志配置。因此,工作端 INFO 日志将不会被转发,因为它们根本没有被处理。

>>> def do_info_1():
...     # no output on the client side
...     logging.getLogger("user.module").info("Hello info the first time")
...     return 84
>>> client.submit(do_info_1).result()
84

需要在客户端将记录器级别设置为 INFO,然后 info 消息才会被处理并转发到客户端。换句话说,客户端转发日志的“有效”级别是每个记录器在客户端和工作端级别中的最大值。

>>> def do_info_2():
...     logger = logging.getLogger("user.module")
...     logger.setLevel(logging.INFO)
...     # now produces output on the client side
...     logger.info("Hello info the second time")
...     return 84
>>> client.submit(do_info_2).result()
2022-11-09 03:57:39 INFO     [worker tcp://127.0.0.1:42815] user.module     Hello info the second time
84
futures_of(futures)[source]

futures_of 的包装方法

参数
futurestuple

Future 列表

gather(futures, errors='raise', direct=None, asynchronous=None)[source]

从分布式内存中收集 future

接受一个 future、futures 的嵌套容器、迭代器或队列。返回类型将与输入类型匹配。

参数
futuresfutures 集合

这可以是一个包含 Future 对象的可能嵌套的集合。集合可以是列表、集合或字典。

errorsstring

如果 future 出错,是要引发异常(‘raise’)还是跳过其在输出集合中的包含(‘skip’)

directboolean

是直接连接到工作器,还是要求调度器充当中介。这也可以在创建 Client 时设置。

asynchronous: 布尔值

如果为 True,则 client 处于异步模式

返回值
results: 与输入类型相同的集合,但现在包含
已收集的结果,而不是 futures

另请参阅

Client.scatter

将数据发送到集群

示例

>>> from operator import add  
>>> c = Client('127.0.0.1:8787')  
>>> x = c.submit(add, 1, 2)  
>>> c.gather(x)  
3
>>> c.gather([x, [x], x])  # support lists and dicts 
[3, [3], 3]
get(dsk, keys, workers=None, allow_other_workers=None, resources=None, sync=True, asynchronous=None, direct=None, retries=None, priority=0, fifo_timeout='60s', actors=None, **kwargs)[source]

计算 dask 图

参数
dskdict
keysobject, 或对象的嵌套列表
workers字符串或字符串可迭代对象

一组工作器地址或主机名,可在其上执行计算。留空则默认为所有工作器(常见情况)

allow_other_workers布尔值 (默认为 False)

workers 一起使用。表示计算是否可以在 workers 集合之外的工作器上执行。

resources字典 (默认为 {})

定义此映射任务的每个实例在工作器上所需的 resources;例如 {'GPU': 2}。有关定义资源的详细信息,请参阅工作器资源

sync布尔值 (可选)

如果为 False 则返回 Futures,如果为 True(默认)则返回具体值。

asynchronous: 布尔值

如果为 True,则 client 处于异步模式

directbool

是直接连接到工作器,还是要求调度器充当中介。这也可以在创建 Client 时设置。

retries整数 (默认为 0)

如果计算结果失败,允许的自动重试次数

priority数字

可选的任务优先级。零为默认值。优先级越高,越优先执行。

fifo_timeouttimedelta 字符串 (默认为 ’60s’)

两次调用之间允许的时间间隔,用于考虑相同的优先级

actors布尔值或字典 (默认为 None)

这些任务是否应作为有状态 Actors 存在于 worker 上。可以在全局级别 (True/False) 或按任务级别 ({'x': True, 'y': False}) 指定。有关更多详细信息,请参阅 Actors

返回值
results

如果 'sync' 为 True,则返回结果。否则,返回已知数据。如果 'sync' 为 False,则返回已知数据。否则,返回结果

另请参阅

Client.compute

计算异步集合

示例

>>> from operator import add  
>>> c = Client('127.0.0.1:8787')  
>>> c.get({'x': (add, 1, 2)}, 'x')  
3
get_dataset(name, default=_NoDefault.no_default, **kwargs)[source]

如果存在,从调度器获取命名数据集。如果不存在,则返回默认值或引发 KeyError。

参数
namestr

要检索的数据集的名称

defaultstr

可选,默认未设置。如果已设置,则如果名称不存在,则不引发 KeyError,而是返回此默认值。

kwargsdict

传递给 _get_dataset 的附加关键字参数

返回值
如果存在,则从调度器获取数据集
get_events(topic: str | None = None)[source]

检索结构化主题日志

参数
topicstr, optional

要检索事件的主题日志名称。如果没有提供 topic,则返回所有主题的日志。

get_executor(**kwargs)[source]

返回一个 concurrent.futures Executor,用于在此 Client 上提交任务

参数
**kwargs

任何与 submit() 或 map() 兼容的参数,例如 workersresources

返回值
ClientExecutor

一个与 concurrent.futures API 完全兼容的 Executor 对象。

get_metadata(keys, default=_NoDefault.no_default)[source]

从调度器获取任意元数据

请参阅 set_metadata 获取包含示例的完整文档字符串

参数
keyskey 或列表

要访问的键。如果是一个列表,则获取嵌套集合中的元素。

defaultoptional

如果键不存在,则返回此值。如果未提供此值,则如果键不存在,将引发 KeyError。

另请参阅

Client.set_metadata
get_scheduler_logs(n=None)[source]

从调度器获取日志

参数
nint

要检索的日志数量。默认最多为 10000 条,可通过 distributed.admin.log-length 配置值进行配置。

返回值
日志按倒序返回(最新的在前)
get_task_stream(start=None, stop=None, count=None, plot=False, filename='task-stream.html', bokeh_resources=None)[source]

从调度器获取任务流数据

这会收集仪表板诊断“任务流”图表中存在的数据。它包括特定持续时间内每个任务的开始、停止、传输和反序列化时间。

请注意,任务流诊断默认情况下不运行。您可能希望在开始工作之前调用此函数一次以确保开始记录,然后在完成工作后再调用一次。

参数
start数字或字符串

您希望何时开始记录。如果是一个数字,则应为调用 time() 的结果。如果是一个字符串,则应为距离当前时间的时间差,例如 '60s' 或 '500 ms'。

stop数字或字符串

您希望何时停止记录

countint

所需的记录数量,如果同时指定了 start 和 stop,则忽略此参数

plotboolean, str

如果为 true,则也返回 Bokeh 图;如果 plot == 'save',则将图保存到文件。

filenamestr (optional)

如果设置了 plot='save',则保存到的文件名

bokeh_resourcesbokeh.resources.Resources (optional)

指定资源组件是 INLINE 还是 CDN

返回值
L: List[Dict]

另请参阅

get_task_stream

此方法的上下文管理器版本

示例

>>> client.get_task_stream()  # prime plugin if not already connected
>>> x.compute()  # do some work
>>> client.get_task_stream()
[{'task': ...,
  'type': ...,
  'thread': ...,
  ...}]

传入 plot=Trueplot='save' 关键字参数以获取 Bokeh 图

>>> data, figure = client.get_task_stream(plot='save', filename='myfile.html')

或者考虑使用上下文管理器

>>> from dask.distributed import get_task_stream
>>> with get_task_stream() as ts:
...     x.compute()
>>> ts.data
[...]
get_versions(check: bool = False, packages: collections.abc.Sequence[str] | None = None) distributed.client.VersionsDict | collections.abc.Coroutine[Any, Any, distributed.client.VersionsDict][source]

返回调度器、所有工作器和我自己的版本信息

参数
check

如果所有必需和可选包不匹配,则引发 ValueError

packages

要检查的额外包名称

示例

>>> c.get_versions()  
>>> c.get_versions(packages=['sklearn', 'geopandas'])  
get_worker_logs(n=None, workers=None, nanny=False)[source]

从工作器获取日志

参数
nint

要检索的日志数量。默认最多为 10000 条,可通过 distributed.admin.log-length 配置值进行配置。

workersiterable

要检索的工作器地址列表。默认获取所有工作器。

nannybool, default False

是获取工作器(False)还是 nanny(True)的日志。如果指定此项,workers 中的地址仍应是工作器地址,而不是 nanny 地址。

返回值
将工作器地址映射到日志的字典。
日志按倒序返回(最新的在前)
has_what(workers=None, **kwargs)[source]

哪些 worker 持有哪些键

这会返回每个工作器内存中持有的数据的键。

参数
workerslist (optional)

工作器地址列表,默认为所有工作器

**kwargsdict

远程函数的可选关键字参数

示例

>>> x, y, z = c.map(inc, [1, 2, 3])  
>>> wait([x, y, z])  
>>> c.has_what()  
{'192.168.1.141:46784': ['inc-1c8dd6be1c21646c71f76c16d09304ea',
                         'inc-fd65c238a7ea60f6a01bf4c8a5fcf44b',
                         'inc-1e297fc27658d7b67b3a758f16bcf47a']}
list_datasets(**kwargs)[source]

列出调度器上可用的命名数据集

log_event(topic: str | collections.abc.Collection[str], msg: Any)[source]

在给定主题下记录事件

参数
topicstr, list[str]

记录事件的主题名称。要在多个主题下记录相同的事件,请传递一个主题名称列表。

msg

要记录的事件消息。请注意,此消息必须可由 msgpack 序列化。

示例

>>> from time import time
>>> client.log_event("current-time", time())
map(func: Callable, *iterables: collections.abc.Collection, key: str | list | None = None, workers: str | collections.abc.Iterable[str] | None = None, retries: int | None = None, resources: dict[str, Any] | None = None, priority: int = 0, allow_other_workers: bool =False, fifo_timeout: str ='100 ms', actor: bool =False, actors: bool =False, pure: bool =True, batch_size=None, **kwargs)[source]

对参数序列映射应用函数

参数可以是普通对象或 Futures

参数
funccallable

计划执行的可调用对象。如果 func 返回协程,它将在工作器的主事件循环上运行。否则 func 将在工作器的任务执行器池中运行(有关详细信息,请参阅 Worker.executors)。

iterables可迭代对象

要映射处理的类列表对象。它们的长度应相同。

keystr, list

如果为字符串,则为任务名称的前缀。如果为列表,则为显式名称。

workers字符串或字符串可迭代对象

可以执行计算的 worker 主机名集合。留空表示默认使用所有 worker(常见情况)

retries整数 (默认为 0)

如果任务失败,允许的自动重试次数

resources字典 (默认为 {})

定义此映射任务的每个实例在 worker 上所需的 resources;例如 {'GPU': 2}。有关定义资源的详细信息,请参阅 worker resources

priority数字

可选的任务优先级。零为默认值。优先级越高,越优先执行。

allow_other_workers布尔值 (默认为 False)

workers 一起使用。指示计算是否可以在不属于 workers 集合中的 worker 上执行。

fifo_timeoutstr timedelta (default ‘100ms’)

两次调用之间允许的时间间隔,用于考虑相同的优先级

actorbool (default False)

这些任务是否应作为有状态的 actor 存在于工作器上。有关详细信息,请参阅Actors

actorsbool (default False)

actor 的别名

purebool (defaults to True)

函数是否为纯函数。对于 np.random.random 等非纯函数,请设置为 pure=False。请注意,如果 actorpure 关键字参数都设置为 True,则 pure 的值将恢复为 False,因为 actor 是有状态的。有关详细信息,请参阅纯函数(默认)

batch_sizeint, optional (default: just one batch whose size is the entire iterable)

将任务以至多 batch_size 的批量大小提交给调度器。批量大小的权衡在于,大批量可以避免更多的每批量开销,但过大的批量可能需要很长时间才能提交,并可能不合理地延迟集群开始处理。

**kwargsdict

要发送给函数的额外关键字参数。大型值将显式包含在任务图中。

返回值
Future 列表、迭代器或队列,取决于输入的类型。
inputs.

另请参阅

Client.submit

提交单个函数

Notes

当前的任务图解析实现会查找 key 的出现并将其替换为相应的 Future 结果。如果作为参数传递给任务的字符串与集群上已存在的某个 key 匹配,这可能导致不需要的字符串替换。为了避免这种情况,如果手动设置 key,则需要使用唯一的值。请参阅https://github.com/dask/dask/issues/9969 以跟踪此问题的解决进度。

示例

>>> L = client.map(func, sequence)  
nbytes(keys=None, summary=True, **kwargs)[source]

集群上每个键占用的字节数

这是通过 sys.getsizeof 测量得出的,可能无法准确反映真实成本。

参数
keys列表 (可选)

键列表,默认为所有键

summaryboolean, (optional)

将键汇总为键类型

**kwargsdict

远程函数的可选关键字参数

另请参阅

Client.who_has

示例

>>> x, y, z = c.map(inc, [1, 2, 3])  
>>> c.nbytes(summary=False)  
{'inc-1c8dd6be1c21646c71f76c16d09304ea': 28,
 'inc-1e297fc27658d7b67b3a758f16bcf47a': 28,
 'inc-fd65c238a7ea60f6a01bf4c8a5fcf44b': 28}
>>> c.nbytes(summary=True)  
{'inc': 84}
ncores(workers=None, **kwargs)

每个 worker 节点上可用的线程/核心数

参数
workerslist (optional)

我们特别关心的工作器列表。留空以接收所有工作器的信息。

**kwargsdict

远程函数的可选关键字参数

示例

>>> c.nthreads()  
{'192.168.1.141:46784': 8,
 '192.167.1.142:47548': 8,
 '192.167.1.143:47329': 8,
 '192.167.1.144:37297': 8}
normalize_collection(collection)[source]

用已存在的 futures 替换集合的任务(如果它们存在)

这会根据调度器中已知的 futures 对集合的任务图中的任务进行规范化。它返回一个集合副本,其任务图包含重叠的 futures。

参数
collectiondask object

dask.array 或 dataframe 或 dask.value 对象等集合

返回值
collectiondask object

集合,其任务被任何已存在的 futures 替换。

另请参阅

Client.persist

触发集合任务的计算

示例

>>> len(x.__dask_graph__())  # x is a dask collection with 100 tasks  
100
>>> set(client.futures).intersection(x.__dask_graph__())  # some overlap exists  
10
>>> x = client.normalize_collection(x)  
>>> len(x.__dask_graph__())  # smaller computational graph  
20
nthreads(workers=None, **kwargs)[source]

每个 worker 节点上可用的线程/核心数

参数
workerslist (optional)

我们特别关心的工作器列表。留空以接收所有工作器的信息。

**kwargsdict

远程函数的可选关键字参数

示例

>>> c.nthreads()  
{'192.168.1.141:46784': 8,
 '192.167.1.142:47548': 8,
 '192.167.1.143:47329': 8,
 '192.167.1.144:37297': 8}
persist(collections, optimize_graph=True, workers=None, allow_other_workers=None, resources=None, retries=None, priority=0, fifo_timeout='60s', actors=None, **kwargs)[source]

在集群上持久化 dask 集合

在集群后台启动集合的计算。提供一个新的 dask 集合,它在语义上与前一个集合相同,但现在基于当前正在执行的 futures。

参数
collectionssequence 或单个 dask 对象

dask.array 或 dataframe 或 dask.value 对象等集合

optimize_graph布尔值

是否优化底层图

workers字符串或字符串可迭代对象

可以执行计算的 worker 主机名集合。留空表示默认使用所有 worker(常见情况)

allow_other_workers布尔值 (默认为 False)

workers 一起使用。指示计算是否可以在不属于 workers 集合中的 worker 上执行。

retries整数 (默认为 0)

如果计算结果失败,允许的自动重试次数

priority数字

可选的任务优先级。零为默认值。优先级越高,越优先执行。

fifo_timeouttimedelta 字符串 (默认为 ’60s’)

两次调用之间允许的时间间隔,用于考虑相同的优先级

resources字典 (默认为 {})

定义此映射任务的每个实例在 worker 上所需的 resources;例如 {'GPU': 2}。有关定义资源的详细信息,请参阅 worker resources

actors布尔值或字典 (默认为 None)

这些任务是否应作为有状态 Actors 存在于 worker 上。可以在全局级别 (True/False) 或按任务级别 ({'x': True, 'y': False}) 指定。有关更多详细信息,请参阅 Actors

返回值
集合列表,或单个集合,取决于输入类型。

另请参阅

Client.compute

示例

>>> xx = client.persist(x)  
>>> xx, yy = client.persist([x, y])  
processing(workers=None)[source]

当前在每个工作器上运行的任务

参数
workerslist (optional)

工作器地址列表,默认为所有工作器

示例

>>> x, y, z = c.map(inc, [1, 2, 3])  
>>> c.processing()  
{'192.168.1.141:46784': ['inc-1c8dd6be1c21646c71f76c16d09304ea',
                         'inc-fd65c238a7ea60f6a01bf4c8a5fcf44b',
                         'inc-1e297fc27658d7b67b3a758f16bcf47a']}
profile(key=None, start=None, stop=None, workers=None, merge_workers=True, plot=False, filename=None, server=False, scheduler=False)[source]

收集关于最近工作的统计分析信息 (profiling)

参数
keystr

要选择的键前缀,通常是 'inc' 等函数名称。留空 None 以收集所有数据。

starttime
stoptime
workerslist

限制配置信息的工作器列表

serverbool

如果为 True,则返回工作器管理线程的配置文件,而不是工作器线程的配置文件。这对于分析 Dask 本身而非用户代码非常有用。

schedulerbool

如果为 True,则返回调度器管理线程而非工作器的配置信息。这对于分析 Dask 自身的调度非常有用。

plotboolean 或 string

是否返回图对象

filenamestr

保存图的文件名

示例

>>> client.profile()  # call on collections
>>> client.profile(filename='dask-profile.html')  # save to html file
publish_dataset(*args, **kwargs)[source]

向调度器发布命名数据集

这会将对 dask 集合或 future 列表的命名引用存储在调度器上。其他 Client 可以通过 get_dataset 下载这些集合或 futures。

数据集不会立即计算。您可能希望在发布数据集之前调用 Client.persist

参数
argslist of objects to publish as name
kwargsdict

要在调度器上发布的命名集合

返回值
None

示例

发布客户端

>>> df = dd.read_csv('s3://...')  
>>> df = c.persist(df) 
>>> c.publish_dataset(my_dataset=df)  

另一种调用方式 >>> c.publish_dataset(df, name=’my_dataset’)

接收客户端

>>> c.list_datasets()  
['my_dataset']
>>> df2 = c.get_dataset('my_dataset')  
rebalance(futures=None, workers=None, **kwargs)[source]

在网络内重新平衡数据

在工作器之间移动数据以大致平衡内存负担。这会影响键/工作器的子集或整个网络,具体取决于关键字参数。

有关算法和配置选项的详细信息,请参阅调度器端的匹配方法 rebalance()

Warning

此操作通常未针对调度器的正常运行进行充分测试。不建议在等待计算时使用此操作。

参数
futureslist, optional

要平衡的 futures 列表,默认为所有数据

workerslist, optional

要在其上平衡的工作器列表,默认为所有工作器

**kwargsdict

函数的可选关键字参数

register_plugin(plugin: distributed.diagnostics.plugin.NannyPlugin | distributed.diagnostics.plugin.SchedulerPlugin | distributed.diagnostics.plugin.WorkerPlugin, name: str | None = None, idempotent: bool | None = None)[source]

注册插件。

请参阅 https://distributed.dask.org.cn/en/latest/plugins.html

参数
plugin

要注册的 nanny、调度器或工作器插件。

name

插件的名称;如果为 None,则从插件实例获取名称,如果不存在则自动生成。

idempotent

如果已存在具有给定名称的插件,则不重新注册。如果为 None,则使用 plugin.idempotent(如果已定义),否则为 False。

register_scheduler_plugin(plugin: distributed.diagnostics.plugin.SchedulerPlugin, name: str | None = None, idempotent: bool | None = None)[source]

注册调度器插件。

自版本 2023.9.2 起已弃用: 请改用 Client.register_plugin()

请参阅 https://distributed.dask.org.cn/en/latest/plugins.html#scheduler-plugins

参数
pluginSchedulerPlugin

要传递给调度器的 SchedulerPlugin 实例。

namestr

插件的名称;如果为 None,则从插件实例获取名称,如果不存在则自动生成。

idempotentbool

如果已存在具有给定名称的插件,则不重新注册。

register_worker_callbacks(setup=None)[source]

为所有当前和将来的工作器注册一个 setup 回调函数。

这会为此集群中的工作器注册一个新的 setup 函数。该函数将立即在所有当前连接的工作器上运行。它还将在将来添加的任何工作器连接时运行。可以注册多个 setup 函数 - 这些函数将按添加顺序调用。

如果函数接受一个名为 dask_worker 的输入参数,则该变量将填充工作器本身。

参数
setupcallable(dask_worker: Worker) -> None

要在所有工作器上注册并运行的函数

register_worker_plugin(plugin: distributed.diagnostics.plugin.NannyPlugin | distributed.diagnostics.plugin.WorkerPlugin, name: str | None = None, nanny: bool | None =None)[source]

为所有当前和将来的工作器注册一个生命周期工作器插件。

自版本 2023.9.2 起已弃用: 请改用 Client.register_plugin()

这会注册一个新对象来处理此集群中工作器的 setup、任务状态转换和 teardown。插件将在所有当前连接的工作器上实例化自身。它还将在将来连接的任何工作器上运行。

插件可能包含方法 setupteardowntransitionrelease_key。请参阅 dask.distributed.WorkerPlugin 类或下面的示例以了解接口和文档字符串。它必须可以使用 pickle 或 cloudpickle 模块序列化。

如果插件具有 name 属性,或者使用了 name= 关键字参数,则将控制幂等性。如果已注册了同名插件,则将其移除并替换为新的插件。

对于插件的替代方案,您还可以考虑预加载脚本。

参数
pluginWorkerPlugin or NannyPlugin

要注册的 WorkerPlugin 或 NannyPlugin 实例。

namestr, optional

插件的名称。注册同名插件不会产生任何效果。如果插件没有 name 属性,则使用随机名称。

nannybool, optional

是向工作器还是向 nanny 注册插件。

另请参阅

distributed.WorkerPlugin
unregister_worker_plugin

示例

>>> class MyPlugin(WorkerPlugin):
...     def __init__(self, *args, **kwargs):
...         pass  # the constructor is up to you
...     def setup(self, worker: dask.distributed.Worker):
...         pass
...     def teardown(self, worker: dask.distributed.Worker):
...         pass
...     def transition(self, key: str, start: str, finish: str,
...                    **kwargs):
...         pass
...     def release_key(self, key: str, state: str, cause: str | None, reason: None, report: bool):
...         pass
>>> plugin = MyPlugin(1, 2, 3)
>>> client.register_plugin(plugin)

可以使用 get_worker 函数访问插件

>>> client.register_plugin(other_plugin, name='my-plugin')
>>> def f():
...    worker = get_worker()
...    plugin = worker.plugins['my-plugin']
...    return plugin.my_state
>>> future = client.run(f)
replicate(futures, n=None, workers=None, branching_factor=2, **kwargs)[source]

设置 future 在网络内的复制数量

将数据复制到多个工作器上。这有助于广播频繁访问的数据并提高弹性。

这会在网络中对每一块数据分别执行数据的树状复制。此操作将阻塞直到完成。它不保证数据会复制到将来的工作器上。

Note

此方法与活动内存管理器的 ReduceReplicas 策略不兼容。如果要使用它,必须首先禁用该策略或完全禁用 AMM。

参数
futureslist of futures

我们希望复制的 futures

nint, optional

要在集群上复制数据的进程数。默认为所有进程。

workerslist of worker addresses

我们希望限制复制范围的工作器列表。默认为所有工作器。

branching_factorint, optional

每一代中可以复制数据的工作器数量

**kwargsdict

远程函数的可选关键字参数

另请参阅

Client.rebalance

示例

>>> x = c.submit(func, *args)  
>>> c.replicate([x])  # send to all workers  
>>> c.replicate([x], n=3)  # send to three workers  
>>> c.replicate([x], workers=['alice', 'bob'])  # send to specific  
>>> c.replicate([x], n=1, workers=['alice', 'bob'])  # send to one of specific workers  
>>> c.replicate([x], n=1)  # reduce replications 
restart(timeout: typing.Union[str, int, float, typing.Literal[<no_default>]] = _NoDefault.no_default, wait_for_workers: bool = True)[source]

重启所有工作器。重置本地状态。可选地等待工作器返回。

没有 nanny 的工作器将关闭,希望外部部署系统会重启它们。因此,如果不使用 nanny 且部署系统不会自动重启工作器,restart 将只会关闭所有工作器,然后超时!

restart 之后,所有连接的工作器都是新的,无论是否引发了 TimeoutError。未能及时关闭的工作器将被移除,并且将来可能会或可能不会自行关闭。

参数
timeout

如果 wait_for_workers 为 True,则等待工作节点关闭并重新启动的时长;否则仅等待工作节点关闭的时长。如果超出此时间,则会引发 asyncio.TimeoutError

wait_for_workers

是否等待所有工作节点重新连接,或者仅等待它们关闭(默认为 True)。结合使用 restart(wait_for_workers=False)Client.wait_for_workers() 可以细粒度地控制要等待多少个工作节点。

restart_workers(workers: list[str], timeout: typing.Union[str, int, float, typing.Literal[<no_default>]] = _NoDefault.no_default, raise_for_error: bool = True)[source]

重新启动指定的工作节点集合

Note

只有由 distributed.Nanny 监控的工作节点才能重新启动。有关更多详细信息,请参阅 Nanny.restart

参数
workerslist[str]

要重新启动的工作节点。这可以是一个工作节点地址列表、名称列表或两者皆有。

timeoutint | float | None

等待的秒数

raise_for_error: bool (默认为 True)

如果在 timeout 时间内未能完成工作节点的重新启动,或者因重新启动工作节点导致其他异常,是否引发 TimeoutError

返回值
dict[str, “OK” | “removed” | “timed out”]

工作节点与重启状态的映射,键将匹配通过 workers 传入的原始值。

另请参阅

Client.restart

Notes

此方法与 Client.restart() 的区别在于,此方法仅重新启动指定的工作节点集,而 Client.restart 将重新启动所有工作节点并重置集群上的本地状态(例如,释放所有键)。

此外,此方法无法妥善处理在工作节点重新启动时正在执行的任务。这些任务可能会失败或其可疑计数会增加。

示例

您可以使用以下方法获取有关活动工作节点的信息

>>> workers = client.scheduler_info()['workers']

您可以从该列表中选择一些工作节点进行重新启动

>>> client.restart_workers(workers=['tcp://address:port', ...])
retire_workers(workers: list[str] | None = None, close_workers: bool = True, **kwargs)[source]

在调度器上弃用某些工作节点

有关完整的文档字符串,请参阅 distributed.Scheduler.retire_workers()

参数
workers
close_workers
**kwargsdict

远程函数的可选关键字参数

另请参阅

dask.distributed.Scheduler.retire_workers

示例

您可以使用以下方法获取有关活动工作节点的信息

>>> workers = client.scheduler_info()['workers']

您可以从该列表中选择一些工作节点进行关闭

>>> client.retire_workers(workers=['tcp://address:port', ...])
retry(futures, asynchronous=None)[source]

重试失败的 Future

参数
futuresFuture 列表

Future 列表

asynchronous: 布尔值

如果为 True,则 client 处于异步模式

run(function, *args, workers: list[str] | None = None, wait: bool = True, nanny: bool = False, on_error: Literal['raise', 'return', 'ignore'] = 'raise', **kwargs)[source]

在所有 worker 上运行函数,不通过任务调度系统

此方法立即在所有当前已知的工作节点上调用一个函数,阻塞直到结果返回,并以工作节点地址为键返回结果字典(异步)。此方法通常用于副作用,例如收集诊断信息或安装库。

如果您的函数接受名为 dask_worker 的输入参数,则该变量将填充工作节点本身。

参数
function可调用对象 (callable)

要运行的函数

*argstuple

远程函数的可选参数

**kwargsdict

远程函数的可选关键字参数

workerslist

运行函数的工作节点。默认为所有已知工作节点。

waitboolean (可选)

如果函数是异步的,是否等待该函数完成。

nannybool, default False

是否在 nanny 上运行 function。默认情况下,函数在工作节点进程上运行。如果指定,workers 中的地址仍应是工作节点地址,而不是 nanny 地址。

on_error: “raise” | “return” | “ignore”

如果函数在工作节点上引发错误

raise

(默认)在客户端重新引发异常。其他工作节点的输出将丢失。

return

返回 Exception 对象作为工作节点的输出,而不是函数输出

ignore

忽略异常并从结果字典中移除该工作节点

示例

>>> c.run(os.getpid)  
{'192.168.0.100:9000': 1234,
 '192.168.0.101:9000': 4321,
 '192.168.0.102:9000': 5555}

使用 workers= 关键字参数将计算限制在特定的工作节点上。

>>> c.run(os.getpid, workers=['192.168.0.100:9000',
...                           '192.168.0.101:9000'])  
{'192.168.0.100:9000': 1234,
 '192.168.0.101:9000': 4321}
>>> def get_status(dask_worker):
...     return dask_worker.status
>>> c.run(get_status)  
{'192.168.0.100:9000': 'running',
 '192.168.0.101:9000': 'running}

在后台运行异步函数

>>> async def print_state(dask_worker):  
...    while True:
...        print(dask_worker.status)
...        await asyncio.sleep(1)
>>> c.run(print_state, wait=False)  
run_on_scheduler(function, *args, **kwargs)[source]

在调度器进程上运行函数

这通常用于实时调试。函数应接受一个关键字参数 dask_scheduler=,该参数将接收调度器对象本身。

参数
function可调用对象 (callable)

要在调度器进程上运行的函数

*argstuple

函数的可选参数

**kwargsdict

函数的可选关键字参数

另请参阅

Client.run

在所有工作节点上运行函数

示例

>>> def get_number_of_tasks(dask_scheduler=None):
...     return len(dask_scheduler.tasks)
>>> client.run_on_scheduler(get_number_of_tasks)  
100

在后台运行异步函数

>>> async def print_state(dask_scheduler):  
...    while True:
...        print(dask_scheduler.status)
...        await asyncio.sleep(1)
>>> c.run(print_state, wait=False)  
scatter(data, workers=None, broadcast=False, direct=None, hash=True, timeout=_NoDefault.no_default, asynchronous=None)[source]

将数据分散到分布式内存中

此方法将数据从本地客户端进程移动到分布式调度器的工作节点中。请注意,通常最好将加载数据的作业提交给工作节点,而不是在本地加载数据后再分散到工作节点。

参数
data列表、字典或对象

要分散到工作节点的数据。输出类型与输入类型匹配。

workerstuple 列表 (可选)

可选地限制数据的位置。将工作节点指定为 hostname/port 对,例如 ('127.0.0.1', 8787)

broadcastbool (默认为 False)

是否将每个数据元素发送到所有工作节点。默认情况下,我们根据核心数进行轮询。

Note

将此标志设置为 True 与 Active Memory Manager 的 ReduceReplicas 策略不兼容。如果您希望使用它,必须首先禁用该策略或完全禁用 AMM。

directbool (默认为自动检查)

是直接连接到工作器,还是要求调度器充当中介。这也可以在创建 Client 时设置。

hashbool (可选)

是否对数据进行哈希以确定键。如果为 False,则使用随机键。

timeoutnumber, 可选

经过多少秒后引发 dask.distributed.TimeoutError

asynchronous: 布尔值

如果为 True,则 client 处于异步模式

返回值
与输入类型匹配的 Future 列表、字典、迭代器或队列。

另请参阅

Client.gather

将数据收集回本地进程

Notes

分散字典时,使用 dict 键创建 Future 键。当前的任务图解析实现会查找 key 的出现并将其替换为相应的 Future 结果。如果作为任务参数传递的字符串与集群上已存在的某个 key 匹配,则可能导致不必要的替换。为避免这些情况,如果手动设置 key,则需要使用唯一值。请参阅 https://github.com/dask/dask/issues/9969 以跟踪解决此问题的进展。

示例

>>> c = Client('127.0.0.1:8787')  
>>> c.scatter(1) 
<Future: status: finished, key: c0a8a20f903a4915b94db8de3ea63195>
>>> c.scatter([1, 2, 3])  
[<Future: status: finished, key: c0a8a20f903a4915b94db8de3ea63195>,
 <Future: status: finished, key: 58e78e1b34eb49a68c65b54815d1b158>,
 <Future: status: finished, key: d3395e15f605bc35ab1bac6341a285e2>]
>>> c.scatter({'x': 1, 'y': 2, 'z': 3})  
{'x': <Future: status: finished, key: x>,
 'y': <Future: status: finished, key: y>,
 'z': <Future: status: finished, key: z>}

将数据位置限制在工作节点子集

>>> c.scatter([1, 2, 3], workers=[('hostname', 8788)])   

将数据广播到所有工作节点

>>> [future] = c.scatter([element], broadcast=True)  

使用客户端 Future 接口将分散的数据发送到并行化函数

>>> data = c.scatter(data, broadcast=True)  
>>> res = [c.submit(func, data, i) for i in range(100)]
scheduler_info(n_workers: int = 5, **kwargs: Any) distributed.objects.SchedulerInfo[source]

关于集群中 worker 的基本信息

参数
n_workers: int

要获取信息的工作节点数量。要获取所有信息,请使用 -1。

**kwargsdict

远程函数的可选关键字参数

示例

>>> c.scheduler_info()  
{'id': '2de2b6da-69ee-11e6-ab6a-e82aea155996',
 'services': {},
 'type': 'Scheduler',
 'workers': {'127.0.0.1:40575': {'active': 0,
                                 'last-seen': 1472038237.4845693,
                                 'name': '127.0.0.1:40575',
                                 'services': {},
                                 'stored': 0,
                                 'time-delay': 0.0061032772064208984}}}
set_metadata(key, value)[source]

在调度器中设置任意元数据

这允许您在中心调度器进程上存储少量数据,用于管理目的。数据应该是 msgpack 可序列化的(整数、字符串、列表、字典)。

如果键对应于一个任务,则当调度器忘记该任务时,该键将被清除。

如果键是一个列表,则假定您想使用这些键索引嵌套的字典结构。例如,如果您调用以下代码:

>>> client.set_metadata(['a', 'b', 'c'], 123)

则这与设置以下内容相同:

>>> scheduler.task_metadata['a']['b']['c'] = 123

较低级别的字典将按需创建。

另请参阅

get_metadata

示例

>>> client.set_metadata('x', 123)  
>>> client.get_metadata('x')  
123
>>> client.set_metadata(['x', 'y'], 123)  
>>> client.get_metadata('x')  
{'y': 123}
>>> client.set_metadata(['x', 'w', 'z'], 456)  
>>> client.get_metadata('x')  
{'y': 123, 'w': {'z': 456}}
>>> client.get_metadata(['x', 'w'])  
{'z': 456}
shutdown()[source]

关闭连接的调度器和 worker

注意,这可能会中断正在使用同一调度器和工作节点的其他客户端。

另请参阅

Client.close

仅关闭此客户端

start(**kwargs)[source]

在单独的线程中启动调度器运行

story(*keys_or_stimuli, on_error='raise')[source]

返回给定键或 stimulus_id 的集群范围故事

submit(func, *args, key=None, workers=None, resources=None, retries=None, priority=0, fifo_timeout='100 ms', allow_other_workers=False, actor=False, actors=False, pure=True, **kwargs)[source]

向调度器提交函数应用

参数
funccallable

要安排为 func(*args **kwargs) 执行的可调用对象。如果 func 返回一个协程,它将在工作节点的主事件循环上运行。否则,func 将在工作节点的任务执行器池中运行(有关更多信息,请参阅 Worker.executors)。

*argstuple

可选位置参数

keystr

任务的唯一标识符。默认为函数名和哈希值。

workers字符串或字符串可迭代对象

一组工作器地址或主机名,可在其上执行计算。留空则默认为所有工作器(常见情况)

resources字典 (默认为 {})

定义此映射任务的每个实例在工作器上所需的 resources;例如 {'GPU': 2}。有关定义资源的详细信息,请参阅工作器资源

retries整数 (默认为 0)

如果任务失败,允许的自动重试次数

priority数字

可选的任务优先级。零为默认值。优先级越高,越优先执行。

fifo_timeoutstr timedelta (default ‘100ms’)

两次调用之间允许的时间间隔,用于考虑相同的优先级

allow_other_workers布尔值 (默认为 False)

workers 一起使用。表示计算是否可以在 workers 集合之外的工作器上执行。

actorbool (default False)

此任务是否应作为有状态 actor 存在于工作节点上。有关更多详细信息,请参阅 Actors

actorsbool (default False)

actor 的别名

purebool (defaults to True)

函数是否为纯函数。对于 np.random.random 等非纯函数,请设置为 pure=False。请注意,如果 actorpure 关键字参数都设置为 True,则 pure 的值将恢复为 False,因为 actor 是有状态的。有关详细信息,请参阅纯函数(默认)

**kwargs
返回值
Future

如果在异步模式下运行,则返回 Future。否则返回具体值。

Raises
TypeError

如果 ‘func’ 不可调用,则引发 TypeError

ValueError

如果 ‘allow_other_workers’ 为 True 且 ‘workers’ 为 None,则引发 ValueError

另请参阅

Client.map

一次提交多个参数的计算

Notes

当前的任务图解析实现会查找 key 的出现并将其替换为相应的 Future 结果。如果作为参数传递给任务的字符串与集群上已存在的某个 key 匹配,这可能导致不需要的字符串替换。为了避免这种情况,如果手动设置 key,则需要使用唯一的值。请参阅https://github.com/dask/dask/issues/9969 以跟踪此问题的解决进度。

示例

>>> c = client.submit(add, a, b)  
subscribe_topic(topic, handler)[source]

订阅主题并为每个接收到的事件执行处理器

参数
topic: str

主题名称

handler: callable or coroutine function

一个为每个接收到的事件调用的处理器。处理器必须接受一个参数 event,它是一个元组 (timestamp, msg),其中 timestamp 指的是调度器上的时钟。

另请参阅

dask.distributed.Client.unsubscribe_topic
dask.distributed.Client.get_events
dask.distributed.Client.log_event

示例

>>> import logging
>>> logger = logging.getLogger("myLogger")  # Log config not shown
>>> client.subscribe_topic("topic-name", lambda: logger.info)
unforward_logging(logger_name=None)[source]

停止将给定日志记录器(默认为 root)从工作节点任务转发到客户端进程。

unpublish_dataset(name, **kwargs)[source]

从调度器中移除命名数据集

参数
namestr

要取消发布的数据集名称

另请参阅

Client.publish_dataset

示例

>>> c.list_datasets()  
['my_dataset']
>>> c.unpublish_dataset('my_dataset')  
>>> c.list_datasets()  
[]
unregister_scheduler_plugin(name)[source]

取消注册调度器插件

请参阅 https://distributed.dask.org.cn/en/latest/plugins.html#scheduler-plugins

参数
namestr

要取消注册的插件名称。有关更多信息,请参阅 Client.register_scheduler_plugin() 的文档字符串。

示例

>>> class MyPlugin(SchedulerPlugin):
...     def __init__(self, *args, **kwargs):
...         pass  # the constructor is up to you
...     async def start(self, scheduler: Scheduler) -> None:
...         pass
...     async def before_close(self) -> None:
...         pass
...     async def close(self) -> None:
...         pass
...     def restart(self, scheduler: Scheduler) -> None:
...         pass
>>> plugin = MyPlugin(1, 2, 3)
>>> client.register_plugin(plugin, name='foo')
>>> client.unregister_scheduler_plugin(name='foo')
unregister_worker_plugin(name, nanny=None)[source]

取消注册生命周期工作节点插件

此方法取消注册现有的工作节点插件。作为取消注册过程的一部分,将调用插件的 teardown 方法。

参数
namestr

要取消注册的插件名称。有关更多信息,请参阅 Client.register_plugin() 的文档字符串。

另请参阅

register_plugin

示例

>>> class MyPlugin(WorkerPlugin):
...     def __init__(self, *args, **kwargs):
...         pass  # the constructor is up to you
...     def setup(self, worker: dask.distributed.Worker):
...         pass
...     def teardown(self, worker: dask.distributed.Worker):
...         pass
...     def transition(self, key: str, start: str, finish: str, **kwargs):
...         pass
...     def release_key(self, key: str, state: str, cause: str | None, reason: None, report: bool):
...         pass
>>> plugin = MyPlugin(1, 2, 3)
>>> client.register_plugin(plugin, name='foo')
>>> client.unregister_worker_plugin(name='foo')
unsubscribe_topic(topic)[source]

取消订阅主题并移除事件处理器

另请参阅

dask.distributed.Client.subscribe_topic
dask.distributed.Client.get_events
dask.distributed.Client.log_event
upload_file(filename, load: bool = True)[source]

将本地包上传到调度器和 worker

此方法将本地文件发送到调度器和所有工作节点。该文件将放置在每个节点的工作目录中,请参阅配置选项 temporary-directory(默认为 tempfile.gettempdir())。

此目录将被添加到 Python 的系统路径中,以便任何 .py.egg.zip 文件都可以导入。

参数
filenamestring

要发送到工作节点的 .py.egg.zip 文件的文件名

loadbool, 可选

是否在上传过程中导入模块。默认为 True

示例

>>> client.upload_file('mylibrary.egg')  
>>> from mylibrary import myfunc  
>>> L = client.map(myfunc, seq)  
>>>
>>> # Where did that file go? Use `dask_worker.local_directory`.
>>> def where_is_mylibrary(dask_worker):
>>>     path = pathlib.Path(dask_worker.local_directory) / 'mylibrary.egg'
>>>     assert path.exists()
>>>     return str(path)
>>>
>>> client.run(where_is_mylibrary)  
wait_for_workers(n_workers: int = 5, timeout: float | None = None) None[source]

阻塞调用,等待 n 个工作节点准备就绪后继续

参数
n_workersint

工作节点的数量

timeoutnumber, 可选

经过多少秒后引发 dask.distributed.TimeoutError

who_has(futures=None, **kwargs)

存储每个 Future 数据的作节点

参数
futures列表 (可选)

Future 列表,默认为所有数据

**kwargsdict

远程函数的可选关键字参数

示例

>>> x, y, z = c.map(inc, [1, 2, 3])  
>>> wait([x, y, z])  
>>> c.who_has()  
{'inc-1c8dd6be1c21646c71f76c16d09304ea': ['192.168.1.141:46784'],
 'inc-1e297fc27658d7b67b3a758f16bcf47a': ['192.168.1.141:46784'],
 'inc-fd65c238a7ea60f6a01bf4c8a5fcf44b': ['192.168.1.141:46784']}
>>> c.who_has([x, y])  
{'inc-1c8dd6be1c21646c71f76c16d09304ea': ['192.168.1.141:46784'],
 'inc-1e297fc27658d7b67b3a758f16bcf47a': ['192.168.1.141:46784']}
write_scheduler_file(scheduler_file)

将调度器信息写入 json 文件。

这有助于使用文件系统轻松共享调度器信息。可以使用调度器文件实例化一个使用同一调度器的第二个客户端。

参数
scheduler_filestr

写入调度器文件的路径。

示例

>>> client = Client()  
>>> client.write_scheduler_file('scheduler.json')  
# connect to previous client's scheduler
>>> client2 = Client(scheduler_file='scheduler.json')  
class distributed.Future(key, client=None, state=None, _id=None)

一个远程运行的计算

Future 是远程工作节点上运行的结果的本地代理。用户在本地 Python 进程中管理 Future 对象,以确定在更大集群中发生的事情。

Note

用户不应手动实例化 Future。这可能导致状态损坏和集群死锁。

参数
key: str, or tuple

此 Future 引用的远程数据的键

client: Client

应拥有此 Future 的 Client。默认为 _get_global_client()。

inform: bool

是否通知调度器我们需要此 Future 的更新

state: FutureState

Future 的状态

另请参阅

Client

创建 Future

示例

Future 通常来自 Client 计算

>>> my_future = client.submit(add, 1, 2)  

我们可以跟踪 Future 的进度和结果

>>> my_future  
<Future: status: finished, key: add-8f6e709446674bad78ea8aeecfee188e>

我们可以从 Future 中获取结果或异常和回溯

>>> my_future.result()  
add_done_callback(fn)

在 future 完成后调用其回调函数

回调函数 fn 应将 Future 作为其唯一参数。无论 Future 是成功完成、出错还是被取消,都将调用此回调函数。

回调函数在单独的线程中执行。

参数
fn可调用对象 (callable)

要调用的方法或函数

cancel(reason=None, msg=None, **kwargs)

取消运行此 future 的请求

另请参阅

Client.cancel
cancelled()

如果 future 已被取消,则返回 True

返回值
bool

如果 Future 被“取消”,则为 True,否则为 False

done()

返回计算是否完成。

返回值
bool

如果计算完成,则为 True,否则为 False

exception(timeout=None, **kwargs)

返回失败任务的异常

参数
timeoutnumber, 可选

经过多少秒后引发 dask.distributed.TimeoutError

**kwargsdict

函数的可选关键字参数

返回值
Exception

引发的异常。如果在返回前经过了 timeout 秒,则会引发 dask.distributed.TimeoutError

另请参阅

Future.traceback
property executor

返回执行器,即客户端。

返回值
Client

执行器

release()

Notes

此方法可以从不同的线程调用(例如,参见 Client.get() 或 Future.__del__())

result(timeout=None)

等待计算完成,并将结果收集到本地进程。

参数
timeoutnumber, 可选

经过多少秒后引发 dask.distributed.TimeoutError

返回值
结果

计算结果。如果客户端是异步的,则为协程。

Raises
dask.distributed.TimeoutError

如果在返回前经过了 timeout 秒,则会引发 dask.distributed.TimeoutError

retry(**kwargs)

如果此 Future 失败,则重试它

另请参阅

Client.retry
property status

返回状态

返回值
str

状态

traceback(timeout=None, **kwargs)

返回失败任务的追溯信息 (traceback)

这返回一个回溯对象。您可以使用 traceback 模块检查此对象。或者,如果您调用 future.result(),此回溯将伴随引发的异常。

参数
timeoutnumber, 可选

在经过此秒数后引发 dask.distributed.TimeoutError。如果在返回前经过了 timeout 秒,则会引发 dask.distributed.TimeoutError

返回值
回溯

回溯对象。如果客户端是异步的,则为协程。

另请参阅

Future.exception

示例

>>> import traceback  
>>> tb = future.traceback()  
>>> traceback.format_tb(tb)  
[...]
property type

返回类型

class distributed.Queue(name=None, client=None, maxsize=0)

分布式队列

这允许多个客户端通过一个多生产者/多消费者队列共享 Future 或少量数据。所有元数据都通过调度器进行序列化。

队列的元素必须是 Future 或 msgpack 可编码的数据(整数、字符串、列表、字典)。所有数据都通过调度器发送,因此不宜发送大型对象。要共享大型对象,请分散数据并共享 Future。

Warning

此对象是实验性的

参数
name: 字符串 (可选)

其他客户端和调度器用于标识队列的名称。如果未给出,将生成一个随机名称。

client: Client (可选)

用于与调度器通信的 Client。如果未给出,将使用默认的全局 Client。

maxsize: int (可选)

队列中允许的项目数。如果为 0(默认值),则队列大小无限制。

另请参阅

Variable

客户端之间共享的变量

示例

>>> from dask.distributed import Client, Queue  
>>> client = Client()  
>>> queue = Queue('x')  
>>> future = client.submit(f, x)  
>>> queue.put(future)  
get(timeout=None, batch=False, **kwargs)

从队列获取数据

参数
timeoutnumber or string or timedelta, 可选

超时前等待的秒数。除了秒数,还可以指定字符串格式的时间间隔,例如“200ms”。

batchboolean, int (可选)

如果为 True,则返回队列中当前所有等待的元素。如果为一个整数,则从队列返回该数量的元素。如果为 False(默认值),则一次返回一个项目。

put(value, timeout=None, **kwargs)

将数据放入队列

参数
timeoutnumber or string or timedelta, 可选

超时前等待的秒数。除了秒数,还可以指定字符串格式的时间间隔,例如“200ms”。

qsize(**kwargs)

队列中当前元素的数量

class distributed.Variable(name=None, client=None)

分布式全局变量

这允许多个客户端通过单个可变变量相互共享 Future 和数据。所有元数据都通过调度器进行序列化。可能发生竞争条件。

值必须是 Future 或 msgpack 可编码的数据(整数、列表、字符串等)。所有数据都将保存在并通过调度器发送,因此不宜发送太多。如果要共享大量数据,请将其 scatter 然后共享 Future。

参数
name: 字符串 (可选)

其他客户端和调度器用于标识变量的名称。如果未给出,将生成一个随机名称。

client: Client (可选)

用于与调度器通信的 Client。如果未给出,将使用默认的全局 Client。

另请参阅

Queue

客户端之间共享的多生产者/多消费者队列

示例

>>> from dask.distributed import Client, Variable 
>>> client = Client()  
>>> x = Variable('x')  
>>> x.set(123)  # docttest: +SKIP
>>> x.get()  # docttest: +SKIP
123
>>> future = client.submit(f, x)  
>>> x.set(future)  
delete()

删除此变量

注意,这将影响当前指向此变量的所有客户端。

get(timeout=None, **kwargs)

获取此变量的值

参数
timeoutnumber or string or timedelta, 可选

超时前等待的秒数。除了秒数,还可以指定字符串格式的时间间隔,例如“200ms”。

set(value, timeout='30 s', **kwargs)

设置此变量的值

参数
valueFuture 或对象

必须是 Future 或 msgpack 可编码的值

class distributed.Lock(name=None, client=<object object>, scheduler_rpc=None, loop=None)

分布式集中锁

Warning

此实现使用 distributed.Semaphore 作为后端,该后端容易发生租约超额预订。对于 Lock 而言,这意味着如果一个租约超时,可能会有两个或更多实例同时获取锁。要禁用租约超时,请将 distributed.scheduler.locks.lease-timeout 设置为 inf,例如:

with dask.config.set({"distributed.scheduler.locks.lease-timeout": "inf"}):
    lock = Lock("x")
    ...

请注意,如果没有租约超时,Lock 在集群缩容或工作节点失败时可能会发生死锁。

参数
name: 字符串 (可选)

要获取的锁的名称。选择相同的名称允许两个不连接的进程协调锁。如果未给出,将生成一个随机名称。

client: Client (可选)

用于与调度器通信的 Client。如果未给出,将使用默认的全局客户端。

示例

>>> lock = Lock('x')  
>>> lock.acquire(timeout=1)  
>>> # do things with protected resource
>>> lock.release()  
acquire(blocking=True, timeout=None)

获取锁

参数
blockingbool, 可选

如果为 false,则完全不在调度器中等待锁。

timeoutstring or number or timedelta, 可选

在调度器中等待锁的秒数。这不包括本地协程时间、网络传输时间等。当 blocking 为 false 时,禁止指定超时。除了秒数,还可以指定字符串格式的时间间隔,例如“200ms”。

返回值
是否成功获取锁,返回 True 或 False

示例

>>> lock = Lock('x')  
>>> lock.acquire(timeout="1s")  
class distributed.Event(name=None, client=None)

分布式集中事件,等同于 asyncio.Event

一个事件存储一个单一标志,该标志在开始时设置为 false。可以使用 set() 调用将标志设置为 true,或使用 clear() 调用将其设置为 false。每次调用 wait() 都会阻塞,直到事件标志设置为 true。

参数
name: 字符串 (可选)

事件的名称。选择相同的名称允许两个不连接的进程协调一个事件。如果未给出,将生成一个随机名称。

client: Client (可选)

用于与调度器通信的 Client。如果未给出,将使用默认的全局客户端。

示例

>>> event_1 = Event('a')  
>>> event_1.wait(timeout=1)  
>>> # in another process
>>> event_2 = Event('a')  
>>> event_2.set() 
>>> # now event_1 will stop waiting
clear()

清除事件(将其标志设置为 false)。

所有等待者现在将阻塞。

is_set()

检查事件是否已设置

set()

设置事件(将其标志设置为 true)。

所有等待者现在将被释放。

wait(timeout=None)

等待直到事件被设置。

参数
timeoutnumber or string or timedelta, 可选

在调度器中等待事件的秒数。这不包括本地协程时间、网络传输时间等。除了秒数,还可以指定字符串格式的时间间隔,例如“200ms”。

返回值
如果事件被设置则为 True,如果发生超时则为 False。

示例

>>> event = Event('a')  
>>> event.wait(timeout="1s")  
class distributed.Semaphore(max_leases=1, name=None, scheduler_rpc=None, loop=None)

信号量 将在调度器上跟踪租约,这些租约可由本类的实例获取和释放。如果已获取最大数量的租约,则无法获取更多租约,调用方将等待直到另一个租约被释放。

租约的生命周期通过超时控制。此超时由本实例的 Client 定期刷新,并在工作节点失败时提供死锁或资源饥饿的保护。超时可以使用配置选项 distributed.scheduler.locks.lease-timeout 控制,调度器验证超时的间隔使用选项 distributed.scheduler.locks.lease-validation-interval 设置。

与 Python 标准库的信号量的一个显著区别是,此实现不允许释放次数多于获取次数。如果发生这种情况,将发出警告,但内部状态不会被修改。

Warning

在发生租约超时时,此实现容易发生租约超额预订。建议监控日志信息并调整上述配置选项,使其适合用户应用程序的值。

参数
max_leases: int (可选)

同时可以授予的最大租约数量。这有效地设置了对特定资源并行访问数量的上限。默认为 1。

name: 字符串 (可选)

要获取的信号量的名称。选择相同的名称允许两个不连接的进程进行协调。如果未给出,将生成一个随机名称。

register: bool

如果为 True,则向调度器注册信号量。这必须在获取任何租约之前完成。如果未在初始化期间完成,也可以通过调用本类的 register 方法来完成。注册时需要等待。

scheduler_rpc: ConnectionPool

用于连接到调度器的 ConnectionPool。如果提供 None,则使用工作节点或客户端连接池。此参数主要用于测试。

loop: IOLoop

此实例使用的事件循环。如果提供 None,则重用活动工作节点或客户端的循环。

Notes

如果客户端尝试释放信号量但未获取租约,将引发异常。

dask 默认假定函数是纯函数执行,在此类函数中使用信号量获取/释放时,必须注意实际上存在副作用,因此,该函数不再被视为纯函数。如果不考虑这一点,可能导致意外行为。

示例

>>> from distributed import Semaphore
... sem = Semaphore(max_leases=2, name='my_database')
...
... def access_resource(s, sem):
...     # This automatically acquires a lease from the semaphore (if available) which will be
...     # released when leaving the context manager.
...     with sem:
...         pass
...
... futures = client.map(access_resource, range(10), sem=sem)
... client.gather(futures)
... # Once done, close the semaphore to clean up the state on scheduler side.
... sem.close()
acquire(timeout=None)

获取信号量。

如果内部计数器大于零,则减一并立即返回 True。如果为零,则等待 release() 调用并返回 True。

参数
timeoutnumber or string or timedelta, 可选

等待获取信号量的秒数。这不包括本地协程时间、网络传输时间等。除了秒数,还可以指定字符串格式的时间间隔,例如“200ms”。

get_value()

返回当前注册的租约数量。

release()

释放信号量。

返回值
bool

此值指示租约是否立即释放。请注意,用户不应重试此操作。在某些情况下(例如调度器过载),租约可能不会立即释放,但它将始终在使用“distributed.scheduler.locks.lease-validation-interval”和“distributed.scheduler.locks.lease-timeout”配置的特定间隔后自动释放。