网站建设报价乱不一,泉州网站建设平台,中国空间站叫什么名,专业型企业网站有哪些上文我们介绍了增量策略的理论知识#xff0c;本文结合实际场景介绍如何合理利用增量策略#xff0c;内容包括应用场景、常见问题及解决方案。 应用场景
增量模型是定义如何增量向数据模型添加数据的有效方法——假设我们有描述信用卡交易的数据表——我们创建DBT模型#… 上文我们介绍了增量策略的理论知识本文结合实际场景介绍如何合理利用增量策略内容包括应用场景、常见问题及解决方案。 应用场景
增量模型是定义如何增量向数据模型添加数据的有效方法——假设我们有描述信用卡交易的数据表——我们创建DBT模型内容如下
{{config(materializedtable,) }}
selecttransaction_id,transaction_date,user_id,store_name_description,transaction_amount
from {{ ref(external_table_transaction) }}这将在目标数据库中创建一张表从表external_table_transaction加载事务数据。问题是每次我们重新运行这个查询时它都会重新加载整个表——表中的数据越多我们的查询就会变得越慢运行时间越长——解决该问题的方法是使用增量模型:
{{config(materializedincremental,unique_key[transaction_id],incremental_strategydeleteinsert,) }}
selecttransaction_id,transaction_date,user_id,store_name_description,transaction_amount
from {{ ref(external_table_transaction) }}
{%- if is_incremental() %}where transaction_date (select max(transaction_date) 1 as next_date from {{ this }})
{%- endif %}我们看到宏函数及jinja模版功能让DBT如此强大。上面代码基本实现了我们的诉求现在应该只从external_table_transaction加载事务增量数据其中transaction_date比表中的最新数据大1天——它简单而强大。我们现在只需要处理以前看不见的行而不是处理每次更新都会变大的数十亿行数据如果需要仍然可以选择完全刷新重新加载全部数据。
问题分析
增量模型非常吸引人——它们在逻辑上非常漂亮在定期处理数据流的情况下效率非常高。当我们需要控制正在处理哪些数据时问题就出现了增量模型不能处理需要重新运行特定分区的情况而是根据增量模型的规则加载数据。
也许从理论上讲这不是问题因为如果增量模型在理想环境中运行所有数据将只加载一次。但现实是混乱的——数据处理流程中断数据延迟交付或者在某些情况下根本不交付有时我们需要重新加载历史记录。此外如果调度dbt任务程序出错也会需要重新运行弥补问题。典型造成问题的场景如下 数据流中原始一些数据需要回溯加载2年前的修复数据我们需要加载历史数据这需要以一种特殊的方式完成因为增量策略无法加载2年前的历史记录。 数据流由于上游问题而中断了3天3天没有加载数据当数据流在第4天运行时它正在加载第1天的数据-换句话说它已经不同步其他数据。当然我们可以修改条件加载大于最大日期数据但是即使这样也会因为日期问题漏加载数据。 数据流的上游有跳过日(缺少数据的日子)我们的增量模型试图通过在数据中的最大日期上添加“1”来加载数据但该日期从未出现因此数据从未加载导致需要人工干预。
即使有这些问题我们也不能简单放弃增量模型对于大数据量场景处理任务是非常缓慢、且成本高。
幂等和分区
增量模型的关键问题是它们不是幂等的并且不能配置为针对特定日期分区运行。对于幂等脚本可以多次重新运行而不会产生副作用。如果历史数据有问题我们总是可以重新生成一些特定的分区——由于脚本是幂等的我们可以在给定的一天内多次运行而不会产生任何问题。
增量模型不具备重新运行数据的特定分区的能力——相反它们将所有数据视为流只加载看不见的数据——基本上加载满足特定规则的数据而不是数据的特定分区。
问题是有时我们需要数据流符合某种时间分区运行——可以是每小时、每天、每周、每月。如果我们重新运行数据流任务希望它在对应的时间分区上运行但是同时也需要增量模型只会“向前看”而不是在历史分区上配置。总之就是既要增量、又要灵活按时间分区幂等方式运行。
解决方案
解决方案很简单我们可以使用DBT变量并且不需要完全抛弃增量模型的功能。我们可以添加变量来显式地针特定分区运行:
{%- set target_date var(target_date, ) %}
{{config(materializedincremental,unique_key[transaction_id],incremental_strategydeleteinsert, ) }}
selecttransaction_id,transaction_date,user_id,store_name_description,transaction_amount
from {{ ref(external_table_transaction) }}
{%- if target_date ! %}where transaction_date {{ target_date }}
{%- else %}{%- if is_incremental() %}where transaction_date (select max(transaction_date) 1 as next_date from {{ this }}){%- endif %}
{%- endif %}这里在DBT模型添加了’ target_date ‘变量。如果’ target_date 未定义则模型将以增量行为运行但如果传入变量则模型将针对指定分区运行。当通过调度程序执行时这种方式会工作得很好。
此外我们采用deleteinsert增量策略模型现在已经变成幂等的——假设源数据是相同的我们可以用相同的参数运行相同的查询并期望得到相同的结果——而对于增量模型加载的数据取决于表的内容以及上游发生的更改。
这个解决方案有效地为我们提供了三种模式: 完全重新加载、增量加载和分区加载。因此在实际应用中非常实用而且可以很好地配合Airflow或其他调度工具实现自动化运行
dbt run --select my_model-- 显示完全刷新数据模型
dbt run --select my_model --full-refresh-- 指定参数执行
dbt run --select my_model --vars {target_date : 2024-01-01}最后总结
本文介绍了增量策略实际应用中的问题如何让增量模型能够高效幂等运行。我们提供良好的解决方案同时满足三种场景应用让数据转换流程更健壮、更高效。期待您的真诚反馈更多内容请阅读数据分析工程专栏。