创建 Dask Bag

有几种方法可以基于您的数据创建 Dask Bag

db.from_sequence

您可以从现有的 Python 可迭代对象创建 Bag

>>> import dask.bag as db
>>> b = db.from_sequence([1, 2, 3, 4, 5, 6])

您可以控制将数据分到多少个分区中

>>> b = db.from_sequence([1, 2, 3, 4, 5, 6], npartitions=2)

这控制了您暴露并行性的粒度。默认情况下,Dask 会尝试将您的数据分割成大约 100 个分区。

重要提示:不要先将数据加载到 Python 中,然后再将数据加载到 Dask Bag 中。相反,请使用 Dask Bag 直接加载数据。这会并行化加载步骤并减少工作节点间的通信。

>>> b = db.from_sequence(['1.dat', '2.dat', ...]).map(load_from_filename)

db.read_text

Dask Bag 可以直接从文本文件中加载数据。您可以传入单个文件名、文件名列表或 glob 字符串。生成的 Bag 将每行作为一项,每个文件作为一个分区。

>>> b = db.read_text('myfile.txt')
>>> b = db.read_text(['myfile.1.txt', 'myfile.2.txt', ...])
>>> b = db.read_text('myfile.*.txt')

它支持标准压缩库,例如 gzipbz2xz 或任何易于安装的、具有文件类对象的压缩库。压缩将根据文件名扩展名推断,或者使用 compression='gzip' 关键字参数。

>>> b = db.read_text('myfile.*.txt.gz')

Bag 中的结果项是字符串。如果您的数据是编码过的,例如行分隔的 JSON,那么您可能需要对 Bag 执行解码或加载函数映射。

>>> import json
>>> b = db.read_text('myfile.*.json').map(json.loads)

或者进行字符串处理任务。为了方便,Bag 直接附带一个字符串命名空间,使用 .str.methodname

>>> b = db.read_text('myfile.*.csv').str.strip().str.split(',')

db.read_avro

如果安装了 fastavro,Dask Bag 可以读取 Avro 格式的二进制文件。可以从一个或多个文件创建 Bag,并可在文件内部选择性地进行分块。生成的 Bag 将把每个 Avro 记录作为一项,该项将是一个由 Avro schema 定义的字典。每个输入文件至少会有一个分区。

>>> b = db.read_avro('datafile.avro')
>>> b = db.read_avro('data.*.avro')

默认情况下,Dask 会将数据文件分割成大小约 blocksize 字节的块。您实际获得的块取决于文件的内部块结构。

对于创建后才压缩的文件(这与 Avro 使用的内部“codec”不同),不应使用分块,并且每个文件将恰好有一个分区。

> b = bd.read_avro('compressed.*.avro.gz', blocksize=None, compression='gzip')

db.from_delayed

您可以使用 db.from_delayed 函数从 dask.delayed 值构建 Dask Bag。更多信息请参阅 使用 dask.delayed 与集合的文档

存储 Dask Bag

内存中

您可以通过调用 compute() 或将对象转换为列表,将 Dask Bag 转换为列表或 Python 可迭代对象。

>>> result = b.compute()
or
>>> result = list(b)

到文本文件

您可以通过调用 .to_textfiles() 方法将 Dask Bag 转换为磁盘上的文件序列。

dask.bag.core.to_textfiles(b, path, name_function=None, compression='infer', encoding='utf-8', compute=True, storage_options=None, last_endline=False, **kwargs)[source]

将 dask Bag 写入磁盘,每个分区一个文件名,每个元素一行。

路径:这将为您的 Bag 中的每个分区创建一个文件。您可以通过多种方式指定文件名。

使用 glob 字符串

>>> b.to_textfiles('/path/to/data/*.json.gz')  

* 将被递增序列 1, 2, ... 替换。

/path/to/data/0.json.gz
/path/to/data/1.json.gz

使用 glob 字符串和 name_function= 关键字参数。name_function 函数应该接收一个整数并生成一个字符串。name_function 生成的字符串必须保留其对应分区索引的顺序。

>>> from datetime import date, timedelta
>>> def name(i):
...     return str(date(2015, 1, 1) + i * timedelta(days=1))
>>> name(0)
'2015-01-01'
>>> name(15)
'2015-01-16'
>>> b.to_textfiles('/path/to/data/*.json.gz', name_function=name)  
/path/to/data/2015-01-01.json.gz
/path/to/data/2015-01-02.json.gz
...

您也可以提供明确的路径列表。

>>> paths = ['/path/to/data/alice.json.gz', '/path/to/data/bob.json.gz', ...]  
>>> b.to_textfiles(paths) 

压缩:文件名扩展名对应已知压缩算法(gz, bz2)的文件将相应地进行压缩。

Bag 内容:调用 to_textfiles 的 Bag 必须是文本字符串的 Bag。例如,一个字典 Bag 可以通过先对 Bag 映射 json.dumps,然后调用 to_textfiles 来写入 JSON 文本文件。

>>> b_dict.map(json.dumps).to_textfiles("/path/to/data/*.json")  

末尾换行:默认情况下,最后一行不以换行符结尾。传入 last_endline=True 可反转默认行为。

到 Avro

如果安装了 fastavro,Dask Bag 可以直接写入 Avro 二进制格式。每个 Bag 分区将写入一个文件。这要求用户提供一个完全指定的 schema 字典(参见 .to_avro() 方法的 docstring)。

dask.bag.avro.to_avro(b, filename, schema, name_function=None, storage_options=None, codec='null', sync_interval=16000, metadata=None, compute=True, **kwargs)[source]

将 Bag 写入 Avro 文件集

schema 是一个复杂的字典,描述数据,详情请参阅 https://avro.apache.org/docs/1.8.2/gettingstartedpython.html#Defining+a+schemahttps://fastavro.readthedocs.io/en/latest/writer.html 。其结构如下:

{'name': 'Test',
 'namespace': 'Test',
 'doc': 'Descriptive text',
 'type': 'record',
 'fields': [
    {'name': 'a', 'type': 'int'},
 ]}

其中 “name” 字段是必需的,“namespace” 和 “doc” 是可选的描述符;“type” 必须始终是 “record”。字段列表应包含输入记录每个键的条目,类型类似于 Avro 规范的原始类型、复杂类型或逻辑类型(https://avro.apache.org/docs/1.8.2/spec.html)。

结果是每个输入分区生成一个 Avro 文件。

参数
b: dask.bag.Bag
filename: list of str 或 str

要写入的文件名。如果是列表,数量必须与分区数量匹配。如果是字符串,必须包含 glob 字符 “*”,该字符将使用 name_function 扩展。

schema: dict

Avro schema 字典,参见上文。

name_function: None 或 callable

将整数扩展为字符串,参见 dask.bytes.utils.build_name_function

storage_options: None 或 dict

传递给后端文件系统的额外键/值选项。

codec: ‘null’, ‘deflate’, 或 ‘snappy’

压缩算法。

sync_interval: int

文件中每个块包含的记录数。

metadata: None 或 dict

包含在文件头中。

compute: bool

如果为 True,文件立即写入,并且函数阻塞。如果为 False,返回 delayed 对象,用户可以在方便时进行计算。

kwargs: 传递给 compute(),如果 compute=True

示例

>>> import dask.bag as db
>>> b = db.from_sequence([{'name': 'Alice', 'value': 100},
...                       {'name': 'Bob', 'value': 200}])
>>> schema = {'name': 'People', 'doc': "Set of people's scores",
...           'type': 'record',
...           'fields': [
...               {'name': 'name', 'type': 'string'},
...               {'name': 'value', 'type': 'int'}]}
>>> b.to_avro('my-data.*.avro', schema)  
['my-data.0.avro', 'my-data.1.avro']

到 DataFrames

您可以将 Dask Bag 转换为 Dask DataFrame 并使用其存储解决方案。

Bag.to_dataframe(meta=None, columns=None, optimize_graph=True)[source]

从 Dask Bag 创建 Dask Dataframe。

Bag 应包含元组、字典记录或标量。

索引不会特别有意义。必要时之后使用 reindex

参数
metapd.DataFrame, dict, iterable, 可选

一个空的 pd.DataFrame,其 dtype 和列名与输出匹配。此元数据对于许多 dask dataframe 算法正常工作是必需的。为了便于使用,也提供了一些替代输入。除了 DataFrame,还可以提供 dict 形式的 {name: dtype}(name, dtype) 形式的可迭代对象。如果未提供或为列表,将计算第一个分区的单个元素,触发一个可能很昂贵的 compute 调用。这可能导致意外结果,因此建议提供 meta。更多信息请参阅 dask.dataframe.utils.make_meta

columns序列, 可选

要使用的列名。如果传入的数据没有与之关联的名称,此参数提供列的名称。否则,此参数指示结果中列的顺序(数据中未找到的任何名称将成为全 NA 列)。注意,如果提供了 meta,则列名将从中获取,此参数无效。

optimize_graphbool, 可选

如果为 True [默认],在转换为 dask.dataframe.DataFrame 之前优化图。

示例

>>> import dask.bag as db
>>> b = db.from_sequence([{'name': 'Alice',   'balance': 100},
...                       {'name': 'Bob',     'balance': 200},
...                       {'name': 'Charlie', 'balance': 300}],
...                      npartitions=2)
>>> df = b.to_dataframe()
>>> df.compute()
      name  balance
0    Alice      100
1      Bob      200
0  Charlie      300

到 Delayed 值

您可以将 Dask Bag 转换为 Dask delayed values 列表,并从那里实现自定义存储解决方案。

Bag.to_delayed(optimize_graph=True)[source]

转换为 dask.delayed 对象列表,每个分区一个。

参数
optimize_graphbool, 可选

如果为 True [默认],在转换为 dask.delayed 对象之前优化图。

另请参见

dask.bag.from_delayed