Delta: Data Synchronization and Enrichment Platform







Overview


We’ll talk about a fairly common pattern in which an application uses several data stores, each serving its own purpose: one holds the canonical form of the data (MySQL, etc.), another provides advanced search capabilities (Elasticsearch, etc.), another handles caching (Memcached, etc.), and so on. Typically, when multiple data stores are used, one of them acts as the primary store and the others as derived stores. The problem is then how to keep these data stores in sync.

We looked at a number of patterns that attempt to solve the problem of synchronizing multiple data stores, such as dual writes, distributed transactions, and so on. However, these approaches have significant limitations in terms of real-life use, reliability, and maintenance. Besides data synchronization, some applications also need to enrich data by calling external services.

To solve these problems, Delta was developed. Delta is an eventually consistent, event-driven platform for synchronizing and enriching data.

Existing solutions


Dual Writes


To synchronize two data stores, you can use dual writes: write to one store, then immediately write to the other. The first write can be retried, and the second can be aborted if the first still fails after exhausting its retry attempts. However, the two data stores can fall out of sync if the write to the second store fails. This problem is usually solved by building a repair procedure that periodically re-copies data from the first store to the second, or does so only when differences are detected.
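To make the failure mode concrete, here is a minimal sketch of the dual-write pattern in Java; all type and method names are hypothetical and exist only for illustration.

```java
// A minimal sketch of dual writes: retry the primary write, then make a
// best-effort write to the derived store. All types are hypothetical.
public final class DualWriteExample {

    interface PrimaryStore { void upsert(String id, String doc); } // e.g. MySQL
    interface DerivedStore { void index(String id, String doc); }  // e.g. Elasticsearch

    private final PrimaryStore primary;
    private final DerivedStore derived;

    DualWriteExample(PrimaryStore primary, DerivedStore derived) {
        this.primary = primary;
        this.derived = derived;
    }

    void save(String id, String doc) {
        withRetry(() -> primary.upsert(id, doc), 3);
        try {
            derived.index(id, doc);
        } catch (RuntimeException e) {
            // The two stores are now out of sync until some repair procedure
            // reconciles them -- exactly the weakness discussed in the text.
        }
    }

    private static void withRetry(Runnable write, int maxAttempts) {
        for (int attempt = 1; ; attempt++) {
            try {
                write.run();
                return;
            } catch (RuntimeException e) {
                if (attempt >= maxAttempts) {
                    throw e;
                }
            }
        }
    }
}
```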

Problems:

Implementing the repair procedure is a bespoke job that usually cannot be reused. In addition, the data stores remain out of sync until the repair procedure completes. The solution becomes more complicated when more than two data stores are involved. Finally, the repair procedure can add significant load to the source data store.

Change Log Table


When changes occur in a set of tables (for example, inserting, updating, or deleting records), change records are added to a log table as part of the same transaction. Another thread or process constantly polls events from the log table and writes them to one or more data stores, removing events from the log table only after the write has been acknowledged by all stores.
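As a rough illustration (not Netflix's implementation), the sketch below shows the pattern with plain JDBC, assuming hypothetical `movies` and `movie_changelog` tables.

```java
// A sketch of the changelog-table pattern with a MySQL-style schema.
// Table and column names are assumptions made for illustration.
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public final class ChangelogTableExample {

    // Write the business row and the change record in the same transaction.
    static void updateMovieTitle(String jdbcUrl, long movieId, String title) throws SQLException {
        try (Connection conn = DriverManager.getConnection(jdbcUrl)) {
            conn.setAutoCommit(false);
            try (PreparedStatement upd = conn.prepareStatement(
                     "UPDATE movies SET title = ? WHERE id = ?");
                 PreparedStatement log = conn.prepareStatement(
                     "INSERT INTO movie_changelog (movie_id, op, payload) VALUES (?, 'UPDATE', ?)")) {
                upd.setString(1, title);
                upd.setLong(2, movieId);
                upd.executeUpdate();
                log.setLong(1, movieId);
                log.setString(2, "{\"title\":\"" + title + "\"}");
                log.executeUpdate();
                conn.commit();   // both rows become visible atomically
            } catch (SQLException e) {
                conn.rollback();
                throw e;
            }
        }
    }
    // A separate poller would then SELECT from movie_changelog, push each event
    // to the derived stores, and delete rows once every store has acknowledged them.
}
```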

Problems:

This pattern has to be implemented as a library, ideally without changing the code of the application using it. In a polyglot environment, an implementation of such a library must exist for every required language, and it is very difficult to guarantee consistent functionality and behavior across languages.

Another problem is capturing schema changes in systems that do not support transactional schema changes [1][2], such as MySQL. In such systems, the pattern of making a change (for example, a schema change) and transactionally writing it to the change log table does not always work.

Distributed Transactions


Distributed transactions can be used to span a transaction across several heterogeneous data stores so that the operation is either committed in all participating stores or in none of them.
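For illustration only, here is a hedged sketch of what such a transaction looks like from the application's point of view using the standard JTA API; obtaining XA-capable resources and the JNDI name below assume a Java EE/Jakarta EE container.

```java
// A sketch of a distributed (XA/JTA) transaction spanning two resources.
import javax.naming.InitialContext;
import javax.transaction.UserTransaction;

public final class XaExample {
    static void writeToTwoStores() throws Exception {
        UserTransaction tx = (UserTransaction) new InitialContext()
                .lookup("java:comp/UserTransaction");
        tx.begin();
        try {
            // write to resource A here (e.g. a connection from a MySQL XADataSource)
            // write to resource B here (another XA-capable resource)
            tx.commit();   // two-phase commit: prepare on all resources, then commit
        } catch (Exception e) {
            tx.rollback(); // if a failure happens mid-prepare, XA may hold locks for a while
            throw e;
        }
    }
}
```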

Problems:

Distributed transactions are very problematic across heterogeneous data stores. By their nature, they can only rely on the lowest common denominator of the systems involved. For example, XA transactions block execution if the application process fails during the prepare phase. Moreover, XA provides no deadlock detection and does not support optimistic concurrency control schemes. In addition, some systems, such as Elasticsearch, do not support XA or any other heterogeneous transaction model. Thus, ensuring atomicity of writes across different data storage technologies remains a very difficult task for applications [3].

Delta


Delta was designed to address the limitations of existing data synchronization solutions; it also enriches data on the fly. Our goal was to abstract all of this complexity away from application developers so they could concentrate fully on implementing business functionality. Below, we describe “Movie Search,” an actual use case for Delta at Netflix.

Netflix makes extensive use of a microservice architecture, and each microservice typically serves one type of data. Core information about a movie lives in a microservice called Movie Service, while related data, such as information about producers, actors, and vendors, is managed by several other microservices (namely Deal Service, Talent Service, and Vendor Service).
Business users at Netflix Studios often need to search for films by various criteria, so it is very important for them to be able to search across all data related to a film.

Before Delta, the movie search team had to pull data from several microservices before indexing the movie data. In addition, the team had to build a system that periodically updated the search index by polling other microservices for changes, even when there were no changes at all. This system quickly grew complex and became difficult to maintain.


Figure 1. Polling system before Delta
With Delta, the system was simplified to an event-driven one, as shown in the following figure. CDC (Change-Data-Capture) events are sent to Keystone Kafka topics by the Delta-Connector. A Delta application built with the Delta Stream Processing Framework (based on Flink) receives the CDC events from the topic, enriches them by calling other microservices, and finally writes the enriched data to the search index in Elasticsearch. The whole process happens in near real time: as soon as changes are committed to the data store, the search indexes are updated.


Figure 2. Data pipeline using Delta
In the following sections we describe the Delta-Connector, which connects to a data store and publishes CDC events to the transport layer, a real-time data transmission infrastructure that routes CDC events to Kafka topics. At the very end, we’ll talk about the Delta stream processing framework that application developers can use to implement their processing and enrichment logic.

CDC (Change-Data-Capture)


We have developed a CDC service called Delta-Connector, which captures committed changes from a data store in real time and writes them to a stream. Real-time changes are taken from the transaction log and from dumps of the store. Dumps are needed because transaction logs usually do not retain the full history of changes. Changes are serialized into Delta events, so a consumer does not have to worry about where a change originated.
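As a purely hypothetical illustration of such a source-agnostic event, the class below sketches the kind of fields a CDC event might carry; the field names are assumptions, not the actual Delta event schema.

```java
// A hypothetical shape of a source-agnostic CDC event as described in the text.
public final class CdcEvent {
    public enum Op { INSERT, UPDATE, DELETE, DUMP }   // DUMP rows come from full-table dumps

    public final String table;        // logical table/entity the change belongs to
    public final Op op;               // what happened
    public final String keyJson;      // primary key of the affected row
    public final String beforeJson;   // row image before the change (may be null)
    public final String afterJson;    // row image after the change (null for DELETE)
    public final long commitTimestampMs;

    public CdcEvent(String table, Op op, String keyJson,
                    String beforeJson, String afterJson, long commitTimestampMs) {
        this.table = table;
        this.op = op;
        this.keyJson = keyJson;
        this.beforeJson = beforeJson;
        this.afterJson = afterJson;
        this.commitTimestampMs = commitTimestampMs;
    }
}
```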

Delta-Connector supports several additional features, such as:

  • Ability to write into custom outputs beyond Kafka (a hypothetical sketch follows this list).
  • Ability to trigger manual dumps at any time, for all tables, for a specific table, or for specific primary keys.
  • Dumps are taken in chunks, so there is no need to start over from the beginning in the event of a failure.
  • No locks need to be taken on tables, which is very important so that write traffic to the database is never blocked by our service.
  • High availability thanks to standby instances across AWS Availability Zones.
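As a hypothetical sketch of the pluggable output mentioned in the first bullet, the interface and Kafka-backed default below are assumptions for illustration, not the actual Delta-Connector API.

```java
// A hypothetical pluggable output abstraction with a Kafka-backed default.
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

interface DeltaOutput {
    void emit(String target, String key, String serializedEvent);
}

// Default output: publish the serialized event to a Kafka topic.
final class KafkaOutput implements DeltaOutput {
    private final KafkaProducer<String, String> producer;

    KafkaOutput(Properties producerConfig) {  // config must include serializers and brokers
        this.producer = new KafkaProducer<>(producerConfig);
    }

    @Override
    public void emit(String topic, String key, String serializedEvent) {
        producer.send(new ProducerRecord<>(topic, key, serializedEvent));
    }
}
```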

We currently support MySQL and PostgreSQL, including deployments on AWS RDS and Aurora. We also support Cassandra (multi-master). You can learn more about the Delta-Connector in this blog post.

Kafka and the transport layer


The Delta event transport layer is built on top of the Keystone platform’s messaging service.

Historically, messaging at Netflix has been optimized for availability rather than durability (see the previous article). The trade-off is potential inconsistency of broker data in various edge cases. For example, if an unclean leader election takes place, consumers may see duplicated or lost events.

With Delta, we wanted stronger durability guarantees in order to ensure delivery of CDC events to the derived stores. For this purpose, we built a dedicated, specially configured Kafka cluster as a first-class citizen. Some of the broker settings are described below.



In Keystone Kafka clusters, unclean leader election is usually enabled to favor producer availability. This can result in message loss if an out-of-sync replica is elected leader. In the new, highly durable Kafka cluster, unclean leader election is disabled to prevent message loss.

We also increased the replication factor from 2 to 3 and min.insync.replicas from 1 to 2. Producers writing to this cluster use acks=all, which guarantees that 2 out of 3 replicas hold the latest messages sent by the producer.
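The producer-side half of these settings can be expressed as ordinary Kafka client configuration; the broker and topic settings are shown as comments since they are applied on the cluster rather than in the producer. The class and method names here are illustrative.

```java
// Producer configuration matching the durability settings described above.
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public final class DurableProducerConfig {
    static KafkaProducer<String, String> create(String bootstrapServers) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.ACKS_CONFIG, "all"); // wait for all in-sync replicas
        // Broker/topic side (applied when the cluster and topics are created):
        //   unclean.leader.election.enable=false
        //   replication factor = 3
        //   min.insync.replicas=2
        return new KafkaProducer<>(props);
    }
}
```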

When a broker instance is terminated, a new instance replaces it. However, the new broker needs to catch up on the out-of-sync replicas, which can take several hours. To shorten recovery time in this scenario, we started using Amazon Elastic Block Store instead of local broker disks. When a new instance replaces a terminated broker instance, it attaches the EBS volume the terminated instance had and starts catching up on new messages. This reduces the catch-up time from several hours to several minutes, since the new instance no longer has to replicate from an empty state. In general, decoupling the storage and broker life cycles significantly reduces the impact of replacing a broker.

To further strengthen our delivery guarantees, we use a message tracing system to detect any message loss under extreme conditions (for example, clock drift on the partition leader).

Stream processing framework


The processing layer of Delta is built on the Netflix SPaaS platform, which integrates Apache Flink with the Netflix ecosystem. The platform provides a user interface that manages the deployment of Flink jobs and the orchestration of Flink clusters on top of our Titus container management platform. The interface also manages job configurations and lets users make configuration changes dynamically without having to recompile Flink jobs.

Delta provides a stream processing framework built on Flink and SPaaS that uses an annotation-based DSL (Domain Specific Language) to abstract away the technical details. For example, to define a step in which events are enriched by calling external services, users only need to write the following DSL, and the framework builds a model from it that is executed by Flink.


Figure 3. DSL enrichment example in Delta
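As a rough illustration of what such an enrichment step can compile down to, the sketch below expresses it directly against Flink's async I/O API; Delta's DSL is intended to hide exactly this kind of code. The MovieEvent, EnrichedMovie, and TalentClient types are assumptions made for illustration, not actual Delta or Netflix APIs.

```java
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

// Hypothetical event and client types, defined here only to keep the sketch self-contained.
class MovieEvent { long movieId; }
class EnrichedMovie { MovieEvent movie; String talent; }
class TalentClient { String lookupTalent(long movieId) { return "talent-for-" + movieId; } }

// An enrichment step that calls an external service asynchronously for each CDC event.
public class EnrichWithTalent extends RichAsyncFunction<MovieEvent, EnrichedMovie> {

    private transient TalentClient talentClient;

    @Override
    public void open(Configuration parameters) {
        talentClient = new TalentClient();  // in reality: a client for the Talent Service
    }

    @Override
    public void asyncInvoke(MovieEvent event, ResultFuture<EnrichedMovie> out) {
        CompletableFuture.supplyAsync(() -> talentClient.lookupTalent(event.movieId))
            .thenAccept(talent -> {
                EnrichedMovie enriched = new EnrichedMovie();
                enriched.movie = event;
                enriched.talent = talent;
                out.complete(Collections.singletonList(enriched));
            });
    }
}

// Attaching the step to a CDC event stream would look roughly like this (values illustrative):
// AsyncDataStream.unorderedWait(cdcEvents, new EnrichWithTalent(), 1_000,
//                               java.util.concurrent.TimeUnit.MILLISECONDS, 100);
```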

The processing framework not only shortens the learning curve, but also provides common stream processing functions such as deduplication and schematization, as well as resilience and fault tolerance to address common operational problems.

The Delta Stream Processing Framework consists of two key modules: the DSL & API module and the Runtime module. The DSL & API module provides the DSL and UDF (User-Defined Function) APIs so that users can write their own processing logic (such as filtering or transformations). The Runtime module provides an implementation of the DSL parser, which builds an internal representation of the processing steps as DAG models. The Execution component interprets the DAG models to initialize the actual Flink operators and ultimately run the Flink application. The framework architecture is illustrated in the following figure.


Figure 4. Delta Stream Processing Framework architecture
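A user-defined function in this setup can be thought of as a simple typed transformation; the interface below is a hypothetical shape, not the actual Delta UDF API.

```java
// A hypothetical shape of a UDF as exposed by the DSL & API module.
public interface DeltaUdf<IN, OUT> {
    OUT apply(IN event);   // e.g. filter out test records or reshape a payload
}
```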

This approach has several advantages:

  • Users can focus on their business logic without having to learn the specifics of Flink or SPaaS.
  • Platform-level optimizations and bug fixes can be made transparently, and the behavior of a Delta application can be changed (for example, via UDFs) without recompiling its code.
  • Operating a Delta application is simple for users, because the platform provides resilience and fault tolerance out of the box.



Delta has been running in production for over a year and plays a key role in many Netflix Studio applications. It has helped teams implement use cases such as search indexing, data warehousing, and event-driven workflows. The following figure gives an overview of the high-level architecture of the Delta platform.


Figure 5. The high-level architecture of Delta.

Acknowledgments


We would like to thank the following people who contributed to the creation and development of Delta at Netflix: Allen Wang, Charles Zhao, Jaebin Yoon, Josh Snyder, Kasturi Chatterjee, Mark Cho, Olof Johansson, Piyush Goyal, Prashanth Ramdas, Raghuram Onti Srinivasan, Sandeep Gupta, Steven Wu, Tharanga Gamaethige, Yun Wang and Zhenzhong Xu.

Sources


  1. dev.mysql.com/doc/refman/5.7/en/implicit-commit.html
  2. dev.mysql.com/doc/refman/5.7/en/cannot-roll-back.html
  3. Martin Kleppmann, Alastair R. Beresford, Boerge Svingen: Online event processing. Commun. ACM 62(5): 43–49 (2019). DOI: doi.org/10.1145/3312527



