Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support disk directive for local executor #5652

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/executor.md
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ The `local` executor is useful for developing and testing a pipeline script on y
Resource requests and other job characteristics can be controlled via the following process directives:

- {ref}`process-cpus`
- {ref}`process-disk`
- {ref}`process-memory`
- {ref}`process-time`
- {ref}`process-container`
Expand Down
5 changes: 5 additions & 0 deletions docs/reference/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,11 @@ The following settings are available:
`executor.cpus`
: The maximum number of CPUs made available by the underlying system. Used only by the `local` executor.

`executor.disk`
: :::{versionadded} 24.12.0-edge
:::
: The maximum amount of disk made available by the underlying system. Used only by the `local` executor.

`executor.dumpInterval`
: Determines how often to log the executor status (default: `5min`).

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
*/

package nextflow.processor

import java.lang.management.ManagementFactory
import java.nio.file.FileSystems

import com.sun.management.OperatingSystemMXBean
import groovy.transform.CompileStatic
Expand Down Expand Up @@ -58,6 +60,16 @@ class LocalPollingMonitor extends TaskPollingMonitor {
*/
private final long maxMemory

/**
* Amount of `free` disk available to execute pending tasks
*/
private long availDisk

/**
* Total amount of disk available in the system
*/
private final long maxDisk

/**
* Create the task polling monitor with the provided named parameters object.
* <p>
Expand All @@ -74,8 +86,9 @@ class LocalPollingMonitor extends TaskPollingMonitor {
super(params)
this.availCpus = maxCpus = params.cpus as int
this.availMemory = maxMemory = params.memory as long
assert availCpus>0, "Local avail `cpus` attribute cannot be zero"
assert availMemory>0, "Local avail `memory` attribute cannot zero"
this.availDisk = maxDisk = params.disk as long
assert availCpus>0, "Local available `cpus` attribute cannot be zero"
assert availMemory>0, "Local available `memory` attribute cannot be zero"
}

/**
Expand All @@ -98,14 +111,16 @@ class LocalPollingMonitor extends TaskPollingMonitor {

final int cpus = configCpus(session,name)
final long memory = configMem(session,name)
final long disk = configDisk(session,name)
final int size = session.getQueueSize(name, OS.getAvailableProcessors())

log.debug "Creating local task monitor for executor '$name' > cpus=$cpus; memory=${new MemoryUnit(memory)}; capacity=$size; pollInterval=$pollInterval; dumpInterval=$dumpInterval"
log.debug "Creating local task monitor for executor '$name' > cpus=$cpus; memory=${MemoryUnit.of(memory)}; disk=${MemoryUnit.of(disk)}; capacity=$size; pollInterval=$pollInterval; dumpInterval=$dumpInterval"

new LocalPollingMonitor(
name: name,
cpus: cpus,
memory: memory,
disk: disk,
session: session,
capacity: size,
pollInterval: pollInterval,
Expand All @@ -128,6 +143,15 @@ class LocalPollingMonitor extends TaskPollingMonitor {
(session.getExecConfigProp(name, 'memory', OS.getTotalPhysicalMemorySize()) as MemoryUnit).toBytes()
}

@PackageScope
static long configDisk(Session session, String name) {
if( session.workDir.fileSystem != FileSystems.default ) {
log.debug "Local executor is using a remote work directory -- task disk requirements will be ignored"
return 0
}
(session.getExecConfigProp(name, 'disk', session.workDir.toFile().getUsableSpace()) as MemoryUnit).toBytes()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But the avail disk changes over time, how to take into account free spaces can increase or decrease while the workflow is running?

}

/**
* @param handler
* A {@link TaskHandler} instance
Expand All @@ -149,6 +173,17 @@ class LocalPollingMonitor extends TaskPollingMonitor {
handler.task.getConfig()?.getMemory()?.toBytes() ?: 1L
}

/**
*
* @param handler
* A {@link TaskHandler} instance
* @return
* The amount of disk (bytes) requested to execute the specified task
*/
private static long disk(TaskHandler handler) {
handler.task.getConfig()?.getDisk()?.toBytes() ?: 1L
}

/**
* Determines if a task can be submitted for execution checking if the resources required
* (cpus and memory) match the amount of avail resource
Expand All @@ -167,20 +202,34 @@ class LocalPollingMonitor extends TaskPollingMonitor {
protected boolean canSubmit(TaskHandler handler) {

final taskCpus = cpus(handler)
if( taskCpus>maxCpus )
throw new ProcessUnrecoverableException("Process requirement exceeds available CPUs -- req: $taskCpus; avail: $maxCpus")
if( taskCpus > maxCpus )
throw new ProcessUnrecoverableException("Task requirement exceeds available CPUs -- req: $taskCpus; avail: $maxCpus")

final taskMemory = mem(handler)
if( taskMemory>maxMemory)
throw new ProcessUnrecoverableException("Process requirement exceeds available memory -- req: ${new MemoryUnit(taskMemory)}; avail: ${new MemoryUnit(maxMemory)}")
if( taskMemory > maxMemory )
throw new ProcessUnrecoverableException("Task requirement exceeds available memory -- req: ${MemoryUnit.of(taskMemory)}; avail: ${MemoryUnit.of(maxMemory)}")

final taskDisk = disk(handler)
if( isDiskEnabled() && taskDisk > maxDisk )
throw new ProcessUnrecoverableException("Task requirement exceeds available disk -- req: ${MemoryUnit.of(taskDisk)}; avail: ${MemoryUnit.of(maxDisk)}")

final result = super.canSubmit(handler) && taskCpus <= availCpus && taskMemory <= availMemory
if( !result && log.isTraceEnabled( ) ) {
log.trace "Task `${handler.task.name}` cannot be scheduled -- taskCpus: $taskCpus <= availCpus: $availCpus && taskMemory: ${new MemoryUnit(taskMemory)} <= availMemory: ${new MemoryUnit(availMemory)}"
final result = super.canSubmit(handler) && taskCpus <= availCpus && taskMemory <= availMemory && (maxDisk == 0 || taskDisk <= availDisk)
if( !result && log.isTraceEnabled() ) {
def message = "Task `${handler.task.name}` cannot be scheduled -- taskCpus: $taskCpus <= availCpus: $availCpus && taskMemory: ${MemoryUnit.of(taskMemory)} <= availMemory: ${MemoryUnit.of(availMemory)}"
if( isDiskEnabled() )
message += " && taskDisk: ${MemoryUnit.of(taskDisk)} <= availDisk: ${MemoryUnit.of(availDisk)}"
log.trace message
}
return result
}

/**
* Determine whether task disk requirements should be enforced.
*/
protected boolean isDiskEnabled() {
return maxDisk > 0
}

/**
* Submits a task for execution allocating the resources (cpus and memory)
* requested by the task
Expand All @@ -192,6 +241,8 @@ class LocalPollingMonitor extends TaskPollingMonitor {
super.submit(handler)
availCpus -= cpus(handler)
availMemory -= mem(handler)
if( isDiskEnabled() )
availDisk -= disk(handler)
}

/**
Expand All @@ -209,6 +260,8 @@ class LocalPollingMonitor extends TaskPollingMonitor {
if( result ) {
availCpus += cpus(handler)
availMemory += mem(handler)
if( isDiskEnabled() )
availDisk += disk(handler)
}
return result
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,22 +38,25 @@ class LocalPollingMonitorTest extends Specification {
cpus: 10,
capacity: 20,
memory: _20_GB,
disk: _20_GB,
session: session,
name: 'local',
pollInterval: 100
)

def task = new TaskRun()
task.config = new TaskConfig(cpus: 3, memory: MemoryUnit.of('2GB'))
task.config = new TaskConfig(cpus: 3, memory: MemoryUnit.of('2GB'), disk: MemoryUnit.of('10GB'))
def handler = Mock(TaskHandler)
handler.getTask() >> { task }

expect:
monitor.availCpus == 10
monitor.capacity == 20
monitor.availMemory == _20_GB
monitor.availDisk == _20_GB
monitor.maxCpus == 10
monitor.maxMemory == _20_GB
monitor.maxDisk == _20_GB

when:
monitor.submit(handler)
Expand All @@ -62,17 +65,21 @@ class LocalPollingMonitorTest extends Specification {
monitor.getRunningQueue().size()==1
monitor.availCpus == 7
monitor.availMemory == MemoryUnit.of('18GB').toBytes()
monitor.availDisk == MemoryUnit.of('10GB').toBytes()
monitor.maxCpus == 10
monitor.maxMemory == _20_GB
monitor.maxDisk == _20_GB

when:
monitor.remove(handler)
then:
monitor.getRunningQueue().size()==0
monitor.availCpus == 10
monitor.availMemory == _20_GB
monitor.availDisk == _20_GB
monitor.maxCpus == 10
monitor.maxMemory == _20_GB
monitor.maxDisk == _20_GB

}

Expand All @@ -86,18 +93,18 @@ class LocalPollingMonitorTest extends Specification {
cpus: 10,
capacity: 10,
memory: _20_GB,
disk: _20_GB,
session: session,
name: 'local',
pollInterval: 100
)

def task = new TaskRun()
task.config = new TaskConfig(cpus: 4, memory: MemoryUnit.of('8GB'))
task.config = new TaskConfig(cpus: 4, memory: MemoryUnit.of('8GB'), disk: MemoryUnit.of('10GB'))
def handler = Mock(TaskHandler)
handler.getTask() >> { task }
handler.canForkProcess() >> true
handler.isReady() >> true

expect:
monitor.canSubmit(handler) == true

Expand All @@ -110,6 +117,7 @@ class LocalPollingMonitorTest extends Specification {
monitor.canSubmit(handler) == true
monitor.availCpus == 6
monitor.availMemory == MemoryUnit.of('12GB').toBytes()
monitor.availDisk == MemoryUnit.of('10GB').toBytes()

when:
monitor.submit(handler)
Expand All @@ -120,6 +128,7 @@ class LocalPollingMonitorTest extends Specification {
monitor.canSubmit(handler) == false
monitor.availCpus == 2
monitor.availMemory == MemoryUnit.of('4GB').toBytes()
monitor.availDisk == 0

}

Expand All @@ -132,6 +141,7 @@ class LocalPollingMonitorTest extends Specification {
cpus: 1,
capacity: 1,
memory: _20_GB,
disk: 0,
session: session,
name: 'local',
pollInterval: 100
Expand Down Expand Up @@ -167,6 +177,7 @@ class LocalPollingMonitorTest extends Specification {
cpus: 10,
capacity: 20,
memory: _20_GB,
disk: 0,
session: session,
name: 'local',
pollInterval: 100
Expand All @@ -181,7 +192,7 @@ class LocalPollingMonitorTest extends Specification {
monitor.canSubmit(handler)
then:
def e1 = thrown(ProcessUnrecoverableException)
e1.message == 'Process requirement exceeds available CPUs -- req: 12; avail: 10'
e1.message == 'Task requirement exceeds available CPUs -- req: 12; avail: 10'


}
Expand All @@ -195,6 +206,7 @@ class LocalPollingMonitorTest extends Specification {
cpus: 10,
capacity: 20,
memory: _20_GB,
disk: 0,
session: session,
name: 'local',
pollInterval: 100
Expand All @@ -209,7 +221,35 @@ class LocalPollingMonitorTest extends Specification {
monitor.canSubmit(handler)
then:
def e2 = thrown(ProcessUnrecoverableException)
e2.message == 'Process requirement exceeds available memory -- req: 22 GB; avail: 20 GB'
e2.message == 'Task requirement exceeds available memory -- req: 22 GB; avail: 20 GB'

}

def 'should throw an exception for missing disk' () {

given:
def _20_GB = MemoryUnit.of('20GB').toBytes()
def session = new Session()
def monitor = new LocalPollingMonitor(
cpus: 10,
capacity: 20,
memory: _20_GB,
disk: _20_GB,
session: session,
name: 'local',
pollInterval: 100
)

def task = new TaskRun()
task.config = new TaskConfig(disk: MemoryUnit.of('22GB'))
def handler = Mock(TaskHandler)
handler.getTask() >> { task }

when:
monitor.canSubmit(handler)
then:
def e2 = thrown(ProcessUnrecoverableException)
e2.message == 'Task requirement exceeds available disk -- req: 22 GB; avail: 20 GB'

}

Expand Down Expand Up @@ -267,4 +307,21 @@ class LocalPollingMonitorTest extends Specification {
LocalPollingMonitor.configMem(session4,'local') == OS.getTotalPhysicalMemorySize()

}

def 'should get the amount of disk' () {

given:
def _10_GB = MemoryUnit.of('10 GB').toBytes()

when:
def session2 = new Session([executor: [disk: '10 GB']])
then:
LocalPollingMonitor.configDisk(session2,'local') == _10_GB

when:
def session3 = new Session([executor: ['$local': [disk: _10_GB]]])
then:
LocalPollingMonitor.configDisk(session3,'local') == _10_GB

}
}
Loading