Skip to content

Commit

Permalink
psql: Pass through pg_meta to result columns
Browse files Browse the repository at this point in the history
If we know the PostgreSQL table OID and column attnum of a
column (because we know its ColumnBase, and it came from PG originally),
pass that all the way through to the `table_id` and `column_id` on the
FieldDescription we return in result sets to pg clients.

This is tough to test right know because tokio-postgres doesn't expose
these fields, but next up I'm going to patch it to do so, both so that
we can pass it through for proxied queries and so that we can test it
for cached queries. I have tested this manually using a Clojure REPL and
it does work. Before:

    user=> (jdbc/execute! ds ["select * from t1"])
    [{:a 1, :b nil}]

After:

    user=> (jdbc/execute! ds ["select * from t1"])
    [#:t1{:a 1, :b nil}]

Refs: REA-3380
Change-Id: Ide9867949212eea34ee1ec3575b53cb6f6fd4e66
  • Loading branch information
glittershark committed Aug 22, 2023
1 parent 544bc78 commit ae5b696
Show file tree
Hide file tree
Showing 8 changed files with 44 additions and 14 deletions.
2 changes: 1 addition & 1 deletion psql-srv/src/codec/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ fn encode(message: BackendMessage, dst: &mut BytesMut) -> Result<(), Error> {
put_i16(i16::try_from(field_descriptions.len())?, dst);
for d in field_descriptions {
put_str(&d.field_name, dst);
put_i32(d.table_id, dst);
put_u32(d.table_id, dst);
put_i16(d.col_id, dst);
put_type(d.data_type, dst)?;
put_i16(d.data_type_size, dst);
Expand Down
6 changes: 6 additions & 0 deletions psql-srv/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,12 @@ pub struct Column {
/// The name of the column
pub name: SqlIdentifier,

/// The OID of the column's table, if known
pub table_oid: Option<u32>,

/// The attribute number of the column, if known
pub attnum: Option<i16>,

/// The type of the column
pub col_type: Type,
}
Expand Down
2 changes: 1 addition & 1 deletion psql-srv/src/message/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ pub enum ErrorSeverity {
#[derive(Debug, PartialEq, Eq)]
pub struct FieldDescription {
pub field_name: SqlIdentifier,
pub table_id: i32,
pub table_id: u32,
pub col_id: i16,
pub data_type: Type,
pub data_type_size: i16,
Expand Down
22 changes: 19 additions & 3 deletions psql-srv/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ const TYPLEN_32: i16 = 32;
const TYPLEN_VARLENA: i16 = -1;
const TYPLEN_CSTRING: i16 = -2; // Null-terminated C string
const UNKNOWN_COLUMN: i16 = 0;
const UNKNOWN_TABLE: i32 = 0;
const UNKNOWN_TABLE: u32 = 0;

/// State machine for an ongoing SASL authentication flow
///
Expand Down Expand Up @@ -907,8 +907,8 @@ async fn make_field_description<B: PsqlBackend>(

Ok(FieldDescription {
field_name: col.name.clone(),
table_id: UNKNOWN_TABLE,
col_id: UNKNOWN_COLUMN,
table_id: col.table_oid.unwrap_or(UNKNOWN_TABLE),
col_id: col.attnum.unwrap_or(UNKNOWN_COLUMN),
data_type: col.col_type.clone(),
data_type_size,
type_modifier: ATTTYPMOD_NONE,
Expand Down Expand Up @@ -1007,10 +1007,14 @@ mod tests {
schema: vec![
Column {
name: "col1".into(),
table_oid: None,
attnum: None,
col_type: Type::INT4,
},
Column {
name: "col2".into(),
table_oid: None,
attnum: None,
col_type: Type::FLOAT8,
},
],
Expand Down Expand Up @@ -1039,10 +1043,14 @@ mod tests {
row_schema: vec![
Column {
name: "col1".into(),
table_oid: None,
attnum: None,
col_type: Type::INT4,
},
Column {
name: "col2".into(),
table_oid: None,
attnum: None,
col_type: Type::FLOAT8,
},
],
Expand All @@ -1064,10 +1072,14 @@ mod tests {
schema: vec![
Column {
name: "col1".into(),
table_oid: None,
attnum: None,
col_type: Type::INT4,
},
Column {
name: "col2".into(),
table_oid: None,
attnum: None,
col_type: Type::FLOAT8,
},
],
Expand Down Expand Up @@ -1548,10 +1560,14 @@ mod tests {
row_schema: vec![
Column {
name: "col1".into(),
table_oid: None,
attnum: None,
col_type: Type::INT4
},
Column {
name: "col2".into(),
table_oid: None,
attnum: None,
col_type: Type::FLOAT8
},
],
Expand Down
4 changes: 4 additions & 0 deletions psql-srv/tests/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ impl PsqlBackend for ErrorBackend {
param_schema: vec![],
row_schema: vec![Column {
name: "x".into(),
table_oid: None,
attnum: None,
col_type: Type::BOOL,
}],
})
Expand All @@ -76,6 +78,8 @@ impl PsqlBackend for ErrorBackend {
ErrorPosition::Serialize => Ok(QueryResponse::Select {
schema: vec![Column {
name: "x".into(),
table_oid: None,
attnum: None,
col_type: Type::BOOL,
}],
resultset: stream::iter(vec![Err(Error::InternalError("factory".to_owned()))]),
Expand Down
6 changes: 6 additions & 0 deletions readyset-psql/benches/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ impl PsqlBackend for Backend {
.map(|col| psql_srv::Column {
name: col.name().into(),
col_type: col.type_().clone(),
table_oid: None,
attnum: None,
})
.collect()
})
Expand Down Expand Up @@ -181,6 +183,8 @@ impl PsqlBackend for Backend {
.map(|c| psql_srv::Column {
name: c.name().into(),
col_type: c.type_().clone(),
table_oid: None,
attnum: None,
})
.collect(),
};
Expand Down Expand Up @@ -223,6 +227,8 @@ impl PsqlBackend for Backend {
.map(|col| psql_srv::Column {
name: col.name().into(),
col_type: col.type_().clone(),
table_oid: None,
attnum: None,
})
.collect()
})
Expand Down
12 changes: 3 additions & 9 deletions readyset-psql/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,7 @@ pub struct SelectSchema<'a>(pub cl::SelectSchema<'a>);
impl<'a> TryFrom<SelectSchema<'a>> for Vec<ps::Column> {
type Error = Error;
fn try_from(s: SelectSchema) -> Result<Self, Self::Error> {
s.0.schema
.iter()
.map(|c| {
Ok(ps::Column {
name: c.column.name.clone(),
col_type: type_to_pgsql(&c.column_type)?,
})
})
.collect()
NoriaSchema(&s.0.schema).try_into()
}
}

Expand All @@ -50,6 +42,8 @@ impl<'a> TryFrom<NoriaSchema<'a>> for Vec<ps::Column> {
Ok(ps::Column {
name: c.column.name.clone(),
col_type: type_to_pgsql(&c.column_type)?,
table_oid: c.base.as_ref().and_then(|b| b.table_oid),
attnum: c.base.as_ref().and_then(|b| b.attnum),
})
})
.collect()
Expand Down
4 changes: 4 additions & 0 deletions readyset-psql/src/upstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,10 @@ impl UpstreamDatabase for PostgreSqlUpstream {
Ok(Column {
name: col.name().into(),
col_type: col.type_().clone(),
// TODO: Load the following two fields from upstream, once tokio-postgres
// provides them
table_oid: None,
attnum: None,
})
})
.collect::<Result<Vec<_>, _>>()?,
Expand Down

0 comments on commit ae5b696

Please sign in to comment.