From 0a4028267396e5fae9ed5b0dbe942df9ace4c0a2 Mon Sep 17 00:00:00 2001 From: Trevor Pering Date: Wed, 6 Nov 2024 13:42:23 -0800 Subject: [PATCH 1/2] Synchronize mqtt client to prevent errors during token refresh --- .../bos/iot/core/proxy/MqttPublisher.java | 25 ++++++++++--------- 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/validator/src/main/java/com/google/bos/iot/core/proxy/MqttPublisher.java b/validator/src/main/java/com/google/bos/iot/core/proxy/MqttPublisher.java index 9da8a3755..bc79f8fa0 100644 --- a/validator/src/main/java/com/google/bos/iot/core/proxy/MqttPublisher.java +++ b/validator/src/main/java/com/google/bos/iot/core/proxy/MqttPublisher.java @@ -44,7 +44,6 @@ import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadFactory; @@ -95,7 +94,7 @@ public class MqttPublisher implements MessagePublisher { private static final int INITIALIZE_TIME_MS = 20000; private static final String BROKER_URL_FORMAT = "%s://%s:%s"; private static final int PUBLISH_THREAD_COUNT = 10; - private static final int TOKEN_EXPIRATION_SEC = 60 * 60; + private static final int TOKEN_EXPIRATION_SEC = 60; private static final int TOKEN_EXPIRATION_MS = TOKEN_EXPIRATION_SEC * 1000; private static final String TICKLE_TOPIC = "events/udmi"; private static final long TICKLE_PERIOD_SEC = 10; @@ -131,6 +130,7 @@ public class MqttPublisher implements MessagePublisher { private final Envelope savedState = new Envelope(); private final AtomicInteger publisherQueueSize = new AtomicInteger(); private final AtomicInteger publishCount = new AtomicInteger(); + private final String mqttClientId; private long mqttTokenSetTimeMs; private MqttConnectOptions mqttConnectOptions; private boolean shutdown; @@ -153,6 +153,7 @@ public class MqttPublisher implements MessagePublisher { LOG.info(deviceId + " token expiration sec " + TOKEN_EXPIRATION_SEC); certManager = getCertManager(); mqttClient = newMqttClient(deviceId); + mqttClientId = mqttClient.getClientId(); connectMqttClient(deviceId); tickler = scheduleTickler(); } @@ -251,7 +252,7 @@ private ScheduledFuture scheduleTickler() { } private void tickleConnection() { - LOG.debug("Tickle " + mqttClient.getClientId()); + LOG.debug("Tickle " + mqttClientId); if (shutdown) { try { LOG.info("Tickler closing connection due to shutdown request"); @@ -308,7 +309,7 @@ private synchronized void publishCore(String deviceId, String topic, String payl publishRaw(deviceId, topic, payload, start); } - private void publishRaw(String deviceId, String topic, String payload, Instant start) { + private synchronized void publishRaw(String deviceId, String topic, String payload, Instant start) { try { publisherQueueSize.decrementAndGet(); if (!connectWait.tryAcquire(INITIALIZE_TIME_MS, TimeUnit.MILLISECONDS)) { @@ -362,14 +363,14 @@ private synchronized void delayStateUpdate(String deviceId) { lastStateTime.put(deviceId, now); } - private void sendMessage(String mqttTopic, byte[] mqttMessage) throws Exception { + private synchronized void sendMessage(String mqttTopic, byte[] mqttMessage) throws Exception { LOG.debug(deviceId + " sending message to " + mqttTopic); mqttClient.publish(mqttTopic, mqttMessage, QOS_AT_LEAST_ONCE, MQTT_NO_RETAIN); publishCounter.incrementAndGet(); } @Override - public void close() { + public synchronized void close() { try { LOG.debug(format("Shutting down executor %x", publisherExecutor.hashCode())); ifNotNullThen(tickler, () -> tickler.cancel(false)); @@ -388,7 +389,7 @@ public void close() { @Override public String getSubscriptionId() { - return mqttClient.getClientId(); + return mqttClientId; } @Override @@ -396,7 +397,7 @@ public void activate() { } @Override - public boolean isActive() { + public synchronized boolean isActive() { return mqttClient.isConnected(); } @@ -430,7 +431,7 @@ private MqttClient newMqttClient(String deviceId) { } } - private void connectMqttClient(String deviceId) { + private synchronized void connectMqttClient(String deviceId) { try { if (mqttClient.isConnected()) { return; @@ -465,7 +466,7 @@ private String getUserName() { }; } - private void connectAndSetupMqtt() { + private synchronized void connectAndSetupMqtt() { try { LOG.info(deviceId + " creating new auth token for audience " + projectId); mqttConnectOptions.setPassword(getAuthToken(projectId)); @@ -497,7 +498,7 @@ private char[] getHashPassword(String audience) { return hashKeyPassword.toCharArray(); } - private void maybeRefreshJwt() { + private synchronized void maybeRefreshJwt() { long refreshTime = mqttTokenSetTimeMs + TOKEN_EXPIRATION_MS / 2; long currentTimeMillis = System.currentTimeMillis(); long remaining = refreshTime - currentTimeMillis; @@ -552,7 +553,7 @@ private void subscribeToConfig(String deviceId) { clientSubscribe(CONFIG_TOPIC, QOS_AT_LEAST_ONCE); } - private void clientSubscribe(String topicSuffix, int qos) { + private synchronized void clientSubscribe(String topicSuffix, int qos) { String topic = topicBase + topicSuffix; try { LOG.info(format("Subscribing with qos %d to topic %s", qos, topic)); From 5dac5c40aa1c156a0abf36eb17378630e88f325e Mon Sep 17 00:00:00 2001 From: Trevor Pering Date: Wed, 6 Nov 2024 13:51:16 -0800 Subject: [PATCH 2/2] Fixing linty --- .../com/google/bos/iot/core/proxy/MqttPublisher.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/validator/src/main/java/com/google/bos/iot/core/proxy/MqttPublisher.java b/validator/src/main/java/com/google/bos/iot/core/proxy/MqttPublisher.java index bc79f8fa0..f1b71e9f5 100644 --- a/validator/src/main/java/com/google/bos/iot/core/proxy/MqttPublisher.java +++ b/validator/src/main/java/com/google/bos/iot/core/proxy/MqttPublisher.java @@ -94,8 +94,7 @@ public class MqttPublisher implements MessagePublisher { private static final int INITIALIZE_TIME_MS = 20000; private static final String BROKER_URL_FORMAT = "%s://%s:%s"; private static final int PUBLISH_THREAD_COUNT = 10; - private static final int TOKEN_EXPIRATION_SEC = 60; - private static final int TOKEN_EXPIRATION_MS = TOKEN_EXPIRATION_SEC * 1000; + private static final Duration TOKEN_EXPIRATION = Duration.ofHours(1); private static final String TICKLE_TOPIC = "events/udmi"; private static final long TICKLE_PERIOD_SEC = 10; private static final String REFLECTOR_PUBLIC_KEY = "reflector/rsa_public.pem"; @@ -150,7 +149,7 @@ public class MqttPublisher implements MessagePublisher { providerHostname = getProviderHostname(config); topicBase = getTopicBase(); clientId = catchToNull(() -> config.reflector_endpoint.client_id); - LOG.info(deviceId + " token expiration sec " + TOKEN_EXPIRATION_SEC); + LOG.info(deviceId + " token expiration sec " + TOKEN_EXPIRATION.getSeconds()); certManager = getCertManager(); mqttClient = newMqttClient(deviceId); mqttClientId = mqttClient.getClientId(); @@ -309,7 +308,8 @@ private synchronized void publishCore(String deviceId, String topic, String payl publishRaw(deviceId, topic, payload, start); } - private synchronized void publishRaw(String deviceId, String topic, String payload, Instant start) { + private synchronized void publishRaw(String deviceId, String topic, String payload, + Instant start) { try { publisherQueueSize.decrementAndGet(); if (!connectWait.tryAcquire(INITIALIZE_TIME_MS, TimeUnit.MILLISECONDS)) { @@ -499,7 +499,7 @@ private char[] getHashPassword(String audience) { } private synchronized void maybeRefreshJwt() { - long refreshTime = mqttTokenSetTimeMs + TOKEN_EXPIRATION_MS / 2; + long refreshTime = mqttTokenSetTimeMs + TOKEN_EXPIRATION.toMillis() / 2; long currentTimeMillis = System.currentTimeMillis(); long remaining = refreshTime - currentTimeMillis; LOG.debug(deviceId + " remaining until refresh " + remaining); @@ -606,7 +606,7 @@ private String createJwt(String projectId, byte[] privateKeyBytes, String algori JwtBuilder jwtBuilder = Jwts.builder() .setIssuedAt(now.toDate()) - .setExpiration(now.plusMillis(TOKEN_EXPIRATION_MS).toDate()) + .setExpiration(now.plusMillis((int) TOKEN_EXPIRATION.toMillis()).toDate()) .setAudience(projectId); LOG.info(format("Creating jwt %s key with audience %s", algorithm, projectId));