Skip to content

Commit

Permalink
Process response if not cancelled
Browse files Browse the repository at this point in the history
  • Loading branch information
w1am committed Feb 8, 2024
1 parent 831a308 commit 969cba3
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 68 deletions.
139 changes: 72 additions & 67 deletions src/EventStore.Client.Streams/EventStoreClient.Append.cs
Original file line number Diff line number Diff line change
Expand Up @@ -134,94 +134,99 @@ await call.RequestStream.WriteAsync(
);

lct.Cancel();
throw;
} finally {
await call.RequestStream.CompleteAsync().ConfigureAwait(false);
}

var response = await call.ResponseAsync.ConfigureAwait(false);
if (response.Success != null) {
writeResult = new SuccessResult(
response.Success.CurrentRevisionOptionCase ==
AppendResp.Types.Success.CurrentRevisionOptionOneofCase.NoStream
? StreamRevision.None
: new StreamRevision(response.Success.CurrentRevision),
response.Success.PositionOptionCase == AppendResp.Types.Success.PositionOptionOneofCase.Position
? new Position(
response.Success.Position.CommitPosition,
response.Success.Position.PreparePosition
)
: default
);

_log.LogDebug(
"Append to stream succeeded - {streamName}@{logPosition}/{nextExpectedVersion}.",
header.Options.StreamIdentifier,
writeResult.LogPosition,
writeResult.NextExpectedStreamRevision
);
} else {
if (response.WrongExpectedVersion != null) {
var actualStreamRevision = response.WrongExpectedVersion.CurrentRevisionOptionCase switch {
AppendResp.Types.WrongExpectedVersion.CurrentRevisionOptionOneofCase.CurrentNoStream =>
StreamRevision.None,
_ => new StreamRevision(response.WrongExpectedVersion.CurrentRevision)
};
if (!lct.IsCancellationRequested) {
var response = await call.ResponseAsync.ConfigureAwait(false);

if (response.Success != null) {
writeResult = new SuccessResult(
response.Success.CurrentRevisionOptionCase ==
AppendResp.Types.Success.CurrentRevisionOptionOneofCase.NoStream
? StreamRevision.None
: new StreamRevision(response.Success.CurrentRevision),
response.Success.PositionOptionCase == AppendResp.Types.Success.PositionOptionOneofCase.Position
? new Position(
response.Success.Position.CommitPosition,
response.Success.Position.PreparePosition
)
: default
);

_log.LogDebug(
"Append to stream failed with Wrong Expected Version - {streamName}/{expectedRevision}/{currentRevision}",
"Append to stream succeeded - {streamName}@{logPosition}/{nextExpectedVersion}.",
header.Options.StreamIdentifier,
new StreamRevision(header.Options.Revision),
actualStreamRevision
writeResult.LogPosition,
writeResult.NextExpectedStreamRevision
);
} else {
if (response.WrongExpectedVersion != null) {
var actualStreamRevision = response.WrongExpectedVersion.CurrentRevisionOptionCase switch {
AppendResp.Types.WrongExpectedVersion.CurrentRevisionOptionOneofCase.CurrentNoStream =>
StreamRevision.None,
_ => new StreamRevision(response.WrongExpectedVersion.CurrentRevision)
};

_log.LogDebug(
"Append to stream failed with Wrong Expected Version - {streamName}/{expectedRevision}/{currentRevision}",
header.Options.StreamIdentifier,
new StreamRevision(header.Options.Revision),
actualStreamRevision
);

if (operationOptions.ThrowOnAppendFailure) {
if (response.WrongExpectedVersion.ExpectedRevisionOptionCase == AppendResp.Types
.WrongExpectedVersion.ExpectedRevisionOptionOneofCase.ExpectedRevision) {
throw new WrongExpectedVersionException(
header.Options.StreamIdentifier!,
new StreamRevision(response.WrongExpectedVersion.ExpectedRevision),
actualStreamRevision
);
}

var expectedStreamState = response.WrongExpectedVersion.ExpectedRevisionOptionCase switch {
AppendResp.Types.WrongExpectedVersion.ExpectedRevisionOptionOneofCase.ExpectedAny =>
StreamState.Any,
AppendResp.Types.WrongExpectedVersion.ExpectedRevisionOptionOneofCase.ExpectedNoStream =>
StreamState.NoStream,
AppendResp.Types.WrongExpectedVersion.ExpectedRevisionOptionOneofCase
.ExpectedStreamExists =>
StreamState.StreamExists,
_ => StreamState.Any
};

throw new WrongExpectedVersionException(
header.Options.StreamIdentifier!,
expectedStreamState,
actualStreamRevision
);
}

if (operationOptions.ThrowOnAppendFailure) {
if (response.WrongExpectedVersion.ExpectedRevisionOptionCase == AppendResp.Types
.WrongExpectedVersion.ExpectedRevisionOptionOneofCase.ExpectedRevision) {
throw new WrongExpectedVersionException(
writeResult = new WrongExpectedVersionResult(
header.Options.StreamIdentifier!,
new StreamRevision(response.WrongExpectedVersion.ExpectedRevision),
actualStreamRevision
);
} else {
writeResult = new WrongExpectedVersionResult(
header.Options.StreamIdentifier!,
StreamRevision.None,
actualStreamRevision
);
}

var expectedStreamState = response.WrongExpectedVersion.ExpectedRevisionOptionCase switch {
AppendResp.Types.WrongExpectedVersion.ExpectedRevisionOptionOneofCase.ExpectedAny =>
StreamState.Any,
AppendResp.Types.WrongExpectedVersion.ExpectedRevisionOptionOneofCase.ExpectedNoStream =>
StreamState.NoStream,
AppendResp.Types.WrongExpectedVersion.ExpectedRevisionOptionOneofCase
.ExpectedStreamExists =>
StreamState.StreamExists,
_ => StreamState.Any
};

throw new WrongExpectedVersionException(
header.Options.StreamIdentifier!,
expectedStreamState,
actualStreamRevision
);
}

if (response.WrongExpectedVersion.ExpectedRevisionOptionCase == AppendResp.Types
.WrongExpectedVersion.ExpectedRevisionOptionOneofCase.ExpectedRevision) {
writeResult = new WrongExpectedVersionResult(
header.Options.StreamIdentifier!,
new StreamRevision(response.WrongExpectedVersion.ExpectedRevision),
actualStreamRevision
);
} else {
writeResult = new WrongExpectedVersionResult(
header.Options.StreamIdentifier!,
StreamRevision.None,
actualStreamRevision
);
throw new InvalidOperationException("The operation completed with an unexpected result.");
}
} else {
throw new InvalidOperationException("The operation completed with an unexpected result.");
}
} else {
throw new InvalidOperationException("The operation completed with an unexpected result.");
}


return writeResult;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ await Fixture.Streams.AppendToStreamAsync(
);

// Force regular append by passing credentials
await Assert.ThrowsAsync<EnumerationFailedException>(
await Assert.ThrowsAsync<InvalidOperationException>(
async () =>{
await Fixture.Streams.AppendToStreamAsync(
streamName,
Expand Down

0 comments on commit 969cba3

Please sign in to comment.