From 7891a5805a74b92b9fe7287a9505ba6bd1c45cd4 Mon Sep 17 00:00:00 2001 From: JinZhou5042 <142265839+JinZhou5042@users.noreply.github.com> Date: Tue, 14 Jan 2025 08:06:05 -0500 Subject: [PATCH] vine: check worker transfer capacity earlier (#4013) * consider worker transfer capacity in check_worker_against_task * lint * fix comment --- taskvine/src/manager/vine_manager.c | 47 ++++++++++++---------------- taskvine/src/manager/vine_manager.h | 3 ++ taskvine/src/manager/vine_schedule.c | 5 +++ 3 files changed, 28 insertions(+), 27 deletions(-) diff --git a/taskvine/src/manager/vine_manager.c b/taskvine/src/manager/vine_manager.c index 9f77bbc60a..db5bbd2a4d 100644 --- a/taskvine/src/manager/vine_manager.c +++ b/taskvine/src/manager/vine_manager.c @@ -3143,7 +3143,7 @@ If a file can be fetched from a substitute source, this function modifies the file->substitute field to reflect that source. */ -static int vine_manager_transfer_capacity_available(struct vine_manager *q, struct vine_worker_info *w, struct vine_task *t) +int vine_manager_transfer_capacity_available(struct vine_manager *q, struct vine_worker_info *w, struct vine_task *t) { struct vine_mount *m; @@ -3324,55 +3324,41 @@ static int vine_manager_check_library_for_function_call(struct vine_manager *q, } /* -Consider if a task is eligible to run, and if so, find the best worker for it. +Consider if a task is eligible to run. */ -static struct vine_worker_info *consider_task(struct vine_manager *q, struct vine_task *t) +int consider_task(struct vine_manager *q, struct vine_task *t) { timestamp_t now_usecs = timestamp_get(); double now_secs = ((double)now_usecs) / ONE_SECOND; // Skip task if min requested start time not met. if (t->resources_requested->start > now_secs) { - return NULL; + return 0; } // Skip if this task failed recently if (t->time_when_last_failure + q->transient_error_interval > now_usecs) { - return NULL; + return 0; } // Skip if category already running maximum allowed tasks struct category *c = vine_category_lookup_or_create(q, t->category); if (c->max_concurrent > -1 && c->max_concurrent <= c->vine_stats->tasks_running) { - return NULL; + return 0; } // Skip task if temp input files have not been materialized. if (!vine_manager_check_inputs_available(q, t)) { - return NULL; + return 0; } - // Skip function call task if no suitable library template was installed + // Skip function call task if no suitable library template was installed. if (!vine_manager_check_library_for_function_call(q, t)) { - return NULL; - } - - // Find the best worker for the task - q->stats_measure->time_scheduling = timestamp_get(); - struct vine_worker_info *w = vine_schedule_task_to_worker(q, t); - if (!w) { - return NULL; - } - q->stats->time_scheduling += timestamp_get() - q->stats_measure->time_scheduling; - - // Check if there is transfer capacity available. - if (q->peer_transfers_enabled) { - if (!vine_manager_transfer_capacity_available(q, w, t)) - return NULL; + return 0; } - // All checks passed - return w; + // All checks passed, task is eligible to run. + return 1; } /* @@ -3385,7 +3371,6 @@ static int send_one_task(struct vine_manager *q) { int t_idx; struct vine_task *t; - struct vine_worker_info *w = NULL; int iter_count = 0; int iter_depth = MIN(priority_queue_size(q->ready_tasks), q->attempt_schedule_depth); @@ -3406,7 +3391,15 @@ static int send_one_task(struct vine_manager *q) // the priority queue data structure where also invokes priority_queue_rotate_reset. PRIORITY_QUEUE_ROTATE_ITERATE(q->ready_tasks, t_idx, t, iter_count, iter_depth) { - w = consider_task(q, t); + if (!consider_task(q, t)) { + continue; + } + + // Find the best worker for the task + q->stats_measure->time_scheduling = timestamp_get(); + struct vine_worker_info *w = vine_schedule_task_to_worker(q, t); + q->stats->time_scheduling += timestamp_get() - q->stats_measure->time_scheduling; + if (w) { priority_queue_remove(q->ready_tasks, t_idx); commit_task_to_worker(q, w, t); diff --git a/taskvine/src/manager/vine_manager.h b/taskvine/src/manager/vine_manager.h index b6951937f5..cf93c79d9a 100644 --- a/taskvine/src/manager/vine_manager.h +++ b/taskvine/src/manager/vine_manager.h @@ -278,6 +278,9 @@ struct vine_task *vine_manager_no_wait(struct vine_manager *q, const char *tag, void vine_manager_remove_worker(struct vine_manager *q, struct vine_worker_info *w, vine_worker_disconnect_reason_t reason); +/* Check if the worker is able to transfer the necessary files for this task. */ +int vine_manager_transfer_capacity_available(struct vine_manager *q, struct vine_worker_info *w, struct vine_task *t); + /* The expected format of files created by the resource monitor.*/ #define RESOURCE_MONITOR_TASK_LOCAL_NAME "vine-task-%d" #define RESOURCE_MONITOR_REMOTE_NAME "cctools-monitor" diff --git a/taskvine/src/manager/vine_schedule.c b/taskvine/src/manager/vine_schedule.c index 115c8898ef..a15d076999 100644 --- a/taskvine/src/manager/vine_schedule.c +++ b/taskvine/src/manager/vine_schedule.c @@ -233,6 +233,11 @@ int check_worker_against_task(struct vine_manager *q, struct vine_worker_info *w return 0; } + /* If the worker has transfer capacity to get this task. */ + if (q->peer_transfers_enabled && !vine_manager_transfer_capacity_available(q, w, t)) { + return 0; + } + /* If the worker doesn't have the features the task requires. */ if (t->feature_list) { if (!w->features) {