Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

288 filip support right now only paylaod in mqtt custom / http custom but orion already supports json and ngsi as further options #296

2 changes: 1 addition & 1 deletion filip/clients/ngsi_v2/cb.py
Original file line number Diff line number Diff line change
Expand Up @@ -1898,7 +1898,7 @@ def notify(self, message: Message) -> None:
except requests.RequestException as err:
msg = (
f"Sending notifcation message failed! \n "
f"{message.model_dump_json(inent=2)}"
f"{message.model_dump_json(indent=2)}"
)
self.log_error(err=err, msg=msg)
raise
Expand Down
139 changes: 120 additions & 19 deletions filip/models/ngsi_v2/subscriptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,66 @@
Field, \
Json
from .base import AttrsFormat, EntityPattern, Http, Status, Expression
from filip.utils.validators import validate_mqtt_url, validate_mqtt_topic
from filip.utils.validators import (
validate_mqtt_url,
validate_mqtt_topic
)
from filip.models.ngsi_v2.context import ContextEntity
from filip.models.ngsi_v2.base import (
EntityPattern,
Expression,
BaseValueAttribute
)
from filip.custom_types import AnyMqttUrl
import warnings

# The pydantic models still have a .json() function, but this method is deprecated.
warnings.filterwarnings("ignore", category=UserWarning,
message='Field name "json" shadows an attribute in parent "Http"')
warnings.filterwarnings("ignore", category=UserWarning,
message='Field name "json" shadows an attribute in parent "Mqtt"')


class NgsiPayloadAttr(BaseValueAttribute):
"""
Model for NGSI V2 type payload in httpCustom/mqttCustom notifications.
The difference between this model and the usual BaseValueAttribute model is that
a metadata field is not allowed.
In the absence of type/value in some attribute field, one should resort to partial
representations ( as specified in the orion api manual), done by the BaseValueAttr.
model.
"""
model_config = ConfigDict(extra="forbid")


class NgsiPayload(BaseModel):
"""
Model for NGSI V2 type payload in httpCustom/mqttCustom notifications.
Differences between this model and the usual Context entity models include:
- id and type are not mandatory
- an attribute metadata field is not allowed
"""
model_config = ConfigDict(
extra="allow", validate_default=True
)
id: Optional[str] = Field(
default=None,
max_length=256,
min_length=1,
frozen=True
)
type: Optional[Union[str, Enum]] = Field(
default=None,
max_length=256,
min_length=1,
frozen=True,
)

@model_validator(mode='after')
def validate_notification_attrs(self):
for v in self.model_dump(exclude={"id", "type"}).values():
assert isinstance(NgsiPayloadAttr.model_validate(v), NgsiPayloadAttr)
return self


class Message(BaseModel):
Expand Down Expand Up @@ -69,6 +126,35 @@ class HttpCustom(Http):
'default payload (see "Notification Messages" sections) '
'is used.'
)
json: Optional[Dict[str, Union[str, Json]]] = Field(
default=None,
description='get a json as notification. If omitted, the default'
'payload (see "Notification Messages" sections) is used.'
)
ngsi: Optional[NgsiPayload] = Field(
default=None,
description='get an NGSI-v2 normalized entity as notification.If omitted, '
'the default payload (see "Notification Messages" sections) is used.'
)
timeout: Optional[int] = Field(
default=None,
description="Maximum time (in milliseconds) the subscription waits for the "
"response. The maximum value allowed for this parameter is 1800000 "
"(30 minutes). If timeout is defined to 0 or omitted, then the value "
"passed as -httpTimeout CLI parameter is used. See section in the "
"'Command line options' for more details."
)

@model_validator(mode='after')
def validate_notification_payloads(self):
fields = [self.payload, self.json, self.ngsi]
filled_fields = [field for field in fields if field is not None]

if len(filled_fields) > 1:
raise ValueError("Only one of payload, json or ngsi fields accepted at the "
"same time in httpCustom.")

return self


class Mqtt(BaseModel):
Expand All @@ -82,7 +168,7 @@ class Mqtt(BaseModel):
'only includes host and port)')
topic: str = Field(
description='to specify the MQTT topic to use',
)
)
valid_type = field_validator("topic")(validate_mqtt_topic)
qos: Optional[int] = Field(
default=0,
Expand Down Expand Up @@ -124,6 +210,22 @@ class MqttCustom(Mqtt):
'default payload (see "Notification Messages" sections) '
'is used.'
)
json: Optional[Dict[str, Any]] = Field(
default=None,
description='get a json as notification. If omitted, the default'
'payload (see "Notification Messages" sections) is used.'
)
ngsi: Optional[NgsiPayload] = Field(
default=None,
description='get an NGSI-v2 normalized entity as notification.If omitted, '
'the default payload (see "Notification Messages" sections) is used.'
)

@model_validator(mode='after')
def validate_payload_type(self):
assert len([v for k, v in self.model_dump().items()
if ((v is not None) and (k in ['payload', 'ngsi', 'json']))]) <= 1
return self


class Notification(BaseModel):
Expand Down Expand Up @@ -205,17 +307,17 @@ class Notification(BaseModel):
'[A=0, B=null, C=null]. This '
)

@field_validator('httpCustom')
def validate_http(cls, http_custom, values):
if http_custom is not None:
assert values['http'] is None
return http_custom
@model_validator(mode='after')
def validate_http(self):
if self.httpCustom is not None:
assert self.http is None
return self

@field_validator('exceptAttrs')
def validate_attr(cls, except_attrs, values):
if except_attrs is not None:
assert values['attrs'] is None
return except_attrs
@model_validator(mode='after')
def validate_attr(self):
if self.exceptAttrs is not None:
assert self.attrs is None
return self

@model_validator(mode='after')
def validate_endpoints(self):
Expand Down Expand Up @@ -247,7 +349,7 @@ class Response(Notification):
'Last notification timestamp in ISO8601 format.'
)
lastFailure: Optional[datetime] = Field(
default = None,
default=None,
description='(not editable, only present in GET operations): '
'Last failure timestamp in ISO8601 format. Not present if '
'subscription has never had a problem with notifications.'
Expand Down Expand Up @@ -342,33 +444,32 @@ class Subscription(BaseModel):
)
subject: Subject = Field(
description="An object that describes the subject of the subscription.",
example={
examples=[{
'entities': [{'idPattern': '.*', 'type': 'Room'}],
'condition': {
'attrs': ['temperature'],
'expression': {'q': 'temperature>40'},
},
},
}],
)
notification: Notification = Field(
description="An object that describes the notification to send when "
"the subscription is triggered.",
example={
examples=[{
'http': {'url': 'http://localhost:1234'},
'attrs': ['temperature', 'humidity'],
},
}],
)
expires: Optional[datetime] = Field(
default=None,
description="Subscription expiration date in ISO8601 format. "
"Permanent subscriptions must omit this field."
)

throttling: Optional[conint(strict=True, ge=0,)] = Field(
throttling: Optional[conint(strict=True, ge=0, )] = Field(
default=None,
strict=True,
description="Minimal period of time in seconds which "
"must elapse between two consecutive notifications. "
"It is optional."
)

143 changes: 141 additions & 2 deletions tests/clients/test_ngsi_v2_cb.py
Original file line number Diff line number Diff line change
Expand Up @@ -910,7 +910,91 @@ def test_notification(self):
"throttling": 0
})

mqtt_custom_topic = "notification/custom"
sub_with_mqtt_custom_notification_payload = Subscription.model_validate({
"description": "Test mqtt custom notification with payload message",
"subject": {
"entities": [
{
"id": "Test:001",
"type": "Test"
}
]
},
"notification": {
"mqttCustom": {
"url": mqtt_url_internal,
"topic": mqtt_custom_topic,
"payload": "The value of the %22temperature%22 attribute %28of the device ${id}, ${type}%29 is"
" ${temperature}. Humidity is ${humidity} and CO2 is ${co2}."
},
"attrs": ["temperature", "humidity", "co2"],
"onlyChangedAttrs": False
},
"expires": datetime.now() + timedelta(days=1),
"throttling": 0
})

sub_with_mqtt_custom_notification_json = Subscription.model_validate({
"description": "Test mqtt custom notification with json message",
"subject": {
"entities": [
{
"id": "Test:001",
"type": "Test"
}
]
},
"notification": {
"mqttCustom": {
"url": mqtt_url_internal,
"topic": mqtt_custom_topic,
"json": {
"t": "${temperature}",
"h": "${humidity}",
"c": "${co2}"
}
},
"attrs": ["temperature", "humidity", "co2"],
"onlyChangedAttrs": False
},
"expires": datetime.now() + timedelta(days=1),
"throttling": 0
})

sub_with_mqtt_custom_notification_ngsi = Subscription.model_validate({
"description": "Test mqtt custom notification with ngsi message",
"subject": {
"entities": [
{
"id": "Test:001",
"type": "Test"
}
]
},
"notification": {
"mqttCustom": {
"url": mqtt_url_internal,
"topic": mqtt_custom_topic,
"ngsi": {
"id": "prefix:${id}",
"type": "newType",
"temperature": {
"value": 123,
"type": "Number"
},

}
},
"attrs": ["temperature", "humidity", "co2"],
"onlyChangedAttrs": False
},
"expires": datetime.now() + timedelta(days=1),
"throttling": 0
})

# MQTT settings
custom_sub_message = None
sub_message = None
sub_messages = {}

Expand All @@ -923,6 +1007,7 @@ def on_connect(client, userdata, flags, reasonCode, properties=None):
logger.info("Successfully, connected with result code " + str(
reasonCode))
client.subscribe(mqtt_topic)
client.subscribe(mqtt_custom_topic)

def on_subscribe(client, userdata, mid, granted_qos, properties=None):
logger.info("Successfully subscribed to with QoS: %s", granted_qos)
Expand All @@ -931,8 +1016,12 @@ def on_message(client, userdata, msg):
logger.info("Received MQTT message: " + msg.topic + " " + str(
msg.payload))
nonlocal sub_message
sub_message = Message.model_validate_json(msg.payload)
sub_messages[sub_message.subscriptionId] = sub_message
nonlocal custom_sub_message
if msg.topic == mqtt_topic:
sub_message = Message.model_validate_json(msg.payload)
sub_messages[sub_message.subscriptionId] = sub_message
elif msg.topic == mqtt_custom_topic:
custom_sub_message = msg.payload

def on_disconnect(client, userdata, flags, reasonCode, properties=None):
logger.info("MQTT client disconnected with reasonCode "
Expand Down Expand Up @@ -1027,6 +1116,56 @@ def on_disconnect(client, userdata, flags, reasonCode, properties=None):
sub_messages[sub_id_3].data[0].get_attribute(
"temperature").value, 10)

# test4 notification with mqtt custom notification (payload)
sub_id_4 = client.post_subscription(
subscription=sub_with_mqtt_custom_notification_payload)
time.sleep(1)
client.update_attribute_value(entity_id=entity.id,
attr_name="temperature",
value=44
)
time.sleep(1)
sub_4 = client.get_subscription(sub_id_4)
self.assertEqual(first=custom_sub_message,
second=b'The value of the "temperature" attribute (of the device Test:001, Test) is 44. '
b'Humidity is 20 and CO2 is 30.')
self.assertEqual(sub_4.notification.timesSent, 1)
client.delete_subscription(sub_id_4)

# test5 notification with mqtt custom notification (json)
sub_id_5 = client.post_subscription(
subscription=sub_with_mqtt_custom_notification_json)
time.sleep(1)
client.update_attribute_value(entity_id=entity.id,
attr_name="humidity",
value=67
)
time.sleep(1)
sub_5 = client.get_subscription(sub_id_5)
self.assertEqual(first=custom_sub_message,
second=b'{"t":44,"h":67,"c":30}')
self.assertEqual(sub_5.notification.timesSent, 1)
client.delete_subscription(sub_id_5)

# test6 notification with mqtt custom notification (ngsi)
sub_id_6 = client.post_subscription(
subscription=sub_with_mqtt_custom_notification_ngsi)
time.sleep(1)
client.update_attribute_value(entity_id=entity.id,
attr_name="co2",
value=78
)
time.sleep(1)
sub_6 = client.get_subscription(sub_id_6)
sub_message = Message.model_validate_json(custom_sub_message)
self.assertEqual(sub_6.notification.timesSent, 1)
self.assertEqual(len(sub_message.data[0].get_attributes()), 3)
self.assertEqual(sub_message.data[0].id, "prefix:Test:001")
self.assertEqual(sub_message.data[0].type, "newType")
self.assertEqual(sub_message.data[0].get_attribute("co2").value, 78)
self.assertEqual(sub_message.data[0].get_attribute("temperature").value, 123)
client.delete_subscription(sub_id_6)

@clean_test(fiware_service=settings.FIWARE_SERVICE,
fiware_servicepath=settings.FIWARE_SERVICEPATH,
cb_url=settings.CB_URL)
Expand Down
Loading