Skip to content

Commit

Permalink
agent: defer handling of publications and evolutions that require blo…
Browse files Browse the repository at this point in the history
…cking locks

There's an issue related to locking that's been plaguing publications handling.
We sometimes see deadlocks due to multiple publications that affect the same
`live_specs` rows.

To start from the beginning: for each individual `live_specs` row, we need to
guarantee a sensible order of operations. For example, let's say we have two
different captures into two separate collections, and a single materialization
of both collections. Auto-discovers run for the captures, and result in two
separate publications, one for each of the materialized collections. Each of
those publications will result in an update to the `built_spec` of the
materialization, and it's critical the modifications from one publication don't
clobber those of the other.

We solve this issue today by implicitly locking the "expanded" `live_specs`
rows when we update the `built_spec`s. This is why we occasionally see
deadlocks that cause the agent to crash. We'd like to find another way to
ensure a sensible ordering of operations that doesn't result in deadlocks and
agent crashes.

It's tempting to think of an approach where we first determine the set of specs
that would be affected by a given publication, and use that knowledge to
process publications in an order that avoids simultaneous processing of
publications that would affect the same `live_specs`. But the set of specs
that's affected by a given publication is dependent on the current state of the
drafted and live specs, and is subject to change. We need to do a decent amount
of parsing and validation before we can even identify which "expanded" rows
might be affected by a publication. As long as that's the case, we can really
only answer the question, "can we continue working on this publication right
now?". So the basic idea is to do just that. If we're unable to continue
working on the publication, then just defer it and try a different job in the
meantime.

We can identify publications that affect overlapping `live_specs` rows, we can
rely on postgres row-level locks, as we do today. We just need to add `for
update of live_specs nowait` to the SQL that fetches `live_specs` rows.
If the rows can't be locked immediately, then it'll return a specific error
code, which we can handle in the `agent`. When we see an error with that code,
we handle it specially and leave its `job_status` as `{"type": "queued" }`. It
will be retried after a short delay.

The end result is that the agent should now defer publications that previously
might have deadlocked. It will _not_ try to work on something else in the meantime,
but will instead continue to re-try until the job can run to completion.

The change to `evolutions` started out as accidental, due to the evolutions
handler using the same `resolve_expanded_rows` function. But it turns out that
it's really helpful to prevent background publication failures. Without it,
`evolutions` have tended to create `publications` having stale `expect_pub_id`s
in scenarios where there's multiple captures feeding into one materialization.
Those stale `expect_pub_id`s mean that the publication will fail, and we'll
have to wait for the next auto-discover in order to try again. But deferring
evolutions that affect the same `live_specs` as an in-progress publication
allows the evolution to result in a publication that's much more likely to
succeed the first time.
  • Loading branch information
psFried committed Feb 27, 2024
1 parent 316cf3c commit 43c68be
Show file tree
Hide file tree
Showing 7 changed files with 114 additions and 48 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion crates/agent-sql/src/publications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ pub async fn resolve_spec_rows(
on draft_specs.catalog_name = live_specs.catalog_name
where draft_specs.draft_id = $1
order by draft_specs.catalog_name asc
for update of draft_specs, live_specs;
for update of draft_specs, live_specs nowait;
"#,
draft_id as Id,
user_id,
Expand Down Expand Up @@ -447,6 +447,7 @@ pub async fn resolve_expanded_rows(
-- Strip deleted specs which are still reach-able through a dataflow edge,
-- and strip rows already part of the seed set.
where l.spec is not null and l.id not in (select id from seeds)
for update of l nowait
"#,
seed_ids as Vec<Id>,
user_id,
Expand Down
3 changes: 2 additions & 1 deletion crates/agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@ base64 = { workspace = true }
bytes = { workspace = true }
chrono = { workspace = true }
clap = { workspace = true }
colored_json = { workspace = true } # Used to render ops::Logs for UI.
colored_json = { workspace = true } # Used to render ops::Logs for UI.
derivative = { workspace = true }
futures = { workspace = true }
humantime-serde = { workspace = true }
itertools = { workspace = true }
lazy_static = { workspace = true }
regex = { workspace = true }
Expand Down
58 changes: 42 additions & 16 deletions crates/agent/src/evolution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ pub enum JobStatus {
evolved_collections: Vec<EvolvedCollection>,
publication_id: Option<Id>,
},
Queued,
}

#[derive(Serialize, Deserialize, PartialEq, Debug)]
Expand All @@ -66,22 +67,47 @@ impl Handler for EvolutionHandler {
pg_pool: &sqlx::PgPool,
allow_background: bool,
) -> anyhow::Result<HandleResult> {
let mut txn = pg_pool.begin().await?;

let Some(row) = agent_sql::evolutions::dequeue(&mut txn, allow_background).await? else {
return Ok(HandleResult::NoJobs);
};

let time_queued = chrono::Utc::now().signed_duration_since(row.updated_at);
let id: Id = row.id;
let status = process_row(row, &mut txn).await?;
let status = serde_json::to_value(status)?;

tracing::info!(%id, %time_queued, %status, "evolution finished");
agent_sql::evolutions::resolve(id, &status, &mut txn).await?;
txn.commit().await?;

Ok(HandleResult::HadJob)
loop {
let mut txn = pg_pool.begin().await?;

let Some(row) = agent_sql::evolutions::dequeue(&mut txn, allow_background).await?
else {
return Ok(HandleResult::NoJobs);
};

let time_queued = chrono::Utc::now().signed_duration_since(row.updated_at);
let id: Id = row.id;
let process_result = process_row(row, &mut txn).await;
let job_status = match process_result {
Ok(s) => s,
Err(err) if crate::is_acquire_lock_error(&err) => {
tracing::info!(%id, %time_queued, "cannot acquire all row locks for evolution (will retry)");
// Since we failed to acquire a necessary row lock, wait a short
// while and then try again.
txn.rollback().await?;
// The sleep is really just so we don't spam the DB in a busy
// loop. I arrived at these values via the very scientific 😉
// process of reproducing failures using a couple of different
// values and squinting at the logs in my terminal. In
// practice, it's common for another agent process to pick up
// the job while this one is sleeping, which is why I didn't
// see a need for jitter. All agents process the job queue in
// the same order, so the next time any agent polls the
// handler, it should get this same job, since we've released
// the lock on the job row. Evolutions jobs will fail _quite_
// quickly in this scenario, hence the full second.
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
continue;
}
Err(other_err) => return Err(other_err),
};
let status = serde_json::to_value(job_status)?;
tracing::info!(%id, %time_queued, %status, "evolution finished");
agent_sql::evolutions::resolve(id, &status, &mut txn).await?;
txn.commit().await?;

return Ok(HandleResult::HadJob);
}
}

fn table_name(&self) -> &'static str {
Expand Down
12 changes: 12 additions & 0 deletions crates/agent/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,18 @@ lazy_static! {
static ref NAME_VERSION_RE: Regex = Regex::new(r#".*[_-][vV](\d+)$"#).unwrap();
}

/// Returns true if the given error represents a failure to acquire a lock, as indicated
/// by the "sql state" code.
fn is_acquire_lock_error(err: &anyhow::Error) -> bool {
let Some(sql_err) = err.downcast_ref::<sqlx::Error>() else {
return false;
};
sql_err
.as_database_error()
.filter(|e| e.code().as_ref().map(|c| c.as_ref()) == Some("55P03"))
.is_some()
}

/// Takes an existing name and returns a new name with an incremeted version suffix.
/// The name `foo` will become `foo_v2`, and `foo_v2` will become `foo_v3` and so on.
pub fn next_name(current_name: &str) -> String {
Expand Down
77 changes: 51 additions & 26 deletions crates/agent/src/publications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,34 +103,59 @@ impl Handler for PublishHandler {
pg_pool: &sqlx::PgPool,
allow_background: bool,
) -> anyhow::Result<HandleResult> {
let mut txn = pg_pool.begin().await?;

let row: Row = match agent_sql::publications::dequeue(&mut txn, allow_background).await? {
None => return Ok(HandleResult::NoJobs),
Some(row) => row,
};

let delete_draft_id = if !row.dry_run {
Some(row.draft_id)
} else {
None
};

let time_queued = chrono::Utc::now().signed_duration_since(row.updated_at);

let (id, status) = self.process(row, &mut txn, false).await?;
info!(%id, %time_queued, ?status, "finished");

agent_sql::publications::resolve(id, &status, &mut txn).await?;
txn.commit().await?;
loop {
let mut txn = pg_pool.begin().await?;

let row: Row =
match agent_sql::publications::dequeue(&mut txn, allow_background).await? {
None => return Ok(HandleResult::NoJobs),
Some(row) => row,
};
let background = row.background;

let delete_draft_id = if !row.dry_run {
Some(row.draft_id)
} else {
None
};

// As a separate transaction, delete the draft if it has no draft_specs.
// The user could have raced an insertion of a new spec.
if let (Some(delete_draft_id), JobStatus::Success { .. }) = (delete_draft_id, status) {
agent_sql::publications::delete_draft(delete_draft_id, pg_pool).await?;
let time_queued = chrono::Utc::now().signed_duration_since(row.updated_at);
let id = row.pub_id;
let process_result = self.process(row, &mut txn, false).await;

let status = match process_result {
Ok((_, status)) => status,
Err(err) if crate::is_acquire_lock_error(&err) => {
tracing::info!(%id, %time_queued, "cannot acquire all row locks for publication (will retry)");
// Since we failed to acquire a necessary row lock, wait a short
// while and then try again.
txn.rollback().await?;
// The sleep is really just so we don't spam the DB in a busy
// loop. I arrived at these values via the very scientific 😉
// process of reproducing failures using a couple of different
// values and squinting at the logs in my terminal. In
// practice, it's common for another agent process to pick up
// the job while this one is sleeping, which is why I didn't
// see a need for jitter. All agents process the job queue in
// the same order, so the next time any agent polls the
// handler, it should get this same job, since we've released
// the lock on the job row.
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
continue;
}
Err(other_err) => return Err(other_err),
};
info!(%id, %time_queued, ?status, %background, "finished");
agent_sql::publications::resolve(id, &status, &mut txn).await?;
txn.commit().await?;

// As a separate transaction, delete the draft if it has no draft_specs.
// The user could have raced an insertion of a new spec.
if let (Some(delete_draft_id), JobStatus::Success { .. }) = (delete_draft_id, status) {
agent_sql::publications::delete_draft(delete_draft_id, pg_pool).await?;
}
return Ok(HandleResult::HadJob);
}

Ok(HandleResult::HadJob)
}

fn table_name(&self) -> &'static str {
Expand Down
8 changes: 4 additions & 4 deletions crates/agent/src/publications/specs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,10 @@ pub async fn resolve_specifications(
}

// expanded_specifications returns additional specifications which should be
// included in this publication's build. These specifications are not changed
// by the publication and are read with read-committed transaction semantics,
// but (if not a dry-run) we do re-activate each specification within the
// data-plane with the outcome of this publication's build.
// included in this publication's build. Attempts to acquire a lock on each expanded `live_specs`
// row, with the assumption that we will be updating the `built_spec` and `last_build_id`.
// Returns `Ok(Err(CannotAcquireLock))` if any locks could not be immediately acquired, so that the
// publication can be re-tried later.
pub async fn expanded_specifications(
user_id: Uuid,
spec_rows: &[SpecRow],
Expand Down

0 comments on commit 43c68be

Please sign in to comment.