From 05f64f5c6d14b80e582785d10dcc589819a84a51 Mon Sep 17 00:00:00 2001 From: William Chong Date: Wed, 14 Feb 2024 12:06:53 +0400 Subject: [PATCH] Log exceptions in catch and refactor --- .../EventStoreClient.Append.cs | 183 +++++++++--------- .../Append/append_to_stream.cs | 4 +- 2 files changed, 96 insertions(+), 91 deletions(-) diff --git a/src/EventStore.Client.Streams/EventStoreClient.Append.cs b/src/EventStore.Client.Streams/EventStoreClient.Append.cs index 5441f6267..01ed58a0e 100644 --- a/src/EventStore.Client.Streams/EventStoreClient.Append.cs +++ b/src/EventStore.Client.Streams/EventStoreClient.Append.cs @@ -104,11 +104,8 @@ CancellationToken cancellationToken EventStoreCallOptions.CreateNonStreaming(Settings, deadline, userCredentials, cancellationToken) ); - IWriteResult writeResult; - try { await call.RequestStream.WriteAsync(header).ConfigureAwait(false); - foreach (var e in eventData) { await call.RequestStream.WriteAsync( new AppendReq { @@ -126,105 +123,113 @@ await call.RequestStream.WriteAsync( } await call.RequestStream.CompleteAsync().ConfigureAwait(false); - } catch (RpcException) { - // Do nothing so that RpcExceptions propagate to the call.ResponseAsync and be translated by the TypedInterceptor + } catch (InvalidOperationException exc) { + _log.LogDebug( + exc, + "Got InvalidOperationException when appending events to stream - {streamName}. This is perfectly normal if the connection was closed from the server-side.", + header.Options.StreamIdentifier + ); + } catch (RpcException exc) { + _log.LogDebug( + exc, + "Got RpcException when appending events to stream - {streamName}. This is perfectly normal if the connection was closed from the server-side.", + header.Options.StreamIdentifier + ); } var response = await call.ResponseAsync.ConfigureAwait(false); - if (response.Success != null) { - writeResult = new SuccessResult( - response.Success.CurrentRevisionOptionCase == - AppendResp.Types.Success.CurrentRevisionOptionOneofCase.NoStream - ? StreamRevision.None - : new StreamRevision(response.Success.CurrentRevision), - response.Success.PositionOptionCase - == AppendResp.Types.Success.PositionOptionOneofCase.Position - ? new Position( - response.Success.Position.CommitPosition, - response.Success.Position.PreparePosition - ) - : default - ); + if (response.Success != null) + return HandleSuccessAppend(response, header); - _log.LogDebug( - "Append to stream succeeded - {streamName}@{logPosition}/{nextExpectedVersion}.", - header.Options.StreamIdentifier, - writeResult.LogPosition, - writeResult.NextExpectedStreamRevision - ); - } else { - if (response.WrongExpectedVersion != null) { - var actualStreamRevision = - response.WrongExpectedVersion.CurrentRevisionOptionCase switch { - AppendResp.Types.WrongExpectedVersion.CurrentRevisionOptionOneofCase - .CurrentNoStream => - StreamRevision.None, - _ => new StreamRevision(response.WrongExpectedVersion.CurrentRevision) - }; + if (response.WrongExpectedVersion == null) + throw new InvalidOperationException("The operation completed with an unexpected result."); - _log.LogDebug( - "Append to stream failed with Wrong Expected Version - {streamName}/{expectedRevision}/{currentRevision}", - header.Options.StreamIdentifier, - new StreamRevision(header.Options.Revision), - actualStreamRevision - ); + return HandleWrongExpectedRevision(response, header, operationOptions); + } - if (operationOptions.ThrowOnAppendFailure) { - if (response.WrongExpectedVersion.ExpectedRevisionOptionCase == AppendResp.Types - .WrongExpectedVersion.ExpectedRevisionOptionOneofCase - .ExpectedRevision) { - throw new WrongExpectedVersionException( - header.Options.StreamIdentifier!, - new StreamRevision(response.WrongExpectedVersion.ExpectedRevision), - actualStreamRevision - ); - } + private IWriteResult HandleSuccessAppend(AppendResp response, AppendReq header) { + var currentRevision = response.Success.CurrentRevisionOptionCase == + AppendResp.Types.Success.CurrentRevisionOptionOneofCase.NoStream + ? StreamRevision.None + : new StreamRevision(response.Success.CurrentRevision); - var expectedStreamState = - response.WrongExpectedVersion.ExpectedRevisionOptionCase switch { - AppendResp.Types.WrongExpectedVersion.ExpectedRevisionOptionOneofCase - .ExpectedAny => - StreamState.Any, - AppendResp.Types.WrongExpectedVersion.ExpectedRevisionOptionOneofCase - .ExpectedNoStream => - StreamState.NoStream, - AppendResp.Types.WrongExpectedVersion.ExpectedRevisionOptionOneofCase - .ExpectedStreamExists => - StreamState.StreamExists, - _ => StreamState.Any - }; - - throw new WrongExpectedVersionException( - header.Options.StreamIdentifier!, - expectedStreamState, - actualStreamRevision - ); - } + var position = response.Success.PositionOptionCase == + AppendResp.Types.Success.PositionOptionOneofCase.Position + ? new Position(response.Success.Position.CommitPosition, response.Success.Position.PreparePosition) + : default; - if (response.WrongExpectedVersion.ExpectedRevisionOptionCase == AppendResp.Types - .WrongExpectedVersion.ExpectedRevisionOptionOneofCase.ExpectedRevision) { - writeResult = new WrongExpectedVersionResult( - header.Options.StreamIdentifier!, - new StreamRevision(response.WrongExpectedVersion.ExpectedRevision), - actualStreamRevision - ); - } else { - writeResult = new WrongExpectedVersionResult( - header.Options.StreamIdentifier!, - StreamRevision.None, - actualStreamRevision - ); - } - } else { - throw new InvalidOperationException("The operation completed with an unexpected result."); + _log.LogDebug( + "Append to stream succeeded - {streamName}@{logPosition}/{nextExpectedVersion}.", + header.Options.StreamIdentifier, + position, + currentRevision); + + return new SuccessResult(currentRevision, position); + } + + private IWriteResult HandleWrongExpectedRevision( + AppendResp response, AppendReq header, EventStoreClientOperationOptions operationOptions + ) { + var actualStreamRevision = + response.WrongExpectedVersion.CurrentRevisionOptionCase switch { + AppendResp.Types.WrongExpectedVersion.CurrentRevisionOptionOneofCase + .CurrentNoStream => + StreamRevision.None, + _ => new StreamRevision(response.WrongExpectedVersion.CurrentRevision) + }; + + _log.LogDebug( + "Append to stream failed with Wrong Expected Version - {streamName}/{expectedRevision}/{currentRevision}", + header.Options.StreamIdentifier, + new StreamRevision(header.Options.Revision), + actualStreamRevision + ); + + if (operationOptions.ThrowOnAppendFailure) { + if (response.WrongExpectedVersion.ExpectedRevisionOptionCase == AppendResp.Types + .WrongExpectedVersion.ExpectedRevisionOptionOneofCase + .ExpectedRevision) { + throw new WrongExpectedVersionException( + header.Options.StreamIdentifier!, + new StreamRevision(response.WrongExpectedVersion.ExpectedRevision), + actualStreamRevision + ); } - } + var expectedStreamState = + response.WrongExpectedVersion.ExpectedRevisionOptionCase switch { + AppendResp.Types.WrongExpectedVersion.ExpectedRevisionOptionOneofCase + .ExpectedAny => + StreamState.Any, + AppendResp.Types.WrongExpectedVersion.ExpectedRevisionOptionOneofCase + .ExpectedNoStream => + StreamState.NoStream, + AppendResp.Types.WrongExpectedVersion.ExpectedRevisionOptionOneofCase + .ExpectedStreamExists => + StreamState.StreamExists, + _ => StreamState.Any + }; - return writeResult; - } + throw new WrongExpectedVersionException( + header.Options.StreamIdentifier!, + expectedStreamState, + actualStreamRevision + ); + } + var expectedRevision = response.WrongExpectedVersion.ExpectedRevisionOptionCase + == AppendResp.Types.WrongExpectedVersion.ExpectedRevisionOptionOneofCase + .ExpectedRevision + ? new StreamRevision(response.WrongExpectedVersion.ExpectedRevision) + : StreamRevision.None; + + return new WrongExpectedVersionResult( + header.Options.StreamIdentifier!, + expectedRevision, + actualStreamRevision + ); + } private class StreamAppender : IDisposable { private readonly EventStoreClientSettings _settings; diff --git a/test/EventStore.Client.Streams.Tests/Append/append_to_stream.cs b/test/EventStore.Client.Streams.Tests/Append/append_to_stream.cs index 02ccaeca4..b961ce565 100644 --- a/test/EventStore.Client.Streams.Tests/Append/append_to_stream.cs +++ b/test/EventStore.Client.Streams.Tests/Append/append_to_stream.cs @@ -449,8 +449,8 @@ await Fixture.Streams.AppendToStreamAsync( ); // No more events should be appended to the stream - var result = Fixture.Streams.ReadStreamAsync(Direction.Forwards, streamName, StreamPosition.Start); - var eventsCount = await result.CountAsync(); + var eventsCount = await Fixture.Streams.ReadStreamAsync(Direction.Forwards, streamName, StreamPosition.Start) + .CountAsync(); eventsCount.ShouldBe(initialNumberOfEvents, "No more events should be appended to the stream"); return;