Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ignore hubspot 400 already unsubscribed (Draft) #5211

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/fides/api/schemas/saas/strategy_configuration.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from enum import Enum
from http import HTTPStatus
from typing import Any, Dict, List, Optional, Union

from pydantic import BaseModel, root_validator, validator
Expand Down Expand Up @@ -26,6 +27,15 @@ class FilterPostProcessorConfiguration(StrategyConfiguration):
case_sensitive: bool = True


class ErrorValidationPostProcessorConfiguration(StrategyConfiguration):
"""Validates that we ignore errors For a Given code with a Given Message"""

http_code: HTTPStatus
##TODO: Accept a list of expected message
expected_message: str
error_message_field: str


class OffsetPaginationConfiguration(StrategyConfiguration):
"""
Increases the value of the query param `incremental_param` by the `increment_by` until the `limit` is hit
Expand Down
7 changes: 6 additions & 1 deletion src/fides/api/service/connectors/saas_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -417,10 +417,12 @@ def execute_prepared_request(
response_data = self._unwrap_response_data(saas_request, response)

# process response and add to rows
## We could set up a cleaner process_response, but at mask_data() we are using this function too
rows = self.process_response_data(
response_data,
identity_data,
cast(Optional[List[PostProcessorStrategy]], saas_request.postprocessors),
response,
)

logger.info(
Expand Down Expand Up @@ -454,6 +456,7 @@ def process_response_data(
response_data: Union[List[Dict[str, Any]], Dict[str, Any]],
identity_data: Dict[str, Any],
postprocessors: Optional[List[PostProcessorStrategy]],
response: Response = None,
) -> List[Row]:
"""
Runs the raw response through all available postprocessors for the request,
Expand All @@ -474,7 +477,9 @@ def process_response_data(
postprocessor.strategy, # type: ignore
)
try:
processed_data = strategy.process(processed_data, identity_data)
processed_data = strategy.process(
processed_data, response, identity_data
)
except Exception as exc:
raise PostProcessingException(
f"Exception occurred during the '{postprocessor.strategy}' postprocessor " # type: ignore
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from fides.api.service.processors.post_processor_strategy import (
post_processor_strategy_error_validation,
post_processor_strategy_filter,
post_processor_strategy_unwrap,
)
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from abc import abstractmethod
from typing import Any, Dict, List, Union

from requests import Response

from fides.api.service.strategy import Strategy


Expand All @@ -9,6 +11,6 @@ class PostProcessorStrategy(Strategy):

@abstractmethod
def process(
self, data: Any, identity_data: Dict[str, Any] = None
self, data: Any, response: Response = None, identity_data: Dict[str, Any] = None
) -> Union[List[Dict[str, Any]], Dict[str, Any]]:
"""Process data from SaaS connector"""
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
from typing import Any, Dict, List, Union

from requests import Response

from fides.api.schemas.saas.strategy_configuration import (
ErrorValidationPostProcessorConfiguration,
)
from fides.api.service.processors.post_processor_strategy.post_processor_strategy import (
PostProcessorStrategy,
)


class ErrorValidationPostProcessorStrategy(PostProcessorStrategy):
"""
Expands on the ignore_errors flag.Validates that a Given error is a non-critical error that can be ignored
There are some API's where diferent errors have the same error code (I.E 400).
For those cases, we need to check the message in order to figure out if the error is ignorable

E.g:
ignorable_response = [
"HttpCode" : 400
"status" : "error",
"message" : "[email protected] is already unsubscribed from subscription 1234567",
]
v/s
non_ignorable_response = [
"HttpCode" : 400
"status" : "error",
"message" : "Given account does not have permissions to remove the content ",
]
"""

name = "error_validation"
##TODO: Check configuration model
configuration_model = ErrorValidationPostProcessorConfiguration

def __init__(self, configuration: ErrorValidationPostProcessorConfiguration):
self.http_code = configuration.http_code
self.expected_message = configuration.expected_message
self.error_message_field = configuration.error_message_field

def process(
self,
data: Any,
response: Response = None,
identity_data: Dict[str, Any] = None,
) -> Union[List[Dict[str, Any]], Dict[str, Any]]:
"""
:param data: A list or dict
:param Response: The Response given by the endpoint
:param identity_data: dict of cached identity data
:return: data for processing
"""
if response is None:
return data

response_json = response.json()
error_message = response_json.get(self.error_message_field)
## TODO: expand for multiple error messages
if (
response.status_code == self.http_code
and self.expected_message in error_message
):
## Managed similar to saas_connector::handle_error_response: return empty
return []
## Else, we proceed as normal
return data
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
from fides.api.service.processors.post_processor_strategy.post_processor_strategy import (
PostProcessorStrategy,
)
from fides.api.service.processors.post_processor_strategy.post_processor_strategy_error_validation import (
ErrorValidationPostProcessorStrategy,
)
from fides.api.service.processors.post_processor_strategy.post_processor_strategy_filter import (
FilterPostProcessorStrategy,
)
Expand All @@ -24,6 +27,7 @@ class SupportedPostProcessorStrategies(Enum):

unwrap = UnwrapPostProcessorStrategy
filter = FilterPostProcessorStrategy
error_validation = ErrorValidationPostProcessorStrategy

@classmethod
def __contains__(cls, item: str) -> bool:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import pydash
from loguru import logger
from requests import Response

from fides.api.common_exceptions import FidesopsException
from fides.api.schemas.saas.shared_schemas import IdentityParamRef
Expand Down Expand Up @@ -48,9 +49,11 @@ def __init__(self, configuration: FilterPostProcessorConfiguration):
self.exact = configuration.exact
self.case_sensitive = configuration.case_sensitive

## We might need to add a response = null?
def process(
self,
data: Union[List[Dict[str, Any]], Dict[str, Any]],
response: Response = None,
identity_data: Dict[str, Any] = None,
) -> Union[List[Dict[str, Any]], Dict[str, Any]]:
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import pydash
from loguru import logger
from requests import Response

from fides.api.schemas.saas.strategy_configuration import (
UnwrapPostProcessorConfiguration,
Expand Down Expand Up @@ -38,9 +39,11 @@ class UnwrapPostProcessorStrategy(PostProcessorStrategy):
def __init__(self, configuration: UnwrapPostProcessorConfiguration):
self.data_path = configuration.data_path

## We might need to add a response = null?
def process(
self,
data: Union[List[Dict[str, Any]], Dict[str, Any]],
response: Response = None,
identity_data: Dict[str, Any] = None,
) -> Union[List[Dict[str, Any]], Dict[str, Any]]:
"""
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from typing import Any, Dict

import pytest

from pydantic.error_wrappers import ValidationError
from fides.api.schemas.saas.strategy_configuration import (
ErrorValidationPostProcessorConfiguration,
)
from fides.api.service.processors.post_processor_strategy.post_processor_strategy_error_validation import (
ErrorValidationPostProcessorStrategy,
)


def test_http_code_must_be_recognized():
with pytest.raises(ValidationError):
config = ErrorValidationPostProcessorConfiguration(
http_code=497,
error_message_field="error_msg",
expected_message="Some Error Msg",
)

def test_error_message_is_removed_when_expected():
print("To Be Done")


def test_error_message_is_kept_when_not_expected():
print("To Be Done")


def test_data_is_undisturbed_when_no_error_message():
print("To be Done")


def test_list_of_error_messages_is_checked_properly():
print("To Be Implemented")
##TODO: What do we use when we want a Data provider on pytest?
##The idea would be to have a list of 3 expected error messages
## And set the error message one by one
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
from fides.api.service.processors.post_processor_strategy.post_processor_strategy import (
PostProcessorStrategy,
)
from fides.api.service.processors.post_processor_strategy.post_processor_strategy_error_validation import (
ErrorValidationPostProcessorStrategy,
)
from fides.api.service.processors.post_processor_strategy.post_processor_strategy_filter import (
FilterPostProcessorStrategy,
)
Expand All @@ -28,6 +31,18 @@ def test_get_strategy_unwrap():
assert isinstance(strategy, UnwrapPostProcessorStrategy)


def test_strategy_error_validation():
config = {
"http_code": 400,
"error_message_field": "error_msg",
"expected_message": "This is an Example",
}
strategy = PostProcessorStrategy.get_strategy(
strategy_name="error_validation", configuration=config
)
assert isinstance(strategy, ErrorValidationPostProcessorStrategy)


def test_get_strategy_invalid_config():
with pytest.raises(ValidationError):
PostProcessorStrategy.get_strategy(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def test_filter_array_by_identity_reference():
},
]
processor = FilterPostProcessorStrategy(configuration=config)
result = processor.process(data, identity_data)
result = processor.process(data, identity_data=identity_data)
assert result == [
{
"id": 1397429347,
Expand Down Expand Up @@ -57,7 +57,7 @@ def test_filter_array_by_identity_reference_no_results():
},
]
processor = FilterPostProcessorStrategy(configuration=config)
result = processor.process(data, identity_data)
result = processor.process(data, identity_data=identity_data)
assert result == []


Expand Down