diff --git a/psql-srv/src/codec/encoder.rs b/psql-srv/src/codec/encoder.rs index 709ed252ac..b9645f8d19 100644 --- a/psql-srv/src/codec/encoder.rs +++ b/psql-srv/src/codec/encoder.rs @@ -358,7 +358,7 @@ fn encode(message: BackendMessage, dst: &mut BytesMut) -> Result<(), Error> { put_i16(i16::try_from(field_descriptions.len())?, dst); for d in field_descriptions { put_str(&d.field_name, dst); - put_i32(d.table_id, dst); + put_u32(d.table_id, dst); put_i16(d.col_id, dst); put_type(d.data_type, dst)?; put_i16(d.data_type_size, dst); diff --git a/psql-srv/src/lib.rs b/psql-srv/src/lib.rs index 242b13e167..09f46c6fed 100644 --- a/psql-srv/src/lib.rs +++ b/psql-srv/src/lib.rs @@ -123,6 +123,12 @@ pub struct Column { /// The name of the column pub name: SqlIdentifier, + /// The OID of the column's table, if known + pub table_oid: Option, + + /// The attribute number of the column, if known + pub attnum: Option, + /// The type of the column pub col_type: Type, } diff --git a/psql-srv/src/message/backend.rs b/psql-srv/src/message/backend.rs index f5dcc88d58..cadf52d0f4 100644 --- a/psql-srv/src/message/backend.rs +++ b/psql-srv/src/message/backend.rs @@ -128,7 +128,7 @@ pub enum ErrorSeverity { #[derive(Debug, PartialEq, Eq)] pub struct FieldDescription { pub field_name: SqlIdentifier, - pub table_id: i32, + pub table_id: u32, pub col_id: i16, pub data_type: Type, pub data_type_size: i16, diff --git a/psql-srv/src/protocol.rs b/psql-srv/src/protocol.rs index 5a46761f8b..e149e790c4 100644 --- a/psql-srv/src/protocol.rs +++ b/psql-srv/src/protocol.rs @@ -41,7 +41,7 @@ const TYPLEN_32: i16 = 32; const TYPLEN_VARLENA: i16 = -1; const TYPLEN_CSTRING: i16 = -2; // Null-terminated C string const UNKNOWN_COLUMN: i16 = 0; -const UNKNOWN_TABLE: i32 = 0; +const UNKNOWN_TABLE: u32 = 0; /// State machine for an ongoing SASL authentication flow /// @@ -907,8 +907,8 @@ async fn make_field_description( Ok(FieldDescription { field_name: col.name.clone(), - table_id: UNKNOWN_TABLE, - col_id: UNKNOWN_COLUMN, + table_id: col.table_oid.unwrap_or(UNKNOWN_TABLE), + col_id: col.attnum.unwrap_or(UNKNOWN_COLUMN), data_type: col.col_type.clone(), data_type_size, type_modifier: ATTTYPMOD_NONE, @@ -1007,10 +1007,14 @@ mod tests { schema: vec![ Column { name: "col1".into(), + table_oid: None, + attnum: None, col_type: Type::INT4, }, Column { name: "col2".into(), + table_oid: None, + attnum: None, col_type: Type::FLOAT8, }, ], @@ -1039,10 +1043,14 @@ mod tests { row_schema: vec![ Column { name: "col1".into(), + table_oid: None, + attnum: None, col_type: Type::INT4, }, Column { name: "col2".into(), + table_oid: None, + attnum: None, col_type: Type::FLOAT8, }, ], @@ -1064,10 +1072,14 @@ mod tests { schema: vec![ Column { name: "col1".into(), + table_oid: None, + attnum: None, col_type: Type::INT4, }, Column { name: "col2".into(), + table_oid: None, + attnum: None, col_type: Type::FLOAT8, }, ], @@ -1548,10 +1560,14 @@ mod tests { row_schema: vec![ Column { name: "col1".into(), + table_oid: None, + attnum: None, col_type: Type::INT4 }, Column { name: "col2".into(), + table_oid: None, + attnum: None, col_type: Type::FLOAT8 }, ], diff --git a/psql-srv/tests/errors.rs b/psql-srv/tests/errors.rs index 87e7643c0a..b53e722c61 100644 --- a/psql-srv/tests/errors.rs +++ b/psql-srv/tests/errors.rs @@ -60,6 +60,8 @@ impl PsqlBackend for ErrorBackend { param_schema: vec![], row_schema: vec![Column { name: "x".into(), + table_oid: None, + attnum: None, col_type: Type::BOOL, }], }) @@ -76,6 +78,8 @@ impl PsqlBackend for ErrorBackend { ErrorPosition::Serialize => Ok(QueryResponse::Select { schema: vec![Column { name: "x".into(), + table_oid: None, + attnum: None, col_type: Type::BOOL, }], resultset: stream::iter(vec![Err(Error::InternalError("factory".to_owned()))]), diff --git a/readyset-psql/benches/proxy.rs b/readyset-psql/benches/proxy.rs index 7b8ec1c7eb..62eeaeb14f 100644 --- a/readyset-psql/benches/proxy.rs +++ b/readyset-psql/benches/proxy.rs @@ -153,6 +153,8 @@ impl PsqlBackend for Backend { .map(|col| psql_srv::Column { name: col.name().into(), col_type: col.type_().clone(), + table_oid: None, + attnum: None, }) .collect() }) @@ -181,6 +183,8 @@ impl PsqlBackend for Backend { .map(|c| psql_srv::Column { name: c.name().into(), col_type: c.type_().clone(), + table_oid: None, + attnum: None, }) .collect(), }; @@ -223,6 +227,8 @@ impl PsqlBackend for Backend { .map(|col| psql_srv::Column { name: col.name().into(), col_type: col.type_().clone(), + table_oid: None, + attnum: None, }) .collect() }) diff --git a/readyset-psql/src/schema.rs b/readyset-psql/src/schema.rs index 872c61b73e..27d2517a3b 100644 --- a/readyset-psql/src/schema.rs +++ b/readyset-psql/src/schema.rs @@ -15,15 +15,7 @@ pub struct SelectSchema<'a>(pub cl::SelectSchema<'a>); impl<'a> TryFrom> for Vec { type Error = Error; fn try_from(s: SelectSchema) -> Result { - s.0.schema - .iter() - .map(|c| { - Ok(ps::Column { - name: c.column.name.clone(), - col_type: type_to_pgsql(&c.column_type)?, - }) - }) - .collect() + NoriaSchema(&s.0.schema).try_into() } } @@ -50,6 +42,8 @@ impl<'a> TryFrom> for Vec { Ok(ps::Column { name: c.column.name.clone(), col_type: type_to_pgsql(&c.column_type)?, + table_oid: c.base.as_ref().and_then(|b| b.table_oid), + attnum: c.base.as_ref().and_then(|b| b.attnum), }) }) .collect() diff --git a/readyset-psql/src/upstream.rs b/readyset-psql/src/upstream.rs index d96b70fa63..536025c12e 100644 --- a/readyset-psql/src/upstream.rs +++ b/readyset-psql/src/upstream.rs @@ -258,6 +258,10 @@ impl UpstreamDatabase for PostgreSqlUpstream { Ok(Column { name: col.name().into(), col_type: col.type_().clone(), + // TODO: Load the following two fields from upstream, once tokio-postgres + // provides them + table_oid: None, + attnum: None, }) }) .collect::, _>>()?,