使用 Dask DataFrames 加载和保存数据
目录
使用 Dask DataFrames 加载和保存数据¶
您可以从各种数据存储格式创建 Dask DataFrame,例如 CSV、HDF、Apache Parquet 等。对于大多数格式,数据可以存储在各种存储系统中,包括本地磁盘、网络文件系统 (NFS)、Hadoop 分布式文件系统 (HDFS)、Google Cloud Storage 和 Amazon S3(HDF 除外,它仅在类似 POSIX 的文件系统上可用)。
有关 dask.dataframe
的范围、使用和限制,请参阅DataFrame 概览页面;有关常见问题的更多技巧和解决方案,请参阅DataFrame 最佳实践。
API¶
以下函数提供了在 Dask DataFrames、文件格式以及其他 Dask 或 Python 集合之间转换的访问方式。
文件格式
|
将 CSV 文件读取到 Dask.DataFrame 中 |
|
将 Parquet 文件读取到 Dask DataFrame 中 |
|
将 HDF 文件读取到 Dask DataFrame 中 |
|
从 ORC 文件读取 DataFrame |
|
从一组 JSON 文件创建 DataFrame |
|
将 SQL 数据库表读取到 DataFrame 中。 |
|
将 SQL 查询读取到 DataFrame 中。 |
|
将 SQL 查询或数据库表读取到 DataFrame 中。 |
|
将带分隔符的文件读取到 Dask.DataFrame 中 |
|
将固定宽度文件读取到 Dask.DataFrame 中 |
|
将任何可切片数组读取到 Dask Dataframe 中 |
|
将 Dask DataFrame 存储到 CSV 文件 |
|
将 Dask.dataframe 存储到 Parquet 文件 |
|
将 Dask Dataframe 存储到分层数据格式 (HDF) 文件 |
|
将 Dask Dataframe 存储到 SQL 表中 |
Dask 集合
|
从多个 Dask Delayed 对象创建 Dask DataFrame |
|
从 Dask 数组创建 Dask DataFrame。 |
|
从自定义函数映射创建 DataFrame 集合。 |
|
从 Dask Bag 创建 Dask Dataframe。 |
|
转换为 |
|
从 Dask Dataframe 创建 Dask 数组 |
|
从 Dask DataFrame 创建 Dask Bag |
Pandas
|
从 Pandas DataFrame 构造 Dask DataFrame |
|
从 Python 字典构造 Dask DataFrame |
其他文件格式
创建¶
从 CSV 读取¶
您可以使用 read_csv()
将一个或多个 CSV 文件读取到 Dask DataFrame 中。它支持使用 globstrings 一次加载多个文件
>>> df = dd.read_csv('myfiles.*.csv')
您可以使用 blocksize
参数拆分单个大型文件
>>> df = dd.read_csv('largefile.csv', blocksize=25e6) # 25MB chunks
更改 blocksize
参数将改变分区数量(参见分区的解释)。使用 Dask DataFrames 时,一个好的经验法则是将每个分区的大小保持在 100MB 以下。
从 Parquet 读取¶
类似地,您可以使用 read_parquet()
读取一个或多个 Parquet 文件。您可以读取单个 Parquet 文件
>>> df = dd.read_parquet("path/to/mydata.parquet")
或者读取本地 Parquet 文件目录
>>> df = dd.read_parquet("path/to/my/parquet/")
有关使用 Parquet 文件的更多详细信息,包括技巧和最佳实践,请参阅Dask Dataframe 和 Parquet 的文档。
从云存储读取¶
Dask 可以从各种数据存储读取数据,包括云对象存储。您可以通过在 dd.read_csv
等常见数据访问函数中使用的路径前加上诸如 s3://
之类的协议来做到这一点
>>> df = dd.read_csv('s3://bucket/path/to/data-*.csv')
>>> df = dd.read_parquet('gcs://bucket/path/to/data-*.parq')
对于 Amazon S3 或 Google Cloud Storage 等远程系统,您可能需要提供凭据。这些通常存储在配置文件中,但在某些情况下,您可能希望将存储特定的选项传递给存储后端。您可以使用 storage_options
参数来做到这一点
>>> df = dd.read_csv('s3://bucket-name/my-data-*.csv',
... storage_options={'anon': True})
>>> df = dd.read_parquet('gs://dask-nyc-taxi/yellowtrip.parquet',
... storage_options={'token': 'anon'})
有关连接到 Amazon S3 或 Google Cloud Storage 的文档,请参见相关页面。
从函数映射¶
对于上述函数未涵盖、但可以通过简单的 map
操作捕获的情况,from_map()
可能是创建 DataFrame 最便捷的方式。例如,可以使用此 API 将任意 PyArrow Dataset
对象通过将片段映射到 DataFrame 分区来转换为 DataFrame 集合
>>> import pyarrow.dataset as ds
>>> dataset = ds.dataset("hive_data_path", format="orc", partitioning="hive")
>>> fragments = dataset.get_fragments()
>>> func = lambda frag: frag.to_table().to_pandas()
>>> df = dd.from_map(func, fragments)
Dask Delayed¶
Dask delayed 在简单的 map
操作不足以捕获您的数据布局复杂性时特别有用。它允许您从任意 Python 函数调用构造 Dask DataFrames,这对于处理自定义数据格式或在加载数据时嵌入特定逻辑非常有用。请参阅关于将 dask.delayed 与集合一起使用的文档。
存储¶
本地写入文件¶
您可以本地保存文件,前提是每个 worker 都可以访问相同的文件系统。worker 可以位于同一台机器上,或者可以将网络文件系统挂载并在每个 worker 节点上引用相同的路径位置。请参阅本地访问数据的文档。
写入远程位置¶
Dask 可以写入各种数据存储,包括云对象存储。例如,您可以将 dask.dataframe
写入 Azure 存储 blob,如下所示
>>> d = {'col1': [1, 2, 3, 4], 'col2': [5, 6, 7, 8]}
>>> df = dd.from_pandas(pd.DataFrame(data=d), npartitions=2)
>>> dd.to_parquet(df=df,
... path='abfs://CONTAINER/FILE.parquet'
... storage_options={'account_name': 'ACCOUNT_NAME',
... 'account_key': 'ACCOUNT_KEY'}
请参阅连接远程数据的操作指南。