Skip to content

Commit

Permalink
update stored procedure to follow mobile logic
Browse files Browse the repository at this point in the history
  • Loading branch information
agnessnowplow committed Feb 22, 2022
1 parent 0c270c3 commit c6669d7
Showing 1 changed file with 53 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,86 +87,75 @@ CREATE OR REPLACE PROCEDURE {{.output_schema}}.column_check(SOURCE_SCHEMA VARCHA
$$

column_check_stmt = `
WITH source_data AS (
SELECT
column_name,
ordinal_position,
character_maximum_length,
data_type,
CASE
WHEN isc.data_type='TEXT' THEN CONCAT(isc.column_name, ' VARCHAR(',isc.character_maximum_length, ')')
WHEN isc.data_type='NUMBER' THEN CONCAT(isc.column_name, ' NUMBER(', isc.numeric_precision, ',',isc.numeric_scale, ')')
else CONCAT(isc.column_name, ' ', isc.data_type)
END AS column_definition
FROM information_schema.columns isc
WHERE table_schema = UPPER(:1)
AND table_name = UPPER(:2)
ORDER BY ordinal_position
),
target_data AS (
WITH target_columns AS (
SELECT
column_name,
ordinal_position,
character_maximum_length,
data_type
FROM information_schema.columns isc
where table_schema = UPPER(:3)
isc.column_name,
isc.data_type,
isc.ordinal_position,
isc.character_maximum_length
FROM information_schema.columns AS isc
WHERE table_schema = UPPER(:1)
AND table_name = UPPER(:2)
)
, source_columns AS (
SELECT
isc.column_name,
isc.data_type,
isc.ordinal_position,
isc.character_maximum_length,
isc.numeric_precision,
isc.numeric_scale
FROM information_schema.columns AS isc
WHERE table_schema = UPPER(:3)
AND table_name = UPPER(:4)
ORDER BY ordinal_position
),
varchar_check AS (
SELECT listagg(
CASE WHEN T.character_maximum_length < s.character_maximum_length THEN s.column_name END
, ', ') WITHIN GROUP (ORDER BY s.ordinal_position) AS cols_with_varchar_issue
FROM target_data t
LEFT JOIN SOURCE_data s
ON t.ordinal_position = s.ordinal_position AND t.column_name = s.column_name
WHERE t.ordinal_position>= s.ordinal_position),
type_check AS (
SELECT SUM(CASE WHEN s.column_name IS NULL AND ifnull(s.ordinal_position, 0) <= m.target_cols THEN 1 ELSE 0 END) AS missing_in_source,
SUM(CASE WHEN ifnull(s.ordinal_position,0) <= m.target_cols AND s.ordinal_position <> t.ordinal_position THEN 1 ELSE 0 END) AS missing_in_target
FROM target_data t
FULL OUTER JOIN SOURCE_data s
ON T.data_type = s.data_type AND t.column_name = s.column_name
LEFT JOIN (SELECT max(ordinal_position) target_cols FROM target_data) m
),
columns_to_add AS (
SELECT listagg(CASE WHEN t.ordinal_position IS NULL THEN s.column_definition end
, ', ') WITHIN GROUP (ORDER BY s.ordinal_position) AS cols_to_add
FROM source_data s
LEFT JOIN target_data t
ON t.ordinal_position = s.ordinal_position )
SELECT missing_in_source, missing_in_target, cols_with_varchar_issue, cols_to_add
FROM columns_to_add, type_check, varchar_check`;
)
SELECT
SUM(CASE WHEN sc.column_name IS NULL THEN 1 ELSE 0 END) AS missing_in_source,
SUM(CASE WHEN tc.column_name IS NULL THEN 1 ELSE 0 END) AS missing_in_target,
LISTAGG(
CASE
WHEN tc.column_name IS NOT NULL
THEN NULL
WHEN sc.data_type='TEXT'
THEN CONCAT(sc.column_name, ' VARCHAR(',sc.character_maximum_length, ')')
WHEN sc.data_type='NUMBER'
THEN CONCAT(sc.column_name, ' NUMBER(', sc.numeric_precision, ',',sc.numeric_scale, ')')
ELSE
CONCAT(sc.column_name, ' ', sc.data_type)
END
, ', ') WITHIN GROUP (ORDER BY sc.ordinal_position) AS cols_to_add,
LISTAGG(CASE WHEN tc.column_name IS NOT NULL AND sc.character_maximum_length > tc.character_maximum_length THEN sc.column_name END, ', ') as cols_w_incompatible_char_limits
FROM target_columns tc
FULL OUTER JOIN source_columns sc
ON tc.column_name = sc.column_name
AND tc.data_type = sc.data_type
AND tc.ordinal_position = sc.ordinal_position`;

var res = snowflake.createStatement({sqlText: column_check_stmt,
binds: [SOURCE_SCHEMA, SOURCE_TABLE,TARGET_SCHEMA, TARGET_TABLE]}
binds: [TARGET_SCHEMA, TARGET_TABLE,SOURCE_SCHEMA, SOURCE_TABLE]}
).execute();
res.next();

missing_in_source = res.getColumnValue(1);
missing_in_target = res.getColumnValue(2);
cols_with_varchar_issue = res.getColumnValue(3);
cols_to_add = res.getColumnValue(4);
cols_to_add = res.getColumnValue(3);
cols_with_varchar_issue = res.getColumnValue(4);

if (missing_in_source !== 0) {
throw "ERROR: Source table is missing column(s) which exist in target table.";

if (missing_in_source > 0) {
throw "ERROR: Source table is either missing column(s) which exist in target table or their position is wrong.";
}

if (cols_with_varchar_issue !== '') {
throw "ERROR: field length for source varchar column(s) " + cols_with_varchar_issue + " is longer than the target."
}

if (missing_in_target !== 0) {
throw "ERROR: Can only migrate extra columns of the end of source"
}

if (cols_to_add !== '') {
if (missing_in_target > 0) {

if ( AUTOMIGRATE !== 'TRUE' ) {
throw "ERROR: Target table is missing column(s),but automigrate is not enabled.";
Expand Down

0 comments on commit c6669d7

Please sign in to comment.