Skip to content

Commit

Permalink
Add interceptor on request stream
Browse files Browse the repository at this point in the history
  • Loading branch information
w1am committed Feb 15, 2024
1 parent bd043f8 commit f28f2c3
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 95 deletions.
67 changes: 27 additions & 40 deletions src/EventStore.Client.Streams/EventStoreClient.Append.cs
Original file line number Diff line number Diff line change
Expand Up @@ -92,51 +92,38 @@ public async Task<IWriteResult> AppendToStreamAsync(
}

private async ValueTask<IWriteResult> AppendToStreamInternal(
CallInvoker callInvoker,
AppendReq header,
IEnumerable<EventData> eventData,
EventStoreClientOperationOptions operationOptions,
TimeSpan? deadline,
UserCredentials? userCredentials,
CancellationToken cancellationToken
) {
CallInvoker callInvoker,
AppendReq header,
IEnumerable<EventData> eventData,
EventStoreClientOperationOptions operationOptions,
TimeSpan? deadline,
UserCredentials? userCredentials,
CancellationToken cancellationToken
) {
using var call = new Streams.Streams.StreamsClient(callInvoker).Append(
EventStoreCallOptions.CreateNonStreaming(Settings, deadline, userCredentials, cancellationToken)
);
EventStoreCallOptions.CreateNonStreaming(Settings, deadline, userCredentials, cancellationToken)
);

try {
await call.RequestStream.WriteAsync(header).ConfigureAwait(false);
foreach (var e in eventData) {
await call.RequestStream.WriteAsync(
new AppendReq {
ProposedMessage = new AppendReq.Types.ProposedMessage {
Id = e.EventId.ToDto(),
Data = ByteString.CopyFrom(e.Data.Span),
CustomMetadata = ByteString.CopyFrom(e.Metadata.Span),
Metadata = {
{ Constants.Metadata.Type, e.Type },
{ Constants.Metadata.ContentType, e.ContentType }
}
},
}
).ConfigureAwait(false);
}
await call.RequestStream.WriteAsync(header).ConfigureAwait(false);

await call.RequestStream.CompleteAsync().ConfigureAwait(false);
} catch (InvalidOperationException exc) {
_log.LogDebug(
exc,
"Got InvalidOperationException when appending events to stream - {streamName}. This is perfectly normal if the connection was closed from the server-side.",
header.Options.StreamIdentifier
);
} catch (RpcException exc) {
_log.LogDebug(
exc,
"Got RpcException when appending events to stream - {streamName}. This is perfectly normal if the connection was closed from the server-side.",
header.Options.StreamIdentifier
);
foreach (var e in eventData) {
await call.RequestStream.WriteAsync(
new AppendReq {
ProposedMessage = new AppendReq.Types.ProposedMessage {
Id = e.EventId.ToDto(),
Data = ByteString.CopyFrom(e.Data.Span),
CustomMetadata = ByteString.CopyFrom(e.Metadata.Span),
Metadata = {
{ Constants.Metadata.Type, e.Type },
{ Constants.Metadata.ContentType, e.ContentType }
}
},
}
).ConfigureAwait(false);
}

await call.RequestStream.CompleteAsync().ConfigureAwait(false);

var response = await call.ResponseAsync.ConfigureAwait(false);

if (response.Success != null)
Expand Down
27 changes: 24 additions & 3 deletions src/EventStore.Client/Interceptors/TypedExceptionInterceptor.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
using System.Diagnostics.CodeAnalysis;
using Grpc.Core;
using Grpc.Core.Interceptors;
using static EventStore.Client.Constants;
Expand Down Expand Up @@ -55,7 +54,7 @@ AsyncClientStreamingCallContinuation<TRequest, TResponse> continuation
var response = continuation(context);

return new AsyncClientStreamingCall<TRequest, TResponse>(
response.RequestStream,
response.RequestStream.Apply(ConvertRpcException),
response.ResponseAsync.Apply(ConvertRpcException),
response.ResponseHeadersAsync,
response.GetStatus,
Expand Down Expand Up @@ -103,7 +102,15 @@ public static IAsyncStreamReader<TRequest> Apply<TRequest>(this IAsyncStreamRead

public static Task<TResponse> Apply<TResponse>(this Task<TResponse> task, Func<RpcException, Exception> convertException) =>
task.ContinueWith(t => t.Exception?.InnerException is RpcException ex ? throw convertException(ex) : t.Result);


public static IClientStreamWriter<TRequest> Apply<TRequest>(
this IClientStreamWriter<TRequest> writer, Func<RpcException, Exception> convertException
) =>
new ExceptionConverterStreamWriter<TRequest>(writer, convertException);

public static Task Apply(this Task task, Func<RpcException, Exception> convertException) =>
task.ContinueWith(t => t.Exception?.InnerException is RpcException ex ? throw convertException(ex) : t);

public static AccessDeniedException ToAccessDeniedException(this RpcException exception) =>
new(exception.Message, exception);

Expand Down Expand Up @@ -142,3 +149,17 @@ public async Task<bool> MoveNext(CancellationToken cancellationToken) {
}
}
}

class ExceptionConverterStreamWriter<TRequest>(
IClientStreamWriter<TRequest> writer,
Func<RpcException, Exception> convertException
)
: IClientStreamWriter<TRequest> {
public WriteOptions? WriteOptions {
get => writer.WriteOptions;
set => writer.WriteOptions = value;
}

public Task WriteAsync(TRequest message) => writer.WriteAsync(message).Apply(convertException);
public Task CompleteAsync() => writer.CompleteAsync().Apply(convertException);
}
66 changes: 14 additions & 52 deletions test/EventStore.Client.Streams.Tests/Append/append_to_stream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -424,47 +424,6 @@ public async Task with_timeout_any_stream_revision_fails_when_operation_expired(
ex.StatusCode.ShouldBe(StatusCode.DeadlineExceeded);
}

[Fact]
public async Task should_not_append_to_stream_when_error_thrown_midway() {
var streamName = Fixture.GetStreamName();
const int initialNumberOfEvents = 5;

// Append some events before
await Fixture.Streams.AppendToStreamAsync(
streamName,
StreamState.Any,
Fixture.CreateTestEvents(initialNumberOfEvents)
);

// Force regular append by passing credentials
await Assert.ThrowsAsync<EnumerationFailedException>(
async () =>{
await Fixture.Streams.AppendToStreamAsync(
streamName,
StreamState.StreamExists,
GetEvents(),
userCredentials: new UserCredentials("admin", "changeit")
);
}
);

// No more events should be appended to the stream
var eventsCount = await Fixture.Streams.ReadStreamAsync(Direction.Forwards, streamName, StreamPosition.Start)
.CountAsync();
eventsCount.ShouldBe(initialNumberOfEvents, "No more events should be appended to the stream");

return;

// Throw an exception after 5 events
IEnumerable<EventData> GetEvents() {
for (var i = 0; i < 5; i++) {
yield return Fixture.CreateTestEvents(1).First();
}

throw new EnumerationFailedException();
}
}

[Fact]
public async Task with_timeout_stream_revision_fails_when_operation_expired() {
var stream = Fixture.GetStreamName();
Expand All @@ -480,36 +439,39 @@ public async Task with_timeout_stream_revision_fails_when_operation_expired() {

ex.StatusCode.ShouldBe(StatusCode.DeadlineExceeded);
}

[Fact]
public async Task when_events_enumerator_throws_the_write_does_not_succeed() {
var streamName = Fixture.GetStreamName();

await Fixture.Streams
.AppendToStreamAsync(streamName, StreamRevision.None, GetEvents())
.AppendToStreamAsync(
streamName,
StreamRevision.None,
GetEvents(),
userCredentials: new UserCredentials(TestCredentials.Root.Username!, TestCredentials.Root.Password!)
)
.ShouldThrowAsync<EnumerationFailedException>();

var result = Fixture.Streams.ReadStreamAsync(Direction.Forwards, streamName, StreamPosition.Start);

var state = await result.ReadState;
var state = await Fixture.Streams.ReadStreamAsync(Direction.Forwards, streamName, StreamPosition.Start)
.ReadState;

state.ShouldBe(ReadState.StreamNotFound);

return;

IEnumerable<EventData> GetEvents() {
var i = 0;
foreach (var evt in Fixture.CreateTestEvents(5)) {
if (i++ % 3 == 0)
for (var i = 0; i < 5; i++) {
if (i % 3 == 0)
throw new EnumerationFailedException();

yield return evt;
yield return Fixture.CreateTestEvents(1).First();
}
}
}

class EnumerationFailedException : Exception { }

public static IEnumerable<object?[]> ArgumentOutOfRangeTestCases() {
yield return new object?[] { StreamState.Any };
yield return new object?[] { ulong.MaxValue - 1UL };
Expand Down

0 comments on commit f28f2c3

Please sign in to comment.