Skip to content

Commit

Permalink
Merge pull request #70 from mskcc/trace-prop-updates
Browse files Browse the repository at this point in the history
Updates to support B3 trace propagation
  • Loading branch information
n1zea144 authored Jan 30, 2024
2 parents d50a208 + ef515da commit 3022083
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 12 deletions.
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<groupId>org.mskcc.cmo</groupId>
<artifactId>smile-messaging-java</artifactId>
<name>CMO SMILE Messaging Library</name>
<version>0.1.1-SNAPSHOT</version>
<version>1.0.0-SNAPSHOT</version>
<description>master maven module</description>
<packaging>jar</packaging>
<parent>
Expand All @@ -21,7 +21,7 @@
<jackson.version>2.11.2</jackson.version>
<!-- smile common centralized config properties -->
<smile_commons.group>com.github.mskcc</smile_commons.group>
<smile_commons.version>1.3.10.RELEASE</smile_commons.version>
<smile_commons.version>1.4.0.RELEASE</smile_commons.version>
</properties>

<repositories>
Expand Down
5 changes: 3 additions & 2 deletions src/main/java/org/mskcc/cmo/messaging/Gateway.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package org.mskcc.cmo.messaging;

import io.nats.client.Message;
import org.mskcc.smile.commons.OpenTelemetryUtils.TraceMetadata;
import java.util.Map;

public interface Gateway {

Expand All @@ -11,7 +11,8 @@ public interface Gateway {

boolean isConnected();

void publishWithTrace(String subject, Object message, TraceMetadata tmd) throws Exception;
void publishWithTracePropagation(String subject, Object message, Map<String, String> traceMetadata)
throws Exception;

void publish(String subject, Object message) throws Exception;

Expand Down
18 changes: 10 additions & 8 deletions src/main/java/org/mskcc/cmo/messaging/impl/JSGatewayImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.mskcc.cmo.messaging.MessageConsumer;
import org.mskcc.cmo.messaging.utils.SSLUtils;
import org.mskcc.smile.commons.FileUtil;
import org.mskcc.smile.commons.OpenTelemetryUtils.TraceMetadata;
import org.mskcc.smile.commons.impl.FileUtilImpl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
Expand Down Expand Up @@ -142,12 +141,13 @@ private void reconnect() throws Exception {
}

@Override
public void publishWithTrace(String subject, Object message, TraceMetadata tmd) throws Exception {
public void publishWithTracePropagation(String subject, Object message, Map<String, String> traceMetadata)
throws Exception {
if (!isConnected()) {
reconnect();
}
if (!shutdownInitiated) {
publishingQueue.put(new PublishingQueueTask(subject, message, tmd));
publishingQueue.put(new PublishingQueueTask(subject, message, traceMetadata));
} else {
LOG.error("Shutdown initiated, not accepting publish request: \n" + message);
throw new IllegalStateException("Shutdown initiated, not accepting anymore publish requests");
Expand Down Expand Up @@ -323,13 +323,13 @@ private class PublishingQueueTask {
String msgId;
String subject;
Object payload;
TraceMetadata tmd;
Map<String, String> traceMetadata;

public PublishingQueueTask(String subject, Object payload, TraceMetadata tmd) {
public PublishingQueueTask(String subject, Object payload, Map<String, String> traceMetadata) {
this.msgId = NUID.nextGlobal();
this.subject = subject;
this.payload = payload;
this.tmd = tmd;
this.traceMetadata = traceMetadata;
}

public PublishingQueueTask(String subject, Object payload) {
Expand All @@ -346,8 +346,10 @@ public PublishingQueueTask(String msgId, String subject, Object payload) {

public Message getMessage() throws JsonProcessingException {
Headers headers = new Headers().add("Nats-Msg-Subject", subject);
if (tmd != null) {
headers.add(tmd.getHeaderKey(), tmd.getMetadata());
if (traceMetadata != null) {
for (Map.Entry<String, String> entry : traceMetadata.entrySet()) {
headers.add(entry.getKey(), entry.getValue());
}
}
return NatsMessage.builder()
.subject(subject)
Expand Down

0 comments on commit 3022083

Please sign in to comment.