From d5f2781eff92432ceea9497f7b1fe1c3b672eda4 Mon Sep 17 00:00:00 2001 From: Zach Birenbaum Date: Thu, 8 Aug 2024 12:00:11 -0700 Subject: [PATCH] Conversion implementations for awaited action db structs (#1243) Implements a number of convenience implementations to simplify compatibility with fred and conversions into redis keys/values. --- Cargo.lock | 1 + nativelink-scheduler/BUILD.bazel | 1 + nativelink-scheduler/Cargo.toml | 1 + .../src/awaited_action_db/awaited_action.rs | 14 +++- .../src/awaited_action_db/mod.rs | 79 ++++++++++++++++++- 5 files changed, 89 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5d7deddaf..2271588aa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1806,6 +1806,7 @@ version = "0.5.0" dependencies = [ "async-lock", "async-trait", + "bytes", "futures", "lru", "mock_instant", diff --git a/nativelink-scheduler/BUILD.bazel b/nativelink-scheduler/BUILD.bazel index 218013cc0..b7bdb5f0e 100644 --- a/nativelink-scheduler/BUILD.bazel +++ b/nativelink-scheduler/BUILD.bazel @@ -37,6 +37,7 @@ rust_library( "//nativelink-store", "//nativelink-util", "@crates//:async-lock", + "@crates//:bytes", "@crates//:futures", "@crates//:lru", "@crates//:parking_lot", diff --git a/nativelink-scheduler/Cargo.toml b/nativelink-scheduler/Cargo.toml index 38dbe0194..bd1df109a 100644 --- a/nativelink-scheduler/Cargo.toml +++ b/nativelink-scheduler/Cargo.toml @@ -15,6 +15,7 @@ nativelink-metric = { path = "../nativelink-metric" } nativelink-store = { path = "../nativelink-store" } async-lock = { version = "3.4.0", features = ["std"], default-features = false } async-trait = "0.1.81" +bytes = { version = "1.6.1", default-features = false } prost = { version = "0.13.1", default-features = false } uuid = { version = "1.8.0", default-features = false, features = ["v4", "serde"] } futures = { version = "0.3.30", default-features = false } diff --git a/nativelink-scheduler/src/awaited_action_db/awaited_action.rs b/nativelink-scheduler/src/awaited_action_db/awaited_action.rs index 5437eee1f..bc9432653 100644 --- a/nativelink-scheduler/src/awaited_action_db/awaited_action.rs +++ b/nativelink-scheduler/src/awaited_action_db/awaited_action.rs @@ -15,6 +15,7 @@ use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; +use bytes::Bytes; use nativelink_error::{make_input_err, Error, ResultExt}; use nativelink_metric::{ MetricFieldData, MetricKind, MetricPublishKnownKindData, MetricsComponent, @@ -149,12 +150,13 @@ impl AwaitedAction { } } -impl TryInto> for AwaitedAction { +impl TryInto for AwaitedAction { type Error = Error; - fn try_into(self) -> Result, Self::Error> { - serde_json::to_vec(&self) + fn try_into(self) -> Result { + serde_json::to_string(&self) + .map(Bytes::from) .map_err(|e| make_input_err!("{}", e.to_string())) - .err_tip(|| "In AwaitedAction::TryInto::>") + .err_tip(|| "In AwaitedAction::TryInto::Bytes") } } @@ -213,6 +215,10 @@ impl AwaitedActionSortKey { .as_secs() as u32; Self::new(priority, timestamp) } + + pub fn as_u64(&self) -> u64 { + self.0 + } } // Ensure the size of the sort key is the same as a `u64`. diff --git a/nativelink-scheduler/src/awaited_action_db/mod.rs b/nativelink-scheduler/src/awaited_action_db/mod.rs index f2baba978..fe6fdbf07 100644 --- a/nativelink-scheduler/src/awaited_action_db/mod.rs +++ b/nativelink-scheduler/src/awaited_action_db/mod.rs @@ -18,9 +18,10 @@ use std::sync::Arc; pub use awaited_action::{AwaitedAction, AwaitedActionSortKey}; use futures::{Future, Stream}; -use nativelink_error::Error; +use nativelink_error::{make_input_err, Error, ResultExt}; use nativelink_metric::MetricsComponent; -use nativelink_util::action_messages::{ActionInfo, OperationId}; +use nativelink_util::action_messages::{ActionInfo, ActionStage, OperationId}; +use serde::{Deserialize, Serialize}; mod awaited_action; @@ -33,8 +34,38 @@ pub enum SortedAwaitedActionState { Completed, } +impl TryFrom<&ActionStage> for SortedAwaitedActionState { + type Error = Error; + fn try_from(value: &ActionStage) -> Result { + match value { + ActionStage::CacheCheck => Ok(Self::CacheCheck), + ActionStage::Executing => Ok(Self::Executing), + ActionStage::Completed(_) => Ok(Self::Completed), + ActionStage::Queued => Ok(Self::Queued), + _ => Err(make_input_err!("Invalid State")), + } + } +} + +impl TryFrom for SortedAwaitedActionState { + type Error = Error; + fn try_from(value: ActionStage) -> Result { + Self::try_from(&value) + } +} + +impl std::fmt::Display for SortedAwaitedActionState { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + SortedAwaitedActionState::CacheCheck => write!(f, "CacheCheck"), + SortedAwaitedActionState::Queued => write!(f, "Queued"), + SortedAwaitedActionState::Executing => write!(f, "Executing"), + SortedAwaitedActionState::Completed => write!(f, "Completed"), + } + } +} /// A struct pointing to an AwaitedAction that can be sorted. -#[derive(Debug, Clone, MetricsComponent)] +#[derive(Debug, Clone, Serialize, Deserialize, MetricsComponent)] pub struct SortedAwaitedAction { #[metric(help = "The sort key of the AwaitedAction")] pub sort_key: AwaitedActionSortKey, @@ -64,6 +95,48 @@ impl Ord for SortedAwaitedAction { } } +impl std::fmt::Display for SortedAwaitedAction { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + std::fmt::write( + f, + format_args!("{}-{}", self.sort_key.as_u64(), self.operation_id), + ) + } +} + +impl From<&AwaitedAction> for SortedAwaitedAction { + fn from(value: &AwaitedAction) -> Self { + Self { + operation_id: value.operation_id().clone(), + sort_key: value.sort_key(), + } + } +} + +impl From for SortedAwaitedAction { + fn from(value: AwaitedAction) -> Self { + Self::from(&value) + } +} + +impl TryInto> for SortedAwaitedAction { + type Error = Error; + fn try_into(self) -> Result, Self::Error> { + serde_json::to_vec(&self) + .map_err(|e| make_input_err!("{}", e.to_string())) + .err_tip(|| "In SortedAwaitedAction::TryInto::>") + } +} + +impl TryFrom<&[u8]> for SortedAwaitedAction { + type Error = Error; + fn try_from(value: &[u8]) -> Result { + serde_json::from_slice(value) + .map_err(|e| make_input_err!("{}", e.to_string())) + .err_tip(|| "In AwaitedAction::TryFrom::&[u8]") + } +} + /// Subscriber that can be used to monitor when AwaitedActions change. pub trait AwaitedActionSubscriber: Send + Sync + Sized + 'static { /// Wait for AwaitedAction to change.