Skip to content

Commit

Permalink
[AWSX][Logs forwarder] Fix a bug in setting service tag for Kinesis event type logs
Browse files Browse the repository at this point in the history
  • Loading branch information
ge0Aja committed Jan 24, 2025
1 parent ec37696 commit e4bb942
Show file tree
Hide file tree
Showing 9 changed files with 120 additions and 95 deletions.
50 changes: 49 additions & 1 deletion aws/logs_monitoring/steps/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,20 @@
AwsCwEventSourcePrefix,
AwsS3EventSourceKeyword,
)
from settings import DD_CUSTOM_TAGS, DD_SERVICE, DD_SOURCE
from settings import (
AWS_STRING,
DD_CUSTOM_TAGS,
DD_FORWARDER_VERSION,
DD_SERVICE,
DD_SOURCE,
DD_TAGS,
FUNCTIONVERSION_STRING,
FORWARDERNAME_STRING,
FORWARDERMEMSIZE_STRING,
FORWARDERVERSION_STRING,
INVOKEDFUNCTIONARN_STRING,
SOURCECATEGORY_STRING,
)

CLOUDTRAIL_REGEX = re.compile(
"\d+_CloudTrail(|-Digest|-Insight)_\w{2}(|-gov|-cn)-\w{4,9}-\d_(|.+)\d{8}T\d{4,6}Z(|.+).json.gz$",
Expand Down Expand Up @@ -101,3 +114,38 @@ def merge_dicts(a, b, path=None):
else:
a[key] = b[key]
return a


def generate_metadata(context):
    """Build the base metadata dict attached to every forwarded log.

    Combines the source category, Lambda execution details taken from
    *context*, and the forwarder's custom tags (merged with DD_TAGS)
    under the DD_CUSTOM_TAGS key.
    """
    metadata = {
        SOURCECATEGORY_STRING: AWS_STRING,
        AWS_STRING: {
            FUNCTIONVERSION_STRING: context.function_version,
            INVOKEDFUNCTIONARN_STRING: context.invoked_function_arn,
        },
    }
    # Add custom tags here by adding new value with the following format "key1:value1, key2:value2" - might be subject to modifications
    custom_tags = generate_custom_tags(context)
    rendered_custom = ",".join(
        "{}:{}".format(key, value) for key, value in custom_tags.items()
    )
    # Drop empty sections so we never emit a leading/trailing comma.
    metadata[DD_CUSTOM_TAGS] = ",".join(
        section for section in [DD_TAGS, rendered_custom] if section
    )

    return metadata


def generate_custom_tags(context):
    """Return the forwarder-identifying tags derived from the Lambda context."""
    return {
        FORWARDERNAME_STRING: context.function_name.lower(),
        FORWARDERMEMSIZE_STRING: context.memory_limit_in_mb,
        FORWARDERVERSION_STRING: DD_FORWARDER_VERSION,
    }
6 changes: 3 additions & 3 deletions aws/logs_monitoring/steps/enrichment.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def add_metadata_to_lambda_log(event, cache_layer):
# If not set during parsing or has a default value
# then set the service tag from lambda tags cache or using the function name
# otherwise, remove the service tag from the custom lambda tags if exists to avoid duplication
if not event[DD_SERVICE] or event[DD_SERVICE] == event[DD_SOURCE]:
if not event.get(DD_SERVICE) or event.get(DD_SERVICE) == event.get(DD_SOURCE):
service_tag = next(
(tag for tag in custom_lambda_tags if tag.startswith("service:")),
f"service:{function_name}",
Expand All @@ -86,15 +86,15 @@ def add_metadata_to_lambda_log(event, cache_layer):
(tag for tag in custom_lambda_tags if tag.startswith("env:")), None
)
if custom_env_tag is not None:
event[DD_CUSTOM_TAGS] = event[DD_CUSTOM_TAGS].replace("env:none", "")
event[DD_CUSTOM_TAGS] = event.get(DD_CUSTOM_TAGS, "").replace("env:none", "")

tags += custom_lambda_tags

# Dedup tags, so we don't end up with functionname twice
tags = list(set(tags))
tags.sort() # Keep order deterministic

event[DD_CUSTOM_TAGS] = ",".join([event[DD_CUSTOM_TAGS]] + tags)
event[DD_CUSTOM_TAGS] = ",".join([event.get(DD_CUSTOM_TAGS)] + tags)


def get_enriched_lambda_log_tags(log_event, cache_layer):
Expand Down
8 changes: 6 additions & 2 deletions aws/logs_monitoring/steps/handlers/awslogs_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from steps.common import (
add_service_tag,
generate_metadata,
merge_dicts,
parse_event_source,
)
Expand All @@ -33,7 +34,9 @@ def __init__(self, context, cache_layer):
self.context = context
self.cache_layer = cache_layer

def handle(self, event, metadata):
def handle(self, event):
# Generate metadata
metadata = generate_metadata(self.context)
# Get logs
logs = self.extract_logs(event)
# Build aws attributes
Expand Down Expand Up @@ -65,7 +68,8 @@ def handle(self, event, metadata):
self.process_eks_logs(metadata, aws_attributes)
# Create and send structured logs to Datadog
for log in logs["logEvents"]:
yield merge_dicts(log, aws_attributes.to_dict())
merged = merge_dicts(log, aws_attributes.to_dict())
yield merge_dicts(merged, metadata)

@staticmethod
def extract_logs(event):
Expand Down
77 changes: 20 additions & 57 deletions aws/logs_monitoring/steps/parsing.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,14 @@
from steps.handlers.awslogs_handler import AwsLogsHandler
from steps.handlers.s3_handler import S3EventHandler
from steps.common import (
merge_dicts,
generate_metadata,
get_service_from_tags_and_remove_duplicates,
merge_dicts,
)
from steps.enums import AwsEventType, AwsEventTypeKeyword, AwsEventSource
from settings import (
AWS_STRING,
FUNCTIONVERSION_STRING,
INVOKEDFUNCTIONARN_STRING,
SOURCECATEGORY_STRING,
FORWARDERNAME_STRING,
FORWARDERMEMSIZE_STRING,
FORWARDERVERSION_STRING,
DD_TAGS,
DD_SOURCE,
DD_CUSTOM_TAGS,
DD_SERVICE,
DD_FORWARDER_VERSION,
)

logger = logging.getLogger()
Expand All @@ -37,21 +28,19 @@
def parse(event, context, cache_layer):
"""Parse Lambda input to normalized events"""
metadata = generate_metadata(context)
event_type = AwsEventType.UNKNOWN
try:
# Route to the corresponding parser
event_type = parse_event_type(event)
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f"Parsed event type: {event_type}")
set_forwarder_telemetry_tags(context, event_type)
match event_type:
case AwsEventType.AWSLOGS:
aws_handler = AwsLogsHandler(context, cache_layer)
events = aws_handler.handle(event)
return collect_and_count(events)
case AwsEventType.S3:
s3_handler = S3EventHandler(context, metadata, cache_layer)
events = s3_handler.handle(event)
case AwsEventType.AWSLOGS:
aws_handler = AwsLogsHandler(context, cache_layer)
# regenerate a metadata object for each event
metadata = generate_metadata(context)
events = aws_handler.handle(event, metadata)
case AwsEventType.EVENTS:
events = cwevent_handler(event, metadata)
case AwsEventType.SNS:
Expand All @@ -67,21 +56,9 @@ def parse(event, context, cache_layer):
)
events = [err_message]

set_forwarder_telemetry_tags(context, event_type)

return normalize_events(events, metadata)


def generate_custom_tags(context):
    """Build the tags identifying this forwarder instance.

    Maps the forwarder tag-name constants to the function name
    (lowercased), its memory limit, and the forwarder version.
    """
    tags = {}
    tags[FORWARDERNAME_STRING] = context.function_name.lower()
    tags[FORWARDERMEMSIZE_STRING] = context.memory_limit_in_mb
    tags[FORWARDERVERSION_STRING] = DD_FORWARDER_VERSION

    return tags


def parse_event_type(event):
if records := event.get(str(AwsEventTypeKeyword.RECORDS), None):
record = records[0]
Expand Down Expand Up @@ -138,36 +115,10 @@ def reformat_record(record):

awslogs_handler = AwsLogsHandler(context, cache_layer)
return itertools.chain.from_iterable(
awslogs_handler.handle(reformat_record(r), generate_metadata(context))
for r in event["Records"]
awslogs_handler.handle(reformat_record(r)) for r in event["Records"]
)


def generate_metadata(context):
    """Assemble the base metadata attached to forwarded logs."""
    aws_details = {
        FUNCTIONVERSION_STRING: context.function_version,
        INVOKEDFUNCTIONARN_STRING: context.invoked_function_arn,
    }
    metadata = {SOURCECATEGORY_STRING: AWS_STRING, AWS_STRING: aws_details}
    # Add custom tags here by adding new value with the following format "key1:value1, key2:value2" - might be subject to modifications
    pairs = [
        "{}:{}".format(k, v) for k, v in generate_custom_tags(context).items()
    ]
    tag_sections = [DD_TAGS, ",".join(pairs)]
    metadata[DD_CUSTOM_TAGS] = ",".join(filter(None, tag_sections))

    return metadata


def normalize_events(events, metadata):
normalized = []
events_counter = 0
Expand All @@ -186,3 +137,15 @@ def normalize_events(events, metadata):
send_event_metric("incoming_events", events_counter)

return normalized


def collect_and_count(events):
    """Materialize *events* and report their count as telemetry.

    Drains the (possibly lazy) iterable into a list, emits the total via the
    "incoming_events" metric, and returns the collected events.

    Args:
        events: iterable of log events (may be a generator).

    Returns:
        list: the fully materialized events, in original order.
    """
    # list() + len() replaces the manual append/counter loop: same behavior,
    # fewer moving parts, and the generator is still fully consumed.
    collected = list(events)
    send_event_metric("incoming_events", len(collected))

    return collected
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,17 @@
"logGroup": "/aws/rds/instance/datadog/postgresql",
"logStream": "datadog.0",
"owner": "123456789012"
}
},
"function_version": 0,
"invoked_function_arn": "invoked_function_arn"
},
"ddsource": "cloudwatch",
"ddsourcecategory": "aws",
"ddtags": "forwardername:function_name,forwarder_memorysize:10,forwarder_version:4.0.1,test_tag_key:test_tag_value",
"host": "/aws/rds/instance/datadog/postgresql",
"id": "31953106606966983378809025079804211143289615424298221568",
"message": "2021-01-02 03:04:05 UTC::@:[5306]:LOG: database system is ready to accept connections",
"service": "cloudwatch",
"timestamp": 1609556645000
}
]
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,17 @@
"logGroup": "test/logs",
"logStream": "states/logs-to-traces-sequential/2022-11-10-15-50/7851b2d9",
"owner": "425362996713"
}
},
"function_version": 0,
"invoked_function_arn": "invoked_function_arn"
},
"ddsource": "stepfunction",
"ddsourcecategory": "aws",
"ddtags": "forwardername:function_name,forwarder_memorysize:10,forwarder_version:4.0.1,test_tag_key:test_tag_value,dd_step_functions_trace_enabled:true",
"host": "arn:aws:states:us-east-1:12345678910:stateMachine:StepFunction2",
"id": "37199773595581154154810589279545129148442535997644275712",
"message": "{\"id\": \"1\",\"type\": \"ExecutionStarted\",\"details\": {\"input\": \"{}\",\"inputDetails\": {\"truncated\": \"false\"},\"roleArn\": \"arn:aws:iam::12345678910:role/service-role/StepFunctions-test-role-a0iurr4pt\"},\"previous_event_id\": \"0\",\"event_timestamp\": \"1716992192441\",\"execution_arn\": \"arn:aws:states:us-east-1:12345678910:execution:StepFunction2:ccccccc-d1da-4c38-b32c-2b6b07d713fa\",\"redrive_count\": \"0\"}",
"service": "stepfunction",
"timestamp": 1668095539607
}
]
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,17 @@
"logGroup": "/aws/vendedlogs/states/logs-to-traces-sequential-Logs",
"logStream": "states/logs-to-traces-sequential/2022-11-10-15-50/7851b2d9",
"owner": "425362996713"
}
},
"function_version": 0,
"invoked_function_arn": "invoked_function_arn"
},
"ddsource": "stepfunction",
"ddsourcecategory": "aws",
"ddtags": "forwardername:function_name,forwarder_memorysize:10,forwarder_version:4.0.1,test_tag_key:test_tag_value,dd_step_functions_trace_enabled:true",
"host": "arn:aws:states:us-east-1:12345678910:stateMachine:StepFunction1",
"id": "37199773595581154154810589279545129148442535997644275712",
"message": "{\"id\": \"1\",\"type\": \"ExecutionStarted\",\"details\": {\"input\": \"{}\",\"inputDetails\": {\"truncated\": \"false\"},\"roleArn\": \"arn:aws:iam::12345678910:role/service-role/StepFunctions-test-role-a0iurr4pt\"},\"previous_event_id\": \"0\",\"event_timestamp\": \"1716992192441\",\"execution_arn\": \"arn:aws:states:us-east-1:12345678910:execution:StepFunction1:ccccccc-d1da-4c38-b32c-2b6b07d713fa\",\"redrive_count\": \"0\"}",
"service": "stepfunction",
"timestamp": 1668095539607
}
]
Loading

0 comments on commit e4bb942

Please sign in to comment.