From b9bd76d6152f4c356053616bca3f1b45d0aa9e55 Mon Sep 17 00:00:00 2001 From: Alputer Date: Thu, 28 Nov 2024 10:18:44 +0100 Subject: [PATCH] fix(k8s): handle 409 conflict errors for dask resources (#618) Closes #617 --- reana_workflow_controller/dask.py | 12 ++++++++ reana_workflow_controller/k8s.py | 46 +++++++++++++++++++++---------- 2 files changed, 43 insertions(+), 15 deletions(-) diff --git a/reana_workflow_controller/dask.py b/reana_workflow_controller/dask.py index 3c74a837..48a24868 100644 --- a/reana_workflow_controller/dask.py +++ b/reana_workflow_controller/dask.py @@ -11,6 +11,8 @@ from flask import current_app +from kubernetes.client.exceptions import ApiException + from reana_commons.config import ( K8S_CERN_EOS_AVAILABLE, K8S_CERN_EOS_MOUNT_CONFIGURATION, @@ -489,6 +491,11 @@ def _create_dask_cluster(self): namespace="default", body=self.cluster_body, ) + except ApiException as e: + if e.status == 409: + logging.error("Conflict error: Dask cluster already exists.") + else: + raise except Exception: logging.exception( "An error occurred while trying to create a Dask cluster." @@ -505,6 +512,11 @@ def _create_dask_autoscaler(self): namespace="default", body=self.autoscaler_body, ) + except ApiException as e: + if e.status == 409: + logging.error("Conflict error: Dask autoscaler already exists.") + else: + raise except Exception: logging.exception( "An error occurred while trying to create a Dask autoscaler." diff --git a/reana_workflow_controller/k8s.py b/reana_workflow_controller/k8s.py index 5661f2c5..07026c5f 100644 --- a/reana_workflow_controller/k8s.py +++ b/reana_workflow_controller/k8s.py @@ -6,6 +6,8 @@ """REANA Workflow Controller Kubernetes utils.""" +import logging + from kubernetes import client from kubernetes.client.rest import ApiException from reana_commons.config import ( @@ -451,14 +453,22 @@ def create_dask_dashboard_ingress(cluster_name, workflow_id): if REANA_INGRESS_CLASS_NAME: ingress.spec.ingress_class_name = REANA_INGRESS_CLASS_NAME - # Create middleware for ingress - current_k8s_custom_objects_api_client.create_namespaced_custom_object( - group="traefik.io", - version="v1alpha1", - namespace="default", - plural="middlewares", - body=middleware_spec, - ) + try: + # Create middleware for ingress + current_k8s_custom_objects_api_client.create_namespaced_custom_object( + group="traefik.io", + version="v1alpha1", + namespace="default", + plural="middlewares", + body=middleware_spec, + ) + except ApiException as e: + if e.status == 409: + logging.error( + "Conflict error: Dask dashboard ingress middleware already exists." + ) + else: + raise # Create the ingress resource current_k8s_networking_api_client.create_namespaced_ingress( namespace="default", body=ingress @@ -470,10 +480,16 @@ def delete_dask_dashboard_ingress(cluster_name, workflow_id): current_k8s_networking_api_client.delete_namespaced_ingress( name=cluster_name, namespace="default", body=client.V1DeleteOptions() ) - current_k8s_custom_objects_api_client.delete_namespaced_custom_object( - group="traefik.io", - version="v1alpha1", - namespace="default", - plural="middlewares", - name=f"replacepath-{workflow_id}", - ) + try: + current_k8s_custom_objects_api_client.delete_namespaced_custom_object( + group="traefik.io", + version="v1alpha1", + namespace="default", + plural="middlewares", + name=f"replacepath-{workflow_id}", + ) + except ApiException as e: + if e.status == 409: + logging.error("Conflict error: Dask dashboard ingress already exists.") + else: + raise