DBLog - uma estrutura comum para o Change Data Capture

Olá a todos! Oferecemos a você a leitura do artigo, preparado especialmente para os alunos do curso "High Load Architect" .




Introdução


O rastreamento de alterações de dados (Change Data Capture, CDC) permite que você receba em tempo real as alterações confirmadas no banco de dados e as distribua a vários consumidores [1] [2]. O CDC está se tornando cada vez mais popular quando a sincronização entre data warehousing heterogêneo é necessária (por exemplo, MySQL e ElasticSearch) e é uma alternativa aos métodos tradicionais, como transações de gravação dupla e transações distribuídas [3] [4].

A fonte do CDC, em bancos de dados como MySQL e PostgreSQL, é o log de transações (log de transações). Mas como os logs de transações geralmente são truncados, eles podem não conter todo o histórico de alterações. Portanto, para obter o estado completo da fonte, precisamos de dumps. Examinamos vários projetos de CDC de código aberto, geralmente usando as mesmas bibliotecas, APIs de banco de dados e protocolos, e encontramos várias limitações neles que não atendiam aos nossos requisitos. Por exemplo, parando o processamento de eventos de log até a conclusão do despejo (captura instantânea de dados completos), a incapacidade de iniciar o despejo de despejo sob demanda ou implementação, afetando o tráfego de gravação devido ao uso de bloqueios de tabela.

Isso nos levou a desenvolver o DBLog.com uma abordagem unificada para processar logs e despejos. Para suportá-lo, várias funções devem ser implementadas no DBMS, que já estão no MySQL, PostgreSQL, MariaDB e vários outros bancos de dados.

Alguns dos recursos do DBLog:

  • Eventos de log são processados ​​na ordem em que ocorrem.
  • Dumps podem ser feitos a qualquer momento para todas as tabelas, para uma tabela ou para chaves primárias específicas de uma tabela.
  • O processamento de log alterna com o processamento de dump, dividindo o dump em blocos. Assim, o processamento de log pode ocorrer em paralelo com o processamento de despejo. Se o processo terminar, ele poderá ser retomado após o último bloco concluído sem precisar reiniciar novamente. Também permite ajustar a taxa de transferência ao criar um dump e, se necessário, interromper sua criação.
  • , .
  • : , API.
  • . , .


Anteriormente, discutimos o Delta ( tradução ), uma plataforma para enriquecimento e sincronização de dados. O objetivo da Delta é sincronizar vários armazenamentos de dados, onde um deles é primário (por exemplo, MySQL) e os outros derivados (por exemplo, ElasticSearch). Um dos principais requisitos de desenvolvimento era um baixo atraso na propagação de alterações da origem para os destinatários, bem como uma alta disponibilidade do fluxo de eventos. Essas condições se aplicam independentemente de todos os armazéns de dados serem usados ​​por uma equipe ou se uma equipe possui os dados e a outra os consome. Em um artigo sobre Delta ( tradução ), também descrevemos casos de uso que vão além da sincronização de dados, como o processamento de eventos.

Para sincronizar dados e processar eventos, além de poder rastrear alterações em tempo real, precisamos atender aos seguintes requisitos:

  • Obtendo o estado completo . As lojas derivadas (como ElasticSearch) devem finalmente armazenar o estado completo da fonte. Implementamos isso através de despejos do banco de dados original.
  • Inicie a recuperação do estado a qualquer momento. Em vez de considerar o despejo como uma operação única apenas para inicialização primária, podemos fazê-lo a qualquer momento: para todas as tabelas, para uma tabela ou para chaves primárias específicas. Isso é muito importante para a recuperação de consumidores em casos de perda ou corrupção de dados.
  • - . . , (, ). - . , - .
  • . . API, , , . , , , .
  • . Netflix , Kafka, SQS, Kinesis, Netflix, Keystone. , (, ), (, ). . API.
  • . Netflix , (MySQL, PostgreSQL) AWS RDS. .



Examinamos várias soluções de código aberto existentes, incluindo: Maxwell , SpinalTap , Yelp MySQL Streamer e Debezium . Em termos de coleta de dados, todos eles funcionam de maneira semelhante, usando um log de transações. Por exemplo, usando o protocolo de replicação binlog no MySQL ou os slots de replicação no PostgreSQL.

Mas ao processar despejos, eles têm pelo menos uma das seguintes limitações:

  • Pare de processar eventos de log ao criar um despejo . Como resultado, se o dump for grande, o processamento dos eventos de log será interrompido por um longo período. Isso será um problema se os consumidores confiarem em pequenos atrasos na propagação das alterações.
  • . . (, ElasticSearch) .
  • . . [5]. . , . . , PostgreSQL RDS .
  • Usando funções específicas do banco de dados . Descobrimos que algumas soluções usam recursos adicionais de banco de dados que não estão presentes em todos os sistemas. Por exemplo, usando o mecanismo blackhole no MySQL ou obtendo um instantâneo consistente de despejos através de slots de replicação no PostgreSQL. Isso limita a reutilização de código entre diferentes bancos de dados.

No final, decidimos adotar uma abordagem diferente para trabalhar com despejos:

  • eventos alternativos de log e despejo para que possam ser executados juntos;
  • iniciar um despejo a qualquer momento;
  • Não use bloqueios de mesa
  • Não use recursos específicos do banco de dados.

Framework DBLog


DBLog é uma estrutura java para receber despejos e alterações em tempo real. Os despejos são executados em partes para que eles alternem com eventos em tempo real e não atrasem seu processamento por um longo período. Dumps podem ser feitos a qualquer momento através da API. Isso permite que os consumidores obtenham o estado completo do banco de dados no estágio de inicialização ou posterior para recuperação de desastres.

Ao projetar a estrutura, pensamos em minimizar o impacto no banco de dados. Os despejos podem ser pausados ​​e retomados conforme necessário. Isso funciona tanto para recuperação de falhas quanto para parar se o banco de dados se tornar um gargalo. Também não bloqueamos tabelas para não afetar as operações de gravação.

O DBLog permite registrar eventos de várias formas, inclusive em outro banco de dados ou através da API. Para armazenar o estado associado ao processamento de logs e despejos, bem como selecionar o nó principal, usamos o Zookeeper. Ao criar o DBLog, implementamos a capacidade de conectar vários plug-ins, permitindo que você altere a implementação como desejar (por exemplo, substitua o Zookeeper por outra coisa).

A seguir, consideramos com mais detalhes o processamento de logs e despejos.

Histórico


A estrutura requer que o banco de dados registre eventos para cada linha alterada em tempo real, mantendo a ordem das confirmações. A origem desses eventos é assumida como o log de transações. O banco de dados os envia através de um transporte que o DBLog pode usar. Para esse transporte, usamos o termo "log de alterações". Um evento pode ser dos seguintes tipos: criar, criar, atualizar ou excluir. Para cada evento, as seguintes informações devem ser fornecidas: o número de sequência no log (número de sequência do log), o estado da coluna durante a operação e o esquema que foi aplicado no momento em que a operação foi executada.

Cada alteração é serializada no formato de evento DBLog e enviada ao gravador, para posterior transferência para a saída. Enviar eventos ao gravador é uma operação sem bloqueio, porque o gravador é executado em um encadeamento separado e acumula eventos no buffer interno. Eventos em buffer são enviados na ordem em que foram recebidos. A estrutura permite conectar um formatador personalizado para serializar eventos em um formato arbitrário. A saída é uma interface simples que permite conectar-se a qualquer destinatário, como um fluxo, data warehouse ou mesmo uma API.

Lixões


Os despejos são necessários porque os logs de transações têm um tempo de armazenamento limitado, o que impede que sejam usados ​​para restaurar o conjunto de dados original completo. Os dumps são criados em blocos (chunk) para que possam alternar com os eventos de log, permitindo que sejam processados ​​simultaneamente. Para cada linha selecionada do bloco, um evento é gerado e serializado no mesmo formato que o evento de log. Portanto, o consumidor não precisa se preocupar com a ocorrência de um evento do log ou despejo. Eventos de log e eventos de despejo são enviados para a saída através do mesmo gravador.

Os dumps podem ser agendados a qualquer momento através da API para todas as tabelas, uma tabela ou para chaves primárias específicas da tabela. Um despejo da tabela é realizado por blocos de um determinado tamanho. Você também pode configurar um atraso no processamento de novos blocos, permitindo apenas eventos de log no momento. O tamanho e o atraso do bloco permitem equilibrar o processamento de eventos de log e dump. Ambas as configurações podem ser alteradas em tempo de execução.

Os blocos (bloco) são selecionados ordenando a tabela em ordem crescente da chave primária e selecionando as linhas em que a chave primária é maior que a última chave primária do bloco anterior. O banco de dados é necessário para executar essa consulta com eficiência, o que geralmente é aplicável aos sistemas que implementam uma varredura de intervalo em um intervalo de chaves primárias.


Figura 1. Divisão da tabela com 4 colunas c1-c4 e c1 como a chave primária (pk). A chave primária de um tipo inteiro, tamanho do bloco 3. O bloco 2 é selecionado pela condição c1> 4. Os

blocos devem ser tomados de forma a não atrasar o processamento dos eventos de log por um longo período e salvar o histórico de alterações para que a linha selecionada com o valor antigo não possa sobrescrever o novo. evento.

Para poder selecionar os blocos sequencialmente, no registro de alterações, criamos "marcas d'água" reconhecíveis. As marcas d'água são implementadas por meio de uma tabela no banco de dados de origem. Esta tabela é armazenada em um espaço de nome especial para que não haja conflitos com as tabelas de aplicativos. Apenas uma linha com um valor UUID é armazenada nela. Uma marca d'água é criada quando esse valor muda para um UUID específico. A atualização da linha leva a um evento de mudança, que finalmente recebemos através do log de alterações.

Os dumps com marca d'água são criados da seguinte maneira:

  1. Suspendemos temporariamente o processamento de eventos de log.
  2. Gere uma marca d'água "baixa" atualizando a tabela de marcas d'água.
  3. Iniciamos SELECT para o próximo bloco e salvamos na memória o resultado indexado pela chave primária.
  4. “” (high) , .
  5. . .
  6. , .
  7. , , .
  8. , 1.

SELECT deve retornar um estado que represente uma alteração confirmada até um ponto no histórico. Ou, equivalente ao seguinte: SELECT é executado em uma determinada posição do log de alterações, levando em consideração as alterações até este ponto. Os bancos de dados geralmente não fornecem informações sobre o tempo de execução de um SELECT (com exceção do MariaDB ).

A idéia principal de nossa abordagem é que, no log de alterações, definimos uma janela que garanta a preservação da posição SELECT no bloco. A janela é aberta escrevendo a marca d'água inferior, após a qual SELECT é executado, e a janela é fechada escrevendo a marca d'água superior. Como a posição exata do SELECT é desconhecida, todas as linhas selecionadas que conflitam com os eventos de log nesta janela são excluídas. Isso garante que não haja reescrita do histórico no log de alterações.

Para que isso funcione, SELECT deve ler o estado da tabela a partir do momento da marca d'água mais baixa ou mais tarde (é permitido incluir as alterações feitas após a marca d'água mais baixa e antes da leitura). Em geral, SELECT é necessário para ver as alterações feitas antes de ser executado.. Chamamos isso de "leituras não obsoletas". Além disso, como a marca d'água superior é escrita depois, é garantido que SELECT será executado antes dela.

As Figuras 2a e 2b ilustram o algoritmo de seleção de blocos. Como exemplo, fornecemos uma tabela com chaves primárias de k1 a k6. Cada entrada no log de alterações representa um evento de criação, atualização ou exclusão para a chave primária. A Figura 2a mostra a geração da marca d'água e a seleção de blocos (etapas 1 a 4). A atualização da tabela de marca d'água nas etapas 2 e 4 cria dois eventos de alteração (magenta), que são finalmente recebidos através do log. Na Figura 2b, focamos nas linhas do bloco atual que são removidas do conjunto de resultados com chaves primárias que aparecem entre as marcas d'água (etapas 5 a 7).


Figura 2a - Algoritmo de marca d'água para seleção de blocos (etapas 1 a 4).


Figura 2b - Algoritmo de marca d'água para seleção de blocos (etapas 5 a 7).

Observe que, entre as marcas d'água inferior e superior, um grande número de eventos pode aparecer no log se uma ou mais transações fizerem muitas alterações de linha. Por esse motivo, suspendemos a curto prazo o processamento do log nos estágios 2–4 para não perder as marcas d'água. Portanto, o processamento de eventos de log pode ser retomado evento por evento, o que permite detectar marcas d'água sem a necessidade de armazenar em cache os eventos de log do log. O processamento do log é suspenso apenas por um curto período de tempo, pois as etapas 2 a 4 devem ser rápidas: atualizar as marcas d'água é uma operação de gravação única e o SELECT é executado com uma restrição.

Assim que a marca d'água superior é recebida na etapa 7, as linhas não conflitantes do bloco são transmitidas para a saída na ordem em que foram recebidas. Esta é uma operação sem bloqueio, porque o gravador é executado em um encadeamento separado, o que permite retomar rapidamente o processamento do log após a etapa 7. Depois disso, o processamento do log continua para eventos que ocorrem após a marca d'água superior.

A Figura 2c mostra a ordem de gravação para todo o bloco usando o mesmo exemplo das figuras 2a e 2b. Os eventos no log que aparecem antes da marca d'água superior são registrados primeiro. Em seguida, as linhas restantes do bloco resultam (magenta). E, finalmente, os eventos que ocorrem após a marca d'água superior são registrados.


Figura 2c - A ordem de gravação da saída. Tira alternância com despejo.

Bancos de dados suportados


Para usar o DBLog, o banco de dados deve fornecer um log de alterações como um histórico linear de alterações confirmadas com leituras não obsoletas. Essas condições são atendidas por sistemas como MySQL, PostgreSQL, MariaDB, etc., portanto a estrutura pode ser usada da mesma maneira com esses bancos de dados.
Até agora, adicionamos suporte ao MySQL e PostgreSQL. Cada banco de dados usa suas próprias bibliotecas para receber eventos de log, pois cada um deles usa um protocolo proprietário. Para o MySQL, usamos shyiko / mysql-binlog-connector , que implementa o protocolo de replicação de binlog. Para o PostgreSQL, há slots de replicação com o plugin wal2json . As alterações são aceitas através do protocolo de replicação de streaming, que é implementado pelo driver jdbcPostgreSQL A definição do esquema para cada alteração capturada é diferente no MySQL e no PostgreSQL. No PostgreSQL, o wal2json contém nomes, tipos de colunas e valores. Para o MySQL, as mudanças de esquema devem ser rastreadas como eventos binlog.

O processamento do dump foi realizado usando SQL e JDBC, exigindo apenas a implementação da seleção de blocos e a atualização da marca d'água. Para MySQL e PostgreSQL, o mesmo código é usado, que pode ser usado para outros bancos de dados semelhantes. O processamento do dump em si não depende de SQL ou JDBC e permite usar bancos de dados que atendem aos requisitos do DBLog, mesmo que eles usem padrões diferentes.


Figura 3 - Arquitetura de alto nível do DBLog.

Alta disponibilidade


O DBLog usa uma arquitetura passiva ativa de nó único. Uma instância é ativa (principal), enquanto as outras são passivas (em espera). Para selecionar o host, usamos o Zookeeper. Uma concessão é usada para o nó principal, que deve ser atualizado periodicamente para continuar sendo o principal. Em caso de rescisão da renovação do arrendamento, as funções do líder são transferidas para outro nó. Atualmente, implantamos uma cópia para cada AZ (zona de disponibilidade, geralmente temos 3 AZ); portanto, se um AZ cair, a cópia em outro AZ poderá continuar processando com um tempo de inatividade total mínimo. As instâncias de backup podem estar localizadas em diferentes regiões, embora seja recomendável trabalhar na mesma região que o host do banco de dados para fornecer baixa latência para capturar alterações.

Uso na produção


DBLog é a base para os conectores MySQL e PostgreSQL usados ​​no Delta . Desde 2018, a Delta é usada na produção para sincronizar data warehouses e processamento de eventos em aplicativos de estúdio da Netflix. Os conectores Delta usam seu serializador de eventos. Fluxos específicos da Netflix, como Keystone, são usados ​​como saída .


Figura 4 - Delta Connector.

Além do Delta, o DBLog também é usado na Netflix para criar conectores para outras plataformas de movimentação de dados que possuem seus próprios formatos de dados.

Fique conosco


O DBLog possui recursos adicionais que não são abordados neste artigo, como:

  • Capacidade de obter esquemas de tabela sem usar bloqueios.
  • Integração com armazenamento de esquema. Para cada evento, o esquema é armazenado no armazenamento, cujo link é indicado na carga útil do evento.
  • Modo de gravação monotônica. Garantindo que, após salvar o estado de uma linha específica, seu estado passado não possa ser substituído. Assim, os consumidores recebem mudanças de estado apenas na direção para a frente, sem se mover para frente e para trás no tempo.

Planejamos abrir o código-fonte DBLog em 2020 e incluir documentação adicional nele.

Agradecimentos


Gostaríamos de agradecer às seguintes pessoas por contribuírem para o desenvolvimento do DBLog: Josh Snyder , Raghuram Onti Srinivasan , Tharanga Gamaethige e Yun Wang .

Referências


[1] Das, Shirshanka, et al. “Todos a bordo do Databus!: Plataforma consistente e escalável de captura de dados de alterações do Linkedin.” Terceiro Simpósio da ACM sobre Computação em Nuvem. ACM, 2012
[2] “Sobre o Change Data Capture (SQL Server)” , documentos do Microsoft SQL, 2019
[3] Kleppmann, Martin, “Usando logs para criar uma infraestrutura de dados sólida (ou: por que as gravações duplas são uma má ideia)” , Confluent, 2015
[4] Kleppmann, Martin, Alastair R. Beresford, Boerge Svingen. "Processamento de eventos online." Comunicações do ACM 62.5 (2019): 43–49
[5] https://debezium.io/documentation/reference/0.10/connectors/mysql.html#snapshots


Saiba mais sobre o curso.

All Articles