Skip to content

Commit

Permalink
fix(k8s): handle 409 conflict errors for dask resources (reanahub#618)
Browse files Browse the repository at this point in the history
  • Loading branch information
Alputer committed Nov 28, 2024
1 parent 7896c61 commit f56f50c
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 16 deletions.
12 changes: 12 additions & 0 deletions reana_workflow_controller/dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

from flask import current_app

from kubernetes.client.exceptions import ApiException

from reana_commons.config import (
K8S_CERN_EOS_AVAILABLE,
K8S_CERN_EOS_MOUNT_CONFIGURATION,
Expand Down Expand Up @@ -489,6 +491,11 @@ def _create_dask_cluster(self):
namespace="default",
body=self.cluster_body,
)
except ApiException as e:
if e.status == 409:
logging.error("Conflict error: Dask cluster already exists.")
else:
raise
except Exception:
logging.exception(
"An error occurred while trying to create a Dask cluster."
Expand All @@ -505,6 +512,11 @@ def _create_dask_autoscaler(self):
namespace="default",
body=self.autoscaler_body,
)
except ApiException as e:
if e.status == 409:
logging.error("Conflict error: Dask autoscaler already exists.")
else:
raise
except Exception:
logging.exception(
"An error occurred while trying to create a Dask autoscaler."
Expand Down
48 changes: 32 additions & 16 deletions reana_workflow_controller/k8s.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

"""REANA Workflow Controller Kubernetes utils."""

import logging

from kubernetes import client
from kubernetes.client.rest import ApiException
from reana_commons.config import (
Expand Down Expand Up @@ -418,7 +420,7 @@ def create_dask_dashboard_ingress(cluster_name, workflow_id):

ingress = client.V1Ingress(
api_version="networking.k8s.io/v1",
kind="Ingress",
kind="Ingrss",
metadata=client.V1ObjectMeta(
name=f"dask-dashboard-ingress-{cluster_name}",
annotations={
Expand Down Expand Up @@ -451,14 +453,22 @@ def create_dask_dashboard_ingress(cluster_name, workflow_id):
if REANA_INGRESS_CLASS_NAME:
ingress.spec.ingress_class_name = REANA_INGRESS_CLASS_NAME

# Create middleware for ingress
current_k8s_custom_objects_api_client.create_namespaced_custom_object(
group="traefik.io",
version="v1alpha1",
namespace="default",
plural="middlewares",
body=middleware_spec,
)
try:
# Create middleware for ingress
current_k8s_custom_objects_api_client.create_namespaced_custom_object(
group="traefik.io",
version="v1alpha1",
namespace="default",
plural="middlewares",
body=middleware_spec,
)
except ApiException as e:
if e.status == 409:
logging.error(
"Conflict error: Dask dashboard ingress middleware already exists."
)
else:
raise
# Create the ingress resource
current_k8s_networking_api_client.create_namespaced_ingress(
namespace="default", body=ingress
Expand All @@ -470,10 +480,16 @@ def delete_dask_dashboard_ingress(cluster_name, workflow_id):
current_k8s_networking_api_client.delete_namespaced_ingress(
name=cluster_name, namespace="default", body=client.V1DeleteOptions()
)
current_k8s_custom_objects_api_client.delete_namespaced_custom_object(
group="traefik.io",
version="v1alpha1",
namespace="default",
plural="middlewares",
name=f"replacepath-{workflow_id}",
)
try:
current_k8s_custom_objects_api_client.delete_namespaced_custom_object(
group="traefik.io",
version="v1alpha1",
namespace="default",
plural="middlewares",
name=f"replacepath-{workflow_id}",
)
except ApiException as e:
if e.status == 409:
logging.error("Conflict error: Dask dashboard ingress already exists.")
else:
raise

0 comments on commit f56f50c

Please sign in to comment.