Dask Dataframe 和 SQL
目录
Dask Dataframe 和 SQL¶
SQL 是一种在数据库服务器上执行表格计算的方法。类似的操作也可以在 Dask Dataframe 上完成。用户通常希望将两者连接起来。
本文档描述了 Dask 与 SQL 数据库之间的连接,并旨在澄清我们经常收到的用户提出的几个问题。
Dask 是否实现了 SQL?¶
简短的回答是“否”。Dask 没有 SQL 查询的解析器或查询规划器。然而,Pandas API(对于 Dask Dataframe 而言大部分相同)有很多与 SQL 操作类似的功能。关于将 SQL 映射到 Pandas 语法的详细说明可以在 pandas 文档中找到。
以下软件包可能会引起您的兴趣
数据库还是 Dask?¶
数据库服务器能够处理表格数据并生成结果,就像 Dask Dataframe 一样。你为什么会选择使用其中一个而不是另一个呢?
如今,数据库服务器可以是分片/分布式系统,能够处理数百万行的表。大多数数据库实现都倾向于按行检索和(原子)更新表的少量子集。配置数据库以针对特定类型的查询进行优化可能具有挑战性,但假设您的所有数据都已经存在于数据库中,那么它可能是一个不错的解决方案——特别是如果您对 SQL 查询计划优化有所了解的话。当其他部分被条件排除时,SQL 实现可以非常有效地分析查询,仅提取表的一小部分进行处理。
Dask 比数据库灵活得多,并且明确设计用于处理大于内存的数据集,可以并行处理,并可能分布在集群中。如果您的工作流程不适合 SQL,请使用 dask。如果您的数据库服务器处理大量数据时遇到困难,dask 可能会做得更好。最好分析您的查询(并记住资源的 其他用户!)。如果您需要组合来自不同来源的数据,dask 可能是您的最佳选择。
您可能会发现 Dask API 比编写 SQL 更易于使用(如果您已经习惯了 Pandas),并且诊断反馈更具参考价值。这些观点可以说是 Dask 的优势。
使用 read_sql_table 或 read_sql_query 从 SQL 加载¶
Dask 允许您使用函数 dask.dataframe.read_sql_table()
和 dask.dataframe.read_sql_query()
从 SQL 表和查询构建 Dataframe,这些函数基于 Pandas 版本,共享大多数参数,并使用 SQLAlchemy 处理实际查询。您可能需要为您选择的数据库服务器安装额外的驱动程序包。
由于 Dask 旨在处理大于内存的数据集或分布在集群中,因此与 Pandas 相比,以下是需要注意的主要区别:
Dask 不支持任意文本查询,只支持整个表和 SQLAlchemy sql 表达式
con 参数必须是 URI 字符串,而不是 SQLAlchemy 引擎/连接对象
分区信息是 必需的,可以像提供索引列参数一样简单,也可以更明确(见下文)
chunksize 参数不使用,因为分区必须通过索引列进行
如果您需要比这更灵活的方法,或者此方法对您不起作用(例如,在类型推断方面),请跳到下一节。
为何存在差异¶
Dask 旨在实现处理大量数据,包括可能将处理分布在集群中。对于从 SQL 服务器检索数据,这意味着查询必须可分区:每个分区可以独立于其他分区获取,不依赖于某些全局状态,并且任务定义必须可序列化,即可以表示为发送给工作节点的字节流。
这些限制意味着我们不能直接接受 SQLAlchemy 引擎或连接对象,因为它们具有无法序列化的内部状态(缓冲区等)。必须使用 URI 字符串,它可以在工作节点上重新创建为新的引擎。同样,我们无法处理依赖于数据库游标内部状态的分块查询;也无法处理 LIMIT/OFFSET 查询,因为它们不保证可重复,并且涉及在服务器上扫描整个查询(这非常低效)。
如果 您的数据足够小,不需要 Dask 的核外和/或分布式能力,那么您最好直接使用 Pandas 或 SQLAlchemy。
索引列¶
我们需要一种方法将单个主查询转化为每个分区的子查询。对于大多数合理的数据库表,应该有一个明显的列可以用于分区——它可能是数值型的,并且肯定应该在数据库中建立索引。后一个条件很重要,因为一旦 Dask 开始计算,许多并发查询将命中您的服务器。
通过仅为 index 参数提供列名,您就隐含地说明该列是数值型的,Dask 通过将最小值和最大值之间的空间均匀分割成 npartitions
个间隔来猜测合理的分区。您还可以提供您希望考虑的最大/最小值,这样 Dask 就不需要查询这些值。或者,您可以让 Dask 获取前几行(默认为 5 行),并利用它们猜测典型的字节/行,然后在此基础上确定分区大小。毋庸置疑,对于非同质的表,结果会有很大差异。
特定分区¶
在某些情况下,您可能对如何分区数据有很好的了解,例如基于具有有限数量唯一值或类别的列。这使得可以使用字符串列或任何具有自然顺序的列作为索引列,而不仅仅是数值类型。
在这种情况下,您将提供一组特定的 divisions
,即每个分区的索引列的起始/结束值。例如,如果某一列恰好包含十六进制字符串格式的随机 ID,那么您可以使用以下方式指定 16 个分区:
df = read_sql_table("mytable", divisions=list("0123456789abcdefh"),
index_col="hexID")
因此第一个分区将包含值 "0" <= hexID < "1"
的 ID,即开头字符为“0”。
SQLAlchemy 表达式¶
由于我们只发送数据库连接 URI 而不是引擎对象,因此我们无法依赖 SQLAlchemy 的表类推断和 ORM 来执行查询。然而,我们可以使用“select” sql 表达式,它们仅在执行时格式化为文本查询。
from sqlalchemy import sql
number = sql.column("number")
name = sql.column("name")
s1 = sql.select([
number, name, sql.func.length(name).label("lenname")
]
).select_from(sql.table("test"))
data = read_sql_query(
s1, db, npartitions=2, index_col=number
)
这里我们还演示了使用函数 length
在服务器端执行操作。请注意,此类操作必须 标记,但只要它也包含在选定的列集中,就可以将其用作索引列。如果用作索引/分区,为了性能,该列仍然应该在数据库中建立索引。需要考虑的一个最重要的函数是 cast
,用于在数据库中指定输出数据类型或进行转换,如果 pandas 在推断数据类型时遇到问题。
需要提醒的是,SQLAlchemy 表达式需要一些时间来熟悉,您可以先用 Pandas 练习,只读取查询的第一个小块,直到看起来正确为止。您可以在 此 gist 中找到更完整的面向对象示例。
从 SQL 加载,手动方法¶
如果 read_sql_table
不能满足您的需求,您可以尝试以下方法之一。
来自 Map 函数¶
通常,您比上述通用方法更了解您的数据和服务器。事实上,某些类似数据库的服务器可能根本不支持 SQLAlchemy,或者提供一个更优化的备用 API。
如果您已经有一种方法可以按分区从数据库获取数据,那么您可以使用 dask.dataframe.from_map()
并以这种方式构建 dataframe。它可能看起来像这样。
import dask.dataframe as dd
def fetch_partition(part):
conn = establish_connection()
df = fetch_query(base_query.format(part))
return df.astype(known_types)
ddf = dd.from_map(fetch_partition,
parts,
meta=known_types,
divisions=div_from_parts(parts))
您必须提供自己的函数来设置服务器连接、自己的查询,以及一种方法来格式化该查询以使其特定于每个分区。例如,您可能使用 WHERE 子句来指定范围或特定的唯一值。这里的 known_types
用于转换 dataframe 分区并提供 meta
,以帮助保持一致性,并避免 Dask 需要预先分析一个分区来猜测列/类型;您可能还需要显式设置索引。
通过客户端流式传输¶
在某些情况下,工作节点可能无法访问数据,但客户端可以;或者数据的初始加载时间并不重要,只要数据集随后保存在集群内存中并可用于 dask-dataframe 查询即可。可以通过从客户端上传数据块来构建 dataframe
有关如何执行此操作的完整示例,请参见 此处
直接访问数据文件¶
一些数据库系统,如 Apache Hive,将其数据存储在 Dask 可以直接访问的位置和格式中,例如 S3 或 HDFS 上的 parquet 文件。如果您的 SQL 查询会读取整个数据集并将其传递给 Dask,那么从数据库流式传输数据很可能是瓶颈,直接读取源数据文件可能会更快。