潜入三角洲湖:执法与演变

哈Ha!我向您介绍了Burak Yavuz,Brenner Heintz和Denny Lee撰写的文章“深入三角洲:模式实施与演化”,该翻译是在OTUS开设数据工程师课程之前准备的。





就像我们的经验一样,数据也在不断积累和发展。为了跟上步伐,我们的世界思维模型必须适应新的数据,其中一些包含新的维度-观察我们以前不知道的事物的新方法。这些思维模型与决定我们如何分类和处理新信息的表架构没有太大不同。

这将我们带到电路管理的问题。随着业务任务和需求随时间变化,数据的结构也会变化。数据变化时,Delta Lake可以轻松实现新的测量。用户可以访问用于管理表布局的简单语义。这些工具包括Schema Enforcement(模式实施),该模式可以保护用户避免因错误或不必要的数据而无意中阻塞其表;以及Schema Evolution(模式演进),该模式可以将包含有价值数据的新列自动添加到适当的位置。在本文中,我们将更深入地研究这些工具的使用。

了解表架构


Apache Spark中的每个DataFrame都包含一个架构,该架构定义了一种数据形式,例如数据类型,列和元数据。使用Delta Lake,表架构以JSON格式保存在事务日志中。

什么是强制架构?


模式执行(又称为模式验证)是Delta Lake中的一种防御机制,通过拒绝与表模式不匹配的记录来保证数据质量。就像一家受欢迎的餐厅的接待处的女主人一样,该餐厅仅接受事先预约,他检查表中输入的每一列是否在相应的预期列列表中(换句话说,是否为每个列都进行了预约),并拒绝任何不在列表中的具有列的条目。

实施电路如何工作?


Delta Lake在写入时使用模式检查,这意味着在记录过程中将检查表中的所有新记录与目标表的模式是否兼容。如果该方案不兼容,则Delta Lake会完全取消该事务(不写入数据),并引发异常以通知用户有关差异。
为了确定与表的记录兼容性,Delta Lake使用以下规则。可写的DataFrame:

  • 不能包含不在目标表的架构中的其他列。反之亦然,如果输入数据绝对不包含表中的所有列,则一切都是正常的-这些列将被简单地分配零值。
  • , . StringType, DataFrame IntegerType, .
  • 不能包含仅大小写不同的列名。这意味着您不能在同一表中定义名称为“ Foo”和“ foo”的列。尽管Spark可以区分大小写或不区分大小写(默认),但Delta Lake区分大小写,但不区分大小写。存储和返回列信息时,Parquet区分大小写。为了避免可能的错误,数据损坏或数据丢失(我们在Databricks中亲自遇到),我们决定添加此限制。

为了说明这一点,让我们看看下面的代码在尝试将一些最近生成的列添加到尚未配置为接受它们的Delta Lake表中时发生了什么。

#  DataFrame ,       Delta Lake
loans = sql("""
            SELECT addr_state, CAST(rand(10)*count as bigint) AS count,
            CAST(rand(10) * 10000 * count AS double) AS amount
            FROM loan_by_state_delta
            """)

#    DataFrame
original_loans.printSchema()

root
  |-- addr_state: string (nullable = true)
  |-- count: integer (nullable = true)
 
#    DataFrame
loans.printSchema()
 
root
  |-- addr_state: string (nullable = true)
  |-- count: integer (nullable = true)
  |-- amount: double (nullable = true) # new column
 
#    DataFrame (  )   
loans.write.format("delta") \
           .mode("append") \
           .save(DELTALAKE_PATH)

Returns:

A schema mismatch detected when writing to the Delta table.
 
To enable schema migration, please set:
'.option("mergeSchema", "true")\'
 
Table schema:
root
-- addr_state: string (nullable = true)
-- count: long (nullable = true)
 
Data schema:
root
-- addr_state: string (nullable = true)
-- count: long (nullable = true)
-- amount: double (nullable = true)
 
If Table ACLs are enabled, these options will be ignored. Please use the ALTER TABLE command for changing the schema.

Delta Lake不会自动添加新列,而是强加图表并停止记录。为了帮助确定哪一列(或多列)是不匹配的原因,Spark从堆栈中提取了两种方案进行比较。

实施方案有什么用?


由于强制执行模式是一项相当严格的测试,因此,它是将准备好用于生产或使用的干净,完全转换的数据集用作网守的绝佳工具。通常,它适用于直接提供数据的表:

  • 机器学习算法
  • BI仪表板
  • 数据分析和可视化工具
  • 任何需要严格结构化,强类型语义方案的生产系统。

为了为最终的障碍准备数据,许多用户使用了一种简单的“多跳”体系结构,该体系结构逐渐将其表结构化。要了解更多信息,请查看Delta Lake的机器级机器学习文章

当然,您可以在管道的任何位置使用该方案的强制应用程序,但是请记住,在这种情况下,流到表可能会令人沮丧,因为例如,您忘记了在输入数据中添加了另一列。

数据细化预防


在这一点上,您可能想知道为什么会出现这种情况?毕竟,有时会出现意外的“模式不匹配”​​错误,这会使您在工作流程中陷入困境,特别是如果您不熟悉Delta Lake。为什么不让架构根据需要进行更改,以便无论如何我都可以编写DataFrame?

俗话说,“一盎司预防胜于一磅治疗”。在某些时候,如果您不注意应用方案,那么数据类型兼容性问题将使他们感到恶心-乍一看,同类原始数据源可能包含边界情况,列损坏,映射格式错误或其他令人梦dream以求的事情在噩梦中。最好的方法是通过实施方案将这些敌人拦在门外,并在他们开始搜寻您工作代码的黑暗深度时立即(在以后)对付它们。

强制执行模式使您有信心表的模式不会更改,除非您自己确认更改选项。这样可以防止在添加新列时经常发生的数据稀释现象,以至于以前有价值的压缩表由于数据泛滥而失去了价值和实用性。鼓励您刻意,设置高标准并期望高质量,实施该计划完全可以帮助您保持诚实并保持桌子清洁。

如果经过进一步考虑,您确定确实需要添加新列-没问题,下面是单行修复。解决方案是电路的发展!

什么是电路演进?


模式演变是一项功能,使用户可以根据随时间变化的数据轻松更改当前表模式。通常,在执行添加或重写操作时会使用它来自动调整布局以包括一个或多个新列。

电路演进如何工作?


按照上一节中的示例,开发人员可以轻松地使用模式的演变来添加新的列,这些新列以前由于模式不匹配而被拒绝。通过添加.option('mergeSchema', 'true')到您的Spark团队来激活计划演变.write .writeStream.

#   mergeSchema
loans.write.format("delta") \
           .option("mergeSchema", "true") \
           .mode("append") \
           .save(DELTALAKE_SILVER_PATH)

要查看图,执行以下Spark SQL查询

#     ,  ,    
%sql
SELECT addr_state, sum(`amount`) AS amount
FROM loan_by_state_delta
GROUP BY addr_state
ORDER BY sum(`amount`)
DESC LIMIT 10

图片
另外,您可以通过将spark.databricks.delta.schema.autoMerge = TrueSpark 添加到配置中来为整个Spark会话设置此选项。但是请谨慎使用,因为强制实施方案将不再警告您与方案无意之间的不一致。

通过在查询中包含参数,mergeSchemaDataFrame中存在但目标表中不存在的所有列将作为写事务的一部分自动添加到架构的末尾。嵌套字段也可以添加,并且它们也将添加到相应结构列的末尾。

日期工程师和科学家可以使用此选项将新列(也许是最近跟踪的指标或本月的销售指标列)添加到他们现有的机器学习生产表中,而不会破坏基于旧列的现有模型。

添加或重写表时,以下类型的架构更改可以作为架构演进的一部分被接受:

  • 添加新列(这是最常见的情况)
  • 从NullType->任何其他类型更改数据类型,或从ByteType-> ShortType-> IntegerType引发数据类型

作为模式演变的一部分不可接受的其他更改要求通过添加来覆盖模式和数据.option("overwriteSchema", "true")例如,在“ Foo”列最初是整数,而新方案将是字符串数据类型的情况下,则需要重写所有Parquet(数据)文件。这些更改包括:

  • 删除栏
  • 更改现有列的数据类型(就地)
  • 重命名仅区分大小写的列(例如,“ Foo”和“ foo”)

最后,在下一版本的Spark 3.0中,将完全支持显式DDL(使用ALTER TABLE),这将允许用户对表架构执行以下操作:

  • 添加列
  • 更改列注释
  • 设置确定表行为的表属性,例如,设置事务日志存储的持续时间。

电路演进有什么用?


当您打算更改表的模式时,可以始终使用模式演变(与不小心将不应该存在的列添加到DataFrame的情况相反)。这是迁移架构的最简单方法,因为它会自动添加正确的列名和数据类型,而无需显式声明它们。

结论


强制架构会拒绝与表不兼容的任何新列或其他架构更改。通过设置和维护这些高标准,分析人员和工程师可以依靠其数据具有最高完整性这一事实,从而对其进行清晰明确的推理,从而使他们能够做出更有效的业务决策。

另一方面,电路的演进补充了执法,简化了所谓的电路自动更改。最后,添加一列应该不难。

电路的强制应用是阳,电路的演化是阴。当一起使用这些功能时,降噪和信号调谐比以往任何时候都容易。

我们还要感谢Mukul Murti和Pranava Ananda对本文的贡献。

该系列中的其他文章:

深入Delta Lake:打开事务日志的包装



相关文章


Delta Lake生产水平机器学习

什么是数据湖?



了解有关该课程的更多信息



All Articles