机器学习

机器学习是一个涉及多种不同工作流程的广阔领域。本页面列出了一些 Dask 在 ML 工作负载中提供帮助的常见方式。

超参数优化

Optuna

对于最先进的超参数优化 (HPO),我们推荐使用 Optuna 库及其相关的 Dask 集成

在 Optuna 中,您构建一个目标函数,该函数接受一个 trial 对象,该对象根据您在代码中定义的分布生成参数。您的目标函数最终会产生一个分数。Optuna 会根据收到的分数智能地建议分布中的值。

def objective(trial):
    params = {
        "max_depth": trial.suggest_int("max_depth", 2, 10, step=1),
        "learning_rate": trial.suggest_float("learning_rate", 1e-8, 1.0, log=True),
        ...
    }
    model = train_model(train_data, **params)
    result = score(model, test_data)
    return result

Dask 和 Optuna 通常一起使用,方法是并行运行多个目标函数,并在 Dask 调度器上同步分数和参数选择。为此,我们使用 Optuna 中的 DaskStore 对象。

import optuna

storage = optuna.integration.DaskStorage()

study = optuna.create_study(
    direction="maximize",
    storage=storage,  # This makes the study Dask-enabled
)

然后我们并行运行多个优化方法。

from dask.distributed import LocalCluster, wait

cluster = LocalCluster(processes=False)  # replace this with some scalable cluster
client = cluster.get_client()

futures = [
    client.submit(study.optimize, objective, n_trials=1, pure=False) for _ in range(500)
]
wait(futures)

print(study.best_params)

有关更完整的示例,请参阅此Optuna + XGBoost 示例

Dask Futures

此外,对于更简单的情况,人们通常使用 Dask Futures 在大量参数上训练同一个模型。Dask Futures 是一个通用的 API,用于在各种输入上运行普通的 Python 函数。示例可能如下所示

from dask.distributed import LocalCluster

cluster = LocalCluster(processes=False)  # replace this with some scalable cluster
client = cluster.get_client()

def train_and_score(params: dict) -> float:
    data = load_data()
    model = make_model(**params)
    train(model)
    score = evaluate(model)
    return score

params_list = [...]
futures = [
    client.submit(train_and_score, params) for params in params_list
]
scores = client.gather(futures)
best = max(scores)

best_params = params_list[scores.index(best)]

有关更完整的示例,请参阅 Futures 文档

梯度提升树

流行的 GBT 库,如 XGBoost 和 LightGBM,具有原生的 Dask 支持,允许您在超大型数据集上并行训练模型。

例如,使用 Dask DataFrame、XGBoost 和本地 Dask 集群的示例如下

import dask.dataframe as dd
import xgboost as xgb
from dask.distributed import LocalCluster

df = dask.datasets.timeseries()  # Randomly generated data
# df = dd.read_parquet(...)      # In practice, you would probably read data though

train, test = df.random_split([0.80, 0.20])
X_train, y_train, X_test, y_test = ...

with LocalCluster() as cluster:
    with cluster.get_client() as client:
        d_train = xgb.dask.DaskDMatrix(client, X_train, y_train, enable_categorical=True)
        model = xgb.dask.train(
            ...
            d_train,
        )
        predictions = xgb.dask.predict(client, model, X_test)

有关更完整的示例,请参阅此XGBoost 示例

批量预测

模型训练完成后,通常需要在大量数据上应用该模型。我们最常看到以下两种实现方式

  1. 使用 Dask Futures

  2. 使用 DataFrame.map_partitionsArray.map_blocks

下面我们将展示每种方法的示例。

Dask Futures

Dask Futures 是一个通用 API,允许您在 Python 数据上并行运行任意 Python 函数。将此工具应用于解决批量预测问题非常容易。

例如,当人们希望在许多数据文件上应用模型时,我们经常会看到这种情况。

from dask.distributed import LocalCluster

cluster = LocalCluster(processes=False)  # replace this with some scalable cluster
client = cluster.get_client()

filenames = [...]

def predict(filename, model):
    data = load(filename)
    result = model.predict(data)
    return result

model = client.submit(load_model, path_to_model)
predictions = client.map(predict, filenames, model=model)
results = client.gather(predictions)

有关更完整的示例,请参阅 计算机视觉工作负载的批量评分 (视频)

Dask DataFrame

有时我们希望使用更高级别的 Dask API(如 Dask DataFrame 或 Dask Array)来处理模型。这在记录数据中更常见,例如如果我们有一组患者记录,并想查看哪些患者可能生病。

import dask.dataframe as dd

df = dd.read_parquet("/path/to/my/data.parquet")

model = load_model("/path/to/my/model")

# pandas code
# predictions = model.predict(df)
# predictions.to_parquet("/path/to/results.parquet")

# Dask code
predictions = df.map_partitions(model.predict)
predictions.to_parquet("/path/to/results.parquet")

更多信息请参阅 Dask DataFrame 文档