From 1ff5d29571b62fec4438a3af716825c00bfc1643 Mon Sep 17 00:00:00 2001 From: William Chong Date: Wed, 28 Aug 2024 16:18:24 +0400 Subject: [PATCH 1/3] Fix context check in trace subscription method --- .../EventStoreClient.Append.cs | 22 +--- .../Diagnostics/ActivitySourceExtensions.cs | 21 +-- .../Core/Tracing/TracingMetadata.cs | 5 +- .../Diagnostics/EventMetadataExtensions.cs | 20 +-- .../StreamsTracingInstrumentationTests.cs | 124 +++++++++--------- .../Fixtures/DiagnosticsFixture.cs | 28 +++- .../Fixtures/EventStoreFixture.Helpers.cs | 20 ++- 7 files changed, 131 insertions(+), 109 deletions(-) diff --git a/src/EventStore.Client.Streams/EventStoreClient.Append.cs b/src/EventStore.Client.Streams/EventStoreClient.Append.cs index ec719c4fc..9fa0a62d8 100644 --- a/src/EventStore.Client.Streams/EventStoreClient.Append.cs +++ b/src/EventStore.Client.Streams/EventStoreClient.Append.cs @@ -132,13 +132,9 @@ await call.RequestStream foreach (var e in eventData) { var appendReq = new AppendReq { ProposedMessage = new() { - Id = e.EventId.ToDto(), - Data = ByteString.CopyFrom(e.Data.Span), - CustomMetadata = ByteString.CopyFrom( - e.ContentType == Constants.Metadata.ContentTypes.ApplicationJson - ? e.Metadata.InjectTracingContext(Activity.Current) - : e.Metadata.Span - ), + Id = e.EventId.ToDto(), + Data = ByteString.CopyFrom(e.Data.Span), + CustomMetadata = ByteString.CopyFrom(e.Metadata.InjectTracingContext(Activity.Current)), Metadata = { { Constants.Metadata.Type, e.Type }, { Constants.Metadata.ContentType, e.ContentType } @@ -392,13 +388,9 @@ IEnumerable GetRequests(IEnumerable events, BatchAppe foreach (var eventData in events) { var proposedMessage = new BatchAppendReq.Types.ProposedMessage { - Data = ByteString.CopyFrom(eventData.Data.Span), - CustomMetadata = ByteString.CopyFrom( - eventData.ContentType == Constants.Metadata.ContentTypes.ApplicationJson - ? eventData.Metadata.InjectTracingContext(Activity.Current) - : eventData.Metadata.Span - ), - Id = eventData.EventId.ToDto(), + Data = ByteString.CopyFrom(eventData.Data.Span), + CustomMetadata = ByteString.CopyFrom(eventData.Metadata.InjectTracingContext(Activity.Current)), + Id = eventData.EventId.ToDto(), Metadata = { { Constants.Metadata.Type, eventData.Type }, { Constants.Metadata.ContentType, eventData.ContentType } @@ -435,4 +427,4 @@ public void Dispose() { } } } -} \ No newline at end of file +} diff --git a/src/EventStore.Client/Common/Diagnostics/ActivitySourceExtensions.cs b/src/EventStore.Client/Common/Diagnostics/ActivitySourceExtensions.cs index ef0be085f..9ebe09799 100644 --- a/src/EventStore.Client/Common/Diagnostics/ActivitySourceExtensions.cs +++ b/src/EventStore.Client/Common/Diagnostics/ActivitySourceExtensions.cs @@ -18,8 +18,7 @@ public static async ValueTask TraceClientOperation( var res = await tracedOperation().ConfigureAwait(false); activity?.StatusOk(); return res; - } - catch (Exception ex) { + } catch (Exception ex) { activity?.StatusError(ex); throw; } @@ -33,15 +32,12 @@ public static void TraceSubscriptionEvent( EventStoreClientSettings settings, UserCredentials? userCredentials ) { - if (resolvedEvent.OriginalEvent.ContentType != Constants.Metadata.ContentTypes.ApplicationJson) - return; - if (source.HasNoActiveListeners()) return; var parentContext = resolvedEvent.OriginalEvent.Metadata.ExtractPropagationContext(); - if (parentContext is null) return; + if (parentContext == default(ActivityContext)) return; var tags = new ActivityTagsCollection() .WithRequiredTag(TelemetryTags.EventStore.Stream, resolvedEvent.OriginalEvent.EventStreamId) @@ -51,14 +47,19 @@ public static void TraceSubscriptionEvent( // Ensure consistent server.address attribute when connecting to cluster via dns discovery .WithGrpcChannelServerTags(channelInfo) .WithClientSettingsServerTags(settings) - .WithOptionalTag(TelemetryTags.Database.User, userCredentials?.Username ?? settings.DefaultCredentials?.Username); + .WithOptionalTag( + TelemetryTags.Database.User, + userCredentials?.Username ?? settings.DefaultCredentials?.Username + ); - StartActivity(source, TracingConstants.Operations.Subscribe, ActivityKind.Consumer, tags, parentContext)?.Dispose(); + StartActivity(source, TracingConstants.Operations.Subscribe, ActivityKind.Consumer, tags, parentContext) + ?.Dispose(); } static Activity? StartActivity( this ActivitySource source, - string operationName, ActivityKind activityKind, ActivityTagsCollection? tags = null, ActivityContext? parentContext = null + string operationName, ActivityKind activityKind, ActivityTagsCollection? tags = null, + ActivityContext? parentContext = null ) { if (source.HasNoActiveListeners()) return null; @@ -79,4 +80,4 @@ public static void TraceSubscriptionEvent( } static bool HasNoActiveListeners(this ActivitySource source) => !source.HasListeners(); -} \ No newline at end of file +} diff --git a/src/EventStore.Client/Common/Diagnostics/Core/Tracing/TracingMetadata.cs b/src/EventStore.Client/Common/Diagnostics/Core/Tracing/TracingMetadata.cs index 660dcfc0c..ecb9c68c6 100644 --- a/src/EventStore.Client/Common/Diagnostics/Core/Tracing/TracingMetadata.cs +++ b/src/EventStore.Client/Common/Diagnostics/Core/Tracing/TracingMetadata.cs @@ -25,9 +25,8 @@ readonly record struct TracingMetadata( isRemote: isRemote ) : default; - } - catch (Exception) { + } catch (Exception) { return default; } } -} \ No newline at end of file +} diff --git a/src/EventStore.Client/Common/Diagnostics/EventMetadataExtensions.cs b/src/EventStore.Client/Common/Diagnostics/EventMetadataExtensions.cs index d97861c5a..4245d5bb7 100644 --- a/src/EventStore.Client/Common/Diagnostics/EventMetadataExtensions.cs +++ b/src/EventStore.Client/Common/Diagnostics/EventMetadataExtensions.cs @@ -8,7 +8,9 @@ namespace EventStore.Client.Diagnostics; static class EventMetadataExtensions { [MethodImpl(MethodImplOptions.AggressiveInlining)] - public static ReadOnlySpan InjectTracingContext(this ReadOnlyMemory eventMetadata, Activity? activity) => + public static ReadOnlySpan InjectTracingContext( + this ReadOnlyMemory eventMetadata, Activity? activity + ) => eventMetadata.InjectTracingMetadata(activity?.GetTracingMetadata() ?? TracingMetadata.None); [MethodImpl(MethodImplOptions.AggressiveInlining)] @@ -32,14 +34,15 @@ public static TracingMetadata ExtractTracingMetadata(this ReadOnlyMemory e return new TracingMetadata(traceId.GetString(), spanId.GetString()); } - } - catch (Exception) { + } catch (Exception) { return TracingMetadata.None; } } [MethodImpl(MethodImplOptions.AggressiveInlining)] - static ReadOnlySpan InjectTracingMetadata(this ReadOnlyMemory eventMetadata, TracingMetadata tracingMetadata) { + static ReadOnlySpan InjectTracingMetadata( + this ReadOnlyMemory eventMetadata, TracingMetadata tracingMetadata + ) { if (tracingMetadata == TracingMetadata.None || !tracingMetadata.IsValid) return eventMetadata.Span; @@ -49,7 +52,9 @@ static ReadOnlySpan InjectTracingMetadata(this ReadOnlyMemory eventM } [MethodImpl(MethodImplOptions.AggressiveInlining)] - static ReadOnlyMemory TryInjectTracingMetadata(this ReadOnlyMemory utf8Json, TracingMetadata tracingMetadata) { + static ReadOnlyMemory TryInjectTracingMetadata( + this ReadOnlyMemory utf8Json, TracingMetadata tracingMetadata + ) { try { using var doc = JsonDocument.Parse(utf8Json); using var stream = new MemoryStream(); @@ -72,9 +77,8 @@ static ReadOnlyMemory TryInjectTracingMetadata(this ReadOnlyMemory u writer.Flush(); return stream.ToArray(); - } - catch (Exception) { + } catch (Exception) { return utf8Json; } } -} \ No newline at end of file +} diff --git a/test/EventStore.Client.Streams.Tests/Diagnostics/StreamsTracingInstrumentationTests.cs b/test/EventStore.Client.Streams.Tests/Diagnostics/StreamsTracingInstrumentationTests.cs index b208e4abe..b44569a13 100644 --- a/test/EventStore.Client.Streams.Tests/Diagnostics/StreamsTracingInstrumentationTests.cs +++ b/test/EventStore.Client.Streams.Tests/Diagnostics/StreamsTracingInstrumentationTests.cs @@ -5,7 +5,8 @@ namespace EventStore.Client.Streams.Tests.Diagnostics; [Trait("Category", "Diagnostics:Tracing")] -public class StreamsTracingInstrumentationTests(ITestOutputHelper output, DiagnosticsFixture fixture) : EventStoreTests(output, fixture) { +public class StreamsTracingInstrumentationTests(ITestOutputHelper output, DiagnosticsFixture fixture) + : EventStoreTests(output, fixture) { [Fact] public async Task AppendIsInstrumentedWithTracingAsExpected() { var stream = Fixture.GetStreamName(); @@ -42,66 +43,6 @@ public async Task AppendTraceIsTaggedWithErrorStatusOnException() { Fixture.AssertErroneousAppendActivityHasExpectedTags(activity, actualException); } - [Fact] - public async Task CatchupSubscriptionIsInstrumentedWithTracingAndRestoresRemoteAppendContextAsExpected() { - var stream = Fixture.GetStreamName(); - var events = Fixture.CreateTestEvents(2, metadata: Fixture.CreateTestJsonMetadata()).ToArray(); - - await Fixture.Streams.AppendToStreamAsync( - stream, - StreamState.NoStream, - events - ); - - string? subscriptionId = null; - await Subscribe().WithTimeout(); - - var appendActivity = Fixture - .GetActivitiesForOperation(TracingConstants.Operations.Append, stream) - .SingleOrDefault() - .ShouldNotBeNull(); - - var subscribeActivities = Fixture - .GetActivitiesForOperation(TracingConstants.Operations.Subscribe, stream) - .ToArray(); - - subscriptionId.ShouldNotBeNull(); - subscribeActivities.Length.ShouldBe(events.Length); - - for (var i = 0; i < subscribeActivities.Length; i++) { - subscribeActivities[i].TraceId.ShouldBe(appendActivity.Context.TraceId); - subscribeActivities[i].ParentSpanId.ShouldBe(appendActivity.Context.SpanId); - subscribeActivities[i].HasRemoteParent.ShouldBeTrue(); - - Fixture.AssertSubscriptionActivityHasExpectedTags( - subscribeActivities[i], - stream, - events[i].EventId.ToString(), - subscriptionId - ); - } - - return; - - async Task Subscribe() { - await using var subscription = Fixture.Streams.SubscribeToStream(stream, FromStream.Start); - await using var enumerator = subscription.Messages.GetAsyncEnumerator(); - - var eventsAppeared = 0; - while (await enumerator.MoveNextAsync()) { - if (enumerator.Current is StreamMessage.SubscriptionConfirmation(var sid)) - subscriptionId = sid; - - if (enumerator.Current is not StreamMessage.Event(_)) - continue; - - eventsAppeared++; - if (eventsAppeared >= events.Length) - return; - } - } - } - [Fact] public async Task TracingContextIsInjectedWhenUserMetadataIsValidJsonObject() { var stream = Fixture.GetStreamName(); @@ -169,4 +110,63 @@ await Fixture.Streams.AppendToStreamAsync( var test = JsonSerializer.Deserialize(outputMetadata); outputMetadata.ShouldBe(inputMetadata); } -} \ No newline at end of file + + [Fact] + public async Task json_metadata_event_is_traced_and_non_json_metadata_event_is_not_traced() { + var streamName = Fixture.GetStreamName(); + + var seedEvents = new[] { + Fixture.CreateTestEvent(metadata: Fixture.CreateTestJsonMetadata()), + Fixture.CreateTestEvent(metadata: Fixture.CreateTestNonJsonMetadata()) + }; + + var availableEvents = new HashSet(seedEvents.Select(x => x.EventId)); + + await Fixture.Streams.AppendToStreamAsync(streamName, StreamState.NoStream, seedEvents); + + await using var subscription = Fixture.Streams.SubscribeToStream(streamName, FromStream.Start); + await using var enumerator = subscription.Messages.GetAsyncEnumerator(); + + var appendActivities = Fixture + .GetActivitiesForOperation(TracingConstants.Operations.Append, streamName) + .ShouldNotBeNull(); + + Assert.True(await enumerator.MoveNextAsync()); + + Assert.IsType(enumerator.Current); + + await Subscribe(enumerator).WithTimeout(); + + var subscribeActivities = Fixture + .GetActivitiesForOperation(TracingConstants.Operations.Subscribe, streamName) + .ToArray(); + + appendActivities.ShouldHaveSingleItem(); + + subscribeActivities.ShouldHaveSingleItem(); + + subscribeActivities.First().ParentId.ShouldBe(appendActivities.First().Id); + + var jsonMetadataEvent = seedEvents.First(); + + Fixture.AssertSubscriptionActivityHasExpectedTags( + subscribeActivities.First(), + streamName, + jsonMetadataEvent.EventId.ToString() + ); + + return; + + async Task Subscribe(IAsyncEnumerator internalEnumerator) { + while (await internalEnumerator.MoveNextAsync()) { + if (internalEnumerator.Current is not StreamMessage.Event(var resolvedEvent)) + continue; + + availableEvents.Remove(resolvedEvent.Event.EventId); + + if (availableEvents.Count == 0) + return; + } + } + } +} diff --git a/test/EventStore.Client.Tests.Common/Fixtures/DiagnosticsFixture.cs b/test/EventStore.Client.Tests.Common/Fixtures/DiagnosticsFixture.cs index 3eed0f25b..f8cd5913b 100644 --- a/test/EventStore.Client.Tests.Common/Fixtures/DiagnosticsFixture.cs +++ b/test/EventStore.Client.Tests.Common/Fixtures/DiagnosticsFixture.cs @@ -14,7 +14,7 @@ public class DiagnosticsFixture : EventStoreFixture { public DiagnosticsFixture() { var diagnosticActivityListener = new ActivityListener { ShouldListenTo = source => source.Name == EventStoreClientDiagnostics.InstrumentationName, - Sample = (ref ActivityCreationOptions _) => ActivitySamplingResult.AllDataAndRecorded, + Sample = (ref ActivityCreationOptions _) => ActivitySamplingResult.AllDataAndRecorded, ActivityStopped = activity => { var operation = (string?)activity.GetTagItem(TelemetryTags.Database.Operation); var stream = (string?)activity.GetTagItem(TelemetryTags.EventStore.Stream); @@ -71,23 +71,37 @@ public void AssertErroneousAppendActivityHasExpectedTags(Activity activity, Exce var actualEvent = activity.Events.ShouldHaveSingleItem(); actualEvent.Name.ShouldBe(TelemetryTags.Exception.EventName); - actualEvent.Tags.ShouldContain(new KeyValuePair(TelemetryTags.Exception.Type, actualException.GetType().FullName)); - actualEvent.Tags.ShouldContain(new KeyValuePair(TelemetryTags.Exception.Message, actualException.Message)); + actualEvent.Tags.ShouldContain( + new KeyValuePair(TelemetryTags.Exception.Type, actualException.GetType().FullName) + ); + + actualEvent.Tags.ShouldContain( + new KeyValuePair(TelemetryTags.Exception.Message, actualException.Message) + ); + actualEvent.Tags.Any(x => x.Key == TelemetryTags.Exception.Stacktrace).ShouldBeTrue(); } - public void AssertSubscriptionActivityHasExpectedTags(Activity activity, string stream, string eventId, string? subscriptionId) { + public void AssertSubscriptionActivityHasExpectedTags( + Activity activity, + string stream, + string eventId, + string? subscriptionId = null + ) { var expectedTags = new Dictionary { { TelemetryTags.Database.System, EventStoreClientDiagnostics.InstrumentationName }, { TelemetryTags.Database.Operation, TracingConstants.Operations.Subscribe }, { TelemetryTags.EventStore.Stream, stream }, { TelemetryTags.EventStore.EventId, eventId }, { TelemetryTags.EventStore.EventType, TestEventType }, - { TelemetryTags.EventStore.SubscriptionId, subscriptionId }, { TelemetryTags.Database.User, TestCredentials.Root.Username } }; - foreach (var tag in expectedTags) + if (subscriptionId != null) + expectedTags[TelemetryTags.EventStore.SubscriptionId] = subscriptionId; + + foreach (var tag in expectedTags) { activity.Tags.ShouldContain(tag); + } } -} \ No newline at end of file +} diff --git a/test/EventStore.Client.Tests.Common/Fixtures/EventStoreFixture.Helpers.cs b/test/EventStore.Client.Tests.Common/Fixtures/EventStoreFixture.Helpers.cs index 9fdf72d4a..1e8bb3d83 100644 --- a/test/EventStore.Client.Tests.Common/Fixtures/EventStoreFixture.Helpers.cs +++ b/test/EventStore.Client.Tests.Common/Fixtures/EventStoreFixture.Helpers.cs @@ -19,8 +19,18 @@ public ReadOnlyMemory CreateMetadataOfSize(int metadataSize) => public ReadOnlyMemory CreateTestJsonMetadata() => "{\"Foo\": \"Bar\"}"u8.ToArray(); - public IEnumerable CreateTestEvents(int count = 1, string? type = null, ReadOnlyMemory? metadata = null, string? contentType = null) => - Enumerable.Range(0, count).Select(index => CreateTestEvent(index, type ?? TestEventType, metadata, contentType)); + public ReadOnlyMemory CreateTestNonJsonMetadata() => "non-json-metadata"u8.ToArray(); + + public IEnumerable CreateTestEvents( + int count = 1, string? type = null, ReadOnlyMemory? metadata = null, string? contentType = null + ) => + Enumerable.Range(0, count) + .Select(index => CreateTestEvent(index, type ?? TestEventType, metadata, contentType)); + + public EventData CreateTestEvent( + string? type = null, ReadOnlyMemory? metadata = null, string? contentType = null + ) => + CreateTestEvent(0, type ?? TestEventType, metadata, contentType); public IEnumerable CreateTestEventsThatThrowsException() { // Ensure initial IEnumerator.Current does not throw @@ -32,7 +42,9 @@ public IEnumerable CreateTestEventsThatThrowsException() { protected static EventData CreateTestEvent(int index) => CreateTestEvent(index, TestEventType); - protected static EventData CreateTestEvent(int index, string type, ReadOnlyMemory? metadata = null, string? contentType = null) => + protected static EventData CreateTestEvent( + int index, string type, ReadOnlyMemory? metadata = null, string? contentType = null + ) => new( Uuid.NewUuid(), type, @@ -71,4 +83,4 @@ public async Task RestartService(TimeSpan delay) { await Streams.WarmUp(); Log.Information("Service restarted."); } -} \ No newline at end of file +} From ea304cb4ec381bcff6b86877bfc654a14cd96200 Mon Sep 17 00:00:00 2001 From: William Chong Date: Wed, 28 Aug 2024 18:26:35 +0400 Subject: [PATCH 2/3] Update tests --- .../Diagnostics/StreamsTracingInstrumentationTests.cs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/test/EventStore.Client.Streams.Tests/Diagnostics/StreamsTracingInstrumentationTests.cs b/test/EventStore.Client.Streams.Tests/Diagnostics/StreamsTracingInstrumentationTests.cs index b44569a13..4c23bfffa 100644 --- a/test/EventStore.Client.Streams.Tests/Diagnostics/StreamsTracingInstrumentationTests.cs +++ b/test/EventStore.Client.Streams.Tests/Diagnostics/StreamsTracingInstrumentationTests.cs @@ -1,4 +1,3 @@ -using System.Text.Json; using EventStore.Client.Diagnostics; using EventStore.Diagnostics.Tracing; @@ -89,7 +88,7 @@ await Fixture.Streams.AppendToStreamAsync( } [Fact] - public async Task TracingContextIsNotInjectedWhenEventIsNotJsonButHasJsonMetadata() { + public async Task TracingContextIsInjectedWhenEventIsNotJsonButHasJsonMetadata() { var stream = Fixture.GetStreamName(); var inputMetadata = Fixture.CreateTestJsonMetadata().ToArray(); @@ -107,8 +106,11 @@ await Fixture.Streams.AppendToStreamAsync( .ToListAsync(); var outputMetadata = readResult[0].OriginalEvent.Metadata.ToArray(); - var test = JsonSerializer.Deserialize(outputMetadata); - outputMetadata.ShouldBe(inputMetadata); + outputMetadata.ShouldNotBe(inputMetadata); + + var appendActivities = Fixture.GetActivitiesForOperation(TracingConstants.Operations.Append, stream); + + appendActivities.ShouldNotBeEmpty(); } [Fact] From fff1cf0ed323ce060c2c7df50bf7c519464fce1c Mon Sep 17 00:00:00 2001 From: William Chong Date: Thu, 29 Aug 2024 10:36:04 +0400 Subject: [PATCH 3/3] Extract propagation context from Event instead of OriginalEvent --- .../Common/Diagnostics/ActivitySourceExtensions.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/EventStore.Client/Common/Diagnostics/ActivitySourceExtensions.cs b/src/EventStore.Client/Common/Diagnostics/ActivitySourceExtensions.cs index 9ebe09799..26bd00b44 100644 --- a/src/EventStore.Client/Common/Diagnostics/ActivitySourceExtensions.cs +++ b/src/EventStore.Client/Common/Diagnostics/ActivitySourceExtensions.cs @@ -35,7 +35,7 @@ public static void TraceSubscriptionEvent( if (source.HasNoActiveListeners()) return; - var parentContext = resolvedEvent.OriginalEvent.Metadata.ExtractPropagationContext(); + var parentContext = resolvedEvent.Event.Metadata.ExtractPropagationContext(); if (parentContext == default(ActivityContext)) return;