diff --git a/src/NServiceBus.Extensions.IntegrationTesting/AttachIncomingLogicalMessageContextToActivity.cs b/src/NServiceBus.Extensions.IntegrationTesting/AttachIncomingLogicalMessageContextToActivity.cs new file mode 100644 index 0000000..b7f0538 --- /dev/null +++ b/src/NServiceBus.Extensions.IntegrationTesting/AttachIncomingLogicalMessageContextToActivity.cs @@ -0,0 +1,19 @@ +using System; +using System.Diagnostics; +using System.Threading.Tasks; +using NServiceBus.Pipeline; + +namespace NServiceBus.Extensions.IntegrationTesting +{ + internal class AttachIncomingLogicalMessageContextToActivity : Behavior + { + public override Task Invoke( + IIncomingLogicalMessageContext context, + Func next + ) + { + Activity.Current?.AddTag("testing.incoming.message.context", context); + return next(); + } + } +} \ No newline at end of file diff --git a/src/NServiceBus.Extensions.IntegrationTesting/AttachInvokeHandlerContextToActivity.cs b/src/NServiceBus.Extensions.IntegrationTesting/AttachInvokeHandlerContextToActivity.cs new file mode 100644 index 0000000..c577d56 --- /dev/null +++ b/src/NServiceBus.Extensions.IntegrationTesting/AttachInvokeHandlerContextToActivity.cs @@ -0,0 +1,19 @@ +using System; +using System.Diagnostics; +using System.Threading.Tasks; +using NServiceBus.Pipeline; + +namespace NServiceBus.Extensions.IntegrationTesting +{ + internal class AttachInvokeHandlerContextToActivity : Behavior + { + public override Task Invoke( + IInvokeHandlerContext context, + Func next + ) + { + Activity.Current?.AddTag("testing.invoke.handler.context", context); + return next(); + } + } +} \ No newline at end of file diff --git a/src/NServiceBus.Extensions.IntegrationTesting/AttachOutgoingLogicalMessageContextToActivity.cs b/src/NServiceBus.Extensions.IntegrationTesting/AttachOutgoingLogicalMessageContextToActivity.cs new file mode 100644 index 0000000..919bfca --- /dev/null +++ b/src/NServiceBus.Extensions.IntegrationTesting/AttachOutgoingLogicalMessageContextToActivity.cs @@ -0,0 +1,19 @@ +using System; +using System.Diagnostics; +using System.Threading.Tasks; +using NServiceBus.Pipeline; + +namespace NServiceBus.Extensions.IntegrationTesting +{ + internal class AttachOutgoingLogicalMessageContextToActivity : Behavior + { + public override Task Invoke( + IOutgoingLogicalMessageContext context, + Func next + ) + { + Activity.Current?.AddTag("testing.outgoing.message.context", context); + return next(); + } + } +} \ No newline at end of file diff --git a/src/NServiceBus.Extensions.IntegrationTesting/EndpointConfigurationExtensions.cs b/src/NServiceBus.Extensions.IntegrationTesting/EndpointConfigurationExtensions.cs index ddccb70..71ff6ed 100644 --- a/src/NServiceBus.Extensions.IntegrationTesting/EndpointConfigurationExtensions.cs +++ b/src/NServiceBus.Extensions.IntegrationTesting/EndpointConfigurationExtensions.cs @@ -35,7 +35,21 @@ public static EndpointConfiguration ConfigureTestEndpoint(this EndpointConfigura transportConfigurationAction?.Invoke(transport); endpoint.PurgeOnStartup(true); - endpoint.DisableFeature(); + endpoint.DisableFeature(); + endpoint.EnableOpenTelemetry(); + + endpoint.Pipeline.Register( + new AttachIncomingLogicalMessageContextToActivity(), + "Attach incoming logical message context as OpenTelemetry tags"); + + endpoint.Pipeline.Register( + new AttachOutgoingLogicalMessageContextToActivity(), + "Attach Outgoing Logical Message Context as OpenTelemetry tags"); + + endpoint.Pipeline.Register( + new AttachInvokeHandlerContextToActivity(), + "Attach invoke handler Context as OpenTelemetry tags"); + endpoint .Recoverability() .Immediate(i => i.NumberOfRetries(0)) diff --git a/src/NServiceBus.Extensions.IntegrationTesting/EndpointFixture.cs b/src/NServiceBus.Extensions.IntegrationTesting/EndpointFixture.cs index 2a460f6..3fdf308 100644 --- a/src/NServiceBus.Extensions.IntegrationTesting/EndpointFixture.cs +++ b/src/NServiceBus.Extensions.IntegrationTesting/EndpointFixture.cs @@ -4,7 +4,6 @@ using System.Reactive.Linq; using System.Threading; using System.Threading.Tasks; -using NServiceBus.Extensions.Diagnostics; using NServiceBus.Pipeline; using NServiceBus.Sagas; @@ -72,58 +71,55 @@ private static async Task ExecuteAndWait(); - using var all = DiagnosticListener.AllListeners - .Subscribe(listener => + // https://github.com/dotnet/runtime/blob/eccafbac942be9e3b06d48cff735fd6e50c3f25a/src/libraries/System.Diagnostics.DiagnosticSource/tests/ActivitySourceTests.cs#L134 + using ActivityListener listener = new(); + listener.ActivityStopped = (activitySource) => + { + switch (activitySource.OperationName) { - switch (listener.Name) - { - case ActivityNames.IncomingLogicalMessage: - var incomingObs = listener - .Select(e => e.Value) - .Cast(); - - subscriptions.Add(incomingObs.Subscribe(e => - { - incomingMessageContexts.Add(e); - - if (e is TMessageContext ctx && predicate(ctx)) - { - messageReceivingTaskSource.SetResult(null); - } - })); - - break; - case ActivityNames.OutgoingLogicalMessage: - var outgoingObs = listener - .Select(e => e.Value) - .Cast(); - - subscriptions.Add(outgoingObs.Subscribe((e) => - { - outgoingMessageContexts.Add(e); - - if (e is TMessageContext ctx && predicate(ctx)) - { - messageReceivingTaskSource.SetResult(null); - } - })); - - break; - case ActivityNames.InvokedHandler: - var invokeHandlerObs = listener.Select(e => e.Value).Cast(); - subscriptions.Add(invokeHandlerObs.Subscribe((e) => - { - invokeHandlerContexts.Add(e); - - if (e is TMessageContext ctx && predicate(ctx)) - { - messageReceivingTaskSource.SetResult(null); - } - })); - - break; - } - }); + case NsbActivityNames.IncomingMessageActivityName: + var context = activitySource.GetTagItem("testing.incoming.message.context") as IIncomingLogicalMessageContext; + + incomingMessageContexts.Add(context); + + if (context is TMessageContext ctx && predicate(ctx)) + { + messageReceivingTaskSource.SetResult(null); + } + + break; + case NsbActivityNames.OutgoingMessageActivityName: + var outgoingContext = activitySource.GetTagItem("testing.outgoing.message.context") as IOutgoingLogicalMessageContext; + + outgoingMessageContexts.Add(outgoingContext); + + if (outgoingContext is TMessageContext ctx2 && predicate(ctx2)) + { + messageReceivingTaskSource.SetResult(null); + } + + + break; + case NsbActivityNames.InvokeHandlerActivityName: + var handlerContext = activitySource.Parent.GetTagItem("testing.invoke.handler.context") as IInvokeHandlerContext; + + invokeHandlerContexts.Add(handlerContext); + + if (handlerContext is TMessageContext ctx3 && predicate(ctx3)) + { + messageReceivingTaskSource.SetResult(null); + } + + + break; + } + }; + listener.ShouldListenTo = _ => true; + listener.Sample = (ref ActivityCreationOptions activityOptions) => ActivitySamplingResult.AllData; + + ActivitySource.AddActivityListener(listener); + + await testAction(); diff --git a/src/NServiceBus.Extensions.IntegrationTesting/NServiceBus.Extensions.IntegrationTesting.csproj b/src/NServiceBus.Extensions.IntegrationTesting/NServiceBus.Extensions.IntegrationTesting.csproj index ad9c4d5..20617c9 100644 --- a/src/NServiceBus.Extensions.IntegrationTesting/NServiceBus.Extensions.IntegrationTesting.csproj +++ b/src/NServiceBus.Extensions.IntegrationTesting/NServiceBus.Extensions.IntegrationTesting.csproj @@ -1,7 +1,7 @@  - netstandard2.0 + net6.0 Jimmy Bogard Jimmy Bogard Apache-2.0 @@ -12,10 +12,11 @@ true true nservicebus;messaging;testing + Library - + diff --git a/src/NServiceBus.Extensions.IntegrationTesting/NsbActivityNames.cs b/src/NServiceBus.Extensions.IntegrationTesting/NsbActivityNames.cs new file mode 100644 index 0000000..2186f8a --- /dev/null +++ b/src/NServiceBus.Extensions.IntegrationTesting/NsbActivityNames.cs @@ -0,0 +1,11 @@ +namespace NServiceBus.Extensions.IntegrationTesting +{ + internal static class NsbActivityNames + { + public const string IncomingMessageActivityName = "NServiceBus.Diagnostics.ReceiveMessage"; + + public const string OutgoingMessageActivityName = "NServiceBus.Diagnostics.SendMessage"; + + public const string InvokeHandlerActivityName = "NServiceBus.Diagnostics.InvokeHandler"; + } +} \ No newline at end of file diff --git a/tests/NServiceBus.Extensions.IntegrationTesting.Tests/NServiceBus.Extensions.IntegrationTesting.Tests.csproj b/tests/NServiceBus.Extensions.IntegrationTesting.Tests/NServiceBus.Extensions.IntegrationTesting.Tests.csproj index 7dfdf9a..b147a82 100644 --- a/tests/NServiceBus.Extensions.IntegrationTesting.Tests/NServiceBus.Extensions.IntegrationTesting.Tests.csproj +++ b/tests/NServiceBus.Extensions.IntegrationTesting.Tests/NServiceBus.Extensions.IntegrationTesting.Tests.csproj @@ -9,7 +9,7 @@ - +