Skip to content

Commit

Permalink
Added unit tests and restructred await_xcom_sidecar_container_start
Browse files Browse the repository at this point in the history
… method. (apache#42504)

* Added unit tests and restructred `await_xcom_sidecar_container_start`
method.

- The `await_xcom_sidecar_container_start` method in `PodManager` checks if the xcom sidecar container has started running before executing `do_xcom_push`.
- The function logs the status periodically and raises an `AirflowException` if the container does not start within the specified timeout.
- Added two unit tests:
  - `test_await_xcom_sidecar_container_timeout`: Verifies that an `AirflowException` is raised if the sidecar container fails to start within the timeout.
  - `test_await_xcom_sidecar_container_starts`: Confirms the method successfully exits when the sidecar container starts.

* Fixed the assertion test failures
  • Loading branch information
harjeevanmaan authored Oct 4, 2024
1 parent 455caf1 commit 0120515
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 6 deletions.
26 changes: 20 additions & 6 deletions airflow/providers/cncf/kubernetes/utils/pod_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
from __future__ import annotations

import enum
import itertools
import json
import math
import time
Expand Down Expand Up @@ -721,14 +720,29 @@ def read_pod(self, pod: V1Pod) -> V1Pod:
except HTTPError as e:
raise AirflowException(f"There was an error reading the kubernetes API: {e}")

def await_xcom_sidecar_container_start(self, pod: V1Pod) -> None:
def await_xcom_sidecar_container_start(
self, pod: V1Pod, timeout: int = 900, log_interval: int = 30
) -> None:
"""Check if the sidecar container has reached the 'Running' state before performing do_xcom_push."""
self.log.info("Checking if xcom sidecar container is started.")
for attempt in itertools.count():
start_time = time.time()
last_log_time = start_time

while True:
elapsed_time = time.time() - start_time
if self.container_is_running(pod, PodDefaults.SIDECAR_CONTAINER_NAME):
self.log.info("The xcom sidecar container is started.")
self.log.info("The xcom sidecar container has started.")
break
if not attempt:
self.log.warning("The xcom sidecar container is not yet started.")
if (time.time() - last_log_time) >= log_interval:
self.log.warning(
"Still waiting for the xcom sidecar container to start. Elapsed time: %d seconds.",
int(elapsed_time),
)
last_log_time = time.time()
if elapsed_time > timeout:
raise AirflowException(
f"Xcom sidecar container did not start within {timeout // 60} minutes."
)
time.sleep(1)

def extract_xcom(self, pod: V1Pod) -> str:
Expand Down
15 changes: 15 additions & 0 deletions tests/providers/cncf/kubernetes/utils/test_pod_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,21 @@ def test_extract_xcom_none(self, mock_exec_xcom_kill, mock_exec_pod_command, moc
self.pod_manager.extract_xcom(pod=mock_pod)
assert mock_exec_xcom_kill.call_count == 1

@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.container_is_running")
def test_await_xcom_sidecar_container_timeout(self, mock_container_is_running):
mock_pod = MagicMock()
mock_container_is_running.return_value = False
with pytest.raises(AirflowException):
self.pod_manager.await_xcom_sidecar_container_start(pod=mock_pod, timeout=10, log_interval=5)
mock_container_is_running.assert_any_call(mock_pod, "airflow-xcom-sidecar")

@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.container_is_running")
def test_await_xcom_sidecar_container_starts(self, mock_container_is_running):
mock_pod = MagicMock()
mock_container_is_running.return_value = True
self.pod_manager.await_xcom_sidecar_container_start(pod=mock_pod)
mock_container_is_running.assert_any_call(mock_pod, "airflow-xcom-sidecar")


def params_for_test_container_is_running():
"""The `container_is_running` method is designed to handle an assortment of bad objects
Expand Down

0 comments on commit 0120515

Please sign in to comment.