Skip to content

Commit

Permalink
fix: Fix missing type definitions
Browse files Browse the repository at this point in the history
Signed-off-by: Tudor Plugaru <[email protected]>
  • Loading branch information
PlugaruT committed Nov 14, 2024
1 parent d0bba86 commit 599d05c
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 32 deletions.
78 changes: 53 additions & 25 deletions src/cloudevents/core/v1/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def __init__(self, attributes: dict[str, Any], data: Optional[dict] = None) -> N
:raises ValueError: If any of the required attributes are missing or have invalid values.
:raises TypeError: If any of the attributes have invalid types.
"""
self._validate_attribute(attributes)
self._validate_attribute(attributes=attributes)
self._attributes: dict[str, Any] = attributes
self._data: Optional[dict] = data

Expand All @@ -66,9 +66,9 @@ def _validate_attribute(attributes: dict[str, Any]) -> None:
See https://github.com/cloudevents/spec/blob/main/cloudevents/spec.md#required-attributes
"""
errors: dict[str, list[BaseCloudEventException]] = defaultdict(list)
errors.update(CloudEvent._validate_required_attributes(attributes))
errors.update(CloudEvent._validate_optional_attributes(attributes))
errors.update(CloudEvent._validate_extension_attributes(attributes))
errors.update(CloudEvent._validate_required_attributes(attributes=attributes))
errors.update(CloudEvent._validate_optional_attributes(attributes=attributes))
errors.update(CloudEvent._validate_extension_attributes(attributes=attributes))
if errors:
raise CloudEventValidationError(dict(errors))

Expand All @@ -88,33 +88,46 @@ def _validate_required_attributes(
errors["id"].append(MissingRequiredAttributeError(attribute_name="id"))
if attributes.get("id") is None:
errors["id"].append(
InvalidAttributeValueError("id", "Attribute 'id' must not be None")
InvalidAttributeValueError(
attribute_name="id", msg="Attribute 'id' must not be None"
)
)
if not isinstance(attributes.get("id"), str):
errors["id"].append(InvalidAttributeTypeError("id", str))
errors["id"].append(
InvalidAttributeTypeError(attribute_name="id", expected_type=str)
)

if "source" not in attributes:
errors["source"].append(
MissingRequiredAttributeError(attribute_name="source")
)
if not isinstance(attributes.get("source"), str):
errors["source"].append(InvalidAttributeTypeError("source", str))
errors["source"].append(
InvalidAttributeTypeError(attribute_name="source", expected_type=str)
)

if "type" not in attributes:
errors["type"].append(MissingRequiredAttributeError(attribute_name="type"))
if not isinstance(attributes.get("type"), str):
errors["type"].append(InvalidAttributeTypeError("type", str))
errors["type"].append(
InvalidAttributeTypeError(attribute_name="type", expected_type=str)
)

if "specversion" not in attributes:
errors["specversion"].append(
MissingRequiredAttributeError(attribute_name="specversion")
)
if not isinstance(attributes.get("specversion"), str):
errors["specversion"].append(InvalidAttributeTypeError("specversion", str))
errors["specversion"].append(
InvalidAttributeTypeError(
attribute_name="specversion", expected_type=str
)
)
if attributes.get("specversion") != "1.0":
errors["specversion"].append(
InvalidAttributeValueError(
"specversion", "Attribute 'specversion' must be '1.0'"
attribute_name="specversion",
msg="Attribute 'specversion' must be '1.0'",
)
)
return errors
Expand All @@ -133,43 +146,58 @@ def _validate_optional_attributes(

if "time" in attributes:
if not isinstance(attributes["time"], datetime):
errors["time"].append(InvalidAttributeTypeError("time", datetime))
errors["time"].append(
InvalidAttributeTypeError(
attribute_name="time", expected_type=datetime
)
)
if hasattr(attributes["time"], "tzinfo") and not attributes["time"].tzinfo:
errors["time"].append(
InvalidAttributeValueError(
"time", "Attribute 'time' must be timezone aware"
attribute_name="time",
msg="Attribute 'time' must be timezone aware",
)
)
if "subject" in attributes:
if not isinstance(attributes["subject"], str):
errors["subject"].append(InvalidAttributeTypeError("subject", str))
errors["subject"].append(
InvalidAttributeTypeError(
attribute_name="subject", expected_type=str
)
)
if not attributes["subject"]:
errors["subject"].append(
InvalidAttributeValueError(
"subject", "Attribute 'subject' must not be empty"
attribute_name="subject",
msg="Attribute 'subject' must not be empty",
)
)
if "datacontenttype" in attributes:
if not isinstance(attributes["datacontenttype"], str):
errors["datacontenttype"].append(
InvalidAttributeTypeError("datacontenttype", str)
InvalidAttributeTypeError(
attribute_name="datacontenttype", expected_type=str
)
)
if not attributes["datacontenttype"]:
errors["datacontenttype"].append(
InvalidAttributeValueError(
"datacontenttype",
"Attribute 'datacontenttype' must not be empty",
attribute_name="datacontenttype",
msg="Attribute 'datacontenttype' must not be empty",
)
)
if "dataschema" in attributes:
if not isinstance(attributes["dataschema"], str):
errors["dataschema"].append(
InvalidAttributeTypeError("dataschema", str)
InvalidAttributeTypeError(
attribute_name="dataschema", expected_type=str
)
)
if not attributes["dataschema"]:
errors["dataschema"].append(
InvalidAttributeValueError(
"dataschema", "Attribute 'dataschema' must not be empty"
attribute_name="dataschema",
msg="Attribute 'dataschema' must not be empty",
)
)
return errors
Expand All @@ -194,22 +222,22 @@ def _validate_extension_attributes(
if extension_attribute == "data":
errors[extension_attribute].append(
CustomExtensionAttributeError(
extension_attribute,
"Extension attribute 'data' is reserved and must not be used",
attribute_name=extension_attribute,
msg="Extension attribute 'data' is reserved and must not be used",
)
)
if not (1 <= len(extension_attribute) <= 20):
errors[extension_attribute].append(
CustomExtensionAttributeError(
extension_attribute,
f"Extension attribute '{extension_attribute}' should be between 1 and 20 characters long",
attribute_name=extension_attribute,
msg=f"Extension attribute '{extension_attribute}' should be between 1 and 20 characters long",
)
)
if not re.match(r"^[a-z0-9]+$", extension_attribute):
errors[extension_attribute].append(
CustomExtensionAttributeError(
extension_attribute,
f"Extension attribute '{extension_attribute}' should only contain lowercase letters and numbers",
attribute_name=extension_attribute,
msg=f"Extension attribute '{extension_attribute}' should only contain lowercase letters and numbers",
)
)
return errors
Expand Down
12 changes: 5 additions & 7 deletions src/cloudevents/core/v1/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
class BaseCloudEventException(Exception):
"""A CloudEvent generic exception."""

pass


class CloudEventValidationError(BaseCloudEventException):
"""
Expand All @@ -31,7 +29,7 @@ def __init__(self, errors: dict[str, list[BaseCloudEventException]]) -> None:
self.errors: dict[str, list[BaseCloudEventException]] = errors

def __str__(self) -> str:
error_messages = [
error_messages: list[str] = [
f"{key}: {', '.join(str(e) for e in value)}"
for key, value in self.errors.items()
]
Expand All @@ -52,8 +50,8 @@ class CustomExtensionAttributeError(BaseCloudEventException, ValueError):
Raised when a custom extension attribute violates naming conventions.
"""

def __init__(self, extension_attribute: str, msg: str) -> None:
self.extension_attribute = extension_attribute
def __init__(self, attribute_name: str, msg: str) -> None:
self.attribute_name: str = attribute_name
super().__init__(msg)


Expand All @@ -63,7 +61,7 @@ class InvalidAttributeTypeError(BaseCloudEventException, TypeError):
"""

def __init__(self, attribute_name: str, expected_type: type) -> None:
self.attribute_name = attribute_name
self.attribute_name: str = attribute_name
super().__init__(f"Attribute '{attribute_name}' must be a {expected_type}")


Expand All @@ -73,5 +71,5 @@ class InvalidAttributeValueError(BaseCloudEventException, ValueError):
"""

def __init__(self, attribute_name: str, msg: str) -> None:
self.attribute_name = attribute_name
self.attribute_name: str = attribute_name
super().__init__(msg)

0 comments on commit 599d05c

Please sign in to comment.