From 30f2e698dd76f49f3ef86dbfffd311ba94279a55 Mon Sep 17 00:00:00 2001 From: Ada Bohm Date: Tue, 29 Oct 2024 11:12:49 +0100 Subject: [PATCH] Removed some old code related to data deps --- Cargo.lock | 4 ++ Cargo.toml | 2 + crates/hyperqueue/Cargo.toml | 1 + crates/hyperqueue/src/server/client/submit.rs | 3 +- crates/tako/Cargo.toml | 3 +- crates/tako/src/gateway.rs | 3 +- crates/tako/src/internal/scheduler/metrics.rs | 7 +-- crates/tako/src/internal/scheduler/state.rs | 6 +- crates/tako/src/internal/server/client.rs | 58 +------------------ crates/tako/src/internal/server/core.rs | 12 ++-- crates/tako/src/internal/server/reactor.rs | 14 +---- crates/tako/src/internal/server/task.rs | 38 +----------- .../internal/tests/integration/utils/task.rs | 3 +- crates/tako/src/internal/tests/utils/task.rs | 15 ++--- 14 files changed, 41 insertions(+), 128 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index aa2a3af81..140bb1f16 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1039,6 +1039,7 @@ dependencies = [ "tako", "tempfile", "textwrap 0.16.1", + "thin-vec", "thiserror", "tokio", "tokio-util", @@ -2268,6 +2269,9 @@ name = "thin-vec" version = "0.2.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a38c90d48152c236a3ab59271da4f4ae63d678c5d7ad6b7714d7cb9760be5e4b" +dependencies = [ + "serde", +] [[package]] name = "thiserror" diff --git a/Cargo.toml b/Cargo.toml index c55ba0d2b..bf152cf8a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,6 +42,8 @@ anyhow = "1" nix = { version = "0.29", features = ["process", "signal"] } bstr = { version = "1.9", features = ["serde"] } psutil = "3" +thin-vec = { version = "0.2", features = ["serde"] } + [profile.release] panic = "abort" diff --git a/crates/hyperqueue/Cargo.toml b/crates/hyperqueue/Cargo.toml index c95301dd5..08f7aaf86 100644 --- a/crates/hyperqueue/Cargo.toml +++ b/crates/hyperqueue/Cargo.toml @@ -34,6 +34,7 @@ nix = { workspace = true } bstr = { workspace = true } psutil = { workspace = true } byteorder = { workspace = true } +thin-vec = { workspace = true } humantime = "2" num_cpus = "1" diff --git a/crates/hyperqueue/src/server/client/submit.rs b/crates/hyperqueue/src/server/client/submit.rs index 405fc2adc..d95cfbe34 100644 --- a/crates/hyperqueue/src/server/client/submit.rs +++ b/crates/hyperqueue/src/server/client/submit.rs @@ -6,6 +6,7 @@ use std::time::Duration; use bstr::BString; use tako::Map; use tako::Set; +use thin_vec::ThinVec; use tako::gateway::{ FromGatewayMessage, NewTasksMessage, ResourceRequestVariants, SharedTaskConfiguration, @@ -272,7 +273,7 @@ fn build_tasks_array( let build_task_conf = |body: Box<[u8]>, tako_id: TakoTaskId| TaskConfiguration { id: tako_id, shared_data_index: 0, - task_deps: Vec::new(), + task_deps: ThinVec::new(), body, }; diff --git a/crates/tako/Cargo.toml b/crates/tako/Cargo.toml index e74e751d6..f77ffdd26 100644 --- a/crates/tako/Cargo.toml +++ b/crates/tako/Cargo.toml @@ -28,13 +28,14 @@ tracing = { workspace = true } nix = { workspace = true } bstr = { workspace = true } psutil = { workspace = true } +thin-vec = { workspace = true } + hashbrown = { version = "0.15", features = ["serde", "inline-more"], default-features = false } tracing-subscriber = { version = "0.3", features = ["json"] } priority-queue = "2" bitflags = "2" fxhash = "0.2" -thin-vec = "0.2" derive_more = { version = "1", features = ["add", "add_assign", "sum"] } [dev-dependencies] diff --git a/crates/tako/src/gateway.rs b/crates/tako/src/gateway.rs index e210552c5..cebada3ee 100644 --- a/crates/tako/src/gateway.rs +++ b/crates/tako/src/gateway.rs @@ -11,6 +11,7 @@ use crate::task::SerializedTaskContext; use crate::{InstanceId, Map, Priority, TaskId, WorkerId}; use smallvec::{smallvec, SmallVec}; use std::time::Duration; +use thin_vec::ThinVec; #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash)] pub struct ResourceRequestEntry { @@ -116,7 +117,7 @@ pub struct TaskConfiguration { /// Index into NewTasksMessage::shared_data that contains the shared data for this task. pub shared_data_index: u32, - pub task_deps: Vec, + pub task_deps: ThinVec, /// Opaque data that is passed by the gateway user to task launchers. #[serde(with = "serde_bytes")] diff --git a/crates/tako/src/internal/scheduler/metrics.rs b/crates/tako/src/internal/scheduler/metrics.rs index 47910b6bb..a635eed92 100644 --- a/crates/tako/src/internal/scheduler/metrics.rs +++ b/crates/tako/src/internal/scheduler/metrics.rs @@ -29,14 +29,13 @@ fn crawl &Set>(tasks: &mut TaskMap, predecessor_fn: F1) let task = tasks.get_task_mut(task_id); task.set_scheduler_priority(level + 1); - for ti in task.inputs.iter() { - let input_id = ti.task(); + for t in task.task_deps.iter() { let v: &mut u32 = neighbours - .get_mut(&input_id) + .get_mut(t) .expect("Couldn't find task neighbour in level computation"); if *v <= 1 { assert_eq!(*v, 1); - stack.push(input_id); + stack.push(*t); } else { *v -= 1; } diff --git a/crates/tako/src/internal/scheduler/state.rs b/crates/tako/src/internal/scheduler/state.rs index 3f6f1daf6..0ddcb39cf 100644 --- a/crates/tako/src/internal/scheduler/state.rs +++ b/crates/tako/src/internal/scheduler/state.rs @@ -99,7 +99,7 @@ impl SchedulerState { try_prev_worker: bool, // Enable heuristics that tries to fit tasks on fewer workers ) -> Option { // Fast path - if try_prev_worker && task.inputs.is_empty() { + if try_prev_worker && task.task_deps.is_empty() { // Note: We are *not* using "is_capable_to_run" but "have_immediate_resources_for_rq", // because we want to enable fast path only if task can be directly executed // We want to avoid creation of overloaded @@ -256,7 +256,7 @@ impl SchedulerState { worker_id ); } - (task.inputs.clone(), assigned_worker) + (task.task_deps.clone(), assigned_worker) }; let (tasks, workers) = core.split_tasks_workers_mut(); @@ -419,7 +419,7 @@ impl SchedulerState { let task = tasks.get_task_mut(task_id); if task.is_sn_running() || (not_overloaded - && (task.is_fresh() || !task.inputs.is_empty()) + && (task.is_fresh() || !task.task_deps.is_empty()) && worker.has_time_to_run_for_rqv(&task.configuration.resources, now)) { continue; diff --git a/crates/tako/src/internal/server/client.rs b/crates/tako/src/internal/server/client.rs index 9b273c11c..aff4846b1 100644 --- a/crates/tako/src/internal/server/client.rs +++ b/crates/tako/src/internal/server/client.rs @@ -11,57 +11,8 @@ use crate::internal::scheduler::query::compute_new_worker_query; use crate::internal::server::comm::{Comm, CommSender, CommSenderRef}; use crate::internal::server::core::{Core, CoreRef}; use crate::internal::server::reactor::{on_cancel_tasks, on_new_tasks}; -use crate::internal::server::task::{Task, TaskConfiguration, TaskInput, TaskRuntimeState}; +use crate::internal::server::task::{Task, TaskConfiguration, TaskRuntimeState}; use std::rc::Rc; -use thin_vec::ThinVec; - -/*pub(crate) async fn client_connection_handler( - core_ref: CoreRef, - comm_ref: CommSenderRef, - listener: UnixListener, - client_sender: UnboundedSender, - client_receiver: UnboundedReceiver, -) { - if let Ok((stream, _)) = listener.accept().await { - let framed = make_protocol_builder().new_framed(stream); - let (sender, mut receiver) = framed.split(); - let send_loop = forward_queue_to_sink(client_receiver, sender, |msg| { - rmp_serde::to_vec_named(&msg).unwrap().into() - }); - { - let core = core_ref.get(); - let mut comm = comm_ref.get_mut(); - for worker in core.get_workers() { - comm.send_client_worker_new(worker.id, &worker.configuration); - } - } - let receive_loop = async move { - while let Some(data) = receiver.next().await { - // TODO: Instead of unwrap, send error message to client - let data = data.unwrap(); - let message: Result = rmp_serde::from_slice(&data); - let error = match message { - Ok(message) => { - process_client_message(&core_ref, &comm_ref, &client_sender, message).await - } - Err(error) => Some(format!("Invalid format of message: {}", error)), - }; - if let Some(message) = error { - client_sender - .send(ToGatewayMessage::Error(ErrorResponse { message })) - .unwrap(); - } - } - }; - tokio::select! { - r = send_loop => { r.unwrap() }, - () = receive_loop => {}, - } - } else { - panic!("Invalid connection from client"); - } - log::info!("Client connection terminated"); -}*/ fn create_task_configuration( core_ref: &mut Core, @@ -238,12 +189,7 @@ fn handle_new_tasks( return Some(format!("Invalid configuration index {idx}")); } let conf = &configurations[idx]; - let inputs: ThinVec<_> = task - .task_deps - .iter() - .map(|&task_id| TaskInput::new_task_dependency(task_id)) - .collect(); - let task = Task::new(task.id, inputs, conf.clone(), task.body); + let task = Task::new(task.id, task.task_deps, conf.clone(), task.body); tasks.push(task); } if !msg.adjust_instance_id.is_empty() { diff --git a/crates/tako/src/internal/server/core.rs b/crates/tako/src/internal/server/core.rs index 3add581f2..ab9fc5640 100644 --- a/crates/tako/src/internal/server/core.rs +++ b/crates/tako/src/internal/server/core.rs @@ -363,8 +363,8 @@ impl Core { #[cfg(test)] pub fn sanity_check(&self) { let fw_check = |task: &Task| { - for input in &task.inputs { - assert!(self.tasks.get_task(input.task()).is_finished()); + for task_dep in &task.task_deps { + assert!(self.tasks.get_task(*task_dep).is_finished()); } for &task_id in task.get_consumers() { assert!(self.tasks.get_task(task_id).is_waiting()); @@ -398,8 +398,8 @@ impl Core { match &task.state { TaskRuntimeState::Waiting(winfo) => { let mut count = 0; - for ti in &task.inputs { - if !self.tasks.get_task(ti.task()).is_finished() { + for task_dep in &task.task_deps { + if !self.tasks.get_task(*task_dep).is_finished() { count += 1; } } @@ -425,8 +425,8 @@ impl Core { } TaskRuntimeState::Finished(_) => { - for ti in &task.inputs { - assert!(self.tasks.get_task(ti.task()).is_finished()); + for task_dep in &task.task_deps { + assert!(self.tasks.get_task(*task_dep).is_finished()); } } TaskRuntimeState::RunningMultiNode(ws) => { diff --git a/crates/tako/src/internal/server/reactor.rs b/crates/tako/src/internal/server/reactor.rs index 6d95deb43..d2e13bbfb 100644 --- a/crates/tako/src/internal/server/reactor.rs +++ b/crates/tako/src/internal/server/reactor.rs @@ -167,11 +167,8 @@ pub(crate) fn on_new_tasks(core: &mut Core, comm: &mut impl Comm, new_tasks: Vec let mut task = task_map.remove(&task_id).unwrap(); let mut count = 0; - for ti in task.inputs.iter() { - let input_id = ti.task(); - let task_dep = task_map - .get_mut(&input_id) - .unwrap_or_else(|| core.get_task_mut(input_id)); + for t in task.task_deps.iter() { + let task_dep = task_map.get_mut(t).unwrap_or_else(|| core.get_task_mut(*t)); task_dep.add_consumer(task.id); if !task_dep.is_finished() { count += 1 @@ -555,12 +552,7 @@ pub(crate) fn on_cancel_tasks( } fn unregister_as_consumer(core: &mut Core, comm: &mut impl Comm, task_id: TaskId) { - let inputs: Vec = core - .get_task(task_id) - .inputs - .iter() - .map(|ti| ti.task()) - .collect(); + let inputs: Vec = core.get_task(task_id).task_deps.iter().copied().collect(); for input_id in inputs { let input = core.get_task_mut(input_id); assert!(input.remove_consumer(task_id)); diff --git a/crates/tako/src/internal/server/task.rs b/crates/tako/src/internal/server/task.rs index 8b3131b23..50193df4e 100644 --- a/crates/tako/src/internal/server/task.rs +++ b/crates/tako/src/internal/server/task.rs @@ -54,38 +54,6 @@ bitflags::bitflags! { } } -#[derive(Clone)] -#[cfg_attr(test, derive(Eq, PartialEq))] -pub struct TaskInput { - task: TaskId, - output_id: u32, // MAX = pure dependency on task, not real output id -} - -impl TaskInput { - pub fn new(task: TaskId, output_id: u32) -> Self { - TaskInput { task, output_id } - } - - pub fn new_task_dependency(task: TaskId) -> Self { - TaskInput { - task, - output_id: u32::MAX, - } - } - - pub fn task(&self) -> TaskId { - self.task - } - - pub fn output_id(&self) -> Option { - if self.output_id == u32::MAX { - None - } else { - Some(self.output_id) - } - } -} - #[derive(Debug)] #[cfg_attr(test, derive(Eq, PartialEq))] pub struct TaskConfiguration { @@ -101,7 +69,7 @@ pub struct Task { pub id: TaskId, pub state: TaskRuntimeState, consumers: Set, - pub inputs: ThinVec, + pub task_deps: ThinVec, pub flags: TaskFlags, pub configuration: Rc, pub scheduler_priority: Priority, @@ -123,7 +91,7 @@ impl fmt::Debug for Task { impl Task { pub fn new( id: TaskId, - inputs: ThinVec, + dependencies: ThinVec, configuration: Rc, body: Box<[u8]>, ) -> Self { @@ -134,7 +102,7 @@ impl Task { Self { id, - inputs, + task_deps: dependencies, flags, configuration, body, diff --git a/crates/tako/src/internal/tests/integration/utils/task.rs b/crates/tako/src/internal/tests/integration/utils/task.rs index 735f43063..cc0d07c36 100644 --- a/crates/tako/src/internal/tests/integration/utils/task.rs +++ b/crates/tako/src/internal/tests/integration/utils/task.rs @@ -5,6 +5,7 @@ use std::time::Duration; use crate::internal::common::index::ItemId; use derive_builder::Builder; use smallvec::smallvec; +use thin_vec::ThinVec; use crate::gateway::{ ResourceRequest, ResourceRequestEntry, ResourceRequestVariants, SharedTaskConfiguration, @@ -119,7 +120,7 @@ pub fn build_task_def_from_config( TaskConfiguration { id: TaskId::new(id.unwrap_or(1) as ::IdType), shared_data_index: 0, - task_deps: Vec::new(), + task_deps: ThinVec::new(), body: body.into_boxed_slice(), }, conf, diff --git a/crates/tako/src/internal/tests/utils/task.rs b/crates/tako/src/internal/tests/utils/task.rs index 65b6766b5..f3e6b7b82 100644 --- a/crates/tako/src/internal/tests/utils/task.rs +++ b/crates/tako/src/internal/tests/utils/task.rs @@ -3,7 +3,7 @@ use crate::internal::common::resources::{ NumOfNodes, ResourceAmount, ResourceId, ResourceRequestVariants, }; use crate::internal::messages::worker::TaskRunningMsg; -use crate::internal::server::task::{Task, TaskConfiguration, TaskInput}; +use crate::internal::server::task::{Task, TaskConfiguration}; use crate::resources::ResourceRequest; use crate::{Priority, TaskId}; use smallvec::SmallVec; @@ -11,7 +11,7 @@ use std::rc::Rc; pub struct TaskBuilder { id: TaskId, - inputs: Vec, + task_deps: Vec, n_outputs: u32, finished_resources: Vec, resources_builder: ResBuilder, @@ -23,7 +23,7 @@ impl TaskBuilder { pub fn new>(id: T) -> TaskBuilder { TaskBuilder { id: id.into(), - inputs: Default::default(), + task_deps: Default::default(), n_outputs: 0, finished_resources: vec![], resources_builder: Default::default(), @@ -38,15 +38,12 @@ impl TaskBuilder { } pub fn simple_deps(mut self, deps: &[&Task]) -> TaskBuilder { - self.inputs = deps.iter().map(|&tr| TaskInput::new(tr.id, 0)).collect(); + self.task_deps = deps.iter().map(|&tr| tr.id).collect(); self } pub fn task_deps(mut self, deps: &[&Task]) -> TaskBuilder { - self.inputs = deps - .iter() - .map(|&tr| TaskInput::new_task_dependency(tr.id)) - .collect(); + self.task_deps = deps.iter().map(|&tr| tr.id).collect(); self } @@ -101,7 +98,7 @@ impl TaskBuilder { let resources = ResourceRequestVariants::new(resources); Task::new( self.id, - self.inputs.into_iter().collect(), + self.task_deps.into_iter().collect(), Rc::new(TaskConfiguration { resources, n_outputs: self.n_outputs,