Skip to content

Commit

Permalink
Harden council and restart running jobs with a missing dep graph
Browse files Browse the repository at this point in the history
This commit is a first pass at hardening council. It primarily encloses
council's core loop in an infallible wrapper as well as handles
situations where we have a missing dependency graph and need to restart
all running jobs. The restart broadcast is also published whenever
council boots up in case of a panic or restart.

Council
- Upon startup, council will publish a message for all subscribers (all
  running pinga instances) to restart actively running jobs
- The core loop for "bin/council" has been wrapped with an infallible
  wrapper (within our control, the only way to escape the wrapper is via
  a shutdown call or a closed connection)
- Bubbled up errors are logged in otel
- Explicit panics are now disallowed in "lib/council-server" with
  additional clippy lints
- All "unwrap" usages have been replaced and are either bubbled up or
  handled

Missing Dependency Graph:
- In the two instances where a depenendecy graph can have missing data
  within council, we need to restart the actively running job

Management Client:
- There is a new client for council exclusively for subscribing to
  management messages from council
- This is solely used for restarting actively running jobs

Subject Generation:
- For both client and server, all subject management has been
  centralized into a new module in "lib/council-server"
- There are two new subjects: "<prefix>.council.management" and
  "<prefix>.council.management.reply"

Signed-off-by: Nick Gerace <[email protected]>
  • Loading branch information
nickgerace committed Jan 25, 2024
1 parent 9a59078 commit 5d15c15
Show file tree
Hide file tree
Showing 13 changed files with 436 additions and 128 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions lib/council-server/BUCK
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ rust_library(
"//third-party/rust:remain",
"//third-party/rust:serde",
"//third-party/rust:serde_json",
"//third-party/rust:strum",
"//third-party/rust:thiserror",
"//third-party/rust:tokio",
"//third-party/rust:ulid",
Expand Down
8 changes: 5 additions & 3 deletions lib/council-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,16 @@ rust-version = "1.64"
publish = false

[dependencies]
si-data-nats = { path = "../../lib/si-data-nats" }
si-settings = { path = "../../lib/si-settings" }
telemetry = { path = "../../lib/telemetry-rs" }

derive_builder = { workspace = true }
futures = { workspace = true }
remain = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
si-data-nats = { path = "../../lib/si-data-nats" }
si-settings = { path = "../../lib/si-settings" }
telemetry = { path = "../../lib/telemetry-rs" }
strum = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
ulid = { workspace = true }
54 changes: 28 additions & 26 deletions lib/council-server/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,24 @@ use si_data_nats::{NatsClient, Subject, Subscriber};
use std::time::Duration;
use telemetry::prelude::*;

use crate::SubjectGenerator;
use crate::{Graph, Id, Request, Response};

pub mod management;

pub type ClientResult<T, E = ClientError> = Result<T, E>;

#[remain::sorted]
#[derive(Debug, thiserror::Error)]
pub enum ClientError {
#[error(transparent)]
Nats(#[from] si_data_nats::Error),
#[error("no listener available for message that was just sent")]
NoListenerAvailable,
#[error(transparent)]
SerdeJson(#[from] serde_json::Error),
}

#[remain::sorted]
#[derive(Debug)]
pub enum State {
Expand All @@ -21,7 +37,7 @@ pub struct PubClient {
}

impl PubClient {
pub async fn register_dependency_graph(&self, dependency_graph: Graph) -> Result<()> {
pub async fn register_dependency_graph(&self, dependency_graph: Graph) -> ClientResult<()> {
let message = serde_json::to_vec(&Request::ValueDependencyGraph {
change_set_id: self.change_set_id,
dependency_graph,
Expand All @@ -36,7 +52,7 @@ impl PubClient {
Ok(())
}

pub async fn processed_value(&self, node_id: Id) -> Result<()> {
pub async fn processed_value(&self, node_id: Id) -> ClientResult<()> {
let message = serde_json::to_vec(&Request::ProcessedValue {
change_set_id: self.change_set_id,
node_id,
Expand All @@ -51,7 +67,7 @@ impl PubClient {
Ok(())
}

pub async fn failed_processing_value(&self, node_id: Id) -> Result<()> {
pub async fn failed_processing_value(&self, node_id: Id) -> ClientResult<()> {
let message = serde_json::to_vec(&Request::ValueProcessingFailed {
change_set_id: self.change_set_id,
node_id,
Expand All @@ -66,7 +82,7 @@ impl PubClient {
Ok(())
}

pub async fn bye(self) -> Result<()> {
pub async fn bye(self) -> ClientResult<()> {
let message = serde_json::to_vec(&Request::Bye {
change_set_id: self.change_set_id,
})?;
Expand All @@ -93,12 +109,11 @@ pub struct Client {
impl Client {
pub async fn new(
nats: NatsClient,
subject_prefix: &str,
subject_prefix: Option<String>,
id: Id,
change_set_id: Id,
) -> Result<Self> {
let pub_channel = format!("{subject_prefix}.{id}");
let reply_channel = format!("{pub_channel}.reply");
) -> ClientResult<Self> {
let (pub_channel, reply_channel) = SubjectGenerator::for_client(subject_prefix, id);
Ok(Self {
pub_channel: pub_channel.into(),
change_set_id,
Expand All @@ -118,7 +133,7 @@ impl Client {
}

// None means subscriber has been unsubscribed or that the connection has been closed
pub async fn fetch_response(&mut self) -> Result<Option<Response>> {
pub async fn fetch_response(&mut self) -> ClientResult<Option<Response>> {
// TODO: timeout so we don't get stuck here forever if council goes away
// TODO: handle message.data() empty with Status header as 503: https://github.com/nats-io/nats.go/pull/576
let msg = loop {
Expand All @@ -135,38 +150,25 @@ impl Client {
match msg {
Some(msg) => {
if msg.payload().is_empty() {
return Err(Error::NoListenerAvailable);
return Err(ClientError::NoListenerAvailable);
}
Ok(Some(serde_json::from_slice::<Response>(msg.payload())?))
}
None => Ok(None),
}
}

pub async fn register_dependency_graph(&self, dependency_graph: Graph) -> Result<()> {
pub async fn register_dependency_graph(&self, dependency_graph: Graph) -> ClientResult<()> {
self.clone_into_pub()
.register_dependency_graph(dependency_graph)
.await
}

pub async fn processed_value(&self, node_id: Id) -> Result<()> {
pub async fn processed_value(&self, node_id: Id) -> ClientResult<()> {
self.clone_into_pub().processed_value(node_id).await
}

pub async fn bye(&self) -> Result<()> {
pub async fn bye(&self) -> ClientResult<()> {
self.clone_into_pub().bye().await
}
}

pub type Result<T, E = Error> = std::result::Result<T, E>;

#[remain::sorted]
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error(transparent)]
Nats(#[from] si_data_nats::Error),
#[error("no listener available for message that was just sent")]
NoListenerAvailable,
#[error(transparent)]
SerdeJson(#[from] serde_json::Error),
}
57 changes: 57 additions & 0 deletions lib/council-server/src/client/management.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
//! This module contains [`ManagementClient`], which is used to subscribe to management-type messages from a council
//! server (example: notify all running pinga instances that they should [`restart`](ManagementResponse::Restart)
//! actively running jobs).

use futures::StreamExt;
use si_data_nats::{NatsClient, Subject, Subscriber};
use std::time::Duration;
use telemetry::prelude::*;

use crate::client::{ClientError, ClientResult};
use crate::{ManagementResponse, SubjectGenerator};

#[derive(Debug)]
pub struct ManagementClient {
management_channel: Subject,
management_subscriber: Subscriber,
}

impl ManagementClient {
pub async fn new(nats: &NatsClient, subject_prefix: Option<String>) -> ClientResult<Self> {
let management_channel = SubjectGenerator::for_management_client(subject_prefix);
Ok(Self {
management_subscriber: nats.subscribe(management_channel.clone()).await?,
management_channel: management_channel.into(),
})
}

// None means subscriber has been unsubscribed or that the connection has been closed
pub async fn fetch_response(&mut self) -> ClientResult<Option<ManagementResponse>> {
// TODO: timeout so we don't get stuck here forever if council goes away
// TODO: handle message.data() empty with Status header as 503: https://github.com/nats-io/nats.go/pull/576
let msg = loop {
let res =
tokio::time::timeout(Duration::from_secs(60), self.management_subscriber.next())
.await;

match res {
Ok(msg) => break msg,
Err(_) => {
warn!(management_channel = ?self.management_channel, "Council client waiting for response on management channel for 60 seconds");
}
}
};

match msg {
Some(msg) => {
if msg.payload().is_empty() {
return Err(ClientError::NoListenerAvailable);
}
Ok(Some(serde_json::from_slice::<ManagementResponse>(
msg.payload(),
)?))
}
None => Ok(None),
}
}
}
24 changes: 23 additions & 1 deletion lib/council-server/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,25 @@
#![warn(
clippy::unwrap_in_result,
clippy::unwrap_used,
clippy::panic,
clippy::panic_in_result_fn
)]

use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use strum::EnumDiscriminants;
use ulid::Ulid;

pub mod client;
pub mod server;

// The subject generator module is for internal use. The module is private as a result.
mod subject_generator;

pub use client::management::ManagementClient;
pub use client::{Client, PubClient};
pub use server::Server;
pub(crate) use subject_generator::SubjectGenerator;

#[derive(Clone, Copy, Eq, PartialEq, Serialize, Deserialize, Hash)]
pub struct Id(Ulid);
Expand Down Expand Up @@ -53,7 +66,7 @@ impl std::fmt::Debug for Id {
pub type Graph = HashMap<Id, Vec<Id>>;

#[remain::sorted]
#[derive(Serialize, Deserialize, Debug)]
#[derive(Serialize, Deserialize, Debug, EnumDiscriminants)]
#[serde(tag = "kind")]
pub enum Request {
Bye {
Expand All @@ -63,6 +76,7 @@ pub enum Request {
change_set_id: Id,
node_id: Id,
},
Restart,
ValueDependencyGraph {
change_set_id: Id,
dependency_graph: Graph,
Expand All @@ -80,5 +94,13 @@ pub enum Response {
BeenProcessed { node_id: Id },
Failed { node_id: Id },
OkToProcess { node_ids: Vec<Id> },
Restart,
Shutdown,
}

#[remain::sorted]
#[derive(Serialize, Deserialize, Debug)]
#[serde(tag = "kind")]
pub enum ManagementResponse {
Restart,
}
Loading

0 comments on commit 5d15c15

Please sign in to comment.