diff --git a/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/KafkaTracing.java b/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/KafkaTracing.java index f40607c93e..c64f4317a8 100644 --- a/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/KafkaTracing.java +++ b/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/KafkaTracing.java @@ -190,7 +190,7 @@ public Producer producer(Producer producer) { * one couldn't be extracted. */ public Span nextSpan(ConsumerRecord record) { - // Eventhough the type is ConsumerRecord, this is not a (remote) consumer span. Only "poll" + // Even though the type is ConsumerRecord, this is not a (remote) consumer span. Only "poll" // events create consumer spans. Since this is a processor span, we use the normal sampler. TraceContextOrSamplingFlags extracted = extractAndClearTraceIdHeaders(processorExtractor, record.headers(), record.headers()); diff --git a/instrumentation/kafka-clients/src/test/java/brave/kafka/clients/KafkaContainer.java b/instrumentation/kafka-clients/src/test/java/brave/kafka/clients/KafkaContainer.java index b02bd7daaa..e97e639707 100644 --- a/instrumentation/kafka-clients/src/test/java/brave/kafka/clients/KafkaContainer.java +++ b/instrumentation/kafka-clients/src/test/java/brave/kafka/clients/KafkaContainer.java @@ -32,7 +32,7 @@ final class KafkaContainer extends GenericContainer { static final int KAFKA_PORT = 19092; KafkaContainer() { - super(parse("ghcr.io/openzipkin/zipkin-kafka:3.0.2")); + super(parse("ghcr.io/openzipkin/zipkin-kafka:3.1.0")); waitStrategy = Wait.forHealthcheck(); // Kafka broker listener port (19092) needs to be exposed for test cases to access it. addFixedExposedPort(KAFKA_PORT, KAFKA_PORT, InternetProtocol.TCP); diff --git a/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/KafkaStreamsPropagation.java b/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/KafkaStreamsPropagation.java index eecb6ad505..df246f7243 100644 --- a/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/KafkaStreamsPropagation.java +++ b/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/KafkaStreamsPropagation.java @@ -1,5 +1,5 @@ /* - * Copyright 2013-2020 The OpenZipkin Authors + * Copyright 2013-2024 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -16,12 +16,12 @@ import brave.propagation.Propagation.Getter; import brave.propagation.Propagation.Setter; import org.apache.kafka.common.header.Headers; -import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.api.ProcessingContext; final class KafkaStreamsPropagation { /** - * Used by {@link KafkaStreamsTracing#nextSpan(ProcessorContext)} to extract a trace context from - * a prior stage. + * Used by {@link KafkaStreamsTracing#nextSpan(ProcessingContext, Headers)} to extract a trace + * context from a prior stage. */ static final Getter GETTER = new Getter() { @Override public String get(Headers headers, String key) { diff --git a/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingFixedKeyProcessor.java b/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingFixedKeyProcessor.java index 01521b7649..2f4090fdfb 100644 --- a/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingFixedKeyProcessor.java +++ b/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingFixedKeyProcessor.java @@ -13,12 +13,14 @@ */ package brave.kafka.streams; +import brave.propagation.CurrentTraceContext; +import brave.propagation.TraceContext; import org.apache.kafka.common.header.Headers; import org.apache.kafka.streams.processor.api.FixedKeyProcessor; import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext; import org.apache.kafka.streams.processor.api.FixedKeyRecord; -class TracingFixedKeyProcessor extends +final class TracingFixedKeyProcessor extends BaseTracingProcessor, FixedKeyRecord, FixedKeyProcessor> implements FixedKeyProcessor { @@ -31,13 +33,20 @@ class TracingFixedKeyProcessor extends return record.headers(); } - @Override - void process(FixedKeyProcessor delegate, FixedKeyRecord record) { + @Override void process(FixedKeyProcessor delegate, + FixedKeyRecord record) { delegate.process(record); } @Override public void init(FixedKeyProcessorContext context) { this.context = context; + CurrentTraceContext current = + kafkaStreamsTracing.kafkaTracing.messagingTracing().tracing().currentTraceContext(); + TraceContext traceContext = current.get(); + if (traceContext != null) { + context = + new TracingFixedKeyProcessorContext<>(context, kafkaStreamsTracing.injector, traceContext); + } delegate.init(context); } diff --git a/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingFixedKeyProcessorContext.java b/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingFixedKeyProcessorContext.java new file mode 100644 index 0000000000..86c995bac1 --- /dev/null +++ b/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingFixedKeyProcessorContext.java @@ -0,0 +1,41 @@ +/* + * Copyright 2013-2024 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package brave.kafka.streams; + +import brave.propagation.TraceContext; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext; +import org.apache.kafka.streams.processor.api.FixedKeyRecord; + +/** Injects the initialization tracing context to record headers on forward */ +final class TracingFixedKeyProcessorContext + extends TracingProcessingContext> + implements FixedKeyProcessorContext { + + TracingFixedKeyProcessorContext(FixedKeyProcessorContext delegate, + TraceContext.Injector injector, TraceContext context) { + super(delegate, injector, context); + } + + @Override public void forward(FixedKeyRecord r) { + injector.inject(context, r.headers()); + delegate.forward(r); + } + + @Override + public void forward(FixedKeyRecord r, String s) { + injector.inject(context, r.headers()); + delegate.forward(r, s); + } +} diff --git a/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingProcessingContext.java b/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingProcessingContext.java new file mode 100644 index 0000000000..8f7e0b75ed --- /dev/null +++ b/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingProcessingContext.java @@ -0,0 +1,101 @@ +/* + * Copyright 2013-2024 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package brave.kafka.streams; + +import brave.propagation.TraceContext; +import brave.propagation.TraceContext.Injector; +import java.io.File; +import java.time.Duration; +import java.util.Map; +import java.util.Optional; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.streams.StreamsMetrics; +import org.apache.kafka.streams.processor.Cancellable; +import org.apache.kafka.streams.processor.PunctuationType; +import org.apache.kafka.streams.processor.Punctuator; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.api.ProcessingContext; +import org.apache.kafka.streams.processor.api.RecordMetadata; + +abstract class TracingProcessingContext implements ProcessingContext { + final C delegate; + final Injector injector; + final TraceContext context; + + TracingProcessingContext(C delegate, Injector injector, + TraceContext context) { + this.delegate = delegate; + this.injector = injector; + this.context = context; + } + + @Override public String applicationId() { + return delegate.applicationId(); + } + + @Override public TaskId taskId() { + return delegate.taskId(); + } + + @Override public Optional recordMetadata() { + return delegate.recordMetadata(); + } + + @Override public Serde keySerde() { + return delegate.keySerde(); + } + + @Override public Serde valueSerde() { + return delegate.valueSerde(); + } + + @Override public File stateDir() { + return delegate.stateDir(); + } + + @Override public StreamsMetrics metrics() { + return delegate.metrics(); + } + + @Override public S getStateStore(String s) { + return delegate.getStateStore(s); + } + + @Override public Cancellable schedule(Duration duration, PunctuationType punctuationType, + Punctuator punctuator) { + return delegate.schedule(duration, punctuationType, punctuator); + } + + @Override public void commit() { + delegate.commit(); + } + + @Override public Map appConfigs() { + return delegate.appConfigs(); + } + + @Override public Map appConfigsWithPrefix(String s) { + return delegate.appConfigsWithPrefix(s); + } + + @Override public long currentSystemTimeMs() { + return delegate.currentSystemTimeMs(); + } + + @Override public long currentStreamTimeMs() { + return delegate.currentStreamTimeMs(); + } +} diff --git a/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingProcessor.java b/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingProcessor.java index 924bd7d1a6..184546f3e6 100644 --- a/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingProcessor.java +++ b/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingProcessor.java @@ -13,6 +13,8 @@ */ package brave.kafka.streams; +import brave.propagation.CurrentTraceContext; +import brave.propagation.TraceContext; import org.apache.kafka.common.header.Headers; import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.ProcessorContext; @@ -37,6 +39,12 @@ final class TracingProcessor extends @Override public void init(ProcessorContext context) { this.context = context; + CurrentTraceContext current = + kafkaStreamsTracing.kafkaTracing.messagingTracing().tracing().currentTraceContext(); + TraceContext traceContext = current.get(); + if (traceContext != null) { + context = new TracingProcessorContext<>(context, kafkaStreamsTracing.injector, traceContext); + } delegate.init(context); } diff --git a/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingProcessorContext.java b/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingProcessorContext.java new file mode 100644 index 0000000000..36331f8de2 --- /dev/null +++ b/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingProcessorContext.java @@ -0,0 +1,42 @@ +/* + * Copyright 2013-2024 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package brave.kafka.streams; + +import brave.propagation.TraceContext; +import brave.propagation.TraceContext.Injector; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; + +/** Injects the initialization tracing context to record headers on forward */ +final class TracingProcessorContext + extends TracingProcessingContext> + implements ProcessorContext { + + TracingProcessorContext(ProcessorContext delegate, + Injector injector, TraceContext context) { + super(delegate, injector, context); + } + + @Override public void forward(Record r) { + injector.inject(context, r.headers()); + delegate.forward(r); + } + + @Override + public void forward(Record r, String s) { + injector.inject(context, r.headers()); + delegate.forward(r, s); + } +} diff --git a/instrumentation/kafka-streams/src/test/java/brave/kafka/streams/ITKafkaStreamsTracing.java b/instrumentation/kafka-streams/src/test/java/brave/kafka/streams/ITKafkaStreamsTracing.java index 98ef2242f4..4eb3d6d24a 100644 --- a/instrumentation/kafka-streams/src/test/java/brave/kafka/streams/ITKafkaStreamsTracing.java +++ b/instrumentation/kafka-streams/src/test/java/brave/kafka/streams/ITKafkaStreamsTracing.java @@ -16,9 +16,14 @@ import brave.handler.MutableSpan; import brave.kafka.clients.KafkaTracing; import brave.messaging.MessagingTracing; +import brave.propagation.SamplingFlags; +import brave.propagation.TraceContext; import java.util.ArrayList; import java.util.List; import java.util.Properties; +import java.util.concurrent.BlockingDeque; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.function.BiFunction; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -32,13 +37,18 @@ import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.processor.api.ContextualFixedKeyProcessor; +import org.apache.kafka.streams.processor.api.ContextualProcessor; +import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier; +import org.apache.kafka.streams.processor.api.FixedKeyRecord; +import org.apache.kafka.streams.processor.api.ProcessorSupplier; +import org.apache.kafka.streams.processor.api.Record; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.Timeout; -import org.junit.jupiter.api.extension.RegisterExtension; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; @@ -191,18 +201,15 @@ class ITKafkaStreamsTracing extends ITKafkaStreams { consumer.close(); } - @Test void should_create_spans_from_stream_with_tracing_v2_processor() { - org.apache.kafka.streams.processor.api.ProcessorSupplier - processorSupplier = - kafkaStreamsTracing.process( - "forward-1", () -> - record -> { - try { - Thread.sleep(100L); - } catch (InterruptedException e) { - e.printStackTrace(); - } - }); + @Test void should_create_spans_from_stream_with_tracing_processor() { + ProcessorSupplier processorSupplier = + kafkaStreamsTracing.process("forward-1", () -> record -> { + try { + Thread.sleep(100L); + } catch (InterruptedException e) { + e.printStackTrace(); + } + }); String inputTopic = testName + "-input"; @@ -227,18 +234,15 @@ record -> { streams.cleanUp(); } - @Test void should_create_spans_from_stream_with_tracing_v2_fixed_key_processor() { - org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier - processorSupplier = - kafkaStreamsTracing.processValues( - "forward-1", () -> - record -> { - try { - Thread.sleep(100L); - } catch (InterruptedException e) { - e.printStackTrace(); - } - }); + @Test void should_create_spans_from_stream_with_tracing_fixed_key_processor() { + FixedKeyProcessorSupplier processorSupplier = + kafkaStreamsTracing.processValues("forward-1", () -> record -> { + try { + Thread.sleep(100L); + } catch (InterruptedException e) { + e.printStackTrace(); + } + }); String inputTopic = testName + "-input"; @@ -263,6 +267,79 @@ record -> { streams.cleanUp(); } + @Test void injectsInitContextOnForward_process() { + injectsInitContextOnForward( + (stream, contexts) -> stream.process(kafkaStreamsTracing.process("forward", + () -> new ContextualProcessor() { + @Override public void process(Record record) { + context().forward(record); + } + })) + .process(kafkaStreamsTracing.process("check", + () -> record -> contexts.add(currentTraceContext.get())) + )); + } + + @Test void injectsInitContextOnForward_processValues() { + injectsInitContextOnForward( + (stream, contexts) -> stream.processValues(kafkaStreamsTracing.processValues("forward", + () -> new ContextualFixedKeyProcessor() { + @Override public void process(FixedKeyRecord record) { + context().forward(record); + } + })) + .processValues(kafkaStreamsTracing.processValues("check", + () -> record -> contexts.add(currentTraceContext.get())) + )); + } + + void injectsInitContextOnForward( + BiFunction, BlockingDeque, KStream> configureStream) { + + String inputTopic = testName + "-input"; + + BlockingDeque contexts = new LinkedBlockingDeque<>(); + + StreamsBuilder builder = new StreamsBuilder(); + configureStream.apply(builder.stream(inputTopic), contexts); + Topology topology = builder.build(); + + KafkaStreams streams = buildKafkaStreams(topology); + + // Simulate a trace that started from an external producer + TraceContext root = newTraceContext(SamplingFlags.SAMPLED); + ProducerRecord record = new ProducerRecord<>(inputTopic, TEST_KEY, TEST_VALUE); + kafkaStreamsTracing.injector.inject(root, record.headers()); + send(record); + + waitForStreamToRun(streams); + + MutableSpan spanPoll = testSpanHandler.takeRemoteSpan(CONSUMER); + assertThat(spanPoll.name()).isEqualTo("poll"); + assertThat(spanPoll.tags()).containsEntry("kafka.topic", inputTopic); + + MutableSpan spanCheck = testSpanHandler.takeLocalSpan(); + assertThat(spanCheck.name()).isEqualTo("check"); + assertThat(spanCheck.tags()).doesNotContainKey("kafka.topic"); // not remote + + MutableSpan spanForward = testSpanHandler.takeLocalSpan(); + assertThat(spanForward.name()).isEqualTo("forward"); + assertThat(spanCheck.tags()).doesNotContainKey("kafka.topic"); // not remote + + // All spans are in the same trace + assertChildOf(spanCheck, spanForward); + assertChildOf(spanForward, spanPoll); + assertChildOf(spanPoll, root); + + // The terminal processor had the right span in context + TraceContext contextCheck = contexts.poll(); + assertThat(contextCheck).isNotNull(); + assertSameIds(spanCheck, contextCheck); + + streams.close(); + streams.cleanUp(); + } + private void waitForStreamToRun(KafkaStreams streams) { streams.start(); diff --git a/instrumentation/kafka-streams/src/test/java/brave/kafka/streams/KafkaContainer.java b/instrumentation/kafka-streams/src/test/java/brave/kafka/streams/KafkaContainer.java index 18a3316338..85f6d1fac5 100644 --- a/instrumentation/kafka-streams/src/test/java/brave/kafka/streams/KafkaContainer.java +++ b/instrumentation/kafka-streams/src/test/java/brave/kafka/streams/KafkaContainer.java @@ -32,7 +32,7 @@ final class KafkaContainer extends GenericContainer { static final int KAFKA_PORT = 19092; KafkaContainer() { - super(parse("ghcr.io/openzipkin/zipkin-kafka:3.0.2")); + super(parse("ghcr.io/openzipkin/zipkin-kafka:3.1.0")); waitStrategy = Wait.forHealthcheck(); // Kafka broker listener port (19092) needs to be exposed for test cases to access it. addFixedExposedPort(KAFKA_PORT, KAFKA_PORT, InternetProtocol.TCP); diff --git a/instrumentation/spring-rabbit/src/test/java/brave/spring/rabbit/ITSpringRabbit.java b/instrumentation/spring-rabbit/src/test/java/brave/spring/rabbit/ITSpringRabbit.java index 49e294d094..8bf2de9d6b 100644 --- a/instrumentation/spring-rabbit/src/test/java/brave/spring/rabbit/ITSpringRabbit.java +++ b/instrumentation/spring-rabbit/src/test/java/brave/spring/rabbit/ITSpringRabbit.java @@ -97,7 +97,7 @@ abstract class ITSpringRabbit extends ITRemote { static final class RabbitMQContainer extends GenericContainer { RabbitMQContainer() { - super(parse("ghcr.io/openzipkin/zipkin-rabbitmq:3.0.2")); + super(parse("ghcr.io/openzipkin/zipkin-rabbitmq:3.1.0")); withExposedPorts(RABBIT_PORT); waitStrategy = Wait.forLogMessage(".*Server startup complete.*", 1); withStartupTimeout(Duration.ofSeconds(60));