diff --git a/README.md b/README.md index a982a07f..6c4c034a 100644 --- a/README.md +++ b/README.md @@ -1,25 +1,26 @@ # datavault4dbt by [Scalefree International GmbH](https://www.scalefree.com) - +[](https://www.datavault4dbt.com/) --- ### Included Macros -- Staging Area (For Hashing, prejoins and ghost records) -- Hubs, Links & Satellites (allowing multiple deltas) -- Non-Historized Links and Satellites -- Multi-Active Satellites -- Virtualized End-Dating (in Satellites) -- Reference Hubs, - Satellites, and - Tables -- PIT Tables - - Hook for Cleaning up PITs -- Snapshot Control +- [Staging Area (For Hashing, prejoins and ghost records)](https://www.datavault4dbt.com/documentation/macro-instructions/staging/) +- [Hubs](https://www.datavault4dbt.com/documentation/macro-instructions/hubs/standard-hub/), [Links](https://www.datavault4dbt.com/documentation/macro-instructions/links/standard-link/) & [Satellites](https://www.datavault4dbt.com/documentation/macro-instructions/satellites/standard-satellite/standard-satellite-v0/) (allowing multiple deltas) +- [Non-Historized Links](https://www.datavault4dbt.com/documentation/macro-instructions/links/non-historized-link/) and [Satellites](https://www.datavault4dbt.com/documentation/macro-instructions/satellites/non-historized-satellite/) +- [Multi-Active Satellites](https://www.datavault4dbt.com/documentation/macro-instructions/satellites/multi-active-satellite/multi-active-satellite-v0/) +- [Effectivity](https://www.datavault4dbt.com/documentation/macro-instructions/satellites/effectivity-satellite/) and [Record Tracking Satellites](https://www.datavault4dbt.com/documentation/macro-instructions/satellites/record-tracking-satellite/) +- [Virtualized End-Dating (in Satellites)](https://www.datavault4dbt.com/documentation/macro-instructions/satellites/standard-satellite/standard-satellite-v1/) +- [Reference Hubs](https://www.datavault4dbt.com/documentation/macro-instructions/reference-data/reference-hub/), [- Satellites](https://www.datavault4dbt.com/documentation/macro-instructions/reference-data/reference-satellite/reference-satellite-v0/), and [- Tables](https://www.datavault4dbt.com/documentation/macro-instructions/reference-data/reference-tables/) +- [PIT Tables](https://www.datavault4dbt.com/documentation/macro-instructions/business-vault/pit/) + - [Hook for Cleaning up PITs](https://www.datavault4dbt.com/documentation/macro-instructions/business-vault/pit/hook-cleanup-pits/) +- Snapshot Control [Tables](https://www.datavault4dbt.com/documentation/macro-instructions/business-vault/snapshot-control/snapshot-control-v0/) and [Views](https://www.datavault4dbt.com/documentation/macro-instructions/business-vault/snapshot-control/snapshot-control-v1/) ### Features With datavault4dbt you will get a lot of awesome features, including: - A Data Vault 2.0 implementation congruent to the original Data Vault 2.0 definition by Dan Linstedt -- Ready for both Persistent Staging Areas and Transient Staging Areas, due to the allowance of multiple deltas in all macros, without losing any intermediate changes - Enforcing standards in naming conventions by implementing [global variables](https://github.com/ScalefreeCOM/datavault4dbt/wiki/Global-variables) for technical columns +- Ready for both Persistent Staging Areas and Transient Staging Areas, due to the allowance of multiple deltas in all macros, without losing any intermediate changes - Enforcing standards in naming conventions by implementing [global variables](https://www.datavault4dbt.com/documentation/general-usage-notes/global-variables/) for technical columns - A fully auditable solution for a Data Warehouse - Creating a centralized, snapshot-based Business interface by using a centralized snapshot table supporting logarithmic logic - A modern insert-only approach that avoids updating data @@ -36,11 +37,8 @@ To use the macros efficiently, there are a few prerequisites you need to provide ### Resources: +- Find technical information about the macros, examples, and more, on [the official datavault4dbt Website](https://www.datavault4dbt.com/)! - Learn more about dbt [in the docs](https://docs.getdbt.com/docs/introduction) -- Check out [Discourse](https://discourse.getdbt.com/) for commonly asked questions and answers -- Join the [chat](https://community.getdbt.com/) on Slack for live discussions and support -- Check out [the blog](https://blog.getdbt.com/) for the latest news on dbt's development and best practices -- Find [dbt events](https://events.getdbt.com) near you - Check out the [Scalefree-Blog](https://www.scalefree.com/blog/) - [Data-Vault 2.0 with dbt #1](https://www.scalefree.com/scalefree-newsletter/data-vault-2-0-with-dbt-part-1/) - [Data-Vault 2.0 with dbt #2](https://www.scalefree.com/scalefree-newsletter/data-vault-2-0-with-dbt-part-2/) @@ -79,11 +77,11 @@ For further information on how to install packages in dbt, please visit the foll [https://docs.getdbt.com/docs/building-a-dbt-project/package-management](https://docs.getdbt.com/docs/building-a-dbt-project/package-management#how-do-i-add-a-package-to-my-project) ### Global variables -datavault4dbt is highly customizable by using many global variables. Since they are applied on multiple levels, a high rate of standardization across your data vault 2.0 solution is guaranteed. The default values of those variables are set inside the packages `dbt_project.yml` and should be copied to your own `dbt_project.yml`. For an explanation of all global variables see [the wiki](https://github.com/ScalefreeCOM/datavault4dbt/wiki/Global-variables). +datavault4dbt is highly customizable by using many global variables. Since they are applied on multiple levels, a high rate of standardization across your data vault 2.0 solution is guaranteed. The default values of those variables are set inside the packages `dbt_project.yml` and should be copied to your own `dbt_project.yml`. For an explanation of all global variables see [the docs](https://www.datavault4dbt.com/documentation/general-usage-notes/global-variables/). --- ## Usage -The datavault4dbt package provides macros for Staging and Creation of all DataVault-Entities you need, to build your own DataVault2.0 solution. The usage of the macros is well-explained in the documentation: https://github.com/ScalefreeCOM/datavault4dbt/wiki +The datavault4dbt package provides macros for Staging and Creation of all DataVault-Entities you need, to build your own DataVault2.0 solution. The usage of the macros is well-explained in the [documentation]([url](https://www.datavault4dbt.com/documentation/)). --- ## Contributing diff --git a/macros/supporting/ghost_record_per_datatype.sql b/macros/supporting/ghost_record_per_datatype.sql index 341715dc..03698646 100644 --- a/macros/supporting/ghost_record_per_datatype.sql +++ b/macros/supporting/ghost_record_per_datatype.sql @@ -82,7 +82,7 @@ {%- if ghost_record_type == 'unknown' -%} - {%- if datatype == 'TIMESTAMP' or datatype == 'TIMESTAMP WITH LOCAL TIMEZONE' %} {{- datavault4dbt.string_to_timestamp( timestamp_format , beginning_of_all_times) }} as "{{ column_name }}" + {%- if datatype == 'TIMESTAMP' or datatype == 'TIMESTAMP WITH LOCAL TIMEZONE' %} {{- datavault4dbt.string_to_timestamp( timestamp_format , beginning_of_all_times) }} as {{ alias }} {%- elif datatype == 'DATE'-%} TO_DATE('{{ beginning_of_all_times_date }}', '{{ date_format }}' ) as {{ alias }} {%- elif datatype.upper().startswith('VARCHAR') -%} {%- if col_size is not none -%} @@ -110,7 +110,7 @@ {%- elif ghost_record_type == 'error' -%} - {%- if datatype == 'TIMESTAMP' or datatype == 'TIMESTAMP WITH LOCAL TIME ZONE' %} {{- datavault4dbt.string_to_timestamp( timestamp_format , end_of_all_times) }} as "{{ column_name }}" + {%- if datatype == 'TIMESTAMP' or datatype == 'TIMESTAMP WITH LOCAL TIME ZONE' %} {{- datavault4dbt.string_to_timestamp( timestamp_format , end_of_all_times) }} as {{ alias }} {%- elif datatype == 'DATE'-%} TO_DATE('{{ end_of_all_times_date }}', '{{ date_format }}' ) as {{ alias }} {%- elif datatype.upper().startswith('VARCHAR') -%} {%- if col_size is not none -%} @@ -267,7 +267,7 @@ {%- if ghost_record_type == 'unknown' -%} - {%- if datatype in ['DATETIME', 'DATETIME2', 'DATETIMEOFFSET'] %} CONVERT({{ datatype }}, {{- datavault4dbt.string_to_timestamp( timestamp_format , beginning_of_all_times) }}) as "{{ column_name }}" + {%- if datatype in ['DATETIME', 'DATETIME2', 'DATETIMEOFFSET'] %} CONVERT({{ datatype }}, {{- datavault4dbt.string_to_timestamp( timestamp_format , beginning_of_all_times) }}) as "{{ alias }}" {%- elif 'CHAR' in datatype -%} {%- if col_size is not none -%} {%- if (col_size | int) == -1 -%} @@ -297,7 +297,7 @@ {%- elif ghost_record_type == 'error' -%} - {%- if datatype in ['DATETIME', 'DATETIME2', 'DATETIMEOFFSET'] %} CONVERT({{ datatype }}, {{- datavault4dbt.string_to_timestamp( timestamp_format , end_of_all_times) }}) as "{{ column_name }}" + {%- if datatype in ['DATETIME', 'DATETIME2', 'DATETIMEOFFSET'] %} CONVERT({{ datatype }}, {{- datavault4dbt.string_to_timestamp( timestamp_format , end_of_all_times) }}) as "{{ alias }}" {%- elif 'CHAR' in datatype -%} {%- if col_size is not none -%} {%- if (col_size | int) == -1 -%} diff --git a/macros/supporting/string_to_timestamp.sql b/macros/supporting/string_to_timestamp.sql index 3ae80306..05c52de4 100644 --- a/macros/supporting/string_to_timestamp.sql +++ b/macros/supporting/string_to_timestamp.sql @@ -8,7 +8,7 @@ {%- endmacro -%} {%- macro exasol__string_to_timestamp(format, timestamp) -%} - TO_TIMESTAMP('{{ timestamp }}', '{{ format }}') + CAST(TO_TIMESTAMP('{{ timestamp }}', '{{ format }}') AS {{ datavault4dbt.timestamp_default_dtype() }}) {%- endmacro -%} {%- macro snowflake__string_to_timestamp(format, timestamp) -%} diff --git a/macros/tables/bigquery/eff_sat_v0.sql b/macros/tables/bigquery/eff_sat_v0.sql new file mode 100644 index 00000000..b9b70a80 --- /dev/null +++ b/macros/tables/bigquery/eff_sat_v0.sql @@ -0,0 +1,300 @@ +{%- macro default__eff_sat_v0(source_model, tracked_hashkey, src_ldts, src_rsrc, is_active_alias, source_is_single_batch, disable_hwm) -%} + +{%- set end_of_all_times = datavault4dbt.end_of_all_times() -%} +{%- set beginning_of_all_times = datavault4dbt.beginning_of_all_times() -%} +{%- set timestamp_format = datavault4dbt.timestamp_format() -%} + +{%- set ns = namespace(last_cte= "") -%} + +{%- set source_relation = ref(source_model) -%} + +{%- set tracked_hashkey = datavault4dbt.escape_column_names(tracked_hashkey) -%} +{%- set is_active_alias = datavault4dbt.escape_column_names(is_active_alias) -%} +{%- set src_ldts = datavault4dbt.escape_column_names(src_ldts) -%} +{%- set src_rsrc = datavault4dbt.escape_column_names(src_rsrc) -%} + +{{ log('columns to select: '~final_columns_to_select, false) }} + +{{ datavault4dbt.prepend_generated_by() }} + +WITH + +{# + In all cases, the source model is selected, and optionally a HWM is applied. +#} +source_data AS ( + + SELECT + {{ tracked_hashkey }}, + {{ src_ldts }} + FROM {{ source_relation }} src + WHERE {{ src_ldts }} NOT IN ({{ datavault4dbt.string_to_timestamp(timestamp_format, beginning_of_all_times) }}, {{ datavault4dbt.string_to_timestamp(timestamp_format, end_of_all_times) }}) + {%- if is_incremental() and not disable_hwm %} + AND src.{{ src_ldts }} > ( + SELECT + MAX({{ src_ldts }}) + FROM {{ this }} + WHERE {{ src_ldts }} != {{ datavault4dbt.string_to_timestamp(timestamp_format, end_of_all_times) }} + ) + {%- endif %} +), + +{# + In all incremental cases, the current status for each hashkey is selected from the existing Effectivity Satellite. +#} +{%- if is_incremental() %} +current_status AS ( + + SELECT + {{ tracked_hashkey }}, + {{ is_active_alias }} + FROM {{ this }} + QUALIFY + ROW_NUMBER() OVER(PARTITION BY {{ tracked_hashkey }} ORDER BY {{ src_ldts }} DESC) = 1 + +), +{% endif %} + +{# + This block is for multi-batch processing. +#} +{% if not source_is_single_batch %} + + {# + List of all Hashkeys with their date of first appearance in the source model. + #} + hashkeys AS ( + + SELECT + {{ tracked_hashkey }}, + MIN({{ src_ldts }}) as first_appearance + FROM source_data + GROUP BY {{ tracked_hashkey }} + + ), + + {# + Distinct list of load dates in the multi-batch source. + #} + load_dates AS ( + + SELECT Distinct + {{ src_ldts }} + FROM source_data + + ), + + {# + All combinations of hashkeys and loaddates, for loaddates after the first appearance of a hashkey. + #} + history AS ( + + SELECT + hk.{{ tracked_hashkey }}, + ld.{{ src_ldts }} + FROM hashkeys hk + CROSS JOIN load_dates ld + WHERE ld.{{ src_ldts }} >= hk.first_appearance + + ), + + {# + All theoretical combinations are checked against the actual occurences of hashkeys in each batch / loaddate. + If a Hashkey is part of a load/batch, is_active_alias is set to 1, because the hashkey was active in that load/batch. + If a Hashkey is not part of a load/batch, is_active_alias is set to 0, because the hashkey was not active in that load/batch. + #} + is_active AS ( + + SELECT + h.{{ tracked_hashkey }}, + h.{{ src_ldts }}, + CASE + WHEN src.{{ tracked_hashkey }} IS NULL THEN 0 + ELSE 1 + END as {{ is_active_alias }} + FROM history h + LEFT JOIN source_data src + ON src.{{ tracked_hashkey }} = h.{{ tracked_hashkey }} + AND src.{{ src_ldts }} = h.{{ src_ldts }} + + ), + + {# + The rows are deduplicated on the is_active_alias, to only include status changes. + Additionally, a ROW_NUMBER() is calculated in incremental runs, to use it in the next step for comparison against the current status. + #} + deduplicated_incoming_prep AS ( + + SELECT + is_active.{{ tracked_hashkey }}, + is_active.{{ src_ldts }}, + is_active.{{ is_active_alias }}, + LAG(is_active.{{ is_active_alias }}) OVER (PARTITION BY {{ tracked_hashkey }} ORDER BY {{ src_ldts }}) as lag_is_active + + FROM is_active + + ), + + deduplicated_incoming AS ( + + SELECT + deduplicated_incoming_prep.{{ tracked_hashkey }}, + deduplicated_incoming_prep.{{ src_ldts }}, + deduplicated_incoming_prep.{{ is_active_alias }} + + FROM + deduplicated_incoming_prep + WHERE + deduplicated_incoming_prep.{{ is_active_alias }} != deduplicated_incoming_prep.lag_is_active + OR deduplicated_incoming_prep.lag_is_active IS NULL + + ), + + {% set ns.last_cte = 'deduplicated_incoming' %} + +{# + This block is for single-batch processing +#} +{% else %} + + {# + In initial loads of single-batch eff sats, every hashkey of the source is set to active. + #} + new_hashkeys AS ( + + SELECT DISTINCT + src.{{ tracked_hashkey }}, + src.{{ src_ldts }}, + 1 as {{ is_active_alias }} + FROM source_data src + + {# + For incremental runs of single-batch eff sats, only hashkeys that are not active right now are set to active. + This automatically includes totally new hashkeys, or hashkeys that are currently set to inactive. + #} + {% if is_incremental() %} + LEFT JOIN current_status cs + ON src.{{ tracked_hashkey }} = cs.{{ tracked_hashkey }} + AND cs.{{ is_active_alias }} = 1 + WHERE cs.{{ tracked_hashkey }} IS NULL + {% endif %} + + ), + + {% set ns.last_cte = 'new_hashkeys' %} + +{% endif %} + +{# + In all incremental runs, the source needs to be scanned for all currently active hashkeys. + If they are no longer present, they will be deactived. +#} +{%- if is_incremental() %} + + {%- if not source_is_single_batch %} + disappeared_hashkeys AS ( + + SELECT DISTINCT + cs.{{ tracked_hashkey }}, + ldts.min_ldts as {{ src_ldts }}, + 0 as {{ is_active_alias }} + FROM current_status cs + LEFT JOIN ( + SELECT + MIN({{ src_ldts }}) as min_ldts + FROM deduplicated_incoming) ldts + ON 1 = 1 + LEFT JOIN deduplicated_incoming src + ON src.{{ tracked_hashkey }} = cs.{{ tracked_hashkey }} + AND src.{{ src_ldts }} = ldts.min_ldts + WHERE + cs.{{ is_active_alias }} = 1 + AND src.{{ tracked_hashkey }} IS NULL + AND ldts.min_ldts IS NOT NULL + + ), + {% else %} + disappeared_hashkeys AS ( + + SELECT DISTINCT + cs.{{ tracked_hashkey }}, + ldts.min_ldts as {{ src_ldts }}, + 0 as {{ is_active_alias }} + FROM current_status cs + LEFT JOIN ( + SELECT + MIN({{ src_ldts }}) as min_ldts + FROM source_data) ldts + ON 1 = 1 + WHERE NOT EXISTS ( + SELECT + 1 + FROM source_data src + WHERE src.{{ tracked_hashkey }} = cs.{{ tracked_hashkey }} + ) + AND cs.{{ is_active_alias }} = 1 + AND ldts.min_ldts IS NOT NULL + + ), + {% endif %} +{%- endif %} + +records_to_insert AS ( + + {# + This first part of the UNION includes: + - for single-batch loads: Only is_active_alias = 1, deactivations are handled later + - for multi-batch loads: Ativation and deactivation inside the multiple loads + #} + SELECT + di.{{ tracked_hashkey }}, + di.{{ src_ldts }}, + di.{{ is_active_alias }} + FROM {{ ns.last_cte }} di + + + {%- if is_incremental() %} + + {# + For incremental multi-batch loads, the earliest to-be inserted status is compared to the current status. + It will only be inserted if the status changed. We use the ROW_NUMBER() + #} + {%- if not source_is_single_batch %} + WHERE NOT EXISTS ( + SELECT 1 + FROM current_status + WHERE {{ datavault4dbt.multikey(tracked_hashkey, prefix=['current_status', 'di'], condition='=') }} + AND {{ datavault4dbt.multikey(is_active_alias, prefix=['current_status', 'di'], condition='=') }} + AND di.{{ src_ldts }} = (SELECT MIN({{ src_ldts }}) FROM deduplicated_incoming) + ) + AND di.{{ src_ldts }} > (SELECT MAX({{ src_ldts }}) FROM {{ this }}) + {% endif %} + + {# + For all incremental loads, the disappeared hashkeys are UNIONed. + #} + UNION ALL + + SELECT + {{ tracked_hashkey }}, + {{ src_ldts }}, + {{ is_active_alias }} + FROM disappeared_hashkeys + + {%- endif %} + +) + +SELECT * +FROM records_to_insert ri + +{% if is_incremental() %} +WHERE NOT EXISTS ( + SELECT 1 + FROM {{ this }} t + WHERE t.{{ tracked_hashkey }} = ri.{{ tracked_hashkey }} + AND t.{{ src_ldts }} = ri.{{ src_ldts }} +) +{% endif %} + +{%- endmacro -%} \ No newline at end of file diff --git a/macros/tables/databricks/eff_sat_v0.sql b/macros/tables/databricks/eff_sat_v0.sql new file mode 100644 index 00000000..7dbdd2ee --- /dev/null +++ b/macros/tables/databricks/eff_sat_v0.sql @@ -0,0 +1,292 @@ +{%- macro databricks__eff_sat_v0(source_model, tracked_hashkey, src_ldts, src_rsrc, is_active_alias, source_is_single_batch, disable_hwm) -%} + +{%- set end_of_all_times = datavault4dbt.end_of_all_times() -%} +{%- set timestamp_format = datavault4dbt.timestamp_format() -%} + +{%- set ns = namespace(last_cte= "") -%} + +{%- set source_relation = ref(source_model) -%} + +{%- set tracked_hashkey = datavault4dbt.escape_column_names(tracked_hashkey) -%} +{%- set is_active_alias = datavault4dbt.escape_column_names(is_active_alias) -%} +{%- set src_ldts = datavault4dbt.escape_column_names(src_ldts) -%} +{%- set src_rsrc = datavault4dbt.escape_column_names(src_rsrc) -%} + +{{ log('columns to select: '~final_columns_to_select, false) }} + +{{ datavault4dbt.prepend_generated_by() }} + +WITH + +{# + In all cases, the source model is selected, and optionally a HWM is applied. +#} +source_data AS ( + + SELECT + {{ tracked_hashkey }}, + {{ src_ldts }} + FROM {{ source_relation }} src + WHERE {{ src_ldts }} NOT IN ('{{ datavault4dbt.beginning_of_all_times() }}', '{{ datavault4dbt.end_of_all_times() }}') + {%- if is_incremental() and not disable_hwm %} + AND src.{{ src_ldts }} > ( + SELECT + MAX({{ src_ldts }}) + FROM {{ this }} + WHERE {{ src_ldts }} != {{ datavault4dbt.string_to_timestamp(timestamp_format, end_of_all_times) }} + ) + {%- endif %} +), + +{# + In all incremental cases, the current status for each hashkey is selected from the existing Effectivity Satellite. +#} +{%- if is_incremental() %} +current_status AS ( + + SELECT + {{ tracked_hashkey }}, + {{ is_active_alias }} + FROM {{ this }} + QUALIFY + ROW_NUMBER() OVER(PARTITION BY {{ tracked_hashkey }} ORDER BY {{ src_ldts }} DESC) = 1 + +), +{% endif %} + +{# + This block is for multi-batch processing. +#} +{% if not source_is_single_batch %} + + {# + List of all Hashkeys with their date of first appearance in the source model. + #} + hashkeys AS ( + + SELECT + {{ tracked_hashkey }}, + MIN({{ src_ldts }}) as first_appearance + FROM source_data + GROUP BY {{ tracked_hashkey }} + + ), + + {# + Distinct list of load dates in the multi-batch source. + #} + load_dates AS ( + + SELECT Distinct + {{ src_ldts }} + FROM source_data + + ), + + {# + All combinations of hashkeys and loaddates, for loaddates after the first appearance of a hashkey. + #} + history AS ( + + SELECT + hk.{{ tracked_hashkey }}, + ld.{{ src_ldts }} + FROM hashkeys hk + CROSS JOIN load_dates ld + WHERE ld.{{ src_ldts }} >= hk.first_appearance + + ), + + {# + All theoretical combinations are checked against the actual occurences of hashkeys in each batch / loaddate. + If a Hashkey is part of a load/batch, is_active_alias is set to 1, because the hashkey was active in that load/batch. + If a Hashkey is not part of a load/batch, is_active_alias is set to 0, because the hashkey was not active in that load/batch. + #} + is_active AS ( + + SELECT + h.{{ tracked_hashkey }}, + h.{{ src_ldts }}, + CASE + WHEN src.{{ tracked_hashkey }} IS NULL THEN 0 + ELSE 1 + END as {{ is_active_alias }} + FROM history h + LEFT JOIN source_data src + ON src.{{ tracked_hashkey }} = h.{{ tracked_hashkey }} + AND src.{{ src_ldts }} = h.{{ src_ldts }} + + ), + + {# + The rows are deduplicated on the is_active_alias, to only include status changes. + Additionally, a ROW_NUMBER() is calculated in incremental runs, to use it in the next step for comparison against the current status. + #} + deduplicated_incoming AS ( + + SELECT + is_active.{{ tracked_hashkey }}, + is_active.{{ src_ldts }}, + is_active.{{ is_active_alias }} + + {% if is_incremental() -%} + , ROW_NUMBER() OVER(PARTITION BY is_active.{{ tracked_hashkey }} ORDER BY is_active.{{ src_ldts }}) as rn + {%- endif %} + + FROM is_active + QUALIFY + CASE + WHEN is_active.{{ is_active_alias }} = LAG(is_active.{{ is_active_alias }}) OVER (PARTITION BY {{ tracked_hashkey }} ORDER BY {{ src_ldts }}) THEN FALSE + ELSE TRUE + END + + ), + + {% set ns.last_cte = 'deduplicated_incoming' %} + +{# + This block is for single-batch processing +#} +{% else %} + + {# + In initial loads of single-batch eff sats, every hashkey of the source is set to active. + #} + new_hashkeys AS ( + + SELECT DISTINCT + src.{{ tracked_hashkey }}, + src.{{ src_ldts }}, + 1 as {{ is_active_alias }} + FROM source_data src + + {# + For incremental runs of single-batch eff sats, only hashkeys that are not active right now are set to active. + This automatically includes totally new hashkeys, or hashkeys that are currently set to inactive. + #} + {% if is_incremental() %} + LEFT JOIN current_status cs + ON src.{{ tracked_hashkey }} = cs.{{ tracked_hashkey }} + AND cs.{{ is_active_alias }} = 1 + WHERE cs.{{ tracked_hashkey }} IS NULL + {% endif %} + + ), + + {% set ns.last_cte = 'new_hashkeys' %} + +{% endif %} + +{# + In all incremental runs, the source needs to be scanned for all currently active hashkeys. + If they are no longer present, they will be deactived. +#} +{%- if is_incremental() %} + + {%- if not source_is_single_batch %} + disappeared_hashkeys AS ( + + SELECT DISTINCT + cs.{{ tracked_hashkey }}, + ldts.min_ldts as {{ src_ldts }}, + 0 as {{ is_active_alias }} + FROM current_status cs + LEFT JOIN ( + SELECT + MIN({{ src_ldts }}) as min_ldts + FROM deduplicated_incoming) ldts + ON 1 = 1 + LEFT JOIN deduplicated_incoming src + ON src.{{ tracked_hashkey }} = cs.{{ tracked_hashkey }} + AND src.{{ src_ldts }} = ldts.min_ldts + WHERE + cs.{{ is_active_alias }} = 1 + AND src.{{ tracked_hashkey }} IS NULL + AND ldts.min_ldts IS NOT NULL + + ), + {% else %} + disappeared_hashkeys AS ( + + SELECT DISTINCT + cs.{{ tracked_hashkey }}, + ldts.min_ldts as {{ src_ldts }}, + 0 as {{ is_active_alias }} + FROM current_status cs + LEFT JOIN ( + SELECT + MIN({{ src_ldts }}) as min_ldts + FROM source_data) ldts + ON 1 = 1 + WHERE NOT EXISTS ( + SELECT + 1 + FROM source_data src + WHERE src.{{ tracked_hashkey }} = cs.{{ tracked_hashkey }} + ) + AND cs.{{ is_active_alias }} = 1 + AND ldts.min_ldts IS NOT NULL + + ), + {% endif %} +{%- endif %} + +records_to_insert AS ( + + {# + This first part of the UNION includes: + - for single-batch loads: Only is_active_alias = 1, deactivations are handled later + - for multi-batch loads: Ativation and deactivation inside the multiple loads + #} + SELECT + di.{{ tracked_hashkey }}, + di.{{ src_ldts }}, + di.{{ is_active_alias }} + FROM {{ ns.last_cte }} di + + + {%- if is_incremental() %} + + {# + For incremental multi-batch loads, the earliest to-be inserted status is compared to the current status. + It will only be inserted if the status changed. We use the ROW_NUMBER() + #} + {%- if not source_is_single_batch %} + WHERE NOT EXISTS ( + SELECT 1 + FROM current_status + WHERE {{ datavault4dbt.multikey(tracked_hashkey, prefix=['current_status', 'di'], condition='=') }} + AND {{ datavault4dbt.multikey(is_active_alias, prefix=['current_status', 'di'], condition='=') }} + AND di.{{ src_ldts }} = (SELECT MIN({{ src_ldts }}) FROM deduplicated_incoming) + ) + AND di.{{ src_ldts }} > (SELECT MAX({{ src_ldts }}) FROM {{ this }}) + {% endif %} + + {# + For all incremental loads, the disappeared hashkeys are UNIONed. + #} + UNION + + SELECT + {{ tracked_hashkey }}, + {{ src_ldts }}, + {{ is_active_alias }} + FROM disappeared_hashkeys + + {%- endif %} + +) + +SELECT * +FROM records_to_insert ri + +{% if is_incremental() %} +WHERE NOT EXISTS ( + SELECT 1 + FROM {{ this }} t + WHERE t.{{ tracked_hashkey }} = ri.{{ tracked_hashkey }} + AND t.{{ src_ldts }} = ri.{{ src_ldts }} +) +{% endif %} + +{%- endmacro -%} \ No newline at end of file diff --git a/macros/tables/eff_sat_v0.sql b/macros/tables/eff_sat_v0.sql new file mode 100644 index 00000000..2857e263 --- /dev/null +++ b/macros/tables/eff_sat_v0.sql @@ -0,0 +1,16 @@ +{%- macro eff_sat_v0(source_model, tracked_hashkey, src_ldts=none, src_rsrc=none, is_active_alias=none, source_is_single_batch=true, disable_hwm=false) -%} + + {# Applying the default aliases as stored inside the global variables, if src_ldts, src_rsrc, and ledts_alias are not set. #} + {%- set src_ldts = datavault4dbt.replace_standard(src_ldts, 'datavault4dbt.ldts_alias', 'ldts') -%} + {%- set src_rsrc = datavault4dbt.replace_standard(src_rsrc, 'datavault4dbt.rsrc_alias', 'rsrc') -%} + {%- set is_active_alias = datavault4dbt.replace_standard(is_active_alias, 'datavault4dbt.is_active_alias', 'is_active') -%} + + {{ return(adapter.dispatch('eff_sat_v0', 'datavault4dbt')(tracked_hashkey=tracked_hashkey, + src_ldts=src_ldts, + src_rsrc=src_rsrc, + is_active_alias=is_active_alias, + source_model=source_model, + source_is_single_batch=source_is_single_batch, + disable_hwm=disable_hwm) ) + }} +{%- endmacro -%} \ No newline at end of file diff --git a/macros/tables/exasol/eff_sat_v0.sql b/macros/tables/exasol/eff_sat_v0.sql new file mode 100644 index 00000000..9248be70 --- /dev/null +++ b/macros/tables/exasol/eff_sat_v0.sql @@ -0,0 +1,293 @@ +{%- macro exasol__eff_sat_v0(source_model, tracked_hashkey, src_ldts, src_rsrc, is_active_alias, source_is_single_batch, disable_hwm) -%} + +{%- set end_of_all_times = datavault4dbt.end_of_all_times() -%} +{%- set beginning_of_all_times = datavault4dbt.beginning_of_all_times() -%} +{%- set timestamp_format = datavault4dbt.timestamp_format() -%} + +{%- set ns = namespace(last_cte= "") -%} + +{%- set source_relation = ref(source_model) -%} + +{%- set tracked_hashkey = datavault4dbt.escape_column_names(tracked_hashkey) -%} +{%- set is_active_alias = datavault4dbt.escape_column_names(is_active_alias) -%} +{%- set src_ldts = datavault4dbt.escape_column_names(src_ldts) -%} +{%- set src_rsrc = datavault4dbt.escape_column_names(src_rsrc) -%} + +{{ log('columns to select: '~final_columns_to_select, false) }} + +{{ datavault4dbt.prepend_generated_by() }} + +WITH + +{# + In all cases, the source model is selected, and optionally a HWM is applied. +#} +source_data AS ( + + SELECT + {{ tracked_hashkey }}, + {{ src_ldts }} + FROM {{ source_relation }} src + WHERE {{ src_ldts }} NOT IN ({{ datavault4dbt.string_to_timestamp(timestamp_format, beginning_of_all_times) }}, {{ datavault4dbt.string_to_timestamp(timestamp_format, end_of_all_times) }}) + {%- if is_incremental() and not disable_hwm %} + AND src.{{ src_ldts }} > ( + SELECT + MAX({{ src_ldts }}) + FROM {{ this }} + WHERE {{ src_ldts }} != {{ datavault4dbt.string_to_timestamp(timestamp_format, end_of_all_times) }} + ) + {%- endif %} +), + +{# + In all incremental cases, the current status for each hashkey is selected from the existing Effectivity Satellite. +#} +{%- if is_incremental() %} +current_status AS ( + + SELECT + {{ tracked_hashkey }}, + {{ is_active_alias }} + FROM {{ this }} + QUALIFY + ROW_NUMBER() OVER(PARTITION BY {{ tracked_hashkey }} ORDER BY {{ src_ldts }} DESC) = 1 + +), +{% endif %} + +{# + This block is for multi-batch processing. +#} +{% if not source_is_single_batch %} + + {# + List of all Hashkeys with their date of first appearance in the source model. + #} + hashkeys AS ( + + SELECT + {{ tracked_hashkey }}, + MIN({{ src_ldts }}) as first_appearance + FROM source_data + GROUP BY {{ tracked_hashkey }} + + ), + + {# + Distinct list of load dates in the multi-batch source. + #} + load_dates AS ( + + SELECT Distinct + {{ src_ldts }} + FROM source_data + + ), + + {# + All combinations of hashkeys and loaddates, for loaddates after the first appearance of a hashkey. + #} + history AS ( + + SELECT + hk.{{ tracked_hashkey }}, + ld.{{ src_ldts }} + FROM hashkeys hk + CROSS JOIN load_dates ld + WHERE ld.{{ src_ldts }} >= hk.first_appearance + + ), + + {# + All theoretical combinations are checked against the actual occurences of hashkeys in each batch / loaddate. + If a Hashkey is part of a load/batch, is_active_alias is set to 1, because the hashkey was active in that load/batch. + If a Hashkey is not part of a load/batch, is_active_alias is set to 0, because the hashkey was not active in that load/batch. + #} + is_active AS ( + + SELECT + h.{{ tracked_hashkey }}, + h.{{ src_ldts }}, + CASE + WHEN src.{{ tracked_hashkey }} IS NULL THEN 0 + ELSE 1 + END as {{ is_active_alias }} + FROM history h + LEFT JOIN source_data src + ON src.{{ tracked_hashkey }} = h.{{ tracked_hashkey }} + AND src.{{ src_ldts }} = h.{{ src_ldts }} + + ), + + {# + The rows are deduplicated on the is_active_alias, to only include status changes. + Additionally, a ROW_NUMBER() is calculated in incremental runs, to use it in the next step for comparison against the current status. + #} + deduplicated_incoming AS ( + + SELECT + is_active.{{ tracked_hashkey }}, + is_active.{{ src_ldts }}, + is_active.{{ is_active_alias }} + + {% if is_incremental() -%} + , ROW_NUMBER() OVER(PARTITION BY is_active.{{ tracked_hashkey }} ORDER BY is_active.{{ src_ldts }}) as rn + {%- endif %} + + FROM is_active + QUALIFY + CASE + WHEN is_active.{{ is_active_alias }} = LAG(is_active.{{ is_active_alias }}) OVER (PARTITION BY {{ tracked_hashkey }} ORDER BY {{ src_ldts }}) THEN FALSE + ELSE TRUE + END + + ), + + {% set ns.last_cte = 'deduplicated_incoming' %} + +{# + This block is for single-batch processing +#} +{% else %} + + {# + In initial loads of single-batch eff sats, every hashkey of the source is set to active. + #} + new_hashkeys AS ( + + SELECT DISTINCT + src.{{ tracked_hashkey }}, + src.{{ src_ldts }}, + 1 as {{ is_active_alias }} + FROM source_data src + + {# + For incremental runs of single-batch eff sats, only hashkeys that are not active right now are set to active. + This automatically includes totally new hashkeys, or hashkeys that are currently set to inactive. + #} + {% if is_incremental() %} + LEFT JOIN current_status cus + ON src.{{ tracked_hashkey }} = cus.{{ tracked_hashkey }} + AND cus.{{ is_active_alias }} = 1 + WHERE cus.{{ tracked_hashkey }} IS NULL + {% endif %} + + ), + + {% set ns.last_cte = 'new_hashkeys' %} + +{% endif %} + +{# + In all incremental runs, the source needs to be scanned for all currently active hashkeys. + If they are no longer present, they will be deactived. +#} +{%- if is_incremental() %} + + {%- if not source_is_single_batch %} + disappeared_hashkeys AS ( + + SELECT DISTINCT + cus.{{ tracked_hashkey }}, + ldts.min_ldts as {{ src_ldts }}, + 0 as {{ is_active_alias }} + FROM current_status cus + LEFT JOIN ( + SELECT + MIN({{ src_ldts }}) as min_ldts + FROM deduplicated_incoming) ldts + ON 1 = 1 + LEFT JOIN deduplicated_incoming src + ON src.{{ tracked_hashkey }} = cus.{{ tracked_hashkey }} + AND src.{{ src_ldts }} = ldts.min_ldts + WHERE + cus.{{ is_active_alias }} = 1 + AND src.{{ tracked_hashkey }} IS NULL + AND ldts.min_ldts IS NOT NULL + + ), + {% else %} + disappeared_hashkeys AS ( + + SELECT DISTINCT + cus.{{ tracked_hashkey }}, + ldts.min_ldts as {{ src_ldts }}, + 0 as {{ is_active_alias }} + FROM current_status cus + LEFT JOIN ( + SELECT + MIN({{ src_ldts }}) as min_ldts + FROM source_data) ldts + ON 1 = 1 + WHERE NOT EXISTS ( + SELECT + 1 + FROM source_data src + WHERE src.{{ tracked_hashkey }} = cus.{{ tracked_hashkey }} + ) + AND cus.{{ is_active_alias }} = 1 + AND ldts.min_ldts IS NOT NULL + + ), + {% endif %} +{%- endif %} + +records_to_insert AS ( + + {# + This first part of the UNION includes: + - for single-batch loads: Only is_active_alias = 1, deactivations are handled later + - for multi-batch loads: Ativation and deactivation inside the multiple loads + #} + SELECT + di.{{ tracked_hashkey }}, + di.{{ src_ldts }}, + di.{{ is_active_alias }} + FROM {{ ns.last_cte }} di + + + {%- if is_incremental() %} + + {# + For incremental multi-batch loads, the earliest to-be inserted status is compared to the current status. + It will only be inserted if the status changed. We use the ROW_NUMBER() + #} + {%- if not source_is_single_batch %} + WHERE NOT EXISTS ( + SELECT 1 + FROM current_status + WHERE {{ datavault4dbt.multikey(tracked_hashkey, prefix=['current_status', 'di'], condition='=') }} + AND {{ datavault4dbt.multikey(is_active_alias, prefix=['current_status', 'di'], condition='=') }} + AND di.{{ src_ldts }} = (SELECT MIN({{ src_ldts }}) FROM deduplicated_incoming) + ) + AND di.{{ src_ldts }} > (SELECT MAX({{ src_ldts }}) FROM {{ this }}) + {% endif %} + + {# + For all incremental loads, the disappeared hashkeys are UNIONed. + #} + UNION + + SELECT + {{ tracked_hashkey }}, + {{ src_ldts }}, + {{ is_active_alias }} + FROM disappeared_hashkeys + + {%- endif %} + +) + +SELECT * +FROM records_to_insert ri + +{% if is_incremental() %} +WHERE NOT EXISTS ( + SELECT 1 + FROM {{ this }} t + WHERE t.{{ tracked_hashkey }} = ri.{{ tracked_hashkey }} + AND t.{{ src_ldts }} = ri.{{ src_ldts }} +) +{% endif %} + +{%- endmacro -%} \ No newline at end of file diff --git a/macros/tables/fabric/eff_sat_v0.sql b/macros/tables/fabric/eff_sat_v0.sql new file mode 100644 index 00000000..c50d5d34 --- /dev/null +++ b/macros/tables/fabric/eff_sat_v0.sql @@ -0,0 +1,315 @@ +{%- macro fabric__eff_sat_v0(source_model, tracked_hashkey, src_ldts, src_rsrc, is_active_alias, source_is_single_batch, disable_hwm) -%} + +{%- set end_of_all_times = datavault4dbt.end_of_all_times() -%} +{%- set timestamp_format = datavault4dbt.timestamp_format() -%} + +{%- set ns = namespace(last_cte= "") -%} + +{%- set source_relation = ref(source_model) -%} + +{%- set tracked_hashkey = datavault4dbt.escape_column_names(tracked_hashkey) -%} +{%- set is_active_alias = datavault4dbt.escape_column_names(is_active_alias) -%} +{%- set src_ldts = datavault4dbt.escape_column_names(src_ldts) -%} +{%- set src_rsrc = datavault4dbt.escape_column_names(src_rsrc) -%} + +{{ log('columns to select: '~final_columns_to_select, false) }} + +{{ datavault4dbt.prepend_generated_by() }} + +WITH + +{# + In all cases, the source model is selected, and optionally a HWM is applied. +#} +{% if is_incremental() and not disable_hwm %} +max_ldts_prep AS ( + + SELECT + MAX({{ src_ldts }}) AS max_ldts + FROM {{ this }} + WHERE {{ src_ldts }} != {{ datavault4dbt.string_to_timestamp(timestamp_format, end_of_all_times) }} +), +{% endif %} + +source_data AS ( + + SELECT + {{ tracked_hashkey }}, + {{ src_ldts }} + FROM {{ source_relation }} src + WHERE {{ src_ldts }} NOT IN ('{{ datavault4dbt.beginning_of_all_times() }}', '{{ datavault4dbt.end_of_all_times() }}') + {%- if is_incremental() and not disable_hwm %} + AND src.{{ src_ldts }} > ( + SELECT max_ldts FROM max_ldts_prep + ) + {%- endif %} +), + +{# + In all incremental cases, the current status for each hashkey is selected from the existing Effectivity Satellite. +#} +{%- if is_incremental() %} +current_status_prep AS ( + + SELECT + {{ tracked_hashkey }}, + {{ is_active_alias}}, + ROW_NUMBER() OVER (PARTITION BY {{ tracked_hashkey }} ORDER BY {{ src_ldts }} DESC) as rn + FROM {{ this }} + +), + +current_status AS ( + + SELECT + {{ tracked_hashkey }}, + {{ is_active_alias }} + FROM current_status_prep + WHERE rn = 1 + +), +{% endif %} + +{# + This block is for multi-batch processing. +#} +{% if not source_is_single_batch %} + + {# + List of all Hashkeys with their date of first appearance in the source model. + #} + hashkeys AS ( + + SELECT + {{ tracked_hashkey }}, + MIN({{ src_ldts }}) as first_appearance + FROM source_data + GROUP BY {{ tracked_hashkey }} + + ), + + {# + Distinct list of load dates in the multi-batch source. + #} + load_dates AS ( + + SELECT Distinct + {{ src_ldts }} + FROM source_data + + ), + + {# + All combinations of hashkeys and loaddates, for loaddates after the first appearance of a hashkey. + #} + history AS ( + + SELECT + hk.{{ tracked_hashkey }}, + ld.{{ src_ldts }} + FROM hashkeys hk + CROSS JOIN load_dates ld + WHERE ld.{{ src_ldts }} >= hk.first_appearance + + ), + + {# + All theoretical combinations are checked against the actual occurences of hashkeys in each batch / loaddate. + If a Hashkey is part of a load/batch, is_active_alias is set to 1, because the hashkey was active in that load/batch. + If a Hashkey is not part of a load/batch, is_active_alias is set to 0, because the hashkey was not active in that load/batch. + #} + is_active AS ( + + SELECT + h.{{ tracked_hashkey }}, + h.{{ src_ldts }}, + CASE + WHEN src.{{ tracked_hashkey }} IS NULL THEN 0 + ELSE 1 + END as {{ is_active_alias }} + FROM history h + LEFT JOIN source_data src + ON src.{{ tracked_hashkey }} = h.{{ tracked_hashkey }} + AND src.{{ src_ldts }} = h.{{ src_ldts }} + + ), + + {# + The rows are deduplicated on the is_active_alias, to only include status changes. + Additionally, a ROW_NUMBER() is calculated in incremental runs, to use it in the next step for comparison against the current status. + #} + deduplicated_incoming_prep AS ( + + SELECT + is_active.{{ tracked_hashkey }}, + is_active.{{ src_ldts }}, + is_active.{{ is_active_alias }}, + LAG(is_active.{{ is_active_alias }}) OVER (PARTITION BY {{ tracked_hashkey }} ORDER BY {{ src_ldts }}) as lag_is_active + + FROM is_active + + ), + + deduplicated_incoming AS ( + + SELECT + deduplicated_incoming_prep.{{ tracked_hashkey }}, + deduplicated_incoming_prep.{{ src_ldts }}, + deduplicated_incoming_prep.{{ is_active_alias }} + + FROM + deduplicated_incoming_prep + WHERE + deduplicated_incoming_prep.{{ is_active_alias }} != deduplicated_incoming_prep.lag_is_active + OR deduplicated_incoming_prep.lag_is_active IS NULL + + ), + + {% set ns.last_cte = 'deduplicated_incoming' %} + +{# + This block is for single-batch processing +#} +{% else %} + + {# + In initial loads of single-batch eff sats, every hashkey of the source is set to active. + #} + new_hashkeys AS ( + + SELECT DISTINCT + src.{{ tracked_hashkey }}, + src.{{ src_ldts }}, + 1 as {{ is_active_alias }} + FROM source_data src + + {# + For incremental runs of single-batch eff sats, only hashkeys that are not active right now are set to active. + This automatically includes totally new hashkeys, or hashkeys that are currently set to inactive. + #} + {% if is_incremental() %} + LEFT JOIN current_status cs + ON src.{{ tracked_hashkey }} = cs.{{ tracked_hashkey }} + AND cs.{{ is_active_alias }} = 1 + WHERE cs.{{ tracked_hashkey }} IS NULL + {% endif %} + + ), + + {% set ns.last_cte = 'new_hashkeys' %} + +{% endif %} + +{# + In all incremental runs, the source needs to be scanned for all currently active hashkeys. + If they are no longer present, they will be deactived. +#} +{%- if is_incremental() %} + + {%- if not source_is_single_batch %} + disappeared_hashkeys AS ( + + SELECT DISTINCT + cs.{{ tracked_hashkey }}, + ldts.min_ldts as {{ src_ldts }}, + 0 as {{ is_active_alias }} + FROM current_status cs + LEFT JOIN ( + SELECT + MIN({{ src_ldts }}) as min_ldts + FROM deduplicated_incoming) ldts + ON 1 = 1 + LEFT JOIN deduplicated_incoming src + ON src.{{ tracked_hashkey }} = cs.{{ tracked_hashkey }} + AND src.{{ src_ldts }} = ldts.min_ldts + WHERE + cs.{{ is_active_alias }} = 1 + AND src.{{ tracked_hashkey }} IS NULL + AND ldts.min_ldts IS NOT NULL + + ), + {% else %} + disappeared_hashkeys AS ( + + SELECT DISTINCT + cs.{{ tracked_hashkey }}, + ldts.min_ldts as {{ src_ldts }}, + 0 as {{ is_active_alias }} + FROM current_status cs + LEFT JOIN ( + SELECT + MIN({{ src_ldts }}) as min_ldts + FROM source_data) ldts + ON 1 = 1 + WHERE NOT EXISTS ( + SELECT + 1 + FROM source_data src + WHERE src.{{ tracked_hashkey }} = cs.{{ tracked_hashkey }} + ) + AND cs.{{ is_active_alias }} = 1 + AND ldts.min_ldts IS NOT NULL + + ), + {% endif %} +{%- endif %} + +records_to_insert AS ( + + {# + This first part of the UNION includes: + - for single-batch loads: Only is_active_alias = 1, deactivations are handled later + - for multi-batch loads: Ativation and deactivation inside the multiple loads + #} + SELECT + di.{{ tracked_hashkey }}, + di.{{ src_ldts }}, + di.{{ is_active_alias }} + FROM {{ ns.last_cte }} di + + + {%- if is_incremental() %} + + {# + For incremental multi-batch loads, the earliest to-be inserted status is compared to the current status. + It will only be inserted if the status changed. We use the ROW_NUMBER() + #} + {%- if not source_is_single_batch %} + WHERE NOT EXISTS ( + SELECT 1 + FROM current_status + WHERE {{ datavault4dbt.multikey(tracked_hashkey, prefix=['current_status', 'di'], condition='=') }} + AND {{ datavault4dbt.multikey(is_active_alias, prefix=['current_status', 'di'], condition='=') }} + AND di.{{ src_ldts }} = (SELECT MIN({{ src_ldts }}) FROM deduplicated_incoming) + ) + AND di.{{ src_ldts }} > (SELECT MAX({{ src_ldts }}) FROM {{ this }}) + {% endif %} + + {# + For all incremental loads, the disappeared hashkeys are UNIONed. + #} + UNION + + SELECT + {{ tracked_hashkey }}, + {{ src_ldts }}, + {{ is_active_alias }} + FROM disappeared_hashkeys + + {%- endif %} + +) + +SELECT * +FROM records_to_insert ri + +{% if is_incremental() %} +WHERE NOT EXISTS ( + SELECT 1 + FROM {{ this }} t + WHERE t.{{ tracked_hashkey }} = ri.{{ tracked_hashkey }} + AND t.{{ src_ldts }} = ri.{{ src_ldts }} +) +{% endif %} + +{%- endmacro -%} \ No newline at end of file diff --git a/macros/tables/fabric/sat_v0.sql b/macros/tables/fabric/sat_v0.sql index 0e07a5aa..7223949c 100644 --- a/macros/tables/fabric/sat_v0.sql +++ b/macros/tables/fabric/sat_v0.sql @@ -63,7 +63,7 @@ latest_entries_in_sat_prep AS ( SELECT tgt.{{ parent_hashkey }}, tgt.{{ ns.hdiff_alias }}, - ROW_NUMBER() OVER(PARTITION BY tgt.{{ parent_hashkey|lower }} ORDER BY tgt.{{ src_ldts }} DESC) as rn + ROW_NUMBER() OVER(PARTITION BY tgt.{{ parent_hashkey }} ORDER BY tgt.{{ src_ldts }} DESC) as rn FROM {{ this }} tgt INNER JOIN distinct_incoming_hashkeys src ON tgt.{{ parent_hashkey }} = src.{{ parent_hashkey }} @@ -91,7 +91,7 @@ deduplicated_numbered_source_prep AS ( {{ parent_hashkey }}, {{ ns.hdiff_alias }}, {{ datavault4dbt.print_list(source_cols) }} - , LAG({{ ns.hdiff_alias }}) OVER(PARTITION BY {{ parent_hashkey|lower }} ORDER BY {{ src_ldts }}) as prev_hashdiff + , LAG({{ ns.hdiff_alias }}) OVER(PARTITION BY {{ parent_hashkey }} ORDER BY {{ src_ldts }}) as prev_hashdiff FROM source_data ), @@ -142,4 +142,4 @@ records_to_insert AS ( SELECT * FROM records_to_insert -{%- endmacro -%} \ No newline at end of file +{%- endmacro -%} diff --git a/macros/tables/oracle/eff_sat_v0.sql b/macros/tables/oracle/eff_sat_v0.sql new file mode 100644 index 00000000..2ff92746 --- /dev/null +++ b/macros/tables/oracle/eff_sat_v0.sql @@ -0,0 +1,316 @@ +{%- macro oracle__eff_sat_v0(source_model, tracked_hashkey, src_ldts, src_rsrc, is_active_alias, source_is_single_batch, disable_hwm) -%} + +{%- set beginning_of_all_times = datavault4dbt.beginning_of_all_times() -%} +{%- set end_of_all_times = datavault4dbt.end_of_all_times() -%} +{%- set timestamp_format = datavault4dbt.timestamp_format() -%} + +{%- set ns = namespace(last_cte= "") -%} + +{%- set source_relation = ref(source_model) -%} + +{%- set tracked_hashkey = datavault4dbt.escape_column_names(tracked_hashkey) -%} +{%- set is_active_alias = datavault4dbt.escape_column_names(is_active_alias) -%} +{%- set src_ldts = datavault4dbt.escape_column_names(src_ldts) -%} +{%- set src_rsrc = datavault4dbt.escape_column_names(src_rsrc) -%} + +{{ log('columns to select: '~final_columns_to_select, false) }} + +{{ datavault4dbt.prepend_generated_by() }} + +WITH + +{# + In all cases, the source model is selected, and optionally a HWM is applied. +#} +{% if is_incremental() and not disable_hwm %} +max_ldts_prep AS ( + + SELECT + MAX({{ src_ldts }}) AS max_ldts + FROM {{ this }} + WHERE {{ src_ldts }} != {{ datavault4dbt.string_to_timestamp(timestamp_format, end_of_all_times) }} +), +{% endif %} + +source_data AS ( + + SELECT + {{ tracked_hashkey }}, + {{ src_ldts }} + FROM {{ source_relation }} src + WHERE {{ src_ldts }} NOT IN ({{ datavault4dbt.string_to_timestamp(timestamp_format, beginning_of_all_times) }}, {{ datavault4dbt.string_to_timestamp(timestamp_format, end_of_all_times) }}) + {%- if is_incremental() and not disable_hwm %} + AND src.{{ src_ldts }} > ( + SELECT max_ldts FROM max_ldts_prep + ) + {%- endif %} +), + +{# + In all incremental cases, the current status for each hashkey is selected from the existing Effectivity Satellite. +#} +{%- if is_incremental() %} +current_status_prep AS ( + + SELECT + {{ tracked_hashkey }}, + {{ is_active_alias}}, + ROW_NUMBER() OVER (PARTITION BY {{ tracked_hashkey }} ORDER BY {{ src_ldts }} DESC) as rn + FROM {{ this }} + +), + +current_status AS ( + + SELECT + {{ tracked_hashkey }}, + {{ is_active_alias }} + FROM current_status_prep + WHERE rn = 1 + +), +{% endif %} + +{# + This block is for multi-batch processing. +#} +{% if not source_is_single_batch %} + + {# + List of all Hashkeys with their date of first appearance in the source model. + #} + hashkeys AS ( + + SELECT + {{ tracked_hashkey }}, + MIN({{ src_ldts }}) as first_appearance + FROM source_data + GROUP BY {{ tracked_hashkey }} + + ), + + {# + Distinct list of load dates in the multi-batch source. + #} + load_dates AS ( + + SELECT Distinct + {{ src_ldts }} + FROM source_data + + ), + + {# + All combinations of hashkeys and loaddates, for loaddates after the first appearance of a hashkey. + #} + history AS ( + + SELECT + hk.{{ tracked_hashkey }}, + ld.{{ src_ldts }} + FROM hashkeys hk + CROSS JOIN load_dates ld + WHERE ld.{{ src_ldts }} >= hk.first_appearance + + ), + + {# + All theoretical combinations are checked against the actual occurences of hashkeys in each batch / loaddate. + If a Hashkey is part of a load/batch, is_active_alias is set to 1, because the hashkey was active in that load/batch. + If a Hashkey is not part of a load/batch, is_active_alias is set to 0, because the hashkey was not active in that load/batch. + #} + is_active AS ( + + SELECT + h.{{ tracked_hashkey }}, + h.{{ src_ldts }}, + CASE + WHEN src.{{ tracked_hashkey }} IS NULL THEN 0 + ELSE 1 + END as {{ is_active_alias }} + FROM history h + LEFT JOIN source_data src + ON src.{{ tracked_hashkey }} = h.{{ tracked_hashkey }} + AND src.{{ src_ldts }} = h.{{ src_ldts }} + + ), + + {# + The rows are deduplicated on the is_active_alias, to only include status changes. + Additionally, a ROW_NUMBER() is calculated in incremental runs, to use it in the next step for comparison against the current status. + #} + deduplicated_incoming_prep AS ( + + SELECT + is_active.{{ tracked_hashkey }}, + is_active.{{ src_ldts }}, + is_active.{{ is_active_alias }}, + LAG(is_active.{{ is_active_alias }}) OVER (PARTITION BY {{ tracked_hashkey }} ORDER BY {{ src_ldts }}) as lag_is_active + + FROM is_active + + ), + + deduplicated_incoming AS ( + + SELECT + deduplicated_incoming_prep.{{ tracked_hashkey }}, + deduplicated_incoming_prep.{{ src_ldts }}, + deduplicated_incoming_prep.{{ is_active_alias }} + + FROM + deduplicated_incoming_prep + WHERE + deduplicated_incoming_prep.{{ is_active_alias }} != deduplicated_incoming_prep.lag_is_active + OR deduplicated_incoming_prep.lag_is_active IS NULL + + ), + + {% set ns.last_cte = 'deduplicated_incoming' %} + +{# + This block is for single-batch processing +#} +{% else %} + + {# + In initial loads of single-batch eff sats, every hashkey of the source is set to active. + #} + new_hashkeys AS ( + + SELECT DISTINCT + src.{{ tracked_hashkey }}, + src.{{ src_ldts }}, + 1 as {{ is_active_alias }} + FROM source_data src + + {# + For incremental runs of single-batch eff sats, only hashkeys that are not active right now are set to active. + This automatically includes totally new hashkeys, or hashkeys that are currently set to inactive. + #} + {% if is_incremental() %} + LEFT JOIN current_status cs + ON src.{{ tracked_hashkey }} = cs.{{ tracked_hashkey }} + AND cs.{{ is_active_alias }} = 1 + WHERE cs.{{ tracked_hashkey }} IS NULL + {% endif %} + + ), + + {% set ns.last_cte = 'new_hashkeys' %} + +{% endif %} + +{# + In all incremental runs, the source needs to be scanned for all currently active hashkeys. + If they are no longer present, they will be deactived. +#} +{%- if is_incremental() %} + + {%- if not source_is_single_batch %} + disappeared_hashkeys AS ( + + SELECT DISTINCT + cs.{{ tracked_hashkey }}, + ldts.min_ldts as {{ src_ldts }}, + 0 as {{ is_active_alias }} + FROM current_status cs + LEFT JOIN ( + SELECT + MIN({{ src_ldts }}) as min_ldts + FROM deduplicated_incoming) ldts + ON 1 = 1 + LEFT JOIN deduplicated_incoming src + ON src.{{ tracked_hashkey }} = cs.{{ tracked_hashkey }} + AND src.{{ src_ldts }} = ldts.min_ldts + WHERE + cs.{{ is_active_alias }} = 1 + AND src.{{ tracked_hashkey }} IS NULL + AND ldts.min_ldts IS NOT NULL + + ), + {% else %} + disappeared_hashkeys AS ( + + SELECT DISTINCT + cs.{{ tracked_hashkey }}, + ldts.min_ldts as {{ src_ldts }}, + 0 as {{ is_active_alias }} + FROM current_status cs + LEFT JOIN ( + SELECT + MIN({{ src_ldts }}) as min_ldts + FROM source_data) ldts + ON 1 = 1 + WHERE NOT EXISTS ( + SELECT + 1 + FROM source_data src + WHERE src.{{ tracked_hashkey }} = cs.{{ tracked_hashkey }} + ) + AND cs.{{ is_active_alias }} = 1 + AND ldts.min_ldts IS NOT NULL + + ), + {% endif %} +{%- endif %} + +records_to_insert AS ( + + {# + This first part of the UNION includes: + - for single-batch loads: Only is_active_alias = 1, deactivations are handled later + - for multi-batch loads: Ativation and deactivation inside the multiple loads + #} + SELECT + di.{{ tracked_hashkey }}, + di.{{ src_ldts }}, + di.{{ is_active_alias }} + FROM {{ ns.last_cte }} di + + + {%- if is_incremental() %} + + {# + For incremental multi-batch loads, the earliest to-be inserted status is compared to the current status. + It will only be inserted if the status changed. We use the ROW_NUMBER() + #} + {%- if not source_is_single_batch %} + WHERE NOT EXISTS ( + SELECT 1 + FROM current_status + WHERE {{ datavault4dbt.multikey(tracked_hashkey, prefix=['current_status', 'di'], condition='=') }} + AND {{ datavault4dbt.multikey(is_active_alias, prefix=['current_status', 'di'], condition='=') }} + AND di.{{ src_ldts }} = (SELECT MIN({{ src_ldts }}) FROM deduplicated_incoming) + ) + AND di.{{ src_ldts }} > (SELECT MAX({{ src_ldts }}) FROM {{ this }}) + {% endif %} + + {# + For all incremental loads, the disappeared hashkeys are UNIONed. + #} + UNION + + SELECT + {{ tracked_hashkey }}, + {{ src_ldts }}, + {{ is_active_alias }} + FROM disappeared_hashkeys + + {%- endif %} + +) + +SELECT * +FROM records_to_insert ri + +{% if is_incremental() %} +WHERE NOT EXISTS ( + SELECT 1 + FROM {{ this }} t + WHERE t.{{ tracked_hashkey }} = ri.{{ tracked_hashkey }} + AND t.{{ src_ldts }} = ri.{{ src_ldts }} +) +{% endif %} + +{%- endmacro -%} \ No newline at end of file diff --git a/macros/tables/postgres/eff_sat_v0.sql b/macros/tables/postgres/eff_sat_v0.sql new file mode 100644 index 00000000..ef8d63c8 --- /dev/null +++ b/macros/tables/postgres/eff_sat_v0.sql @@ -0,0 +1,315 @@ +{%- macro postgres__eff_sat_v0(source_model, tracked_hashkey, src_ldts, src_rsrc, is_active_alias, source_is_single_batch, disable_hwm) -%} + +{%- set end_of_all_times = datavault4dbt.end_of_all_times() -%} +{%- set timestamp_format = datavault4dbt.timestamp_format() -%} + +{%- set ns = namespace(last_cte= "") -%} + +{%- set source_relation = ref(source_model) -%} + +{%- set tracked_hashkey = datavault4dbt.escape_column_names(tracked_hashkey) -%} +{%- set is_active_alias = datavault4dbt.escape_column_names(is_active_alias) -%} +{%- set src_ldts = datavault4dbt.escape_column_names(src_ldts) -%} +{%- set src_rsrc = datavault4dbt.escape_column_names(src_rsrc) -%} + +{{ log('columns to select: '~final_columns_to_select, false) }} + +{{ datavault4dbt.prepend_generated_by() }} + +WITH + +{# + In all cases, the source model is selected, and optionally a HWM is applied. +#} +{% if is_incremental() and not disable_hwm %} +max_ldts_prep AS ( + + SELECT + MAX({{ src_ldts }}) AS max_ldts + FROM {{ this }} + WHERE {{ src_ldts }} != {{ datavault4dbt.string_to_timestamp(timestamp_format, end_of_all_times) }} +), +{% endif %} + +source_data AS ( + + SELECT + {{ tracked_hashkey }}, + {{ src_ldts }} + FROM {{ source_relation }} src + WHERE {{ src_ldts }} NOT IN ('{{ datavault4dbt.beginning_of_all_times() }}', '{{ datavault4dbt.end_of_all_times() }}') + {%- if is_incremental() and not disable_hwm %} + AND src.{{ src_ldts }} > ( + SELECT max_ldts FROM max_ldts_prep + ) + {%- endif %} +), + +{# + In all incremental cases, the current status for each hashkey is selected from the existing Effectivity Satellite. +#} +{%- if is_incremental() %} +current_status_prep AS ( + + SELECT + {{ tracked_hashkey }}, + {{ is_active_alias}}, + ROW_NUMBER() OVER (PARTITION BY {{ tracked_hashkey }} ORDER BY {{ src_ldts }} DESC) as rn + FROM {{ this }} + +), + +current_status AS ( + + SELECT + {{ tracked_hashkey }}, + {{ is_active_alias }} + FROM current_status_prep + WHERE rn = 1 + +), +{% endif %} + +{# + This block is for multi-batch processing. +#} +{% if not source_is_single_batch %} + + {# + List of all Hashkeys with their date of first appearance in the source model. + #} + hashkeys AS ( + + SELECT + {{ tracked_hashkey }}, + MIN({{ src_ldts }}) as first_appearance + FROM source_data + GROUP BY {{ tracked_hashkey }} + + ), + + {# + Distinct list of load dates in the multi-batch source. + #} + load_dates AS ( + + SELECT Distinct + {{ src_ldts }} + FROM source_data + + ), + + {# + All combinations of hashkeys and loaddates, for loaddates after the first appearance of a hashkey. + #} + history AS ( + + SELECT + hk.{{ tracked_hashkey }}, + ld.{{ src_ldts }} + FROM hashkeys hk + CROSS JOIN load_dates ld + WHERE ld.{{ src_ldts }} >= hk.first_appearance + + ), + + {# + All theoretical combinations are checked against the actual occurences of hashkeys in each batch / loaddate. + If a Hashkey is part of a load/batch, is_active_alias is set to 1, because the hashkey was active in that load/batch. + If a Hashkey is not part of a load/batch, is_active_alias is set to 0, because the hashkey was not active in that load/batch. + #} + is_active AS ( + + SELECT + h.{{ tracked_hashkey }}, + h.{{ src_ldts }}, + CASE + WHEN src.{{ tracked_hashkey }} IS NULL THEN 0 + ELSE 1 + END as {{ is_active_alias }} + FROM history h + LEFT JOIN source_data src + ON src.{{ tracked_hashkey }} = h.{{ tracked_hashkey }} + AND src.{{ src_ldts }} = h.{{ src_ldts }} + + ), + + {# + The rows are deduplicated on the is_active_alias, to only include status changes. + Additionally, a ROW_NUMBER() is calculated in incremental runs, to use it in the next step for comparison against the current status. + #} + deduplicated_incoming_prep AS ( + + SELECT + is_active.{{ tracked_hashkey }}, + is_active.{{ src_ldts }}, + is_active.{{ is_active_alias }}, + LAG(is_active.{{ is_active_alias }}) OVER (PARTITION BY {{ tracked_hashkey }} ORDER BY {{ src_ldts }}) as lag_is_active + + FROM is_active + + ), + + deduplicated_incoming AS ( + + SELECT + deduplicated_incoming_prep.{{ tracked_hashkey }}, + deduplicated_incoming_prep.{{ src_ldts }}, + deduplicated_incoming_prep.{{ is_active_alias }} + + FROM + deduplicated_incoming_prep + WHERE + deduplicated_incoming_prep.{{ is_active_alias }} != deduplicated_incoming_prep.lag_is_active + OR deduplicated_incoming_prep.lag_is_active IS NULL + + ), + + {% set ns.last_cte = 'deduplicated_incoming' %} + +{# + This block is for single-batch processing +#} +{% else %} + + {# + In initial loads of single-batch eff sats, every hashkey of the source is set to active. + #} + new_hashkeys AS ( + + SELECT DISTINCT + src.{{ tracked_hashkey }}, + src.{{ src_ldts }}, + 1 as {{ is_active_alias }} + FROM source_data src + + {# + For incremental runs of single-batch eff sats, only hashkeys that are not active right now are set to active. + This automatically includes totally new hashkeys, or hashkeys that are currently set to inactive. + #} + {% if is_incremental() %} + LEFT JOIN current_status cs + ON src.{{ tracked_hashkey }} = cs.{{ tracked_hashkey }} + AND cs.{{ is_active_alias }} = 1 + WHERE cs.{{ tracked_hashkey }} IS NULL + {% endif %} + + ), + + {% set ns.last_cte = 'new_hashkeys' %} + +{% endif %} + +{# + In all incremental runs, the source needs to be scanned for all currently active hashkeys. + If they are no longer present, they will be deactived. +#} +{%- if is_incremental() %} + + {%- if not source_is_single_batch %} + disappeared_hashkeys AS ( + + SELECT DISTINCT + cs.{{ tracked_hashkey }}, + ldts.min_ldts as {{ src_ldts }}, + 0 as {{ is_active_alias }} + FROM current_status cs + LEFT JOIN ( + SELECT + MIN({{ src_ldts }}) as min_ldts + FROM deduplicated_incoming) ldts + ON 1 = 1 + LEFT JOIN deduplicated_incoming src + ON src.{{ tracked_hashkey }} = cs.{{ tracked_hashkey }} + AND src.{{ src_ldts }} = ldts.min_ldts + WHERE + cs.{{ is_active_alias }} = 1 + AND src.{{ tracked_hashkey }} IS NULL + AND ldts.min_ldts IS NOT NULL + + ), + {% else %} + disappeared_hashkeys AS ( + + SELECT DISTINCT + cs.{{ tracked_hashkey }}, + ldts.min_ldts as {{ src_ldts }}, + 0 as {{ is_active_alias }} + FROM current_status cs + LEFT JOIN ( + SELECT + MIN({{ src_ldts }}) as min_ldts + FROM source_data) ldts + ON 1 = 1 + WHERE NOT EXISTS ( + SELECT + 1 + FROM source_data src + WHERE src.{{ tracked_hashkey }} = cs.{{ tracked_hashkey }} + ) + AND cs.{{ is_active_alias }} = 1 + AND ldts.min_ldts IS NOT NULL + + ), + {% endif %} +{%- endif %} + +records_to_insert AS ( + + {# + This first part of the UNION includes: + - for single-batch loads: Only is_active_alias = 1, deactivations are handled later + - for multi-batch loads: Ativation and deactivation inside the multiple loads + #} + SELECT + di.{{ tracked_hashkey }}, + di.{{ src_ldts }}, + di.{{ is_active_alias }} + FROM {{ ns.last_cte }} di + + + {%- if is_incremental() %} + + {# + For incremental multi-batch loads, the earliest to-be inserted status is compared to the current status. + It will only be inserted if the status changed. We use the ROW_NUMBER() + #} + {%- if not source_is_single_batch %} + WHERE NOT EXISTS ( + SELECT 1 + FROM current_status + WHERE {{ datavault4dbt.multikey(tracked_hashkey, prefix=['current_status', 'di'], condition='=') }} + AND {{ datavault4dbt.multikey(is_active_alias, prefix=['current_status', 'di'], condition='=') }} + AND di.{{ src_ldts }} = (SELECT MIN({{ src_ldts }}) FROM deduplicated_incoming) + ) + AND di.{{ src_ldts }} > (SELECT MAX({{ src_ldts }}) FROM {{ this }}) + {% endif %} + + {# + For all incremental loads, the disappeared hashkeys are UNIONed. + #} + UNION + + SELECT + {{ tracked_hashkey }}, + {{ src_ldts }}, + {{ is_active_alias }} + FROM disappeared_hashkeys + + {%- endif %} + +) + +SELECT * +FROM records_to_insert ri + +{% if is_incremental() %} +WHERE NOT EXISTS ( + SELECT 1 + FROM {{ this }} t + WHERE t.{{ tracked_hashkey }} = ri.{{ tracked_hashkey }} + AND t.{{ src_ldts }} = ri.{{ src_ldts }} +) +{% endif %} + +{%- endmacro -%} \ No newline at end of file diff --git a/macros/tables/redshift/eff_sat_v0.sql b/macros/tables/redshift/eff_sat_v0.sql new file mode 100644 index 00000000..3483db4a --- /dev/null +++ b/macros/tables/redshift/eff_sat_v0.sql @@ -0,0 +1,315 @@ +{%- macro redshift__eff_sat_v0(source_model, tracked_hashkey, src_ldts, src_rsrc, is_active_alias, source_is_single_batch, disable_hwm) -%} + +{%- set end_of_all_times = datavault4dbt.end_of_all_times() -%} +{%- set timestamp_format = datavault4dbt.timestamp_format() -%} + +{%- set ns = namespace(last_cte= "") -%} + +{%- set source_relation = ref(source_model) -%} + +{%- set tracked_hashkey = datavault4dbt.escape_column_names(tracked_hashkey) -%} +{%- set is_active_alias = datavault4dbt.escape_column_names(is_active_alias) -%} +{%- set src_ldts = datavault4dbt.escape_column_names(src_ldts) -%} +{%- set src_rsrc = datavault4dbt.escape_column_names(src_rsrc) -%} + +{{ log('columns to select: '~final_columns_to_select, false) }} + +{{ datavault4dbt.prepend_generated_by() }} + +WITH + +{# + In all cases, the source model is selected, and optionally a HWM is applied. +#} +{% if is_incremental() and not disable_hwm %} +max_ldts_prep AS ( + + SELECT + MAX({{ src_ldts }}) AS max_ldts + FROM {{ this }} + WHERE {{ src_ldts }} != {{ datavault4dbt.string_to_timestamp(timestamp_format, end_of_all_times) }} +), +{% endif %} + +source_data AS ( + + SELECT + {{ tracked_hashkey }}, + {{ src_ldts }} + FROM {{ source_relation }} src + WHERE {{ src_ldts }} NOT IN ('{{ datavault4dbt.beginning_of_all_times() }}', '{{ datavault4dbt.end_of_all_times() }}') + {%- if is_incremental() and not disable_hwm %} + AND src.{{ src_ldts }} > ( + SELECT max_ldts FROM max_ldts_prep + ) + {%- endif %} +), + +{# + In all incremental cases, the current status for each hashkey is selected from the existing Effectivity Satellite. +#} +{%- if is_incremental() %} +current_status_prep AS ( + + SELECT + {{ tracked_hashkey }}, + {{ is_active_alias}}, + ROW_NUMBER() OVER (PARTITION BY {{ tracked_hashkey }} ORDER BY {{ src_ldts }} DESC) as rn + FROM {{ this }} + +), + +current_status AS ( + + SELECT + {{ tracked_hashkey }}, + {{ is_active_alias }} + FROM current_status_prep + WHERE rn = 1 + +), +{% endif %} + +{# + This block is for multi-batch processing. +#} +{% if not source_is_single_batch %} + + {# + List of all Hashkeys with their date of first appearance in the source model. + #} + hashkeys AS ( + + SELECT + {{ tracked_hashkey }}, + MIN({{ src_ldts }}) as first_appearance + FROM source_data + GROUP BY {{ tracked_hashkey }} + + ), + + {# + Distinct list of load dates in the multi-batch source. + #} + load_dates AS ( + + SELECT Distinct + {{ src_ldts }} + FROM source_data + + ), + + {# + All combinations of hashkeys and loaddates, for loaddates after the first appearance of a hashkey. + #} + history AS ( + + SELECT + hk.{{ tracked_hashkey }}, + ld.{{ src_ldts }} + FROM hashkeys hk + CROSS JOIN load_dates ld + WHERE ld.{{ src_ldts }} >= hk.first_appearance + + ), + + {# + All theoretical combinations are checked against the actual occurences of hashkeys in each batch / loaddate. + If a Hashkey is part of a load/batch, is_active_alias is set to 1, because the hashkey was active in that load/batch. + If a Hashkey is not part of a load/batch, is_active_alias is set to 0, because the hashkey was not active in that load/batch. + #} + is_active AS ( + + SELECT + h.{{ tracked_hashkey }}, + h.{{ src_ldts }}, + CASE + WHEN src.{{ tracked_hashkey }} IS NULL THEN 0 + ELSE 1 + END as {{ is_active_alias }} + FROM history h + LEFT JOIN source_data src + ON src.{{ tracked_hashkey }} = h.{{ tracked_hashkey }} + AND src.{{ src_ldts }} = h.{{ src_ldts }} + + ), + + {# + The rows are deduplicated on the is_active_alias, to only include status changes. + Additionally, a ROW_NUMBER() is calculated in incremental runs, to use it in the next step for comparison against the current status. + #} + deduplicated_incoming_prep AS ( + + SELECT + is_active.{{ tracked_hashkey }}, + is_active.{{ src_ldts }}, + is_active.{{ is_active_alias }}, + LAG(is_active.{{ is_active_alias }}) OVER (PARTITION BY {{ tracked_hashkey }} ORDER BY {{ src_ldts }}) as lag_is_active + + FROM is_active + + ), + + deduplicated_incoming AS ( + + SELECT + deduplicated_incoming_prep.{{ tracked_hashkey }}, + deduplicated_incoming_prep.{{ src_ldts }}, + deduplicated_incoming_prep.{{ is_active_alias }} + + FROM + deduplicated_incoming_prep + WHERE + deduplicated_incoming_prep.{{ is_active_alias }} != deduplicated_incoming_prep.lag_is_active + OR deduplicated_incoming_prep.lag_is_active IS NULL + + ), + + {% set ns.last_cte = 'deduplicated_incoming' %} + +{# + This block is for single-batch processing +#} +{% else %} + + {# + In initial loads of single-batch eff sats, every hashkey of the source is set to active. + #} + new_hashkeys AS ( + + SELECT DISTINCT + src.{{ tracked_hashkey }}, + src.{{ src_ldts }}, + 1 as {{ is_active_alias }} + FROM source_data src + + {# + For incremental runs of single-batch eff sats, only hashkeys that are not active right now are set to active. + This automatically includes totally new hashkeys, or hashkeys that are currently set to inactive. + #} + {% if is_incremental() %} + LEFT JOIN current_status cs + ON src.{{ tracked_hashkey }} = cs.{{ tracked_hashkey }} + AND cs.{{ is_active_alias }} = 1 + WHERE cs.{{ tracked_hashkey }} IS NULL + {% endif %} + + ), + + {% set ns.last_cte = 'new_hashkeys' %} + +{% endif %} + +{# + In all incremental runs, the source needs to be scanned for all currently active hashkeys. + If they are no longer present, they will be deactived. +#} +{%- if is_incremental() %} + + {%- if not source_is_single_batch %} + disappeared_hashkeys AS ( + + SELECT DISTINCT + cs.{{ tracked_hashkey }}, + ldts.min_ldts as {{ src_ldts }}, + 0 as {{ is_active_alias }} + FROM current_status cs + LEFT JOIN ( + SELECT + MIN({{ src_ldts }}) as min_ldts + FROM deduplicated_incoming) ldts + ON 1 = 1 + LEFT JOIN deduplicated_incoming src + ON src.{{ tracked_hashkey }} = cs.{{ tracked_hashkey }} + AND src.{{ src_ldts }} = ldts.min_ldts + WHERE + cs.{{ is_active_alias }} = 1 + AND src.{{ tracked_hashkey }} IS NULL + AND ldts.min_ldts IS NOT NULL + + ), + {% else %} + disappeared_hashkeys AS ( + + SELECT DISTINCT + cs.{{ tracked_hashkey }}, + ldts.min_ldts as {{ src_ldts }}, + 0 as {{ is_active_alias }} + FROM current_status cs + LEFT JOIN ( + SELECT + MIN({{ src_ldts }}) as min_ldts + FROM source_data) ldts + ON 1 = 1 + WHERE NOT EXISTS ( + SELECT + 1 + FROM source_data src + WHERE src.{{ tracked_hashkey }} = cs.{{ tracked_hashkey }} + ) + AND cs.{{ is_active_alias }} = 1 + AND ldts.min_ldts IS NOT NULL + + ), + {% endif %} +{%- endif %} + +records_to_insert AS ( + + {# + This first part of the UNION includes: + - for single-batch loads: Only is_active_alias = 1, deactivations are handled later + - for multi-batch loads: Ativation and deactivation inside the multiple loads + #} + SELECT + di.{{ tracked_hashkey }}, + di.{{ src_ldts }}, + di.{{ is_active_alias }} + FROM {{ ns.last_cte }} di + + + {%- if is_incremental() %} + + {# + For incremental multi-batch loads, the earliest to-be inserted status is compared to the current status. + It will only be inserted if the status changed. We use the ROW_NUMBER() + #} + {%- if not source_is_single_batch %} + WHERE NOT EXISTS ( + SELECT 1 + FROM current_status + WHERE {{ datavault4dbt.multikey(tracked_hashkey, prefix=['current_status', 'di'], condition='=') }} + AND {{ datavault4dbt.multikey(is_active_alias, prefix=['current_status', 'di'], condition='=') }} + AND di.{{ src_ldts }} = (SELECT MIN({{ src_ldts }}) FROM deduplicated_incoming) + ) + AND di.{{ src_ldts }} > (SELECT MAX({{ src_ldts }}) FROM {{ this }}) + {% endif %} + + {# + For all incremental loads, the disappeared hashkeys are UNIONed. + #} + UNION + + SELECT + {{ tracked_hashkey }}, + {{ src_ldts }}, + {{ is_active_alias }} + FROM disappeared_hashkeys + + {%- endif %} + +) + +SELECT * +FROM records_to_insert ri + +{% if is_incremental() %} +WHERE NOT EXISTS ( + SELECT 1 + FROM {{ this }} t + WHERE t.{{ tracked_hashkey }} = ri.{{ tracked_hashkey }} + AND t.{{ src_ldts }} = ri.{{ src_ldts }} +) +{% endif %} + +{%- endmacro -%} \ No newline at end of file diff --git a/macros/tables/snowflake/eff_sat_v0.sql b/macros/tables/snowflake/eff_sat_v0.sql new file mode 100644 index 00000000..c2b7a75e --- /dev/null +++ b/macros/tables/snowflake/eff_sat_v0.sql @@ -0,0 +1,292 @@ +{%- macro snowflake__eff_sat_v0(source_model, tracked_hashkey, src_ldts, src_rsrc, is_active_alias, source_is_single_batch, disable_hwm) -%} + +{%- set end_of_all_times = datavault4dbt.end_of_all_times() -%} +{%- set timestamp_format = datavault4dbt.timestamp_format() -%} + +{%- set ns = namespace(last_cte= "") -%} + +{%- set source_relation = ref(source_model) -%} + +{%- set tracked_hashkey = datavault4dbt.escape_column_names(tracked_hashkey) -%} +{%- set is_active_alias = datavault4dbt.escape_column_names(is_active_alias) -%} +{%- set src_ldts = datavault4dbt.escape_column_names(src_ldts) -%} +{%- set src_rsrc = datavault4dbt.escape_column_names(src_rsrc) -%} + +{{ log('columns to select: '~final_columns_to_select, false) }} + +{{ datavault4dbt.prepend_generated_by() }} + +WITH + +{# + In all cases, the source model is selected, and optionally a HWM is applied. +#} +source_data AS ( + + SELECT + {{ tracked_hashkey }}, + {{ src_ldts }} + FROM {{ source_relation }} src + WHERE {{ src_ldts }} NOT IN ('{{ datavault4dbt.beginning_of_all_times() }}', '{{ datavault4dbt.end_of_all_times() }}') + {%- if is_incremental() and not disable_hwm %} + AND src.{{ src_ldts }} > ( + SELECT + MAX({{ src_ldts }}) + FROM {{ this }} + WHERE {{ src_ldts }} != {{ datavault4dbt.string_to_timestamp(timestamp_format, end_of_all_times) }} + ) + {%- endif %} +), + +{# + In all incremental cases, the current status for each hashkey is selected from the existing Effectivity Satellite. +#} +{%- if is_incremental() %} +current_status AS ( + + SELECT + {{ tracked_hashkey }}, + {{ is_active_alias }} + FROM {{ this }} + QUALIFY + ROW_NUMBER() OVER(PARTITION BY {{ tracked_hashkey }} ORDER BY {{ src_ldts }} DESC) = 1 + +), +{% endif %} + +{# + This block is for multi-batch processing. +#} +{% if not source_is_single_batch %} + + {# + List of all Hashkeys with their date of first appearance in the source model. + #} + hashkeys AS ( + + SELECT + {{ tracked_hashkey }}, + MIN({{ src_ldts }}) as first_appearance + FROM source_data + GROUP BY {{ tracked_hashkey }} + + ), + + {# + Distinct list of load dates in the multi-batch source. + #} + load_dates AS ( + + SELECT Distinct + {{ src_ldts }} + FROM source_data + + ), + + {# + All combinations of hashkeys and loaddates, for loaddates after the first appearance of a hashkey. + #} + history AS ( + + SELECT + hk.{{ tracked_hashkey }}, + ld.{{ src_ldts }} + FROM hashkeys hk + CROSS JOIN load_dates ld + WHERE ld.{{ src_ldts }} >= hk.first_appearance + + ), + + {# + All theoretical combinations are checked against the actual occurences of hashkeys in each batch / loaddate. + If a Hashkey is part of a load/batch, is_active_alias is set to 1, because the hashkey was active in that load/batch. + If a Hashkey is not part of a load/batch, is_active_alias is set to 0, because the hashkey was not active in that load/batch. + #} + is_active AS ( + + SELECT + h.{{ tracked_hashkey }}, + h.{{ src_ldts }}, + CASE + WHEN src.{{ tracked_hashkey }} IS NULL THEN 0 + ELSE 1 + END as {{ is_active_alias }} + FROM history h + LEFT JOIN source_data src + ON src.{{ tracked_hashkey }} = h.{{ tracked_hashkey }} + AND src.{{ src_ldts }} = h.{{ src_ldts }} + + ), + + {# + The rows are deduplicated on the is_active_alias, to only include status changes. + Additionally, a ROW_NUMBER() is calculated in incremental runs, to use it in the next step for comparison against the current status. + #} + deduplicated_incoming AS ( + + SELECT + is_active.{{ tracked_hashkey }}, + is_active.{{ src_ldts }}, + is_active.{{ is_active_alias }} + + {% if is_incremental() -%} + , ROW_NUMBER() OVER(PARTITION BY is_active.{{ tracked_hashkey }} ORDER BY is_active.{{ src_ldts }}) as rn + {%- endif %} + + FROM is_active + QUALIFY + CASE + WHEN is_active.{{ is_active_alias }} = LAG(is_active.{{ is_active_alias }}) OVER (PARTITION BY {{ tracked_hashkey }} ORDER BY {{ src_ldts }}) THEN FALSE + ELSE TRUE + END + + ), + + {% set ns.last_cte = 'deduplicated_incoming' %} + +{# + This block is for single-batch processing +#} +{% else %} + + {# + In initial loads of single-batch eff sats, every hashkey of the source is set to active. + #} + new_hashkeys AS ( + + SELECT DISTINCT + src.{{ tracked_hashkey }}, + src.{{ src_ldts }}, + 1 as {{ is_active_alias }} + FROM source_data src + + {# + For incremental runs of single-batch eff sats, only hashkeys that are not active right now are set to active. + This automatically includes totally new hashkeys, or hashkeys that are currently set to inactive. + #} + {% if is_incremental() %} + LEFT JOIN current_status cs + ON src.{{ tracked_hashkey }} = cs.{{ tracked_hashkey }} + AND cs.{{ is_active_alias }} = 1 + WHERE cs.{{ tracked_hashkey }} IS NULL + {% endif %} + + ), + + {% set ns.last_cte = 'new_hashkeys' %} + +{% endif %} + +{# + In all incremental runs, the source needs to be scanned for all currently active hashkeys. + If they are no longer present, they will be deactived. +#} +{%- if is_incremental() %} + + {%- if not source_is_single_batch %} + disappeared_hashkeys AS ( + + SELECT DISTINCT + cs.{{ tracked_hashkey }}, + ldts.min_ldts as {{ src_ldts }}, + 0 as {{ is_active_alias }} + FROM current_status cs + LEFT JOIN ( + SELECT + MIN({{ src_ldts }}) as min_ldts + FROM deduplicated_incoming) ldts + ON 1 = 1 + LEFT JOIN deduplicated_incoming src + ON src.{{ tracked_hashkey }} = cs.{{ tracked_hashkey }} + AND src.{{ src_ldts }} = ldts.min_ldts + WHERE + cs.{{ is_active_alias }} = 1 + AND src.{{ tracked_hashkey }} IS NULL + AND ldts.min_ldts IS NOT NULL + + ), + {% else %} + disappeared_hashkeys AS ( + + SELECT DISTINCT + cs.{{ tracked_hashkey }}, + ldts.min_ldts as {{ src_ldts }}, + 0 as {{ is_active_alias }} + FROM current_status cs + LEFT JOIN ( + SELECT + MIN({{ src_ldts }}) as min_ldts + FROM source_data) ldts + ON 1 = 1 + WHERE NOT EXISTS ( + SELECT + 1 + FROM source_data src + WHERE src.{{ tracked_hashkey }} = cs.{{ tracked_hashkey }} + ) + AND cs.{{ is_active_alias }} = 1 + AND ldts.min_ldts IS NOT NULL + + ), + {% endif %} +{%- endif %} + +records_to_insert AS ( + + {# + This first part of the UNION includes: + - for single-batch loads: Only is_active_alias = 1, deactivations are handled later + - for multi-batch loads: Ativation and deactivation inside the multiple loads + #} + SELECT + di.{{ tracked_hashkey }}, + di.{{ src_ldts }}, + di.{{ is_active_alias }} + FROM {{ ns.last_cte }} di + + + {%- if is_incremental() %} + + {# + For incremental multi-batch loads, the earliest to-be inserted status is compared to the current status. + It will only be inserted if the status changed. We use the ROW_NUMBER() + #} + {%- if not source_is_single_batch %} + WHERE NOT EXISTS ( + SELECT 1 + FROM current_status + WHERE {{ datavault4dbt.multikey(tracked_hashkey, prefix=['current_status', 'di'], condition='=') }} + AND {{ datavault4dbt.multikey(is_active_alias, prefix=['current_status', 'di'], condition='=') }} + AND di.{{ src_ldts }} = (SELECT MIN({{ src_ldts }}) FROM deduplicated_incoming) + ) + AND di.{{ src_ldts }} > (SELECT MAX({{ src_ldts }}) FROM {{ this }}) + {% endif %} + + {# + For all incremental loads, the disappeared hashkeys are UNIONed. + #} + UNION + + SELECT + {{ tracked_hashkey }}, + {{ src_ldts }}, + {{ is_active_alias }} + FROM disappeared_hashkeys + + {%- endif %} + +) + +SELECT * +FROM records_to_insert ri + +{% if is_incremental() %} +WHERE NOT EXISTS ( + SELECT 1 + FROM {{ this }} t + WHERE t.{{ tracked_hashkey }} = ri.{{ tracked_hashkey }} + AND t.{{ src_ldts }} = ri.{{ src_ldts }} +) +{% endif %} + +{%- endmacro -%} \ No newline at end of file diff --git a/macros/tables/synapse/eff_sat_v0.sql b/macros/tables/synapse/eff_sat_v0.sql new file mode 100644 index 00000000..3bac2383 --- /dev/null +++ b/macros/tables/synapse/eff_sat_v0.sql @@ -0,0 +1,315 @@ +{%- macro synapse__eff_sat_v0(source_model, tracked_hashkey, src_ldts, src_rsrc, is_active_alias, source_is_single_batch, disable_hwm) -%} + +{%- set end_of_all_times = datavault4dbt.end_of_all_times() -%} +{%- set timestamp_format = datavault4dbt.timestamp_format() -%} + +{%- set ns = namespace(last_cte= "") -%} + +{%- set source_relation = ref(source_model) -%} + +{%- set tracked_hashkey = datavault4dbt.escape_column_names(tracked_hashkey) -%} +{%- set is_active_alias = datavault4dbt.escape_column_names(is_active_alias) -%} +{%- set src_ldts = datavault4dbt.escape_column_names(src_ldts) -%} +{%- set src_rsrc = datavault4dbt.escape_column_names(src_rsrc) -%} + +{{ log('columns to select: '~final_columns_to_select, false) }} + +{{ datavault4dbt.prepend_generated_by() }} + +WITH + +{# + In all cases, the source model is selected, and optionally a HWM is applied. +#} +{% if is_incremental() and not disable_hwm %} +max_ldts_prep AS ( + + SELECT + MAX({{ src_ldts }}) AS max_ldts + FROM {{ this }} + WHERE {{ src_ldts }} != {{ datavault4dbt.string_to_timestamp(timestamp_format, end_of_all_times) }} +), +{% endif %} + +source_data AS ( + + SELECT + {{ tracked_hashkey }}, + {{ src_ldts }} + FROM {{ source_relation }} src + WHERE {{ src_ldts }} NOT IN ('{{ datavault4dbt.beginning_of_all_times() }}', '{{ datavault4dbt.end_of_all_times() }}') + {%- if is_incremental() and not disable_hwm %} + AND src.{{ src_ldts }} > ( + SELECT max_ldts FROM max_ldts_prep + ) + {%- endif %} +), + +{# + In all incremental cases, the current status for each hashkey is selected from the existing Effectivity Satellite. +#} +{%- if is_incremental() %} +current_status_prep AS ( + + SELECT + {{ tracked_hashkey }}, + {{ is_active_alias}}, + ROW_NUMBER() OVER (PARTITION BY {{ tracked_hashkey }} ORDER BY {{ src_ldts }} DESC) as rn + FROM {{ this }} + +), + +current_status AS ( + + SELECT + {{ tracked_hashkey }}, + {{ is_active_alias }} + FROM current_status_prep + WHERE rn = 1 + +), +{% endif %} + +{# + This block is for multi-batch processing. +#} +{% if not source_is_single_batch %} + + {# + List of all Hashkeys with their date of first appearance in the source model. + #} + hashkeys AS ( + + SELECT + {{ tracked_hashkey }}, + MIN({{ src_ldts }}) as first_appearance + FROM source_data + GROUP BY {{ tracked_hashkey }} + + ), + + {# + Distinct list of load dates in the multi-batch source. + #} + load_dates AS ( + + SELECT Distinct + {{ src_ldts }} + FROM source_data + + ), + + {# + All combinations of hashkeys and loaddates, for loaddates after the first appearance of a hashkey. + #} + history AS ( + + SELECT + hk.{{ tracked_hashkey }}, + ld.{{ src_ldts }} + FROM hashkeys hk + CROSS JOIN load_dates ld + WHERE ld.{{ src_ldts }} >= hk.first_appearance + + ), + + {# + All theoretical combinations are checked against the actual occurences of hashkeys in each batch / loaddate. + If a Hashkey is part of a load/batch, is_active_alias is set to 1, because the hashkey was active in that load/batch. + If a Hashkey is not part of a load/batch, is_active_alias is set to 0, because the hashkey was not active in that load/batch. + #} + is_active AS ( + + SELECT + h.{{ tracked_hashkey }}, + h.{{ src_ldts }}, + CASE + WHEN src.{{ tracked_hashkey }} IS NULL THEN 0 + ELSE 1 + END as {{ is_active_alias }} + FROM history h + LEFT JOIN source_data src + ON src.{{ tracked_hashkey }} = h.{{ tracked_hashkey }} + AND src.{{ src_ldts }} = h.{{ src_ldts }} + + ), + + {# + The rows are deduplicated on the is_active_alias, to only include status changes. + Additionally, a ROW_NUMBER() is calculated in incremental runs, to use it in the next step for comparison against the current status. + #} + deduplicated_incoming_prep AS ( + + SELECT + is_active.{{ tracked_hashkey }}, + is_active.{{ src_ldts }}, + is_active.{{ is_active_alias }}, + LAG(is_active.{{ is_active_alias }}) OVER (PARTITION BY {{ tracked_hashkey }} ORDER BY {{ src_ldts }}) as lag_is_active + + FROM is_active + + ), + + deduplicated_incoming AS ( + + SELECT + deduplicated_incoming_prep.{{ tracked_hashkey }}, + deduplicated_incoming_prep.{{ src_ldts }}, + deduplicated_incoming_prep.{{ is_active_alias }} + + FROM + deduplicated_incoming_prep + WHERE + deduplicated_incoming_prep.{{ is_active_alias }} != deduplicated_incoming_prep.lag_is_active + OR deduplicated_incoming_prep.lag_is_active IS NULL + + ), + + {% set ns.last_cte = 'deduplicated_incoming' %} + +{# + This block is for single-batch processing +#} +{% else %} + + {# + In initial loads of single-batch eff sats, every hashkey of the source is set to active. + #} + new_hashkeys AS ( + + SELECT DISTINCT + src.{{ tracked_hashkey }}, + src.{{ src_ldts }}, + 1 as {{ is_active_alias }} + FROM source_data src + + {# + For incremental runs of single-batch eff sats, only hashkeys that are not active right now are set to active. + This automatically includes totally new hashkeys, or hashkeys that are currently set to inactive. + #} + {% if is_incremental() %} + LEFT JOIN current_status cs + ON src.{{ tracked_hashkey }} = cs.{{ tracked_hashkey }} + AND cs.{{ is_active_alias }} = 1 + WHERE cs.{{ tracked_hashkey }} IS NULL + {% endif %} + + ), + + {% set ns.last_cte = 'new_hashkeys' %} + +{% endif %} + +{# + In all incremental runs, the source needs to be scanned for all currently active hashkeys. + If they are no longer present, they will be deactived. +#} +{%- if is_incremental() %} + + {%- if not source_is_single_batch %} + disappeared_hashkeys AS ( + + SELECT DISTINCT + cs.{{ tracked_hashkey }}, + ldts.min_ldts as {{ src_ldts }}, + 0 as {{ is_active_alias }} + FROM current_status cs + LEFT JOIN ( + SELECT + MIN({{ src_ldts }}) as min_ldts + FROM deduplicated_incoming) ldts + ON 1 = 1 + LEFT JOIN deduplicated_incoming src + ON src.{{ tracked_hashkey }} = cs.{{ tracked_hashkey }} + AND src.{{ src_ldts }} = ldts.min_ldts + WHERE + cs.{{ is_active_alias }} = 1 + AND src.{{ tracked_hashkey }} IS NULL + AND ldts.min_ldts IS NOT NULL + + ), + {% else %} + disappeared_hashkeys AS ( + + SELECT DISTINCT + cs.{{ tracked_hashkey }}, + ldts.min_ldts as {{ src_ldts }}, + 0 as {{ is_active_alias }} + FROM current_status cs + LEFT JOIN ( + SELECT + MIN({{ src_ldts }}) as min_ldts + FROM source_data) ldts + ON 1 = 1 + WHERE NOT EXISTS ( + SELECT + 1 + FROM source_data src + WHERE src.{{ tracked_hashkey }} = cs.{{ tracked_hashkey }} + ) + AND cs.{{ is_active_alias }} = 1 + AND ldts.min_ldts IS NOT NULL + + ), + {% endif %} +{%- endif %} + +records_to_insert AS ( + + {# + This first part of the UNION includes: + - for single-batch loads: Only is_active_alias = 1, deactivations are handled later + - for multi-batch loads: Ativation and deactivation inside the multiple loads + #} + SELECT + di.{{ tracked_hashkey }}, + di.{{ src_ldts }}, + di.{{ is_active_alias }} + FROM {{ ns.last_cte }} di + + + {%- if is_incremental() %} + + {# + For incremental multi-batch loads, the earliest to-be inserted status is compared to the current status. + It will only be inserted if the status changed. We use the ROW_NUMBER() + #} + {%- if not source_is_single_batch %} + WHERE NOT EXISTS ( + SELECT 1 + FROM current_status + WHERE {{ datavault4dbt.multikey(tracked_hashkey, prefix=['current_status', 'di'], condition='=') }} + AND {{ datavault4dbt.multikey(is_active_alias, prefix=['current_status', 'di'], condition='=') }} + AND di.{{ src_ldts }} = (SELECT MIN({{ src_ldts }}) FROM deduplicated_incoming) + ) + AND di.{{ src_ldts }} > (SELECT MAX({{ src_ldts }}) FROM {{ this }}) + {% endif %} + + {# + For all incremental loads, the disappeared hashkeys are UNIONed. + #} + UNION + + SELECT + {{ tracked_hashkey }}, + {{ src_ldts }}, + {{ is_active_alias }} + FROM disappeared_hashkeys + + {%- endif %} + +) + +SELECT * +FROM records_to_insert ri + +{% if is_incremental() %} +WHERE NOT EXISTS ( + SELECT 1 + FROM {{ this }} t + WHERE t.{{ tracked_hashkey }} = ri.{{ tracked_hashkey }} + AND t.{{ src_ldts }} = ri.{{ src_ldts }} +) +{% endif %} + +{%- endmacro -%} \ No newline at end of file