Skip to content

Commit

Permalink
fix(k8s): handle exceptions better for Dask k8s resources (reanahub#618)
Browse files Browse the repository at this point in the history
  • Loading branch information
Alputer committed Jan 22, 2025
1 parent afd1400 commit 75fed4f
Show file tree
Hide file tree
Showing 3 changed files with 194 additions and 134 deletions.
41 changes: 2 additions & 39 deletions reana_workflow_controller/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
from reana_commons.k8s.api_client import (
current_k8s_batchv1_api_client,
current_k8s_corev1_api_client,
current_k8s_custom_objects_api_client,
current_k8s_networking_api_client,
)
from reana_commons.k8s.secrets import UserSecretsStore
from reana_commons.utils import (
Expand All @@ -45,10 +43,8 @@
REANA_JOB_STATUS_CONSUMER_PREFETCH_COUNT,
)
from reana_workflow_controller.errors import REANAWorkflowControllerError
from reana_workflow_controller.k8s import delete_dask_dashboard_ingress

from reana_workflow_controller.config import DASK_AUTOSCALER_ENABLED
from reana_workflow_controller.dask import requires_dask
from reana_workflow_controller.dask import requires_dask, delete_dask_cluster

try:
from urllib import parse as urlparse
Expand Down Expand Up @@ -168,7 +164,7 @@ def _update_workflow_status(workflow, status, logs):
workflow.logs += "Workflow engine logs could not be retrieved.\n"

if requires_dask(workflow):
_delete_dask_cluster(workflow)
delete_dask_cluster(workflow.id_, workflow.owner_id)

if RunStatus.should_cleanup_job(status):
try:
Expand Down Expand Up @@ -314,36 +310,3 @@ def _get_workflow_engine_pod_logs(workflow: Workflow) -> str:
# There might not be any pod returned by `list_namespaced_pod`, for example
# when a workflow fails to be scheduled
return ""


def _delete_dask_cluster(workflow: Workflow) -> None:
    """Delete the Dask cluster resources of a workflow.

    Removes the DaskCluster custom resource, the Dask autoscaler (when
    enabled), the dashboard ingress, and finally the dashboard ``Service``
    row from the database.

    :param workflow: Workflow whose Dask resources should be removed.
    """
    current_k8s_custom_objects_api_client.delete_namespaced_custom_object(
        group="kubernetes.dask.org",
        version="v1",
        plural="daskclusters",
        namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE,
        name=f"reana-run-dask-{workflow.id_}",
    )

    if DASK_AUTOSCALER_ENABLED:
        current_k8s_custom_objects_api_client.delete_namespaced_custom_object(
            group="kubernetes.dask.org",
            version="v1",
            plural="daskautoscalers",
            namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE,
            name=f"dask-autoscaler-reana-run-dask-{workflow.id_}",
        )

    delete_dask_dashboard_ingress(
        f"dask-dashboard-ingress-reana-run-dask-{workflow.id_}", workflow.id_
    )

    dask_service = (
        Session.query(Service)
        .filter_by(name=f"dask-dashboard-{workflow.id_}")
        .one_or_none()
    )
    # Guard against a missing row: ``one_or_none`` may return None, and
    # ``list.remove(None)`` raises ValueError while ``Session.delete(None)``
    # fails as well.
    if dask_service:
        workflow.services.remove(dask_service)
        Session.delete(dask_service)
        Session.object_session(workflow).commit()
203 changes: 192 additions & 11 deletions reana_workflow_controller/dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@

from flask import current_app

from kubernetes import client
from kubernetes.client.exceptions import ApiException

from reana_db.database import Session
from reana_db.models import Service
from reana_db.utils import _get_workflow_with_uuid_or_name
from reana_commons.config import (
K8S_CERN_EOS_AVAILABLE,
K8S_CERN_EOS_MOUNT_CONFIGURATION,
Expand All @@ -20,6 +26,7 @@
REANA_RUNTIME_KUBERNETES_NAMESPACE,
)
from reana_commons.k8s.api_client import (
current_k8s_networking_api_client,
current_k8s_custom_objects_api_client,
)
from reana_commons.k8s.kerberos import get_kerberos_k8s_config
Expand All @@ -28,10 +35,13 @@
get_workspace_volume,
get_reana_shared_volume,
)
from reana_commons.job_utils import kubernetes_memory_to_bytes

from reana_workflow_controller.config import DASK_AUTOSCALER_ENABLED
from reana_workflow_controller.k8s import create_dask_dashboard_ingress
from reana_workflow_controller.config import (
DASK_AUTOSCALER_ENABLED,
REANA_INGRESS_HOST,
REANA_INGRESS_CLASS_NAME,
REANA_INGRESS_ANNOTATIONS,
)


class DaskResourceManager:
Expand Down Expand Up @@ -108,14 +118,21 @@ def _load_dask_autoscaler_template(self):

def create_dask_resources(self):
"""Create necessary Dask resources for the workflow."""
self._prepare_cluster()
self._create_dask_cluster()
try:
self._prepare_cluster()
self._create_dask_cluster()

if DASK_AUTOSCALER_ENABLED:
self._prepare_autoscaler()
self._create_dask_autoscaler()
if DASK_AUTOSCALER_ENABLED:
self._prepare_autoscaler()
self._create_dask_autoscaler()

create_dask_dashboard_ingress(self.cluster_name, self.workflow_id)

create_dask_dashboard_ingress(self.cluster_name, self.workflow_id)
except Exception as e:
logging.error(
f"An error occured while trying to create dask cluster, now deleting the cluster... Error message:\n{e}"
)
delete_dask_cluster(self.workflow_id, self.user_id)

def _prepare_cluster(self):
"""Prepare Dask cluster body by adding necessary image-pull secrets, volumes, volume mounts, init containers and sidecar containers."""
Expand Down Expand Up @@ -488,11 +505,11 @@ def _create_dask_cluster(self):
namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE,
body=self.cluster_body,
)
except Exception:
except Exception as e:
logging.exception(
"An error occurred while trying to create a Dask cluster."
)
raise
raise e

def _create_dask_autoscaler(self):
"""Create Dask autoscaler resource."""
Expand All @@ -516,3 +533,167 @@ def requires_dask(workflow):
return bool(
workflow.reana_specification["workflow"].get("resources", {}).get("dask", False)
)


def delete_dask_cluster(workflow_id, user_id) -> None:
    """Delete all Dask resources associated with a workflow.

    Every deletion step is attempted even if earlier ones fail: errors are
    collected and a single ``RuntimeError`` is raised at the end, so that a
    failure on one resource does not leave the remaining ones dangling.

    :param workflow_id: UUID of the workflow whose Dask cluster is deleted.
    :param user_id: ID of the workflow owner, used to load the workflow row.
    :raises RuntimeError: if any of the deletion steps failed.
    """
    errors = []  # Collect errors during deletion attempts

    try:
        # NOTE(review): the namespace is hard-coded to "default" here (and in
        # the other Dask helpers of this module) rather than
        # REANA_RUNTIME_KUBERNETES_NAMESPACE — confirm this is intentional.
        current_k8s_custom_objects_api_client.delete_namespaced_custom_object(
            group="kubernetes.dask.org",
            version="v1",
            plural="daskclusters",
            namespace="default",
            name=f"reana-run-dask-{workflow_id}",
        )
        logging.info(f"Dask cluster for workflow {workflow_id} deleted successfully.")
    except Exception as e:
        errors.append(f"Error deleting Dask cluster for workflow {workflow_id}: {e}")

    if DASK_AUTOSCALER_ENABLED:
        try:
            current_k8s_custom_objects_api_client.delete_namespaced_custom_object(
                group="kubernetes.dask.org",
                version="v1",
                plural="daskautoscalers",
                namespace="default",
                name=f"dask-autoscaler-reana-run-dask-{workflow_id}",
            )
            logging.info(
                f"Dask autoscaler for workflow {workflow_id} deleted successfully."
            )
        except Exception as e:
            errors.append(
                f"Error deleting Dask autoscaler for workflow {workflow_id}: {e}"
            )

    try:
        delete_dask_dashboard_ingress(
            f"dask-dashboard-ingress-reana-run-dask-{workflow_id}", workflow_id
        )
        logging.info(
            f"Dask dashboard ingress for workflow {workflow_id} deleted successfully."
        )
    except Exception as e:
        errors.append(
            f"Error deleting Dask dashboard ingress for workflow {workflow_id}: {e}"
        )

    try:
        dask_service = (
            Session.query(Service)
            .filter_by(name=f"dask-dashboard-{workflow_id}")
            .one_or_none()
        )
        # The service row may already be gone; only detach and delete it
        # when it still exists.
        if dask_service:
            workflow = _get_workflow_with_uuid_or_name(str(workflow_id), user_id)
            workflow.services.remove(dask_service)
            Session.delete(dask_service)
            Session.object_session(workflow).commit()

    except Exception as e:
        errors.append(
            f"Error deleting Dask Service model from database of the workflow: {workflow_id}: {e}"
        )

    # Raise collected errors if any; build the combined message only once.
    if errors:
        message = "Errors occurred during resource deletion:\n" + "\n".join(errors)
        logging.error(message)
        raise RuntimeError(message)


def create_dask_dashboard_ingress(cluster_name, workflow_id):
    """Create K8S Ingress object for Dask dashboard.

    Creates a Traefik ``Middleware`` that strips the
    ``/{workflow_id}/dashboard`` prefix from incoming requests, and an
    ``Ingress`` routing that prefix to the Dask scheduler's dashboard
    port (8787).

    :param cluster_name: Name of the Dask cluster backing the dashboard.
    :param workflow_id: UUID of the workflow owning the dashboard.
    """
    # Define the middleware spec.
    # FIX: the original regex f"/{workflow_id}/dashboard/*" had no capture
    # group, so the "$1" replacement always expanded to the empty string and
    # every dashboard sub-path was rewritten to "/". Capture the remainder of
    # the path so "/{workflow_id}/dashboard/status" becomes "/status".
    middleware_spec = {
        "apiVersion": "traefik.io/v1alpha1",
        "kind": "Middleware",
        "metadata": {"name": f"replacepath-{workflow_id}", "namespace": "default"},
        "spec": {
            "replacePathRegex": {
                "regex": f"/{workflow_id}/dashboard/?(.*)",
                "replacement": "/$1",
            }
        },
    }

    ingress = client.V1Ingress(
        api_version="networking.k8s.io/v1",
        kind="Ingress",
        metadata=client.V1ObjectMeta(
            name=f"dask-dashboard-ingress-{cluster_name}",
            annotations={
                **REANA_INGRESS_ANNOTATIONS,
                # Attach the path-rewrite middleware created above.
                "traefik.ingress.kubernetes.io/router.middlewares": f"default-replacepath-{workflow_id}@kubernetescrd",
            },
        ),
        spec=client.V1IngressSpec(
            rules=[
                client.V1IngressRule(
                    host=REANA_INGRESS_HOST,
                    http=client.V1HTTPIngressRuleValue(
                        paths=[
                            client.V1HTTPIngressPath(
                                path=f"/{workflow_id}/dashboard",
                                path_type="Prefix",
                                backend=client.V1IngressBackend(
                                    service=client.V1IngressServiceBackend(
                                        name=f"{cluster_name}-scheduler",
                                        # 8787 is the Dask scheduler's
                                        # dashboard port.
                                        port=client.V1ServiceBackendPort(number=8787),
                                    )
                                ),
                            )
                        ]
                    ),
                )
            ]
        ),
    )
    if REANA_INGRESS_CLASS_NAME:
        ingress.spec.ingress_class_name = REANA_INGRESS_CLASS_NAME

    # Create middleware for ingress
    current_k8s_custom_objects_api_client.create_namespaced_custom_object(
        group="traefik.io",
        version="v1alpha1",
        namespace="default",
        plural="middlewares",
        body=middleware_spec,
    )
    # Create the ingress resource
    current_k8s_networking_api_client.create_namespaced_ingress(
        namespace="default", body=ingress
    )


def delete_dask_dashboard_ingress(cluster_name, workflow_id):
    """Delete K8S Ingress Object for Dask dashboard.

    Both the ingress and its Traefik middleware are attempted; failures are
    accumulated and reported together via a single ``RuntimeError``.
    """
    failures = []

    # Remove the ingress resource itself.
    try:
        current_k8s_networking_api_client.delete_namespaced_ingress(
            name=cluster_name,
            namespace="default",
            body=client.V1DeleteOptions(),
        )
    except Exception as exc:
        failures.append(
            f"Error deleting Dask dashboard ingress for workflow {workflow_id}: {exc}"
        )

    # Remove the Traefik path-rewrite middleware that backed the ingress.
    try:
        current_k8s_custom_objects_api_client.delete_namespaced_custom_object(
            name=f"replacepath-{workflow_id}",
            group="traefik.io",
            version="v1alpha1",
            namespace="default",
            plural="middlewares",
        )
    except Exception as exc:
        failures.append(
            f"Error deleting Dask dashboard ingress middleware for workflow {workflow_id}: {exc}"
        )

    # Surface every collected failure at once.
    if failures:
        raise RuntimeError("\n".join(failures))
Loading

0 comments on commit 75fed4f

Please sign in to comment.