From c61a71296471f0031bddee6e9157cbad629c01a3 Mon Sep 17 00:00:00 2001 From: "thomas.storek" <20579672+tstorek@users.noreply.github.com> Date: Fri, 29 Dec 2023 14:26:42 +0100 Subject: [PATCH 1/5] chore: removed exclude_defaults For #228 --- filip/clients/ngsi_v2/cb.py | 5 ----- filip/clients/ngsi_v2/quantumleap.py | 1 - 2 files changed, 6 deletions(-) diff --git a/filip/clients/ngsi_v2/cb.py b/filip/clients/ngsi_v2/cb.py index baa7546c..04a41841 100644 --- a/filip/clients/ngsi_v2/cb.py +++ b/filip/clients/ngsi_v2/cb.py @@ -234,7 +234,6 @@ def post_entity(self, url=url, headers=headers, json=entity.model_dump(exclude_unset=True, - exclude_defaults=True, exclude_none=True)) if res.ok: self.logger.info("Entity successfully posted!") @@ -1202,7 +1201,6 @@ def post_subscription(self, headers=headers, data=subscription.model_dump_json(exclude={'id'}, exclude_unset=True, - exclude_defaults=True, exclude_none=True), params=params) if res.ok: @@ -1276,7 +1274,6 @@ def update_subscription(self, headers=headers, data=subscription.model_dump_json(exclude={'id'}, exclude_unset=True, - exclude_defaults=False, exclude_none=True)) if res.ok: self.logger.info("Subscription successfully updated!") @@ -1360,7 +1357,6 @@ def post_registration(self, registration: Registration): headers=headers, data=registration.model_dump_json(exclude={'id'}, exclude_unset=True, - exclude_defaults=True, exclude_none=True)) if res.ok: self.logger.info("Registration successfully created!") @@ -1412,7 +1408,6 @@ def update_registration(self, registration: Registration): headers=headers, data=registration.model_dump_json(exclude={'id'}, exclude_unset=True, - exclude_defaults=True, exclude_none=True)) if res.ok: self.logger.info("Registration successfully updated!") diff --git a/filip/clients/ngsi_v2/quantumleap.py b/filip/clients/ngsi_v2/quantumleap.py index f76ed420..5804200b 100644 --- a/filip/clients/ngsi_v2/quantumleap.py +++ b/filip/clients/ngsi_v2/quantumleap.py @@ -116,7 +116,6 @@ def post_notification(self, notification: Message): data = [] for entity in notification.data: data.append(entity.model_dump(exclude_unset=True, - exclude_defaults=True, exclude_none=True)) data_set = { "data": data, From 4707d314db9c56e047a4275b812a4a1604c9c0bb Mon Sep 17 00:00:00 2001 From: "thomas.storek" <20579672+tstorek@users.noreply.github.com> Date: Fri, 29 Dec 2023 14:46:20 +0100 Subject: [PATCH 2/5] chore: duplicated pattern fix For #228 --- filip/models/base.py | 94 +++++++++++++++++++++++++++----------------- 1 file changed, 57 insertions(+), 37 deletions(-) diff --git a/filip/models/base.py b/filip/models/base.py index 0f9dbd0c..581d1c18 100644 --- a/filip/models/base.py +++ b/filip/models/base.py @@ -3,9 +3,10 @@ """ from aenum import Enum -from pydantic import ConfigDict, BaseModel, Field, BaseConfig, field_validator +from pydantic import ConfigDict, BaseModel, Field, field_validator -from filip.utils.validators import validate_fiware_service_path, validate_fiware_service +from filip.utils.validators import (validate_fiware_service_path, + validate_fiware_service) class NgsiVersion(str, Enum): @@ -14,6 +15,7 @@ class NgsiVersion(str, Enum): Note: Currently, the library only supports functionality for NGSI-v2 """ + v2 = "v2" ld = "ld" @@ -27,41 +29,58 @@ class DataType(str, Enum): https://schema.org/DataType """ - _init_ = 'value __doc__' + + _init_ = "value __doc__" BOOLEAN = "Boolean", "True or False." DATE = "Date", "A date value in ISO 8601 date format." - DATETIME = "DateTime", "A combination of date and time of day in the form "\ - "[-]CCYY-MM-DDThh:mm:ss[Z|(+|-)hh:mm] " \ - "(see Chapter 5.4 of ISO 8601)." - NUMBER = "Number", "Use values from 0123456789 (Unicode 'DIGIT ZERO' " \ - "(U+0030) to 'DIGIT NINE' (U+0039)) rather than " \ - "superficially similiar Unicode symbols. Use '.' " \ - "(Unicode 'FULL STOP' (U+002E)) rather than ',' to " \ - "indicate a decimal point. Avoid using these symbols " \ - "as a readability separator." + DATETIME = ( + "DateTime", + "A combination of date and time of day in the form " + "[-]CCYY-MM-DDThh:mm:ss[Z|(+|-)hh:mm] " + "(see Chapter 5.4 of ISO 8601).", + ) + NUMBER = ( + "Number", + "Use values from 0123456789 (Unicode 'DIGIT ZERO' " + "(U+0030) to 'DIGIT NINE' (U+0039)) rather than " + "superficially similiar Unicode symbols. Use '.' " + "(Unicode 'FULL STOP' (U+002E)) rather than ',' to " + "indicate a decimal point. Avoid using these symbols " + "as a readability separator.", + ) INTEGER = "Integer", "Integer number" FLOAT = "Float", "Floating number. Please check 'DataType.Number'" TEXT = "Text", "https://schema.org/Text" - TIME = "Time", "A point in time recurring on multiple days in the form " \ - "hh:mm:ss[Z|(+|-)hh:mm] (see XML schema for details)." + TIME = ( + "Time", + "A point in time recurring on multiple days in the form " + "hh:mm:ss[Z|(+|-)hh:mm] (see XML schema for details).", + ) RELATIONSHIP = "Relationship", "Reference to another context entity" - STRUCTUREDVALUE = "StructuredValue", "Structered datatype must be " \ - "serializable" + STRUCTUREDVALUE = "StructuredValue", ("Structured datatype must be " + "serializable") ARRAY = "Array", "Array of the types above" COMMAND = "command", "A command for IoT Devices" - COMMAND_RESULT = "commandResult", "An entity containing a command, " \ - "contains an autogenerated attribute" \ - "of this type" - COMMAND_STATUS = "commandStatus", "An entity containing a command, " \ - "contains an autogenerated attribute " \ - "of this type" + COMMAND_RESULT = ( + "commandResult", + "An entity containing a command, " + "contains an autogenerated attribute" + "of this type", + ) + COMMAND_STATUS = ( + "commandStatus", + "An entity containing a command, " + "contains an autogenerated attribute " + "of this type", + ) class PaginationMethod(str, Enum): """ Options for the internal pagination methods """ + GET = "GET" POST = "POST" @@ -72,6 +91,7 @@ class FiwareHeader(BaseModel): Context Brokers to support hierarchical scopes: https://fiware-orion.readthedocs.io/en/master/user/service_path/index.html """ + model_config = ConfigDict(populate_by_name=True, validate_assignment=True) service: str = Field( @@ -79,7 +99,7 @@ class FiwareHeader(BaseModel): default="", max_length=50, description="Fiware service used for multi-tenancy", - pattern=r"\w*$" + pattern=r"\w*$", ) service_path: str = Field( alias="fiware-servicepath", @@ -87,39 +107,39 @@ class FiwareHeader(BaseModel): description="Fiware service path", max_length=51, ) - valid_service = field_validator("service")( - validate_fiware_service) - valid_service_path = field_validator("service_path")( - validate_fiware_service_path) + valid_service = field_validator("service")(validate_fiware_service) + valid_service_path = field_validator("service_path")(validate_fiware_service_path) class FiwareHeaderSecure(FiwareHeader): """ - Defines entity service paths and a autorization via Baerer-Token which are supported by the NGSI + Defines entity service paths and an autorization via Baerer-Token which are + supported by the NGSI Context Brokers to support hierarchical scopes: https://fiware-orion.readthedocs.io/en/master/user/service_path/index.html """ + authorization: str = Field( alias="authorization", default="", max_length=3000, description="authorization key", - regex=r".*" + pattern=r".*", ) class LogLevel(str, Enum): - CRITICAL = 'CRITICAL' - ERROR = 'ERROR' - WARNING = 'WARNING' - INFO = 'INFO' - DEBUG = 'DEBUG' - NOTSET = 'NOTSET' + CRITICAL = "CRITICAL" + ERROR = "ERROR" + WARNING = "WARNING" + INFO = "INFO" + DEBUG = "DEBUG" + NOTSET = "NOTSET" @classmethod def _missing_name_(cls, name): """ - Class method to realize case insensitive args + Class method to realize case-insensitive args Args: name: missing argument @@ -129,4 +149,4 @@ def _missing_name_(cls, name): """ for member in cls: if member.value.casefold() == name.casefold(): - return member \ No newline at end of file + return member From 5dae2ce8d882707155f29d8037203fb436a49cfd Mon Sep 17 00:00:00 2001 From: "thomas.storek" <20579672+tstorek@users.noreply.github.com> Date: Fri, 29 Dec 2023 14:49:37 +0100 Subject: [PATCH 3/5] chore: corrected some typos For #228 --- filip/models/base.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/filip/models/base.py b/filip/models/base.py index 581d1c18..3d61dde0 100644 --- a/filip/models/base.py +++ b/filip/models/base.py @@ -44,7 +44,7 @@ class DataType(str, Enum): "Number", "Use values from 0123456789 (Unicode 'DIGIT ZERO' " "(U+0030) to 'DIGIT NINE' (U+0039)) rather than " - "superficially similiar Unicode symbols. Use '.' " + "superficially similar Unicode symbols. Use '.' " "(Unicode 'FULL STOP' (U+002E)) rather than ',' to " "indicate a decimal point. Avoid using these symbols " "as a readability separator.", @@ -113,7 +113,7 @@ class FiwareHeader(BaseModel): class FiwareHeaderSecure(FiwareHeader): """ - Defines entity service paths and an autorization via Baerer-Token which are + Defines entity service paths and an authorization via Bearer-Token which are supported by the NGSI Context Brokers to support hierarchical scopes: https://fiware-orion.readthedocs.io/en/master/user/service_path/index.html From d9efbd52c1895919fd068420536e5c21810a080c Mon Sep 17 00:00:00 2001 From: "thomas.storek" <20579672+tstorek@users.noreply.github.com> Date: Fri, 29 Dec 2023 20:27:14 +0100 Subject: [PATCH 4/5] chore: removed exclude_unset wherever it seemed safe For #228 --- filip/clients/ngsi_v2/cb.py | 1013 +++++++++++++------------- filip/clients/ngsi_v2/iota.py | 8 +- filip/clients/ngsi_v2/quantumleap.py | 3 +- filip/models/ngsi_v2/context.py | 14 +- 4 files changed, 536 insertions(+), 502 deletions(-) diff --git a/filip/clients/ngsi_v2/cb.py b/filip/clients/ngsi_v2/cb.py index 04a41841..7d773c96 100644 --- a/filip/clients/ngsi_v2/cb.py +++ b/filip/clients/ngsi_v2/cb.py @@ -6,10 +6,7 @@ from copy import deepcopy from math import inf from pkg_resources import parse_version -from pydantic import \ - PositiveInt, \ - PositiveFloat, \ - AnyHttpUrl +from pydantic import PositiveInt, PositiveFloat, AnyHttpUrl from pydantic.type_adapter import TypeAdapter from typing import Any, Dict, List, Optional, TYPE_CHECKING, Union import re @@ -20,17 +17,18 @@ from filip.config import settings from filip.models.base import FiwareHeader, PaginationMethod from filip.utils.simple_ql import QueryString -from filip.models.ngsi_v2.context import \ - ActionType, \ - Command, \ - ContextEntity, \ - ContextEntityKeyValues, \ - ContextAttribute, \ - NamedCommand, \ - NamedContextAttribute, \ - Query, \ - Update, \ - PropertyFormat +from filip.models.ngsi_v2.context import ( + ActionType, + Command, + ContextEntity, + ContextEntityKeyValues, + ContextAttribute, + NamedCommand, + NamedContextAttribute, + Query, + Update, + PropertyFormat, +) from filip.models.ngsi_v2.base import AttrsFormat from filip.models.ngsi_v2.subscriptions import Subscription, Message from filip.models.ngsi_v2.registrations import Registration @@ -54,12 +52,14 @@ class ContextBrokerClient(BaseHttpClient): other brokers may show slightly different behavior! """ - def __init__(self, - url: str = None, - *, - session: requests.Session = None, - fiware_header: FiwareHeader = None, - **kwargs): + def __init__( + self, + url: str = None, + *, + session: requests.Session = None, + fiware_header: FiwareHeader = None, + **kwargs, + ): """ Args: @@ -70,19 +70,20 @@ def __init__(self, """ # set service url url = url or settings.CB_URL - super().__init__(url=url, - session=session, - fiware_header=fiware_header, - **kwargs) - - def __pagination(self, - *, - method: PaginationMethod = PaginationMethod.GET, - url: str, - headers: Dict, - limit: Union[PositiveInt, PositiveFloat] = None, - params: Dict = None, - data: str = None) -> List[Dict]: + super().__init__( + url=url, session=session, fiware_header=fiware_header, **kwargs + ) + + def __pagination( + self, + *, + method: PaginationMethod = PaginationMethod.GET, + url: str, + headers: Dict, + limit: Union[PositiveInt, PositiveFloat] = None, + params: Dict = None, + data: str = None, + ) -> List[Dict]: """ NGSIv2 implements a pagination mechanism in order to help clients to retrieve large sets of resources. This mechanism works for all listing @@ -105,39 +106,39 @@ def __pagination(self, if limit is None: limit = inf if limit > 1000: - params['limit'] = 1000 # maximum items per request + params["limit"] = 1000 # maximum items per request else: - params['limit'] = limit + params["limit"] = limit if self.session: session = self.session else: session = requests.Session() with session: - res = session.request(method=method, - url=url, - params=params, - headers=headers, - data=data) + res = session.request( + method=method, url=url, params=params, headers=headers, data=data + ) if res.ok: items = res.json() # do pagination - count = int(res.headers['Fiware-Total-Count']) + count = int(res.headers["Fiware-Total-Count"]) while len(items) < limit and len(items) < count: # Establishing the offset from where entities are retrieved - params['offset'] = len(items) - params['limit'] = min(1000, (limit - len(items))) - res = session.request(method=method, - url=url, - params=params, - headers=headers, - data=data) + params["offset"] = len(items) + params["limit"] = min(1000, (limit - len(items))) + res = session.request( + method=method, + url=url, + params=params, + headers=headers, + data=data, + ) if res.ok: items.extend(res.json()) else: res.raise_for_status() - self.logger.debug('Received: %s', items) + self.logger.debug("Received: %s", items) return items res.raise_for_status() @@ -148,7 +149,7 @@ def get_version(self) -> Dict: Returns: Dictionary with response """ - url = urljoin(self.base_url, '/version') + url = urljoin(self.base_url, "/version") try: res = self.get(url=url, headers=self.headers) if res.ok: @@ -165,7 +166,7 @@ def get_resources(self) -> Dict: Returns: Dict """ - url = urljoin(self.base_url, '/v2') + url = urljoin(self.base_url, "/v2") try: res = self.get(url=url, headers=self.headers) if res.ok: @@ -182,7 +183,7 @@ def get_statistics(self) -> Dict: Returns: Dictionary with response """ - url = urljoin(self.base_url, 'statistics') + url = urljoin(self.base_url, "statistics") try: res = self.get(url=url, headers=self.headers) if res.ok: @@ -194,12 +195,13 @@ def get_statistics(self) -> Dict: # CONTEXT MANAGEMENT API ENDPOINTS # Entity Operations - def post_entity(self, - entity: ContextEntity, - update: bool = False, - patch: bool = False, - override_attr_metadata: bool = True - ): + def post_entity( + self, + entity: ContextEntity, + update: bool = False, + patch: bool = False, + override_attr_metadata: bool = True, + ): """ Function registers an Object with the NGSI Context Broker, if it already exists it can be automatically updated (overwritten) @@ -219,58 +221,53 @@ def post_entity(self, If the response.status_code is 422, whether the override and existing entity patch (bool): - If the response.status_code is 422, whether the manipulate the + If the response.status_code is 422, whether to manipulate the existing entity. Omitted if update `True`. override_attr_metadata: Only applies for patch equal to `True`. - Whether to override or append the attributes metadata. + Whether to override or append the attribute's metadata. `True` for overwrite or `False` for update/append """ - url = urljoin(self.base_url, 'v2/entities') + url = urljoin(self.base_url, "v2/entities") headers = self.headers.copy() try: res = self.post( - url=url, - headers=headers, - json=entity.model_dump(exclude_unset=True, - exclude_none=True)) + url=url, headers=headers, json=entity.model_dump(exclude_none=True) + ) if res.ok: self.logger.info("Entity successfully posted!") - return res.headers.get('Location') + return res.headers.get("Location") res.raise_for_status() except requests.RequestException as err: if update and err.response.status_code == 422: - return self.update_entity( - entity=entity) + return self.update_entity(entity=entity) if patch and err.response.status_code == 422: return self.patch_entity( - entity=entity, - override_attr_metadata=override_attr_metadata) + entity=entity, override_attr_metadata=override_attr_metadata + ) msg = f"Could not post entity {entity.id}" self.log_error(err=err, msg=msg) raise - def get_entity_list(self, - *, - entity_ids: List[str] = None, - entity_types: List[str] = None, - id_pattern: str = None, - type_pattern: str = None, - q: Union[str, QueryString] = None, - mq: Union[str, QueryString] = None, - georel: str = None, - geometry: str = None, - coords: str = None, - limit: PositiveInt = inf, - attrs: List[str] = None, - metadata: str = None, - order_by: str = None, - response_format: Union[AttrsFormat, str] = - AttrsFormat.NORMALIZED - ) -> List[Union[ContextEntity, - ContextEntityKeyValues, - Dict[str, Any]]]: + def get_entity_list( + self, + *, + entity_ids: List[str] = None, + entity_types: List[str] = None, + id_pattern: str = None, + type_pattern: str = None, + q: Union[str, QueryString] = None, + mq: Union[str, QueryString] = None, + georel: str = None, + geometry: str = None, + coords: str = None, + limit: PositiveInt = inf, + attrs: List[str] = None, + metadata: str = None, + order_by: str = None, + response_format: Union[AttrsFormat, str] = AttrsFormat.NORMALIZED, + ) -> List[Union[ContextEntity, ContextEntityKeyValues, Dict[str, Any]]]: r""" Retrieves a list of context entities that match different criteria by id, type, pattern matching (either id or type) and/or those which @@ -328,7 +325,7 @@ def get_entity_list(self, Returns: """ - url = urljoin(self.base_url, 'v2/entities/') + url = urljoin(self.base_url, "v2/entities/") headers = self.headers.copy() params = {} @@ -339,51 +336,53 @@ def get_entity_list(self, if entity_ids: if not isinstance(entity_ids, list): entity_ids = [entity_ids] - params.update({'id': ','.join(entity_ids)}) + params.update({"id": ",".join(entity_ids)}) if id_pattern: try: re.compile(id_pattern) except re.error as err: - raise ValueError(f'Invalid Pattern: {err}') from err - params.update({'idPattern': id_pattern}) + raise ValueError(f"Invalid Pattern: {err}") from err + params.update({"idPattern": id_pattern}) if entity_types: if not isinstance(entity_types, list): entity_types = [entity_types] - params.update({'type': ','.join(entity_types)}) + params.update({"type": ",".join(entity_types)}) if type_pattern: try: re.compile(type_pattern) except re.error as err: - raise ValueError(f'Invalid Pattern: {err.msg}') from err - params.update({'typePattern': type_pattern}) + raise ValueError(f"Invalid Pattern: {err.msg}") from err + params.update({"typePattern": type_pattern}) if attrs: - params.update({'attrs': ','.join(attrs)}) + params.update({"attrs": ",".join(attrs)}) if metadata: - params.update({'metadata': ','.join(metadata)}) + params.update({"metadata": ",".join(metadata)}) if q: if isinstance(q, str): q = QueryString.parse_str(q) - params.update({'q': str(q)}) + params.update({"q": str(q)}) if mq: - params.update({'mq': str(mq)}) + params.update({"mq": str(mq)}) if geometry: - params.update({'geometry': geometry}) + params.update({"geometry": geometry}) if georel: - params.update({'georel': georel}) + params.update({"georel": georel}) if coords: - params.update({'coords': coords}) + params.update({"coords": coords}) if order_by: - params.update({'orderBy': order_by}) + params.update({"orderBy": order_by}) if response_format not in list(AttrsFormat): - raise ValueError(f'Value must be in {list(AttrsFormat)}') - response_format = ','.join(['count', response_format]) - params.update({'options': response_format}) + raise ValueError(f"Value must be in {list(AttrsFormat)}") + response_format = ",".join(["count", response_format]) + params.update({"options": response_format}) try: - items = self.__pagination(method=PaginationMethod.GET, - limit=limit, - url=url, - params=params, - headers=headers) + items = self.__pagination( + method=PaginationMethod.GET, + limit=limit, + url=url, + params=params, + headers=headers, + ) if AttrsFormat.NORMALIZED in response_format: adapter = TypeAdapter(List[ContextEntity]) return adapter.validate_python(items) @@ -397,14 +396,14 @@ def get_entity_list(self, self.log_error(err=err, msg=msg) raise - def get_entity(self, - entity_id: str, - entity_type: str = None, - attrs: List[str] = None, - metadata: List[str] = None, - response_format: Union[AttrsFormat, str] = - AttrsFormat.NORMALIZED) \ - -> Union[ContextEntity, ContextEntityKeyValues, Dict[str, Any]]: + def get_entity( + self, + entity_id: str, + entity_type: str = None, + attrs: List[str] = None, + metadata: List[str] = None, + response_format: Union[AttrsFormat, str] = AttrsFormat.NORMALIZED, + ) -> Union[ContextEntity, ContextEntityKeyValues, Dict[str, Any]]: """ This operation must return one entity element only, but there may be more than one entity with the same ID (e.g. entities with same ID but @@ -431,18 +430,18 @@ def get_entity(self, Returns: ContextEntity """ - url = urljoin(self.base_url, f'v2/entities/{entity_id}') + url = urljoin(self.base_url, f"v2/entities/{entity_id}") headers = self.headers.copy() params = {} if entity_type: - params.update({'type': entity_type}) + params.update({"type": entity_type}) if attrs: - params.update({'attrs': ','.join(attrs)}) + params.update({"attrs": ",".join(attrs)}) if metadata: - params.update({'metadata': ','.join(metadata)}) + params.update({"metadata": ",".join(metadata)}) if response_format not in list(AttrsFormat): - raise ValueError(f'Value must be in {list(AttrsFormat)}') - params.update({'options': response_format}) + raise ValueError(f"Value must be in {list(AttrsFormat)}") + params.update({"options": response_format}) try: res = self.get(url=url, params=params, headers=headers) @@ -460,14 +459,14 @@ def get_entity(self, self.log_error(err=err, msg=msg) raise - def get_entity_attributes(self, - entity_id: str, - entity_type: str = None, - attrs: List[str] = None, - metadata: List[str] = None, - response_format: Union[AttrsFormat, str] = - AttrsFormat.NORMALIZED) -> \ - Dict[str, ContextAttribute]: + def get_entity_attributes( + self, + entity_id: str, + entity_type: str = None, + attrs: List[str] = None, + metadata: List[str] = None, + response_format: Union[AttrsFormat, str] = AttrsFormat.NORMALIZED, + ) -> Dict[str, ContextAttribute]: """ This request is similar to retrieving the whole entity, however this one omits the id and type fields. Just like the general request of @@ -496,24 +495,26 @@ def get_entity_attributes(self, Returns: Dict """ - url = urljoin(self.base_url, f'v2/entities/{entity_id}/attrs') + url = urljoin(self.base_url, f"v2/entities/{entity_id}/attrs") headers = self.headers.copy() params = {} if entity_type: - params.update({'type': entity_type}) + params.update({"type": entity_type}) if attrs: - params.update({'attrs': ','.join(attrs)}) + params.update({"attrs": ",".join(attrs)}) if metadata: - params.update({'metadata': ','.join(metadata)}) + params.update({"metadata": ",".join(metadata)}) if response_format not in list(AttrsFormat): - raise ValueError(f'Value must be in {list(AttrsFormat)}') - params.update({'options': response_format}) + raise ValueError(f"Value must be in {list(AttrsFormat)}") + params.update({"options": response_format}) try: res = self.get(url=url, params=params, headers=headers) if res.ok: if response_format == AttrsFormat.NORMALIZED: - return {key: ContextAttribute(**values) - for key, values in res.json().items()} + return { + key: ContextAttribute(**values) + for key, values in res.json().items() + } return res.json() res.raise_for_status() except requests.RequestException as err: @@ -521,10 +522,7 @@ def get_entity_attributes(self, self.log_error(err=err, msg=msg) raise - def update_entity(self, - entity: ContextEntity, - append_strict: bool = False - ): + def update_entity(self, entity: ContextEntity, append_strict: bool = False): """ The request payload is an object representing the attributes to append or update. @@ -547,18 +545,21 @@ def update_entity(self, Returns: None """ - self.update_or_append_entity_attributes(entity_id=entity.id, - entity_type=entity.type, - attrs=entity.get_properties(), - append_strict=append_strict) - - def delete_entity(self, - entity_id: str, - entity_type: str, - delete_devices: bool = False, - iota_client: IoTAClient = None, - iota_url: AnyHttpUrl = settings.IOTA_URL) -> None: - + self.update_or_append_entity_attributes( + entity_id=entity.id, + entity_type=entity.type, + attrs=entity.get_properties(), + append_strict=append_strict, + ) + + def delete_entity( + self, + entity_id: str, + entity_type: str, + delete_devices: bool = False, + iota_client: IoTAClient = None, + iota_url: AnyHttpUrl = settings.IOTA_URL, + ) -> None: """ Remove a entity from the context broker. No payload is required or received. @@ -581,9 +582,9 @@ def delete_entity(self, Returns: None """ - url = urljoin(self.base_url, f'v2/entities/{entity_id}') + url = urljoin(self.base_url, f"v2/entities/{entity_id}") headers = self.headers.copy() - params = {'type': entity_type} + params = {"type": entity_type} try: res = self.delete(url=url, params=params, headers=headers) @@ -598,20 +599,23 @@ def delete_entity(self, if delete_devices: from filip.clients.ngsi_v2 import IoTAClient + if iota_client: iota_client_local = deepcopy(iota_client) else: - warnings.warn("No IoTA-Client object provided! " - "Will try to generate one. " - "This usage is not recommended.") + warnings.warn( + "No IoTA-Client object provided! " + "Will try to generate one. " + "This usage is not recommended." + ) iota_client_local = IoTAClient( url=iota_url, fiware_header=self.fiware_headers, - headers=self.headers) + headers=self.headers, + ) - for device in iota_client_local.get_device_list( - entity_names=[entity_id]): + for device in iota_client_local.get_device_list(entity_names=[entity_id]): if device.entity_type == entity_type: iota_client_local.delete_device(device_id=device.device_id) @@ -638,11 +642,15 @@ def delete_entities(self, entities: List[ContextEntity]) -> None: # attributes. entities_with_attributes: List[ContextEntity] = [] for entity in entities: - attribute_names = [key for key in entity.model_dump() if key not in - ContextEntity.model_fields] + attribute_names = [ + key + for key in entity.model_dump() + if key not in ContextEntity.model_fields + ] if len(attribute_names) > 0: entities_with_attributes.append( - ContextEntity(id=entity.id, type=entity.type)) + ContextEntity(id=entity.id, type=entity.type) + ) # Post update_delete for those without attribute only once, # for the other post update_delete again but for the changed entity @@ -653,12 +661,12 @@ def delete_entities(self, entities: List[ContextEntity]) -> None: self.update(entities=entities_with_attributes, action_type="delete") def update_or_append_entity_attributes( - self, - entity_id: str, - entity_type: str, - attrs: List[Union[NamedContextAttribute, - Dict[str, ContextAttribute]]], - append_strict: bool = False): + self, + entity_id: str, + entity_type: str, + attrs: List[Union[NamedContextAttribute, Dict[str, ContextAttribute]]], + append_strict: bool = False, + ): """ The request payload is an object representing the attributes to append or update. This corresponds to a 'POST' request if append is @@ -687,46 +695,46 @@ def update_or_append_entity_attributes( None """ - url = urljoin(self.base_url, f'v2/entities/{entity_id}/attrs') + url = urljoin(self.base_url, f"v2/entities/{entity_id}/attrs") headers = self.headers.copy() params = {} if entity_type: - params.update({'type': entity_type}) + params.update({"type": entity_type}) if append_strict: - params.update({'options': 'append'}) + params.update({"options": "append"}) - entity = ContextEntity(id=entity_id, - type=entity_type) + entity = ContextEntity(id=entity_id, type=entity_type) entity.add_attributes(attrs) # exclude commands from the send data, # as they live in the IoTA-agent - excluded_keys = {'id', 'type'} + excluded_keys = {"id", "type"} excluded_keys.update( - entity.get_commands(response_format=PropertyFormat.DICT).keys()) + entity.get_commands(response_format=PropertyFormat.DICT).keys() + ) try: - res = self.post(url=url, - headers=headers, - json=entity.model_dump(exclude=excluded_keys, - exclude_unset=True, - exclude_none=True), - params=params) + res = self.post( + url=url, + headers=headers, + json=entity.model_dump( + exclude=excluded_keys, exclude_unset=True, exclude_none=True + ), + params=params, + ) if res.ok: - self.logger.info("Entity '%s' successfully " - "updated!", entity.id) + self.logger.info("Entity '%s' successfully " "updated!", entity.id) else: res.raise_for_status() except requests.RequestException as err: - msg = f"Could not update or append attributes of entity" \ - f" {entity.id} !" + msg = f"Could not update or append attributes of entity" f" {entity.id} !" self.log_error(err=err, msg=msg) raise def update_existing_entity_attributes( - self, - entity_id: str, - entity_type: str, - attrs: List[Union[NamedContextAttribute, - Dict[str, ContextAttribute]]]): + self, + entity_id: str, + entity_type: str, + attrs: List[Union[NamedContextAttribute, Dict[str, ContextAttribute]]], + ): """ The entity attributes are updated with the ones in the payload. In addition to that, if one or more attributes in the payload doesn't @@ -743,38 +751,37 @@ def update_existing_entity_attributes( None """ - url = urljoin(self.base_url, f'v2/entities/{entity_id}/attrs') + url = urljoin(self.base_url, f"v2/entities/{entity_id}/attrs") headers = self.headers.copy() params = {"type": entity_type} - entity = ContextEntity(id=entity_id, - type=entity_type) + entity = ContextEntity(id=entity_id, type=entity_type) entity.add_attributes(attrs) try: - res = self.patch(url=url, - headers=headers, - json=entity.model_dump(exclude={'id', 'type'}, - exclude_unset=True, - exclude_none=True), - params=params) + res = self.patch( + url=url, + headers=headers, + json=entity.model_dump( + exclude={"id", "type"}, exclude_unset=True, exclude_none=True + ), + params=params, + ) if res.ok: - self.logger.info("Entity '%s' successfully " - "updated!", entity.id) + self.logger.info("Entity '%s' successfully " "updated!", entity.id) else: res.raise_for_status() except requests.RequestException as err: - msg = f"Could not update attributes of entity" \ - f" {entity.id} !" + msg = f"Could not update attributes of entity" f" {entity.id} !" self.log_error(err=err, msg=msg) raise def replace_entity_attributes( - self, - entity_id: str, - entity_type: str, - attrs: List[Union[NamedContextAttribute, - Dict[str, ContextAttribute]]]): + self, + entity_id: str, + entity_type: str, + attrs: List[Union[NamedContextAttribute, Dict[str, ContextAttribute]]], + ): """ The attributes previously existing in the entity are removed and replaced by the ones in the request. This corresponds to a 'PUT' @@ -788,26 +795,26 @@ def replace_entity_attributes( Returns: None """ - url = urljoin(self.base_url, f'v2/entities/{entity_id}/attrs') + url = urljoin(self.base_url, f"v2/entities/{entity_id}/attrs") headers = self.headers.copy() params = {} if entity_type: - params.update({'type': entity_type}) + params.update({"type": entity_type}) - entity = ContextEntity(id=entity_id, - type=entity_type) + entity = ContextEntity(id=entity_id, type=entity_type) entity.add_attributes(attrs) try: - res = self.put(url=url, - headers=headers, - json=entity.model_dump(exclude={'id', 'type'}, - exclude_unset=True, - exclude_none=True), - params=params) + res = self.put( + url=url, + headers=headers, + json=entity.model_dump( + exclude={"id", "type"}, exclude_unset=True, exclude_none=True + ), + params=params, + ) if res.ok: - self.logger.info("Entity '%s' successfully " - "updated!", entity.id) + self.logger.info("Entity '%s' successfully " "updated!", entity.id) else: res.raise_for_status() except requests.RequestException as err: @@ -816,12 +823,14 @@ def replace_entity_attributes( raise # Attribute operations - def get_attribute(self, - entity_id: str, - attr_name: str, - entity_type: str = None, - metadata: str = None, - response_format='') -> ContextAttribute: + def get_attribute( + self, + entity_id: str, + attr_name: str, + entity_type: str = None, + metadata: str = None, + response_format="", + ) -> ContextAttribute: """ Retrieves a specified attribute from an entity. @@ -840,34 +849,35 @@ def get_attribute(self, Error """ - url = urljoin(self.base_url, - f'v2/entities/{entity_id}/attrs/{attr_name}') + url = urljoin(self.base_url, f"v2/entities/{entity_id}/attrs/{attr_name}") headers = self.headers.copy() params = {} if entity_type: - params.update({'type': entity_type}) + params.update({"type": entity_type}) if metadata: - params.update({'metadata': ','.join(metadata)}) + params.update({"metadata": ",".join(metadata)}) try: res = self.get(url=url, params=params, headers=headers) if res.ok: - self.logger.debug('Received: %s', res.json()) + self.logger.debug("Received: %s", res.json()) return ContextAttribute(**res.json()) res.raise_for_status() except requests.RequestException as err: - msg = f"Could not load attribute '{attr_name}' from entity" \ - f"'{entity_id}' " + msg = ( + f"Could not load attribute '{attr_name}' from entity" f"'{entity_id}' " + ) self.log_error(err=err, msg=msg) raise - def update_entity_attribute(self, - entity_id: str, - attr: Union[ContextAttribute, - NamedContextAttribute], - *, - entity_type: str = None, - attr_name: str = None, - override_metadata: bool = True): + def update_entity_attribute( + self, + entity_id: str, + attr: Union[ContextAttribute, NamedContextAttribute], + *, + entity_type: str = None, + attr_name: str = None, + override_metadata: bool = True, + ): """ Updates a specified attribute from an entity. @@ -879,6 +889,8 @@ def update_entity_attribute(self, entity_type: Entity type, to avoid ambiguity in case there are several entities with the same entity id. + attr_name: + Name of the attribute to be updated. override_metadata: Bool, if set to `True` (default) the metadata will be overwritten. This is for backwards compatibility reasons. @@ -886,49 +898,56 @@ def update_entity_attribute(self, already existing or append if not. See also: https://fiware-orion.readthedocs.io/en/master/user/metadata.html - """ headers = self.headers.copy() if not isinstance(attr, NamedContextAttribute): - assert attr_name is not None, "Missing name for attribute. " \ - "attr_name must be present if" \ - "attr is of type ContextAttribute" + assert attr_name is not None, ( + "Missing name for attribute. " + "attr_name must be present if" + "attr is of type ContextAttribute" + ) else: - assert attr_name is None, "Invalid argument attr_name. Do not set " \ - "attr_name if attr is of type " \ - "NamedContextAttribute" + assert attr_name is None, ( + "Invalid argument attr_name. Do not set " + "attr_name if attr is of type " + "NamedContextAttribute" + ) attr_name = attr.name - url = urljoin(self.base_url, - f'v2/entities/{entity_id}/attrs/{attr_name}') + url = urljoin(self.base_url, f"v2/entities/{entity_id}/attrs/{attr_name}") params = {} if entity_type: - params.update({'type': entity_type}) + params.update({"type": entity_type}) # set overrideMetadata option (we assure backwards compatibility here) if override_metadata: - params.update({'options': 'overrideMetadata'}) + params.update({"options": "overrideMetadata"}) try: - res = self.put(url=url, - headers=headers, - params=params, - json=attr.model_dump(exclude={'name'}, - exclude_unset=True, - exclude_none=True)) + res = self.put( + url=url, + headers=headers, + params=params, + json=attr.model_dump( + exclude={"name"}, exclude_unset=True, exclude_none=True + ), + ) if res.ok: - self.logger.info("Attribute '%s' of '%s' " - "successfully updated!", attr_name, entity_id) + self.logger.info( + "Attribute '%s' of '%s' " "successfully updated!", + attr_name, + entity_id, + ) else: res.raise_for_status() except requests.RequestException as err: - msg = f"Could not update attribute '{attr_name}' of entity" \ - f"'{entity_id}' " + msg = ( + f"Could not update attribute '{attr_name}' of entity" f"'{entity_id}' " + ) self.log_error(err=err, msg=msg) raise - def delete_entity_attribute(self, - entity_id: str, - attr_name: str, - entity_type: str = None) -> None: + def delete_entity_attribute( + self, entity_id: str, attr_name: str, entity_type: str = None + ) -> None: """ Removes a specified attribute from an entity. @@ -941,30 +960,32 @@ def delete_entity_attribute(self, Error """ - url = urljoin(self.base_url, - f'v2/entities/{entity_id}/attrs/{attr_name}') + url = urljoin(self.base_url, f"v2/entities/{entity_id}/attrs/{attr_name}") headers = self.headers.copy() params = {} if entity_type: - params.update({'type': entity_type}) + params.update({"type": entity_type}) try: res = self.delete(url=url, headers=headers) if res.ok: - self.logger.info("Attribute '%s' of '%s' " - "successfully deleted!", attr_name, entity_id) + self.logger.info( + "Attribute '%s' of '%s' " "successfully deleted!", + attr_name, + entity_id, + ) else: res.raise_for_status() except requests.RequestException as err: - msg = f"Could not delete attribute '{attr_name}' of entity" \ - f"'{entity_id}' " + msg = ( + f"Could not delete attribute '{attr_name}' of entity" f"'{entity_id}' " + ) self.log_error(err=err, msg=msg) raise # Attribute value operations - def get_attribute_value(self, - entity_id: str, - attr_name: str, - entity_type: str = None) -> Any: + def get_attribute_value( + self, entity_id: str, attr_name: str, entity_type: str = None + ) -> Any: """ This operation returns the value property with the value of the attribute. @@ -979,29 +1000,28 @@ def get_attribute_value(self, Returns: """ - url = urljoin(self.base_url, - f'v2/entities/{entity_id}/attrs/{attr_name}/value') + url = urljoin(self.base_url, f"v2/entities/{entity_id}/attrs/{attr_name}/value") headers = self.headers.copy() params = {} if entity_type: - params.update({'type': entity_type}) + params.update({"type": entity_type}) try: res = self.get(url=url, params=params, headers=headers) if res.ok: - self.logger.debug('Received: %s', res.json()) + self.logger.debug("Received: %s", res.json()) return res.json() res.raise_for_status() except requests.RequestException as err: - msg = f"Could not load value of attribute '{attr_name}' from " \ - f"entity'{entity_id}' " + msg = ( + f"Could not load value of attribute '{attr_name}' from " + f"entity'{entity_id}' " + ) self.log_error(err=err, msg=msg) raise - def update_attribute_value(self, *, - entity_id: str, - attr_name: str, - value: Any, - entity_type: str = None): + def update_attribute_value( + self, *, entity_id: str, attr_name: str, value: Any, entity_type: str = None + ): """ Updates the value of a specified attribute of an entity @@ -1015,43 +1035,39 @@ def update_attribute_value(self, *, Returns: """ - url = urljoin(self.base_url, - f'v2/entities/{entity_id}/attrs/{attr_name}/value') + url = urljoin(self.base_url, f"v2/entities/{entity_id}/attrs/{attr_name}/value") headers = self.headers.copy() params = {} if entity_type: - params.update({'type': entity_type}) + params.update({"type": entity_type}) try: if not isinstance(value, (dict, list)): - headers.update({'Content-Type': 'text/plain'}) + headers.update({"Content-Type": "text/plain"}) if isinstance(value, str): - value = f'{value}' - res = self.put(url=url, - headers=headers, - json=value, - params=params) + value = f"{value}" + res = self.put(url=url, headers=headers, json=value, params=params) else: - res = self.put(url=url, - headers=headers, - json=value, - params=params) + res = self.put(url=url, headers=headers, json=value, params=params) if res.ok: - self.logger.info("Attribute '%s' of '%s' " - "successfully updated!", attr_name, entity_id) + self.logger.info( + "Attribute '%s' of '%s' " "successfully updated!", + attr_name, + entity_id, + ) else: res.raise_for_status() except requests.RequestException as err: - msg = f"Could not update value of attribute '{attr_name}' from " \ - f"entity '{entity_id}' " + msg = ( + f"Could not update value of attribute '{attr_name}' from " + f"entity '{entity_id}' " + ) self.log_error(err=err, msg=msg) raise # Types Operations - def get_entity_types(self, - *, - limit: int = None, - offset: int = None, - options: str = None) -> List[Dict[str, Any]]: + def get_entity_types( + self, *, limit: int = None, offset: int = None, options: str = None + ) -> List[Dict[str, Any]]: """ Args: @@ -1062,19 +1078,19 @@ def get_entity_types(self, Returns: """ - url = urljoin(self.base_url, 'v2/types') + url = urljoin(self.base_url, "v2/types") headers = self.headers.copy() params = {} if limit: - params.update({'limit': limit}) + params.update({"limit": limit}) if offset: - params.update({'offset': offset}) + params.update({"offset": offset}) if options: - params.update({'options': options}) + params.update({"options": options}) try: res = self.get(url=url, params=params, headers=headers) if res.ok: - self.logger.debug('Received: %s', res.json()) + self.logger.debug("Received: %s", res.json()) return res.json() res.raise_for_status() except requests.RequestException as err: @@ -1091,24 +1107,22 @@ def get_entity_type(self, entity_type: str) -> Dict[str, Any]: Returns: """ - url = urljoin(self.base_url, f'v2/types/{entity_type}') + url = urljoin(self.base_url, f"v2/types/{entity_type}") headers = self.headers.copy() params = {} try: res = self.get(url=url, params=params, headers=headers) if res.ok: - self.logger.debug('Received: %s', res.json()) + self.logger.debug("Received: %s", res.json()) return res.json() res.raise_for_status() except requests.RequestException as err: - msg = f"Could not load entities of type" \ - f"'{entity_type}' " + msg = f"Could not load entities of type" f"'{entity_type}' " self.log_error(err=err, msg=msg) raise # SUBSCRIPTION API ENDPOINTS - def get_subscription_list(self, - limit: PositiveInt = inf) -> List[Subscription]: + def get_subscription_list(self, limit: PositiveInt = inf) -> List[Subscription]: """ Returns a list of all the subscriptions present in the system. Args: @@ -1116,18 +1130,17 @@ def get_subscription_list(self, Returns: list of subscriptions """ - url = urljoin(self.base_url, 'v2/subscriptions/') + url = urljoin(self.base_url, "v2/subscriptions/") headers = self.headers.copy() params = {} # We always use the 'count' option to check weather pagination is # required - params.update({'options': 'count'}) + params.update({"options": "count"}) try: - items = self.__pagination(limit=limit, - url=url, - params=params, - headers=headers) + items = self.__pagination( + limit=limit, url=url, params=params, headers=headers + ) adapter = TypeAdapter(List[Subscription]) return adapter.validate_python(items) except requests.RequestException as err: @@ -1135,10 +1148,12 @@ def get_subscription_list(self, self.log_error(err=err, msg=msg) raise - def post_subscription(self, - subscription: Subscription, - update: bool = False, - skip_initial_notification: bool = False) -> str: + def post_subscription( + self, + subscription: Subscription, + update: bool = False, + skip_initial_notification: bool = False, + ) -> str: """ Creates a new subscription. The subscription is represented by a Subscription object defined in filip.cb.models. @@ -1155,7 +1170,7 @@ def post_subscription(self, update: True - If the subscription already exists, update it False- If the subscription already exists, throw warning skip_initial_notification: True - Initial Notifications will be - send to recipient containing the whole data. This is + sent to recipient containing the whole data. This is deprecated and removed from version 3.0 of the context broker. False - skip the initial notification Returns: @@ -1164,48 +1179,50 @@ def post_subscription(self, """ existing_subscriptions = self.get_subscription_list() - sub_hash = subscription.model_dump_json(include={'subject', 'notification'}) + sub_hash = subscription.model_dump_json(include={"subject", "notification"}) for ex_sub in existing_subscriptions: - if sub_hash == ex_sub.model_dump_json(include={'subject', 'notification'}): + if sub_hash == ex_sub.model_dump_json(include={"subject", "notification"}): self.logger.info("Subscription already exists") if update: self.logger.info("Updated subscription") subscription.id = ex_sub.id self.update_subscription(subscription) else: - warnings.warn(f"Subscription existed already with the id" - f" {ex_sub.id}") + warnings.warn( + f"Subscription existed already with the id" f" {ex_sub.id}" + ) return ex_sub.id params = {} if skip_initial_notification: - version = self.get_version()['orion']['version'] - if parse_version(version) <= parse_version('3.1'): - params.update({'options': "skipInitialNotification"}) + version = self.get_version()["orion"]["version"] + if parse_version(version) <= parse_version("3.1"): + params.update({"options": "skipInitialNotification"}) else: pass - warnings.warn(f"Skip initial notifications is a deprecated " - f"feature of older versions <=3.1 of the context " - f"broker. The Context Broker that you requesting has " - f"version: {version}. For newer versions we " - f"automatically skip this option. Consider " - f"refactoring and updating your services", - DeprecationWarning) - - url = urljoin(self.base_url, 'v2/subscriptions') + warnings.warn( + f"Skip initial notifications is a deprecated " + f"feature of older versions <=3.1 of the context " + f"broker. The Context Broker that you requesting has " + f"version: {version}. For newer versions we " + f"automatically skip this option. Consider " + f"refactoring and updating your services", + DeprecationWarning, + ) + + url = urljoin(self.base_url, "v2/subscriptions") headers = self.headers.copy() - headers.update({'Content-Type': 'application/json'}) + headers.update({"Content-Type": "application/json"}) try: res = self.post( url=url, headers=headers, - data=subscription.model_dump_json(exclude={'id'}, - exclude_unset=True, - exclude_none=True), - params=params) + data=subscription.model_dump_json(exclude={"id"}, exclude_none=True), + params=params, + ) if res.ok: self.logger.info("Subscription successfully created!") - return res.headers['Location'].split('/')[-1] + return res.headers["Location"].split("/")[-1] res.raise_for_status() except requests.RequestException as err: msg = "Could not send subscription!" @@ -1221,12 +1238,12 @@ def get_subscription(self, subscription_id: str) -> Subscription: Returns: """ - url = urljoin(self.base_url, f'v2/subscriptions/{subscription_id}') + url = urljoin(self.base_url, f"v2/subscriptions/{subscription_id}") headers = self.headers.copy() try: res = self.get(url=url, headers=headers) if res.ok: - self.logger.debug('Received: %s', res.json()) + self.logger.debug("Received: %s", res.json()) return Subscription(**res.json()) res.raise_for_status() except requests.RequestException as err: @@ -1234,16 +1251,16 @@ def get_subscription(self, subscription_id: str) -> Subscription: self.log_error(err=err, msg=msg) raise - def update_subscription(self, - subscription: Subscription, - skip_initial_notification: bool = False): + def update_subscription( + self, subscription: Subscription, skip_initial_notification: bool = False + ): """ Only the fields included in the request are updated in the subscription. Args: subscription: Subscription to update skip_initial_notification: True - Initial Notifications will be - send to recipient containing the whole data. This is + sent to recipient containing the whole data. This is deprecated and removed from version 3.0 of the context broker. False - skip the initial notification @@ -1252,29 +1269,32 @@ def update_subscription(self, """ params = {} if skip_initial_notification: - version = self.get_version()['orion']['version'] - if parse_version(version) <= parse_version('3.1'): - params.update({'options': "skipInitialNotification"}) + version = self.get_version()["orion"]["version"] + if parse_version(version) <= parse_version("3.1"): + params.update({"options": "skipInitialNotification"}) else: pass - warnings.warn(f"Skip initial notifications is a deprecated " - f"feature of older versions <3.1 of the context " - f"broker. The Context Broker that you requesting has " - f"version: {version}. For newer versions we " - f"automatically skip this option. Consider " - f"refactoring and updating your services", - DeprecationWarning) - - url = urljoin(self.base_url, f'v2/subscriptions/{subscription.id}') + warnings.warn( + f"Skip initial notifications is a deprecated " + f"feature of older versions <3.1 of the context " + f"broker. The Context Broker that you requesting has " + f"version: {version}. For newer versions we " + f"automatically skip this option. Consider " + f"refactoring and updating your services", + DeprecationWarning, + ) + + url = urljoin(self.base_url, f"v2/subscriptions/{subscription.id}") headers = self.headers.copy() - headers.update({'Content-Type': 'application/json'}) + headers.update({"Content-Type": "application/json"}) try: res = self.patch( url=url, headers=headers, - data=subscription.model_dump_json(exclude={'id'}, - exclude_unset=True, - exclude_none=True)) + data=subscription.model_dump_json( + exclude={"id"}, exclude_unset=True, exclude_none=True + ), + ) if res.ok: self.logger.info("Subscription successfully updated!") else: @@ -1290,14 +1310,14 @@ def delete_subscription(self, subscription_id: str) -> None: Args: subscription_id: id of the subscription """ - url = urljoin(self.base_url, - f'v2/subscriptions/{subscription_id}') + url = urljoin(self.base_url, f"v2/subscriptions/{subscription_id}") headers = self.headers.copy() try: res = self.delete(url=url, headers=headers) if res.ok: - self.logger.info(f"Subscription '{subscription_id}' " - f"successfully deleted!") + self.logger.info( + f"Subscription '{subscription_id}' " f"successfully deleted!" + ) else: res.raise_for_status() except requests.RequestException as err: @@ -1306,9 +1326,7 @@ def delete_subscription(self, subscription_id: str) -> None: raise # Registration API - def get_registration_list(self, - *, - limit: PositiveInt = None) -> List[Registration]: + def get_registration_list(self, *, limit: PositiveInt = None) -> List[Registration]: """ Lists all the context provider registrations present in the system. @@ -1317,18 +1335,17 @@ def get_registration_list(self, Returns: """ - url = urljoin(self.base_url, 'v2/registrations/') + url = urljoin(self.base_url, "v2/registrations/") headers = self.headers.copy() params = {} # We always use the 'count' option to check weather pagination is # required - params.update({'options': 'count'}) + params.update({"options": "count"}) try: - items = self.__pagination(limit=limit, - url=url, - params=params, - headers=headers) + items = self.__pagination( + limit=limit, url=url, params=params, headers=headers + ) adapter = TypeAdapter(List[Registration]) return adapter.validate_python(items) except requests.RequestException as err: @@ -1348,22 +1365,21 @@ def post_registration(self, registration: Registration): Returns: """ - url = urljoin(self.base_url, 'v2/registrations') + url = urljoin(self.base_url, "v2/registrations") headers = self.headers.copy() - headers.update({'Content-Type': 'application/json'}) + headers.update({"Content-Type": "application/json"}) try: res = self.post( url=url, headers=headers, - data=registration.model_dump_json(exclude={'id'}, - exclude_unset=True, - exclude_none=True)) + data=registration.model_dump_json(exclude={"id"}, exclude_none=True), + ) if res.ok: self.logger.info("Registration successfully created!") - return res.headers['Location'].split('/')[-1] + return res.headers["Location"].split("/")[-1] res.raise_for_status() except requests.RequestException as err: - msg = f"Could not send registration {registration.id} !" + msg = f"Could not send registration {registration.id}!" self.log_error(err=err, msg=msg) raise @@ -1377,12 +1393,12 @@ def get_registration(self, registration_id: str) -> Registration: Returns: Registration """ - url = urljoin(self.base_url, f'v2/registrations/{registration_id}') + url = urljoin(self.base_url, f"v2/registrations/{registration_id}") headers = self.headers.copy() try: res = self.get(url=url, headers=headers) if res.ok: - self.logger.debug('Received: %s', res.json()) + self.logger.debug("Received: %s", res.json()) return Registration(**res.json()) res.raise_for_status() except requests.RequestException as err: @@ -1399,16 +1415,17 @@ def update_registration(self, registration: Registration): Returns: """ - url = urljoin(self.base_url, f'v2/registrations/{registration.id}') + url = urljoin(self.base_url, f"v2/registrations/{registration.id}") headers = self.headers.copy() - headers.update({'Content-Type': 'application/json'}) + headers.update({"Content-Type": "application/json"}) try: res = self.patch( url=url, headers=headers, - data=registration.model_dump_json(exclude={'id'}, - exclude_unset=True, - exclude_none=True)) + data=registration.model_dump_json( + exclude={"id"}, exclude_unset=True, exclude_none=True + ), + ) if res.ok: self.logger.info("Registration successfully updated!") else: @@ -1424,14 +1441,14 @@ def delete_registration(self, registration_id: str) -> None: Args: registration_id: id of the subscription """ - url = urljoin(self.base_url, - f'v2/registrations/{registration_id}') + url = urljoin(self.base_url, f"v2/registrations/{registration_id}") headers = self.headers.copy() try: res = self.delete(url=url, headers=headers) if res.ok: - self.logger.info("Registration '%s' " - "successfully deleted!", registration_id) + self.logger.info( + "Registration '%s' " "successfully deleted!", registration_id + ) res.raise_for_status() except requests.RequestException as err: msg = f"Could not delete registration {registration_id} !" @@ -1439,11 +1456,13 @@ def delete_registration(self, registration_id: str) -> None: raise # Batch operation API - def update(self, - *, - entities: List[ContextEntity], - action_type: Union[ActionType, str], - update_format: str = None) -> None: + def update( + self, + *, + entities: List[ContextEntity], + action_type: Union[ActionType, str], + update_format: str = None, + ) -> None: """ This operation allows to create, update and/or delete several entities in a single batch operation. @@ -1480,24 +1499,25 @@ def update(self, """ - url = urljoin(self.base_url, 'v2/op/update') + url = urljoin(self.base_url, "v2/op/update") headers = self.headers.copy() - headers.update({'Content-Type': 'application/json'}) + headers.update({"Content-Type": "application/json"}) params = {} if update_format: - assert update_format == 'keyValues', \ - "Only 'keyValues' is allowed as update format" - params.update({'options': 'keyValues'}) + assert ( + update_format == "keyValues" + ), "Only 'keyValues' is allowed as update format" + params.update({"options": "keyValues"}) update = Update(actionType=action_type, entities=entities) try: res = self.post( url=url, headers=headers, params=params, - json=update.model_dump(by_alias=True)) + json=update.model_dump(by_alias=True), + ) if res.ok: - self.logger.info("Update operation '%s' succeeded!", - action_type) + self.logger.info("Update operation '%s' succeeded!", action_type) else: res.raise_for_status() except requests.RequestException as err: @@ -1505,13 +1525,14 @@ def update(self, self.log_error(err=err, msg=msg) raise - def query(self, - *, - query: Query, - limit: PositiveInt = None, - order_by: str = None, - response_format: Union[AttrsFormat, str] = - AttrsFormat.NORMALIZED) -> List[Any]: + def query( + self, + *, + query: Query, + limit: PositiveInt = None, + order_by: str = None, + response_format: Union[AttrsFormat, str] = AttrsFormat.NORMALIZED, + ) -> List[Any]: """ Generate api query Args: @@ -1525,23 +1546,24 @@ def query(self, follow the JSON entity representation format (described in the section "JSON Entity Representation"). """ - url = urljoin(self.base_url, 'v2/op/query') + url = urljoin(self.base_url, "v2/op/query") headers = self.headers.copy() - headers.update({'Content-Type': 'application/json'}) - params = {'options': 'count'} + headers.update({"Content-Type": "application/json"}) + params = {"options": "count"} if response_format: if response_format not in list(AttrsFormat): - raise ValueError(f'Value must be in {list(AttrsFormat)}') - params['options'] = ','.join([response_format, 'count']) + raise ValueError(f"Value must be in {list(AttrsFormat)}") + params["options"] = ",".join([response_format, "count"]) try: - items = self.__pagination(method=PaginationMethod.POST, - url=url, - headers=headers, - params=params, - data=query.model_dump_json(exclude_unset=True, - exclude_none=True), - limit=limit) + items = self.__pagination( + method=PaginationMethod.POST, + url=url, + headers=headers, + params=params, + data=query.model_dump_json(exclude_none=True), + limit=limit, + ) if response_format == AttrsFormat.NORMALIZED: adapter = TypeAdapter(List[ContextEntity]) return adapter.validate_python(items) @@ -1570,32 +1592,37 @@ def notify(self, message: Message) -> None: Returns: None """ - url = urljoin(self.base_url, 'v2/op/notify') + url = urljoin(self.base_url, "v2/op/notify") headers = self.headers.copy() - headers.update({'Content-Type': 'application/json'}) + headers.update({"Content-Type": "application/json"}) params = {} try: res = self.post( url=url, headers=headers, params=params, - data=message.model_dump_json(by_alias=True)) + data=message.model_dump_json(by_alias=True), + ) if res.ok: self.logger.info("Notification message sent!") else: res.raise_for_status() except requests.RequestException as err: - msg = f"Sending notifcation message failed! \n " \ - f"{message.model_dump_json(inent=2)}" + msg = ( + f"Sending notifcation message failed! \n " + f"{message.model_dump_json(inent=2)}" + ) self.log_error(err=err, msg=msg) raise - def post_command(self, - *, - entity_id: str, - entity_type: str, - command: Union[Command, NamedCommand, Dict], - command_name: str = None) -> None: + def post_command( + self, + *, + entity_id: str, + entity_type: str, + command: Union[Command, NamedCommand, Dict], + command_name: str = None, + ) -> None: """ Post a command to a context entity this corresponds to 'PATCH' of the specified command attribute. @@ -1619,13 +1646,11 @@ def post_command(self, if isinstance(command, dict): command = NamedCommand(**command) - self.update_existing_entity_attributes(entity_id=entity_id, - entity_type=entity_type, - attrs=[command]) + self.update_existing_entity_attributes( + entity_id=entity_id, entity_type=entity_type, attrs=[command] + ) - def does_entity_exist(self, - entity_id: str, - entity_type: str) -> bool: + def does_entity_exist(self, entity_id: str, entity_type: str) -> bool: """ Test if an entity with given id and type is present in the CB @@ -1640,9 +1665,9 @@ def does_entity_exist(self, RequestException, if any error occurs (e.g: No Connection), except that the entity is not found """ - url = urljoin(self.base_url, f'v2/entities/{entity_id}') + url = urljoin(self.base_url, f"v2/entities/{entity_id}") headers = self.headers.copy() - params = {'type': entity_type} + params = {"type": entity_type} try: res = self.get(url=url, params=params, headers=headers) @@ -1654,10 +1679,12 @@ def does_entity_exist(self, raise return False - def patch_entity(self, - entity: ContextEntity, - old_entity: Optional[ContextEntity] = None, - override_attr_metadata: bool = True) -> None: + def patch_entity( + self, + entity: ContextEntity, + old_entity: Optional[ContextEntity] = None, + override_attr_metadata: bool = True, + ) -> None: """ Takes a given entity and updates the state in the CB to match it. It is an extended equivalent to the HTTP method PATCH, which applies @@ -1683,10 +1710,12 @@ def patch_entity(self, if old_entity is None: # If no old entity_was provided we use the current state to compare # the entity to - if self.does_entity_exist(entity_id=new_entity.id, - entity_type=new_entity.type): - old_entity = self.get_entity(entity_id=new_entity.id, - entity_type=new_entity.type) + if self.does_entity_exist( + entity_id=new_entity.id, entity_type=new_entity.type + ): + old_entity = self.get_entity( + entity_id=new_entity.id, entity_type=new_entity.type + ) else: # the entity is new, post and finish self.post_entity(new_entity, update=False) @@ -1696,22 +1725,23 @@ def patch_entity(self, # An old_entity was provided # check if the old_entity (still) exists else recall methode # and discard old_entity - if not self.does_entity_exist(entity_id=old_entity.id, - entity_type=old_entity.type): - self.patch_entity(new_entity, - override_attr_metadata=override_attr_metadata) + if not self.does_entity_exist( + entity_id=old_entity.id, entity_type=old_entity.type + ): + self.patch_entity( + new_entity, override_attr_metadata=override_attr_metadata + ) return # if type or id was changed, the old_entity needs to be deleted # and the new_entity created # In this case we will lose the current state of the entity - if old_entity.id != new_entity.id or \ - old_entity.type != new_entity.type: - self.delete_entity(entity_id=old_entity.id, - entity_type=old_entity.type) + if old_entity.id != new_entity.id or old_entity.type != new_entity.type: + self.delete_entity(entity_id=old_entity.id, entity_type=old_entity.type) - if not self.does_entity_exist(entity_id=new_entity.id, - entity_type=new_entity.type): + if not self.does_entity_exist( + entity_id=new_entity.id, entity_type=new_entity.type + ): self.post_entity(entity=new_entity, update=False) return @@ -1727,7 +1757,7 @@ def patch_entity(self, for old_attr in old_attributes: # commands do not exist in the ContextEntity and are only # registrations to the corresponding device. Operations as - # delete will fail as it does not technically exists + # delete will fail as it does not technically exist corresponding_new_attr = None for new_attr in new_attributes: if new_attr.name == old_attr.name: @@ -1736,9 +1766,11 @@ def patch_entity(self, if corresponding_new_attr is None: # Attribute no longer exists, delete it try: - self.delete_entity_attribute(entity_id=new_entity.id, - entity_type=new_entity.type, - attr_name=old_attr.name) + self.delete_entity_attribute( + entity_id=new_entity.id, + entity_type=new_entity.type, + attr_name=old_attr.name, + ) except requests.RequestException as err: # if the attribute is provided by a registration the # deletion will fail @@ -1753,7 +1785,7 @@ def patch_entity(self, entity_id=new_entity.id, entity_type=new_entity.type, attr=corresponding_new_attr, - override_metadata=override_attr_metadata + override_metadata=override_attr_metadata, ) except requests.RequestException as err: # if the attribute is provided by a registration the @@ -1780,6 +1812,7 @@ def patch_entity(self, if update_needed: self.update_entity(update_entity) + # # # def check_duplicate_subscription(self, subscription_body, limit: int = 20): diff --git a/filip/clients/ngsi_v2/iota.py b/filip/clients/ngsi_v2/iota.py index 3e5eb04c..67b75882 100644 --- a/filip/clients/ngsi_v2/iota.py +++ b/filip/clients/ngsi_v2/iota.py @@ -96,9 +96,8 @@ def post_groups(self, url = urljoin(self.base_url, 'iot/services') headers = self.headers data = {'services': [group.model_dump(exclude={'service', 'subservice'}, - exclude_none=True, - exclude_unset=True) for - group in service_groups]} + exclude_none=True) + for group in service_groups]} try: res = self.post(url=url, headers=headers, json=data) if res.ok: @@ -229,7 +228,8 @@ def update_group(self, *, service_group: ServiceGroup, json=service_group.model_dump( include=fields, exclude={'service', 'subservice'}, - exclude_unset=True)) + exclude_unset=True, + exclude_none=True)) if res.ok: self.logger.info("ServiceGroup updated!") elif (res.status_code == 404) & (add is True): diff --git a/filip/clients/ngsi_v2/quantumleap.py b/filip/clients/ngsi_v2/quantumleap.py index 5804200b..0e5f60ba 100644 --- a/filip/clients/ngsi_v2/quantumleap.py +++ b/filip/clients/ngsi_v2/quantumleap.py @@ -115,8 +115,7 @@ def post_notification(self, notification: Message): headers = self.headers.copy() data = [] for entity in notification.data: - data.append(entity.model_dump(exclude_unset=True, - exclude_none=True)) + data.append(entity.model_dump(exclude_none=True)) data_set = { "data": data, "subscriptionId": notification.subscriptionId diff --git a/filip/models/ngsi_v2/context.py b/filip/models/ngsi_v2/context.py index ae900916..b6106c29 100644 --- a/filip/models/ngsi_v2/context.py +++ b/filip/models/ngsi_v2/context.py @@ -161,9 +161,9 @@ class ContextEntity(ContextEntityKeyValues): is a string containing the entity's type name. Entity attributes are specified by additional properties, whose names are - the name of the attribute and whose representation is described in the - "ContextAttribute"-model. Obviously, id and type are - not allowed to be used as attribute names. + the name of the attribute and whose representation is described by the + "ContextAttribute"-model. Obviously, `id` and `type` are + not allowed as attribute names. Example:: @@ -174,7 +174,9 @@ class ContextEntity(ContextEntityKeyValues): >>> entity = ContextEntity(**data) """ - model_config = ConfigDict(extra='allow', validate_default=True, validate_assignment=True) + model_config = ConfigDict(extra='allow', + validate_default=True, + validate_assignment=True) def __init__(self, id: str, type: str, **data): @@ -283,7 +285,7 @@ def update_attribute(self, attrs: List of NamedContextAttributes, Dict of {attribute_name: ContextAttribute} Raises: - NameError, if the attribute does not currently exists in the entity + NameError, if the attribute does not currently exist in the entity Returns: None """ @@ -415,7 +417,7 @@ def get_commands( -> Union[List[NamedContextAttribute], Dict[str, ContextAttribute]]: """ Get all commands of the context entity. Only works if the commands - were autogenerated by Fiware from an Device. + were autogenerated by Fiware from a device. Args: response_format: Wanted result format, From 0a447f72fb6639b6369232cabcebb05a75f505ed Mon Sep 17 00:00:00 2001 From: JunsongDu Date: Mon, 22 Jan 2024 16:47:17 +0100 Subject: [PATCH 5/5] chore: remove exclude_unset for consistency --- filip/clients/ngsi_v2/cb.py | 18 ++++++++++++------ filip/clients/ngsi_v2/iota.py | 1 - 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/filip/clients/ngsi_v2/cb.py b/filip/clients/ngsi_v2/cb.py index 7d773c96..309c81ee 100644 --- a/filip/clients/ngsi_v2/cb.py +++ b/filip/clients/ngsi_v2/cb.py @@ -716,7 +716,8 @@ def update_or_append_entity_attributes( url=url, headers=headers, json=entity.model_dump( - exclude=excluded_keys, exclude_unset=True, exclude_none=True + exclude=excluded_keys, + exclude_none=True ), params=params, ) @@ -763,7 +764,8 @@ def update_existing_entity_attributes( url=url, headers=headers, json=entity.model_dump( - exclude={"id", "type"}, exclude_unset=True, exclude_none=True + exclude={"id", "type"}, + exclude_none=True ), params=params, ) @@ -809,7 +811,8 @@ def replace_entity_attributes( url=url, headers=headers, json=entity.model_dump( - exclude={"id", "type"}, exclude_unset=True, exclude_none=True + exclude={"id", "type"}, + exclude_none=True ), params=params, ) @@ -927,7 +930,8 @@ def update_entity_attribute( headers=headers, params=params, json=attr.model_dump( - exclude={"name"}, exclude_unset=True, exclude_none=True + exclude={"name"}, + exclude_none=True ), ) if res.ok: @@ -1292,7 +1296,8 @@ def update_subscription( url=url, headers=headers, data=subscription.model_dump_json( - exclude={"id"}, exclude_unset=True, exclude_none=True + exclude={"id"}, + exclude_none=True ), ) if res.ok: @@ -1423,7 +1428,8 @@ def update_registration(self, registration: Registration): url=url, headers=headers, data=registration.model_dump_json( - exclude={"id"}, exclude_unset=True, exclude_none=True + exclude={"id"}, + exclude_none=True ), ) if res.ok: diff --git a/filip/clients/ngsi_v2/iota.py b/filip/clients/ngsi_v2/iota.py index 67b75882..87b54406 100644 --- a/filip/clients/ngsi_v2/iota.py +++ b/filip/clients/ngsi_v2/iota.py @@ -228,7 +228,6 @@ def update_group(self, *, service_group: ServiceGroup, json=service_group.model_dump( include=fields, exclude={'service', 'subservice'}, - exclude_unset=True, exclude_none=True)) if res.ok: self.logger.info("ServiceGroup updated!")