Skip to content

Commit

Permalink
Fix check for parent context when extract propagation context fails (#…
Browse files Browse the repository at this point in the history
…319)

* Fix context check in trace subscription method
* Extract propagation context from Event instead of OriginalEvent
  • Loading branch information
w1am authored Aug 29, 2024
1 parent c736189 commit 09fcaa6
Show file tree
Hide file tree
Showing 7 changed files with 138 additions and 114 deletions.
22 changes: 7 additions & 15 deletions src/EventStore.Client.Streams/EventStoreClient.Append.cs
Original file line number Diff line number Diff line change
Expand Up @@ -132,13 +132,9 @@ await call.RequestStream
foreach (var e in eventData) {
var appendReq = new AppendReq {
ProposedMessage = new() {
Id = e.EventId.ToDto(),
Data = ByteString.CopyFrom(e.Data.Span),
CustomMetadata = ByteString.CopyFrom(
e.ContentType == Constants.Metadata.ContentTypes.ApplicationJson
? e.Metadata.InjectTracingContext(Activity.Current)
: e.Metadata.Span
),
Id = e.EventId.ToDto(),
Data = ByteString.CopyFrom(e.Data.Span),
CustomMetadata = ByteString.CopyFrom(e.Metadata.InjectTracingContext(Activity.Current)),
Metadata = {
{ Constants.Metadata.Type, e.Type },
{ Constants.Metadata.ContentType, e.ContentType }
Expand Down Expand Up @@ -392,13 +388,9 @@ IEnumerable<BatchAppendReq> GetRequests(IEnumerable<EventData> events, BatchAppe

foreach (var eventData in events) {
var proposedMessage = new BatchAppendReq.Types.ProposedMessage {
Data = ByteString.CopyFrom(eventData.Data.Span),
CustomMetadata = ByteString.CopyFrom(
eventData.ContentType == Constants.Metadata.ContentTypes.ApplicationJson
? eventData.Metadata.InjectTracingContext(Activity.Current)
: eventData.Metadata.Span
),
Id = eventData.EventId.ToDto(),
Data = ByteString.CopyFrom(eventData.Data.Span),
CustomMetadata = ByteString.CopyFrom(eventData.Metadata.InjectTracingContext(Activity.Current)),
Id = eventData.EventId.ToDto(),
Metadata = {
{ Constants.Metadata.Type, eventData.Type },
{ Constants.Metadata.ContentType, eventData.ContentType }
Expand Down Expand Up @@ -435,4 +427,4 @@ public void Dispose() {
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ public static async ValueTask<T> TraceClientOperation<T>(
var res = await tracedOperation().ConfigureAwait(false);
activity?.StatusOk();
return res;
}
catch (Exception ex) {
} catch (Exception ex) {
activity?.StatusError(ex);
throw;
}
Expand All @@ -33,15 +32,12 @@ public static void TraceSubscriptionEvent(
EventStoreClientSettings settings,
UserCredentials? userCredentials
) {
if (resolvedEvent.OriginalEvent.ContentType != Constants.Metadata.ContentTypes.ApplicationJson)
return;

if (source.HasNoActiveListeners())
return;

var parentContext = resolvedEvent.OriginalEvent.Metadata.ExtractPropagationContext();
var parentContext = resolvedEvent.Event.Metadata.ExtractPropagationContext();

if (parentContext is null) return;
if (parentContext == default(ActivityContext)) return;

var tags = new ActivityTagsCollection()
.WithRequiredTag(TelemetryTags.EventStore.Stream, resolvedEvent.OriginalEvent.EventStreamId)
Expand All @@ -51,14 +47,19 @@ public static void TraceSubscriptionEvent(
// Ensure consistent server.address attribute when connecting to cluster via dns discovery
.WithGrpcChannelServerTags(channelInfo)
.WithClientSettingsServerTags(settings)
.WithOptionalTag(TelemetryTags.Database.User, userCredentials?.Username ?? settings.DefaultCredentials?.Username);
.WithOptionalTag(
TelemetryTags.Database.User,
userCredentials?.Username ?? settings.DefaultCredentials?.Username
);

StartActivity(source, TracingConstants.Operations.Subscribe, ActivityKind.Consumer, tags, parentContext)?.Dispose();
StartActivity(source, TracingConstants.Operations.Subscribe, ActivityKind.Consumer, tags, parentContext)
?.Dispose();
}

static Activity? StartActivity(
this ActivitySource source,
string operationName, ActivityKind activityKind, ActivityTagsCollection? tags = null, ActivityContext? parentContext = null
string operationName, ActivityKind activityKind, ActivityTagsCollection? tags = null,
ActivityContext? parentContext = null
) {
if (source.HasNoActiveListeners())
return null;
Expand All @@ -79,4 +80,4 @@ public static void TraceSubscriptionEvent(
}

static bool HasNoActiveListeners(this ActivitySource source) => !source.HasListeners();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,8 @@ readonly record struct TracingMetadata(
isRemote: isRemote
)
: default;
}
catch (Exception) {
} catch (Exception) {
return default;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ namespace EventStore.Client.Diagnostics;

static class EventMetadataExtensions {
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static ReadOnlySpan<byte> InjectTracingContext(this ReadOnlyMemory<byte> eventMetadata, Activity? activity) =>
public static ReadOnlySpan<byte> InjectTracingContext(
this ReadOnlyMemory<byte> eventMetadata, Activity? activity
) =>
eventMetadata.InjectTracingMetadata(activity?.GetTracingMetadata() ?? TracingMetadata.None);

[MethodImpl(MethodImplOptions.AggressiveInlining)]
Expand All @@ -32,14 +34,15 @@ public static TracingMetadata ExtractTracingMetadata(this ReadOnlyMemory<byte> e

return new TracingMetadata(traceId.GetString(), spanId.GetString());
}
}
catch (Exception) {
} catch (Exception) {
return TracingMetadata.None;
}
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
static ReadOnlySpan<byte> InjectTracingMetadata(this ReadOnlyMemory<byte> eventMetadata, TracingMetadata tracingMetadata) {
static ReadOnlySpan<byte> InjectTracingMetadata(
this ReadOnlyMemory<byte> eventMetadata, TracingMetadata tracingMetadata
) {
if (tracingMetadata == TracingMetadata.None || !tracingMetadata.IsValid)
return eventMetadata.Span;

Expand All @@ -49,7 +52,9 @@ static ReadOnlySpan<byte> InjectTracingMetadata(this ReadOnlyMemory<byte> eventM
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
static ReadOnlyMemory<byte> TryInjectTracingMetadata(this ReadOnlyMemory<byte> utf8Json, TracingMetadata tracingMetadata) {
static ReadOnlyMemory<byte> TryInjectTracingMetadata(
this ReadOnlyMemory<byte> utf8Json, TracingMetadata tracingMetadata
) {
try {
using var doc = JsonDocument.Parse(utf8Json);
using var stream = new MemoryStream();
Expand All @@ -72,9 +77,8 @@ static ReadOnlyMemory<byte> TryInjectTracingMetadata(this ReadOnlyMemory<byte> u
writer.Flush();

return stream.ToArray();
}
catch (Exception) {
} catch (Exception) {
return utf8Json;
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
using System.Text.Json;
using EventStore.Client.Diagnostics;
using EventStore.Diagnostics.Tracing;

namespace EventStore.Client.Streams.Tests.Diagnostics;

[Trait("Category", "Diagnostics:Tracing")]
public class StreamsTracingInstrumentationTests(ITestOutputHelper output, DiagnosticsFixture fixture) : EventStoreTests<DiagnosticsFixture>(output, fixture) {
public class StreamsTracingInstrumentationTests(ITestOutputHelper output, DiagnosticsFixture fixture)
: EventStoreTests<DiagnosticsFixture>(output, fixture) {
[Fact]
public async Task AppendIsInstrumentedWithTracingAsExpected() {
var stream = Fixture.GetStreamName();
Expand Down Expand Up @@ -42,66 +42,6 @@ public async Task AppendTraceIsTaggedWithErrorStatusOnException() {
Fixture.AssertErroneousAppendActivityHasExpectedTags(activity, actualException);
}

[Fact]
public async Task CatchupSubscriptionIsInstrumentedWithTracingAndRestoresRemoteAppendContextAsExpected() {
var stream = Fixture.GetStreamName();
var events = Fixture.CreateTestEvents(2, metadata: Fixture.CreateTestJsonMetadata()).ToArray();

await Fixture.Streams.AppendToStreamAsync(
stream,
StreamState.NoStream,
events
);

string? subscriptionId = null;
await Subscribe().WithTimeout();

var appendActivity = Fixture
.GetActivitiesForOperation(TracingConstants.Operations.Append, stream)
.SingleOrDefault()
.ShouldNotBeNull();

var subscribeActivities = Fixture
.GetActivitiesForOperation(TracingConstants.Operations.Subscribe, stream)
.ToArray();

subscriptionId.ShouldNotBeNull();
subscribeActivities.Length.ShouldBe(events.Length);

for (var i = 0; i < subscribeActivities.Length; i++) {
subscribeActivities[i].TraceId.ShouldBe(appendActivity.Context.TraceId);
subscribeActivities[i].ParentSpanId.ShouldBe(appendActivity.Context.SpanId);
subscribeActivities[i].HasRemoteParent.ShouldBeTrue();

Fixture.AssertSubscriptionActivityHasExpectedTags(
subscribeActivities[i],
stream,
events[i].EventId.ToString(),
subscriptionId
);
}

return;

async Task Subscribe() {
await using var subscription = Fixture.Streams.SubscribeToStream(stream, FromStream.Start);
await using var enumerator = subscription.Messages.GetAsyncEnumerator();

var eventsAppeared = 0;
while (await enumerator.MoveNextAsync()) {
if (enumerator.Current is StreamMessage.SubscriptionConfirmation(var sid))
subscriptionId = sid;

if (enumerator.Current is not StreamMessage.Event(_))
continue;

eventsAppeared++;
if (eventsAppeared >= events.Length)
return;
}
}
}

[Fact]
public async Task TracingContextIsInjectedWhenUserMetadataIsValidJsonObject() {
var stream = Fixture.GetStreamName();
Expand Down Expand Up @@ -148,7 +88,7 @@ await Fixture.Streams.AppendToStreamAsync(
}

[Fact]
public async Task TracingContextIsNotInjectedWhenEventIsNotJsonButHasJsonMetadata() {
public async Task TracingContextIsInjectedWhenEventIsNotJsonButHasJsonMetadata() {
var stream = Fixture.GetStreamName();

var inputMetadata = Fixture.CreateTestJsonMetadata().ToArray();
Expand All @@ -166,7 +106,69 @@ await Fixture.Streams.AppendToStreamAsync(
.ToListAsync();

var outputMetadata = readResult[0].OriginalEvent.Metadata.ToArray();
var test = JsonSerializer.Deserialize<object>(outputMetadata);
outputMetadata.ShouldBe(inputMetadata);
outputMetadata.ShouldNotBe(inputMetadata);

var appendActivities = Fixture.GetActivitiesForOperation(TracingConstants.Operations.Append, stream);

appendActivities.ShouldNotBeEmpty();
}

[Fact]
public async Task json_metadata_event_is_traced_and_non_json_metadata_event_is_not_traced() {
var streamName = Fixture.GetStreamName();

var seedEvents = new[] {
Fixture.CreateTestEvent(metadata: Fixture.CreateTestJsonMetadata()),
Fixture.CreateTestEvent(metadata: Fixture.CreateTestNonJsonMetadata())
};

var availableEvents = new HashSet<Uuid>(seedEvents.Select(x => x.EventId));

await Fixture.Streams.AppendToStreamAsync(streamName, StreamState.NoStream, seedEvents);

await using var subscription = Fixture.Streams.SubscribeToStream(streamName, FromStream.Start);
await using var enumerator = subscription.Messages.GetAsyncEnumerator();

var appendActivities = Fixture
.GetActivitiesForOperation(TracingConstants.Operations.Append, streamName)
.ShouldNotBeNull();

Assert.True(await enumerator.MoveNextAsync());

Assert.IsType<StreamMessage.SubscriptionConfirmation>(enumerator.Current);

await Subscribe(enumerator).WithTimeout();

var subscribeActivities = Fixture
.GetActivitiesForOperation(TracingConstants.Operations.Subscribe, streamName)
.ToArray();

appendActivities.ShouldHaveSingleItem();

subscribeActivities.ShouldHaveSingleItem();

subscribeActivities.First().ParentId.ShouldBe(appendActivities.First().Id);

var jsonMetadataEvent = seedEvents.First();

Fixture.AssertSubscriptionActivityHasExpectedTags(
subscribeActivities.First(),
streamName,
jsonMetadataEvent.EventId.ToString()
);

return;

async Task Subscribe(IAsyncEnumerator<StreamMessage> internalEnumerator) {
while (await internalEnumerator.MoveNextAsync()) {
if (internalEnumerator.Current is not StreamMessage.Event(var resolvedEvent))
continue;

availableEvents.Remove(resolvedEvent.Event.EventId);

if (availableEvents.Count == 0)
return;
}
}
}
}
}
28 changes: 21 additions & 7 deletions test/EventStore.Client.Tests.Common/Fixtures/DiagnosticsFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public class DiagnosticsFixture : EventStoreFixture {
public DiagnosticsFixture() {
var diagnosticActivityListener = new ActivityListener {
ShouldListenTo = source => source.Name == EventStoreClientDiagnostics.InstrumentationName,
Sample = (ref ActivityCreationOptions<ActivityContext> _) => ActivitySamplingResult.AllDataAndRecorded,
Sample = (ref ActivityCreationOptions<ActivityContext> _) => ActivitySamplingResult.AllDataAndRecorded,
ActivityStopped = activity => {
var operation = (string?)activity.GetTagItem(TelemetryTags.Database.Operation);
var stream = (string?)activity.GetTagItem(TelemetryTags.EventStore.Stream);
Expand Down Expand Up @@ -71,23 +71,37 @@ public void AssertErroneousAppendActivityHasExpectedTags(Activity activity, Exce
var actualEvent = activity.Events.ShouldHaveSingleItem();

actualEvent.Name.ShouldBe(TelemetryTags.Exception.EventName);
actualEvent.Tags.ShouldContain(new KeyValuePair<string, object?>(TelemetryTags.Exception.Type, actualException.GetType().FullName));
actualEvent.Tags.ShouldContain(new KeyValuePair<string, object?>(TelemetryTags.Exception.Message, actualException.Message));
actualEvent.Tags.ShouldContain(
new KeyValuePair<string, object?>(TelemetryTags.Exception.Type, actualException.GetType().FullName)
);

actualEvent.Tags.ShouldContain(
new KeyValuePair<string, object?>(TelemetryTags.Exception.Message, actualException.Message)
);

actualEvent.Tags.Any(x => x.Key == TelemetryTags.Exception.Stacktrace).ShouldBeTrue();
}

public void AssertSubscriptionActivityHasExpectedTags(Activity activity, string stream, string eventId, string? subscriptionId) {
public void AssertSubscriptionActivityHasExpectedTags(
Activity activity,
string stream,
string eventId,
string? subscriptionId = null
) {
var expectedTags = new Dictionary<string, string?> {
{ TelemetryTags.Database.System, EventStoreClientDiagnostics.InstrumentationName },
{ TelemetryTags.Database.Operation, TracingConstants.Operations.Subscribe },
{ TelemetryTags.EventStore.Stream, stream },
{ TelemetryTags.EventStore.EventId, eventId },
{ TelemetryTags.EventStore.EventType, TestEventType },
{ TelemetryTags.EventStore.SubscriptionId, subscriptionId },
{ TelemetryTags.Database.User, TestCredentials.Root.Username }
};

foreach (var tag in expectedTags)
if (subscriptionId != null)
expectedTags[TelemetryTags.EventStore.SubscriptionId] = subscriptionId;

foreach (var tag in expectedTags) {
activity.Tags.ShouldContain(tag);
}
}
}
}
Loading

0 comments on commit 09fcaa6

Please sign in to comment.