创建 Dask Bag

有几种方法可以根据您的数据创建 Dask Bag

db.from_sequence

您可以从现有的 Python 可迭代对象创建一个 Bag

>>> import dask.bag as db
>>> b = db.from_sequence([1, 2, 3, 4, 5, 6])

您可以控制将数据分箱(partitioned)的数量

>>> b = db.from_sequence([1, 2, 3, 4, 5, 6], npartitions=2)

这控制了您所暴露的并行性的粒度。默认情况下,Dask 会尝试将您的数据分成大约 100 个分区。

重要提示:不要先将数据加载到 Python 中,然后再将数据加载到 Dask Bag 中。而是直接使用 Dask Bag 来加载数据。这可以并行化加载步骤并减少工作节点间通信

>>> b = db.from_sequence(['1.dat', '2.dat', ...]).map(load_from_filename)

db.read_text

Dask Bag 可以直接从文本文件加载数据。您可以传入单个文件名、文件名列表或 glob 字符串。生成的 Bag 将每行一个项目,每个文件一个分区

>>> b = db.read_text('myfile.txt')
>>> b = db.read_text(['myfile.1.txt', 'myfile.2.txt', ...])
>>> b = db.read_text('myfile.*.txt')

这处理了标准的压缩库,如 gzip, bz2, xz,或任何易于安装的具有类文件对象的压缩库。压缩将通过文件名扩展名推断,或通过使用 compression='gzip' 关键字来指定

>>> b = db.read_text('myfile.*.txt.gz')

Bag 中的结果项是字符串。如果您有像行分隔 JSON 这样的编码数据,您可能需要在 Bag 上映射一个解码或加载函数

>>> import json
>>> b = db.read_text('myfile.*.json').map(json.loads)

或者执行字符串处理任务。为了方便起见,有一个字符串命名空间直接附加到 Bag 上,使用 .str.methodname

>>> b = db.read_text('myfile.*.csv').str.strip().str.split(',')

db.read_avro

如果安装了 fastavro,Dask Bag 可以读取 Avro 格式的二进制文件。可以从一个或多个文件创建 Bag,并可选择在文件内分块。生成的 Bag 将每个 Avro 记录包含一个项目,该项目将是 Avro schema 定义的字典形式。每个输入文件至少会有一个分区

>>> b = db.read_avro('datafile.avro')
>>> b = db.read_avro('data.*.avro')

默认情况下,Dask 会将数据文件分割成大约 blocksize 字节大小的块。您实际获得的块取决于文件的内部阻塞。

对于创建后压缩的文件(这与 Avro 使用的内部“codec”不同),不应使用分块,每个文件恰好会有一个分区

> b = bd.read_avro('compressed.*.avro.gz', blocksize=None, compression='gzip')

db.from_delayed

您可以使用 db.from_delayed 函数从 dask.delayed 值构造一个 Dask Bag。更多信息,请参阅使用 dask.delayed 与集合的文档

存储 Dask Bag

内存中

您可以通过调用 compute() 或将对象转换为列表来将 Dask Bag 转换为列表或 Python 可迭代对象

>>> result = b.compute()
or
>>> result = list(b)

到文本文件

您可以通过调用 .to_textfiles() 方法将 Dask Bag 转换为磁盘上的文件序列

dask.bag.core.to_textfiles(b, path, name_function=None, compression='infer', encoding='utf-8', compute=True, storage_options=None, last_endline=False, **kwargs)[source]

将 Dask Bag 写入磁盘,每个分区一个文件名,每个元素一行。

路径(Paths):这将为 Bag 中的每个分区创建一个文件。您可以通过多种方式指定文件名。

使用 glob 字符串

>>> b.to_textfiles('/path/to/data/*.json.gz')  

* 将被替换为递增的序列 1, 2, …

/path/to/data/0.json.gz
/path/to/data/1.json.gz

使用 glob 字符串和 name_function= 关键字参数。 name_function 函数应该接收一个整数并生成一个字符串。 name_function 生成的字符串必须保留其对应分区索引的顺序。

>>> from datetime import date, timedelta
>>> def name(i):
...     return str(date(2015, 1, 1) + i * timedelta(days=1))
>>> name(0)
'2015-01-01'
>>> name(15)
'2015-01-16'
>>> b.to_textfiles('/path/to/data/*.json.gz', name_function=name)  
/path/to/data/2015-01-01.json.gz
/path/to/data/2015-01-02.json.gz
...

您也可以提供一个明确的路径列表。

>>> paths = ['/path/to/data/alice.json.gz', '/path/to/data/bob.json.gz', ...]  
>>> b.to_textfiles(paths) 

压缩(Compression):具有已知压缩算法(gz、bz2)对应扩展名的文件名将相应地被压缩。

Bag 内容(Bag Contents):调用 to_textfiles 的 Bag 必须是文本字符串的 Bag。例如,可以通过先将 json.dumps 映射到 Bag 上,然后调用 to_textfiles,将字典 Bag 写入 JSON 文本文件

>>> b_dict.map(json.dumps).to_textfiles("/path/to/data/*.json")  

末尾换行符(Last endline):默认情况下,最后一行不会以换行符结尾。传递 last_endline=True 来反转默认行为。

到 Avro

使用 fastavro 可以将 Dask Bag 直接写入 Avro 二进制格式。每个 Bag 分区将写入一个文件。这要求用户提供一个完整的 schema 字典(请参阅 .to_avro() 方法的 docstring)。

dask.bag.avro.to_avro(b, filename, schema, name_function=None, storage_options=None, codec='null', sync_interval=16000, metadata=None, compute=True, **kwargs)[source]

将 Bag 写入一组 avro 文件

schema 是一个复杂的字典,用于描述数据,请参阅 https://avro.apache.org/docs/1.8.2/gettingstartedpython.html#Defining+a+schemahttps://fastavro.readthedocs.io/en/latest/writer.html 。其结构如下:

{'name': 'Test',
 'namespace': 'Test',
 'doc': 'Descriptive text',
 'type': 'record',
 'fields': [
    {'name': 'a', 'type': 'int'},
 ]}

其中 “name” 字段是必需的,但 “namespace” 和 “doc” 是可选描述符;“type” 必须始终是 “record”。字段列表应包含输入记录的每个键的条目,并且类型类似于 Avro 规范( https://avro.apache.org/docs/1.8.2/spec.html )的原始、复杂或逻辑类型。

每个输入分区生成一个 avro 文件。

参数
b: dask.bag.Bag
filename: str 或 str 列表

要写入的文件名。如果是列表,数量必须与分区数量匹配。如果是字符串,必须包含 glob 字符 “*”,该字符将使用 name_function 展开

schema: dict

Avro schema 字典,见上文

name_function: None 或 callable

将整数展开为字符串,参见 dask.bytes.utils.build_name_function

storage_options: None 或 dict

传递给后端文件系统的额外键/值选项

codec: ‘null’、‘deflate’ 或 ‘snappy’

压缩算法

sync_interval: int

每个文件内每个块包含的记录数

metadata: None 或 dict

包含在文件头中

compute: bool

如果为 True [默认],则立即写入文件,函数阻塞。如果为 False,则返回延迟对象,用户可以在方便时进行计算。

kwargs: 传递给 compute(),如果 compute=True

示例

>>> import dask.bag as db
>>> b = db.from_sequence([{'name': 'Alice', 'value': 100},
...                       {'name': 'Bob', 'value': 200}])
>>> schema = {'name': 'People', 'doc': "Set of people's scores",
...           'type': 'record',
...           'fields': [
...               {'name': 'name', 'type': 'string'},
...               {'name': 'value', 'type': 'int'}]}
>>> b.to_avro('my-data.*.avro', schema)  
['my-data.0.avro', 'my-data.1.avro']

到 DataFrame

您可以将 Dask Bag 转换为 Dask DataFrame 并使用这些存储解决方案。

Bag.to_dataframe(meta=None, columns=None, optimize_graph=True)[source]

从 Dask Bag 创建 Dask DataFrame。

Bag 应包含元组、字典记录或标量。

索引不会特别有意义。如果需要,之后使用 reindex

参数
metapd.DataFrame, dict, iterable, optional

一个空的 pd.DataFrame,其 dtype 和列名与输出匹配。对于 Dask DataFrame 中的许多算法,此元数据是必需的。为了方便使用,还提供了其他一些输入选项。除了 DataFrame,还可以提供一个 dict 格式的 {name: dtype} 或可迭代对象格式的 (name, dtype)。如果未提供或提供的是列表,将计算第一个分区中的一个元素,这可能会触发一个潜在开销较大的 compute 调用。这可能导致意外结果,因此建议提供 meta。更多信息请参阅 dask.dataframe.utils.make_meta

columnssequence, optional

要使用的列名。如果传入的数据没有关联的名称,此参数提供列的名称。否则,此参数指示结果中列的顺序(数据中未找到的任何名称将成为全 NA 列)。请注意,如果提供了 meta,列名将从那里获取,此参数无效。

optimize_graphbool, optional

如果为 True [默认],则在转换为 dask.dataframe.DataFrame 之前会优化图。

示例

>>> import dask.bag as db
>>> b = db.from_sequence([{'name': 'Alice',   'balance': 100},
...                       {'name': 'Bob',     'balance': 200},
...                       {'name': 'Charlie', 'balance': 300}],
...                      npartitions=2)
>>> df = b.to_dataframe()
>>> df.compute()
      name  balance
0    Alice      100
1      Bob      200
0  Charlie      300

到 Delayed 值

您可以将 Dask Bag 转换为 Dask delayed 值 的列表,然后从那里进行自定义存储。

Bag.to_delayed(optimize_graph=True)[source]

转换为 dask.delayed 对象列表,每个分区一个。

参数
optimize_graphbool, optional

如果为 True [默认],则在转换为 dask.delayed 对象之前会优化图。