From a8a4ec0137feb032bc113c8be0e23fed4031490e Mon Sep 17 00:00:00 2001 From: Matt Magoffin Date: Tue, 17 Dec 2019 15:15:40 +1300 Subject: [PATCH] Support injecting a "_v2" tag in posted datum, to work around CBOR bug https://github.com/FasterXML/jackson-dataformats-binary/issues/139. This way the recipient can see if the message was encoded with >= 2.10 CBOR version, or else < 2.10 with the bug present. --- .../mqtt/test/MqttUploadServiceTests.java | 4 ++ .../META-INF/MANIFEST.MF | 15 ++--- .../node/upload/mqtt/MqttUploadService.java | 61 ++++++++++++++++++- 3 files changed, 72 insertions(+), 8 deletions(-) diff --git a/net.solarnetwork.node.upload.mqtt.test/src/net/solarnetwork/node/upload/mqtt/test/MqttUploadServiceTests.java b/net.solarnetwork.node.upload.mqtt.test/src/net/solarnetwork/node/upload/mqtt/test/MqttUploadServiceTests.java index 77216bf81..6aebacc76 100644 --- a/net.solarnetwork.node.upload.mqtt.test/src/net/solarnetwork/node/upload/mqtt/test/MqttUploadServiceTests.java +++ b/net.solarnetwork.node.upload.mqtt.test/src/net/solarnetwork/node/upload/mqtt/test/MqttUploadServiceTests.java @@ -284,6 +284,8 @@ public void uploadWithConnectionToMqttServer() throws IOException { InterceptPublishMessage pubMsg = session.publishMessages.get(0); assertThat("Publish client ID", pubMsg.getClientID(), equalTo(nodeId.toString())); assertThat("Publish topic", pubMsg.getTopicName(), equalTo(datumTopic(nodeId))); + + datum.addTag(MqttUploadService.TAG_VERSION_2); assertThat("Publish payload", session.getPublishPayloadStringAtIndex(0), equalTo(objectMapper.writeValueAsString(datum))); @@ -325,6 +327,8 @@ public void uploadLocationDatumWithConnectionToMqttServer() throws IOException { InterceptPublishMessage pubMsg = session.publishMessages.get(0); assertThat("Publish client ID", pubMsg.getClientID(), equalTo(nodeId.toString())); assertThat("Publish topic", pubMsg.getTopicName(), equalTo(datumTopic(nodeId))); + + datum.addTag(MqttUploadService.TAG_VERSION_2); assertThat("Publish payload", session.getPublishPayloadStringAtIndex(0), equalTo(objectMapper.writeValueAsString(datum))); diff --git a/net.solarnetwork.node.upload.mqtt/META-INF/MANIFEST.MF b/net.solarnetwork.node.upload.mqtt/META-INF/MANIFEST.MF index 72c37993d..775bfd5cc 100644 --- a/net.solarnetwork.node.upload.mqtt/META-INF/MANIFEST.MF +++ b/net.solarnetwork.node.upload.mqtt/META-INF/MANIFEST.MF @@ -4,17 +4,18 @@ Bundle-Name: MQTT integration Bundle-Description: Integrate with the SolarIn MQTT service to publish data and receive instructions in real time. Bundle-SymbolicName: net.solarnetwork.node.upload.mqtt -Bundle-Version: 1.4.0 +Bundle-Version: 1.5.0 Bundle-Vendor: SolarNetwork Automatic-Module-Name: net.solarnetwork.node.upload.mqtt Bundle-RequiredExecutionEnvironment: JavaSE-1.8 Import-Package: - com.fasterxml.jackson.annotation;version="[2.8,3.0)", - com.fasterxml.jackson.core;version="[2.8,3.0)", - com.fasterxml.jackson.databind;version="[2.8,3.0)", - com.fasterxml.jackson.databind.annotation;version="[2.8,3.0)", - com.fasterxml.jackson.databind.ser.std;version="[2.8,3.0)", - com.fasterxml.jackson.dataformat.cbor;version="[2.8,3.0)", + com.fasterxml.jackson.annotation;version="[2.10,3.0)", + com.fasterxml.jackson.core;version="[2.10,3.0)", + com.fasterxml.jackson.databind;version="[2.10,3.0)", + com.fasterxml.jackson.databind.annotation;version="[2.10,3.0)", + com.fasterxml.jackson.databind.node;version="[2.10,3.0)", + com.fasterxml.jackson.databind.ser.std;version="[2.10,3.0)", + com.fasterxml.jackson.dataformat.cbor;version="[2.10,3.0)", javax.net, javax.net.ssl, net.solarnetwork.common.mqtt;version="1.0.0", diff --git a/net.solarnetwork.node.upload.mqtt/src/net/solarnetwork/node/upload/mqtt/MqttUploadService.java b/net.solarnetwork.node.upload.mqtt/src/net/solarnetwork/node/upload/mqtt/MqttUploadService.java index 5eea38d80..8140d5ad3 100644 --- a/net.solarnetwork.node.upload.mqtt/src/net/solarnetwork/node/upload/mqtt/MqttUploadService.java +++ b/net.solarnetwork.node.upload.mqtt/src/net/solarnetwork/node/upload/mqtt/MqttUploadService.java @@ -40,6 +40,9 @@ import org.springframework.util.DigestUtils; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.JsonNodeCreator; +import com.fasterxml.jackson.databind.node.ObjectNode; import net.solarnetwork.common.mqtt.BaseMqttConnectionService; import net.solarnetwork.common.mqtt.BasicMqttMessage; import net.solarnetwork.common.mqtt.MqttConnection; @@ -67,7 +70,7 @@ * {@link UploadService} using MQTT. * * @author matt - * @version 1.2 + * @version 1.3 */ public class MqttUploadService extends BaseMqttConnectionService implements UploadService, MqttMessageHandler, MqttConnectionObserver { @@ -81,12 +84,19 @@ public class MqttUploadService extends BaseMqttConnectionService /** The MQTT topic template for node data publication. */ public static final String NODE_DATUM_TOPIC_TEMPLATE = "node/%s/datum"; + /** The default value for the {@code includeVersionTag} property. */ + public static final boolean DEFAULT_INCLUDE_VERSION_TAG = true; + + /** A tag to indicate that CBOR encoding v2 is in use. */ + public static final String TAG_VERSION_2 = "_v2"; + private final ObjectMapper objectMapper; private final IdentityService identityService; private final OptionalService reactorServiceOpt; private final OptionalService instructionExecutionServiceOpt; private final OptionalService eventAdminOpt; private Executor executor; + private boolean includeVersionTag = DEFAULT_INCLUDE_VERSION_TAG; private CompletableFuture startupFuture; @@ -224,6 +234,31 @@ public String uploadDatum(Datum data) { String topic = String.format(NODE_DATUM_TOPIC_TEMPLATE, nodeId); try { JsonNode jsonData = objectMapper.valueToTree(data); + if ( includeVersionTag ) { + JsonNode samplesData = jsonData.path("samples"); + if ( samplesData.isObject() ) { + JsonNode tagsData = samplesData.path("t"); + ArrayNode tagsArrayNode = null; + if ( tagsData.isArray() ) { + tagsArrayNode = (ArrayNode) tagsData; + } else if ( tagsData.isNull() || tagsData.isMissingNode() ) { + tagsArrayNode = ((JsonNodeCreator) samplesData).arrayNode(1); + ((ObjectNode) samplesData).set("t", tagsArrayNode); + } + if ( tagsArrayNode != null ) { + boolean found = false; + for ( JsonNode t : tagsArrayNode ) { + if ( TAG_VERSION_2.equals(t.textValue()) ) { + found = true; + break; + } + } + if ( !found ) { + tagsArrayNode.add(TAG_VERSION_2); + } + } + } + } if ( jsonData != null && !jsonData.isNull() ) { conn.publish(new BasicMqttMessage(topic, false, getPublishQos(), objectMapper.writeValueAsBytes(jsonData))) @@ -457,4 +492,28 @@ public void onMqttServerConnectionEstablisehd(MqttConnection connection, boolean public void setExecutor(Executor executor) { this.executor = executor; } + + /** + * Get the "include version tag" toggle. + * + * @return {@literal true} to include the {@literal _v} version tag with + * each datum; defaults to {@link #DEFAULT_INCLUDE_VERSION_TAG} + * @since 1.3 + */ + public boolean isIncludeVersionTag() { + return includeVersionTag; + } + + /** + * Set the "inclue version tag" toggle. + * + * @param includeVersionTag + * {@literal true} to include the {@link #TAG_VERSION} property with + * each datum; only disable if you can be sure that all receivers of + * SolarFlux messages interpret the data in the same way + * @since 1.3 + */ + public void setIncludeVersionTag(boolean includeVersionTag) { + this.includeVersionTag = includeVersionTag; + } }