Diving Into Delta Lake: Schema Enforcement and Evolution

Hello, Habr! Here is a translation of the article “Diving Into Delta Lake: Schema Enforcement & Evolution” by Burak Yavuz, Brenner Heintz and Denny Lee, prepared ahead of the start of the Data Engineer course from OTUS.





Data, like our experiences, is always accumulating and evolving. To keep up, our mental models of the world must adapt to new data, some of which contains new dimensions: new ways of seeing things we had no conception of before. These mental models are not unlike a table's schema, which defines how we categorize and process new information.

This brings us to schema management. As business problems and requirements change over time, so does the structure of your data. Delta Lake makes it easy to incorporate new dimensions as the data changes. Users have access to simple semantics for controlling the schema of their tables. These tools include Schema Enforcement, which prevents users from accidentally polluting their tables with mistakes or garbage data, and Schema Evolution, which automatically adds new columns of valuable data where they belong. In this article, we take a closer look at how to use these tools.

Understanding Table Schemas


Every DataFrame in Apache Spark contains a schema, which defines the shape of the data: its data types, columns, and metadata. With Delta Lake, the table's schema is saved in JSON format inside the transaction log.
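As a rough illustration of what such a JSON schema looks like, the sketch below parses a schema string in the layout Spark uses when it serializes a StructType. The sample string is hand-written for the loans example used later in this article (it is not copied from a real transaction log), and describe_schema is a hypothetical helper name.

```python
import json

# Hand-written sample in the JSON layout Spark uses for a StructType.
# This is an illustration, not an excerpt from an actual Delta transaction log.
SCHEMA_JSON = """
{
  "type": "struct",
  "fields": [
    {"name": "addr_state", "type": "string", "nullable": true, "metadata": {}},
    {"name": "count", "type": "long", "nullable": true, "metadata": {}}
  ]
}
"""


def describe_schema(schema_json):
    """Return (name, type, nullable) for each top-level field."""
    schema = json.loads(schema_json)
    return [(f["name"], f["type"], f["nullable"]) for f in schema["fields"]]


fields = describe_schema(SCHEMA_JSON)
for name, dtype, nullable in fields:
    print(f"{name}: {dtype} (nullable = {str(nullable).lower()})")
```

Because the schema is plain JSON, it can be inspected with ordinary tooling, which is convenient when debugging a mismatch.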

What Is Schema Enforcement?


Schema Enforcement, also known as Schema Validation, is a safeguard in Delta Lake that ensures data quality by rejecting writes that do not match the table's schema. Like the front desk manager at a busy restaurant that only accepts reservations, it checks whether each column in the data being inserted is on its list of expected columns (in other words, whether each one has a “reservation”), and rejects any writes with columns that are not on the list.

How Does Schema Enforcement Work?


Delta Lake uses schema validation on write, which means that all new writes to a table are checked for compatibility with the target table's schema at write time. If the schema is not compatible, Delta Lake cancels the transaction altogether (no data is written) and raises an exception to let the user know about the mismatch.
To determine whether a write to a table is compatible, Delta Lake uses the following rules. The DataFrame to be written:

  • cannot contain any additional columns that are not present in the target table's schema. Conversely, it is fine if the incoming data does not contain every column in the table: those columns will simply be assigned null values.
  • cannot have column data types that differ from the column data types in the target table. If a target table's column contains StringType data but the corresponding column in the DataFrame contains IntegerType data, schema enforcement raises an exception and prevents the write from taking place.
  • cannot contain column names that differ only by case. This means that you cannot have columns named 'Foo' and 'foo' defined in the same table. While Spark can be used in case-sensitive or case-insensitive (default) mode, Delta Lake is case-preserving but case-insensitive when storing the schema. Parquet is case-sensitive when storing and returning column information. To avoid potential mistakes, data corruption, or data loss (which we have personally experienced at Databricks), we decided to add this restriction.
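The rules in this list (no extra columns, no mismatched types, no names that differ only by case) can be modeled in a few lines of plain Python. This is a toy sketch for building intuition, not Delta Lake's implementation: schemas are represented as simple dicts of column name to type name, and check_write is a hypothetical helper.

```python
def check_write(table_schema, df_schema):
    """Return a list of violations; an empty list means the write is allowed.

    Both schemas are dicts mapping column name -> type name (toy model).
    """
    violations = []

    # Rule: column names in the DataFrame must not differ only by case.
    seen = {}
    for name in df_schema:
        key = name.lower()
        if key in seen:
            violations.append(
                f"columns '{seen[key]}' and '{name}' differ only by case"
            )
        seen[key] = name

    # Columns are matched case-insensitively against the table schema.
    table_by_lower = {name.lower(): dtype for name, dtype in table_schema.items()}
    for name, dtype in df_schema.items():
        expected = table_by_lower.get(name.lower())
        if expected is None:
            # Rule: no columns that the target table does not have.
            violations.append(f"extra column '{name}' is not in the table schema")
        elif expected != dtype:
            # Rule: data types must match the target table.
            violations.append(f"column '{name}' is {dtype}, table expects {expected}")

    # Columns missing from the DataFrame are fine: they would be filled with nulls.
    return violations


table = {"addr_state": "string", "count": "long"}

subset_ok = check_write(table, {"addr_state": "string"})
extra_col = check_write(
    table, {"addr_state": "string", "count": "long", "amount": "double"}
)
wrong_type = check_write(table, {"addr_state": "string", "count": "string"})
case_clash = check_write(table, {"Foo": "string", "foo": "string"})

print(subset_ok, extra_col, wrong_type, case_clash, sep="\n")
```

Returning every violation at once, rather than stopping at the first, mirrors the idea that the whole write is rejected as a unit.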

To illustrate, let's look at what happens in the code below when we try to append a newly calculated column to a Delta Lake table that is not yet set up to accept it.

# Generate a DataFrame of loans that we'll append to our Delta Lake table
loans = sql("""
            SELECT addr_state, CAST(rand(10)*count as bigint) AS count,
            CAST(rand(10) * 10000 * count AS double) AS amount
            FROM loan_by_state_delta
            """)

# Show the original DataFrame's schema
original_loans.printSchema()

root
  |-- addr_state: string (nullable = true)
  |-- count: integer (nullable = true)
 
# Show the new DataFrame's schema
loans.printSchema()
 
root
  |-- addr_state: string (nullable = true)
  |-- count: integer (nullable = true)
  |-- amount: double (nullable = true) # new column
 
# Attempt to append the new DataFrame (with the new column) to the existing table
loans.write.format("delta") \
           .mode("append") \
           .save(DELTALAKE_PATH)

Returns:

A schema mismatch detected when writing to the Delta table.
 
To enable schema migration, please set:
'.option("mergeSchema", "true")\'
 
Table schema:
root
-- addr_state: string (nullable = true)
-- count: long (nullable = true)
 
Data schema:
root
-- addr_state: string (nullable = true)
-- count: long (nullable = true)
-- amount: double (nullable = true)
 
If Table ACLs are enabled, these options will be ignored. Please use the ALTER TABLE command for changing the schema.

Rather than automatically adding the new columns, Delta Lake enforces the schema and stops the write from occurring. To help identify which column (or columns) caused the mismatch, Spark prints both schemas in the stack trace for comparison.
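When the two printed schemas are long, spotting the offending column by eye gets tedious. A small comparison helper along these lines can do it for you. It is a sketch that assumes each schema has already been reduced to a dict of column name to type name; diff_schemas is a hypothetical name, not part of Spark or Delta Lake.

```python
def diff_schemas(table_schema, data_schema):
    """Compare two {column: type} dicts and report where they disagree."""
    added = [c for c in data_schema if c not in table_schema]
    missing = [c for c in table_schema if c not in data_schema]
    retyped = [
        (c, table_schema[c], data_schema[c])
        for c in table_schema
        if c in data_schema and table_schema[c] != data_schema[c]
    ]
    return {"added": added, "missing": missing, "retyped": retyped}


# The two schemas from the error message above.
table_schema = {"addr_state": "string", "count": "long"}
data_schema = {"addr_state": "string", "count": "long", "amount": "double"}

report = diff_schemas(table_schema, data_schema)
print(report)
```

For the schemas in the error message, the only difference reported is the added amount column.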

How Is Schema Enforcement Useful?


Because it is such a stringent check, schema enforcement is an excellent gatekeeper for a clean, fully transformed dataset that is ready for production or consumption. It is typically enforced on tables that directly feed:

  • Machine Learning Algorithms
  • BI dashboards
  • Data Analytics and Visualization Tools
  • Any production system that requires highly structured, strongly typed semantic schemas.

To prepare their data for this final hurdle, many users employ a simple “multi-hop” architecture that progressively adds structure to their tables. To learn more, see the article Productionizing Machine Learning With Delta Lake.

Of course, schema enforcement can be used anywhere in your pipeline, but be aware that it can be frustrating to have a streaming write to a table fail because, for example, you forgot that you added a single column to the incoming data.

Preventing Data Dilution


At this point, you might be asking yourself what all the fuss is about. After all, an unexpected “schema mismatch” error can sometimes trip you up in your workflow, especially if you are new to Delta Lake. Why not just let the schema change however it needs to, so that I can write my DataFrame no matter what?

As the old saying goes, "an ounce of prevention is worth a pound of cure." At some point, if you do not enforce your schema, data type compatibility issues will rear their ugly heads: seemingly homogeneous sources of raw data can contain edge cases, corrupted columns, malformed mappings, or other scary things that go bump in the night. A much better approach is to stop these enemies at the gates with schema enforcement and deal with them in the daylight, rather than later, when they are lurking in the dark depths of your production code.

Schema enforcement gives you confidence that your table's schema will not change unless you explicitly choose to change it. It prevents data “dilution,” which can occur when new columns are appended so frequently that formerly rich, concise tables lose their meaning and usefulness under the deluge of data. By encouraging you to be intentional, set high standards, and expect high quality, schema enforcement does exactly what it was designed to do: it keeps you honest and your tables clean.

If, upon further review, you decide that you really did mean to add that new column, it is an easy one-line fix, as discussed below. The solution is schema evolution!

What Is Schema Evolution?


Schema evolution is a feature that lets users easily change a table's current schema to accommodate data that changes over time. Most commonly, it is used when performing an append or overwrite operation, to automatically adapt the schema to include one or more new columns.

How Does Schema Evolution Work?


Following the example from the previous section, developers can easily use schema evolution to add the new columns that were previously rejected because of a schema mismatch. Schema evolution is activated by adding .option('mergeSchema', 'true') to your .write or .writeStream Spark command.

# Add the mergeSchema option
loans.write.format("delta") \
           .option("mergeSchema", "true") \
           .mode("append") \
           .save(DELTALAKE_SILVER_PATH)

To view the plot, execute the following Spark SQL statement.

# Create a plot with the new column to confirm the write was successful
%sql
SELECT addr_state, sum(`amount`) AS amount
FROM loan_by_state_delta
GROUP BY addr_state
ORDER BY sum(`amount`)
DESC LIMIT 10

Alternatively, you can set this option for the entire Spark session by adding spark.databricks.delta.schema.autoMerge.enabled = true to your Spark configuration. Use this with caution, because schema enforcement will no longer warn you about unintended schema mismatches.

When the mergeSchema option is included in your query, any columns that are present in the DataFrame but not in the target table are automatically added to the end of the schema as part of the write transaction. Nested fields can also be added, and these are appended to the end of their respective struct columns.
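The "append to the end" behavior, including for nested fields, can be pictured with a toy merge over plain dicts. This is an illustrative model only, not how Delta Lake implements merging: a struct is represented as a nested dict, merge_schema is a hypothetical helper, and the borrower struct is an invented example column.

```python
def merge_schema(table, incoming):
    """Return a new schema with columns from `incoming` appended to `table`.

    Schemas are dicts of column name -> type name; a nested dict models a struct.
    """
    merged = dict(table)  # keeps the existing column order
    for name, dtype in incoming.items():
        if name not in merged:
            merged[name] = dtype  # new column goes at the end
        elif isinstance(merged[name], dict) and isinstance(dtype, dict):
            # Existing struct: new nested fields go at the end of that struct.
            merged[name] = merge_schema(merged[name], dtype)
        elif merged[name] != dtype:
            raise ValueError(f"type conflict on column '{name}'")
    return merged


table = {"addr_state": "string", "count": "long", "borrower": {"id": "long"}}
incoming = {
    "amount": "double",
    "borrower": {"id": "long", "income": "double"},
    "addr_state": "string",
}

merged = merge_schema(table, incoming)
print(list(merged))              # top-level column order after the merge
print(list(merged["borrower"]))  # field order inside the struct
```

Existing columns keep their positions regardless of the order in which the incoming data lists them; only genuinely new columns and fields are appended.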

Data engineers and data scientists can use this option to add new columns (perhaps a newly tracked metric, or a column of this month's sales figures) to their existing machine learning production tables without breaking existing models that rely on the old columns.

The following types of schema changes are eligible for schema evolution during table appends or overwrites:

  • Adding new columns (this is the most common scenario)
  • Changing data types from NullType -> any other type, or upcasts from ByteType -> ShortType -> IntegerType

Other changes, which are not eligible for schema evolution, require that the schema and data be overwritten by adding .option("overwriteSchema", "true"). For example, if the column “Foo” was originally an integer data type and the new schema makes it a string data type, then all of the Parquet (data) files would need to be rewritten. These changes include:

  • Dropping a column
  • Changing an existing column's data type (in place)
  • Renaming columns in a way that differs only by case (for example, “Foo” and “foo”)
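The two lists above can be combined into a rough classifier that compares the table's current schema with the schema you want it to have after the write, and reports which option the change would call for. This is a simplified sketch over dicts of column name to type name, not Delta Lake's logic: required_option is a hypothetical helper, and treating ByteType -> IntegerType as a permitted upcast (skipping ShortType) is an assumption.

```python
# Upcasts named in the text: ByteType -> ShortType -> IntegerType.
# Allowing byte -> integer directly is an assumption of this sketch.
_WIDENING = {("byte", "short"), ("short", "integer"), ("byte", "integer")}


def required_option(old, new):
    """Classify a schema change: None, 'mergeSchema' or 'overwriteSchema'."""
    # A column missing from the new schema was dropped
    # (this also catches renames that differ only by case).
    if any(name not in new for name in old):
        return "overwriteSchema"

    needs_merge = False
    for name, dtype in new.items():
        if name not in old:
            needs_merge = True  # brand-new column
        elif old[name] != dtype:
            if old[name] == "null" or (old[name], dtype) in _WIDENING:
                needs_merge = True  # permitted type change
            else:
                return "overwriteSchema"  # in-place type change
    return "mergeSchema" if needs_merge else None


print(required_option({"Foo": "integer"}, {"Foo": "integer", "bar": "string"}))
print(required_option({"Foo": "integer"}, {"Foo": "string"}))
```

The first call adds a column, which schema evolution can handle; the second changes the type of “Foo” from integer to string, which requires overwriting the schema and data.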

Finally, with the upcoming release of Spark 3.0, explicit DDL (using ALTER TABLE) will be fully supported, allowing users to perform the following actions on table schemas:

  • Adding columns
  • Changing column comments
  • Setting table properties that define the behavior of the table, such as the retention duration of the transaction log.
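For orientation, the three actions above might look roughly like the statements below when issued through spark.sql. Treat this as a sketch: the exact syntax accepted depends on your Spark and Delta Lake versions, funded_date is a hypothetical column, the comment text and the 30-day retention value are arbitrary examples, and apply_ddl is a hypothetical helper that expects an existing SparkSession.

```python
TABLE = "loan_by_state_delta"

DDL_STATEMENTS = [
    # Add a column (funded_date is a hypothetical column name).
    f"ALTER TABLE {TABLE} ADD COLUMNS (funded_date DATE)",
    # Change a column comment (the comment text is an example).
    f"ALTER TABLE {TABLE} CHANGE COLUMN amount amount DOUBLE "
    "COMMENT 'Total loan amount'",
    # Set a table property: how long transaction log entries are retained.
    f"ALTER TABLE {TABLE} SET TBLPROPERTIES "
    "('delta.logRetentionDuration' = 'interval 30 days')",
]


def apply_ddl(spark, statements=DDL_STATEMENTS):
    """Run each DDL statement through an existing SparkSession."""
    for statement in statements:
        spark.sql(statement)


for statement in DDL_STATEMENTS:
    print(statement)
```

Unlike mergeSchema, explicit DDL changes the schema without writing any data, which makes the intent of the change easy to review.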

How Is Schema Evolution Useful?


Schema evolution can be used whenever you intend to change the schema of your table (as opposed to cases where you accidentally added columns to your DataFrame that should not be there). It is the easiest way to migrate your schema because it automatically adds the correct column names and data types, without you having to declare them explicitly.

Conclusion


Schema enforcement rejects any new columns or other schema changes that are not compatible with your table. By setting and upholding these high standards, analysts and engineers can trust that their data has the highest level of integrity and can reason about it with clarity, allowing them to make better business decisions.

On the flip side, schema evolution complements enforcement by making it easy for intended schema changes to take place automatically. After all, it should not be hard to add a column.

Schema enforcement is the yang to schema evolution's yin. When used together, these features make it easier than ever to block out the noise and tune in to the signal.

We would also like to thank Mukul Murthy and Pranav Anand for their contributions to this article.

Other articles in the series:

Diving Into Delta Lake: Unpacking the Transaction Log



Related Articles


Productionizing Machine Learning With Delta Lake

What is a data lake?


