Plongez dans Delta Lake: application et évolution

Bonjour, Habr! Je vous présente la traduction de l'article «Diving Into Delta Lake: Schema Enforcement & Evolution» de Burak Yavuz, Brenner Heintz et Denny Lee, qui a été préparé avant le début du cours Data Engineer d'OTUS.





Les données, comme notre expérience, s'accumulent et se développent constamment. Pour suivre, nos modèles mentaux du monde doivent s'adapter à de nouvelles données, dont certaines contiennent de nouvelles dimensions - de nouvelles façons d'observer des choses dont nous n'avions aucune idée auparavant. Ces modèles mentaux ne sont pas très différents des schémas de table qui déterminent la façon dont nous classons et traitons les nouvelles informations.

Cela nous amène à la question de la gestion des circuits. À mesure que les tâches et les exigences métier évoluent avec le temps, la structure de vos données change. Delta Lake facilite l'implémentation de nouvelles mesures lorsque les données changent. Les utilisateurs ont accès à une sémantique simple pour gérer les dispositions de leurs tables. Ces outils incluent Schema Enforcement, qui protège les utilisateurs contre le colmatage par inadvertance de leurs tables avec des erreurs ou des données inutiles, ainsi que Schema Evolution, qui ajoute automatiquement de nouvelles colonnes contenant des données précieuses aux endroits appropriés. Dans cet article, nous approfondirons l'utilisation de ces outils.

Comprendre les schémas de table


Chaque DataFrame dans Apache Spark contient un schéma qui définit un formulaire de données, tel que les types de données, les colonnes et les métadonnées. À l'aide de Delta Lake, le schéma de table est enregistré au format JSON dans le journal des transactions.

Qu'est-ce que le schéma d'application?


L'application de schéma, également connue sous le nom de validation de schéma, est un mécanisme de défense dans Delta Lake qui garantit la qualité des données en rejetant les enregistrements qui ne correspondent pas au schéma de table. Comme l'hôtesse à la réception dans un restaurant populaire, qui n'accepte que sur réservation préalable, il vérifie si chaque colonne de données saisie dans le tableau se trouve dans la liste correspondante des colonnes attendues (en d'autres termes, y a-t-il une «réservation» pour chacune d'entre elles), et rejette toutes les entrées dont les colonnes ne figurent pas dans la liste.

Comment fonctionne l'application d'un circuit?


Delta Lake utilise une vérification de schéma lors de l'écriture, ce qui signifie que la compatibilité de tous les nouveaux enregistrements de la table avec le schéma de la table cible lors de l'enregistrement est vérifiée. Si le schéma est incompatible, Delta Lake annule complètement la transaction (les données ne sont pas écrites) et lève une exception pour informer l'utilisateur de l'écart.
Pour déterminer la compatibilité des enregistrements avec une table, Delta Lake utilise les règles suivantes. DataFrame inscriptible:

  • ne peut pas contenir de colonnes supplémentaires qui ne figurent pas dans le schéma de la table cible. Et vice versa, tout est en ordre si les données d'entrée ne contiennent pas absolument toutes les colonnes de la table - ces colonnes se verront simplement attribuer des valeurs nulles.
  • , . StringType, DataFrame IntegerType, .
  • ne peut pas contenir de noms de colonnes qui ne diffèrent qu'en cas. Cela signifie que vous ne pouvez pas avoir de colonnes avec les noms «Foo» et «foo» définis dans la même table. Bien que Spark puisse être utilisé en respectant la casse ou insensible à la casse (par défaut), Delta Lake est sensible à la casse, mais insensible à la casse. Parquet est sensible à la casse lors du stockage et du renvoi des informations de colonne. Afin d'éviter d'éventuelles erreurs, corruption de données ou perte de données (que nous avons personnellement rencontrées dans les Databricks), nous avons décidé d'ajouter cette restriction.

Pour illustrer cela, examinons ce qui se passe dans le code ci-dessous lorsque vous essayez d'ajouter des colonnes récemment générées à une table Delta Lake qui n'est pas encore configurée pour les accepter.

#  DataFrame ,       Delta Lake
loans = sql("""
            SELECT addr_state, CAST(rand(10)*count as bigint) AS count,
            CAST(rand(10) * 10000 * count AS double) AS amount
            FROM loan_by_state_delta
            """)

#    DataFrame
original_loans.printSchema()

root
  |-- addr_state: string (nullable = true)
  |-- count: integer (nullable = true)
 
#    DataFrame
loans.printSchema()
 
root
  |-- addr_state: string (nullable = true)
  |-- count: integer (nullable = true)
  |-- amount: double (nullable = true) # new column
 
#    DataFrame (  )   
loans.write.format("delta") \
           .mode("append") \
           .save(DELTALAKE_PATH)

Returns:

A schema mismatch detected when writing to the Delta table.
 
To enable schema migration, please set:
'.option("mergeSchema", "true")\'
 
Table schema:
root
-- addr_state: string (nullable = true)
-- count: long (nullable = true)
 
Data schema:
root
-- addr_state: string (nullable = true)
-- count: long (nullable = true)
-- amount: double (nullable = true)
 
If Table ACLs are enabled, these options will be ignored. Please use the ALTER TABLE command for changing the schema.

Au lieu d'ajouter automatiquement de nouvelles colonnes, Delta Lake impose un graphique et arrête l'enregistrement. Pour aider à déterminer quelle colonne (ou plusieurs d'entre elles) est à l'origine de la non-concordance, Spark extrait les deux schémas de la pile de pile pour comparaison.

Quelle est l'utilité d'appliquer un programme?


Étant donné que l'application d'un schéma est un test assez rigoureux, c'est un excellent outil pour utiliser un ensemble de données propre et entièrement transformé qui est prêt pour la production ou la consommation en tant que contrôleur d'accès. En règle générale, il est appliqué aux tables qui fournissent directement des données:

  • Algorithmes d'apprentissage automatique
  • Tableaux de bord BI
  • Outils d'analyse et de visualisation des données
  • Tout système de production qui nécessite des schémas sémantiques strictement structurés et fortement typés.

Pour préparer leurs données à cette dernière barrière, de nombreux utilisateurs utilisent une architecture simple «multi-sauts», qui structure progressivement leurs tables. Pour en savoir plus à ce sujet, vous pouvez consulter l'article d' apprentissage automatique au niveau machine de Delta Lake.

Bien sûr, vous pouvez utiliser l'application forcée du schéma n'importe où dans votre pipeline, mais rappelez-vous que le streaming vers une table dans ce cas peut être frustrant, car, par exemple, vous avez oublié que vous avez ajouté une autre colonne aux données d'entrée.

Prévention de l'amincissement des données


À ce stade, vous vous demandez peut-être pourquoi une telle agitation? Après tout, parfois une erreur inattendue de «discordance de schéma» peut vous placer sur le train en marche de votre flux de travail, surtout si vous êtes nouveau sur Delta Lake. Pourquoi ne pas simplement laisser le schéma changer au besoin pour que je puisse écrire mon DataFrame, quoi qu'il arrive?

Comme le dit le vieil adage, "une once de prévention vaut mieux que guérir". À un moment donné, si vous ne prenez pas soin d'appliquer votre schéma, les problèmes de compatibilité des types de données soulèveront vos têtes dégoûtantes - à première vue, les sources homogènes de données brutes peuvent contenir des cas limites, des colonnes endommagées, des mappages malformés ou d'autres choses effrayantes qui rêvent dans les cauchemars. La meilleure approche consiste à arrêter ces ennemis aux portes - en appliquant le schéma - et à les traiter à la lumière, pas plus tard, lorsqu'ils commencent à parcourir les profondeurs sombres de votre code de travail.

L'application d'un schéma vous donne l'assurance que le schéma de votre table ne changera pas, sauf si vous confirmez vous-même l'option de modification. Cela évite la dilution des données qui peut se produire lorsque de nouvelles colonnes sont ajoutées si souvent que les tables compressées précédemment valables perdent leur valeur et leur utilité en raison des flux de données. En vous encourageant à être intentionnel, à fixer des normes élevées et à vous attendre à une qualité élevée, l'application du programme fait exactement ce qu'il était censé vous aider à rester honnête et à garder vos tables propres.

Si, après un examen plus approfondi, vous décidez que vous devez vraiment ajouter une nouvelle colonne - pas de problème, voici un correctif sur une seule ligne. La solution est l'évolution du circuit!

Qu'est-ce que l'évolution du circuit?


L'évolution du schéma est une fonctionnalité qui permet aux utilisateurs de modifier facilement le schéma de table actuel en fonction des données qui évoluent dans le temps. Le plus souvent, il est utilisé lors d'une opération d'ajout ou de remplacement pour adapter automatiquement la mise en page pour inclure une ou plusieurs nouvelles colonnes.

Comment fonctionne l'évolution des circuits?


En suivant l'exemple de la section précédente, les développeurs peuvent facilement utiliser l'évolution du schéma pour ajouter de nouvelles colonnes qui ont été précédemment rejetées en raison de l'inadéquation du schéma. L'évolution du schéma est activée en ajoutant .option('mergeSchema', 'true')à votre équipe Spark..write .writeStream.

#   mergeSchema
loans.write.format("delta") \
           .option("mergeSchema", "true") \
           .mode("append") \
           .save(DELTALAKE_SILVER_PATH)

Pour afficher le graphique, exécutez la requête Spark SQL suivante

#     ,  ,    
%sql
SELECT addr_state, sum(`amount`) AS amount
FROM loan_by_state_delta
GROUP BY addr_state
ORDER BY sum(`amount`)
DESC LIMIT 10

image
Vous pouvez également définir cette option pour toute la session Spark en ajoutant spark.databricks.delta.schema.autoMerge = TrueSpark à la configuration. Mais utilisez-le avec prudence, car l'application d'un schéma ne vous avertira plus des incohérences involontaires avec le schéma.

En incluant un paramètre dans la requête mergeSchema, toutes les colonnes qui sont présentes dans le DataFrame mais qui ne sont pas dans la table cible sont automatiquement ajoutées à la fin du schéma dans le cadre de la transaction d'écriture. Les champs imbriqués peuvent également être ajoutés et ils seront également ajoutés à la fin des colonnes de structure correspondantes.

Les ingénieurs de date et les scientifiques peuvent utiliser cette option pour ajouter de nouvelles colonnes (peut-être une mesure récemment suivie ou une colonne de mesures de vente ce mois-ci) à leurs tables de production d'apprentissage automatique existantes sans casser les modèles existants basés sur d'anciennes colonnes.

Les types de modifications de schéma suivants sont autorisés dans le cadre de l'évolution du schéma lors de l'ajout ou de la réécriture d'une table:

  • Ajout de nouvelles colonnes (c'est le scénario le plus courant)
  • Modifier les types de données de NullType -> tout autre type ou augmenter de ByteType -> ShortType -> IntegerType

D'autres modifications inacceptables dans le cadre de l'évolution d'un schéma nécessitent que le schéma et les données soient écrasés par l'ajout .option("overwriteSchema", "true"). Par exemple, dans le cas où la colonne "Foo" était à l'origine un entier et que le nouveau schéma serait un type de données de chaîne, tous les fichiers Parquet (données) devraient être réécrits. Ces changements comprennent:

  • supprimer la colonne
  • changer le type de données d'une colonne existante (en place)
  • renommer des colonnes qui ne sont sensibles qu'à la casse (par exemple, "Foo" et "foo")

Enfin, avec la prochaine version de Spark 3.0, le DDL explicite (en utilisant ALTER TABLE) sera entièrement pris en charge, ce qui permettra aux utilisateurs d'effectuer les actions suivantes sur les schémas de table:

  • ajout de colonnes
  • modifier les commentaires de colonne
  • définir des propriétés de table qui déterminent le comportement de la table, par exemple, définir la durée de stockage du journal des transactions.

Quelle est l'utilité de l'évolution des circuits?


Vous pouvez toujours utiliser l'évolution du schéma lorsque vous avez l' intention de modifier le schéma de votre table (par opposition aux cas où vous avez accidentellement ajouté à votre DataFrame des colonnes qui ne devraient pas y être). C'est le moyen le plus simple de migrer votre schéma car il ajoute automatiquement les noms de colonne et les types de données corrects sans qu'il soit nécessaire de les déclarer explicitement.

Conclusion


Forcer un schéma rejette toutes les nouvelles colonnes ou autres modifications de schéma qui ne sont pas compatibles avec votre table. En fixant et en maintenant ces normes élevées, les analystes et les ingénieurs peuvent compter sur le fait que leurs données ont le plus haut niveau d'intégrité, en raisonnant clairement et clairement, ce qui leur permet de prendre des décisions commerciales plus efficaces.

D'autre part, l'évolution du circuit complète l'application, simplifiant les prétendues modifications automatiques du circuit. Au final, il ne devrait pas être difficile d'ajouter une colonne.

L'application forcée du circuit est le yang, où l'évolution du circuit est le yin. Lorsqu'elles sont utilisées ensemble, ces fonctionnalités rendent la réduction du bruit et le réglage du signal plus faciles que jamais.

Nous tenons également à remercier Mukul Murti et Pranava Ananda pour leurs contributions à cet article.

Autres articles de la série:

Plongez dans Delta Lake: décompresser un journal de transactions



Articles Liés


Apprentissage automatique au niveau de la production de Delta Lake

Qu'est-ce qu'un lac de données?



En savoir plus sur le cours



All Articles