From 43c68be10cef6f4755853cc244c4f0ed4e87517e Mon Sep 17 00:00:00 2001
From: Phil
Date: Wed, 21 Feb 2024 16:39:16 -0500
Subject: [PATCH] agent: defer handling of publications and evolutions that
 require blocking locks

There's an issue related to locking that's been plaguing publications
handling. We sometimes see deadlocks due to multiple publications that affect
the same `live_specs` rows.

To start from the beginning: for each individual `live_specs` row, we need to
guarantee a sensible order of operations. For example, let's say we have two
different captures into two separate collections, and a single materialization
of both collections. Auto-discovers run for the captures, and result in two
separate publications, one for each of the materialized collections. Each of
those publications will result in an update to the `built_spec` of the
materialization, and it's critical that the modifications from one publication
don't clobber those of the other.

We solve this issue today by implicitly locking the "expanded" `live_specs`
rows when we update the `built_spec`s. This is why we occasionally see
deadlocks that cause the agent to crash. We'd like to find another way to
ensure a sensible ordering of operations that doesn't result in deadlocks and
agent crashes.

It's tempting to think of an approach where we first determine the set of
specs that would be affected by a given publication, and use that knowledge to
process publications in an order that avoids simultaneous processing of
publications that would affect the same `live_specs`. But the set of specs
that's affected by a given publication depends on the current state of the
drafted and live specs, and is subject to change. We need to do a decent
amount of parsing and validation before we can even identify which "expanded"
rows might be affected by a publication. As long as that's the case, we can
really only answer the question: "can we continue working on this publication
right now?"

So the basic idea is to do just that. If we're unable to continue working on
the publication, then just defer it and try a different job in the meantime.
To identify publications that affect overlapping `live_specs` rows, we can
rely on postgres row-level locks, as we do today. We just need to add
`for update of live_specs nowait` to the SQL that fetches `live_specs` rows.
If the rows can't be locked immediately, then postgres returns a specific
error code (SQLSTATE `55P03`), which we can handle in the `agent` (see the
sketch at the end of this message). When we see an error with that code, we
handle it specially and leave the job's `job_status` as `{"type": "queued"}`.
It will be retried after a short delay.

The end result is that the agent should now defer publications that previously
might have deadlocked. It will _not_ try to work on something else in the
meantime, but will instead continue to retry until the job can run to
completion.

The change to `evolutions` started out as accidental, due to the evolutions
handler using the same `resolve_expanded_rows` function. But it turns out that
it's really helpful for preventing background publication failures. Without
it, `evolutions` have tended to create `publications` with stale
`expect_pub_id`s in scenarios where there are multiple captures feeding into
one materialization. Those stale `expect_pub_id`s mean that the publication
will fail, and we'll have to wait for the next auto-discover in order to try
again.
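To illustrate the lock-or-fail behavior we're relying on, here's a small
sketch (not part of this change; the `acmeCo/anvils` name is made up) of what
the second of two competing transactions sees when it uses `nowait`:

    -- session A
    begin;
    select id from live_specs where catalog_name = 'acmeCo/anvils' for update;

    -- session B, while session A's transaction is still open
    begin;
    select id from live_specs where catalog_name = 'acmeCo/anvils'
        for update nowait;
    -- ERROR:  could not obtain lock on row in relation "live_specs"
    -- SQLSTATE: 55P03 (lock_not_available)

Instead of blocking behind session A, session B fails immediately with that
error code, which is what lets the agent leave the job queued and simply try
it again a little later.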
But deferring evolutions that affect the same `live_specs` as an in-progress
publication allows the evolution to result in a publication that's much more
likely to succeed the first time.
---
 Cargo.lock                             |  1 +
 crates/agent-sql/src/publications.rs   |  3 +-
 crates/agent/Cargo.toml                |  3 +-
 crates/agent/src/evolution.rs          | 58 +++++++++++++------
 crates/agent/src/lib.rs                | 12 ++++
 crates/agent/src/publications.rs       | 77 +++++++++++++++++---------
 crates/agent/src/publications/specs.rs |  8 +--
 7 files changed, 114 insertions(+), 48 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 224c8fa370..50b36add8b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -49,6 +49,7 @@ dependencies = [
  "derivative",
  "doc",
  "futures",
+ "humantime-serde",
  "insta",
  "itertools 0.10.5",
  "json",
diff --git a/crates/agent-sql/src/publications.rs b/crates/agent-sql/src/publications.rs
index c265c761e4..62cda51d04 100644
--- a/crates/agent-sql/src/publications.rs
+++ b/crates/agent-sql/src/publications.rs
@@ -282,7 +282,7 @@ pub async fn resolve_spec_rows(
         on draft_specs.catalog_name = live_specs.catalog_name
         where draft_specs.draft_id = $1
         order by draft_specs.catalog_name asc
-        for update of draft_specs, live_specs;
+        for update of draft_specs, live_specs nowait;
         "#,
         draft_id as Id,
         user_id,
@@ -447,6 +447,7 @@ pub async fn resolve_expanded_rows(
         -- Strip deleted specs which are still reach-able through a dataflow edge,
         -- and strip rows already part of the seed set.
         where l.spec is not null and l.id not in (select id from seeds)
+        for update of l nowait
         "#,
         seed_ids as Vec<Id>,
         user_id,
diff --git a/crates/agent/Cargo.toml b/crates/agent/Cargo.toml
index a759888651..33953e46e4 100644
--- a/crates/agent/Cargo.toml
+++ b/crates/agent/Cargo.toml
@@ -29,9 +29,10 @@ base64 = { workspace = true }
 bytes = { workspace = true }
 chrono = { workspace = true }
 clap = { workspace = true }
-colored_json = { workspace = true } # Used to render ops::Logs for UI.
+colored_json = { workspace = true }    # Used to render ops::Logs for UI.
 derivative = { workspace = true }
 futures = { workspace = true }
+humantime-serde = { workspace = true }
 itertools = { workspace = true }
 lazy_static = { workspace = true }
 regex = { workspace = true }
diff --git a/crates/agent/src/evolution.rs b/crates/agent/src/evolution.rs
index 01c21f3ebd..e94f768a5a 100644
--- a/crates/agent/src/evolution.rs
+++ b/crates/agent/src/evolution.rs
@@ -41,6 +41,7 @@ pub enum JobStatus {
         evolved_collections: Vec<EvolvedCollection>,
         publication_id: Option<Id>,
     },
+    Queued,
 }

 #[derive(Serialize, Deserialize, PartialEq, Debug)]
@@ -66,22 +67,47 @@ impl Handler for EvolutionHandler {
         pg_pool: &sqlx::PgPool,
         allow_background: bool,
     ) -> anyhow::Result<HandleResult> {
-        let mut txn = pg_pool.begin().await?;
-
-        let Some(row) = agent_sql::evolutions::dequeue(&mut txn, allow_background).await? else {
-            return Ok(HandleResult::NoJobs);
-        };
-
-        let time_queued = chrono::Utc::now().signed_duration_since(row.updated_at);
-        let id: Id = row.id;
-        let status = process_row(row, &mut txn).await?;
-        let status = serde_json::to_value(status)?;
-
-        tracing::info!(%id, %time_queued, %status, "evolution finished");
-        agent_sql::evolutions::resolve(id, &status, &mut txn).await?;
-        txn.commit().await?;
-
-        Ok(HandleResult::HadJob)
+        loop {
+            let mut txn = pg_pool.begin().await?;
+
+            let Some(row) = agent_sql::evolutions::dequeue(&mut txn, allow_background).await?
+            else {
+                return Ok(HandleResult::NoJobs);
+            };
+
+            let time_queued = chrono::Utc::now().signed_duration_since(row.updated_at);
+            let id: Id = row.id;
+            let process_result = process_row(row, &mut txn).await;
+            let job_status = match process_result {
+                Ok(s) => s,
+                Err(err) if crate::is_acquire_lock_error(&err) => {
+                    tracing::info!(%id, %time_queued, "cannot acquire all row locks for evolution (will retry)");
+                    // Since we failed to acquire a necessary row lock, wait a short
+                    // while and then try again.
+                    txn.rollback().await?;
+                    // The sleep is really just so we don't spam the DB in a busy
+                    // loop. I arrived at these values via the very scientific 😉
+                    // process of reproducing failures using a couple of different
+                    // values and squinting at the logs in my terminal. In
+                    // practice, it's common for another agent process to pick up
+                    // the job while this one is sleeping, which is why I didn't
+                    // see a need for jitter. All agents process the job queue in
+                    // the same order, so the next time any agent polls the
+                    // handler, it should get this same job, since we've released
+                    // the lock on the job row. Evolutions jobs will fail _quite_
+                    // quickly in this scenario, hence the full second.
+                    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
+                    continue;
+                }
+                Err(other_err) => return Err(other_err),
+            };
+            let status = serde_json::to_value(job_status)?;
+            tracing::info!(%id, %time_queued, %status, "evolution finished");
+            agent_sql::evolutions::resolve(id, &status, &mut txn).await?;
+            txn.commit().await?;
+
+            return Ok(HandleResult::HadJob);
+        }
     }

     fn table_name(&self) -> &'static str {
diff --git a/crates/agent/src/lib.rs b/crates/agent/src/lib.rs
index a5753aaceb..2108d5a15e 100644
--- a/crates/agent/src/lib.rs
+++ b/crates/agent/src/lib.rs
@@ -27,6 +27,18 @@ lazy_static! {
     static ref NAME_VERSION_RE: Regex = Regex::new(r#".*[_-][vV](\d+)$"#).unwrap();
 }

+/// Returns true if the given error represents a failure to acquire a lock, as indicated
+/// by the "sql state" code.
+fn is_acquire_lock_error(err: &anyhow::Error) -> bool {
+    let Some(sql_err) = err.downcast_ref::<sqlx::Error>() else {
+        return false;
+    };
+    sql_err
+        .as_database_error()
+        .filter(|e| e.code().as_ref().map(|c| c.as_ref()) == Some("55P03"))
+        .is_some()
+}
+
 /// Takes an existing name and returns a new name with an incremeted version suffix.
 /// The name `foo` will become `foo_v2`, and `foo_v2` will become `foo_v3` and so on.
 pub fn next_name(current_name: &str) -> String {
diff --git a/crates/agent/src/publications.rs b/crates/agent/src/publications.rs
index 7dd68a483e..be2f8460e1 100644
--- a/crates/agent/src/publications.rs
+++ b/crates/agent/src/publications.rs
@@ -103,34 +103,59 @@ impl Handler for PublishHandler {
         pg_pool: &sqlx::PgPool,
         allow_background: bool,
     ) -> anyhow::Result<HandleResult> {
-        let mut txn = pg_pool.begin().await?;
-
-        let row: Row = match agent_sql::publications::dequeue(&mut txn, allow_background).await? {
-            None => return Ok(HandleResult::NoJobs),
-            Some(row) => row,
-        };
-
-        let delete_draft_id = if !row.dry_run {
-            Some(row.draft_id)
-        } else {
-            None
-        };
-
-        let time_queued = chrono::Utc::now().signed_duration_since(row.updated_at);
-
-        let (id, status) = self.process(row, &mut txn, false).await?;
-        info!(%id, %time_queued, ?status, "finished");
-
-        agent_sql::publications::resolve(id, &status, &mut txn).await?;
-        txn.commit().await?;
+        loop {
+            let mut txn = pg_pool.begin().await?;
+
+            let row: Row =
+                match agent_sql::publications::dequeue(&mut txn, allow_background).await? {
+                    None => return Ok(HandleResult::NoJobs),
+                    Some(row) => row,
+                };
+            let background = row.background;
+
+            let delete_draft_id = if !row.dry_run {
+                Some(row.draft_id)
+            } else {
+                None
+            };

-        // As a separate transaction, delete the draft if it has no draft_specs.
-        // The user could have raced an insertion of a new spec.
-        if let (Some(delete_draft_id), JobStatus::Success { .. }) = (delete_draft_id, status) {
-            agent_sql::publications::delete_draft(delete_draft_id, pg_pool).await?;
+            let time_queued = chrono::Utc::now().signed_duration_since(row.updated_at);
+            let id = row.pub_id;
+            let process_result = self.process(row, &mut txn, false).await;
+
+            let status = match process_result {
+                Ok((_, status)) => status,
+                Err(err) if crate::is_acquire_lock_error(&err) => {
+                    tracing::info!(%id, %time_queued, "cannot acquire all row locks for publication (will retry)");
+                    // Since we failed to acquire a necessary row lock, wait a short
+                    // while and then try again.
+                    txn.rollback().await?;
+                    // The sleep is really just so we don't spam the DB in a busy
+                    // loop. I arrived at these values via the very scientific 😉
+                    // process of reproducing failures using a couple of different
+                    // values and squinting at the logs in my terminal. In
+                    // practice, it's common for another agent process to pick up
+                    // the job while this one is sleeping, which is why I didn't
+                    // see a need for jitter. All agents process the job queue in
+                    // the same order, so the next time any agent polls the
+                    // handler, it should get this same job, since we've released
+                    // the lock on the job row.
+                    tokio::time::sleep(std::time::Duration::from_millis(500)).await;
+                    continue;
+                }
+                Err(other_err) => return Err(other_err),
+            };
+            info!(%id, %time_queued, ?status, %background, "finished");
+            agent_sql::publications::resolve(id, &status, &mut txn).await?;
+            txn.commit().await?;
+
+            // As a separate transaction, delete the draft if it has no draft_specs.
+            // The user could have raced an insertion of a new spec.
+            if let (Some(delete_draft_id), JobStatus::Success { .. }) = (delete_draft_id, status) {
+                agent_sql::publications::delete_draft(delete_draft_id, pg_pool).await?;
+            }
+            return Ok(HandleResult::HadJob);
         }
-
-        Ok(HandleResult::HadJob)
     }

     fn table_name(&self) -> &'static str {
diff --git a/crates/agent/src/publications/specs.rs b/crates/agent/src/publications/specs.rs
index 9c9ad3cb21..19e9817ef8 100644
--- a/crates/agent/src/publications/specs.rs
+++ b/crates/agent/src/publications/specs.rs
@@ -69,10 +69,10 @@ pub async fn resolve_specifications(
 }

 // expanded_specifications returns additional specifications which should be
-// included in this publication's build. These specifications are not changed
-// by the publication and are read with read-committed transaction semantics,
-// but (if not a dry-run) we do re-activate each specification within the
-// data-plane with the outcome of this publication's build.
+// included in this publication's build. Attempts to acquire a lock on each expanded `live_specs`
+// row, with the assumption that we will be updating the `built_spec` and `last_build_id`.
+// Returns `Ok(Err(CannotAcquireLock))` if any locks could not be immediately acquired, so that the
+// publication can be re-tried later.
 pub async fn expanded_specifications(
     user_id: Uuid,
     spec_rows: &[SpecRow],