Skip to content

Commit

Permalink
Native query mutations (#189)
Browse files Browse the repository at this point in the history
### What

We want to support the
[spec](https://hasura.github.io/ndc-spec/specification/mutations/index.html)
definition for mutations over procedures by allowing users to define
native queries that can do mutations.

Native Query Mutations should return the columns of the table using a
`RETURNING` clause,
so we can select those when building the response query.

![Screenshot from 2023-11-28
17-24-58](https://github.com/hasura/ndc-postgres/assets/8547573/9451327a-9ab2-488e-82ae-65a23f3f63b2)


### How

tl;dr, we:

1. revise the `ExecutionPlan` to support a list of mutations as well as
a query.
2. translate each mutation to something like `with "nq" as (<native
query>) select <fields>, <affected row> from <selects>`.
3. implement the execution of the mutations list by running them on
after the other in a transaction and concatenate the results.
4. implement the `/mutation` endpoint to call the relevant parts of the
translation and execution.
  • Loading branch information
Gil Mizrahi authored Dec 1, 2023
1 parent addc37f commit 9e5377b
Show file tree
Hide file tree
Showing 36 changed files with 3,460 additions and 94 deletions.
32 changes: 18 additions & 14 deletions crates/connectors/ndc-postgres/src/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use super::capabilities;
use super::configuration;
use super::explain;
use super::health;
use super::mutation;
use super::query;
use super::schema;
use super::state;
Expand Down Expand Up @@ -203,21 +204,24 @@ impl connector::Connector for Postgres {
/// This function implements the [mutation endpoint](https://hasura.github.io/ndc-spec/specification/mutations/index.html)
/// from the NDC specification.
async fn mutation(
_configuration: &Self::Configuration,
_state: &Self::State,
_request: models::MutationRequest,
configuration: &Self::Configuration,
state: &Self::State,
request: models::MutationRequest,
) -> Result<JsonResponse<models::MutationResponse>, connector::MutationError> {
tracing::error!(
meta.signal_type = "log",
event.domain = "ndc",
event.name = "Mutations error",
name = "Mutations error",
body = "mutations are currently not implemented",
error = true,
);
Err(connector::MutationError::UnsupportedOperation(
"mutations are currently not implemented".into(),
))
let conf = &configuration.as_runtime_configuration();
mutation::mutation(conf, state, request)
.await
.map_err(|err| {
tracing::error!(
meta.signal_type = "log",
event.domain = "ndc",
event.name = "Mutation error",
name = "Mutation error",
body = %err,
error = true,
);
err
})
}

/// Execute a query
Expand Down
60 changes: 32 additions & 28 deletions crates/connectors/ndc-postgres/src/explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use tracing::{info_span, Instrument};

use ndc_sdk::connector;
use ndc_sdk::models;
use query_engine_execution::execution;
use query_engine_sql::sql;
use query_engine_translation::translation;

Expand Down Expand Up @@ -37,34 +36,38 @@ pub async fn explain<'a>(
.await?;

// Execute an explain query.
let (query, plan) =
execution::explain(&state.pool, &state.database_info, &state.metrics, plan)
.instrument(info_span!("Explain query"))
.await
.map_err(|err| match err {
execution::Error::Query(err) => {
tracing::error!("{}", err);
// log error metric
match &err {
execution::QueryError::ReservedVariableName(_) => {
state.metrics.error_metrics.record_invalid_request()
}
execution::QueryError::VariableNotFound(_) => {
state.metrics.error_metrics.record_invalid_request()
}
execution::QueryError::NotSupported(_) => {
state.metrics.error_metrics.record_unsupported_feature()
}
}

connector::ExplainError::Other(err.to_string().into())
let (query, plan) = query_engine_execution::query::explain(
&state.pool,
&state.database_info,
&state.metrics,
plan,
)
.instrument(info_span!("Explain query"))
.await
.map_err(|err| match err {
query_engine_execution::query::Error::Query(err) => {
tracing::error!("{}", err);
// log error metric
match &err {
query_engine_execution::query::QueryError::ReservedVariableName(_) => {
state.metrics.error_metrics.record_invalid_request()
}
query_engine_execution::query::QueryError::VariableNotFound(_) => {
state.metrics.error_metrics.record_invalid_request()
}
execution::Error::DB(err) => {
tracing::error!("{}", err);
state.metrics.error_metrics.record_database_error();
connector::ExplainError::Other(err.to_string().into())
query_engine_execution::query::QueryError::NotSupported(_) => {
state.metrics.error_metrics.record_unsupported_feature()
}
})?;
}

connector::ExplainError::Other(err.to_string().into())
}
query_engine_execution::query::Error::DB(err) => {
tracing::error!("{}", err);
state.metrics.error_metrics.record_database_error();
connector::ExplainError::Other(err.to_string().into())
}
})?;

state.metrics.record_successful_explain();

Expand All @@ -83,7 +86,8 @@ fn plan_query(
configuration: &configuration::RuntimeConfiguration,
state: &state::State,
query_request: models::QueryRequest,
) -> Result<sql::execution_plan::ExecutionPlan, connector::ExplainError> {
) -> Result<sql::execution_plan::ExecutionPlan<sql::execution_plan::Query>, connector::ExplainError>
{
let timer = state.metrics.time_query_plan();
let result =
translation::query::translate(&configuration.metadata, query_request).map_err(|err| {
Expand Down
1 change: 1 addition & 0 deletions crates/connectors/ndc-postgres/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ pub mod configuration;
pub mod connector;
pub mod explain;
pub mod health;
pub mod mutation;
pub mod query;
pub mod schema;
pub mod state;
130 changes: 130 additions & 0 deletions crates/connectors/ndc-postgres/src/mutation.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
//! Implement the `/mutation` endpoint to run a mutation statement against postgres.
//! See the Hasura
//! [Native Data Connector Specification](https://hasura.github.io/ndc-spec/specification/mutations/index.html)
//! for further details.

use tracing::{info_span, Instrument};

use ndc_sdk::connector;
use ndc_sdk::json_response::JsonResponse;
use ndc_sdk::models;
use query_engine_execution;
use query_engine_sql::sql;
use query_engine_translation::translation;

use super::configuration;
use super::state;

/// Execute a mutation
///
/// This function implements the [mutation endpoint](https://hasura.github.io/ndc-spec/specification/mutations/index.html)
/// from the NDC specification.
pub async fn mutation<'a>(
configuration: &configuration::RuntimeConfiguration<'a>,
state: &state::State,
request: models::MutationRequest,
) -> Result<JsonResponse<models::MutationResponse>, connector::MutationError> {
let timer = state.metrics.time_query_total();

// See https://docs.rs/tracing/0.1.29/tracing/span/struct.Span.html#in-asynchronous-code
let result = async move {
tracing::info!(
request_json = serde_json::to_string(&request).unwrap(),
request = ?request
);

let plan = async { plan_mutation(configuration, state, request) }
.instrument(info_span!("Plan mutation"))
.await?;

let result = execute_mutation(state, plan)
.instrument(info_span!("Execute mutation"))
.await?;

state.metrics.record_successful_mutation();
Ok(result)
}
.instrument(info_span!("/mutation"))
.await;

timer.complete_with(result)
}

fn plan_mutation(
configuration: &configuration::RuntimeConfiguration,
state: &state::State,
request: models::MutationRequest,
) -> Result<
sql::execution_plan::ExecutionPlan<sql::execution_plan::Mutations>,
connector::MutationError,
> {
let timer = state.metrics.time_query_plan();
let mutations = request
.operations
.into_iter()
.map(|operation| {
translation::mutation::translate(
configuration.metadata,
operation,
request.collection_relationships.clone(),
)
.map_err(|err| {
tracing::error!("{}", err);
// log metrics
match err {
translation::error::Error::CapabilityNotSupported(_) => {
state.metrics.error_metrics.record_unsupported_capability();
connector::MutationError::UnsupportedOperation(err.to_string())
}
translation::error::Error::NotImplementedYet(_) => {
state.metrics.error_metrics.record_unsupported_feature();
connector::MutationError::UnsupportedOperation(err.to_string())
}
_ => {
state.metrics.error_metrics.record_invalid_request();
connector::MutationError::InvalidRequest(err.to_string())
}
}
})
})
.collect::<Result<Vec<_>, connector::MutationError>>()?;
timer.complete_with(Ok(sql::execution_plan::simple_mutations_execution_plan(
mutations,
)))
}

async fn execute_mutation(
state: &state::State,
plan: sql::execution_plan::ExecutionPlan<sql::execution_plan::Mutations>,
) -> Result<JsonResponse<models::MutationResponse>, connector::MutationError> {
query_engine_execution::mutation::execute(
&state.pool,
&state.database_info,
&state.metrics,
plan,
)
.await
.map(JsonResponse::Serialized)
.map_err(|err| {
tracing::error!("{}", err);
log_err_metrics(state, &err);
connector::MutationError::Other(err.to_string().into())
})
}

fn log_err_metrics(state: &state::State, err: &query_engine_execution::mutation::Error) {
match err {
query_engine_execution::mutation::Error::Query(err) => match &err {
query_engine_execution::mutation::QueryError::NotSupported(_) => {
state.metrics.error_metrics.record_unsupported_feature()
}
},
query_engine_execution::mutation::Error::DB(_) => {
state.metrics.error_metrics.record_database_error();
}
query_engine_execution::mutation::Error::Multiple(err1, err2) => {
log_err_metrics(state, err1);
log_err_metrics(state, err2);
}
}
}
17 changes: 8 additions & 9 deletions crates/connectors/ndc-postgres/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use tracing::{info_span, Instrument};
use ndc_sdk::connector;
use ndc_sdk::json_response::JsonResponse;
use ndc_sdk::models;
use query_engine_execution::execution;
use query_engine_sql::sql;
use query_engine_translation::translation;

Expand Down Expand Up @@ -54,7 +53,7 @@ fn plan_query(
configuration: &configuration::RuntimeConfiguration,
state: &state::State,
query_request: models::QueryRequest,
) -> Result<sql::execution_plan::ExecutionPlan, connector::QueryError> {
) -> Result<sql::execution_plan::ExecutionPlan<sql::execution_plan::Query>, connector::QueryError> {
let timer = state.metrics.time_query_plan();
let result =
translation::query::translate(&configuration.metadata, query_request).map_err(|err| {
Expand All @@ -80,29 +79,29 @@ fn plan_query(

async fn execute_query(
state: &state::State,
plan: sql::execution_plan::ExecutionPlan,
plan: sql::execution_plan::ExecutionPlan<sql::execution_plan::Query>,
) -> Result<JsonResponse<models::QueryResponse>, connector::QueryError> {
execution::execute(&state.pool, &state.database_info, &state.metrics, plan)
query_engine_execution::query::execute(&state.pool, &state.database_info, &state.metrics, plan)
.await
.map(JsonResponse::Serialized)
.map_err(|err| match err {
execution::Error::Query(err) => {
query_engine_execution::query::Error::Query(err) => {
tracing::error!("{}", err);
// log error metric
match &err {
execution::QueryError::ReservedVariableName(_) => {
query_engine_execution::query::QueryError::ReservedVariableName(_) => {
state.metrics.error_metrics.record_invalid_request()
}
execution::QueryError::VariableNotFound(_) => {
query_engine_execution::query::QueryError::VariableNotFound(_) => {
state.metrics.error_metrics.record_invalid_request()
}
execution::QueryError::NotSupported(_) => {
query_engine_execution::query::QueryError::NotSupported(_) => {
state.metrics.error_metrics.record_unsupported_feature()
}
}
connector::QueryError::Other(err.to_string().into())
}
execution::Error::DB(err) => {
query_engine_execution::query::Error::DB(err) => {
tracing::error!("{}", err);
state.metrics.error_metrics.record_database_error();
connector::QueryError::Other(err.to_string().into())
Expand Down
3 changes: 2 additions & 1 deletion crates/query-engine/execution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@
//! See `/architecture.md#execution` in the repository for more details.

pub mod database_info;
pub mod execution;
pub mod metrics;
pub mod mutation;
pub mod query;
Loading

0 comments on commit 9e5377b

Please sign in to comment.