Dask

Dask 是一个用于并行和分布式计算的 Python 库。Dask…

  • 易于使用和设置 (它只是一个 Python 库)

  • 强大,能够提供扩展性并解锁复杂算法

  • 而且 有趣 🎉

 

如何使用 Dask

Dask 提供了多种 API。选择最适合你的一个

Dask Futures 可以并行化任意 for 循环风格的 Python 代码,提供

  • 灵活的工具,允许你构建自定义管道和工作流

  • 强大的扩展技术,每秒处理数千个任务

  • 响应迅速的反馈,实现直观的执行和有用的仪表盘

Dask futures 是其他 Dask 工作的基础

Futures 文档 了解更多,或在 Futures 示例 查看示例

from dask.distributed import LocalCluster
client = LocalCluster().get_client()

# Submit work to happen in parallel
results = []
for filename in filenames:
    data = client.submit(load, filename)
    result = client.submit(process, data)
    results.append(result)

# Gather results back to local computer
results = client.gather(results)
_images/futures-graph.png

Dask Dataframes 可以并行化流行的 pandas 库,提供

  • 内存溢出执行,即使在单机上,也允许你处理比可用 RAM 更大的数据

  • 并行执行,处理速度更快

  • 分布式计算,处理千兆字节级别的数据集

在这方面,Dask Dataframes 类似于 Apache Spark,但使用了熟悉的 pandas API 和内存模型。一个 Dask dataframe 只是不同计算机上 pandas dataframes 的集合。

DataFrame 文档 了解更多,或在 DataFrame 示例 查看示例

import dask.dataframe as dd

# Read large datasets in parallel
df = dd.read_parquet("s3://mybucket/data.*.parquet")
df = df[df.value < 0]
result = df.groupby(df.name).amount.mean()

result = result.compute()  # Compute to get pandas result
result.plot()
_images/dask-dataframe.svg

Dask Arrays 可以并行化流行的 NumPy 库,提供

  • 内存溢出执行,即使在单机上,也允许你处理比可用 RAM 更大的数据

  • 并行执行,处理速度更快

  • 分布式计算,处理千兆字节级别的数据集

Dask Arrays 允许科学家和研究人员对大型数据集执行直观且复杂的操作,同时使用熟悉的 NumPy API 和内存模型。一个 Dask array 只是不同计算机上 NumPy arrays 的集合。

Array 文档 了解更多,或在 Array 示例 查看示例

import dask.array as da

x = da.random.random((10000, 10000))
y = (x + x.T) - x.mean(axis=1)

z = y.var(axis=0).compute()
_images/dask-array.svg

Xarray 是一个流行的下游项目,它封装了 Dask array,提供了带标签的轴,并同时跟踪许多 Dask array,从而实现更直观的分析。Xarray 非常流行,尤其在地理空间和成像社区中,占 Dask array 当前使用的大多数。

Xarray 文档 了解更多,或在 Xarray 示例 查看示例

import xarray as xr

ds = xr.open_mfdataset("data/*.nc")
da.groupby('time.month').mean('time').compute()
https://docs.xarray.dev/en/stable/_static/logos/Xarray_Logo_RGB_Final.png

Dask Bags 是简单的并行 Python 列表,常用于处理文本或原始 Python 对象。它们…

  • 简单,提供易于使用的 map 和 reduce 功能

  • 低内存处理数据,以流式方式最大限度地减少内存使用

  • 适用于预处理,特别是将文本或 JSON 数据摄取到 dataframes 之前

在这方面,Dask bags 类似于 Spark RDD 或普通的 Python 数据结构和迭代器。一个 Dask bag 只是不同计算机上并行处理的 Python 迭代器的集合。

Bag 文档 了解更多,或在 Bag 示例 查看示例

import dask.bag as db

# Read large datasets in parallel
lines = db.read_text("s3://mybucket/data.*.json")
records = (lines
    .map(json.loads)
    .filter(lambda d: d["value"] > 0)
)
df = records.to_dask_dataframe()

如何安装 Dask

使用 pipconda 安装 Dask 非常简单。

安装文档 了解更多

pip install "dask[complete]"
conda install dask

如何部署 Dask

你可以在单机上使用 Dask,也可以将其部署在分布式硬件上。

部署文档 了解更多

如果你创建一个 LocalCluster 对象,Dask 可以轻松地在你的 Python 会话中自行设置一切。

from dask.distributed import LocalCluster
cluster = LocalCluster()
client = cluster.get_client()

# Normal Dask work ...

或者,你可以跳过这部分,Dask 将完全在你本地进程内的线程池中运行。

Coiled 是一款商业 SaaS 产品,可在 AWS、GCP 和 Azure 等云平台上部署 Dask 集群。

import coiled
cluster = coiled.Cluster(
   n_workers=100,
   region="us-east-2",
   worker_memory="16 GiB",
   spot_policy="spot_with_fallback",
)
client = cluster.get_client()

Coiled 文档 了解更多

Dask-Jobqueue 项目 在 SLURM、PBS、SGE、LSF、Torque、Condor 等流行 HPC 作业提交系统上部署 Dask 集群。

from dask_jobqueue import PBSCluster
cluster = PBSCluster(
   cores=24,
   memory="100GB",
   queue="regular",
   account="my-account",
)
cluster.scale(jobs=100)
client = cluster.get_client()

Dask-Jobqueue 文档 了解更多

Dask Kubernetes 项目 提供了一个 Dask Kubernetes Operator,用于在 Kubernetes 集群上部署 Dask。

from dask_kubernetes.operator import KubeCluster
cluster = KubeCluster(
   name="my-dask-cluster",
   image="ghcr.io/dask/dask:latest",
   resources={"requests": {"memory": "2Gi"}, "limits": {"memory": "64Gi"}},
)
cluster.scale(10)
client = cluster.get_client()

Dask Kubernetes 文档 了解更多

通过示例学习

Dask 应用广泛,涵盖所有行业和规模。只要使用 Python 并因大规模数据或密集计算而遇到困难的地方,都可以使用 Dask。

你可以在以下资源中了解更多关于 Dask 的应用

此外,我们鼓励你查阅本网站上与最符合你应用的 API 相关的参考文档。

Dask 被设计成易于使用强大。我们希望它能帮助你愉快地工作。