Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
mock run task call
Browse files Browse the repository at this point in the history
  • Loading branch information
jeanluciano committed Apr 10, 2024
1 parent 7e8bc45 commit a5743ce
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 0 deletions.
1 change: 1 addition & 0 deletions prefect_aws/workers/ecs_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1567,6 +1567,7 @@ def _prepare_task_run_request(
CREATE_TASK_RUN_MIN_DELAY_JITTER_SECONDS,
CREATE_TASK_RUN_MAX_DELAY_JITTER_SECONDS,
),
reraise=True,
)
def _create_task_run(self, ecs_client: _ECSClient, task_run_request: dict) -> str:
"""
Expand Down
37 changes: 37 additions & 0 deletions tests/workers/test_ecs_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
from functools import partial
from typing import Any, Awaitable, Callable, Dict, List, Optional
from unittest.mock import ANY, MagicMock
from unittest.mock import patch as mock_patch
from uuid import uuid4

import anyio
import botocore
import pytest
import yaml
from moto import mock_ec2, mock_ecs, mock_logs
Expand Down Expand Up @@ -1322,6 +1324,41 @@ async def write_fake_log(task_arn):
# assert "test-message-{i}" in err


orig = botocore.client.BaseClient._make_api_call


def mock_make_api_call(self, operation_name, kwarg):
if operation_name == "RunTask":
return {
"failures": [
{"arn": "string", "reason": "string", "detail": "string"},
]
}
return orig(self, operation_name, kwarg)


@pytest.mark.usefixtures("ecs_mocks")
async def test_run_task_error_handling(
aws_credentials: AwsCredentials,
flow_run: FlowRun,
capsys,
):
configuration = await construct_configuration(
aws_credentials=aws_credentials,
task_role_arn="test",
)

with mock_patch(
"botocore.client.BaseClient._make_api_call", new=mock_make_api_call
):
async with ECSWorker(work_pool_name="test") as worker:
with pytest.raises(RuntimeError) as exc:
await run_then_stop_task(worker, configuration, flow_run)

print(exc.value)
assert "Failed to run ECS task: string" in capsys.readouterr().out


@pytest.mark.usefixtures("ecs_mocks")
async def test_cloudwatch_log_options(
aws_credentials: AwsCredentials, flow_run: FlowRun
Expand Down

0 comments on commit a5743ce

Please sign in to comment.