DBLog: un marco común para la captura de datos modificados

¡Hola a todos! Le ofrecemos leer la traducción del artículo, que preparamos especialmente para los estudiantes del curso "Arquitecto de alta carga" .




Introducción


El seguimiento de los cambios de datos (Change Data Capture, CDC) le permite recibir en tiempo real los cambios confirmados en la base de datos y distribuirlos a varios consumidores [1] [2]. Los CDC se están volviendo cada vez más populares cuando se requiere la sincronización entre el almacenamiento heterogéneo de datos (por ejemplo, MySQL y ElasticSearch) y es una alternativa a los métodos tradicionales como la escritura dual y las transacciones distribuidas [3] [4].

La fuente del CDC, en bases de datos como MySQL y PostgreSQL, es el registro de transacciones (registro de transacciones). Pero dado que los registros de transacciones generalmente se truncan, es posible que no contengan todo el historial de cambios. Por lo tanto, para obtener el estado completo de la fuente, necesitamos volcados. Examinamos varios proyectos de CDC de código abierto, a menudo utilizando las mismas bibliotecas, API de bases de datos y protocolos, y encontramos una serie de limitaciones en ellos que no cumplían con nuestros requisitos. Por ejemplo, detener el procesamiento de eventos de registro hasta la finalización del volcado (instantánea de datos completa), la imposibilidad de iniciar el volcado de volcado bajo demanda o implementación, afectando el tráfico de escritura debido al uso de bloqueos de tabla.

Esto nos llevó a desarrollar DBLog.con un enfoque unificado para procesar registros y volcados. Para admitirlo, se deben implementar varias funciones en el DBMS, que ya están en MySQL, PostgreSQL, MariaDB y otras bases de datos.

Algunas de las características de DBLog:

  • Los eventos de registro se procesan en el orden en que ocurren.
  • Los volcados se pueden realizar en cualquier momento para todas las tablas, para una tabla o para claves primarias específicas de una tabla.
  • El procesamiento de registros se alterna con el procesamiento de volcado, dividiendo el volcado en bloques. Por lo tanto, el procesamiento de registros puede tener lugar en paralelo con el procesamiento de volcado. Si el proceso finaliza, se puede reanudar después del último bloque completado sin tener que comenzar de nuevo. También le permite ajustar el rendimiento al crear un volcado y, si es necesario, pausar su creación.
  • , .
  • : , API.
  • . , .


Anteriormente, discutimos Delta ( traducción ), una plataforma para enriquecimiento y sincronización de datos. El objetivo de Delta es sincronizar varios almacenes de datos, donde uno de ellos es primario (por ejemplo, MySQL) y los otros derivados (por ejemplo, ElasticSearch). Uno de los requisitos clave de desarrollo fue un bajo retraso en la propagación de los cambios desde el origen a los destinatarios, así como una alta disponibilidad del flujo de eventos. Estas condiciones se aplican independientemente de si un equipo usa todos los almacenes de datos o si un equipo posee los datos y el otro los consume. En un artículo sobre Delta ( traducción ), también describimos casos de uso que van más allá de la sincronización de datos, como el procesamiento de eventos.

Para sincronizar datos y procesar eventos, además de poder realizar un seguimiento de los cambios en tiempo real, debemos cumplir los siguientes requisitos:

  • Obteniendo el estado completo . Las tiendas derivadas (como ElasticSearch) deberían almacenar en última instancia el estado completo de la fuente. Implementamos esto a través de volcados de la base de datos original.
  • Comience la recuperación del estado en cualquier momento. En lugar de considerar el volcado como una operación única para la inicialización primaria, podemos hacerlo en cualquier momento: para todas las tablas, para una tabla o para claves primarias específicas. Esto es muy importante para la recuperación de los consumidores en casos de pérdida de datos o corrupción.
  • - . . , (, ). - . , - .
  • . . API, , , . , , , .
  • . Netflix , Kafka, SQS, Kinesis, Netflix, Keystone. , (, ), (, ). . API.
  • . Netflix , (MySQL, PostgreSQL) AWS RDS. .



Examinamos varias soluciones de código abierto existentes, que incluyen: Maxwell , SpinalTap , Yelp MySQL Streamer y Debezium . En términos de recopilación de datos, todos funcionan de manera similar, utilizando un registro de transacciones. Por ejemplo, usando el protocolo de replicación binlog en MySQL o las ranuras de replicación en PostgreSQL.

Pero cuando procesan volcados, tienen al menos una de las siguientes limitaciones:

  • Deje de procesar eventos de registro mientras crea un volcado . Como resultado, si el volcado es grande, el procesamiento de eventos de registro se detiene durante un período prolongado. Esto será un problema si los consumidores confían en pequeños retrasos en la propagación de los cambios.
  • . . (, ElasticSearch) .
  • . . [5]. . , . . , PostgreSQL RDS .
  • Usando funciones específicas de la base de datos . Descubrimos que algunas soluciones usan características de base de datos adicionales que no están presentes en todos los sistemas. Por ejemplo, usando el motor de agujeros negros en MySQL u obteniendo una instantánea consistente de los volcados a través de ranuras de replicación en PostgreSQL. Esto limita la reutilización de código entre diferentes bases de datos.

Al final, decidimos adoptar un enfoque diferente para trabajar con vertederos:

  • alternar eventos de registro y volcado para que puedan ejecutarse juntos;
  • iniciar un volcado en cualquier momento;
  • No use cerraduras de mesa
  • No use características específicas de la base de datos.

Marco DBLog


DBLog es un marco de Java para recibir volcados y cambios en tiempo real. Los volcados se realizan en partes para que se alternen con eventos en tiempo real y no retrasen su procesamiento durante un largo período. Los volcados se pueden realizar en cualquier momento a través de la API. Esto permite a los consumidores obtener el estado completo de la base de datos en la etapa de inicialización o posterior para la recuperación ante desastres.

Al diseñar el marco, pensamos en minimizar el impacto en la base de datos. Los volcados se pueden pausar y reanudar según sea necesario. Esto funciona tanto para la recuperación de fallos como para detenerse si la base de datos se ha convertido en un cuello de botella. Tampoco bloqueamos las tablas para no afectar las operaciones de escritura.

DBLog le permite grabar eventos en varias formas, incluso en otra base de datos o mediante la API. Para almacenar el estado asociado con el procesamiento de registros y volcados, así como para seleccionar el nodo maestro, utilizamos Zookeeper. Al crear DBLog, implementamos la capacidad de conectar varios complementos, lo que le permite cambiar la implementación a su gusto (por ejemplo, reemplazar Zookeeper por otra cosa).

A continuación, consideramos con más detalle el procesamiento de registros y volcados.

Registros


El marco requiere que la base de datos registre eventos para cada fila cambiada en tiempo real, mientras mantiene el orden de los commits. Se supone que la fuente de estos eventos es el registro de transacciones. La base de datos los envía a través de un transporte que DBLog puede usar. Para este transporte utilizamos el término "registro de cambios". Un evento puede ser de los siguientes tipos: crear, crear, actualizar o eliminar. Para cada evento, se debe proporcionar la siguiente información: el número de secuencia en el registro (número de secuencia de registro), el estado de la columna durante la operación y el esquema que se aplicó en el momento en que se realizó la operación.

Cada cambio se serializa en el formato de evento DBLog y se envía al escritor para su posterior transferencia a la salida. Enviar eventos al escritor es una operación sin bloqueo, porque el escritor se ejecuta en un hilo separado y acumula eventos en el búfer interno. Los eventos almacenados en el búfer se envían en el orden en que se recibieron. El marco le permite conectar un formateador personalizado para serializar eventos en un formato arbitrario. La salida es una interfaz simple que le permite conectarse a cualquier destinatario, como una secuencia, un almacén de datos o incluso una API.

Deshecho


Los volcados son necesarios porque los registros de transacciones tienen un tiempo de almacenamiento limitado, lo que evita que se utilicen para restaurar el conjunto de datos original completo. Los volcados se crean en bloques (fragmento) para que puedan alternar con eventos de registro, lo que les permite ser procesados ​​simultáneamente. Para cada línea seleccionada del bloque, se genera un evento y se serializa en el mismo formato que el evento de registro. Por lo tanto, el consumidor no necesita preocuparse de que un evento provenga del registro o el volcado. Tanto los eventos de registro como los eventos de volcado se envían a la salida a través del mismo escritor.

Los volcados se pueden programar en cualquier momento a través de la API para todas las tablas, una tabla o para claves primarias específicas de la tabla. Un volcado de la tabla se realiza mediante bloques de un tamaño determinado. También puede configurar una demora en el procesamiento de nuevos bloques, permitiendo que solo se registren eventos en este momento. El tamaño del bloque y el retraso le permiten equilibrar el procesamiento de eventos de registro y volcado. Ambas configuraciones se pueden cambiar en tiempo de ejecución.

Los bloques (fragmento) se seleccionan ordenando la tabla en orden ascendente de la clave primaria y seleccionando las filas donde la clave primaria es mayor que la última clave primaria del bloque anterior. La base de datos es necesaria para ejecutar esta consulta de manera eficiente, lo que generalmente es aplicable a los sistemas que implementan un escaneo de rango en un rango de claves primarias.


Figura 1. Desglose de la tabla con 4 columnas c1-c4 y c1 como clave principal (pk). La clave principal de un tipo entero, tamaño de bloque 3. El bloque 2 se selecciona con la condición c1> 4. Los

bloques deben tomarse de manera que no retrasen el procesamiento de los eventos de registro durante un período prolongado y guarden el historial de cambios para que la fila seleccionada con el valor anterior no pueda sobrescribir el más nuevo. evento.

Para poder seleccionar bloques secuencialmente, en el registro de cambios creamos "marcas de agua" reconocibles. Las marcas de agua se implementan a través de una tabla en la base de datos de origen. Esta tabla se almacena en un espacio de nombres especial para que no haya conflictos con las tablas de la aplicación. Solo se almacena una línea con un valor UUID. Se crea una marca de agua cuando este valor cambia a un UUID específico. La actualización de la línea conduce a un evento de cambio, que finalmente recibimos a través del registro de cambios.

Los vertederos con marca de agua se crean de la siguiente manera:

  1. Suspendemos temporalmente el procesamiento de eventos de registro.
  2. Genere una marca de agua "baja" actualizando la tabla de marcas de agua.
  3. Comenzamos SELECT para el siguiente bloque y guardamos en la memoria el resultado indexado por la clave primaria.
  4. “” (high) , .
  5. . .
  6. , .
  7. , , .
  8. , 1.

Se supone que SELECT devuelve un estado que representa un cambio comprometido hasta un punto de la historia. O, equivalente a lo siguiente: SELECT se ejecuta en una determinada posición del registro de cambios, teniendo en cuenta los cambios hasta este punto. Las bases de datos generalmente no proporcionan información sobre el tiempo de ejecución de un SELECT (con la excepción de MariaDB ).

La idea principal de nuestro enfoque es que en el registro de cambios definimos una ventana que garantiza la preservación de la posición SELECT en el bloque. La ventana se abre escribiendo la marca de agua inferior, después de lo cual se ejecuta SELECT, y la ventana se cierra escribiendo la marca de agua superior. Dado que se desconoce la posición exacta de SELECT, se eliminan todas las filas seleccionadas que entran en conflicto con los eventos de registro en esta ventana. Esto asegura que no hay reescritura del historial en el registro de cambios.

Para que esto funcione, SELECT debe leer el estado de la tabla desde el momento de la marca de agua inferior o posterior (está permitido incluir los cambios que se hicieron después de la marca de agua inferior y antes de leer). En general, se requiere SELECT para ver los cambios realizados antes de que se ejecute.. Lo llamamos "lecturas no obsoletas". Además, dado que la marca de agua superior se escribe después, se garantiza que SELECT se ejecutará antes.

Las figuras 2a y 2b ilustran el algoritmo de selección de bloque. Como ejemplo, damos una tabla con claves primarias de k1 a k6. Cada entrada en el registro de cambios representa un evento de creación, actualización o eliminación para la clave primaria. La Figura 2a muestra la generación de marca de agua y la selección de bloque (pasos 1 a 4). La actualización de la tabla de marca de agua en los pasos 2 y 4 crea dos eventos de cambio (magenta), que finalmente se reciben a través del registro. En la Figura 2b, nos enfocamos en las líneas del bloque actual que se eliminan del conjunto de resultados con las claves principales que aparecen entre las marcas de agua (pasos 5 a 7).


Figura 2a - Algoritmo de marca de agua para la selección de bloques (pasos 1–4).


Figura 2b - Algoritmo de marca de agua para seleccionar bloques (pasos 5–7).

Tenga en cuenta que entre las marcas de agua inferior y superior puede aparecer una gran cantidad de eventos en el registro si una o más transacciones realizaron muchos cambios de línea. Por esta razón, hacemos una suspensión a corto plazo del procesamiento del registro en las etapas 2–4 para no perder las marcas de agua. Por lo tanto, el procesamiento de eventos de registro se puede reanudar evento por evento, lo que en última instancia le permite detectar marcas de agua sin la necesidad de almacenar en caché los eventos de registro del registro. El procesamiento de registros se suspende solo por un corto tiempo, ya que se espera que los pasos 2 a 4 sean rápidos: la actualización de las marcas de agua es una operación de escritura única, y SELECT se realiza con una restricción.

Tan pronto como se recibe la marca de agua superior en el paso 7, las líneas no conflictivas del bloque se transmiten a la salida en el orden en que se recibieron. Esta es una operación sin bloqueo, porque el escritor se ejecuta en un hilo separado, lo que le permite reanudar rápidamente el procesamiento del registro después del paso 7. Después de eso, el procesamiento del registro continúa para los eventos que ocurren después de la marca de agua superior.

La figura 2c muestra el orden de grabación para todo el bloque utilizando el mismo ejemplo que en las figuras 2a y 2b. Los eventos en el registro que aparecen antes de la marca de agua superior se registran primero. Luego, las líneas restantes del resultado del bloque (magenta). Y finalmente, se registran los eventos que ocurren después de la marca de agua superior.


Figura 2c - El orden de grabación de la salida. Tira de alternancia con volcado.

Bases de datos compatibles


Para usar DBLog, la base de datos debe proporcionar un registro de cambios como un historial lineal de cambios confirmados con lecturas no obsoletas. Sistemas como MySQL, PostgreSQL, MariaDB, etc. cumplen estas condiciones, por lo que el marco puede utilizarse de la misma manera con estas bases de datos.
Hasta ahora, hemos agregado soporte para MySQL y PostgreSQL. Cada base de datos usa sus propias bibliotecas para recibir eventos de registro, ya que cada uno de ellos usa un protocolo propietario. Para MySQL, utilizamos shyiko / mysql-binlog-connector , que implementa el protocolo de replicación binlog. Para PostgreSQL, hay ranuras de replicación con el complemento wal2json . Los cambios se aceptan a través del protocolo de replicación de transmisión, implementado por el controlador jdbcPostgreSQL La definición de esquema para cada cambio capturado es diferente en MySQL y PostgreSQL. En PostgreSQL, wal2json contiene nombres, tipos de columna y valores. Para MySQL, los cambios de esquema deben rastrearse como eventos binlog.

El procesamiento de volcado se realizó utilizando SQL y JDBC, que requieren solo la implementación de la selección de bloques y la actualización de la marca de agua. Para MySQL y PostgreSQL, se usa el mismo código, que se puede usar para otras bases de datos similares. El procesamiento de volcado en sí no depende de SQL o JDBC y le permite usar bases de datos que cumplan con los requisitos de DBLog, incluso si usan estándares diferentes.


Figura 3 - Arquitectura DBLog de alto nivel.

Alta disponibilidad


DBLog utiliza una arquitectura activa-pasiva de nodo único. Una instancia es activa (líder), mientras que las otras son pasivas (en espera). Para seleccionar el host, usamos Zookeeper. Se utiliza una concesión para el nodo maestro, que debe actualizarse periódicamente para seguir siendo el maestro. En caso de terminación de la renovación del arrendamiento, las funciones del líder se transfieren a otro nodo. Actualmente, implementamos una copia para cada AZ (zona de disponibilidad, generalmente tenemos 3 AZ), por lo que si una AZ cae, entonces la copia en otra AZ puede continuar procesándose con un tiempo de inactividad total mínimo. Las instancias de respaldo se pueden ubicar en diferentes regiones, aunque se recomienda que trabaje en la misma región que el host de la base de datos para proporcionar baja latencia para capturar cambios.

Uso en producción


DBLog es la base de los conectores MySQL y PostgreSQL utilizados en Delta . Desde 2018, Delta se ha utilizado en producción para sincronizar almacenes de datos y procesamiento de eventos en aplicaciones de estudio de Netflix. Los conectores Delta usan su serializador de eventos. Las transmisiones específicas de Netflix como Keystone se utilizan como salida .


Figura 4 - Conector Delta.

Además de Delta, DBLog también se usa en Netflix para crear conectores para otras plataformas de movimiento de datos que tienen sus propios formatos de datos.

Quédate con nosotros


DBLog tiene características adicionales que no están cubiertas en este artículo, como:

  • Posibilidad de obtener esquemas de tabla sin usar bloqueos.
  • Integración con el esquema de almacenamiento. Para cada evento, el esquema se almacena en el almacenamiento, cuyo enlace se indica en la carga útil del evento.
  • Modo de escritura monotónica. Asegurarse de que después de guardar el estado de una fila en particular, su estado pasado no se pueda sobrescribir. Por lo tanto, los consumidores reciben cambios de estado solo en la dirección hacia adelante, sin avanzar y retroceder en el tiempo.

Planeamos abrir el código fuente DBLog en 2020 e incluir documentación adicional en él.

Expresiones de gratitud


Queremos agradecer a las siguientes personas por contribuir al desarrollo de DBLog: Josh Snyder , Raghuram Onti Srinivasan , Tharanga Gamaethige y Yun Wang .

Referencias


[1] Das, Shirshanka y col. "¡Todos a bordo del Databus!: La plataforma escalable de captura de datos de cambios consistentes de Linkedin". Tercer Simposio de ACM sobre Cloud Computing. ACM, 2012
[2] "Acerca de Change Data Capture (SQL Server)" , documentos de Microsoft SQL, 2019
[3] Kleppmann, Martin, "Uso de registros para construir una infraestructura de datos sólida (o: por qué las escrituras dobles son una mala idea)" , Confluent, 2015
[4] Kleppmann, Martin, Alastair R. Beresford, Boerge Svingen. "Procesamiento de eventos en línea". Comunicaciones de ACM 62.5 (2019): 43–49
[5] https://debezium.io/documentation/reference/0.10/connectors/mysql.html#snapshots


Obtenga más información sobre el curso.

All Articles