Skip to content

Commit

Permalink
Postgres date-time types
Browse files Browse the repository at this point in the history
Signed-off-by: itowlson <[email protected]>
  • Loading branch information
itowlson committed Oct 28, 2024
1 parent 3eaba5f commit c5eb166
Show file tree
Hide file tree
Showing 11 changed files with 688 additions and 77 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion crates/factor-outbound-pg/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ edition = { workspace = true }

[dependencies]
anyhow = { workspace = true }
chrono = "0.4"
native-tls = "0.2"
postgres-native-tls = "0.5"
spin-core = { path = "../core" }
Expand All @@ -14,7 +15,7 @@ spin-factors = { path = "../factors" }
spin-resource-table = { path = "../table" }
spin-world = { path = "../world" }
tokio = { workspace = true, features = ["rt-multi-thread"] }
tokio-postgres = "0.7"
tokio-postgres = { version = "0.7", features = ["with-chrono-0_4"] }
tracing = { workspace = true }

[dev-dependencies]
Expand Down
155 changes: 123 additions & 32 deletions crates/factor-outbound-pg/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ use anyhow::{anyhow, Result};
use native_tls::TlsConnector;
use postgres_native_tls::MakeTlsConnector;
use spin_world::async_trait;
use spin_world::v2::postgres::{self as v2};
use spin_world::v2::rdbms_types::{Column, DbDataType, DbValue, ParameterValue, RowSet};
use spin_world::spin::postgres::postgres::{
self as v3, Column, DbDataType, DbValue, ParameterValue, RowSet,
};
use tokio_postgres::types::Type;
use tokio_postgres::{config::SslMode, types::ToSql, Row};
use tokio_postgres::{Client as TokioClient, NoTls, Socket};
Expand All @@ -18,13 +19,13 @@ pub trait Client {
&self,
statement: String,
params: Vec<ParameterValue>,
) -> Result<u64, v2::Error>;
) -> Result<u64, v3::Error>;

async fn query(
&self,
statement: String,
params: Vec<ParameterValue>,
) -> Result<RowSet, v2::Error>;
) -> Result<RowSet, v3::Error>;
}

#[async_trait]
Expand Down Expand Up @@ -54,33 +55,43 @@ impl Client for TokioClient {
&self,
statement: String,
params: Vec<ParameterValue>,
) -> Result<u64, v2::Error> {
let params: Vec<&(dyn ToSql + Sync)> = params
) -> Result<u64, v3::Error> {
let params = params
.iter()
.map(to_sql_parameter)
.collect::<Result<Vec<_>>>()
.map_err(|e| v2::Error::ValueConversionFailed(format!("{:?}", e)))?;
.map_err(|e| v3::Error::ValueConversionFailed(format!("{:?}", e)))?;

self.execute(&statement, params.as_slice())
let params_refs: Vec<&(dyn ToSql + Sync)> = params
.iter()
.map(|b| b.as_ref() as &(dyn ToSql + Sync))
.collect();

self.execute(&statement, params_refs.as_slice())
.await
.map_err(|e| v2::Error::QueryFailed(format!("{:?}", e)))
.map_err(|e| v3::Error::QueryFailed(format!("{:?}", e)))
}

async fn query(
&self,
statement: String,
params: Vec<ParameterValue>,
) -> Result<RowSet, v2::Error> {
let params: Vec<&(dyn ToSql + Sync)> = params
) -> Result<RowSet, v3::Error> {
let params = params
.iter()
.map(to_sql_parameter)
.collect::<Result<Vec<_>>>()
.map_err(|e| v2::Error::BadParameter(format!("{:?}", e)))?;
.map_err(|e| v3::Error::BadParameter(format!("{:?}", e)))?;

let params_refs: Vec<&(dyn ToSql + Sync)> = params
.iter()
.map(|b| b.as_ref() as &(dyn ToSql + Sync))
.collect();

let results = self
.query(&statement, params.as_slice())
.query(&statement, params_refs.as_slice())
.await
.map_err(|e| v2::Error::QueryFailed(format!("{:?}", e)))?;
.map_err(|e| v3::Error::QueryFailed(format!("{:?}", e)))?;

if results.is_empty() {
return Ok(RowSet {
Expand All @@ -94,7 +105,7 @@ impl Client for TokioClient {
.iter()
.map(convert_row)
.collect::<Result<Vec<_>, _>>()
.map_err(|e| v2::Error::QueryFailed(format!("{:?}", e)))?;
.map_err(|e| v3::Error::QueryFailed(format!("{:?}", e)))?;

Ok(RowSet { columns, rows })
}
Expand All @@ -111,22 +122,43 @@ where
});
}

fn to_sql_parameter(value: &ParameterValue) -> Result<&(dyn ToSql + Sync)> {
fn to_sql_parameter(value: &ParameterValue) -> Result<Box<dyn ToSql + Send + Sync>> {
match value {
ParameterValue::Boolean(v) => Ok(v),
ParameterValue::Int32(v) => Ok(v),
ParameterValue::Int64(v) => Ok(v),
ParameterValue::Int8(v) => Ok(v),
ParameterValue::Int16(v) => Ok(v),
ParameterValue::Floating32(v) => Ok(v),
ParameterValue::Floating64(v) => Ok(v),
ParameterValue::Uint8(_)
| ParameterValue::Uint16(_)
| ParameterValue::Uint32(_)
| ParameterValue::Uint64(_) => Err(anyhow!("Postgres does not support unsigned integers")),
ParameterValue::Str(v) => Ok(v),
ParameterValue::Binary(v) => Ok(v),
ParameterValue::DbNull => Ok(&PgNull),
ParameterValue::Boolean(v) => Ok(Box::new(*v)),
ParameterValue::Int32(v) => Ok(Box::new(*v)),
ParameterValue::Int64(v) => Ok(Box::new(*v)),
ParameterValue::Int8(v) => Ok(Box::new(*v)),
ParameterValue::Int16(v) => Ok(Box::new(*v)),
ParameterValue::Floating32(v) => Ok(Box::new(*v)),
ParameterValue::Floating64(v) => Ok(Box::new(*v)),
ParameterValue::Str(v) => Ok(Box::new(v.clone())),
ParameterValue::Binary(v) => Ok(Box::new(v.clone())),
ParameterValue::Date((y, mon, d)) => {
let naive_date = chrono::NaiveDate::from_ymd_opt(*y, (*mon).into(), (*d).into())
.ok_or_else(|| anyhow!("invalid date y={y}, m={mon}, d={d}"))?;
Ok(Box::new(naive_date))
}
ParameterValue::Time((h, min, s, ns)) => {
let naive_time =
chrono::NaiveTime::from_hms_nano_opt((*h).into(), (*min).into(), (*s).into(), *ns)
.ok_or_else(|| anyhow!("invalid time {h}:{min}:{s}:{ns}"))?;
Ok(Box::new(naive_time))
}
ParameterValue::Datetime((y, mon, d, h, min, s, ns)) => {
let naive_date = chrono::NaiveDate::from_ymd_opt(*y, (*mon).into(), (*d).into())
.ok_or_else(|| anyhow!("invalid date y={y}, m={mon}, d={d}"))?;
let naive_time =
chrono::NaiveTime::from_hms_nano_opt((*h).into(), (*min).into(), (*s).into(), *ns)
.ok_or_else(|| anyhow!("invalid time {h}:{min}:{s}:{ns}"))?;
let dt = chrono::NaiveDateTime::new(naive_date, naive_time);
Ok(Box::new(dt))
}
ParameterValue::Timestamp(v) => {
let ts = chrono::DateTime::<chrono::Utc>::from_timestamp(*v, 0)
.ok_or_else(|| anyhow!("invalid epoch timestamp {v}"))?;
Ok(Box::new(ts))
}
ParameterValue::DbNull => Ok(Box::new(PgNull)),
}
}

Expand Down Expand Up @@ -155,22 +187,25 @@ fn convert_data_type(pg_type: &Type) -> DbDataType {
Type::INT4 => DbDataType::Int32,
Type::INT8 => DbDataType::Int64,
Type::TEXT | Type::VARCHAR | Type::BPCHAR => DbDataType::Str,
Type::TIMESTAMP | Type::TIMESTAMPTZ => DbDataType::Timestamp,
Type::DATE => DbDataType::Date,
Type::TIME => DbDataType::Time,
_ => {
tracing::debug!("Couldn't convert Postgres type {} to WIT", pg_type.name(),);
DbDataType::Other
}
}
}

fn convert_row(row: &Row) -> Result<Vec<DbValue>, tokio_postgres::Error> {
fn convert_row(row: &Row) -> anyhow::Result<Vec<DbValue>> {
let mut result = Vec::with_capacity(row.len());
for index in 0..row.len() {
result.push(convert_entry(row, index)?);
}
Ok(result)
}

fn convert_entry(row: &Row, index: usize) -> Result<DbValue, tokio_postgres::Error> {
fn convert_entry(row: &Row, index: usize) -> anyhow::Result<DbValue> {
let column = &row.columns()[index];
let value = match column.type_() {
&Type::BOOL => {
Expand Down Expand Up @@ -229,6 +264,27 @@ fn convert_entry(row: &Row, index: usize) -> Result<DbValue, tokio_postgres::Err
None => DbValue::DbNull,
}
}
&Type::TIMESTAMP | &Type::TIMESTAMPTZ => {
let value: Option<chrono::NaiveDateTime> = row.try_get(index)?;
match value {
Some(v) => DbValue::Datetime(tuplify_date_time(v)?),
None => DbValue::DbNull,
}
}
&Type::DATE => {
let value: Option<chrono::NaiveDate> = row.try_get(index)?;
match value {
Some(v) => DbValue::Date(tuplify_date(v)?),
None => DbValue::DbNull,
}
}
&Type::TIME => {
let value: Option<chrono::NaiveTime> = row.try_get(index)?;
match value {
Some(v) => DbValue::Time(tuplify_time(v)?),
None => DbValue::DbNull,
}
}
t => {
tracing::debug!(
"Couldn't convert Postgres type {} in column {}",
Expand All @@ -241,6 +297,41 @@ fn convert_entry(row: &Row, index: usize) -> Result<DbValue, tokio_postgres::Err
Ok(value)
}

// Functions to convert from the chrono types to the WIT interface tuples
fn tuplify_date_time(
value: chrono::NaiveDateTime,
) -> anyhow::Result<(i32, u8, u8, u8, u8, u8, u32)> {
use chrono::{Datelike, Timelike};
Ok((
value.year(),
value.month().try_into()?,
value.day().try_into()?,
value.hour().try_into()?,
value.minute().try_into()?,
value.second().try_into()?,
value.nanosecond(),
))
}

fn tuplify_date(value: chrono::NaiveDate) -> anyhow::Result<(i32, u8, u8)> {
use chrono::Datelike;
Ok((
value.year(),
value.month().try_into()?,
value.day().try_into()?,
))
}

fn tuplify_time(value: chrono::NaiveTime) -> anyhow::Result<(u8, u8, u8, u32)> {
use chrono::Timelike;
Ok((
value.hour().try_into()?,
value.minute().try_into()?,
value.second().try_into()?,
value.nanosecond(),
))
}

/// Although the Postgres crate converts Rust Option::None to Postgres NULL,
/// it enforces the type of the Option as it does so. (For example, trying to
/// pass an Option::<i32>::None to a VARCHAR column fails conversion.) As we
Expand Down
Loading

0 comments on commit c5eb166

Please sign in to comment.