Skip to content

Commit 0557fbf

Browse files
committed
Merge branch '287-feature-extract-multiple-columns-from-one-prejoined-object' into test
2 parents c859311 + c8df4d3 commit 0557fbf

File tree

12 files changed

+1029
-727
lines changed

12 files changed

+1029
-727
lines changed

macros/staging/bigquery/stage.sql

Lines changed: 98 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@
9696
{# Getting the column names for all additional columns #}
9797
{%- set derived_column_names = datavault4dbt.extract_column_names(derived_columns) -%}
9898
{%- set hashed_column_names = datavault4dbt.extract_column_names(hashed_columns) -%}
99-
{%- set prejoined_column_names = datavault4dbt.extract_column_names(prejoined_columns) -%}
99+
{%- set prejoined_column_names = datavault4dbt.extract_prejoin_column_names(prejoined_columns) -%}
100100
{%- set missing_column_names = datavault4dbt.extract_column_names(missing_columns) -%}
101101
{%- set exclude_column_names = derived_column_names + hashed_column_names + prejoined_column_names + missing_column_names + ldts_rsrc_input_column_names %}
102102
{%- set source_and_derived_column_names = (all_source_columns + derived_column_names) | unique | list -%}
@@ -185,6 +185,8 @@
185185
{# Setting the ldts default datatype #}
186186
{% set ldts_default_dtype = datavault4dbt.timestamp_default_dtype() %}
187187

188+
{{ datavault4dbt.prepend_generated_by() }}
189+
188190
WITH
189191

190192
{# Selecting everything that we need from the source relation. #}
@@ -256,26 +258,51 @@ missing_columns AS (
256258
),
257259
{%- endif -%}
258260

261+
259262
{%- if datavault4dbt.is_something(prejoined_columns) %}
260263
{# Prejoining Business Keys of other source objects for Link purposes #}
261264
prejoined_columns AS (
262265

263266
SELECT
264267
{% if final_columns_to_select | length > 0 -%}
265268
{{ datavault4dbt.print_list(datavault4dbt.prefix(columns=datavault4dbt.escape_column_names(final_columns_to_select), prefix_str='lcte').split(',')) }}
266-
{% endif %}
267-
{%- for col, vals in prejoined_columns.items() -%}
268-
,pj_{{loop.index}}.{{ vals['bk'] }} AS {{ col }}
269-
{% endfor -%}
269+
{%- endif -%}
270+
271+
{# Iterate over each prejoin, doing logic checks and generating the select-statements #}
272+
{%- for prejoin in prejoined_columns -%}
273+
{%- set prejoin_alias = 'pj_' + loop.index|string -%}
274+
275+
{# If extract_columns and/or aliases are passed as string convert them to a list so they can be used as iterators later #}
276+
{%- if not datavault4dbt.is_list(prejoin['extract_columns'])-%}
277+
{%- do prejoin.update({'extract_columns': [prejoin['extract_columns']]}) -%}
278+
{%- endif -%}
279+
{%- if not datavault4dbt.is_list(prejoin['aliases']) and datavault4dbt.is_something(prejoin['aliases']) -%}
280+
{%- do prejoin.update({'aliases': [prejoin['aliases']]}) -%}
281+
{%- endif -%}
282+
283+
{# If passed, make sure there are as many aliases as there are extract_columns, ensuring a 1:1 mapping #}
284+
{%- if datavault4dbt.is_something(prejoin['aliases']) -%}
285+
{%- if not prejoin['aliases']|length == prejoin['extract_columns']|length -%}
286+
{%- do exceptions.raise_compiler_error("Prejoin aliases must have the same length as extract_columns. Got "
287+
~ prejoin['extract_columns']|length ~ " extract_column(s) and " ~ prejoin['aliases']|length ~ " aliase(s).") -%}
288+
{%- endif -%}
289+
{%- endif -%}
290+
291+
{# Generate the columns for the SELECT-statement #}
292+
{%- for column in prejoin['extract_columns'] %}
293+
,{{ prejoin_alias }}.{{ column }} {% if datavault4dbt.is_something(prejoin['aliases']) -%} AS {{ prejoin['aliases'][loop.index0] }} {% endif -%}
294+
{%- endfor -%}
295+
{%- endfor %}
270296

271297
FROM {{ last_cte }} lcte
272298

273-
{% for col, vals in prejoined_columns.items() %}
299+
{# Iterate over prejoins and generate the join-statements #}
300+
{%- for prejoin in prejoined_columns -%}
274301

275-
{%- if 'src_name' in vals.keys() or 'src_table' in vals.keys() -%}
276-
{%- set relation = source(vals['src_name']|string, vals['src_table']) -%}
277-
{%- elif 'ref_model' in vals.keys() -%}
278-
{%- set relation = ref(vals['ref_model']) -%}
302+
{%- if 'ref_model' in prejoin.keys() -%}
303+
{% set relation = ref(prejoin['ref_model']) -%}
304+
{%- elif 'src_name' in prejoin.keys() and 'src_table' in prejoin.keys() -%}
305+
{%- set relation = source(prejoin['src_name']|string, prejoin['src_table']) -%}
279306
{%- else -%}
280307
{%- set error_message -%}
281308
Prejoin error: Invalid target entity definition. Allowed are:
@@ -296,28 +323,25 @@ prejoined_columns AS (
296323
ref_column_name: join_columns_in_ref_model
297324

298325
Got:
299-
{{ col }}: {{ vals }}
326+
{{ prejoin }}
300327
{%- endset -%}
301328

302329
{%- do exceptions.raise_compiler_error(error_message) -%}
303330
{%- endif -%}
304331

305-
{# This sets a default value for the operator that connects multiple joining conditions. Only when it is not set by user. #}
306-
{%- if 'operator' not in vals.keys() -%}
332+
{%- if 'operator' not in prejoin.keys() -%}
307333
{%- set operator = 'AND' -%}
308334
{%- else -%}
309-
{%- set operator = vals['operator'] -%}
335+
{%- set operator = prejoin['operator'] -%}
310336
{%- endif -%}
311-
312-
{%- set prejoin_alias = 'pj_' + loop.index|string -%}
313-
314-
left join {{ relation }} as {{ prejoin_alias }}
315-
on {{ datavault4dbt.multikey(columns=vals['this_column_name'], prefix=['lcte', prejoin_alias], condition='=', operator=operator, right_columns=vals['ref_column_name']) }}
316-
317-
{% endfor %}
337+
{%- set prejoin_alias = 'pj_' + loop.index|string %}
338+
339+
left join {{ relation }} as {{ prejoin_alias }}
340+
on {{ datavault4dbt.multikey(columns=prejoin['this_column_name'], prefix=['lcte', prejoin_alias], condition='=', operator=operator, right_columns=prejoin['ref_column_name']) }}
341+
{%- endfor -%}
318342

319343
{% set last_cte = "prejoined_columns" -%}
320-
{%- set final_columns_to_select = final_columns_to_select + prejoined_column_names %}
344+
{%- set final_columns_to_select = final_columns_to_select + prejoined_column_names -%}
321345
),
322346
{%- endif -%}
323347

@@ -444,65 +468,61 @@ unknown_values AS (
444468

445469
SELECT
446470

447-
{{ datavault4dbt.string_to_timestamp(timestamp_format, beginning_of_all_times) }} as {{ load_datetime_col_name }},
448-
'{{ unknown_value_rsrc }}' as {{ record_source_col_name }}
471+
{{ datavault4dbt.string_to_timestamp(timestamp_format, beginning_of_all_times) }} as {{ load_datetime_col_name }}
472+
,'{{ unknown_value_rsrc }}' as {{ record_source_col_name }}
449473

450-
{%- if columns_without_excluded_columns is defined and columns_without_excluded_columns| length > 0 -%},
474+
{%- if columns_without_excluded_columns is defined and columns_without_excluded_columns| length > 0 -%}
451475
{# Generating Ghost Records for all source columns, except the ldts, rsrc & edwSequence column #}
452476
{%- for column in columns_without_excluded_columns %}
453-
{{ datavault4dbt.ghost_record_per_datatype(column_name=column.name, datatype=column.dtype, ghost_record_type='unknown') }}
454-
{%- if not loop.last %},{% endif -%}
477+
,{{ datavault4dbt.ghost_record_per_datatype(column_name=column.name, datatype=column.dtype, ghost_record_type='unknown') }}
455478
{%- endfor -%}
456479

457480
{%- endif -%}
458481

459-
{%- if datavault4dbt.is_something(missing_columns) -%},
482+
{%- if datavault4dbt.is_something(missing_columns) -%}
460483
{# Additionally generating ghost record for missing columns #}
461484
{%- for col, dtype in missing_columns.items() %}
462-
{{ datavault4dbt.ghost_record_per_datatype(column_name=col, datatype=dtype, ghost_record_type='unknown') }}
463-
{%- if not loop.last %},{% endif -%}
485+
,{{- datavault4dbt.ghost_record_per_datatype(column_name=col, datatype=dtype, ghost_record_type='unknown') }}
464486
{%- endfor -%}
465487
{%- endif -%}
466488

467-
{%- if datavault4dbt.is_something(prejoined_columns) -%},
468-
{# Additionally generating ghost records for the prejoined attributes#}
469-
{% for col, vals in prejoined_columns.items() %}
489+
{%- if datavault4dbt.is_something(prejoined_columns) -%}
490+
{# Additionally generating ghost records for the prejoined attributes #}
491+
{%- for prejoin in prejoined_columns -%}
470492

471-
{%- if 'src_name' in vals.keys() or 'src_table' in vals.keys() -%}
472-
{%- set relation = source(vals['src_name']|string, vals['src_table']) -%}
473-
{%- elif 'ref_model' in vals.keys() -%}
474-
{%- set relation = ref(vals['ref_model']) -%}
493+
{%- if 'ref_model' in prejoin.keys() -%}
494+
{%- set relation = ref(prejoin['ref_model']) -%}
495+
{%- elif 'src_name' in prejoin.keys() and 'src_table' in prejoin.keys() -%}
496+
{%- set relation = source(prejoin['src_name']|string, prejoin['src_table']) -%}
475497
{%- endif -%}
476498

477499
{%- set pj_relation_columns = adapter.get_columns_in_relation( relation ) -%}
478-
{{ log('pj_relation_columns: ' ~ pj_relation_columns, false ) }}
479-
480-
{% for column in pj_relation_columns -%}
481-
482-
{% if column.name|lower == vals['bk']|lower -%}
483-
{{ log('column found? yes, for column :' ~ column.name , false) }}
484-
{{ datavault4dbt.ghost_record_per_datatype(column_name=column.name, datatype=column.dtype, ghost_record_type='unknown', alias=col) }}
500+
{{ log('pj_relation_columns for '~relation~': ' ~ pj_relation_columns, false ) }}
501+
502+
{%- for column in pj_relation_columns -%}
503+
{%- if column.name|lower in prejoin['extract_columns']|map('lower') -%}
504+
{%- set prejoin_extract_cols_lower = prejoin['extract_columns']|map('lower')|list -%}
505+
{%- set prejoin_col_index = prejoin_extract_cols_lower.index(column.name|lower) -%}
506+
{{ log('column found? yes, for column: ' ~ column.name , false) }}
507+
,{{ datavault4dbt.ghost_record_per_datatype(column_name=column.name, datatype=column.dtype, ghost_record_type='unknown', alias=prejoin['aliases'][prejoin_col_index]) }}
485508
{%- endif -%}
486509

487510
{%- endfor -%}
488-
{%- if not loop.last %},{% endif %}
489511
{% endfor -%}
490512
{%- endif %}
491513

492-
{%- if datavault4dbt.is_something(derived_columns) -%},
493-
{# Additionally generating Ghost Records for Derived Columns #}
494-
{%- for column_name, properties in derived_columns_with_datatypes_DICT.items() %}
495-
{{ datavault4dbt.ghost_record_per_datatype(column_name=column_name, datatype=properties.datatype, ghost_record_type='unknown') }}
496-
{%- if not loop.last %},{% endif -%}
514+
{%- if datavault4dbt.is_something(derived_columns) -%}
515+
{# Additionally generating Ghost Records for Derived Columns #}
516+
{% for column_name, properties in derived_columns_with_datatypes_DICT.items() %}
517+
,{{ datavault4dbt.ghost_record_per_datatype(column_name=column_name, datatype=properties.datatype, ghost_record_type='unknown') }}
497518
{%- endfor -%}
498519

499520
{%- endif -%}
500521

501-
{%- if datavault4dbt.is_something(processed_hash_columns) -%},
522+
{%- if datavault4dbt.is_something(processed_hash_columns) -%}
502523

503524
{%- for hash_column in processed_hash_columns %}
504-
CAST({{ datavault4dbt.as_constant(column_str=unknown_key) }} as {{ hash_dtype }}) as {{ hash_column }}
505-
{%- if not loop.last %},{% endif %}
525+
,CAST({{ datavault4dbt.as_constant(column_str=unknown_key) }} as {{ hash_dtype }}) as {{ hash_column }}
506526
{%- endfor -%}
507527

508528
{%- endif -%}
@@ -514,62 +534,61 @@ error_values AS (
514534

515535
SELECT
516536

517-
{{ datavault4dbt.string_to_timestamp(timestamp_format , end_of_all_times) }} as {{ load_datetime_col_name }},
518-
'{{ error_value_rsrc }}' as {{ record_source_col_name }}
537+
{{ datavault4dbt.string_to_timestamp(timestamp_format , end_of_all_times) }} as {{ load_datetime_col_name }}
538+
,'{{ error_value_rsrc }}' as {{ record_source_col_name }}
519539

520-
{%- if columns_without_excluded_columns is defined and columns_without_excluded_columns| length > 0 -%},
540+
{%- if columns_without_excluded_columns is defined and columns_without_excluded_columns| length > 0 -%}
521541
{# Generating Ghost Records for Source Columns #}
522542
{%- for column in columns_without_excluded_columns %}
523-
{{ datavault4dbt.ghost_record_per_datatype(column_name=column.name, datatype=column.dtype, ghost_record_type='error') }}
524-
{%- if not loop.last %},{% endif -%}
543+
,{{ datavault4dbt.ghost_record_per_datatype(column_name=column.name, datatype=column.dtype, ghost_record_type='error') }}
525544
{%- endfor -%}
526545

527546
{%- endif -%}
528547

529-
{%- if datavault4dbt.is_something(missing_columns) -%},
548+
{%- if datavault4dbt.is_something(missing_columns) -%}
530549
{# Additionally generating ghost record for Missing columns #}
531550
{%- for col, dtype in missing_columns.items() %}
532-
{{ datavault4dbt.ghost_record_per_datatype(column_name=col, datatype=dtype, ghost_record_type='error') }}
533-
{%- if not loop.last %},{% endif -%}
551+
,{{ datavault4dbt.ghost_record_per_datatype(column_name=col, datatype=dtype, ghost_record_type='error') }}
534552
{%- endfor -%}
535553
{%- endif -%}
536554

537-
{%- if datavault4dbt.is_something(prejoined_columns) -%},
538-
{# Additionally generating ghost records for the prejoined attributes #}
539-
{%- for col, vals in prejoined_columns.items() %}
555+
{%- if datavault4dbt.is_something(prejoined_columns) -%}
556+
{# Additionally generating ghost records for the prejoined attributes#}
557+
{% for prejoin in prejoined_columns %}
540558

541-
{%- if 'src_name' in vals.keys() or 'src_table' in vals.keys() -%}
542-
{%- set relation = source(vals['src_name']|string, vals['src_table']) -%}
543-
{%- elif 'ref_model' in vals.keys() -%}
544-
{%- set relation = ref(vals['ref_model']) -%}
559+
{%- if 'ref_model' in prejoin.keys() -%}
560+
{% set relation = ref(prejoin['ref_model']) -%}
561+
{%- elif 'src_name' in prejoin.keys() and 'src_table' in prejoin.keys() -%}
562+
{%- set relation = source(prejoin['src_name']|string, prejoin['src_table']) -%}
545563
{%- endif -%}
546564

547565
{%- set pj_relation_columns = adapter.get_columns_in_relation( relation ) -%}
566+
{{- log('pj_relation_columns for '~relation~': ' ~ pj_relation_columns, false ) -}}
548567

549568
{% for column in pj_relation_columns -%}
550-
{% if column.name|lower == vals['bk']|lower -%}
551-
{{ datavault4dbt.ghost_record_per_datatype(column_name=column.name, datatype=column.dtype, ghost_record_type='error', alias=col) -}}
569+
{%- if column.name|lower in prejoin['extract_columns']|map('lower') -%}
570+
{%- set prejoin_extract_cols_lower = prejoin['extract_columns']|map('lower')|list -%}
571+
{%- set prejoin_col_index = prejoin_extract_cols_lower.index(column.name|lower) -%}
572+
{{ log('column found? yes, for column: ' ~ column.name , false) }}
573+
,{{ datavault4dbt.ghost_record_per_datatype(column_name=column.name, datatype=column.dtype, ghost_record_type='error', alias=prejoin['aliases'][prejoin_col_index]) }}
552574
{%- endif -%}
575+
553576
{%- endfor -%}
554-
{%- if not loop.last -%},{%- endif %}
555577
{% endfor -%}
578+
{%- endif %}
556579

557-
{%- endif -%}
558-
559-
{%- if datavault4dbt.is_something(derived_columns) %},
580+
{%- if datavault4dbt.is_something(derived_columns) %}
560581
{# Additionally generating Ghost Records for Derived Columns #}
561582
{%- for column_name, properties in derived_columns_with_datatypes_DICT.items() %}
562-
{{ datavault4dbt.ghost_record_per_datatype(column_name=column_name, datatype=properties.datatype, ghost_record_type='error') }}
563-
{%- if not loop.last %},{% endif %}
583+
,{{ datavault4dbt.ghost_record_per_datatype(column_name=column_name, datatype=properties.datatype, ghost_record_type='error') }}
564584
{%- endfor -%}
565585

566586
{%- endif -%}
567587

568-
{%- if datavault4dbt.is_something(processed_hash_columns) -%},
588+
{%- if datavault4dbt.is_something(processed_hash_columns) -%}
569589

570590
{%- for hash_column in processed_hash_columns %}
571-
CAST({{ datavault4dbt.as_constant(column_str=error_key) }} as {{ hash_dtype }}) as {{ hash_column }}
572-
{%- if not loop.last %},{% endif %}
591+
,CAST({{ datavault4dbt.as_constant(column_str=error_key) }} as {{ hash_dtype }}) as {{ hash_column }}
573592
{%- endfor -%}
574593

575594
{%- endif -%}

0 commit comments

Comments
 (0)