使用 Dask DataFrames 加载和保存数据

您可以从各种数据存储格式(如 CSV、HDF、Apache Parquet 等)创建 Dask DataFrame。对于大多数格式,这些数据可以存储在各种存储系统中,包括本地磁盘、网络文件系统 (NFS)、Hadoop 分布式文件系统 (HDFS)、Google Cloud Storage 和 Amazon S3(HDF 除外,它仅适用于类似 POSIX 的文件系统)。

有关 dask.dataframe 的范围、用法和限制的更多信息,请参阅 DataFrame 概述页面;有关常见问题的更多技巧和解决方案,请参阅 DataFrame 最佳实践

API

以下函数提供了 Dask DataFrames、文件格式以及其他 Dask 或 Python 集合之间的转换访问。

文件格式

read_csv(urlpath[, blocksize, ...])

将 CSV 文件读取到 Dask.DataFrame 中

read_parquet([path, columns, filters, ...])

将 Parquet 文件读取到 Dask DataFrame 中

read_hdf(pattern, key[, start, stop, ...])

将 HDF 文件读取到 Dask DataFrame 中

read_orc(path[, engine, columns, index, ...])

从 ORC 文件读取 DataFrame

read_json(url_path[, orient, lines, ...])

从一组 JSON 文件创建 DataFrame

read_sql_table(table_name, con, index_col[, ...])

将 SQL 数据库表读取到 DataFrame 中。

read_sql_query(sql, con, index_col[, ...])

将 SQL 查询读取到 DataFrame 中。

read_sql(sql, con, index_col, **kwargs)

将 SQL 查询或数据库表读取到 DataFrame 中。

read_table(urlpath[, blocksize, ...])

将分隔符文件读取到 Dask.DataFrame 中

read_fwf(urlpath[, blocksize, ...])

将固定宽度文件读取到 Dask.DataFrame 中

from_array(arr[, chunksize, columns, meta])

将任何可切片的数组读取到 Dask DataFrame 中

to_csv(df, filename[, single_file, ...])

将 Dask DataFrame 存储到 CSV 文件

to_parquet(df, path[, compression, ...])

将 Dask.dataframe 存储到 Parquet 文件

to_hdf(df, path, key[, mode, append, ...])

将 Dask Dataframe 存储到分层数据格式 (HDF) 文件

to_sql(df, name, uri[, schema, if_exists, ...])

将 Dask Dataframe 存储到 SQL 表

Dask 集合

from_delayed(dfs[, meta, divisions, prefix, ...])

从多个 Dask Delayed 对象创建 Dask DataFrame

from_dask_array(x[, columns, index, meta])

从 Dask Array 创建 Dask DataFrame。

from_map(func, *iterables[, args, meta, ...])

从自定义函数映射创建 DataFrame 集合。

dask.bag.core.Bag.to_dataframe([meta, ...])

从 Dask Bag 创建 Dask DataFrame。

DataFrame.to_delayed([optimize_graph])

转换为 dask.delayed 对象列表,每个分区对应一个对象。

to_records(df)

从 Dask Dataframe 创建 Dask Array

to_bag(df[, index, format])

从 Dask DataFrame 创建 Dask Bag

Pandas

from_pandas(data[, npartitions, sort, chunksize])

从 Pandas DataFrame 构建 Dask DataFrame

DataFrame.from_dict(data, *[, npartitions, ...])

从 Python 字典构建 Dask DataFrame

其他文件格式

创建

从 CSV 读取

您可以使用 read_csv() 将一个或多个 CSV 文件读取到 Dask DataFrame 中。它支持使用 globstrings 同时加载多个文件

>>> df = dd.read_csv('myfiles.*.csv')

您可以使用 blocksize 参数拆分单个大文件

>>> df = dd.read_csv('largefile.csv', blocksize=25e6)  # 25MB chunks

更改 blocksize 参数会更改分区的数量(参见分区的解释)。处理 Dask DataFrames 的一个经验法则是将您的分区大小保持在 100MB 以下。

从 Parquet 读取

类似地,您可以使用 read_parquet() 读取一个或多个 Parquet 文件。您可以读取单个 Parquet 文件

>>> df = dd.read_parquet("path/to/mydata.parquet")

或本地 Parquet 文件目录

>>> df = dd.read_parquet("path/to/my/parquet/")

有关使用 Parquet 文件的更多详细信息,包括技巧和最佳实践,请参阅Dask Dataframe 和 Parquet 文档。

从云存储读取

Dask 可以从各种数据存储(包括云对象存储)读取数据。您可以通过在 dd.read_csv 等常用数据访问函数中使用的路径前加上 s3:// 等协议来实现这一点

>>> df = dd.read_csv('s3://bucket/path/to/data-*.csv')
>>> df = dd.read_parquet('gcs://bucket/path/to/data-*.parq')

对于 Amazon S3 或 Google Cloud Storage 等远程系统,您可能需要提供凭据。这些通常存储在配置文件中,但在某些情况下,您可能希望将存储特定的选项传递给存储后端。您可以使用 storage_options 参数来实现这一点

>>> df = dd.read_csv('s3://bucket-name/my-data-*.csv',
...                  storage_options={'anon': True})
>>> df = dd.read_parquet('gs://dask-nyc-taxi/yellowtrip.parquet',
...                      storage_options={'token': 'anon'})

请参阅有关连接 Amazon S3Google Cloud Storage 的文档。

从函数映射

对于上述函数未涵盖的情况,但可以通过简单的 map 操作捕获的情况,from_map() 可能是创建 DataFrame 最方便的方式。例如,可以使用此 API 将任意 PyArrow Dataset 对象通过将片段映射到 DataFrame 分区来转换为 DataFrame 集合

>>> import pyarrow.dataset as ds
>>> dataset = ds.dataset("hive_data_path", format="orc", partitioning="hive")
>>> fragments = dataset.get_fragments()
>>> func = lambda frag: frag.to_table().to_pandas()
>>> df = dd.from_map(func, fragments)

Dask Delayed

Dask delayed 在简单的 map 操作不足以捕获您的数据布局复杂性时特别有用。它允许您从任意 Python 函数调用构造 Dask DataFrames,这对于处理自定义数据格式或在加载数据时嵌入特定逻辑非常有用。请参阅使用 dask.delayed 处理集合的文档。

存储

本地写入文件

您可以将文件保存在本地,前提是每个 worker 都可以访问相同的文件系统。Worker 可以位于同一台机器上,或者可以将网络文件系统挂载并在每个 worker 节点上引用相同的路径位置。请参阅有关本地访问数据的文档。

写入远程位置

Dask 可以写入各种数据存储,包括云对象存储。例如,您可以将 dask.dataframe 写入 Azure 存储 blob,如下所示:

>>> d = {'col1': [1, 2, 3, 4], 'col2': [5, 6, 7, 8]}
>>> df = dd.from_pandas(pd.DataFrame(data=d), npartitions=2)
>>> dd.to_parquet(df=df,
...               path='abfs://CONTAINER/FILE.parquet'
...               storage_options={'account_name': 'ACCOUNT_NAME',
...                                'account_key': 'ACCOUNT_KEY'}

请参阅连接远程数据的操作指南。