连接到远程数据

Dask 可以从各种数据存储读取数据,包括本地文件系统、网络文件系统、云对象存储和 Hadoop。通常,这是通过在常用数据访问函数(例如 dd.read_csv)中使用的路径前添加协议(例如 "s3://")来实现的。

import dask.dataframe as dd
df = dd.read_csv('s3://bucket/path/to/data-*.csv')
df = dd.read_parquet('gcs://bucket/path/to/data-*.parq')

import dask.bag as db
b = db.read_text('hdfs://path/to/*.json').map(json.loads)

Dask 使用 fsspec 进行本地、集群和远程数据 IO。其他文件交互,例如加载配置,则使用普通的 Python 方法完成。

以下远程服务得到了良好支持,并针对主代码库进行了测试

  • 本地或网络文件系统file:// - 本地文件系统,在没有指定任何协议时的默认选项。

  • Hadoop 文件系统hdfs:// - Hadoop 分布式文件系统,用于集群内具有弹性、复制的文件。它使用 PyArrow 作为后端。

  • Amazon S3s3:// - Amazon S3 远程二进制存储,常与 Amazon EC2 一起使用,使用库 s3fs

  • Google Cloud Storagegcs://gs:// - Google Cloud Storage,通常与 Google Compute 资源一起使用,使用 gcsfs

  • Microsoft Azure Storageadl://abfs://az:// - Microsoft Azure Storage,使用 adlfs

  • Hugging Facehf:// - 用于 AI 的 Hugging Face 数据集中心,使用 huggingface_hub 库。

  • HTTP(s)http://https:// 用于直接从 HTTP Web 服务器读取数据。

fsspec 还提供了其他可能对 Dask 用户有用的文件系统,例如 ssh、ftp、webhdfs 和 dropbox。详情请参阅文档。

指定存储位置时,应使用通用形式 protocol://path/to/data 提供 URL。如果未提供协议,则假定为本地文件系统(与 file:// 相同)。

关于 Dask 如何处理远程数据的较低级别细节在下面的“内部机制”部分中描述

可选参数

存在两种方法将参数传递给后端文件系统驱动程序:扩展 URL 以包含用户名、密码、服务器、端口等;以及提供 storage_options,这是一个用于传递参数的字典。第二种形式更通用,因为可以传递任意数量的文件系统特定选项。

示例

df = dd.read_csv('hdfs://user@server:port/path/*.csv')

df = dd.read_parquet('s3://bucket/path',
                     storage_options={'anon': True, 'use_ssl': False})

接下来列出了如何为主要后端提供配置的详细信息,但更多细节可以在相关后端的文档页面中找到。

每个后端都有额外的安装要求,并且可能在运行时不可用。字典 fsspec.registry 包含当前导入的文件系统。要查看 fsspec 知道如何导入哪些后端,您可以这样做

from fsspec.registry import known_implementations
known_implementations

请注意,如果某些后端可以使用多个协议字符串(例如“http”和“https”)引用,它们会显示两次。

本地文件系统

本地文件始终可访问,作为 URL 一部分(路径本身之外)或使用 storage_options 字典传递的所有参数都将被忽略。

这是默认后端,如果在调用中完全没有传递协议,则使用此后端。

我们在这里假设每个工作进程都可以访问相同的文件系统 - 工作进程要么位于同一机器上,要么网络文件系统已挂载并在每个工作进程节点上引用相同的路径位置。

Dask 的数据 IO 方法通常会将相对路径规范化为绝对路径,以便工作进程即使其工作目录与您的客户端不同,也能读取数据。但是,包括您自己编写的函数在内的其他函数可能不会规范化路径。如果您的客户端的当前工作目录与工作进程的工作目录不同,您可能会遇到文件无法找到的问题,因为相对路径将不正确。

通常,如果您的客户端和工作进程(以及可能的调度器)都共享相同的工作目录,事情会更简单。您可以使用如下代码片段进行检查

>>> import os
>>> from dask.distributed import Client, LocalCluster
>>> client = Client(LocalCluster())
>>> client.run(os.getcwd)  
{'tcp://127.0.0.1:64597': '/home/coder',
 'tcp://127.0.0.1:64598': '/home/coder'}

Hadoop 文件系统

Hadoop 文件系统 (HDFS) 是一种广泛部署的分布式、数据本地文件系统,使用 Java 编写。许多运行 Hadoop 和 Spark 的集群都使用此文件系统作为后端。HDFS 支持可以通过 PyArrow 提供。

默认情况下,后端尝试从每个节点上的本地 Hadoop 配置文件读取默认服务器和端口,因此可能不需要任何配置。但是,服务器、端口和用户可以作为 URL 的一部分传递:hdfs://user:pass@server:port/path/to/data,或者使用 storage_options= kwarg。

PyArrow 的额外配置

以下附加选项可以通过 storage_options 传递给 PyArrow 驱动程序

  • host, port, user:基本认证

  • kerb_ticket:Kerberos 票据缓存的路径

PyArrow 的 libhdfs 驱动程序也可能受到一些环境变量的影响。有关这些变量的更多信息,请参阅 PyArrow 文档

Amazon S3

Amazon S3 (Simple Storage Service) 是 Amazon Web Services 提供的一项 Web 服务。

Dask 可用的 S3 后端是 s3fs,在导入 Dask 时即可导入。

S3 的认证由底层库 boto3 提供。如认证文档中所述,这可以通过将凭据文件放置在每个节点上的多个位置之一来实现:~/.aws/credentials~/.aws/config/etc/boto.cfg~/.boto。另外,对于位于 Amazon EC2 内部的节点,可以为每个节点设置 IAM 角色,然后无需进一步配置。用户凭据的最后一种认证选项可以直接在 URL 中传递(s3://keyID:keySecret/bucket/key/name)或使用 storage_options。但是,在这种情况下,密钥/秘密将以明文形式传递给所有工作进程,因此仅建议在安全网络上使用此方法。

以下参数可以使用 storage_options 传递给 s3fs

  • anon:访问是否应为匿名(默认为 False)

  • key, secret:用于用户认证

  • token:如果已使用其他 S3 客户端完成了认证

  • use_ssl:连接是否加密且安全(默认为 True)

  • client_kwargs:传递给 boto3 客户端的字典,包含 region_nameendpoint_url 等键。注意:不要在此处传递 config 选项,请将其内容传递给 config_kwargs

  • config_kwargs:传递给 s3fs.S3FileSystem 的字典,它会将其传递给 boto3 客户端的 config 选项。

  • requester_pays:如果认证用户将承担传输费用,则设置为 True,某些批量数据提供商要求这样做

  • default_block_size, default_fill_cache:这些对 Dask 用户来说不是特别重要,因为它们涉及连续读取之间缓冲区的行为

  • kwargs:其他参数传递给 boto3 Session 对象,例如 profile_name,用于从上面提到的配置文件中选择一个认证部分(参见此处

使用其他兼容 S3 的服务

通过使用 endpoint_url 选项,您可以使用其他兼容 s3 的服务,例如使用 AlibabaCloud OSS

dask_function(...,
    storage_options={
        "key": ...,
        "secret": ...,
        "client_kwargs": {
            "endpoint_url": "http://some-region.some-s3-compatible.com",
        },
        # this dict goes to boto3 client's `config`
        #   `addressing_style` is required by AlibabaCloud, other services may not
        "config_kwargs": {"s3": {"addressing_style": "virtual"}},
    })

Google Cloud Storage

Google Cloud Storage 是一个 RESTful 在线文件存储 Web 服务,用于在 Google 基础设施上存储和访问数据。

GCS 后端由协议标识符 gcsgs 标识,它们的效果相同。

支持多种认证模式。这些选项应包含在 storage_options 字典中,作为 {'token': ..} 随您的存储相关的 Dask 函数/方法的调用一起提交。更多详细信息请参阅 gcsfs 文档。

分布式集群的通用建议,按顺序

  • 对公共数据使用 anon

  • 如果可用,使用 cloud

  • 使用 gcloud 生成 JSON 文件,并将其分发到所有工作进程,然后提供文件的路径

  • 直接使用 gcsfs 的 browser 方法生成令牌缓存文件(~/.gcs_tokens),并将其分发到所有工作进程,然后使用方法 cache

下面显示了最后一个建议,这可能是已认证访问(而非匿名访问)最快和最简单的方法,因为它不需要重新认证。然而,此方法不安全,因为凭据会直接在集群中传递。如果您确定集群本身是安全的,则此方法是可以接受的。您需要使用任何适合您的方法创建一个 GCSFileSystem 对象,然后直接传递其凭据

gcs = GCSFileSystem(...)
dask_function(..., storage_options={'token': gcs.session.credentials})

Microsoft Azure Storage

Microsoft Azure Storage 由 Data Lake Storage (Gen1) 和 Blob Storage (Gen2) 组成。它们分别由 adlfs 后端提供的协议标识符 adlabfs 标识。

adl 的认证需要在 storage_options 字典中提供 tenant_idclient_idclient_secret

abfs 的认证需要 storage_options 包含用于 RBAC 和 ACL 访问模型的 account_nametenant_idclient_idclient_secret,或者用于 共享密钥 访问模型的 account_nameaccount_key

HTTP(S)

通过 HTTP 和 HTTPS 可以直接以文件形式访问任意 URL。但是,HTTP 上没有 glob 功能,因此只能使用显式的文件列表。

服务器实现提供的 Sainfo 有所不同 - 它们可能通过 HEAD 请求或在下载开始时指定文件大小,也可能不指定 - 并且某些服务器可能不遵守字节范围请求。因此,HTTPFileSystem 提供尽力而为的行为:下载是流式的,但如果看到的数据超过配置的块大小,则会引发错误。要能够访问此类数据,您必须一次性读取整个文件(并且它必须适合内存)。

使用块大小为 0 将返回正常的 requests 流式文件对象,这些对象是稳定的,但不提供随机访问。

开发者 API

任何文件系统后端的原型都可以在 fsspec.spec.AbstractFileSystem 中找到。任何新的实现都应提供相同的 API,或直接继承,并将其自身作为协议提供给 Dask。例如,以下代码将注册由实现类 MyProtoFileSystem 描述的协议“myproto”。形式为 myproto:// 的 URL 此后将被分派到此类的方法。

fsspec.registry['myproto'] = MyProtoFileSystem

但是,最好向 fsspec 提交 PR,将该类包含在 known_implementations 中。

内部机制

Dask 在 dask.bytes 包中包含用于可扩展数据摄取的内部工具,并使用 fsspec 中的 open_files 等外部工具。 。这些函数更侧重于开发者,而非直接供用户使用。这些函数为面向用户的函数(例如 dd.read_csv db.read_text提供支持,这些函数可能对大多数用户更有用。

read_bytes(urlpath[, delimiter, not_zero, ...])

给定一个或多个路径,返回从这些路径读取数据的延迟对象。

此函数在输出格式(字节)、输入位置(文件系统、S3、HDFS)、行分隔符和压缩格式方面具有可扩展性。

此函数是惰性的,返回指向字节块的指针(read_bytes)。它通过在路径前添加协议(例如 s3://hdfs://)(见下文)来处理不同的存储后端。它处理 fsspec.compression 中列出的压缩格式,其中一些可能需要安装额外的软件包。

此函数并非用于所有数据源。某些数据源(例如 HDF5)非常特殊,并接受自定义处理。

分隔符

The read_bytes function takes a path (or globstring of paths) and produces a sample of the first file and a list of delayed objects for each of the other files. If passed a delimiter such as delimiter=b'\n', it will ensure that the blocks of bytes start directly after a delimiter and end directly before a delimiter. This allows other functions, like pd.read_csv, to operate on these delayed values with expected behavior.

这些分隔符对于典型的基于行的格式(日志文件、CSV、JSON)以及其他带分隔符的格式(如 Avro,它可能通过复杂的标记字符串分隔逻辑块)都很有用。请注意,分隔符查找算法很简单,不会考虑转义字符、UTF-8 编码序列的一部分或字符串引号内的字符。

压缩

这些函数支持广泛使用的压缩技术,例如 gzipbz2xzsnappylz4。通过将函数插入到 fsspec.compression 模块中可用的字典中,可以轻松添加更多压缩格式。这可以在运行时完成,无需直接添加到代码库中。

然而,大多数压缩技术(例如 gzip)不支持高效的随机访问,因此它们适用于流式 fsspec.open_files,但不适用于在不同位置分割文件的 read_bytes

API

dask.bytes.read_bytes(urlpath, delimiter=None, not_zero=False, blocksize='128 MiB', sample='10 kiB', compression=None, include_path=False, **kwargs)[source]

给定一个或多个路径,返回从这些路径读取数据的延迟对象。

路径可以是文件名,例如 '2015-01-01.csv',也可以是 globstring,例如 '2015-*-*.csv'

如果已安装相应的库,路径可以在前面加上协议,例如 s3://hdfs://

如果给定分隔符,这将根据分隔符干净地分割数据,以便块边界紧接在分隔符之后开始,并在分隔符处结束。

参数
urlpath字符串或列表

绝对或相对文件路径。使用协议(例如 s3://)作为前缀以从其他文件系统读取。要从多个文件读取,您可以传递 globstring 或路径列表,但要注意它们必须都具有相同的协议。

delimiterbytes

可选的分隔符,例如 b'\n',用于分割字节块。

not_zerobool

强制查找文件开头的分隔符,并丢弃头部。

blocksizeint, str

块大小(以字节为单位),默认为“128 MiB”

compressionstring 或 None

字符串,例如 'gzip' 或 'xz'。必须支持高效的随机访问。

sampleint, string 或 boolean

是否返回头部样本。值为 False 表示“不请求样本”,或者是一个整数或字符串值,例如 2**20"1 MiB"

include_pathbool

是否包含表示特定文件的字节的路径。默认为 False。

**kwargsdict

对特定存储连接有意义的额外选项,例如主机、端口、用户名、密码等。

返回值
samplebytes

样本头部

blocksdask.Delayed 的列表的列表

每个列表对应一个文件,每个延迟对象计算该文件的字节块。

paths字符串列表,仅在 include_path 为 True 时包含

与 blocks 长度相同的列表,其中每个项是对应块中表示的文件路径。

示例

>>> sample, blocks = read_bytes('2015-*-*.csv', delimiter=b'\n')  
>>> sample, blocks = read_bytes('s3://bucket/2015-*-*.csv', delimiter=b'\n')  
>>> sample, paths, blocks = read_bytes('2015-*-*.csv', include_path=True)