Dask

Dask 是一个用于并行和分布式计算的 Python 库。 Dask 是

  • 易于使用和设置(它只是一个 Python 库)

  • 在提供规模和解锁复杂算法方面强大

  • 并且有趣 🎉

 

如何使用 Dask

Dask 提供了几种 API。选择最适合您的一种

Dask Futures 并行化任意 for 循环风格的 Python 代码,提供

  • 灵活的工具,允许您构建自定义管道和工作流

  • 强大的扩展技术,每秒处理数千个任务

  • 响应迅速的反馈,实现直观执行和有用的仪表盘

Dask future 构成了其他 Dask 工作的基础

Futures 文档 中了解更多信息,或在 Futures 示例 中查看示例

from dask.distributed import LocalCluster
client = LocalCluster().get_client()

# Submit work to happen in parallel
results = []
for filename in filenames:
    data = client.submit(load, filename)
    result = client.submit(process, data)
    results.append(result)

# Gather results back to local computer
results = client.gather(results)
_images/futures-graph.png

Dask Dataframes 并行化流行的 pandas 库,提供

  • 适用于单机大于内存的执行,允许您处理大于可用 RAM 的数据

  • 并行执行,实现更快处理

  • 适用于 TB 级数据集的分布式计算

Dask Dataframes 在这方面与 Apache Spark 相似,但使用熟悉的 pandas API 和内存模型。一个 Dask dataframe 就是不同计算机上 pandas dataframes 的集合。

DataFrame 文档 中了解更多信息,或在 DataFrame 示例 中查看示例

import dask.dataframe as dd

# Read large datasets in parallel
df = dd.read_parquet("s3://mybucket/data.*.parquet")
df = df[df.value < 0]
result = df.groupby(df.name).amount.mean()

result = result.compute()  # Compute to get pandas result
result.plot()
_images/dask-dataframe.svg

Dask Arrays 并行化流行的 NumPy 库,提供

  • 适用于单机大于内存的执行,允许您处理大于可用 RAM 的数据

  • 并行执行,实现更快处理

  • 适用于 TB 级数据集的分布式计算

Dask Arrays 允许科学家和研究人员对大型数据集执行直观而复杂的运算,但使用熟悉的 NumPy API 和内存模型。一个 Dask array 就是不同计算机上 NumPy 数组的集合。

Array 文档 中了解更多信息,或在 Array 示例 中查看示例

import dask.array as da

x = da.random.random((10000, 10000))
y = (x + x.T) - x.mean(axis=1)

z = y.var(axis=0).compute()
_images/dask-array.svg

Xarray 封装了 Dask array,是一个流行的下游项目,提供带标签的轴并同时跟踪多个 Dask array,从而实现更直观的分析。Xarray 很受欢迎,并且目前在地理空间和图像社区中占据了 Dask array 使用的大部分份额。

Xarray 文档 中了解更多信息,或在 Xarray 示例 中查看示例

import xarray as xr

ds = xr.open_mfdataset("data/*.nc")
da.groupby('time.month').mean('time').compute()
https://docs.xarray.dev/en/stable/_static/logos/Xarray_Logo_RGB_Final.png

Dask Bags 是简单的并行 Python 列表,常用于处理文本或原始 Python 对象。它们是...

  • 简单,提供易于使用的 map 和 reduce 功能

  • 低内存,以流式方式处理数据,最大限度地减少内存使用

  • 适用于预处理,特别是文本或 JSON 数据,以便之后导入 dataframes

Dask bags 在这方面与 Spark RDD 或原生的 Python 数据结构和迭代器相似。一个 Dask bag 就是不同计算机上并行处理的 Python 迭代器的集合。

Bag 文档 中了解更多信息,或在 Bag 示例 中查看示例

import dask.bag as db

# Read large datasets in parallel
lines = db.read_text("s3://mybucket/data.*.json")
records = (lines
    .map(json.loads)
    .filter(lambda d: d["value"] > 0)
)
df = records.to_dask_dataframe()

如何安装 Dask

使用 pipconda 安装 Dask 非常容易。

安装文档 中了解更多信息

pip install "dask[complete]"
conda install dask

如何部署 Dask

您可以在单台机器上使用 Dask,或将其部署到分布式硬件上。

部署文档 中了解更多信息

如果您创建一个 LocalCluster 对象,Dask 可以轻松地在您的 Python 会话中自行设置,该对象会为您设置好一切。

from dask.distributed import LocalCluster
cluster = LocalCluster()
client = cluster.get_client()

# Normal Dask work ...

或者,您可以跳过此部分,Dask 将在完全包含在您的本地进程中的线程池中运行。

Coiled 是一个商业 SaaS 产品,可在 AWS、GCP 和 Azure 等云平台上部署 Dask 集群。

import coiled
cluster = coiled.Cluster(
   n_workers=100,
   region="us-east-2",
   worker_memory="16 GiB",
   spot_policy="spot_with_fallback",
)
client = cluster.get_client()

Coiled 文档 中了解更多信息

Dask-Jobqueue 项目 在 SLURM、PBS、SGE、LSF、Torque、Condor 等流行的高性能计算作业提交系统上部署 Dask 集群。

from dask_jobqueue import PBSCluster
cluster = PBSCluster(
   cores=24,
   memory="100GB",
   queue="regular",
   account="my-account",
)
cluster.scale(jobs=100)
client = cluster.get_client()

Dask-Jobqueue 文档 中了解更多信息

Dask Kubernetes 项目 提供了一个 Dask Kubernetes Operator,用于在 Kubernetes 集群上部署 Dask。

from dask_kubernetes.operator import KubeCluster
cluster = KubeCluster(
   name="my-dask-cluster",
   image="ghcr.io/dask/dask:latest",
   resources={"requests": {"memory": "2Gi"}, "limits": {"memory": "64Gi"}},
)
cluster.scale(10)
client = cluster.get_client()

Dask Kubernetes 文档 中了解更多信息

通过示例学习

Dask 应用广泛,遍及各行各业和各种规模。只要人们在使用 Python 并因大规模数据或繁重计算而感到痛苦,就会使用 Dask。

您可以在以下来源了解更多关于 Dask 的应用

此外,我们鼓励您查阅本网站上与您应用最接近的 API 相关的参考文档。

Dask 的设计理念是易于使用功能强大。我们希望它能帮助您愉快地完成工作。