Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Only get types and procedures from unqualified schemas #271

Merged
merged 8 commits into from
Jan 31, 2024
74 changes: 67 additions & 7 deletions crates/connectors/ndc-postgres/src/configuration/version2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ use crate::configuration::version1;
use crate::configuration::IsolationLevel;

pub use version1::{
default_comparison_operator_mapping, default_excluded_schemas, default_unqualified_schemas,
ConnectionUri, PoolSettings, ResolvedSecret,
default_comparison_operator_mapping, default_excluded_schemas, ConnectionUri, PoolSettings,
ResolvedSecret,
};

const CONFIGURATION_QUERY: &str = include_str!("version2.sql");
Expand Down Expand Up @@ -50,6 +50,20 @@ impl RawConfiguration {
}
}

/// Collection names of tables in these schemas will be appear as unqualified.
pub fn default_unqualified_schemas_for_tables() -> Vec<String> {
plcplc marked this conversation as resolved.
Show resolved Hide resolved
vec!["public".to_string()]
}

/// Types, operators and procedures from these schemas will appear unqualified in the configuration.
pub fn default_unqualified_schemas_for_types_and_procedures() -> Vec<String> {
plcplc marked this conversation as resolved.
Show resolved Hide resolved
vec![
"public".to_string(),
"pg_catalog".to_string(),
"tiger".to_string(),
]
}

/// Options which only influence how the configuration server updates the configuration
#[derive(Clone, Debug, Deserialize, Serialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
Expand All @@ -58,10 +72,22 @@ pub struct ConfigureOptions {
/// internal schemas of Postgres, Citus, Cockroach, and the PostGIS extension.
#[serde(default = "default_excluded_schemas")]
pub excluded_schemas: Vec<String>,
/// Deprecated alias for 'unqualifiedSchemasForTables'.
#[serde(default)]
// weirdly, we have to use 'skip_serializing_if' rather than 'skip_serializing', since the
// latter causes schemars to consider the field required, in spite of the 'default' attribute.
// If both 'unqualified_schemas' and 'unqualified_schemas_for_tables' are specified,
// 'configure()' will merge them and check for duplicates, returning unly a
// `unqualified_schemas_for_tables' field.
#[serde(skip_serializing_if = "always_skip_unqualified_schemas")]
pub unqualified_schemas: Vec<String>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please document the relationship between this field and the others? What happens if this field and the others are defined?

/// The names of Tables and Views in these schemas will be returned unqualified.
/// The default setting will set the `public` schema as unqualified.
#[serde(default = "default_unqualified_schemas")]
pub unqualified_schemas: Vec<String>,
#[serde(default = "default_unqualified_schemas_for_tables")]
pub unqualified_schemas_for_tables: Vec<String>,
/// The types and procedures in these schemas will be returned unqualified.
#[serde(default = "default_unqualified_schemas_for_types_and_procedures")]
pub unqualified_schemas_for_types_and_procedures: Vec<String>,
/// The mapping of comparison operator names to apply when updating the configuration
#[serde(default = "default_comparison_operator_mapping")]
pub comparison_operator_mapping: Vec<version1::ComparisonOperatorMapping>,
Expand All @@ -82,7 +108,10 @@ impl Default for ConfigureOptions {
fn default() -> ConfigureOptions {
ConfigureOptions {
excluded_schemas: version1::default_excluded_schemas(),
unqualified_schemas: version1::default_unqualified_schemas(),
unqualified_schemas: vec![],
unqualified_schemas_for_tables: default_unqualified_schemas_for_tables(),
unqualified_schemas_for_types_and_procedures:
default_unqualified_schemas_for_types_and_procedures(),
comparison_operator_mapping: version1::default_comparison_operator_mapping(),
// we'll change this to `Some(MutationsVersions::V1)` when we
// want to "release" this behaviour
Expand All @@ -93,6 +122,13 @@ impl Default for ConfigureOptions {
}
}

/// This is a deprecated field subsumed by `unqualified_schemas_for_tables` and
/// `unqualified_schemas_for_types_and_procedures`.
/// We don't want to output it when generating the configuration.
pub fn always_skip_unqualified_schemas(_: &Vec<String>) -> bool {
plcplc marked this conversation as resolved.
Show resolved Hide resolved
true
}

fn default_introspect_prefix_function_comparison_operators() -> Vec<String> {
vec![
"box_above".to_string(),
Expand Down Expand Up @@ -245,7 +281,7 @@ pub async fn validate_raw_configuration(

/// Construct the NDC metadata configuration by introspecting the database.
pub async fn configure(
args: RawConfiguration,
mut args: RawConfiguration,
) -> Result<RawConfiguration, connector::UpdateConfigurationError> {
let version1::ConnectionUri::Uri(version1::ResolvedSecret(uri)) = &args.connection_uri;

Expand All @@ -254,9 +290,33 @@ pub async fn configure(
.await
.map_err(|e| connector::UpdateConfigurationError::Other(e.into()))?;

// Use only 'unqualified_schemas_for_tables'.
let mut unqualified_schemas = args
.configure_options
.unqualified_schemas
.iter()
.chain(args.configure_options.unqualified_schemas_for_tables.iter())
.cloned()
.collect::<Vec<String>>();

unqualified_schemas.sort();
unqualified_schemas.dedup();

args.configure_options.unqualified_schemas = vec![];
args.configure_options.unqualified_schemas_for_tables = unqualified_schemas;

let query = sqlx::query(CONFIGURATION_QUERY)
.bind(args.configure_options.excluded_schemas.clone())
.bind(args.configure_options.unqualified_schemas.clone())
.bind(
args.configure_options
.unqualified_schemas_for_tables
.clone(),
)
.bind(
args.configure_options
.unqualified_schemas_for_types_and_procedures
.clone(),
)
.bind(
serde_json::to_value(args.configure_options.comparison_operator_mapping.clone())
.map_err(|e| connector::UpdateConfigurationError::Other(e.into()))?,
Expand Down
155 changes: 103 additions & 52 deletions crates/connectors/ndc-postgres/src/configuration/version2.sql
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,6 @@
-- The data model of these tables is quite involved and carries with it decades
-- of legacy. Supporting notes on this are kept in 'introspection-notes.md'.
--
-- TODO: This uses unqualified table (and view) and constraint names.
-- We will need to qualify them at some point. This makes the aliases seem
-- redundant, but they will change in the future.
-- If similar named tables exist in different schemas it is arbitrary
-- which one we pick currently! (c.f. Citus schemas 'columnar' and
-- 'columnar_internal' which both have a 'chunk' table)

-- When debugging in 'psql', uncomment the lines below to be able to run the
-- query with arguments set.

Expand Down Expand Up @@ -50,6 +43,35 @@ WITH
NOT (ns.nspname = ANY ($1))
),

-- These are the schemas of which tables will be unqualified
unqualified_schemas_for_tables AS
(
SELECT DISTINCT
schema_name,
ns.oid as schema_id
FROM
UNNEST($2) AS t(schema_name)
INNER JOIN
pg_namespace
AS ns
ON (ns.nspname = schema_name)
),

-- These are the schemas of which types and procedures will be
-- exported unqualified.
unqualified_schemas_for_types_and_procedures AS
(
SELECT DISTINCT
schema_name,
ns.oid as schema_id
FROM
UNNEST($3) AS t(schema_name)
INNER JOIN
pg_namespace
AS ns
ON (ns.nspname = schema_name)
),

-- Tables and views etc. are recorded in `pg_class`, see
-- https://www.postgresql.org/docs/current/catalog-pg-class.html for its
-- schema.
Expand Down Expand Up @@ -129,8 +151,9 @@ WITH
-- (remember that, since 'pg_class' records all tables (and other relations)
-- that exist in the database, it also has a record of itself).
--
-- We assume 'classoid' to be stable and will just use literal values rather
-- than actually looking them up in pg_class.
-- Rather than using literal numerical oids in this query we use the special
-- built-in datatype 'regclass' which resolves names to oids automatically.
-- See https://www.postgresql.org/docs/current/datatype-oid.html
column_comments AS
(
SELECT
Expand All @@ -146,7 +169,7 @@ WITH
FROM
pg_description
WHERE
classoid = 1259
classoid = 'pg_catalog.pg_class'::regclass
) AS comm
INNER JOIN
columns
Expand All @@ -161,7 +184,7 @@ WITH
FROM
pg_description
WHERE
classoid = 1259
classoid = 'pg_catalog.pg_class'::regclass
AND objsubid = 0
),

Expand All @@ -178,6 +201,11 @@ WITH
-- typedelim
FROM
pg_catalog.pg_type AS t
INNER JOIN
-- Until the schema is made part of our model of types we only consider
-- those defined in the public schema.
unqualified_schemas_for_types_and_procedures as q
ON (t.typnamespace = q.schema_id)
WHERE
-- We currently filter out pseudo (polymorphic) types, because our schema
-- can only deal with monomorphic types.
Expand All @@ -198,41 +226,42 @@ WITH
-- 'r' for range
-- 'm' for multi-range
)
AND NOT (
-- Exclude arrays (see 'array_types' below).
t.typelem != 0 -- whether you can subscript into the type
AND typcategory = 'A' -- The parsers considers this type an array for
-- the purpose of selecting preferred implicit casts.
AND NOT
(
-- Exclude arrays (see 'array_types' below).
t.typelem != 0 -- whether you can subscript into the type
AND typcategory = 'A' -- The parsers considers this type an array for
-- the purpose of selecting preferred implicit casts.
)
-- Ignore types that are (primarily) for internal postgres use.
-- This is a good candidate for a configuration option.
AND NOT typname IN
(
'aclitem',
'cid',
'gidx',
'name',
'oid',
'pg_dependencies',
'pg_lsn',
'pg_mcv_list',
'pg_ndistinct',
'pg_node_tree',
'regclass',
'regcollation',
'regconfig',
'regdictionary',
'regnamespace',
'regoper',
'regoperator',
'regproc',
'regprocedure',
'regrole',
'regtype',
'tid',
'xid',
'xid8'
)
-- Ignore types that are (primarily) for internal postgres use.
-- This is a good candidate for a configuration option.
AND NOT typname IN
(
'aclitem',
'cid',
'gidx',
'name',
'oid',
'pg_dependencies',
'pg_lsn',
'pg_mcv_list',
'pg_ndistinct',
'pg_node_tree',
'regclass',
'regcollation',
'regconfig',
'regdictionary',
'regnamespace',
'regoper',
'regoperator',
'regproc',
'regprocedure',
'regrole',
'regtype',
'tid',
'xid',
'xid8'
)
),
array_types AS
(
Expand All @@ -248,6 +277,11 @@ WITH
scalar_types
AS et
ON (et.type_id = t.typelem)
INNER JOIN
-- Until the schema is made part of our model of types we only consider
-- types defined in the public schema.
unqualified_schemas_for_types_and_procedures
USING (schema_id)
WHERE
-- See 'scalar_types' above
t.typtype NOT IN
Expand Down Expand Up @@ -346,8 +380,14 @@ WITH
INNER JOIN scalar_types
AS ret_type
ON (ret_type.type_id = proc.prorettype)
INNER JOIN
-- Until the schema is made part of our model of types we only consider
-- types defined in the public schema.
unqualified_schemas_for_types_and_procedures
AS q
ON (q.schema_id = proc.pronamespace)
WHERE
ret_type.type_name = 'bool'
ret_type.type_id = 'pg_catalog.bool'::regtype
-- We check that we only consider procedures which take two regular
-- arguments.
AND cardinality(proc.proargtypes) = 2
Expand Down Expand Up @@ -379,7 +419,7 @@ WITH
-- Include only procedures that are explicitly selected.
-- This is controlled by the
-- 'introspectPrefixFunctionComparisonOperators' configuration option.
operator_name = ANY ($4)
operator_name = ANY ($5)
),

-- Operators are recorded across 'pg_proc', pg_operator, and 'pg_aggregate', see
Expand Down Expand Up @@ -412,8 +452,14 @@ WITH
scalar_types
AS t_res
ON (op.oprresult = t_res.type_id)
INNER JOIN
-- Until the schema is made part of our model of operators we only consider
-- those defined in the public schema.
unqualified_schemas_for_types_and_procedures
AS q
ON (q.schema_id = op.oprnamespace)
WHERE
t_res.type_name = 'bool'
t_res.type_id = 'pg_catalog.bool'::regtype
ORDER BY op.oprname
),

Expand Down Expand Up @@ -659,7 +705,7 @@ WITH
v ->> 'operatorName' AS operator_name,
v ->> 'exposedName' AS exposed_name
FROM
jsonb_array_elements($3) AS v
jsonb_array_elements($4) AS v
),

-- Constraints are recorded in 'pg_constraint', see
Expand Down Expand Up @@ -779,7 +825,7 @@ FROM
SELECT
jsonb_object_agg(
CASE
WHEN s.schema_name = ANY ($2)
WHEN unqualified_schemas_for_tables.schema_id IS NOT NULL
THEN rel.relation_name
ELSE s.schema_name || '_' || rel.relation_name
END,
Expand All @@ -803,14 +849,18 @@ FROM
relations
AS rel

LEFT OUTER JOIN
unqualified_schemas_for_tables
USING (schema_id)

LEFT OUTER JOIN
table_comments
AS comm
USING (relation_id)

INNER JOIN schemas
AS s
USING (schema_id)
ON (rel.schema_id = s.schema_id)

-- Columns
INNER JOIN
Expand Down Expand Up @@ -1095,7 +1145,8 @@ FROM
--
-- EXECUTE configuration(
-- '{"information_schema", "tiger", "pg_catalog", "topology"}'::varchar[],
-- '{}'::varchar[],
-- '{"public"}'::varchar[],
-- '{"public", "pg_catalog", "tiger"}'::varchar[],
-- '[
-- {"operatorName": "=", "exposedName": "_eq"},
-- {"operatorName": "!=", "exposedName": "_neq"},
Expand Down
Loading
Loading