Skip to content

Commit

Permalink
merge: #3153
Browse files Browse the repository at this point in the history
3153: feat: summary table for qualifications r=stack72 a=adamhjk

The get_summary endpoint was taking 30+ seconds to respond in production. This commit introduces the idea of 'summary tables', which get updated when the internal provider for the root prop is updated. We take the raw data, and then compute custom tables that cache the data, getting rid of the need to do the complex queries to get the information.

This is in a pretty raw state - if we decide that summary tables like this are a pattern we want more broadly, it'll certainly need some refactoring (like moving the summary table code into a single module, etc.)

<img src="https://media4.giphy.com/media/NFA61GS9qKZ68/giphy.gif"/>

Co-authored-by: Adam Jacob <[email protected]>
  • Loading branch information
si-bors-ng[bot] and adamhjk authored Jan 11, 2024
2 parents 2b94dce + a763e9c commit ce133b8
Show file tree
Hide file tree
Showing 7 changed files with 239 additions and 68 deletions.
3 changes: 1 addition & 2 deletions lib/dal/src/attribute/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -922,8 +922,7 @@ impl AttributeValue {
/// [`Component`](crate::Component). This means that the [`Prop`](crate::Prop) that the
/// [`InternalProvider`](crate::InternalProvider) is sourcing its data from does not have a
/// parent [`Prop`](crate::Prop).
#[allow(unused)]
async fn is_for_internal_provider_of_root_prop(
pub async fn is_for_internal_provider_of_root_prop(
&mut self,
ctx: &DalContext,
) -> AttributeValueResult<bool> {
Expand Down
4 changes: 3 additions & 1 deletion lib/dal/src/job/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use si_data_nats::NatsError;
use si_data_pg::PgPoolError;
use si_data_pg::{PgError, PgPoolError};
use thiserror::Error;
use tokio::task::JoinError;

Expand Down Expand Up @@ -63,6 +63,8 @@ pub enum JobConsumerError {
#[error("no schema variant found for component {0}")]
NoSchemaVariantFound(ComponentId),
#[error(transparent)]
PgError(#[from] PgError),
#[error(transparent)]
PgPool(#[from] PgPoolError),
#[error(transparent)]
SerdeJson(#[from] serde_json::Error),
Expand Down
90 changes: 90 additions & 0 deletions lib/dal/src/job/definition/dependent_values_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use tokio::task::JoinSet;

use crate::tasks::StatusReceiverClient;
use crate::tasks::StatusReceiverRequest;
use crate::ComponentId;
use crate::FuncBindingReturnValue;
use crate::{
job::consumer::{
JobConsumer, JobConsumerError, JobConsumerMetadata, JobConsumerResult, JobInfo,
Expand Down Expand Up @@ -337,6 +339,26 @@ async fn update_value(
ctx.rollback().await?;
}

// If this is a root prop, then we want to update summary tables
if attribute_value
.is_for_internal_provider_of_root_prop(&ctx)
.await?
{
if let Some(fbrv) =
FuncBindingReturnValue::get_by_id(&ctx, &attribute_value.func_binding_return_value_id())
.await?
{
if let Some(component_value_json) = fbrv.unprocessed_value() {
update_summary_tables(
&ctx,
component_value_json,
attribute_value.context.component_id(),
)
.await?;
}
}
}

ctx.commit().await?;

if update_result.is_ok() {
Expand All @@ -346,6 +368,74 @@ async fn update_value(
Ok(())
}

#[instrument(
name = "dependent_values_update.update_summary_tables",
skip_all,
level = "info",
fields(
component.id = %component_id,
)
)]
async fn update_summary_tables(
ctx: &DalContext,
component_value_json: &serde_json::Value,
component_id: ComponentId,
) -> JobConsumerResult<()> {
// Qualification summary table - if we add more summary tables, this should be extracted to its
// own method.
let mut total: i64 = 0;
let mut warned: i64 = 0;
let mut succeeded: i64 = 0;
let mut failed: i64 = 0;
let mut name: String = String::new();

if let Some(component_name) = component_value_json.pointer("/si/name") {
if let Some(component_name_str) = component_name.as_str() {
dbg!(name = String::from(component_name_str));
}
}

if let Some(qualification_map_value) = component_value_json.pointer("/qualification") {
if let Some(qualification_map) = qualification_map_value.as_object() {
for qual_result_map_value in qualification_map.values() {
if let Some(qual_result_map) = qual_result_map_value.as_object() {
if let Some(qual_result) = qual_result_map.get("result") {
if let Some(qual_result_string) = qual_result.as_str() {
total += 1;
match qual_result_string {
"success" => succeeded += 1,
"warning" => warned += 1,
"failure" => failed += 1,
&_ => (),
}
}
}
}
}
}
}
let _row = ctx
.txns()
.await?
.pg()
.query_one(
"SELECT object FROM summary_qualification_update_v1($1, $2, $3, $4, $5, $6, $7, $8)",
&[
ctx.tenancy(),
ctx.visibility(),
&component_id,
&name,
&total,
&warned,
&succeeded,
&failed,
],
)
.await?;

Ok(())
}

impl TryFrom<JobInfo> for DependentValuesUpdate {
type Error = JobConsumerError;

Expand Down
50 changes: 50 additions & 0 deletions lib/dal/src/migrations/U2409__summary_qualifications.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
CREATE TABLE summary_qualifications
(
pk ident primary key default ident_create_v1(),
id ident not null default ident_create_v1(),
component_id ident NOT NULL,
component_name text NOT NULL,
tenancy_workspace_pk ident,
visibility_change_set_pk ident NOT NULL DEFAULT ident_nil_v1(),
visibility_deleted_at timestamp with time zone,
created_at timestamp with time zone NOT NULL DEFAULT CLOCK_TIMESTAMP(),
updated_at timestamp with time zone NOT NULL DEFAULT CLOCK_TIMESTAMP(),
total bigint,
warned bigint,
succeeded bigint,
failed bigint
);

SELECT standard_model_table_constraints_v1('summary_qualifications');
INSERT INTO standard_models (table_name, table_type, history_event_label_base, history_event_message_name)
VALUES ('summary_qualifications', 'model', 'summary_qualifications', 'Summary Qualifications');

CREATE OR REPLACE FUNCTION summary_qualification_update_v1(
this_tenancy jsonb,
this_visibility jsonb,
this_component_id ident,
this_component_name text,
this_total bigint,
this_warned bigint,
this_succeeded bigint,
this_failed bigint,
OUT object json) AS
$$
DECLARE
this_tenancy_record tenancy_record_v1;
this_visibility_record visibility_record_v1;
this_new_row summary_qualifications%ROWTYPE;
BEGIN
this_tenancy_record := tenancy_json_to_columns_v1(this_tenancy);
this_visibility_record := visibility_json_to_columns_v1(this_visibility);

INSERT INTO summary_qualifications
(id, component_id, component_name, tenancy_workspace_pk, visibility_change_set_pk, total, warned, succeeded, failed)
VALUES
(this_component_id, this_component_id, this_component_name, this_tenancy_record.tenancy_workspace_pk, this_visibility_record.visibility_change_set_pk,
this_total, this_warned, this_succeeded, this_failed)
ON CONFLICT (id, tenancy_workspace_pk, visibility_change_set_pk)
DO UPDATE SET component_name = this_component_name, total = this_total, warned = this_warned, succeeded = this_succeeded, failed = this_failed
RETURNING * INTO this_new_row;
END
$$ LANGUAGE PLPGSQL VOLATILE;
95 changes: 35 additions & 60 deletions lib/dal/src/qualification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,29 +9,28 @@ use crate::func::binding_return_value::FuncBindingReturnValueId;
use crate::{
func::binding_return_value::{FuncBindingReturnValue, FuncBindingReturnValueError},
ws_event::{WsEvent, WsPayload},
Component, ComponentError, ComponentId, DalContext, FuncId, StandardModel, StandardModelError,
ComponentError, ComponentId, DalContext, FuncId, StandardModel, StandardModelError,
WsEventResult,
};
use crate::{standard_model, TransactionsError};

#[derive(Deserialize, Serialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct QualificationSummaryForComponent {
component_id: ComponentId,
component_name: String,
total: i64,
warned: i64,
succeeded: i64,
failed: i64,
pub component_id: ComponentId,
pub component_name: String,
pub total: i64,
pub warned: i64,
pub succeeded: i64,
pub failed: i64,
}

#[derive(Deserialize, Serialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct QualificationSummary {
total: i64,
succeeded: i64,
warned: i64,
failed: i64,
components: Vec<QualificationSummaryForComponent>,
pub total: i64,
pub succeeded: i64,
pub warned: i64,
pub failed: i64,
pub components: Vec<QualificationSummaryForComponent>,
}

#[allow(clippy::large_enum_variant)]
Expand All @@ -44,70 +43,46 @@ pub enum QualificationSummaryError {
Pg(#[from] PgError),
#[error(transparent)]
StandardModel(#[from] StandardModelError),
#[error(transparent)]
Transaction(#[from] TransactionsError),
}

pub type QualificationSummaryResult<T> = Result<T, QualificationSummaryError>;

const GET_SUMMARY_QUALIFICATIONS: &str =
include_str!("queries/summary_qualification/get_summary_qualifications.sql");

impl QualificationSummary {
// TODO(nick): turn this into a query for performance. The original version leveraged a query,
// but since qualifications are now on the prop tree, there is no longer a relevant query
// to help here. I'd write it, but the PR replacing the prototypes and resolvers with the prop
// tree is getting huge.
#[instrument(skip_all)]
pub async fn get_summary(ctx: &DalContext) -> QualificationSummaryResult<QualificationSummary> {
let mut component_summaries = Vec::new();
let mut components_succeeded = 0;
let mut components_warned = 0;
let mut components_failed = 0;
let mut total = 0;

for component in Component::list(ctx).await? {
let component_id = *component.id();
let qualifications = Component::list_qualifications(ctx, component_id).await?;

let individual_total = qualifications.len() as i64;
let mut succeeded = 0;
let mut warned = 0;
let mut failed = 0;
for qualification in qualifications {
if let Some(result) = qualification.result {
match result.status {
QualificationSubCheckStatus::Success => succeeded += 1,
QualificationSubCheckStatus::Warning => warned += 1,
QualificationSubCheckStatus::Failure => failed += 1,
QualificationSubCheckStatus::Unknown => {}
}
}
}

let individual_summary = QualificationSummaryForComponent {
component_id,
component_name: component.name(ctx).await?,
total: individual_total,
succeeded,
warned,
failed,
};

// Update counters for all components.
if failed > 0 {
components_failed += 1;
} else if warned > 0 {
components_warned += 1;
} else {
components_succeeded += 1;
}
total += individual_total;

component_summaries.push(individual_summary);
let rows = ctx
.txns()
.await?
.pg()
.query(
GET_SUMMARY_QUALIFICATIONS,
&[ctx.tenancy(), ctx.visibility()],
)
.await?;
let qualification_summary_for_components: Vec<QualificationSummaryForComponent> =
standard_model::objects_from_rows(rows)?;
for component_summary in qualification_summary_for_components.iter() {
components_succeeded += component_summary.succeeded;
components_warned += component_summary.warned;
components_failed += component_summary.failed;
total += 1;
}

Ok(QualificationSummary {
total,
succeeded: components_succeeded,
warned: components_warned,
failed: components_failed,
components: component_summaries,
components: qualification_summary_for_components,
})
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
SELECT row_to_json(a) AS object
FROM (SELECT DISTINCT ON (components.id) components.id AS component_id,
summary_qualifications.component_name,
summary_qualifications.total,
summary_qualifications.warned,
summary_qualifications.succeeded,
summary_qualifications.failed
FROM components
INNER JOIN summary_qualifications ON components.id = summary_qualifications.component_id AND components.visibility_change_set_pk = summary_qualifications.visibility_change_set_pk
WHERE in_tenancy_and_visible_v1($1, $2, components)
ORDER BY components.id, components.visibility_change_set_pk DESC, components.visibility_deleted_at DESC) AS a
Loading

0 comments on commit ce133b8

Please sign in to comment.