Skip to content

Commit

Permalink
Merge pull request #320 from RWTH-EBC/301-ngsi-ld-validate-nested-pro…
Browse files Browse the repository at this point in the history
…perties-dynamically

301 ngsi ld validate nested properties dynamically
  • Loading branch information
djs0109 authored Oct 29, 2024
2 parents 4dc2b6f + 8949dbe commit 34355bf
Show file tree
Hide file tree
Showing 3 changed files with 315 additions and 198 deletions.
255 changes: 124 additions & 131 deletions filip/models/ngsi_ld/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
"""
import logging
from typing import Any, List, Dict, Union, Optional

from geojson_pydantic import Point, MultiPoint, LineString, MultiLineString, Polygon, \
MultiPolygon, GeometryCollection
from typing_extensions import Self
from aenum import Enum
from pydantic import field_validator, ConfigDict, BaseModel, Field
from pydantic import field_validator, ConfigDict, BaseModel, Field, model_validator
from filip.models.ngsi_v2 import ContextEntity
from filip.utils.validators import FiwareRegex, \
validate_fiware_datatype_string_protect, validate_fiware_standard_regex
Expand All @@ -17,7 +19,7 @@ class DataTypeLD(str, Enum):
In NGSI-LD the data types on context entities are only divided into properties and relationships.
"""
_init_ = 'value __doc__'

GEOPROPERTY = "GeoProperty", "A property that represents a geometry value"
PROPERTY = "Property", "All attributes that do not represent a relationship"
RELATIONSHIP = "Relationship", "Reference to another context entity, which can be identified with a URN."

Expand Down Expand Up @@ -97,6 +99,15 @@ class ContextProperty(BaseModel):
)
field_validator("datasetId")(validate_fiware_datatype_string_protect)

@classmethod
def get_model_fields_set(cls):
"""
Get all names and aliases of the model fields.
"""
return set([field.validation_alias
for (_, field) in cls.model_fields.items()] +
[field_name for field_name in cls.model_fields])

@field_validator("type")
@classmethod
def check_property_type(cls, value):
Expand All @@ -107,19 +118,15 @@ def check_property_type(cls, value):
Returns:
value
"""
if not value == "Property":
if value == "Relationship":
value == "Relationship"
elif value == "TemporalProperty":
value == "TemporalProperty"
else:
logging.warning(msg='NGSI_LD Properties must have type "Property"')
value = "Property"
valid_property_types = ["Property", "Relationship", "TemporalProperty"]
if value not in valid_property_types:
msg = f'NGSI_LD Properties must have type {valid_property_types}, ' \
f'not "{value}"'
logging.warning(msg=msg)
raise ValueError(msg)
return value




class NamedContextProperty(ContextProperty):
"""
Context properties are properties of context entities. For example, the current speed of a car could be modeled
Expand Down Expand Up @@ -159,48 +166,31 @@ class ContextGeoPropertyValue(BaseModel):
"""
type: Optional[str] = Field(
default="Point",
default=None,
title="type",
frozen=True
)
coordinates: List[float] = Field(
default=None,
title="Geo property coordinates",
description="the actual coordinates"
)
@field_validator("type")
@classmethod
def check_geoproperty_value_type(cls, value):
"""
Force property type to be "Point"
Args:
value: value field
Returns:
value
"""
if not value == "Point":
logging.warning(msg='NGSI_LD GeoProperty values must have type "Point"')
value = "Point"
return value
model_config = ConfigDict(extra='allow')

@field_validator("coordinates")
@classmethod
def check_geoproperty_value_coordinates(cls, value):
@model_validator(mode='after')
def check_geoproperty_value(self) -> Self:
"""
Force property coordinates to be lis of two floats
Args:
value: value field
Returns:
value
Check if the value is a valid GeoProperty
"""
if not isinstance(value, list) or len(value) != 2:
logging.error(msg='NGSI_LD GeoProperty values must have coordinates as list with length two')
raise ValueError
for element in value:
if not isinstance(element, float):
logging.error(msg='NGSI_LD GeoProperty values must have coordinates as list of floats')
raise TypeError
return value
if self.model_dump().get("type") == "Point":
return Point(**self.model_dump())
elif self.model_dump().get("type") == "LineString":
return LineString(**self.model_dump())
elif self.model_dump().get("type") == "Polygon":
return Polygon(**self.model_dump())
elif self.model_dump().get("type") == "MultiPoint":
return MultiPoint(**self.model_dump())
elif self.model_dump().get("type") == "MultiLineString":
return MultiLineString(**self.model_dump())
elif self.model_dump().get("type") == "MultiPolygon":
return MultiPolygon(**self.model_dump())
elif self.model_dump().get("type") == "GeometryCollection":
return GeometryCollection(**self.model_dump())


class ContextGeoProperty(BaseModel):
Expand All @@ -211,7 +201,7 @@ class ContextGeoProperty(BaseModel):
Example:
"location": {
{
"type": "GeoProperty",
"value": {
"type": "Point",
Expand All @@ -220,7 +210,6 @@ class ContextGeoProperty(BaseModel):
43.46296641666926
]
}
}
"""
model_config = ConfigDict(extra='allow')
Expand All @@ -229,7 +218,10 @@ class ContextGeoProperty(BaseModel):
title="type",
frozen=True
)
value: Optional[ContextGeoPropertyValue] = Field(
value: Optional[Union[ContextGeoPropertyValue,
Point, LineString, Polygon,
MultiPoint, MultiPolygon,
MultiLineString, GeometryCollection]] = Field(
default=None,
title="GeoProperty value",
description="the actual data"
Expand All @@ -252,28 +244,6 @@ class ContextGeoProperty(BaseModel):
)
field_validator("datasetId")(validate_fiware_datatype_string_protect)

@field_validator("type")
@classmethod
def check_geoproperty_type(cls, value):
"""
Force property type to be "GeoProperty"
Args:
value: value field
Returns:
value
"""
if not value == "GeoProperty":
if value == "Relationship":
value == "Relationship"
elif value == "TemporalProperty":
value == "TemporalProperty"
else:
logging.warning(msg='NGSI_LD GeoProperties must have type "GeoProperty" '
'-> They are checked first, so if no GeoProperties are used ignore this warning!')
raise ValueError('NGSI_LD GeoProperties must have type "GeoProperty" '
'-> They are checked first, so if no GeoProperties are used ignore this warning!')
return value


class NamedContextGeoProperty(ContextGeoProperty):
"""
Expand Down Expand Up @@ -538,23 +508,52 @@ def __init__(self,
data.update(self._validate_attributes(data))
super().__init__(id=id, type=type, **data)

# TODO we should distinguish between context relationship
@classmethod
def get_model_fields_set(cls):
"""
Get all names and aliases of the model fields.
"""
return set([field.validation_alias
for (_, field) in cls.model_fields.items()] +
[field_name for field_name in cls.model_fields])

# TODO should geoproperty has subproperties? and can geoproperty be subproperties?
@classmethod
def _validate_single_property(cls, attr) -> ContextProperty:
property_fields = ContextProperty.get_model_fields_set()
property_fields.remove(None)
# subattrs = {}
if attr.get("type") == "Relationship":
attr_instance = ContextRelationship.model_validate(attr)
elif attr.get("type") == "GeoProperty":
try:
attr_instance = ContextGeoProperty.model_validate(attr)
except Exception as e:
pass
elif attr.get("type") == "Property":
attr_instance = ContextProperty.model_validate(attr)
else:
raise ValueError(f"Attribute {attr.get('type')} "
"is not a valid type")
for subkey, subattr in attr.items():
# TODO can we ensure that the subattr can only be dict?
if isinstance(subattr, dict) and subkey not in property_fields:
attr_instance.model_extra.update(
{subkey: cls._validate_single_property(attr=subattr)}
)
return attr_instance

@classmethod
def _validate_attributes(cls, data: Dict):
fields = set([field.validation_alias for (_, field) in cls.model_fields.items()] +
[field_name for field_name in cls.model_fields])
fields.remove(None)
entity_fields = cls.get_model_fields_set()
entity_fields.remove(None)
# Initialize the attribute dictionary
attrs = {}

# Iterate through the data
for key, attr in data.items():
# Check if the keyword is not already present in the fields
if key not in fields:
try:
attrs[key] = ContextGeoProperty.model_validate(attr)
except ValueError:
attrs[key] = ContextProperty.model_validate(attr)
if key not in entity_fields:
attrs[key] = cls._validate_single_property(attr=attr)
return attrs

model_config = ConfigDict(extra='allow', validate_default=True, validate_assignment=True)
Expand All @@ -575,17 +574,6 @@ def _validate_id(cls, id: str):
'starting with the namespace "urn:ngsi-ld:"')
return id

@classmethod
def _validate_properties(cls, data: Dict):
attrs = {}
for key, attr in data.items():
if key not in ContextEntity.model_fields:
if attr["type"] == DataTypeLD.RELATIONSHIP:
attrs[key] = ContextRelationship.model_validate(attr)
else:
attrs[key] = ContextProperty.model_validate(attr)
return attrs

def get_properties(self,
response_format: Union[str, PropertyFormat] =
PropertyFormat.LIST) -> \
Expand All @@ -603,30 +591,26 @@ def get_properties(self,
if response_format == PropertyFormat.DICT:
final_dict = {}
for key, value in self.model_dump(exclude_unset=True).items():
if key not in ContextLDEntity.model_fields:
try:
if value.get('type') != DataTypeLD.RELATIONSHIP:
try:
final_dict[key] = ContextGeoProperty(**value)
except ValueError: # if context attribute
final_dict[key] = ContextProperty(**value)
except AttributeError:
if isinstance(value, list):
pass
if key not in ContextLDEntity.get_model_fields_set():
if value.get('type') != DataTypeLD.RELATIONSHIP:
if value.get('type') == DataTypeLD.GEOPROPERTY:
final_dict[key] = ContextGeoProperty(**value)
elif value.get('type') == DataTypeLD.PROPERTY:
final_dict[key] = ContextProperty(**value)
else: # named context property by default
final_dict[key] = ContextProperty(**value)
return final_dict
# response format list:
final_list = []
for key, value in self.model_dump(exclude_unset=True).items():
if key not in ContextLDEntity.model_fields:
try:
if value.get('type') != DataTypeLD.RELATIONSHIP:
try:
final_list.append(NamedContextGeoProperty(name=key, **value))
except ValueError: # if context attribute
final_list.append(NamedContextProperty(name=key, **value))
except AttributeError:
if isinstance(value, list):
pass
if key not in ContextLDEntity.get_model_fields_set():
if value.get('type') != DataTypeLD.RELATIONSHIP:
if value.get('type') == DataTypeLD.GEOPROPERTY:
final_list.append(NamedContextGeoProperty(name=key, **value))
elif value.get('type') == DataTypeLD.PROPERTY:
final_list.append(NamedContextProperty(name=key, **value))
else: # named context property by default
final_list.append(NamedContextProperty(name=key, **value))
return final_list

def add_attributes(self, **kwargs):
Expand Down Expand Up @@ -691,6 +675,22 @@ def delete_properties(self, props: Union[Dict[str, ContextProperty],
for name in names:
delattr(self, name)

def add_geo_properties(self, attrs: Union[Dict[str, ContextGeoProperty],
List[NamedContextGeoProperty]]) -> None:
"""
Add property to entity
Args:
attrs:
Returns:
None
"""
if isinstance(attrs, list):
attrs = {attr.name: ContextGeoProperty(**attr.model_dump(exclude={'name'},
exclude_unset=True))
for attr in attrs}
for key, attr in attrs.items():
self.__setattr__(name=key, value=attr)

def add_properties(self, attrs: Union[Dict[str, ContextProperty],
List[NamedContextProperty]]) -> None:
"""
Expand Down Expand Up @@ -741,7 +741,7 @@ def get_relationships(self,
if response_format == PropertyFormat.DICT:
final_dict = {}
for key, value in self.model_dump(exclude_unset=True).items():
if key not in ContextLDEntity.model_fields:
if key not in ContextLDEntity.get_model_fields_set():
try:
if value.get('type') == DataTypeLD.RELATIONSHIP:
final_dict[key] = ContextRelationship(**value)
Expand All @@ -752,13 +752,9 @@ def get_relationships(self,
# response format list:
final_list = []
for key, value in self.model_dump(exclude_unset=True).items():
if key not in ContextLDEntity.model_fields:
try:
if value.get('type') == DataTypeLD.RELATIONSHIP:
final_list.append(NamedContextRelationship(name=key, **value))
except AttributeError: # if context attribute
if isinstance(value, list):
pass
if key not in ContextLDEntity.get_model_fields_set():
if value.get('type') == DataTypeLD.RELATIONSHIP:
final_list.append(NamedContextRelationship(name=key, **value))
return final_list

def get_context(self):
Expand All @@ -769,15 +765,12 @@ def get_context(self):
Returns: context of the entity as list
"""
found_list = False
for key, value in self.model_dump(exclude_unset=True).items():
if key not in ContextLDEntity.model_fields:
if isinstance(value, list):
found_list = True
return value
if not found_list:
_, context = self.model_dump(include={"context"}).popitem()
if not context:
logging.warning("No context in entity")
return None
return None
else:
return context


class ActionTypeLD(str, Enum):
Expand Down
Loading

0 comments on commit 34355bf

Please sign in to comment.