[AWS log forwarder] Generate new objects for each received event record in AWS logs handler (#888)

* [AWS log forwarder] Generate new objects for each received event record in the AWS logs handler

The logs forwarder shares the same AWS logs handler across multiple event types, e.g. Kinesis and CloudWatch Logs.
For Kinesis events, a single invocation can receive log records from several log group sources, and their order is not guaranteed.
Setting unified attributes such as log_group or log_source once per event therefore makes the host, service, source, and other metadata parameters nondeterministic.
To prevent this, the handler now generates a fresh metadata and attributes object for each received event record; a minimal sketch of the failure mode follows the change list below.

* Update the CODEOWNERS file for the AWS logs forwarder
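To make the failure mode concrete, here is a minimal sketch of the shared-state bug and the fix (hypothetical names, not the forwarder's real types). Fields that are only set when absent, such as the host, keep the value from whichever record happened to be processed first:

    # Minimal sketch of the failure mode (hypothetical names, for illustration only).

    def handle_shared(records, metadata):
        # BUG: all records share one metadata dict. Fields that are only set
        # when absent (e.g. host) keep the value from whichever record was
        # processed first -- and Kinesis record order is not guaranteed.
        for record in records:
            metadata.setdefault("host", record["logGroup"])
            yield {**record, **metadata}


    def handle_per_record(records, make_metadata):
        # FIX (what this commit does): build a fresh metadata object per record,
        # so each record derives host/service/source from its own log group.
        for record in records:
            metadata = make_metadata()
            metadata.setdefault("host", record["logGroup"])
            yield {**record, **metadata}


    # Records from two different log groups, order not guaranteed:
    records = [{"logGroup": "/aws/lambda/a"}, {"logGroup": "/ecs/b"}]
    print([e["host"] for e in handle_shared(records, {})])        # ['/aws/lambda/a', '/aws/lambda/a']
    print([e["host"] for e in handle_per_record(records, dict)])  # ['/aws/lambda/a', '/ecs/b']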
ge0Aja authored Jan 22, 2025
1 parent 0ebb0b4 commit 82f183b
Showing 4 changed files with 140 additions and 144 deletions.
4 changes: 2 additions & 2 deletions .github/CODEOWNERS
@@ -1,5 +1,5 @@
 # Azure Integrations
 azure/ @DataDog/azure-integrations
 # AWS Integrations
-aws/ @DataDog/aws-context
-aws/*/*template.yaml @DataDog/aws-context @DataDog/aws-customer-experience
+aws/ @DataDog/aws-ints-context
+aws/*/*template.yaml @DataDog/aws-ints-context @DataDog/aws-ints-customer-experience
142 changes: 70 additions & 72 deletions aws/logs_monitoring/steps/handlers/awslogs_handler.py
@@ -29,45 +29,43 @@


 class AwsLogsHandler:
-    def __init__(self, context, metadata, cache_layer):
+    def __init__(self, context, cache_layer):
         self.context = context
-        self.metadata = metadata
         self.cache_layer = cache_layer
-        self.aws_attributes = None

-    def handle(self, event):
+    def handle(self, event, metadata):
         # Get logs
         logs = self.extract_logs(event)
         # Build aws attributes
-        self.aws_attributes = AwsAttributes(
+        aws_attributes = AwsAttributes(
             logs.get("logGroup"),
             logs.get("logStream"),
             logs.get("logEvents"),
             logs.get("owner"),
         )
         # Set account and region from lambda function ARN
-        self.set_account_region()
+        self.set_account_region(aws_attributes)
         # Set the source on the logs
-        self.set_source(event)
+        self.set_source(event, metadata, aws_attributes)
         # Add custom tags from cache
-        self.add_cloudwatch_tags_from_cache()
+        self.add_cloudwatch_tags_from_cache(metadata, aws_attributes)
         # Set service from custom tags, which may include the tags set on the log group
         # Returns DD_SOURCE by default
-        add_service_tag(self.metadata)
+        add_service_tag(metadata)
         # Set host as log group where cloudwatch is source
-        self.set_host()
+        self.set_host(metadata, aws_attributes)
         # For Lambda logs we want to extract the function name,
         # then rebuild the arn of the monitored lambda using that name.
-        if self.metadata[DD_SOURCE] == str(AwsEventSource.LAMBDA):
-            self.process_lambda_logs()
+        if metadata[DD_SOURCE] == str(AwsEventSource.LAMBDA):
+            self.process_lambda_logs(metadata, aws_attributes)
         # The EKS log group contains various sources from the K8S control plane.
         # In order to have these automatically trigger the correct pipelines they
         # need to send their events with the correct log source.
-        if self.metadata[DD_SOURCE] == str(AwsEventSource.EKS):
-            self.process_eks_logs()
+        if metadata[DD_SOURCE] == str(AwsEventSource.EKS):
+            self.process_eks_logs(metadata, aws_attributes)
         # Create and send structured logs to Datadog
         for log in logs["logEvents"]:
-            yield merge_dicts(log, self.aws_attributes.to_dict())
+            yield merge_dicts(log, aws_attributes.to_dict())

     @staticmethod
     def extract_logs(event):
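Note on the rewritten handle(): it is a generator that threads metadata and a per-call aws_attributes through every helper, then yields one merged dict per log event. A plausible sketch of the merge step, assuming merge_dicts recursively merges the second dict into the first with the second winning on conflicts (the forwarder's actual helper may differ):

    def merge_dicts(a, b):
        # Plausible sketch only: recursively merge b into a copy of a,
        # with values from b taking precedence on key conflicts.
        out = dict(a)
        for key, value in b.items():
            if key in out and isinstance(out[key], dict) and isinstance(value, dict):
                out[key] = merge_dicts(out[key], value)
            else:
                out[key] = value
        return out

    log = {"message": "START RequestId: 123", "timestamp": 1737500000000}
    attrs = {"aws": {"awslogs": {"logGroup": "/aws/lambda/my-fn"}}}
    merged = merge_dicts(log, attrs)  # log fields plus the per-record AWS attributes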
@@ -79,17 +77,17 @@ def extract_logs(event):
         data = b"".join(BufferedReader(decompress_stream))
         return json.loads(data)

-    def set_account_region(self):
+    def set_account_region(self, aws_attributes):
         try:
-            self.aws_attributes.set_account_region(self.context.invoked_function_arn)
+            aws_attributes.set_account_region(self.context.invoked_function_arn)
         except Exception as e:
             logger.error(
                 "Unable to set account and region from lambda function ARN: %s" % e
             )

-    def set_source(self, event):
-        log_group = self.aws_attributes.get_log_group()
-        log_stream = self.aws_attributes.get_log_stream()
+    def set_source(self, event, metadata, aws_attributes):
+        log_group = aws_attributes.get_log_group()
+        log_stream = aws_attributes.get_log_stream()
         source = log_group if log_group else str(AwsEventSource.CLOUDWATCH)
         # Use the logStream to identify if this is a CloudTrail, TransitGateway, or Bedrock event
         # i.e. 123456779121_CloudTrail_us-east-1
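extract_logs (only partially visible in this hunk) consumes the standard CloudWatch Logs subscription format: the awslogs payload is base64-encoded, gzip-compressed JSON. A self-contained sketch of that decoding, independent of the forwarder's exact streaming implementation:

    import base64
    import gzip
    import json

    def decode_awslogs_payload(event):
        # CloudWatch Logs subscription payloads arrive base64-encoded and gzipped.
        compressed = base64.b64decode(event["awslogs"]["data"])
        data = gzip.decompress(compressed)
        return json.loads(data)  # contains logGroup, logStream, logEvents, owner, ...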
@@ -99,100 +97,98 @@ def set_source(self, event):
             source = str(AwsEventSource.TRANSITGATEWAY)
         if str(AwsCwEventSourcePrefix.BEDROCK) in log_stream:
             source = str(AwsEventSource.BEDROCK)
-        self.metadata[DD_SOURCE] = parse_event_source(event, source)
+        metadata[DD_SOURCE] = parse_event_source(event, source)

         # Special handling for customized log group of Lambda Functions and Step Functions
         # Multiple functions can share one single customized log group. Need to parse logStream name to determine
         # Need to place the handling of customized log group at the bottom so that it can correct the source for some edge cases
         if is_lambda_customized_log_group(log_stream):
-            self.metadata[DD_SOURCE] = str(AwsEventSource.LAMBDA)
+            metadata[DD_SOURCE] = str(AwsEventSource.LAMBDA)
         # Regardless of whether the log group is customized, the corresponding log stream starts with 'states/'."
         if is_step_functions_log_group(log_stream):
-            self.metadata[DD_SOURCE] = str(AwsEventSource.STEPFUNCTION)
+            metadata[DD_SOURCE] = str(AwsEventSource.STEPFUNCTION)

-    def add_cloudwatch_tags_from_cache(self):
-        log_group_arn = self.aws_attributes.get_log_group_arn()
+    def add_cloudwatch_tags_from_cache(self, metadata, aws_attributes):
+        log_group_arn = aws_attributes.get_log_group_arn()
         formatted_tags = self.cache_layer.get_cloudwatch_log_group_tags_cache().get(
             log_group_arn
         )
         if len(formatted_tags) > 0:
-            self.metadata[DD_CUSTOM_TAGS] = (
+            metadata[DD_CUSTOM_TAGS] = (
                 ",".join(formatted_tags)
-                if not self.metadata[DD_CUSTOM_TAGS]
-                else self.metadata[DD_CUSTOM_TAGS] + "," + ",".join(formatted_tags)
+                if not metadata[DD_CUSTOM_TAGS]
+                else metadata[DD_CUSTOM_TAGS] + "," + ",".join(formatted_tags)
             )

-    def set_host(self):
-        if src := self.metadata.get(DD_SOURCE, None):
+    def set_host(self, metadata, aws_attributes):
+        if src := metadata.get(DD_SOURCE, None):
             metadata_source = AwsEventSource._value2member_map_.get(src)
         else:
             metadata_source = AwsEventSource.CLOUDWATCH
-        metadata_host = self.metadata.get(DD_HOST, None)
-        log_group = self.aws_attributes.get_log_group()
+        metadata_host = metadata.get(DD_HOST, None)
+        log_group = aws_attributes.get_log_group()

         if metadata_host is None:
-            self.metadata[DD_HOST] = log_group
+            metadata[DD_HOST] = log_group

         match metadata_source:
             case AwsEventSource.CLOUDWATCH:
-                self.metadata[DD_HOST] = log_group
+                metadata[DD_HOST] = log_group
             case AwsEventSource.APPSYNC:
-                self.metadata[DD_HOST] = log_group.split("/")[-1]
+                metadata[DD_HOST] = log_group.split("/")[-1]
             case AwsEventSource.VERIFIED_ACCESS:
-                self.handle_verified_access_source()
+                self.handle_verified_access_source(metadata, aws_attributes)
             case AwsEventSource.STEPFUNCTION:
-                self.handle_step_function_source()
+                self.handle_step_function_source(metadata, aws_attributes)

-    def handle_step_function_source(self):
-        state_machine_arn = self.get_state_machine_arn()
+    def handle_verified_access_source(self, metadata, aws_attributes):
+        try:
+            message = json.loads(aws_attributes.get_log_events()[0].get("message"))
+            metadata[DD_HOST] = message.get("http_request").get("url").get("hostname")
+        except Exception as e:
+            logger.debug("Unable to set verified-access log host: %s" % e)
+
+    def handle_step_function_source(self, metadata, aws_attributes):
+        state_machine_arn = self.get_state_machine_arn(aws_attributes)
         if not state_machine_arn:
             return

-        self.metadata[DD_HOST] = state_machine_arn
+        metadata[DD_HOST] = state_machine_arn
         formatted_stepfunctions_tags = (
             self.cache_layer.get_step_functions_tags_cache().get(state_machine_arn)
         )
         if len(formatted_stepfunctions_tags) > 0:
-            self.metadata[DD_CUSTOM_TAGS] = (
+            metadata[DD_CUSTOM_TAGS] = (
                 ",".join(formatted_stepfunctions_tags)
-                if not self.metadata[DD_CUSTOM_TAGS]
-                else self.metadata[DD_CUSTOM_TAGS]
+                if not metadata[DD_CUSTOM_TAGS]
+                else metadata[DD_CUSTOM_TAGS]
                 + ","
                 + ",".join(formatted_stepfunctions_tags)
             )

         if os.environ.get("DD_STEP_FUNCTIONS_TRACE_ENABLED", "false").lower() == "true":
-            self.metadata[DD_CUSTOM_TAGS] = ",".join(
-                [self.metadata.get(DD_CUSTOM_TAGS, [])]
+            metadata[DD_CUSTOM_TAGS] = ",".join(
+                [metadata.get(DD_CUSTOM_TAGS, [])]
                 + ["dd_step_functions_trace_enabled:true"]
             )

-    def handle_verified_access_source(self):
-        try:
-            message = json.loads(self.aws_attributes.get_log_events()[0].get("message"))
-            self.metadata[DD_HOST] = (
-                message.get("http_request").get("url").get("hostname")
-            )
-        except Exception as e:
-            logger.debug("Unable to set verified-access log host: %s" % e)
-
-    def process_eks_logs(self):
-        log_stream = self.aws_attributes.get_log_stream()
+    def process_eks_logs(self, metadata, aws_attributes):
+        log_stream = aws_attributes.get_log_stream()
         if log_stream.startswith("kube-apiserver-audit-"):
-            self.metadata[DD_SOURCE] = "kubernetes.audit"
+            metadata[DD_SOURCE] = "kubernetes.audit"
         elif log_stream.startswith("kube-scheduler-"):
-            self.metadata[DD_SOURCE] = "kube_scheduler"
+            metadata[DD_SOURCE] = "kube_scheduler"
         elif log_stream.startswith("kube-apiserver-"):
-            self.metadata[DD_SOURCE] = "kube-apiserver"
+            metadata[DD_SOURCE] = "kube-apiserver"
         elif log_stream.startswith("kube-controller-manager-"):
-            self.metadata[DD_SOURCE] = "kube-controller-manager"
+            metadata[DD_SOURCE] = "kube-controller-manager"
         elif log_stream.startswith("authenticator-"):
-            self.metadata[DD_SOURCE] = "aws-iam-authenticator"
+            metadata[DD_SOURCE] = "aws-iam-authenticator"
         # In case the conditions above don't match we maintain eks as the source

-    def get_state_machine_arn(self):
+    def get_state_machine_arn(self, aws_attributes):
         try:
-            message = json.loads(self.aws_attributes.get_log_events()[0].get("message"))
+            message = json.loads(aws_attributes.get_log_events()[0].get("message"))
             if message.get("execution_arn") is not None:
                 execution_arn = message["execution_arn"]
                 arn_tokens = re.split(r"[:/\\]", execution_arn)
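For orientation, get_state_machine_arn splits a Step Functions execution ARN on ':', '/', and '\'; the lines that rebuild the state machine ARN from those tokens are collapsed in this view, so the reconstruction below is an assumption:

    import re

    execution_arn = (
        "arn:aws:states:us-east-1:123456789012:execution:MyStateMachine:my-run-id"
    )
    arn_tokens = re.split(r"[:/\\]", execution_arn)
    # ['arn', 'aws', 'states', 'us-east-1', '123456789012',
    #  'execution', 'MyStateMachine', 'my-run-id']

    # Assumed reconstruction (the actual lines are elided above): swap
    # "execution" for "stateMachine" and drop the execution id.
    state_machine_arn = ":".join(arn_tokens[:5] + ["stateMachine", arn_tokens[6]])
    # 'arn:aws:states:us-east-1:123456789012:stateMachine:MyStateMachine'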
@@ -203,8 +199,10 @@ def get_state_machine_arn(self):
         return ""

     # Lambda logs can be from either default or customized log group
-    def process_lambda_logs(self):
-        lower_cased_lambda_function_name = self.get_lower_cased_lambda_function_name()
+    def process_lambda_logs(self, metadata, aws_attributes):
+        lower_cased_lambda_function_name = self.get_lower_cased_lambda_function_name(
+            aws_attributes
+        )

         if lower_cased_lambda_function_name is None:
             return
@@ -218,23 +216,23 @@ def process_lambda_logs(self):
             arn_prefix + "function:" + lower_cased_lambda_function_name
         )
         # Add the lowe_rcased arn as a log attribute
-        self.aws_attributes.set_lambda_arn(lower_cased_lambda_arn)
+        aws_attributes.set_lambda_arn(lower_cased_lambda_arn)
         env_tag_exists = (
-            self.metadata[DD_CUSTOM_TAGS].startswith("env:")
-            or ",env:" in self.metadata[DD_CUSTOM_TAGS]
+            metadata[DD_CUSTOM_TAGS].startswith("env:")
+            or ",env:" in metadata[DD_CUSTOM_TAGS]
         )
         # If there is no env specified, default to env:none
         if not env_tag_exists:
-            self.metadata[DD_CUSTOM_TAGS] += ",env:none"
+            metadata[DD_CUSTOM_TAGS] += ",env:none"

     # The lambda function name can be inferred from either a customized logstream name, or a loggroup name
-    def get_lower_cased_lambda_function_name(self):
+    def get_lower_cased_lambda_function_name(self, aws_attributes):
         # function name parsed from logstream is preferred for handling some edge cases
         function_name = get_lambda_function_name_from_logstream_name(
-            self.aws_attributes.get_log_stream()
+            aws_attributes.get_log_stream()
         )
         if function_name is None:
-            log_group_parts = self.aws_attributes.get_log_group().split("/lambda/")
+            log_group_parts = aws_attributes.get_log_group().split("/lambda/")
             if len(log_group_parts) > 1:
                 function_name = log_group_parts[1]
             else:
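The fallback above infers the function name from a default log group of the form /aws/lambda/<function-name>. A quick illustration of the visible split logic:

    log_group = "/aws/lambda/my-function"
    log_group_parts = log_group.split("/lambda/")
    # ['/aws', 'my-function']
    function_name = log_group_parts[1] if len(log_group_parts) > 1 else None
    lower_cased = function_name.lower() if function_name else None  # 'my-function'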
65 changes: 34 additions & 31 deletions aws/logs_monitoring/steps/parsing.py
@@ -48,14 +48,16 @@ def parse(event, context, cache_layer):
                 s3_handler = S3EventHandler(context, metadata, cache_layer)
                 events = s3_handler.handle(event)
             case AwsEventType.AWSLOGS:
-                aws_handler = AwsLogsHandler(context, metadata, cache_layer)
-                events = aws_handler.handle(event)
+                aws_handler = AwsLogsHandler(context, cache_layer)
+                # regenerate a metadata object for each event
+                metadata = generate_metadata(context)
+                events = aws_handler.handle(event, metadata)
             case AwsEventType.EVENTS:
                 events = cwevent_handler(event, metadata)
             case AwsEventType.SNS:
                 events = sns_handler(event, metadata)
             case AwsEventType.KINESIS:
-                events = kinesis_awslogs_handler(event, context, metadata, cache_layer)
+                events = kinesis_awslogs_handler(event, context, cache_layer)
             case _:
                 events = ["Parsing: Unsupported event type"]
     except Exception as e:
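Both the AWSLOGS and KINESIS branches now build metadata per event via generate_metadata(context), so metadata depends only on a few Lambda context fields. A hypothetical test stub (not part of the repo) covering just the fields visible in this diff; the real generate_custom_tags() may read more:

    # Hypothetical test stub -- only the context fields visible in this diff.
    class FakeLambdaContext:
        function_version = "$LATEST"
        invoked_function_arn = (
            "arn:aws:lambda:us-east-1:123456789012:function:datadog-forwarder"
        )
        function_name = "datadog-forwarder"

    # from steps.parsing import generate_metadata  # assumed import path
    # metadata = generate_metadata(FakeLambdaContext())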
@@ -70,31 +72,6 @@ def parse(event, context, cache_layer):
     return normalize_events(events, metadata)


-def generate_metadata(context):
-    metadata = {
-        SOURCECATEGORY_STRING: AWS_STRING,
-        AWS_STRING: {
-            FUNCTIONVERSION_STRING: context.function_version,
-            INVOKEDFUNCTIONARN_STRING: context.invoked_function_arn,
-        },
-    }
-    # Add custom tags here by adding new value with the following format "key1:value1, key2:value2" - might be subject to modifications
-    dd_custom_tags_data = generate_custom_tags(context)
-    metadata[DD_CUSTOM_TAGS] = ",".join(
-        filter(
-            None,
-            [
-                DD_TAGS,
-                ",".join(
-                    ["{}:{}".format(k, v) for k, v in dd_custom_tags_data.items()]
-                ),
-            ],
-        )
-    )
-
-    return metadata
-
-
 def generate_custom_tags(context):
     dd_custom_tags_data = {
         FORWARDERNAME_STRING: context.function_name.lower(),
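generate_metadata is removed here and re-added unchanged after kinesis_awslogs_handler in the next hunk. One detail worth noting in it: DD_CUSTOM_TAGS is assembled with filter(None, ...) so an unset DD_TAGS does not produce a leading comma. A compact illustration:

    DD_TAGS = ""  # e.g. unset in the environment
    custom = "forwardername:datadog-forwarder"
    tags = ",".join(filter(None, [DD_TAGS, custom]))
    # 'forwardername:datadog-forwarder' -- no leading comma when DD_TAGS is empty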
@@ -155,16 +132,42 @@ def sns_handler(event, metadata):


 # Handle CloudWatch logs from Kinesis
-def kinesis_awslogs_handler(event, context, metadata, cache_layer):
+def kinesis_awslogs_handler(event, context, cache_layer):
     def reformat_record(record):
         return {"awslogs": {"data": record["kinesis"]["data"]}}

-    awslogs_handler = AwsLogsHandler(context, metadata, cache_layer)
+    awslogs_handler = AwsLogsHandler(context, cache_layer)
     return itertools.chain.from_iterable(
-        awslogs_handler.handle(reformat_record(r)) for r in event["Records"]
+        awslogs_handler.handle(reformat_record(r), generate_metadata(context))
+        for r in event["Records"]
     )


+def generate_metadata(context):
+    metadata = {
+        SOURCECATEGORY_STRING: AWS_STRING,
+        AWS_STRING: {
+            FUNCTIONVERSION_STRING: context.function_version,
+            INVOKEDFUNCTIONARN_STRING: context.invoked_function_arn,
+        },
+    }
+    # Add custom tags here by adding new value with the following format "key1:value1, key2:value2" - might be subject to modifications
+    dd_custom_tags_data = generate_custom_tags(context)
+    metadata[DD_CUSTOM_TAGS] = ",".join(
+        filter(
+            None,
+            [
+                DD_TAGS,
+                ",".join(
+                    ["{}:{}".format(k, v) for k, v in dd_custom_tags_data.items()]
+                ),
+            ],
+        )
+    )
+
+    return metadata
+
+
 def normalize_events(events, metadata):
     normalized = []
     events_counter = 0
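Because handle() is a generator, kinesis_awslogs_handler lazily flattens one generator per Kinesis record, and each record now receives its own metadata object from generate_metadata(context). A reduced sketch of that pattern (standalone stand-ins, not the real classes):

    import itertools

    def handle(record, metadata):
        # Stand-in for AwsLogsHandler.handle: one generator per record,
        # each closing over its own metadata object.
        for log in record["logs"]:
            yield {**log, **metadata}

    def make_metadata():
        return {"ddtags": "forwardername:demo"}  # fresh object per record

    records = [{"logs": [{"message": "a"}]}, {"logs": [{"message": "b"}]}]
    events = itertools.chain.from_iterable(
        handle(r, make_metadata()) for r in records
    )
    print(list(events))  # two events, each merged with its own metadata dict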
[Diff for the fourth changed file not rendered in this view.]

