理想的数据仓库的原理是什么?在没有样板代码的情况下,专注于业务价值和分析。将DWH作为代码库进行管理:版本控制,审阅,自动化测试和CI。模块化,可扩展性,开源和社区。友好的用户文档和依赖关系可视化(数据沿袭)。有关所有这些以及DBT在大数据和分析生态系统中的作用的更多信息,欢迎关注。大家好
与Artemy Kozyr联系。五年多来,我一直在与数据仓库合作,构建ETL / ELT,以及数据分析和可视化。我目前在Wheely工作,在OTUS教授数据工程师课程,今天我想与大家分享我在新课程注册开始前夕写的一篇文章。简短评论
DBT框架全都与缩写词ELT(提取-转换-加载)中的字母T有关。随着BigQuery,Redshift,Snowflake等高效且可扩展的分析数据库的出现,在数据仓库外部进行转换的任何意义都消失了。 DBT不会从源下载数据,但是提供了巨大的机会来处理已经加载到存储(内部或外部存储)中的数据。DBT的主要目的是获取代码,在SQL中进行编译,并在存储库中以正确的顺序执行命令。DBT项目结构
该项目仅由两种类型的目录和文件组成:- 模型(.sql)-SELECT查询表示的转换单位
- 配置文件(.yml)-参数,设置,测试,文档
从根本上来说,工作的结构如下:- 用户在任何方便的IDE中准备模型代码
- 使用CLI启动模型,DBT用SQL编译模型代码
- 在仓库中按指定顺序执行已编译的SQL代码(图形)
以下是从CLI启动的样子:一切都是选择
这是数据构建工具框架的致命功能。换句话说,DBT提取与仓库中查询的实现相关的所有代码(来自CREATE,INSERT,UPDATE,DELETE ALTER,GRANT等命令的变体)。任何模型都涉及编写一个SELECT查询,该查询定义了结果数据集。同时,转换逻辑可以是多级的,可以合并来自其他几个模型的数据。将构建订单展示(f_orders)的模型示例:{% set payment_methods = ['credit_card', 'coupon', 'bank_transfer', 'gift_card'] %}
with orders as (
select * from {{ ref('stg_orders') }}
),
order_payments as (
select * from {{ ref('order_payments') }}
),
final as (
select
orders.order_id,
orders.customer_id,
orders.order_date,
orders.status,
{% for payment_method in payment_methods -%}
order_payments.{{payment_method}}_amount,
{% endfor -%}
order_payments.total_amount as amount
from orders
left join order_payments using (order_id)
)
select * from final
我们在这里能看到什么有趣的东西?首先:CTE(公用表表达式)用于组织和理解包含许多转换和业务逻辑的代码;其次:模型代码是SQL和Jinja语言(模板语言)的混合。在示例中,for循环用于为set表达式中指定的每种付款方式形成金额。ref函数也可以使用-可以在代码内部引用其他模型:- 在编译时,ref将被转换为指向存储库中表或视图的目标指针
- ref允许您构建模型依赖关系图
这是神社的是增加了几乎无限的可能性,DBT。最常用的:- if / else语句-分支语句
- For循环-循环
- 变量-变量
- 巨集-建立巨集
物化:表,视图,增量
物化策略-一种方法,根据该方法将结果模型数据集存储在存储库中。从根本上考虑,这是:有更复杂的物化策略:- 增量-增量加载(大型事实表);添加新行,更新已修改的行,清除已删除的行
- 暂时的-该模型不能直接实现,但可以作为CTE参与其他模型
- 您可以添加自己的任何其他策略
除了物化策略外,还为特定仓库提供了优化的机会,例如:- Snowflake:临时表,合并行为,表集群,复制授权,安全视图
- Redshift:Distkey,Sortkey(交错,复合),后期绑定视图
- BigQuery:表分区和集群,合并行为,KMS加密,标签和标签
- Spark:文件格式(镶木地板,csv,json,兽人,三角洲),partition_by,clustered_by,存储桶,incremental_strategy
当前支持以下存储库:- Postgres
- 红移
- 大查询
- 雪花
- Presto(部分)
- Spark(部分)
- Microsoft SQL Server(社区适配器)
让我们改进模型:- 使其填充增量(增量)
- 为Redshift添加细分和排序键
{{
config(
materialized='incremental',
unique_key='order_id',
dist="customer_id",
sort="order_date"
)
}}
{% set payment_methods = ['credit_card', 'coupon', 'bank_transfer', 'gift_card'] %}
with orders as (
select * from {{ ref('stg_orders') }}
where 1=1
{% if is_incremental() -%}
and order_date >= (select max(order_date) from {{ this }})
{%- endif %}
),
order_payments as (
select * from {{ ref('order_payments') }}
),
final as (
select
orders.order_id,
orders.customer_id,
orders.order_date,
orders.status,
{% for payment_method in payment_methods -%}
order_payments.{{payment_method}}_amount,
{% endfor -%}
order_payments.total_amount as amount
from orders
left join order_payments using (order_id)
)
select * from final
模型依赖图
他是一棵依赖树。他是DAG(有向无环图-定向无环图)。DBT根据所有项目模型的配置来构建图形,或者将模型内部的ref()链接链接到其他模型。拥有图使您可以执行以下操作:图形可视化示例:图的每个节点都是一个模型,图的边缘由表达式ref给出。数据质量和文档
除了模型本身的形成之外,DBT还允许您测试有关结果数据集的许多断言,例如:- 不为空
- 独特
- 参考完整性-参考完整性(例如,订单表中的customer_id对应于客户表中的id)
- 匹配有效列表
您可以添加自己的测试(自定义数据测试),例如,一天,一周,一个月前的收入与指标的偏差百分比。公式化为SQL查询的任何假设都可以作为测试。这样,可以在存储的店面中捕获不必要的数据偏差和错误。在文档方面,DBT提供了用于在模型级别甚至属性上添加,版本控制和分发元数据和注释的机制。 这是在配置文件级别添加测试和文档的样子: - name: fct_orders
description: This table has basic information about orders, as well as some derived facts based on payments
columns:
- name: order_id
tests:
- unique
- not_null
description: This is a unique identifier for an order
- name: customer_id
description: Foreign key to the customers table
tests:
- not_null
- relationships:
to: ref('dim_customers')
field: customer_id
- name: order_date
description: Date (UTC) that the order was placed
- name: status
description: '{{ doc("orders_status") }}'
tests:
- accepted_values:
values: ['placed', 'shipped', 'completed', 'return_pending', 'returned']
这是此文档在生成的网站上的外观:宏和模块
DBT的目的不仅仅是成为一组SQL脚本,而是为用户提供功能强大且功能丰富的工具,以构建自己的转换并分发这些模块。宏是可在模型中称为函数的构造和表达式集。宏使您可以根据DRY(不要自己重复)工程原理在模型和项目之间重用SQL。宏示例:{% macro rename_category(column_name) %}
case
when {{ column_name }} ilike '%osx%' then 'osx'
when {{ column_name }} ilike '%android%' then 'android'
when {{ column_name }} ilike '%ios%' then 'ios'
else 'other'
end as renamed_product
{% endmacro %}
及其用途:{% set column_name = 'product' %}
select
product,
{{ rename_category(column_name) }}
from my_table
DBT带有程序包管理器,该程序包管理器允许用户发布和重用单个模块和宏。这意味着可以下载和使用库,例如:dbt hub
上提供了软件包的完整列表。更多功能
在这里,我将描述我和团队用来在Wheely中构建数据仓库的其他一些有趣的功能和实现。运行时环境的分离DEV-TEST-PROD
即使在相同的DWH群集中(采用不同的方案)。例如,使用以下表达式:with source as (
select * from {{ source('salesforce', 'users') }}
where 1=1
{%- if target.name in ['dev', 'test', 'ci'] -%}
where timestamp >= dateadd(day, -3, current_date)
{%- endif -%}
)
该代码从字面上说:对于开发,测试,ci环境,仅获取最近3天的数据,而不再获取数据。也就是说,在这些环境中运行将更快并且需要更少的资源。在产品环境中启动时,过滤条件将被忽略。替代列编码实现
Redshift是列DBMS,它允许您为每个单独的列指定数据压缩算法。选择最佳算法可以减少20-50%的磁盘占用空间。redshift.compress_table宏将执行ANALYZE COMPRESSION命令,使用分段键(dist_key)和排序(key_key)指示的建议列编码算法创建新表,将数据传输至该表,并在必要时删除旧副本。
宏签名:{{ compress_table(schema, table,
drop_backup=False,
comprows=none|Integer,
sort_style=none|compound|interleaved,
sort_keys=none|List<String>,
dist_style=none|all|even,
dist_key=none|String) }}
记录模型启动
对于模型的每次执行,您都可以挂起在启动模型之前或创建模型之后立即执行的钩子: pre-hook: "{{ logging.log_model_start_event() }}"
post-hook: "{{ logging.log_model_end_event() }}"
日志记录模块将允许您将所有必要的元数据记录在一个单独的表中,据此您可以随后审核和分析问题区域(瓶颈)。这是仪表板在Looker中的查找数据上的外观:仓储自动化
如果您使用所用存储的功能的任何扩展,例如UDF(用户定义的函数),则对这些函数进行版本控制,访问控制以及自动推出新发行版在DBT中非常方便地实现。我们在Python中使用UDF来计算哈希值,邮件域域和解码位掩码。在任何运行时(开发,测试,生产)上创建UDF的示例宏:{% macro create_udf() -%}
{% set sql %}
CREATE OR REPLACE FUNCTION {{ target.schema }}.f_sha256(mes "varchar")
RETURNS varchar
LANGUAGE plpythonu
STABLE
AS $$
import hashlib
return hashlib.sha256(mes).hexdigest()
$$
;
{% endset %}
{% set table = run_query(sql) %}
{%- endmacro %}
在Wheely,我们使用基于PostgreSQL的Amazon Redshift。对于Redshift,定期收集表统计信息并释放磁盘空间非常重要-分别是ANALYZE和VACUUM命令。为此,每晚都会执行redshift_maintenance宏中的命令:{% macro redshift_maintenance() %}
{% set vacuumable_tables=run_query(vacuumable_tables_sql) %}
{% for row in vacuumable_tables %}
{% set message_prefix=loop.index ~ " of " ~ loop.length %}
{%- set relation_to_vacuum = adapter.get_relation(
database=row['table_database'],
schema=row['table_schema'],
identifier=row['table_name']
) -%}
{% do run_query("commit") %}
{% if relation_to_vacuum %}
{% set start=modules.datetime.datetime.now() %}
{{ dbt_utils.log_info(message_prefix ~ " Vacuuming " ~ relation_to_vacuum) }}
{% do run_query("VACUUM " ~ relation_to_vacuum ~ " BOOST") %}
{{ dbt_utils.log_info(message_prefix ~ " Analyzing " ~ relation_to_vacuum) }}
{% do run_query("ANALYZE " ~ relation_to_vacuum) %}
{% set end=modules.datetime.datetime.now() %}
{% set total_seconds = (end - start).total_seconds() | round(2) %}
{{ dbt_utils.log_info(message_prefix ~ " Finished " ~ relation_to_vacuum ~ " in " ~ total_seconds ~ "s") }}
{% else %}
{{ dbt_utils.log_info(message_prefix ~ ' Skipping relation "' ~ row.values() | join ('"."') ~ '" as it does not exist') }}
{% endif %}
{% endfor %}
{% endmacro %}
DBT云
可以将DBT用作服务(托管服务)。在一组中:- 用于开发项目和模型的Web IDE
- 作业配置和计划设置
- 简单方便地访问日志
- 具有您的项目文档的网站
- CI(持续集成)连接
结论
烹饪和饮用DWH与喝冰沙一样令人愉悦和有益。DBT由Jinja,自定义扩展(模块),编译器,引擎(执行程序)和包管理器组成。一起收集这些元素,您将为数据仓库提供一个完善的工作环境。今天,在DWH中几乎没有更好的方法来管理转换。
DBT开发人员遵循的信念如下:- 代码而不是GUI是表达复杂分析逻辑的最佳抽象
- 处理数据应适应软件开发的最佳实践(软件工程)
- 关键数据基础架构必须由用户社区作为开源软件进行控制
- 不仅分析工具,代码也将越来越多地成为开源社区的一部分。
这些核心信念催生了目前有850多家公司使用的产品,它们构成了将来将要创建的许多有趣扩展的基础。对于那些感兴趣的人,有一个我在几个月前花费的公开课程的视频,这是OTUS- Amazon Redshift存储库的数据构建工具中的公开课程的一部分。除了DBT和数据仓库之外,作为OTUS平台上的数据工程师课程的一部分,我和我的同事们还就其他一些相关的现代主题进行课程:- 大数据应用的架构概念
- 练习Spark和Spark Streaming
- 加载数据源的学习方法和工具
- 在DWH中建立分析展示柜
- NoSQL: HBase, Cassandra, ElasticSearch
-
- :
:
- DBT documentation — Introduction —
- What, exactly, is dbt? — DBT
- Data Build Tool Amazon Redshift — YouTube, OTUS
- Greenplum — 15 2020
- Data Engineering — OTUS
- Building a Mature Analytics Workflow —
- It’s time for open source analytics — Open Source
- Continuous Integration and Automated Build Testing with dbtCloud — CI DBT
- Getting started with DBT tutorial — ,
- Jaffle shop — Github DBT Tutorial — Github,
.