连接到远程数据
目录
连接到远程数据¶
Dask 可以从各种数据存储读取数据,包括本地文件系统、网络文件系统、云对象存储和 Hadoop。通常通过在常用数据访问函数(如 dd.read_csv
)中使用的路径前加上诸如 "s3://"
的协议来完成。
import dask.dataframe as dd
df = dd.read_csv('s3://bucket/path/to/data-*.csv')
df = dd.read_parquet('gcs://bucket/path/to/data-*.parq')
import dask.bag as db
b = db.read_text('hdfs://path/to/*.json').map(json.loads)
Dask 使用 fsspec 进行本地、集群和远程数据 I/O。其他文件交互,例如加载配置,则使用普通的 Python 方法。
以下远程服务得到了很好的支持,并针对主代码库进行了测试
本地或网络文件系统:
file://
- 本地文件系统,在没有指定协议时的默认值。Hadoop 文件系统:
hdfs://
- Hadoop 分布式文件系统,用于集群内的弹性、复制文件。这使用 PyArrow 作为后端。Amazon S3:
s3://
- Amazon S3 远程二进制存储,常与 Amazon EC2 一起使用,使用 s3fs 库。Google Cloud Storage:
gcs://
或gs://
- Google Cloud Storage,通常与 Google Compute 资源一起使用,使用 gcsfs。Microsoft Azure Storage:
adl://
,abfs://
或az://
- Microsoft Azure Storage,使用 adlfs。Hugging Face:
hf://
- 用于 AI 数据集的 Hugging Face Hub,使用 huggingface_hub 库。HTTP(s):
http://
或https://
,用于直接从 HTTP Web 服务器读取数据。
fsspec 还提供了其他 Dask 用户可能感兴趣的文件系统,例如 ssh、ftp、webhdfs 和 dropbox。有关更多信息,请参阅文档。
指定存储位置时,应使用通用形式 protocol://path/to/data
提供 URL。如果未提供协议,则默认为本地文件系统(与 file://
相同)。
有关 Dask 如何处理远程数据的更低级别细节,请参阅下面的内部原理部分
可选参数¶
有两种方法可以将参数传递给后端文件系统驱动程序:扩展 URL 以包含用户名、密码、服务器、端口等;以及提供 storage_options
,一个用于传递参数的字典。后一种形式更通用,因为可以传递任意数量的文件系统特定选项。
示例
df = dd.read_csv('hdfs://user@server:port/path/*.csv')
df = dd.read_parquet('s3://bucket/path',
storage_options={'anon': True, 'use_ssl': False})
接下来列出了如何为主要后端提供配置的详细信息,但更详细的信息可以在相关后端的文档页面中找到。
每个后端都有额外的安装要求,并且可能在运行时不可用。字典 fsspec.registry
包含当前导入的文件系统。要查看 fsspec
知道如何导入哪些后端,您可以这样做
from fsspec.registry import known_implementations
known_implementations
请注意,如果某些后端可以通过多个协议字符串(如“http”和“https”)引用,它们会显示两次。
本地文件系统¶
本地文件始终可访问,并且通过 URL(路径本身之外)或 storage_options
字典传递的所有参数都将被忽略。
这是默认后端,在未传递任何协议时使用此后端。
我们在此假设每个 worker 都可以访问相同的文件系统 - 要么 worker 位于同一台机器上,要么网络文件系统被挂载并在每个 worker 节点上的相同路径位置被引用。
Dask 的数据 I/O 方法通常会将相对路径规范化为绝对路径,以便 worker 即使其工作目录与您的 client 不同,也能读取数据。但是,其他函数(包括您自己编写的函数)可能不会规范化路径。如果您的 Client 的当前工作目录与 Worker 的工作目录不同,您可能会遇到由于相对路径不正确而找不到文件的问题。
通常,如果您的 Client、Workers(以及可能的 Scheduler)都共享相同的工作目录,事情会最简单。您可以使用类似以下代码片段检查这一点
>>> import os
>>> from dask.distributed import Client, LocalCluster
>>> client = Client(LocalCluster())
>>> client.run(os.getcwd)
{'tcp://127.0.0.1:64597': '/home/coder',
'tcp://127.0.0.1:64598': '/home/coder'}
Hadoop 文件系统¶
Hadoop 文件系统 (HDFS) 是一个广泛部署的分布式、数据本地的文件系统,用 Java 编写。此文件系统支持许多运行 Hadoop 和 Spark 的集群。HDFS 支持由 PyArrow 提供。
默认情况下,后端会尝试从每个节点上的本地 Hadoop 配置文件中读取默认服务器和端口,因此可能不需要任何配置。但是,服务器、端口和用户可以作为 URL 的一部分传递:hdfs://user:pass@server:port/path/to/data
,或使用 storage_options=
kwarg。
PyArrow 的额外配置¶
以下额外选项可以通过 storage_options
传递给 PyArrow
驱动程序
host
,port
,user
: 基本认证
kerb_ticket
: Kerberos 票据缓存路径
PyArrow 的 libhdfs
驱动程序也可能受到一些环境变量的影响。有关这些变量的更多信息,请参阅 PyArrow 文档。
Amazon S3¶
Amazon S3(简单存储服务)是亚马逊网络服务 (Amazon Web Services) 提供的一项 Web 服务。
Dask 可用的 S3 后端是 s3fs,在导入 Dask 时可以导入它。
S3 的认证由底层库 boto3 提供。正如 auth docs 中所述,这可以通过将凭据文件放置在每个节点上的几个位置之一来实现:~/.aws/credentials
、~/.aws/config
、/etc/boto.cfg
和 ~/.boto
。另外,对于位于 Amazon EC2 中的节点,可以为每个节点设置 IAM 角色,之后不再需要进一步配置。用户凭据的最终认证选项可以直接在 URL 中传递(s3://keyID:keySecret/bucket/key/name
)或使用 storage_options
。然而,在这种情况下,密钥/秘密信息将以明文形式传递给所有 worker,因此此方法仅在安全良好的网络中推荐使用。
以下参数可以使用 storage_options
传递给 s3fs
anon: 访问是否应为匿名(默认为 False)
key, secret: 用于用户认证
token: 如果已使用其他 S3 客户端完成认证
use_ssl: 连接是否加密和安全(默认为 True)
client_kwargs: 传递给 boto3 客户端的字典,包含 region_name 或 endpoint_url 等键。注意:请勿在此处传递 config 选项,而是将其内容传递给 config_kwargs。
config_kwargs: 传递给 s3fs.S3FileSystem 的字典,它将此字典传递给 boto3 client 的 config 选项。
requester_pays: 如果认证用户将承担传输费用,则设置为 True,某些批量数据提供商需要此设置
default_block_size, default_fill_cache: 这些对 Dask 用户来说不是特别重要,因为它们涉及连续读取之间的缓冲区行为
kwargs: 其他参数传递给 boto3 Session 对象,例如 profile_name,用于从上面提到的配置文件中选择一个认证部分(参见此处)
使用其他 S3 兼容服务¶
通过使用 endpoint_url 选项,您可以使用其他 S3 兼容服务,例如使用 AlibabaCloud OSS
dask_function(...,
storage_options={
"key": ...,
"secret": ...,
"client_kwargs": {
"endpoint_url": "http://some-region.some-s3-compatible.com",
},
# this dict goes to boto3 client's `config`
# `addressing_style` is required by AlibabaCloud, other services may not
"config_kwargs": {"s3": {"addressing_style": "virtual"}},
})
Google Cloud Storage¶
Google Cloud Storage 是一种 RESTful 在线文件存储 Web 服务,用于在 Google 基础设施上存储和访问数据。
GCS 后端通过协议标识符 gcs
和 gs
标识,它们的效果相同。
支持多种认证模式。这些选项应作为 {'token': ..}
包含在 storage_options
字典中,并在调用基于存储的 Dask 函数/方法时提交。有关更多详细信息,请参阅 gcsfs 文档。
分布式集群的通用建议,按顺序排列
对公共数据使用
anon
如果可用则使用
cloud
使用 gcloud 生成 JSON 文件,将其分发给所有 worker,并提供文件的路径
直接使用 gcsfs 的
browser
方法生成令牌缓存文件(~/.gcs_tokens
),并将其分发给所有 worker,之后使用cache
方法
下面显示了最后一个建议,这对于认证访问(与匿名访问相对)来说可能是最快、最简单的,因为它不需要重新认证。然而,此方法不安全,因为凭据将直接在集群中传递。如果您确定集群本身是安全的,则此方法没有问题。您需要使用任何适合您的方法创建 GCSFileSystem
对象,然后直接传递其凭据
gcs = GCSFileSystem(...)
dask_function(..., storage_options={'token': gcs.session.credentials})
Microsoft Azure Storage¶
Microsoft Azure Storage 包括 Data Lake Storage (Gen1) 和 Blob Storage (Gen2)。它们分别由 adlfs 后端提供的协议标识符 adl
和 abfs
标识。
adl
的认证需要在 storage_options
字典中包含 tenant_id
、client_id
和 client_secret
。
abfs
的认证需要在 storage_options
中包含 account_name
、tenant_id
、client_id
和 client_secret
,用于 RBAC 和 ACL 访问模型;或者包含 account_name
和 account_key
,用于共享密钥访问模型。
HTTP(S)¶
通过 HTTP 和 HTTPS 可以直接以文件方式访问任意 URL。但是,HTTP 上不存在 glob
功能,因此只能使用显式文件列表。
服务器实现提供的vary信息不同——它们可能通过 HEAD 请求或在下载开始时指定文件大小,也可能不指定——并且有些服务器可能不遵守字节范围请求。因此,HTTPFileSystem 提供了尽力而为的行为:下载是流式的,但是如果看到的数据超过配置的块大小,则会引发错误。为了能够访问此类数据,您必须一次性读取整个文件(并且它必须适合内存)。
使用块大小为 0 将返回正常的 requests
流式文件对象,这些对象是稳定的,但不提供随机访问。
开发者 API¶
任何文件系统后端的原型都可以在 fsspec.spec.AbstractFileSystem
中找到。任何新的实现都应提供相同的 API,或直接继承,并作为协议供 Dask 使用。例如,以下代码将注册由实现类 MyProtoFileSystem
描述的协议“myproto”。此后,形式为 myproto://
的 URL 将分派给此类的相应方法
fsspec.registry['myproto'] = MyProtoFileSystem
然而,最好向 fsspec
提交 PR,将该类包含在 known_implementations
中。
内部原理¶
Dask 在 dask.bytes
包中包含了用于可扩展数据摄取的内部工具,并使用来自 fsspec 的 open_files
等外部工具。. 这些函数更侧重于开发者,而非直接供用户使用。这些函数支持面向用户的函数,如 dd.read_csv
和 db.read_text
,这些函数对大多数用户来说可能更有用。
|
给定一个或多个路径,返回从这些路径读取的 delayed 对象。 |
此函数在其输出格式(字节)、输入位置(文件系统、S3、HDFS)、行分隔符和压缩格式方面具有可扩展性。
此函数是惰性的,返回指向字节块(read_bytes
)的指针。它通过在协议前加上前缀(如 s3://
或 hdfs://
(见下文))来处理不同的存储后端。它处理 fsspec.compression
中列出的压缩格式,其中一些可能需要安装额外的包。
此函数并非用于所有数据源。一些数据源,如 HDF5,非常特殊,需要特殊处理。
分隔符¶
read_bytes
函数接受一个路径(或路径的 glob 字符串),并生成第一个文件的样本以及每个其他文件的延迟对象列表。如果传递了分隔符(例如 delimiter=b'\n'
),它将确保字节块直接在分隔符之后开始,并在分隔符之前结束。这允许其他函数(如 pd.read_csv
)以预期的行为对这些延迟值进行操作。
这些分隔符对于典型的基于行的格式(日志文件、CSV、JSON)以及其他分隔格式(如 Avro,它可能使用复杂的哨兵字符串分隔逻辑块)都很有用。请注意,分隔符查找算法很简单,不会考虑被转义的字符、UTF-8 码序列的一部分或字符串引号内的字符。
压缩¶
这些函数支持广泛可用的压缩技术,如 gzip
、bz2
、xz
、snappy
和 lz4
。通过将函数插入到 fsspec.compression
模块中可用的字典中,可以轻松添加更多压缩格式。这可以在运行时完成,无需直接添加到代码库。
然而,大多数压缩技术(如 gzip
)不支持高效的随机访问,因此对于流式处理 fsspec.open_files
很有用,但对于在不同点分割文件的 read_bytes
则不太有用。
API¶
- dask.bytes.read_bytes(urlpath, delimiter=None, not_zero=False, blocksize='128 MiB', sample='10 kiB', compression=None, include_path=False, **kwargs)[source]¶
给定一个或多个路径,返回从这些路径读取的 delayed 对象。
路径可以是像
'2015-01-01.csv'
这样的文件名,也可以是像'2015-*-*.csv'
这样的 glob 字符串。如果安装了相应的库,路径前面可以带上协议,例如
s3://
或hdfs://
。如果提供了分隔符,此函数会根据分隔符清晰地分割数据,以便块边界直接在分隔符之后开始并在分隔符处结束。
- 参数
- urlpath字符串或列表
绝对或相对文件路径。以
s3://
等协议为前缀,以便从其他文件系统读取。要从多个文件读取,您可以传递一个 glob 字符串或路径列表,但需要注意它们必须具有相同的协议。- delimiter字节
可选的分隔符,例如
b'\n'
,用于分割字节块。- not_zero布尔值
强制查找文件开始分隔符,丢弃头部。
- blocksize整型,字符串
块大小(字节),默认为“128 MiB”
- compression字符串或 None
诸如 ‘gzip’ 或 ‘xz’ 的字符串。必须支持高效的随机访问。
- sample整型,字符串或布尔值
是否返回头部样本。值可以是
False
表示“不请求样本”,也可以是整数或字符串值,例如2**20
或"1 MiB"
。- include_path布尔值
是否在表示特定文件的字节中包含路径。默认为 False。
- **kwargs字典
特定存储连接有意义的额外选项,例如 host、port、username、password 等。
- 返回值
- sample字节
样本头部
- blocks包含
dask.Delayed
列表的列表 每个列表对应一个文件,每个延迟对象计算得出该文件的一个字节块。
- paths字符串列表,仅在 include_path 为 True 时包含
长度与 blocks 相同,每个项目是对应块表示的文件的路径。
示例
>>> sample, blocks = read_bytes('2015-*-*.csv', delimiter=b'\n') >>> sample, blocks = read_bytes('s3://bucket/2015-*-*.csv', delimiter=b'\n') >>> sample, paths, blocks = read_bytes('2015-*-*.csv', include_path=True)