Skip to content

Commit

Permalink
refactor(dask): use central function for Dask K8s component names (re…
Browse files Browse the repository at this point in the history
  • Loading branch information
Alputer committed Jan 28, 2025
1 parent 2797300 commit f252098
Show file tree
Hide file tree
Showing 9 changed files with 50 additions and 32 deletions.
11 changes: 5 additions & 6 deletions reana_workflow_controller/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
calculate_hash_of_dir,
calculate_job_input_hash,
build_unique_component_name,
get_dask_component_name,
)
from reana_db.database import Session
from reana_db.models import Job, JobCache, Workflow, RunStatus, Service
Expand Down Expand Up @@ -322,26 +323,24 @@ def _delete_dask_cluster(workflow: Workflow) -> None:
group="kubernetes.dask.org",
version="v1",
plural="daskclusters",
name=get_dask_component_name(workflow.id_, "cluster"),
namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE,
name=f"reana-run-dask-{workflow.id_}",
)

if DASK_AUTOSCALER_ENABLED:
current_k8s_custom_objects_api_client.delete_namespaced_custom_object(
group="kubernetes.dask.org",
version="v1",
plural="daskautoscalers",
name=get_dask_component_name(workflow.id_, "autoscaler"),
namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE,
name=f"dask-autoscaler-reana-run-dask-{workflow.id_}",
)

delete_dask_dashboard_ingress(
f"dask-dashboard-ingress-reana-run-dask-{workflow.id_}", workflow.id_
)
delete_dask_dashboard_ingress(workflow.id_)

dask_service = (
Session.query(Service)
.filter_by(name=f"dask-dashboard-{workflow.id_}")
.filter_by(name=get_dask_component_name(workflow.id_, "database_model_service"))
.one_or_none()
)
workflow.services.remove(dask_service)
Expand Down
16 changes: 10 additions & 6 deletions reana_workflow_controller/dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
get_reana_shared_volume,
)
from reana_commons.job_utils import kubernetes_memory_to_bytes
from reana_commons.utils import get_dask_component_name

from reana_workflow_controller.config import DASK_AUTOSCALER_ENABLED
from reana_workflow_controller.k8s import create_dask_dashboard_ingress
Expand All @@ -39,7 +40,7 @@ class DaskResourceManager:

def __init__(
self,
cluster_name,
workflow_id,
workflow_spec,
workflow_workspace,
user_id,
Expand All @@ -57,10 +58,9 @@ def __init__(
:param user_id: Id of the user
:type user_id: str
"""
self.cluster_name = cluster_name
self.cluster_name = get_dask_component_name(workflow_id, "cluster")
self.num_of_workers = num_of_workers
self.single_worker_memory = single_worker_memory
self.autoscaler_name = f"dask-autoscaler-{cluster_name}"
self.workflow_spec = workflow_spec
self.workflow_workspace = workflow_workspace
self.workflow_id = workflow_workspace.split("/")[-1]
Expand All @@ -69,7 +69,9 @@ def __init__(
self.cluster_spec = workflow_spec.get("resources", {}).get("dask", [])
self.cluster_body = self._load_dask_cluster_template()
self.cluster_image = self.cluster_spec["image"]
self.dask_scheduler_uri = f"{self.cluster_name}-scheduler.{REANA_RUNTIME_KUBERNETES_NAMESPACE}.svc.cluster.local:8786"
self.dask_scheduler_uri = get_dask_component_name(
workflow_id, "dashboard_service_uri", REANA_RUNTIME_KUBERNETES_NAMESPACE
)

self.secrets_store = UserSecretsStore.fetch(self.user_id)
self.secret_env_vars = self.secrets_store.get_env_secrets_as_k8s_spec()
Expand All @@ -79,7 +81,7 @@ def __init__(
self.kubernetes_uid = WORKFLOW_RUNTIME_USER_UID

if DASK_AUTOSCALER_ENABLED:
self.autoscaler_name = f"dask-autoscaler-{cluster_name}"
self.autoscaler_name = get_dask_component_name(workflow_id, "autoscaler")
self.autoscaler_body = self._load_dask_autoscaler_template()

def _load_dask_cluster_template(self):
Expand Down Expand Up @@ -115,7 +117,7 @@ def create_dask_resources(self):
self._prepare_autoscaler()
self._create_dask_autoscaler()

create_dask_dashboard_ingress(self.cluster_name, self.workflow_id)
create_dask_dashboard_ingress(self.workflow_id)

def _prepare_cluster(self):
"""Prepare Dask cluster body by adding necessary image-pull secrets, volumes, volume mounts, init containers and sidecar containers."""
Expand All @@ -128,6 +130,8 @@ def _prepare_cluster(self):
# Add the name of the cluster, used in scheduler service name
self.cluster_body["metadata"] = {"name": self.cluster_name}

# self.cluster_body["spec"]["worker"]["spec"]["metadata"] = {"name": "amcik"}

self.cluster_body["spec"]["scheduler"]["service"]["selector"][
"dask.org/cluster-name"
] = self.cluster_name
Expand Down
21 changes: 13 additions & 8 deletions reana_workflow_controller/k8s.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
get_k8s_cvmfs_volumes,
get_workspace_volume,
)
from reana_commons.utils import get_dask_component_name

from reana_workflow_controller.config import ( # isort:skip
JUPYTER_INTERACTIVE_SESSION_DEFAULT_PORT,
Expand Down Expand Up @@ -404,14 +405,16 @@ def delete_k8s_ingress_object(ingress_name, namespace):
)


def create_dask_dashboard_ingress(cluster_name, workflow_id):
def create_dask_dashboard_ingress(workflow_id):
"""Create K8S Ingress object for Dask dashboard."""
# Define the middleware spec
middleware_spec = {
"apiVersion": "traefik.io/v1alpha1",
"kind": "Middleware",
"metadata": {
"name": f"replacepath-{workflow_id}",
"name": get_dask_component_name(
workflow_id, "dashboard_ingress_middleware"
),
"namespace": REANA_RUNTIME_KUBERNETES_NAMESPACE,
},
"spec": {
Expand All @@ -426,10 +429,10 @@ def create_dask_dashboard_ingress(cluster_name, workflow_id):
api_version="networking.k8s.io/v1",
kind="Ingress",
metadata=client.V1ObjectMeta(
name=f"dask-dashboard-ingress-{cluster_name}",
name=get_dask_component_name(workflow_id, "dashboard_ingress"),
annotations={
**REANA_INGRESS_ANNOTATIONS,
"traefik.ingress.kubernetes.io/router.middlewares": f"{REANA_RUNTIME_KUBERNETES_NAMESPACE}-replacepath-{workflow_id}@kubernetescrd",
"traefik.ingress.kubernetes.io/router.middlewares": f"{REANA_RUNTIME_KUBERNETES_NAMESPACE}-{get_dask_component_name(workflow_id, 'dashboard_ingress_middleware')}@kubernetescrd",
},
),
spec=client.V1IngressSpec(
Expand All @@ -443,7 +446,9 @@ def create_dask_dashboard_ingress(cluster_name, workflow_id):
path_type="Prefix",
backend=client.V1IngressBackend(
service=client.V1IngressServiceBackend(
name=f"{cluster_name}-scheduler",
name=get_dask_component_name(
workflow_id, "dashboard_service"
),
port=client.V1ServiceBackendPort(number=8787),
)
),
Expand Down Expand Up @@ -471,10 +476,10 @@ def create_dask_dashboard_ingress(cluster_name, workflow_id):
)


def delete_dask_dashboard_ingress(cluster_name, workflow_id):
def delete_dask_dashboard_ingress(workflow_id):
"""Delete K8S Ingress Object for Dask dashboard."""
current_k8s_networking_api_client.delete_namespaced_ingress(
name=cluster_name,
name=get_dask_component_name(workflow_id, "dashboard_ingress"),
namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE,
body=client.V1DeleteOptions(),
)
Expand All @@ -483,7 +488,7 @@ def delete_dask_dashboard_ingress(cluster_name, workflow_id):
version="v1alpha1",
namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE,
plural="middlewares",
name=f"replacepath-{workflow_id}",
name=get_dask_component_name(workflow_id, "dashboard_ingress_middleware"),
)


Expand Down
5 changes: 3 additions & 2 deletions reana_workflow_controller/opensearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import logging
from opensearchpy import OpenSearch

from reana_commons.utils import get_dask_component_name
from reana_workflow_controller.config import (
REANA_OPENSEARCH_CA_CERTS,
REANA_OPENSEARCH_HOST,
Expand Down Expand Up @@ -188,7 +189,7 @@ def fetch_dask_scheduler_logs(self, workflow_id: str) -> str | None:
id=None,
index=self.dask_index,
matches={
self.dask_log_matcher: f"reana-run-dask-{workflow_id}",
self.dask_log_matcher: get_dask_component_name(workflow_id, "cluster"),
"kubernetes.labels.dask.org/component": "scheduler",
},
)
Expand All @@ -205,7 +206,7 @@ def fetch_dask_worker_logs(self, workflow_id: str) -> str | None:
id=None,
index=self.dask_index,
matches={
self.dask_log_matcher: f"reana-run-dask-{workflow_id}",
self.dask_log_matcher: get_dask_component_name(workflow_id, "cluster"),
"kubernetes.labels.dask.org/component": "worker",
},
)
Expand Down
8 changes: 4 additions & 4 deletions reana_workflow_controller/rest/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from webargs import fields, validate
from webargs.flaskparser import use_args, use_kwargs
from reana_commons.config import WORKFLOW_TIME_FORMAT
from reana_commons.utils import build_unique_component_name
from reana_commons.utils import build_unique_component_name, get_dask_component_name
from reana_db.database import Session
from reana_db.models import (
RunStatus,
Expand Down Expand Up @@ -416,7 +416,7 @@ def get_workflows(args, paginate=None): # noqa
dask_service = workflow.services.first()
if dask_service and dask_service.status == ServiceStatus.created:
pod_readiness = check_pod_readiness_by_prefix(
pod_name_prefix=f"reana-run-dask-{workflow.id_}"
pod_name_prefix=get_dask_component_name(workflow.id_, "cluster")
)

if pod_readiness == "Ready":
Expand Down Expand Up @@ -630,12 +630,12 @@ def create_workflow(): # noqa
)
if requires_dask(workflow):
dask_service = Service(
name=f"dask-dashboard-{workflow_uuid}",
name=get_dask_component_name(workflow.id_, "database_model_service"),
uri=f"https://{REANA_HOSTNAME}/{workflow_uuid}/dashboard/status",
type_=ServiceType.dask,
status=ServiceStatus.created,
owner_id=request.args["user"],
)

workflow.services.append(dask_service)

Session.add(workflow)
Expand Down
15 changes: 12 additions & 3 deletions reana_workflow_controller/workflow_run_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,18 @@
from reana_commons.utils import (
build_unique_component_name,
format_cmd,
get_dask_component_name,
)
from reana_db.config import SQLALCHEMY_DATABASE_URI
from reana_db.database import Session
from reana_db.models import Job, JobStatus, InteractiveSession, InteractiveSessionType

from reana_workflow_controller.errors import REANAInteractiveSessionError

from reana_workflow_controller.dask import DaskResourceManager, requires_dask
from reana_workflow_controller.dask import (
DaskResourceManager,
requires_dask,
)

from reana_workflow_controller.k8s import (
build_interactive_k8s_objects,
Expand Down Expand Up @@ -374,9 +378,10 @@ def start_batch_workflow_run(

try:
# Create the dask cluster and required resources

if requires_dask(self.workflow):
DaskResourceManager(
cluster_name=f"reana-run-dask-{self.workflow.id_}",
workflow_id=self.workflow.id_,
workflow_spec=self.workflow.reana_specification["workflow"],
workflow_workspace=self.workflow.workspace_path,
user_id=self.workflow.owner_id,
Expand Down Expand Up @@ -759,7 +764,11 @@ def _create_job_spec(
job_controller_container.env.append(
{
"name": "DASK_SCHEDULER_URI",
"value": f"reana-run-dask-{self.workflow.id_}-scheduler.{REANA_RUNTIME_KUBERNETES_NAMESPACE}.svc.cluster.local:8786",
"value": get_dask_component_name(
self.workflow.id_,
"dashboard_service_uri",
REANA_RUNTIME_KUBERNETES_NAMESPACE,
),
},
)

Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
"marshmallow>2.13.0,<3.0.0", # same upper pin as reana-server
"opensearch-py>=2.7.0,<2.8.0",
"packaging>=18.0",
"reana-commons[kubernetes]>=0.95.0a6,<0.96.0",
"reana-commons[kubernetes]>=0.95.0a7,<0.96.0",
"reana-db>=0.95.0a5,<0.96.0",
"requests>=2.25.0",
"sqlalchemy-utils>=0.31.0",
Expand Down
2 changes: 1 addition & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ def mock_user_secrets(monkeypatch):
def dask_resource_manager(sample_serial_workflow_in_db_with_dask, mock_user_secrets):
"""Fixture to create a DaskResourceManager instance."""
manager = DaskResourceManager(
cluster_name="test-cluster",
workflow_id="9eef9a08-5629-420d-8e97-29d498d88e20",
workflow_spec=sample_serial_workflow_in_db_with_dask.reana_specification[
"workflow"
],
Expand Down
2 changes: 1 addition & 1 deletion tests/test_dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ def test_create_dask_resources(dask_resource_manager):
mock_create_cluster.assert_called_once()
mock_create_autoscaler.assert_called_once()
mock_create_dashboard_ingress.assert_called_once_with(
dask_resource_manager.cluster_name, dask_resource_manager.workflow_id
dask_resource_manager.workflow_id
)


Expand Down

0 comments on commit f252098

Please sign in to comment.