DBLog - a common framework for Change Data Capture

Hello everyone! We offer you to read the translation of the article, which we prepared especially for students of the course "High Load Architect" .




Introduction


Tracking data changes (Change Data Capture, CDC) allows you to receive in real time the committed changes in the database and distribute them to various consumers [1] [2]. CDC is becoming increasingly popular when synchronization between heterogeneous data warehousing is required (for example, MySQL and ElasticSearch) and is an alternative to traditional methods such as dual-write and distributed transactions [3] [4].

The source for the CDC, in databases such as MySQL and PostgreSQL, is the transaction log (transaction log). But since transaction logs are usually truncated, they may not contain the entire history of changes. Therefore, to obtain the full state of the source, we need dumps. We examined several open source CDC projects, often using the same libraries, database APIs, and protocols, and found a number of limitations in them that did not meet our requirements. For example, stopping the processing of log events until completion of the dump (full data snapshot), the inability to initiate dump dumping on demand or implementation, affecting write traffic due to the use of table locks.

This led us to develop DBLog.with a unified approach to processing logs and dumps. To support it, a number of functions should be implemented in the DBMS, which are already in MySQL, PostgreSQL, MariaDB, and a number of other databases.

Some of the features of DBLog:

  • Log events are processed in the order they occur.
  • Dumps can be made at any time for all tables, for one table or for specific primary keys of a table.
  • Log processing alternates with dump processing, dividing the dump into blocks. Thus, log processing can take place in parallel with dump processing. If the process ends then it can be resumed after the last completed block without having to start all over again. It also allows you to adjust the throughput when creating a dump and, if necessary, pause its creation.
  • , .
  • : , API.
  • . , .


Earlier, we discussed Delta ( translation ), a platform for data enrichment and synchronization. The goal of Delta is to synchronize several data stores, where one of them is primary (for example, MySQL), and the other derivatives (for example, ElasticSearch). One of the key development requirements was a low delay in propagating changes from the source to the recipients, as well as a high availability of the event stream. These conditions apply regardless of whether all data warehouses are used by one team or if one team owns the data and the other consumes it. In an article on Delta ( translation ), we also described use cases that go beyond data synchronization, such as event processing.

In order to synchronize data and process events, in addition to being able to track changes in real time, we need to fulfill the following requirements:

  • Getting the full state . Derived stores (such as ElasticSearch) should ultimately store the full state of the source. We implement this through dumps of the original database.
  • Start state recovery at any time. Instead of considering the dump as a one-time operation only for primary initialization, we can do it at any time: for all tables, for one table or for specific primary keys. This is very important for the recovery of consumers in cases of loss or damage to data.
  • - . . , (, ). - . , - .
  • . . API, , , . , , , .
  • . Netflix , Kafka, SQS, Kinesis, Netflix, Keystone. , (, ), (, ). . API.
  • . Netflix , (MySQL, PostgreSQL) AWS RDS. .



We examined several existing open source solutions, including: Maxwell , SpinalTap , Yelp MySQL Streamer, and Debezium . In terms of data collection, they all work in a similar way, using a transaction log. For example, using the binlog replication protocol in MySQL or the replication slots in PostgreSQL.

But when processing dumps, they have at least one of the following limitations:

  • Stop processing log events while creating a dump . As a result, if the dump is large, then the processing of log events stops for a long period. This will be a problem if consumers rely on small delays in propagating the changes.
  • . . (, ElasticSearch) .
  • . . [5]. . , . . , PostgreSQL RDS .
  • Using specific database functions . We found that some solutions use additional database features that are not present on all systems. For example, using the blackhole engine in MySQL or getting a consistent snapshot of dumps through replication slots in PostgreSQL. This limits code reuse between different databases.

In the end, we decided to take a different approach to working with dumps:

  • alternate log and dump events so that they can be executed together;
  • start a dump at any time;
  • Do not use table locks
  • Do not use specific database features.

DBLog Framework


DBLog is a java framework for receiving dumps and changes in real time. Dumps are performed in parts so that they alternate with real-time events and do not delay their processing for a long period. Dumps can be made at any time through the API. This allows consumers to get the full state of the database at the initialization stage or later for disaster recovery.

When designing the framework, we thought about minimizing the impact on the database. Dumps can be paused and resumed as necessary. This works both for crash recovery and for stopping if the database has become a bottleneck. We also do not lock tables so as not to affect write operations.

DBLog allows you to record events in various forms, including in another database or through the API. To store the state associated with the processing of logs and dumps, as well as to select the master node, we use Zookeeper. When creating DBLog, we implemented the ability to connect various plugins, allowing you to change the implementation as you like (for example, replace Zookeeper with something else).

Next, we consider in more detail the processing of logs and dumps.

Logs


The framework requires the database to record events for each changed row in real time, while maintaining the order of commits. The source of these events is assumed to be the transaction log. The database sends them through a transport that DBLog can use. For this transport we use the term “change log”. An event can be of the following types: create, create, update, or delete. For each event, the following information must be provided: the sequence number in the log (log sequence number), the state of the column during the operation, and the scheme that was applied at the time the operation was performed.

Each change is serialized into the DBLog event format and sent to writer, for its further transfer to the output. Sending events to writer is a non-blocking operation, because writer runs in a separate thread and accumulates events in the internal buffer. Buffered events are sent out in the order they were received. The framework allows you to connect a custom formatter to serialize events into an arbitrary format. Output is a simple interface that allows you to connect to any recipient, such as a stream, data warehouse, or even an API.

Dumps


Dumps are necessary because transaction logs have a limited storage time, which prevents them from being used to restore the full original data set. Dumps are created in blocks (chunk) so that they can alternate with log events, allowing them to be processed simultaneously. For each selected line of the block, an event is generated and serialized in the same format as the log event. Thus, the consumer does not need to worry an event came from the log or dump. Both log events and dump events are sent to the output through the same writer.

Dumps can be scheduled at any time through the API for all tables, one table, or for specific primary keys of the table. A dump of the table is performed by blocks of a given size. You can also configure a delay in processing new blocks, allowing only log events at this time. The block size and delay allow you to balance the processing of log and dump events. Both settings can be changed at runtime.

Blocks (chunk) are selected by sorting the table in ascending order of the primary key and selecting rows where the primary key is greater than the last primary key of the previous block. The database is required to execute this query efficiently, which is usually applicable to systems that implement a range scan on a range of primary keys.


Figure 1. Table breakdown with 4 columns c1-c4 and c1 as the primary key (pk). The primary key of an integer type, block size 3. Block 2 is selected by the condition c1> 4.

Blocks must be taken in such a way as not to delay the processing of log events for a long period and save the change history so that the selected row with the old value could not overwrite the newer event.

In order to be able to select blocks sequentially, in the change log we create recognizable “watermarks”. Watermarks are implemented through a table in the source database. This table is stored in a special namespace so that there are no conflicts with the application tables. Only one line with a UUID value is stored in it. A watermark is created when this value changes to a specific UUID. Updating the line leads to a change event, which we ultimately receive through the change log.

Watermarked dumps are created as follows:

  1. We temporarily suspend the processing of log events.
  2. Generate a “low” watermark by updating the watermark table.
  3. We start SELECT for the next block and save in memory the result indexed by the primary key.
  4. “” (high) , .
  5. . .
  6. , .
  7. , , .
  8. , 1.

SELECT is supposed to return a state that represents a committed change up to a point in history. Or, equivalent to the following: SELECT is executed in a certain position of the change log, taking into account changes up to this point. Databases usually do not provide information about the execution time of a SELECT (with the exception of MariaDB ).

The main idea of ​​our approach is that in the change log we define a window that guarantees the preservation of the SELECT position in the block. The window is opened by writing the lower watermark, after which SELECT is executed, and the window is closed by writing the upper watermark. Since the exact position of the SELECT is unknown, all selected rows that conflict with the log events in this window are deleted. This ensures that there is no rewriting of the history in the change log.

For this to work, SELECT must read the state of the table from the moment of the lower watermark or later (it is permissible to include the changes that were made after the lower watermark and before reading). In general, SELECT is required to see changes made before it is executed.. We call it “non-stale reads”. In addition, since the upper watermark is written after, it is guaranteed that SELECT will be executed before it.

Figures 2a and 2b illustrate the block selection algorithm. As an example, we give a table with primary keys from k1 to k6. Each entry in the change log represents a create, update, or delete event for the primary key. Figure 2a shows the watermark generation and block selection (steps 1 to 4). Updating the watermark table in steps 2 and 4 creates two change events (magenta), which are ultimately received through the log. In Figure 2b, we focus on the lines of the current block that are removed from the result set with primary keys that appear between the watermarks (steps 5 to 7).


Figure 2a - Watermark algorithm for block selection (steps 1–4).


Figure 2b - Watermark algorithm for selecting blocks (steps 5–7).

Please note that between the lower and upper watermarks a large number of events can appear in the log if one or more transactions made many line changes. For this reason, we do a short-term suspension of the processing of the log at stages 2–4 so as not to miss the watermarks. Thus, the processing of log events can be resumed event by event, which ultimately allows you to detect watermarks without the need to cache log events of the log. Log processing is suspended only for a short time, as steps 2–4 are expected to be quick: updating watermarks is a single write operation, and SELECT is performed with a restriction.

As soon as the upper watermark is received in step 7, the non-conflicting lines of the block are transmitted to the output in the order they were received. This is a non-blocking operation, because the writer runs in a separate thread, which allows you to quickly resume processing the log after step 7. After that, the processing of the log continues for events that occur after the upper watermark.

Figure 2c shows the recording order for the entire block using the same example as in figures 2a and 2b. Events in the log that appear before the upper watermark are recorded first. Then the remaining lines from the block result (magenta). And finally, events that occur after the top watermark are recorded.


Figure 2c - The order of recording the output. Strip alternation with dump.

Supported Databases


To use DBLog, the database must provide a change log as a linear history of committed changes with non-stale reads. These conditions are met by systems such as MySQL, PostgreSQL, MariaDB, etc., so the framework can be used in the same way with these databases.
So far, we have added support for MySQL and PostgreSQL. Each database uses its own libraries to receive log events, since each of them uses a proprietary protocol. For MySQL, we use shyiko / mysql-binlog-connector , which implements the binlog replication protocol. For PostgreSQL, there are replication slots with the wal2json plugin . Changes are accepted through the streaming replication protocol, which is implemented by the jdbc driverPostgreSQL The schema definition for each captured change is different in MySQL and PostgreSQL. In PostgreSQL, wal2json contains names, column types, and values. For MySQL, schema changes should be tracked as binlog events.

Dump processing was done using SQL and JDBC, requiring only the implementation of block selection and updating the watermark. For MySQL and PostgreSQL, the same code is used, which can be used for other similar databases. The dump processing itself is not dependent on SQL or JDBC and allows you to use databases that meet the requirements of DBLog, even if they use different standards.


Figure 3 - High-level DBLog architecture.

High availability


DBLog uses a single-node active-passive architecture. One instance is active (leading), while the others are passive (standby). To select the host, we use Zookeeper. A lease is used for the master node, which must be updated periodically to continue to be the master. In case of termination of the renewal of the lease, the functions of the leader are transferred to another node. Currently, we deploy one copy for each AZ (availability zone, usually we have 3 AZ), so if one AZ falls, then the copy in another AZ can continue processing with a minimum total downtime. Backup instances can be located in different regions, although it is recommended that you work in the same region as the database host to provide low latency for capturing changes.

Use on Production


DBLog is the basis for the MySQL and PostgreSQL connectors used in Delta . Since 2018, Delta has been used in production for synchronizing data warehouses and event processing in Netflix studio applications. Delta connectors use their event serializer. Netflix-specific streams such as Keystone are used as output .


Figure 4 - Delta Connector.

In addition to Delta, DBLog is also used at Netflix to create connectors for other data movement platforms that have their own data formats.

stay with us


DBLog has additional features that are not covered in this article, such as:

  • Ability to get table schemas without using locks.
  • Integration with schema storage. For each event, the scheme is stored in the storage, the link to which is indicated in the event payload.
  • Monotonic writes mode. Ensuring that after saving the state of a particular row, its past state cannot be overwritten. Thus, consumers receive state changes only in the forward direction, without moving back and forth in time.

We plan to open the DBLog source code in 2020 and include additional documentation in it.

Acknowledgments


We would like to thank the following people for contributing to the development of DBLog: Josh Snyder , Raghuram Onti Srinivasan , Tharanga Gamaethige and Yun Wang .

References


[1] Das, Shirshanka, et al. “All aboard the Databus !: Linkedin's scalable consistent change data capture platform.” Third ACM Symposium on Cloud Computing. ACM, 2012
[2] “About Change Data Capture (SQL Server)” , Microsoft SQL docs, 2019
[3] Kleppmann, Martin, “Using logs to build a solid data infrastructure (or: why dual writes are a bad idea)“ , Confluent, 2015
[4] Kleppmann, Martin, Alastair R. Beresford, Boerge Svingen. “Online event processing.” Communications of the ACM 62.5 (2019): 43–49
[5] https://debezium.io/documentation/reference/0.10/connectors/mysql.html#snapshots


Learn more about the course.

All Articles