Skip to content

Commit

Permalink
update stored procedure
Browse files Browse the repository at this point in the history
  • Loading branch information
agnessnowplow committed Feb 22, 2022
1 parent 6482044 commit afdfa17
Showing 1 changed file with 84 additions and 113 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,128 +86,99 @@ CREATE OR REPLACE PROCEDURE {{.output_schema}}.column_check(SOURCE_SCHEMA VARCHA
AS
$$

var delim = '~';
var sourceColumns = list_cols_with_type(SOURCE_SCHEMA,SOURCE_TABLE,delim).split(delim);
var targetColumns = list_cols_with_type(TARGET_SCHEMA,TARGET_TABLE,delim).split(delim);
var sourceColumnLengths = list_cols_with_length(SOURCE_SCHEMA,SOURCE_TABLE,delim).split(delim);
var targetColumnLengths = list_cols_with_length(TARGET_SCHEMA,TARGET_TABLE,delim).split(delim);

if (targetColumns.length > sourceColumns.length) {
column_check_stmt = `
WITH source_data AS (
SELECT
column_name,
ordinal_position,
character_maximum_length,
data_type,
CASE
WHEN isc.data_type='TEXT' THEN CONCAT(isc.column_name, ' VARCHAR(',isc.character_maximum_length, ')')
WHEN isc.data_type='NUMBER' THEN CONCAT(isc.column_name, ' NUMBER(', isc.numeric_precision, ',',isc.numeric_scale, ')')
else CONCAT(isc.column_name, ' ', isc.data_type)
END AS column_definition
FROM information_schema.columns isc
WHERE table_schema = UPPER(:1)
AND table_name = UPPER(:2)
ORDER BY ordinal_position
),
target_data AS (
SELECT
column_name,
ordinal_position,
character_maximum_length,
data_type
FROM information_schema.columns isc
where table_schema = UPPER(:3)
AND table_name = UPPER(:4)
ORDER BY ordinal_position
),
varchar_check AS (
SELECT listagg(
CASE WHEN T.character_maximum_length < s.character_maximum_length THEN s.column_name END
, ', ') WITHIN GROUP (ORDER BY s.ordinal_position) AS cols_with_varchar_issue
FROM target_data t
LEFT JOIN SOURCE_data s
ON t.ordinal_position = s.ordinal_position AND t.column_name = s.column_name
WHERE t.ordinal_position>= s.ordinal_position),
type_check AS (
SELECT SUM(CASE WHEN s.column_name IS NULL THEN 1 ELSE 0 END) AS missing_in_source,
SUM(CASE WHEN t.column_name IS NULL THEN 1 ELSE 0 END) AS missing_in_target
FROM target_data t
FULL OUTER JOIN SOURCE_data s
ON t.ordinal_position = s.ordinal_position AND T.data_type = s.data_type AND s.column_name = t.column_name
WHERE t.ordinal_position>= s.ordinal_position OR s.ordinal_position is null),
columns_to_add AS (
SELECT listagg(CASE WHEN t.ordinal_position IS NULL THEN s.column_definition end
, ', ') WITHIN GROUP (ORDER BY s.ordinal_position) AS cols_to_add
FROM source_data s
LEFT JOIN target_data t
ON t.ordinal_position = s.ordinal_position )
SELECT missing_in_source, missing_in_target, cols_with_varchar_issue, cols_to_add
FROM columns_to_add, type_check, varchar_check`;

var res = snowflake.createStatement({sqlText: column_check_stmt,
binds: [SOURCE_SCHEMA, SOURCE_TABLE,TARGET_SCHEMA, TARGET_TABLE]}
).execute();
res.next();

missing_in_source = res.getColumnValue(1);
missing_in_target = res.getColumnValue(2);
cols_with_varchar_issue = res.getColumnValue(3);
cols_to_add = res.getColumnValue(4);

if (missing_in_target > 1) {
throw "ERROR: Source table is missing column(s) which exist in target table.";

} else {
for (var i = 0; i < targetColumns.length; i++) {
if (+targetColumnLengths[i] < +sourceColumnLengths[i]) {
throw "ERROR: varchar field in source column " + sourceColumns[i] + " is longer than varchar field in target column."
}
}

var columnAdditions = sourceColumns.filter(notIncludedIn(targetColumns));

if (sourceColumns.some(notIncludedIn(targetColumns)) === true) {
if ( AUTOMIGRATE !== 'TRUE' ) {
throw "ERROR: Target table is missing column(s),but automigrate is not enabled.";
} else {
// enforce order
for (var i = 0; i < targetColumns.length; i++) {
if (targetColumns[i] !== sourceColumns[i]) {
throw "ERROR: Can only migrate extra columns of the end of source."
}
}
var pos = targetColumns.length + 1
var extraCols = list_extra_cols(SOURCE_SCHEMA,SOURCE_TABLE,delim, pos).split(delim);
add_columns_to(TARGET_SCHEMA, TARGET_TABLE, extraCols.join(','));
return "ok. Columns added."
}
} else {
return "ok. Columns match."
}
}

// == Helpers ==

function list_cols_with_type(sch,tbl,delimiter) {
var stmt = `
SELECT
LISTAGG(
CASE
WHEN isc.data_type='TEXT'
THEN isc.column_name
WHEN isc.data_type='NUMBER'
THEN CONCAT(isc.column_name, ' NUMBER(', isc.numeric_precision, ',',isc.numeric_scale, ')')
ELSE
CONCAT(isc.column_name, ' ', isc.data_type)
END, '` + delimiter + `')
WITHIN GROUP (order by isc.ordinal_position)
FROM information_schema.columns AS isc
WHERE table_schema='` + sch + `'
AND table_name='` + tbl + `';`;

var res = snowflake.createStatement({sqlText: stmt}).execute();
res.next();
result = res.getColumnValue(1);

return result;
if (cols_with_varchar_issue !== '') {
throw "ERROR: field length for source varchar column(s) " + cols_with_varchar_issue + " is longer than the target."
}

function list_extra_cols(sch,tbl,delimiter,position) {
var stmt = `
SELECT
LISTAGG(
CASE
WHEN isc.data_type='TEXT'
THEN CONCAT(isc.column_name, ' VARCHAR(',isc.character_maximum_length, ')')
WHEN isc.data_type='NUMBER'
THEN CONCAT(isc.column_name, ' NUMBER(', isc.numeric_precision, ',',isc.numeric_scale, ')')
ELSE
CONCAT(isc.column_name, ' ', isc.data_type)
END, '` + delimiter + `')
WITHIN GROUP (order by isc.ordinal_position)
FROM information_schema.columns AS isc
WHERE table_schema='` + sch + `'
AND table_name='` + tbl + `'
AND isc.ordinal_position >='` + position + `';`;

var res = snowflake.createStatement({sqlText: stmt}).execute();
res.next();
result = res.getColumnValue(1);

return result;
if (missing_in_source > 0) {
throw "ERROR: Can only migrate extra columns of the end of source"
}

function list_cols_with_length(sch,tbl,delimiter) {
var stmt = `
SELECT
LISTAGG(
CASE
WHEN isc.data_type='TEXT'
THEN isc.character_maximum_length
ELSE 0
END, '` + delimiter + `')
WITHIN GROUP (order by isc.ordinal_position)
FROM information_schema.columns AS isc
WHERE table_schema='` + sch + `'
AND table_name='` + tbl + `';`;

var res = snowflake.createStatement({sqlText: stmt}).execute();
res.next();
result = res.getColumnValue(1);

return result;
}

function notIncludedIn(arr) {
return function(elt) {
return ! arr.includes(elt);
};
}
if (cols_to_add !== '') {

function add_columns_to(sch, tbl, cols) {
var alter_stmt = `ALTER TABLE ` + sch + `.` + tbl + ` ADD COLUMN ` + cols;
snowflake.createStatement({sqlText: alter_stmt}).execute();
if ( AUTOMIGRATE !== 'TRUE' ) {
throw "ERROR: Target table is missing column(s),but automigrate is not enabled.";

return "ok. Columns added.";
}
} else {
var alter_stmt = `ALTER TABLE ` + TARGET_SCHEMA + `.` + TARGET_TABLE + ` ADD COLUMN ` + cols_to_add;
snowflake.createStatement({sqlText: alter_stmt}).execute();
return "ok. Columns added."
}

} else {
return "ok. Columns match."
}

$$
;
Expand Down

0 comments on commit afdfa17

Please sign in to comment.