使用 Dask DataFrames 加载和保存数据
目录
使用 Dask DataFrames 加载和保存数据¶
您可以从各种数据存储格式(如 CSV、HDF、Apache Parquet 等)创建 Dask DataFrame。对于大多数格式,这些数据可以存储在各种存储系统中,包括本地磁盘、网络文件系统 (NFS)、Hadoop 分布式文件系统 (HDFS)、Google Cloud Storage 和 Amazon S3(HDF 除外,它仅适用于类似 POSIX 的文件系统)。
有关 dask.dataframe
的范围、用法和限制的更多信息,请参阅 DataFrame 概述页面;有关常见问题的更多技巧和解决方案,请参阅 DataFrame 最佳实践。
API¶
以下函数提供了 Dask DataFrames、文件格式以及其他 Dask 或 Python 集合之间的转换访问。
文件格式
|
将 CSV 文件读取到 Dask.DataFrame 中 |
|
将 Parquet 文件读取到 Dask DataFrame 中 |
|
将 HDF 文件读取到 Dask DataFrame 中 |
|
从 ORC 文件读取 DataFrame |
|
从一组 JSON 文件创建 DataFrame |
|
将 SQL 数据库表读取到 DataFrame 中。 |
|
将 SQL 查询读取到 DataFrame 中。 |
|
将 SQL 查询或数据库表读取到 DataFrame 中。 |
|
将分隔符文件读取到 Dask.DataFrame 中 |
|
将固定宽度文件读取到 Dask.DataFrame 中 |
|
将任何可切片的数组读取到 Dask DataFrame 中 |
|
将 Dask DataFrame 存储到 CSV 文件 |
|
将 Dask.dataframe 存储到 Parquet 文件 |
|
将 Dask Dataframe 存储到分层数据格式 (HDF) 文件 |
|
将 Dask Dataframe 存储到 SQL 表 |
Dask 集合
|
从多个 Dask Delayed 对象创建 Dask DataFrame |
|
从 Dask Array 创建 Dask DataFrame。 |
|
从自定义函数映射创建 DataFrame 集合。 |
|
从 Dask Bag 创建 Dask DataFrame。 |
|
转换为 |
|
从 Dask Dataframe 创建 Dask Array |
|
从 Dask DataFrame 创建 Dask Bag |
Pandas
|
从 Pandas DataFrame 构建 Dask DataFrame |
|
从 Python 字典构建 Dask DataFrame |
其他文件格式
创建¶
从 CSV 读取¶
您可以使用 read_csv()
将一个或多个 CSV 文件读取到 Dask DataFrame 中。它支持使用 globstrings 同时加载多个文件
>>> df = dd.read_csv('myfiles.*.csv')
您可以使用 blocksize
参数拆分单个大文件
>>> df = dd.read_csv('largefile.csv', blocksize=25e6) # 25MB chunks
更改 blocksize
参数会更改分区的数量(参见分区的解释)。处理 Dask DataFrames 的一个经验法则是将您的分区大小保持在 100MB 以下。
从 Parquet 读取¶
类似地,您可以使用 read_parquet()
读取一个或多个 Parquet 文件。您可以读取单个 Parquet 文件
>>> df = dd.read_parquet("path/to/mydata.parquet")
或本地 Parquet 文件目录
>>> df = dd.read_parquet("path/to/my/parquet/")
有关使用 Parquet 文件的更多详细信息,包括技巧和最佳实践,请参阅Dask Dataframe 和 Parquet 文档。
从云存储读取¶
Dask 可以从各种数据存储(包括云对象存储)读取数据。您可以通过在 dd.read_csv
等常用数据访问函数中使用的路径前加上 s3://
等协议来实现这一点
>>> df = dd.read_csv('s3://bucket/path/to/data-*.csv')
>>> df = dd.read_parquet('gcs://bucket/path/to/data-*.parq')
对于 Amazon S3 或 Google Cloud Storage 等远程系统,您可能需要提供凭据。这些通常存储在配置文件中,但在某些情况下,您可能希望将存储特定的选项传递给存储后端。您可以使用 storage_options
参数来实现这一点
>>> df = dd.read_csv('s3://bucket-name/my-data-*.csv',
... storage_options={'anon': True})
>>> df = dd.read_parquet('gs://dask-nyc-taxi/yellowtrip.parquet',
... storage_options={'token': 'anon'})
请参阅有关连接 Amazon S3 或 Google Cloud Storage 的文档。
从函数映射¶
对于上述函数未涵盖的情况,但可以通过简单的 map
操作捕获的情况,from_map()
可能是创建 DataFrame 最方便的方式。例如,可以使用此 API 将任意 PyArrow Dataset
对象通过将片段映射到 DataFrame 分区来转换为 DataFrame 集合
>>> import pyarrow.dataset as ds
>>> dataset = ds.dataset("hive_data_path", format="orc", partitioning="hive")
>>> fragments = dataset.get_fragments()
>>> func = lambda frag: frag.to_table().to_pandas()
>>> df = dd.from_map(func, fragments)
Dask Delayed¶
Dask delayed 在简单的 map
操作不足以捕获您的数据布局复杂性时特别有用。它允许您从任意 Python 函数调用构造 Dask DataFrames,这对于处理自定义数据格式或在加载数据时嵌入特定逻辑非常有用。请参阅使用 dask.delayed 处理集合的文档。
存储¶
本地写入文件¶
您可以将文件保存在本地,前提是每个 worker 都可以访问相同的文件系统。Worker 可以位于同一台机器上,或者可以将网络文件系统挂载并在每个 worker 节点上引用相同的路径位置。请参阅有关本地访问数据的文档。
写入远程位置¶
Dask 可以写入各种数据存储,包括云对象存储。例如,您可以将 dask.dataframe
写入 Azure 存储 blob,如下所示:
>>> d = {'col1': [1, 2, 3, 4], 'col2': [5, 6, 7, 8]}
>>> df = dd.from_pandas(pd.DataFrame(data=d), npartitions=2)
>>> dd.to_parquet(df=df,
... path='abfs://CONTAINER/FILE.parquet'
... storage_options={'account_name': 'ACCOUNT_NAME',
... 'account_key': 'ACCOUNT_KEY'}
请参阅连接远程数据的操作指南。