Skip to content

Commit

Permalink
merge: #3084
Browse files Browse the repository at this point in the history
3084: Optimize components paste r=paulocsanz a=paulocsanz

- Parallelize component pasting (without edges)
- Filter existing attribute values/prototypes in rust (check context in rust)
- Commit more frequently to give the user the feeling of progress

Co-authored-by: Paulo Cabral <[email protected]>
  • Loading branch information
si-bors-ng[bot] and paulocsanz authored Dec 19, 2023
2 parents 59a38c2 + 9263da6 commit b50845b
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 56 deletions.
14 changes: 14 additions & 0 deletions lib/dal/src/attribute/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,20 @@ impl AttributeContext {
) -> AttributeContextResult<Option<crate::ExternalProvider>> {
Ok(crate::ExternalProvider::get_by_id(ctx, &self.external_provider_id()).await?)
}

pub fn check(&self, context: Self) -> bool {
let prop = self.prop_id == context.prop_id
&& context.internal_provider_id.is_none()
&& context.external_provider_id.is_none();
let internal_provider = context.prop_id.is_none()
&& self.internal_provider_id == context.internal_provider_id
&& context.external_provider_id.is_none();
let external_provider = context.prop_id.is_none()
&& context.internal_provider_id.is_none()
&& self.external_provider_id == context.external_provider_id;
let component = self.component_id == context.component_id || self.component_id.is_none();
(prop || internal_provider || external_provider) && component
}
}

#[remain::sorted]
Expand Down
56 changes: 31 additions & 25 deletions lib/dal/src/component.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@ impl Component {
// persist. But it isn't, - our node is anemic.
let node = Node::new(ctx, &NodeKind::Configuration).await?;
node.set_component(ctx, component.id()).await?;

component.set_name(ctx, Some(name.as_ref())).await?;

Ok((component, node))
Expand Down Expand Up @@ -993,34 +994,36 @@ impl Component {
let attribute_values =
AttributeValue::find_by_attr(ctx, "attribute_context_component_id", &component_id)
.await?;
let mut my_attribute_values =
AttributeValue::find_by_attr(ctx, "attribute_context_component_id", self.id()).await?;

let mut pasted_attribute_values_by_original = HashMap::new();

for copied_av in &attribute_values {
let context = AttributeContextBuilder::from(copied_av.context)
.set_component_id(*self.id())
.to_context()?;

// TODO: should we clone the fb and fbrv?
let mut pasted_av = if let Some(mut av) =
AttributeValue::find_for_context(ctx, context.into()).await?
let mut pasted_av = if let Some(av) = my_attribute_values
.iter_mut()
.find(|av| context.check(av.context))
{
av.set_func_binding_id(ctx, copied_av.func_binding_id())
.await?;
av.set_func_binding_return_value_id(ctx, copied_av.func_binding_return_value_id())
.await?;
av.set_key(ctx, copied_av.key()).await?;
av
av.clone()
} else {
dbg!(
AttributeValue::new(
ctx,
copied_av.func_binding_id(),
copied_av.func_binding_return_value_id(),
context,
copied_av.key(),
)
.await
)?
AttributeValue::new(
ctx,
copied_av.func_binding_id(),
copied_av.func_binding_return_value_id(),
context,
copied_av.key(),
)
.await?
};

pasted_av
Expand Down Expand Up @@ -1066,20 +1069,19 @@ impl Component {
let attribute_prototypes =
AttributePrototype::find_by_attr(ctx, "attribute_context_component_id", &component_id)
.await?;
let mut my_attribute_prototypes =
AttributePrototype::find_by_attr(ctx, "attribute_context_component_id", self.id())
.await?;

let mut pasted_attribute_prototypes_by_original = HashMap::new();
for copied_ap in &attribute_prototypes {
let context = AttributeContextBuilder::from(copied_ap.context)
.set_component_id(*self.id())
.to_context()?;

let id = if let Some(mut ap) = AttributePrototype::find_for_context_and_key(
ctx,
context,
&copied_ap.key().map(ToOwned::to_owned),
)
.await?
.pop()
let id = if let Some(ap) = my_attribute_prototypes
.iter_mut()
.find(|av| context.check(av.context) && av.key.as_deref() == copied_ap.key())
{
ap.set_func_id(ctx, copied_ap.func_id()).await?;
ap.set_key(ctx, copied_ap.key()).await?;
Expand Down Expand Up @@ -1137,21 +1139,23 @@ impl Component {
.get(&original_belongs_to_id)
.ok_or(ComponentError::AttributeValueNotFound)?;

ctx
.txns()
ctx.txns()
.await?
.pg()
.query("INSERT INTO attribute_value_belongs_to_attribute_value
.query(
"INSERT INTO attribute_value_belongs_to_attribute_value
(object_id, belongs_to_id, tenancy_workspace_pk, visibility_change_set_pk)
VALUES ($1, $2, $3, $4)
ON CONFLICT (object_id, tenancy_workspace_pk, visibility_change_set_pk) DO NOTHING",
ON CONFLICT (object_id, tenancy_workspace_pk, visibility_change_set_pk)
DO NOTHING",
&[
&object_id,
&belongs_to_id,
&ctx.tenancy().workspace_pk(),
&ctx.visibility().change_set_pk,
],
).await?;
)
.await?;
}

let rows = ctx
Expand All @@ -1176,6 +1180,7 @@ impl Component {
],
)
.await?;

for row in rows {
let original_object_id: AttributeValueId = row.try_get("object_id")?;
let original_belongs_to_id: AttributePrototypeId = row.try_get("belongs_to_id")?;
Expand Down Expand Up @@ -1203,6 +1208,7 @@ impl Component {
],
).await?;
}

Ok(())
}
}
Expand Down
108 changes: 77 additions & 31 deletions lib/sdf-server/src/server/service/diagram/paste_component.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,61 +4,68 @@ use chrono::Utc;
use dal::{
action_prototype::ActionPrototypeContextField, func::backend::js_action::ActionRunResult,
Action, ActionKind, ActionPrototype, ActionPrototypeContext, ChangeSet, Component,
ComponentError, ComponentId, Connection, DalContext, Edge, Node, StandardModel, Visibility,
WsEvent,
ComponentError, ComponentId, Connection, DalContextBuilder, Edge, Node, StandardModel,
Visibility, WsEvent,
};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use telemetry::prelude::*;
use tokio::task::JoinSet;
use veritech_client::ResourceStatus;

use super::{DiagramError, DiagramResult};
use crate::server::extract::{AccessBuilder, HandlerContext, PosthogClient};
use crate::server::tracking::track;

#[allow(clippy::too_many_arguments)]
async fn paste_single_component(
ctx: &DalContext,
ctx_builder: DalContextBuilder,
request_ctx: dal::context::AccessBuilder,
visibility: Visibility,
component_id: ComponentId,
offset_x: f64,
offset_y: f64,
original_uri: &Uri,
PosthogClient(posthog_client): &PosthogClient,
) -> DiagramResult<Node> {
let original_comp = Component::get_by_id(ctx, &component_id)
let ctx = ctx_builder.build(request_ctx.build(visibility)).await?;

let original_comp = Component::get_by_id(&ctx, &component_id)
.await?
.ok_or(DiagramError::ComponentNotFound)?;
let original_node = original_comp
.node(ctx)
.node(&ctx)
.await?
.pop()
.ok_or(ComponentError::NodeNotFoundForComponent(component_id))?;

let schema_variant = original_comp
.schema_variant(ctx)
.schema_variant(&ctx)
.await?
.ok_or(DiagramError::SchemaNotFound)?;

let (pasted_comp, mut pasted_node) =
Component::new(ctx, original_comp.name(ctx).await?, *schema_variant.id()).await?;

Component::new(&ctx, original_comp.name(&ctx).await?, *schema_variant.id()).await?;
let x: f64 = original_node.x().parse()?;
let y: f64 = original_node.y().parse()?;
pasted_node
.set_geometry(
ctx,
&ctx,
(x + offset_x).to_string(),
(y + offset_y).to_string(),
original_node.width(),
original_node.height(),
)
.await?;
ctx.commit().await?;

pasted_comp
.clone_attributes_from(ctx, *original_comp.id())
.clone_attributes_from(&ctx, *original_comp.id())
.await?;

pasted_comp
.set_resource_raw(
ctx,
&ctx,
ActionRunResult {
status: ResourceStatus::Ok,
payload: None,
Expand All @@ -70,41 +77,43 @@ async fn paste_single_component(
)
.await?;

ctx.commit().await?;

for prototype in ActionPrototype::find_for_context_and_kind(
ctx,
&ctx,
ActionKind::Create,
ActionPrototypeContext::new_for_context_field(ActionPrototypeContextField::SchemaVariant(
*schema_variant.id(),
)),
)
.await?
{
let action = Action::new(ctx, *prototype.id(), *pasted_comp.id()).await?;
let prototype = action.prototype(ctx).await?;
let action = Action::new(&ctx, *prototype.id(), *pasted_comp.id()).await?;
let prototype = action.prototype(&ctx).await?;

track(
posthog_client,
ctx,
&ctx,
original_uri,
"create_action",
serde_json::json!({
"how": "/diagram/paste_components",
"prototype_id": prototype.id(),
"prototype_kind": prototype.kind(),
"component_id": pasted_comp.id(),
"component_name": pasted_comp.name(ctx).await?,
"component_name": pasted_comp.name(&ctx).await?,
"change_set_pk": ctx.visibility().change_set_pk,
}),
);
}

let schema = pasted_comp
.schema(ctx)
.schema(&ctx)
.await?
.ok_or(DiagramError::SchemaNotFound)?;
track(
posthog_client,
ctx,
&ctx,
original_uri,
"paste_component",
serde_json::json!({
Expand All @@ -113,11 +122,13 @@ async fn paste_single_component(
}),
);

WsEvent::change_set_written(ctx)
WsEvent::change_set_written(&ctx)
.await?
.publish_on_commit(ctx)
.publish_on_commit(&ctx)
.await?;

ctx.commit().await?;

Ok(pasted_node)
}

Expand All @@ -135,7 +146,7 @@ pub struct PasteComponentsRequest {
pub async fn paste_components(
HandlerContext(builder): HandlerContext,
AccessBuilder(request_ctx): AccessBuilder,
posthog_client: PosthogClient,
PosthogClient(posthog_client): PosthogClient,
OriginalUri(original_uri): OriginalUri,
Json(request): Json<PasteComponentsRequest>,
) -> DiagramResult<impl IntoResponse> {
Expand All @@ -156,19 +167,54 @@ pub async fn paste_components(
.publish_on_commit(&ctx)
.await?;
};
ctx.commit().await?;

let mut tasks = JoinSet::new();

let mut pasted_components_by_original = HashMap::new();
for component_id in &request.component_ids {
let pasted_node_id = paste_single_component(
&ctx,
*component_id,
request.offset_x,
request.offset_y,
&original_uri,
&posthog_client,
)
.await?;
pasted_components_by_original.insert(*component_id, pasted_node_id);
let ctx_builder = ctx.to_builder();
let (visibility, component_id) = (request.visibility, *component_id);
let (offset_x, offset_y) = (request.offset_x, request.offset_y);
let (original_uri, posthog_client) =
(original_uri.clone(), PosthogClient(posthog_client.clone()));
tasks.spawn(async move {
let pasted_node = paste_single_component(
ctx_builder,
request_ctx,
visibility,
component_id,
offset_x,
offset_y,
&original_uri,
&posthog_client,
)
.await?;

Ok::<_, DiagramError>((component_id, pasted_node))
});
}

while let Some(result) = tasks.join_next().await {
match result {
Ok(Ok((component_id, pasted_node))) => {
pasted_components_by_original.insert(component_id, pasted_node);
}
Ok(Err(err)) => return Err(err)?,
// Task panicked, let's propagate it
Err(err) => match err.try_into_panic() {
Ok(panic) => {
std::panic::resume_unwind(panic);
}
Err(err) => {
if err.is_cancelled() {
warn!("Paste Componetn was cancelled: {err}");
} else {
error!("Unknown failure in component paste: {err}");
}
}
},
}
}

for component_id in &request.component_ids {
Expand Down

0 comments on commit b50845b

Please sign in to comment.