Skip to content

Commit

Permalink
Account for nullable events
Browse files Browse the repository at this point in the history
  • Loading branch information
w1am committed Oct 21, 2024
1 parent 09fcaa6 commit 0ceeff1
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// ReSharper disable ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract

using System.Diagnostics;
using EventStore.Diagnostics;
using EventStore.Diagnostics.Telemetry;
Expand Down Expand Up @@ -32,7 +34,7 @@ public static void TraceSubscriptionEvent(
EventStoreClientSettings settings,
UserCredentials? userCredentials
) {
if (source.HasNoActiveListeners())
if (source.HasNoActiveListeners() || resolvedEvent.Event is null)
return;

var parentContext = resolvedEvent.Event.Metadata.ExtractPropagationContext();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,4 +171,55 @@ async Task Subscribe(IAsyncEnumerator<StreamMessage> internalEnumerator) {
}
}
}

[Fact]
[Trait("Category", "Special cases")]
public async Task should_not_trace_when_event_is_null() {
var category = Guid.NewGuid().ToString("N");
var streamName = category + "-123";

var seedEvents = Fixture.CreateTestEvents(type: $"{category}-{Fixture.GetStreamName()}").ToArray();
await Fixture.Streams.AppendToStreamAsync(streamName, StreamState.NoStream, seedEvents);
await Fixture.Streams.DeleteAsync(streamName, StreamState.StreamExists);

await using var subscription = Fixture.Streams.SubscribeToStream("$ce-" + category, FromStream.Start, resolveLinkTos: true);

var ex = await Assert.ThrowsAsync<NullReferenceException>(
async () => {
await using var enumerator = subscription.Messages.GetAsyncEnumerator();
Assert.True(await enumerator.MoveNextAsync());
Assert.IsType<StreamMessage.SubscriptionConfirmation>(enumerator.Current);
await Subscribe().WithTimeout();
var appendActivities = Fixture
.GetActivitiesForOperation(TracingConstants.Operations.Append, "$ce-" + category)
.ShouldNotBeNull();
var subscribeActivities = Fixture
.GetActivitiesForOperation(TracingConstants.Operations.Subscribe, "$ce-" + category)
.ToArray();
appendActivities.ShouldHaveSingleItem();
subscribeActivities.ShouldBeEmpty();
return;
async Task Subscribe() {
while (await enumerator.MoveNextAsync()) {
if (enumerator.Current is not StreamMessage.Event(var resolvedEvent))
continue;
resolvedEvent.Event.Data.ShouldNotBe(null);
return;
}
}
}
).WithTimeout();

Assert.NotNull(ex);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -230,4 +230,45 @@ async Task Subscribe() {
}
}
}

[Fact]
[Trait("Category", "Special cases")]
public async Task should_throw_exception_when_subscribing_to_deleted_stream_category() {
var category = Guid.NewGuid().ToString("N");
var streamName = category + "-123";

var seedEvents = Fixture.CreateTestEvents(type: $"{category}-{Fixture.GetStreamName()}").ToArray();
await Fixture.Streams.AppendToStreamAsync(streamName, StreamState.NoStream, seedEvents);
await Fixture.Streams.DeleteAsync(streamName, StreamState.StreamExists);

await using var subscription = Fixture.Streams.SubscribeToStream("$ce-" + category, FromStream.Start, resolveLinkTos: true);

var ex = await Assert.ThrowsAsync<NullReferenceException>(
async () => {
await using var enumerator = subscription.Messages.GetAsyncEnumerator();
Assert.True(await enumerator.MoveNextAsync());
Assert.IsType<StreamMessage.SubscriptionConfirmation>(enumerator.Current);
await Subscribe().WithTimeout();
return;
async Task Subscribe() {
while (await enumerator.MoveNextAsync()) {
if (enumerator.Current is not StreamMessage.Event(var resolvedEvent)) {
continue;
}
resolvedEvent.Event.Data.ShouldNotBe(null);
return;
}
}
}
).WithTimeout();

Assert.NotNull(ex);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ namespace EventStore.Client.Tests;
public class DiagnosticsFixture : EventStoreFixture {
readonly ConcurrentDictionary<(string Operation, string Stream), List<Activity>> _activities = [];

public DiagnosticsFixture() {
public DiagnosticsFixture() : base(x => x.RunProjections()) {
var diagnosticActivityListener = new ActivityListener {
ShouldListenTo = source => source.Name == EventStoreClientDiagnostics.InstrumentationName,
Sample = (ref ActivityCreationOptions<ActivityContext> _) => ActivitySamplingResult.AllDataAndRecorded,
Expand Down

0 comments on commit 0ceeff1

Please sign in to comment.