Skip to content

Commit

Permalink
fix(k8s): handle exceptions better for Dask K8s resources (reanahub#618)
Browse files Browse the repository at this point in the history
  • Loading branch information
Alputer committed Jan 28, 2025
1 parent f252098 commit 95a14d3
Show file tree
Hide file tree
Showing 3 changed files with 196 additions and 159 deletions.
39 changes: 2 additions & 37 deletions reana_workflow_controller/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
from reana_commons.k8s.api_client import (
current_k8s_batchv1_api_client,
current_k8s_corev1_api_client,
current_k8s_custom_objects_api_client,
current_k8s_networking_api_client,
)
from reana_commons.k8s.secrets import UserSecretsStore
from reana_commons.utils import (
Expand All @@ -46,10 +44,8 @@
REANA_JOB_STATUS_CONSUMER_PREFETCH_COUNT,
)
from reana_workflow_controller.errors import REANAWorkflowControllerError
from reana_workflow_controller.k8s import delete_dask_dashboard_ingress

from reana_workflow_controller.config import DASK_AUTOSCALER_ENABLED
from reana_workflow_controller.dask import requires_dask
from reana_workflow_controller.dask import requires_dask, delete_dask_cluster

try:
from urllib import parse as urlparse
Expand Down Expand Up @@ -169,7 +165,7 @@ def _update_workflow_status(workflow, status, logs):
workflow.logs += "Workflow engine logs could not be retrieved.\n"

if requires_dask(workflow):
_delete_dask_cluster(workflow)
delete_dask_cluster(workflow.id_, workflow.owner_id)

if RunStatus.should_cleanup_job(status):
try:
Expand Down Expand Up @@ -315,34 +311,3 @@ def _get_workflow_engine_pod_logs(workflow: Workflow) -> str:
# There might not be any pod returned by `list_namespaced_pod`, for example
# when a workflow fails to be scheduled
return ""


def _delete_dask_cluster(workflow: Workflow) -> None:
    """Delete the Dask cluster resources of *workflow*.

    Removes the DaskCluster custom object, the optional DaskAutoscaler,
    the dashboard ingress and the associated ``Service`` database row.

    :param workflow: Workflow whose Dask resources should be deleted.
    """
    current_k8s_custom_objects_api_client.delete_namespaced_custom_object(
        group="kubernetes.dask.org",
        version="v1",
        plural="daskclusters",
        name=get_dask_component_name(workflow.id_, "cluster"),
        namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE,
    )

    if DASK_AUTOSCALER_ENABLED:
        current_k8s_custom_objects_api_client.delete_namespaced_custom_object(
            group="kubernetes.dask.org",
            version="v1",
            plural="daskautoscalers",
            name=get_dask_component_name(workflow.id_, "autoscaler"),
            namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE,
        )

    delete_dask_dashboard_ingress(workflow.id_)

    dask_service = (
        Session.query(Service)
        .filter_by(name=get_dask_component_name(workflow.id_, "database_model_service"))
        .one_or_none()
    )
    # ``one_or_none`` may return None when the service row is missing;
    # ``workflow.services.remove(None)`` / ``Session.delete(None)`` would
    # raise, so guard before touching the database.
    if dask_service is not None:
        workflow.services.remove(dask_service)
        Session.delete(dask_service)
        Session.object_session(workflow).commit()
205 changes: 194 additions & 11 deletions reana_workflow_controller/dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@

from flask import current_app

from kubernetes import client
from kubernetes.client.exceptions import ApiException

from reana_db.database import Session
from reana_db.models import Service
from reana_db.utils import _get_workflow_with_uuid_or_name
from reana_commons.config import (
K8S_CERN_EOS_AVAILABLE,
K8S_CERN_EOS_MOUNT_CONFIGURATION,
Expand All @@ -20,6 +26,7 @@
REANA_RUNTIME_KUBERNETES_NAMESPACE,
)
from reana_commons.k8s.api_client import (
current_k8s_networking_api_client,
current_k8s_custom_objects_api_client,
)
from reana_commons.k8s.kerberos import get_kerberos_k8s_config
Expand All @@ -28,11 +35,14 @@
get_workspace_volume,
get_reana_shared_volume,
)
from reana_commons.job_utils import kubernetes_memory_to_bytes
from reana_commons.utils import get_dask_component_name

from reana_workflow_controller.config import DASK_AUTOSCALER_ENABLED
from reana_workflow_controller.k8s import create_dask_dashboard_ingress
from reana_workflow_controller.config import (
DASK_AUTOSCALER_ENABLED,
REANA_INGRESS_HOST,
REANA_INGRESS_CLASS_NAME,
REANA_INGRESS_ANNOTATIONS,
)


class DaskResourceManager:
Expand Down Expand Up @@ -110,14 +120,21 @@ def _load_dask_autoscaler_template(self):

def create_dask_resources(self):
    """Create necessary Dask resources for the workflow.

    Creates the DaskCluster custom resource, optionally the
    DaskAutoscaler (when ``DASK_AUTOSCALER_ENABLED``), and the dashboard
    ingress.  If any step fails, the partially created resources are
    cleaned up and the original error is re-raised so callers do not
    assume the cluster was created successfully.
    """
    try:
        self._prepare_cluster()
        self._create_dask_cluster()

        if DASK_AUTOSCALER_ENABLED:
            self._prepare_autoscaler()
            self._create_dask_autoscaler()

        create_dask_dashboard_ingress(self.workflow_id)
    except Exception as e:
        logging.error(
            f"An error occurred while trying to create dask cluster, now deleting the cluster... Error message:\n{e}"
        )
        # Best-effort cleanup of whatever was already created, then
        # propagate the failure instead of silently swallowing it.
        delete_dask_cluster(self.workflow_id, self.user_id)
        raise

def _prepare_cluster(self):
"""Prepare Dask cluster body by adding necessary image-pull secrets, volumes, volume mounts, init containers and sidecar containers."""
Expand Down Expand Up @@ -492,11 +509,11 @@ def _create_dask_cluster(self):
namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE,
body=self.cluster_body,
)
except Exception:
except Exception as e:
logging.exception(
"An error occurred while trying to create a Dask cluster."
)
raise
raise e

def _create_dask_autoscaler(self):
"""Create Dask autoscaler resource."""
Expand All @@ -520,3 +537,169 @@ def requires_dask(workflow):
return bool(
workflow.reana_specification["workflow"].get("resources", {}).get("dask", False)
)


def delete_dask_cluster(workflow_id, user_id) -> None:
    """Delete the Dask cluster resources.

    Attempts every deletion step (cluster custom object, optional
    autoscaler, dashboard ingress, database service row) even when an
    earlier step fails; errors are collected and raised together at the
    end so a single failure does not leave the remaining resources
    behind.

    :param workflow_id: UUID of the workflow whose Dask resources to delete.
    :param user_id: Owner id, used to look the workflow up in the database.
    :raises RuntimeError: If any of the deletion steps failed.
    """
    errors = []  # Collect errors during deletion attempts

    try:
        current_k8s_custom_objects_api_client.delete_namespaced_custom_object(
            group="kubernetes.dask.org",
            version="v1",
            plural="daskclusters",
            # Delete from the same namespace the cluster is created in
            # (see ``_create_dask_cluster``); the previous hard-coded
            # "default" only worked when REANA_RUNTIME_KUBERNETES_NAMESPACE
            # happened to be "default".
            namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE,
            name=get_dask_component_name(workflow_id, "cluster"),
        )
        logging.info(f"Dask cluster for workflow {workflow_id} deleted successfully.")
    except Exception as e:
        errors.append(f"Error deleting Dask cluster for workflow {workflow_id}: {e}")

    if DASK_AUTOSCALER_ENABLED:
        try:
            current_k8s_custom_objects_api_client.delete_namespaced_custom_object(
                group="kubernetes.dask.org",
                version="v1",
                plural="daskautoscalers",
                namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE,
                name=get_dask_component_name(workflow_id, "autoscaler"),
            )
            logging.info(
                f"Dask autoscaler for workflow {workflow_id} deleted successfully."
            )
        except Exception as e:
            errors.append(
                f"Error deleting Dask autoscaler for workflow {workflow_id}: {e}"
            )

    try:
        delete_dask_dashboard_ingress(workflow_id)
        logging.info(
            f"Dask dashboard ingress for workflow {workflow_id} deleted successfully."
        )
    except Exception as e:
        errors.append(
            f"Error deleting Dask dashboard ingress for workflow {workflow_id}: {e}"
        )

    try:
        dask_service = (
            Session.query(Service)
            .filter_by(
                name=get_dask_component_name(workflow_id, "database_model_service")
            )
            .one_or_none()
        )
        # The row may already be gone (e.g. deletion retried); only touch
        # the database when it is still present.
        if dask_service:
            workflow = _get_workflow_with_uuid_or_name(str(workflow_id), user_id)
            workflow.services.remove(dask_service)
            Session.delete(dask_service)
            Session.object_session(workflow).commit()

    except Exception as e:
        errors.append(
            f"Error deleting Dask Service model from database of the workflow: {workflow_id}: {e}"
        )

    # Raise collected errors if any — build the combined message once
    # instead of joining the list twice.
    if errors:
        message = "Errors occurred during resource deletion:\n" + "\n".join(errors)
        logging.error(message)
        raise RuntimeError(message)


def create_dask_dashboard_ingress(cluster_name, workflow_id):
    """Create K8S Ingress object for Dask dashboard.

    Exposes the Dask scheduler dashboard (service ``{cluster_name}-scheduler``,
    port 8787) under ``/{workflow_id}/dashboard`` on the REANA ingress host,
    together with a traefik ``Middleware`` that rewrites the request path
    before it reaches the scheduler service.

    :param cluster_name: Name of the Dask cluster; used for the ingress name
        and the backend service name.
    :param workflow_id: UUID of the workflow; used in the URL path and the
        middleware name.
    """
    # Define the middleware spec
    # NOTE(review): the regex below has no capture group, yet the replacement
    # references "$1" — presumably traefik substitutes an empty string there.
    # Confirm the rewrite behaves as intended.
    middleware_spec = {
        "apiVersion": "traefik.io/v1alpha1",
        "kind": "Middleware",
        "metadata": {"name": f"replacepath-{workflow_id}", "namespace": "default"},
        "spec": {
            "replacePathRegex": {
                "regex": f"/{workflow_id}/dashboard/*",
                "replacement": "/$1",
            }
        },
    }

    # NOTE(review): deletion elsewhere looks this ingress up via
    # get_dask_component_name(workflow_id, "dashboard_ingress") — verify that
    # name matches f"dask-dashboard-ingress-{cluster_name}" used here.
    ingress = client.V1Ingress(
        api_version="networking.k8s.io/v1",
        kind="Ingress",
        metadata=client.V1ObjectMeta(
            name=f"dask-dashboard-ingress-{cluster_name}",
            annotations={
                **REANA_INGRESS_ANNOTATIONS,
                # Must match the namespace ("default") and name of
                # middleware_spec above: "<namespace>-<name>@kubernetescrd".
                "traefik.ingress.kubernetes.io/router.middlewares": f"default-replacepath-{workflow_id}@kubernetescrd",
            },
        ),
        spec=client.V1IngressSpec(
            rules=[
                client.V1IngressRule(
                    host=REANA_INGRESS_HOST,
                    http=client.V1HTTPIngressRuleValue(
                        paths=[
                            client.V1HTTPIngressPath(
                                path=f"/{workflow_id}/dashboard",
                                path_type="Prefix",
                                backend=client.V1IngressBackend(
                                    service=client.V1IngressServiceBackend(
                                        name=f"{cluster_name}-scheduler",
                                        port=client.V1ServiceBackendPort(number=8787),
                                    )
                                ),
                            )
                        ]
                    ),
                )
            ]
        ),
    )
    # Only set the ingress class when one is configured for the deployment.
    if REANA_INGRESS_CLASS_NAME:
        ingress.spec.ingress_class_name = REANA_INGRESS_CLASS_NAME

    # Create middleware for ingress
    current_k8s_custom_objects_api_client.create_namespaced_custom_object(
        group="traefik.io",
        version="v1alpha1",
        namespace="default",
        plural="middlewares",
        body=middleware_spec,
    )
    # Create the ingress resource
    current_k8s_networking_api_client.create_namespaced_ingress(
        namespace="default", body=ingress
    )


def delete_dask_dashboard_ingress(workflow_id):
    """Delete K8S Ingress Object for Dask dashboard."""
    failures = []

    def _attempt(description, action):
        # Run one deletion step; record the failure instead of raising so
        # that the remaining steps still get a chance to run.
        try:
            action()
        except Exception as e:
            failures.append(f"{description} {workflow_id}: {e}")

    # Remove the ingress itself, then its traefik middleware.
    _attempt(
        "Error deleting Dask dashboard ingress for workflow",
        lambda: current_k8s_networking_api_client.delete_namespaced_ingress(
            get_dask_component_name(workflow_id, "dashboard_ingress"),
            namespace="default",
            body=client.V1DeleteOptions(),
        ),
    )
    _attempt(
        "Error deleting Dask dashboard ingress middleware for workflow",
        lambda: current_k8s_custom_objects_api_client.delete_namespaced_custom_object(
            group="traefik.io",
            version="v1alpha1",
            namespace="default",
            plural="middlewares",
            name=get_dask_component_name(workflow_id, "dashboard_ingress_middleware"),
        ),
    )

    # Surface every collected failure in a single exception.
    if failures:
        raise RuntimeError("\n".join(failures))
Loading

0 comments on commit 95a14d3

Please sign in to comment.