Tauchen Sie ein in den Delta Lake: Durchsetzung und Evolution

Hallo Habr! Ich präsentiere Ihnen die Übersetzung des Artikels „Tauchen in den Delta-See: Schema-Durchsetzung und -Evolution“ von Burak Yavuz, Brenner Heintz und Denny Lee, der vor dem Start des Data Engineer- Kurses von OTUS vorbereitet wurde.





Daten sammeln sich wie unsere Erfahrung ständig an und entwickeln sich weiter. Um Schritt zu halten, müssen sich unsere mentalen Modelle der Welt an neue Daten anpassen, von denen einige neue Dimensionen enthalten - neue Arten, Dinge zu beobachten, von denen wir vorher keine Ahnung hatten. Diese mentalen Modelle unterscheiden sich nicht wesentlich von Tabellenschemata, die bestimmen, wie wir neue Informationen klassifizieren und verarbeiten.

Dies bringt uns zum Thema Schaltungsmanagement. Wenn sich Geschäftsaufgaben und -anforderungen im Laufe der Zeit ändern, ändert sich die Struktur Ihrer Daten. Delta Lake macht es einfach, neue Messungen durchzuführen, wenn sich Daten ändern. Benutzer haben Zugriff auf einfache Semantik zum Verwalten der Layouts ihrer Tabellen. Zu diesen Tools gehören die Schema-Durchsetzung, mit der Benutzer vor versehentlichem Verstopfen ihrer Tabellen mit Fehlern oder unnötigen Daten geschützt werden, sowie Schema Evolution, mit dem automatisch neue Spalten mit wertvollen Daten an den entsprechenden Stellen hinzugefügt werden. In diesem Artikel werden wir uns eingehender mit der Verwendung dieser Tools befassen.

Tabellenschemata verstehen


Jeder DataFrame in Apache Spark enthält ein Schema, das ein Datenformular definiert, z. B. Datentypen, Spalten und Metadaten. Mit Delta Lake wird das Tabellenschema im JSON-Format im Transaktionsprotokoll gespeichert.

Was ist das Erzwingen eines Schemas?


Schema Enforcement, auch als Schema Validation bezeichnet, ist ein Verteidigungsmechanismus in Delta Lake, der die Datenqualität garantiert, indem Datensätze abgelehnt werden, die nicht mit dem Tabellenschema übereinstimmen. Wie die Gastgeberin an der Rezeption in einem beliebten Restaurant, das nur nach vorheriger Reservierung akzeptiert, prüft er, ob jede in der Tabelle eingegebene Datenspalte in der entsprechenden Liste der erwarteten Spalten enthalten ist (mit anderen Worten, gibt es für jede Spalte eine „Reservierung“). und lehnt alle Einträge mit Spalten ab, die nicht in der Liste enthalten sind.

Wie funktioniert die Durchsetzung einer Schaltung?


Delta Lake verwendet beim Schreiben eine Schemaüberprüfung. Dies bedeutet, dass alle neuen Datensätze in der Tabelle während der Aufzeichnung auf Kompatibilität mit dem Schema der Zieltabelle überprüft werden. Wenn das Schema nicht kompatibel ist, bricht Delta Lake die Transaktion vollständig ab (Daten werden nicht geschrieben) und löst eine Ausnahme aus, um den Benutzer über die Diskrepanz zu informieren.
Um die Datensatzkompatibilität mit einer Tabelle zu bestimmen, verwendet Delta Lake die folgenden Regeln. Beschreibbarer DataFrame:

  • darf keine zusätzlichen Spalten enthalten, die nicht im Schema der Zieltabelle enthalten sind. Und umgekehrt ist alles in Ordnung, wenn die Eingabedaten nicht absolut alle Spalten aus der Tabelle enthalten - diesen Spalten werden einfach Nullwerte zugewiesen.
  • , . StringType, DataFrame IntegerType, .
  • darf keine Spaltennamen enthalten, die sich nur für den Fall unterscheiden. Dies bedeutet, dass Sie keine Spalten mit den Namen 'Foo' und 'foo' in derselben Tabelle definieren können. Obwohl Spark in Groß- und Kleinschreibung oder ohne Berücksichtigung der Groß- und Kleinschreibung verwendet werden kann (Standard), unterscheidet Delta Lake zwischen Groß- und Kleinschreibung, jedoch nicht zwischen Groß- und Kleinschreibung. Bei Parkett wird beim Speichern und Zurückgeben von Spalteninformationen zwischen Groß- und Kleinschreibung unterschieden. Um mögliche Fehler, Datenbeschädigungen oder Datenverluste (die wir persönlich in den Databricks festgestellt haben) zu vermeiden, haben wir beschlossen, diese Einschränkung hinzuzufügen.

Um dies zu veranschaulichen, werfen wir einen Blick darauf, was im folgenden Code passiert, wenn Sie versuchen, einige kürzlich generierte Spalten zu einer Delta Lake-Tabelle hinzuzufügen, die noch nicht für die Annahme dieser Spalten konfiguriert ist.

#  DataFrame ,       Delta Lake
loans = sql("""
            SELECT addr_state, CAST(rand(10)*count as bigint) AS count,
            CAST(rand(10) * 10000 * count AS double) AS amount
            FROM loan_by_state_delta
            """)

#    DataFrame
original_loans.printSchema()

root
  |-- addr_state: string (nullable = true)
  |-- count: integer (nullable = true)
 
#    DataFrame
loans.printSchema()
 
root
  |-- addr_state: string (nullable = true)
  |-- count: integer (nullable = true)
  |-- amount: double (nullable = true) # new column
 
#    DataFrame (  )   
loans.write.format("delta") \
           .mode("append") \
           .save(DELTALAKE_PATH)

Returns:

A schema mismatch detected when writing to the Delta table.
 
To enable schema migration, please set:
'.option("mergeSchema", "true")\'
 
Table schema:
root
-- addr_state: string (nullable = true)
-- count: long (nullable = true)
 
Data schema:
root
-- addr_state: string (nullable = true)
-- count: long (nullable = true)
-- amount: double (nullable = true)
 
If Table ACLs are enabled, these options will be ignored. Please use the ALTER TABLE command for changing the schema.

Anstatt automatisch neue Spalten hinzuzufügen, legt Delta Lake ein Diagramm fest und beendet die Aufzeichnung. Um festzustellen, welche Spalte (oder mehrere davon) die Ursache für die Nichtübereinstimmung ist, zieht Spark beide Schemata zum Vergleich aus dem Stapelstapel.

Was nützt die Durchsetzung eines Schemas?


Da das Erzwingen eines Schemas ein ziemlich strenger Test ist, ist es ein hervorragendes Werkzeug für die Verwendung eines sauberen, vollständig transformierten Datensatzes, der als Gatekeeper für die Produktion oder den Verbrauch bereit ist. In der Regel wird es auf Tabellen angewendet, die direkt Daten liefern:

  • Algorithmen für maschinelles Lernen
  • BI-Dashboards
  • Tools für Datenanalyse und Visualisierung
  • Jedes Produktionssystem, das streng strukturierte, stark typisierte semantische Schemata erfordert.

Um ihre Daten auf diese letzte Barriere vorzubereiten, verwenden viele Benutzer eine einfache „Multi-Hop“ -Architektur, die ihre Tabellen schrittweise strukturiert. Weitere Informationen hierzu finden Sie im Artikel über maschinelles Lernen auf Maschinenebene von Delta Lake.

Natürlich können Sie die erzwungene Anwendung des Schemas an einer beliebigen Stelle in Ihrer Pipeline verwenden. Beachten Sie jedoch, dass das Streaming in eine Tabelle in diesem Fall frustrierend sein kann, da Sie beispielsweise vergessen haben, dass Sie den Eingabedaten eine weitere Spalte hinzugefügt haben.

Verhinderung der Datenverdünnung


An diesem Punkt wundern Sie sich vielleicht, warum so viel Aufsehen erregt? Schließlich kann manchmal ein unerwarteter Fehler „Schema-Nichtübereinstimmung“ dazu führen, dass Sie in Ihrem Workflow auf dem Zug sind, insbesondere wenn Sie Delta Lake noch nicht kennen. Warum nicht einfach das Schema nach Bedarf ändern lassen, damit ich meinen DataFrame schreiben kann, egal was passiert?

Wie das alte Sprichwort sagt: "Eine Unze Prävention ist ein Pfund Heilung wert." Wenn Sie sich nicht um die Anwendung Ihres Schemas kümmern, werden Probleme mit der Datentypkompatibilität irgendwann zu Ekel führen. Auf den ersten Blick können homogene Rohdatenquellen Grenzfälle, beschädigte Spalten, fehlerhafte Zuordnungen oder andere beängstigende Dinge enthalten, die träumen in Albträumen. Der beste Ansatz besteht darin, diese Feinde vor den Toren zu stoppen - indem Sie das Schema durchsetzen - und sie im Licht zu behandeln, nicht später, wenn sie beginnen, die dunklen Tiefen Ihres Arbeitscodes zu durchsuchen.

Durch das Erzwingen eines Schemas können Sie sicher sein, dass sich das Schema Ihrer Tabelle nicht ändert, es sei denn, Sie bestätigen die Änderungsoption selbst. Dies verhindert die Verwässerung von Daten, die auftreten können, wenn neue Spalten so oft hinzugefügt werden, dass zuvor wertvolle, komprimierte Tabellen aufgrund von Datenfluten ihren Wert und ihre Nützlichkeit verlieren. Indem Sie dazu ermutigt werden, absichtlich zu sein, hohe Standards zu setzen und hohe Qualität zu erwarten, bewirkt die Durchsetzung des Systems genau das, was es Ihnen helfen soll, ehrlich zu bleiben und Ihre Tische sauber zu halten.

Wenn bei weiterer Überlegung, Sie entscheiden , dass Sie wirklich brauchen eine neue Spalte hinzufügen - kein Problem, ist eine einzeilige fix. Die Lösung ist die Entwicklung der Schaltung!

Was ist Schaltungsentwicklung?


Die Schemaentwicklung ist eine Funktion, mit der Benutzer das aktuelle Tabellenschema einfach anhand von Daten ändern können, die sich im Laufe der Zeit ändern. In den meisten Fällen wird es beim Ausführen eines Hinzufügens oder Umschreibens verwendet, um das Layout automatisch so anzupassen, dass es eine oder mehrere neue Spalten enthält.

Wie funktioniert die Schaltungsentwicklung?


Nach dem Beispiel aus dem vorherigen Abschnitt können Entwickler die Entwicklung des Schemas problemlos verwenden, um neue Spalten hinzuzufügen, die zuvor aufgrund der Nichtübereinstimmung des Schemas abgelehnt wurden. Die Schemaentwicklung wird durch Hinzufügen .option('mergeSchema', 'true')zu Ihrem Spark-Team aktiviert ..write .writeStream.

#   mergeSchema
loans.write.format("delta") \
           .option("mergeSchema", "true") \
           .mode("append") \
           .save(DELTALAKE_SILVER_PATH)

Führen Sie die folgende Spark SQL-Abfrage aus, um das Diagramm anzuzeigen

#     ,  ,    
%sql
SELECT addr_state, sum(`amount`) AS amount
FROM loan_by_state_delta
GROUP BY addr_state
ORDER BY sum(`amount`)
DESC LIMIT 10

Bild
Alternativ können Sie diese Option für die gesamte Spark-Sitzung festlegen, indem Sie spark.databricks.delta.schema.autoMerge = Trueder Konfiguration Spark hinzufügen . Verwenden Sie dies jedoch mit Vorsicht, da Sie beim Durchsetzen eines Schemas nicht mehr vor versehentlichen Inkonsistenzen mit dem Schema gewarnt werden.

Durch mergeSchemaEinfügen eines Parameters in die Abfrage werden alle Spalten, die im DataFrame vorhanden sind, sich jedoch nicht in der Zieltabelle befinden, als Teil der Schreibtransaktion automatisch am Ende des Schemas hinzugefügt. Verschachtelte Felder können ebenfalls hinzugefügt werden und werden auch am Ende der entsprechenden Strukturspalten hinzugefügt.

Datumsingenieure und Wissenschaftler können diese Option verwenden, um neue Spalten (möglicherweise eine kürzlich erfasste Metrik oder eine Spalte mit Verkaufsmetriken in diesem Monat) zu ihren vorhandenen Produktionstabellen für maschinelles Lernen hinzuzufügen, ohne vorhandene Modelle basierend auf alten Spalten zu beschädigen.

Die folgenden Arten von Schemaänderungen sind im Rahmen der Weiterentwicklung des Schemas beim Hinzufügen oder Umschreiben einer Tabelle zulässig:

  • Hinzufügen neuer Spalten (dies ist das häufigste Szenario)
  • Ändern Sie die Datentypen von NullType -> einem anderen Typ oder erhöhen Sie sie von ByteType -> ShortType -> IntegerType

Andere Änderungen, die im Rahmen der Entwicklung eines Schemas nicht akzeptabel sind, erfordern, dass das Schema und die Daten durch Hinzufügen überschrieben werden .option("overwriteSchema", "true"). Wenn beispielsweise die Spalte „Foo“ ursprünglich eine Ganzzahl war und das neue Schema ein Zeichenfolgendatentyp ist, müssen alle Parkettdateien (Daten) neu geschrieben werden. Diese Änderungen umfassen:

  • Spalte löschen
  • Ändern Sie den Datentyp einer vorhandenen Spalte (vorhanden).
  • Umbenennen von Spalten, bei denen nur zwischen Groß- und Kleinschreibung unterschieden wird (z. B. "Foo" und "foo")

Schließlich wird mit der nächsten Version von Spark 3.0 die explizite DDL (mit ALTER TABLE) vollständig unterstützt, sodass Benutzer die folgenden Aktionen für Tabellenschemata ausführen können:

  • Spalten hinzufügen
  • Spaltenkommentare ändern
  • Festlegen von Tabelleneigenschaften, die das Verhalten der Tabelle bestimmen, z. B. Festlegen der Dauer der Transaktionsprotokollspeicherung.

Was nützt die Schaltungsentwicklung?


Sie können immer Schemaevolution verwenden , wenn Sie beabsichtigen, das Schema Ihrer Tabelle zu ändern (im Gegensatz zu den Fällen , im Gegensatz , wenn Sie versehentlich Spalten zu Ihrem Datenrahmen hinzufügen , die nicht da sein sollen). Dies ist der einfachste Weg, um Ihr Schema zu migrieren, da automatisch die richtigen Spaltennamen und Datentypen hinzugefügt werden, ohne dass diese explizit deklariert werden müssen.

Fazit


Das Erzwingen eines Schemas lehnt alle neuen Spalten oder andere Schemaänderungen ab, die nicht mit Ihrer Tabelle kompatibel sind. Durch die Festlegung und Aufrechterhaltung dieser hohen Standards können sich Analysten und Ingenieure auf die Tatsache verlassen, dass ihre Daten ein Höchstmaß an Integrität aufweisen. Sie können dies klar und deutlich begründen und so effektivere Geschäftsentscheidungen treffen.

Andererseits ergänzt die Entwicklung der Schaltung die Durchsetzung und vereinfacht die angeblichen automatischen Änderungen an der Schaltung. Am Ende sollte es nicht schwierig sein, eine Spalte hinzuzufügen.

Die erzwungene Anwendung der Schaltung ist Yang, wobei die Entwicklung der Schaltung Yin ist. Zusammengenommen machen diese Funktionen die Rauschunterdrückung und Signalabstimmung einfacher als je zuvor.

Wir möchten uns auch bei Mukul Murti und Pranava Ananda für ihre Beiträge zu diesem Artikel bedanken.

Weitere Artikel in der Reihe:

Tauchen Sie ein in Delta Lake: Auspacken eines Transaktionsprotokolls



Zum Thema passende Artikel


Maschinelles Lernen auf Produktionsebene in Delta Lake

Was ist ein Datensee?



Erfahren Sie mehr über den Kurs



All Articles