diff --git a/src/bindings/python/flux/constraint/parser.py b/src/bindings/python/flux/constraint/parser.py
index ca299dd524c7..d97ae0e1dc00 100644
--- a/src/bindings/python/flux/constraint/parser.py
+++ b/src/bindings/python/flux/constraint/parser.py
@@ -231,6 +231,16 @@ class
         default operator may optionally be substituted, e.g. "operand"
         to split values of that operator. For instance ``{"op": ","}``
         would autosplit operator ``op`` values on comma.
+    convert_values (dict): A mapping of operator name to a callable
+        which takes a single list argument containing the values
+        obtained from the current term for that operator after the
+        operator_map and split_values operations have been applied.
+        The callable should return a new list of values. This can be
+        used to convert values to a new type or to combine multiple
+        values into a single element, e.g. ::
+
+            convert_values = {"ints": lambda args: [int(x) for x in args]}
+
     combined_terms (set): A set of operator terms whose values can be
         combined when joined with the AND logical operator. E.g. if
         "test" is in ``combined_terms``, then
@@ -284,6 +294,10 @@ class MyConstraintParser(ConstraintParser):
     # Combined terms
     combined_terms = set()
 
+    # Mapping of operator name to value conversion function.
+    # E.g. { "integer": lambda args: [ int(x) for x in args ] }
+    convert_values = {}
+
     def __init__(
         self, lexer=None, optimize=True, debug=False, write_tables=False, **kw_args
     ):
@@ -408,10 +422,12 @@ def p_expression_token(self, p):
                 f"invalid character '{invalid}' in operator '{op}:'"
             )
 
+        values = [value]
         if op in self.split_values:
-            p[0] = {op: value.split(self.split_values[op])}
-        else:
-            p[0] = {op: [value]}
+            values = value.split(self.split_values[op])
+        if op in self.convert_values:
+            values = self.convert_values[op](values)
+        p[0] = {op: values}
 
     def p_quoted_token(self, p):
         """
diff --git a/src/bindings/python/flux/job/info.py b/src/bindings/python/flux/job/info.py
index 376f965e345a..9ecb61637ab1 100644
--- a/src/bindings/python/flux/job/info.py
+++ b/src/bindings/python/flux/job/info.py
@@ -19,7 +19,7 @@
 from itertools import chain
 
 import flux.constants
-from flux.core.inner import raw
+from flux.core.inner import ffi, raw
 from flux.job.JobID import JobID
 from flux.job.stats import JobStats
 from flux.memoized_property import memoized_property
@@ -42,6 +42,12 @@ def statetostr(stateid, fmt="L"):
     return raw.flux_job_statetostr(stateid, fmt).decode("utf-8")
 
 
+def strtostate(state):
+    result = ffi.new("flux_job_state_t [1]")
+    raw.flux_job_strtostate(state, result)
+    return int(result[0])
+
+
 def statetoemoji(stateid):
     statestr = raw.flux_job_statetostr(stateid, "S").decode("utf-8")
     if statestr == "N":
@@ -81,6 +87,12 @@ def resulttostr(resultid, fmt="L"):
     return raw.flux_job_resulttostr(resultid, fmt).decode("utf-8")
 
 
+def strtoresult(arg):
+    result = ffi.new("flux_job_result_t [1]")
+    raw.flux_job_strtoresult(arg, result)
+    return int(result[0])
+
+
 def resulttoemoji(resultid):
     if resultid != "":
         resultstr = raw.flux_job_resulttostr(resultid, "S").decode("utf-8")
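
The convert_values hook above pairs with the new strtostate()/strtoresult() helpers. As a minimal sketch (not part of this patch; the StateQueryParser name and the example query are made up), a ConstraintParser subclass can split a bare comma-separated list of state names and fold the individual masks from strtostate() into a single integer value:

    from functools import reduce

    from flux.constraint.parser import ConstraintParser
    from flux.job.info import strtostate


    class StateQueryParser(ConstraintParser):
        # Treat bare terms (no "op:" prefix) as the "states" operator
        operator_map = {None: "states"}
        # Split "run,cleanup" into ["run", "cleanup"]
        split_values = {"states": ","}
        # OR the per-state masks from strtostate() into a single value
        convert_values = {
            "states": lambda args: [reduce(lambda a, b: a | b, map(strtostate, args))]
        }


    # The resulting constraint dict carries one combined state mask
    constraint = StateQueryParser().parse("run,cleanup")
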
diff --git a/src/bindings/python/flux/job/list.py b/src/bindings/python/flux/job/list.py
index 73f342f40bbf..6c4f8518e3e8 100644
--- a/src/bindings/python/flux/job/list.py
+++ b/src/bindings/python/flux/job/list.py
@@ -10,13 +10,18 @@
 import errno
 import os
 import pwd
+import sys
 from collections.abc import Iterable
+from datetime import datetime
+from functools import reduce
 
 import flux.constants
+from flux.constraint.parser import ConstraintLexer, ConstraintParser
 from flux.future import WaitAllFuture
 from flux.job import JobID
-from flux.job.info import JobInfo
+from flux.job.info import JobInfo, strtoresult, strtostate
 from flux.rpc import RPC
+from flux.util import parse_datetime
 
 
 class JobListRPC(RPC):
@@ -341,3 +346,176 @@ def jobs(self):
         if hasattr(rpc, "errors"):
             self.errors = rpc.errors
         return [JobInfo(job) for job in jobs]
+
+
+def job_list_filter_to_mask(args, conv):
+    """
+    Convert all job state or result strings with conv() and combine into
+    a single state or result mask as accepted by the job-list constraints.
+
+    This is a convenience function for the JobListConstraintParser class.
+
+    Args:
+        args (list): list of values to convert
+        conv (callable): function to call on each arg to convert to a state
+            or result mask.
+    """
+    return reduce(lambda x, y: x | y, map(conv, args))
+
+
+class JobListConstraintParser(ConstraintParser):
+    operator_map = {
+        None: "filter",
+        "id": "jobid",
+        "host": "hostlist",
+        "hosts": "hostlist",
+        "rank": "ranks",
+    }
+    split_values = {"states": ",", "results": ",", "userid": ","}
+    convert_values = {
+        "userid": lambda args: [int(x) for x in args],
+        "states": lambda args: [job_list_filter_to_mask(args, strtostate)],
+        "results": lambda args: [job_list_filter_to_mask(args, strtoresult)],
+    }
+    valid_states = (
+        "depend",
+        "priority",
+        "sched",
+        "run",
+        "cleanup",
+        "inactive",
+        "pending",
+        "running",
+        "active",
+    )
+    valid_results = ("completed", "failed", "canceled", "timeout")
+
+    def convert_filter(self, arg):
+        #
+        # This is a generic state/result filter for backwards compat with
+        # --filter=. Split into separate states and results operators and
+        # return the new term(s) (joined by 'or' since that preserves the
+        # behavior of `--filter`).
+        #
+        states = []
+        results = []
+        for name in arg.split(","):
+            name = name.lower()
+            if name in self.valid_states:
+                states.append(name)
+            elif name in self.valid_results:
+                results.append(name)
+            else:
+                raise ValueError(f"Invalid filter specified: {name}")
+        arg = ""
+        if states:
+            arg += "states:" + ",".join(states) + " "
+            if results:
+                arg += "or "
+        if results:
+            arg += "results:" + ",".join(results)
+        return arg.rstrip()
+
+    @staticmethod
+    def convert_user(arg):
+        op, _, arg = arg.partition(":")
+        users = []
+        for user in arg.split(","):
+            try:
+                users.append(str(int(user)))
+            except ValueError:
+                users.append(str(pwd.getpwnam(user).pw_uid))
+        return "userid:" + ",".join(users)
+
+    @staticmethod
+    def convert_datetime(dt):
+        if isinstance(dt, (float, int)):
+            if dt == 0:
+                # A datetime of zero indicates unset, or an arbitrary time
+                # in the future. Return 12 months from now.
+                return parse_datetime("+12m")
+            dt = datetime.fromtimestamp(dt).astimezone()
+        else:
+            dt = parse_datetime(dt, assumeFuture=False)
+        return dt.timestamp()
+
+    def convert_range(self, arg):
+        arg = arg[1:]
+        if ".." in arg:
+            start, end = arg.split("..")
+            arg = "(not ("
+            if start:
+                dt = self.convert_datetime(start)
+                arg += f"'t_cleanup:<{dt}'"
+            if start and end:
+                arg += " or "
+            if end:
+                dt = self.convert_datetime(end)
+                arg += f"'t_run:>{dt}'"
+            arg += "))"
+        else:
+            dt = self.convert_datetime(arg)
+            arg = f"(t_run:'<={dt}' and t_cleanup:'>={dt}')"
+        return arg
+
+    def convert_timeop(self, arg):
+        op, _, arg = arg.partition(":")
+        prefix = ""
+        if arg[0] in (">", "<"):
+            if arg[1] == "=":
+                prefix = arg[:2]
+                arg = arg[2:]
+            else:
+                prefix = arg[0]
+                arg = arg[1:]
+        arg = self.convert_datetime(arg)
+        return f"'{op}:{prefix}{arg}'"
+
+    def convert_token(self, arg):
+        if arg.startswith("@"):
+            return self.convert_range(arg)
+        if arg.startswith("t_"):
+            return self.convert_timeop(arg)
+        if arg.startswith("user:"):
+            return self.convert_user(arg)
+        if ":" not in arg:
+            return self.convert_filter(arg)
+        return f"'{arg}'"
+
+    def parse(self, string, debug=False):
+        # First pass: traverse all tokens and apply convenience conversions
+        expression = ""
+        lexer = ConstraintLexer()
+        lexer.input(str(string))
+        if debug:
+            print(f"input: {string}", file=sys.stderr)
+
+        # Get all tokens first so we can do lookahead in the next step for
+        # proper use of whitespace:
+        tokens = []
+        while True:
+            tok = lexer.token()
+            if tok is None:
+                break
+            tokens.append(tok)
+
+        # Reconstruct expression while converting tokens:
+        for i, tok in enumerate(tokens):
+            next_tok = None
+            if i < len(tokens) - 1:
+                next_tok = tokens[i + 1]
+            if debug:
+                print(tok, file=sys.stderr)
+            if tok.type != "TOKEN":
+                expression += tok.value
+            else:
+                expression += self.convert_token(tok.value)
+            if tok.type not in ("LPAREN", "NEGATE") and (
+                next_tok and next_tok.type not in ("RPAREN")
+            ):
+                expression += " "
+
+        if debug:
+            print(f"expression: '{expression}'", file=sys.stderr)
+
+        return super().parse(expression)
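
A rough usage sketch for JobListConstraintParser (illustrative only; the query strings are made up). parse() makes a first pass that rewrites convenience tokens — bare state/result lists become states:/results: terms, user:NAME becomes a numeric userid: term, and @RANGE or t_* terms get their datetimes normalized — then hands the rewritten expression to ConstraintParser.parse():

    from flux.job.list import JobListConstraintParser

    parser = JobListConstraintParser()

    # "running" is rewritten to "states:running" (and then reduced to a
    # state mask via convert_values/strtostate); "user:1000" is rewritten
    # to "userid:1000" and converted to an integer userid.
    constraint = parser.parse("running and user:1000")

    # debug=True prints the token stream and the rewritten expression to
    # stderr, which helps when composing queries.
    parser.parse("running and user:1000", debug=True)
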
diff --git a/src/bindings/python/flux/util.py b/src/bindings/python/flux/util.py
index 6f561f1add26..9c20f08cd609 100644
--- a/src/bindings/python/flux/util.py
+++ b/src/bindings/python/flux/util.py
@@ -226,6 +226,17 @@ def __call__(self, parser, namespace, values, option_string=None):
         getattr(namespace, self.dest).update(values)
 
 
+class FilterActionConcatenate(argparse.Action):
+    """Concatenate filter arguments separated with space"""
+
+    def __call__(self, parser, namespace, values, option_string=None):
+        setattr(namespace, "filtered", True)
+        current = getattr(namespace, self.dest)
+        if current is not None:
+            values = current + " " + values
+        setattr(namespace, self.dest, values)
+
+
 # pylint: disable=redefined-builtin
 class FilterTrueAction(argparse.Action):
     def __init__(
@@ -331,7 +342,7 @@ def parse_fsd(fsd_string):
     return seconds
 
 
-def parse_datetime(string, now=None):
+def parse_datetime(string, now=None, assumeFuture=True):
     """Parse a possibly human readable datetime string or offset
 
     If string starts with `+` or `-`, then the remainder of the string
@@ -369,6 +380,9 @@
     cal = Calendar()
     cal.ptc.StartHour = 0
 
+    if not assumeFuture:
+        cal.ptc.DOWParseStyle = 0
+        cal.ptc.YearParseStyle = 0
     time_struct, status = cal.parse(string, sourceTime=now.timetuple())
     if status == 0:
         raise ValueError(f'Invalid datetime: "{string}"')
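
A brief sketch of the new FilterActionConcatenate (illustrative, assumes this patch is applied): repeated --filter options are joined with a space into one query string and namespace.filtered is set. Relatedly, parse_datetime(..., assumeFuture=False) clears the parsedatetime DOWParseStyle/YearParseStyle flags so ambiguous day-of-week or year references resolve to the current period instead of being pushed into the future.

    import argparse

    from flux.util import FilterActionConcatenate

    ap = argparse.ArgumentParser()
    ap.add_argument("-f", "--filter", action=FilterActionConcatenate)

    args = ap.parse_args(["-f", "running", "-f", "user:1000"])
    # args.filter == "running user:1000" and args.filtered is True
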
diff --git a/src/cmd/flux-jobs.py b/src/cmd/flux-jobs.py
index 91b66914c4a9..53126658b477 100755
--- a/src/cmd/flux-jobs.py
+++ b/src/cmd/flux-jobs.py
@@ -20,9 +20,11 @@
 from flux.hostlist import Hostlist
 from flux.idset import IDset
 from flux.job import JobID, JobInfo, JobInfoFormat, JobList, job_fields_to_attrs
+from flux.job.list import JobListConstraintParser
 from flux.job.stats import JobStats
 from flux.util import (
     FilterAction,
+    FilterActionConcatenate,
     FilterActionSetUpdate,
     FilterTrueAction,
     UtilConfig,
@@ -153,18 +155,17 @@ def fetch_jobs_flux(args, fields, flux_handle=None):
         if args.filter:
             LOGGER.warning("Both -a and --filter specified, ignoring -a")
         else:
-            args.filter.update(["pending", "running", "inactive"])
+            args.filter = "pending,running,inactive"
 
     if not args.filter:
-        args.filter = {"pending", "running"}
+        args.filter = "pending,running"
 
-    constraint = None
     if args.include:
         try:
-            constraint = {"ranks": [IDset(args.include).encode()]}
+            args.filter += " ranks:" + IDset(args.include).encode()
        except ValueError:
            try:
-                constraint = {"hostlist": [Hostlist(args.include).encode()]}
+                args.filter += " host:" + Hostlist(args.include).encode()
            except ValueError:
                raise ValueError(f"-i/--include: invalid targets: {args.include}")
 
@@ -172,13 +173,12 @@
         flux_handle,
         ids=args.jobids,
         attrs=attrs,
-        filters=args.filter,
         user=args.user,
         max_entries=args.count,
         since=since,
         name=args.name,
         queue=args.queue,
-        constraint=constraint,
+        constraint=JobListConstraintParser().parse(args.filter),
     )
     jobs = jobs_rpc.jobs()
 
@@ -231,10 +231,9 @@ def parse_args():
     parser.add_argument(
         "-f",
         "--filter",
-        action=FilterActionSetUpdate,
-        metavar="STATE|RESULT",
-        default=set(),
-        help="List jobs with specific job state or result",
+        action=FilterActionConcatenate,
+        metavar="QUERY",
+        help="Restrict jobs using a constraint query string",
     )
     parser.add_argument(
         "--since",
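
Taken together, flux-jobs now builds one constraint object from the --filter query (plus any -i/--include ranks/hosts terms) and passes it to JobList via constraint= instead of the old filters= set. A rough sketch of that flow, assuming the patch is applied and JobList's keyword defaults are acceptable (the query and rank value are made up):

    import flux
    from flux.job import JobList
    from flux.job.list import JobListConstraintParser

    handle = flux.Flux()

    query = "pending,running"      # default when --filter is not given
    query += " ranks:0"            # what "-i 0" would append

    jobs_rpc = JobList(
        handle,
        constraint=JobListConstraintParser().parse(query),
    )
    for job in jobs_rpc.jobs():
        print(job.id, job.name)
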