DBA: organize competentemente a sincronização e as importações

Com o processamento complexo de grandes conjuntos de dados (diferentes processos ETL : importações, conversões e sincronização com uma fonte externa), geralmente é necessário "lembrar" temporariamente e processar imediatamente algo volumoso.

Uma tarefa típica desse tipo geralmente soa algo como isto: "Aqui, o departamento de contabilidade carregou os últimos pagamentos recebidos do banco do cliente , precisamos enviá-los rapidamente para o site e vinculá-los às contas".

Mas quando o volume desse "algo" começa a ser medido em centenas de megabytes, e o serviço Isso deve continuar a funcionar com a base no modo 24x7, existem muitos efeitos colaterais que arruinarão sua vida.

Para lidar com eles no PostgreSQL (e não apenas nele), você pode usar algumas opções de otimização que permitirão processar mais rapidamente e com menos recursos.

1. Para onde enviar?


Primeiro, vamos decidir onde podemos fazer o upload dos dados que queremos “processar”.

1.1 Tabelas temporárias (TABELA TEMPORÁRIA)


Em princípio, para o PostgreSQL, as temporárias são as mesmas tabelas que as outras. Portanto, superstições como "tudo é armazenado apenas na memória, mas pode terminar" estão incorretas . Mas existem várias diferenças significativas.

Espaço de nome próprio para cada conexão com o banco de dados


Se duas conexões tentarem fazer ao mesmo tempo CREATE TABLE x, alguém definitivamente obterá um erro de objetos de banco de dados não exclusivos .

Mas se ambos tentam executar , normalmente ambos o fazem e cada um recebe sua própria cópia da tabela. E não haverá nada em comum entre eles.CREATE TEMPORARY TABLE x

"Autodestruição" com desconexão


Quando você fecha a conexão, todas as tabelas temporárias são excluídas automaticamente; portanto, DROP TABLE xnão faz sentido executar manualmente , exceto ...

Se você trabalha com o pgbouncer no modo de transação , o banco de dados continua assumindo que essa conexão ainda está ativa e temporária. a tabela ainda existe.

Portanto, uma tentativa de recriá-lo, de outra conexão com o pgbouncer, resultará em um erro. Mas isso pode ser contornado aproveitando-se . É verdade que é melhor não fazer a mesma coisa, porque então você pode "repentinamente" descobrir os dados deixados pelo "proprietário anterior". Em vez disso, é muito melhor ler o manual e ver que, ao criar a tabela, há uma oportunidade de adicionarCREATE TEMPORARY TABLE IF NOT EXISTS x

ON COMMIT DROP - isto é, quando a transação for concluída, a tabela será excluída automaticamente.

Não replicação


Como apenas uma junção específica pertence, as tabelas temporárias não são replicadas. Mas isso elimina a necessidade de gravar dados duplicados no heap + WAL, de modo que INSERT / UPDATE / DELETE é muito mais rápido.

Mas como a tabela temporária ainda é uma tabela "quase comum", também não pode ser criada na réplica. Pelo menos por enquanto, embora o patch correspondente já exista há muito tempo.

1.2 Tabelas não registradas (UNLOGGED TABLE)


Mas o que fazer, por exemplo, se você tiver algum tipo de processo ETL complicado que não pode ser implementado em uma única transação e ainda tiver o pgbouncer no modo de transação ? ..

Ou o fluxo de dados é tão grande que não há largura de banda suficiente por conexão do banco de dados (leitura, um processo na CPU)? ..

Ou algumas das operações ocorrem de forma assíncrona em conexões diferentes? ..

Existe apenas uma opção: criar temporariamente uma tabela não temporária . Chalaça, sim. Ou seja:

  • criou tabelas "his" com nomes maximamente aleatórios para não cruzar com ninguém
  • Extrair : derramou dados de uma fonte externa para eles
  • Transformação : transformada, preenchida em campos de ligação de chave
  • Carregar : derramou os dados finais nas tabelas de destino
  • tabelas "minhas" excluídas

E agora - uma mosca na pomada. De fato, toda a escrita no PostgreSQL acontece duas vezes - primeiro no WAL , depois no corpo da tabela / índice. Tudo isso é feito para oferecer suporte ao ACID e à visibilidade correta dos dados entre transações aninhadas COMMITe ROLLBACKaninhadas.

Mas não precisamos disso! Temos todo o processo ou passamos com sucesso, ou não . Não importa quantas transações intermediárias existam, não estamos interessados ​​em “continuar o processo do meio”, especialmente quando não está claro onde estava.

Para isso, os desenvolvedores do PostgreSQL introduziram a versão 9.1, como tabelas não registradas em diário (UNLOGGED) :
. , , (. 29), . , ; . , . , , .
Em suma, será muito mais rápido , mas se o servidor de banco de dados "travar" - será desagradável. Mas com que frequência isso acontece e seu processo ETL sabe como modificá-lo corretamente "do meio" após a "revitalização" do banco de dados? ..

Caso contrário, o caso acima é semelhante ao seu - use UNLOGGED, mas nunca inclua esse atributo em tabelas reais dados dos quais você é querido.

1.3 EM COMPROMISSO {EXCLUIR LINHAS | SOLTA}


Esse design permite ao criar uma tabela para definir o comportamento automático quando a transação termina.

Sobre o que eu já escrevi acima, gera , mas a situação é mais interessante - aqui é gerada . Como toda a infraestrutura para armazenar a meta descrição da tabela temporária é exatamente a mesma que a usual, a criação e exclusão constantes de tabelas temporárias levam a um forte "aumento" das tabelas do sistema pg_class, pg_attribute, pg_attrdef, pg_depend, ... Agora, imagine que você tem um trabalhador na linha conectar-se ao banco de dados, que a cada segundo abre uma nova transação, cria, preenche, processa e exclui a tabela temporária ... O lixo nas tabelas do sistema se acumulará em excesso, e isso é um freio extra a cada operação.ON COMMIT DROPDROP TABLEON COMMIT DELETE ROWSTRUNCATE TABLE





Em geral, não! Nesse caso, é muito mais eficiente CREATE TEMPORARY TABLE x ... ON COMMIT DELETE ROWSretirá-lo do ciclo da transação - então, no início de cada nova transação, a tabela já existirá (salve a chamada CREATE), mas ficará vazia , graças a TRUNCATE(também salvamos a ligação) no final da transação anterior.

1.4 COMO ... INCLUINDO ...


Mencionei no início que um dos casos de uso típicos de tabelas temporárias são vários tipos de importações - e o desenvolvedor copia e cola a lista de campos da tabela de destino na declaração de seu arquivo temporário ...

Mas a preguiça é o mecanismo do progresso! Portanto, criar uma nova tabela "no modelo" pode ser muito mais simples:

CREATE TEMPORARY TABLE import_table(
  LIKE target_table
);

Como você pode adicionar muitos dados a essa tabela, as pesquisas nela nunca serão rápidas. Mas existe uma solução tradicional contra isso - índices! E, sim, uma tabela temporária também pode ter índices .

Como, frequentemente, os índices desejados coincidem com os da tabela de destino, você pode simplesmente escrever . Se você também precisar de -values ​​(por exemplo, para preencher os valores da chave primária), poderá usar . Bem, ou simplesmente - - ele copiará padrões, índices, restrições ... Mas aqui você precisa entender que, se você criou uma tabela de importação imediatamente com índices, os dados serão preenchidos por mais tempoLIKE target_table INCLUDING INDEXES

DEFAULTLIKE target_table INCLUDING DEFAULTSLIKE target_table INCLUDING ALL

do que se você preencher tudo primeiro e depois rolar os índices - veja como um exemplo de como o pg_dump faz isso .

Em suma , RTFM !

2. Como escrever?


Vou dizer simplesmente - use COPY-stream em vez de "pacotes" INSERT, aceleração às vezes . Você pode até diretamente de um arquivo pré-gerado.

3. Como lidar com isso?


Então, deixe nossa introdução parecer algo como isto:

  • você tem em seu banco de dados uma placa com dados do cliente para registros de 1 milhão
  • todos os dias o cliente envia uma nova "imagem" completa
  • por experiência, você sabe que não mais que 10 mil registros mudam de tempos em tempos

Um exemplo clássico de tal situação é o banco de dados da KLADR - existem muitos endereços, mas em cada upload semanal de alterações (renomeação de assentamentos, associações de rua, aparecimento de novas casas), existem muito poucos em todo o país.

3.1 Algoritmo de sincronização total


Para simplificar, digamos que você nem precise reestruturar os dados - basta trazer a tabela da forma correta, ou seja:

  • excluir tudo o que não é mais
  • atualizar tudo o que já estava, e você precisa atualizar
  • insira tudo o que não foi

Por que nessa ordem vale a pena fazer operações? Porque é assim que o tamanho da tabela aumenta minimamente ( lembre-se do MVCC! ).

EXCLUIR DO dst


Não, é claro, você pode fazer apenas duas operações:

  • delete ( DELETE)
  • cole tudo de uma nova imagem

Mas, ao mesmo tempo, graças ao MVCC, o tamanho da tabela aumentará exatamente duas vezes ! Obtenha + 1 milhão de registros de imagem na tabela devido a uma atualização de 10 K - uma redundância tão ...

TRUNCATE dst


Um desenvolvedor mais experiente sabe que todo o prato pode ser limpo de maneira barata:

  • clear ( TRUNCATE) a tabela inteira
  • cole tudo de uma nova imagem

O método é eficaz, às vezes é bastante aplicável , mas há um problema ... Injetaremos 1 milhão de registros, por isso não podemos deixar a tabela em branco por todo esse tempo (como acontecerá sem envolver uma única transação).

Que significa:

  • começamos uma transação longa
  • TRUNCATEimpõe AccessExclusive -Lock
  • fazemos a inserção por um longo tempo, e todos os outros neste momento não podem nemSELECT

Algo está ruim ...

ALTER TABLE ... RENOMEAR ... / DROP TABLE ...


Como opção, preencha tudo em uma nova tabela separada e simplesmente renomeie-a para a antiga. Algumas pequenas coisas desagradáveis:

  • AccessExclusive também , embora substancialmente menos no tempo
  • todos os planos / estatísticas de consulta desta tabela são redefinidos, é necessário conduzir ANALYZE
  • todas as chaves estrangeiras (FK) quebram na mesa

Havia um patch WIP de Simon Riggs, que sugeria executar uma ALTERoperação para substituir o corpo da tabela no nível do arquivo, sem tocar nas estatísticas e no FK, mas não coletava o quorum.

EXCLUIR, ATUALIZAR, INSERIR


Portanto, paramos em uma versão sem bloqueio de três operações. Quase três ... Como fazer isso de maneira mais eficaz?

--     ,     "" 
BEGIN;

--      
CREATE TEMPORARY TABLE tmp(
  LIKE dst INCLUDING INDEXES --    ,   
) ON COMMIT DROP; --       

-- -     COPY
COPY tmp FROM STDIN;
-- ...
-- \.

--  
DELETE FROM
  dst D
USING
  dst X
LEFT JOIN
  tmp Y
    USING(pk1, pk2) --   
WHERE
  (D.pk1, D.pk2) = (X.pk1, X.pk2) AND
  Y IS NOT DISTINCT FROM NULL; -- ""

--  
UPDATE
  dst D
SET
  (f1, f2, f3) = (T.f1, T.f2, T.f3)
FROM
  tmp T
WHERE
  (D.pk1, D.pk2) = (T.pk1, T.pk2) AND
  (D.f1, D.f2, D.f3) IS DISTINCT FROM (T.f1, T.f2, T.f3); --   

--  
INSERT INTO
  dst
SELECT
  T.*
FROM
  tmp T
LEFT JOIN
  dst D
    USING(pk1, pk2)
WHERE
  D IS NOT DISTINCT FROM NULL;

COMMIT;

3.2 Importar pós-processamento


No mesmo KLADRE, todos os registros alterados devem ser executados adicionalmente através do pós-processamento - normalizar, destacar palavras-chave e trazer para as estruturas necessárias. Mas como você sabe exatamente o que mudou , sem complicar o código de sincronização, idealmente sem tocá-lo?

Se apenas seu processo tiver acesso de gravação no momento da sincronização, você poderá usar um gatilho que coletará todas as alterações para nós:

--  
CREATE TABLE kladr(...);
CREATE TABLE kladr_house(...);

--    
CREATE TABLE kladr$log(
  ro kladr, --      /
  rn kladr
);

CREATE TABLE kladr_house$log(
  ro kladr_house,
  rn kladr_house
);

--    
CREATE OR REPLACE FUNCTION diff$log() RETURNS trigger AS $$
DECLARE
  dst varchar = TG_TABLE_NAME || '$log';
  stmt text = '';
BEGIN
  --      
  IF TG_OP = 'UPDATE' THEN
    IF NEW IS NOT DISTINCT FROM OLD THEN
      RETURN NEW;
    END IF;
  END IF;
  --   
  stmt = 'INSERT INTO ' || dst::text || '(ro,rn)VALUES(';
  CASE TG_OP
    WHEN 'INSERT' THEN
      EXECUTE stmt || 'NULL,$1)' USING NEW;
    WHEN 'UPDATE' THEN
      EXECUTE stmt || '$1,$2)' USING OLD, NEW;
    WHEN 'DELETE' THEN
      EXECUTE stmt || '$1,NULL)' USING OLD;
  END CASE;
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

Agora podemos impor gatilhos (ou ativar ALTER TABLE ... ENABLE TRIGGER ...) antes de iniciar a sincronização :

CREATE TRIGGER log
  AFTER INSERT OR UPDATE OR DELETE
  ON kladr
    FOR EACH ROW
      EXECUTE PROCEDURE diff$log();

CREATE TRIGGER log
  AFTER INSERT OR UPDATE OR DELETE
  ON kladr_house
    FOR EACH ROW
      EXECUTE PROCEDURE diff$log();

E então silenciosamente das tabelas de log, extraímos todas as alterações necessárias e as executamos por meio de manipuladores adicionais.

3.3 Importar conjuntos relacionados


Acima, consideramos casos em que as estruturas de dados da fonte e do receptor coincidem. Mas e se a descarga de um sistema externo tiver um formato diferente da estrutura de armazenamento em nosso banco de dados?

Tome como exemplo o armazenamento de clientes e suas contas, a opção clássica de muitos para um:

CREATE TABLE client(
  client_id
    serial
      PRIMARY KEY
, inn
    varchar
      UNIQUE
, name
    varchar
);

CREATE TABLE invoice(
  invoice_id
    serial
      PRIMARY KEY
, client_id
    integer
      REFERENCES client(client_id)
, number
    varchar
, dt
    date
, sum
    numeric(32,2)
);

Mas o descarregamento de uma fonte externa chega até nós na forma de "tudo em um":

CREATE TEMPORARY TABLE invoice_import(
  client_inn
    varchar
, client_name
    varchar
, invoice_number
    varchar
, invoice_dt
    date
, invoice_sum
    numeric(32,2)
);

Obviamente, os dados do cliente podem ser duplicados dessa maneira, e o registro principal é a "conta":

0123456789;;A-01;2020-03-16;1000.00
9876543210;;A-02;2020-03-16;666.00
0123456789;;B-03;2020-03-16;9999.00

Para o modelo, basta inserir nossos dados de teste, mas lembre-se - com COPYmais eficiência!

INSERT INTO invoice_import
VALUES
  ('0123456789', '', 'A-01', '2020-03-16', 1000.00)
, ('9876543210', '', 'A-02', '2020-03-16', 666.00)
, ('0123456789', '', 'B-03', '2020-03-16', 9999.00);

Primeiro, selecionamos os "cortes" aos quais nossos "fatos" se referem. No nosso caso, as contas se referem aos clientes:

CREATE TEMPORARY TABLE client_import AS
SELECT DISTINCT ON(client_inn)
--   SELECT DISTINCT,    
  client_inn inn
, client_name "name"
FROM
  invoice_import;

Para associar corretamente as contas aos IDs dos clientes, precisamos primeiro descobrir ou gerar esses identificadores. Adicione campos para eles:

ALTER TABLE invoice_import ADD COLUMN client_id integer;
ALTER TABLE client_import ADD COLUMN client_id integer;

Usaremos o método de sincronização de tabelas descrito acima com uma pequena correção - não atualizaremos ou excluiremos nada da tabela de destino, porque a importação de clientes é "somente anexar":

--     ID   
UPDATE
  client_import T
SET
  client_id = D.client_id
FROM
  client D
WHERE
  T.inn = D.inn; -- unique key

--       ID
WITH ins AS (
  INSERT INTO client(
    inn
  , name
  )
  SELECT
    inn
  , name
  FROM
    client_import
  WHERE
    client_id IS NULL --  ID  
  RETURNING *
)
UPDATE
  client_import T
SET
  client_id = D.client_id
FROM
  ins D
WHERE
  T.inn = D.inn; -- unique key

--  ID    
UPDATE
  invoice_import T
SET
  client_id = D.client_id
FROM
  client_import D
WHERE
  T.client_inn = D.inn; --  

Na verdade, tudo - invoice_importagora preenchemos o campo de comunicação client_idcom o qual inseriremos a conta.

All Articles