Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
formats prefix to not be flowrun every time (#400)
Browse files Browse the repository at this point in the history
  • Loading branch information
jeanluciano authored Apr 3, 2024
1 parent df3b9f0 commit e6b15c2
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 9 deletions.
16 changes: 15 additions & 1 deletion prefect_aws/workers/ecs_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ class ECSJobConfiguration(BaseJobConfiguration):
)
configure_cloudwatch_logs: Optional[bool] = Field(default=None)
cloudwatch_logs_options: Dict[str, str] = Field(default_factory=dict)
cloudwatch_logs_prefix: Optional[str] = Field(default=None)
network_configuration: Dict[str, Any] = Field(default_factory=dict)
stream_output: Optional[bool] = Field(default=None)
task_start_timeout_seconds: int = Field(default=300)
Expand Down Expand Up @@ -507,6 +508,16 @@ class ECSVariables(BaseVariables):
" for available options. "
),
)
cloudwatch_logs_prefix: Optional[str] = Field(
default=None,
description=(
"When `configure_cloudwatch_logs` is enabled, this setting may be used to"
" set a prefix for the log group. If not provided, the default prefix will"
" be `prefect-logs_<work_pool_name>_<deployment_id>`. If"
" `awslogs-stream-prefix` is present in `Cloudwatch logs options` this"
" setting will be ignored."
),
)

network_configuration: Dict[str, Any] = Field(
default_factory=dict,
Expand Down Expand Up @@ -1276,13 +1287,16 @@ def _prepare_task_definition(
container["environment"].remove(item)

if configuration.configure_cloudwatch_logs:
prefix = f"prefect-logs_{self._work_pool_name}_{flow_run.deployment_id}"
container["logConfiguration"] = {
"logDriver": "awslogs",
"options": {
"awslogs-create-group": "true",
"awslogs-group": "prefect",
"awslogs-region": region,
"awslogs-stream-prefix": configuration.name or "prefect",
"awslogs-stream-prefix": (
configuration.cloudwatch_logs_prefix or prefix
),
**configuration.cloudwatch_logs_options,
},
}
Expand Down
2 changes: 1 addition & 1 deletion tests/test_ecs.py
Original file line number Diff line number Diff line change
Expand Up @@ -1232,8 +1232,8 @@ async def test_cloudwatch_log_options(aws_credentials):
configure_cloudwatch_logs=True,
execution_role_arn="test",
cloudwatch_logs_options={
"awslogs-stream-prefix": "override-prefix",
"max-buffer-size": "2m",
"awslogs-stream-prefix": "override-prefix",
},
)

Expand Down
27 changes: 20 additions & 7 deletions tests/workers/test_ecs_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1324,8 +1324,20 @@ async def write_fake_log(task_arn):


@pytest.mark.usefixtures("ecs_mocks")
@pytest.mark.parametrize(
"cloudwatch_logs_options",
[
{
"awslogs-stream-prefix": "override-prefix",
"max-buffer-size": "2m",
},
{
"max-buffer-size": "2m",
},
],
)
async def test_cloudwatch_log_options(
aws_credentials: AwsCredentials, flow_run: FlowRun
aws_credentials: AwsCredentials, flow_run: FlowRun, cloudwatch_logs_options: dict
):
session = aws_credentials.get_boto3_session()
ecs_client = session.client("ecs")
Expand All @@ -1334,12 +1346,10 @@ async def test_cloudwatch_log_options(
aws_credentials=aws_credentials,
configure_cloudwatch_logs=True,
execution_role_arn="test",
cloudwatch_logs_options={
"awslogs-stream-prefix": "override-prefix",
"max-buffer-size": "2m",
},
cloudwatch_logs_options=cloudwatch_logs_options,
)
async with ECSWorker(work_pool_name="test") as worker:
work_pool_name = "test"
async with ECSWorker(work_pool_name=work_pool_name) as worker:
result = await run_then_stop_task(worker, configuration, flow_run)

assert result.status_code == 0
Expand All @@ -1349,6 +1359,9 @@ async def test_cloudwatch_log_options(
task_definition = describe_task_definition(ecs_client, task)

for container in task_definition["containerDefinitions"]:
prefix = f"prefect-logs_{work_pool_name}_{flow_run.deployment_id}"
if cloudwatch_logs_options.get("awslogs-stream-prefix"):
prefix = cloudwatch_logs_options["awslogs-stream-prefix"]
if container["name"] == ECS_DEFAULT_CONTAINER_NAME:
# Assert that the container has logging configured with user
# provided options
Expand All @@ -1358,7 +1371,7 @@ async def test_cloudwatch_log_options(
"awslogs-create-group": "true",
"awslogs-group": "prefect",
"awslogs-region": "us-east-1",
"awslogs-stream-prefix": "override-prefix",
"awslogs-stream-prefix": prefix,
"max-buffer-size": "2m",
},
}
Expand Down

0 comments on commit e6b15c2

Please sign in to comment.