Skip to content

Commit

Permalink
Add validation of config options for all drivers
Browse files Browse the repository at this point in the history
This still supports unsetting the options when providing an empty string.
Additionally, it raises ValueError when we are not able to set the option
when starting simulation.

If different queue system is ajusted, than currently in use, we validate and issue
only warning instead.

We expose queue_bool_options, queue_memory_options, queue_positive_int_options,
queue_positive_number_options, queue_string_options and
queue_memory_options from ert.config.

NB: when DEBUG_OUTPUT is deprecated it needs to be removed from queue_options.
  • Loading branch information
xjules committed Nov 6, 2023
1 parent 70d1f32 commit 39a2783
Show file tree
Hide file tree
Showing 7 changed files with 252 additions and 123 deletions.
14 changes: 13 additions & 1 deletion src/ert/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,14 @@
from .observations import EnkfObs
from .parameter_config import ParameterConfig
from .parsing import ConfigValidationError, ConfigWarning
from .queue_config import QueueConfig
from .queue_config import (
QueueConfig,
queue_bool_options,
queue_memory_options,
queue_positive_int_options,
queue_positive_number_options,
queue_string_options,
)
from .queue_system import QueueSystem
from .response_config import ResponseConfig
from .summary_config import SummaryConfig
Expand Down Expand Up @@ -62,4 +69,9 @@
"WorkflowJob",
"field_transform",
"get_mode_variables",
"queue_bool_options",
"queue_memory_options",
"queue_positive_int_options",
"queue_positive_number_options",
"queue_string_options",
]
214 changes: 172 additions & 42 deletions src/ert/config/queue_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@
import logging
import re
import shutil
import warnings
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Any, Dict, List, Tuple, no_type_check
from typing import Any, Dict, List, Mapping, Set, Tuple, no_type_check

from ert import _clib

from .parsing import ConfigDict, ConfigValidationError, ErrorInfo
from .parsing import ConfigDict, ConfigValidationError, ConfigWarning
from .queue_system import QueueSystem

GENERIC_QUEUE_OPTIONS: List[str] = ["MAX_RUNNING"]
Expand All @@ -31,32 +32,6 @@ class QueueConfig:
default_factory=dict
)

def __post_init__(self) -> None:
errors = []
for _, value in [
setting
for settings in self.queue_options.values()
for setting in settings
if setting[0] == "MAX_RUNNING" and setting[1]
]:
err_msg = "QUEUE_OPTION MAX_RUNNING is"
try:
int_val = int(value)
if int_val < 0:
errors.append(
ErrorInfo(f"{err_msg} negative: {str(value)!r}").set_context(
value
)
)
except ValueError:
errors.append(
ErrorInfo(f"{err_msg} not an integer: {str(value)!r}").set_context(
value
)
)
if errors:
raise ConfigValidationError.from_collected(errors)

@no_type_check
@classmethod
def from_dict(cls, config_dict: ConfigDict) -> QueueConfig:
Expand All @@ -77,13 +52,16 @@ def from_dict(cls, config_dict: ConfigDict) -> QueueConfig:
job_script = job_script or "job_dispatch.py"
max_submit: int = config_dict.get("MAX_SUBMIT", 2)
queue_options: Dict[QueueSystem, List[Tuple[str, str]]] = defaultdict(list)
used_queue_systems: Set[QueueSystem] = set()
for system, option_name, *values in config_dict.get("QUEUE_OPTION", []):
queue_system = QueueSystem.from_string(system)
used_queue_systems.add(system)
if option_name not in VALID_QUEUE_OPTIONS[queue_system]:
raise ConfigValidationError(
f"Invalid QUEUE_OPTION for {queue_system.name}: '{option_name}'. "
f"Valid choices are {sorted(VALID_QUEUE_OPTIONS[queue_system])}."
)

queue_options[queue_system].append(
(option_name, values[0] if values else "")
)
Expand All @@ -95,11 +73,14 @@ def from_dict(cls, config_dict: ConfigDict) -> QueueConfig:
" usually provided by the site-configuration file, beware that"
" you are effectively replacing the default value provided."
)
if (
selected_queue_system == QueueSystem.TORQUE
and queue_options[QueueSystem.TORQUE]
):
_validate_torque_options(queue_options[QueueSystem.TORQUE])

for queue_system_val in used_queue_systems:
if queue_options[queue_system_val]:
_validate_queue_driver_settings(
queue_options[queue_system_val],
queue_system_val.name,
throw_error=(queue_system_val == selected_queue_system),
)

if (
selected_queue_system != QueueSystem.LOCAL
Expand Down Expand Up @@ -139,17 +120,166 @@ def generate_dict(option_list: List[Tuple[str, str]]) -> Dict[str, List[str]]:
)


def _validate_torque_options(torque_options: List[Tuple[str, str]]) -> None:
for option_strings in torque_options:
queue_memory_options: Mapping[str, List[str]] = {
"LSF": [],
"SLURM": [],
"TORQUE": ["MEMORY_PER_JOB"],
"LOCAL": [],
}

queue_string_options: Mapping[str, List[str]] = {
"LSF": [
"LSF_RESOURCE",
"LSF_SERVER",
"LSF_QUEUE",
"LSF_LOGIN_SHELL",
"LSF_RSH_CMD",
"BSUB_CMD",
"BJOBS_CMD",
"BKILL_CMD",
"BHIST_CMD",
"EXCLUDE_HOST",
"PROJECT_CODE",
],
"SLURM": [
"SBATCH",
"SCANCEL",
"SCONTROL",
"SQUEUE",
"PARTITION",
"INCLUDE_HOST",
"EXCLUDE_HOST",
],
"TORQUE": [
"QSUB_CMD",
"QSTAT_CMD",
"QDEL_CMD",
"QSTAT_OPTIONS",
"QUEUE",
"CLUSTER_LABEL",
"JOB_PREFIX",
"DEBUG_OUTPUT",
],
"LOCAL": [],
}

queue_positive_int_options: Mapping[str, List[str]] = {
"LSF": [
"BJOBS_TIMEOUT",
"MAX_RUNNING",
],
"SLURM": [
"MEMORY",
"MEMORY_PER_CPU",
"MAX_RUNNING",
],
"TORQUE": [
"NUM_NODES",
"NUM_CPUS_PER_NODE",
"MAX_RUNNING",
],
"LOCAL": ["MAX_RUNNING"],
}

queue_positive_number_options: Mapping[str, List[str]] = {
"LSF": [
"SUBMIT_SLEEP",
],
"SLURM": [
"SQUEUE_TIMEOUT",
"MAX_RUNTIME",
],
"TORQUE": ["SUBMIT_SLEEP", "QUEUE_QUERY_TIMEOUT"],
"LOCAL": [],
}

queue_bool_options: Mapping[str, List[str]] = {
"LSF": ["DEBUG_OUTPUT"],
"SLURM": [],
"TORQUE": ["KEEP_QSUB_OUTPUT"],
"LOCAL": [],
}


def throw_error_or_warning(
error_msg: str, option_value: str, throw_error: bool
) -> None:
if throw_error:
raise ConfigValidationError.with_context(
error_msg,
option_value,
)
else:
warnings.warn(
ConfigWarning.with_context(
error_msg,
option_value,
),
stacklevel=1,
)


def _validate_queue_driver_settings(
queue_system_options: List[Tuple[str, str]], queue_type: str, throw_error: bool
) -> None:
for option_strings in queue_system_options:
option_name = option_strings[0]
option_value = option_strings[1]
if (
option_value != "" # This is equivalent to the option not being set
and option_name == "MEMORY_PER_JOB"

if option_value == "": # This is equivalent to the option not being set
continue
elif (
option_name in queue_memory_options[queue_type]
and re.match("[0-9]+[mg]b", option_value) is None
):
raise ConfigValidationError(
f"The value '{option_value}' is not valid for the Torque option "
"MEMORY_PER_JOB, it must be of "
"the format '<integer>mb' or '<integer>gb'."
throw_error_or_warning(
(
f"'{option_value}' for {option_name} is not a valid format "
"It should be of the format '<integer>mb' or '<integer>gb'."
),
option_value,
throw_error,
)

elif option_name in queue_string_options[queue_type] and not isinstance(
option_value, str
):
throw_error_or_warning(
f"'{option_value}' for {option_name} is not a valid string type.",
option_value,
throw_error,
)

elif option_name in queue_positive_number_options[queue_type] and (
re.match(r"^\d+(\.\d+)?$", option_value) is None
):
throw_error_or_warning(
f"'{option_value}' for {option_name} is not a valid integer or float.",
option_value,
throw_error,
)

elif option_name in queue_positive_int_options[queue_type] and (
re.match(r"^\d+$", option_value) is None
):
throw_error_or_warning(
f"'{option_value}' for {option_name} is not a valid positive integer.",
option_value,
throw_error,
)

elif option_name in queue_bool_options[queue_type] and not option_value in [
"TRUE",
"FALSE",
"0",
"1",
"T",
"F",
"True",
"False",
]:
throw_error_or_warning(
f"The '{option_value}' for {option_name} should be either TRUE or FALSE.",
option_value,
throw_error,
)
5 changes: 3 additions & 2 deletions src/ert/job_queue/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ class Driver(BaseCClass): # type: ignore
TYPE_NAME = "driver"
_alloc = ResPrototype("void* queue_driver_alloc( queue_driver_enum )", bind=False)
_free = ResPrototype("void queue_driver_free( driver )")
_set_option = ResPrototype("void queue_driver_set_option( driver , char* , char*)")
_set_option = ResPrototype("bool queue_driver_set_option( driver , char* , char*)")
_get_option = ResPrototype("char* queue_driver_get_option(driver, char*)")

def __init__(
Expand Down Expand Up @@ -52,7 +52,8 @@ def create_driver(cls, queue_config: QueueConfig) -> "Driver":
driver = Driver(queue_config.queue_system)
if queue_config.queue_system in queue_config.queue_options:
for setting in queue_config.queue_options[queue_config.queue_system]:
driver.set_option(*setting)
if not driver.set_option(*setting):
raise ValueError(f"Queue option not set {setting}")
return driver

@property
Expand Down
Loading

0 comments on commit 39a2783

Please sign in to comment.