Skip to content

Commit

Permalink
Merge pull request #46 from mikaelene/v0.18.0
Browse files Browse the repository at this point in the history
V0.18.0
  • Loading branch information
mikaelene authored Sep 15, 2020
2 parents 1b99883 + a28f381 commit aaf8ac3
Show file tree
Hide file tree
Showing 10 changed files with 83 additions and 259 deletions.
12 changes: 9 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
Passing all tests in [dbt-integration-tests](https://github.com/fishtown-analytics/dbt-integration-tests/).

Only supports dbt 0.14 and newer!
- For dbt 0.14.x use dbt-sqlserver 0.14.x
- For dbt 0.15.x use dbt-sqlserver 0.15.x
- For dbt 0.18.x use dbt-sqlserver 0.18.x
- dbt 0.17.x is unsupported
- dbt 0.16.x is unsupported
- dbt 0.17.x is unsupported - development in progress
- For dbt 0.15.x use dbt-sqlserver 0.15.x
- For dbt 0.14.x use dbt-sqlserver 0.14.x


Easiest install is to use pip:

Expand Down Expand Up @@ -100,6 +102,10 @@ Example of applying Unique clustered index on two columns, Ordinary index on one

## Changelog

### v0.18.0
#### New Features:
- Adds support for dbt v0.18.0

### v0.15.3.1

#### Fixes:
Expand Down
9 changes: 8 additions & 1 deletion dbt/adapters/sqlserver/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,14 @@ def open(cls, connection):
try:
con_str = []
con_str.append(f"DRIVER={{{credentials.driver}}}")
con_str.append(f"SERVER={credentials.host},{credentials.port}")

if "\\" in credentials.host:
# if there is a backslash \ in the host name the host is a sql-server named instance
# in this case then port number has to be omitted
con_str.append(f"SERVER={credentials.host}")
else:
con_str.append(f"SERVER={credentials.host},{credentials.port}")

con_str.append(f"Database={credentials.database}")

if not getattr(credentials, 'windows_login', False):
Expand Down
30 changes: 20 additions & 10 deletions dbt/include/sqlserver/macros/adapters.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
{% macro sqlserver__list_relations_without_caching(information_schema, schema) %}
{% macro sqlserver__information_schema_name(database) -%}
information_schema
{%- endmacro %}

{% macro sqlserver__list_relations_without_caching(schema_relation) %}
{% call statement('list_relations_without_caching', fetch_result=True) -%}
select
table_catalog as [database],
Expand All @@ -8,9 +12,10 @@
when table_type = 'VIEW' then 'view'
else table_type
end as table_type
from {{ information_schema }}.tables
where table_schema like '{{ schema }}'
and table_catalog like '{{ information_schema.database }}'

from information_schema.tables
where table_schema like '{{ schema_relation.schema }}'
and table_catalog like '{{ schema_relation.database }}'
{% endcall %}
{{ return(load_result('list_relations_without_caching').table) }}
{% endmacro %}
Expand All @@ -23,19 +28,19 @@
{{ return(load_result('list_schemas').table) }}
{% endmacro %}

{% macro sqlserver__create_schema(database_name, schema_name) -%}
{% macro sqlserver__create_schema(relation) -%}
{% call statement('create_schema') -%}
USE {{ database_name }}
IF NOT EXISTS (SELECT * FROM sys.schemas WHERE name = {{ schema_name | replace('"', "'") }})
USE [{{ relation.database }}]
IF NOT EXISTS (SELECT * FROM sys.schemas WHERE name = '{{ relation.without_identifier().schema }}')
BEGIN
EXEC('CREATE SCHEMA {{ schema_name | replace('"', "") }}')
EXEC('CREATE SCHEMA {{ relation.without_identifier().schema }}')
END
{% endcall %}
{% endmacro %}

{% macro sqlserver__drop_schema(database_name, schema_name) -%}
{% call statement('drop_schema') -%}
drop schema if exists {{database_name}}.{{schema_name}}
drop schema if exists {{ relation.without_identifier().schema }}
{% endcall %}
{% endmacro %}

Expand Down Expand Up @@ -164,4 +169,9 @@
path={"identifier": tmp_identifier}) -%}

{% do return(tmp_relation) %}
{% endmacro %}
{% endmacro %}

{% macro sqlserver__snapshot_string_as_time(timestamp) -%}
{%- set result = "CONVERT(DATETIME2, '" ~ timestamp ~ "')" -%}
{{ return(result) }}
{%- endmacro %}
2 changes: 1 addition & 1 deletion dbt/include/sqlserver/macros/catalog.sql
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@

{% macro sqlserver__get_catalog(information_schemas) -%}
{% macro sqlserver__get_catalog(information_schemas, schemas) -%}

{%- call statement('catalog', fetch_result=True) -%}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{% macro dbt__incremental_delete(target_relation, tmp_relation) -%}
{% macro dbt__incremental_delete(target_relation, tmp_relation) -%}
{%- set unique_key = config.require('unique_key') -%}

delete
Expand Down
4 changes: 2 additions & 2 deletions dbt/include/sqlserver/macros/materializations/seed/seed.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{% macro basic_load_csv_rows(model, batch_size, agate_table) %}
{% macro sqlserver__basic_load_csv_rows(model, batch_size, agate_table) %}
{% set cols_sql = get_seed_column_quoted_csv(model, agate_table.column_names) %}
{% set bindings = [] %}

Expand Down Expand Up @@ -34,5 +34,5 @@
{% endmacro %}

{% macro sqlserver__load_csv_rows(model, agate_table) %}
{{ return(basic_load_csv_rows(model, 200, agate_table) )}}
{{ return(sqlserver__basic_load_csv_rows(model, 200, agate_table) )}}
{% endmacro %}
118 changes: 0 additions & 118 deletions dbt/include/sqlserver/macros/materializations/snapshot/snapshot.sql
Original file line number Diff line number Diff line change
@@ -1,122 +1,4 @@
{% macro snapshot_staging_table_inserts(strategy, source_sql, target_relation) -%}

select
'insert' as dbt_change_type,
source_data.*

from (

select *,
COALESCE({{ strategy.scd_id }}, NULL) as dbt_scd_id,
COALESCE({{ strategy.unique_key }}, NULL) as dbt_unique_key,
COALESCE({{ strategy.updated_at }}, NULL) as dbt_updated_at,
COALESCE({{ strategy.updated_at }}, NULL) as dbt_valid_from,
nullif({{ strategy.updated_at }}, {{ strategy.updated_at }}) as dbt_valid_to

from (
{{ source_sql }}
) snapshot_query
) source_data
left outer join (

select *,
{{ strategy.unique_key }} as dbt_unique_key

from {{ target_relation }}

) snapshotted_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
where snapshotted_data.dbt_unique_key is null
or (
snapshotted_data.dbt_unique_key is not null
and snapshotted_data.dbt_valid_to is null
and (
{{ strategy.row_changed }}
)
)

{%- endmacro %}


{% macro snapshot_staging_table_updates(strategy, source_sql, target_relation) -%}

select
'update' as dbt_change_type,
snapshotted_data.dbt_scd_id,
source_data.dbt_valid_from as dbt_valid_to

from (

select
*,
COALESCE({{ strategy.scd_id }}, NULL) as dbt_scd_id,
COALESCE({{ strategy.unique_key }}, NULL) as dbt_unique_key,
COALESCE({{ strategy.updated_at }}, NULL) as dbt_updated_at,
COALESCE({{ strategy.updated_at }}, NULL) as dbt_valid_from

from (
{{ source_sql }}
) snapshot_query
) source_data
join (

select *,
{{ strategy.unique_key }} as dbt_unique_key

from {{ target_relation }}

) snapshotted_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
where snapshotted_data.dbt_valid_to is null
and (
{{ strategy.row_changed }}
)

{%- endmacro %}


{% macro build_snapshot_staging_table(strategy, sql, target_relation) %}
{% set tmp_relation = make_temp_relation(target_relation) %}

{% set inserts_select = snapshot_staging_table_inserts(strategy, sql, target_relation) %}
{% set updates_select = snapshot_staging_table_updates(strategy, sql, target_relation) %}

{% call statement('build_snapshot_staging_relation_inserts') %}
{{ create_table_as(False, tmp_relation, inserts_select) }}
{% endcall %}

{% call statement('build_snapshot_staging_relation_updates') %}
insert into {{ tmp_relation }} (dbt_change_type, dbt_scd_id, dbt_valid_to)
select dbt_change_type, dbt_scd_id, dbt_valid_to from (
{{ updates_select }}
) dbt_sbq;
{% endcall %}

{% do return(tmp_relation) %}
{% endmacro %}


{% macro sqlserver__post_snapshot(staging_relation) %}
-- Clean up the snapshot temp table
{% do drop_relation(staging_relation) %}
{% endmacro %}


{% macro build_snapshot_table(strategy, sql) %}

select *,
COALESCE({{ strategy.scd_id }}, NULL) as dbt_scd_id,
COALESCE({{ strategy.updated_at }}, NULL) as dbt_updated_at,
COALESCE({{ strategy.updated_at }}, NULL) as dbt_valid_from,
nullif({{ strategy.updated_at }}, {{ strategy.updated_at }}) as dbt_valid_to
from (
{{ sql }}
) sbq

{% endmacro %}

{% macro sqlserver__create_columns(relation, columns) %}
{% for column in columns %}
{% call statement() %}
alter table {{ relation }} add "{{ column.name }}" {{ column.data_type }};
{% endcall %}
{% endfor %}
{% endmacro %}
Original file line number Diff line number Diff line change
@@ -1,40 +1,40 @@
{% macro sqlserver__snapshot_hash_arguments(args) %}
CONVERT(VARCHAR(32), HashBytes('MD5', {% for arg in args %}
coalesce(cast({{ arg }} as varchar ), '') {% if not loop.last %} + '|' + {% endif %}
{% endfor %}), 2)
{% endmacro %}

{% macro snapshot_check_strategy(node, snapshotted_rel, current_rel, config, target_exists) %}
{% set check_cols_config = config['check_cols'] %}
{% set primary_key = config['unique_key'] %}
{% set updated_at = snapshot_get_time() %}

{% if check_cols_config == 'all' %}
{% set check_cols = get_columns_in_query(node['injected_sql']) %}
{% elif check_cols_config is iterable and (check_cols_config | length) > 0 %}
{% set check_cols = check_cols_config %}
{% else %}
{% do exceptions.raise_compiler_error("Invalid value for 'check_cols': " ~ check_cols_config) %}
{% endif %}

{% set row_changed_expr -%}
(
{% for col in check_cols %}
{{ snapshotted_rel }}.{{ col }} != {{ current_rel }}.{{ col }}
or
({{ snapshotted_rel }}.{{ col }} is null) and not ({{ current_rel }}.{{ col }} is null)
{%- if not loop.last %} or {% endif %}

{% endfor %}
)
{%- endset %}

{% set scd_id_expr = snapshot_hash_arguments([primary_key, updated_at]) %}

{% do return({
"unique_key": primary_key,
"updated_at": updated_at,
"row_changed": row_changed_expr,
"scd_id": scd_id_expr
}) %}
{% macro sqlserver__snapshot_hash_arguments(args) %}
CONVERT(VARCHAR(32), HashBytes('MD5', {% for arg in args %}
coalesce(cast({{ arg }} as varchar ), '') {% if not loop.last %} + '|' + {% endif %}
{% endfor %}), 2)
{% endmacro %}

{% macro snapshot_check_strategy(node, snapshotted_rel, current_rel, config, target_exists) %}
{% set check_cols_config = config['check_cols'] %}
{% set primary_key = config['unique_key'] %}
{% set updated_at = snapshot_get_time() %}

{% if check_cols_config == 'all' %}
{% set check_cols = get_columns_in_query(node['injected_sql']) %}
{% elif check_cols_config is iterable and (check_cols_config | length) > 0 %}
{% set check_cols = check_cols_config %}
{% else %}
{% do exceptions.raise_compiler_error("Invalid value for 'check_cols': " ~ check_cols_config) %}
{% endif %}

{% set row_changed_expr -%}
(
{% for col in check_cols %}
{{ snapshotted_rel }}.{{ col }} != {{ current_rel }}.{{ col }}
or
({{ snapshotted_rel }}.{{ col }} is null) and not ({{ current_rel }}.{{ col }} is null)
{%- if not loop.last %} or {% endif %}

{% endfor %}
)
{%- endset %}

{% set scd_id_expr = sqlserver__snapshot_hash_arguments([primary_key, updated_at]) %}

{% do return({
"unique_key": primary_key,
"updated_at": updated_at,
"row_changed": row_changed_expr,
"scd_id": scd_id_expr
}) %}
{% endmacro %}
Loading

0 comments on commit aaf8ac3

Please sign in to comment.