diff --git a/README.md b/README.md
index a982a07f..6c4c034a 100644
--- a/README.md
+++ b/README.md
@@ -1,25 +1,26 @@
# datavault4dbt by [Scalefree International GmbH](https://www.scalefree.com)
-
+[
](https://www.datavault4dbt.com/)
---
### Included Macros
-- Staging Area (For Hashing, prejoins and ghost records)
-- Hubs, Links & Satellites (allowing multiple deltas)
-- Non-Historized Links and Satellites
-- Multi-Active Satellites
-- Virtualized End-Dating (in Satellites)
-- Reference Hubs, - Satellites, and - Tables
-- PIT Tables
- - Hook for Cleaning up PITs
-- Snapshot Control
+- [Staging Area (For Hashing, prejoins and ghost records)](https://www.datavault4dbt.com/documentation/macro-instructions/staging/)
+- [Hubs](https://www.datavault4dbt.com/documentation/macro-instructions/hubs/standard-hub/), [Links](https://www.datavault4dbt.com/documentation/macro-instructions/links/standard-link/) & [Satellites](https://www.datavault4dbt.com/documentation/macro-instructions/satellites/standard-satellite/standard-satellite-v0/) (allowing multiple deltas)
+- [Non-Historized Links](https://www.datavault4dbt.com/documentation/macro-instructions/links/non-historized-link/) and [Satellites](https://www.datavault4dbt.com/documentation/macro-instructions/satellites/non-historized-satellite/)
+- [Multi-Active Satellites](https://www.datavault4dbt.com/documentation/macro-instructions/satellites/multi-active-satellite/multi-active-satellite-v0/)
+- [Effectivity](https://www.datavault4dbt.com/documentation/macro-instructions/satellites/effectivity-satellite/) and [Record Tracking Satellites](https://www.datavault4dbt.com/documentation/macro-instructions/satellites/record-tracking-satellite/)
+- [Virtualized End-Dating (in Satellites)](https://www.datavault4dbt.com/documentation/macro-instructions/satellites/standard-satellite/standard-satellite-v1/)
+- [Reference Hubs](https://www.datavault4dbt.com/documentation/macro-instructions/reference-data/reference-hub/), [- Satellites](https://www.datavault4dbt.com/documentation/macro-instructions/reference-data/reference-satellite/reference-satellite-v0/), and [- Tables](https://www.datavault4dbt.com/documentation/macro-instructions/reference-data/reference-tables/)
+- [PIT Tables](https://www.datavault4dbt.com/documentation/macro-instructions/business-vault/pit/)
+ - [Hook for Cleaning up PITs](https://www.datavault4dbt.com/documentation/macro-instructions/business-vault/pit/hook-cleanup-pits/)
+- Snapshot Control [Tables](https://www.datavault4dbt.com/documentation/macro-instructions/business-vault/snapshot-control/snapshot-control-v0/) and [Views](https://www.datavault4dbt.com/documentation/macro-instructions/business-vault/snapshot-control/snapshot-control-v1/)
### Features
With datavault4dbt you will get a lot of awesome features, including:
- A Data Vault 2.0 implementation congruent to the original Data Vault 2.0 definition by Dan Linstedt
-- Ready for both Persistent Staging Areas and Transient Staging Areas, due to the allowance of multiple deltas in all macros, without losing any intermediate changes - Enforcing standards in naming conventions by implementing [global variables](https://github.com/ScalefreeCOM/datavault4dbt/wiki/Global-variables) for technical columns
+- Ready for both Persistent Staging Areas and Transient Staging Areas, due to the allowance of multiple deltas in all macros, without losing any intermediate changes - Enforcing standards in naming conventions by implementing [global variables](https://www.datavault4dbt.com/documentation/general-usage-notes/global-variables/) for technical columns
- A fully auditable solution for a Data Warehouse
- Creating a centralized, snapshot-based Business interface by using a centralized snapshot table supporting logarithmic logic
- A modern insert-only approach that avoids updating data
@@ -36,11 +37,8 @@ To use the macros efficiently, there are a few prerequisites you need to provide
### Resources:
+- Find technical information about the macros, examples, and more, on [the official datavault4dbt Website](https://www.datavault4dbt.com/)!
- Learn more about dbt [in the docs](https://docs.getdbt.com/docs/introduction)
-- Check out [Discourse](https://discourse.getdbt.com/) for commonly asked questions and answers
-- Join the [chat](https://community.getdbt.com/) on Slack for live discussions and support
-- Check out [the blog](https://blog.getdbt.com/) for the latest news on dbt's development and best practices
-- Find [dbt events](https://events.getdbt.com) near you
- Check out the [Scalefree-Blog](https://www.scalefree.com/blog/)
- [Data-Vault 2.0 with dbt #1](https://www.scalefree.com/scalefree-newsletter/data-vault-2-0-with-dbt-part-1/)
- [Data-Vault 2.0 with dbt #2](https://www.scalefree.com/scalefree-newsletter/data-vault-2-0-with-dbt-part-2/)
@@ -79,11 +77,11 @@ For further information on how to install packages in dbt, please visit the foll
[https://docs.getdbt.com/docs/building-a-dbt-project/package-management](https://docs.getdbt.com/docs/building-a-dbt-project/package-management#how-do-i-add-a-package-to-my-project)
### Global variables
-datavault4dbt is highly customizable by using many global variables. Since they are applied on multiple levels, a high rate of standardization across your data vault 2.0 solution is guaranteed. The default values of those variables are set inside the packages `dbt_project.yml` and should be copied to your own `dbt_project.yml`. For an explanation of all global variables see [the wiki](https://github.com/ScalefreeCOM/datavault4dbt/wiki/Global-variables).
+datavault4dbt is highly customizable by using many global variables. Since they are applied on multiple levels, a high rate of standardization across your data vault 2.0 solution is guaranteed. The default values of those variables are set inside the packages `dbt_project.yml` and should be copied to your own `dbt_project.yml`. For an explanation of all global variables see [the docs](https://www.datavault4dbt.com/documentation/general-usage-notes/global-variables/).
---
## Usage
-The datavault4dbt package provides macros for Staging and Creation of all DataVault-Entities you need, to build your own DataVault2.0 solution. The usage of the macros is well-explained in the documentation: https://github.com/ScalefreeCOM/datavault4dbt/wiki
+The datavault4dbt package provides macros for Staging and Creation of all DataVault-Entities you need, to build your own DataVault2.0 solution. The usage of the macros is well-explained in the [documentation]([url](https://www.datavault4dbt.com/documentation/)).
---
## Contributing
diff --git a/macros/supporting/ghost_record_per_datatype.sql b/macros/supporting/ghost_record_per_datatype.sql
index 341715dc..03698646 100644
--- a/macros/supporting/ghost_record_per_datatype.sql
+++ b/macros/supporting/ghost_record_per_datatype.sql
@@ -82,7 +82,7 @@
{%- if ghost_record_type == 'unknown' -%}
- {%- if datatype == 'TIMESTAMP' or datatype == 'TIMESTAMP WITH LOCAL TIMEZONE' %} {{- datavault4dbt.string_to_timestamp( timestamp_format , beginning_of_all_times) }} as "{{ column_name }}"
+ {%- if datatype == 'TIMESTAMP' or datatype == 'TIMESTAMP WITH LOCAL TIMEZONE' %} {{- datavault4dbt.string_to_timestamp( timestamp_format , beginning_of_all_times) }} as {{ alias }}
{%- elif datatype == 'DATE'-%} TO_DATE('{{ beginning_of_all_times_date }}', '{{ date_format }}' ) as {{ alias }}
{%- elif datatype.upper().startswith('VARCHAR') -%}
{%- if col_size is not none -%}
@@ -110,7 +110,7 @@
{%- elif ghost_record_type == 'error' -%}
- {%- if datatype == 'TIMESTAMP' or datatype == 'TIMESTAMP WITH LOCAL TIME ZONE' %} {{- datavault4dbt.string_to_timestamp( timestamp_format , end_of_all_times) }} as "{{ column_name }}"
+ {%- if datatype == 'TIMESTAMP' or datatype == 'TIMESTAMP WITH LOCAL TIME ZONE' %} {{- datavault4dbt.string_to_timestamp( timestamp_format , end_of_all_times) }} as {{ alias }}
{%- elif datatype == 'DATE'-%} TO_DATE('{{ end_of_all_times_date }}', '{{ date_format }}' ) as {{ alias }}
{%- elif datatype.upper().startswith('VARCHAR') -%}
{%- if col_size is not none -%}
@@ -267,7 +267,7 @@
{%- if ghost_record_type == 'unknown' -%}
- {%- if datatype in ['DATETIME', 'DATETIME2', 'DATETIMEOFFSET'] %} CONVERT({{ datatype }}, {{- datavault4dbt.string_to_timestamp( timestamp_format , beginning_of_all_times) }}) as "{{ column_name }}"
+ {%- if datatype in ['DATETIME', 'DATETIME2', 'DATETIMEOFFSET'] %} CONVERT({{ datatype }}, {{- datavault4dbt.string_to_timestamp( timestamp_format , beginning_of_all_times) }}) as "{{ alias }}"
{%- elif 'CHAR' in datatype -%}
{%- if col_size is not none -%}
{%- if (col_size | int) == -1 -%}
@@ -297,7 +297,7 @@
{%- elif ghost_record_type == 'error' -%}
- {%- if datatype in ['DATETIME', 'DATETIME2', 'DATETIMEOFFSET'] %} CONVERT({{ datatype }}, {{- datavault4dbt.string_to_timestamp( timestamp_format , end_of_all_times) }}) as "{{ column_name }}"
+ {%- if datatype in ['DATETIME', 'DATETIME2', 'DATETIMEOFFSET'] %} CONVERT({{ datatype }}, {{- datavault4dbt.string_to_timestamp( timestamp_format , end_of_all_times) }}) as "{{ alias }}"
{%- elif 'CHAR' in datatype -%}
{%- if col_size is not none -%}
{%- if (col_size | int) == -1 -%}
diff --git a/macros/supporting/string_to_timestamp.sql b/macros/supporting/string_to_timestamp.sql
index 3ae80306..05c52de4 100644
--- a/macros/supporting/string_to_timestamp.sql
+++ b/macros/supporting/string_to_timestamp.sql
@@ -8,7 +8,7 @@
{%- endmacro -%}
{%- macro exasol__string_to_timestamp(format, timestamp) -%}
- TO_TIMESTAMP('{{ timestamp }}', '{{ format }}')
+ CAST(TO_TIMESTAMP('{{ timestamp }}', '{{ format }}') AS {{ datavault4dbt.timestamp_default_dtype() }})
{%- endmacro -%}
{%- macro snowflake__string_to_timestamp(format, timestamp) -%}
diff --git a/macros/tables/bigquery/eff_sat_v0.sql b/macros/tables/bigquery/eff_sat_v0.sql
new file mode 100644
index 00000000..b9b70a80
--- /dev/null
+++ b/macros/tables/bigquery/eff_sat_v0.sql
@@ -0,0 +1,300 @@
+{%- macro default__eff_sat_v0(source_model, tracked_hashkey, src_ldts, src_rsrc, is_active_alias, source_is_single_batch, disable_hwm) -%}
+
+{%- set end_of_all_times = datavault4dbt.end_of_all_times() -%}
+{%- set beginning_of_all_times = datavault4dbt.beginning_of_all_times() -%}
+{%- set timestamp_format = datavault4dbt.timestamp_format() -%}
+
+{%- set ns = namespace(last_cte= "") -%}
+
+{%- set source_relation = ref(source_model) -%}
+
+{%- set tracked_hashkey = datavault4dbt.escape_column_names(tracked_hashkey) -%}
+{%- set is_active_alias = datavault4dbt.escape_column_names(is_active_alias) -%}
+{%- set src_ldts = datavault4dbt.escape_column_names(src_ldts) -%}
+{%- set src_rsrc = datavault4dbt.escape_column_names(src_rsrc) -%}
+
+{{ log('columns to select: '~final_columns_to_select, false) }}
+
+{{ datavault4dbt.prepend_generated_by() }}
+
+WITH
+
+{#
+ In all cases, the source model is selected, and optionally a HWM is applied.
+#}
+source_data AS (
+
+ SELECT
+ {{ tracked_hashkey }},
+ {{ src_ldts }}
+ FROM {{ source_relation }} src
+ WHERE {{ src_ldts }} NOT IN ({{ datavault4dbt.string_to_timestamp(timestamp_format, beginning_of_all_times) }}, {{ datavault4dbt.string_to_timestamp(timestamp_format, end_of_all_times) }})
+ {%- if is_incremental() and not disable_hwm %}
+ AND src.{{ src_ldts }} > (
+ SELECT
+ MAX({{ src_ldts }})
+ FROM {{ this }}
+ WHERE {{ src_ldts }} != {{ datavault4dbt.string_to_timestamp(timestamp_format, end_of_all_times) }}
+ )
+ {%- endif %}
+),
+
+{#
+ In all incremental cases, the current status for each hashkey is selected from the existing Effectivity Satellite.
+#}
+{%- if is_incremental() %}
+current_status AS (
+
+ SELECT
+ {{ tracked_hashkey }},
+ {{ is_active_alias }}
+ FROM {{ this }}
+ QUALIFY
+ ROW_NUMBER() OVER(PARTITION BY {{ tracked_hashkey }} ORDER BY {{ src_ldts }} DESC) = 1
+
+),
+{% endif %}
+
+{#
+ This block is for multi-batch processing.
+#}
+{% if not source_is_single_batch %}
+
+ {#
+ List of all Hashkeys with their date of first appearance in the source model.
+ #}
+ hashkeys AS (
+
+ SELECT
+ {{ tracked_hashkey }},
+ MIN({{ src_ldts }}) as first_appearance
+ FROM source_data
+ GROUP BY {{ tracked_hashkey }}
+
+ ),
+
+ {#
+ Distinct list of load dates in the multi-batch source.
+ #}
+ load_dates AS (
+
+ SELECT Distinct
+ {{ src_ldts }}
+ FROM source_data
+
+ ),
+
+ {#
+ All combinations of hashkeys and loaddates, for loaddates after the first appearance of a hashkey.
+ #}
+ history AS (
+
+ SELECT
+ hk.{{ tracked_hashkey }},
+ ld.{{ src_ldts }}
+ FROM hashkeys hk
+ CROSS JOIN load_dates ld
+ WHERE ld.{{ src_ldts }} >= hk.first_appearance
+
+ ),
+
+ {#
+ All theoretical combinations are checked against the actual occurences of hashkeys in each batch / loaddate.
+ If a Hashkey is part of a load/batch, is_active_alias is set to 1, because the hashkey was active in that load/batch.
+ If a Hashkey is not part of a load/batch, is_active_alias is set to 0, because the hashkey was not active in that load/batch.
+ #}
+ is_active AS (
+
+ SELECT
+ h.{{ tracked_hashkey }},
+ h.{{ src_ldts }},
+ CASE
+ WHEN src.{{ tracked_hashkey }} IS NULL THEN 0
+ ELSE 1
+ END as {{ is_active_alias }}
+ FROM history h
+ LEFT JOIN source_data src
+ ON src.{{ tracked_hashkey }} = h.{{ tracked_hashkey }}
+ AND src.{{ src_ldts }} = h.{{ src_ldts }}
+
+ ),
+
+ {#
+ The rows are deduplicated on the is_active_alias, to only include status changes.
+ Additionally, a ROW_NUMBER() is calculated in incremental runs, to use it in the next step for comparison against the current status.
+ #}
+ deduplicated_incoming_prep AS (
+
+ SELECT
+ is_active.{{ tracked_hashkey }},
+ is_active.{{ src_ldts }},
+ is_active.{{ is_active_alias }},
+ LAG(is_active.{{ is_active_alias }}) OVER (PARTITION BY {{ tracked_hashkey }} ORDER BY {{ src_ldts }}) as lag_is_active
+
+ FROM is_active
+
+ ),
+
+ deduplicated_incoming AS (
+
+ SELECT
+ deduplicated_incoming_prep.{{ tracked_hashkey }},
+ deduplicated_incoming_prep.{{ src_ldts }},
+ deduplicated_incoming_prep.{{ is_active_alias }}
+
+ FROM
+ deduplicated_incoming_prep
+ WHERE
+ deduplicated_incoming_prep.{{ is_active_alias }} != deduplicated_incoming_prep.lag_is_active
+ OR deduplicated_incoming_prep.lag_is_active IS NULL
+
+ ),
+
+ {% set ns.last_cte = 'deduplicated_incoming' %}
+
+{#
+ This block is for single-batch processing
+#}
+{% else %}
+
+ {#
+ In initial loads of single-batch eff sats, every hashkey of the source is set to active.
+ #}
+ new_hashkeys AS (
+
+ SELECT DISTINCT
+ src.{{ tracked_hashkey }},
+ src.{{ src_ldts }},
+ 1 as {{ is_active_alias }}
+ FROM source_data src
+
+ {#
+ For incremental runs of single-batch eff sats, only hashkeys that are not active right now are set to active.
+ This automatically includes totally new hashkeys, or hashkeys that are currently set to inactive.
+ #}
+ {% if is_incremental() %}
+ LEFT JOIN current_status cs
+ ON src.{{ tracked_hashkey }} = cs.{{ tracked_hashkey }}
+ AND cs.{{ is_active_alias }} = 1
+ WHERE cs.{{ tracked_hashkey }} IS NULL
+ {% endif %}
+
+ ),
+
+ {% set ns.last_cte = 'new_hashkeys' %}
+
+{% endif %}
+
+{#
+ In all incremental runs, the source needs to be scanned for all currently active hashkeys.
+ If they are no longer present, they will be deactived.
+#}
+{%- if is_incremental() %}
+
+ {%- if not source_is_single_batch %}
+ disappeared_hashkeys AS (
+
+ SELECT DISTINCT
+ cs.{{ tracked_hashkey }},
+ ldts.min_ldts as {{ src_ldts }},
+ 0 as {{ is_active_alias }}
+ FROM current_status cs
+ LEFT JOIN (
+ SELECT
+ MIN({{ src_ldts }}) as min_ldts
+ FROM deduplicated_incoming) ldts
+ ON 1 = 1
+ LEFT JOIN deduplicated_incoming src
+ ON src.{{ tracked_hashkey }} = cs.{{ tracked_hashkey }}
+ AND src.{{ src_ldts }} = ldts.min_ldts
+ WHERE
+ cs.{{ is_active_alias }} = 1
+ AND src.{{ tracked_hashkey }} IS NULL
+ AND ldts.min_ldts IS NOT NULL
+
+ ),
+ {% else %}
+ disappeared_hashkeys AS (
+
+ SELECT DISTINCT
+ cs.{{ tracked_hashkey }},
+ ldts.min_ldts as {{ src_ldts }},
+ 0 as {{ is_active_alias }}
+ FROM current_status cs
+ LEFT JOIN (
+ SELECT
+ MIN({{ src_ldts }}) as min_ldts
+ FROM source_data) ldts
+ ON 1 = 1
+ WHERE NOT EXISTS (
+ SELECT
+ 1
+ FROM source_data src
+ WHERE src.{{ tracked_hashkey }} = cs.{{ tracked_hashkey }}
+ )
+ AND cs.{{ is_active_alias }} = 1
+ AND ldts.min_ldts IS NOT NULL
+
+ ),
+ {% endif %}
+{%- endif %}
+
+records_to_insert AS (
+
+ {#
+ This first part of the UNION includes:
+ - for single-batch loads: Only is_active_alias = 1, deactivations are handled later
+ - for multi-batch loads: Ativation and deactivation inside the multiple loads
+ #}
+ SELECT
+ di.{{ tracked_hashkey }},
+ di.{{ src_ldts }},
+ di.{{ is_active_alias }}
+ FROM {{ ns.last_cte }} di
+
+
+ {%- if is_incremental() %}
+
+ {#
+ For incremental multi-batch loads, the earliest to-be inserted status is compared to the current status.
+ It will only be inserted if the status changed. We use the ROW_NUMBER()
+ #}
+ {%- if not source_is_single_batch %}
+ WHERE NOT EXISTS (
+ SELECT 1
+ FROM current_status
+ WHERE {{ datavault4dbt.multikey(tracked_hashkey, prefix=['current_status', 'di'], condition='=') }}
+ AND {{ datavault4dbt.multikey(is_active_alias, prefix=['current_status', 'di'], condition='=') }}
+ AND di.{{ src_ldts }} = (SELECT MIN({{ src_ldts }}) FROM deduplicated_incoming)
+ )
+ AND di.{{ src_ldts }} > (SELECT MAX({{ src_ldts }}) FROM {{ this }})
+ {% endif %}
+
+ {#
+ For all incremental loads, the disappeared hashkeys are UNIONed.
+ #}
+ UNION ALL
+
+ SELECT
+ {{ tracked_hashkey }},
+ {{ src_ldts }},
+ {{ is_active_alias }}
+ FROM disappeared_hashkeys
+
+ {%- endif %}
+
+)
+
+SELECT *
+FROM records_to_insert ri
+
+{% if is_incremental() %}
+WHERE NOT EXISTS (
+ SELECT 1
+ FROM {{ this }} t
+ WHERE t.{{ tracked_hashkey }} = ri.{{ tracked_hashkey }}
+ AND t.{{ src_ldts }} = ri.{{ src_ldts }}
+)
+{% endif %}
+
+{%- endmacro -%}
\ No newline at end of file
diff --git a/macros/tables/databricks/eff_sat_v0.sql b/macros/tables/databricks/eff_sat_v0.sql
new file mode 100644
index 00000000..7dbdd2ee
--- /dev/null
+++ b/macros/tables/databricks/eff_sat_v0.sql
@@ -0,0 +1,292 @@
+{%- macro databricks__eff_sat_v0(source_model, tracked_hashkey, src_ldts, src_rsrc, is_active_alias, source_is_single_batch, disable_hwm) -%}
+
+{%- set end_of_all_times = datavault4dbt.end_of_all_times() -%}
+{%- set timestamp_format = datavault4dbt.timestamp_format() -%}
+
+{%- set ns = namespace(last_cte= "") -%}
+
+{%- set source_relation = ref(source_model) -%}
+
+{%- set tracked_hashkey = datavault4dbt.escape_column_names(tracked_hashkey) -%}
+{%- set is_active_alias = datavault4dbt.escape_column_names(is_active_alias) -%}
+{%- set src_ldts = datavault4dbt.escape_column_names(src_ldts) -%}
+{%- set src_rsrc = datavault4dbt.escape_column_names(src_rsrc) -%}
+
+{{ log('columns to select: '~final_columns_to_select, false) }}
+
+{{ datavault4dbt.prepend_generated_by() }}
+
+WITH
+
+{#
+ In all cases, the source model is selected, and optionally a HWM is applied.
+#}
+source_data AS (
+
+ SELECT
+ {{ tracked_hashkey }},
+ {{ src_ldts }}
+ FROM {{ source_relation }} src
+ WHERE {{ src_ldts }} NOT IN ('{{ datavault4dbt.beginning_of_all_times() }}', '{{ datavault4dbt.end_of_all_times() }}')
+ {%- if is_incremental() and not disable_hwm %}
+ AND src.{{ src_ldts }} > (
+ SELECT
+ MAX({{ src_ldts }})
+ FROM {{ this }}
+ WHERE {{ src_ldts }} != {{ datavault4dbt.string_to_timestamp(timestamp_format, end_of_all_times) }}
+ )
+ {%- endif %}
+),
+
+{#
+ In all incremental cases, the current status for each hashkey is selected from the existing Effectivity Satellite.
+#}
+{%- if is_incremental() %}
+current_status AS (
+
+ SELECT
+ {{ tracked_hashkey }},
+ {{ is_active_alias }}
+ FROM {{ this }}
+ QUALIFY
+ ROW_NUMBER() OVER(PARTITION BY {{ tracked_hashkey }} ORDER BY {{ src_ldts }} DESC) = 1
+
+),
+{% endif %}
+
+{#
+ This block is for multi-batch processing.
+#}
+{% if not source_is_single_batch %}
+
+ {#
+ List of all Hashkeys with their date of first appearance in the source model.
+ #}
+ hashkeys AS (
+
+ SELECT
+ {{ tracked_hashkey }},
+ MIN({{ src_ldts }}) as first_appearance
+ FROM source_data
+ GROUP BY {{ tracked_hashkey }}
+
+ ),
+
+ {#
+ Distinct list of load dates in the multi-batch source.
+ #}
+ load_dates AS (
+
+ SELECT Distinct
+ {{ src_ldts }}
+ FROM source_data
+
+ ),
+
+ {#
+ All combinations of hashkeys and loaddates, for loaddates after the first appearance of a hashkey.
+ #}
+ history AS (
+
+ SELECT
+ hk.{{ tracked_hashkey }},
+ ld.{{ src_ldts }}
+ FROM hashkeys hk
+ CROSS JOIN load_dates ld
+ WHERE ld.{{ src_ldts }} >= hk.first_appearance
+
+ ),
+
+ {#
+ All theoretical combinations are checked against the actual occurences of hashkeys in each batch / loaddate.
+ If a Hashkey is part of a load/batch, is_active_alias is set to 1, because the hashkey was active in that load/batch.
+ If a Hashkey is not part of a load/batch, is_active_alias is set to 0, because the hashkey was not active in that load/batch.
+ #}
+ is_active AS (
+
+ SELECT
+ h.{{ tracked_hashkey }},
+ h.{{ src_ldts }},
+ CASE
+ WHEN src.{{ tracked_hashkey }} IS NULL THEN 0
+ ELSE 1
+ END as {{ is_active_alias }}
+ FROM history h
+ LEFT JOIN source_data src
+ ON src.{{ tracked_hashkey }} = h.{{ tracked_hashkey }}
+ AND src.{{ src_ldts }} = h.{{ src_ldts }}
+
+ ),
+
+ {#
+ The rows are deduplicated on the is_active_alias, to only include status changes.
+ Additionally, a ROW_NUMBER() is calculated in incremental runs, to use it in the next step for comparison against the current status.
+ #}
+ deduplicated_incoming AS (
+
+ SELECT
+ is_active.{{ tracked_hashkey }},
+ is_active.{{ src_ldts }},
+ is_active.{{ is_active_alias }}
+
+ {% if is_incremental() -%}
+ , ROW_NUMBER() OVER(PARTITION BY is_active.{{ tracked_hashkey }} ORDER BY is_active.{{ src_ldts }}) as rn
+ {%- endif %}
+
+ FROM is_active
+ QUALIFY
+ CASE
+ WHEN is_active.{{ is_active_alias }} = LAG(is_active.{{ is_active_alias }}) OVER (PARTITION BY {{ tracked_hashkey }} ORDER BY {{ src_ldts }}) THEN FALSE
+ ELSE TRUE
+ END
+
+ ),
+
+ {% set ns.last_cte = 'deduplicated_incoming' %}
+
+{#
+ This block is for single-batch processing
+#}
+{% else %}
+
+ {#
+ In initial loads of single-batch eff sats, every hashkey of the source is set to active.
+ #}
+ new_hashkeys AS (
+
+ SELECT DISTINCT
+ src.{{ tracked_hashkey }},
+ src.{{ src_ldts }},
+ 1 as {{ is_active_alias }}
+ FROM source_data src
+
+ {#
+ For incremental runs of single-batch eff sats, only hashkeys that are not active right now are set to active.
+ This automatically includes totally new hashkeys, or hashkeys that are currently set to inactive.
+ #}
+ {% if is_incremental() %}
+ LEFT JOIN current_status cs
+ ON src.{{ tracked_hashkey }} = cs.{{ tracked_hashkey }}
+ AND cs.{{ is_active_alias }} = 1
+ WHERE cs.{{ tracked_hashkey }} IS NULL
+ {% endif %}
+
+ ),
+
+ {% set ns.last_cte = 'new_hashkeys' %}
+
+{% endif %}
+
+{#
+ In all incremental runs, the source needs to be scanned for all currently active hashkeys.
+ If they are no longer present, they will be deactived.
+#}
+{%- if is_incremental() %}
+
+ {%- if not source_is_single_batch %}
+ disappeared_hashkeys AS (
+
+ SELECT DISTINCT
+ cs.{{ tracked_hashkey }},
+ ldts.min_ldts as {{ src_ldts }},
+ 0 as {{ is_active_alias }}
+ FROM current_status cs
+ LEFT JOIN (
+ SELECT
+ MIN({{ src_ldts }}) as min_ldts
+ FROM deduplicated_incoming) ldts
+ ON 1 = 1
+ LEFT JOIN deduplicated_incoming src
+ ON src.{{ tracked_hashkey }} = cs.{{ tracked_hashkey }}
+ AND src.{{ src_ldts }} = ldts.min_ldts
+ WHERE
+ cs.{{ is_active_alias }} = 1
+ AND src.{{ tracked_hashkey }} IS NULL
+ AND ldts.min_ldts IS NOT NULL
+
+ ),
+ {% else %}
+ disappeared_hashkeys AS (
+
+ SELECT DISTINCT
+ cs.{{ tracked_hashkey }},
+ ldts.min_ldts as {{ src_ldts }},
+ 0 as {{ is_active_alias }}
+ FROM current_status cs
+ LEFT JOIN (
+ SELECT
+ MIN({{ src_ldts }}) as min_ldts
+ FROM source_data) ldts
+ ON 1 = 1
+ WHERE NOT EXISTS (
+ SELECT
+ 1
+ FROM source_data src
+ WHERE src.{{ tracked_hashkey }} = cs.{{ tracked_hashkey }}
+ )
+ AND cs.{{ is_active_alias }} = 1
+ AND ldts.min_ldts IS NOT NULL
+
+ ),
+ {% endif %}
+{%- endif %}
+
+records_to_insert AS (
+
+ {#
+ This first part of the UNION includes:
+ - for single-batch loads: Only is_active_alias = 1, deactivations are handled later
+ - for multi-batch loads: Ativation and deactivation inside the multiple loads
+ #}
+ SELECT
+ di.{{ tracked_hashkey }},
+ di.{{ src_ldts }},
+ di.{{ is_active_alias }}
+ FROM {{ ns.last_cte }} di
+
+
+ {%- if is_incremental() %}
+
+ {#
+ For incremental multi-batch loads, the earliest to-be inserted status is compared to the current status.
+ It will only be inserted if the status changed. We use the ROW_NUMBER()
+ #}
+ {%- if not source_is_single_batch %}
+ WHERE NOT EXISTS (
+ SELECT 1
+ FROM current_status
+ WHERE {{ datavault4dbt.multikey(tracked_hashkey, prefix=['current_status', 'di'], condition='=') }}
+ AND {{ datavault4dbt.multikey(is_active_alias, prefix=['current_status', 'di'], condition='=') }}
+ AND di.{{ src_ldts }} = (SELECT MIN({{ src_ldts }}) FROM deduplicated_incoming)
+ )
+ AND di.{{ src_ldts }} > (SELECT MAX({{ src_ldts }}) FROM {{ this }})
+ {% endif %}
+
+ {#
+ For all incremental loads, the disappeared hashkeys are UNIONed.
+ #}
+ UNION
+
+ SELECT
+ {{ tracked_hashkey }},
+ {{ src_ldts }},
+ {{ is_active_alias }}
+ FROM disappeared_hashkeys
+
+ {%- endif %}
+
+)
+
+SELECT *
+FROM records_to_insert ri
+
+{% if is_incremental() %}
+WHERE NOT EXISTS (
+ SELECT 1
+ FROM {{ this }} t
+ WHERE t.{{ tracked_hashkey }} = ri.{{ tracked_hashkey }}
+ AND t.{{ src_ldts }} = ri.{{ src_ldts }}
+)
+{% endif %}
+
+{%- endmacro -%}
\ No newline at end of file
diff --git a/macros/tables/eff_sat_v0.sql b/macros/tables/eff_sat_v0.sql
new file mode 100644
index 00000000..2857e263
--- /dev/null
+++ b/macros/tables/eff_sat_v0.sql
@@ -0,0 +1,16 @@
+{%- macro eff_sat_v0(source_model, tracked_hashkey, src_ldts=none, src_rsrc=none, is_active_alias=none, source_is_single_batch=true, disable_hwm=false) -%}
+
+ {# Applying the default aliases as stored inside the global variables, if src_ldts, src_rsrc, and ledts_alias are not set. #}
+ {%- set src_ldts = datavault4dbt.replace_standard(src_ldts, 'datavault4dbt.ldts_alias', 'ldts') -%}
+ {%- set src_rsrc = datavault4dbt.replace_standard(src_rsrc, 'datavault4dbt.rsrc_alias', 'rsrc') -%}
+ {%- set is_active_alias = datavault4dbt.replace_standard(is_active_alias, 'datavault4dbt.is_active_alias', 'is_active') -%}
+
+ {{ return(adapter.dispatch('eff_sat_v0', 'datavault4dbt')(tracked_hashkey=tracked_hashkey,
+ src_ldts=src_ldts,
+ src_rsrc=src_rsrc,
+ is_active_alias=is_active_alias,
+ source_model=source_model,
+ source_is_single_batch=source_is_single_batch,
+ disable_hwm=disable_hwm) )
+ }}
+{%- endmacro -%}
\ No newline at end of file
diff --git a/macros/tables/exasol/eff_sat_v0.sql b/macros/tables/exasol/eff_sat_v0.sql
new file mode 100644
index 00000000..9248be70
--- /dev/null
+++ b/macros/tables/exasol/eff_sat_v0.sql
@@ -0,0 +1,293 @@
+{%- macro exasol__eff_sat_v0(source_model, tracked_hashkey, src_ldts, src_rsrc, is_active_alias, source_is_single_batch, disable_hwm) -%}
+
+{%- set end_of_all_times = datavault4dbt.end_of_all_times() -%}
+{%- set beginning_of_all_times = datavault4dbt.beginning_of_all_times() -%}
+{%- set timestamp_format = datavault4dbt.timestamp_format() -%}
+
+{%- set ns = namespace(last_cte= "") -%}
+
+{%- set source_relation = ref(source_model) -%}
+
+{%- set tracked_hashkey = datavault4dbt.escape_column_names(tracked_hashkey) -%}
+{%- set is_active_alias = datavault4dbt.escape_column_names(is_active_alias) -%}
+{%- set src_ldts = datavault4dbt.escape_column_names(src_ldts) -%}
+{%- set src_rsrc = datavault4dbt.escape_column_names(src_rsrc) -%}
+
+{{ log('columns to select: '~final_columns_to_select, false) }}
+
+{{ datavault4dbt.prepend_generated_by() }}
+
+WITH
+
+{#
+ In all cases, the source model is selected, and optionally a HWM is applied.
+#}
+source_data AS (
+
+ SELECT
+ {{ tracked_hashkey }},
+ {{ src_ldts }}
+ FROM {{ source_relation }} src
+ WHERE {{ src_ldts }} NOT IN ({{ datavault4dbt.string_to_timestamp(timestamp_format, beginning_of_all_times) }}, {{ datavault4dbt.string_to_timestamp(timestamp_format, end_of_all_times) }})
+ {%- if is_incremental() and not disable_hwm %}
+ AND src.{{ src_ldts }} > (
+ SELECT
+ MAX({{ src_ldts }})
+ FROM {{ this }}
+ WHERE {{ src_ldts }} != {{ datavault4dbt.string_to_timestamp(timestamp_format, end_of_all_times) }}
+ )
+ {%- endif %}
+),
+
+{#
+ In all incremental cases, the current status for each hashkey is selected from the existing Effectivity Satellite.
+#}
+{%- if is_incremental() %}
+current_status AS (
+
+ SELECT
+ {{ tracked_hashkey }},
+ {{ is_active_alias }}
+ FROM {{ this }}
+ QUALIFY
+ ROW_NUMBER() OVER(PARTITION BY {{ tracked_hashkey }} ORDER BY {{ src_ldts }} DESC) = 1
+
+),
+{% endif %}
+
+{#
+ This block is for multi-batch processing.
+#}
+{% if not source_is_single_batch %}
+
+ {#
+ List of all Hashkeys with their date of first appearance in the source model.
+ #}
+ hashkeys AS (
+
+ SELECT
+ {{ tracked_hashkey }},
+ MIN({{ src_ldts }}) as first_appearance
+ FROM source_data
+ GROUP BY {{ tracked_hashkey }}
+
+ ),
+
+ {#
+ Distinct list of load dates in the multi-batch source.
+ #}
+ load_dates AS (
+
+ SELECT Distinct
+ {{ src_ldts }}
+ FROM source_data
+
+ ),
+
+ {#
+ All combinations of hashkeys and loaddates, for loaddates after the first appearance of a hashkey.
+ #}
+ history AS (
+
+ SELECT
+ hk.{{ tracked_hashkey }},
+ ld.{{ src_ldts }}
+ FROM hashkeys hk
+ CROSS JOIN load_dates ld
+ WHERE ld.{{ src_ldts }} >= hk.first_appearance
+
+ ),
+
+ {#
+ All theoretical combinations are checked against the actual occurences of hashkeys in each batch / loaddate.
+ If a Hashkey is part of a load/batch, is_active_alias is set to 1, because the hashkey was active in that load/batch.
+ If a Hashkey is not part of a load/batch, is_active_alias is set to 0, because the hashkey was not active in that load/batch.
+ #}
+ is_active AS (
+
+ SELECT
+ h.{{ tracked_hashkey }},
+ h.{{ src_ldts }},
+ CASE
+ WHEN src.{{ tracked_hashkey }} IS NULL THEN 0
+ ELSE 1
+ END as {{ is_active_alias }}
+ FROM history h
+ LEFT JOIN source_data src
+ ON src.{{ tracked_hashkey }} = h.{{ tracked_hashkey }}
+ AND src.{{ src_ldts }} = h.{{ src_ldts }}
+
+ ),
+
+ {#
+ The rows are deduplicated on the is_active_alias, to only include status changes.
+ Additionally, a ROW_NUMBER() is calculated in incremental runs, to use it in the next step for comparison against the current status.
+ #}
+ deduplicated_incoming AS (
+
+ SELECT
+ is_active.{{ tracked_hashkey }},
+ is_active.{{ src_ldts }},
+ is_active.{{ is_active_alias }}
+
+ {% if is_incremental() -%}
+ , ROW_NUMBER() OVER(PARTITION BY is_active.{{ tracked_hashkey }} ORDER BY is_active.{{ src_ldts }}) as rn
+ {%- endif %}
+
+ FROM is_active
+ QUALIFY
+ CASE
+ WHEN is_active.{{ is_active_alias }} = LAG(is_active.{{ is_active_alias }}) OVER (PARTITION BY {{ tracked_hashkey }} ORDER BY {{ src_ldts }}) THEN FALSE
+ ELSE TRUE
+ END
+
+ ),
+
+ {% set ns.last_cte = 'deduplicated_incoming' %}
+
+{#
+ This block is for single-batch processing
+#}
+{% else %}
+
+ {#
+ In initial loads of single-batch eff sats, every hashkey of the source is set to active.
+ #}
+ new_hashkeys AS (
+
+ SELECT DISTINCT
+ src.{{ tracked_hashkey }},
+ src.{{ src_ldts }},
+ 1 as {{ is_active_alias }}
+ FROM source_data src
+
+ {#
+ For incremental runs of single-batch eff sats, only hashkeys that are not active right now are set to active.
+ This automatically includes totally new hashkeys, or hashkeys that are currently set to inactive.
+ #}
+ {% if is_incremental() %}
+ LEFT JOIN current_status cus
+ ON src.{{ tracked_hashkey }} = cus.{{ tracked_hashkey }}
+ AND cus.{{ is_active_alias }} = 1
+ WHERE cus.{{ tracked_hashkey }} IS NULL
+ {% endif %}
+
+ ),
+
+ {% set ns.last_cte = 'new_hashkeys' %}
+
+{% endif %}
+
+{#
+ In all incremental runs, the source needs to be scanned for all currently active hashkeys.
+ If they are no longer present, they will be deactived.
+#}
+{%- if is_incremental() %}
+
+ {%- if not source_is_single_batch %}
+ disappeared_hashkeys AS (
+
+ SELECT DISTINCT
+ cus.{{ tracked_hashkey }},
+ ldts.min_ldts as {{ src_ldts }},
+ 0 as {{ is_active_alias }}
+ FROM current_status cus
+ LEFT JOIN (
+ SELECT
+ MIN({{ src_ldts }}) as min_ldts
+ FROM deduplicated_incoming) ldts
+ ON 1 = 1
+ LEFT JOIN deduplicated_incoming src
+ ON src.{{ tracked_hashkey }} = cus.{{ tracked_hashkey }}
+ AND src.{{ src_ldts }} = ldts.min_ldts
+ WHERE
+ cus.{{ is_active_alias }} = 1
+ AND src.{{ tracked_hashkey }} IS NULL
+ AND ldts.min_ldts IS NOT NULL
+
+ ),
+ {% else %}
+ disappeared_hashkeys AS (
+
+ SELECT DISTINCT
+ cus.{{ tracked_hashkey }},
+ ldts.min_ldts as {{ src_ldts }},
+ 0 as {{ is_active_alias }}
+ FROM current_status cus
+ LEFT JOIN (
+ SELECT
+ MIN({{ src_ldts }}) as min_ldts
+ FROM source_data) ldts
+ ON 1 = 1
+ WHERE NOT EXISTS (
+ SELECT
+ 1
+ FROM source_data src
+ WHERE src.{{ tracked_hashkey }} = cus.{{ tracked_hashkey }}
+ )
+ AND cus.{{ is_active_alias }} = 1
+ AND ldts.min_ldts IS NOT NULL
+
+ ),
+ {% endif %}
+{%- endif %}
+
+records_to_insert AS (
+
+ {#
+ This first part of the UNION includes:
+ - for single-batch loads: Only is_active_alias = 1, deactivations are handled later
+ - for multi-batch loads: Ativation and deactivation inside the multiple loads
+ #}
+ SELECT
+ di.{{ tracked_hashkey }},
+ di.{{ src_ldts }},
+ di.{{ is_active_alias }}
+ FROM {{ ns.last_cte }} di
+
+
+ {%- if is_incremental() %}
+
+ {#
+ For incremental multi-batch loads, the earliest to-be inserted status is compared to the current status.
+ It will only be inserted if the status changed. We use the ROW_NUMBER()
+ #}
+ {%- if not source_is_single_batch %}
+ WHERE NOT EXISTS (
+ SELECT 1
+ FROM current_status
+ WHERE {{ datavault4dbt.multikey(tracked_hashkey, prefix=['current_status', 'di'], condition='=') }}
+ AND {{ datavault4dbt.multikey(is_active_alias, prefix=['current_status', 'di'], condition='=') }}
+ AND di.{{ src_ldts }} = (SELECT MIN({{ src_ldts }}) FROM deduplicated_incoming)
+ )
+ AND di.{{ src_ldts }} > (SELECT MAX({{ src_ldts }}) FROM {{ this }})
+ {% endif %}
+
+ {#
+ For all incremental loads, the disappeared hashkeys are UNIONed.
+ #}
+ UNION
+
+ SELECT
+ {{ tracked_hashkey }},
+ {{ src_ldts }},
+ {{ is_active_alias }}
+ FROM disappeared_hashkeys
+
+ {%- endif %}
+
+)
+
+SELECT *
+FROM records_to_insert ri
+
+{% if is_incremental() %}
+WHERE NOT EXISTS (
+ SELECT 1
+ FROM {{ this }} t
+ WHERE t.{{ tracked_hashkey }} = ri.{{ tracked_hashkey }}
+ AND t.{{ src_ldts }} = ri.{{ src_ldts }}
+)
+{% endif %}
+
+{%- endmacro -%}
\ No newline at end of file
diff --git a/macros/tables/fabric/eff_sat_v0.sql b/macros/tables/fabric/eff_sat_v0.sql
new file mode 100644
index 00000000..c50d5d34
--- /dev/null
+++ b/macros/tables/fabric/eff_sat_v0.sql
@@ -0,0 +1,315 @@
+{%- macro fabric__eff_sat_v0(source_model, tracked_hashkey, src_ldts, src_rsrc, is_active_alias, source_is_single_batch, disable_hwm) -%}
+
+{%- set end_of_all_times = datavault4dbt.end_of_all_times() -%}
+{%- set timestamp_format = datavault4dbt.timestamp_format() -%}
+
+{%- set ns = namespace(last_cte= "") -%}
+
+{%- set source_relation = ref(source_model) -%}
+
+{%- set tracked_hashkey = datavault4dbt.escape_column_names(tracked_hashkey) -%}
+{%- set is_active_alias = datavault4dbt.escape_column_names(is_active_alias) -%}
+{%- set src_ldts = datavault4dbt.escape_column_names(src_ldts) -%}
+{%- set src_rsrc = datavault4dbt.escape_column_names(src_rsrc) -%}
+
+{{ log('columns to select: '~final_columns_to_select, false) }}
+
+{{ datavault4dbt.prepend_generated_by() }}
+
+WITH
+
+{#
+ In all cases, the source model is selected, and optionally a HWM is applied.
+#}
+{% if is_incremental() and not disable_hwm %}
+max_ldts_prep AS (
+
+ SELECT
+ MAX({{ src_ldts }}) AS max_ldts
+ FROM {{ this }}
+ WHERE {{ src_ldts }} != {{ datavault4dbt.string_to_timestamp(timestamp_format, end_of_all_times) }}
+),
+{% endif %}
+
+source_data AS (
+
+ SELECT
+ {{ tracked_hashkey }},
+ {{ src_ldts }}
+ FROM {{ source_relation }} src
+ WHERE {{ src_ldts }} NOT IN ('{{ datavault4dbt.beginning_of_all_times() }}', '{{ datavault4dbt.end_of_all_times() }}')
+ {%- if is_incremental() and not disable_hwm %}
+ AND src.{{ src_ldts }} > (
+ SELECT max_ldts FROM max_ldts_prep
+ )
+ {%- endif %}
+),
+
+{#
+ In all incremental cases, the current status for each hashkey is selected from the existing Effectivity Satellite.
+#}
+{%- if is_incremental() %}
+current_status_prep AS (
+
+ SELECT
+ {{ tracked_hashkey }},
+ {{ is_active_alias}},
+ ROW_NUMBER() OVER (PARTITION BY {{ tracked_hashkey }} ORDER BY {{ src_ldts }} DESC) as rn
+ FROM {{ this }}
+
+),
+
+current_status AS (
+
+ SELECT
+ {{ tracked_hashkey }},
+ {{ is_active_alias }}
+ FROM current_status_prep
+ WHERE rn = 1
+
+),
+{% endif %}
+
+{#
+ This block is for multi-batch processing.
+#}
+{% if not source_is_single_batch %}
+
+ {#
+ List of all Hashkeys with their date of first appearance in the source model.
+ #}
+ hashkeys AS (
+
+ SELECT
+ {{ tracked_hashkey }},
+ MIN({{ src_ldts }}) as first_appearance
+ FROM source_data
+ GROUP BY {{ tracked_hashkey }}
+
+ ),
+
+ {#
+ Distinct list of load dates in the multi-batch source.
+ #}
+ load_dates AS (
+
+ SELECT Distinct
+ {{ src_ldts }}
+ FROM source_data
+
+ ),
+
+ {#
+ All combinations of hashkeys and loaddates, for loaddates after the first appearance of a hashkey.
+ #}
+ history AS (
+
+ SELECT
+ hk.{{ tracked_hashkey }},
+ ld.{{ src_ldts }}
+ FROM hashkeys hk
+ CROSS JOIN load_dates ld
+ WHERE ld.{{ src_ldts }} >= hk.first_appearance
+
+ ),
+
+ {#
+ All theoretical combinations are checked against the actual occurences of hashkeys in each batch / loaddate.
+ If a Hashkey is part of a load/batch, is_active_alias is set to 1, because the hashkey was active in that load/batch.
+ If a Hashkey is not part of a load/batch, is_active_alias is set to 0, because the hashkey was not active in that load/batch.
+ #}
+ is_active AS (
+
+ SELECT
+ h.{{ tracked_hashkey }},
+ h.{{ src_ldts }},
+ CASE
+ WHEN src.{{ tracked_hashkey }} IS NULL THEN 0
+ ELSE 1
+ END as {{ is_active_alias }}
+ FROM history h
+ LEFT JOIN source_data src
+ ON src.{{ tracked_hashkey }} = h.{{ tracked_hashkey }}
+ AND src.{{ src_ldts }} = h.{{ src_ldts }}
+
+ ),
+
+ {#
+ The rows are deduplicated on the is_active_alias, to only include status changes.
+ Additionally, a ROW_NUMBER() is calculated in incremental runs, to use it in the next step for comparison against the current status.
+ #}
+ deduplicated_incoming_prep AS (
+
+ SELECT
+ is_active.{{ tracked_hashkey }},
+ is_active.{{ src_ldts }},
+ is_active.{{ is_active_alias }},
+ LAG(is_active.{{ is_active_alias }}) OVER (PARTITION BY {{ tracked_hashkey }} ORDER BY {{ src_ldts }}) as lag_is_active
+
+ FROM is_active
+
+ ),
+
+ deduplicated_incoming AS (
+
+ SELECT
+ deduplicated_incoming_prep.{{ tracked_hashkey }},
+ deduplicated_incoming_prep.{{ src_ldts }},
+ deduplicated_incoming_prep.{{ is_active_alias }}
+
+ FROM
+ deduplicated_incoming_prep
+ WHERE
+ deduplicated_incoming_prep.{{ is_active_alias }} != deduplicated_incoming_prep.lag_is_active
+ OR deduplicated_incoming_prep.lag_is_active IS NULL
+
+ ),
+
+ {% set ns.last_cte = 'deduplicated_incoming' %}
+
+{#
+ This block is for single-batch processing
+#}
+{% else %}
+
+ {#
+ In initial loads of single-batch eff sats, every hashkey of the source is set to active.
+ #}
+ new_hashkeys AS (
+
+ SELECT DISTINCT
+ src.{{ tracked_hashkey }},
+ src.{{ src_ldts }},
+ 1 as {{ is_active_alias }}
+ FROM source_data src
+
+ {#
+ For incremental runs of single-batch eff sats, only hashkeys that are not active right now are set to active.
+ This automatically includes totally new hashkeys, or hashkeys that are currently set to inactive.
+ #}
+ {% if is_incremental() %}
+ LEFT JOIN current_status cs
+ ON src.{{ tracked_hashkey }} = cs.{{ tracked_hashkey }}
+ AND cs.{{ is_active_alias }} = 1
+ WHERE cs.{{ tracked_hashkey }} IS NULL
+ {% endif %}
+
+ ),
+
+ {% set ns.last_cte = 'new_hashkeys' %}
+
+{% endif %}
+
+{#
+ In all incremental runs, the source needs to be scanned for all currently active hashkeys.
+ If they are no longer present, they will be deactived.
+#}
+{%- if is_incremental() %}
+
+ {%- if not source_is_single_batch %}
+ disappeared_hashkeys AS (
+
+ SELECT DISTINCT
+ cs.{{ tracked_hashkey }},
+ ldts.min_ldts as {{ src_ldts }},
+ 0 as {{ is_active_alias }}
+ FROM current_status cs
+ LEFT JOIN (
+ SELECT
+ MIN({{ src_ldts }}) as min_ldts
+ FROM deduplicated_incoming) ldts
+ ON 1 = 1
+ LEFT JOIN deduplicated_incoming src
+ ON src.{{ tracked_hashkey }} = cs.{{ tracked_hashkey }}
+ AND src.{{ src_ldts }} = ldts.min_ldts
+ WHERE
+ cs.{{ is_active_alias }} = 1
+ AND src.{{ tracked_hashkey }} IS NULL
+ AND ldts.min_ldts IS NOT NULL
+
+ ),
+ {% else %}
+ disappeared_hashkeys AS (
+
+ SELECT DISTINCT
+ cs.{{ tracked_hashkey }},
+ ldts.min_ldts as {{ src_ldts }},
+ 0 as {{ is_active_alias }}
+ FROM current_status cs
+ LEFT JOIN (
+ SELECT
+ MIN({{ src_ldts }}) as min_ldts
+ FROM source_data) ldts
+ ON 1 = 1
+ WHERE NOT EXISTS (
+ SELECT
+ 1
+ FROM source_data src
+ WHERE src.{{ tracked_hashkey }} = cs.{{ tracked_hashkey }}
+ )
+ AND cs.{{ is_active_alias }} = 1
+ AND ldts.min_ldts IS NOT NULL
+
+ ),
+ {% endif %}
+{%- endif %}
+
+records_to_insert AS (
+
+ {#
+ This first part of the UNION includes:
+ - for single-batch loads: Only is_active_alias = 1, deactivations are handled later
+ - for multi-batch loads: Ativation and deactivation inside the multiple loads
+ #}
+ SELECT
+ di.{{ tracked_hashkey }},
+ di.{{ src_ldts }},
+ di.{{ is_active_alias }}
+ FROM {{ ns.last_cte }} di
+
+
+ {%- if is_incremental() %}
+
+ {#
+ For incremental multi-batch loads, the earliest to-be inserted status is compared to the current status.
+ It will only be inserted if the status changed. We use the ROW_NUMBER()
+ #}
+ {%- if not source_is_single_batch %}
+ WHERE NOT EXISTS (
+ SELECT 1
+ FROM current_status
+ WHERE {{ datavault4dbt.multikey(tracked_hashkey, prefix=['current_status', 'di'], condition='=') }}
+ AND {{ datavault4dbt.multikey(is_active_alias, prefix=['current_status', 'di'], condition='=') }}
+ AND di.{{ src_ldts }} = (SELECT MIN({{ src_ldts }}) FROM deduplicated_incoming)
+ )
+ AND di.{{ src_ldts }} > (SELECT MAX({{ src_ldts }}) FROM {{ this }})
+ {% endif %}
+
+ {#
+ For all incremental loads, the disappeared hashkeys are UNIONed.
+ #}
+ UNION
+
+ SELECT
+ {{ tracked_hashkey }},
+ {{ src_ldts }},
+ {{ is_active_alias }}
+ FROM disappeared_hashkeys
+
+ {%- endif %}
+
+)
+
+SELECT *
+FROM records_to_insert ri
+
+{% if is_incremental() %}
+WHERE NOT EXISTS (
+ SELECT 1
+ FROM {{ this }} t
+ WHERE t.{{ tracked_hashkey }} = ri.{{ tracked_hashkey }}
+ AND t.{{ src_ldts }} = ri.{{ src_ldts }}
+)
+{% endif %}
+
+{%- endmacro -%}
\ No newline at end of file
diff --git a/macros/tables/fabric/sat_v0.sql b/macros/tables/fabric/sat_v0.sql
index 0e07a5aa..7223949c 100644
--- a/macros/tables/fabric/sat_v0.sql
+++ b/macros/tables/fabric/sat_v0.sql
@@ -63,7 +63,7 @@ latest_entries_in_sat_prep AS (
SELECT
tgt.{{ parent_hashkey }},
tgt.{{ ns.hdiff_alias }},
- ROW_NUMBER() OVER(PARTITION BY tgt.{{ parent_hashkey|lower }} ORDER BY tgt.{{ src_ldts }} DESC) as rn
+ ROW_NUMBER() OVER(PARTITION BY tgt.{{ parent_hashkey }} ORDER BY tgt.{{ src_ldts }} DESC) as rn
FROM {{ this }} tgt
INNER JOIN distinct_incoming_hashkeys src
ON tgt.{{ parent_hashkey }} = src.{{ parent_hashkey }}
@@ -91,7 +91,7 @@ deduplicated_numbered_source_prep AS (
{{ parent_hashkey }},
{{ ns.hdiff_alias }},
{{ datavault4dbt.print_list(source_cols) }}
- , LAG({{ ns.hdiff_alias }}) OVER(PARTITION BY {{ parent_hashkey|lower }} ORDER BY {{ src_ldts }}) as prev_hashdiff
+ , LAG({{ ns.hdiff_alias }}) OVER(PARTITION BY {{ parent_hashkey }} ORDER BY {{ src_ldts }}) as prev_hashdiff
FROM source_data
),
@@ -142,4 +142,4 @@ records_to_insert AS (
SELECT * FROM records_to_insert
-{%- endmacro -%}
\ No newline at end of file
+{%- endmacro -%}
diff --git a/macros/tables/oracle/eff_sat_v0.sql b/macros/tables/oracle/eff_sat_v0.sql
new file mode 100644
index 00000000..2ff92746
--- /dev/null
+++ b/macros/tables/oracle/eff_sat_v0.sql
@@ -0,0 +1,316 @@
+{%- macro oracle__eff_sat_v0(source_model, tracked_hashkey, src_ldts, src_rsrc, is_active_alias, source_is_single_batch, disable_hwm) -%}
+
+{%- set beginning_of_all_times = datavault4dbt.beginning_of_all_times() -%}
+{%- set end_of_all_times = datavault4dbt.end_of_all_times() -%}
+{%- set timestamp_format = datavault4dbt.timestamp_format() -%}
+
+{%- set ns = namespace(last_cte= "") -%}
+
+{%- set source_relation = ref(source_model) -%}
+
+{%- set tracked_hashkey = datavault4dbt.escape_column_names(tracked_hashkey) -%}
+{%- set is_active_alias = datavault4dbt.escape_column_names(is_active_alias) -%}
+{%- set src_ldts = datavault4dbt.escape_column_names(src_ldts) -%}
+{%- set src_rsrc = datavault4dbt.escape_column_names(src_rsrc) -%}
+
+{{ log('columns to select: '~final_columns_to_select, false) }}
+
+{{ datavault4dbt.prepend_generated_by() }}
+
+WITH
+
+{#
+ In all cases, the source model is selected, and optionally a HWM is applied.
+#}
+{% if is_incremental() and not disable_hwm %}
+max_ldts_prep AS (
+
+ SELECT
+ MAX({{ src_ldts }}) AS max_ldts
+ FROM {{ this }}
+ WHERE {{ src_ldts }} != {{ datavault4dbt.string_to_timestamp(timestamp_format, end_of_all_times) }}
+),
+{% endif %}
+
+source_data AS (
+
+ SELECT
+ {{ tracked_hashkey }},
+ {{ src_ldts }}
+ FROM {{ source_relation }} src
+ WHERE {{ src_ldts }} NOT IN ({{ datavault4dbt.string_to_timestamp(timestamp_format, beginning_of_all_times) }}, {{ datavault4dbt.string_to_timestamp(timestamp_format, end_of_all_times) }})
+ {%- if is_incremental() and not disable_hwm %}
+ AND src.{{ src_ldts }} > (
+ SELECT max_ldts FROM max_ldts_prep
+ )
+ {%- endif %}
+),
+
+{#
+ In all incremental cases, the current status for each hashkey is selected from the existing Effectivity Satellite.
+#}
+{%- if is_incremental() %}
+current_status_prep AS (
+
+ SELECT
+ {{ tracked_hashkey }},
+ {{ is_active_alias}},
+ ROW_NUMBER() OVER (PARTITION BY {{ tracked_hashkey }} ORDER BY {{ src_ldts }} DESC) as rn
+ FROM {{ this }}
+
+),
+
+current_status AS (
+
+ SELECT
+ {{ tracked_hashkey }},
+ {{ is_active_alias }}
+ FROM current_status_prep
+ WHERE rn = 1
+
+),
+{% endif %}
+
+{#
+ This block is for multi-batch processing.
+#}
+{% if not source_is_single_batch %}
+
+ {#
+ List of all Hashkeys with their date of first appearance in the source model.
+ #}
+ hashkeys AS (
+
+ SELECT
+ {{ tracked_hashkey }},
+ MIN({{ src_ldts }}) as first_appearance
+ FROM source_data
+ GROUP BY {{ tracked_hashkey }}
+
+ ),
+
+ {#
+ Distinct list of load dates in the multi-batch source.
+ #}
+ load_dates AS (
+
+ SELECT Distinct
+ {{ src_ldts }}
+ FROM source_data
+
+ ),
+
+ {#
+ All combinations of hashkeys and loaddates, for loaddates after the first appearance of a hashkey.
+ #}
+ history AS (
+
+ SELECT
+ hk.{{ tracked_hashkey }},
+ ld.{{ src_ldts }}
+ FROM hashkeys hk
+ CROSS JOIN load_dates ld
+ WHERE ld.{{ src_ldts }} >= hk.first_appearance
+
+ ),
+
+ {#
+ All theoretical combinations are checked against the actual occurences of hashkeys in each batch / loaddate.
+ If a Hashkey is part of a load/batch, is_active_alias is set to 1, because the hashkey was active in that load/batch.
+ If a Hashkey is not part of a load/batch, is_active_alias is set to 0, because the hashkey was not active in that load/batch.
+ #}
+ is_active AS (
+
+ SELECT
+ h.{{ tracked_hashkey }},
+ h.{{ src_ldts }},
+ CASE
+ WHEN src.{{ tracked_hashkey }} IS NULL THEN 0
+ ELSE 1
+ END as {{ is_active_alias }}
+ FROM history h
+ LEFT JOIN source_data src
+ ON src.{{ tracked_hashkey }} = h.{{ tracked_hashkey }}
+ AND src.{{ src_ldts }} = h.{{ src_ldts }}
+
+ ),
+
+ {#
+ The rows are deduplicated on the is_active_alias, to only include status changes.
+ Additionally, a ROW_NUMBER() is calculated in incremental runs, to use it in the next step for comparison against the current status.
+ #}
+ deduplicated_incoming_prep AS (
+
+ SELECT
+ is_active.{{ tracked_hashkey }},
+ is_active.{{ src_ldts }},
+ is_active.{{ is_active_alias }},
+ LAG(is_active.{{ is_active_alias }}) OVER (PARTITION BY {{ tracked_hashkey }} ORDER BY {{ src_ldts }}) as lag_is_active
+
+ FROM is_active
+
+ ),
+
+ deduplicated_incoming AS (
+
+ SELECT
+ deduplicated_incoming_prep.{{ tracked_hashkey }},
+ deduplicated_incoming_prep.{{ src_ldts }},
+ deduplicated_incoming_prep.{{ is_active_alias }}
+
+ FROM
+ deduplicated_incoming_prep
+ WHERE
+ deduplicated_incoming_prep.{{ is_active_alias }} != deduplicated_incoming_prep.lag_is_active
+ OR deduplicated_incoming_prep.lag_is_active IS NULL
+
+ ),
+
+ {% set ns.last_cte = 'deduplicated_incoming' %}
+
+{#
+ This block is for single-batch processing
+#}
+{% else %}
+
+ {#
+ In initial loads of single-batch eff sats, every hashkey of the source is set to active.
+ #}
+ new_hashkeys AS (
+
+ SELECT DISTINCT
+ src.{{ tracked_hashkey }},
+ src.{{ src_ldts }},
+ 1 as {{ is_active_alias }}
+ FROM source_data src
+
+ {#
+ For incremental runs of single-batch eff sats, only hashkeys that are not active right now are set to active.
+ This automatically includes totally new hashkeys, or hashkeys that are currently set to inactive.
+ #}
+ {% if is_incremental() %}
+ LEFT JOIN current_status cs
+ ON src.{{ tracked_hashkey }} = cs.{{ tracked_hashkey }}
+ AND cs.{{ is_active_alias }} = 1
+ WHERE cs.{{ tracked_hashkey }} IS NULL
+ {% endif %}
+
+ ),
+
+ {% set ns.last_cte = 'new_hashkeys' %}
+
+{% endif %}
+
+{#
+ In all incremental runs, the source needs to be scanned for all currently active hashkeys.
+ If they are no longer present, they will be deactived.
+#}
+{%- if is_incremental() %}
+
+ {%- if not source_is_single_batch %}
+ disappeared_hashkeys AS (
+
+ SELECT DISTINCT
+ cs.{{ tracked_hashkey }},
+ ldts.min_ldts as {{ src_ldts }},
+ 0 as {{ is_active_alias }}
+ FROM current_status cs
+ LEFT JOIN (
+ SELECT
+ MIN({{ src_ldts }}) as min_ldts
+ FROM deduplicated_incoming) ldts
+ ON 1 = 1
+ LEFT JOIN deduplicated_incoming src
+ ON src.{{ tracked_hashkey }} = cs.{{ tracked_hashkey }}
+ AND src.{{ src_ldts }} = ldts.min_ldts
+ WHERE
+ cs.{{ is_active_alias }} = 1
+ AND src.{{ tracked_hashkey }} IS NULL
+ AND ldts.min_ldts IS NOT NULL
+
+ ),
+ {% else %}
+ disappeared_hashkeys AS (
+
+ SELECT DISTINCT
+ cs.{{ tracked_hashkey }},
+ ldts.min_ldts as {{ src_ldts }},
+ 0 as {{ is_active_alias }}
+ FROM current_status cs
+ LEFT JOIN (
+ SELECT
+ MIN({{ src_ldts }}) as min_ldts
+ FROM source_data) ldts
+ ON 1 = 1
+ WHERE NOT EXISTS (
+ SELECT
+ 1
+ FROM source_data src
+ WHERE src.{{ tracked_hashkey }} = cs.{{ tracked_hashkey }}
+ )
+ AND cs.{{ is_active_alias }} = 1
+ AND ldts.min_ldts IS NOT NULL
+
+ ),
+ {% endif %}
+{%- endif %}
+
+records_to_insert AS (
+
+ {#
+ This first part of the UNION includes:
+ - for single-batch loads: Only is_active_alias = 1, deactivations are handled later
+ - for multi-batch loads: Ativation and deactivation inside the multiple loads
+ #}
+ SELECT
+ di.{{ tracked_hashkey }},
+ di.{{ src_ldts }},
+ di.{{ is_active_alias }}
+ FROM {{ ns.last_cte }} di
+
+
+ {%- if is_incremental() %}
+
+ {#
+ For incremental multi-batch loads, the earliest to-be inserted status is compared to the current status.
+ It will only be inserted if the status changed. We use the ROW_NUMBER()
+ #}
+ {%- if not source_is_single_batch %}
+ WHERE NOT EXISTS (
+ SELECT 1
+ FROM current_status
+ WHERE {{ datavault4dbt.multikey(tracked_hashkey, prefix=['current_status', 'di'], condition='=') }}
+ AND {{ datavault4dbt.multikey(is_active_alias, prefix=['current_status', 'di'], condition='=') }}
+ AND di.{{ src_ldts }} = (SELECT MIN({{ src_ldts }}) FROM deduplicated_incoming)
+ )
+ AND di.{{ src_ldts }} > (SELECT MAX({{ src_ldts }}) FROM {{ this }})
+ {% endif %}
+
+ {#
+ For all incremental loads, the disappeared hashkeys are UNIONed.
+ #}
+ UNION
+
+ SELECT
+ {{ tracked_hashkey }},
+ {{ src_ldts }},
+ {{ is_active_alias }}
+ FROM disappeared_hashkeys
+
+ {%- endif %}
+
+)
+
+SELECT *
+FROM records_to_insert ri
+
+{% if is_incremental() %}
+WHERE NOT EXISTS (
+ SELECT 1
+ FROM {{ this }} t
+ WHERE t.{{ tracked_hashkey }} = ri.{{ tracked_hashkey }}
+ AND t.{{ src_ldts }} = ri.{{ src_ldts }}
+)
+{% endif %}
+
+{%- endmacro -%}
\ No newline at end of file
diff --git a/macros/tables/postgres/eff_sat_v0.sql b/macros/tables/postgres/eff_sat_v0.sql
new file mode 100644
index 00000000..ef8d63c8
--- /dev/null
+++ b/macros/tables/postgres/eff_sat_v0.sql
@@ -0,0 +1,315 @@
+{%- macro postgres__eff_sat_v0(source_model, tracked_hashkey, src_ldts, src_rsrc, is_active_alias, source_is_single_batch, disable_hwm) -%}
+
+{%- set end_of_all_times = datavault4dbt.end_of_all_times() -%}
+{%- set timestamp_format = datavault4dbt.timestamp_format() -%}
+
+{%- set ns = namespace(last_cte= "") -%}
+
+{%- set source_relation = ref(source_model) -%}
+
+{%- set tracked_hashkey = datavault4dbt.escape_column_names(tracked_hashkey) -%}
+{%- set is_active_alias = datavault4dbt.escape_column_names(is_active_alias) -%}
+{%- set src_ldts = datavault4dbt.escape_column_names(src_ldts) -%}
+{%- set src_rsrc = datavault4dbt.escape_column_names(src_rsrc) -%}
+
+{{ log('columns to select: '~final_columns_to_select, false) }}
+
+{{ datavault4dbt.prepend_generated_by() }}
+
+WITH
+
+{#
+ In all cases, the source model is selected, and optionally a HWM is applied.
+#}
+{% if is_incremental() and not disable_hwm %}
+max_ldts_prep AS (
+
+ SELECT
+ MAX({{ src_ldts }}) AS max_ldts
+ FROM {{ this }}
+ WHERE {{ src_ldts }} != {{ datavault4dbt.string_to_timestamp(timestamp_format, end_of_all_times) }}
+),
+{% endif %}
+
+source_data AS (
+
+ SELECT
+ {{ tracked_hashkey }},
+ {{ src_ldts }}
+ FROM {{ source_relation }} src
+ WHERE {{ src_ldts }} NOT IN ('{{ datavault4dbt.beginning_of_all_times() }}', '{{ datavault4dbt.end_of_all_times() }}')
+ {%- if is_incremental() and not disable_hwm %}
+ AND src.{{ src_ldts }} > (
+ SELECT max_ldts FROM max_ldts_prep
+ )
+ {%- endif %}
+),
+
+{#
+ In all incremental cases, the current status for each hashkey is selected from the existing Effectivity Satellite.
+#}
+{%- if is_incremental() %}
+current_status_prep AS (
+
+ SELECT
+ {{ tracked_hashkey }},
+ {{ is_active_alias}},
+ ROW_NUMBER() OVER (PARTITION BY {{ tracked_hashkey }} ORDER BY {{ src_ldts }} DESC) as rn
+ FROM {{ this }}
+
+),
+
+current_status AS (
+
+ SELECT
+ {{ tracked_hashkey }},
+ {{ is_active_alias }}
+ FROM current_status_prep
+ WHERE rn = 1
+
+),
+{% endif %}
+
+{#
+ This block is for multi-batch processing.
+#}
+{% if not source_is_single_batch %}
+
+ {#
+ List of all Hashkeys with their date of first appearance in the source model.
+ #}
+ hashkeys AS (
+
+ SELECT
+ {{ tracked_hashkey }},
+ MIN({{ src_ldts }}) as first_appearance
+ FROM source_data
+ GROUP BY {{ tracked_hashkey }}
+
+ ),
+
+ {#
+ Distinct list of load dates in the multi-batch source.
+ #}
+ load_dates AS (
+
+ SELECT Distinct
+ {{ src_ldts }}
+ FROM source_data
+
+ ),
+
+ {#
+ All combinations of hashkeys and loaddates, for loaddates after the first appearance of a hashkey.
+ #}
+ history AS (
+
+ SELECT
+ hk.{{ tracked_hashkey }},
+ ld.{{ src_ldts }}
+ FROM hashkeys hk
+ CROSS JOIN load_dates ld
+ WHERE ld.{{ src_ldts }} >= hk.first_appearance
+
+ ),
+
+ {#
+ All theoretical combinations are checked against the actual occurences of hashkeys in each batch / loaddate.
+ If a Hashkey is part of a load/batch, is_active_alias is set to 1, because the hashkey was active in that load/batch.
+ If a Hashkey is not part of a load/batch, is_active_alias is set to 0, because the hashkey was not active in that load/batch.
+ #}
+ is_active AS (
+
+ SELECT
+ h.{{ tracked_hashkey }},
+ h.{{ src_ldts }},
+ CASE
+ WHEN src.{{ tracked_hashkey }} IS NULL THEN 0
+ ELSE 1
+ END as {{ is_active_alias }}
+ FROM history h
+ LEFT JOIN source_data src
+ ON src.{{ tracked_hashkey }} = h.{{ tracked_hashkey }}
+ AND src.{{ src_ldts }} = h.{{ src_ldts }}
+
+ ),
+
+ {#
+ The rows are deduplicated on the is_active_alias, to only include status changes.
+ Additionally, a ROW_NUMBER() is calculated in incremental runs, to use it in the next step for comparison against the current status.
+ #}
+ deduplicated_incoming_prep AS (
+
+ SELECT
+ is_active.{{ tracked_hashkey }},
+ is_active.{{ src_ldts }},
+ is_active.{{ is_active_alias }},
+ LAG(is_active.{{ is_active_alias }}) OVER (PARTITION BY {{ tracked_hashkey }} ORDER BY {{ src_ldts }}) as lag_is_active
+
+ FROM is_active
+
+ ),
+
+ deduplicated_incoming AS (
+
+ SELECT
+ deduplicated_incoming_prep.{{ tracked_hashkey }},
+ deduplicated_incoming_prep.{{ src_ldts }},
+ deduplicated_incoming_prep.{{ is_active_alias }}
+
+ FROM
+ deduplicated_incoming_prep
+ WHERE
+ deduplicated_incoming_prep.{{ is_active_alias }} != deduplicated_incoming_prep.lag_is_active
+ OR deduplicated_incoming_prep.lag_is_active IS NULL
+
+ ),
+
+ {% set ns.last_cte = 'deduplicated_incoming' %}
+
+{#
+ This block is for single-batch processing
+#}
+{% else %}
+
+ {#
+ In initial loads of single-batch eff sats, every hashkey of the source is set to active.
+ #}
+ new_hashkeys AS (
+
+ SELECT DISTINCT
+ src.{{ tracked_hashkey }},
+ src.{{ src_ldts }},
+ 1 as {{ is_active_alias }}
+ FROM source_data src
+
+ {#
+ For incremental runs of single-batch eff sats, only hashkeys that are not active right now are set to active.
+ This automatically includes totally new hashkeys, or hashkeys that are currently set to inactive.
+ #}
+ {% if is_incremental() %}
+ LEFT JOIN current_status cs
+ ON src.{{ tracked_hashkey }} = cs.{{ tracked_hashkey }}
+ AND cs.{{ is_active_alias }} = 1
+ WHERE cs.{{ tracked_hashkey }} IS NULL
+ {% endif %}
+
+ ),
+
+ {% set ns.last_cte = 'new_hashkeys' %}
+
+{% endif %}
+
+{#
+ In all incremental runs, the source needs to be scanned for all currently active hashkeys.
+ If they are no longer present, they will be deactived.
+#}
+{%- if is_incremental() %}
+
+ {%- if not source_is_single_batch %}
+ disappeared_hashkeys AS (
+
+ SELECT DISTINCT
+ cs.{{ tracked_hashkey }},
+ ldts.min_ldts as {{ src_ldts }},
+ 0 as {{ is_active_alias }}
+ FROM current_status cs
+ LEFT JOIN (
+ SELECT
+ MIN({{ src_ldts }}) as min_ldts
+ FROM deduplicated_incoming) ldts
+ ON 1 = 1
+ LEFT JOIN deduplicated_incoming src
+ ON src.{{ tracked_hashkey }} = cs.{{ tracked_hashkey }}
+ AND src.{{ src_ldts }} = ldts.min_ldts
+ WHERE
+ cs.{{ is_active_alias }} = 1
+ AND src.{{ tracked_hashkey }} IS NULL
+ AND ldts.min_ldts IS NOT NULL
+
+ ),
+ {% else %}
+ disappeared_hashkeys AS (
+
+ SELECT DISTINCT
+ cs.{{ tracked_hashkey }},
+ ldts.min_ldts as {{ src_ldts }},
+ 0 as {{ is_active_alias }}
+ FROM current_status cs
+ LEFT JOIN (
+ SELECT
+ MIN({{ src_ldts }}) as min_ldts
+ FROM source_data) ldts
+ ON 1 = 1
+ WHERE NOT EXISTS (
+ SELECT
+ 1
+ FROM source_data src
+ WHERE src.{{ tracked_hashkey }} = cs.{{ tracked_hashkey }}
+ )
+ AND cs.{{ is_active_alias }} = 1
+ AND ldts.min_ldts IS NOT NULL
+
+ ),
+ {% endif %}
+{%- endif %}
+
+records_to_insert AS (
+
+ {#
+ This first part of the UNION includes:
+ - for single-batch loads: Only is_active_alias = 1, deactivations are handled later
+ - for multi-batch loads: Ativation and deactivation inside the multiple loads
+ #}
+ SELECT
+ di.{{ tracked_hashkey }},
+ di.{{ src_ldts }},
+ di.{{ is_active_alias }}
+ FROM {{ ns.last_cte }} di
+
+
+ {%- if is_incremental() %}
+
+ {#
+ For incremental multi-batch loads, the earliest to-be inserted status is compared to the current status.
+ It will only be inserted if the status changed. We use the ROW_NUMBER()
+ #}
+ {%- if not source_is_single_batch %}
+ WHERE NOT EXISTS (
+ SELECT 1
+ FROM current_status
+ WHERE {{ datavault4dbt.multikey(tracked_hashkey, prefix=['current_status', 'di'], condition='=') }}
+ AND {{ datavault4dbt.multikey(is_active_alias, prefix=['current_status', 'di'], condition='=') }}
+ AND di.{{ src_ldts }} = (SELECT MIN({{ src_ldts }}) FROM deduplicated_incoming)
+ )
+ AND di.{{ src_ldts }} > (SELECT MAX({{ src_ldts }}) FROM {{ this }})
+ {% endif %}
+
+ {#
+ For all incremental loads, the disappeared hashkeys are UNIONed.
+ #}
+ UNION
+
+ SELECT
+ {{ tracked_hashkey }},
+ {{ src_ldts }},
+ {{ is_active_alias }}
+ FROM disappeared_hashkeys
+
+ {%- endif %}
+
+)
+
+SELECT *
+FROM records_to_insert ri
+
+{% if is_incremental() %}
+WHERE NOT EXISTS (
+ SELECT 1
+ FROM {{ this }} t
+ WHERE t.{{ tracked_hashkey }} = ri.{{ tracked_hashkey }}
+ AND t.{{ src_ldts }} = ri.{{ src_ldts }}
+)
+{% endif %}
+
+{%- endmacro -%}
\ No newline at end of file
diff --git a/macros/tables/redshift/eff_sat_v0.sql b/macros/tables/redshift/eff_sat_v0.sql
new file mode 100644
index 00000000..3483db4a
--- /dev/null
+++ b/macros/tables/redshift/eff_sat_v0.sql
@@ -0,0 +1,315 @@
+{%- macro redshift__eff_sat_v0(source_model, tracked_hashkey, src_ldts, src_rsrc, is_active_alias, source_is_single_batch, disable_hwm) -%}
+
+{%- set end_of_all_times = datavault4dbt.end_of_all_times() -%}
+{%- set timestamp_format = datavault4dbt.timestamp_format() -%}
+
+{%- set ns = namespace(last_cte= "") -%}
+
+{%- set source_relation = ref(source_model) -%}
+
+{%- set tracked_hashkey = datavault4dbt.escape_column_names(tracked_hashkey) -%}
+{%- set is_active_alias = datavault4dbt.escape_column_names(is_active_alias) -%}
+{%- set src_ldts = datavault4dbt.escape_column_names(src_ldts) -%}
+{%- set src_rsrc = datavault4dbt.escape_column_names(src_rsrc) -%}
+
+{{ log('columns to select: '~final_columns_to_select, false) }}
+
+{{ datavault4dbt.prepend_generated_by() }}
+
+WITH
+
+{#
+ In all cases, the source model is selected, and optionally a HWM is applied.
+#}
+{% if is_incremental() and not disable_hwm %}
+max_ldts_prep AS (
+
+ SELECT
+ MAX({{ src_ldts }}) AS max_ldts
+ FROM {{ this }}
+ WHERE {{ src_ldts }} != {{ datavault4dbt.string_to_timestamp(timestamp_format, end_of_all_times) }}
+),
+{% endif %}
+
+source_data AS (
+
+ SELECT
+ {{ tracked_hashkey }},
+ {{ src_ldts }}
+ FROM {{ source_relation }} src
+ WHERE {{ src_ldts }} NOT IN ('{{ datavault4dbt.beginning_of_all_times() }}', '{{ datavault4dbt.end_of_all_times() }}')
+ {%- if is_incremental() and not disable_hwm %}
+ AND src.{{ src_ldts }} > (
+ SELECT max_ldts FROM max_ldts_prep
+ )
+ {%- endif %}
+),
+
+{#
+ In all incremental cases, the current status for each hashkey is selected from the existing Effectivity Satellite.
+#}
+{%- if is_incremental() %}
+current_status_prep AS (
+
+ SELECT
+ {{ tracked_hashkey }},
+ {{ is_active_alias}},
+ ROW_NUMBER() OVER (PARTITION BY {{ tracked_hashkey }} ORDER BY {{ src_ldts }} DESC) as rn
+ FROM {{ this }}
+
+),
+
+current_status AS (
+
+ SELECT
+ {{ tracked_hashkey }},
+ {{ is_active_alias }}
+ FROM current_status_prep
+ WHERE rn = 1
+
+),
+{% endif %}
+
+{#
+ This block is for multi-batch processing.
+#}
+{% if not source_is_single_batch %}
+
+ {#
+ List of all Hashkeys with their date of first appearance in the source model.
+ #}
+ hashkeys AS (
+
+ SELECT
+ {{ tracked_hashkey }},
+ MIN({{ src_ldts }}) as first_appearance
+ FROM source_data
+ GROUP BY {{ tracked_hashkey }}
+
+ ),
+
+ {#
+ Distinct list of load dates in the multi-batch source.
+ #}
+ load_dates AS (
+
+ SELECT Distinct
+ {{ src_ldts }}
+ FROM source_data
+
+ ),
+
+ {#
+ All combinations of hashkeys and loaddates, for loaddates after the first appearance of a hashkey.
+ #}
+ history AS (
+
+ SELECT
+ hk.{{ tracked_hashkey }},
+ ld.{{ src_ldts }}
+ FROM hashkeys hk
+ CROSS JOIN load_dates ld
+ WHERE ld.{{ src_ldts }} >= hk.first_appearance
+
+ ),
+
+ {#
+ All theoretical combinations are checked against the actual occurences of hashkeys in each batch / loaddate.
+ If a Hashkey is part of a load/batch, is_active_alias is set to 1, because the hashkey was active in that load/batch.
+ If a Hashkey is not part of a load/batch, is_active_alias is set to 0, because the hashkey was not active in that load/batch.
+ #}
+ is_active AS (
+
+ SELECT
+ h.{{ tracked_hashkey }},
+ h.{{ src_ldts }},
+ CASE
+ WHEN src.{{ tracked_hashkey }} IS NULL THEN 0
+ ELSE 1
+ END as {{ is_active_alias }}
+ FROM history h
+ LEFT JOIN source_data src
+ ON src.{{ tracked_hashkey }} = h.{{ tracked_hashkey }}
+ AND src.{{ src_ldts }} = h.{{ src_ldts }}
+
+ ),
+
+ {#
+ The rows are deduplicated on the is_active_alias, to only include status changes.
+ Additionally, a ROW_NUMBER() is calculated in incremental runs, to use it in the next step for comparison against the current status.
+ #}
+ deduplicated_incoming_prep AS (
+
+ SELECT
+ is_active.{{ tracked_hashkey }},
+ is_active.{{ src_ldts }},
+ is_active.{{ is_active_alias }},
+ LAG(is_active.{{ is_active_alias }}) OVER (PARTITION BY {{ tracked_hashkey }} ORDER BY {{ src_ldts }}) as lag_is_active
+
+ FROM is_active
+
+ ),
+
+ deduplicated_incoming AS (
+
+ SELECT
+ deduplicated_incoming_prep.{{ tracked_hashkey }},
+ deduplicated_incoming_prep.{{ src_ldts }},
+ deduplicated_incoming_prep.{{ is_active_alias }}
+
+ FROM
+ deduplicated_incoming_prep
+ WHERE
+ deduplicated_incoming_prep.{{ is_active_alias }} != deduplicated_incoming_prep.lag_is_active
+ OR deduplicated_incoming_prep.lag_is_active IS NULL
+
+ ),
+
+ {% set ns.last_cte = 'deduplicated_incoming' %}
+
+{#
+ This block is for single-batch processing
+#}
+{% else %}
+
+ {#
+ In initial loads of single-batch eff sats, every hashkey of the source is set to active.
+ #}
+ new_hashkeys AS (
+
+ SELECT DISTINCT
+ src.{{ tracked_hashkey }},
+ src.{{ src_ldts }},
+ 1 as {{ is_active_alias }}
+ FROM source_data src
+
+ {#
+ For incremental runs of single-batch eff sats, only hashkeys that are not active right now are set to active.
+ This automatically includes totally new hashkeys, or hashkeys that are currently set to inactive.
+ #}
+ {% if is_incremental() %}
+ LEFT JOIN current_status cs
+ ON src.{{ tracked_hashkey }} = cs.{{ tracked_hashkey }}
+ AND cs.{{ is_active_alias }} = 1
+ WHERE cs.{{ tracked_hashkey }} IS NULL
+ {% endif %}
+
+ ),
+
+ {% set ns.last_cte = 'new_hashkeys' %}
+
+{% endif %}
+
+{#
+ In all incremental runs, the source needs to be scanned for all currently active hashkeys.
+ If they are no longer present, they will be deactived.
+#}
+{%- if is_incremental() %}
+
+ {%- if not source_is_single_batch %}
+ disappeared_hashkeys AS (
+
+ SELECT DISTINCT
+ cs.{{ tracked_hashkey }},
+ ldts.min_ldts as {{ src_ldts }},
+ 0 as {{ is_active_alias }}
+ FROM current_status cs
+ LEFT JOIN (
+ SELECT
+ MIN({{ src_ldts }}) as min_ldts
+ FROM deduplicated_incoming) ldts
+ ON 1 = 1
+ LEFT JOIN deduplicated_incoming src
+ ON src.{{ tracked_hashkey }} = cs.{{ tracked_hashkey }}
+ AND src.{{ src_ldts }} = ldts.min_ldts
+ WHERE
+ cs.{{ is_active_alias }} = 1
+ AND src.{{ tracked_hashkey }} IS NULL
+ AND ldts.min_ldts IS NOT NULL
+
+ ),
+ {% else %}
+ disappeared_hashkeys AS (
+
+ SELECT DISTINCT
+ cs.{{ tracked_hashkey }},
+ ldts.min_ldts as {{ src_ldts }},
+ 0 as {{ is_active_alias }}
+ FROM current_status cs
+ LEFT JOIN (
+ SELECT
+ MIN({{ src_ldts }}) as min_ldts
+ FROM source_data) ldts
+ ON 1 = 1
+ WHERE NOT EXISTS (
+ SELECT
+ 1
+ FROM source_data src
+ WHERE src.{{ tracked_hashkey }} = cs.{{ tracked_hashkey }}
+ )
+ AND cs.{{ is_active_alias }} = 1
+ AND ldts.min_ldts IS NOT NULL
+
+ ),
+ {% endif %}
+{%- endif %}
+
+records_to_insert AS (
+
+ {#
+ This first part of the UNION includes:
+ - for single-batch loads: Only is_active_alias = 1, deactivations are handled later
+ - for multi-batch loads: Ativation and deactivation inside the multiple loads
+ #}
+ SELECT
+ di.{{ tracked_hashkey }},
+ di.{{ src_ldts }},
+ di.{{ is_active_alias }}
+ FROM {{ ns.last_cte }} di
+
+
+ {%- if is_incremental() %}
+
+ {#
+ For incremental multi-batch loads, the earliest to-be inserted status is compared to the current status.
+ It will only be inserted if the status changed. We use the ROW_NUMBER()
+ #}
+ {%- if not source_is_single_batch %}
+ WHERE NOT EXISTS (
+ SELECT 1
+ FROM current_status
+ WHERE {{ datavault4dbt.multikey(tracked_hashkey, prefix=['current_status', 'di'], condition='=') }}
+ AND {{ datavault4dbt.multikey(is_active_alias, prefix=['current_status', 'di'], condition='=') }}
+ AND di.{{ src_ldts }} = (SELECT MIN({{ src_ldts }}) FROM deduplicated_incoming)
+ )
+ AND di.{{ src_ldts }} > (SELECT MAX({{ src_ldts }}) FROM {{ this }})
+ {% endif %}
+
+ {#
+ For all incremental loads, the disappeared hashkeys are UNIONed.
+ #}
+ UNION
+
+ SELECT
+ {{ tracked_hashkey }},
+ {{ src_ldts }},
+ {{ is_active_alias }}
+ FROM disappeared_hashkeys
+
+ {%- endif %}
+
+)
+
+SELECT *
+FROM records_to_insert ri
+
+{% if is_incremental() %}
+WHERE NOT EXISTS (
+ SELECT 1
+ FROM {{ this }} t
+ WHERE t.{{ tracked_hashkey }} = ri.{{ tracked_hashkey }}
+ AND t.{{ src_ldts }} = ri.{{ src_ldts }}
+)
+{% endif %}
+
+{%- endmacro -%}
\ No newline at end of file
diff --git a/macros/tables/snowflake/eff_sat_v0.sql b/macros/tables/snowflake/eff_sat_v0.sql
new file mode 100644
index 00000000..c2b7a75e
--- /dev/null
+++ b/macros/tables/snowflake/eff_sat_v0.sql
@@ -0,0 +1,292 @@
+{%- macro snowflake__eff_sat_v0(source_model, tracked_hashkey, src_ldts, src_rsrc, is_active_alias, source_is_single_batch, disable_hwm) -%}
+
+{%- set end_of_all_times = datavault4dbt.end_of_all_times() -%}
+{%- set timestamp_format = datavault4dbt.timestamp_format() -%}
+
+{%- set ns = namespace(last_cte= "") -%}
+
+{%- set source_relation = ref(source_model) -%}
+
+{%- set tracked_hashkey = datavault4dbt.escape_column_names(tracked_hashkey) -%}
+{%- set is_active_alias = datavault4dbt.escape_column_names(is_active_alias) -%}
+{%- set src_ldts = datavault4dbt.escape_column_names(src_ldts) -%}
+{%- set src_rsrc = datavault4dbt.escape_column_names(src_rsrc) -%}
+
+{{ log('columns to select: '~final_columns_to_select, false) }}
+
+{{ datavault4dbt.prepend_generated_by() }}
+
+WITH
+
+{#
+ In all cases, the source model is selected, and optionally a HWM is applied.
+#}
+source_data AS (
+
+ SELECT
+ {{ tracked_hashkey }},
+ {{ src_ldts }}
+ FROM {{ source_relation }} src
+ WHERE {{ src_ldts }} NOT IN ('{{ datavault4dbt.beginning_of_all_times() }}', '{{ datavault4dbt.end_of_all_times() }}')
+ {%- if is_incremental() and not disable_hwm %}
+ AND src.{{ src_ldts }} > (
+ SELECT
+ MAX({{ src_ldts }})
+ FROM {{ this }}
+ WHERE {{ src_ldts }} != {{ datavault4dbt.string_to_timestamp(timestamp_format, end_of_all_times) }}
+ )
+ {%- endif %}
+),
+
+{#
+ In all incremental cases, the current status for each hashkey is selected from the existing Effectivity Satellite.
+#}
+{%- if is_incremental() %}
+current_status AS (
+
+ SELECT
+ {{ tracked_hashkey }},
+ {{ is_active_alias }}
+ FROM {{ this }}
+ QUALIFY
+ ROW_NUMBER() OVER(PARTITION BY {{ tracked_hashkey }} ORDER BY {{ src_ldts }} DESC) = 1
+
+),
+{% endif %}
+
+{#
+ This block is for multi-batch processing.
+#}
+{% if not source_is_single_batch %}
+
+ {#
+ List of all Hashkeys with their date of first appearance in the source model.
+ #}
+ hashkeys AS (
+
+ SELECT
+ {{ tracked_hashkey }},
+ MIN({{ src_ldts }}) as first_appearance
+ FROM source_data
+ GROUP BY {{ tracked_hashkey }}
+
+ ),
+
+ {#
+ Distinct list of load dates in the multi-batch source.
+ #}
+ load_dates AS (
+
+ SELECT Distinct
+ {{ src_ldts }}
+ FROM source_data
+
+ ),
+
+ {#
+ All combinations of hashkeys and loaddates, for loaddates after the first appearance of a hashkey.
+ #}
+ history AS (
+
+ SELECT
+ hk.{{ tracked_hashkey }},
+ ld.{{ src_ldts }}
+ FROM hashkeys hk
+ CROSS JOIN load_dates ld
+ WHERE ld.{{ src_ldts }} >= hk.first_appearance
+
+ ),
+
+ {#
+ All theoretical combinations are checked against the actual occurences of hashkeys in each batch / loaddate.
+ If a Hashkey is part of a load/batch, is_active_alias is set to 1, because the hashkey was active in that load/batch.
+ If a Hashkey is not part of a load/batch, is_active_alias is set to 0, because the hashkey was not active in that load/batch.
+ #}
+ is_active AS (
+
+ SELECT
+ h.{{ tracked_hashkey }},
+ h.{{ src_ldts }},
+ CASE
+ WHEN src.{{ tracked_hashkey }} IS NULL THEN 0
+ ELSE 1
+ END as {{ is_active_alias }}
+ FROM history h
+ LEFT JOIN source_data src
+ ON src.{{ tracked_hashkey }} = h.{{ tracked_hashkey }}
+ AND src.{{ src_ldts }} = h.{{ src_ldts }}
+
+ ),
+
+ {#
+ The rows are deduplicated on the is_active_alias, to only include status changes.
+ Additionally, a ROW_NUMBER() is calculated in incremental runs, to use it in the next step for comparison against the current status.
+ #}
+ deduplicated_incoming AS (
+
+ SELECT
+ is_active.{{ tracked_hashkey }},
+ is_active.{{ src_ldts }},
+ is_active.{{ is_active_alias }}
+
+ {% if is_incremental() -%}
+ , ROW_NUMBER() OVER(PARTITION BY is_active.{{ tracked_hashkey }} ORDER BY is_active.{{ src_ldts }}) as rn
+ {%- endif %}
+
+ FROM is_active
+ QUALIFY
+ CASE
+ WHEN is_active.{{ is_active_alias }} = LAG(is_active.{{ is_active_alias }}) OVER (PARTITION BY {{ tracked_hashkey }} ORDER BY {{ src_ldts }}) THEN FALSE
+ ELSE TRUE
+ END
+
+ ),
+
+ {% set ns.last_cte = 'deduplicated_incoming' %}
+
+{#
+ This block is for single-batch processing
+#}
+{% else %}
+
+ {#
+ In initial loads of single-batch eff sats, every hashkey of the source is set to active.
+ #}
+ new_hashkeys AS (
+
+ SELECT DISTINCT
+ src.{{ tracked_hashkey }},
+ src.{{ src_ldts }},
+ 1 as {{ is_active_alias }}
+ FROM source_data src
+
+ {#
+ For incremental runs of single-batch eff sats, only hashkeys that are not active right now are set to active.
+ This automatically includes totally new hashkeys, or hashkeys that are currently set to inactive.
+ #}
+ {% if is_incremental() %}
+ LEFT JOIN current_status cs
+ ON src.{{ tracked_hashkey }} = cs.{{ tracked_hashkey }}
+ AND cs.{{ is_active_alias }} = 1
+ WHERE cs.{{ tracked_hashkey }} IS NULL
+ {% endif %}
+
+ ),
+
+ {% set ns.last_cte = 'new_hashkeys' %}
+
+{% endif %}
+
+{#
+ In all incremental runs, the source needs to be scanned for all currently active hashkeys.
+ If they are no longer present, they will be deactived.
+#}
+{%- if is_incremental() %}
+
+ {%- if not source_is_single_batch %}
+ disappeared_hashkeys AS (
+
+ SELECT DISTINCT
+ cs.{{ tracked_hashkey }},
+ ldts.min_ldts as {{ src_ldts }},
+ 0 as {{ is_active_alias }}
+ FROM current_status cs
+ LEFT JOIN (
+ SELECT
+ MIN({{ src_ldts }}) as min_ldts
+ FROM deduplicated_incoming) ldts
+ ON 1 = 1
+ LEFT JOIN deduplicated_incoming src
+ ON src.{{ tracked_hashkey }} = cs.{{ tracked_hashkey }}
+ AND src.{{ src_ldts }} = ldts.min_ldts
+ WHERE
+ cs.{{ is_active_alias }} = 1
+ AND src.{{ tracked_hashkey }} IS NULL
+ AND ldts.min_ldts IS NOT NULL
+
+ ),
+ {% else %}
+ disappeared_hashkeys AS (
+
+ SELECT DISTINCT
+ cs.{{ tracked_hashkey }},
+ ldts.min_ldts as {{ src_ldts }},
+ 0 as {{ is_active_alias }}
+ FROM current_status cs
+ LEFT JOIN (
+ SELECT
+ MIN({{ src_ldts }}) as min_ldts
+ FROM source_data) ldts
+ ON 1 = 1
+ WHERE NOT EXISTS (
+ SELECT
+ 1
+ FROM source_data src
+ WHERE src.{{ tracked_hashkey }} = cs.{{ tracked_hashkey }}
+ )
+ AND cs.{{ is_active_alias }} = 1
+ AND ldts.min_ldts IS NOT NULL
+
+ ),
+ {% endif %}
+{%- endif %}
+
+records_to_insert AS (
+
+ {#
+ This first part of the UNION includes:
+ - for single-batch loads: Only is_active_alias = 1, deactivations are handled later
+ - for multi-batch loads: Ativation and deactivation inside the multiple loads
+ #}
+ SELECT
+ di.{{ tracked_hashkey }},
+ di.{{ src_ldts }},
+ di.{{ is_active_alias }}
+ FROM {{ ns.last_cte }} di
+
+
+ {%- if is_incremental() %}
+
+ {#
+ For incremental multi-batch loads, the earliest to-be inserted status is compared to the current status.
+ It will only be inserted if the status changed. We use the ROW_NUMBER()
+ #}
+ {%- if not source_is_single_batch %}
+ WHERE NOT EXISTS (
+ SELECT 1
+ FROM current_status
+ WHERE {{ datavault4dbt.multikey(tracked_hashkey, prefix=['current_status', 'di'], condition='=') }}
+ AND {{ datavault4dbt.multikey(is_active_alias, prefix=['current_status', 'di'], condition='=') }}
+ AND di.{{ src_ldts }} = (SELECT MIN({{ src_ldts }}) FROM deduplicated_incoming)
+ )
+ AND di.{{ src_ldts }} > (SELECT MAX({{ src_ldts }}) FROM {{ this }})
+ {% endif %}
+
+ {#
+ For all incremental loads, the disappeared hashkeys are UNIONed.
+ #}
+ UNION
+
+ SELECT
+ {{ tracked_hashkey }},
+ {{ src_ldts }},
+ {{ is_active_alias }}
+ FROM disappeared_hashkeys
+
+ {%- endif %}
+
+)
+
+SELECT *
+FROM records_to_insert ri
+
+{% if is_incremental() %}
+WHERE NOT EXISTS (
+ SELECT 1
+ FROM {{ this }} t
+ WHERE t.{{ tracked_hashkey }} = ri.{{ tracked_hashkey }}
+ AND t.{{ src_ldts }} = ri.{{ src_ldts }}
+)
+{% endif %}
+
+{%- endmacro -%}
\ No newline at end of file
diff --git a/macros/tables/synapse/eff_sat_v0.sql b/macros/tables/synapse/eff_sat_v0.sql
new file mode 100644
index 00000000..3bac2383
--- /dev/null
+++ b/macros/tables/synapse/eff_sat_v0.sql
@@ -0,0 +1,315 @@
+{%- macro synapse__eff_sat_v0(source_model, tracked_hashkey, src_ldts, src_rsrc, is_active_alias, source_is_single_batch, disable_hwm) -%}
+
+{%- set end_of_all_times = datavault4dbt.end_of_all_times() -%}
+{%- set timestamp_format = datavault4dbt.timestamp_format() -%}
+
+{%- set ns = namespace(last_cte= "") -%}
+
+{%- set source_relation = ref(source_model) -%}
+
+{%- set tracked_hashkey = datavault4dbt.escape_column_names(tracked_hashkey) -%}
+{%- set is_active_alias = datavault4dbt.escape_column_names(is_active_alias) -%}
+{%- set src_ldts = datavault4dbt.escape_column_names(src_ldts) -%}
+{%- set src_rsrc = datavault4dbt.escape_column_names(src_rsrc) -%}
+
+{{ log('columns to select: '~final_columns_to_select, false) }}
+
+{{ datavault4dbt.prepend_generated_by() }}
+
+WITH
+
+{#
+ In all cases, the source model is selected, and optionally a HWM is applied.
+#}
+{% if is_incremental() and not disable_hwm %}
+max_ldts_prep AS (
+
+ SELECT
+ MAX({{ src_ldts }}) AS max_ldts
+ FROM {{ this }}
+ WHERE {{ src_ldts }} != {{ datavault4dbt.string_to_timestamp(timestamp_format, end_of_all_times) }}
+),
+{% endif %}
+
+source_data AS (
+
+ SELECT
+ {{ tracked_hashkey }},
+ {{ src_ldts }}
+ FROM {{ source_relation }} src
+ WHERE {{ src_ldts }} NOT IN ('{{ datavault4dbt.beginning_of_all_times() }}', '{{ datavault4dbt.end_of_all_times() }}')
+ {%- if is_incremental() and not disable_hwm %}
+ AND src.{{ src_ldts }} > (
+ SELECT max_ldts FROM max_ldts_prep
+ )
+ {%- endif %}
+),
+
+{#
+ In all incremental cases, the current status for each hashkey is selected from the existing Effectivity Satellite.
+#}
+{%- if is_incremental() %}
+current_status_prep AS (
+
+ SELECT
+ {{ tracked_hashkey }},
+ {{ is_active_alias}},
+ ROW_NUMBER() OVER (PARTITION BY {{ tracked_hashkey }} ORDER BY {{ src_ldts }} DESC) as rn
+ FROM {{ this }}
+
+),
+
+current_status AS (
+
+ SELECT
+ {{ tracked_hashkey }},
+ {{ is_active_alias }}
+ FROM current_status_prep
+ WHERE rn = 1
+
+),
+{% endif %}
+
+{#
+ This block is for multi-batch processing.
+#}
+{% if not source_is_single_batch %}
+
+ {#
+ List of all Hashkeys with their date of first appearance in the source model.
+ #}
+ hashkeys AS (
+
+ SELECT
+ {{ tracked_hashkey }},
+ MIN({{ src_ldts }}) as first_appearance
+ FROM source_data
+ GROUP BY {{ tracked_hashkey }}
+
+ ),
+
+ {#
+ Distinct list of load dates in the multi-batch source.
+ #}
+ load_dates AS (
+
+ SELECT Distinct
+ {{ src_ldts }}
+ FROM source_data
+
+ ),
+
+ {#
+ All combinations of hashkeys and loaddates, for loaddates after the first appearance of a hashkey.
+ #}
+ history AS (
+
+ SELECT
+ hk.{{ tracked_hashkey }},
+ ld.{{ src_ldts }}
+ FROM hashkeys hk
+ CROSS JOIN load_dates ld
+ WHERE ld.{{ src_ldts }} >= hk.first_appearance
+
+ ),
+
+ {#
+ All theoretical combinations are checked against the actual occurences of hashkeys in each batch / loaddate.
+ If a Hashkey is part of a load/batch, is_active_alias is set to 1, because the hashkey was active in that load/batch.
+ If a Hashkey is not part of a load/batch, is_active_alias is set to 0, because the hashkey was not active in that load/batch.
+ #}
+ is_active AS (
+
+ SELECT
+ h.{{ tracked_hashkey }},
+ h.{{ src_ldts }},
+ CASE
+ WHEN src.{{ tracked_hashkey }} IS NULL THEN 0
+ ELSE 1
+ END as {{ is_active_alias }}
+ FROM history h
+ LEFT JOIN source_data src
+ ON src.{{ tracked_hashkey }} = h.{{ tracked_hashkey }}
+ AND src.{{ src_ldts }} = h.{{ src_ldts }}
+
+ ),
+
+ {#
+ The rows are deduplicated on the is_active_alias, to only include status changes.
+ Additionally, a ROW_NUMBER() is calculated in incremental runs, to use it in the next step for comparison against the current status.
+ #}
+ deduplicated_incoming_prep AS (
+
+ SELECT
+ is_active.{{ tracked_hashkey }},
+ is_active.{{ src_ldts }},
+ is_active.{{ is_active_alias }},
+ LAG(is_active.{{ is_active_alias }}) OVER (PARTITION BY {{ tracked_hashkey }} ORDER BY {{ src_ldts }}) as lag_is_active
+
+ FROM is_active
+
+ ),
+
+ deduplicated_incoming AS (
+
+ SELECT
+ deduplicated_incoming_prep.{{ tracked_hashkey }},
+ deduplicated_incoming_prep.{{ src_ldts }},
+ deduplicated_incoming_prep.{{ is_active_alias }}
+
+ FROM
+ deduplicated_incoming_prep
+ WHERE
+ deduplicated_incoming_prep.{{ is_active_alias }} != deduplicated_incoming_prep.lag_is_active
+ OR deduplicated_incoming_prep.lag_is_active IS NULL
+
+ ),
+
+ {% set ns.last_cte = 'deduplicated_incoming' %}
+
+{#
+ This block is for single-batch processing
+#}
+{% else %}
+
+ {#
+ In initial loads of single-batch eff sats, every hashkey of the source is set to active.
+ #}
+ new_hashkeys AS (
+
+ SELECT DISTINCT
+ src.{{ tracked_hashkey }},
+ src.{{ src_ldts }},
+ 1 as {{ is_active_alias }}
+ FROM source_data src
+
+ {#
+ For incremental runs of single-batch eff sats, only hashkeys that are not active right now are set to active.
+ This automatically includes totally new hashkeys, or hashkeys that are currently set to inactive.
+ #}
+ {% if is_incremental() %}
+ LEFT JOIN current_status cs
+ ON src.{{ tracked_hashkey }} = cs.{{ tracked_hashkey }}
+ AND cs.{{ is_active_alias }} = 1
+ WHERE cs.{{ tracked_hashkey }} IS NULL
+ {% endif %}
+
+ ),
+
+ {% set ns.last_cte = 'new_hashkeys' %}
+
+{% endif %}
+
+{#
+ In all incremental runs, the source needs to be scanned for all currently active hashkeys.
+ If they are no longer present, they will be deactived.
+#}
+{%- if is_incremental() %}
+
+ {%- if not source_is_single_batch %}
+ disappeared_hashkeys AS (
+
+ SELECT DISTINCT
+ cs.{{ tracked_hashkey }},
+ ldts.min_ldts as {{ src_ldts }},
+ 0 as {{ is_active_alias }}
+ FROM current_status cs
+ LEFT JOIN (
+ SELECT
+ MIN({{ src_ldts }}) as min_ldts
+ FROM deduplicated_incoming) ldts
+ ON 1 = 1
+ LEFT JOIN deduplicated_incoming src
+ ON src.{{ tracked_hashkey }} = cs.{{ tracked_hashkey }}
+ AND src.{{ src_ldts }} = ldts.min_ldts
+ WHERE
+ cs.{{ is_active_alias }} = 1
+ AND src.{{ tracked_hashkey }} IS NULL
+ AND ldts.min_ldts IS NOT NULL
+
+ ),
+ {% else %}
+ disappeared_hashkeys AS (
+
+ SELECT DISTINCT
+ cs.{{ tracked_hashkey }},
+ ldts.min_ldts as {{ src_ldts }},
+ 0 as {{ is_active_alias }}
+ FROM current_status cs
+ LEFT JOIN (
+ SELECT
+ MIN({{ src_ldts }}) as min_ldts
+ FROM source_data) ldts
+ ON 1 = 1
+ WHERE NOT EXISTS (
+ SELECT
+ 1
+ FROM source_data src
+ WHERE src.{{ tracked_hashkey }} = cs.{{ tracked_hashkey }}
+ )
+ AND cs.{{ is_active_alias }} = 1
+ AND ldts.min_ldts IS NOT NULL
+
+ ),
+ {% endif %}
+{%- endif %}
+
+records_to_insert AS (
+
+ {#
+ This first part of the UNION includes:
+ - for single-batch loads: Only is_active_alias = 1, deactivations are handled later
+ - for multi-batch loads: Ativation and deactivation inside the multiple loads
+ #}
+ SELECT
+ di.{{ tracked_hashkey }},
+ di.{{ src_ldts }},
+ di.{{ is_active_alias }}
+ FROM {{ ns.last_cte }} di
+
+
+ {%- if is_incremental() %}
+
+ {#
+ For incremental multi-batch loads, the earliest to-be inserted status is compared to the current status.
+ It will only be inserted if the status changed. We use the ROW_NUMBER()
+ #}
+ {%- if not source_is_single_batch %}
+ WHERE NOT EXISTS (
+ SELECT 1
+ FROM current_status
+ WHERE {{ datavault4dbt.multikey(tracked_hashkey, prefix=['current_status', 'di'], condition='=') }}
+ AND {{ datavault4dbt.multikey(is_active_alias, prefix=['current_status', 'di'], condition='=') }}
+ AND di.{{ src_ldts }} = (SELECT MIN({{ src_ldts }}) FROM deduplicated_incoming)
+ )
+ AND di.{{ src_ldts }} > (SELECT MAX({{ src_ldts }}) FROM {{ this }})
+ {% endif %}
+
+ {#
+ For all incremental loads, the disappeared hashkeys are UNIONed.
+ #}
+ UNION
+
+ SELECT
+ {{ tracked_hashkey }},
+ {{ src_ldts }},
+ {{ is_active_alias }}
+ FROM disappeared_hashkeys
+
+ {%- endif %}
+
+)
+
+SELECT *
+FROM records_to_insert ri
+
+{% if is_incremental() %}
+WHERE NOT EXISTS (
+ SELECT 1
+ FROM {{ this }} t
+ WHERE t.{{ tracked_hashkey }} = ri.{{ tracked_hashkey }}
+ AND t.{{ src_ldts }} = ri.{{ src_ldts }}
+)
+{% endif %}
+
+{%- endmacro -%}
\ No newline at end of file