Dask Dataframe 和 SQL
目录
Dask Dataframe 和 SQL¶
SQL 是一种在数据库服务器上执行表格计算的方法。类似的操作也可以在 Dask Dataframes 上完成。用户通常希望将两者结合起来。
本文档描述了 Dask 与 SQL 数据库之间的连接,并旨在澄清我们经常收到的一些用户问题。
Dask 是否实现了 SQL?¶
简短的答案是“没有”。Dask 没有用于 SQL 查询的解析器或查询规划器。然而,Dask Dataframes 的 Pandas API(两者大体相同)有许多与 SQL 操作类似的功能。关于如何将 SQL 映射到 Pandas 语法的良好描述可以在 Pandas 文档中找到。
以下包可能让您感兴趣
数据库还是 Dask?¶
数据库服务器能够像 Dask Dataframe 一样处理表格数据并生成结果。您为什么会选择其中一个而不是另一个呢?
如今,数据库服务器可以是分片/分布式系统,能够处理数百万行的表。大多数数据库实现都倾向于按行检索和(原子)更新表的小部分数据。配置数据库以使其对特定类型的查询快速运行可能具有挑战性,但假设您的所有数据都已在数据库中,这很可能是最好的解决方案——特别是如果您对 SQL 查询计划优化有所了解的话。SQL 实现可以非常有效地分析查询,仅提取表中一小部分进行考虑,而其余部分则被条件排除。
Dask 比数据库灵活得多,专门设计用于处理大于内存的数据集,支持并行处理,并可能分布式部署在集群上。如果您的工作流程不太适合 SQL,请使用 dask。如果您的数据库服务器在处理大量数据时遇到困难,dask 可能会表现得更好。最好对您的查询进行性能分析(并考虑到其他资源使用者!)。如果您需要组合来自不同来源的数据,dask 可能是您的最佳选择。
您可能会发现 Dask API 比编写 SQL 更易于使用(如果您已经习惯了 Pandas),并且诊断反馈也更有用。这些优点可以说是有利于 Dask 的地方。
使用 read_sql_table 或 read_sql_query 从 SQL 加载¶
Dask 允许您使用函数 dask.dataframe.read_sql_table()
和 dask.dataframe.read_sql_query()
从 SQL 表和查询构建 dataframe,这些函数基于 Pandas 版本,共享大部分参数,并使用 SQLAlchemy 来实际处理查询。您可能需要为所选数据库服务器安装额外的驱动程序包。
由于 Dask 设计用于处理大于内存的数据集,或在集群上进行分布式处理,以下是与 Pandas 相比需要注意的主要差异
Dask 不支持任意文本查询,只支持整张表和 SQLAlchemy 的 sql 表达式
con 参数必须是 URI 字符串,而不是 SQLAlchemy engine/connection 对象
分区信息是必需的,可以像提供索引列参数一样简单,也可以更明确(见下文)
不使用 chunksize 参数,因为必须通过索引列进行分区
如果您需要比这更灵活的方法,或者该方法对您不起作用(例如,在类型推断方面),请跳到下一节。
为何存在差异¶
Dask 旨在使处理大量数据成为可能,包括可能将处理分布在集群上。对于从 SQL 服务器检索数据,这意味着查询必须是可分区的:每个分区可以独立于其他分区获取,不依赖于某些全局状态,并且任务的定义必须是可序列化的,即可以表示为传输给工作节点的字节流。
这些限制意味着我们不能直接接受 SQLAlchemy engines 或 connection 对象,因为它们具有无法序列化的内部状态(缓冲区等)。必须使用 URI 字符串,可以在工作节点上重新创建为新的 engine。同样,我们无法适应依赖数据库游标内部状态的分块查询;也无法适应 LIMIT/OFFSET 查询,因为它们无法保证可重复性,并且需要在服务器上扫描整个查询(效率非常低)。
如果您的数据足够小,不需要 Dask 的 out-of-core 和/或分布式功能,那么您最好直接使用 Pandas 或 SQLAlchemy。
索引列¶
我们需要一种方法将一个主查询转化为每个分区的子查询。对于大多数合理的数据库表,应该有一个明显的列可以用于分区——它可能是数字类型,并且肯定应该在数据库中创建索引。后一个条件很重要,因为一旦 Dask 开始计算,许多同时的查询会命中您的服务器。
只需为 index 参数提供列名,您就隐含地表示该列是数字类型,Dask 通过将最小值和最大值之间的空间平均分割成 npartitions
个区间来猜测合理的分区。您还可以提供您希望考虑的最大值/最小值,以便 Dask 不需要查询这些值。或者,您可以让 Dask 获取前几行(默认为 5 行),并使用它们来猜测每行的典型字节大小,并以此为基础确定分区大小。无需多言,对于不寻常的同质表,结果会有很大差异。
特定分区¶
在某些情况下,您可能非常清楚如何对数据进行分区,例如基于具有有限数量唯一值或类别的列。这使得可以使用字符串列或任何具有自然顺序的列作为索引列,而不仅仅是数值类型。
在这种情况下,您将提供一组特定的 divisions
,即每个分区的索引列的起始/结束值。例如,如果某一列恰好包含十六进制字符串格式的随机 ID,那么您可以通过以下方式指定 16 个分区
df = read_sql_table("mytable", divisions=list("0123456789abcdefh"),
index_col="hexID")
因此,第一个分区将包含值 "0" <= hexID < "1"
的 ID,即前导字符为“0”。
SQLAlchemy 表达式¶
由于我们只发送数据库连接 URI 而不是 engine 对象,因此我们不能依赖 SQLAlchemy 的表类推断和 ORM 来执行查询。但是,我们可以使用“select” sql 表达式,这些表达式仅在执行时才格式化为文本查询。
from sqlalchemy import sql
number = sql.column("number")
name = sql.column("name")
s1 = sql.select([
number, name, sql.func.length(name).label("lenname")
]
).select_from(sql.table("test"))
data = read_sql_query(
s1, db, npartitions=2, index_col=number
)
这里我们还演示了使用函数 length
在服务器端执行操作。请注意,需要为此类操作添加标签,但只要它们也在选定列集合中,您就可以将它们用于索引列。如果用于索引/分区,为了性能,该列仍然应该在数据库中建立索引。如果 pandas 在推断数据类型时遇到问题,要考虑的最重要的函数之一是 cast
,用于在数据库中指定输出数据类型或转换。
需要提醒您的是,SQLAlchemy 表达式需要一些时间来适应,您可以先使用 Pandas 进行练习,只读取查询的第一个小块数据,直到一切看起来正常。您可以在 这个 gist 中找到一个更完整的面向对象示例。
从 SQL 加载,手动方法¶
如果 read_sql_table
不足以满足您的需求,您可以尝试以下方法之一。
来自 Map 函数¶
通常,您对自己数据和服务器的了解比上述通用方法允许的更多。实际上,一些类似数据库的服务器可能根本不受 SQLAlchemy 支持,或者提供了优化更好的替代 API。
如果您已经有按分区从数据库获取数据的方法,那么您可以使用 dask.dataframe.from_map()
以这种方式构建 dataframe。它可能看起来像这样。
import dask.dataframe as dd
def fetch_partition(part):
conn = establish_connection()
df = fetch_query(base_query.format(part))
return df.astype(known_types)
ddf = dd.from_map(fetch_partition,
parts,
meta=known_types,
divisions=div_from_parts(parts))
您必须提供自己的函数来设置与服务器的连接、您自己的查询,以及一种格式化查询以使其特定于每个分区的方法。例如,您可能使用 WHERE 子句指定范围或特定的唯一值。known_types
在这里用于转换 dataframe 分区并提供 meta
,以帮助保持一致性并避免 Dask 必须预先分析一个分区来猜测列/类型;您可能还想显式设置索引。
通过客户端进行流处理¶
在某些情况下,工作节点可能无法访问数据,但客户端可以;或者数据的初始加载时间不重要,只要数据集随后保存在集群内存中并可用于 dask-dataframe 查询。可以通过从客户端上传数据块来构造 dataframe
有关如何执行此操作的完整示例,请参阅此处
直接访问数据文件¶
一些数据库系统,例如 Apache Hive,将数据存储在 Dask 可能直接访问的位置和格式,例如 S3 或 HDFS 上的 Parquet 文件。在您的 SQL 查询会读取整个数据集并将其传递给 Dask 的情况下,从数据库流式传输数据很可能是瓶颈,并且直接读取源数据文件可能会更快。