Skip to content

Commit

Permalink
Conversion implementations for awaited action db structs (TraceMachin…
Browse files Browse the repository at this point in the history
…a#1243)

Implements a number of convenience implementations to simplify
compatibility with fred and conversions into redis keys/values.
  • Loading branch information
zbirenbaum authored Aug 8, 2024
1 parent 3eadab0 commit d5f2781
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 7 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions nativelink-scheduler/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ rust_library(
"//nativelink-store",
"//nativelink-util",
"@crates//:async-lock",
"@crates//:bytes",
"@crates//:futures",
"@crates//:lru",
"@crates//:parking_lot",
Expand Down
1 change: 1 addition & 0 deletions nativelink-scheduler/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ nativelink-metric = { path = "../nativelink-metric" }
nativelink-store = { path = "../nativelink-store" }
async-lock = { version = "3.4.0", features = ["std"], default-features = false }
async-trait = "0.1.81"
bytes = { version = "1.6.1", default-features = false }
prost = { version = "0.13.1", default-features = false }
uuid = { version = "1.8.0", default-features = false, features = ["v4", "serde"] }
futures = { version = "0.3.30", default-features = false }
Expand Down
14 changes: 10 additions & 4 deletions nativelink-scheduler/src/awaited_action_db/awaited_action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};

use bytes::Bytes;
use nativelink_error::{make_input_err, Error, ResultExt};
use nativelink_metric::{
MetricFieldData, MetricKind, MetricPublishKnownKindData, MetricsComponent,
Expand Down Expand Up @@ -149,12 +150,13 @@ impl AwaitedAction {
}
}

impl TryInto<Vec<u8>> for AwaitedAction {
impl TryInto<bytes::Bytes> for AwaitedAction {
type Error = Error;
fn try_into(self) -> Result<Vec<u8>, Self::Error> {
serde_json::to_vec(&self)
fn try_into(self) -> Result<Bytes, Self::Error> {
serde_json::to_string(&self)
.map(Bytes::from)
.map_err(|e| make_input_err!("{}", e.to_string()))
.err_tip(|| "In AwaitedAction::TryInto::<Vec<u8>>")
.err_tip(|| "In AwaitedAction::TryInto::Bytes")
}
}

Expand Down Expand Up @@ -213,6 +215,10 @@ impl AwaitedActionSortKey {
.as_secs() as u32;
Self::new(priority, timestamp)
}

pub fn as_u64(&self) -> u64 {
self.0
}
}

// Ensure the size of the sort key is the same as a `u64`.
Expand Down
79 changes: 76 additions & 3 deletions nativelink-scheduler/src/awaited_action_db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ use std::sync::Arc;

pub use awaited_action::{AwaitedAction, AwaitedActionSortKey};
use futures::{Future, Stream};
use nativelink_error::Error;
use nativelink_error::{make_input_err, Error, ResultExt};
use nativelink_metric::MetricsComponent;
use nativelink_util::action_messages::{ActionInfo, OperationId};
use nativelink_util::action_messages::{ActionInfo, ActionStage, OperationId};
use serde::{Deserialize, Serialize};

mod awaited_action;

Expand All @@ -33,8 +34,38 @@ pub enum SortedAwaitedActionState {
Completed,
}

impl TryFrom<&ActionStage> for SortedAwaitedActionState {
type Error = Error;
fn try_from(value: &ActionStage) -> Result<Self, Error> {
match value {
ActionStage::CacheCheck => Ok(Self::CacheCheck),
ActionStage::Executing => Ok(Self::Executing),
ActionStage::Completed(_) => Ok(Self::Completed),
ActionStage::Queued => Ok(Self::Queued),
_ => Err(make_input_err!("Invalid State")),
}
}
}

impl TryFrom<ActionStage> for SortedAwaitedActionState {
type Error = Error;
fn try_from(value: ActionStage) -> Result<Self, Error> {
Self::try_from(&value)
}
}

impl std::fmt::Display for SortedAwaitedActionState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
SortedAwaitedActionState::CacheCheck => write!(f, "CacheCheck"),
SortedAwaitedActionState::Queued => write!(f, "Queued"),
SortedAwaitedActionState::Executing => write!(f, "Executing"),
SortedAwaitedActionState::Completed => write!(f, "Completed"),
}
}
}
/// A struct pointing to an AwaitedAction that can be sorted.
#[derive(Debug, Clone, MetricsComponent)]
#[derive(Debug, Clone, Serialize, Deserialize, MetricsComponent)]
pub struct SortedAwaitedAction {
#[metric(help = "The sort key of the AwaitedAction")]
pub sort_key: AwaitedActionSortKey,
Expand Down Expand Up @@ -64,6 +95,48 @@ impl Ord for SortedAwaitedAction {
}
}

impl std::fmt::Display for SortedAwaitedAction {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
std::fmt::write(
f,
format_args!("{}-{}", self.sort_key.as_u64(), self.operation_id),
)
}
}

impl From<&AwaitedAction> for SortedAwaitedAction {
fn from(value: &AwaitedAction) -> Self {
Self {
operation_id: value.operation_id().clone(),
sort_key: value.sort_key(),
}
}
}

impl From<AwaitedAction> for SortedAwaitedAction {
fn from(value: AwaitedAction) -> Self {
Self::from(&value)
}
}

impl TryInto<Vec<u8>> for SortedAwaitedAction {
type Error = Error;
fn try_into(self) -> Result<Vec<u8>, Self::Error> {
serde_json::to_vec(&self)
.map_err(|e| make_input_err!("{}", e.to_string()))
.err_tip(|| "In SortedAwaitedAction::TryInto::<Vec<u8>>")
}
}

impl TryFrom<&[u8]> for SortedAwaitedAction {
type Error = Error;
fn try_from(value: &[u8]) -> Result<Self, Self::Error> {
serde_json::from_slice(value)
.map_err(|e| make_input_err!("{}", e.to_string()))
.err_tip(|| "In AwaitedAction::TryFrom::&[u8]")
}
}

/// Subscriber that can be used to monitor when AwaitedActions change.
pub trait AwaitedActionSubscriber: Send + Sync + Sized + 'static {
/// Wait for AwaitedAction to change.
Expand Down

0 comments on commit d5f2781

Please sign in to comment.