Dask
目录
Dask¶
Dask 是一个用于并行和分布式计算的 Python 库。Dask…
易于使用和设置 (它只是一个 Python 库)
强大,能够提供扩展性并解锁复杂算法
而且 有趣 🎉
如何使用 Dask¶
Dask 提供了多种 API。选择最适合你的一个
Dask Futures 可以并行化任意 for 循环风格的 Python 代码,提供
灵活的工具,允许你构建自定义管道和工作流
强大的扩展技术,每秒处理数千个任务
响应迅速的反馈,实现直观的执行和有用的仪表盘
Dask futures 是其他 Dask 工作的基础
在 Futures 文档 了解更多,或在 Futures 示例 查看示例
from dask.distributed import LocalCluster
client = LocalCluster().get_client()
# Submit work to happen in parallel
results = []
for filename in filenames:
data = client.submit(load, filename)
result = client.submit(process, data)
results.append(result)
# Gather results back to local computer
results = client.gather(results)

Dask Dataframes 可以并行化流行的 pandas 库,提供
内存溢出执行,即使在单机上,也允许你处理比可用 RAM 更大的数据
并行执行,处理速度更快
分布式计算,处理千兆字节级别的数据集
在这方面,Dask Dataframes 类似于 Apache Spark,但使用了熟悉的 pandas API 和内存模型。一个 Dask dataframe 只是不同计算机上 pandas dataframes 的集合。
在 DataFrame 文档 了解更多,或在 DataFrame 示例 查看示例
import dask.dataframe as dd
# Read large datasets in parallel
df = dd.read_parquet("s3://mybucket/data.*.parquet")
df = df[df.value < 0]
result = df.groupby(df.name).amount.mean()
result = result.compute() # Compute to get pandas result
result.plot()
Dask Arrays 可以并行化流行的 NumPy 库,提供
内存溢出执行,即使在单机上,也允许你处理比可用 RAM 更大的数据
并行执行,处理速度更快
分布式计算,处理千兆字节级别的数据集
Dask Arrays 允许科学家和研究人员对大型数据集执行直观且复杂的操作,同时使用熟悉的 NumPy API 和内存模型。一个 Dask array 只是不同计算机上 NumPy arrays 的集合。
在 Array 文档 了解更多,或在 Array 示例 查看示例
import dask.array as da
x = da.random.random((10000, 10000))
y = (x + x.T) - x.mean(axis=1)
z = y.var(axis=0).compute()
Xarray 是一个流行的下游项目,它封装了 Dask array,提供了带标签的轴,并同时跟踪许多 Dask array,从而实现更直观的分析。Xarray 非常流行,尤其在地理空间和成像社区中,占 Dask array 当前使用的大多数。
在 Xarray 文档 了解更多,或在 Xarray 示例 查看示例
import xarray as xr
ds = xr.open_mfdataset("data/*.nc")
da.groupby('time.month').mean('time').compute()

Dask Bags 是简单的并行 Python 列表,常用于处理文本或原始 Python 对象。它们…
简单,提供易于使用的 map 和 reduce 功能
低内存处理数据,以流式方式最大限度地减少内存使用
适用于预处理,特别是将文本或 JSON 数据摄取到 dataframes 之前
在这方面,Dask bags 类似于 Spark RDD 或普通的 Python 数据结构和迭代器。一个 Dask bag 只是不同计算机上并行处理的 Python 迭代器的集合。
import dask.bag as db
# Read large datasets in parallel
lines = db.read_text("s3://mybucket/data.*.json")
records = (lines
.map(json.loads)
.filter(lambda d: d["value"] > 0)
)
df = records.to_dask_dataframe()
如何部署 Dask¶
你可以在单机上使用 Dask,也可以将其部署在分布式硬件上。
在 部署文档 了解更多
如果你创建一个 LocalCluster
对象,Dask 可以轻松地在你的 Python 会话中自行设置一切。
from dask.distributed import LocalCluster
cluster = LocalCluster()
client = cluster.get_client()
# Normal Dask work ...
或者,你可以跳过这部分,Dask 将完全在你本地进程内的线程池中运行。
Coiled 是一款商业 SaaS 产品,可在 AWS、GCP 和 Azure 等云平台上部署 Dask 集群。
import coiled
cluster = coiled.Cluster(
n_workers=100,
region="us-east-2",
worker_memory="16 GiB",
spot_policy="spot_with_fallback",
)
client = cluster.get_client()
在 Coiled 文档 了解更多
Dask-Jobqueue 项目 在 SLURM、PBS、SGE、LSF、Torque、Condor 等流行 HPC 作业提交系统上部署 Dask 集群。
from dask_jobqueue import PBSCluster
cluster = PBSCluster(
cores=24,
memory="100GB",
queue="regular",
account="my-account",
)
cluster.scale(jobs=100)
client = cluster.get_client()
在 Dask-Jobqueue 文档 了解更多
Dask Kubernetes 项目 提供了一个 Dask Kubernetes Operator,用于在 Kubernetes 集群上部署 Dask。
from dask_kubernetes.operator import KubeCluster
cluster = KubeCluster(
name="my-dask-cluster",
image="ghcr.io/dask/dask:latest",
resources={"requests": {"memory": "2Gi"}, "limits": {"memory": "64Gi"}},
)
cluster.scale(10)
client = cluster.get_client()
在 Dask Kubernetes 文档 了解更多
通过示例学习¶
Dask 应用广泛,涵盖所有行业和规模。只要使用 Python 并因大规模数据或密集计算而遇到困难的地方,都可以使用 Dask。
你可以在以下资源中了解更多关于 Dask 的应用
此外,我们鼓励你查阅本网站上与最符合你应用的 API 相关的参考文档。
Dask 被设计成易于使用且强大。我们希望它能帮助你愉快地工作。