Skip to content

Commit

Permalink
agent: evolutions set expect_pub_id in draft_specs
Browse files Browse the repository at this point in the history
Updates the evolutions handler to have it set the `expect_pub_id` column when
creating draft specs. This clears up a minor bit of tech debt, and helps
prevent changes from an auto-discover from clobbering changes from a concurrent
manual publication.
  • Loading branch information
psFried committed Jan 24, 2024
1 parent 98beee9 commit d512439
Show file tree
Hide file tree
Showing 10 changed files with 82 additions and 19 deletions.
1 change: 1 addition & 0 deletions crates/agent-sql/src/evolutions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ where
Ok(())
}

#[derive(Debug)]
pub struct SpecRow {
pub catalog_name: String,
/// The id of the draft spec, or None if it is not already in the draft
Expand Down
3 changes: 3 additions & 0 deletions crates/agent-sql/src/publications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,8 @@ pub struct ExpandedRow {
pub catalog_name: String,
// Last build ID of the live spec.
pub last_build_id: Id,
// Last publication ID of the live spec.
pub last_pub_id: Id,
// Current live specification of this expansion.
// It won't be changed by this publication.
pub live_spec: Json<Box<RawValue>>,
Expand Down Expand Up @@ -419,6 +421,7 @@ pub async fn resolve_expanded_rows(
l.id as "live_spec_id!: Id",
l.catalog_name as "catalog_name!",
l.last_build_id as "last_build_id!: Id",
l.last_pub_id as "last_pub_id!: Id",
l.spec as "live_spec!: Json<Box<RawValue>>",
l.spec_type as "live_type!: CatalogType",
(
Expand Down
2 changes: 1 addition & 1 deletion crates/agent/src/discovers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ impl DiscoverHandler {
};

let drafted_spec_count = catalog.spec_count();
draft::upsert_specs(row.draft_id, catalog, txn)
draft::upsert_specs(row.draft_id, catalog, &Default::default(), txn)
.await
.context("inserting draft specs")?;

Expand Down
25 changes: 19 additions & 6 deletions crates/agent/src/draft.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::collections::BTreeMap;

use super::Id;
use agent_sql::{drafts as drafts_sql, CatalogType};
use anyhow::Context;
Expand All @@ -9,7 +11,10 @@ pub struct Error {
pub detail: String,
}

// upsert_specs updates the given draft with specifications of the catalog.
/// upsert_specs updates the given draft with specifications of the catalog.
/// The `expect_pub_ids` parameter is used to lookup the `last_pub_id` by catalog name.
/// For each item in the catalog, if an entry exists in `expect_pub_ids`, then it will
/// be used as the `expect_pub_id` column.
pub async fn upsert_specs(
draft_id: Id,
models::Catalog {
Expand All @@ -19,6 +24,7 @@ pub async fn upsert_specs(
tests,
..
}: models::Catalog,
expect_pub_ids: &BTreeMap<&str, Id>,
txn: &mut sqlx::Transaction<'_, sqlx::Postgres>,
) -> Result<(), sqlx::Error> {
for (collection, spec) in collections {
Expand All @@ -27,7 +33,7 @@ pub async fn upsert_specs(
collection.as_str(),
spec,
CatalogType::Collection,
None,
expect_pub_ids.get(collection.as_str()).copied(),
txn,
)
.await?;
Expand All @@ -38,7 +44,7 @@ pub async fn upsert_specs(
capture.as_str(),
spec,
CatalogType::Capture,
None,
expect_pub_ids.get(capture.as_str()).copied(),
txn,
)
.await?;
Expand All @@ -49,14 +55,21 @@ pub async fn upsert_specs(
materialization.as_str(),
spec,
CatalogType::Materialization,
None,
expect_pub_ids.get(materialization.as_str()).copied(),
txn,
)
.await?;
}
for (test, steps) in tests {
drafts_sql::upsert_spec(draft_id, test.as_str(), steps, CatalogType::Test, None, txn)
.await?;
drafts_sql::upsert_spec(
draft_id,
test.as_str(),
steps,
CatalogType::Test,
expect_pub_ids.get(test.as_str()).copied(),
txn,
)
.await?;
}

agent_sql::drafts::touch(draft_id, txn).await?;
Expand Down
24 changes: 18 additions & 6 deletions crates/agent/src/evolution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use agent_sql::{
use anyhow::Context;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use std::collections::BTreeSet;
use std::collections::{BTreeMap, BTreeSet};

#[cfg(test)]
mod test;
Expand Down Expand Up @@ -200,7 +200,23 @@ async fn process_row(

tracing::info!(changes=?changed_collections, "evolved catalog");

draft::upsert_specs(draft_id, new_catalog, txn)
// Determine the value of `expect_pub_id` to use for each draft spec.
// For existing draft specs that already have an `expect_pub_id`, we'll
// use that value exactly. Otherwise, we'll use the `last_pub_id` from the live spec.
let mut expect_pub_ids = BTreeMap::new();
for row in expanded_rows.iter() {
expect_pub_ids.insert(row.catalog_name.as_str(), row.last_pub_id);
}
for row in spec_rows.iter() {
// It's possible for spec rows to have neither of these values, since
// they may include drafted specs that are new and unaffected by this
// evolution
if let Some(id) = row.expect_pub_id.or(row.last_pub_id) {
expect_pub_ids.insert(row.catalog_name.as_str(), id);
}
}

draft::upsert_specs(draft_id, new_catalog, &expect_pub_ids, txn)
.await
.context("inserting draft specs")?;

Expand All @@ -225,10 +241,6 @@ async fn process_row(
}
}

// TODO: Update the `expect_pub_id` of any specs that we've added to the draft.
// This is important to do, but is something that I think we can safely defer
// until a future commit.

// Create a publication of the draft, if desired.
let publication_id = if row.auto_publish {
let detail = format!(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ expression: new_draft
},
},
),
expect_pub_id: Some(
bbbbbbbbbbbbbbbb,
),
},
Record {
catalog_name: "evolution/CaptureB",
Expand Down Expand Up @@ -64,6 +67,9 @@ expression: new_draft
},
},
),
expect_pub_id: Some(
bbbbbbbbbbbbbbbb,
),
},
Record {
catalog_name: "evolution/CollectionA",
Expand All @@ -88,6 +94,7 @@ expression: new_draft
},
},
),
expect_pub_id: None,
},
Record {
catalog_name: "evolution/CollectionC",
Expand All @@ -113,6 +120,7 @@ expression: new_draft
},
},
),
expect_pub_id: None,
},
Record {
catalog_name: "evolution/MaterializationA",
Expand Down Expand Up @@ -151,6 +159,9 @@ expression: new_draft
},
},
),
expect_pub_id: Some(
bbbbbbbbbbbbbbbb,
),
},
Record {
catalog_name: "evolution/MaterializationB",
Expand Down Expand Up @@ -189,6 +200,9 @@ expression: new_draft
},
},
),
expect_pub_id: Some(
bbbbbbbbbbbbbbbb,
),
},
Record {
catalog_name: "evolution/MaterializationC",
Expand Down Expand Up @@ -217,6 +231,7 @@ expression: new_draft
},
},
),
expect_pub_id: None,
},
Record {
catalog_name: "evolution/NewCollectionB",
Expand All @@ -241,6 +256,7 @@ expression: new_draft
},
},
),
expect_pub_id: None,
},
Record {
catalog_name: "evolution/NewCollectionD",
Expand All @@ -266,5 +282,6 @@ expression: new_draft
},
},
),
expect_pub_id: None,
},
]
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ expression: draft_specs
},
},
),
expect_pub_id: Some(
bbbbbbbbbbbbbbbb,
),
},
Record {
catalog_name: "evolution/CollectionC_v2",
Expand All @@ -58,6 +61,7 @@ expression: draft_specs
},
},
),
expect_pub_id: None,
},
Record {
catalog_name: "evolution/MaterializationA",
Expand Down Expand Up @@ -95,6 +99,9 @@ expression: draft_specs
},
},
),
expect_pub_id: Some(
bbbbbbbbbbbbbbbb,
),
},
Record {
catalog_name: "evolution/MaterializationB",
Expand Down Expand Up @@ -132,5 +139,8 @@ expression: draft_specs
},
},
),
expect_pub_id: Some(
bbbbbbbbbbbbbbbb,
),
},
]
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ expression: draft_specs
},
},
),
expect_pub_id: None,
expect_pub_id: Some(
bbbbbbbbbbbbbbbb,
),
},
]
Original file line number Diff line number Diff line change
Expand Up @@ -44,5 +44,8 @@ expression: draft_specs
},
},
),
expect_pub_id: Some(
cccccccccccccccc,
),
},
]
12 changes: 7 additions & 5 deletions crates/agent/src/evolution/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ async fn test_collection_evolution() {

let new_draft = sqlx::query!(
r#"
select catalog_name, spec_type as "spec_type: CatalogType", spec
select catalog_name, spec_type as "spec_type: CatalogType", spec, expect_pub_id as "expect_pub_id: Id"
from draft_specs
where draft_id = '2230000000000000'
order by catalog_name asc
Expand Down Expand Up @@ -192,7 +192,7 @@ async fn evolution_adds_collections_to_the_draft_if_necessary() {

let draft_specs = sqlx::query!(
r#"
select catalog_name, spec_type as "spec_type: CatalogType", spec
select catalog_name, spec_type as "spec_type: CatalogType", spec, expect_pub_id as "expect_pub_id: Id"
from draft_specs
where draft_id = '2230000000000000'
order by catalog_name asc
Expand Down Expand Up @@ -224,9 +224,10 @@ async fn evolution_preserves_changes_already_in_the_draft() {
.execute(&mut txn)
.await
.unwrap();
sqlx::query(r##"insert into draft_specs (draft_id, catalog_name, spec_type, spec) values (
sqlx::query(r##"insert into draft_specs (draft_id, catalog_name, expect_pub_id, spec_type, spec) values (
'2230000000000000',
'evolution/MaterializationA',
'cccccccccccccccc',
'materialization',
'{
"bindings": [
Expand Down Expand Up @@ -271,7 +272,7 @@ async fn evolution_preserves_changes_already_in_the_draft() {

let draft_specs = sqlx::query!(
r#"
select catalog_name, spec_type as "spec_type: CatalogType", spec
select catalog_name, spec_type as "spec_type: CatalogType", spec, expect_pub_id as "expect_pub_id: Id"
from draft_specs
where draft_id = '2230000000000000'
order by catalog_name asc
Expand All @@ -283,7 +284,8 @@ async fn evolution_preserves_changes_already_in_the_draft() {

// We're looking for the new endpoint and resource configs, which ought to
// be preserved, and for the backfill counter to have been incremented still
// (even though it's already larger than the value in live_specs).
// (even though it's already larger than the value in live_specs). Also looking
// for the expect_pub_id from the draft spec to be preserved.
insta::assert_debug_snapshot!(draft_specs);
}

Expand Down

0 comments on commit d512439

Please sign in to comment.