Dask
目录
Dask¶
Dask 是一个用于并行和分布式计算的 Python 库。Dask 是
易于使用和设置(它只是一个 Python 库)
在提供规模和解锁复杂算法方面功能强大
并且有趣 🎉
如何使用 Dask¶
Dask 提供多种 API。选择最适合您的一种
Dask Futures 并行化任意 for 循环风格的 Python 代码,提供
灵活的工具,让您能够构建自定义管道和工作流
强大的扩展技术,每秒处理数千个任务
响应迅速的反馈,实现直观执行和有用的仪表盘
Dask futures 构成了 Dask 其他工作的基础
在 Futures 文档 了解更多,或在 Futures 示例 查看示例
from dask.distributed import LocalCluster
client = LocalCluster().get_client()
# Submit work to happen in parallel
results = []
for filename in filenames:
data = client.submit(load, filename)
result = client.submit(process, data)
results.append(result)
# Gather results back to local computer
results = client.gather(results)

Dask DataFrames 并行化了流行的 pandas 库,提供
为单机提供超内存执行,让您能够处理大于可用 RAM 的数据
并行执行以加快处理速度
为 TB 级数据集提供分布式计算
在这方面,Dask DataFrames 与 Apache Spark 类似,但使用熟悉的 pandas API 和内存模型。一个 Dask dataframe 只是不同计算机上 pandas dataframes 的集合。
在 DataFrame 文档 了解更多,或在 DataFrame 示例 查看示例
import dask.dataframe as dd
# Read large datasets in parallel
df = dd.read_parquet("s3://mybucket/data.*.parquet")
df = df[df.value < 0]
result = df.groupby(df.name).amount.mean()
result = result.compute() # Compute to get pandas result
result.plot()
Dask 数组并行化了流行的 NumPy 库,提供
为单机提供超内存执行,让您能够处理大于可用 RAM 的数据
并行执行以加快处理速度
为 TB 级数据集提供分布式计算
Dask 数组允许科学家和研究人员对大型数据集执行直观和复杂的运算,同时使用熟悉的 NumPy API 和内存模型。一个 Dask 数组只是不同计算机上 NumPy 数组的集合。
import dask.array as da
x = da.random.random((10000, 10000))
y = (x + x.T) - x.mean(axis=1)
z = y.var(axis=0).compute()
Xarray 包裹了 Dask 数组,是一个流行的下游项目,提供带标签的轴,并同时跟踪许多 Dask 数组,从而实现更直观的分析。Xarray 很受欢迎,目前 Dask 数组的大部分用例都在 Xarray 中,特别是在地理空间和成像社区。
在 Xarray 文档 了解更多,或在 Xarray 示例 查看示例
import xarray as xr
ds = xr.open_mfdataset("data/*.nc")
da.groupby('time.month').mean('time').compute()

Dask Bags 是简单的并行 Python 列表,常用于处理文本或原始 Python 对象。它们…
简单,提供易于使用的 map 和 reduce 功能
低内存,以流式方式处理数据,最大限度地减少内存使用
适合预处理,特别是文本或 JSON 数据,在将其摄取到 dataframes 之前
在这方面,Dask bags 与 Spark RDDs 或普通 Python 数据结构和迭代器类似。一个 Dask bag 只是不同计算机上并行处理的 Python 迭代器的集合。
import dask.bag as db
# Read large datasets in parallel
lines = db.read_text("s3://mybucket/data.*.json")
records = (lines
.map(json.loads)
.filter(lambda d: d["value"] > 0)
)
df = records.to_dask_dataframe()
如何部署 Dask¶
您可以在单台机器上使用 Dask,或将其部署到分布式硬件上。
在 部署文档 了解更多
如果在您的 Python 会话中创建一个 LocalCluster
对象,Dask 可以轻松地自行设置,它会为您设置好一切。
from dask.distributed import LocalCluster
cluster = LocalCluster()
client = cluster.get_client()
# Normal Dask work ...
或者,您可以跳过这一部分,Dask 将在完全包含在您的本地进程中的线程池中运行。
Coiled 是一个商业 SaaS 产品,它在 AWS、GCP 和 Azure 等云平台上部署 Dask 集群。
import coiled
cluster = coiled.Cluster(
n_workers=100,
region="us-east-2",
worker_memory="16 GiB",
spot_policy="spot_with_fallback",
)
client = cluster.get_client()
在 Coiled 文档 了解更多
Dask-Jobqueue 项目 在流行的 HPC 作业提交系统(如 SLURM、PBS、SGE、LSF、Torque、Condor 等)上部署 Dask 集群。
from dask_jobqueue import PBSCluster
cluster = PBSCluster(
cores=24,
memory="100GB",
queue="regular",
account="my-account",
)
cluster.scale(jobs=100)
client = cluster.get_client()
在 Dask-Jobqueue 文档 了解更多
Dask Kubernetes 项目 提供了一个 Dask Kubernetes Operator,用于在 Kubernetes 集群上部署 Dask。
from dask_kubernetes.operator import KubeCluster
cluster = KubeCluster(
name="my-dask-cluster",
image="ghcr.io/dask/dask:latest",
resources={"requests": {"memory": "2Gi"}, "limits": {"memory": "64Gi"}},
)
cluster.scale(10)
client = cluster.get_client()
在 Dask Kubernetes 文档 了解更多
通过示例学习¶
Dask 的应用非常广泛,跨越所有行业和规模。Dask 适用于任何使用 Python 并因大规模数据或密集计算而遇到困难的场景。
您可以从以下来源了解更多 Dask 应用
此外,我们鼓励您查阅本网站上与您的应用最匹配的 API 相关参考文档。
Dask 的设计宗旨是易于使用且功能强大。我们希望它能帮助您享受工作乐趣。