From 144de7a41ea9af6ec169463063e44155ea7e04c6 Mon Sep 17 00:00:00 2001 From: Max <43565398+maxim-mityutko@users.noreply.github.com> Date: Wed, 8 May 2024 18:45:37 +0200 Subject: [PATCH] fix (#113) Co-authored-by: Maxim Mityutko --- .../airflow/operators/external_tasks.py | 14 ++-- tests/airflow_plugins/test_autosys.py | 72 +++++++++++++++++++ 2 files changed, 81 insertions(+), 5 deletions(-) create mode 100644 tests/airflow_plugins/test_autosys.py diff --git a/brickflow_plugins/airflow/operators/external_tasks.py b/brickflow_plugins/airflow/operators/external_tasks.py index 66011b6b..5f72f60c 100644 --- a/brickflow_plugins/airflow/operators/external_tasks.py +++ b/brickflow_plugins/airflow/operators/external_tasks.py @@ -387,9 +387,9 @@ def poke(self, context): else: status = response.json()["status"][:2].upper() - last_end_timestamp = parse(response.json()["lastEndUTC"]).replace( - tzinfo=pytz.UTC - ) + last_end_timestamp = None + if last_end_utc := response.json().get("lastEndUTC"): + last_end_timestamp = parse(last_end_utc).replace(tzinfo=pytz.UTC) time_delta = ( self.time_delta @@ -397,10 +397,14 @@ def poke(self, context): else timedelta(**self.time_delta) ) - execution_timestamp = parse(str(context["execution_date"])) + execution_timestamp = parse(context["execution_date"]) run_timestamp = execution_timestamp - time_delta - if "SU" in status and last_end_timestamp >= run_timestamp: + if ( + "SU" in status + and last_end_timestamp + and last_end_timestamp >= run_timestamp + ): logging.info( f"Last End: {last_end_timestamp}, Run Timestamp: {run_timestamp}" ) diff --git a/tests/airflow_plugins/test_autosys.py b/tests/airflow_plugins/test_autosys.py new file mode 100644 index 00000000..f7329d67 --- /dev/null +++ b/tests/airflow_plugins/test_autosys.py @@ -0,0 +1,72 @@ +import pytest +from requests.exceptions import HTTPError +from requests_mock.mocker import Mocker as RequestsMocker + +from brickflow_plugins.airflow.operators.external_tasks import AutosysSensor + + +class TestAutosysSensor: + @pytest.fixture(autouse=True, name="api", scope="class") + def mock_api(self): + rm = RequestsMocker() + rm.register_uri( + method="GET", + url="https://42.autosys.my-org.com/foo", + response_list=[ + # Test 1: Success + { + "json": {"status": "SU", "lastEndUTC": "2024-01-01T00:55:00Z"}, + "status_code": int(200), + }, + # Test 2: Raise Error + { + "json": {}, + "status_code": int(404), + }, + # Test 3: Poke 4 times until success + { + "json": {"status": "FA", "lastEndUTC": "2024-01-01T00:55:00Z"}, + "status_code": int(200), + }, + { + "json": {"status": "UNK", "lastEndUTC": None}, + "status_code": int(200), + }, + { + "json": {"status": "UNK", "lastEndUTC": ""}, + "status_code": int(200), + }, + { + "json": {"status": "SU", "lastEndUTC": "2024-01-01T01:55:00Z"}, + "status_code": int(200), + }, + ], + ) + yield rm + + @pytest.fixture() + def sensor(self): + yield AutosysSensor( + task_id="test", + url="https://42.autosys.my-org.com/", + job_name="foo", + poke_interval=1, + time_delta={"hours": 1}, + ) + + def test_success(self, api, caplog, sensor): + with api: + sensor.poke(context={"execution_date": "2024-01-01T01:00:00Z"}) + assert caplog.text.count("Poking again") == 0 + assert "Success criteria met. Exiting" in caplog.text + + def test_non_200(self, api, sensor): + with pytest.raises(HTTPError): + with api: + sensor.poke(context={"execution_date": "2024-01-01T01:00:00Z"}) + + def test_poking(self, api, caplog, sensor): + with api: + sensor.poke(context={"execution_date": "2024-01-01T02:00:00Z"}) + assert caplog.text.count("Poking again") == 3 + assert "Success criteria met. Exiting" in caplog.text