Skip to content

Commit

Permalink
Merge branch 'main' into schema-fix
Browse files Browse the repository at this point in the history
  • Loading branch information
Comeani authored Aug 21, 2024
2 parents be37f41 + b4d5bd4 commit 93c0a2e
Show file tree
Hide file tree
Showing 5 changed files with 422 additions and 121 deletions.
111 changes: 54 additions & 57 deletions apps/crc_idle.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,29 @@
Resource summaries are provided for GPU and CPU partitions.
"""

from argparse import Namespace
from typing import Dict, Tuple
import re
from argparse import Namespace
from collections import defaultdict

from .utils import Shell, Slurm
from .utils.cli import BaseParser
from .utils.system_info import Shell, Slurm


class CrcIdle(BaseParser):
"""Display idle Slurm resources."""

# The type of resource available on a cluster
# Either ``cores`` or ``GPUs`` depending on the cluster type
cluster_types = {
'smp': 'cores',
'gpu': 'GPUs',
'mpi': 'cores',
'htc': 'cores'
}
default_type = 'cores'
# Specify the type of resource available on each cluster
# Either `cores` or `GPUs` depending on the cluster type
cluster_types = defaultdict(
lambda: 'cores',
smp='cores',
gpu='GPUs',
mpi='cores',
htc='cores',
)

def __init__(self) -> None:
"""Define arguments for the command line interface"""
"""Define arguments for the command line interface."""

super(CrcIdle, self).__init__()
self.add_argument('-s', '--smp', action='store_true', help='list idle resources on the smp cluster')
Expand All @@ -37,8 +37,8 @@ def __init__(self) -> None:
self.add_argument('-d', '--htc', action='store_true', help='list idle resources on the htc cluster')
self.add_argument('-p', '--partition', nargs='+', help='only include information for specific partitions')

def get_cluster_list(self, args: Namespace) -> Tuple[str]:
"""Return a list of clusters specified by command line arguments
def get_cluster_list(self, args: Namespace) -> tuple[str]:
"""Return a list of clusters specified by command line arguments.
Returns a tuple of clusters specified by command line arguments. If no
clusters were specified, then return a tuple of all cluster names.
Expand All @@ -50,64 +50,61 @@ def get_cluster_list(self, args: Namespace) -> Tuple[str]:
A tuple of cluster names
"""

argument_options = self.cluster_types
argument_clusters = tuple(filter(lambda cluster: getattr(args, cluster), argument_options))
# Select only the specified clusters
argument_clusters = tuple(self.cluster_types.keys())
specified_clusters = tuple(filter(lambda cluster: getattr(args, cluster), argument_clusters))

# Default to returning all clusters
return argument_clusters or argument_options
return specified_clusters or argument_clusters

@staticmethod
def _idle_cpu_resources(cluster: str, partition: str) -> Dict[int, int]:
"""Return the idle CPU resources on a given cluster partition
def _count_idle_cpu_resources(cluster: str, partition: str) -> dict[int, int]:
"""Return the idle CPU resources on a given cluster partition.
Args:
cluster: The cluster to print a summary for
partition: The partition in the parent cluster
cluster: The cluster to print a summary for.
partition: The partition in the parent cluster.
Returns:
A dictionary mapping idle resources to number of nodes
A dictionary mapping the number of idle resources to the number of nodes with that many idle resources.
"""

# Use `sinfo` command to determine the status of each node in the given partition
command = f'sinfo -h -M {cluster} -p {partition} -N -o %N,%C'
stdout = Shell.run_command(command)
slurm_data = stdout.strip().split()
slurm_data = Shell.run_command(command).strip().split()

# Count the number of nodes having a given number of idle cores/GPUs
return_dict = dict()
for node_info in slurm_data:
_, resource_data = node_info.split(',') # Returns: node_name, resource_data
_, idle, _, _ = [int(x) for x in resource_data.split('/')] # Returns: allocated, idle, other, total
node_name, resource_data = node_info.split(',')
allocated, idle, other, total = [int(x) for x in resource_data.split('/')]
return_dict[idle] = return_dict.setdefault(idle, 0) + 1

return return_dict

@staticmethod
def _idle_gpu_resources(cluster: str, partition: str) -> Dict[int, int]:
"""Return idle GPU resources on a given cluster partition
def _count_idle_gpu_resources(cluster: str, partition: str) -> dict[int, int]:
"""Return idle GPU resources on a given cluster partition.
If the host node is in a ``drain`` state, the GPUs are reported as unavailable.
If the host node is in a `drain` state, the GPUs are reported as unavailable.
Args:
cluster: The cluster to print a summary for
partition: The partition in the parent cluster
cluster: The cluster to print a summary for.
partition: The partition in the parent cluster.
Returns:
A dictionary mapping idle resources to number of nodes
A dictionary mapping the number of idle resources to the number of nodes with that many idle resources.
"""

# Use `sinfo` command to determine the status of each node in the given partition
command = f"sinfo -h -M {cluster} -p {partition} -N " \
f"--Format=NodeList:'_',gres:5'_',gresUsed:12'_',StateCompact:' '"

stdout = Shell.run_command(command)
slurm_data = stdout.strip().split()
slurm_output_format = "NodeList:'_',gres:5'_',gresUsed:12'_',StateCompact:' '"
command = f"sinfo -h -M {cluster} -p {partition} -N --Format={slurm_output_format}"
slurm_data = Shell.run_command(command).strip().split()

# Count the number of nodes having a given number of idle cores/GPUs
return_dict = dict()
for node_info in slurm_data:
# Returns: node_name, total, allocated, node state
_, total, allocated, state = node_info.split('_')
node_name, total, allocated, state = node_info.split('_')

# If the node is in a downed state, report 0 resource availability.
if re.search("drain", state):
Expand All @@ -122,61 +119,61 @@ def _idle_gpu_resources(cluster: str, partition: str) -> Dict[int, int]:

return return_dict

def count_idle_resources(self, cluster: str, partition: str) -> Dict[int, int]:
"""Determine the number of idle resources on a given cluster partition
def count_idle_resources(self, cluster: str, partition: str) -> dict[int, int]:
"""Determine the number of idle resources on a given cluster partition.
The returned dictionary maps the number of idle resources (e.g., cores)
to the number of nodes in the partition having that many resources idle.
Args:
cluster: The cluster to print a summary for
partition: The partition in the parent cluster
cluster: The cluster to print a summary for.
partition: The partition in the parent cluster.
Returns:
A dictionary mapping idle resources to number of nodes
A dictionary mapping idle resources to number of nodes.
"""

cluster_type = self.cluster_types.get(cluster, self.default_type)
cluster_type = self.cluster_types[cluster]
if cluster_type == 'GPUs':
return self._idle_gpu_resources(cluster, partition)
return self._count_idle_gpu_resources(cluster, partition)

elif cluster_type == 'cores':
return self._idle_cpu_resources(cluster, partition)
return self._count_idle_cpu_resources(cluster, partition)

raise ValueError(f'Unknown cluster type: {cluster}')

def print_partition_summary(self, cluster: str, partition: str) -> None:
def print_partition_summary(self, cluster: str, partition: str, idle_resources: dict) -> None:
"""Print a summary of idle resources in a single partition
Args:
cluster: The cluster to print a summary for
partition: The partition in the parent cluster
idle_resources: Dictionary mapping idle resources to number of nodes
"""

resource_allocation = self.count_idle_resources(cluster, partition)

output_width = 30
header = f'Cluster: {cluster}, Partition: {partition}'
unit = self.cluster_types.get(cluster, self.default_type)
unit = self.cluster_types[cluster]

print(header)
print('=' * output_width)
for idle, nodes in sorted(resource_allocation.items()):
for idle, nodes in sorted(idle_resources.items()):
print(f'{nodes:4d} nodes w/ {idle:3d} idle {unit}')

if not resource_allocation:
if not idle_resources:
print(' No idle resources')

print('')

def app_logic(self, args: Namespace) -> None:
"""Logic to evaluate when executing the application
"""Logic to evaluate when executing the application.
Args:
args: Parsed command line arguments
args: Parsed command line arguments.
"""

for cluster in self.get_cluster_list(args):
partitions_to_print = args.partition or Slurm.get_partition_names(cluster)
for partition in partitions_to_print:
self.print_partition_summary(cluster, partition)
idle_resources = self.count_idle_resources(cluster, partition)
self.print_partition_summary(cluster, partition, idle_resources)
79 changes: 46 additions & 33 deletions apps/crc_interactive.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""A simple wrapper around the Slurm ``srun`` command
"""A simple wrapper around the Slurm `srun` command.
The application launches users into an interactive Slurm session on a
user-selected cluster and (if specified) partition. Dedicated command line
Expand All @@ -9,7 +9,8 @@
to be manually added (or removed) by updating the application CLI arguments.
"""

from argparse import Namespace, ArgumentTypeError
from argparse import ArgumentTypeError, Namespace
from collections import defaultdict
from datetime import time
from os import system

Expand All @@ -21,7 +22,7 @@ class CrcInteractive(BaseParser):
"""Launch an interactive Slurm session."""

min_mpi_nodes = 2 # Minimum limit on requested MPI nodes
min_mpi_cores = {'mpi': 48, 'opa-high-mem': 28}
min_mpi_cores = defaultdict(lambda: 28, {'mpi': 48, 'opa-high-mem': 28}) # Minimum cores per MPI partition
min_time = 1 # Minimum limit on requested time in hours
max_time = 12 # Maximum limit on requested time in hours

Expand All @@ -32,6 +33,17 @@ class CrcInteractive(BaseParser):
default_mem = 1 # Default memory in GB
default_gpus = 0 # Default number of GPUs

# Clusters names to make available from the command line
# Maps cluster name to single character abbreviation use in the CLI
clusters = {
'smp': 's',
'gpu': 'g',
'mpi': 'm',
'invest': 'i',
'htc': 'd',
'teach': 'e'
}

def __init__(self) -> None:
"""Define arguments for the command line interface."""

Expand All @@ -42,13 +54,9 @@ def __init__(self) -> None:

# Arguments for specifying what cluster to start an interactive session on
cluster_args = self.add_argument_group('Cluster Arguments')
cluster_args.add_argument('-s', '--smp', action='store_true', help='launch a session on the smp cluster')
cluster_args.add_argument('-g', '--gpu', action='store_true', help='launch a session on the gpu cluster')
cluster_args.add_argument('-m', '--mpi', action='store_true', help='launch a session on the mpi cluster')
cluster_args.add_argument('-i', '--invest', action='store_true', help='launch a session on the invest cluster')
cluster_args.add_argument('-d', '--htc', action='store_true', help='launch a session on the htc cluster')
cluster_args.add_argument('-e', '--teach', action='store_true', help='launch a session on the teach cluster')
cluster_args.add_argument('-p', '--partition', help='run the session on a specific partition')
for cluster, abbrev in self.clusters.items():
cluster_args.add_argument(f'-{abbrev}', f'--{cluster}', action='store_true', help=f'launch a session on the {cluster} cluster')

# Arguments for requesting additional hardware resources
resource_args = self.add_argument_group('Arguments for Increased Resources')
Expand Down Expand Up @@ -79,7 +87,7 @@ def __init__(self) -> None:

@staticmethod
def parse_time(time_str: str) -> time:
"""Parse a string representation of time in 'HH:MM:SS' format and return a time object
"""Parse a string representation of time in 'HH:MM:SS' format and return a time object.
Args:
time_str: A string representing time in 'HH:MM:SS' format.
Expand All @@ -101,40 +109,45 @@ def parse_time(time_str: str) -> time:
except Exception:
raise ArgumentTypeError(f'Could not parse time value {time_str}')

def _validate_arguments(self, args: Namespace) -> None:
"""Exit the application if command line arguments are invalid
def parse_args(self, args=None, namespace=None) -> Namespace:
"""Parse command line arguments."""

Args:
args: Parsed commandline arguments
"""
args = super().parse_args(args, namespace)

# Set defaults that need to be determined dynamically
if not args.num_gpus:
args.num_gpus = 1 if args.gpu else 0

# Check wall time is between limits, enable both %H:%M format and integer hours
check_time = args.time.hour + args.time.minute / 60 + args.time.second / 3600

if not self.min_time <= check_time <= self.max_time:
self.error(f'{check_time} is not in {self.min_time} <= time <= {self.max_time}... exiting')
self.error(f'Requested time must be between {self.min_time} and {self.max_time}.')

# Check the minimum number of nodes are requested for mpi
if args.mpi and args.num_nodes < self.min_mpi_nodes:
self.error(f'You must use at least {self.min_mpi_nodes} nodes when using the MPI cluster')
self.error(f'You must use at least {self.min_mpi_nodes} nodes when using the MPI cluster.')

# Check the minimum number of cores are requested for mpi
if args.mpi and args.num_cores < self.min_mpi_cores.get(args.partition, self.default_mpi_cores):
self.error(f'You must request at least {self.min_mpi_cores.get(args.partition, self.default_mpi_cores)} '
f'cores per node when using the MPI cluster {args.partition} partition')
min_cores = self.min_mpi_cores[args.partition]
if args.mpi and args.num_cores < min_cores:
self.error(
f'You must request at least {min_cores} cores per node when using the {args.partition} partition on the MPI cluster.'
)

# Check a partition is specified if the user is requesting invest
if args.invest and not args.partition:
self.error('You must specify a partition when using the Investor cluster')
self.error('You must specify a partition when using the invest cluster.')

return args

def create_srun_command(self, args: Namespace) -> str:
"""Create an ``srun`` command based on parsed command line arguments
"""Create an `srun` command based on parsed command line arguments.
Args:
args: A dictionary of parsed command line parsed_args
args: A dictionary of parsed command line parsed_args.
Return:
The equivalent ``srun`` command as a string
The equivalent `srun` command as a string.
"""

# Map arguments from the parent application to equivalent srun arguments
Expand All @@ -161,26 +174,26 @@ def create_srun_command(self, args: Namespace) -> str:
if (args.gpu or args.invest) and args.num_gpus:
srun_args += ' ' + f'--gres=gpu:{args.num_gpus}'

cluster_to_run = next(cluster for cluster in Slurm.get_cluster_names() if getattr(args, cluster))
try:
cluster_to_run = next(cluster for cluster in self.clusters if getattr(args, cluster))

except StopIteration:
raise RuntimeError('Please specify which cluster to run on.')

return f'srun -M {cluster_to_run} {srun_args} --pty bash'

def app_logic(self, args: Namespace) -> None:
"""Logic to evaluate when executing the application
"""Logic to evaluate when executing the application.
Args:
args: Parsed command line arguments
args: Parsed command line arguments.
"""

if not any(getattr(args, cluster, False) for cluster in Slurm.get_cluster_names()):
self.print_help()
self.exit()

# Set defaults that need to be determined dynamically
if not args.num_gpus:
args.num_gpus = 1 if args.gpu else 0

# Create the slurm command
self._validate_arguments(args)
srun_command = self.create_srun_command(args)

if args.print_command:
Expand Down
2 changes: 2 additions & 0 deletions apps/utils/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
"""The ``utils`` module defines helper utilities for building commandline system tools."""

from .system_info import Shell, Slurm
Loading

0 comments on commit 93c0a2e

Please sign in to comment.