Dask
目录
Dask¶
Dask 是一个用于并行和分布式计算的 Python 库。 Dask 是
易于使用和设置(它只是一个 Python 库)
在提供规模和解锁复杂算法方面强大
并且有趣 🎉
如何使用 Dask¶
Dask 提供了几种 API。选择最适合您的一种
Dask Futures 并行化任意 for 循环风格的 Python 代码,提供
灵活的工具,允许您构建自定义管道和工作流
强大的扩展技术,每秒处理数千个任务
响应迅速的反馈,实现直观执行和有用的仪表盘
Dask future 构成了其他 Dask 工作的基础
在 Futures 文档 中了解更多信息,或在 Futures 示例 中查看示例
from dask.distributed import LocalCluster
client = LocalCluster().get_client()
# Submit work to happen in parallel
results = []
for filename in filenames:
data = client.submit(load, filename)
result = client.submit(process, data)
results.append(result)
# Gather results back to local computer
results = client.gather(results)

Dask Dataframes 并行化流行的 pandas 库,提供
适用于单机大于内存的执行,允许您处理大于可用 RAM 的数据
并行执行,实现更快处理
适用于 TB 级数据集的分布式计算
Dask Dataframes 在这方面与 Apache Spark 相似,但使用熟悉的 pandas API 和内存模型。一个 Dask dataframe 就是不同计算机上 pandas dataframes 的集合。
在 DataFrame 文档 中了解更多信息,或在 DataFrame 示例 中查看示例
import dask.dataframe as dd
# Read large datasets in parallel
df = dd.read_parquet("s3://mybucket/data.*.parquet")
df = df[df.value < 0]
result = df.groupby(df.name).amount.mean()
result = result.compute() # Compute to get pandas result
result.plot()
Dask Arrays 并行化流行的 NumPy 库,提供
适用于单机大于内存的执行,允许您处理大于可用 RAM 的数据
并行执行,实现更快处理
适用于 TB 级数据集的分布式计算
Dask Arrays 允许科学家和研究人员对大型数据集执行直观而复杂的运算,但使用熟悉的 NumPy API 和内存模型。一个 Dask array 就是不同计算机上 NumPy 数组的集合。
在 Array 文档 中了解更多信息,或在 Array 示例 中查看示例
import dask.array as da
x = da.random.random((10000, 10000))
y = (x + x.T) - x.mean(axis=1)
z = y.var(axis=0).compute()
Xarray 封装了 Dask array,是一个流行的下游项目,提供带标签的轴并同时跟踪多个 Dask array,从而实现更直观的分析。Xarray 很受欢迎,并且目前在地理空间和图像社区中占据了 Dask array 使用的大部分份额。
在 Xarray 文档 中了解更多信息,或在 Xarray 示例 中查看示例
import xarray as xr
ds = xr.open_mfdataset("data/*.nc")
da.groupby('time.month').mean('time').compute()

Dask Bags 是简单的并行 Python 列表,常用于处理文本或原始 Python 对象。它们是...
简单,提供易于使用的 map 和 reduce 功能
低内存,以流式方式处理数据,最大限度地减少内存使用
适用于预处理,特别是文本或 JSON 数据,以便之后导入 dataframes
Dask bags 在这方面与 Spark RDD 或原生的 Python 数据结构和迭代器相似。一个 Dask bag 就是不同计算机上并行处理的 Python 迭代器的集合。
在 Bag 文档 中了解更多信息,或在 Bag 示例 中查看示例
import dask.bag as db
# Read large datasets in parallel
lines = db.read_text("s3://mybucket/data.*.json")
records = (lines
.map(json.loads)
.filter(lambda d: d["value"] > 0)
)
df = records.to_dask_dataframe()
如何安装 Dask¶
使用 pip
或 conda
安装 Dask 非常容易。
在 安装文档 中了解更多信息
pip install "dask[complete]"
conda install dask
如何部署 Dask¶
您可以在单台机器上使用 Dask,或将其部署到分布式硬件上。
在 部署文档 中了解更多信息
如果您创建一个 LocalCluster
对象,Dask 可以轻松地在您的 Python 会话中自行设置,该对象会为您设置好一切。
from dask.distributed import LocalCluster
cluster = LocalCluster()
client = cluster.get_client()
# Normal Dask work ...
或者,您可以跳过此部分,Dask 将在完全包含在您的本地进程中的线程池中运行。
Coiled 是一个商业 SaaS 产品,可在 AWS、GCP 和 Azure 等云平台上部署 Dask 集群。
import coiled
cluster = coiled.Cluster(
n_workers=100,
region="us-east-2",
worker_memory="16 GiB",
spot_policy="spot_with_fallback",
)
client = cluster.get_client()
在 Coiled 文档 中了解更多信息
Dask-Jobqueue 项目 在 SLURM、PBS、SGE、LSF、Torque、Condor 等流行的高性能计算作业提交系统上部署 Dask 集群。
from dask_jobqueue import PBSCluster
cluster = PBSCluster(
cores=24,
memory="100GB",
queue="regular",
account="my-account",
)
cluster.scale(jobs=100)
client = cluster.get_client()
在 Dask-Jobqueue 文档 中了解更多信息
Dask Kubernetes 项目 提供了一个 Dask Kubernetes Operator,用于在 Kubernetes 集群上部署 Dask。
from dask_kubernetes.operator import KubeCluster
cluster = KubeCluster(
name="my-dask-cluster",
image="ghcr.io/dask/dask:latest",
resources={"requests": {"memory": "2Gi"}, "limits": {"memory": "64Gi"}},
)
cluster.scale(10)
client = cluster.get_client()
在 Dask Kubernetes 文档 中了解更多信息
通过示例学习¶
Dask 应用广泛,遍及各行各业和各种规模。只要人们在使用 Python 并因大规模数据或繁重计算而感到痛苦,就会使用 Dask。
您可以在以下来源了解更多关于 Dask 的应用
此外,我们鼓励您查阅本网站上与您应用最接近的 API 相关的参考文档。
Dask 的设计理念是易于使用且功能强大。我们希望它能帮助您愉快地完成工作。