Skip to content

Commit

Permalink
vine: check worker transfer capacity earlier (#4013)
Browse files Browse the repository at this point in the history
* consider worker transfer capacity in check_worker_against_task

* lint

* fix comment
  • Loading branch information
JinZhou5042 authored Jan 14, 2025
1 parent b37f1ea commit 7891a58
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 27 deletions.
47 changes: 20 additions & 27 deletions taskvine/src/manager/vine_manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -3143,7 +3143,7 @@ If a file can be fetched from a substitute source,
this function modifies the file->substitute field to reflect that source.
*/

static int vine_manager_transfer_capacity_available(struct vine_manager *q, struct vine_worker_info *w, struct vine_task *t)
int vine_manager_transfer_capacity_available(struct vine_manager *q, struct vine_worker_info *w, struct vine_task *t)
{
struct vine_mount *m;

Expand Down Expand Up @@ -3324,55 +3324,41 @@ static int vine_manager_check_library_for_function_call(struct vine_manager *q,
}

/*
Consider if a task is eligible to run, and if so, find the best worker for it.
Consider if a task is eligible to run.
*/
static struct vine_worker_info *consider_task(struct vine_manager *q, struct vine_task *t)
int consider_task(struct vine_manager *q, struct vine_task *t)
{
timestamp_t now_usecs = timestamp_get();
double now_secs = ((double)now_usecs) / ONE_SECOND;

// Skip task if min requested start time not met.
if (t->resources_requested->start > now_secs) {
return NULL;
return 0;
}

// Skip if this task failed recently
if (t->time_when_last_failure + q->transient_error_interval > now_usecs) {
return NULL;
return 0;
}

// Skip if category already running maximum allowed tasks
struct category *c = vine_category_lookup_or_create(q, t->category);
if (c->max_concurrent > -1 && c->max_concurrent <= c->vine_stats->tasks_running) {
return NULL;
return 0;
}

// Skip task if temp input files have not been materialized.
if (!vine_manager_check_inputs_available(q, t)) {
return NULL;
return 0;
}

// Skip function call task if no suitable library template was installed
// Skip function call task if no suitable library template was installed.
if (!vine_manager_check_library_for_function_call(q, t)) {
return NULL;
}

// Find the best worker for the task
q->stats_measure->time_scheduling = timestamp_get();
struct vine_worker_info *w = vine_schedule_task_to_worker(q, t);
if (!w) {
return NULL;
}
q->stats->time_scheduling += timestamp_get() - q->stats_measure->time_scheduling;

// Check if there is transfer capacity available.
if (q->peer_transfers_enabled) {
if (!vine_manager_transfer_capacity_available(q, w, t))
return NULL;
return 0;
}

// All checks passed
return w;
// All checks passed, task is eligible to run.
return 1;
}

/*
Expand All @@ -3385,7 +3371,6 @@ static int send_one_task(struct vine_manager *q)
{
int t_idx;
struct vine_task *t;
struct vine_worker_info *w = NULL;

int iter_count = 0;
int iter_depth = MIN(priority_queue_size(q->ready_tasks), q->attempt_schedule_depth);
Expand All @@ -3406,7 +3391,15 @@ static int send_one_task(struct vine_manager *q)
// the priority queue data structure where also invokes priority_queue_rotate_reset.
PRIORITY_QUEUE_ROTATE_ITERATE(q->ready_tasks, t_idx, t, iter_count, iter_depth)
{
w = consider_task(q, t);
if (!consider_task(q, t)) {
continue;
}

// Find the best worker for the task
q->stats_measure->time_scheduling = timestamp_get();
struct vine_worker_info *w = vine_schedule_task_to_worker(q, t);
q->stats->time_scheduling += timestamp_get() - q->stats_measure->time_scheduling;

if (w) {
priority_queue_remove(q->ready_tasks, t_idx);
commit_task_to_worker(q, w, t);
Expand Down
3 changes: 3 additions & 0 deletions taskvine/src/manager/vine_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,9 @@ struct vine_task *vine_manager_no_wait(struct vine_manager *q, const char *tag,

void vine_manager_remove_worker(struct vine_manager *q, struct vine_worker_info *w, vine_worker_disconnect_reason_t reason);

/* Check if the worker is able to transfer the necessary files for this task. */
int vine_manager_transfer_capacity_available(struct vine_manager *q, struct vine_worker_info *w, struct vine_task *t);

/* The expected format of files created by the resource monitor.*/
#define RESOURCE_MONITOR_TASK_LOCAL_NAME "vine-task-%d"
#define RESOURCE_MONITOR_REMOTE_NAME "cctools-monitor"
Expand Down
5 changes: 5 additions & 0 deletions taskvine/src/manager/vine_schedule.c
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,11 @@ int check_worker_against_task(struct vine_manager *q, struct vine_worker_info *w
return 0;
}

/* If the worker has transfer capacity to get this task. */
if (q->peer_transfers_enabled && !vine_manager_transfer_capacity_available(q, w, t)) {
return 0;
}

/* If the worker doesn't have the features the task requires. */
if (t->feature_list) {
if (!w->features) {
Expand Down

0 comments on commit 7891a58

Please sign in to comment.