Skip to content

Commit

Permalink
Removed some old code related to data deps
Browse files Browse the repository at this point in the history
  • Loading branch information
spirali committed Oct 31, 2024
1 parent 825fca0 commit 30f2e69
Show file tree
Hide file tree
Showing 14 changed files with 41 additions and 128 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ anyhow = "1"
nix = { version = "0.29", features = ["process", "signal"] }
bstr = { version = "1.9", features = ["serde"] }
psutil = "3"
thin-vec = { version = "0.2", features = ["serde"] }


[profile.release]
panic = "abort"
Expand Down
1 change: 1 addition & 0 deletions crates/hyperqueue/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ nix = { workspace = true }
bstr = { workspace = true }
psutil = { workspace = true }
byteorder = { workspace = true }
thin-vec = { workspace = true }

humantime = "2"
num_cpus = "1"
Expand Down
3 changes: 2 additions & 1 deletion crates/hyperqueue/src/server/client/submit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::time::Duration;
use bstr::BString;
use tako::Map;
use tako::Set;
use thin_vec::ThinVec;

use tako::gateway::{
FromGatewayMessage, NewTasksMessage, ResourceRequestVariants, SharedTaskConfiguration,
Expand Down Expand Up @@ -272,7 +273,7 @@ fn build_tasks_array(
let build_task_conf = |body: Box<[u8]>, tako_id: TakoTaskId| TaskConfiguration {
id: tako_id,
shared_data_index: 0,
task_deps: Vec::new(),
task_deps: ThinVec::new(),
body,
};

Expand Down
3 changes: 2 additions & 1 deletion crates/tako/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,14 @@ tracing = { workspace = true }
nix = { workspace = true }
bstr = { workspace = true }
psutil = { workspace = true }
thin-vec = { workspace = true }


hashbrown = { version = "0.15", features = ["serde", "inline-more"], default-features = false }
tracing-subscriber = { version = "0.3", features = ["json"] }
priority-queue = "2"
bitflags = "2"
fxhash = "0.2"
thin-vec = "0.2"
derive_more = { version = "1", features = ["add", "add_assign", "sum"] }

[dev-dependencies]
Expand Down
3 changes: 2 additions & 1 deletion crates/tako/src/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::task::SerializedTaskContext;
use crate::{InstanceId, Map, Priority, TaskId, WorkerId};
use smallvec::{smallvec, SmallVec};
use std::time::Duration;
use thin_vec::ThinVec;

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash)]
pub struct ResourceRequestEntry {
Expand Down Expand Up @@ -116,7 +117,7 @@ pub struct TaskConfiguration {
/// Index into NewTasksMessage::shared_data that contains the shared data for this task.
pub shared_data_index: u32,

pub task_deps: Vec<TaskId>,
pub task_deps: ThinVec<TaskId>,

/// Opaque data that is passed by the gateway user to task launchers.
#[serde(with = "serde_bytes")]
Expand Down
7 changes: 3 additions & 4 deletions crates/tako/src/internal/scheduler/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,13 @@ fn crawl<F1: Fn(&Task) -> &Set<TaskId>>(tasks: &mut TaskMap, predecessor_fn: F1)
let task = tasks.get_task_mut(task_id);
task.set_scheduler_priority(level + 1);

for ti in task.inputs.iter() {
let input_id = ti.task();
for t in task.task_deps.iter() {
let v: &mut u32 = neighbours
.get_mut(&input_id)
.get_mut(t)
.expect("Couldn't find task neighbour in level computation");
if *v <= 1 {
assert_eq!(*v, 1);
stack.push(input_id);
stack.push(*t);
} else {
*v -= 1;
}
Expand Down
6 changes: 3 additions & 3 deletions crates/tako/src/internal/scheduler/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ impl SchedulerState {
try_prev_worker: bool, // Enable heuristics that tries to fit tasks on fewer workers
) -> Option<WorkerId> {
// Fast path
if try_prev_worker && task.inputs.is_empty() {
if try_prev_worker && task.task_deps.is_empty() {
// Note: We are *not* using "is_capable_to_run" but "have_immediate_resources_for_rq",
// because we want to enable fast path only if task can be directly executed
// We want to avoid creation of overloaded
Expand Down Expand Up @@ -256,7 +256,7 @@ impl SchedulerState {
worker_id
);
}
(task.inputs.clone(), assigned_worker)
(task.task_deps.clone(), assigned_worker)
};

let (tasks, workers) = core.split_tasks_workers_mut();
Expand Down Expand Up @@ -419,7 +419,7 @@ impl SchedulerState {
let task = tasks.get_task_mut(task_id);
if task.is_sn_running()
|| (not_overloaded
&& (task.is_fresh() || !task.inputs.is_empty())
&& (task.is_fresh() || !task.task_deps.is_empty())
&& worker.has_time_to_run_for_rqv(&task.configuration.resources, now))
{
continue;
Expand Down
58 changes: 2 additions & 56 deletions crates/tako/src/internal/server/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,57 +11,8 @@ use crate::internal::scheduler::query::compute_new_worker_query;
use crate::internal::server::comm::{Comm, CommSender, CommSenderRef};
use crate::internal::server::core::{Core, CoreRef};
use crate::internal::server::reactor::{on_cancel_tasks, on_new_tasks};
use crate::internal::server::task::{Task, TaskConfiguration, TaskInput, TaskRuntimeState};
use crate::internal::server::task::{Task, TaskConfiguration, TaskRuntimeState};
use std::rc::Rc;
use thin_vec::ThinVec;

/*pub(crate) async fn client_connection_handler(
core_ref: CoreRef,
comm_ref: CommSenderRef,
listener: UnixListener,
client_sender: UnboundedSender<ToGatewayMessage>,
client_receiver: UnboundedReceiver<ToGatewayMessage>,
) {
if let Ok((stream, _)) = listener.accept().await {
let framed = make_protocol_builder().new_framed(stream);
let (sender, mut receiver) = framed.split();
let send_loop = forward_queue_to_sink(client_receiver, sender, |msg| {
rmp_serde::to_vec_named(&msg).unwrap().into()
});
{
let core = core_ref.get();
let mut comm = comm_ref.get_mut();
for worker in core.get_workers() {
comm.send_client_worker_new(worker.id, &worker.configuration);
}
}
let receive_loop = async move {
while let Some(data) = receiver.next().await {
// TODO: Instead of unwrap, send error message to client
let data = data.unwrap();
let message: Result<FromGatewayMessage, _> = rmp_serde::from_slice(&data);
let error = match message {
Ok(message) => {
process_client_message(&core_ref, &comm_ref, &client_sender, message).await
}
Err(error) => Some(format!("Invalid format of message: {}", error)),
};
if let Some(message) = error {
client_sender
.send(ToGatewayMessage::Error(ErrorResponse { message }))
.unwrap();
}
}
};
tokio::select! {
r = send_loop => { r.unwrap() },
() = receive_loop => {},
}
} else {
panic!("Invalid connection from client");
}
log::info!("Client connection terminated");
}*/

fn create_task_configuration(
core_ref: &mut Core,
Expand Down Expand Up @@ -238,12 +189,7 @@ fn handle_new_tasks(
return Some(format!("Invalid configuration index {idx}"));
}
let conf = &configurations[idx];
let inputs: ThinVec<_> = task
.task_deps
.iter()
.map(|&task_id| TaskInput::new_task_dependency(task_id))
.collect();
let task = Task::new(task.id, inputs, conf.clone(), task.body);
let task = Task::new(task.id, task.task_deps, conf.clone(), task.body);
tasks.push(task);
}
if !msg.adjust_instance_id.is_empty() {
Expand Down
12 changes: 6 additions & 6 deletions crates/tako/src/internal/server/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,8 +363,8 @@ impl Core {
#[cfg(test)]
pub fn sanity_check(&self) {
let fw_check = |task: &Task| {
for input in &task.inputs {
assert!(self.tasks.get_task(input.task()).is_finished());
for task_dep in &task.task_deps {
assert!(self.tasks.get_task(*task_dep).is_finished());
}
for &task_id in task.get_consumers() {
assert!(self.tasks.get_task(task_id).is_waiting());
Expand Down Expand Up @@ -398,8 +398,8 @@ impl Core {
match &task.state {
TaskRuntimeState::Waiting(winfo) => {
let mut count = 0;
for ti in &task.inputs {
if !self.tasks.get_task(ti.task()).is_finished() {
for task_dep in &task.task_deps {
if !self.tasks.get_task(*task_dep).is_finished() {
count += 1;
}
}
Expand All @@ -425,8 +425,8 @@ impl Core {
}

TaskRuntimeState::Finished(_) => {
for ti in &task.inputs {
assert!(self.tasks.get_task(ti.task()).is_finished());
for task_dep in &task.task_deps {
assert!(self.tasks.get_task(*task_dep).is_finished());
}
}
TaskRuntimeState::RunningMultiNode(ws) => {
Expand Down
14 changes: 3 additions & 11 deletions crates/tako/src/internal/server/reactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,11 +167,8 @@ pub(crate) fn on_new_tasks(core: &mut Core, comm: &mut impl Comm, new_tasks: Vec
let mut task = task_map.remove(&task_id).unwrap();

let mut count = 0;
for ti in task.inputs.iter() {
let input_id = ti.task();
let task_dep = task_map
.get_mut(&input_id)
.unwrap_or_else(|| core.get_task_mut(input_id));
for t in task.task_deps.iter() {
let task_dep = task_map.get_mut(t).unwrap_or_else(|| core.get_task_mut(*t));
task_dep.add_consumer(task.id);
if !task_dep.is_finished() {
count += 1
Expand Down Expand Up @@ -555,12 +552,7 @@ pub(crate) fn on_cancel_tasks(
}

fn unregister_as_consumer(core: &mut Core, comm: &mut impl Comm, task_id: TaskId) {
let inputs: Vec<TaskId> = core
.get_task(task_id)
.inputs
.iter()
.map(|ti| ti.task())
.collect();
let inputs: Vec<TaskId> = core.get_task(task_id).task_deps.iter().copied().collect();
for input_id in inputs {
let input = core.get_task_mut(input_id);
assert!(input.remove_consumer(task_id));
Expand Down
38 changes: 3 additions & 35 deletions crates/tako/src/internal/server/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,38 +54,6 @@ bitflags::bitflags! {
}
}

#[derive(Clone)]
#[cfg_attr(test, derive(Eq, PartialEq))]
pub struct TaskInput {
task: TaskId,
output_id: u32, // MAX = pure dependency on task, not real output id
}

impl TaskInput {
pub fn new(task: TaskId, output_id: u32) -> Self {
TaskInput { task, output_id }
}

pub fn new_task_dependency(task: TaskId) -> Self {
TaskInput {
task,
output_id: u32::MAX,
}
}

pub fn task(&self) -> TaskId {
self.task
}

pub fn output_id(&self) -> Option<u32> {
if self.output_id == u32::MAX {
None
} else {
Some(self.output_id)
}
}
}

#[derive(Debug)]
#[cfg_attr(test, derive(Eq, PartialEq))]
pub struct TaskConfiguration {
Expand All @@ -101,7 +69,7 @@ pub struct Task {
pub id: TaskId,
pub state: TaskRuntimeState,
consumers: Set<TaskId>,
pub inputs: ThinVec<TaskInput>,
pub task_deps: ThinVec<TaskId>,
pub flags: TaskFlags,
pub configuration: Rc<TaskConfiguration>,
pub scheduler_priority: Priority,
Expand All @@ -123,7 +91,7 @@ impl fmt::Debug for Task {
impl Task {
pub fn new(
id: TaskId,
inputs: ThinVec<TaskInput>,
dependencies: ThinVec<TaskId>,
configuration: Rc<TaskConfiguration>,
body: Box<[u8]>,
) -> Self {
Expand All @@ -134,7 +102,7 @@ impl Task {

Self {
id,
inputs,
task_deps: dependencies,
flags,
configuration,
body,
Expand Down
3 changes: 2 additions & 1 deletion crates/tako/src/internal/tests/integration/utils/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::time::Duration;
use crate::internal::common::index::ItemId;
use derive_builder::Builder;
use smallvec::smallvec;
use thin_vec::ThinVec;

use crate::gateway::{
ResourceRequest, ResourceRequestEntry, ResourceRequestVariants, SharedTaskConfiguration,
Expand Down Expand Up @@ -119,7 +120,7 @@ pub fn build_task_def_from_config(
TaskConfiguration {
id: TaskId::new(id.unwrap_or(1) as <TaskId as ItemId>::IdType),
shared_data_index: 0,
task_deps: Vec::new(),
task_deps: ThinVec::new(),
body: body.into_boxed_slice(),
},
conf,
Expand Down
Loading

0 comments on commit 30f2e69

Please sign in to comment.