feat: Add script to delete CloudWatch log groups based on age. #64
Annotations
10 errors and 2 warnings
/home/runner/work/aws-toolbox/aws-toolbox/ec2/delete_tagged_security_groups.py#L23
def revoke_permissions(ec2_client, group_id, permissions):
for sg in permissions:
if sg.get("IpPermissions", []):
for rule in sg.get("IpPermissions", []):
- ec2_client.revoke_security_group_ingress(GroupId=group_id, IpPermissions=[rule])
- print("Revoked ingress IP permissions for Security Group ID: {}".format(group_id))
+ ec2_client.revoke_security_group_ingress(
+ GroupId=group_id, IpPermissions=[rule]
+ )
+ print(
+ "Revoked ingress IP permissions for Security Group ID: {}".format(
+ group_id
+ )
+ )
if sg.get("IpPermissionsEgress", []):
for rule in sg.get("IpPermissionsEgress", []):
- ec2_client.revoke_security_group_egress(GroupId=group_id, IpPermissions=[rule])
- print("Revoked egress IP permissions for Security Group ID: {}".format(group_id))
+ ec2_client.revoke_security_group_egress(
+ GroupId=group_id, IpPermissions=[rule]
+ )
+ print(
+ "Revoked egress IP permissions for Security Group ID: {}".format(
+ group_id
+ )
+ )
def delete_security_group(ec2_client, group_id):
ec2_client.delete_security_group(GroupId=group_id)
print("Deleted Security Group ID: {}".format(group_id))
|
/home/runner/work/aws-toolbox/aws-toolbox/ec2/delete_tagged_security_groups.py#L48
# Modify the tag key and value to your own liking
tag_key = "ManagedByAmazonSageMakerResource"
tag_value_contains = f"arn:aws:sagemaker:{aws_region}:{account_id}:domain"
# Find security groups
- tagged_security_groups = find_security_groups(ec2_client, tag_key, tag_value_contains)
+ tagged_security_groups = find_security_groups(
+ ec2_client, tag_key, tag_value_contains
+ )
# Iterate through security groups, revoke permissions, and delete
for sg in tagged_security_groups:
group_id = sg["GroupId"]
# Fetch the current ingress and egress IP permissions
- sg = ec2_client.describe_security_groups(Filters=[{"Name": "group-id", "Values": [group_id]}]).get(
- "SecurityGroups", []
- )
+ sg = ec2_client.describe_security_groups(
+ Filters=[{"Name": "group-id", "Values": [group_id]}]
+ ).get("SecurityGroups", [])
# Revoke permissions
revoke_permissions(ec2_client, group_id, sg)
# Delete the security group
|
/home/runner/work/aws-toolbox/aws-toolbox/cloudwatch/delete_cloudwatch_log_groups.py#L63
to_delete_groups.append((group["logGroupName"], age))
# Print kept groups
print("Log groups to keep:")
for name, age in kept_groups:
- print(f"{'[DRY RUN] ' if dry_run else ''}Keeping log group: {name} (Age: {age})")
+ print(
+ f"{'[DRY RUN] ' if dry_run else ''}Keeping log group: {name} (Age: {age})"
+ )
# Print groups to delete
print("\nLog groups to delete:")
for name, age in to_delete_groups:
- print(f"{'[DRY RUN] Would delete' if dry_run else 'Deleting'} log group: {name} (Age: {age})")
+ print(
+ f"{'[DRY RUN] Would delete' if dry_run else 'Deleting'} log group: {name} (Age: {age})"
+ )
print("\nSummary:")
print(f"Total log groups: {total_groups}")
print(f"Log groups kept: {len(kept_groups)}")
print(f"Log groups to be deleted: {len(to_delete_groups)}")
|
/home/runner/work/aws-toolbox/aws-toolbox/cloudwatch/delete_cloudwatch_log_groups.py#L86
print(f"Access denied when trying to delete log group: {name}")
failed_deletions.append(name)
else:
raise # Re-raise the exception if it's not an AccessDeniedException
- print(f"Log groups actually deleted: {len(to_delete_groups) - len(failed_deletions)}")
+ print(
+ f"Log groups actually deleted: {len(to_delete_groups) - len(failed_deletions)}"
+ )
if failed_deletions:
- print(f"Failed to delete {len(failed_deletions)} log groups due to access denial:")
+ print(
+ f"Failed to delete {len(failed_deletions)} log groups due to access denial:"
+ )
for name in failed_deletions:
print(f" - {name}")
def main():
- parser = argparse.ArgumentParser(description="Delete CloudWatch log groups based on retention.")
+ parser = argparse.ArgumentParser(
+ description="Delete CloudWatch log groups based on retention."
+ )
parser.add_argument(
"--keep",
type=parse_time_period,
help="Keep log groups newer than this period (e.g., '5 days', '2 weeks', '1 months')",
)
- parser.add_argument("--dry-run", action="store_true", help="Perform a dry run without actually deleting log groups")
+ parser.add_argument(
+ "--dry-run",
+ action="store_true",
+ help="Perform a dry run without actually deleting log groups",
+ )
args = parser.parse_args()
client = boto3.client("logs")
process_log_groups(client, args.keep, args.dry_run)
|
/home/runner/work/aws-toolbox/aws-toolbox/cloudwatch/set_cloudwatch_logs_retention.py#L42
def update_log_group_retention(group, retention):
try:
if "retentionInDays" not in group or group["retentionInDays"] != retention:
- cloudwatch.put_retention_policy(logGroupName=group["logGroupName"], retentionInDays=retention)
+ cloudwatch.put_retention_policy(
+ logGroupName=group["logGroupName"], retentionInDays=retention
+ )
# Verify the update
- updated_group = cloudwatch.describe_log_groups(logGroupNamePrefix=group["logGroupName"])["logGroups"][0]
+ updated_group = cloudwatch.describe_log_groups(
+ logGroupNamePrefix=group["logGroupName"]
+ )["logGroups"][0]
if updated_group.get("retentionInDays") == retention:
return f"Successfully updated retention for: {group['logGroupName']}"
else:
return f"Failed to update retention for: {group['logGroupName']}. Current retention: {updated_group.get('retentionInDays')}"
else:
- return (
- f"CloudWatch Loggroup: {group['logGroupName']} already has the specified retention of {retention} days."
- )
+ return f"CloudWatch Loggroup: {group['logGroupName']} already has the specified retention of {retention} days."
except botocore.exceptions.ClientError as e:
return f"Error updating {group['logGroupName']}: {e}"
def count_retention_periods(cloudwatch_log_groups):
|
/home/runner/work/aws-toolbox/aws-toolbox/cloudwatch/set_cloudwatch_logs_retention.py#L96
for group in cloudwatch_log_groups
if "retentionInDays" not in group or group["retentionInDays"] != retention
]
if not groups_to_update:
- print(f"All log groups already have the specified retention of {retention} days.")
+ print(
+ f"All log groups already have the specified retention of {retention} days."
+ )
return
print(f"Log groups that need to be updated to {retention} days retention:")
for group in groups_to_update:
current_retention = group.get("retentionInDays", "Not set")
|
/home/runner/work/aws-toolbox/aws-toolbox/cloudwatch/set_cloudwatch_logs_retention.py#L113
updated_count = 0
failed_count = 0
with ThreadPoolExecutor(max_workers=10) as executor:
future_to_group = {
- executor.submit(update_log_group_retention, group, retention): group for group in groups_to_update
+ executor.submit(update_log_group_retention, group, retention): group
+ for group in groups_to_update
}
for future in as_completed(future_to_group):
result = future.result()
print(result)
if "Successfully updated" in result:
|
/home/runner/work/aws-toolbox/aws-toolbox/cloudwatch/set_cloudwatch_logs_retention.py#L160
3653,
],
help="Enter the retention in days for the CloudWatch Logs.",
)
parser.add_argument(
- "--print-retention-counts", action="store_true", help="Print the number of log groups for each retention period"
+ "--print-retention-counts",
+ action="store_true",
+ help="Print the number of log groups for each retention period",
)
if len(sys.argv) == 1:
parser.print_help(sys.stderr)
sys.exit(1)
args = parser.parse_args()
if args.print_retention_counts and args.retention is not None:
- parser.error("--print-retention-counts cannot be used with --retention argument")
+ parser.error(
+ "--print-retention-counts cannot be used with --retention argument"
+ )
cloudwatch_set_retention(args)
|
/home/runner/work/aws-toolbox/aws-toolbox/efs/delete_tagged_efs.py#L38
# Delete the mount targets for the EFS filesystem
delete_mount_targets(efs_client, filesystem_id)
# Wait with exponential backoff
delay = (2**current_retry) + random.uniform(0, 1)
- print(f"Waiting for {delay} seconds before attempting to delete the EFS filesystem.")
+ print(
+ f"Waiting for {delay} seconds before attempting to delete the EFS filesystem."
+ )
time.sleep(delay)
# Delete the specified EFS filesystem
efs_client.delete_file_system(FileSystemId=filesystem_id)
print("Deleted EFS Filesystem: {}".format(filesystem_id))
|
/home/runner/work/aws-toolbox/aws-toolbox/ecs/delete_all_inactive_task_definitions.py#L27
client.delete_task_definitions(taskDefinitions=[arn])
print(f"Deleted task definition {arn}")
break # Break the loop if deletion was successful
except client.exceptions.ClientException as e:
if "Throttling" in str(e): # Check for throttling in the error message
- print(f"Throttling exception when deleting {arn}: {e}, retrying in {backoff} seconds...")
+ print(
+ f"Throttling exception when deleting {arn}: {e}, retrying in {backoff} seconds..."
+ )
time.sleep(backoff)
backoff *= 2 # Exponential backoff
else:
print(f"Client exception when deleting task definition {arn}: {e}")
break # Break the loop for other client exceptions
except client.exceptions.ServerException as e:
if "Throttling" in str(e): # Check for throttling in the error message
- print(f"Throttling exception when deleting {arn}: {e}, retrying in {backoff} seconds...")
+ print(
+ f"Throttling exception when deleting {arn}: {e}, retrying in {backoff} seconds..."
+ )
time.sleep(backoff)
backoff *= 2 # Exponential backoff
else:
print(f"Server exception when deleting task definition {arn}: {e}")
break # Break the loop for other server exceptions
|
Run linters
The following actions use node12, which is deprecated, and will be forced to run on node16: actions/checkout@v2, actions/setup-python@v1, wearerequired/lint-action@v1. For more info: https://github.blog/changelog/2023-06-13-github-actions-all-actions-will-run-on-node16-instead-of-node12-by-default/
|
Run linters
The following actions use a deprecated Node.js version and will be forced to run on node20: actions/checkout@v2, actions/setup-python@v1, wearerequired/lint-action@v1. For more info: https://github.blog/changelog/2024-03-07-github-actions-all-actions-will-run-on-node20-instead-of-node16-by-default/
|