From f28f2c32d49e22231e114c2387992068e3dd0843 Mon Sep 17 00:00:00 2001 From: William Chong Date: Thu, 15 Feb 2024 10:24:08 +0400 Subject: [PATCH] Add interceptor on request stream --- .../EventStoreClient.Append.cs | 67 ++++++++----------- .../Interceptors/TypedExceptionInterceptor.cs | 27 +++++++- .../Append/append_to_stream.cs | 66 ++++-------------- 3 files changed, 65 insertions(+), 95 deletions(-) diff --git a/src/EventStore.Client.Streams/EventStoreClient.Append.cs b/src/EventStore.Client.Streams/EventStoreClient.Append.cs index 01ed58a0e..2cf59db32 100644 --- a/src/EventStore.Client.Streams/EventStoreClient.Append.cs +++ b/src/EventStore.Client.Streams/EventStoreClient.Append.cs @@ -92,51 +92,38 @@ public async Task AppendToStreamAsync( } private async ValueTask AppendToStreamInternal( - CallInvoker callInvoker, - AppendReq header, - IEnumerable eventData, - EventStoreClientOperationOptions operationOptions, - TimeSpan? deadline, - UserCredentials? userCredentials, - CancellationToken cancellationToken - ) { + CallInvoker callInvoker, + AppendReq header, + IEnumerable eventData, + EventStoreClientOperationOptions operationOptions, + TimeSpan? deadline, + UserCredentials? userCredentials, + CancellationToken cancellationToken + ) { using var call = new Streams.Streams.StreamsClient(callInvoker).Append( - EventStoreCallOptions.CreateNonStreaming(Settings, deadline, userCredentials, cancellationToken) - ); + EventStoreCallOptions.CreateNonStreaming(Settings, deadline, userCredentials, cancellationToken) + ); - try { - await call.RequestStream.WriteAsync(header).ConfigureAwait(false); - foreach (var e in eventData) { - await call.RequestStream.WriteAsync( - new AppendReq { - ProposedMessage = new AppendReq.Types.ProposedMessage { - Id = e.EventId.ToDto(), - Data = ByteString.CopyFrom(e.Data.Span), - CustomMetadata = ByteString.CopyFrom(e.Metadata.Span), - Metadata = { - { Constants.Metadata.Type, e.Type }, - { Constants.Metadata.ContentType, e.ContentType } - } - }, - } - ).ConfigureAwait(false); - } + await call.RequestStream.WriteAsync(header).ConfigureAwait(false); - await call.RequestStream.CompleteAsync().ConfigureAwait(false); - } catch (InvalidOperationException exc) { - _log.LogDebug( - exc, - "Got InvalidOperationException when appending events to stream - {streamName}. This is perfectly normal if the connection was closed from the server-side.", - header.Options.StreamIdentifier - ); - } catch (RpcException exc) { - _log.LogDebug( - exc, - "Got RpcException when appending events to stream - {streamName}. This is perfectly normal if the connection was closed from the server-side.", - header.Options.StreamIdentifier - ); + foreach (var e in eventData) { + await call.RequestStream.WriteAsync( + new AppendReq { + ProposedMessage = new AppendReq.Types.ProposedMessage { + Id = e.EventId.ToDto(), + Data = ByteString.CopyFrom(e.Data.Span), + CustomMetadata = ByteString.CopyFrom(e.Metadata.Span), + Metadata = { + { Constants.Metadata.Type, e.Type }, + { Constants.Metadata.ContentType, e.ContentType } + } + }, + } + ).ConfigureAwait(false); } + await call.RequestStream.CompleteAsync().ConfigureAwait(false); + var response = await call.ResponseAsync.ConfigureAwait(false); if (response.Success != null) diff --git a/src/EventStore.Client/Interceptors/TypedExceptionInterceptor.cs b/src/EventStore.Client/Interceptors/TypedExceptionInterceptor.cs index d24dd2a8e..0f1368805 100644 --- a/src/EventStore.Client/Interceptors/TypedExceptionInterceptor.cs +++ b/src/EventStore.Client/Interceptors/TypedExceptionInterceptor.cs @@ -1,4 +1,3 @@ -using System.Diagnostics.CodeAnalysis; using Grpc.Core; using Grpc.Core.Interceptors; using static EventStore.Client.Constants; @@ -55,7 +54,7 @@ AsyncClientStreamingCallContinuation continuation var response = continuation(context); return new AsyncClientStreamingCall( - response.RequestStream, + response.RequestStream.Apply(ConvertRpcException), response.ResponseAsync.Apply(ConvertRpcException), response.ResponseHeadersAsync, response.GetStatus, @@ -103,7 +102,15 @@ public static IAsyncStreamReader Apply(this IAsyncStreamRead public static Task Apply(this Task task, Func convertException) => task.ContinueWith(t => t.Exception?.InnerException is RpcException ex ? throw convertException(ex) : t.Result); - + + public static IClientStreamWriter Apply( + this IClientStreamWriter writer, Func convertException + ) => + new ExceptionConverterStreamWriter(writer, convertException); + + public static Task Apply(this Task task, Func convertException) => + task.ContinueWith(t => t.Exception?.InnerException is RpcException ex ? throw convertException(ex) : t); + public static AccessDeniedException ToAccessDeniedException(this RpcException exception) => new(exception.Message, exception); @@ -142,3 +149,17 @@ public async Task MoveNext(CancellationToken cancellationToken) { } } } + +class ExceptionConverterStreamWriter( + IClientStreamWriter writer, + Func convertException +) + : IClientStreamWriter { + public WriteOptions? WriteOptions { + get => writer.WriteOptions; + set => writer.WriteOptions = value; + } + + public Task WriteAsync(TRequest message) => writer.WriteAsync(message).Apply(convertException); + public Task CompleteAsync() => writer.CompleteAsync().Apply(convertException); +} diff --git a/test/EventStore.Client.Streams.Tests/Append/append_to_stream.cs b/test/EventStore.Client.Streams.Tests/Append/append_to_stream.cs index b961ce565..989a6b6bf 100644 --- a/test/EventStore.Client.Streams.Tests/Append/append_to_stream.cs +++ b/test/EventStore.Client.Streams.Tests/Append/append_to_stream.cs @@ -424,47 +424,6 @@ public async Task with_timeout_any_stream_revision_fails_when_operation_expired( ex.StatusCode.ShouldBe(StatusCode.DeadlineExceeded); } - [Fact] - public async Task should_not_append_to_stream_when_error_thrown_midway() { - var streamName = Fixture.GetStreamName(); - const int initialNumberOfEvents = 5; - - // Append some events before - await Fixture.Streams.AppendToStreamAsync( - streamName, - StreamState.Any, - Fixture.CreateTestEvents(initialNumberOfEvents) - ); - - // Force regular append by passing credentials - await Assert.ThrowsAsync( - async () =>{ - await Fixture.Streams.AppendToStreamAsync( - streamName, - StreamState.StreamExists, - GetEvents(), - userCredentials: new UserCredentials("admin", "changeit") - ); - } - ); - - // No more events should be appended to the stream - var eventsCount = await Fixture.Streams.ReadStreamAsync(Direction.Forwards, streamName, StreamPosition.Start) - .CountAsync(); - eventsCount.ShouldBe(initialNumberOfEvents, "No more events should be appended to the stream"); - - return; - - // Throw an exception after 5 events - IEnumerable GetEvents() { - for (var i = 0; i < 5; i++) { - yield return Fixture.CreateTestEvents(1).First(); - } - - throw new EnumerationFailedException(); - } - } - [Fact] public async Task with_timeout_stream_revision_fails_when_operation_expired() { var stream = Fixture.GetStreamName(); @@ -480,36 +439,39 @@ public async Task with_timeout_stream_revision_fails_when_operation_expired() { ex.StatusCode.ShouldBe(StatusCode.DeadlineExceeded); } - + [Fact] public async Task when_events_enumerator_throws_the_write_does_not_succeed() { var streamName = Fixture.GetStreamName(); await Fixture.Streams - .AppendToStreamAsync(streamName, StreamRevision.None, GetEvents()) + .AppendToStreamAsync( + streamName, + StreamRevision.None, + GetEvents(), + userCredentials: new UserCredentials(TestCredentials.Root.Username!, TestCredentials.Root.Password!) + ) .ShouldThrowAsync(); - var result = Fixture.Streams.ReadStreamAsync(Direction.Forwards, streamName, StreamPosition.Start); - - var state = await result.ReadState; + var state = await Fixture.Streams.ReadStreamAsync(Direction.Forwards, streamName, StreamPosition.Start) + .ReadState; state.ShouldBe(ReadState.StreamNotFound); - + return; IEnumerable GetEvents() { - var i = 0; - foreach (var evt in Fixture.CreateTestEvents(5)) { - if (i++ % 3 == 0) + for (var i = 0; i < 5; i++) { + if (i % 3 == 0) throw new EnumerationFailedException(); - yield return evt; + yield return Fixture.CreateTestEvents(1).First(); } } } class EnumerationFailedException : Exception { } - + public static IEnumerable ArgumentOutOfRangeTestCases() { yield return new object?[] { StreamState.Any }; yield return new object?[] { ulong.MaxValue - 1UL };