From 0120515f6c0061711feba4990cfc61db47a5b4f0 Mon Sep 17 00:00:00 2001 From: harjeevan maan Date: Thu, 3 Oct 2024 22:02:54 -0400 Subject: [PATCH] Added unit tests and restructred `await_xcom_sidecar_container_start` method. (#42504) * Added unit tests and restructred `await_xcom_sidecar_container_start` method. - The `await_xcom_sidecar_container_start` method in `PodManager` checks if the xcom sidecar container has started running before executing `do_xcom_push`. - The function logs the status periodically and raises an `AirflowException` if the container does not start within the specified timeout. - Added two unit tests: - `test_await_xcom_sidecar_container_timeout`: Verifies that an `AirflowException` is raised if the sidecar container fails to start within the timeout. - `test_await_xcom_sidecar_container_starts`: Confirms the method successfully exits when the sidecar container starts. * Fixed the assertion test failures --- .../cncf/kubernetes/utils/pod_manager.py | 26 ++++++++++++++----- .../cncf/kubernetes/utils/test_pod_manager.py | 15 +++++++++++ 2 files changed, 35 insertions(+), 6 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py index 7c283eaccc98..cd91dc09281f 100644 --- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -19,7 +19,6 @@ from __future__ import annotations import enum -import itertools import json import math import time @@ -721,14 +720,29 @@ def read_pod(self, pod: V1Pod) -> V1Pod: except HTTPError as e: raise AirflowException(f"There was an error reading the kubernetes API: {e}") - def await_xcom_sidecar_container_start(self, pod: V1Pod) -> None: + def await_xcom_sidecar_container_start( + self, pod: V1Pod, timeout: int = 900, log_interval: int = 30 + ) -> None: + """Check if the sidecar container has reached the 'Running' state before performing do_xcom_push.""" self.log.info("Checking if xcom sidecar container is started.") - for attempt in itertools.count(): + start_time = time.time() + last_log_time = start_time + + while True: + elapsed_time = time.time() - start_time if self.container_is_running(pod, PodDefaults.SIDECAR_CONTAINER_NAME): - self.log.info("The xcom sidecar container is started.") + self.log.info("The xcom sidecar container has started.") break - if not attempt: - self.log.warning("The xcom sidecar container is not yet started.") + if (time.time() - last_log_time) >= log_interval: + self.log.warning( + "Still waiting for the xcom sidecar container to start. Elapsed time: %d seconds.", + int(elapsed_time), + ) + last_log_time = time.time() + if elapsed_time > timeout: + raise AirflowException( + f"Xcom sidecar container did not start within {timeout // 60} minutes." + ) time.sleep(1) def extract_xcom(self, pod: V1Pod) -> str: diff --git a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py index 3e4f2d086fc1..73dac5255d62 100644 --- a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py +++ b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py @@ -601,6 +601,21 @@ def test_extract_xcom_none(self, mock_exec_xcom_kill, mock_exec_pod_command, moc self.pod_manager.extract_xcom(pod=mock_pod) assert mock_exec_xcom_kill.call_count == 1 + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.container_is_running") + def test_await_xcom_sidecar_container_timeout(self, mock_container_is_running): + mock_pod = MagicMock() + mock_container_is_running.return_value = False + with pytest.raises(AirflowException): + self.pod_manager.await_xcom_sidecar_container_start(pod=mock_pod, timeout=10, log_interval=5) + mock_container_is_running.assert_any_call(mock_pod, "airflow-xcom-sidecar") + + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.container_is_running") + def test_await_xcom_sidecar_container_starts(self, mock_container_is_running): + mock_pod = MagicMock() + mock_container_is_running.return_value = True + self.pod_manager.await_xcom_sidecar_container_start(pod=mock_pod) + mock_container_is_running.assert_any_call(mock_pod, "airflow-xcom-sidecar") + def params_for_test_container_is_running(): """The `container_is_running` method is designed to handle an assortment of bad objects