Skip to content

Commit

Permalink
Log exceptions in catch and refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
w1am committed Feb 14, 2024
1 parent 6e70d01 commit 05f64f5
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 91 deletions.
183 changes: 94 additions & 89 deletions src/EventStore.Client.Streams/EventStoreClient.Append.cs
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,8 @@ CancellationToken cancellationToken
EventStoreCallOptions.CreateNonStreaming(Settings, deadline, userCredentials, cancellationToken)
);

IWriteResult writeResult;

try {
await call.RequestStream.WriteAsync(header).ConfigureAwait(false);

foreach (var e in eventData) {
await call.RequestStream.WriteAsync(
new AppendReq {
Expand All @@ -126,105 +123,113 @@ await call.RequestStream.WriteAsync(
}

await call.RequestStream.CompleteAsync().ConfigureAwait(false);
} catch (RpcException) {
// Do nothing so that RpcExceptions propagate to the call.ResponseAsync and be translated by the TypedInterceptor
} catch (InvalidOperationException exc) {
_log.LogDebug(
exc,
"Got InvalidOperationException when appending events to stream - {streamName}. This is perfectly normal if the connection was closed from the server-side.",
header.Options.StreamIdentifier
);
} catch (RpcException exc) {
_log.LogDebug(
exc,
"Got RpcException when appending events to stream - {streamName}. This is perfectly normal if the connection was closed from the server-side.",
header.Options.StreamIdentifier
);
}

var response = await call.ResponseAsync.ConfigureAwait(false);

if (response.Success != null) {
writeResult = new SuccessResult(
response.Success.CurrentRevisionOptionCase ==
AppendResp.Types.Success.CurrentRevisionOptionOneofCase.NoStream
? StreamRevision.None
: new StreamRevision(response.Success.CurrentRevision),
response.Success.PositionOptionCase
== AppendResp.Types.Success.PositionOptionOneofCase.Position
? new Position(
response.Success.Position.CommitPosition,
response.Success.Position.PreparePosition
)
: default
);
if (response.Success != null)
return HandleSuccessAppend(response, header);

_log.LogDebug(
"Append to stream succeeded - {streamName}@{logPosition}/{nextExpectedVersion}.",
header.Options.StreamIdentifier,
writeResult.LogPosition,
writeResult.NextExpectedStreamRevision
);
} else {
if (response.WrongExpectedVersion != null) {
var actualStreamRevision =
response.WrongExpectedVersion.CurrentRevisionOptionCase switch {
AppendResp.Types.WrongExpectedVersion.CurrentRevisionOptionOneofCase
.CurrentNoStream =>
StreamRevision.None,
_ => new StreamRevision(response.WrongExpectedVersion.CurrentRevision)
};
if (response.WrongExpectedVersion == null)
throw new InvalidOperationException("The operation completed with an unexpected result.");

_log.LogDebug(
"Append to stream failed with Wrong Expected Version - {streamName}/{expectedRevision}/{currentRevision}",
header.Options.StreamIdentifier,
new StreamRevision(header.Options.Revision),
actualStreamRevision
);
return HandleWrongExpectedRevision(response, header, operationOptions);
}

if (operationOptions.ThrowOnAppendFailure) {
if (response.WrongExpectedVersion.ExpectedRevisionOptionCase == AppendResp.Types
.WrongExpectedVersion.ExpectedRevisionOptionOneofCase
.ExpectedRevision) {
throw new WrongExpectedVersionException(
header.Options.StreamIdentifier!,
new StreamRevision(response.WrongExpectedVersion.ExpectedRevision),
actualStreamRevision
);
}
private IWriteResult HandleSuccessAppend(AppendResp response, AppendReq header) {
var currentRevision = response.Success.CurrentRevisionOptionCase ==
AppendResp.Types.Success.CurrentRevisionOptionOneofCase.NoStream
? StreamRevision.None
: new StreamRevision(response.Success.CurrentRevision);

var expectedStreamState =
response.WrongExpectedVersion.ExpectedRevisionOptionCase switch {
AppendResp.Types.WrongExpectedVersion.ExpectedRevisionOptionOneofCase
.ExpectedAny =>
StreamState.Any,
AppendResp.Types.WrongExpectedVersion.ExpectedRevisionOptionOneofCase
.ExpectedNoStream =>
StreamState.NoStream,
AppendResp.Types.WrongExpectedVersion.ExpectedRevisionOptionOneofCase
.ExpectedStreamExists =>
StreamState.StreamExists,
_ => StreamState.Any
};

throw new WrongExpectedVersionException(
header.Options.StreamIdentifier!,
expectedStreamState,
actualStreamRevision
);
}
var position = response.Success.PositionOptionCase ==
AppendResp.Types.Success.PositionOptionOneofCase.Position
? new Position(response.Success.Position.CommitPosition, response.Success.Position.PreparePosition)
: default;

if (response.WrongExpectedVersion.ExpectedRevisionOptionCase == AppendResp.Types
.WrongExpectedVersion.ExpectedRevisionOptionOneofCase.ExpectedRevision) {
writeResult = new WrongExpectedVersionResult(
header.Options.StreamIdentifier!,
new StreamRevision(response.WrongExpectedVersion.ExpectedRevision),
actualStreamRevision
);
} else {
writeResult = new WrongExpectedVersionResult(
header.Options.StreamIdentifier!,
StreamRevision.None,
actualStreamRevision
);
}
} else {
throw new InvalidOperationException("The operation completed with an unexpected result.");
_log.LogDebug(
"Append to stream succeeded - {streamName}@{logPosition}/{nextExpectedVersion}.",
header.Options.StreamIdentifier,
position,
currentRevision);

return new SuccessResult(currentRevision, position);
}

private IWriteResult HandleWrongExpectedRevision(
AppendResp response, AppendReq header, EventStoreClientOperationOptions operationOptions
) {
var actualStreamRevision =
response.WrongExpectedVersion.CurrentRevisionOptionCase switch {
AppendResp.Types.WrongExpectedVersion.CurrentRevisionOptionOneofCase
.CurrentNoStream =>
StreamRevision.None,
_ => new StreamRevision(response.WrongExpectedVersion.CurrentRevision)
};

_log.LogDebug(
"Append to stream failed with Wrong Expected Version - {streamName}/{expectedRevision}/{currentRevision}",
header.Options.StreamIdentifier,
new StreamRevision(header.Options.Revision),
actualStreamRevision
);

if (operationOptions.ThrowOnAppendFailure) {
if (response.WrongExpectedVersion.ExpectedRevisionOptionCase == AppendResp.Types
.WrongExpectedVersion.ExpectedRevisionOptionOneofCase
.ExpectedRevision) {
throw new WrongExpectedVersionException(
header.Options.StreamIdentifier!,
new StreamRevision(response.WrongExpectedVersion.ExpectedRevision),
actualStreamRevision
);
}
}

var expectedStreamState =
response.WrongExpectedVersion.ExpectedRevisionOptionCase switch {
AppendResp.Types.WrongExpectedVersion.ExpectedRevisionOptionOneofCase
.ExpectedAny =>
StreamState.Any,
AppendResp.Types.WrongExpectedVersion.ExpectedRevisionOptionOneofCase
.ExpectedNoStream =>
StreamState.NoStream,
AppendResp.Types.WrongExpectedVersion.ExpectedRevisionOptionOneofCase
.ExpectedStreamExists =>
StreamState.StreamExists,
_ => StreamState.Any
};

return writeResult;
}
throw new WrongExpectedVersionException(
header.Options.StreamIdentifier!,
expectedStreamState,
actualStreamRevision
);
}

var expectedRevision = response.WrongExpectedVersion.ExpectedRevisionOptionCase
== AppendResp.Types.WrongExpectedVersion.ExpectedRevisionOptionOneofCase
.ExpectedRevision
? new StreamRevision(response.WrongExpectedVersion.ExpectedRevision)
: StreamRevision.None;

return new WrongExpectedVersionResult(
header.Options.StreamIdentifier!,
expectedRevision,
actualStreamRevision
);
}

private class StreamAppender : IDisposable {
private readonly EventStoreClientSettings _settings;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -449,8 +449,8 @@ await Fixture.Streams.AppendToStreamAsync(
);

// No more events should be appended to the stream
var result = Fixture.Streams.ReadStreamAsync(Direction.Forwards, streamName, StreamPosition.Start);
var eventsCount = await result.CountAsync();
var eventsCount = await Fixture.Streams.ReadStreamAsync(Direction.Forwards, streamName, StreamPosition.Start)
.CountAsync();
eventsCount.ShouldBe(initialNumberOfEvents, "No more events should be appended to the stream");

return;
Expand Down

0 comments on commit 05f64f5

Please sign in to comment.