diff --git a/src/main/java/com/serotonin/bacnet4j/transport/DefaultTransport.java b/src/main/java/com/serotonin/bacnet4j/transport/DefaultTransport.java index 4dba3843..ab12019d 100644 --- a/src/main/java/com/serotonin/bacnet4j/transport/DefaultTransport.java +++ b/src/main/java/com/serotonin/bacnet4j/transport/DefaultTransport.java @@ -680,9 +680,13 @@ private void segmentedIncoming(final UnackedMessageKey key, final Segmentable ms private static void completeComplexAckResponse(final ComplexACK cack, final ResponseConsumer consumer) { try { cack.parseServiceData(); - consumer.success(cack.getService()); + if (consumer != null) { + consumer.success(cack.getService()); + } } catch (final BACnetException e) { - consumer.ex(e); + if (consumer != null) { + consumer.ex(e); + } } } @@ -720,7 +724,7 @@ private void segmentedOutgoing(final UnackedMessageKey key, final UnackedMessage try { network.sendAPDU(key.getAddress(), key.getLinkService(), segment, false); } catch (final BACnetException e) { - ctx.getConsumer().ex(e); + ctx.useConsumer((consumer) -> consumer.ex(e)); return; } @@ -858,15 +862,14 @@ private boolean expire() { umIter.remove(); if (ctx.getSegmentWindow() == null) { // Not a segmented message, at least as far as we know. - ctx.getConsumer().ex(new BACnetTimeoutException()); + ctx.useConsumer((consumer) -> consumer.ex(new BACnetTimeoutException())); } else { // A segmented message. - if (ctx.getSegmentWindow().isEmpty() && ctx.getConsumer() != null) { + if (ctx.getSegmentWindow().isEmpty()) { // No segments received. Return a timeout. - ctx.getConsumer() - .ex(new BACnetTimeoutException( - "Timeout while waiting for segment part: invokeId=" + key.getInvokeId() - + ", sequenceId=" + ctx.getSegmentWindow().getFirstSequenceId())); + ctx.useConsumer((consumer) -> consumer.ex(new BACnetTimeoutException( + "Timeout while waiting for segment part: invokeId=" + key.getInvokeId() + + ", sequenceId=" + ctx.getSegmentWindow().getFirstSequenceId()))); } else if (ctx.getSegmentWindow().isEmpty()) LOG.warn("No segments received for message " + ctx.getOriginalApdu()); else { @@ -878,7 +881,7 @@ private boolean expire() { ctx.getSegmentWindow().getWindowSize(), true), false); } catch (final BACnetException ex) { - ctx.getConsumer().ex(ex); + ctx.useConsumer((consumer) -> consumer.ex(ex)); } } } @@ -896,10 +899,7 @@ void sendForResponse(final UnackedMessageKey key, final UnackedMessageContext ct network.sendAPDU(key.getAddress(), key.getLinkService(), ctx.getOriginalApdu(), false); } catch (final BACnetException e) { unackedMessages.remove(key); - if (ctx.getConsumer() != null) - ctx.getConsumer().ex(e); - else - LOG.error("", e); + ctx.useConsumer((consumer) -> consumer.ex(e)); } } diff --git a/src/main/java/com/serotonin/bacnet4j/transport/UnackedMessageContext.java b/src/main/java/com/serotonin/bacnet4j/transport/UnackedMessageContext.java index fc20e4bf..72b2d084 100644 --- a/src/main/java/com/serotonin/bacnet4j/transport/UnackedMessageContext.java +++ b/src/main/java/com/serotonin/bacnet4j/transport/UnackedMessageContext.java @@ -157,6 +157,12 @@ public void setLastIdSent(final int lastIdSent) { this.lastIdSent = lastIdSent; } + public void useConsumer(final ConsumerClient client) { + if (consumer != null) { + client.use(consumer); + } + } + @Override public String toString() { return "UnackedMessageContext [deadline=" + deadline + ", attemptsLeft=" + attemptsLeft + ", clock=" + clock @@ -165,4 +171,9 @@ public String toString() { + segmentTemplate + ", serviceData=" + serviceData + ", segBuf=" + Arrays.toString(segBuf) + ", lastIdSent=" + lastIdSent + "]"; } + + @FunctionalInterface + public static interface ConsumerClient { + void use(ResponseConsumer consumer); + } }