使用 Dask DataFrames 加载和保存数据

您可以从各种数据存储格式创建 Dask DataFrame,例如 CSV、HDF、Apache Parquet 等。对于大多数格式,数据可以存储在各种存储系统中,包括本地磁盘、网络文件系统 (NFS)、Hadoop 分布式文件系统 (HDFS)、Google Cloud Storage 和 Amazon S3(HDF 除外,它仅在类似 POSIX 的文件系统上可用)。

有关 dask.dataframe 的范围、使用和限制,请参阅DataFrame 概览页面;有关常见问题的更多技巧和解决方案,请参阅DataFrame 最佳实践

API

以下函数提供了在 Dask DataFrames、文件格式以及其他 Dask 或 Python 集合之间转换的访问方式。

文件格式

read_csv(urlpath[, blocksize, ...])

将 CSV 文件读取到 Dask.DataFrame 中

read_parquet([path, columns, filters, ...])

将 Parquet 文件读取到 Dask DataFrame 中

read_hdf(pattern, key[, start, stop, ...])

将 HDF 文件读取到 Dask DataFrame 中

read_orc(path[, engine, columns, index, ...])

从 ORC 文件读取 DataFrame

read_json(url_path[, orient, lines, ...])

从一组 JSON 文件创建 DataFrame

read_sql_table(table_name, con, index_col[, ...])

将 SQL 数据库表读取到 DataFrame 中。

read_sql_query(sql, con, index_col[, ...])

将 SQL 查询读取到 DataFrame 中。

read_sql(sql, con, index_col, **kwargs)

将 SQL 查询或数据库表读取到 DataFrame 中。

read_table(urlpath[, blocksize, ...])

将带分隔符的文件读取到 Dask.DataFrame 中

read_fwf(urlpath[, blocksize, ...])

将固定宽度文件读取到 Dask.DataFrame 中

from_array(arr[, chunksize, columns, meta])

将任何可切片数组读取到 Dask Dataframe 中

to_csv(df, filename[, single_file, ...])

将 Dask DataFrame 存储到 CSV 文件

to_parquet(df, path[, compression, ...])

将 Dask.dataframe 存储到 Parquet 文件

to_hdf(df, path, key[, mode, append, ...])

将 Dask Dataframe 存储到分层数据格式 (HDF) 文件

to_sql(df, name, uri[, schema, if_exists, ...])

将 Dask Dataframe 存储到 SQL 表中

Dask 集合

from_delayed(dfs[, meta, divisions, prefix, ...])

从多个 Dask Delayed 对象创建 Dask DataFrame

from_dask_array(x[, columns, index, meta])

从 Dask 数组创建 Dask DataFrame。

from_map(func, *iterables[, args, meta, ...])

从自定义函数映射创建 DataFrame 集合。

dask.bag.core.Bag.to_dataframe([meta, ...])

从 Dask Bag 创建 Dask Dataframe。

DataFrame.to_delayed([optimize_graph])

转换为 dask.delayed 对象列表,每个分区一个。

to_records(df)

从 Dask Dataframe 创建 Dask 数组

to_bag(df[, index, format])

从 Dask DataFrame 创建 Dask Bag

Pandas

from_pandas(data[, npartitions, sort, chunksize])

从 Pandas DataFrame 构造 Dask DataFrame

DataFrame.from_dict(data, *[, npartitions, ...])

从 Python 字典构造 Dask DataFrame

其他文件格式

创建

从 CSV 读取

您可以使用 read_csv() 将一个或多个 CSV 文件读取到 Dask DataFrame 中。它支持使用 globstrings 一次加载多个文件

>>> df = dd.read_csv('myfiles.*.csv')

您可以使用 blocksize 参数拆分单个大型文件

>>> df = dd.read_csv('largefile.csv', blocksize=25e6)  # 25MB chunks

更改 blocksize 参数将改变分区数量(参见分区的解释)。使用 Dask DataFrames 时,一个好的经验法则是将每个分区的大小保持在 100MB 以下。

从 Parquet 读取

类似地,您可以使用 read_parquet() 读取一个或多个 Parquet 文件。您可以读取单个 Parquet 文件

>>> df = dd.read_parquet("path/to/mydata.parquet")

或者读取本地 Parquet 文件目录

>>> df = dd.read_parquet("path/to/my/parquet/")

有关使用 Parquet 文件的更多详细信息,包括技巧和最佳实践,请参阅Dask Dataframe 和 Parquet 的文档。

从云存储读取

Dask 可以从各种数据存储读取数据,包括云对象存储。您可以通过在 dd.read_csv 等常见数据访问函数中使用的路径前加上诸如 s3:// 之类的协议来做到这一点

>>> df = dd.read_csv('s3://bucket/path/to/data-*.csv')
>>> df = dd.read_parquet('gcs://bucket/path/to/data-*.parq')

对于 Amazon S3 或 Google Cloud Storage 等远程系统,您可能需要提供凭据。这些通常存储在配置文件中,但在某些情况下,您可能希望将存储特定的选项传递给存储后端。您可以使用 storage_options 参数来做到这一点

>>> df = dd.read_csv('s3://bucket-name/my-data-*.csv',
...                  storage_options={'anon': True})
>>> df = dd.read_parquet('gs://dask-nyc-taxi/yellowtrip.parquet',
...                      storage_options={'token': 'anon'})

有关连接到 Amazon S3Google Cloud Storage 的文档,请参见相关页面。

从函数映射

对于上述函数未涵盖、但可以通过简单的 map 操作捕获的情况,from_map() 可能是创建 DataFrame 最便捷的方式。例如,可以使用此 API 将任意 PyArrow Dataset 对象通过将片段映射到 DataFrame 分区来转换为 DataFrame 集合

>>> import pyarrow.dataset as ds
>>> dataset = ds.dataset("hive_data_path", format="orc", partitioning="hive")
>>> fragments = dataset.get_fragments()
>>> func = lambda frag: frag.to_table().to_pandas()
>>> df = dd.from_map(func, fragments)

Dask Delayed

Dask delayed 在简单的 map 操作不足以捕获您的数据布局复杂性时特别有用。它允许您从任意 Python 函数调用构造 Dask DataFrames,这对于处理自定义数据格式或在加载数据时嵌入特定逻辑非常有用。请参阅关于将 dask.delayed 与集合一起使用的文档

存储

本地写入文件

您可以本地保存文件,前提是每个 worker 都可以访问相同的文件系统。worker 可以位于同一台机器上,或者可以将网络文件系统挂载并在每个 worker 节点上引用相同的路径位置。请参阅本地访问数据的文档。

写入远程位置

Dask 可以写入各种数据存储,包括云对象存储。例如,您可以将 dask.dataframe 写入 Azure 存储 blob,如下所示

>>> d = {'col1': [1, 2, 3, 4], 'col2': [5, 6, 7, 8]}
>>> df = dd.from_pandas(pd.DataFrame(data=d), npartitions=2)
>>> dd.to_parquet(df=df,
...               path='abfs://CONTAINER/FILE.parquet'
...               storage_options={'account_name': 'ACCOUNT_NAME',
...                                'account_key': 'ACCOUNT_KEY'}

请参阅连接远程数据的操作指南