DBA:负责组织同步和导入

随着大型数据集的复杂处理(不同的ETL过程:导入,转换和与外部源的同步),通常有必要临时“记住”并立即处理大量的事情。

这种典型的任务通常听起来像这样:会计部门在这里上载了从客户银行最后收到的付款,我们需要快速将它们上载到网站并将其链接到帐户”。

但是当这种“某物”的容量开始以数百兆字节为单位进行测量时,该服务这应该可以在24x7模式下继续使用底座,但有许多副作用会破坏您的生活。

为了在PostgreSQL中(而且不仅是在其中)应对它们,您可以使用一些优化选项,这些选项将使您能够更快地处理并且使用更少的资源。

1.在哪里发货?


首先,让我们决定在哪里可以上传我们要“处理”的数据。

1.1。临时表(TEMPORARY TABLE)


原则上,对于PostgreSQL,临时表与其他表相同。因此,诸如“所有内容仅存储在内存中,但它可以结束”之类的迷信是不正确的但是有几个明显的不同。

到数据库的每个连接都有自己的名称空间


如果两个连接试图同时建立CREATE TABLE x,那么肯定有人会收到非唯一 DB对象错误

但是,如果两个都尝试执行,那么通常两个都可以执行,并且每个都将收到自己的副本他们之间没有任何共同点。CREATE TEMPORARY TABLE x

断开连接的“自毁”


当您关闭连接时,所有临时表都会自动删除,因此“手动”执行DROP TABLE x没有任何意义,除非...

如果您在事务模式下通过pgbouncer进行工作,数据库将继续假定该连接仍然处于活动状态,并且该临时表该表仍然存在。

因此,尝试从另一个连接到pgbouncer重新创建它会导致错误。但这可以通过利用来规避 没错,最好不要这样做,因为这样您就可以“突然”找出“前任所有者”留下的数据。相反,最好阅读手册,并确保在创建表时有机会添加CREATE TEMPORARY TABLE IF NOT EXISTS x

ON COMMIT DROP -也就是说,交易完成后,表格将被自动删除。

非复制


由于仅属于特定联接,因此不会复制临时表。但这消除了在堆+ WAL中双重写入数据的需要,因此INSERT / UPDATE / DELETE的速度要快得多。

但是由于临时表仍然是“几乎普通的”表,因此也不能在副本上创建它。至少到目前为止,尽管相应的补丁已经存在很长时间了。

1.2。未记录表(UNLOGGED TABLE)


但是,例如,如果您有某种繁琐的ETL流程无法在单个事务中实现,而您仍然有pgbouncer处于事务模式,该怎么办?..

或数据流太大,以至于每个连接没有足够的带宽从数据库(读取一个CPU上的进程)?..

还是某些操作在不同的连接中异步进行?..

只有一个选择- 临时创建一个非临时表潘,是的 即:

  • 使用最大随机名称创建“他”表,以免与任何人交叉
  • 提取:将来自外部源的数据倒入其中
  • 转换:转换后,填写关键绑定字段
  • 加载:将完成的数据倒入目标表
  • 删除“我的”表

而现在-美中不足的苍蝇。实际上,PostgreSQL中的所有编写都发生两次 - 首先在WAL中,然后在表/索引的主体中。所有这些都是为了支持ACID COMMIT以及ROLLBACK嵌套事务嵌套事务之间数据的正确可见性

但是我们不需要这个!我们是否已通过或未成功通过整个过程它包含多少中间事务都没关系-我们对“从中间继续进行过程”不感兴趣,尤其是当不清楚它在哪里时。

为此,PostgreSQL开发人员引入了9.1版,例如非新闻稿(UNLOGGED)表
. , , (. 29), . , ; . , . , , .
简而言之,它将更快,但是如果数据库服务器“崩溃”,那将是令人不快的。但如何经常会出现这种情况,与该数据库的“振兴规划”后,确实你的ETL过程中知道如何正确地修改它“从中间” ..?

如果不是,和上面的情况类似,你的-使用UNLOGGED,但从来没有包括在真实的表这个属性亲爱的数据。

1.3。提交{删除行| 下降}


这种设计允许在创建表时设置事务结束时的自动行为。

关于我在上面已经写过的信息,它会生成,但是情况更有趣-在这里它生成了 由于用于存储临时表的meta描述整个基础设施是完全一样平常之一,不断创建和删除临时表导致的一个强大的系统表的“肿胀”的pg_class,pg_attribute里,pg_attrdef,pg_depend,...... 现在,假设你有一个工人就行了连接到数据库,每秒打开一个新事务,创建,填充,处理和删除临时表...系统表中的垃圾会累积过多,这在每次操作过程中都是多余的刹车。ON COMMIT DROPDROP TABLEON COMMIT DELETE ROWSTRUNCATE TABLE





一般来说,不要!在这种情况下,CREATE TEMPORARY TABLE x ... ON COMMIT DELETE ROWS将其从事务周期中删除会更加有效-然后在每个新事务开始之前,该表已经存在(保存调用CREATE),但由于在上一个事务结束时(我们也保存了调用),该表将为TRUNCATE

1.4。喜欢...包括...


我在一开始就提到临时表的典型用例之一是各种导入-开发人员很累地将目标表的字段列表复制粘贴到他的临时声明中……

但是懒惰是进步的动力!因此,“在模型上”创建新表可能会更加简单:

CREATE TEMPORARY TABLE import_table(
  LIKE target_table
);

由于您可以在此表中添加大量数据,因此对其进行搜索将永远不会很快。但是有一个针对此的传统解决方案-索引!而且,是的,临时表也可以具有index

由于所需的索引通常与目标表的索引一致,因此您只需编写即可 如果还需要-values(例如,填写主键值),则可以使用。好吧,或者干脆- 它会复制默认值,索引,约束... 但是在这里您需要了解,如果您立即使用索引创建了导入表,那么数据将被填充更长的时间LIKE target_table INCLUDING INDEXES

DEFAULTLIKE target_table INCLUDING DEFAULTSLIKE target_table INCLUDING ALL

比起先填满所有内容然后滚动索引-就像pg_dump做到这一点的示例

总而言之,RTFM

2.怎么写?


我会说简单地使用COPY-stream而不是“ packs” INSERT有时会加速您甚至可以直接从预生成的文件中进行操作。

3.如何处理?


因此,让我们的介绍看起来像这样:

  • 您的数据库中有一个包含100万条记录的客户数据的标牌
  • 客户每天都会向您发送新的完整“图片”
  • 根据经验,您会不时更改超过1万条记录

这种情况的一个典型例子是KLADR数据库 -地址很多,但是每周一次上传更改(定居点重新命名,街道协会,新房子的出现)的数量很少,甚至在全国范围内。

3.1。全同步算法


为简单起见,假设您甚至不需要重组数据-只需以正确的形式显示表格即可,即:

  • 删除不再存在的所有内容
  • 更新所有已存在的内容,您需要更新
  • 插入所有尚未

为什么按此顺序值得进行操作?因为这是表格大小最小增长的方式(请记住MVCC!)。

从dst删除


不,当然,您只能执行两个操作:

  • 完全删除DELETE
  • 粘贴新图像中的所有内容

但是同时,由于有了MVCC,表格大小将增加两倍由于更新了10K,因此在表中获取+ 1M图像记录-超级冗余...

截断dst


经验更丰富的开发人员知道,整个印版可以很便宜地清洗:

  • 清除TRUNCATE)整个表
  • 粘贴新图像中的所有内容

该方法有效,有时很适用,但是存在一个问题……我们将注入1M条记录,因此我们不能一直将表留空(这不会发生在没有包装单个事务的情况下)。

意思是:

  • 我们开始长期交易
  • TRUNCATE施加AccessExclusive -Lock
  • 我们做了很长时间的插入,此时其他所有人甚至无法SELECT

不好了...

ALTER TABLE ...重命名... / DROP TABLE ...


作为一种选择,将所有内容填充到单独的新表中,然后将其重命名为旧表。一些讨厌的小事情:

  • AccessExclusive也是如此,尽管时间大大减少了
  • 该表的所有查询计划/统计信息均已重置,需要驱动ANALYZE
  • 桌上的所有外键(FK)都中断

Simon Riggs提供了一个WIP补丁程序,该补丁程序建议在ALTER不影响统计信息和FK的情况下,在文件级别执行替换表主体操作,但未收集仲裁。

删除,更新,插入


因此,我们停止了三个操作的非阻塞版本。几乎三个……如何最有效地做到这一点?

--     ,     "" 
BEGIN;

--      
CREATE TEMPORARY TABLE tmp(
  LIKE dst INCLUDING INDEXES --    ,   
) ON COMMIT DROP; --       

-- -     COPY
COPY tmp FROM STDIN;
-- ...
-- \.

--  
DELETE FROM
  dst D
USING
  dst X
LEFT JOIN
  tmp Y
    USING(pk1, pk2) --   
WHERE
  (D.pk1, D.pk2) = (X.pk1, X.pk2) AND
  Y IS NOT DISTINCT FROM NULL; -- ""

--  
UPDATE
  dst D
SET
  (f1, f2, f3) = (T.f1, T.f2, T.f3)
FROM
  tmp T
WHERE
  (D.pk1, D.pk2) = (T.pk1, T.pk2) AND
  (D.f1, D.f2, D.f3) IS DISTINCT FROM (T.f1, T.f2, T.f3); --   

--  
INSERT INTO
  dst
SELECT
  T.*
FROM
  tmp T
LEFT JOIN
  dst D
    USING(pk1, pk2)
WHERE
  D IS NOT DISTINCT FROM NULL;

COMMIT;

3.2。导入后处理


在同一个KLADER中,所有更改的记录都必须另外通过后期处理运行-规范化,突出显示关键字并带入必要的结构。但是,您如何知道到底发生了什么变化,而又不使同步代码复杂化?

如果在同步时仅您的进程具有写访问权,则可以使用触发器来为我们收集所有更改:

--  
CREATE TABLE kladr(...);
CREATE TABLE kladr_house(...);

--    
CREATE TABLE kladr$log(
  ro kladr, --      /
  rn kladr
);

CREATE TABLE kladr_house$log(
  ro kladr_house,
  rn kladr_house
);

--    
CREATE OR REPLACE FUNCTION diff$log() RETURNS trigger AS $$
DECLARE
  dst varchar = TG_TABLE_NAME || '$log';
  stmt text = '';
BEGIN
  --      
  IF TG_OP = 'UPDATE' THEN
    IF NEW IS NOT DISTINCT FROM OLD THEN
      RETURN NEW;
    END IF;
  END IF;
  --   
  stmt = 'INSERT INTO ' || dst::text || '(ro,rn)VALUES(';
  CASE TG_OP
    WHEN 'INSERT' THEN
      EXECUTE stmt || 'NULL,$1)' USING NEW;
    WHEN 'UPDATE' THEN
      EXECUTE stmt || '$1,$2)' USING OLD, NEW;
    WHEN 'DELETE' THEN
      EXECUTE stmt || '$1,NULL)' USING OLD;
  END CASE;
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

现在,我们可以在开始同步之前强加触发器(或通过启用ALTER TABLE ... ENABLE TRIGGER ...):

CREATE TRIGGER log
  AFTER INSERT OR UPDATE OR DELETE
  ON kladr
    FOR EACH ROW
      EXECUTE PROCEDURE diff$log();

CREATE TRIGGER log
  AFTER INSERT OR UPDATE OR DELETE
  ON kladr_house
    FOR EACH ROW
      EXECUTE PROCEDURE diff$log();

然后悄悄地从日志表中提取所需的所有更改,并通过其他处理程序运行它。

3.3。导入相关集


上面,我们考虑了源和接收器的数据结构一致的情况。但是,如果从外部系统卸载的格式与我们数据库中的存储结构不同,该怎么办?

以存储客户及其帐户为例,经典的多对一选项:

CREATE TABLE client(
  client_id
    serial
      PRIMARY KEY
, inn
    varchar
      UNIQUE
, name
    varchar
);

CREATE TABLE invoice(
  invoice_id
    serial
      PRIMARY KEY
, client_id
    integer
      REFERENCES client(client_id)
, number
    varchar
, dt
    date
, sum
    numeric(32,2)
);

但是从外部来源卸载以“多合一”的形式出现:

CREATE TEMPORARY TABLE invoice_import(
  client_inn
    varchar
, client_name
    varchar
, invoice_number
    varchar
, invoice_dt
    date
, invoice_sum
    numeric(32,2)
);

显然,客户数据可以通过这种方式进行复制,并且主要记录是“帐户”:

0123456789;;A-01;2020-03-16;1000.00
9876543210;;A-02;2020-03-16;666.00
0123456789;;B-03;2020-03-16;9999.00

对于模型,只需插入我们的测试数据,但请记住- COPY更有效!

INSERT INTO invoice_import
VALUES
  ('0123456789', '', 'A-01', '2020-03-16', 1000.00)
, ('9876543210', '', 'A-02', '2020-03-16', 666.00)
, ('0123456789', '', 'B-03', '2020-03-16', 9999.00);

首先,我们选择“事实”所指的“切工”。在我们的案例中,帐户指的是客户:

CREATE TEMPORARY TABLE client_import AS
SELECT DISTINCT ON(client_inn)
--   SELECT DISTINCT,    
  client_inn inn
, client_name "name"
FROM
  invoice_import;

为了正确地将帐户与客户ID相关联,我们需要首先找出或生成这些标识符。为他们添加字段:

ALTER TABLE invoice_import ADD COLUMN client_id integer;
ALTER TABLE client_import ADD COLUMN client_id integer;

我们将使用同步表的方法进行上述较小的修正-我们将不更新或删除目标表中的任何内容,因为导入客户端是“仅追加的”:

--     ID   
UPDATE
  client_import T
SET
  client_id = D.client_id
FROM
  client D
WHERE
  T.inn = D.inn; -- unique key

--       ID
WITH ins AS (
  INSERT INTO client(
    inn
  , name
  )
  SELECT
    inn
  , name
  FROM
    client_import
  WHERE
    client_id IS NULL --  ID  
  RETURNING *
)
UPDATE
  client_import T
SET
  client_id = D.client_id
FROM
  ins D
WHERE
  T.inn = D.inn; -- unique key

--  ID    
UPDATE
  invoice_import T
SET
  client_id = D.client_id
FROM
  client_import D
WHERE
  T.client_inn = D.inn; --  

实际上,所有内容- invoice_import现在我们已经填写client_id了用于插入帐户的通讯字段

All Articles