DBA: organizing synchronization and imports properly

When doing complex processing of large data sets (various ETL processes: imports, conversions, synchronization with an external source), you often need to temporarily "remember" something voluminous and process it right away.

A typical task of this kind usually sounds something like: "The accounting department has just uploaded the latest payments received from the client bank; we need to quickly load them onto the website and link them to the invoices."

But when the volume of this "something" starts to be measured in hundreds of megabytes, and the service has to keep working with the database in 24x7 mode, many side effects appear that will ruin your life.

To cope with them in PostgreSQL (and not only in it), you can use a few optimizations that let you process everything faster and with fewer resources.

1. Where to load?


First, let's decide where we can upload the data that we want to “process”.

1.1. Temporary Tables (TEMPORARY TABLE)


In principle, for PostgreSQL temporary tables are just like any other tables. So superstitions like "everything there is stored only in memory, and memory can run out" are wrong. But there are several significant differences.

Own namespace for each connection to the database


If two connections try to execute CREATE TABLE x at the same time, one of them will definitely get a "database object is not unique" error.

But if both try to execute CREATE TEMPORARY TABLE x, both will succeed, and each will get its own copy of the table. And there will be nothing in common between them.

"Self-destruction" with disconnect


When the connection is closed, all temporary tables are deleted automatically, so there is no point in running DROP TABLE x "manually", except...

If you work through pgbouncer in transaction mode, the database keeps assuming that this connection is still active, and in it this temporary table still exists.

Therefore, an attempt to recreate it from another connection to pgbouncer will result in an error. But this can be circumvented with CREATE TEMPORARY TABLE IF NOT EXISTS x. True, it is better not to do that anyway, because you can "suddenly" find data there left over from the "previous owner". Instead, it is much better to read the manual and see that when creating a table you can add ON COMMIT DROP - that is, when the transaction completes, the table will be deleted automatically.
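A minimal sketch of that pattern (the table and column here are purely illustrative): with ON COMMIT DROP the table lives only inside the transaction, so the next transaction routed through the same pgbouncer connection can recreate it without conflicts or leftover data.

BEGIN;
CREATE TEMPORARY TABLE x(id integer) ON COMMIT DROP;
INSERT INTO x VALUES (1);
-- ... work with x ...
COMMIT; -- x is dropped automatically here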

Non-replication


Because a temporary table belongs only to one particular connection, it is not replicated. And that removes the need to write the data twice, into heap + WAL, so INSERT / UPDATE / DELETE into it is much faster.

But since the temporary table is still an “almost ordinary” table, it cannot be created on the replica either. At least for now, although the corresponding patch has been around for a long time.

1.2. Unlogged tables (UNLOGGED TABLE)


But what do you do if, for example, you have some cumbersome ETL process that cannot be implemented within a single transaction, and you still have pgbouncer in transaction mode?..

Or the data stream is so large that a single connection to the database (read: one CPU process) does not have enough throughput?..

Or some of the operations run asynchronously in different connections?..

There is only one option left: temporarily create a non-temporary table. Pun intended, yes. That is:

  • create "your own" tables with maximally random names, so as not to clash with anyone
  • Extract: load the data from the external source into them
  • Transform: convert it and fill in the key linking fields
  • Load: pour the finished data into the target tables
  • drop "your own" tables

And now, a fly in the ointment. In fact, all writes in PostgreSQL happen twice: first into the WAL, then into the body of the table / index. All of this is done to support ACID and correct data visibility between committed and rolled-back transactions.

But we don't need any of that! For us the whole process either completed successfully or it didn't. It does not matter how many intermediate transactions it contained - we are not interested in "continuing the process from the middle", especially when it is not clear where that middle was.

For this, back in version 9.1 the PostgreSQL developers introduced unlogged (UNLOGGED) tables:
If specified, the table is created as an unlogged table. Data written to unlogged tables is not written to the write-ahead log (see Chapter 29), which makes them considerably faster than ordinary tables. However, they are not crash-safe: an unlogged table is automatically truncated after a crash or unclean shutdown. The contents of an unlogged table are also not replicated to standby servers. Any indexes created on an unlogged table are automatically unlogged as well.
In short, it will be much faster, but if the database server "falls over", it will be unpleasant. But how often does that happen, and does your ETL process know how to redo its work correctly "from the middle" after the database comes back to life?..

If not, and your case is similar to the one above - use UNLOGGED, but never enable this attribute on real tables whose data is dear to you.
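A rough sketch of the workflow from the list above, under the assumption that the staging table name and columns are generated by the application (all names here are hypothetical):

CREATE UNLOGGED TABLE payment_import_x7f3(
  doc_number varchar
, doc_dt     date
, amount     numeric(32,2)
);
-- Extract / Transform / Load across as many connections and transactions as needed ...
DROP TABLE payment_import_x7f3; -- clean up once the whole process has finished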

1.3. ON COMMIT {DELETE ROWS | DROP}


This clause lets you define, when creating a table, automatic behavior for when a transaction completes.

I already wrote about ON COMMIT DROP above: it generates a DROP TABLE. With ON COMMIT DELETE ROWS the situation is more interesting: it generates a TRUNCATE TABLE instead. Since the entire infrastructure for storing the meta-description of a temporary table is exactly the same as for a regular one, the constant creation and deletion of temporary tables leads to severe "bloat" of the system catalogs pg_class, pg_attribute, pg_attrdef, pg_depend, ... Now imagine that you have a worker with a direct connection to the database that every second opens a new transaction, creates, fills, processes and drops a temporary table... Garbage will pile up in the system catalogs in abundance, and that means extra brakes on every operation.

In general, don't do that! In this case it is much more efficient to take CREATE TEMPORARY TABLE x ... ON COMMIT DELETE ROWS out of the transaction cycle - then at the start of each new transaction the table will already exist (saving the CREATE call), but it will be empty, thanks to the TRUNCATE (we saved that call too) at the end of the previous transaction.
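A minimal sketch of that worker pattern (the buf table and its columns are just an illustration): the table is created once per connection, outside the transaction loop, and each transaction only fills and processes it.

CREATE TEMPORARY TABLE buf(
  id      integer
, payload text
) ON COMMIT DELETE ROWS;

-- then, every second, only the transaction itself:
BEGIN;
INSERT INTO buf VALUES (1, 'some data');
-- ... process buf ...
COMMIT; -- buf is emptied here by TRUNCATE, but the table itself survives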

1.4. LIKE ... INCLUDING ...


I mentioned at the beginning that one of the typical use cases for temporary tables is various kinds of imports - and the developer wearily copy-pastes the list of fields of the target table into the declaration of his temporary one...

But laziness is the engine of progress! That is why creating a new table "modeled on" another can be much simpler:

CREATE TEMPORARY TABLE import_table(
  LIKE target_table
);

Since you may then load quite a lot of data into this table, searching through it will not be fast. But there is a traditional answer to that - indexes! And yes, a temporary table can have indexes too.

Since the desired indexes often coincide with the indexes of the target table, you can simply write LIKE target_table INCLUDING INDEXES. If you also need DEFAULT values (for example, to fill in primary key values), you can use LIKE target_table INCLUDING DEFAULTS. Or simply LIKE target_table INCLUDING ALL - it copies defaults, indexes, constraints, ... But you have to understand that if you create the import table with indexes right away, the data will take longer to load than if you first fill everything in and only then build the indexes - look at how pg_dump does it as an example.
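For example (the import_* names are just placeholders), assuming the same target_table as above:

CREATE TEMPORARY TABLE import_idx(
  LIKE target_table INCLUDING INDEXES -- copies the indexes as well
);

CREATE TEMPORARY TABLE import_all(
  LIKE target_table INCLUDING ALL -- defaults, indexes, constraints, ...
);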

All in all, RTFM !

2. How to write?


I will say it simply: use a COPY stream instead of "batches" of INSERTs, it gives a multi-fold speedup. You can even COPY directly from a pre-generated file.
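A minimal sketch, assuming the import_table from section 1.4 and a hypothetical pre-generated file /tmp/data.csv lying on the database server:

COPY import_table FROM '/tmp/data.csv' WITH (FORMAT csv, DELIMITER ';');

-- or stream it from the client side via psql:
-- \copy import_table FROM 'data.csv' WITH (FORMAT csv, DELIMITER ';')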

3. How to process?


So, suppose our initial conditions look something like this:

  • your database has a table with client data, about 1M records
  • every day the client sends you a new complete "image" of it
  • from experience you know that no more than 10K records change from upload to upload

A classic example of such a situation is the KLADR database: there are a lot of addresses in total, but each weekly upload of changes (renamed settlements, merged streets, new houses) contains very few of them, even nationwide.

3.1. Full synchronization algorithm


For simplicity, let's say that you don't even need to restructure the data - just bring the table into the desired form, that is:

  • delete everything that no longer exists
  • update everything that already existed and needs updating
  • insert everything that wasn't there yet

Why is it worth doing the operations in exactly this order? Because this way the table size grows minimally (remember MVCC!).

DELETE FROM dst


No, of course, you could get by with just two operations:

  • delete (DELETE) everything
  • insert everything from the new image

But at the same time, thanks to MVCC, the table size will grow exactly twofold! Getting +1M extra row versions in the table because of a 10K update is rather poor redundancy...

TRUNCATE dst


A more experienced developer knows that the whole table can be cleared quite cheaply:

  • clear (TRUNCATE) the whole table
  • insert everything from the new image

The method is effective and sometimes quite applicable, but there is a problem... We will be loading 1M records for quite a while, so we cannot afford to leave the table empty all that time (which is what will happen if we don't wrap everything in a single transaction).

Which means:

  • we start a long transaction
  • TRUNCATE takes an AccessExclusive lock
  • the insert takes a long time, and during all of it everyone else cannot even SELECT

Not good...

ALTER TABLE ... RENAME ... / DROP TABLE ...


As an option: pour everything into a separate new table and then simply rename it into the place of the old one. A couple of nasty little things:

  • still an AccessExclusive lock, although for a substantially shorter time
  • all query plans and statistics for this table are reset, so ANALYZE needs to be run
  • all foreign keys (FK) pointing to the table break

There was a WIP patch from Simon Riggs that proposed an ALTER operation replacing the table body at the file level, without touching statistics and FK, but it never gathered a quorum.
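Still, the rename swap itself is trivial; a minimal sketch, assuming a hypothetical dst_new table that has already been filled (both renames take AccessExclusive locks, but only briefly):

BEGIN;
ALTER TABLE dst RENAME TO dst_old;
ALTER TABLE dst_new RENAME TO dst;
DROP TABLE dst_old;
COMMIT;

ANALYZE dst; -- the statistics still have to be rebuilt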

DELETE, UPDATE, INSERT


So, we settle on the non-blocking variant with three operations. Well, almost three... How do we do this most effectively?

-- open a transaction so that nobody sees the "intermediate" states
BEGIN;

-- create a temporary table with the imported data
CREATE TEMPORARY TABLE tmp(
  LIKE dst INCLUDING INDEXES -- modeled on the target, including its indexes
) ON COMMIT DROP; -- we don't need it outside this transaction

-- quickly pour the new image in via COPY
COPY tmp FROM STDIN;
-- ...
-- \.

-- delete the rows that are missing from the new image
DELETE FROM
  dst D
USING
  dst X
LEFT JOIN
  tmp Y
    USING(pk1, pk2) -- composite primary key in this case
WHERE
  (D.pk1, D.pk2) = (X.pk1, X.pk2) AND
  Y IS NOT DISTINCT FROM NULL; -- "anti-join"

-- update the rows that changed
UPDATE
  dst D
SET
  (f1, f2, f3) = (T.f1, T.f2, T.f3)
FROM
  tmp T
WHERE
  (D.pk1, D.pk2) = (T.pk1, T.pk2) AND
  (D.f1, D.f2, D.f3) IS DISTINCT FROM (T.f1, T.f2, T.f3); -- no point updating rows that haven't changed

-- insert the rows that are new
INSERT INTO
  dst
SELECT
  T.*
FROM
  tmp T
LEFT JOIN
  dst D
    USING(pk1, pk2)
WHERE
  D IS NOT DISTINCT FROM NULL;

COMMIT;

3.2. Import post processing


In that same KLADR, all changed records must additionally be run through post-processing - normalized, keywords extracted, reduced to the required structures. But how do you find out what exactly has changed, without complicating the synchronization code, ideally without touching it at all?

If only your process has write access to these tables at the time of synchronization, you can use a trigger that will collect all the changes for us:

-- target tables
CREATE TABLE kladr(...);
CREATE TABLE kladr_house(...);

-- change-log tables
CREATE TABLE kladr$log(
  ro kladr, -- whole "old"/"new" images of the record are stored here
  rn kladr
);

CREATE TABLE kladr_house$log(
  ro kladr_house,
  rn kladr_house
);

-- generic change-logging trigger function
CREATE OR REPLACE FUNCTION diff$log() RETURNS trigger AS $$
DECLARE
  dst varchar = TG_TABLE_NAME || '$log';
  stmt text = '';
BEGIN
  -- ignore updates that change nothing
  IF TG_OP = 'UPDATE' THEN
    IF NEW IS NOT DISTINCT FROM OLD THEN
      RETURN NEW;
    END IF;
  END IF;
  -- write the change record into the log table
  stmt = 'INSERT INTO ' || dst::text || '(ro,rn)VALUES(';
  CASE TG_OP
    WHEN 'INSERT' THEN
      EXECUTE stmt || 'NULL,$1)' USING NEW;
    WHEN 'UPDATE' THEN
      EXECUTE stmt || '$1,$2)' USING OLD, NEW;
    WHEN 'DELETE' THEN
      EXECUTE stmt || '$1,NULL)' USING OLD;
  END CASE;
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

Now, before starting the synchronization, we can attach the triggers (or enable them via ALTER TABLE ... ENABLE TRIGGER ...):

CREATE TRIGGER log
  AFTER INSERT OR UPDATE OR DELETE
  ON kladr
    FOR EACH ROW
      EXECUTE PROCEDURE diff$log();

CREATE TRIGGER log
  AFTER INSERT OR UPDATE OR DELETE
  ON kladr_house
    FOR EACH ROW
      EXECUTE PROCEDURE diff$log();

And afterwards we calmly pull all the changes we need out of the log tables and run them through additional handlers.
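For example, a post-processing worker could read the "new" images of all inserted and updated rows straight from the log (a minimal sketch, assuming the kladr$log table defined above):

SELECT (rn).* -- expand the stored composite "new" image back into columns
FROM kladr$log
WHERE rn IS NOT NULL; -- skip pure deletions

-- and clear the log once it has been handled
TRUNCATE kladr$log;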

3.3. Import related sets


Above we considered cases where the data structures of the source and the target coincide. But what if the upload from the external system arrives in a format different from the storage structure in our database?

Let's take the storage of clients and their invoices as an example - the classic many-to-one case:

CREATE TABLE client(
  client_id
    serial
      PRIMARY KEY
, inn
    varchar
      UNIQUE
, name
    varchar
);

CREATE TABLE invoice(
  invoice_id
    serial
      PRIMARY KEY
, client_id
    integer
      REFERENCES client(client_id)
, number
    varchar
, dt
    date
, sum
    numeric(32,2)
);

But the upload from the external source arrives as "everything in one":

CREATE TEMPORARY TABLE invoice_import(
  client_inn
    varchar
, client_name
    varchar
, invoice_number
    varchar
, invoice_dt
    date
, invoice_sum
    numeric(32,2)
);

Obviously, the client data can be duplicated this way, while the main record is the "invoice":

0123456789;;A-01;2020-03-16;1000.00
9876543210;;A-02;2020-03-16;666.00
0123456789;;B-03;2020-03-16;9999.00

For the demo we will simply INSERT our test data, but remember - COPY is more efficient!

INSERT INTO invoice_import
VALUES
  ('0123456789', '', 'A-01', '2020-03-16', 1000.00)
, ('9876543210', '', 'A-02', '2020-03-16', 666.00)
, ('0123456789', '', 'B-03', '2020-03-16', 9999.00);

First, let's extract the "dimensions" that our "facts" refer to. In our case, invoices refer to clients:

CREATE TEMPORARY TABLE client_import AS
SELECT DISTINCT ON(client_inn)
-- a plain SELECT DISTINCT would also work, if duplicate records are fully identical
  client_inn inn
, client_name "name"
FROM
  invoice_import;

To correctly link the invoices to client IDs, we first need to find out or generate those identifiers. Let's add fields for them:

ALTER TABLE invoice_import ADD COLUMN client_id integer;
ALTER TABLE client_import ADD COLUMN client_id integer;

We will use the table-synchronization method described above, with a small correction: we will not update or delete anything in the target table, because client import is "insert-only":

-- fill in the IDs of clients that already exist
UPDATE
  client_import T
SET
  client_id = D.client_id
FROM
  client D
WHERE
  T.inn = D.inn; -- unique key

-- insert the missing clients and get their new IDs back
WITH ins AS (
  INSERT INTO client(
    inn
  , name
  )
  SELECT
    inn
  , name
  FROM
    client_import
  WHERE
    client_id IS NULL -- no ID was found at the previous step
  RETURNING *
)
UPDATE
  client_import T
SET
  client_id = D.client_id
FROM
  ins D
WHERE
  T.inn = D.inn; -- unique key

-- carry the client IDs over into the invoice import
UPDATE
  invoice_import T
SET
  client_id = D.client_id
FROM
  client_import D
WHERE
  T.client_inn = D.inn; -- link by INN

That is basically it: invoice_import now has the link field client_id filled in, and we can use it to insert the invoices.
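A minimal sketch of that final step (in a real import you would probably also deduplicate against invoices loaded earlier, e.g. by number):

INSERT INTO invoice(client_id, number, dt, sum)
SELECT
  client_id
, invoice_number
, invoice_dt
, invoice_sum
FROM
  invoice_import;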
