DBLog - ein gemeinsames Framework für die Erfassung von Änderungsdaten

Hallo alle zusammen! Wir bieten Ihnen an, die Übersetzung des Artikels zu lesen, den wir speziell für Studenten des Kurses "High Load Architect" vorbereitet haben .




Einführung


Durch die Verfolgung von Datenänderungen (Change Data Capture, CDC) können Sie die festgeschriebenen Änderungen in der Datenbank in Echtzeit empfangen und an verschiedene Verbraucher verteilen [1] [2]. CDC wird immer beliebter, wenn eine Synchronisation zwischen heterogenem Data Warehousing erforderlich ist (z. B. MySQL und ElasticSearch) und eine Alternative zu herkömmlichen Methoden wie Dual-Write- und verteilten Transaktionen darstellt [3] [4].

Die Quelle für die CDC in Datenbanken wie MySQL und PostgreSQL ist das Transaktionsprotokoll (Transaktionsprotokoll). Da Transaktionsprotokolle normalerweise abgeschnitten werden, enthalten sie möglicherweise nicht den gesamten Änderungsverlauf. Um den vollständigen Status der Quelle zu erhalten, benötigen wir daher Dumps. Wir haben mehrere Open-Source-CDC-Projekte untersucht, die häufig dieselben Bibliotheken, Datenbank-APIs und Protokolle verwenden, und dabei eine Reihe von Einschränkungen festgestellt, die unseren Anforderungen nicht entsprachen. Beispiel: Stoppen der Verarbeitung von Protokollereignissen bis zum Abschluss des Speicherauszugs (vollständiger Datenschnappschuss), Unfähigkeit, Speicherauszug bei Bedarf oder Implementierung zu initiieren, was sich auf den Schreibverkehr aufgrund der Verwendung von Tabellensperren auswirkt.

Dies führte uns zur Entwicklung von DBLog.mit einem einheitlichen Ansatz für die Verarbeitung von Protokollen und Dumps. Um dies zu unterstützen, sollten im DBMS eine Reihe von Funktionen implementiert werden, die sich bereits in MySQL, PostgreSQL, MariaDB und einer Reihe anderer Datenbanken befinden.

Einige der Funktionen von DBLog:

  • Protokollereignisse werden in der Reihenfolge ihres Auftretens verarbeitet.
  • Dumps können jederzeit für alle Tabellen, für eine Tabelle oder für bestimmte Primärschlüssel einer Tabelle erstellt werden.
  • Die Protokollverarbeitung wechselt mit der Speicherauszugsverarbeitung und unterteilt den Speicherauszug in Blöcke. Somit kann die Protokollverarbeitung parallel zur Speicherauszugsverarbeitung erfolgen. Wenn der Vorgang beendet ist, kann er nach dem letzten abgeschlossenen Block fortgesetzt werden, ohne dass ein erneuter Start erforderlich ist. Außerdem können Sie den Durchsatz beim Erstellen eines Speicherauszugs anpassen und bei Bedarf die Erstellung anhalten.
  • , .
  • : , API.
  • . , .


Zuvor haben wir Delta ( Übersetzung ) besprochen , eine Plattform zur Datenanreicherung und -synchronisation. Das Ziel von Delta ist die Synchronisierung mehrerer Datenspeicher, von denen einer primär ist (z. B. MySQL), und die anderen Derivate (z. B. ElasticSearch). Eine der wichtigsten Entwicklungsanforderungen war eine geringe Verzögerung bei der Weitergabe von Änderungen von der Quelle an die Empfänger sowie eine hohe Verfügbarkeit des Ereignisstroms. Diese Bedingungen gelten unabhängig davon, ob alle Data Warehouses von einem Team verwendet werden oder ob ein Team die Daten besitzt und das andere sie verwendet. In einem Artikel über Delta ( Übersetzung ) haben wir auch Anwendungsfälle beschrieben, die über die Datensynchronisation hinausgehen, z. B. die Ereignisverarbeitung.

Um Daten zu synchronisieren und Ereignisse zu verarbeiten, müssen wir nicht nur Änderungen in Echtzeit verfolgen können, sondern auch die folgenden Anforderungen erfüllen:

  • Den vollen Zustand erhalten . Abgeleitete Speicher (wie ElasticSearch) sollten letztendlich den vollständigen Status der Quelle speichern. Wir implementieren dies durch Dumps der ursprünglichen Datenbank.
  • Starten Sie die Statuswiederherstellung jederzeit. Anstatt den Speicherauszug nur für die Primärinitialisierung als einmalige Operation zu betrachten, können wir dies jederzeit tun: für alle Tabellen, für eine Tabelle oder für bestimmte Primärschlüssel. Dies ist sehr wichtig für die Wiederherstellung von Verbrauchern bei Verlust oder Beschädigung von Daten.
  • - . . , (, ). - . , - .
  • . . API, , , . , , , .
  • . Netflix , Kafka, SQS, Kinesis, Netflix, Keystone. , (, ), (, ). . API.
  • . Netflix , (MySQL, PostgreSQL) AWS RDS. .



Wir haben mehrere vorhandene Open Source-Lösungen untersucht, darunter Maxwell , SpinalTap , Yelp MySQL Streamer und Debezium . In Bezug auf die Datenerfassung arbeiten alle auf ähnliche Weise mithilfe eines Transaktionsprotokolls. Verwenden Sie beispielsweise das binlog-Replikationsprotokoll in MySQL oder die Replikationssteckplätze in PostgreSQL.

Bei der Verarbeitung von Dumps gelten jedoch mindestens eine der folgenden Einschränkungen:

  • Beenden Sie die Verarbeitung von Protokollereignissen, während Sie einen Speicherauszug erstellen . Wenn der Speicherauszug groß ist, wird die Verarbeitung von Protokollereignissen daher für einen langen Zeitraum gestoppt. Dies ist ein Problem, wenn sich die Verbraucher auf kleine Verzögerungen bei der Verbreitung der Änderungen verlassen.
  • . . (, ElasticSearch) .
  • . . [5]. . , . . , PostgreSQL RDS .
  • Verwendung spezifischer Datenbankfunktionen . Wir haben festgestellt, dass einige Lösungen zusätzliche Datenbankfunktionen verwenden, die nicht auf allen Systemen vorhanden sind. Verwenden Sie beispielsweise die Blackhole-Engine in MySQL oder erstellen Sie eine konsistente Momentaufnahme der Speicherauszüge über Replikationssteckplätze in PostgreSQL. Dies begrenzt die Wiederverwendung von Code zwischen verschiedenen Datenbanken.

Am Ende haben wir uns für einen anderen Ansatz bei der Arbeit mit Müllkippen entschieden:

  • alternative Protokoll- und Speicherauszugsereignisse, damit sie zusammen ausgeführt werden können;
  • Starten Sie jederzeit einen Dump.
  • Verwenden Sie keine Tischschlösser
  • Verwenden Sie keine spezifischen Datenbankfunktionen.

DBLog Framework


DBLog ist ein Java-Framework zum Empfangen von Dumps und Änderungen in Echtzeit. Dumps werden in Teilen ausgeführt, so dass sie sich mit Echtzeitereignissen abwechseln und ihre Verarbeitung nicht über einen langen Zeitraum verzögern. Dumps können jederzeit über die API erstellt werden. Auf diese Weise können Verbraucher in der Initialisierungsphase oder später den vollständigen Status der Datenbank für die Notfallwiederherstellung abrufen.

Beim Entwerfen des Frameworks haben wir darüber nachgedacht, die Auswirkungen auf die Datenbank zu minimieren. Dumps können angehalten und bei Bedarf fortgesetzt werden. Dies funktioniert sowohl für die Wiederherstellung nach einem Absturz als auch für das Stoppen, wenn die Datenbank zu einem Engpass geworden ist. Wir sperren auch keine Tabellen, um die Schreibvorgänge nicht zu beeinträchtigen.

Mit DBLog können Sie Ereignisse in verschiedenen Formen aufzeichnen, einschließlich in einer anderen Datenbank oder über die API. Zum Speichern des Status, der mit der Verarbeitung von Protokollen und Speicherauszügen verbunden ist, sowie zum Auswählen des Hauptknotens verwenden wir Zookeeper. Beim Erstellen von DBLog haben wir die Möglichkeit implementiert, verschiedene Plugins zu verbinden, sodass Sie die Implementierung nach Ihren Wünschen ändern können (z. B. Zookeeper durch etwas anderes ersetzen).

Als nächstes betrachten wir die Verarbeitung von Protokollen und Dumps genauer.

Protokolle


Das Framework erfordert, dass die Datenbank Ereignisse für jede geänderte Zeile in Echtzeit aufzeichnet, während die Reihenfolge der Festschreibungen beibehalten wird. Als Quelle dieser Ereignisse wird das Transaktionsprotokoll angenommen. Die Datenbank sendet sie über einen Transport, den DBLog verwenden kann. Für diesen Transport verwenden wir den Begriff "Änderungsprotokoll". Es gibt folgende Arten von Ereignissen: Erstellen, Erstellen, Aktualisieren oder Löschen. Für jedes Ereignis müssen die folgenden Informationen bereitgestellt werden: die Sequenznummer im Protokoll (Protokollsequenznummer), der Status der Spalte während der Operation und das Schema, das zum Zeitpunkt der Ausführung der Operation angewendet wurde.

Jede Änderung wird in das DBLog-Ereignisformat serialisiert und zur weiteren Übertragung an die Ausgabe an den Writer gesendet. Das Senden von Ereignissen an den Writer ist eine nicht blockierende Operation, da der Writer in einem separaten Thread ausgeführt wird und Ereignisse im internen Puffer sammelt. Gepufferte Ereignisse werden in der Reihenfolge gesendet, in der sie empfangen wurden. Mit dem Framework können Sie einen benutzerdefinierten Formatierer verbinden, um Ereignisse in ein beliebiges Format zu serialisieren. Die Ausgabe ist eine einfache Schnittstelle, über die Sie eine Verbindung zu einem beliebigen Empfänger herstellen können, z. B. einem Stream, einem Data Warehouse oder sogar einer API.

Dumps


Speicherauszüge sind erforderlich, da Transaktionsprotokolle eine begrenzte Speicherzeit haben, sodass sie nicht zur Wiederherstellung des vollständigen Originaldatensatzes verwendet werden können. Dumps werden in Blöcken (Chunk) erstellt, sodass sie sich mit Protokollereignissen abwechseln können und gleichzeitig verarbeitet werden können. Für jede ausgewählte Zeile des Blocks wird ein Ereignis im gleichen Format wie das Protokollereignis generiert und serialisiert. Somit muss sich der Verbraucher keine Sorgen machen, dass ein Ereignis aus dem Protokoll oder dem Speicherauszug stammt. Sowohl Protokollereignisse als auch Speicherauszugsereignisse werden über denselben Writer an die Ausgabe gesendet.

Speicherauszüge können jederzeit über die API für alle Tabellen, eine Tabelle oder für bestimmte Primärschlüssel der Tabelle geplant werden. Ein Dump der Tabelle wird von Blöcken einer bestimmten Größe ausgeführt. Sie können auch eine Verzögerung bei der Verarbeitung neuer Blöcke konfigurieren, sodass derzeit nur Protokollereignisse zulässig sind. Die Blockgröße und Verzögerung ermöglichen es Ihnen, die Verarbeitung von Protokoll- und Speicherauszugsereignissen auszugleichen. Beide Einstellungen können zur Laufzeit geändert werden.

Blöcke (Chunk) werden ausgewählt, indem die Tabelle in aufsteigender Reihenfolge des Primärschlüssels sortiert und Zeilen ausgewählt werden, in denen der Primärschlüssel größer als der letzte Primärschlüssel des vorherigen Blocks ist. Die Datenbank muss diese Abfrage effizient ausführen. Dies gilt normalerweise für Systeme, die einen Bereichsscan für einen Bereich von Primärschlüsseln implementieren.


Abbildung 1. Tabellenaufschlüsselung mit 4 Spalten c1-c4 und c1 als Primärschlüssel (pk). Der Primärschlüssel eines Integer-Typs, Blockgröße 3. Block 2 wird durch die Bedingung c1> 4 ausgewählt.

Blöcke müssen so genommen werden, dass die Verarbeitung von Protokollereignissen nicht über einen langen Zeitraum verzögert wird und der Änderungsverlauf gespeichert wird, damit die ausgewählte Zeile mit dem alten Wert die neuere nicht überschreiben kann Veranstaltung.

Um Blöcke nacheinander auswählen zu können, erstellen wir im Änderungsprotokoll erkennbare „Wasserzeichen“. Wasserzeichen werden über eine Tabelle in der Quellendatenbank implementiert. Diese Tabelle wird in einem speziellen Namespace gespeichert, damit keine Konflikte mit den Anwendungstabellen auftreten. Darin ist nur eine Zeile mit einem UUID-Wert gespeichert. Ein Wasserzeichen wird erstellt, wenn sich dieser Wert in eine bestimmte UUID ändert. Das Aktualisieren der Zeile führt zu einem Änderungsereignis, das wir letztendlich über das Änderungsprotokoll erhalten.

Dumps mit Wasserzeichen werden wie folgt erstellt:

  1. Wir setzen die Verarbeitung von Protokollereignissen vorübergehend aus.
  2. Generieren Sie ein "niedriges" Wasserzeichen, indem Sie die Wasserzeichentabelle aktualisieren.
  3. Wir starten SELECT für den nächsten Block und speichern das vom Primärschlüssel indizierte Ergebnis im Speicher.
  4. “” (high) , .
  5. . .
  6. , .
  7. , , .
  8. , 1.

SELECT soll einen Status zurückgeben, der eine festgeschriebene Änderung bis zu einem bestimmten Punkt in der Geschichte darstellt. Oder entsprechend dem Folgenden: SELECT wird an einer bestimmten Position des Änderungsprotokolls ausgeführt, wobei Änderungen bis zu diesem Punkt berücksichtigt werden. Datenbanken liefern normalerweise keine Informationen über die Ausführungszeit eines SELECT (mit Ausnahme von MariaDB ).

Die Hauptidee unseres Ansatzes ist, dass wir im Änderungsprotokoll ein Fenster definieren, das die Beibehaltung der SELECT-Position im Block garantiert. Das Fenster wird durch Schreiben des unteren Wasserzeichens geöffnet, wonach SELECT ausgeführt wird, und das Fenster wird durch Schreiben des oberen Wasserzeichens geschlossen. Da die genaue Position von SELECT unbekannt ist, werden alle ausgewählten Zeilen gelöscht, die mit den Protokollereignissen in diesem Fenster in Konflikt stehen. Dadurch wird sichergestellt, dass der Verlauf im Änderungsprotokoll nicht neu geschrieben wird.

Damit dies funktioniert, muss SELECT den Status der Tabelle ab dem Moment des unteren Wasserzeichens oder später lesen (es ist zulässig, die Änderungen einzuschließen, die nach dem unteren Wasserzeichen und vor dem Lesen vorgenommen wurden). Im Allgemeinen ist SELECT erforderlich, um Änderungen anzuzeigen, die vor der Ausführung vorgenommen wurden.. Wir nennen es "nicht veraltete Lesungen". Da das obere Wasserzeichen danach geschrieben wird, ist außerdem garantiert, dass SELECT davor ausgeführt wird.

Die 2a und 2b veranschaulichen den Blockauswahlalgorithmus. Als Beispiel geben wir eine Tabelle mit Primärschlüsseln von k1 bis k6 an. Jeder Eintrag im Änderungsprotokoll repräsentiert ein Ereignis zum Erstellen, Aktualisieren oder Löschen des Primärschlüssels. Abbildung 2a zeigt die Erzeugung von Wasserzeichen und die Blockauswahl (Schritte 1 bis 4). Durch Aktualisieren der Wasserzeichentabelle in den Schritten 2 und 4 werden zwei Änderungsereignisse (Magenta) erstellt, die letztendlich über das Protokoll empfangen werden. In Abbildung 2b konzentrieren wir uns auf die Zeilen des aktuellen Blocks, die mit Primärschlüsseln zwischen den Wasserzeichen aus der Ergebnismenge entfernt werden (Schritte 5 bis 7).


Abbildung 2a - Wasserzeichenalgorithmus für die Blockauswahl (Schritte 1–4).


Abbildung 2b - Wasserzeichenalgorithmus zur Auswahl von Blöcken (Schritte 5–7).

Bitte beachten Sie, dass zwischen dem unteren und oberen Wasserzeichen eine große Anzahl von Ereignissen im Protokoll angezeigt werden kann, wenn eine oder mehrere Transaktionen viele Zeilenänderungen vorgenommen haben. Aus diesem Grund setzen wir die Verarbeitung des Protokolls in den Schritten 2 bis 4 kurzfristig aus, um die Wasserzeichen nicht zu übersehen. Somit kann die Verarbeitung von Protokollereignissen Ereignis für Ereignis fortgesetzt werden, wodurch Sie letztendlich Wasserzeichen erkennen können, ohne Protokollereignisse des Protokolls zwischenspeichern zu müssen. Die Protokollverarbeitung wird nur für kurze Zeit angehalten, da die Schritte 2 bis 4 voraussichtlich schnell ausgeführt werden: Das Aktualisieren von Wasserzeichen ist eine einzelne Schreiboperation, und SELECT wird mit einer Einschränkung ausgeführt.

Sobald das obere Wasserzeichen in Schritt 7 empfangen wird, werden die nicht widersprüchlichen Zeilen des Blocks in der Reihenfolge, in der sie empfangen wurden, an den Ausgang gesendet. Dies ist ein nicht blockierender Vorgang, da der Writer in einem separaten Thread ausgeführt wird, mit dem Sie die Verarbeitung des Protokolls nach Schritt 7 schnell fortsetzen können. Danach wird die Verarbeitung des Protokolls für Ereignisse fortgesetzt, die nach dem oberen Wasserzeichen auftreten.

Abbildung 2c zeigt die Aufzeichnungsreihenfolge für den gesamten Block anhand des gleichen Beispiels wie in den Abbildungen 2a und 2b. Ereignisse im Protokoll, die vor dem oberen Wasserzeichen angezeigt werden, werden zuerst aufgezeichnet. Dann ergeben sich die restlichen Zeilen aus dem Block (Magenta). Und schließlich werden Ereignisse aufgezeichnet, die nach dem oberen Wasserzeichen auftreten.


Abbildung 2c - Die Reihenfolge der Aufzeichnung der Ausgabe. Streifenwechsel mit Dump.

Unterstützte Datenbanken


Um DBLog verwenden zu können, muss die Datenbank ein Änderungsprotokoll als linearen Verlauf festgeschriebener Änderungen mit nicht veralteten Lesevorgängen bereitstellen. Diese Bedingungen werden von Systemen wie MySQL, PostgreSQL, MariaDB usw. erfüllt, sodass das Framework auf die gleiche Weise mit diesen Datenbanken verwendet werden kann.
Bisher haben wir Unterstützung für MySQL und PostgreSQL hinzugefügt. Jede Datenbank verwendet ihre eigenen Bibliotheken, um Protokollereignisse zu empfangen, da jede von ihnen ein proprietäres Protokoll verwendet. Für MySQL verwenden wir den Shyiko / MySQL -Binlog-Connector , der das Binlog-Replikationsprotokoll implementiert. Für PostgreSQL gibt es Replikations-Slots mit dem wal2json- Plugin . Änderungen werden über das Streaming-Replikationsprotokoll akzeptiert, das vom JDBC-Treiber implementiert wirdPostgreSQL Die Schemadefinition für jede erfasste Änderung unterscheidet sich in MySQL und PostgreSQL. In PostgreSQL enthält wal2json Namen, Spaltentypen und Werte. Für MySQL sollten Schemaänderungen als Binlog-Ereignisse verfolgt werden.

Die Speicherauszugsverarbeitung wurde mit SQL und JDBC durchgeführt, wobei nur die Blockauswahl implementiert und das Wasserzeichen aktualisiert werden musste. Für MySQL und PostgreSQL wird derselbe Code verwendet, der für andere ähnliche Datenbanken verwendet werden kann. Die Speicherauszugsverarbeitung selbst ist nicht von SQL oder JDBC abhängig und ermöglicht die Verwendung von Datenbanken, die die Anforderungen von DBLog erfüllen, auch wenn sie unterschiedliche Standards verwenden.


Abbildung 3 - Hochrangige DBLog-Architektur

Hohe Verfügbarkeit


DBLog verwendet eine Aktiv-Passiv-Architektur mit einem Knoten. Eine Instanz ist aktiv (führend), während die anderen passiv sind (Standby). Zur Auswahl des Hosts verwenden wir Zookeeper. Für den Masterknoten wird ein Lease verwendet, der regelmäßig aktualisiert werden muss, um weiterhin der Master zu sein. Im Falle der Beendigung der Verlängerung des Mietvertrags werden die Funktionen des Leiters auf einen anderen Knoten übertragen. Derzeit stellen wir eine Kopie für jede AZ bereit (Verfügbarkeitszone, normalerweise haben wir 3 AZ). Wenn also eine AZ fällt, kann die Kopie in einer anderen AZ mit einer minimalen Gesamtausfallzeit weiter verarbeitet werden. Sicherungsinstanzen können sich in verschiedenen Regionen befinden. Es wird jedoch empfohlen, dass Sie in derselben Region wie der Datenbankhost arbeiten, um eine geringe Latenz für die Erfassung von Änderungen zu gewährleisten.

Verwendung in der Produktion


DBLog ist die Basis für die in Delta verwendeten MySQL- und PostgreSQL-Konnektoren . Seit 2018 wird Delta in der Produktion zur Synchronisierung von Data Warehouses und zur Ereignisverarbeitung in Netflix Studio-Anwendungen eingesetzt. Delta-Konnektoren verwenden ihren Ereignis-Serializer. Netflix-spezifische Streams wie Keystone werden als Ausgabe verwendet .


Abbildung 4 - Delta-Anschluss.

Neben Delta wird DBLog bei Netflix auch zum Erstellen von Konnektoren für andere Datenbewegungsplattformen verwendet, die über eigene Datenformate verfügen.

Bleib bei uns


DBLog verfügt über zusätzliche Funktionen, die in diesem Artikel nicht behandelt werden, z.

  • Möglichkeit, Tabellenschemata ohne Verwendung von Sperren abzurufen.
  • Integration in den Schemaspeicher. Für jedes Ereignis wird das Schema im Speicher gespeichert, dessen Verknüpfung in der Ereignisnutzlast angegeben ist.
  • Monotoner Schreibmodus. Stellen Sie sicher, dass nach dem Speichern des Status einer bestimmten Zeile der vergangene Status nicht überschrieben werden kann. Somit erhalten Verbraucher Zustandsänderungen nur in Vorwärtsrichtung, ohne sich rechtzeitig hin und her zu bewegen.

Wir planen, den DBLog-Quellcode im Jahr 2020 zu öffnen und zusätzliche Dokumentation hinzuzufügen.

Danksagung


Wir möchten den folgenden Personen für ihren Beitrag zur Entwicklung von DBLog danken: Josh Snyder , Raghuram Onti Srinivasan , Tharanga Gamaethige und Yun Wang .

Verweise


[1] Das, Shirshanka et al. "Alle an Bord des Databus !: Linkedins skalierbare Plattform zur Erfassung konsistenter Änderungsdaten." Drittes ACM-Symposium zum Thema Cloud Computing. ACM, 2012
[2] „Über Change Data Capture (SQL Server)“ , Microsoft SQL-Dokumente, 2019
[3] Kleppmann, Martin, „Verwenden von Protokollen zum Aufbau einer soliden Dateninfrastruktur (oder: Warum Dual Writes eine schlechte Idee sind)“ , Confluent, 2015
[4] Kleppmann, Martin, Alastair R. Beresford, Boerge Svingen. "Online-Ereignisverarbeitung." Mitteilungen des ACM 62.5 (2019): 43–49
[5] https://debezium.io/documentation/reference/0.10/connectors/mysql.html#snapshots


Erfahren Sie mehr über den Kurs.

All Articles