DBLog - un cadre commun pour la capture de données modifiées

Bonjour à tous! Nous vous proposons de lire la traduction de l'article, que nous avons préparé spécialement pour les étudiants du cours "Architecte High Load" .




introduction


Le suivi des modifications de données (Change Data Capture, CDC) vous permet de recevoir en temps réel les modifications validées dans la base de données et de les distribuer à différents consommateurs [1] [2]. Le CDC devient de plus en plus populaire lorsqu'une synchronisation entre un stockage de données hétérogène est requise (par exemple, MySQL et ElasticSearch) et est une alternative aux méthodes traditionnelles telles que les transactions à double écriture et distribuées [3] [4].

La source du CDC, dans les bases de données telles que MySQL et PostgreSQL, est le journal des transactions (journal des transactions). Mais étant donné que les journaux de transactions sont généralement tronqués, ils peuvent ne pas contenir l'historique complet des modifications. Par conséquent, pour obtenir l'état complet de la source, nous avons besoin de vidages. Nous avons examiné plusieurs projets CDC open source, utilisant souvent les mêmes bibliothèques, API de base de données et protocoles, et y avons trouvé un certain nombre de limitations qui ne répondaient pas à nos exigences. Par exemple, l'arrêt du traitement des événements du journal jusqu'à la fin du vidage (instantané complet des données), l'impossibilité d'initier le vidage sur demande ou la mise en œuvre, affectant le trafic d'écriture en raison de l'utilisation de verrous de table.

Cela nous a conduit à développer DBLog.avec une approche unifiée du traitement des journaux et des vidages. Pour le prendre en charge, un certain nombre de fonctions devraient être implémentées dans le SGBD, qui sont déjà dans MySQL, PostgreSQL, MariaDB et un certain nombre d'autres bases de données.

Certaines des fonctionnalités de DBLog:

  • Les événements de journal sont traités dans l'ordre où ils se produisent.
  • Des vidages peuvent être effectués à tout moment pour toutes les tables, pour une table ou pour des clés primaires spécifiques d'une table.
  • Le traitement des journaux alterne avec le traitement des vidages, divisant le vidage en blocs. Ainsi, le traitement des journaux peut avoir lieu en parallèle avec le traitement de vidage. Si le processus se termine, il peut être repris après le dernier bloc terminé sans avoir à tout recommencer. Il vous permet également d'ajuster le débit lors de la création d'un vidage et, si nécessaire, de suspendre sa création.
  • , .
  • : , API.
  • . , .


Plus tôt, nous avons discuté de Delta ( traduction ), une plate-forme d'enrichissement et de synchronisation des données. L'objectif de Delta est de synchroniser plusieurs magasins de données, dont l'un est principal (par exemple, MySQL) et les autres dérivés (par exemple, ElasticSearch). L'une des principales exigences de développement était un faible retard dans la propagation des modifications de la source vers les destinataires, ainsi qu'une haute disponibilité du flux d'événements. Ces conditions s'appliquent indépendamment du fait que tous les entrepôts de données soient utilisés par une équipe ou si une équipe possède les données et que l'autre les consomme. Dans un article sur Delta ( traduction ), nous avons également décrit des cas d'utilisation qui vont au-delà de la synchronisation des données, comme le traitement des événements.

Pour synchroniser les données et traiter les événements, en plus de pouvoir suivre les changements en temps réel, nous devons remplir les conditions suivantes:

  • Obtenir l'état complet . Les magasins dérivés (comme ElasticSearch) devraient en fin de compte stocker l'état complet de la source. Nous l'implémentons via des vidages de la base de données d'origine.
  • Démarrez la récupération de l'état à tout moment. Au lieu de considérer le vidage comme une opération ponctuelle uniquement pour l'initialisation principale, nous pouvons le faire à tout moment: pour toutes les tables, pour une table ou pour des clés primaires spécifiques. Ceci est très important pour la récupération des consommateurs en cas de perte ou d'endommagement des données.
  • - . . , (, ). - . , - .
  • . . API, , , . , , , .
  • . Netflix , Kafka, SQS, Kinesis, Netflix, Keystone. , (, ), (, ). . API.
  • . Netflix , (MySQL, PostgreSQL) AWS RDS. .



Nous avons examiné plusieurs solutions open source existantes, notamment: Maxwell , SpinalTap , Yelp MySQL Streamer et Debezium . En termes de collecte de données, ils fonctionnent tous de manière similaire, en utilisant un journal des transactions. Par exemple, en utilisant le protocole de réplication binlog dans MySQL ou les emplacements de réplication dans PostgreSQL.

Mais lors du traitement des vidages, ils présentent au moins l'une des limitations suivantes:

  • Arrêtez de traiter les événements du journal lors de la création d'un vidage . Par conséquent, si le vidage est volumineux, le traitement des événements de journal s'arrête pendant une longue période. Ce sera un problème si les consommateurs comptent sur de petits retards pour propager les changements.
  • . . (, ElasticSearch) .
  • . . [5]. . , . . , PostgreSQL RDS .
  • Utilisation de fonctions de base de données spécifiques . Nous avons constaté que certaines solutions utilisent des fonctionnalités de base de données supplémentaires qui ne sont pas présentes sur tous les systèmes. Par exemple, utiliser le moteur Blackhole dans MySQL ou obtenir un instantané cohérent des vidages via les emplacements de réplication dans PostgreSQL. Cela limite la réutilisation du code entre différentes bases de données.

En fin de compte, nous avons décidé d'adopter une approche différente pour travailler avec les vidages:

  • les événements de journalisation et de vidage alternatifs afin qu'ils puissent être exécutés ensemble;
  • démarrer un vidage à tout moment;
  • N'utilisez pas de verrous de table
  • N'utilisez pas de fonctionnalités de base de données spécifiques.

Cadre DBLog


DBLog est un framework java pour recevoir des vidages et des modifications en temps réel. Les vidages sont effectués par parties afin qu'ils alternent avec les événements en temps réel et ne retardent pas leur traitement pendant une longue période. Les vidages peuvent être effectués à tout moment via l'API. Cela permet aux consommateurs d'obtenir l'état complet de la base de données au stade de l'initialisation ou ultérieurement pour la reprise après sinistre.

Lors de la conception du framework, nous avons pensé à minimiser l'impact sur la base de données. Les vidages peuvent être interrompus et repris si nécessaire. Cela fonctionne à la fois pour la récupération après incident et pour l'arrêt si la base de données est devenue un goulot d'étranglement. Nous ne verrouillons pas non plus les tables afin de ne pas affecter les opérations d'écriture.

DBLog vous permet d'enregistrer des événements sous différentes formes, y compris dans une autre base de données ou via l'API. Pour stocker l'état associé au traitement des journaux et des vidages, ainsi que pour sélectionner le nœud maître, nous utilisons Zookeeper. Lors de la création de DBLog, nous avons implémenté la possibilité de connecter différents plugins, vous permettant de modifier l'implémentation comme vous le souhaitez (par exemple, remplacer Zookeeper par autre chose).

Ensuite, nous considérons plus en détail le traitement des journaux et des vidages.

Journaux


Le cadre requiert que la base de données enregistre les événements pour chaque ligne modifiée en temps réel, tout en maintenant l'ordre des validations. La source de ces événements est supposée être le journal des transactions. La base de données les envoie via un transport que DBLog peut utiliser. Pour ce transport, nous utilisons le terme «journal des modifications». Un événement peut être des types suivants: créer, créer, mettre à jour ou supprimer. Pour chaque événement, les informations suivantes doivent être fournies: le numéro de séquence dans le journal (numéro de séquence du journal), l'état de la colonne pendant l'opération et le schéma appliqué au moment de l'opération.

Chaque modification est sérialisée dans le format d'événement DBLog et envoyée au rédacteur, pour son transfert ultérieur vers la sortie. L'envoi d'événements à l'enregistreur est une opération non bloquante, car l'enregistreur s'exécute dans un thread distinct et accumule les événements dans le tampon interne. Les événements enregistrés sont envoyés dans l'ordre où ils ont été reçus. Le cadre vous permet de connecter un formateur personnalisé pour sérialiser les événements dans un format arbitraire. La sortie est une interface simple qui vous permet de vous connecter à n'importe quel destinataire, tel qu'un flux, un entrepôt de données ou même une API.

Dumps


Les vidages sont nécessaires car les journaux de transactions ont un temps de stockage limité, ce qui les empêche d'être utilisés pour restaurer l'ensemble de données d'origine complet. Les vidages sont créés en blocs (morceaux) afin de pouvoir alterner avec les événements du journal, ce qui permet de les traiter simultanément. Pour chaque ligne sélectionnée du bloc, un événement est généré et sérialisé au même format que l'événement de journal. Ainsi, le consommateur n'a pas à s'inquiéter qu'un événement provienne du journal ou du vidage. Les événements de journal et les événements de vidage sont envoyés à la sortie via le même écrivain.

Les vidages peuvent être planifiés à tout moment via l'API pour toutes les tables, une table ou pour des clés primaires spécifiques de la table. Un vidage de la table est effectué par blocs d'une taille donnée. Vous pouvez également configurer un délai dans le traitement des nouveaux blocs, autorisant uniquement les événements de journal pour le moment. La taille et le délai des blocs vous permettent d'équilibrer le traitement des événements de journal et de vidage. Les deux paramètres peuvent être modifiés lors de l'exécution.

Les blocs (bloc) sont sélectionnés en triant le tableau dans l'ordre croissant de la clé primaire et en sélectionnant des lignes où la clé primaire est supérieure à la dernière clé primaire du bloc précédent. La base de données est requise pour exécuter cette requête efficacement, ce qui est généralement applicable aux systèmes qui implémentent une analyse de plage sur une plage de clés primaires.


Figure 1. Répartition des tableaux avec 4 colonnes c1-c4 et c1 comme clé primaire (pk). Clé primaire d'un type entier, taille de bloc 3. Le bloc 2 est sélectionné par la condition c1> 4. Les

blocs doivent être pris de manière à ne pas retarder le traitement des événements du journal pendant une longue période et à sauvegarder l'historique des modifications afin que la ligne sélectionnée avec l'ancienne valeur ne puisse pas remplacer la plus récente. un événement.

Afin de pouvoir sélectionner les blocs de manière séquentielle, nous créons dans le journal des modifications des «filigranes» reconnaissables. Les filigranes sont implémentés via une table dans la base de données source. Cette table est stockée dans un espace de noms spécial afin qu'il n'y ait aucun conflit avec les tables d'application. Une seule ligne avec une valeur UUID y est stockée. Un filigrane est créé lorsque cette valeur devient un UUID spécifique. La mise à jour de la ligne entraîne un événement de modification, que nous recevons finalement via le journal des modifications.

Les vidages filigranés sont créés comme suit:

  1. Nous suspendons temporairement le traitement des événements du journal.
  2. Générez un filigrane «bas» en mettant à jour la table des filigranes.
  3. Nous démarrons SELECT pour le bloc suivant et enregistrons en mémoire le résultat indexé par la clé primaire.
  4. “” (high) , .
  5. . .
  6. , .
  7. , , .
  8. , 1.

SELECT est censé renvoyer un état qui représente un changement validé jusqu'à un point de l'histoire. Ou, équivalent à ce qui suit: SELECT est exécuté dans une certaine position du journal des modifications, en tenant compte des modifications jusqu'à ce point. Les bases de données ne fournissent généralement pas d'informations sur le temps d'exécution d'un SELECT (à l'exception de MariaDB ).

L'idée principale de notre approche est que dans le journal des modifications, nous définissons une fenêtre qui garantit la préservation de la position SELECT dans le bloc. La fenêtre est ouverte en écrivant le filigrane inférieur, après quoi SELECT est exécuté, et la fenêtre est fermée en écrivant le filigrane supérieur. Étant donné que la position exacte de SELECT est inconnue, toutes les lignes sélectionnées qui entrent en conflit avec les événements de journal dans cette fenêtre sont supprimées. Cela garantit qu'il n'y a pas de réécriture de l'historique dans le journal des modifications.

Pour que cela fonctionne, SELECT doit lire l'état de la table à partir du moment du filigrane inférieur ou ultérieur (il est possible d'inclure les modifications qui ont été apportées après le filigrane inférieur et avant la lecture). En général, SELECT est nécessaire pour voir les modifications apportées avant son exécution.. Nous l'appelons «lectures non périmées». De plus, puisque le filigrane supérieur est écrit après, il est garanti que SELECT sera exécuté avant lui.

Les figures 2a et 2b illustrent l'algorithme de sélection de bloc. À titre d'exemple, nous donnons une table avec des clés primaires de k1 à k6. Chaque entrée du journal des modifications représente un événement de création, de mise à jour ou de suppression pour la clé primaire. La figure 2a montre la génération de filigrane et la sélection de bloc (étapes 1 à 4). La mise à jour de la table des filigranes aux étapes 2 et 4 crée deux événements de modification (magenta), qui sont finalement reçus via le journal. Dans la figure 2b, nous nous concentrons sur les lignes du bloc actuel qui sont supprimées du jeu de résultats avec des clés primaires qui apparaissent entre les filigranes (étapes 5 à 7).


Figure 2a - Algorithme de filigrane pour la sélection de bloc (étapes 1 à 4).


Figure 2b - Algorithme de filigrane pour sélectionner des blocs (étapes 5 à 7).

Veuillez noter qu'entre les filigranes inférieur et supérieur, un grand nombre d'événements peuvent apparaître dans le journal si une ou plusieurs transactions ont effectué de nombreux changements de ligne. Pour cette raison, nous faisons une suspension à court terme du traitement du journal aux étapes 2 à 4 afin de ne pas manquer les filigranes. Ainsi, le traitement des événements du journal peut être repris événement par événement, ce qui vous permet finalement de détecter des filigranes sans avoir besoin de mettre en cache les événements du journal. Le traitement du journal n'est suspendu que pendant une courte période, car les étapes 2 à 4 devraient être rapides: la mise à jour des filigranes est une opération d'écriture unique et SELECT est effectué avec une restriction.

Dès que le filigrane supérieur est reçu à l'étape 7, les lignes non conflictuelles du bloc sont transmises à la sortie dans l'ordre où elles ont été reçues. Il s'agit d'une opération non bloquante, car le programme d'écriture s'exécute dans un thread distinct, ce qui vous permet de reprendre rapidement le traitement du journal après l'étape 7. Après cela, le traitement du journal se poursuit pour les événements qui se produisent après le filigrane supérieur.

La figure 2c montre l'ordre d'enregistrement pour le bloc entier en utilisant le même exemple que sur les figures 2a et 2b. Les événements du journal qui apparaissent avant le filigrane supérieur sont enregistrés en premier. Ensuite, les lignes restantes du résultat du bloc (magenta). Et enfin, les événements qui se produisent après le filigrane supérieur sont enregistrés.


Figure 2c - L'ordre d'enregistrement de la sortie. Alternance de bande avec vidage.

Bases de données prises en charge


Pour utiliser DBLog, la base de données doit fournir un journal des modifications sous forme d'historique linéaire des modifications validées avec des lectures non périmées. Ces conditions sont remplies par des systèmes tels que MySQL, PostgreSQL, MariaDB, etc., de sorte que le framework peut être utilisé de la même manière avec ces bases de données.
Jusqu'à présent, nous avons ajouté la prise en charge de MySQL et PostgreSQL. Chaque base de données utilise ses propres bibliothèques pour recevoir les événements de journal, car chacune utilise un protocole propriétaire. Pour MySQL, nous utilisons shyiko / mysql-binlog-connector , qui implémente le protocole de réplication binlog. Pour PostgreSQL, il existe des emplacements de réplication avec le plugin wal2json . Les modifications sont acceptées via le protocole de réplication en continu, qui est implémenté par le pilote jdbcPostgreSQL La définition de schéma pour chaque modification capturée est différente dans MySQL et PostgreSQL. Dans PostgreSQL, wal2json contient des noms, des types de colonnes et des valeurs. Pour MySQL, les modifications de schéma doivent être suivies en tant qu'événements binlog.

Le traitement de vidage a été effectué en utilisant SQL et JDBC, ne nécessitant que la mise en œuvre de la sélection de bloc et la mise à jour du filigrane. Pour MySQL et PostgreSQL, le même code est utilisé, qui peut être utilisé pour d'autres bases de données similaires. Le traitement de vidage lui-même ne dépend pas de SQL ou JDBC et vous permet d'utiliser des bases de données qui répondent aux exigences de DBLog, même si elles utilisent des normes différentes.


Figure 3 - Architecture DBLog de haut niveau.

La haute disponibilité


DBLog utilise une architecture active-passive à nœud unique. Une instance est active (principale), tandis que les autres sont passives (veille). Pour sélectionner l'hôte, nous utilisons Zookeeper. Un bail est utilisé pour le nœud maître, qui doit être mis à jour périodiquement pour continuer à être le maître. En cas de résiliation du renouvellement du bail, les fonctions du chef de file sont transférées vers un autre nœud. Actuellement, nous déployons une copie pour chaque AZ (zone de disponibilité, nous avons généralement 3 AZ), donc si un AZ tombe, alors la copie dans un autre AZ peut continuer le traitement avec un temps d'arrêt total minimum. Les instances de sauvegarde peuvent être situées dans différentes régions, mais il est recommandé de travailler dans la même région que l'hôte de base de données pour fournir une faible latence pour la capture des modifications.

Utilisation en production


DBLog est la base des connecteurs MySQL et PostgreSQL utilisés dans Delta . Depuis 2018, Delta est utilisé en production pour la synchronisation des entrepôts de données et le traitement des événements dans les applications studio Netflix. Les connecteurs Delta utilisent leur sérialiseur d'événements. Les flux spécifiques à Netflix tels que Keystone sont utilisés comme sortie .


Figure 4 - Connecteur Delta.

En plus de Delta, DBLog est également utilisé chez Netflix pour créer des connecteurs pour d'autres plates-formes de mouvement de données qui ont leurs propres formats de données.

rester avec nous


DBLog possède des fonctionnalités supplémentaires qui ne sont pas traitées dans cet article, telles que:

  • Possibilité d'obtenir des schémas de table sans utiliser de verrous.
  • Intégration avec le stockage de schéma. Pour chaque événement, le schéma est stocké dans le stockage, dont le lien est indiqué dans la charge utile de l'événement.
  • Mode d'écriture monotone. S'assurer qu'après avoir enregistré l'état d'une ligne particulière, son état passé ne peut pas être écrasé. Ainsi, les consommateurs ne reçoivent des changements d'état que vers l'avant, sans se déplacer d'avant en arrière dans le temps.

Nous prévoyons d'ouvrir le code source DBLog en 2020 et d'y inclure de la documentation supplémentaire.

Remerciements


Nous tenons à remercier les personnes suivantes pour leur contribution au développement de DBLog: Josh Snyder , Raghuram Onti Srinivasan , Tharanga Gamaethige et Yun Wang .

Références


[1] Das, Shirshanka et al. «Tous à bord du Databus!: La plateforme évolutive de capture de données à changement cohérent de Linkedin.» Troisième Symposium ACM sur le Cloud Computing. ACM, 2012
[2] «À propos de Change Data Capture (SQL Server)» , Microsoft SQL docs, 2019
[3] Kleppmann, Martin, «Utilisation des journaux pour créer une infrastructure de données solide (ou: pourquoi les doubles écritures sont une mauvaise idée)» , Confluent, 2015
[4] Kleppmann, Martin, Alastair R. Beresford, Boerge Svingen. "Traitement des événements en ligne." Communications de l'ACM 62.5 (2019): 43–49
[5] https://debezium.io/documentation/reference/0.10/connectors/mysql.html#snapshots


En savoir plus sur le cours.

All Articles