Delta: plateforme de synchronisation et d'enrichissement des données

En prévision du lancement d'un nouveau volet au cours Data Engineer, nous avons préparé une traduction de matériel intéressant.






Aperçu


Nous allons parler d'un modèle assez populaire selon lequel les applications utilisent plusieurs magasins de données, où chaque magasin est utilisé à ses propres fins, par exemple, pour stocker la forme canonique de données (MySQL, etc.), fournir des capacités de recherche avancées (ElasticSearch, etc. .), la mise en cache (Memcached, etc.) et autres. En règle générale, lorsque vous utilisez plusieurs stockages de données, l'un d'entre eux fonctionne comme stockage principal et l'autre comme stockage dérivé. Le seul problème est de savoir comment synchroniser ces magasins de données.

Nous avons examiné un certain nombre de modèles différents qui ont tenté de résoudre le problème de la synchronisation de plusieurs référentiels, tels que la double entrée, les transactions distribuées, etc. Cependant, ces approches présentent des limites importantes en termes d'utilisation réelle, de fiabilité et de maintenance. Outre la synchronisation des données, certaines applications doivent également enrichir les données en appelant des services externes.

Pour résoudre ces problèmes, Delta a été développé. Delta est finalement une plate-forme cohérente et événementielle pour synchroniser et enrichir les données.

Solutions existantes


Double saisie


Pour synchroniser deux magasins de données, vous pouvez utiliser un double enregistrement, qui écrit dans un magasin, puis écrit immédiatement dans un autre. Le premier enregistrement peut être répété et le second peut être interrompu si le premier échoue après avoir épuisé le nombre de tentatives. Cependant, deux magasins de données peuvent arrêter la synchronisation si l'écriture dans le deuxième magasin échoue. Ce problème est généralement résolu en créant une procédure de récupération qui peut périodiquement retransférer les données du premier stockage vers le second ou le faire uniquement si des différences sont constatées dans les données.

Problèmes:

L'exécution d'une procédure de récupération est un travail spécifique qui ne peut pas être réutilisé. De plus, les données entre les stockages ne sont pas synchronisées jusqu'à ce que la procédure de récupération soit terminée. La solution est compliquée si plus de deux magasins de données sont utilisés. Enfin, la procédure de récupération peut ajouter du stress à la source de données d'origine.

Table des journaux des modifications


Lorsque des modifications se produisent dans un ensemble de tables (par exemple, l'insertion, la mise à jour et la suppression d'enregistrements), les enregistrements de modification sont ajoutés à la table de journal dans le cadre de la même transaction. Un autre thread ou processus demande constamment des événements à la table des journaux et les écrit dans un ou plusieurs stockages de données, lorsqu'il devient nécessaire de supprimer des événements de la table des journaux après avoir confirmé l'enregistrement par tous les stockages.

Problèmes:

ce modèle doit être implémenté en tant que bibliothèque et, idéalement, sans changer le code d'application l'utilisant. Dans un environnement polyglotte, l'implémentation d'une telle bibliothèque doit exister dans tous les langages nécessaires, mais il est très difficile d'assurer la coordination des fonctions et des comportements entre les langages.

Un autre problème réside dans l'obtention de modifications de schéma dans des systèmes qui ne prennent pas en charge les modifications de schéma transactionnelles [1] [2], comme MySQL. Par conséquent, un modèle pour effectuer une modification (par exemple, modifier un schéma) et l'écrire dans la table du journal des modifications ne fonctionnera pas toujours.

Transactions distribuées


Les transactions distribuées peuvent être utilisées pour fractionner une transaction entre plusieurs stockages de données hétérogènes afin que l'opération soit validée dans tous les magasins utilisés ou non validée dans aucun d'entre eux.

Problèmes:

Les transactions distribuées sont un très gros problème pour les entrepôts de données hétérogènes. De par leur nature, ils ne peuvent compter que sur le plus petit dénominateur commun des systèmes concernés. Par exemple, les transactions XA bloquent l'exécution en cas d'échec lors du processus de préparation. En outre, XA ne fournit pas de détection de blocage et ne prend pas en charge les schémas de gestion de concurrence optimiste. De plus, certains systèmes comme ElasticSearch ne prennent pas en charge XA ou tout autre modèle de transaction hétérogène. Ainsi, garantir l'atomicité de l'enregistrement dans diverses technologies de stockage de données reste une tâche très difficile pour les applications [3].

Delta


Delta a été conçu pour répondre aux limites des solutions de synchronisation de données existantes, et il enrichit également les données à la volée. Notre objectif était d'abstraire tous ces points complexes des développeurs d'applications afin qu'ils puissent se concentrer pleinement sur la mise en œuvre de la fonctionnalité métier. Ensuite, nous décrirons «Movie Search», le cas d'utilisation réel de Delta de Netflix.

Netflix utilise largement l'architecture de microservices et chaque microservice sert généralement un type de données. Les principales informations sur le film sont extraites d'un microservice appelé Movie Service, ainsi que les données connexes, telles que les informations sur les producteurs, les acteurs, les vendeurs, etc., sont gérées par plusieurs autres microservices (à savoir, Deal Service, Talent Service et Vendor Service).
Les utilisateurs professionnels des studios Netflix doivent souvent rechercher différents critères pour les films, c'est pourquoi il est très important pour eux de pouvoir rechercher toutes les données relatives aux films.

Avant Delta, l'équipe de recherche de films devait récupérer les données de plusieurs microservices avant d'indexer les données de films. De plus, l'équipe a dû développer un système qui mettrait périodiquement à jour l'index de recherche, en demandant des changements à d'autres microservices, même s'il n'y avait aucun changement. Ce système est rapidement devenu envahi par la complexité et est devenu difficile à entretenir.


Figure 1. Système de vote avant Delta
Après avoir utilisé Delta, le système a été simplifié en un système piloté par les événements, comme illustré dans la figure suivante. Les événements CDC (Change-Data-Capture) sont envoyés aux rubriques Keystone Kafka à l'aide du Delta-Connector. Une application Delta construite à l'aide du Delta Stream Processing Framework (basé sur Flink) reçoit les événements CDC du sujet, les enrichit, en invoquant d'autres microservices, et transmet enfin les données enrichies à l'index de recherche dans Elasticsearch. L'ensemble du processus se déroule en temps quasi réel, c'est-à-dire que dès que les modifications sont enregistrées dans l'entrepôt de données, les index de recherche sont mis à jour.


Figure 2. Pipeline de données utilisant Delta
Dans les sections suivantes, nous décrivons le travail du Delta-Connector, qui se connecte au référentiel et publie les événements CDC au niveau du transport, qui est une infrastructure de transmission de données en temps réel qui dirige les événements CDC vers les sujets Kafka. Et à la toute fin, nous parlerons de l'infrastructure de traitement de flux Delta que les développeurs d'applications peuvent utiliser pour la logique de traitement et d'enrichissement.

CDC (Change-Data-Capture)


Nous avons développé un service CDC appelé Delta-Connector, qui peut capturer les modifications validées du magasin de données en temps réel et les écrire dans le flux. Les modifications en temps réel sont extraites du journal des transactions et des vidages de stockage. Les vidages sont utilisés car les journaux de transactions ne stockent généralement pas l'intégralité de l'historique des modifications. Les modifications sont généralement sérialisées en tant qu'événements Delta, de sorte que le destinataire n'a pas à se soucier de l'origine du changement.

Delta-Connector prend en charge plusieurs fonctionnalités supplémentaires, telles que:

  • PossibilitĂ© d'Ă©crire une sortie personnalisĂ©e après Kafka.
  • La possibilitĂ© d'activer les vidages manuels Ă  tout moment pour toutes les tables, une table spĂ©cifique ou pour certaines clĂ©s primaires.
  • Les vidages peuvent ĂŞtre rĂ©cupĂ©rĂ©s par morceaux, il n'est donc pas nĂ©cessaire de tout recommencer en cas de panne.
  • Il n'est pas nĂ©cessaire de mettre des verrous sur les tables, ce qui est très important pour que le trafic d'Ă©criture vers la base de donnĂ©es ne soit jamais bloquĂ© par notre service.
  • Haute disponibilitĂ© grâce aux sauvegardes dans les zones de disponibilitĂ© AWS.

Nous prenons actuellement en charge MySQL et Postgres, y compris les déploiements sur AWS RDS et Aurora. Nous prenons également en charge Cassandra (multimaître). Vous pouvez en savoir plus sur le Delta-Connector sur ce blog .

Kafka et niveau de transport


La couche Delta Event Transport est basée sur le service de messagerie de la plate-forme Keystone .

Donc, historiquement, la publication sur Netflix a été optimisée pour une disponibilité accrue plutôt que pour la longévité (voir l'article précédent ). Un compromis était l'incohérence potentielle des données des courtiers dans divers scénarios frontaliers. Par exemple, l' élection d'un chef impur est chargée de s'assurer que le destinataire duplique ou perd potentiellement des événements.

Avec Delta, nous voulions obtenir des garanties de durabilité plus solides afin d'assurer la livraison des événements CDC aux stockages dérivés. Pour ce faire, nous avons proposé un cluster Kafka spécialement conçu comme objet de première classe. Vous pouvez consulter certains paramètres de courtier dans le tableau ci-dessous:



Dans les clusters Keystone Kafka, l' élection de dirigeants impurs est généralement activée pour garantir la disponibilité de l'éditeur. Cela peut entraîner une perte de message si une réplique non synchronisée est sélectionnée comme leader. Pour le nouveau cluster Kafka hautement fiable, l'option d' élection de chef impur est désactivée pour éviter la perte de messages.

Nous avons également augmenté le facteur de réplication de 2 à 3 et les répliques d'insync minimalesde 1 à 2. Les éditeurs qui écrivent dans ce cluster ont besoin d'acquis de tous les autres, garantissant que 2 répliques sur 3 auront les messages les plus à jour envoyés par l'éditeur.

Lorsque l'instance de courtier se ferme, la nouvelle instance remplace l'ancienne. Cependant, le nouveau courtier devra rattraper les répliques non synchronisées, ce qui peut prendre plusieurs heures. Pour réduire le temps de récupération de ce scénario, nous avons commencé à utiliser Amazon Elastic Block Store au lieu des disques des courtiers locaux. Lorsqu'une nouvelle instance remplace une instance de courtier terminée, elle attache le volume EBS que l'instance terminée possédait et commence à rattraper les nouveaux messages. Ce processus réduit le temps nécessaire pour éliminer le backlog de plusieurs heures à plusieurs minutes, car la nouvelle instance n'a plus besoin d'être répliquée à partir d'un état vide. En général, des cycles de vie de stockage et de courtage séparés réduisent considérablement l'effet du changement de courtier.

Pour augmenter encore la garantie de livraison des données, nous avons utilisé un système de suivi des messages pour détecter toute perte de messages dans des conditions extrêmes (par exemple, synchronisation d'horloge dans le chef de section).

Cadre de traitement de flux


Le niveau de traitement chez Delta est basé sur la plate-forme Netflix SPaaS, qui permet l'intégration d'Apache Flink avec l'écosystème Netflix. La plate-forme fournit une interface utilisateur qui contrôle le déploiement des travaux Flink et l'orchestration des clusters Flink au-dessus de notre plate-forme de gestion de conteneurs Titus. L'interface gère également les configurations de travaux et permet aux utilisateurs d'effectuer des changements de configuration de manière dynamique sans avoir à recompiler les travaux Flink.

Delta fournit un cadre de traitement de flux pour les données basées sur Flink et SPaaS, qui utilise des annotationsDSL (Domain Specific Language) pour résumer les détails techniques. Par exemple, afin de déterminer l'étape par laquelle les événements seront enrichis en appelant des services externes, les utilisateurs doivent écrire la prochaine DSL, et le framework créera un modèle basé sur celui-ci que Flink exécutera.


Figure 3. Exemple d'enrichissement DSL dans Delta

Le cadre de traitement raccourcit non seulement la courbe d'apprentissage, mais fournit également des fonctions générales de traitement des flux, telles que la déduplication, la schématisation, ainsi que la flexibilité et la tolérance aux pannes pour résoudre les problèmes courants au travail.

Le Delta Stream Processing Framework se compose de deux modules clés, le module DSL & API et le module Runtime. Le module DSL et API fournit les API DSL et UDF (User-Defined-Function) afin que les utilisateurs puissent écrire leur propre logique de traitement (comme le filtrage ou les transformations). Le module Runtime fournit une implémentation de l'analyseur DSL, qui construit une représentation interne des étapes de traitement dans les modèles DAG. Le composant Execution interprète les modèles DAG pour initialiser les instructions Flink réelles et finalement lancer l'application Flink. L'architecture du cadre est illustrée dans la figure suivante.


Figure 4. Architecture du cadre de traitement du flux Delta

Cette approche présente plusieurs avantages:

  • - Flink SPaaS.
  • , - (UDF).
  • Delta , , .



Delta est en production depuis plus d'un an et joue un rôle clé dans de nombreuses applications Netflix Studio. Il a aidé les équipes à mettre en œuvre des cas d'utilisation tels que l'indexation de recherche, le stockage de données et les workflows événementiels. Voici un aperçu de l'architecture de haut niveau de la plate-forme Delta.


Figure 5. L'architecture de haut niveau de Delta.

Remerciements


Nous tenons à remercier les personnes suivantes qui ont contribué à la création et au développement de Delta chez Netflix: Allen Wang, Charles Zhao, Jaebin Yoon, Josh Snyder, Kasturi Chatterjee, Mark Cho, Olof Johansson, Piyush Goyal, Prashanth Ramdas, Raghuram Onti Srinivasan, Sandeep Gupta , Steven Wu, Tharanga Gamaethige, Yun Wang et Zhenzhong Xu.

Sources


  1. dev.mysql.com/doc/refman/5.7/en/implicit-commit.html
  2. dev.mysql.com/doc/refman/5.7/en/cannot-roll-back.html
  3. Martin Kleppmann, Alastair R. Beresford, Boerge Svingen: Online event processing. Commun. ACM 62(5): 43–49 (2019). DOI: doi.org/10.1145/3312527

: «Data Build Tool Amazon Redshift».

Source: https://habr.com/ru/post/undefined/


All Articles