数据构建工具或数据仓库和居安思危型之间的共同点


理想的数据仓库的原理是什么?

在没有样板代码的情况下,专注于业务价值和分析。将DWH作为代码库进行管理:版本控制,审阅,自动化测试和CI。模块化,可扩展性,开源和社区。友好的用户文档和依赖关系可视化(数据沿袭)。

有关所有这些以及DBT在大数据和分析生态系统中的作用的更多信息,欢迎关注。

大家好


与Artemy Kozyr联系。五年多来,我一直在与数据仓库合作,构建ETL / ELT,以及数据分析和可视化。我目前在Wheely工作,在OTUS教授数据工程师课程,今天我想与大家分享我在新课程注册开始前夕写的一篇文章

简短评论


DBT框架全都与缩写词ELT(提取-转换-加载)中的字母T有关。

随着BigQuery,Redshift,Snowflake等高效且可扩展的分析数据库的出现,在数据仓库外部进行转换的任何意义都消失了。 

DBT不会从源下载数据,但是提供了巨大的机会来处理已经加载到存储(内部或外部存储)中的数据。


DBT的主要目的是获取代码,在SQL中进行编译,并在存储库中以正确的顺序执行命令。

DBT项目结构


该项目仅由两种类型的目录和文件组成:

  • 模型(.sql)-SELECT查询表示的转换单位
  • 配置文件(.yml)-参数,设置,测试,文档

从根本上来说,工作的结构如下:

  • 用户在任何方便的IDE中准备模型代码
  • 使用CLI启动模型,DBT用SQL编译模型代码
  • 在仓库中按指定顺序执行已编译的SQL代码(图形)

以下是从CLI启动的样子:


一切都是选择


这是数据构建工具框架的致命功能。换句话说,DBT提取与仓库中查询的实现相关的所有代码(来自CREATE,INSERT,UPDATE,DELETE ALTER,GRANT等命令的变体)。

任何模型都涉及编写一个SELECT查询,该查询定义了结果数据集。

同时,转换逻辑可以是多级的,可以合并来自其他几个模型的数据。将构建订单展示(f_orders)的模型示例:

{% set payment_methods = ['credit_card', 'coupon', 'bank_transfer', 'gift_card'] %}
 
with orders as (
 
   select * from {{ ref('stg_orders') }}
 
),
 
order_payments as (
 
   select * from {{ ref('order_payments') }}
 
),
 
final as (
 
   select
       orders.order_id,
       orders.customer_id,
       orders.order_date,
       orders.status,
       {% for payment_method in payment_methods -%}
       order_payments.{{payment_method}}_amount,
       {% endfor -%}
       order_payments.total_amount as amount
   from orders
       left join order_payments using (order_id)
 
)
 
select * from final

我们在这里能看到什么有趣的东西?

首先:CTE(公用表表达式)用于组织和理解包含许多转换和业务逻辑的

代码;其次:模型代码是SQL和Jinja语言(模板语言)的混合。

在示例中,for循环用于set表达式中指定的每种付款方式形成金额ref函数也可以使用-可以在代码内部引用其他模型:

  • 在编译时,ref将被转换为指向存储库中表或视图的目标指针
  • ref允许您构建模型依赖关系图

这是神社的是增加了几乎无限的可能性,DBT。最常用的:

  • if / else语句-分支语句
  • For循环-循环
  • 变量-变量
  • 巨集-建立巨集

物化:表,视图,增量


物化策略-一种方法,根据该方法将结果模型数据集存储在存储库中。

从根本上考虑,这是:

  • 表-存储中的物理表
  • 视图-存储库中的虚拟表视图

有更复杂的物化策略:

  • 增量-增量加载(大型事实表);添加新行,更新已修改的行,清除已删除的行 
  • 暂时的-该模型不能直接实现,但可以作为CTE参与其他模型
  • 您可以添加自己的任何其他策略

除了物化策略外,还为特定仓库提供了优化的机会,例如:

  • Snowflake:临时表,合并行为,表集群,复制授权,安全视图
  • Redshift:Distkey,Sortkey(交错,复合),后期绑定视图
  • BigQuery:表分区和集群,合并行为,KMS加密,标签和标签
  • Spark:文件格式(镶木地板,csv,json,兽人,三角洲),partition_by,clustered_by,存储桶,incremental_strategy

当前支持以下存储库:

  • Postgres
  • 红移
  • 大查询
  • 雪花
  • Presto(部分)
  • Spark(部分)
  • Microsoft SQL Server(社区适配器)

让我们改进模型:

  • 使其填充增量(增量)
  • 为Redshift添加细分和排序键

--  : 
--  ,      (unique_key)
--   (dist),   (sort)
{{
  config(
       materialized='incremental',
       unique_key='order_id',
       dist="customer_id",
       sort="order_date"
   )
}}
 
{% set payment_methods = ['credit_card', 'coupon', 'bank_transfer', 'gift_card'] %}
 
with orders as (
 
   select * from {{ ref('stg_orders') }}
   where 1=1
   {% if is_incremental() -%}
       --        
       and order_date >= (select max(order_date) from {{ this }})
   {%- endif %} 
 
),
 
order_payments as (
 
   select * from {{ ref('order_payments') }}
 
),
 
final as (
 
   select
       orders.order_id,
       orders.customer_id,
       orders.order_date,
       orders.status,
       {% for payment_method in payment_methods -%}
       order_payments.{{payment_method}}_amount,
       {% endfor -%}
       order_payments.total_amount as amount
   from orders
       left join order_payments using (order_id)
 
)
 
select * from final

模型依赖图


他是一棵依赖树。他是DAG(有向无环图-定向无环图)。

DBT根据所有项目模型的配置来构建图形,或者将模型内部的ref()链接链接到其他模型。拥有图使您可以执行以下操作:

  • 以正确的顺序运行模型
  • 窗饰的并行化
  • 运行任意子图 

图形可视化示例:


图的每个节点都是一个模型,图的边缘由表达式ref给出。

数据质量和文档


除了模型本身的形成之外,DBT还允许您测试有关结果数据集的许多断言,例如:

  • 不为空
  • 独特
  • 参考完整性-参考完整性(例如,订单表中的customer_id对应于客户表中的id)
  • 匹配有效列表

您可以添加自己的测试(自定义数据测试),例如,一天,一周,一个月前的收入与指标的偏差百分比。公式化为SQL查询的任何假设都可以作为测试。

这样,可以在存储的店面中捕获不必要的数据偏差和错误。

在文档方面,DBT提供了用于在模型级别甚至属性上添加,版本控制和分发元数据和注释的机制。 

这是在配置文件级别添加测试和文档的样子:

 - name: fct_orders
   description: This table has basic information about orders, as well as some derived facts based on payments
   columns:
     - name: order_id
       tests:
         - unique #    
         - not_null #    null
       description: This is a unique identifier for an order
     - name: customer_id
       description: Foreign key to the customers table
       tests:
         - not_null
         - relationships: #   
             to: ref('dim_customers')
             field: customer_id
     - name: order_date
       description: Date (UTC) that the order was placed
     - name: status
       description: '{{ doc("orders_status") }}'
       tests:
         - accepted_values: #    
             values: ['placed', 'shipped', 'completed', 'return_pending', 'returned']

这是此文档在生成的网站上的外观:


宏和模块


DBT的目的不仅仅是成为一组SQL脚本,而是为用户提供功能强大且功能丰富的工具,以构建自己的转换并分发这些模块。

宏是可在模型中称为函数的构造和表达式集。宏使您可以根据DRY(不要自己重复)工程原理在模型和项目之间重用SQL。

宏示例:

{% macro rename_category(column_name) %}
case
 when {{ column_name }} ilike  '%osx%' then 'osx'
 when {{ column_name }} ilike  '%android%' then 'android'
 when {{ column_name }} ilike  '%ios%' then 'ios'
 else 'other'
end as renamed_product
{% endmacro %}

及其用途:

{% set column_name = 'product' %}
select
 product,
 {{ rename_category(column_name) }} --  
from my_table

DBT带有程序包管理器,该程序包管理器允许用户发布和重用单个模块和宏。

这意味着可以下载和使用库,例如​​:


dbt hub 上提供了软件包的完整列表

更多功能


在这里,我将描述我和团队用来在Wheely中构建数据仓库的其他一些有趣的功能和实现

运行时环境的分离DEV-TEST-PROD


即使在相同的DWH群集中(采用不同的方案)。例如,使用以下表达式:

with source as (
 
   select * from {{ source('salesforce', 'users') }}
   where 1=1
   {%- if target.name in ['dev', 'test', 'ci'] -%}           
       where timestamp >= dateadd(day, -3, current_date)   
   {%- endif -%}
 
)

该代码从字面上说:对于开发,测试,ci环境仅获取最近3天的数据,而不再获取数据。也就是说,在这些环境中运行将更快并且需要更少的资源。产品环境中启动时,过滤条件将被忽略。

替代列编码实现


Redshift是列DBMS,它允许您为每个单独的列指定数据压缩算法。选择最佳算法可以减少20-50%的磁盘占用空间。redshift.compress_table

将执行ANALYZE COMPRESSION命令,使用分段键(dist_key)和排序(key_key)指示的建议列编码算法创建新表,将数据传输至该表,并在必要时删除旧副本。 宏签名:



{{ compress_table(schema, table,
                 drop_backup=False,
                 comprows=none|Integer,
                 sort_style=none|compound|interleaved,
                 sort_keys=none|List<String>,
                 dist_style=none|all|even,
                 dist_key=none|String) }}

记录模型启动


对于模型的每次执行,您都可以挂起在启动模型之前或创建模型之后立即执行的钩子:

   pre-hook: "{{ logging.log_model_start_event() }}"
   post-hook: "{{ logging.log_model_end_event() }}"

日志记录模块将允许您将所有必要的元数据记录在一个单独的表中,据此您可以随后审核和分析问题区域(瓶颈)。

这是仪表板在Looker中的查找数据上的外观:


仓储自动化


如果您使用所用存储的功能的任何扩展,例如UDF(用户定义的函数),则对这些函数进行版本控制,访问控制以及自动推出新发行版在DBT中非常方便地实现。

我们在Python中使用UDF来计算哈希值,邮件域域和解码位掩码。

在任何运行时(开发,测试,生产)上创建UDF的示例宏:

{% macro create_udf() -%}
 
 {% set sql %}
       CREATE OR REPLACE FUNCTION {{ target.schema }}.f_sha256(mes "varchar")
           RETURNS varchar
           LANGUAGE plpythonu
           STABLE
       AS $$  
           import hashlib
           return hashlib.sha256(mes).hexdigest()
       $$
       ;
 {% endset %}
  
 {% set table = run_query(sql) %}
 
{%- endmacro %}

在Wheely,我们使用基于PostgreSQL的Amazon Redshift。对于Redshift,定期收集表统计信息并释放磁盘空间非常重要-分别是ANALYZE和VACUUM命令。

为此,每晚都会执行redshift_maintenance宏中的命令:

{% macro redshift_maintenance() %}
 
   {% set vacuumable_tables=run_query(vacuumable_tables_sql) %}
 
   {% for row in vacuumable_tables %}
       {% set message_prefix=loop.index ~ " of " ~ loop.length %}
 
       {%- set relation_to_vacuum = adapter.get_relation(
                                               database=row['table_database'],
                                               schema=row['table_schema'],
                                               identifier=row['table_name']
                                   ) -%}
       {% do run_query("commit") %}
 
       {% if relation_to_vacuum %}
           {% set start=modules.datetime.datetime.now() %}
           {{ dbt_utils.log_info(message_prefix ~ " Vacuuming " ~ relation_to_vacuum) }}
           {% do run_query("VACUUM " ~ relation_to_vacuum ~ " BOOST") %}
           {{ dbt_utils.log_info(message_prefix ~ " Analyzing " ~ relation_to_vacuum) }}
           {% do run_query("ANALYZE " ~ relation_to_vacuum) %}
           {% set end=modules.datetime.datetime.now() %}
           {% set total_seconds = (end - start).total_seconds() | round(2)  %}
           {{ dbt_utils.log_info(message_prefix ~ " Finished " ~ relation_to_vacuum ~ " in " ~ total_seconds ~ "s") }}
       {% else %}
           {{ dbt_utils.log_info(message_prefix ~ ' Skipping relation "' ~ row.values() | join ('"."') ~ '" as it does not exist') }}
       {% endif %}
 
   {% endfor %}
 
{% endmacro %}

DBT云


可以将DBT用作服务(托管服务)。在一组中:

  • 用于开发项目和模型的Web IDE
  • 作业配置和计划设置
  • 简单方便地访问日志
  • 具有您的项目文档的网站
  • CI(持续集成)连接


结论


烹饪和饮用DWH与喝冰沙一样令人愉悦和有益。DBT由Jinja,自定义扩展(模块),编译器,引擎(执行程序)和包管理器组成。一起收集这些元素,您将为数据仓库提供一个完善的工作环境。今天,在DWH中几乎没有更好的方法来管理转换。



DBT开发人员遵循的信念如下:

  • 代码而不是GUI是表达复杂分析逻辑的最佳抽象
  • 处理数据应适应软件开发的最佳实践(软件工程)

  • 关键数据基础架构必须由用户社区作为开源软件进行控制
  • 不仅分析工具,代码也将越来越多地成为开源社区的一部分。

这些核心信念催生了目前有850多家公司使用的产品,它们构成了将来将要创建的许多有趣扩展的基础。

对于那些感兴趣的人,有一个我在几个月前花费的公开课程的视频,这是OTUS- Amazon Redshift存储库的数据构建工具中的公开课程的一部分

除了DBT和数据仓库之外,作为OTUS平台上的数据工程师课程的一部分,我和我的同事们还就其他一些相关的现代主题进行课程:

  • 大数据应用的架构概念
  • 练习Spark和Spark Streaming
  • 加载数据源的学习方法和工具
  • 在DWH中建立分析展示柜
  • NoSQL: HBase, Cassandra, ElasticSearch
  •  
  • :

:


  1. DBT documentation — Introduction
  2. What, exactly, is dbt? — DBT 
  3. Data Build Tool Amazon Redshift — YouTube, OTUS
  4. Greenplum — 15 2020
  5. Data Engineering — OTUS
  6. Building a Mature Analytics Workflow
  7. It’s time for open source analytics — Open Source
  8. Continuous Integration and Automated Build Testing with dbtCloud — CI DBT
  9. Getting started with DBT tutorial — ,
  10. Jaffle shop — Github DBT Tutorial — Github,




.



All Articles