创建 Dask Bag
目录
创建 Dask Bag¶
有几种方法可以基于您的数据创建 Dask Bag
db.from_sequence
¶
您可以从现有的 Python 可迭代对象创建 Bag
>>> import dask.bag as db
>>> b = db.from_sequence([1, 2, 3, 4, 5, 6])
您可以控制将数据分到多少个分区中
>>> b = db.from_sequence([1, 2, 3, 4, 5, 6], npartitions=2)
这控制了您暴露并行性的粒度。默认情况下,Dask 会尝试将您的数据分割成大约 100 个分区。
重要提示:不要先将数据加载到 Python 中,然后再将数据加载到 Dask Bag 中。相反,请使用 Dask Bag 直接加载数据。这会并行化加载步骤并减少工作节点间的通信。
>>> b = db.from_sequence(['1.dat', '2.dat', ...]).map(load_from_filename)
db.read_text
¶
Dask Bag 可以直接从文本文件中加载数据。您可以传入单个文件名、文件名列表或 glob 字符串。生成的 Bag 将每行作为一项,每个文件作为一个分区。
>>> b = db.read_text('myfile.txt')
>>> b = db.read_text(['myfile.1.txt', 'myfile.2.txt', ...])
>>> b = db.read_text('myfile.*.txt')
它支持标准压缩库,例如 gzip
、bz2
、xz
或任何易于安装的、具有文件类对象的压缩库。压缩将根据文件名扩展名推断,或者使用 compression='gzip'
关键字参数。
>>> b = db.read_text('myfile.*.txt.gz')
Bag 中的结果项是字符串。如果您的数据是编码过的,例如行分隔的 JSON,那么您可能需要对 Bag 执行解码或加载函数映射。
>>> import json
>>> b = db.read_text('myfile.*.json').map(json.loads)
或者进行字符串处理任务。为了方便,Bag 直接附带一个字符串命名空间,使用 .str.methodname
。
>>> b = db.read_text('myfile.*.csv').str.strip().str.split(',')
db.read_avro
¶
如果安装了 fastavro,Dask Bag 可以读取 Avro 格式的二进制文件。可以从一个或多个文件创建 Bag,并可在文件内部选择性地进行分块。生成的 Bag 将把每个 Avro 记录作为一项,该项将是一个由 Avro schema 定义的字典。每个输入文件至少会有一个分区。
>>> b = db.read_avro('datafile.avro')
>>> b = db.read_avro('data.*.avro')
默认情况下,Dask 会将数据文件分割成大小约 blocksize
字节的块。您实际获得的块取决于文件的内部块结构。
对于创建后才压缩的文件(这与 Avro 使用的内部“codec”不同),不应使用分块,并且每个文件将恰好有一个分区。
> b = bd.read_avro('compressed.*.avro.gz', blocksize=None, compression='gzip')
db.from_delayed
¶
您可以使用 db.from_delayed
函数从 dask.delayed 值构建 Dask Bag。更多信息请参阅 使用 dask.delayed 与集合的文档。
存储 Dask Bag¶
内存中¶
您可以通过调用 compute()
或将对象转换为列表,将 Dask Bag 转换为列表或 Python 可迭代对象。
>>> result = b.compute()
or
>>> result = list(b)
到文本文件¶
您可以通过调用 .to_textfiles()
方法将 Dask Bag 转换为磁盘上的文件序列。
- dask.bag.core.to_textfiles(b, path, name_function=None, compression='infer', encoding='utf-8', compute=True, storage_options=None, last_endline=False, **kwargs)[source]¶
将 dask Bag 写入磁盘,每个分区一个文件名,每个元素一行。
路径:这将为您的 Bag 中的每个分区创建一个文件。您可以通过多种方式指定文件名。
使用 glob 字符串
>>> b.to_textfiles('/path/to/data/*.json.gz')
* 将被递增序列 1, 2, ... 替换。
/path/to/data/0.json.gz /path/to/data/1.json.gz
使用 glob 字符串和
name_function=
关键字参数。name_function 函数应该接收一个整数并生成一个字符串。name_function 生成的字符串必须保留其对应分区索引的顺序。>>> from datetime import date, timedelta >>> def name(i): ... return str(date(2015, 1, 1) + i * timedelta(days=1))
>>> name(0) '2015-01-01' >>> name(15) '2015-01-16'
>>> b.to_textfiles('/path/to/data/*.json.gz', name_function=name)
/path/to/data/2015-01-01.json.gz /path/to/data/2015-01-02.json.gz ...
您也可以提供明确的路径列表。
>>> paths = ['/path/to/data/alice.json.gz', '/path/to/data/bob.json.gz', ...] >>> b.to_textfiles(paths)
压缩:文件名扩展名对应已知压缩算法(gz, bz2)的文件将相应地进行压缩。
Bag 内容:调用
to_textfiles
的 Bag 必须是文本字符串的 Bag。例如,一个字典 Bag 可以通过先对 Bag 映射json.dumps
,然后调用to_textfiles
来写入 JSON 文本文件。>>> b_dict.map(json.dumps).to_textfiles("/path/to/data/*.json")
末尾换行:默认情况下,最后一行不以换行符结尾。传入
last_endline=True
可反转默认行为。
到 Avro¶
如果安装了 fastavro,Dask Bag 可以直接写入 Avro 二进制格式。每个 Bag 分区将写入一个文件。这要求用户提供一个完全指定的 schema 字典(参见 .to_avro()
方法的 docstring)。
- dask.bag.avro.to_avro(b, filename, schema, name_function=None, storage_options=None, codec='null', sync_interval=16000, metadata=None, compute=True, **kwargs)[source]¶
将 Bag 写入 Avro 文件集
schema 是一个复杂的字典,描述数据,详情请参阅 https://avro.apache.org/docs/1.8.2/gettingstartedpython.html#Defining+a+schema 和 https://fastavro.readthedocs.io/en/latest/writer.html 。其结构如下:
{'name': 'Test', 'namespace': 'Test', 'doc': 'Descriptive text', 'type': 'record', 'fields': [ {'name': 'a', 'type': 'int'}, ]}
其中 “name” 字段是必需的,“namespace” 和 “doc” 是可选的描述符;“type” 必须始终是 “record”。字段列表应包含输入记录每个键的条目,类型类似于 Avro 规范的原始类型、复杂类型或逻辑类型(https://avro.apache.org/docs/1.8.2/spec.html)。
结果是每个输入分区生成一个 Avro 文件。
- 参数
- b: dask.bag.Bag
- filename: list of str 或 str
要写入的文件名。如果是列表,数量必须与分区数量匹配。如果是字符串,必须包含 glob 字符 “*”,该字符将使用 name_function 扩展。
- schema: dict
Avro schema 字典,参见上文。
- name_function: None 或 callable
将整数扩展为字符串,参见
dask.bytes.utils.build_name_function
。- storage_options: None 或 dict
传递给后端文件系统的额外键/值选项。
- codec: ‘null’, ‘deflate’, 或 ‘snappy’
压缩算法。
- sync_interval: int
文件中每个块包含的记录数。
- metadata: None 或 dict
包含在文件头中。
- compute: bool
如果为 True,文件立即写入,并且函数阻塞。如果为 False,返回 delayed 对象,用户可以在方便时进行计算。
- kwargs: 传递给 compute(),如果 compute=True
示例
>>> import dask.bag as db >>> b = db.from_sequence([{'name': 'Alice', 'value': 100}, ... {'name': 'Bob', 'value': 200}]) >>> schema = {'name': 'People', 'doc': "Set of people's scores", ... 'type': 'record', ... 'fields': [ ... {'name': 'name', 'type': 'string'}, ... {'name': 'value', 'type': 'int'}]} >>> b.to_avro('my-data.*.avro', schema) ['my-data.0.avro', 'my-data.1.avro']
到 DataFrames¶
您可以将 Dask Bag 转换为 Dask DataFrame 并使用其存储解决方案。
- Bag.to_dataframe(meta=None, columns=None, optimize_graph=True)[source]¶
从 Dask Bag 创建 Dask Dataframe。
Bag 应包含元组、字典记录或标量。
索引不会特别有意义。必要时之后使用
reindex
。- 参数
- metapd.DataFrame, dict, iterable, 可选
一个空的
pd.DataFrame
,其 dtype 和列名与输出匹配。此元数据对于许多 dask dataframe 算法正常工作是必需的。为了便于使用,也提供了一些替代输入。除了DataFrame
,还可以提供dict
形式的{name: dtype}
或(name, dtype)
形式的可迭代对象。如果未提供或为列表,将计算第一个分区的单个元素,触发一个可能很昂贵的compute
调用。这可能导致意外结果,因此建议提供meta
。更多信息请参阅dask.dataframe.utils.make_meta
。- columns序列, 可选
要使用的列名。如果传入的数据没有与之关联的名称,此参数提供列的名称。否则,此参数指示结果中列的顺序(数据中未找到的任何名称将成为全 NA 列)。注意,如果提供了
meta
,则列名将从中获取,此参数无效。- optimize_graphbool, 可选
如果为 True [默认],在转换为
dask.dataframe.DataFrame
之前优化图。
示例
>>> import dask.bag as db >>> b = db.from_sequence([{'name': 'Alice', 'balance': 100}, ... {'name': 'Bob', 'balance': 200}, ... {'name': 'Charlie', 'balance': 300}], ... npartitions=2) >>> df = b.to_dataframe()
>>> df.compute() name balance 0 Alice 100 1 Bob 200 0 Charlie 300
到 Delayed 值¶
您可以将 Dask Bag 转换为 Dask delayed values 列表,并从那里实现自定义存储解决方案。