连接到远程数据

Dask 可以从各种数据存储读取数据,包括本地文件系统、网络文件系统、云对象存储和 Hadoop。通常通过在常用数据访问函数(如 dd.read_csv)中使用的路径前加上诸如 "s3://" 的协议来完成。

import dask.dataframe as dd
df = dd.read_csv('s3://bucket/path/to/data-*.csv')
df = dd.read_parquet('gcs://bucket/path/to/data-*.parq')

import dask.bag as db
b = db.read_text('hdfs://path/to/*.json').map(json.loads)

Dask 使用 fsspec 进行本地、集群和远程数据 I/O。其他文件交互,例如加载配置,则使用普通的 Python 方法。

以下远程服务得到了很好的支持,并针对主代码库进行了测试

  • 本地或网络文件系统: file:// - 本地文件系统,在没有指定协议时的默认值。

  • Hadoop 文件系统: hdfs:// - Hadoop 分布式文件系统,用于集群内的弹性、复制文件。这使用 PyArrow 作为后端。

  • Amazon S3: s3:// - Amazon S3 远程二进制存储,常与 Amazon EC2 一起使用,使用 s3fs 库。

  • Google Cloud Storage: gcs://gs:// - Google Cloud Storage,通常与 Google Compute 资源一起使用,使用 gcsfs

  • Microsoft Azure Storage: adl://, abfs://az:// - Microsoft Azure Storage,使用 adlfs

  • Hugging Face: hf:// - 用于 AI 数据集的 Hugging Face Hub,使用 huggingface_hub 库。

  • HTTP(s): http://https://,用于直接从 HTTP Web 服务器读取数据。

fsspec 还提供了其他 Dask 用户可能感兴趣的文件系统,例如 ssh、ftp、webhdfs 和 dropbox。有关更多信息,请参阅文档。

指定存储位置时,应使用通用形式 protocol://path/to/data 提供 URL。如果未提供协议,则默认为本地文件系统(与 file:// 相同)。

有关 Dask 如何处理远程数据的更低级别细节,请参阅下面的内部原理部分

可选参数

有两种方法可以将参数传递给后端文件系统驱动程序:扩展 URL 以包含用户名、密码、服务器、端口等;以及提供 storage_options,一个用于传递参数的字典。后一种形式更通用,因为可以传递任意数量的文件系统特定选项。

示例

df = dd.read_csv('hdfs://user@server:port/path/*.csv')

df = dd.read_parquet('s3://bucket/path',
                     storage_options={'anon': True, 'use_ssl': False})

接下来列出了如何为主要后端提供配置的详细信息,但更详细的信息可以在相关后端的文档页面中找到。

每个后端都有额外的安装要求,并且可能在运行时不可用。字典 fsspec.registry 包含当前导入的文件系统。要查看 fsspec 知道如何导入哪些后端,您可以这样做

from fsspec.registry import known_implementations
known_implementations

请注意,如果某些后端可以通过多个协议字符串(如“http”和“https”)引用,它们会显示两次。

本地文件系统

本地文件始终可访问,并且通过 URL(路径本身之外)或 storage_options 字典传递的所有参数都将被忽略。

这是默认后端,在未传递任何协议时使用此后端。

我们在此假设每个 worker 都可以访问相同的文件系统 - 要么 worker 位于同一台机器上,要么网络文件系统被挂载并在每个 worker 节点上的相同路径位置被引用。

Dask 的数据 I/O 方法通常会将相对路径规范化为绝对路径,以便 worker 即使其工作目录与您的 client 不同,也能读取数据。但是,其他函数(包括您自己编写的函数)可能不会规范化路径。如果您的 Client 的当前工作目录与 Worker 的工作目录不同,您可能会遇到由于相对路径不正确而找不到文件的问题。

通常,如果您的 Client、Workers(以及可能的 Scheduler)都共享相同的工作目录,事情会最简单。您可以使用类似以下代码片段检查这一点

>>> import os
>>> from dask.distributed import Client, LocalCluster
>>> client = Client(LocalCluster())
>>> client.run(os.getcwd)  
{'tcp://127.0.0.1:64597': '/home/coder',
 'tcp://127.0.0.1:64598': '/home/coder'}

Hadoop 文件系统

Hadoop 文件系统 (HDFS) 是一个广泛部署的分布式、数据本地的文件系统,用 Java 编写。此文件系统支持许多运行 Hadoop 和 Spark 的集群。HDFS 支持由 PyArrow 提供。

默认情况下,后端会尝试从每个节点上的本地 Hadoop 配置文件中读取默认服务器和端口,因此可能不需要任何配置。但是,服务器、端口和用户可以作为 URL 的一部分传递:hdfs://user:pass@server:port/path/to/data,或使用 storage_options= kwarg。

PyArrow 的额外配置

以下额外选项可以通过 storage_options 传递给 PyArrow 驱动程序

  • host, port, user: 基本认证

  • kerb_ticket: Kerberos 票据缓存路径

PyArrow 的 libhdfs 驱动程序也可能受到一些环境变量的影响。有关这些变量的更多信息,请参阅 PyArrow 文档

Amazon S3

Amazon S3(简单存储服务)是亚马逊网络服务 (Amazon Web Services) 提供的一项 Web 服务。

Dask 可用的 S3 后端是 s3fs,在导入 Dask 时可以导入它。

S3 的认证由底层库 boto3 提供。正如 auth docs 中所述,这可以通过将凭据文件放置在每个节点上的几个位置之一来实现:~/.aws/credentials~/.aws/config/etc/boto.cfg~/.boto。另外,对于位于 Amazon EC2 中的节点,可以为每个节点设置 IAM 角色,之后不再需要进一步配置。用户凭据的最终认证选项可以直接在 URL 中传递(s3://keyID:keySecret/bucket/key/name)或使用 storage_options。然而,在这种情况下,密钥/秘密信息将以明文形式传递给所有 worker,因此此方法仅在安全良好的网络中推荐使用。

以下参数可以使用 storage_options 传递给 s3fs

  • anon: 访问是否应为匿名(默认为 False)

  • key, secret: 用于用户认证

  • token: 如果已使用其他 S3 客户端完成认证

  • use_ssl: 连接是否加密和安全(默认为 True)

  • client_kwargs: 传递给 boto3 客户端的字典,包含 region_nameendpoint_url 等键。注意:请勿在此处传递 config 选项,而是将其内容传递给 config_kwargs

  • config_kwargs: 传递给 s3fs.S3FileSystem 的字典,它将此字典传递给 boto3 client 的 config 选项。

  • requester_pays: 如果认证用户将承担传输费用,则设置为 True,某些批量数据提供商需要此设置

  • default_block_size, default_fill_cache: 这些对 Dask 用户来说不是特别重要,因为它们涉及连续读取之间的缓冲区行为

  • kwargs: 其他参数传递给 boto3 Session 对象,例如 profile_name,用于从上面提到的配置文件中选择一个认证部分(参见此处

使用其他 S3 兼容服务

通过使用 endpoint_url 选项,您可以使用其他 S3 兼容服务,例如使用 AlibabaCloud OSS

dask_function(...,
    storage_options={
        "key": ...,
        "secret": ...,
        "client_kwargs": {
            "endpoint_url": "http://some-region.some-s3-compatible.com",
        },
        # this dict goes to boto3 client's `config`
        #   `addressing_style` is required by AlibabaCloud, other services may not
        "config_kwargs": {"s3": {"addressing_style": "virtual"}},
    })

Google Cloud Storage

Google Cloud Storage 是一种 RESTful 在线文件存储 Web 服务,用于在 Google 基础设施上存储和访问数据。

GCS 后端通过协议标识符 gcsgs 标识,它们的效果相同。

支持多种认证模式。这些选项应作为 {'token': ..} 包含在 storage_options 字典中,并在调用基于存储的 Dask 函数/方法时提交。有关更多详细信息,请参阅 gcsfs 文档。

分布式集群的通用建议,按顺序排列

  • 对公共数据使用 anon

  • 如果可用则使用 cloud

  • 使用 gcloud 生成 JSON 文件,将其分发给所有 worker,并提供文件的路径

  • 直接使用 gcsfs 的 browser 方法生成令牌缓存文件(~/.gcs_tokens),并将其分发给所有 worker,之后使用 cache 方法

下面显示了最后一个建议,这对于认证访问(与匿名访问相对)来说可能是最快、最简单的,因为它不需要重新认证。然而,此方法不安全,因为凭据将直接在集群中传递。如果您确定集群本身是安全的,则此方法没有问题。您需要使用任何适合您的方法创建 GCSFileSystem 对象,然后直接传递其凭据

gcs = GCSFileSystem(...)
dask_function(..., storage_options={'token': gcs.session.credentials})

Microsoft Azure Storage

Microsoft Azure Storage 包括 Data Lake Storage (Gen1) 和 Blob Storage (Gen2)。它们分别由 adlfs 后端提供的协议标识符 adlabfs 标识。

adl 的认证需要在 storage_options 字典中包含 tenant_idclient_idclient_secret

abfs 的认证需要在 storage_options 中包含 account_nametenant_idclient_idclient_secret,用于 RBAC 和 ACL 访问模型;或者包含 account_nameaccount_key,用于共享密钥访问模型。

HTTP(S)

通过 HTTP 和 HTTPS 可以直接以文件方式访问任意 URL。但是,HTTP 上不存在 glob 功能,因此只能使用显式文件列表。

服务器实现提供的vary信息不同——它们可能通过 HEAD 请求或在下载开始时指定文件大小,也可能不指定——并且有些服务器可能不遵守字节范围请求。因此,HTTPFileSystem 提供了尽力而为的行为:下载是流式的,但是如果看到的数据超过配置的块大小,则会引发错误。为了能够访问此类数据,您必须一次性读取整个文件(并且它必须适合内存)。

使用块大小为 0 将返回正常的 requests 流式文件对象,这些对象是稳定的,但不提供随机访问。

开发者 API

任何文件系统后端的原型都可以在 fsspec.spec.AbstractFileSystem 中找到。任何新的实现都应提供相同的 API,或直接继承,并作为协议供 Dask 使用。例如,以下代码将注册由实现类 MyProtoFileSystem 描述的协议“myproto”。此后,形式为 myproto:// 的 URL 将分派给此类的相应方法

fsspec.registry['myproto'] = MyProtoFileSystem

然而,最好向 fsspec 提交 PR,将该类包含在 known_implementations 中。

内部原理

Dask 在 dask.bytes 包中包含了用于可扩展数据摄取的内部工具,并使用来自 fsspecopen_files 等外部工具。. 这些函数更侧重于开发者,而非直接供用户使用。这些函数支持面向用户的函数,如 dd.read_csv db.read_text这些函数对大多数用户来说可能更有用。

read_bytes(urlpath[, delimiter, not_zero, ...])

给定一个或多个路径,返回从这些路径读取的 delayed 对象。

此函数在其输出格式(字节)、输入位置(文件系统、S3、HDFS)、行分隔符和压缩格式方面具有可扩展性。

此函数是惰性的,返回指向字节块(read_bytes)的指针。它通过在协议前加上前缀(如 s3://hdfs://(见下文))来处理不同的存储后端。它处理 fsspec.compression 中列出的压缩格式,其中一些可能需要安装额外的包。

此函数并非用于所有数据源。一些数据源,如 HDF5,非常特殊,需要特殊处理。

分隔符

read_bytes 函数接受一个路径(或路径的 glob 字符串),并生成第一个文件的样本以及每个其他文件的延迟对象列表。如果传递了分隔符(例如 delimiter=b'\n'),它将确保字节块直接在分隔符之后开始,并在分隔符之前结束。这允许其他函数(如 pd.read_csv)以预期的行为对这些延迟值进行操作。

这些分隔符对于典型的基于行的格式(日志文件、CSV、JSON)以及其他分隔格式(如 Avro,它可能使用复杂的哨兵字符串分隔逻辑块)都很有用。请注意,分隔符查找算法很简单,不会考虑被转义的字符、UTF-8 码序列的一部分或字符串引号内的字符。

压缩

这些函数支持广泛可用的压缩技术,如 gzipbz2xzsnappylz4。通过将函数插入到 fsspec.compression 模块中可用的字典中,可以轻松添加更多压缩格式。这可以在运行时完成,无需直接添加到代码库。

然而,大多数压缩技术(如 gzip)不支持高效的随机访问,因此对于流式处理 fsspec.open_files 很有用,但对于在不同点分割文件的 read_bytes 则不太有用。

API

dask.bytes.read_bytes(urlpath, delimiter=None, not_zero=False, blocksize='128 MiB', sample='10 kiB', compression=None, include_path=False, **kwargs)[source]

给定一个或多个路径,返回从这些路径读取的 delayed 对象。

路径可以是像 '2015-01-01.csv' 这样的文件名,也可以是像 '2015-*-*.csv' 这样的 glob 字符串。

如果安装了相应的库,路径前面可以带上协议,例如 s3://hdfs://

如果提供了分隔符,此函数会根据分隔符清晰地分割数据,以便块边界直接在分隔符之后开始并在分隔符处结束。

参数
urlpath字符串或列表

绝对或相对文件路径。以 s3:// 等协议为前缀,以便从其他文件系统读取。要从多个文件读取,您可以传递一个 glob 字符串或路径列表,但需要注意它们必须具有相同的协议。

delimiter字节

可选的分隔符,例如 b'\n',用于分割字节块。

not_zero布尔值

强制查找文件开始分隔符,丢弃头部。

blocksize整型,字符串

块大小(字节),默认为“128 MiB”

compression字符串或 None

诸如 ‘gzip’ 或 ‘xz’ 的字符串。必须支持高效的随机访问。

sample整型,字符串或布尔值

是否返回头部样本。值可以是 False 表示“不请求样本”,也可以是整数或字符串值,例如 2**20"1 MiB"

include_path布尔值

是否在表示特定文件的字节中包含路径。默认为 False。

**kwargs字典

特定存储连接有意义的额外选项,例如 host、port、username、password 等。

返回值
sample字节

样本头部

blocks包含 dask.Delayed 列表的列表

每个列表对应一个文件,每个延迟对象计算得出该文件的一个字节块。

paths字符串列表,仅在 include_path 为 True 时包含

长度与 blocks 相同,每个项目是对应块表示的文件的路径。

示例

>>> sample, blocks = read_bytes('2015-*-*.csv', delimiter=b'\n')  
>>> sample, blocks = read_bytes('s3://bucket/2015-*-*.csv', delimiter=b'\n')  
>>> sample, paths, blocks = read_bytes('2015-*-*.csv', include_path=True)