diff --git a/prefect_aws/workers/ecs_worker.py b/prefect_aws/workers/ecs_worker.py index 1a5c3d28..917d8bab 100644 --- a/prefect_aws/workers/ecs_worker.py +++ b/prefect_aws/workers/ecs_worker.py @@ -259,6 +259,7 @@ class ECSJobConfiguration(BaseJobConfiguration): ) configure_cloudwatch_logs: Optional[bool] = Field(default=None) cloudwatch_logs_options: Dict[str, str] = Field(default_factory=dict) + cloudwatch_logs_prefix: Optional[str] = Field(default=None) network_configuration: Dict[str, Any] = Field(default_factory=dict) stream_output: Optional[bool] = Field(default=None) task_start_timeout_seconds: int = Field(default=300) @@ -507,6 +508,16 @@ class ECSVariables(BaseVariables): " for available options. " ), ) + cloudwatch_logs_prefix: Optional[str] = Field( + default=None, + description=( + "When `configure_cloudwatch_logs` is enabled, this setting may be used to" + " set a prefix for the log group. If not provided, the default prefix will" + " be `prefect-logs__`. If" + " `awslogs-stream-prefix` is present in `Cloudwatch logs options` this" + " setting will be ignored." + ), + ) network_configuration: Dict[str, Any] = Field( default_factory=dict, @@ -1276,13 +1287,16 @@ def _prepare_task_definition( container["environment"].remove(item) if configuration.configure_cloudwatch_logs: + prefix = f"prefect-logs_{self._work_pool_name}_{flow_run.deployment_id}" container["logConfiguration"] = { "logDriver": "awslogs", "options": { "awslogs-create-group": "true", "awslogs-group": "prefect", "awslogs-region": region, - "awslogs-stream-prefix": configuration.name or "prefect", + "awslogs-stream-prefix": ( + configuration.cloudwatch_logs_prefix or prefix + ), **configuration.cloudwatch_logs_options, }, } diff --git a/tests/test_ecs.py b/tests/test_ecs.py index 6c429e9f..b105fcd6 100644 --- a/tests/test_ecs.py +++ b/tests/test_ecs.py @@ -1232,8 +1232,8 @@ async def test_cloudwatch_log_options(aws_credentials): configure_cloudwatch_logs=True, execution_role_arn="test", cloudwatch_logs_options={ - "awslogs-stream-prefix": "override-prefix", "max-buffer-size": "2m", + "awslogs-stream-prefix": "override-prefix", }, ) diff --git a/tests/workers/test_ecs_worker.py b/tests/workers/test_ecs_worker.py index e4dab38c..509393f8 100644 --- a/tests/workers/test_ecs_worker.py +++ b/tests/workers/test_ecs_worker.py @@ -1324,8 +1324,20 @@ async def write_fake_log(task_arn): @pytest.mark.usefixtures("ecs_mocks") +@pytest.mark.parametrize( + "cloudwatch_logs_options", + [ + { + "awslogs-stream-prefix": "override-prefix", + "max-buffer-size": "2m", + }, + { + "max-buffer-size": "2m", + }, + ], +) async def test_cloudwatch_log_options( - aws_credentials: AwsCredentials, flow_run: FlowRun + aws_credentials: AwsCredentials, flow_run: FlowRun, cloudwatch_logs_options: dict ): session = aws_credentials.get_boto3_session() ecs_client = session.client("ecs") @@ -1334,12 +1346,10 @@ async def test_cloudwatch_log_options( aws_credentials=aws_credentials, configure_cloudwatch_logs=True, execution_role_arn="test", - cloudwatch_logs_options={ - "awslogs-stream-prefix": "override-prefix", - "max-buffer-size": "2m", - }, + cloudwatch_logs_options=cloudwatch_logs_options, ) - async with ECSWorker(work_pool_name="test") as worker: + work_pool_name = "test" + async with ECSWorker(work_pool_name=work_pool_name) as worker: result = await run_then_stop_task(worker, configuration, flow_run) assert result.status_code == 0 @@ -1349,6 +1359,9 @@ async def test_cloudwatch_log_options( task_definition = describe_task_definition(ecs_client, task) for container in task_definition["containerDefinitions"]: + prefix = f"prefect-logs_{work_pool_name}_{flow_run.deployment_id}" + if cloudwatch_logs_options.get("awslogs-stream-prefix"): + prefix = cloudwatch_logs_options["awslogs-stream-prefix"] if container["name"] == ECS_DEFAULT_CONTAINER_NAME: # Assert that the container has logging configured with user # provided options @@ -1358,7 +1371,7 @@ async def test_cloudwatch_log_options( "awslogs-create-group": "true", "awslogs-group": "prefect", "awslogs-region": "us-east-1", - "awslogs-stream-prefix": "override-prefix", + "awslogs-stream-prefix": prefix, "max-buffer-size": "2m", }, }