Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix check for parent context when extract propagation context fails #319

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 7 additions & 15 deletions src/EventStore.Client.Streams/EventStoreClient.Append.cs
Original file line number Diff line number Diff line change
Expand Up @@ -132,13 +132,9 @@ await call.RequestStream
foreach (var e in eventData) {
var appendReq = new AppendReq {
ProposedMessage = new() {
Id = e.EventId.ToDto(),
Data = ByteString.CopyFrom(e.Data.Span),
CustomMetadata = ByteString.CopyFrom(
e.ContentType == Constants.Metadata.ContentTypes.ApplicationJson
? e.Metadata.InjectTracingContext(Activity.Current)
: e.Metadata.Span
),
Id = e.EventId.ToDto(),
Data = ByteString.CopyFrom(e.Data.Span),
CustomMetadata = ByteString.CopyFrom(e.Metadata.InjectTracingContext(Activity.Current)),
Metadata = {
{ Constants.Metadata.Type, e.Type },
{ Constants.Metadata.ContentType, e.ContentType }
Expand Down Expand Up @@ -392,13 +388,9 @@ IEnumerable<BatchAppendReq> GetRequests(IEnumerable<EventData> events, BatchAppe

foreach (var eventData in events) {
var proposedMessage = new BatchAppendReq.Types.ProposedMessage {
Data = ByteString.CopyFrom(eventData.Data.Span),
CustomMetadata = ByteString.CopyFrom(
eventData.ContentType == Constants.Metadata.ContentTypes.ApplicationJson
? eventData.Metadata.InjectTracingContext(Activity.Current)
: eventData.Metadata.Span
),
Id = eventData.EventId.ToDto(),
Data = ByteString.CopyFrom(eventData.Data.Span),
CustomMetadata = ByteString.CopyFrom(eventData.Metadata.InjectTracingContext(Activity.Current)),
Id = eventData.EventId.ToDto(),
Metadata = {
{ Constants.Metadata.Type, eventData.Type },
{ Constants.Metadata.ContentType, eventData.ContentType }
Expand Down Expand Up @@ -435,4 +427,4 @@ public void Dispose() {
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ public static async ValueTask<T> TraceClientOperation<T>(
var res = await tracedOperation().ConfigureAwait(false);
activity?.StatusOk();
return res;
}
catch (Exception ex) {
} catch (Exception ex) {
activity?.StatusError(ex);
throw;
}
Expand All @@ -33,15 +32,12 @@ public static void TraceSubscriptionEvent(
EventStoreClientSettings settings,
UserCredentials? userCredentials
) {
if (resolvedEvent.OriginalEvent.ContentType != Constants.Metadata.ContentTypes.ApplicationJson)
return;

if (source.HasNoActiveListeners())
return;

var parentContext = resolvedEvent.OriginalEvent.Metadata.ExtractPropagationContext();
var parentContext = resolvedEvent.Event.Metadata.ExtractPropagationContext();

if (parentContext is null) return;
if (parentContext == default(ActivityContext)) return;

var tags = new ActivityTagsCollection()
.WithRequiredTag(TelemetryTags.EventStore.Stream, resolvedEvent.OriginalEvent.EventStreamId)
Expand All @@ -51,14 +47,19 @@ public static void TraceSubscriptionEvent(
// Ensure consistent server.address attribute when connecting to cluster via dns discovery
.WithGrpcChannelServerTags(channelInfo)
.WithClientSettingsServerTags(settings)
.WithOptionalTag(TelemetryTags.Database.User, userCredentials?.Username ?? settings.DefaultCredentials?.Username);
.WithOptionalTag(
TelemetryTags.Database.User,
userCredentials?.Username ?? settings.DefaultCredentials?.Username
);

StartActivity(source, TracingConstants.Operations.Subscribe, ActivityKind.Consumer, tags, parentContext)?.Dispose();
StartActivity(source, TracingConstants.Operations.Subscribe, ActivityKind.Consumer, tags, parentContext)
?.Dispose();
}

static Activity? StartActivity(
this ActivitySource source,
string operationName, ActivityKind activityKind, ActivityTagsCollection? tags = null, ActivityContext? parentContext = null
string operationName, ActivityKind activityKind, ActivityTagsCollection? tags = null,
ActivityContext? parentContext = null
) {
if (source.HasNoActiveListeners())
return null;
Expand All @@ -79,4 +80,4 @@ public static void TraceSubscriptionEvent(
}

static bool HasNoActiveListeners(this ActivitySource source) => !source.HasListeners();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,8 @@ readonly record struct TracingMetadata(
isRemote: isRemote
)
: default;
}
catch (Exception) {
} catch (Exception) {
return default;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ namespace EventStore.Client.Diagnostics;

static class EventMetadataExtensions {
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static ReadOnlySpan<byte> InjectTracingContext(this ReadOnlyMemory<byte> eventMetadata, Activity? activity) =>
public static ReadOnlySpan<byte> InjectTracingContext(
this ReadOnlyMemory<byte> eventMetadata, Activity? activity
) =>
eventMetadata.InjectTracingMetadata(activity?.GetTracingMetadata() ?? TracingMetadata.None);

[MethodImpl(MethodImplOptions.AggressiveInlining)]
Expand All @@ -32,14 +34,15 @@ public static TracingMetadata ExtractTracingMetadata(this ReadOnlyMemory<byte> e

return new TracingMetadata(traceId.GetString(), spanId.GetString());
}
}
catch (Exception) {
} catch (Exception) {
return TracingMetadata.None;
}
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
static ReadOnlySpan<byte> InjectTracingMetadata(this ReadOnlyMemory<byte> eventMetadata, TracingMetadata tracingMetadata) {
static ReadOnlySpan<byte> InjectTracingMetadata(
this ReadOnlyMemory<byte> eventMetadata, TracingMetadata tracingMetadata
) {
if (tracingMetadata == TracingMetadata.None || !tracingMetadata.IsValid)
return eventMetadata.Span;

Expand All @@ -49,7 +52,9 @@ static ReadOnlySpan<byte> InjectTracingMetadata(this ReadOnlyMemory<byte> eventM
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
static ReadOnlyMemory<byte> TryInjectTracingMetadata(this ReadOnlyMemory<byte> utf8Json, TracingMetadata tracingMetadata) {
static ReadOnlyMemory<byte> TryInjectTracingMetadata(
this ReadOnlyMemory<byte> utf8Json, TracingMetadata tracingMetadata
) {
try {
using var doc = JsonDocument.Parse(utf8Json);
using var stream = new MemoryStream();
Expand All @@ -72,9 +77,8 @@ static ReadOnlyMemory<byte> TryInjectTracingMetadata(this ReadOnlyMemory<byte> u
writer.Flush();

return stream.ToArray();
}
catch (Exception) {
} catch (Exception) {
return utf8Json;
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
using System.Text.Json;
using EventStore.Client.Diagnostics;
using EventStore.Diagnostics.Tracing;

namespace EventStore.Client.Streams.Tests.Diagnostics;

[Trait("Category", "Diagnostics:Tracing")]
public class StreamsTracingInstrumentationTests(ITestOutputHelper output, DiagnosticsFixture fixture) : EventStoreTests<DiagnosticsFixture>(output, fixture) {
public class StreamsTracingInstrumentationTests(ITestOutputHelper output, DiagnosticsFixture fixture)
: EventStoreTests<DiagnosticsFixture>(output, fixture) {
[Fact]
public async Task AppendIsInstrumentedWithTracingAsExpected() {
var stream = Fixture.GetStreamName();
Expand Down Expand Up @@ -42,66 +42,6 @@ public async Task AppendTraceIsTaggedWithErrorStatusOnException() {
Fixture.AssertErroneousAppendActivityHasExpectedTags(activity, actualException);
}

[Fact]
public async Task CatchupSubscriptionIsInstrumentedWithTracingAndRestoresRemoteAppendContextAsExpected() {
var stream = Fixture.GetStreamName();
var events = Fixture.CreateTestEvents(2, metadata: Fixture.CreateTestJsonMetadata()).ToArray();

await Fixture.Streams.AppendToStreamAsync(
stream,
StreamState.NoStream,
events
);

string? subscriptionId = null;
await Subscribe().WithTimeout();

var appendActivity = Fixture
.GetActivitiesForOperation(TracingConstants.Operations.Append, stream)
.SingleOrDefault()
.ShouldNotBeNull();

var subscribeActivities = Fixture
.GetActivitiesForOperation(TracingConstants.Operations.Subscribe, stream)
.ToArray();

subscriptionId.ShouldNotBeNull();
subscribeActivities.Length.ShouldBe(events.Length);

for (var i = 0; i < subscribeActivities.Length; i++) {
subscribeActivities[i].TraceId.ShouldBe(appendActivity.Context.TraceId);
subscribeActivities[i].ParentSpanId.ShouldBe(appendActivity.Context.SpanId);
subscribeActivities[i].HasRemoteParent.ShouldBeTrue();

Fixture.AssertSubscriptionActivityHasExpectedTags(
subscribeActivities[i],
stream,
events[i].EventId.ToString(),
subscriptionId
);
}

return;

async Task Subscribe() {
await using var subscription = Fixture.Streams.SubscribeToStream(stream, FromStream.Start);
await using var enumerator = subscription.Messages.GetAsyncEnumerator();

var eventsAppeared = 0;
while (await enumerator.MoveNextAsync()) {
if (enumerator.Current is StreamMessage.SubscriptionConfirmation(var sid))
subscriptionId = sid;

if (enumerator.Current is not StreamMessage.Event(_))
continue;

eventsAppeared++;
if (eventsAppeared >= events.Length)
return;
}
}
}

[Fact]
public async Task TracingContextIsInjectedWhenUserMetadataIsValidJsonObject() {
var stream = Fixture.GetStreamName();
Expand Down Expand Up @@ -148,7 +88,7 @@ await Fixture.Streams.AppendToStreamAsync(
}

[Fact]
public async Task TracingContextIsNotInjectedWhenEventIsNotJsonButHasJsonMetadata() {
public async Task TracingContextIsInjectedWhenEventIsNotJsonButHasJsonMetadata() {
var stream = Fixture.GetStreamName();

var inputMetadata = Fixture.CreateTestJsonMetadata().ToArray();
Expand All @@ -166,7 +106,69 @@ await Fixture.Streams.AppendToStreamAsync(
.ToListAsync();

var outputMetadata = readResult[0].OriginalEvent.Metadata.ToArray();
var test = JsonSerializer.Deserialize<object>(outputMetadata);
outputMetadata.ShouldBe(inputMetadata);
outputMetadata.ShouldNotBe(inputMetadata);

var appendActivities = Fixture.GetActivitiesForOperation(TracingConstants.Operations.Append, stream);

appendActivities.ShouldNotBeEmpty();
}

[Fact]
public async Task json_metadata_event_is_traced_and_non_json_metadata_event_is_not_traced() {
var streamName = Fixture.GetStreamName();

var seedEvents = new[] {
Fixture.CreateTestEvent(metadata: Fixture.CreateTestJsonMetadata()),
Fixture.CreateTestEvent(metadata: Fixture.CreateTestNonJsonMetadata())
};

var availableEvents = new HashSet<Uuid>(seedEvents.Select(x => x.EventId));

await Fixture.Streams.AppendToStreamAsync(streamName, StreamState.NoStream, seedEvents);

await using var subscription = Fixture.Streams.SubscribeToStream(streamName, FromStream.Start);
await using var enumerator = subscription.Messages.GetAsyncEnumerator();

var appendActivities = Fixture
.GetActivitiesForOperation(TracingConstants.Operations.Append, streamName)
.ShouldNotBeNull();

Assert.True(await enumerator.MoveNextAsync());

Assert.IsType<StreamMessage.SubscriptionConfirmation>(enumerator.Current);

await Subscribe(enumerator).WithTimeout();

var subscribeActivities = Fixture
.GetActivitiesForOperation(TracingConstants.Operations.Subscribe, streamName)
.ToArray();

appendActivities.ShouldHaveSingleItem();

subscribeActivities.ShouldHaveSingleItem();

subscribeActivities.First().ParentId.ShouldBe(appendActivities.First().Id);

var jsonMetadataEvent = seedEvents.First();

Fixture.AssertSubscriptionActivityHasExpectedTags(
subscribeActivities.First(),
streamName,
jsonMetadataEvent.EventId.ToString()
);

return;

async Task Subscribe(IAsyncEnumerator<StreamMessage> internalEnumerator) {
while (await internalEnumerator.MoveNextAsync()) {
if (internalEnumerator.Current is not StreamMessage.Event(var resolvedEvent))
continue;

availableEvents.Remove(resolvedEvent.Event.EventId);

if (availableEvents.Count == 0)
return;
}
}
}
}
}
28 changes: 21 additions & 7 deletions test/EventStore.Client.Tests.Common/Fixtures/DiagnosticsFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public class DiagnosticsFixture : EventStoreFixture {
public DiagnosticsFixture() {
var diagnosticActivityListener = new ActivityListener {
ShouldListenTo = source => source.Name == EventStoreClientDiagnostics.InstrumentationName,
Sample = (ref ActivityCreationOptions<ActivityContext> _) => ActivitySamplingResult.AllDataAndRecorded,
Sample = (ref ActivityCreationOptions<ActivityContext> _) => ActivitySamplingResult.AllDataAndRecorded,
ActivityStopped = activity => {
var operation = (string?)activity.GetTagItem(TelemetryTags.Database.Operation);
var stream = (string?)activity.GetTagItem(TelemetryTags.EventStore.Stream);
Expand Down Expand Up @@ -71,23 +71,37 @@ public void AssertErroneousAppendActivityHasExpectedTags(Activity activity, Exce
var actualEvent = activity.Events.ShouldHaveSingleItem();

actualEvent.Name.ShouldBe(TelemetryTags.Exception.EventName);
actualEvent.Tags.ShouldContain(new KeyValuePair<string, object?>(TelemetryTags.Exception.Type, actualException.GetType().FullName));
actualEvent.Tags.ShouldContain(new KeyValuePair<string, object?>(TelemetryTags.Exception.Message, actualException.Message));
actualEvent.Tags.ShouldContain(
new KeyValuePair<string, object?>(TelemetryTags.Exception.Type, actualException.GetType().FullName)
);

actualEvent.Tags.ShouldContain(
new KeyValuePair<string, object?>(TelemetryTags.Exception.Message, actualException.Message)
);

actualEvent.Tags.Any(x => x.Key == TelemetryTags.Exception.Stacktrace).ShouldBeTrue();
}

public void AssertSubscriptionActivityHasExpectedTags(Activity activity, string stream, string eventId, string? subscriptionId) {
public void AssertSubscriptionActivityHasExpectedTags(
Activity activity,
string stream,
string eventId,
string? subscriptionId = null
) {
var expectedTags = new Dictionary<string, string?> {
{ TelemetryTags.Database.System, EventStoreClientDiagnostics.InstrumentationName },
{ TelemetryTags.Database.Operation, TracingConstants.Operations.Subscribe },
{ TelemetryTags.EventStore.Stream, stream },
{ TelemetryTags.EventStore.EventId, eventId },
{ TelemetryTags.EventStore.EventType, TestEventType },
{ TelemetryTags.EventStore.SubscriptionId, subscriptionId },
{ TelemetryTags.Database.User, TestCredentials.Root.Username }
};

foreach (var tag in expectedTags)
if (subscriptionId != null)
expectedTags[TelemetryTags.EventStore.SubscriptionId] = subscriptionId;

foreach (var tag in expectedTags) {
activity.Tags.ShouldContain(tag);
}
}
}
}
Loading
Loading