机器学习
目录
机器学习¶
机器学习是一个涉及多种不同工作流的广泛领域。本页面列出了 Dask 可以帮助您处理 ML 工作负载的一些更常见的方式。
超参数优化¶
Optuna¶
对于最先进的超参数优化 (HPO),我们推荐 Optuna 库及其相关的 Dask 集成。
在 Optuna 中,您构建一个目标函数,该函数接收一个 trial 对象,该对象根据您在代码中定义的分布生成参数。您的目标函数最终会产生一个分数。Optuna 会根据已接收的分数智能地建议分布中的哪些值。
def objective(trial):
params = {
"max_depth": trial.suggest_int("max_depth", 2, 10, step=1),
"learning_rate": trial.suggest_float("learning_rate", 1e-8, 1.0, log=True),
...
}
model = train_model(train_data, **params)
result = score(model, test_data)
return result
Dask 和 Optuna 通常一起使用,通过并行运行多个目标函数并在 Dask 调度器上同步分数和参数选择。为此,我们使用 Optuna 中找到的 DaskStore
对象。
import optuna
storage = optuna.integration.DaskStorage()
study = optuna.create_study(
direction="maximize",
storage=storage, # This makes the study Dask-enabled
)
然后我们并行运行多个优化方法。
from dask.distributed import LocalCluster, wait
cluster = LocalCluster(processes=False) # replace this with some scalable cluster
client = cluster.get_client()
futures = [
client.submit(study.optimize, objective, n_trials=1, pure=False) for _ in range(500)
]
wait(futures)
print(study.best_params)
有关更完整的示例,请参阅此 Optuna + XGBoost 示例。
Dask Futures¶
此外,对于更简单的情况,人们经常使用 Dask Futures 在大量参数上训练同一个模型。Dask Futures 是一个通用 API,用于在各种输入上运行普通 Python 函数。一个示例如下所示:
from dask.distributed import LocalCluster
cluster = LocalCluster(processes=False) # replace this with some scalable cluster
client = cluster.get_client()
def train_and_score(params: dict) -> float:
data = load_data()
model = make_model(**params)
train(model)
score = evaluate(model)
return score
params_list = [...]
futures = [
client.submit(train_and_score, params) for params in params_list
]
scores = client.gather(futures)
best = max(scores)
best_params = params_list[scores.index(best)]
有关更完整的示例,请参阅 Futures 文档。
梯度提升树¶
像 XGBoost 和 LightGBM 这样流行的 GBT 库具有原生的 Dask 支持,这使您能够并行地在大规模数据集上训练模型。
例如,使用 Dask DataFrame、XGBoost 和本地 Dask 集群的示例如下所示:
import dask.dataframe as dd
import xgboost as xgb
from dask.distributed import LocalCluster
df = dask.datasets.timeseries() # Randomly generated data
# df = dd.read_parquet(...) # In practice, you would probably read data though
train, test = df.random_split([0.80, 0.20])
X_train, y_train, X_test, y_test = ...
with LocalCluster() as cluster:
with cluster.get_client() as client:
d_train = xgb.dask.DaskDMatrix(client, X_train, y_train, enable_categorical=True)
model = xgb.dask.train(
...
d_train,
)
predictions = xgb.dask.predict(client, model, X_test)
有关更完整的示例,请参阅此 XGBoost 示例。
批量预测¶
模型训练完成后,通常需要将模型应用于大量数据。我们发现最常见的方法有两种:
使用 Dask Futures
我们将在下面展示每种方法的示例。
Dask Futures¶
Dask Futures 是一个通用 API,允许您并行地在 Python 数据上运行任意 Python 函数。应用此工具解决批量预测问题非常容易。
例如,当人们想要将模型应用于许多数据文件时,我们经常看到这种用法。
from dask.distributed import LocalCluster
cluster = LocalCluster(processes=False) # replace this with some scalable cluster
client = cluster.get_client()
filenames = [...]
def predict(filename, model):
data = load(filename)
result = model.predict(data)
return result
model = client.submit(load_model, path_to_model)
predictions = client.map(predict, filenames, model=model)
results = client.gather(predictions)
有关更完整的示例,请参阅 计算机视觉工作负载的批量评分 (视频)。
Dask DataFrame¶
有时我们希望使用更高级别的 Dask API 来处理我们的模型,例如 Dask DataFrame 或 Dask Array。这在处理记录数据时更常见,例如如果我们有一组患者记录,并想查看哪些患者可能患病。
import dask.dataframe as dd
df = dd.read_parquet("/path/to/my/data.parquet")
model = load_model("/path/to/my/model")
# pandas code
# predictions = model.predict(df)
# predictions.to_parquet("/path/to/results.parquet")
# Dask code
predictions = df.map_partitions(model.predict)
predictions.to_parquet("/path/to/results.parquet")
更多信息请参阅 Dask DataFrame 文档。