forked from dbt-labs/redshift
-
Notifications
You must be signed in to change notification settings - Fork 0
/
compression.sql
97 lines (70 loc) · 3.51 KB
/
compression.sql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
{#
  find_analyze_recommendations(schema, table, comprows=none)

  Runs Redshift ANALYZE COMPRESSION against "schema"."table" and returns a
  dict that maps each result row's 'column' value to the whole row. Rows are
  fetched through redshift.get_data with the field names
  'table', 'column', 'encoding' and 'reduction_pct'.

  comprows: when not none, "comprows <value>" is appended to the statement;
            when none, the clause is omitted entirely.
#}
{% macro find_analyze_recommendations(schema, table, comprows=none) %}
  {%- if comprows is none -%}
    {%- set comprows_s = '' -%}
  {%- else -%}
    {%- set comprows_s = 'comprows ' ~ comprows -%}
  {%- endif %}
{% set query %}
analyze compression "{{ schema }}"."{{ table }}" {{ comprows_s }}
{% endset %}
  {%- set rows = redshift.get_data(query, ['table', 'column', 'encoding', 'reduction_pct']) -%}
  {#- Index the result rows by column name so callers can look up a column directly. -#}
  {%- set by_column = {} -%}
  {%- for row in rows -%}
    {%- set _ = by_column.update({row['column']: row}) -%}
  {%- endfor -%}
  {{ return(by_column) }}
{% endmacro %}
{#
  build_optimized_definition(definition, recommendation)

  Returns a copy of `definition` whose 'columns' entries carry the encoding
  recommended for each column. The input `definition` (including its
  per-column dicts) is NOT modified.

  definition:     mapping with a 'columns' mapping of column name -> column
                  dict that has an 'encoding' key; every other top-level key
                  is copied shallowly into the result.
  recommendation: mapping of column name -> row with 'encoding' and
                  'reduction_pct' (as built by find_analyze_recommendations).

  Columns missing from `recommendation` keep their current encoding, and a
  log line is emitted for them. Column order follows definition['columns'].
#}
{% macro build_optimized_definition(definition, recommendation) -%}
{% set optimized = {} %}
{% set _ = optimized.update(definition) %}
{# Build a fresh columns mapping: updating definition['columns'][name] in place
   would silently rewrite the caller's definition as well. #}
{% set optimized_columns = {} %}
{% for name, column in definition['columns'].items() %}
  {% set new_column = {} %}
  {% set _ = new_column.update(column) %}
  {% if name in recommendation %}
    {% set recommended_encoding = recommendation[name] %}
    {% if recommended_encoding['encoding'] != column['encoding'] %}
      {{ log(" Changing " ~ name ~ ": " ~ column['encoding'] ~ " -> " ~ recommended_encoding['encoding'] ~ " (" ~ recommended_encoding['reduction_pct'] ~ "%)") }}
    {% else %}
      {{ log("Not Changing " ~ name ~ ": " ~ column['encoding']) }}
    {% endif %}
    {% set _ = new_column.update({"encoding": recommended_encoding['encoding']}) %}
  {% else %}
    {# No ANALYZE COMPRESSION row for this column: keep its current encoding
       instead of failing on an undefined lookup. #}
    {{ log("No recommendation for " ~ name ~ "; keeping " ~ column['encoding']) }}
  {% endif %}
  {% set _ = optimized_columns.update({name: new_column}) %}
{% endfor %}
{% set _ = optimized.update({"columns": optimized_columns}) %}
{{ return(optimized) }}
{%- endmacro %}
{#
  insert_into_sql(from_schema, from_table, to_schema, to_table)

  Renders a single INSERT statement (terminated with ';') that copies every
  row of "from_schema"."from_table" into "to_schema"."to_table".

  NOTE(review): the copy is positional (`select *` with no column list), so
  both tables must have the same columns in the same order. compress_table
  builds the target from the source's own definition, which presumably
  preserves column order — confirm in build_ddl_sql.
#}
{%- macro insert_into_sql(from_schema, from_table, to_schema, to_table) -%}
insert into "{{ to_schema }}"."{{ to_table }}" (
select * from "{{ from_schema }}"."{{ from_table }}"
);
{%- endmacro -%}
{#
  atomic_swap_sql(schema, from_table, to_table, drop_backup)

  Renders a begin/commit transaction that:
    1. renames "schema"."from_table" to "from_table__backup",
    2. renames "schema"."to_table" to "from_table" (taking the old name),
    3. if drop_backup is truthy, drops "from_table__backup" with CASCADE;
       otherwise only logs a message. The log call runs when the template
       renders, not when the SQL executes.

  NOTE(review): the "drop table if exists ...__backup" line is commented out
  and is emitted to the database as a plain SQL comment. If a __backup table
  from an earlier run with drop_backup=False still exists, the first rename
  will presumably fail — confirm the intended backup-retention policy.
  NOTE(review): CASCADE also drops objects that depend on the backup table
  (for example views that followed the renamed table) — confirm this is acceptable.
#}
{%- macro atomic_swap_sql(schema, from_table, to_table, drop_backup) -%}
begin;
-- drop table if exists "{{ schema }}"."{{ from_table }}__backup" cascade;
alter table "{{ schema }}"."{{ from_table }}" rename to "{{ from_table }}__backup";
alter table "{{ schema }}"."{{ to_table }}" rename to "{{ from_table }}";
{% if drop_backup %}
drop table "{{ schema }}"."{{ from_table }}__backup" cascade;
{% else %}
{{ log('drop_backup is False -- not dropping ' ~ from_table ~ "__backup") }}
{% endif %}
commit;
{%- endmacro -%}
{#
  compress_table(schema, table, drop_backup=False, comprows=none,
                 sort_style=none, sort_keys=none, dist_style=none,
                 dist_key=none, skip_if_incremental=False)

  Renders SQL that rebuilds "schema"."table" with the column encodings
  recommended by ANALYZE COMPRESSION. The SQL comes from three helpers:
    - redshift.build_ddl_sql creates "<table>__compressed";
    - redshift.insert_into_sql copies all rows into it;
    - redshift.atomic_swap_sql swaps it into place (drop_backup is forwarded).

  Early exits:
    - returns none when `execute` is false;
    - returns '' when skip_if_incremental is set and is_incremental() is true;
    - returns none when redshift.fetch_table_definition returns none.

  sort_style / sort_keys / dist_style / dist_key, when truthy, override the
  matching entries of the definition's 'keys' mapping.
#}
{%- macro compress_table(schema, table, drop_backup=False,
comprows=none, sort_style=none, sort_keys=none,
dist_style=none, dist_key=none, skip_if_incremental=False) -%}
{% if not execute %}
  {{ return(none) }}
{% endif %}
{% if skip_if_incremental and is_incremental() %}
  {{ return('') }}
{% endif %}
{# Fetch the definition BEFORE running ANALYZE COMPRESSION, so the none-guard
   short-circuits ahead of the analyze query. That query is costly and
   presumably errors when the table is absent. #}
{% set definition = redshift.fetch_table_definition(schema, table) %}
{% if definition is none %}
  {{ return(none) }}
{% endif %}
{% set recommendation = redshift.find_analyze_recommendations(schema, table, comprows) %}
{% set optimized = redshift.build_optimized_definition(definition, recommendation) %}
{# Copy the keys mapping so the overrides below do not mutate definition['keys'].
   `or {}` also covers a keys value of none, which `| default({})` would not replace. #}
{% set keys = {} %}
{% set _ = keys.update(optimized.get('keys') or {}) %}
{% if sort_style %}{% set _ = keys.update({"sort_style": sort_style}) %}{% endif %}
{% if sort_keys %}{% set _ = keys.update({"sort_keys": sort_keys}) %}{% endif %}
{% if dist_style %}{% set _ = keys.update({"dist_style": dist_style}) %}{% endif %}
{% if dist_key %}{% set _ = keys.update({"dist_key": dist_key}) %}{% endif %}
{% set new_table = table ~ "__compressed" %}
{% set _ = optimized.update({"keys": keys, "name": new_table}) %}
{# Build the DDL, copy the data, then swap the new table into place #}
{{ redshift.build_ddl_sql(optimized) }}
{{ redshift.insert_into_sql(schema, table, schema, new_table) }}
{{ redshift.atomic_swap_sql(schema, table, new_table, drop_backup) }}
{%- endmacro %}