随着大型数据集的复杂处理(不同的ETL过程:导入,转换和与外部源的同步),通常有必要临时“记住”并立即处理大量的事情。这种典型的任务通常听起来像这样:“ 会计部门在这里上载了从客户银行最后收到的付款,我们需要快速将它们上载到网站并将其链接到帐户”。但是当这种“某物”的容量开始以数百兆字节为单位进行测量时,该服务这应该可以在24x7模式下继续使用底座,但有许多副作用会破坏您的生活。
为了在PostgreSQL中(而且不仅是在其中)应对它们,您可以使用一些优化选项,这些选项将使您能够更快地处理并且使用更少的资源。1.在哪里发货?
首先,让我们决定在哪里可以上传我们要“处理”的数据。1.1。临时表(TEMPORARY TABLE)
原则上,对于PostgreSQL,临时表与其他表相同。因此,诸如“所有内容仅存储在内存中,但它可以结束”之类的迷信是不正确的。但是有几个明显的不同。到数据库的每个连接都有自己的名称空间
如果两个连接试图同时建立CREATE TABLE x
,那么肯定有人会收到非唯一 DB对象的错误。但是,如果两个都尝试执行,那么通常两个都可以执行,并且每个都将收到自己的表副本。他们之间没有任何共同点。CREATE TEMPORARY TABLE x
断开连接的“自毁”
当您关闭连接时,所有临时表都会自动删除,因此“手动”执行DROP TABLE x
没有任何意义,除非...如果您在事务模式下通过pgbouncer进行工作,数据库将继续假定该连接仍然处于活动状态,并且该临时表该表仍然存在。因此,尝试从另一个连接到pgbouncer重新创建它会导致错误。但这可以通过利用来规避。
没错,最好不要这样做,因为这样您就可以“突然”找出“前任所有者”留下的数据。相反,最好阅读手册,并确保在创建表时有机会添加CREATE TEMPORARY TABLE IF NOT EXISTS x
ON COMMIT DROP
-也就是说,交易完成后,表格将被自动删除。非复制
由于仅属于特定联接,因此不会复制临时表。但这消除了在堆+ WAL中双重写入数据的需要,因此INSERT / UPDATE / DELETE的速度要快得多。但是由于临时表仍然是“几乎普通的”表,因此也不能在副本上创建它。至少到目前为止,尽管相应的补丁已经存在很长时间了。1.2。未记录表(UNLOGGED TABLE)
但是,例如,如果您有某种繁琐的ETL流程无法在单个事务中实现,而您仍然有pgbouncer处于事务模式,该怎么办?..或数据流太大,以至于每个连接没有足够的带宽从数据库(读取一个CPU上的进程)?..还是某些操作在不同的连接中异步进行?..只有一个选择- 临时创建一个非临时表。潘,是的 即:- 使用最大随机名称创建“他”表,以免与任何人交叉
- 提取:将来自外部源的数据倒入其中
- 转换:转换后,填写关键绑定字段
- 加载:将完成的数据倒入目标表
- 删除“我的”表
而现在-美中不足的苍蝇。实际上,PostgreSQL中的所有编写都发生两次 - 首先在WAL中,然后在表/索引的主体中。所有这些都是为了支持ACID COMMIT
以及ROLLBACK
嵌套事务和嵌套事务之间数据的正确可见性。但是我们不需要这个!我们是否已通过或未成功通过整个过程。它包含多少中间事务都没关系-我们对“从中间继续进行过程”不感兴趣,尤其是当不清楚它在哪里时。为此,PostgreSQL开发人员引入了9.1版,例如非新闻稿(UNLOGGED)表:. , , (. 29), . , ; . , . , , .
简而言之,它将更快,但是如果数据库服务器“崩溃”,那将是令人不快的。但如何经常会出现这种情况,与该数据库的“振兴规划”后,确实你的ETL过程中知道如何正确地修改它“从中间” ..?如果不是,和上面的情况类似,你的-使用UNLOGGED
,但从来没有包括在真实的表这个属性亲爱的数据。1.3。提交{删除行| 下降}
这种设计允许在创建表时设置事务结束时的自动行为。关于我在上面已经写过的信息,它会生成,但是情况更有趣-在这里它生成了。
由于用于存储临时表的meta描述整个基础设施是完全一样平常之一,不断创建和删除临时表导致的一个强大的系统表的“肿胀”的pg_class,pg_attribute里,pg_attrdef,pg_depend,...... 现在,假设你有一个工人就行了连接到数据库,每秒打开一个新事务,创建,填充,处理和删除临时表...系统表中的垃圾会累积过多,这在每次操作过程中都是多余的刹车。ON COMMIT DROP
DROP TABLE
ON COMMIT DELETE ROWS
TRUNCATE TABLE
一般来说,不要!在这种情况下,CREATE TEMPORARY TABLE x ... ON COMMIT DELETE ROWS
将其从事务周期中删除会更加有效-然后在每个新事务开始之前,该表将已经存在(保存调用CREATE
),但由于在上一个事务结束时(我们也保存了调用),该表将为空TRUNCATE
。1.4。喜欢...包括...
我在一开始就提到临时表的典型用例之一是各种导入-开发人员很累地将目标表的字段列表复制粘贴到他的临时声明中……但是懒惰是进步的动力!因此,“在模型上”创建新表可能会更加简单:CREATE TEMPORARY TABLE import_table(
LIKE target_table
);
由于您可以在此表中添加大量数据,因此对其进行搜索将永远不会很快。但是有一个针对此的传统解决方案-索引!而且,是的,临时表也可以具有index。由于所需的索引通常与目标表的索引一致,因此您只需编写即可。
如果还需要-values(例如,填写主键值),则可以使用。好吧,或者干脆- 它会复制默认值,索引,约束... 但是在这里您需要了解,如果您立即使用索引创建了导入表,那么数据将被填充更长的时间LIKE target_table INCLUDING INDEXES
DEFAULT
LIKE target_table INCLUDING DEFAULTS
LIKE target_table INCLUDING ALL
比起先填满所有内容然后滚动索引-就像pg_dump做到这一点的示例。总而言之,RTFM!2.怎么写?
我会说简单地使用COPY
-stream而不是“ packs” INSERT
,有时会加速。您甚至可以直接从预生成的文件中进行操作。3.如何处理?
因此,让我们的介绍看起来像这样:- 您的数据库中有一个包含100万条记录的客户数据的标牌
- 客户每天都会向您发送新的完整“图片”
- 根据经验,您会不时更改超过1万条记录
这种情况的一个典型例子是KLADR数据库 -地址很多,但是每周一次上传更改(定居点重新命名,街道协会,新房子的出现)的数量很少,甚至在全国范围内。3.1。全同步算法
为简单起见,假设您甚至不需要重组数据-只需以正确的形式显示表格即可,即:- 删除不再存在的所有内容
- 更新所有已存在的内容,您需要更新
- 插入所有尚未
为什么按此顺序值得进行操作?因为这是表格大小最小增长的方式(请记住MVCC!)。从dst删除
不,当然,您只能执行两个操作:但是同时,由于有了MVCC,表格的大小将增加两倍!由于更新了10K,因此在表中获取+ 1M图像记录-超级冗余...截断dst
经验更丰富的开发人员知道,整个印版可以很便宜地清洗:- 清除(
TRUNCATE
)整个表 - 粘贴新图像中的所有内容
该方法有效,有时很适用,但是存在一个问题……我们将注入1M条记录,因此我们不能一直将表留空(这不会发生在没有包装单个事务的情况下)。意思是:- 我们开始长期交易
TRUNCATE
施加AccessExclusive -Lock- 我们做了很长时间的插入,此时其他所有人甚至无法
SELECT
不好了...ALTER TABLE ...重命名... / DROP TABLE ...
作为一种选择,将所有内容填充到单独的新表中,然后将其重命名为旧表。一些讨厌的小事情:- AccessExclusive也是如此,尽管时间大大减少了
- 该表的所有查询计划/统计信息均已重置,需要驱动ANALYZE
- 桌上的所有外键(FK)都中断了
Simon Riggs提供了一个WIP补丁程序,该补丁程序建议在ALTER
不影响统计信息和FK的情况下,在文件级别执行替换表主体的操作,但未收集仲裁。删除,更新,插入
因此,我们停止了三个操作的非阻塞版本。几乎三个……如何最有效地做到这一点?
BEGIN;
CREATE TEMPORARY TABLE tmp(
LIKE dst INCLUDING INDEXES
) ON COMMIT DROP;
COPY tmp FROM STDIN;
DELETE FROM
dst D
USING
dst X
LEFT JOIN
tmp Y
USING(pk1, pk2)
WHERE
(D.pk1, D.pk2) = (X.pk1, X.pk2) AND
Y IS NOT DISTINCT FROM NULL;
UPDATE
dst D
SET
(f1, f2, f3) = (T.f1, T.f2, T.f3)
FROM
tmp T
WHERE
(D.pk1, D.pk2) = (T.pk1, T.pk2) AND
(D.f1, D.f2, D.f3) IS DISTINCT FROM (T.f1, T.f2, T.f3);
INSERT INTO
dst
SELECT
T.*
FROM
tmp T
LEFT JOIN
dst D
USING(pk1, pk2)
WHERE
D IS NOT DISTINCT FROM NULL;
COMMIT;
3.2。导入后处理
在同一个KLADER中,所有更改的记录都必须另外通过后期处理运行-规范化,突出显示关键字并带入必要的结构。但是,您如何知道到底发生了什么变化,而又不使同步代码复杂化?如果在同步时仅您的进程具有写访问权,则可以使用触发器来为我们收集所有更改:
CREATE TABLE kladr(...);
CREATE TABLE kladr_house(...);
CREATE TABLE kladr$log(
ro kladr,
rn kladr
);
CREATE TABLE kladr_house$log(
ro kladr_house,
rn kladr_house
);
CREATE OR REPLACE FUNCTION diff$log() RETURNS trigger AS $$
DECLARE
dst varchar = TG_TABLE_NAME || '$log';
stmt text = '';
BEGIN
IF TG_OP = 'UPDATE' THEN
IF NEW IS NOT DISTINCT FROM OLD THEN
RETURN NEW;
END IF;
END IF;
stmt = 'INSERT INTO ' || dst::text || '(ro,rn)VALUES(';
CASE TG_OP
WHEN 'INSERT' THEN
EXECUTE stmt || 'NULL,$1)' USING NEW;
WHEN 'UPDATE' THEN
EXECUTE stmt || '$1,$2)' USING OLD, NEW;
WHEN 'DELETE' THEN
EXECUTE stmt || '$1,NULL)' USING OLD;
END CASE;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
现在,我们可以在开始同步之前强加触发器(或通过启用ALTER TABLE ... ENABLE TRIGGER ...
):CREATE TRIGGER log
AFTER INSERT OR UPDATE OR DELETE
ON kladr
FOR EACH ROW
EXECUTE PROCEDURE diff$log();
CREATE TRIGGER log
AFTER INSERT OR UPDATE OR DELETE
ON kladr_house
FOR EACH ROW
EXECUTE PROCEDURE diff$log();
然后悄悄地从日志表中提取所需的所有更改,并通过其他处理程序运行它。3.3。导入相关集
上面,我们考虑了源和接收器的数据结构一致的情况。但是,如果从外部系统卸载的格式与我们数据库中的存储结构不同,该怎么办?以存储客户及其帐户为例,经典的多对一选项:CREATE TABLE client(
client_id
serial
PRIMARY KEY
, inn
varchar
UNIQUE
, name
varchar
);
CREATE TABLE invoice(
invoice_id
serial
PRIMARY KEY
, client_id
integer
REFERENCES client(client_id)
, number
varchar
, dt
date
, sum
numeric(32,2)
);
但是从外部来源卸载以“多合一”的形式出现:CREATE TEMPORARY TABLE invoice_import(
client_inn
varchar
, client_name
varchar
, invoice_number
varchar
, invoice_dt
date
, invoice_sum
numeric(32,2)
);
显然,客户数据可以通过这种方式进行复制,并且主要记录是“帐户”:0123456789;;A-01;2020-03-16;1000.00
9876543210;;A-02;2020-03-16;666.00
0123456789;;B-03;2020-03-16;9999.00
对于模型,只需插入我们的测试数据,但请记住- COPY
更有效!INSERT INTO invoice_import
VALUES
('0123456789', '', 'A-01', '2020-03-16', 1000.00)
, ('9876543210', '', 'A-02', '2020-03-16', 666.00)
, ('0123456789', '', 'B-03', '2020-03-16', 9999.00);
首先,我们选择“事实”所指的“切工”。在我们的案例中,帐户指的是客户:CREATE TEMPORARY TABLE client_import AS
SELECT DISTINCT ON(client_inn)
client_inn inn
, client_name "name"
FROM
invoice_import;
为了正确地将帐户与客户ID相关联,我们需要首先找出或生成这些标识符。为他们添加字段:ALTER TABLE invoice_import ADD COLUMN client_id integer;
ALTER TABLE client_import ADD COLUMN client_id integer;
我们将使用同步表的方法进行上述较小的修正-我们将不更新或删除目标表中的任何内容,因为导入客户端是“仅追加的”:
UPDATE
client_import T
SET
client_id = D.client_id
FROM
client D
WHERE
T.inn = D.inn;
WITH ins AS (
INSERT INTO client(
inn
, name
)
SELECT
inn
, name
FROM
client_import
WHERE
client_id IS NULL
RETURNING *
)
UPDATE
client_import T
SET
client_id = D.client_id
FROM
ins D
WHERE
T.inn = D.inn;
UPDATE
invoice_import T
SET
client_id = D.client_id
FROM
client_import D
WHERE
T.client_inn = D.inn;
实际上,所有内容- invoice_import
现在我们已经填写client_id
了用于插入帐户的通讯字段。