From 82f183b5f1410b389b96ec5879ad41ec45a9da9e Mon Sep 17 00:00:00 2001 From: Georgi Date: Wed, 22 Jan 2025 15:02:43 +0100 Subject: [PATCH] [AWS log forwarder] Generate new objects for each received event record in AWS logs handler (#888) * [AWS log forwarder] Generate new objects for each received event in AWS logs handler The logs forwarder shares the same AWS log handler for multiple event types, i.e. Kinesis and Cloudwatch logs. For Kinesis event types, there's a chance of receiving log records from several log group sources as the order is not guaranteed. Therefore, setting unified attributes such as log_group or log_source per event leads to random behavior when setting the host, service, source and other metadata parameters. To prevent this random behavior, regenerate new metadata and attributes objects for each received event record in the logs handler. * update codeowners file for aws logs forwarder --- .github/CODEOWNERS | 4 +- .../steps/handlers/awslogs_handler.py | 142 +++++++++--------- aws/logs_monitoring/steps/parsing.py | 65 ++++---- .../tests/test_awslogs_handler.py | 73 +++++---- 4 files changed, 140 insertions(+), 144 deletions(-) diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 4d5cbde5b..1d23082a4 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -1,5 +1,5 @@ # Azure Integrations azure/ @DataDog/azure-integrations # AWS Integrations -aws/ @DataDog/aws-context -aws/*/*template.yaml @DataDog/aws-context @DataDog/aws-customer-experience +aws/ @DataDog/aws-ints-context +aws/*/*template.yaml @DataDog/aws-ints-context @DataDog/aws-ints-customer-experience diff --git a/aws/logs_monitoring/steps/handlers/awslogs_handler.py b/aws/logs_monitoring/steps/handlers/awslogs_handler.py index f444d5c35..4ab2e24df 100644 --- a/aws/logs_monitoring/steps/handlers/awslogs_handler.py +++ b/aws/logs_monitoring/steps/handlers/awslogs_handler.py @@ -29,45 +29,43 @@ class AwsLogsHandler: - def __init__(self, context, metadata, cache_layer): + def 
__init__(self, context, cache_layer): self.context = context - self.metadata = metadata self.cache_layer = cache_layer - self.aws_attributes = None - def handle(self, event): + def handle(self, event, metadata): # Get logs logs = self.extract_logs(event) # Build aws attributes - self.aws_attributes = AwsAttributes( + aws_attributes = AwsAttributes( logs.get("logGroup"), logs.get("logStream"), logs.get("logEvents"), logs.get("owner"), ) # Set account and region from lambda function ARN - self.set_account_region() + self.set_account_region(aws_attributes) # Set the source on the logs - self.set_source(event) + self.set_source(event, metadata, aws_attributes) # Add custom tags from cache - self.add_cloudwatch_tags_from_cache() + self.add_cloudwatch_tags_from_cache(metadata, aws_attributes) # Set service from custom tags, which may include the tags set on the log group # Returns DD_SOURCE by default - add_service_tag(self.metadata) + add_service_tag(metadata) # Set host as log group where cloudwatch is source - self.set_host() + self.set_host(metadata, aws_attributes) # For Lambda logs we want to extract the function name, # then rebuild the arn of the monitored lambda using that name. - if self.metadata[DD_SOURCE] == str(AwsEventSource.LAMBDA): - self.process_lambda_logs() + if metadata[DD_SOURCE] == str(AwsEventSource.LAMBDA): + self.process_lambda_logs(metadata, aws_attributes) # The EKS log group contains various sources from the K8S control plane. # In order to have these automatically trigger the correct pipelines they # need to send their events with the correct log source. 
- if self.metadata[DD_SOURCE] == str(AwsEventSource.EKS): - self.process_eks_logs() + if metadata[DD_SOURCE] == str(AwsEventSource.EKS): + self.process_eks_logs(metadata, aws_attributes) # Create and send structured logs to Datadog for log in logs["logEvents"]: - yield merge_dicts(log, self.aws_attributes.to_dict()) + yield merge_dicts(log, aws_attributes.to_dict()) @staticmethod def extract_logs(event): @@ -79,17 +77,17 @@ def extract_logs(event): data = b"".join(BufferedReader(decompress_stream)) return json.loads(data) - def set_account_region(self): + def set_account_region(self, aws_attributes): try: - self.aws_attributes.set_account_region(self.context.invoked_function_arn) + aws_attributes.set_account_region(self.context.invoked_function_arn) except Exception as e: logger.error( "Unable to set account and region from lambda function ARN: %s" % e ) - def set_source(self, event): - log_group = self.aws_attributes.get_log_group() - log_stream = self.aws_attributes.get_log_stream() + def set_source(self, event, metadata, aws_attributes): + log_group = aws_attributes.get_log_group() + log_stream = aws_attributes.get_log_stream() source = log_group if log_group else str(AwsEventSource.CLOUDWATCH) # Use the logStream to identify if this is a CloudTrail, TransitGateway, or Bedrock event # i.e. 123456779121_CloudTrail_us-east-1 @@ -99,100 +97,98 @@ def set_source(self, event): source = str(AwsEventSource.TRANSITGATEWAY) if str(AwsCwEventSourcePrefix.BEDROCK) in log_stream: source = str(AwsEventSource.BEDROCK) - self.metadata[DD_SOURCE] = parse_event_source(event, source) + metadata[DD_SOURCE] = parse_event_source(event, source) # Special handling for customized log group of Lambda Functions and Step Functions # Multiple functions can share one single customized log group. 
Need to parse logStream name to determine # Need to place the handling of customized log group at the bottom so that it can correct the source for some edge cases if is_lambda_customized_log_group(log_stream): - self.metadata[DD_SOURCE] = str(AwsEventSource.LAMBDA) + metadata[DD_SOURCE] = str(AwsEventSource.LAMBDA) # Regardless of whether the log group is customized, the corresponding log stream starts with 'states/'." if is_step_functions_log_group(log_stream): - self.metadata[DD_SOURCE] = str(AwsEventSource.STEPFUNCTION) + metadata[DD_SOURCE] = str(AwsEventSource.STEPFUNCTION) - def add_cloudwatch_tags_from_cache(self): - log_group_arn = self.aws_attributes.get_log_group_arn() + def add_cloudwatch_tags_from_cache(self, metadata, aws_attributes): + log_group_arn = aws_attributes.get_log_group_arn() formatted_tags = self.cache_layer.get_cloudwatch_log_group_tags_cache().get( log_group_arn ) if len(formatted_tags) > 0: - self.metadata[DD_CUSTOM_TAGS] = ( + metadata[DD_CUSTOM_TAGS] = ( ",".join(formatted_tags) - if not self.metadata[DD_CUSTOM_TAGS] - else self.metadata[DD_CUSTOM_TAGS] + "," + ",".join(formatted_tags) + if not metadata[DD_CUSTOM_TAGS] + else metadata[DD_CUSTOM_TAGS] + "," + ",".join(formatted_tags) ) - def set_host(self): - if src := self.metadata.get(DD_SOURCE, None): + def set_host(self, metadata, aws_attributes): + if src := metadata.get(DD_SOURCE, None): metadata_source = AwsEventSource._value2member_map_.get(src) else: metadata_source = AwsEventSource.CLOUDWATCH - metadata_host = self.metadata.get(DD_HOST, None) - log_group = self.aws_attributes.get_log_group() + metadata_host = metadata.get(DD_HOST, None) + log_group = aws_attributes.get_log_group() if metadata_host is None: - self.metadata[DD_HOST] = log_group + metadata[DD_HOST] = log_group match metadata_source: case AwsEventSource.CLOUDWATCH: - self.metadata[DD_HOST] = log_group + metadata[DD_HOST] = log_group case AwsEventSource.APPSYNC: - self.metadata[DD_HOST] = log_group.split("/")[-1] + 
metadata[DD_HOST] = log_group.split("/")[-1] case AwsEventSource.VERIFIED_ACCESS: - self.handle_verified_access_source() + self.handle_verified_access_source(metadata, aws_attributes) case AwsEventSource.STEPFUNCTION: - self.handle_step_function_source() + self.handle_step_function_source(metadata, aws_attributes) - def handle_step_function_source(self): - state_machine_arn = self.get_state_machine_arn() + def handle_verified_access_source(self, metadata, aws_attributes): + try: + message = json.loads(aws_attributes.get_log_events()[0].get("message")) + metadata[DD_HOST] = message.get("http_request").get("url").get("hostname") + except Exception as e: + logger.debug("Unable to set verified-access log host: %s" % e) + + def handle_step_function_source(self, metadata, aws_attributes): + state_machine_arn = self.get_state_machine_arn(aws_attributes) if not state_machine_arn: return - self.metadata[DD_HOST] = state_machine_arn + metadata[DD_HOST] = state_machine_arn formatted_stepfunctions_tags = ( self.cache_layer.get_step_functions_tags_cache().get(state_machine_arn) ) if len(formatted_stepfunctions_tags) > 0: - self.metadata[DD_CUSTOM_TAGS] = ( + metadata[DD_CUSTOM_TAGS] = ( ",".join(formatted_stepfunctions_tags) - if not self.metadata[DD_CUSTOM_TAGS] - else self.metadata[DD_CUSTOM_TAGS] + if not metadata[DD_CUSTOM_TAGS] + else metadata[DD_CUSTOM_TAGS] + "," + ",".join(formatted_stepfunctions_tags) ) if os.environ.get("DD_STEP_FUNCTIONS_TRACE_ENABLED", "false").lower() == "true": - self.metadata[DD_CUSTOM_TAGS] = ",".join( - [self.metadata.get(DD_CUSTOM_TAGS, [])] + metadata[DD_CUSTOM_TAGS] = ",".join( + [metadata.get(DD_CUSTOM_TAGS, [])] + ["dd_step_functions_trace_enabled:true"] ) - def handle_verified_access_source(self): - try: - message = json.loads(self.aws_attributes.get_log_events()[0].get("message")) - self.metadata[DD_HOST] = ( - message.get("http_request").get("url").get("hostname") - ) - except Exception as e: - logger.debug("Unable to set 
verified-access log host: %s" % e) - - def process_eks_logs(self): - log_stream = self.aws_attributes.get_log_stream() + def process_eks_logs(self, metadata, aws_attributes): + log_stream = aws_attributes.get_log_stream() if log_stream.startswith("kube-apiserver-audit-"): - self.metadata[DD_SOURCE] = "kubernetes.audit" + metadata[DD_SOURCE] = "kubernetes.audit" elif log_stream.startswith("kube-scheduler-"): - self.metadata[DD_SOURCE] = "kube_scheduler" + metadata[DD_SOURCE] = "kube_scheduler" elif log_stream.startswith("kube-apiserver-"): - self.metadata[DD_SOURCE] = "kube-apiserver" + metadata[DD_SOURCE] = "kube-apiserver" elif log_stream.startswith("kube-controller-manager-"): - self.metadata[DD_SOURCE] = "kube-controller-manager" + metadata[DD_SOURCE] = "kube-controller-manager" elif log_stream.startswith("authenticator-"): - self.metadata[DD_SOURCE] = "aws-iam-authenticator" + metadata[DD_SOURCE] = "aws-iam-authenticator" # In case the conditions above don't match we maintain eks as the source - def get_state_machine_arn(self): + def get_state_machine_arn(self, aws_attributes): try: - message = json.loads(self.aws_attributes.get_log_events()[0].get("message")) + message = json.loads(aws_attributes.get_log_events()[0].get("message")) if message.get("execution_arn") is not None: execution_arn = message["execution_arn"] arn_tokens = re.split(r"[:/\\]", execution_arn) @@ -203,8 +199,10 @@ def get_state_machine_arn(self): return "" # Lambda logs can be from either default or customized log group - def process_lambda_logs(self): - lower_cased_lambda_function_name = self.get_lower_cased_lambda_function_name() + def process_lambda_logs(self, metadata, aws_attributes): + lower_cased_lambda_function_name = self.get_lower_cased_lambda_function_name( + aws_attributes + ) if lower_cased_lambda_function_name is None: return @@ -218,23 +216,23 @@ def process_lambda_logs(self): arn_prefix + "function:" + lower_cased_lambda_function_name ) # Add the lowe_rcased arn as a log 
attribute - self.aws_attributes.set_lambda_arn(lower_cased_lambda_arn) + aws_attributes.set_lambda_arn(lower_cased_lambda_arn) env_tag_exists = ( - self.metadata[DD_CUSTOM_TAGS].startswith("env:") - or ",env:" in self.metadata[DD_CUSTOM_TAGS] + metadata[DD_CUSTOM_TAGS].startswith("env:") + or ",env:" in metadata[DD_CUSTOM_TAGS] ) # If there is no env specified, default to env:none if not env_tag_exists: - self.metadata[DD_CUSTOM_TAGS] += ",env:none" + metadata[DD_CUSTOM_TAGS] += ",env:none" # The lambda function name can be inferred from either a customized logstream name, or a loggroup name - def get_lower_cased_lambda_function_name(self): + def get_lower_cased_lambda_function_name(self, aws_attributes): # function name parsed from logstream is preferred for handling some edge cases function_name = get_lambda_function_name_from_logstream_name( - self.aws_attributes.get_log_stream() + aws_attributes.get_log_stream() ) if function_name is None: - log_group_parts = self.aws_attributes.get_log_group().split("/lambda/") + log_group_parts = aws_attributes.get_log_group().split("/lambda/") if len(log_group_parts) > 1: function_name = log_group_parts[1] else: diff --git a/aws/logs_monitoring/steps/parsing.py b/aws/logs_monitoring/steps/parsing.py index 6c972d322..fe58ff551 100644 --- a/aws/logs_monitoring/steps/parsing.py +++ b/aws/logs_monitoring/steps/parsing.py @@ -48,14 +48,16 @@ def parse(event, context, cache_layer): s3_handler = S3EventHandler(context, metadata, cache_layer) events = s3_handler.handle(event) case AwsEventType.AWSLOGS: - aws_handler = AwsLogsHandler(context, metadata, cache_layer) - events = aws_handler.handle(event) + aws_handler = AwsLogsHandler(context, cache_layer) + # regenerate a metadata object for each event + metadata = generate_metadata(context) + events = aws_handler.handle(event, metadata) case AwsEventType.EVENTS: events = cwevent_handler(event, metadata) case AwsEventType.SNS: events = sns_handler(event, metadata) case 
AwsEventType.KINESIS: - events = kinesis_awslogs_handler(event, context, metadata, cache_layer) + events = kinesis_awslogs_handler(event, context, cache_layer) case _: events = ["Parsing: Unsupported event type"] except Exception as e: @@ -70,31 +72,6 @@ def parse(event, context, cache_layer): return normalize_events(events, metadata) -def generate_metadata(context): - metadata = { - SOURCECATEGORY_STRING: AWS_STRING, - AWS_STRING: { - FUNCTIONVERSION_STRING: context.function_version, - INVOKEDFUNCTIONARN_STRING: context.invoked_function_arn, - }, - } - # Add custom tags here by adding new value with the following format "key1:value1, key2:value2" - might be subject to modifications - dd_custom_tags_data = generate_custom_tags(context) - metadata[DD_CUSTOM_TAGS] = ",".join( - filter( - None, - [ - DD_TAGS, - ",".join( - ["{}:{}".format(k, v) for k, v in dd_custom_tags_data.items()] - ), - ], - ) - ) - - return metadata - - def generate_custom_tags(context): dd_custom_tags_data = { FORWARDERNAME_STRING: context.function_name.lower(), @@ -155,16 +132,42 @@ def sns_handler(event, metadata): # Handle CloudWatch logs from Kinesis -def kinesis_awslogs_handler(event, context, metadata, cache_layer): +def kinesis_awslogs_handler(event, context, cache_layer): def reformat_record(record): return {"awslogs": {"data": record["kinesis"]["data"]}} - awslogs_handler = AwsLogsHandler(context, metadata, cache_layer) + awslogs_handler = AwsLogsHandler(context, cache_layer) return itertools.chain.from_iterable( - awslogs_handler.handle(reformat_record(r)) for r in event["Records"] + awslogs_handler.handle(reformat_record(r), generate_metadata(context)) + for r in event["Records"] ) +def generate_metadata(context): + metadata = { + SOURCECATEGORY_STRING: AWS_STRING, + AWS_STRING: { + FUNCTIONVERSION_STRING: context.function_version, + INVOKEDFUNCTIONARN_STRING: context.invoked_function_arn, + }, + } + # Add custom tags here by adding new value with the following format "key1:value1, 
key2:value2" - might be subject to modifications + dd_custom_tags_data = generate_custom_tags(context) + metadata[DD_CUSTOM_TAGS] = ",".join( + filter( + None, + [ + DD_TAGS, + ",".join( + ["{}:{}".format(k, v) for k, v in dd_custom_tags_data.items()] + ), + ], + ) + ) + + return metadata + + def normalize_events(events, metadata): normalized = [] events_counter = 0 diff --git a/aws/logs_monitoring/tests/test_awslogs_handler.py b/aws/logs_monitoring/tests/test_awslogs_handler.py index 631c2c75a..c1c77c425 100644 --- a/aws/logs_monitoring/tests/test_awslogs_handler.py +++ b/aws/logs_monitoring/tests/test_awslogs_handler.py @@ -68,8 +68,8 @@ def test_awslogs_handler_rds_postgresql(self, mock_cache_init): return_value=["test_tag_key:test_tag_value"] ) - awslogs_handler = AwsLogsHandler(context, metadata, cache_layer) - verify_as_json(list(awslogs_handler.handle(event))) + awslogs_handler = AwsLogsHandler(context, cache_layer) + verify_as_json(list(awslogs_handler.handle(event, metadata))) verify_as_json(metadata, options=NamerFactory.with_parameters("metadata")) @patch("caching.cloudwatch_log_group_cache.CloudwatchLogGroupTagsCache.__init__") @@ -117,15 +117,13 @@ def test_awslogs_handler_step_functions_tags_added_properly( ) cache_layer._cloudwatch_log_group_cache.get = MagicMock() - awslogs_handler = AwsLogsHandler(context, metadata, cache_layer) - verify_as_json(list(awslogs_handler.handle(event))) + awslogs_handler = AwsLogsHandler(context, cache_layer) + verify_as_json(list(awslogs_handler.handle(event, metadata))) verify_as_json(metadata, options=NamerFactory.with_parameters("metadata")) # verify that the handling can properly handle SF logs with the default log group naming + self.assertEqual(metadata[DD_SOURCE], AwsEventSource.STEPFUNCTION.value) self.assertEqual( - awslogs_handler.metadata[DD_SOURCE], AwsEventSource.STEPFUNCTION.value - ) - self.assertEqual( - awslogs_handler.metadata[DD_HOST], + metadata[DD_HOST], 
"arn:aws:states:us-east-1:12345678910:stateMachine:StepFunction1", ) @@ -175,15 +173,15 @@ def test_awslogs_handler_step_functions_customized_log_group( ) cache_layer._cloudwatch_log_group_cache.get = MagicMock() - awslogs_handler = AwsLogsHandler(context, metadata, cache_layer) + awslogs_handler = AwsLogsHandler(context, cache_layer) # for some reasons, the below two are needed to update the context of the handler - verify_as_json(list(awslogs_handler.handle(eventFromCustomizedLogGroup))) - verify_as_json(metadata, options=NamerFactory.with_parameters("metadata")) - self.assertEqual( - awslogs_handler.metadata[DD_SOURCE], AwsEventSource.STEPFUNCTION.value + verify_as_json( + list(awslogs_handler.handle(eventFromCustomizedLogGroup, metadata)) ) + verify_as_json(metadata, options=NamerFactory.with_parameters("metadata")) + self.assertEqual(metadata[DD_SOURCE], AwsEventSource.STEPFUNCTION.value) self.assertEqual( - awslogs_handler.metadata[DD_HOST], + metadata[DD_HOST], "arn:aws:states:us-east-1:12345678910:stateMachine:StepFunction2", ) @@ -202,11 +200,10 @@ def test_process_lambda_logs(self): stepfunction_loggroup.get("owner"), ) context = None - aws_handler = AwsLogsHandler(context, metadata, CacheLayer("")) - aws_handler.aws_attributes = aws_attributes + aws_handler = AwsLogsHandler(context, CacheLayer("")) - aws_handler.process_lambda_logs() - self.assertEqual(aws_handler.metadata, {"ddsource": "postgresql", "ddtags": ""}) + aws_handler.process_lambda_logs(metadata, aws_attributes) + self.assertEqual(metadata, {"ddsource": "postgresql", "ddtags": ""}) # Lambda log lambda_default_loggroup = { @@ -224,9 +221,8 @@ def test_process_lambda_logs(self): context = MagicMock() context.invoked_function_arn = "arn:aws:lambda:sa-east-1:601427279990:function:inferred-spans-python-dev-initsender" - aws_handler = AwsLogsHandler(context, metadata, CacheLayer("")) - aws_handler.aws_attributes = aws_attributes - aws_handler.process_lambda_logs() + aws_handler = 
AwsLogsHandler(context, CacheLayer("")) + aws_handler.process_lambda_logs(metadata, aws_attributes) self.assertEqual( metadata, { @@ -241,8 +237,7 @@ def test_process_lambda_logs(self): # env not set metadata = {"ddsource": "postgresql", "ddtags": ""} - aws_handler.metadata = metadata - aws_handler.process_lambda_logs() + aws_handler.process_lambda_logs(metadata, aws_attributes) self.assertEqual( metadata, { @@ -254,48 +249,48 @@ def test_process_lambda_logs(self): class TestLambdaCustomizedLogGroup(unittest.TestCase): def setUp(self): - self.aws_handler = AwsLogsHandler(None, None, None) + self.aws_handler = AwsLogsHandler(None, None) def test_get_lower_cased_lambda_function_name(self): self.assertEqual(True, True) # Non Lambda log - self.aws_handler.aws_attributes = AwsAttributes( + aws_attributes = AwsAttributes( "/aws/vendedlogs/states/logs-to-traces-sequential-Logs", "states/logs-to-traces-sequential/2022-11-10-15-50/7851b2d9", [], ) self.assertEqual( - self.aws_handler.get_lower_cased_lambda_function_name(), + self.aws_handler.get_lower_cased_lambda_function_name(aws_attributes), None, ) - self.aws_handler.aws_attributes = AwsAttributes( + aws_attributes = AwsAttributes( "/aws/lambda/test-lambda-default-log-group", "2023/11/06/[$LATEST]b25b1f977b3e416faa45a00f427e7acb", [], ) self.assertEqual( - self.aws_handler.get_lower_cased_lambda_function_name(), + self.aws_handler.get_lower_cased_lambda_function_name(aws_attributes), "test-lambda-default-log-group", ) - self.aws_handler.aws_attributes = AwsAttributes( + aws_attributes = AwsAttributes( "customizeLambdaGrop", "2023/11/06/test-customized-log-group1[$LATEST]13e304cba4b9446eb7ef082a00038990", [], ) self.assertEqual( - self.aws_handler.get_lower_cased_lambda_function_name(), + self.aws_handler.get_lower_cased_lambda_function_name(aws_attributes), "test-customized-log-group1", ) class TestParsingStepFunctionLogs(unittest.TestCase): def setUp(self): - self.aws_handler = AwsLogsHandler(None, None, None) + 
self.aws_handler = AwsLogsHandler(None, None) def test_get_state_machine_arn(self): - self.aws_handler.aws_attributes = AwsAttributes( + aws_attributes = AwsAttributes( log_events=[ { "message": json.dumps({"no_execution_arn": "xxxx/yyy"}), @@ -303,9 +298,9 @@ def test_get_state_machine_arn(self): ] ) - self.assertEqual(self.aws_handler.get_state_machine_arn(), "") + self.assertEqual(self.aws_handler.get_state_machine_arn(aws_attributes), "") - self.aws_handler.aws_attributes = AwsAttributes( + aws_attributes = AwsAttributes( log_events=[ { "message": json.dumps( @@ -317,11 +312,11 @@ def test_get_state_machine_arn(self): ] ) self.assertEqual( - self.aws_handler.get_state_machine_arn(), + self.aws_handler.get_state_machine_arn(aws_attributes), "arn:aws:states:sa-east-1:425362996713:stateMachine:my-Various-States", ) - self.aws_handler.aws_attributes = AwsAttributes( + aws_attributes = AwsAttributes( log_events=[ { "message": json.dumps( @@ -334,11 +329,11 @@ def test_get_state_machine_arn(self): ) self.assertEqual( - self.aws_handler.get_state_machine_arn(), + self.aws_handler.get_state_machine_arn(aws_attributes), "arn:aws:states:sa-east-1:425362996713:stateMachine:my-Various-States", ) - self.aws_handler.aws_attributes = AwsAttributes( + aws_attributes = AwsAttributes( log_events=[ { "message": json.dumps( @@ -350,7 +345,7 @@ def test_get_state_machine_arn(self): ] ) self.assertEqual( - self.aws_handler.get_state_machine_arn(), + self.aws_handler.get_state_machine_arn(aws_attributes), "arn:aws:states:sa-east-1:425362996713:stateMachine:my-Various-States", )