Skip to content

Commit

Permalink
Support injecting a "_v2" tag in posted datum, to work around CBOR bug
Browse files Browse the repository at this point in the history
…FasterXML/jackson-dataformats-binary#139. This way the recipient can see if the message was encoded with >= 2.10 CBOR version, or else < 2.10 with the bug present.
  • Loading branch information
msqr committed Dec 17, 2019
1 parent ef85596 commit a8a4ec0
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,8 @@ public void uploadWithConnectionToMqttServer() throws IOException {
InterceptPublishMessage pubMsg = session.publishMessages.get(0);
assertThat("Publish client ID", pubMsg.getClientID(), equalTo(nodeId.toString()));
assertThat("Publish topic", pubMsg.getTopicName(), equalTo(datumTopic(nodeId)));

datum.addTag(MqttUploadService.TAG_VERSION_2);
assertThat("Publish payload", session.getPublishPayloadStringAtIndex(0),
equalTo(objectMapper.writeValueAsString(datum)));

Expand Down Expand Up @@ -325,6 +327,8 @@ public void uploadLocationDatumWithConnectionToMqttServer() throws IOException {
InterceptPublishMessage pubMsg = session.publishMessages.get(0);
assertThat("Publish client ID", pubMsg.getClientID(), equalTo(nodeId.toString()));
assertThat("Publish topic", pubMsg.getTopicName(), equalTo(datumTopic(nodeId)));

datum.addTag(MqttUploadService.TAG_VERSION_2);
assertThat("Publish payload", session.getPublishPayloadStringAtIndex(0),
equalTo(objectMapper.writeValueAsString(datum)));

Expand Down
15 changes: 8 additions & 7 deletions net.solarnetwork.node.upload.mqtt/META-INF/MANIFEST.MF
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,18 @@ Bundle-Name: MQTT integration
Bundle-Description: Integrate with the SolarIn MQTT service to publish data
and receive instructions in real time.
Bundle-SymbolicName: net.solarnetwork.node.upload.mqtt
Bundle-Version: 1.4.0
Bundle-Version: 1.5.0
Bundle-Vendor: SolarNetwork
Automatic-Module-Name: net.solarnetwork.node.upload.mqtt
Bundle-RequiredExecutionEnvironment: JavaSE-1.8
Import-Package:
com.fasterxml.jackson.annotation;version="[2.8,3.0)",
com.fasterxml.jackson.core;version="[2.8,3.0)",
com.fasterxml.jackson.databind;version="[2.8,3.0)",
com.fasterxml.jackson.databind.annotation;version="[2.8,3.0)",
com.fasterxml.jackson.databind.ser.std;version="[2.8,3.0)",
com.fasterxml.jackson.dataformat.cbor;version="[2.8,3.0)",
com.fasterxml.jackson.annotation;version="[2.10,3.0)",
com.fasterxml.jackson.core;version="[2.10,3.0)",
com.fasterxml.jackson.databind;version="[2.10,3.0)",
com.fasterxml.jackson.databind.annotation;version="[2.10,3.0)",
com.fasterxml.jackson.databind.node;version="[2.10,3.0)",
com.fasterxml.jackson.databind.ser.std;version="[2.10,3.0)",
com.fasterxml.jackson.dataformat.cbor;version="[2.10,3.0)",
javax.net,
javax.net.ssl,
net.solarnetwork.common.mqtt;version="1.0.0",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@
import org.springframework.util.DigestUtils;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.JsonNodeCreator;
import com.fasterxml.jackson.databind.node.ObjectNode;
import net.solarnetwork.common.mqtt.BaseMqttConnectionService;
import net.solarnetwork.common.mqtt.BasicMqttMessage;
import net.solarnetwork.common.mqtt.MqttConnection;
Expand Down Expand Up @@ -67,7 +70,7 @@
* {@link UploadService} using MQTT.
*
* @author matt
* @version 1.2
* @version 1.3
*/
public class MqttUploadService extends BaseMqttConnectionService
implements UploadService, MqttMessageHandler, MqttConnectionObserver {
Expand All @@ -81,12 +84,19 @@ public class MqttUploadService extends BaseMqttConnectionService
/** The MQTT topic template for node data publication. */
public static final String NODE_DATUM_TOPIC_TEMPLATE = "node/%s/datum";

/** The default value for the {@code includeVersionTag} property. */
public static final boolean DEFAULT_INCLUDE_VERSION_TAG = true;

/** A tag to indicate that CBOR encoding v2 is in use. */
public static final String TAG_VERSION_2 = "_v2";

private final ObjectMapper objectMapper;
private final IdentityService identityService;
private final OptionalService<ReactorService> reactorServiceOpt;
private final OptionalService<InstructionExecutionService> instructionExecutionServiceOpt;
private final OptionalService<EventAdmin> eventAdminOpt;
private Executor executor;
private boolean includeVersionTag = DEFAULT_INCLUDE_VERSION_TAG;

private CompletableFuture<?> startupFuture;

Expand Down Expand Up @@ -224,6 +234,31 @@ public String uploadDatum(Datum data) {
String topic = String.format(NODE_DATUM_TOPIC_TEMPLATE, nodeId);
try {
JsonNode jsonData = objectMapper.valueToTree(data);
if ( includeVersionTag ) {
JsonNode samplesData = jsonData.path("samples");
if ( samplesData.isObject() ) {
JsonNode tagsData = samplesData.path("t");
ArrayNode tagsArrayNode = null;
if ( tagsData.isArray() ) {
tagsArrayNode = (ArrayNode) tagsData;
} else if ( tagsData.isNull() || tagsData.isMissingNode() ) {
tagsArrayNode = ((JsonNodeCreator) samplesData).arrayNode(1);
((ObjectNode) samplesData).set("t", tagsArrayNode);
}
if ( tagsArrayNode != null ) {
boolean found = false;
for ( JsonNode t : tagsArrayNode ) {
if ( TAG_VERSION_2.equals(t.textValue()) ) {
found = true;
break;
}
}
if ( !found ) {
tagsArrayNode.add(TAG_VERSION_2);
}
}
}
}
if ( jsonData != null && !jsonData.isNull() ) {
conn.publish(new BasicMqttMessage(topic, false, getPublishQos(),
objectMapper.writeValueAsBytes(jsonData)))
Expand Down Expand Up @@ -457,4 +492,28 @@ public void onMqttServerConnectionEstablisehd(MqttConnection connection, boolean
public void setExecutor(Executor executor) {
this.executor = executor;
}

/**
* Get the "include version tag" toggle.
*
* @return {@literal true} to include the {@literal _v} version tag with
* each datum; defaults to {@link #DEFAULT_INCLUDE_VERSION_TAG}
* @since 1.3
*/
public boolean isIncludeVersionTag() {
return includeVersionTag;
}

/**
* Set the "inclue version tag" toggle.
*
* @param includeVersionTag
* {@literal true} to include the {@link #TAG_VERSION} property with
* each datum; only disable if you can be sure that all receivers of
* SolarFlux messages interpret the data in the same way
* @since 1.3
*/
public void setIncludeVersionTag(boolean includeVersionTag) {
this.includeVersionTag = includeVersionTag;
}
}

0 comments on commit a8a4ec0

Please sign in to comment.