Delta: Plattform für Datensynchronisation und -anreicherung

Im Vorgriff auf den Start eines neuen Streams im Data Engineer- Kurs haben wir eine Übersetzung von interessantem Material vorbereitet.






Überblick


Wir werden über ein ziemlich beliebtes Muster sprechen, nach dem Anwendungen mehrere Datenspeicher verwenden, wobei jeder Speicher für seine eigenen Zwecke verwendet wird, um beispielsweise die kanonische Form von Daten (MySQL usw.) zu speichern und erweiterte Suchfunktionen (ElasticSearch usw.) bereitzustellen. .), Caching (Memcached usw.) und andere. Wenn Sie mehrere Datenspeicher verwenden, fungiert normalerweise einer als Hauptspeicher und der andere als Derivatspeicher. Das einzige Problem ist, wie diese Datenspeicher synchronisiert werden.

Wir haben uns verschiedene Muster angesehen, mit denen versucht wurde, das Problem der Synchronisierung mehrerer Repositorys zu lösen, z. B. doppelte Eingabe, verteilte Transaktionen usw. Diese Ansätze weisen jedoch erhebliche Einschränkungen hinsichtlich der tatsächlichen Verwendung, Zuverlässigkeit und Wartung auf. Zusätzlich zur Datensynchronisation müssen einige Anwendungen Daten durch Aufrufen externer Dienste anreichern.

Um diese Probleme zu lösen, wurde Delta entwickelt. Delta ist letztendlich eine konsistente, ereignisgesteuerte Plattform zum Synchronisieren und Anreichern von Daten.

Bestehende Lösungen


Doppelte Einreise


Um zwei Datenspeicher zu synchronisieren, können Sie die doppelte Aufzeichnung verwenden, die in einen Speicher schreibt und dann sofort in einen anderen schreibt. Der erste Datensatz kann wiederholt werden, und der zweite kann unterbrochen werden, wenn der erste nach Erschöpfung der Anzahl der Versuche fehlschlägt. Zwei Datenspeicher werden jedoch möglicherweise nicht mehr synchronisiert, wenn das Schreiben in den zweiten Speicher fehlschlägt. Dieses Problem wird normalerweise gelöst, indem ein Wiederherstellungsverfahren erstellt wird, mit dem Daten regelmäßig vom ersten Speicher in den zweiten Speicher übertragen werden können, oder dies nur, wenn Unterschiede in den Daten festgestellt werden.

Probleme:

Das Durchführen eines Wiederherstellungsvorgangs ist ein bestimmter Job, der nicht wiederverwendet werden kann. Darüber hinaus bleiben die Daten zwischen den Speichern nicht synchron, bis der Wiederherstellungsvorgang abgeschlossen ist. Die Lösung ist kompliziert, wenn mehr als zwei Datenspeicher verwendet werden. Schließlich kann das Wiederherstellungsverfahren die ursprüngliche Datenquelle belasten.

Protokolltabelle ändern


Wenn Änderungen in einer Reihe von Tabellen auftreten (z. B. Einfügen, Aktualisieren und Löschen von Datensätzen), werden Änderungsdatensätze als Teil derselben Transaktion zur Protokolltabelle hinzugefügt. Ein anderer Thread oder Prozess fordert ständig Ereignisse aus der Protokolltabelle an und schreibt sie in einen oder mehrere Datenspeicher, wenn Ereignisse nach Bestätigung des Datensatzes durch alle Speicher aus der Protokolltabelle gelöscht werden müssen.

Probleme:

Dieses Muster sollte als Bibliothek implementiert werden und im Idealfall ohne den Anwendungscode zu ändern, der es verwendet. In einer polyglotten Umgebung muss die Implementierung einer solchen Bibliothek in einer beliebigen Sprache vorhanden sein, es ist jedoch sehr schwierig, die Koordination der Funktionen und des Verhaltens zwischen den Sprachen sicherzustellen.

Ein weiteres Problem besteht darin, Schemaänderungen in Systemen zu erhalten, die keine Transaktionsschemaänderungen unterstützen [1] [2], wie z. B. MySQL. Daher funktioniert eine Vorlage zum Vornehmen einer Änderung (z. B. zum Ändern eines Schemas) und zum Schreiben in die Änderungsprotokolltabelle nicht immer.

Verteilte Transaktionen


Verteilte Transaktionen können verwendet werden, um eine Transaktion auf mehrere heterogene Datenspeicher aufzuteilen, sodass der Vorgang entweder in allen verwendeten Speichern festgeschrieben oder in keinem von ihnen festgeschrieben wird.

Probleme:

Verteilte Transaktionen sind ein sehr großes Problem für heterogene Data Warehouses. Sie können sich naturgemäß nur auf den kleinsten gemeinsamen Nenner der beteiligten Systeme verlassen. Beispielsweise blockieren XA-Transaktionen die Ausführung, wenn während des Vorbereitungsprozesses ein Fehler auftritt. Darüber hinaus bietet XA keine Deadlock-Erkennung und unterstützt keine optimistischen Parallelitätsverwaltungsschemata. Darüber hinaus unterstützen einige Systeme wie ElasticSearch weder XA noch ein anderes heterogenes Transaktionsmodell. Daher bleibt es für Anwendungen eine sehr schwierige Aufgabe, die Atomizität der Aufzeichnung in verschiedenen Datenspeichertechnologien sicherzustellen [3].

Delta


Delta wurde entwickelt, um die Einschränkungen bestehender Datensynchronisationslösungen zu beseitigen und Daten im laufenden Betrieb anzureichern. Unser Ziel war es, all diese komplexen Punkte von Anwendungsentwicklern zu abstrahieren, damit sie sich voll und ganz auf die Implementierung von Geschäftsfunktionen konzentrieren können. Als nächstes beschreiben wir "Movie Search", den eigentlichen Anwendungsfall für Netflix's Delta.

Netflix nutzt die Microservice-Architektur in großem Umfang und jeder Microservice bedient normalerweise einen Datentyp. Die Hauptinformationen über den Film werden in einem Mikrodienst namens Movie Service herausgenommen, und verwandte Daten, wie Informationen über Produzenten, Schauspieler, Anbieter usw., werden von mehreren anderen Mikrodiensten verwaltet (nämlich Deal Service, Talent Service und Vendor Service).
Geschäftsanwender in Netflix Studios müssen häufig nach verschiedenen Kriterien nach Filmen suchen, weshalb es für sie sehr wichtig ist, alle Daten zu Filmen durchsuchen zu können.

Vor Delta musste das Filmsuche-Team Daten von mehreren Microservices abrufen, bevor Filmdaten indiziert werden konnten. Darüber hinaus musste das Team ein System entwickeln, das den Suchindex regelmäßig aktualisiert und Änderungen von anderen Microservices anfordert, auch wenn überhaupt keine Änderungen vorgenommen wurden. Dieses System wurde schnell von Komplexität überwachsen und schwer zu warten.


Abbildung 1. Abfragesystem vor Delta
Nach der Verwendung von Delta wurde das System zu einem ereignisgesteuerten System vereinfacht, wie in der folgenden Abbildung dargestellt. CDC-Ereignisse (Change-Data-Capture) werden über den Delta-Connector an Keystone Kafka-Themen gesendet. Eine Delta-Anwendung, die mit dem Delta Stream Processing Framework (basierend auf Flink) erstellt wurde, empfängt CDC-Ereignisse aus dem Thema, bereichert sie, ruft andere Microservices auf und übergibt die angereicherten Daten schließlich an den Suchindex in Elasticsearch. Der gesamte Prozess findet fast in Echtzeit statt, dh sobald Änderungen im Data Warehouse erfasst werden, werden die Suchindizes aktualisiert.


Abbildung 2. Datenpipeline mit Delta
In den folgenden Abschnitten beschreiben wir die Arbeit des Delta-Connector, der eine Verbindung zum Repository herstellt und CDC-Ereignisse auf Transportebene veröffentlicht. Hierbei handelt es sich um eine Echtzeit-Datenübertragungsinfrastruktur, die CDC-Ereignisse zu Kafka-Themen leitet. Ganz am Ende werden wir über das Delta-Stream-Verarbeitungsframework sprechen, das Anwendungsentwickler für die Verarbeitungs- und Anreicherungslogik verwenden können.

CDC (Change-Data-Capture)


Wir haben einen CDC-Dienst namens Delta-Connector entwickelt, der festgeschriebene Änderungen aus dem Datenspeicher in Echtzeit erfassen und in den Stream schreiben kann. Änderungen in Echtzeit werden aus dem Transaktionsprotokoll und den Speicherabbildern übernommen. Dumps werden verwendet, da in Transaktionsprotokollen normalerweise nicht der gesamte Änderungsverlauf gespeichert wird. Änderungen werden normalerweise als Delta-Ereignisse serialisiert, sodass sich der Empfänger keine Gedanken darüber machen muss, woher die Änderung stammt.

Delta-Connector unterstützt mehrere zusätzliche Funktionen, wie z.

  • Möglichkeit, benutzerdefinierte Ausgaben nach Kafka zu schreiben.
  • Die Möglichkeit, manuelle Speicherauszüge jederzeit für alle Tabellen, eine bestimmte Tabelle oder für bestimmte Primärschlüssel zu aktivieren.
  • Dumps können von Brocken aufgenommen werden, sodass Sie im Falle eines Fehlers nicht noch einmal von vorne beginnen müssen.
  • Es ist nicht erforderlich, Tabellen zu sperren, was sehr wichtig ist, damit der Schreibverkehr in die Datenbank von unserem Service niemals blockiert wird.
  • Hohe Verfügbarkeit aufgrund von Backups in AWS Availability Zones.

Derzeit unterstützen wir MySQL und Postgres, einschließlich Bereitstellungen für AWS RDS und Aurora. Wir unterstützen auch Cassandra (Multi-Master). Weitere Informationen zum Delta-Connector finden Sie in diesem Blog .

Kafka und Transportniveau


Die Delta Event Transport-Schicht basiert auf dem Keystone- Plattform-Messaging-Dienst .

In der Vergangenheit wurde das Posten auf Netflix eher für eine höhere Verfügbarkeit als für eine lange Lebensdauer optimiert (siehe vorherigen Artikel ). Ein Kompromiss war die mögliche Inkonsistenz der Maklerdaten in verschiedenen Grenzszenarien. Zum Beispiel ist die Wahl des unreinen Führers dafür verantwortlich, dass der Empfänger möglicherweise Ereignisse dupliziert oder verliert.

Mit Delta wollten wir stärkere Garantien für die Haltbarkeit erhalten, um die Lieferung von CDC-Ereignissen an Derivatspeicher sicherzustellen. Zu diesem Zweck haben wir einen speziell entwickelten Kafka-Cluster als Objekt der ersten Klasse vorgeschlagen. In der folgenden Tabelle können Sie sich einige Broker-Einstellungen ansehen:



In Keystone Kafka-Clustern wird normalerweise eine unsaubere Leader-Wahl aktiviert, um die Verfügbarkeit von Publishern sicherzustellen. Dies kann zu einem Nachrichtenverlust führen, wenn ein nicht synchronisiertes Replikat als Anführer ausgewählt wird. Für den neuen hochzuverlässigen Kafka-Cluster ist die Option für unsaubere Führerwahlen deaktiviert, um den Verlust von Nachrichten zu verhindern.

Wir haben auch den Replikationsfaktor von 2 auf 3 und die minimalen Insync-Replikate erhöhtvon 1 bis 2. Publisher, die in diesen Cluster schreiben, benötigen Bestätigungen von allen anderen, um sicherzustellen, dass 2 von 3 Replikaten die aktuellsten vom Publisher gesendeten Nachrichten erhalten.

Wenn die Brokerinstanz beendet wird, ersetzt die neue Instanz die alte. Der neue Broker muss jedoch nicht synchronisierte Replikate nachholen, was mehrere Stunden dauern kann. Um die Wiederherstellungszeit für dieses Szenario zu verkürzen, haben wir begonnen, den Amazon Elastic Block Store anstelle der lokalen Broker-Festplatten zu verwenden. Wenn eine neue Instanz eine abgeschlossene Brokerinstanz ersetzt, hängt sie das EBS-Volume an, über das die abgeschlossene Instanz verfügt, und beginnt, neue Nachrichten einzuholen. Dieser Prozess reduziert die Zeit zum Beseitigen des Rückstands von mehreren Stunden auf einige Minuten, da die neue Instanz nicht mehr aus einem leeren Zustand repliziert werden muss. Im Allgemeinen reduzieren separate Speicher- und Brokerlebenszyklen die Auswirkungen des Brokerwechsels erheblich.

Um die Garantie für die Datenübermittlung weiter zu erhöhen, haben wir ein Nachrichtenverfolgungssystem verwendet , um den Verlust von Nachrichten unter extremen Bedingungen (z. B. Taktsynchronisation im Abschnittsleiter) zu erkennen.

Stream-Verarbeitungs-Framework


Die Verarbeitungsebene bei Delta basiert auf der Netflix SPaaS-Plattform, die die Integration von Apache Flink in das Netflix-Ökosystem ermöglicht. Die Plattform bietet eine Benutzeroberfläche, die die Bereitstellung von Flink-Jobs und die Orchestrierung von Flink-Clustern auf unserer Titus-Containerverwaltungsplattform steuert. Die Schnittstelle verwaltet auch Jobkonfigurationen und ermöglicht es Benutzern, Konfigurationsänderungen dynamisch vorzunehmen, ohne Flink-Jobs neu kompilieren zu müssen.

Delta bietet ein Stream-Verarbeitungsframework für Flink- und SPaaS- basierte Daten, das annotationsbasiert verwendetDSL (Domain Specific Language) zur Zusammenfassung technischer Details. Um beispielsweise den Schritt zu bestimmen, um den Ereignisse durch Aufrufen externer Dienste angereichert werden, müssen Benutzer das nächste DSL schreiben, und das Framework erstellt darauf basierend ein Modell, das Flink ausführt.


Abbildung 3. Beispiel für eine DSL-Anreicherung in Delta

Das Verarbeitungsframework verkürzt nicht nur die Lernkurve, sondern bietet auch allgemeine Flussverarbeitungsfunktionen wie Deduplizierung, Schematisierung sowie Flexibilität und Fehlertoleranz zur Lösung häufiger Probleme bei der Arbeit.

Das Delta Stream Processing Framework besteht aus zwei Schlüsselmodulen, dem DSL- und API-Modul und dem Runtime-Modul. Das DSL- und API-Modul stellt die DSL- und UDF-APIs (User-Defined-Function) bereit, damit Benutzer ihre eigene Verarbeitungslogik (z. B. Filterung oder Transformationen) schreiben können. Das Runtime-Modul bietet eine Implementierung des DSL-Parsers, der eine interne Darstellung der Verarbeitungsschritte in DAG-Modellen erstellt. Die Ausführungskomponente interpretiert DAG-Modelle, um die tatsächlichen Flink-Anweisungen zu initialisieren und schließlich die Flink-Anwendung zu starten. Die Architektur des Frameworks ist in der folgenden Abbildung dargestellt.


Abbildung 4. Architektur des Delta Stream Processing Framework

Dieser Ansatz bietet mehrere Vorteile:

  • - Flink SPaaS.
  • , - (UDF).
  • Delta , , .



Delta ist seit über einem Jahr in Produktion und spielt in vielen Netflix Studio-Anwendungen eine Schlüsselrolle. Es half Teams bei der Implementierung von Anwendungsfällen wie Suchindizierung, Datenspeicherung und ereignisgesteuerten Workflows. Das Folgende ist eine Übersicht über die allgemeine Architektur der Delta-Plattform.


Abbildung 5. Die allgemeine Architektur von Delta.

Danksagung


Wir möchten den folgenden Personen danken, die zur Schaffung und Entwicklung von Delta bei Netflix beigetragen haben: Allen Wang, Charles Zhao, Jaebin Yoon, Josh Snyder, Kasturi Chatterjee, Mark Cho, Olof Johansson, Piyush Goyal, Prashanth Ramdas, Raghuram Onti Srinivasan und Sandeep Gupta , Steven Wu, Tharanga Gamaethige, Yun Wang und Zhenzhong Xu.

Quellen


  1. dev.mysql.com/doc/refman/5.7/en/implicit-commit.html
  2. dev.mysql.com/doc/refman/5.7/de/cannot-roll-back.html
  3. Martin Kleppmann, Alastair R. Beresford, Boerge Svingen: Online event processing. Commun. ACM 62(5): 43–49 (2019). DOI: doi.org/10.1145/3312527

: «Data Build Tool Amazon Redshift».

Source: https://habr.com/ru/post/undefined/


All Articles