diff --git a/Cargo.lock b/Cargo.lock index cdc168815a..25d792db42 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5401,6 +5401,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f66ea23a2d0e5734297357705193335e0a957696f34bed2f2faefacb2fec336f" dependencies = [ "bytes", + "chrono", "fallible-iterator 0.2.0", "postgres-protocol", ] @@ -7211,6 +7212,7 @@ name = "spin-factor-outbound-pg" version = "2.8.0-pre0" dependencies = [ "anyhow", + "chrono", "native-tls", "postgres-native-tls", "spin-core", diff --git a/crates/factor-outbound-pg/Cargo.toml b/crates/factor-outbound-pg/Cargo.toml index 5a621b436a..47899aee91 100644 --- a/crates/factor-outbound-pg/Cargo.toml +++ b/crates/factor-outbound-pg/Cargo.toml @@ -6,6 +6,7 @@ edition = { workspace = true } [dependencies] anyhow = { workspace = true } +chrono = "0.4" native-tls = "0.2" postgres-native-tls = "0.5" spin-core = { path = "../core" } @@ -14,7 +15,7 @@ spin-factors = { path = "../factors" } spin-resource-table = { path = "../table" } spin-world = { path = "../world" } tokio = { workspace = true, features = ["rt-multi-thread"] } -tokio-postgres = "0.7" +tokio-postgres = { version = "0.7", features = ["with-chrono-0_4"] } tracing = { workspace = true } [dev-dependencies] diff --git a/crates/factor-outbound-pg/src/client.rs b/crates/factor-outbound-pg/src/client.rs index 06a93a6311..3f0a890a97 100644 --- a/crates/factor-outbound-pg/src/client.rs +++ b/crates/factor-outbound-pg/src/client.rs @@ -2,8 +2,9 @@ use anyhow::{anyhow, Result}; use native_tls::TlsConnector; use postgres_native_tls::MakeTlsConnector; use spin_world::async_trait; -use spin_world::v2::postgres::{self as v2}; -use spin_world::v2::rdbms_types::{Column, DbDataType, DbValue, ParameterValue, RowSet}; +use spin_world::spin::postgres::postgres::{ + self as v3, Column, DbDataType, DbValue, ParameterValue, RowSet, +}; use tokio_postgres::types::Type; use tokio_postgres::{config::SslMode, types::ToSql, Row}; use tokio_postgres::{Client as TokioClient, NoTls, Socket}; @@ -18,13 +19,13 @@ pub trait Client { &self, statement: String, params: Vec, - ) -> Result; + ) -> Result; async fn query( &self, statement: String, params: Vec, - ) -> Result; + ) -> Result; } #[async_trait] @@ -54,33 +55,43 @@ impl Client for TokioClient { &self, statement: String, params: Vec, - ) -> Result { - let params: Vec<&(dyn ToSql + Sync)> = params + ) -> Result { + let params = params .iter() .map(to_sql_parameter) .collect::>>() - .map_err(|e| v2::Error::ValueConversionFailed(format!("{:?}", e)))?; + .map_err(|e| v3::Error::ValueConversionFailed(format!("{:?}", e)))?; - self.execute(&statement, params.as_slice()) + let params_refs: Vec<&(dyn ToSql + Sync)> = params + .iter() + .map(|b| b.as_ref() as &(dyn ToSql + Sync)) + .collect(); + + self.execute(&statement, params_refs.as_slice()) .await - .map_err(|e| v2::Error::QueryFailed(format!("{:?}", e))) + .map_err(|e| v3::Error::QueryFailed(format!("{:?}", e))) } async fn query( &self, statement: String, params: Vec, - ) -> Result { - let params: Vec<&(dyn ToSql + Sync)> = params + ) -> Result { + let params = params .iter() .map(to_sql_parameter) .collect::>>() - .map_err(|e| v2::Error::BadParameter(format!("{:?}", e)))?; + .map_err(|e| v3::Error::BadParameter(format!("{:?}", e)))?; + + let params_refs: Vec<&(dyn ToSql + Sync)> = params + .iter() + .map(|b| b.as_ref() as &(dyn ToSql + Sync)) + .collect(); let results = self - .query(&statement, params.as_slice()) + .query(&statement, params_refs.as_slice()) .await - .map_err(|e| v2::Error::QueryFailed(format!("{:?}", e)))?; + .map_err(|e| v3::Error::QueryFailed(format!("{:?}", e)))?; if results.is_empty() { return Ok(RowSet { @@ -94,7 +105,7 @@ impl Client for TokioClient { .iter() .map(convert_row) .collect::, _>>() - .map_err(|e| v2::Error::QueryFailed(format!("{:?}", e)))?; + .map_err(|e| v3::Error::QueryFailed(format!("{:?}", e)))?; Ok(RowSet { columns, rows }) } @@ -111,22 +122,43 @@ where }); } -fn to_sql_parameter(value: &ParameterValue) -> Result<&(dyn ToSql + Sync)> { +fn to_sql_parameter(value: &ParameterValue) -> Result> { match value { - ParameterValue::Boolean(v) => Ok(v), - ParameterValue::Int32(v) => Ok(v), - ParameterValue::Int64(v) => Ok(v), - ParameterValue::Int8(v) => Ok(v), - ParameterValue::Int16(v) => Ok(v), - ParameterValue::Floating32(v) => Ok(v), - ParameterValue::Floating64(v) => Ok(v), - ParameterValue::Uint8(_) - | ParameterValue::Uint16(_) - | ParameterValue::Uint32(_) - | ParameterValue::Uint64(_) => Err(anyhow!("Postgres does not support unsigned integers")), - ParameterValue::Str(v) => Ok(v), - ParameterValue::Binary(v) => Ok(v), - ParameterValue::DbNull => Ok(&PgNull), + ParameterValue::Boolean(v) => Ok(Box::new(*v)), + ParameterValue::Int32(v) => Ok(Box::new(*v)), + ParameterValue::Int64(v) => Ok(Box::new(*v)), + ParameterValue::Int8(v) => Ok(Box::new(*v)), + ParameterValue::Int16(v) => Ok(Box::new(*v)), + ParameterValue::Floating32(v) => Ok(Box::new(*v)), + ParameterValue::Floating64(v) => Ok(Box::new(*v)), + ParameterValue::Str(v) => Ok(Box::new(v.clone())), + ParameterValue::Binary(v) => Ok(Box::new(v.clone())), + ParameterValue::Date((y, mon, d)) => { + let naive_date = chrono::NaiveDate::from_ymd_opt(*y, (*mon).into(), (*d).into()) + .ok_or_else(|| anyhow!("invalid date y={y}, m={mon}, d={d}"))?; + Ok(Box::new(naive_date)) + } + ParameterValue::Time((h, min, s, ns)) => { + let naive_time = + chrono::NaiveTime::from_hms_nano_opt((*h).into(), (*min).into(), (*s).into(), *ns) + .ok_or_else(|| anyhow!("invalid time {h}:{min}:{s}:{ns}"))?; + Ok(Box::new(naive_time)) + } + ParameterValue::Datetime((y, mon, d, h, min, s, ns)) => { + let naive_date = chrono::NaiveDate::from_ymd_opt(*y, (*mon).into(), (*d).into()) + .ok_or_else(|| anyhow!("invalid date y={y}, m={mon}, d={d}"))?; + let naive_time = + chrono::NaiveTime::from_hms_nano_opt((*h).into(), (*min).into(), (*s).into(), *ns) + .ok_or_else(|| anyhow!("invalid time {h}:{min}:{s}:{ns}"))?; + let dt = chrono::NaiveDateTime::new(naive_date, naive_time); + Ok(Box::new(dt)) + } + ParameterValue::Timestamp(v) => { + let ts = chrono::DateTime::::from_timestamp(*v, 0) + .ok_or_else(|| anyhow!("invalid epoch timestamp {v}"))?; + Ok(Box::new(ts)) + } + ParameterValue::DbNull => Ok(Box::new(PgNull)), } } @@ -155,6 +187,9 @@ fn convert_data_type(pg_type: &Type) -> DbDataType { Type::INT4 => DbDataType::Int32, Type::INT8 => DbDataType::Int64, Type::TEXT | Type::VARCHAR | Type::BPCHAR => DbDataType::Str, + Type::TIMESTAMP | Type::TIMESTAMPTZ => DbDataType::Timestamp, + Type::DATE => DbDataType::Date, + Type::TIME => DbDataType::Time, _ => { tracing::debug!("Couldn't convert Postgres type {} to WIT", pg_type.name(),); DbDataType::Other @@ -162,7 +197,7 @@ fn convert_data_type(pg_type: &Type) -> DbDataType { } } -fn convert_row(row: &Row) -> Result, tokio_postgres::Error> { +fn convert_row(row: &Row) -> anyhow::Result> { let mut result = Vec::with_capacity(row.len()); for index in 0..row.len() { result.push(convert_entry(row, index)?); @@ -170,7 +205,7 @@ fn convert_row(row: &Row) -> Result, tokio_postgres::Error> { Ok(result) } -fn convert_entry(row: &Row, index: usize) -> Result { +fn convert_entry(row: &Row, index: usize) -> anyhow::Result { let column = &row.columns()[index]; let value = match column.type_() { &Type::BOOL => { @@ -229,6 +264,27 @@ fn convert_entry(row: &Row, index: usize) -> Result DbValue::DbNull, } } + &Type::TIMESTAMP | &Type::TIMESTAMPTZ => { + let value: Option = row.try_get(index)?; + match value { + Some(v) => DbValue::Datetime(tuplify_date_time(v)?), + None => DbValue::DbNull, + } + } + &Type::DATE => { + let value: Option = row.try_get(index)?; + match value { + Some(v) => DbValue::Date(tuplify_date(v)?), + None => DbValue::DbNull, + } + } + &Type::TIME => { + let value: Option = row.try_get(index)?; + match value { + Some(v) => DbValue::Time(tuplify_time(v)?), + None => DbValue::DbNull, + } + } t => { tracing::debug!( "Couldn't convert Postgres type {} in column {}", @@ -241,6 +297,41 @@ fn convert_entry(row: &Row, index: usize) -> Result anyhow::Result<(i32, u8, u8, u8, u8, u8, u32)> { + use chrono::{Datelike, Timelike}; + Ok(( + value.year(), + value.month().try_into()?, + value.day().try_into()?, + value.hour().try_into()?, + value.minute().try_into()?, + value.second().try_into()?, + value.nanosecond(), + )) +} + +fn tuplify_date(value: chrono::NaiveDate) -> anyhow::Result<(i32, u8, u8)> { + use chrono::Datelike; + Ok(( + value.year(), + value.month().try_into()?, + value.day().try_into()?, + )) +} + +fn tuplify_time(value: chrono::NaiveTime) -> anyhow::Result<(u8, u8, u8, u32)> { + use chrono::Timelike; + Ok(( + value.hour().try_into()?, + value.minute().try_into()?, + value.second().try_into()?, + value.nanosecond(), + )) +} + /// Although the Postgres crate converts Rust Option::None to Postgres NULL, /// it enforces the type of the Option as it does so. (For example, trying to /// pass an Option::::None to a VARCHAR column fails conversion.) As we diff --git a/crates/factor-outbound-pg/src/host.rs b/crates/factor-outbound-pg/src/host.rs index 09065d0eff..78dff49e25 100644 --- a/crates/factor-outbound-pg/src/host.rs +++ b/crates/factor-outbound-pg/src/host.rs @@ -1,10 +1,10 @@ use anyhow::Result; use spin_core::{async_trait, wasmtime::component::Resource}; +use spin_world::spin::postgres::postgres::{self as v3}; use spin_world::v1::postgres as v1; use spin_world::v1::rdbms_types as v1_types; -use spin_world::v2::postgres::{self as v2, Connection}; -use spin_world::v2::rdbms_types; -use spin_world::v2::rdbms_types::{ParameterValue, RowSet}; +use spin_world::v2::postgres::{self as v2}; +use spin_world::v2::rdbms_types as v2_types; use tracing::field::Empty; use tracing::instrument; use tracing::Level; @@ -13,21 +13,27 @@ use crate::client::Client; use crate::InstanceState; impl InstanceState { - async fn open_connection(&mut self, address: &str) -> Result, v2::Error> { + async fn open_connection( + &mut self, + address: &str, + ) -> Result, v3::Error> { self.connections .push( C::build_client(address) .await - .map_err(|e| v2::Error::ConnectionFailed(format!("{e:?}")))?, + .map_err(|e| v3::Error::ConnectionFailed(format!("{e:?}")))?, ) - .map_err(|_| v2::Error::ConnectionFailed("too many connections".into())) + .map_err(|_| v3::Error::ConnectionFailed("too many connections".into())) .map(Resource::new_own) } - async fn get_client(&mut self, connection: Resource) -> Result<&C, v2::Error> { + async fn get_client( + &mut self, + connection: Resource, + ) -> Result<&C, v3::Error> { self.connections .get(connection.rep()) - .ok_or_else(|| v2::Error::ConnectionFailed("no connection found".into())) + .ok_or_else(|| v3::Error::ConnectionFailed("no connection found".into())) } async fn is_address_allowed(&self, address: &str) -> Result { @@ -59,21 +65,26 @@ impl InstanceState { } } -#[async_trait] -impl v2::Host for InstanceState {} +fn v2_params_to_v3( + params: Vec, +) -> Result, v2::Error> { + params.into_iter().map(|p| p.try_into()).collect() +} #[async_trait] -impl v2::HostConnection for InstanceState { +impl spin_world::spin::postgres::postgres::HostConnection + for InstanceState +{ #[instrument(name = "spin_outbound_pg.open", skip(self, address), err(level = Level::INFO), fields(otel.kind = "client", db.system = "postgresql", db.address = Empty, server.port = Empty, db.namespace = Empty))] - async fn open(&mut self, address: String) -> Result, v2::Error> { + async fn open(&mut self, address: String) -> Result, v3::Error> { spin_factor_outbound_networking::record_address_fields(&address); if !self .is_address_allowed(&address) .await - .map_err(|e| v2::Error::Other(e.to_string()))? + .map_err(|e| v3::Error::Other(e.to_string()))? { - return Err(v2::Error::ConnectionFailed(format!( + return Err(v3::Error::ConnectionFailed(format!( "address {address} is not permitted" ))); } @@ -83,10 +94,10 @@ impl v2::HostConnection for InstanceState { #[instrument(name = "spin_outbound_pg.execute", skip(self, connection, params), err(level = Level::INFO), fields(otel.kind = "client", db.system = "postgresql", otel.name = statement))] async fn execute( &mut self, - connection: Resource, + connection: Resource, statement: String, - params: Vec, - ) -> Result { + params: Vec, + ) -> Result { Ok(self .get_client(connection) .await? @@ -97,10 +108,10 @@ impl v2::HostConnection for InstanceState { #[instrument(name = "spin_outbound_pg.query", skip(self, connection, params), err(level = Level::INFO), fields(otel.kind = "client", db.system = "postgresql", otel.name = statement))] async fn query( &mut self, - connection: Resource, + connection: Resource, statement: String, - params: Vec, - ) -> Result { + params: Vec, + ) -> Result { Ok(self .get_client(connection) .await? @@ -108,22 +119,28 @@ impl v2::HostConnection for InstanceState { .await?) } - async fn drop(&mut self, connection: Resource) -> anyhow::Result<()> { + async fn drop(&mut self, connection: Resource) -> anyhow::Result<()> { self.connections.remove(connection.rep()); Ok(()) } } -impl rdbms_types::Host for InstanceState { +impl v2_types::Host for InstanceState { fn convert_error(&mut self, error: v2::Error) -> Result { Ok(error) } } -/// Delegate a function call to the v2::HostConnection implementation +impl v3::Host for InstanceState { + fn convert_error(&mut self, error: v3::Error) -> Result { + Ok(error) + } +} + +/// Delegate a function call to the v3::HostConnection implementation macro_rules! delegate { ($self:ident.$name:ident($address:expr, $($arg:expr),*)) => {{ - if !$self.is_address_allowed(&$address).await.map_err(|e| v2::Error::Other(e.to_string()))? { + if !$self.is_address_allowed(&$address).await.map_err(|e| v3::Error::Other(e.to_string()))? { return Err(v1::PgError::ConnectionFailed(format!( "address {} is not permitted", $address ))); @@ -132,12 +149,68 @@ macro_rules! delegate { Ok(c) => c, Err(e) => return Err(e.into()), }; - ::$name($self, connection, $($arg),*) + ::$name($self, connection, $($arg),*) .await .map_err(|e| e.into()) }}; } +#[async_trait] +impl v2::Host for InstanceState {} + +#[async_trait] +impl v2::HostConnection for InstanceState { + #[instrument(name = "spin_outbound_pg.open", skip(self, address), err(level = Level::INFO), fields(otel.kind = "client", db.system = "postgresql", db.address = Empty, server.port = Empty, db.namespace = Empty))] + async fn open(&mut self, address: String) -> Result, v2::Error> { + spin_factor_outbound_networking::record_address_fields(&address); + + if !self + .is_address_allowed(&address) + .await + .map_err(|e| v2::Error::Other(e.to_string()))? + { + return Err(v2::Error::ConnectionFailed(format!( + "address {address} is not permitted" + ))); + } + Ok(self.open_connection(&address).await?) + } + + #[instrument(name = "spin_outbound_pg.execute", skip(self, connection, params), err(level = Level::INFO), fields(otel.kind = "client", db.system = "postgresql", otel.name = statement))] + async fn execute( + &mut self, + connection: Resource, + statement: String, + params: Vec, + ) -> Result { + Ok(self + .get_client(connection) + .await? + .execute(statement, v2_params_to_v3(params)?) + .await?) + } + + #[instrument(name = "spin_outbound_pg.query", skip(self, connection, params), err(level = Level::INFO), fields(otel.kind = "client", db.system = "postgresql", otel.name = statement))] + async fn query( + &mut self, + connection: Resource, + statement: String, + params: Vec, + ) -> Result { + Ok(self + .get_client(connection) + .await? + .query(statement, v2_params_to_v3(params)?) + .await? + .into()) + } + + async fn drop(&mut self, connection: Resource) -> anyhow::Result<()> { + self.connections.remove(connection.rep()); + Ok(()) + } +} + #[async_trait] impl v1::Host for InstanceState { async fn execute( @@ -149,7 +222,10 @@ impl v1::Host for InstanceState { delegate!(self.execute( address, statement, - params.into_iter().map(Into::into).collect() + params + .into_iter() + .map(TryInto::try_into) + .collect::, _>>()? )) } @@ -162,7 +238,10 @@ impl v1::Host for InstanceState { delegate!(self.query( address, statement, - params.into_iter().map(Into::into).collect() + params + .into_iter() + .map(TryInto::try_into) + .collect::, _>>()? )) .map(Into::into) } diff --git a/crates/factor-outbound-pg/src/lib.rs b/crates/factor-outbound-pg/src/lib.rs index 4061f4b7f5..4ca3663531 100644 --- a/crates/factor-outbound-pg/src/lib.rs +++ b/crates/factor-outbound-pg/src/lib.rs @@ -23,6 +23,7 @@ impl Factor for OutboundPgFactor { ) -> anyhow::Result<()> { ctx.link_bindings(spin_world::v1::postgres::add_to_linker)?; ctx.link_bindings(spin_world::v2::postgres::add_to_linker)?; + ctx.link_bindings(spin_world::spin::postgres::postgres::add_to_linker)?; Ok(()) } diff --git a/crates/factor-outbound-pg/tests/factor_test.rs b/crates/factor-outbound-pg/tests/factor_test.rs index b765d805f6..ae0ab28767 100644 --- a/crates/factor-outbound-pg/tests/factor_test.rs +++ b/crates/factor-outbound-pg/tests/factor_test.rs @@ -6,10 +6,10 @@ use spin_factor_variables::VariablesFactor; use spin_factors::{anyhow, RuntimeFactors}; use spin_factors_test::{toml, TestEnvironment}; use spin_world::async_trait; -use spin_world::v2::postgres::HostConnection; -use spin_world::v2::postgres::{self as v2}; -use spin_world::v2::rdbms_types::Error as PgError; -use spin_world::v2::rdbms_types::{ParameterValue, RowSet}; +use spin_world::spin::postgres::postgres::Error as PgError; +use spin_world::spin::postgres::postgres::HostConnection; +use spin_world::spin::postgres::postgres::{self as v2}; +use spin_world::spin::postgres::postgres::{ParameterValue, RowSet}; #[derive(RuntimeFactors)] struct TestFactors { diff --git a/crates/world/src/conversions.rs b/crates/world/src/conversions.rs index 29b21f408a..cd75d78a2d 100644 --- a/crates/world/src/conversions.rs +++ b/crates/world/src/conversions.rs @@ -12,6 +12,24 @@ mod rdbms_types { } } + impl From for v1::rdbms_types::Column { + fn from(value: spin::postgres::postgres::Column) -> Self { + v1::rdbms_types::Column { + name: value.name, + data_type: value.data_type.into(), + } + } + } + + impl From for v2::rdbms_types::Column { + fn from(value: spin::postgres::postgres::Column) -> Self { + v2::rdbms_types::Column { + name: value.name, + data_type: value.data_type.into(), + } + } + } + impl From for v1::rdbms_types::DbValue { fn from(value: v2::rdbms_types::DbValue) -> v1::rdbms_types::DbValue { match value { @@ -34,6 +52,108 @@ mod rdbms_types { } } + impl From for v1::rdbms_types::DbValue { + fn from(value: spin::postgres::postgres::DbValue) -> v1::rdbms_types::DbValue { + match value { + spin::postgres::postgres::DbValue::Boolean(b) => { + v1::rdbms_types::DbValue::Boolean(b) + } + spin::postgres::postgres::DbValue::Int8(i) => v1::rdbms_types::DbValue::Int8(i), + spin::postgres::postgres::DbValue::Int16(i) => v1::rdbms_types::DbValue::Int16(i), + spin::postgres::postgres::DbValue::Int32(i) => v1::rdbms_types::DbValue::Int32(i), + spin::postgres::postgres::DbValue::Int64(i) => v1::rdbms_types::DbValue::Int64(i), + spin::postgres::postgres::DbValue::Floating32(r) => { + v1::rdbms_types::DbValue::Floating32(r) + } + spin::postgres::postgres::DbValue::Floating64(r) => { + v1::rdbms_types::DbValue::Floating64(r) + } + spin::postgres::postgres::DbValue::Str(s) => v1::rdbms_types::DbValue::Str(s), + spin::postgres::postgres::DbValue::Binary(b) => v1::rdbms_types::DbValue::Binary(b), + spin::postgres::postgres::DbValue::DbNull => v1::rdbms_types::DbValue::DbNull, + spin::postgres::postgres::DbValue::Unsupported => { + v1::rdbms_types::DbValue::Unsupported + } + _ => v1::rdbms_types::DbValue::Unsupported, + } + } + } + + impl From for v2::rdbms_types::DbValue { + fn from(value: spin::postgres::postgres::DbValue) -> v2::rdbms_types::DbValue { + match value { + spin::postgres::postgres::DbValue::Boolean(b) => { + v2::rdbms_types::DbValue::Boolean(b) + } + spin::postgres::postgres::DbValue::Int8(i) => v2::rdbms_types::DbValue::Int8(i), + spin::postgres::postgres::DbValue::Int16(i) => v2::rdbms_types::DbValue::Int16(i), + spin::postgres::postgres::DbValue::Int32(i) => v2::rdbms_types::DbValue::Int32(i), + spin::postgres::postgres::DbValue::Int64(i) => v2::rdbms_types::DbValue::Int64(i), + spin::postgres::postgres::DbValue::Floating32(r) => { + v2::rdbms_types::DbValue::Floating32(r) + } + spin::postgres::postgres::DbValue::Floating64(r) => { + v2::rdbms_types::DbValue::Floating64(r) + } + spin::postgres::postgres::DbValue::Str(s) => v2::rdbms_types::DbValue::Str(s), + spin::postgres::postgres::DbValue::Binary(b) => v2::rdbms_types::DbValue::Binary(b), + spin::postgres::postgres::DbValue::DbNull => v2::rdbms_types::DbValue::DbNull, + spin::postgres::postgres::DbValue::Unsupported => { + v2::rdbms_types::DbValue::Unsupported + } + _ => v2::rdbms_types::DbValue::Unsupported, + } + } + } + + impl From for v1::rdbms_types::DbDataType { + fn from(value: spin::postgres::postgres::DbDataType) -> v1::rdbms_types::DbDataType { + match value { + spin::postgres::postgres::DbDataType::Boolean => { + v1::rdbms_types::DbDataType::Boolean + } + spin::postgres::postgres::DbDataType::Int8 => v1::rdbms_types::DbDataType::Int8, + spin::postgres::postgres::DbDataType::Int16 => v1::rdbms_types::DbDataType::Int16, + spin::postgres::postgres::DbDataType::Int32 => v1::rdbms_types::DbDataType::Int32, + spin::postgres::postgres::DbDataType::Int64 => v1::rdbms_types::DbDataType::Int64, + spin::postgres::postgres::DbDataType::Floating32 => { + v1::rdbms_types::DbDataType::Floating32 + } + spin::postgres::postgres::DbDataType::Floating64 => { + v1::rdbms_types::DbDataType::Floating64 + } + spin::postgres::postgres::DbDataType::Str => v1::rdbms_types::DbDataType::Str, + spin::postgres::postgres::DbDataType::Binary => v1::rdbms_types::DbDataType::Binary, + spin::postgres::postgres::DbDataType::Other => v1::rdbms_types::DbDataType::Other, + _ => v1::rdbms_types::DbDataType::Other, + } + } + } + + impl From for v2::rdbms_types::DbDataType { + fn from(value: spin::postgres::postgres::DbDataType) -> v2::rdbms_types::DbDataType { + match value { + spin::postgres::postgres::DbDataType::Boolean => { + v2::rdbms_types::DbDataType::Boolean + } + spin::postgres::postgres::DbDataType::Int8 => v2::rdbms_types::DbDataType::Int8, + spin::postgres::postgres::DbDataType::Int16 => v2::rdbms_types::DbDataType::Int16, + spin::postgres::postgres::DbDataType::Int32 => v2::rdbms_types::DbDataType::Int32, + spin::postgres::postgres::DbDataType::Int64 => v2::rdbms_types::DbDataType::Int64, + spin::postgres::postgres::DbDataType::Floating32 => { + v2::rdbms_types::DbDataType::Floating32 + } + spin::postgres::postgres::DbDataType::Floating64 => { + v2::rdbms_types::DbDataType::Floating64 + } + spin::postgres::postgres::DbDataType::Str => v2::rdbms_types::DbDataType::Str, + spin::postgres::postgres::DbDataType::Binary => v2::rdbms_types::DbDataType::Binary, + spin::postgres::postgres::DbDataType::Other => v2::rdbms_types::DbDataType::Other, + _ => v2::rdbms_types::DbDataType::Other, + } + } + } + impl From for v1::rdbms_types::DbDataType { fn from(value: v2::rdbms_types::DbDataType) -> v1::rdbms_types::DbDataType { match value { @@ -100,6 +220,106 @@ mod rdbms_types { } } + impl TryFrom for spin::postgres::postgres::ParameterValue { + type Error = v1::postgres::PgError; + + fn try_from( + value: v1::rdbms_types::ParameterValue, + ) -> Result { + let converted = match value { + v1::rdbms_types::ParameterValue::Boolean(b) => { + spin::postgres::postgres::ParameterValue::Boolean(b) + } + v1::rdbms_types::ParameterValue::Int8(i) => { + spin::postgres::postgres::ParameterValue::Int8(i) + } + v1::rdbms_types::ParameterValue::Int16(i) => { + spin::postgres::postgres::ParameterValue::Int16(i) + } + v1::rdbms_types::ParameterValue::Int32(i) => { + spin::postgres::postgres::ParameterValue::Int32(i) + } + v1::rdbms_types::ParameterValue::Int64(i) => { + spin::postgres::postgres::ParameterValue::Int64(i) + } + v1::rdbms_types::ParameterValue::Uint8(_) + | v1::rdbms_types::ParameterValue::Uint16(_) + | v1::rdbms_types::ParameterValue::Uint32(_) + | v1::rdbms_types::ParameterValue::Uint64(_) => { + return Err(v1::postgres::PgError::ValueConversionFailed( + "Postgres does not support unsigned integers".to_owned(), + )); + } + v1::rdbms_types::ParameterValue::Floating32(r) => { + spin::postgres::postgres::ParameterValue::Floating32(r) + } + v1::rdbms_types::ParameterValue::Floating64(r) => { + spin::postgres::postgres::ParameterValue::Floating64(r) + } + v1::rdbms_types::ParameterValue::Str(s) => { + spin::postgres::postgres::ParameterValue::Str(s) + } + v1::rdbms_types::ParameterValue::Binary(b) => { + spin::postgres::postgres::ParameterValue::Binary(b) + } + v1::rdbms_types::ParameterValue::DbNull => { + spin::postgres::postgres::ParameterValue::DbNull + } + }; + Ok(converted) + } + } + + impl TryFrom for spin::postgres::postgres::ParameterValue { + type Error = v2::rdbms_types::Error; + + fn try_from( + value: v2::rdbms_types::ParameterValue, + ) -> Result { + let converted = match value { + v2::rdbms_types::ParameterValue::Boolean(b) => { + spin::postgres::postgres::ParameterValue::Boolean(b) + } + v2::rdbms_types::ParameterValue::Int8(i) => { + spin::postgres::postgres::ParameterValue::Int8(i) + } + v2::rdbms_types::ParameterValue::Int16(i) => { + spin::postgres::postgres::ParameterValue::Int16(i) + } + v2::rdbms_types::ParameterValue::Int32(i) => { + spin::postgres::postgres::ParameterValue::Int32(i) + } + v2::rdbms_types::ParameterValue::Int64(i) => { + spin::postgres::postgres::ParameterValue::Int64(i) + } + v2::rdbms_types::ParameterValue::Uint8(_) + | v2::rdbms_types::ParameterValue::Uint16(_) + | v2::rdbms_types::ParameterValue::Uint32(_) + | v2::rdbms_types::ParameterValue::Uint64(_) => { + return Err(v2::rdbms_types::Error::ValueConversionFailed( + "Postgres does not support unsigned integers".to_owned(), + )); + } + v2::rdbms_types::ParameterValue::Floating32(r) => { + spin::postgres::postgres::ParameterValue::Floating32(r) + } + v2::rdbms_types::ParameterValue::Floating64(r) => { + spin::postgres::postgres::ParameterValue::Floating64(r) + } + v2::rdbms_types::ParameterValue::Str(s) => { + spin::postgres::postgres::ParameterValue::Str(s) + } + v2::rdbms_types::ParameterValue::Binary(b) => { + spin::postgres::postgres::ParameterValue::Binary(b) + } + v2::rdbms_types::ParameterValue::DbNull => { + spin::postgres::postgres::ParameterValue::DbNull + } + }; + Ok(converted) + } + } + impl From for v1::mysql::MysqlError { fn from(error: v2::rdbms_types::Error) -> v1::mysql::MysqlError { match error { @@ -114,16 +334,72 @@ mod rdbms_types { } } - impl From for v1::postgres::PgError { - fn from(error: v2::rdbms_types::Error) -> v1::postgres::PgError { + impl From for v1::postgres::PgError { + fn from(error: spin::postgres::postgres::Error) -> v1::postgres::PgError { match error { - v2::mysql::Error::ConnectionFailed(e) => v1::postgres::PgError::ConnectionFailed(e), - v2::mysql::Error::BadParameter(e) => v1::postgres::PgError::BadParameter(e), - v2::mysql::Error::QueryFailed(e) => v1::postgres::PgError::QueryFailed(e), - v2::mysql::Error::ValueConversionFailed(e) => { + spin::postgres::postgres::Error::ConnectionFailed(e) => { + v1::postgres::PgError::ConnectionFailed(e) + } + spin::postgres::postgres::Error::BadParameter(e) => { + v1::postgres::PgError::BadParameter(e) + } + spin::postgres::postgres::Error::QueryFailed(e) => { + v1::postgres::PgError::QueryFailed(e) + } + spin::postgres::postgres::Error::ValueConversionFailed(e) => { v1::postgres::PgError::ValueConversionFailed(e) } - v2::mysql::Error::Other(e) => v1::postgres::PgError::OtherError(e), + spin::postgres::postgres::Error::Other(e) => v1::postgres::PgError::OtherError(e), + } + } + } + + impl From for v2::rdbms_types::Error { + fn from(error: spin::postgres::postgres::Error) -> v2::rdbms_types::Error { + match error { + spin::postgres::postgres::Error::ConnectionFailed(e) => { + v2::rdbms_types::Error::ConnectionFailed(e) + } + spin::postgres::postgres::Error::BadParameter(e) => { + v2::rdbms_types::Error::BadParameter(e) + } + spin::postgres::postgres::Error::QueryFailed(e) => { + v2::rdbms_types::Error::QueryFailed(e) + } + spin::postgres::postgres::Error::ValueConversionFailed(e) => { + v2::rdbms_types::Error::ValueConversionFailed(e) + } + spin::postgres::postgres::Error::Other(e) => v2::rdbms_types::Error::Other(e), + } + } + } +} + +mod postgres { + use super::*; + + impl From for v1::postgres::RowSet { + fn from(value: spin::postgres::postgres::RowSet) -> v1::postgres::RowSet { + v1::mysql::RowSet { + columns: value.columns.into_iter().map(Into::into).collect(), + rows: value + .rows + .into_iter() + .map(|r| r.into_iter().map(Into::into).collect()) + .collect(), + } + } + } + + impl From for v2::rdbms_types::RowSet { + fn from(value: spin::postgres::postgres::RowSet) -> v2::rdbms_types::RowSet { + v2::rdbms_types::RowSet { + columns: value.columns.into_iter().map(Into::into).collect(), + rows: value + .rows + .into_iter() + .map(|r| r.into_iter().map(Into::into).collect()) + .collect(), } } } diff --git a/crates/world/src/lib.rs b/crates/world/src/lib.rs index 676ec3e54a..4aa2ecbd0c 100644 --- a/crates/world/src/lib.rs +++ b/crates/world/src/lib.rs @@ -1,4 +1,5 @@ #![allow(missing_docs)] +#![allow(non_camel_case_types)] // bindgen emits Host_Pre and Host_Indices pub use async_trait::async_trait; @@ -28,6 +29,7 @@ wasmtime::component::bindgen!({ "fermyon:spin/sqlite@2.0.0/error" => v2::sqlite::Error, "fermyon:spin/sqlite/error" => v1::sqlite::Error, "fermyon:spin/variables@2.0.0/error" => v2::variables::Error, + "spin:postgres/postgres/error" => spin::postgres::postgres::Error, "wasi:config/store@0.2.0-draft-2024-09-27/error" => wasi::config::store::Error, }, trappable_imports: true, diff --git a/tests/test-components/components/outbound-postgres/src/lib.rs b/tests/test-components/components/outbound-postgres/src/lib.rs index ace6eca6e5..75c0235b47 100644 --- a/tests/test-components/components/outbound-postgres/src/lib.rs +++ b/tests/test-components/components/outbound-postgres/src/lib.rs @@ -1,6 +1,6 @@ use helper::{ensure, ensure_eq, ensure_matches, ensure_ok}; -use bindings::fermyon::spin2_0_0::{postgres, rdbms_types}; +use bindings::spin::postgres::postgres; helper::define_component!(Component); const DB_URL_ENV: &str = "DB_URL"; @@ -21,15 +21,24 @@ impl Component { let rowset = ensure_ok!(numeric_types(&conn)); ensure!(rowset.rows.iter().all(|r| r.len() == 12)); - ensure_matches!(rowset.rows[0][11], rdbms_types::DbValue::Floating64(f) if f == 1.0); + ensure_matches!(rowset.rows[0][11], postgres::DbValue::Floating64(f) if f == 1.0); let rowset = ensure_ok!(character_types(&conn)); ensure!(rowset.rows.iter().all(|r| r.len() == 3)); - ensure!(matches!(rowset.rows[0][0], rdbms_types::DbValue::Str(ref s) if s == "rvarchar")); + ensure!(matches!(rowset.rows[0][0], postgres::DbValue::Str(ref s) if s == "rvarchar")); + + let rowset = ensure_ok!(date_time_types(&conn)); + ensure!(rowset.rows.iter().all(|r| r.len() == 4)); + ensure_matches!(rowset.rows[0][1], postgres::DbValue::Date((y, m, d)) if y == 2525 && m == 12 && d == 25); + ensure_matches!(rowset.rows[0][2], postgres::DbValue::Time((h, m, s, ns)) if h == 4 && m == 5 && s == 6 && ns == 789_000_000); + ensure_matches!(rowset.rows[0][3], postgres::DbValue::Datetime((y, _, _, h, _, _, ns)) if y == 1989 && h == 1 && ns == 0); + ensure_matches!(rowset.rows[1][1], postgres::DbValue::Date((y, m, d)) if y == 2525 && m == 12 && d == 25); + ensure_matches!(rowset.rows[1][2], postgres::DbValue::Time((h, m, s, ns)) if h == 14 && m == 15 && s == 16 && ns == 17); + ensure_matches!(rowset.rows[1][3], postgres::DbValue::Datetime((y, _, _, h, _, _, ns)) if y == 1989 && h == 1 && ns == 4); let rowset = ensure_ok!(nullable(&conn)); ensure!(rowset.rows.iter().all(|r| r.len() == 1)); - ensure!(matches!(rowset.rows[0][0], rdbms_types::DbValue::DbNull)); + ensure!(matches!(rowset.rows[0][0], postgres::DbValue::DbNull)); let pid1 = format!("{:?}", ensure_ok!(pg_backend_pid(&conn))); let pid2 = format!("{:?}", ensure_ok!(pg_backend_pid(&conn))); @@ -117,6 +126,59 @@ fn character_types(conn: &postgres::Connection) -> Result Result { + let create_table_sql = r#" + CREATE TEMPORARY TABLE test_date_time_types ( + index int2, + rdate date NOT NULL, + rtime time NOT NULL, + rtimestamp timestamp NOT NULL + ); + "#; + + conn.execute(create_table_sql, &[])?; + + // We will use this to test that we correctly decode "known good" + // Postgres database values. (This validates our decoding logic + // independently of our encoding logic.) + let insert_sql_pg_literals = r#" + INSERT INTO test_date_time_types + (index, rdate, rtime, rtimestamp) + VALUES + (1, date '2525-12-25', time '04:05:06.789', timestamp '1989-11-24 01:02:03'); + "#; + + conn.execute(insert_sql_pg_literals, &[])?; + + // We will use this to test that we correctly encode Spin ParameterValue + // objects. (In conjunction with knowing that our decode logic is good, + // this validates our encode logic.) + let insert_sql_spin_parameters = r#" + INSERT INTO test_date_time_types + (index, rdate, rtime, rtimestamp) + VALUES + (2, $1, $2, $3); + "#; + + let date_pv = postgres::ParameterValue::Date((2525, 12, 25)); + let time_pv = postgres::ParameterValue::Time((14, 15, 16, 17)); + let ts_pv = postgres::ParameterValue::Datetime((1989, 11, 24, 1, 2, 3, 4)); + conn.execute(insert_sql_spin_parameters, &[date_pv, time_pv, ts_pv])?; + + let sql = r#" + SELECT + index, + rdate, + rtime, + rtimestamp + FROM test_date_time_types + ORDER BY index; + "#; + + conn.query(sql, &[]) + +} + fn nullable(conn: &postgres::Connection) -> Result { let create_table_sql = r#" CREATE TEMPORARY TABLE test_nullable ( @@ -133,7 +195,7 @@ fn nullable(conn: &postgres::Connection) -> Result Result Result { +fn pg_backend_pid(conn: &postgres::Connection) -> Result { let sql = "SELECT pg_backend_pid()"; let rowset = conn.query(sql, &[])?; diff --git a/wit/deps/spin-postgres@3.0.0/postgres.wit b/wit/deps/spin-postgres@3.0.0/postgres.wit new file mode 100644 index 0000000000..87a2c73a93 --- /dev/null +++ b/wit/deps/spin-postgres@3.0.0/postgres.wit @@ -0,0 +1,96 @@ +package spin:postgres@3.0.0; + +interface postgres { + /// Errors related to interacting with a database. + variant error { + connection-failed(string), + bad-parameter(string), + query-failed(string), + value-conversion-failed(string), + other(string) + } + + /// Data types for a database column + enum db-data-type { + boolean, + int8, + int16, + int32, + int64, + floating32, + floating64, + str, + binary, + date, + time, + datetime, + timestamp, + other, + } + + /// Database values + variant db-value { + boolean(bool), + int8(s8), + int16(s16), + int32(s32), + int64(s64), + floating32(float32), + floating64(float64), + str(string), + binary(list), + date(tuple), // (year, month, day) + time(tuple), // (hour, minute, second, nanosecond) + /// Date-time types are always treated as UTC (without timezone info). + /// The instant is represented as a (year, month, day, hour, minute, second, nanosecond) tuple. + datetime(tuple), + timestamp(s64), // Unix timestamp (seconds since epoch) + db-null, + unsupported, + } + + /// Values used in parameterized queries + variant parameter-value { + boolean(bool), + int8(s8), + int16(s16), + int32(s32), + int64(s64), + floating32(float32), + floating64(float64), + str(string), + binary(list), + date(tuple), // (year, month, day) + time(tuple), // (hour, minute, second, nanosecond) + datetime(tuple), // (year, month, day, hour, minute, second, nanosecond) + timestamp(s64), // Unix timestamp (seconds since epoch) + db-null, + } + + /// A database column + record column { + name: string, + data-type: db-data-type, + } + + /// A database row + type row = list; + + /// A set of database rows + record row-set { + columns: list, + rows: list, + } + + /// A connection to a postgres database. + resource connection { + /// Open a connection to the Postgres instance at `address`. + open: static func(address: string) -> result; + + /// Query the database. + query: func(statement: string, params: list) -> result; + + /// Execute command to the database. + execute: func(statement: string, params: list) -> result; + } +} diff --git a/wit/world.wit b/wit/world.wit index b38f1f4630..55a369174b 100644 --- a/wit/world.wit +++ b/wit/world.wit @@ -9,5 +9,6 @@ world http-trigger { /// The imports needed for a guest to run on a Spin host world platform { include fermyon:spin/platform@2.0.0; + import spin:postgres/postgres@3.0.0; import wasi:config/store@0.2.0-draft-2024-09-27; }