agent: defer handling of publications and evolutions that require blocking locks

There's an issue related to locking that's been plaguing publications handling.
We sometimes see deadlocks due to multiple publications that affect the same
`live_specs` rows.

To start from the beginning: for each individual `live_specs` row, we need to
guarantee a sensible order of operations. For example, let's say we have two
different captures into two separate collections, and a single materialization
of both collections. Auto-discovers run for the captures, and result in two
separate publications, one for each of the materialized collections. Each of
those publications will result in an update to the `built_spec` of the
materialization, and it's critical the modifications from one publication don't
clobber those of the other.

We solve this issue today by implicitly locking the "expanded" `live_specs`
rows when we update the `built_spec`s. This is why we occasionally see
deadlocks that cause the agent to crash. We'd like to find another way to
ensure a sensible ordering of operations that doesn't result in deadlocks and
agent crashes.

It's tempting to think of an approach where we first determine the set of specs
that would be affected by a given publication, and use that knowledge to
process publications in an order that avoids simultaneous processing of
publications that would affect the same `live_specs`. But the set of specs
that's affected by a given publication is dependent on the current state of the
drafted and live specs, and is subject to change. We need to do a decent amount
of parsing and validation before we can even identify which "expanded" rows
might be affected by a publication. As long as that's the case, we can really
only answer the question, "can we continue working on this publication right
now?". So the basic idea is to do just that. If we're unable to continue
working on the publication, then just defer it and try a different job in the
meantime.

To identify publications that affect overlapping `live_specs` rows, we can
rely on postgres row-level locks, as we do today. We just need to add `for
update of live_specs nowait` to the SQL that fetches `live_specs` rows.
If the rows can't be locked immediately, the query fails with a specific error
code (SQLSTATE `55P03`), which we can handle in the `agent`. When we see an
error with that code, we handle it specially and leave the `job_status` as
`{"type": "queued"}`. The job will be retried after a short delay.
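As a sketch, the locking clause looks like this (an illustrative query, not
the exact one from `agent-sql`; the column list and parameter are hypothetical):

```sql
-- Lock the live_specs rows we're about to update; fail fast instead of waiting.
select l.id, l.catalog_name, l.built_spec
from live_specs l
where l.catalog_name = any($1)
for update of l nowait;
-- If another transaction already holds a lock on any matched row, postgres
-- aborts the query with SQLSTATE 55P03 (lock_not_available) rather than block.
```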

The end result is that the agent should now defer publications that previously
might have deadlocked. It will _not_ try to work on something else in the meantime,
but will instead continue to re-try until the job can run to completion.
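That defer-and-retry shape can be sketched as follows. This is a simplified,
hypothetical helper, not the agent's actual code: the real handlers also roll
back their transaction and sleep briefly between attempts.

```rust
/// Marker for a job that could not acquire its row locks (mirrors the
/// `CannotAcquireLock` type introduced in this commit, simplified here).
#[derive(Debug, PartialEq)]
pub struct CannotAcquireLock;

/// Re-runs `attempt_job` until it no longer reports a lock conflict,
/// returning how many retries were needed along with the final status.
pub fn run_with_retries<F>(mut attempt_job: F) -> (u32, &'static str)
where
    F: FnMut(u32) -> Result<&'static str, CannotAcquireLock>,
{
    let mut retries = 0;
    loop {
        match attempt_job(retries) {
            // Commit the transaction and resolve the job.
            Ok(status) => return (retries, status),
            // Roll back, wait a short while, and try again.
            Err(CannotAcquireLock) => retries += 1,
        }
    }
}
```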

The change to `evolutions` started out as accidental, due to the evolutions
handler using the same `resolve_expanded_rows` function. But it turns out that
it's really helpful to prevent background publication failures. Without it,
`evolutions` have tended to create `publications` having stale `expect_pub_id`s
in scenarios where there are multiple captures feeding into one materialization.
Those stale `expect_pub_id`s mean that the publication will fail, and we'll
have to wait for the next auto-discover in order to try again. But deferring
evolutions that affect the same `live_specs` as an in-progress publication
allows the evolution to result in a publication that's much more likely to
succeed the first time.
psFried committed Feb 27, 2024
1 parent 316cf3c commit 212ae01
Showing 8 changed files with 159 additions and 90 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

3 changes: 2 additions & 1 deletion crates/agent-sql/src/publications.rs
@@ -282,7 +282,7 @@ pub async fn resolve_spec_rows(
on draft_specs.catalog_name = live_specs.catalog_name
where draft_specs.draft_id = $1
order by draft_specs.catalog_name asc
for update of draft_specs, live_specs;
for update of draft_specs, live_specs nowait;
"#,
draft_id as Id,
user_id,
@@ -447,6 +447,7 @@ pub async fn resolve_expanded_rows(
-- Strip deleted specs which are still reach-able through a dataflow edge,
-- and strip rows already part of the seed set.
where l.spec is not null and l.id not in (select id from seeds)
for update of l nowait
"#,
seed_ids as Vec<Id>,
user_id,
3 changes: 2 additions & 1 deletion crates/agent/Cargo.toml
@@ -29,9 +29,10 @@ base64 = { workspace = true }
bytes = { workspace = true }
chrono = { workspace = true }
clap = { workspace = true }
colored_json = { workspace = true } # Used to render ops::Logs for UI.
colored_json = { workspace = true } # Used to render ops::Logs for UI.
derivative = { workspace = true }
futures = { workspace = true }
humantime-serde = { workspace = true }
itertools = { workspace = true }
lazy_static = { workspace = true }
regex = { workspace = true }
67 changes: 45 additions & 22 deletions crates/agent/src/evolution.rs
@@ -1,3 +1,5 @@
use crate::CannotAcquireLock;

use super::{draft, HandleResult, Handler, Id};
use agent_sql::{
evolutions::{Row, SpecRow},
@@ -41,6 +43,7 @@ pub enum JobStatus {
evolved_collections: Vec<EvolvedCollection>,
publication_id: Option<Id>,
},
Queued,
}

#[derive(Serialize, Deserialize, PartialEq, Debug)]
@@ -55,8 +58,8 @@ pub struct EvolvedCollection {
pub updated_captures: Vec<String>,
}

fn error_status(err: impl Into<String>) -> anyhow::Result<JobStatus> {
Ok(JobStatus::EvolutionFailed { error: err.into() })
fn error_status(err: impl Into<String>) -> anyhow::Result<Result<JobStatus, CannotAcquireLock>> {
Ok(Ok(JobStatus::EvolutionFailed { error: err.into() }))
}

#[async_trait::async_trait]
@@ -66,22 +69,37 @@ impl Handler for EvolutionHandler {
pg_pool: &sqlx::PgPool,
allow_background: bool,
) -> anyhow::Result<HandleResult> {
let mut txn = pg_pool.begin().await?;

let Some(row) = agent_sql::evolutions::dequeue(&mut txn, allow_background).await? else {
return Ok(HandleResult::NoJobs);
};

let time_queued = chrono::Utc::now().signed_duration_since(row.updated_at);
let id: Id = row.id;
let status = process_row(row, &mut txn).await?;
let status = serde_json::to_value(status)?;
loop {
let mut txn = pg_pool.begin().await?;

let Some(row) = agent_sql::evolutions::dequeue(&mut txn, allow_background).await?
else {
return Ok(HandleResult::NoJobs);
};

let time_queued = chrono::Utc::now().signed_duration_since(row.updated_at);
let id: Id = row.id;
let process_result = process_row(row, &mut txn).await?;
let job_status = match process_result {
Ok(s) => s,
Err(CannotAcquireLock) => {
tracing::info!(%id, %time_queued, "cannot acquire all row locks for evolution (will retry)");
// Since we failed to acquire a necessary row lock, wait a short while and then
// try again. Evolutions jobs will fail _quite_ quickly in this scenario, so
// wait a whole second before re-trying.
txn.rollback().await?;
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
continue;
}
};
let status = serde_json::to_value(job_status)?;

tracing::info!(%id, %time_queued, %status, "evolution finished");
agent_sql::evolutions::resolve(id, &status, &mut txn).await?;
txn.commit().await?;
tracing::info!(%id, %time_queued, %status, "evolution finished");
agent_sql::evolutions::resolve(id, &status, &mut txn).await?;
txn.commit().await?;

Ok(HandleResult::HadJob)
return Ok(HandleResult::HadJob);
}
}

fn table_name(&self) -> &'static str {
@@ -93,7 +111,7 @@
async fn process_row(
row: Row,
txn: &mut sqlx::Transaction<'_, sqlx::Postgres>,
) -> anyhow::Result<JobStatus> {
) -> anyhow::Result<Result<JobStatus, CannotAcquireLock>> {
let Row {
draft_id,
user_id,
@@ -134,9 +152,14 @@ async fn process_row(
.collect::<Vec<Id>>();

// Fetch all of the live_specs that directly read from or write to any of these collections.
let expanded_rows = agent_sql::publications::resolve_expanded_rows(user_id, seed_ids, txn)
.await
.context("expanding specifications")?;
let expanded_rows =
match agent_sql::publications::resolve_expanded_rows(user_id, seed_ids, txn).await {
Ok(rows) => rows,
Err(err) if crate::is_acquire_lock_error(&err) => {
return Ok(Err(CannotAcquireLock));
}
Err(other_err) => return Err(other_err).context("expanding specifications"),
};

// Build up catalog of all the possibly affected entities, for easy lookups.
// Note that we put `expanded_rows` first so that `spec_rows` will overwrite
@@ -267,10 +290,10 @@ async fn process_row(
} else {
None
};
Ok(JobStatus::Success {
Ok(Ok(JobStatus::Success {
evolved_collections: changed_collections,
publication_id,
})
}))
}

fn validate_evolving_collections(
33 changes: 18 additions & 15 deletions crates/agent/src/handlers.rs
@@ -121,21 +121,24 @@ struct WrappedHandler {
impl WrappedHandler {
async fn handle_next_job(&mut self, pg_pool: &sqlx::PgPool) -> anyhow::Result<()> {
let allow_background = self.status != Status::PollInteractive;
match self.handler.handle(pg_pool, allow_background).await {
Ok(HandleResult::HadJob) => Ok(()),
Ok(HandleResult::NoJobs) if self.status == Status::PollInteractive => {
tracing::debug!(handler = %self.handler.name(), "handler completed all interactive jobs");
self.status = Status::PollBackground;
Ok(())
}
Ok(HandleResult::NoJobs) => {
tracing::debug!(handler = %self.handler.name(), "handler completed all background jobs");
self.status = Status::Idle;
Ok(())
}
Err(err) => {
tracing::error!(handler = %self.handler.name(), error = ?err, "Error invoking handler");
Err(err)

loop {
match self.handler.handle(pg_pool, allow_background).await {
Ok(HandleResult::HadJob) => return Ok(()),
Ok(HandleResult::NoJobs) if self.status == Status::PollInteractive => {
tracing::debug!(handler = %self.handler.name(), "handler completed all interactive jobs");
self.status = Status::PollBackground;
return Ok(());
}
Ok(HandleResult::NoJobs) => {
tracing::debug!(handler = %self.handler.name(), "handler completed all background jobs");
self.status = Status::Idle;
return Ok(());
}
Err(err) => {
tracing::error!(handler = %self.handler.name(), error = ?err, "Error invoking handler");
return Err(err);
}
}
}
}
14 changes: 14 additions & 0 deletions crates/agent/src/lib.rs
@@ -27,6 +27,20 @@ lazy_static! {
static ref NAME_VERSION_RE: Regex = Regex::new(r#".*[_-][vV](\d+)$"#).unwrap();
}

/// Indicates that a row-level lock could not immediately be acquired.
/// This is used in conjunction with `for update ... nowait` in SQL queries
/// to bail out and re-try processing jobs that conflict with other in-progress jobs.
#[derive(Debug)]
pub struct CannotAcquireLock;

/// Returns true if the given error represents a failure to acquire a lock, as indicated
/// by the "sql state" code.
fn is_acquire_lock_error(err: &sqlx::Error) -> bool {
err.as_database_error()
.filter(|e| e.code().as_ref().map(|c| c.as_ref()) == Some("55P03"))
.is_some()
}

/// Takes an existing name and returns a new name with an incremented version suffix.
/// The name `foo` will become `foo_v2`, and `foo_v2` will become `foo_v3` and so on.
pub fn next_name(current_name: &str) -> String {
91 changes: 56 additions & 35 deletions crates/agent/src/publications.rs
@@ -1,14 +1,14 @@
use std::collections::HashSet;

use self::builds::IncompatibleCollection;
use self::validation::ControlPlane;
use super::{
draft::{self, Error},
logs, HandleResult, Handler, Id,
};
use crate::CannotAcquireLock;
use agent_sql::{connector_tags::UnknownConnector, publications::Row, CatalogType};
use anyhow::Context;
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use tracing::info;

pub mod builds;
@@ -103,34 +103,45 @@ impl Handler for PublishHandler {
pg_pool: &sqlx::PgPool,
allow_background: bool,
) -> anyhow::Result<HandleResult> {
let mut txn = pg_pool.begin().await?;

let row: Row = match agent_sql::publications::dequeue(&mut txn, allow_background).await? {
None => return Ok(HandleResult::NoJobs),
Some(row) => row,
};

let delete_draft_id = if !row.dry_run {
Some(row.draft_id)
} else {
None
};

let time_queued = chrono::Utc::now().signed_duration_since(row.updated_at);
loop {
let mut txn = pg_pool.begin().await?;

let row: Row =
match agent_sql::publications::dequeue(&mut txn, allow_background).await? {
None => return Ok(HandleResult::NoJobs),
Some(row) => row,
};
let background = row.background;

let delete_draft_id = if !row.dry_run {
Some(row.draft_id)
} else {
None
};

let (id, status) = self.process(row, &mut txn, false).await?;
info!(%id, %time_queued, ?status, "finished");
let time_queued = chrono::Utc::now().signed_duration_since(row.updated_at);
let id = row.pub_id;
let process_result = self.process(row, &mut txn, false).await?;
info!(%id, %time_queued, result=?process_result, %background, "finished");

let Ok((id, status)) = process_result else {
// Since we failed to acquire a necessary row lock, wait a short while and then
// try again.
txn.rollback().await?;
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
continue;
};

agent_sql::publications::resolve(id, &status, &mut txn).await?;
txn.commit().await?;
agent_sql::publications::resolve(id, &status, &mut txn).await?;
txn.commit().await?;

// As a separate transaction, delete the draft if it has no draft_specs.
// The user could have raced an insertion of a new spec.
if let (Some(delete_draft_id), JobStatus::Success { .. }) = (delete_draft_id, status) {
agent_sql::publications::delete_draft(delete_draft_id, pg_pool).await?;
// As a separate transaction, delete the draft if it has no draft_specs.
// The user could have raced an insertion of a new spec.
if let (Some(delete_draft_id), JobStatus::Success { .. }) = (delete_draft_id, status) {
agent_sql::publications::delete_draft(delete_draft_id, pg_pool).await?;
}
return Ok(HandleResult::HadJob);
}

Ok(HandleResult::HadJob)
}

fn table_name(&self) -> &'static str {
Expand All @@ -145,7 +156,7 @@ impl PublishHandler {
row: Row,
txn: &mut sqlx::Transaction<'_, sqlx::Postgres>,
test_run: bool,
) -> anyhow::Result<(Id, JobStatus)> {
) -> anyhow::Result<Result<(Id, JobStatus), CannotAcquireLock>> {
info!(
%row.created_at,
%row.draft_id,
@@ -167,8 +178,12 @@ impl PublishHandler {
.await
.context("creating savepoint")?;

let spec_rows =
specs::resolve_specifications(row.draft_id, row.pub_id, row.user_id, txn).await?;
let Ok(spec_rows) =
specs::resolve_specifications(row.draft_id, row.pub_id, row.user_id, txn).await?
else {
// If unable to lock the spec rows, then bail and leave the job queued so we can try again.
return Ok(Err(CannotAcquireLock));
};
tracing::debug!(specs = %spec_rows.len(), "resolved specifications");

// Keep track of which collections are being deleted so that we can account for them
@@ -284,7 +299,13 @@ impl PublishHandler {
return stop_with_errors(errors, JobStatus::build_failed(Vec::new()), row, txn).await;
}

let expanded_rows = specs::expanded_specifications(row.user_id, &spec_rows, txn).await?;
let Ok(expanded_rows) =
specs::expanded_specifications(row.user_id, &spec_rows, txn).await?
else {
// If unable to lock the spec rows, then bail and leave the job queued so we can try again.
return Ok(Err(CannotAcquireLock));
};

tracing::debug!(specs = %expanded_rows.len(), "resolved expanded specifications");

// Touch all expanded specifications to update their build ID.
@@ -336,7 +357,7 @@ impl PublishHandler {
}

if test_run {
return Ok((row.pub_id, JobStatus::success(Vec::new())));
return Ok(Ok((row.pub_id, JobStatus::success(Vec::new()))));
}

let tmpdir_handle = tempfile::TempDir::new().context("creating tempdir")?;
@@ -413,7 +434,7 @@ impl PublishHandler {
.await
.context("adding built specs to draft")?;

return Ok((row.pub_id, JobStatus::success(Vec::new())));
return Ok(Ok((row.pub_id, JobStatus::success(Vec::new()))));
}

// Add built specs to the live spec when publishing a build.
@@ -458,7 +479,7 @@ impl PublishHandler {
.await
.context("creating linked materialization publications")?;

Ok((row.pub_id, JobStatus::success(pub_ids)))
Ok(Ok((row.pub_id, JobStatus::success(pub_ids))))
}
}

@@ -467,7 +488,7 @@
mut job_status: JobStatus,
row: Row,
txn: &mut sqlx::Transaction<'_, sqlx::Postgres>,
) -> anyhow::Result<(Id, JobStatus)> {
) -> anyhow::Result<Result<(Id, JobStatus), CannotAcquireLock>> {
agent_sql::publications::rollback_noop(txn)
.await
.context("rolling back to savepoint")?;
@@ -501,7 +522,7 @@
}
}

Ok((row.pub_id, job_status))
Ok(Ok((row.pub_id, job_status)))
}

fn create_evolutions_requests(