机器学习

机器学习是一个涉及多种不同工作流的广泛领域。本页面列出了 Dask 可以帮助您处理 ML 工作负载的一些更常见的方式。

超参数优化

Optuna

对于最先进的超参数优化 (HPO),我们推荐 Optuna 库及其相关的 Dask 集成

在 Optuna 中,您构建一个目标函数,该函数接收一个 trial 对象,该对象根据您在代码中定义的分布生成参数。您的目标函数最终会产生一个分数。Optuna 会根据已接收的分数智能地建议分布中的哪些值。

def objective(trial):
    params = {
        "max_depth": trial.suggest_int("max_depth", 2, 10, step=1),
        "learning_rate": trial.suggest_float("learning_rate", 1e-8, 1.0, log=True),
        ...
    }
    model = train_model(train_data, **params)
    result = score(model, test_data)
    return result

Dask 和 Optuna 通常一起使用,通过并行运行多个目标函数并在 Dask 调度器上同步分数和参数选择。为此,我们使用 Optuna 中找到的 DaskStore 对象。

import optuna

storage = optuna.integration.DaskStorage()

study = optuna.create_study(
    direction="maximize",
    storage=storage,  # This makes the study Dask-enabled
)

然后我们并行运行多个优化方法。

from dask.distributed import LocalCluster, wait

cluster = LocalCluster(processes=False)  # replace this with some scalable cluster
client = cluster.get_client()

futures = [
    client.submit(study.optimize, objective, n_trials=1, pure=False) for _ in range(500)
]
wait(futures)

print(study.best_params)

有关更完整的示例,请参阅此 Optuna + XGBoost 示例

Dask Futures

此外,对于更简单的情况,人们经常使用 Dask Futures 在大量参数上训练同一个模型。Dask Futures 是一个通用 API,用于在各种输入上运行普通 Python 函数。一个示例如下所示:

from dask.distributed import LocalCluster

cluster = LocalCluster(processes=False)  # replace this with some scalable cluster
client = cluster.get_client()

def train_and_score(params: dict) -> float:
    data = load_data()
    model = make_model(**params)
    train(model)
    score = evaluate(model)
    return score

params_list = [...]
futures = [
    client.submit(train_and_score, params) for params in params_list
]
scores = client.gather(futures)
best = max(scores)

best_params = params_list[scores.index(best)]

有关更完整的示例,请参阅 Futures 文档

梯度提升树

像 XGBoost 和 LightGBM 这样流行的 GBT 库具有原生的 Dask 支持,这使您能够并行地在大规模数据集上训练模型。

例如,使用 Dask DataFrame、XGBoost 和本地 Dask 集群的示例如下所示:

import dask.dataframe as dd
import xgboost as xgb
from dask.distributed import LocalCluster

df = dask.datasets.timeseries()  # Randomly generated data
# df = dd.read_parquet(...)      # In practice, you would probably read data though

train, test = df.random_split([0.80, 0.20])
X_train, y_train, X_test, y_test = ...

with LocalCluster() as cluster:
    with cluster.get_client() as client:
        d_train = xgb.dask.DaskDMatrix(client, X_train, y_train, enable_categorical=True)
        model = xgb.dask.train(
            ...
            d_train,
        )
        predictions = xgb.dask.predict(client, model, X_test)

有关更完整的示例,请参阅此 XGBoost 示例

批量预测

模型训练完成后,通常需要将模型应用于大量数据。我们发现最常见的方法有两种:

  1. 使用 Dask Futures

  2. 使用 DataFrame.map_partitionsArray.map_blocks

我们将在下面展示每种方法的示例。

Dask Futures

Dask Futures 是一个通用 API,允许您并行地在 Python 数据上运行任意 Python 函数。应用此工具解决批量预测问题非常容易。

例如,当人们想要将模型应用于许多数据文件时,我们经常看到这种用法。

from dask.distributed import LocalCluster

cluster = LocalCluster(processes=False)  # replace this with some scalable cluster
client = cluster.get_client()

filenames = [...]

def predict(filename, model):
    data = load(filename)
    result = model.predict(data)
    return result

model = client.submit(load_model, path_to_model)
predictions = client.map(predict, filenames, model=model)
results = client.gather(predictions)

有关更完整的示例,请参阅 计算机视觉工作负载的批量评分 (视频)

Dask DataFrame

有时我们希望使用更高级别的 Dask API 来处理我们的模型,例如 Dask DataFrame 或 Dask Array。这在处理记录数据时更常见,例如如果我们有一组患者记录,并想查看哪些患者可能患病。

import dask.dataframe as dd

df = dd.read_parquet("/path/to/my/data.parquet")

model = load_model("/path/to/my/model")

# pandas code
# predictions = model.predict(df)
# predictions.to_parquet("/path/to/results.parquet")

# Dask code
predictions = df.map_partitions(model.predict)
predictions.to_parquet("/path/to/results.parquet")

更多信息请参阅 Dask DataFrame 文档