From 358137876843656b3bff37995d12fcc43d2e8d20 Mon Sep 17 00:00:00 2001 From: Manuel Giffels Date: Thu, 1 Feb 2024 13:45:45 +0100 Subject: [PATCH] feat(compute4punch): add initial support (#430) --- .gitignore | 3 + Dockerfile | 4 + docs/openapi.json | 9 + .../compute4punch_job_manager.py | 391 ++++++++++++++++++ reana_job_controller/config.py | 35 ++ reana_job_controller/job_monitor.py | 164 +++++++- reana_job_controller/schemas.py | 3 + reana_job_controller/utils.py | 120 ++++++ requirements.in | 2 +- requirements.txt | 4 +- setup.py | 6 +- 11 files changed, 737 insertions(+), 4 deletions(-) create mode 100644 reana_job_controller/compute4punch_job_manager.py diff --git a/.gitignore b/.gitignore index d11b05ab..b280d06d 100644 --- a/.gitignore +++ b/.gitignore @@ -64,3 +64,6 @@ target/ # Vagrant .vagrant + +# Pycharm +.idea \ No newline at end of file diff --git a/Dockerfile b/Dockerfile index 4dea9496..1fad800d 100644 --- a/Dockerfile +++ b/Dockerfile @@ -95,12 +95,16 @@ ARG DEBUG=0 RUN if [ "${DEBUG}" -gt 0 ]; then \ if echo "$COMPUTE_BACKENDS" | grep -q "htcondorcern"; then \ pip install --no-cache-dir -e ".[debug,htcondor]"; \ + elif echo "$COMPUTE_BACKENDS" | grep -q "compute4punch"; then \ + pip install --no-cache-dir ".[debug,mytoken,ssh]"; \ else \ pip install --no-cache-dir -e ".[debug]"; \ fi \ else \ if echo "$COMPUTE_BACKENDS" | grep -q "htcondorcern"; then \ pip install --no-cache-dir ".[htcondor]"; \ + elif echo "$COMPUTE_BACKENDS" | grep -q "compute4punch"; then \ + pip install --no-cache-dir ".[mytoken,ssh]"; \ else \ pip install --no-cache-dir .; \ fi \ diff --git a/docs/openapi.json b/docs/openapi.json index d037c2c9..1fe9bc99 100644 --- a/docs/openapi.json +++ b/docs/openapi.json @@ -39,6 +39,15 @@ }, "JobRequest": { "properties": { + "c4p_additional_requirements": { + "type": "string" + }, + "c4p_cpu_cores": { + "type": "string" + }, + "c4p_memory_limit": { + "type": "string" + }, "cmd": { "default": "", "type": "string" diff --git a/reana_job_controller/compute4punch_job_manager.py b/reana_job_controller/compute4punch_job_manager.py new file mode 100644 index 00000000..51fe6a20 --- /dev/null +++ b/reana_job_controller/compute4punch_job_manager.py @@ -0,0 +1,391 @@ +# This file is part of REANA. +# Copyright (C) 2019, 2020, 2021, 2022, 2023, 2024 CERN. +# +# REANA is free software; you can redistribute it and/or modify it +# under the terms of the MIT License; see LICENSE file for more details. 
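+#
+# Illustrative note (values below are hypothetical): the three "c4p_*"
+# properties added to the OpenAPI schema above travel as plain strings in the
+# job request, e.g.
+#     {"c4p_cpu_cores": "4",
+#      "c4p_memory_limit": "10000",
+#      "c4p_additional_requirements": "TARGET.has_avx"}
+# and are expected to reach this manager's constructor as keyword arguments of
+# the same names.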
+ +"""Compute4PUNCH Job Manager.""" + +import base64 +import logging +import os +import re + +from paramiko.sftp_client import SFTPClient +from shlex import quote +from stat import S_ISDIR +from typing import Iterable + +from reana_commons.workspace import is_directory, open_file, walk +from reana_db.database import Session +from reana_db.models import Workflow +from reana_job_controller.job_manager import JobManager +from reana_job_controller.utils import SSHClient, motley_cue_auth_strategy_factory +from reana_job_controller.config import ( + C4P_LOGIN_NODE_HOSTNAME, + C4P_LOGIN_NODE_PORT, + C4P_SSH_TIMEOUT, + C4P_SSH_BANNER_TIMEOUT, + C4P_SSH_AUTH_TIMEOUT, + C4P_CPU_CORES, + C4P_MEMORY_LIMIT, + C4P_ADDITIONAL_REQUIREMENTS, + C4P_REANA_REL_WORKFLOW_PATH, +) + + +class Compute4PUNCHJobManager(JobManager): + """Compute4PUNCH Job Manager.""" + + C4P_WORKSPACE_PATH = "" + """Absolute path on the Compute4PUNCH head node used for submission""" + C4P_HOME_PATH = "" + """Default Compute4PUNCH home directory""" + SUBMIT_ID_PATTERN = re.compile(r"Proc\s(\d+\.\d+)") + """ regex to search the Job ID in a submit Proc line """ + + def __init__( + self, + docker_img=None, + cmd=None, + prettified_cmd=None, + env_vars=None, + workflow_uuid=None, + workflow_workspace=None, + cvmfs_mounts="false", + shared_file_system=False, + job_name=None, + c4p_cpu_cores=C4P_CPU_CORES, + c4p_memory_limit=C4P_MEMORY_LIMIT, + c4p_additional_requirements=C4P_ADDITIONAL_REQUIREMENTS, + **kwargs, + ): + """ + Compute4PUNCH Job Manager. + + :param docker_img: Docker image. + :type docker_img: str + :param cmd: Command to execute. + :type cmd: list + :param prettified_cmd: prettified version of command to execute. + :type prettified_cmd: str + :param env_vars: Environment variables. + :type env_vars: dict + :param workflow_uuid: Unique workflow id. + :type workflow_uuid: str + :param workflow_workspace: Path to REANA workspace + :type workflow_workspace: str + :param cvmfs_mounts: list of CVMFS mounts as a string. + :type cvmfs_mounts: str + :param shared_file_system: if shared file system is available. 
+        :type shared_file_system: bool
+        :param job_name: Name of the job.
+        :type job_name: str
+        :param c4p_cpu_cores: number of CPU cores to use on C4P.
+        :type c4p_cpu_cores: str
+        :param c4p_memory_limit: maximum memory to be used on C4P.
+        :type c4p_memory_limit: str
+        :param c4p_additional_requirements: additional HTCondor requirements for the job.
+        :type c4p_additional_requirements: str
+        """
+        super(Compute4PUNCHJobManager, self).__init__(
+            docker_img=docker_img,
+            cmd=cmd,
+            prettified_cmd=prettified_cmd,
+            env_vars=env_vars,
+            workflow_uuid=workflow_uuid,
+            workflow_workspace=workflow_workspace,
+            job_name=job_name,
+        )
+        self.c4p_connection = SSHClient(
+            hostname=C4P_LOGIN_NODE_HOSTNAME,
+            port=C4P_LOGIN_NODE_PORT,
+            timeout=C4P_SSH_TIMEOUT,
+            banner_timeout=C4P_SSH_BANNER_TIMEOUT,
+            auth_timeout=C4P_SSH_AUTH_TIMEOUT,
+            auth_strategy=motley_cue_auth_strategy_factory(
+                hostname=C4P_LOGIN_NODE_HOSTNAME
+            ),
+        )
+        self.compute_backend = "compute4punch"
+        self.cvmfs_mounts = cvmfs_mounts
+        self.shared_file_system = shared_file_system
+        self.job_execution_script_path = os.path.join(
+            self.c4p_abs_workspace_path, "run.sh"
+        )
+        self.job_description_path = os.path.join(
+            self.c4p_abs_workspace_path, "submit.jdl"
+        )
+
+        self.c4p_cpu_cores = c4p_cpu_cores
+        self.c4p_memory_limit = c4p_memory_limit
+        self.c4p_additional_requirements = c4p_additional_requirements
+
+    @JobManager.execution_hook
+    def execute(self) -> str:
+        """
+        Execute/submit a job on Compute4PUNCH.
+
+        :return: Backend job ID.
+        """
+        self._create_c4p_workspace_environment()
+        self._create_c4p_job_execution_script()
+        job_inputs = self._get_job_inputs()
+        self._create_c4p_job_description(job_inputs=job_inputs)
+        self._upload_job_inputs(job_inputs=job_inputs)
+        self._upload_mytoken()
+
+        submit_cmd_list = [
+            f"cd {self.c4p_abs_workspace_path}",
+            f"condor_submit --verbose {os.path.basename(self.job_description_path)}",
+        ]
+
+        response = self.c4p_connection.exec_command("&&".join(submit_cmd_list))
+
+        return next(
+            self.SUBMIT_ID_PATTERN.search(line).group(1)
+            for line in response.splitlines()
+            if line.startswith("** Proc")
+        )
+
+    @classmethod
+    def get_logs(cls, backend_job_id: str, **kwargs) -> str:
+        """
+        Return job logs if log files are present.
+
+        :param backend_job_id: ID of the job in the backend.
+        :param kwargs: Additional parameters needed to fetch logs.
+            In the case of Compute4PUNCH, the ``workspace`` parameter is needed.
+        :return: String containing the job logs.
+        """
+        if "workspace" not in kwargs:
+            raise ValueError("Missing 'workspace' parameter")
+        workspace = kwargs["workspace"]
+
+        job_log = ""
+
+        try:
+            for log in ("out", "err"):
+                filepath = f"logs/{backend_job_id}.{log}"
+                with open_file(workspace, filepath) as f:
+                    job_log += f.read()
+            return job_log
+        except FileNotFoundError as e:
+            msg = f"Job logs of {backend_job_id} were not found. {e}"
+            logging.error(msg, exc_info=True)
+            return msg
+
+    @classmethod
+    def get_outputs(cls, c4p_connection, src, dest) -> None:
+        """
+        Transfer job outputs from Compute4PUNCH to the local REANA workspace.
+
+        :param c4p_connection: SSH connection to Compute4PUNCH
+        :type c4p_connection: SSHClient
+        :param src: Source directory
+        :type src: str
+        :param dest: Destination directory
+        :type dest: str
+        """
+        sftp_client = c4p_connection.ssh_client.open_sftp()
+        sftp_client.chdir(src)
+        try:
+            cls._download_output_directory(sftp_client, src, dest)
+        finally:
+            sftp_client.close()
+
+    def stop(self, backend_job_id: str) -> None:
+        """
+        Stop job execution.
+
+        :param backend_job_id: The backend job ID.
+        :type backend_job_id: str
+        """
+        try:
+            self.c4p_connection.exec_command(f"condor_rm {backend_job_id}")
+        except Exception as ex:
+            logging.error(ex, exc_info=True)
+
+    @property
+    def c4p_home_path(self) -> str:
+        """Determine and return the home directory on Compute4PUNCH."""
+        if not self.C4P_HOME_PATH:
+            # Since the JobMonitor relies entirely on class variables to get the
+            # corresponding paths on Compute4PUNCH, the class variable
+            # C4P_HOME_PATH needs to be modified here.
+            Compute4PUNCHJobManager.C4P_HOME_PATH = self.c4p_connection.exec_command(
+                "pwd"
+            ).strip()
+        return self.C4P_HOME_PATH
+
+    @property
+    def c4p_abs_workspace_path(self) -> str:
+        """Determine and return the absolute Compute4PUNCH workspace path."""
+        if not self.C4P_WORKSPACE_PATH:
+            # Since the JobMonitor relies entirely on class variables to get the
+            # corresponding paths on Compute4PUNCH, the class variable
+            # C4P_WORKSPACE_PATH needs to be modified here.
+            Compute4PUNCHJobManager.C4P_WORKSPACE_PATH = os.path.join(
+                self.c4p_home_path, self.c4p_rel_workspace_path
+            )
+        return self.C4P_WORKSPACE_PATH
+
+    @property
+    def c4p_rel_workspace_path(self) -> str:
+        """Determine and return the relative Compute4PUNCH workspace path."""
+        return os.path.join(C4P_REANA_REL_WORKFLOW_PATH, self.workflow_uuid)
+
+    def _create_c4p_job_description(self, job_inputs: Iterable) -> None:
+        """Create job description for Compute4PUNCH."""
+        job_inputs = ",".join(job_inputs)
+        job_outputs = "."  # download everything from the remote job
+        job_environment = (f"{key}={value}" for key, value in self.env_vars.items())
+        job_description_template = [
+            f"executable = {os.path.basename(self.job_execution_script_path)}",
+            "use_oauth_services = helmholtz",
+            "output = logs/$(cluster).$(process).out",
+            "error = logs/$(cluster).$(process).err",
+            "log = cluster.log",
+            f"environment = \"{' '.join(job_environment)}\"",
+            "ShouldTransferFiles = YES",
+            "WhenToTransferOutput = ON_SUCCESS",
+            "preserve_relative_paths = TRUE",
+            f"transfer_input_files = {job_inputs}" if job_inputs else "",
+            f"transfer_output_files = {job_outputs}",
+            f"request_cpus = {self.c4p_cpu_cores}",
+            f"request_memory = {self.c4p_memory_limit}",
+            f'+SINGULARITY_JOB_CONTAINER = "{self.docker_img}"',
+            (
+                f"requirements = {self.c4p_additional_requirements}"
+                if self.c4p_additional_requirements
+                else ""
+            ),
+            f'description = "{self.workflow.get_full_workflow_name() + "_" + self.job_name}"',
+            "queue 1",
+        ]
+        # Avoid a potential security issue by discarding everything after an
+        # injected newline in any of the entries.
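+        # For example (hypothetical value), a job_name of 'fit"\ninjected = x'
+        # would otherwise smuggle an extra "injected = x" line into the submit
+        # description; keeping only the first line of each entry defuses this.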
+        job_description_template = map(
+            lambda x: x.split("\n")[0] if x else x, job_description_template
+        )
+        job_description = "\n".join(filter(None, job_description_template))
+
+        self.c4p_connection.exec_command(
+            f"cat <<< {quote(job_description)} > {self.job_description_path}"
+        )
+
+    def _create_c4p_job_execution_script(self) -> None:
+        """Create job execution script for Compute4PUNCH."""
+        # The workflow workspace does not exist on Compute4PUNCH,
+        # therefore replace it with $_CONDOR_JOB_IWD.
+        cmd = self.cmd.replace(self.workflow_workspace, "$_CONDOR_JOB_IWD")
+        logging.info(f"CMD is {cmd}")
+        cmd = self._encode_cmd(cmd)
+        job_execution_script_template = ["#!/bin/bash", cmd]
+        job_execution_script = "\n".join(job_execution_script_template)
+
+        self.c4p_connection.exec_command(
+            f"cat <<< '{job_execution_script}' > {self.job_execution_script_path} && "
+            f"chmod +x {self.job_execution_script_path}"
+        )
+
+    def _create_c4p_workspace_environment(self) -> None:
+        """Create workspace environment for REANA @ Compute4PUNCH."""
+        self.c4p_connection.exec_command(f"mkdir -p {self.c4p_abs_workspace_path}")
+        self.c4p_connection.exec_command(
+            f"mkdir -p {os.path.join(self.c4p_abs_workspace_path, 'logs')}"
+        )
+
+    @classmethod
+    def _download_output_directory(
+        cls, sftp_client: SFTPClient, remote_dir: str, local_dir: str
+    ) -> None:
+        """
+        Download output directory and content to a local directory.
+
+        :param sftp_client: SFTP client to use for downloading
+        :type sftp_client: SFTPClient
+        :param remote_dir: Remote directory to download
+        :type remote_dir: str
+        :param local_dir: Local destination directory
+        :type local_dir: str
+        """
+        skipped_output_files = (
+            "cluster.log",
+            "_condor_creds",
+            "_condor_stdout",
+            "_condor_stderr",
+            "run.sh",
+            "submit.jdl",
+            "tmp",
+            "var_tmp",
+        )  # skip intermediate and temporary HTCondor files
+        os.makedirs(local_dir, exist_ok=True)
+        for item in filter(
+            lambda x: x.filename not in skipped_output_files,
+            sftp_client.listdir_attr(remote_dir),
+        ):
+            remote_path = os.path.join(remote_dir, item.filename)
+            local_path = os.path.join(local_dir, item.filename)
+            if S_ISDIR(item.st_mode):
+                cls._download_output_directory(sftp_client, remote_path, local_path)
+            else:
+                sftp_client.get(remote_path, local_path)
+
+    @staticmethod
+    def _encode_cmd(cmd: str) -> str:
+        """
+        Base64-encode cmd for safe embedding in the execution script.
+
+        :param cmd: Command to encode
+        :type cmd: str
+        """
+        encoded_cmd = base64.b64encode(cmd.encode("utf-8")).decode("utf-8")
+        return f"echo {encoded_cmd} | base64 -d | bash"
+
+    def _get_job_inputs(self) -> Iterable:
+        """Collect all input files in the local REANA workspace."""
+        skipped_input_files = (".job.ad", ".machine.ad", ".chirp.config")
+        return list(
+            filter(
+                lambda x: x not in skipped_input_files,
+                walk(
+                    workspace=self.workflow_workspace,
+                ),
+            )
+        )
+
+    def _upload_mytoken(self) -> None:
+        """Upload the user's mytoken to the Compute4PUNCH login node."""
+        mytoken = os.environ.get("HELMHOLTZ_TOP")
+        self.c4p_connection.exec_command(
+            f"/usr/local/bin/reana_upload_mytoken <<< {mytoken}"
+        )
+
+    def _upload_job_inputs(self, job_inputs: Iterable) -> None:
+        """Upload job inputs to Compute4PUNCH."""
+        sftp_client = self.c4p_connection.ssh_client.open_sftp()
+        sftp_client.chdir(self.c4p_rel_workspace_path)
+        try:
+            for job_input in job_inputs:
+                if is_directory(self.workflow_workspace, job_input):
+                    try:  # check if the directory already exists
+                        sftp_client.stat(job_input)
+                    except FileNotFoundError:
+                        sftp_client.mkdir(job_input)
+                    finally:
+                        continue
+                else:
+                    local_path = os.path.join(self.workflow_workspace, job_input)
+                    remote_path = os.path.join(self.c4p_abs_workspace_path, job_input)
+                    sftp_client.put(local_path, remote_path)
+        finally:
+            sftp_client.close()
+
+    @property
+    def workflow(self):
+        """Get the workflow from the database."""
+        workflow = (
+            Session.query(Workflow).filter_by(id_=self.workflow_uuid).one_or_none()
+        )
+        if workflow:
+            return workflow
diff --git a/reana_job_controller/config.py b/reana_job_controller/config.py
index e1bc02bb..f243d282 100644
--- a/reana_job_controller/config.py
+++ b/reana_job_controller/config.py
@@ -30,6 +30,9 @@
     "slurmcern": lambda: import_string(
         "reana_job_controller.slurmcern_job_manager.SlurmJobManagerCERN"
     ),
+    "compute4punch": lambda: import_string(
+        "reana_job_controller.compute4punch_job_manager.Compute4PUNCHJobManager"
+    ),
 }
 """Supported job compute backends and corresponding management class."""
 
@@ -43,6 +46,9 @@
     "slurmcern": lambda: import_string(
         "reana_job_controller.job_monitor.JobMonitorSlurmCERN"
     ),
+    "compute4punch": lambda: import_string(
+        "reana_job_controller.job_monitor.JobMonitorCompute4PUNCH"
+    ),
 }
 """Classes responsible for monitoring specific backend jobs"""
 
@@ -153,3 +159,32 @@
 
 SLURM_SSH_AUTH_TIMEOUT = float(os.getenv("SLURM_SSH_AUTH_TIMEOUT", "60"))
 """Seconds to wait for SLURM SSH authentication response."""
+
+C4P_LOGIN_NODE_HOSTNAME = os.getenv("C4P_LOGIN_NODE", "c4p-login.gridka.de")
+"""Hostname of the C4P login node used for job management via SSH."""
+
+C4P_LOGIN_NODE_PORT = os.getenv("C4P_LOGIN_NODE_PORT", "22")
+"""Port of the C4P login node."""
+
+C4P_SSH_TIMEOUT = float(os.getenv("C4P_SSH_TIMEOUT", "60"))
+"""Seconds to wait for the C4P SSH TCP connection."""
+
+C4P_SSH_BANNER_TIMEOUT = float(os.getenv("C4P_SSH_BANNER_TIMEOUT", "60"))
+"""Seconds to wait for the C4P SSH banner to be presented."""
+
+C4P_SSH_AUTH_TIMEOUT = float(os.getenv("C4P_SSH_AUTH_TIMEOUT", "60"))
+"""Seconds to wait for the C4P SSH authentication response."""
+
+C4P_CPU_CORES = os.getenv("C4P_CPU_CORES", "8")
+"""Number of CPU cores used to run the REANA jobs."""
+
+C4P_MEMORY_LIMIT = os.getenv("C4P_MEMORY_LIMIT", "20000")
+"""Maximum amount of memory used by the REANA jobs."""
+
+C4P_ADDITIONAL_REQUIREMENTS = os.getenv("C4P_ADDITIONAL_REQUIREMENTS", "")
+"""Additional requirements to run the REANA jobs on C4P nodes."""
+
+C4P_REANA_REL_WORKFLOW_PATH = os.getenv(
+    "C4P_REANA_REL_WORKFLOW_PATH", "reana/workflows"
+) +"""Path relative to the uses home directory of the REANA workflow space on the C4P login node.""" diff --git a/reana_job_controller/job_monitor.py b/reana_job_controller/job_monitor.py index 4308a806..0a3995f3 100644 --- a/reana_job_controller/job_monitor.py +++ b/reana_job_controller/job_monitor.py @@ -27,10 +27,21 @@ SLURM_SSH_TIMEOUT, SLURM_SSH_BANNER_TIMEOUT, SLURM_SSH_AUTH_TIMEOUT, + C4P_LOGIN_NODE_HOSTNAME, + C4P_LOGIN_NODE_PORT, + C4P_SSH_TIMEOUT, + C4P_SSH_BANNER_TIMEOUT, + C4P_SSH_AUTH_TIMEOUT, ) + from reana_job_controller.job_db import JOB_DB, store_job_logs, update_job_status from reana_job_controller.kubernetes_job_manager import KubernetesJobManager -from reana_job_controller.utils import SSHClient, singleton +from reana_job_controller.utils import ( + SSHClient, + singleton, + csv_parser, + motley_cue_auth_strategy_factory, +) class JobMonitor: @@ -469,6 +480,157 @@ def watch_jobs(self, job_db, app=None): time.sleep(120) +@singleton +class JobMonitorCompute4PUNCH(JobMonitor): + """HTCondor jobs monitor Compute4PUNCH.""" + + def __init__(self, **kwargs): + """Initialize Compute4PUNCH job monitor thread.""" + self.job_manager_cls = COMPUTE_BACKENDS["compute4punch"]() + super(__class__, self).__init__(thread_name="compute4punch_job_monitor") + + def watch_jobs(self, job_db, app=None): + """ + Use SSH connection to Compute4PUNCH login node to monitor jobs. + + :param job_db: Dictionary which contains all running jobs. + """ + c4p_connection = SSHClient( + hostname=C4P_LOGIN_NODE_HOSTNAME, + port=C4P_LOGIN_NODE_PORT, + timeout=C4P_SSH_TIMEOUT, + banner_timeout=C4P_SSH_BANNER_TIMEOUT, + auth_timeout=C4P_SSH_AUTH_TIMEOUT, + auth_strategy=motley_cue_auth_strategy_factory( + hostname=C4P_LOGIN_NODE_HOSTNAME + ), + ) + + while True: + logging.debug( + "Starting a new stream request to watch Jobs on Compute4PUNCH" + ) + try: + c4p_job_mapping = { + job_dict["backend_job_id"]: reana_job_id + for reana_job_id, job_dict in job_db.items() + if filter_jobs_to_watch( + reana_job_id, job_db, compute_backend="compute4punch" + ) + } + c4p_job_statuses = query_c4p_jobs( + *c4p_job_mapping.keys(), ssh_client=c4p_connection + ) + logging.info(f"Compute4PUNCH JobStatuses: {c4p_job_statuses}") + for c4p_job_id, reana_job_id in c4p_job_mapping.items(): + job_status = None + try: + c4p_job_status = c4p_job_statuses[c4p_job_id]["JobStatus"] + logging.debug(f"JobStatus of {c4p_job_id} is {c4p_job_status}") + except KeyError: + msg = f"Job {c4p_job_id} was not found on " + msg += f"{C4P_LOGIN_NODE_HOSTNAME}. Assuming it has failed." + logging.warning(msg) + job_status = "failed" + update_job_status(reana_job_id, job_status) + job_db[reana_job_id]["deleted"] = True + store_job_logs(logs=msg, job_id=reana_job_id) + else: + if c4p_job_status == str(condorJobStatus["Completed"]): + if c4p_job_statuses[c4p_job_id]["ExitCode"] == "0": + job_status = "finished" + else: + job_status = "failed" + elif c4p_job_status == str(condorJobStatus["Held"]): + if c4p_job_statuses[c4p_job_id]["HoldReasonCode"] == "16": + # HoldReasonCode 16 means input files are being spooled. 
+ continue + logging.debug( + f"Job {c4p_job_id} was held, will delete and set as failed" + ) + self.job_manager_cls.stop(c4p_job_id) + job_status = "failed" + else: + continue + if job_status in ("failed", "finished"): + workflow_workspace = job_db[reana_job_id][ + "obj" + ].workflow_workspace + self.job_manager_cls.get_outputs( + c4p_connection=c4p_connection, + src=self.job_manager_cls.C4P_WORKSPACE_PATH, + dest=workflow_workspace, + ) + update_job_status(reana_job_id, job_status) + job_db[reana_job_id]["deleted"] = True + store_job_logs( + logs=self.job_manager_cls.get_logs( + backend_job_id=c4p_job_id, + workspace=workflow_workspace, + ), + job_id=reana_job_id, + ) + except Exception as ex: + logging.error("Unexpected error: {}".format(ex), exc_info=True) + time.sleep(120) + + +def query_c4p_jobs(*backend_job_ids: str, ssh_client: SSHClient): + """ + Query status information of backend jobs on Compute4PUNCH. + + :param backend_job_ids: List of job ids to query on Compute4PUNCH + :type backend_job_ids: str + :param ssh_client: SSH client used to communicate with Compute4PUNCH + """ + attributes = ("JobStatus", "ClusterId", "ProcId", "ExitCode", "HoldReasonCode") + attributes_string = " ".join(attributes) + + formatted_backend_job_ids = " ".join(backend_job_ids) + + condor_q_command = f"condor_q {formatted_backend_job_ids} -af:t {attributes_string}" + condor_history_command = ( + f"condor_history {formatted_backend_job_ids} -af:t {attributes_string}" + ) + + c4p_job_status = ssh_client.exec_command( + f"{condor_q_command} && {condor_history_command}" + ) + + c4p_queue = {} + + for row in csv_parser( + input_csv=c4p_job_status.strip(), + fieldnames=attributes, + delimiter="\t", + replacements=dict(undefined=None), + ): + row["JobId"] = f"{row['ClusterId']}.{row['ProcId']}" + c4p_queue[row["JobId"]] = row + + return c4p_queue + + +def filter_jobs_to_watch( + id, job_db, compute_backend, statuses_to_skip=("finished", "failed", "stopped") +): + """ + Filter jobs to watch for job completion. 
+
+    :param id: REANA job id
+    :type id: str
+    :param job_db: REANA job database
+    :type job_db: JOB_DB
+    :param compute_backend: REANA compute backend used
+    :type compute_backend: str
+    :param statuses_to_skip: REANA job statuses to skip
+    :type statuses_to_skip: tuple[str]
+    """
+    return job_db[id]["compute_backend"] == compute_backend and not (
+        job_db[id]["deleted"] or job_db[id]["status"] in statuses_to_skip
+    )
+
+
 def format_condor_job_que_query(backend_job_ids):
     """Format HTCondor job que query."""
     base_query = "ClusterId == {} ||"
diff --git a/reana_job_controller/schemas.py b/reana_job_controller/schemas.py
index c44189bf..3c6ae659 100644
--- a/reana_job_controller/schemas.py
+++ b/reana_job_controller/schemas.py
@@ -56,6 +56,9 @@ class JobRequest(Schema):
     htcondor_accounting_group = fields.Str(required=False)
     slurm_partition = fields.Str(required=False)
     slurm_time = fields.Str(required=False)
+    c4p_cpu_cores = fields.Str(required=False)
+    c4p_memory_limit = fields.Str(required=False)
+    c4p_additional_requirements = fields.Str(required=False)
 
     @pre_load
     def set_kubernetes_job_timeout(self, in_data, **kwargs):
diff --git a/reana_job_controller/utils.py b/reana_job_controller/utils.py
index 2f6389f7..5bb5d809 100644
--- a/reana_job_controller/utils.py
+++ b/reana_job_controller/utils.py
@@ -8,12 +8,16 @@
 
 """Job controller utils."""
 
+import csv
 import logging
 import os
 import socket
 import subprocess
 import sys
 
+from io import StringIO
+from typing import List, Tuple, Union
+
 from reana_db.database import Session
 from reana_db.models import Workflow
 
@@ -69,6 +73,119 @@
     logging.error(msg, exc_info=True)
 
 
+def csv_parser(
+    input_csv: str,
+    fieldnames: Union[List, Tuple],
+    delimiter: str = "\t",
+    replacements: dict = None,
+    skip_initial_space: bool = False,
+    skip_trailing_space: bool = False,
+):
+    """
+    Parse CSV formatted input.
+
+    :param input_csv: CSV formatted input
+    :type input_csv: str
+    :param fieldnames: corresponding field names
+    :type fieldnames: Union[List, Tuple]
+    :param delimiter: delimiter between entries
+    :type delimiter: str
+    :param replacements: mapping of field values to be replaced
+    :type replacements: dict
+    :param skip_initial_space: ignore whitespace immediately following the delimiter
+    :type skip_initial_space: bool
+    :param skip_trailing_space: ignore whitespace at the end of each csv row
+    :type skip_trailing_space: bool
+    """
+    if skip_trailing_space:
+        input_csv = "\n".join((line.strip() for line in input_csv.splitlines()))
+
+    replacements = replacements or {}
+    with StringIO(input_csv) as csv_input:
+        csv_reader = csv.DictReader(
+            csv_input,
+            fieldnames=fieldnames,
+            delimiter=delimiter,
+            skipinitialspace=skip_initial_space,
+        )
+        for row in csv_reader:
+            yield {
+                key: value if value not in replacements.keys() else replacements[value]
+                for key, value in row.items()
+            }
+
+
+def motley_cue_auth_strategy_factory(hostname):
+    """
+    Paramiko auth strategy factory that provides OAuth-based SSH token authentication.
+
+    This auth strategy has been developed against the motley_cue implementation of
+    OAuth-based SSH token authentication on the server side.
+ + :param hostname: hostname of the ssh node + :type hostname: str + """ + # Using a factory to avoid a general dependency on libmytoken, paramiko and pyjwt + from libmytoken import get_access_token_from_jwt_mytoken + from paramiko.auth_strategy import AuthSource + from time import time + import jwt + import requests + + class MotleyCueTokenAuth(AuthSource): + def __init__(self): + self._access_token = None + self._access_token_expires_on = 0 + self.hostname = hostname + self.username = self._get_deployed_username() + super().__init__(username=self.username) + + @property + def access_token(self): + if not (self._access_token and self._is_access_token_valid()): + self._refresh_access_token() + return self._access_token + + def authenticate(self, transport): + return transport.auth_interactive( + username=self.username, handler=self.motley_cue_auth_handler + ) + + def _is_access_token_valid(self): + return ( + self._access_token_expires_on - time() > 100 + ) # token should be at least valid for 100 s + + def _get_access_token_expiry_date(self): + decoded_token = jwt.decode( + self._access_token, + options={"verify_signature": False, "verify_aud": False}, + ) + return decoded_token["exp"] + + def _get_deployed_username(self): + headers = {"Authorization": f"Bearer {self.access_token}"} + req = requests.get( + f"https://{self.hostname}/user/deploy", headers=headers, verify=True + ) + req.raise_for_status() + return req.json()["credentials"]["ssh_user"] + + def motley_cue_auth_handler(self, title, instructions, prompt_list): + return [ + self.access_token if (echo and "Access Token" in prompt) else "" + for prompt, echo in prompt_list + ] + + def _refresh_access_token(self): + self._access_token = get_access_token_from_jwt_mytoken( + os.environ.get("HELMHOLTZ_TOP") + ) + self._access_token_expires_on = self._get_access_token_expiry_date() + + return MotleyCueTokenAuth() + + @singleton class SSHClient: """SSH Client.""" @@ -82,6 +199,7 @@ def __init__( timeout=None, banner_timeout=None, auth_timeout=None, + auth_strategy=None, ): """Initialize ssh client.""" if hostname: @@ -96,6 +214,7 @@ def __init__( self.timeout = timeout self.banner_timeout = banner_timeout self.auth_timeout = auth_timeout + self.auth_strategy = auth_strategy self.ssh_client = self.paramiko.SSHClient() self.ssh_client.set_missing_host_key_policy(self.paramiko.AutoAddPolicy()) self.establish_connection() @@ -114,6 +233,7 @@ def establish_connection(self): look_for_keys=False, port=self.port, timeout=self.timeout, + auth_strategy=self.auth_strategy, ) def exec_command(self, command): diff --git a/requirements.in b/requirements.in index 07d5b913..13258579 100644 --- a/requirements.in +++ b/requirements.in @@ -4,5 +4,5 @@ # REANA is free software; you can redistribute it and/or modify it # under the terms of the MIT License; see LICENSE file for more details. 
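+# NOTE: paramiko>=3.2.0 is needed for the AuthStrategy interface used by the
+# motley_cue SSH token authentication in reana_job_controller/utils.py
+# (AuthStrategy first appeared in paramiko 3.2).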
-paramiko[gssapi]==3.0.0 +paramiko[gssapi]>=3.2.0 gssapi==1.6.1 diff --git a/requirements.txt b/requirements.txt index a4e8cfe5..1a6e3dd2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -37,6 +37,7 @@ jsonref==1.1.0 # via bravado-core jsonschema[format]==3.2.0 # via bravado-core, reana-commons, swagger-spec-validator kombu==5.3.5 # via reana-commons kubernetes==22.6.0 # via reana-commons +libmytoken==0.1.0 mako==1.3.2 # via alembic markupsafe==2.1.5 # via jinja2, mako, werkzeug marshmallow==2.20.1 # via reana-job-controller (setup.py) @@ -45,11 +46,12 @@ monotonic==1.6 # via bravado msgpack==1.0.7 # via bravado-core msgpack-python==0.5.6 # via bravado oauthlib==3.2.2 # via requests-oauthlib -paramiko[gssapi]==3.0.0 # via -r requirements.in +paramiko[gssapi]==3.4.0 # via -r requirements.in, paramiko psycopg2-binary==2.9.9 # via reana-db pyasn1==0.5.1 # via paramiko, pyasn1-modules, rsa pyasn1-modules==0.3.0 # via google-auth pycparser==2.21 # via cffi +pyjwt==2.8.0 pynacl==1.5.0 # via paramiko pyrsistent==0.20.0 # via jsonschema python-dateutil==2.9.0 # via bravado, bravado-core, kubernetes diff --git a/setup.py b/setup.py index d2e94a9e..2814a9ef 100644 --- a/setup.py +++ b/setup.py @@ -40,7 +40,11 @@ "htcondor==9.0.17", ], "tests": tests_require, - "ssh": ["paramiko[gssapi]>=3.0.0"], + "ssh": ["paramiko[gssapi]>=3.2.0"], + "mytoken": [ + "pyjwt>=2.8.0", + "libmytoken>=0.1.0", + ], } # Python tests need SSH dependencies for imports
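
# Illustrative usage sketch (not a definitive invocation): how the new manager
# is expected to be driven, based on the constructor and execute() added above.
# All concrete values below (image, command, UUID, paths) are hypothetical, and
# a configured REANA database plus a valid mytoken in the HELMHOLTZ_TOP
# environment variable are assumed, since __init__ opens the SSH connection and
# execution_hook touches the job database.

from reana_job_controller.compute4punch_job_manager import Compute4PUNCHJobManager

job_manager = Compute4PUNCHJobManager(
    docker_img="docker.io/library/python:3.11",  # hypothetical image
    cmd="python analysis.py",
    prettified_cmd="python analysis.py",
    env_vars={"MY_VAR": "value"},
    workflow_uuid="11111111-2222-3333-4444-555555555555",
    workflow_workspace="/var/reana/users/0000/workflows/1111",
    job_name="fitdata",
    c4p_cpu_cores="4",        # rendered as request_cpus in the job description
    c4p_memory_limit="10000", # rendered as request_memory
)

# Submits via "condor_submit --verbose" on the login node and returns the
# HTCondor job ID parsed from the "** Proc <ClusterId>.<ProcId>" line.
backend_job_id = job_manager.execute()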