agent: defer handling of publications that require blocking locks #1390

Merged: 1 commit, Feb 27, 2024
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

3 changes: 2 additions & 1 deletion crates/agent-sql/src/publications.rs
@@ -282,7 +282,7 @@ pub async fn resolve_spec_rows(
on draft_specs.catalog_name = live_specs.catalog_name
where draft_specs.draft_id = $1
order by draft_specs.catalog_name asc
- for update of draft_specs, live_specs;
+ for update of draft_specs, live_specs nowait;
"#,
draft_id as Id,
user_id,
@@ -447,6 +447,7 @@ pub async fn resolve_expanded_rows(
-- Strip deleted specs which are still reach-able through a dataflow edge,
-- and strip rows already part of the seed set.
where l.spec is not null and l.id not in (select id from seeds)
+ for update of l nowait
"#,
seed_ids as Vec<Id>,
user_id,
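For context on the change above: without nowait, a `select ... for update` blocks until the conflicting row lock is released, while `for update ... nowait` makes PostgreSQL fail the statement immediately with SQLSTATE 55P03 (lock_not_available), which is what lets the agent roll back and retry instead of sitting on the lock. Below is a minimal sketch of that behavior through sqlx; the connection string and the `catalog_name` value are placeholders for illustration, not taken from this repository's tests.

```rust
use sqlx::postgres::PgPoolOptions;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Placeholder DSN; point at any scratch database that has a live_specs table.
    let pool = PgPoolOptions::new()
        .max_connections(2)
        .connect("postgres://postgres@localhost/scratch")
        .await?;

    // Transaction A acquires the row lock and holds it.
    let mut txn_a = pool.begin().await?;
    sqlx::query("select id from live_specs where catalog_name = 'acmeCo/example' for update")
        .fetch_optional(&mut *txn_a)
        .await?;

    // Transaction B would normally block on the same row; nowait makes it fail fast instead.
    let mut txn_b = pool.begin().await?;
    let contended =
        sqlx::query("select id from live_specs where catalog_name = 'acmeCo/example' for update nowait")
            .fetch_optional(&mut *txn_b)
            .await;

    if let Err(sqlx::Error::Database(db_err)) = &contended {
        // Prints Some("55P03"), the SQLSTATE that is_acquire_lock_error() checks for.
        println!("sqlstate = {:?}", db_err.code());
    }
    Ok(())
}
```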
3 changes: 2 additions & 1 deletion crates/agent/Cargo.toml
@@ -29,9 +29,10 @@ base64 = { workspace = true }
bytes = { workspace = true }
chrono = { workspace = true }
clap = { workspace = true }
colored_json = { workspace = true } # Used to render ops::Logs for UI.
colored_json = { workspace = true } # Used to render ops::Logs for UI.
derivative = { workspace = true }
futures = { workspace = true }
+ humantime-serde = { workspace = true }
itertools = { workspace = true }
lazy_static = { workspace = true }
regex = { workspace = true }
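humantime-serde is the one new dependency in this list, and the diff does not show where it is used. Purely as background for reviewers unfamiliar with the crate, a typical usage pattern (the config struct and field names below are made up for illustration) is to round-trip std::time::Duration values through strings like "500ms" or "2s":

```rust
use serde::{Deserialize, Serialize};
use std::time::Duration;

// Illustrative only: a config struct whose duration field is written as "500ms", "2s", etc.
#[derive(Serialize, Deserialize, Debug)]
struct RetryConfig {
    #[serde(with = "humantime_serde")]
    backoff: Duration,
}

fn main() -> anyhow::Result<()> {
    let cfg: RetryConfig = serde_json::from_str(r#"{ "backoff": "500ms" }"#)?;
    println!("{cfg:?}"); // RetryConfig { backoff: 500ms }
    println!("{}", serde_json::to_string(&cfg)?); // {"backoff":"500ms"}
    Ok(())
}
```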
58 changes: 42 additions & 16 deletions crates/agent/src/evolution.rs
@@ -41,6 +41,7 @@ pub enum JobStatus {
evolved_collections: Vec<EvolvedCollection>,
publication_id: Option<Id>,
},
+ Queued,
}

#[derive(Serialize, Deserialize, PartialEq, Debug)]
@@ -66,22 +67,47 @@ impl Handler for EvolutionHandler {
pg_pool: &sqlx::PgPool,
allow_background: bool,
) -> anyhow::Result<HandleResult> {
- let mut txn = pg_pool.begin().await?;

- let Some(row) = agent_sql::evolutions::dequeue(&mut txn, allow_background).await? else {
-     return Ok(HandleResult::NoJobs);
- };

- let time_queued = chrono::Utc::now().signed_duration_since(row.updated_at);
- let id: Id = row.id;
- let status = process_row(row, &mut txn).await?;
- let status = serde_json::to_value(status)?;

- tracing::info!(%id, %time_queued, %status, "evolution finished");
- agent_sql::evolutions::resolve(id, &status, &mut txn).await?;
- txn.commit().await?;

- Ok(HandleResult::HadJob)
+ loop {
+ let mut txn = pg_pool.begin().await?;

+ let Some(row) = agent_sql::evolutions::dequeue(&mut txn, allow_background).await?
+ else {
+     return Ok(HandleResult::NoJobs);
+ };

+ let time_queued = chrono::Utc::now().signed_duration_since(row.updated_at);
+ let id: Id = row.id;
+ let process_result = process_row(row, &mut txn).await;
+ let job_status = match process_result {
+     Ok(s) => s,
+     Err(err) if crate::is_acquire_lock_error(&err) => {
+         tracing::info!(%id, %time_queued, "cannot acquire all row locks for evolution (will retry)");
+         // Since we failed to acquire a necessary row lock, wait a short
+         // while and then try again.
+         txn.rollback().await?;
+         // The sleep is really just so we don't spam the DB in a busy
+         // loop. I arrived at these values via the very scientific 😉
+         // process of reproducing failures using a couple of different
+         // values and squinting at the logs in my terminal. In
+         // practice, it's common for another agent process to pick up
+         // the job while this one is sleeping, which is why I didn't
+         // see a need for jitter. All agents process the job queue in
+         // the same order, so the next time any agent polls the
+         // handler, it should get this same job, since we've released
+         // the lock on the job row. Evolutions jobs will fail _quite_
+         // quickly in this scenario, hence the full second.
+         tokio::time::sleep(std::time::Duration::from_secs(1)).await;
Review thread on the sleep above:

Member: nit: why is this one second, while the publications handler backoff is 500 ms?

I'm thinking of this as just a back-off so we don't spam the DB in a busy loop, right?

It should avoid deadlock because we've rolled back and released all locks, so progress is likely being made, though I could imagine racy scenarios where two jobs keep detecting potential deadlock, then back off and retry at the precise time that causes the condition to happen again.

Perhaps this should be jittered to make sure that ☝️ doesn't happen indefinitely?

Member Author: Yes, this is really just so we don't spam the DB in a busy loop. I arrived at these values via the very scientific 😉 process of reproducing failures using a couple of different values and squinting at the logs in my terminal. Evolutions tend to fail much faster (single-digit ms) than publications, which have a lot more back and forths with the database before attempting to lock expanded spec rows. In testing, it's been much more common for another agent process to pick up the job while this one is sleeping, which is why I didn't see a need for jitter. All agents process the job queue in the same order, so the next time any agent polls the handler, it should get this same job, since we've released the lock on the job row.

☝️ This clearly should have been a code comment, so I'll add that.
+         continue;
+     }
+     Err(other_err) => return Err(other_err),
+ };
+ let status = serde_json::to_value(job_status)?;
+ tracing::info!(%id, %time_queued, %status, "evolution finished");
+ agent_sql::evolutions::resolve(id, &status, &mut txn).await?;
+ txn.commit().await?;

+ return Ok(HandleResult::HadJob);
+ }
}

fn table_name(&self) -> &'static str {
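The review thread above debates whether this sleep should be jittered; the merged change keeps the fixed delay. Purely as a sketch of the alternative the reviewer floated (not part of this PR), a jittered backoff built on the rand crate could look like the following, with the base and jitter values chosen arbitrarily:

```rust
use rand::Rng;
use std::time::Duration;

// Sketch only: a fixed base delay plus a random component, so two agents that hit
// lock contention at the same instant are unlikely to retry in lockstep forever.
async fn sleep_with_jitter(base: Duration, max_jitter: Duration) {
    // The thread-local RNG handle is dropped before the await, keeping the future Send.
    let jitter_ms = rand::thread_rng().gen_range(0..=max_jitter.as_millis() as u64);
    tokio::time::sleep(base + Duration::from_millis(jitter_ms)).await;
}

#[tokio::main]
async fn main() {
    // In the handler above, this would stand in for the fixed one-second sleep.
    sleep_with_jitter(Duration::from_secs(1), Duration::from_millis(250)).await;
}
```

As the author notes, another agent usually picks the job up while this one sleeps, so the fixed delay was judged sufficient in practice.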
12 changes: 12 additions & 0 deletions crates/agent/src/lib.rs
@@ -27,6 +27,18 @@ lazy_static! {
static ref NAME_VERSION_RE: Regex = Regex::new(r#".*[_-][vV](\d+)$"#).unwrap();
}

+ /// Returns true if the given error represents a failure to acquire a lock, as indicated
+ /// by the "sql state" code.
+ fn is_acquire_lock_error(err: &anyhow::Error) -> bool {
+     let Some(sql_err) = err.downcast_ref::<sqlx::Error>() else {
+         return false;
+     };
+     sql_err
+         .as_database_error()
+         .filter(|e| e.code().as_ref().map(|c| c.as_ref()) == Some("55P03"))
+         .is_some()
+ }

/// Takes an existing name and returns a new name with an incremented version suffix.
/// The name `foo` will become `foo_v2`, and `foo_v2` will become `foo_v3` and so on.
pub fn next_name(current_name: &str) -> String {
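One assumption this helper relies on is that the original sqlx::Error is still present in the anyhow::Error it inspects: downcast_ref only finds it when callers propagate the database error with ? or .context(...), rather than flattening it into a brand-new message. A small sketch of that distinction follows; the helper names and query are hypothetical, not functions from this crate:

```rust
use anyhow::Context;
use sqlx::{Postgres, Transaction};

// Keeps the sqlx::Error in the error chain, so is_acquire_lock_error() can still
// observe SQLSTATE 55P03 after the failure bubbles up through the handler.
async fn lock_rows(txn: &mut Transaction<'_, Postgres>) -> anyhow::Result<()> {
    sqlx::query("select id from live_specs for update nowait")
        .execute(&mut **txn)
        .await
        .context("locking live_specs rows")?;
    Ok(())
}

// Anti-pattern for this purpose: formatting the error into a new one discards the
// underlying sqlx::Error, and the downcast in is_acquire_lock_error() no longer matches.
async fn lock_rows_lossy(txn: &mut Transaction<'_, Postgres>) -> anyhow::Result<()> {
    sqlx::query("select id from live_specs for update nowait")
        .execute(&mut **txn)
        .await
        .map_err(|err| anyhow::anyhow!("failed to lock live_specs rows: {err}"))?;
    Ok(())
}
```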
77 changes: 51 additions & 26 deletions crates/agent/src/publications.rs
@@ -103,34 +103,59 @@ impl Handler for PublishHandler {
pg_pool: &sqlx::PgPool,
allow_background: bool,
) -> anyhow::Result<HandleResult> {
- let mut txn = pg_pool.begin().await?;

- let row: Row = match agent_sql::publications::dequeue(&mut txn, allow_background).await? {
-     None => return Ok(HandleResult::NoJobs),
-     Some(row) => row,
- };

- let delete_draft_id = if !row.dry_run {
-     Some(row.draft_id)
- } else {
-     None
- };

- let time_queued = chrono::Utc::now().signed_duration_since(row.updated_at);

- let (id, status) = self.process(row, &mut txn, false).await?;
- info!(%id, %time_queued, ?status, "finished");

- agent_sql::publications::resolve(id, &status, &mut txn).await?;
- txn.commit().await?;
+ loop {
+ let mut txn = pg_pool.begin().await?;

+ let row: Row =
+     match agent_sql::publications::dequeue(&mut txn, allow_background).await? {
+         None => return Ok(HandleResult::NoJobs),
+         Some(row) => row,
+     };
+ let background = row.background;

+ let delete_draft_id = if !row.dry_run {
+     Some(row.draft_id)
+ } else {
+     None
+ };

- // As a separate transaction, delete the draft if it has no draft_specs.
- // The user could have raced an insertion of a new spec.
- if let (Some(delete_draft_id), JobStatus::Success { .. }) = (delete_draft_id, status) {
-     agent_sql::publications::delete_draft(delete_draft_id, pg_pool).await?;
+ let time_queued = chrono::Utc::now().signed_duration_since(row.updated_at);
+ let id = row.pub_id;
+ let process_result = self.process(row, &mut txn, false).await;

+ let status = match process_result {
+     Ok((_, status)) => status,
+     Err(err) if crate::is_acquire_lock_error(&err) => {
+         tracing::info!(%id, %time_queued, "cannot acquire all row locks for publication (will retry)");
+         // Since we failed to acquire a necessary row lock, wait a short
+         // while and then try again.
+         txn.rollback().await?;
+         // The sleep is really just so we don't spam the DB in a busy
+         // loop. I arrived at these values via the very scientific 😉
+         // process of reproducing failures using a couple of different
+         // values and squinting at the logs in my terminal. In
+         // practice, it's common for another agent process to pick up
+         // the job while this one is sleeping, which is why I didn't
+         // see a need for jitter. All agents process the job queue in
+         // the same order, so the next time any agent polls the
+         // handler, it should get this same job, since we've released
+         // the lock on the job row.
+         tokio::time::sleep(std::time::Duration::from_millis(500)).await;
+         continue;
+     }
+     Err(other_err) => return Err(other_err),
+ };
+ info!(%id, %time_queued, ?status, %background, "finished");
+ agent_sql::publications::resolve(id, &status, &mut txn).await?;
+ txn.commit().await?;

+ // As a separate transaction, delete the draft if it has no draft_specs.
+ // The user could have raced an insertion of a new spec.
+ if let (Some(delete_draft_id), JobStatus::Success { .. }) = (delete_draft_id, status) {
+     agent_sql::publications::delete_draft(delete_draft_id, pg_pool).await?;
+ }
+ return Ok(HandleResult::HadJob);
+ }

- Ok(HandleResult::HadJob)
}

fn table_name(&self) -> &'static str {
8 changes: 4 additions & 4 deletions crates/agent/src/publications/specs.rs
@@ -69,10 +69,10 @@ pub async fn resolve_specifications(
}

// expanded_specifications returns additional specifications which should be
- // included in this publication's build. These specifications are not changed
- // by the publication and are read with read-committed transaction semantics,
- // but (if not a dry-run) we do re-activate each specification within the
- // data-plane with the outcome of this publication's build.
+ // included in this publication's build. Attempts to acquire a lock on each expanded `live_specs`
+ // row, with the assumption that we will be updating the `built_spec` and `last_build_id`.
+ // Returns `Ok(Err(CannotAcquireLock))` if any locks could not be immediately acquired, so that the
+ // publication can be re-tried later.
pub async fn expanded_specifications(
user_id: Uuid,
spec_rows: &[SpecRow],
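The Ok(Err(CannotAcquireLock)) wording describes a nested-Result convention: the outer Result carries genuine failures, while the inner Err is the expected "locks are busy, defer and retry" outcome rather than an error to report. A toy, self-contained illustration of that shape follows (the stub function, type, and values are invented for this example, not the crate's real signatures):

```rust
#[derive(Debug)]
struct CannotAcquireLock;

// Stand-in for expanded_specifications: outer Err means a real failure,
// inner Err means "another publication holds the row locks, retry later".
fn expand_stub(locks_available: bool) -> anyhow::Result<Result<Vec<String>, CannotAcquireLock>> {
    if locks_available {
        Ok(Ok(vec!["acmeCo/expanded-collection".to_string()]))
    } else {
        Ok(Err(CannotAcquireLock))
    }
}

fn main() -> anyhow::Result<()> {
    match expand_stub(false)? {
        Ok(rows) => println!("expanded rows to include in the build: {rows:?}"),
        Err(CannotAcquireLock) => println!("row locks unavailable: roll back and retry the job"),
    }
    Ok(())
}
```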