Skip to content

Commit

Permalink
Fix context check in trace subscription method
Browse files Browse the repository at this point in the history
  • Loading branch information
w1am committed Aug 28, 2024
1 parent c736189 commit 62915aa
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ public static async ValueTask<T> TraceClientOperation<T>(
var res = await tracedOperation().ConfigureAwait(false);
activity?.StatusOk();
return res;
}
catch (Exception ex) {
} catch (Exception ex) {
activity?.StatusError(ex);
throw;
}
Expand All @@ -33,15 +32,12 @@ public static void TraceSubscriptionEvent(
EventStoreClientSettings settings,
UserCredentials? userCredentials
) {
if (resolvedEvent.OriginalEvent.ContentType != Constants.Metadata.ContentTypes.ApplicationJson)
return;

if (source.HasNoActiveListeners())
return;

var parentContext = resolvedEvent.OriginalEvent.Metadata.ExtractPropagationContext();

if (parentContext is null) return;
if (parentContext == default(ActivityContext)) return;

var tags = new ActivityTagsCollection()
.WithRequiredTag(TelemetryTags.EventStore.Stream, resolvedEvent.OriginalEvent.EventStreamId)
Expand All @@ -51,14 +47,19 @@ public static void TraceSubscriptionEvent(
// Ensure consistent server.address attribute when connecting to cluster via dns discovery
.WithGrpcChannelServerTags(channelInfo)
.WithClientSettingsServerTags(settings)
.WithOptionalTag(TelemetryTags.Database.User, userCredentials?.Username ?? settings.DefaultCredentials?.Username);
.WithOptionalTag(
TelemetryTags.Database.User,
userCredentials?.Username ?? settings.DefaultCredentials?.Username
);

StartActivity(source, TracingConstants.Operations.Subscribe, ActivityKind.Consumer, tags, parentContext)?.Dispose();
StartActivity(source, TracingConstants.Operations.Subscribe, ActivityKind.Consumer, tags, parentContext)
?.Dispose();
}

static Activity? StartActivity(
this ActivitySource source,
string operationName, ActivityKind activityKind, ActivityTagsCollection? tags = null, ActivityContext? parentContext = null
string operationName, ActivityKind activityKind, ActivityTagsCollection? tags = null,
ActivityContext? parentContext = null
) {
if (source.HasNoActiveListeners())
return null;
Expand All @@ -79,4 +80,4 @@ public static void TraceSubscriptionEvent(
}

static bool HasNoActiveListeners(this ActivitySource source) => !source.HasListeners();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,8 @@ readonly record struct TracingMetadata(
isRemote: isRemote
)
: default;
}
catch (Exception) {
} catch (Exception) {
return default;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ namespace EventStore.Client.Diagnostics;

static class EventMetadataExtensions {
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static ReadOnlySpan<byte> InjectTracingContext(this ReadOnlyMemory<byte> eventMetadata, Activity? activity) =>
public static ReadOnlySpan<byte> InjectTracingContext(
this ReadOnlyMemory<byte> eventMetadata, Activity? activity
) =>
eventMetadata.InjectTracingMetadata(activity?.GetTracingMetadata() ?? TracingMetadata.None);

[MethodImpl(MethodImplOptions.AggressiveInlining)]
Expand All @@ -32,14 +34,15 @@ public static TracingMetadata ExtractTracingMetadata(this ReadOnlyMemory<byte> e

return new TracingMetadata(traceId.GetString(), spanId.GetString());
}
}
catch (Exception) {
} catch (Exception) {
return TracingMetadata.None;
}
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
static ReadOnlySpan<byte> InjectTracingMetadata(this ReadOnlyMemory<byte> eventMetadata, TracingMetadata tracingMetadata) {
static ReadOnlySpan<byte> InjectTracingMetadata(
this ReadOnlyMemory<byte> eventMetadata, TracingMetadata tracingMetadata
) {
if (tracingMetadata == TracingMetadata.None || !tracingMetadata.IsValid)
return eventMetadata.Span;

Expand All @@ -49,7 +52,9 @@ static ReadOnlySpan<byte> InjectTracingMetadata(this ReadOnlyMemory<byte> eventM
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
static ReadOnlyMemory<byte> TryInjectTracingMetadata(this ReadOnlyMemory<byte> utf8Json, TracingMetadata tracingMetadata) {
static ReadOnlyMemory<byte> TryInjectTracingMetadata(
this ReadOnlyMemory<byte> utf8Json, TracingMetadata tracingMetadata
) {
try {
using var doc = JsonDocument.Parse(utf8Json);
using var stream = new MemoryStream();
Expand All @@ -72,9 +77,8 @@ static ReadOnlyMemory<byte> TryInjectTracingMetadata(this ReadOnlyMemory<byte> u
writer.Flush();

return stream.ToArray();
}
catch (Exception) {
} catch (Exception) {
return utf8Json;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
namespace EventStore.Client.Streams.Tests.Diagnostics;

[Trait("Category", "Diagnostics:Tracing")]
public class StreamsTracingInstrumentationTests(ITestOutputHelper output, DiagnosticsFixture fixture) : EventStoreTests<DiagnosticsFixture>(output, fixture) {
public class StreamsTracingInstrumentationTests(ITestOutputHelper output, DiagnosticsFixture fixture)
: EventStoreTests<DiagnosticsFixture>(output, fixture) {
[Fact]
public async Task AppendIsInstrumentedWithTracingAsExpected() {
var stream = Fixture.GetStreamName();
Expand Down Expand Up @@ -42,66 +43,6 @@ public async Task AppendTraceIsTaggedWithErrorStatusOnException() {
Fixture.AssertErroneousAppendActivityHasExpectedTags(activity, actualException);
}

[Fact]
public async Task CatchupSubscriptionIsInstrumentedWithTracingAndRestoresRemoteAppendContextAsExpected() {
var stream = Fixture.GetStreamName();
var events = Fixture.CreateTestEvents(2, metadata: Fixture.CreateTestJsonMetadata()).ToArray();

await Fixture.Streams.AppendToStreamAsync(
stream,
StreamState.NoStream,
events
);

string? subscriptionId = null;
await Subscribe().WithTimeout();

var appendActivity = Fixture
.GetActivitiesForOperation(TracingConstants.Operations.Append, stream)
.SingleOrDefault()
.ShouldNotBeNull();

var subscribeActivities = Fixture
.GetActivitiesForOperation(TracingConstants.Operations.Subscribe, stream)
.ToArray();

subscriptionId.ShouldNotBeNull();
subscribeActivities.Length.ShouldBe(events.Length);

for (var i = 0; i < subscribeActivities.Length; i++) {
subscribeActivities[i].TraceId.ShouldBe(appendActivity.Context.TraceId);
subscribeActivities[i].ParentSpanId.ShouldBe(appendActivity.Context.SpanId);
subscribeActivities[i].HasRemoteParent.ShouldBeTrue();

Fixture.AssertSubscriptionActivityHasExpectedTags(
subscribeActivities[i],
stream,
events[i].EventId.ToString(),
subscriptionId
);
}

return;

async Task Subscribe() {
await using var subscription = Fixture.Streams.SubscribeToStream(stream, FromStream.Start);
await using var enumerator = subscription.Messages.GetAsyncEnumerator();

var eventsAppeared = 0;
while (await enumerator.MoveNextAsync()) {
if (enumerator.Current is StreamMessage.SubscriptionConfirmation(var sid))
subscriptionId = sid;

if (enumerator.Current is not StreamMessage.Event(_))
continue;

eventsAppeared++;
if (eventsAppeared >= events.Length)
return;
}
}
}

[Fact]
public async Task TracingContextIsInjectedWhenUserMetadataIsValidJsonObject() {
var stream = Fixture.GetStreamName();
Expand Down Expand Up @@ -169,4 +110,63 @@ await Fixture.Streams.AppendToStreamAsync(
var test = JsonSerializer.Deserialize<object>(outputMetadata);
outputMetadata.ShouldBe(inputMetadata);
}
}

[Fact]
public async Task json_metadata_event_is_traced_and_non_json_metadata_event_is_not_traced() {
var streamName = Fixture.GetStreamName();

var seedEvents = new[] {
Fixture.CreateTestEvent(metadata: Fixture.CreateTestJsonMetadata()),
Fixture.CreateTestEvent(metadata: Fixture.CreateTestNonJsonMetadata())
};

var availableEvents = new HashSet<Uuid>(seedEvents.Select(x => x.EventId));

await Fixture.Streams.AppendToStreamAsync(streamName, StreamState.NoStream, seedEvents);

await using var subscription = Fixture.Streams.SubscribeToStream(streamName, FromStream.Start);
await using var enumerator = subscription.Messages.GetAsyncEnumerator();

var appendActivities = Fixture
.GetActivitiesForOperation(TracingConstants.Operations.Append, streamName)
.ShouldNotBeNull();

Assert.True(await enumerator.MoveNextAsync());

Assert.IsType<StreamMessage.SubscriptionConfirmation>(enumerator.Current);

await Subscribe(enumerator).WithTimeout();

var subscribeActivities = Fixture
.GetActivitiesForOperation(TracingConstants.Operations.Subscribe, streamName)
.ToArray();

appendActivities.ShouldHaveSingleItem();

subscribeActivities.ShouldHaveSingleItem();

subscribeActivities.First().ParentId.ShouldBe(appendActivities.First().Id);

var jsonMetadataEvent = seedEvents.First();

Fixture.AssertSubscriptionActivityHasExpectedTags(
subscribeActivities.First(),
streamName,
jsonMetadataEvent.EventId.ToString()
);

return;

async Task Subscribe(IAsyncEnumerator<StreamMessage> internalEnumerator) {
while (await internalEnumerator.MoveNextAsync()) {
if (internalEnumerator.Current is not StreamMessage.Event(var resolvedEvent))
continue;

availableEvents.Remove(resolvedEvent.Event.EventId);

if (availableEvents.Count == 0)
return;
}
}
}
}
28 changes: 21 additions & 7 deletions test/EventStore.Client.Tests.Common/Fixtures/DiagnosticsFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public class DiagnosticsFixture : EventStoreFixture {
public DiagnosticsFixture() {
var diagnosticActivityListener = new ActivityListener {
ShouldListenTo = source => source.Name == EventStoreClientDiagnostics.InstrumentationName,
Sample = (ref ActivityCreationOptions<ActivityContext> _) => ActivitySamplingResult.AllDataAndRecorded,
Sample = (ref ActivityCreationOptions<ActivityContext> _) => ActivitySamplingResult.AllDataAndRecorded,
ActivityStopped = activity => {
var operation = (string?)activity.GetTagItem(TelemetryTags.Database.Operation);
var stream = (string?)activity.GetTagItem(TelemetryTags.EventStore.Stream);
Expand Down Expand Up @@ -71,23 +71,37 @@ public void AssertErroneousAppendActivityHasExpectedTags(Activity activity, Exce
var actualEvent = activity.Events.ShouldHaveSingleItem();

actualEvent.Name.ShouldBe(TelemetryTags.Exception.EventName);
actualEvent.Tags.ShouldContain(new KeyValuePair<string, object?>(TelemetryTags.Exception.Type, actualException.GetType().FullName));
actualEvent.Tags.ShouldContain(new KeyValuePair<string, object?>(TelemetryTags.Exception.Message, actualException.Message));
actualEvent.Tags.ShouldContain(
new KeyValuePair<string, object?>(TelemetryTags.Exception.Type, actualException.GetType().FullName)
);

actualEvent.Tags.ShouldContain(
new KeyValuePair<string, object?>(TelemetryTags.Exception.Message, actualException.Message)
);

actualEvent.Tags.Any(x => x.Key == TelemetryTags.Exception.Stacktrace).ShouldBeTrue();
}

public void AssertSubscriptionActivityHasExpectedTags(Activity activity, string stream, string eventId, string? subscriptionId) {
public void AssertSubscriptionActivityHasExpectedTags(
Activity activity,
string stream,
string eventId,
string? subscriptionId = null
) {
var expectedTags = new Dictionary<string, string?> {
{ TelemetryTags.Database.System, EventStoreClientDiagnostics.InstrumentationName },
{ TelemetryTags.Database.Operation, TracingConstants.Operations.Subscribe },
{ TelemetryTags.EventStore.Stream, stream },
{ TelemetryTags.EventStore.EventId, eventId },
{ TelemetryTags.EventStore.EventType, TestEventType },
{ TelemetryTags.EventStore.SubscriptionId, subscriptionId },
{ TelemetryTags.Database.User, TestCredentials.Root.Username }
};

foreach (var tag in expectedTags)
if (subscriptionId != null)
expectedTags[TelemetryTags.EventStore.SubscriptionId] = subscriptionId;

foreach (var tag in expectedTags) {
activity.Tags.ShouldContain(tag);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,18 @@ public ReadOnlyMemory<byte> CreateMetadataOfSize(int metadataSize) =>

public ReadOnlyMemory<byte> CreateTestJsonMetadata() => "{\"Foo\": \"Bar\"}"u8.ToArray();

public IEnumerable<EventData> CreateTestEvents(int count = 1, string? type = null, ReadOnlyMemory<byte>? metadata = null, string? contentType = null) =>
Enumerable.Range(0, count).Select(index => CreateTestEvent(index, type ?? TestEventType, metadata, contentType));
public ReadOnlyMemory<byte> CreateTestNonJsonMetadata() => "non-json-metadata"u8.ToArray();

public IEnumerable<EventData> CreateTestEvents(
int count = 1, string? type = null, ReadOnlyMemory<byte>? metadata = null, string? contentType = null
) =>
Enumerable.Range(0, count)
.Select(index => CreateTestEvent(index, type ?? TestEventType, metadata, contentType));

public EventData CreateTestEvent(
string? type = null, ReadOnlyMemory<byte>? metadata = null, string? contentType = null
) =>
CreateTestEvent(0, type ?? TestEventType, metadata, contentType);

public IEnumerable<EventData> CreateTestEventsThatThrowsException() {
// Ensure initial IEnumerator.Current does not throw
Expand All @@ -32,7 +42,9 @@ public IEnumerable<EventData> CreateTestEventsThatThrowsException() {

protected static EventData CreateTestEvent(int index) => CreateTestEvent(index, TestEventType);

protected static EventData CreateTestEvent(int index, string type, ReadOnlyMemory<byte>? metadata = null, string? contentType = null) =>
protected static EventData CreateTestEvent(
int index, string type, ReadOnlyMemory<byte>? metadata = null, string? contentType = null
) =>
new(
Uuid.NewUuid(),
type,
Expand Down Expand Up @@ -71,4 +83,4 @@ public async Task RestartService(TimeSpan delay) {
await Streams.WarmUp();
Log.Information("Service restarted.");
}
}
}

0 comments on commit 62915aa

Please sign in to comment.