Skip to content

Commit

Permalink
scheduler_mpi: add max_runs_per_task parameter that caps scheduled …
Browse files Browse the repository at this point in the history
…runs
  • Loading branch information
lukas-weber committed Jan 26, 2024
1 parent ccf4bb7 commit 9d1655d
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 9 deletions.
15 changes: 10 additions & 5 deletions src/scheduler_mpi.jl
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,15 @@ mutable struct MPISchedulerController <: AbstractScheduler
return new(
active_ranks,
length(job.tasks),
map(
x -> SchedulerTask(x.target_sweeps, x.sweeps, x.dir, 0),
JobTools.read_progress(job),
),
[
SchedulerTask(
p.target_sweeps,
p.sweeps,
p.dir,
0,
get(t.params, :max_runs_per_task, typemax(Int64)),
) for (p, t) in zip(JobTools.read_progress(job), job.tasks)
],
)
end
end
Expand Down Expand Up @@ -286,7 +291,7 @@ function start(

task = job.tasks[msg.task_id]
scheduler_task =
SchedulerTask(msg.sweeps_until_comm, 0, JobTools.task_dir(job, task), 0)
SchedulerTask(msg.sweeps_until_comm, 0, JobTools.task_dir(job, task))
rundir = run_dir(scheduler_task, msg.run_id)

run = read_checkpoint(Run{MC,DefaultRNG}, rundir, task.params, run_comm)
Expand Down
7 changes: 5 additions & 2 deletions src/scheduler_single.jl
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ function start(::Type{SingleScheduler}, job::JobInfo)
scheduler.time_last_checkpoint = scheduler.time_start

scheduler.tasks = map(
x -> SchedulerTask(x.target_sweeps, x.sweeps, x.dir, 0),
x -> SchedulerTask(x.target_sweeps, x.sweeps, x.dir),
JobTools.read_progress(scheduler.job),
)
scheduler.task_id = get_new_task_id(scheduler.tasks, length(scheduler.tasks))
Expand Down Expand Up @@ -77,7 +77,10 @@ function get_new_task_id(
tasks::AbstractVector{SchedulerTask},
old_id::Integer,
)::Union{Integer,Nothing}
next_unshifted = findfirst(x -> !is_done(x), circshift(tasks, -old_id))
next_unshifted = findfirst(
x -> !is_done(x) && x.scheduled_runs < x.max_scheduled_runs,
circshift(tasks, -old_id),
)
if next_unshifted === nothing
return nothing
end
Expand Down
4 changes: 4 additions & 0 deletions src/scheduler_task.jl
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,12 @@ mutable struct SchedulerTask

dir::String
scheduled_runs::Int64
max_scheduled_runs::Int64
end

SchedulerTask(target_sweeps::Integer, sweeps::Integer, dir::AbstractString) =
SchedulerTask(target_sweeps, sweeps, dir, 0, typemax(Int64))

is_done(task::SchedulerTask) = task.sweeps >= task.target_sweeps

function run_dir(task::SchedulerTask, run_id::Integer)
Expand Down
4 changes: 2 additions & 2 deletions test/test_scheduler.jl
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@ using Logging

@testset "Task Selection" begin
sweeps = [100, 10, 10, 101, 10]
tasks = map(s -> Carlo.SchedulerTask(100, s, "", 0), sweeps)
tasks = [Carlo.SchedulerTask(100, s, "") for s in sweeps]

@test Carlo.get_new_task_id(tasks, 1) == 2
@test Carlo.get_new_task_id(tasks, 2) == 3
@test Carlo.get_new_task_id(tasks, 3) == 5
@test Carlo.get_new_task_id(tasks, 4) == 5
@test Carlo.get_new_task_id(tasks, 5) == 2

tasks = map(s -> Carlo.SchedulerTask(100, s, "", 0), [100, 100, 100])
tasks = [Carlo.SchedulerTask(100, s, "") for s in [100, 100, 100]]
for i = 1:length(tasks)
@test Carlo.get_new_task_id(tasks, i) === nothing
end
Expand Down

0 comments on commit 9d1655d

Please sign in to comment.