Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dispose on token cancellation request #314

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
using EventStore.Client.PersistentSubscriptions;
using Grpc.Core;
using Microsoft.Extensions.Logging;

namespace EventStore.Client {
/// <summary>
/// Represents a persistent subscription connection.
/// </summary>
public class PersistentSubscription : IDisposable {
private readonly EventStorePersistentSubscriptionsClient.PersistentSubscriptionResult _persistentSubscriptionResult;
private readonly EventStorePersistentSubscriptionsClient.PersistentSubscriptionResult
_persistentSubscriptionResult;

private readonly IAsyncEnumerator<PersistentSubscriptionMessage> _enumerator;
private readonly Func<PersistentSubscription, ResolvedEvent, int?, CancellationToken, Task> _eventAppeared;
private readonly Action<PersistentSubscription, SubscriptionDroppedReason, Exception?> _subscriptionDropped;
Expand All @@ -25,7 +25,8 @@ internal static async Task<PersistentSubscription> Confirm(
EventStorePersistentSubscriptionsClient.PersistentSubscriptionResult persistentSubscriptionResult,
Func<PersistentSubscription, ResolvedEvent, int?, CancellationToken, Task> eventAppeared,
Action<PersistentSubscription, SubscriptionDroppedReason, Exception?> subscriptionDropped,
ILogger log, UserCredentials? userCredentials, CancellationToken cancellationToken = default) {
ILogger log, UserCredentials? userCredentials, CancellationToken cancellationToken = default
) {
var enumerator = persistentSubscriptionResult
.Messages
.GetAsyncEnumerator(cancellationToken);
Expand All @@ -34,11 +35,20 @@ internal static async Task<PersistentSubscription> Confirm(

return (result, enumerator.Current) switch {
(true, PersistentSubscriptionMessage.SubscriptionConfirmation (var subscriptionId)) =>
new PersistentSubscription(persistentSubscriptionResult, enumerator, subscriptionId, eventAppeared,
subscriptionDropped, log, cancellationToken),
new PersistentSubscription(
persistentSubscriptionResult,
enumerator,
subscriptionId,
eventAppeared,
subscriptionDropped,
log,
cancellationToken
),
(true, PersistentSubscriptionMessage.NotFound) =>
throw new PersistentSubscriptionNotFoundException(persistentSubscriptionResult.StreamName,
persistentSubscriptionResult.GroupName),
throw new PersistentSubscriptionNotFoundException(
persistentSubscriptionResult.StreamName,
persistentSubscriptionResult.GroupName
),
_ => throw new InvalidOperationException("Subscription could not be confirmed.")
};
}
Expand All @@ -49,14 +59,15 @@ private PersistentSubscription(
IAsyncEnumerator<PersistentSubscriptionMessage> enumerator, string subscriptionId,
Func<PersistentSubscription, ResolvedEvent, int?, CancellationToken, Task> eventAppeared,
Action<PersistentSubscription, SubscriptionDroppedReason, Exception?> subscriptionDropped, ILogger log,
CancellationToken cancellationToken) {
CancellationToken cancellationToken
) {
_persistentSubscriptionResult = persistentSubscriptionResult;
_enumerator = enumerator;
SubscriptionId = subscriptionId;
_eventAppeared = eventAppeared;
_subscriptionDropped = subscriptionDropped;
_log = log;
_cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
_enumerator = enumerator;
SubscriptionId = subscriptionId;
_eventAppeared = eventAppeared;
_subscriptionDropped = subscriptionDropped;
_log = log;
_cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);

Task.Run(Subscribe, _cts.Token);
}
Expand Down Expand Up @@ -91,15 +102,15 @@ public Task Ack(params ResolvedEvent[] resolvedEvents) =>
public Task Ack(IEnumerable<ResolvedEvent> resolvedEvents) =>
Ack(resolvedEvents.Select(resolvedEvent => resolvedEvent.OriginalEvent.EventId));


/// <summary>
/// Acknowledge that a message has failed processing (this will tell the server it has not been processed).
/// </summary>
/// <param name="action">The <see cref="PersistentSubscriptionNakEventAction"/> to take.</param>
/// <param name="reason">A reason given.</param>
/// <param name="eventIds">The <see cref="Uuid"/> of the <see cref="ResolvedEvent" />s to nak. There should not be more than 2000 to nak at a time.</param>
/// <exception cref="ArgumentException">The number of eventIds exceeded the limit of 2000.</exception>
public Task Nack(PersistentSubscriptionNakEventAction action, string reason, params Uuid[] eventIds) => NackInternal(eventIds, action, reason);
public Task Nack(PersistentSubscriptionNakEventAction action, string reason, params Uuid[] eventIds) =>
NackInternal(eventIds, action, reason);

/// <summary>
/// Acknowledge that a message has failed processing (this will tell the server it has not been processed).
Expand All @@ -108,10 +119,15 @@ public Task Ack(IEnumerable<ResolvedEvent> resolvedEvents) =>
/// <param name="reason">A reason given.</param>
/// <param name="resolvedEvents">The <see cref="ResolvedEvent" />s to nak. There should not be more than 2000 to nak at a time.</param>
/// <exception cref="ArgumentException">The number of resolvedEvents exceeded the limit of 2000.</exception>
public Task Nack(PersistentSubscriptionNakEventAction action, string reason,
params ResolvedEvent[] resolvedEvents) =>
Nack(action, reason,
Array.ConvertAll(resolvedEvents, resolvedEvent => resolvedEvent.OriginalEvent.EventId));
public Task Nack(
PersistentSubscriptionNakEventAction action, string reason,
params ResolvedEvent[] resolvedEvents
) =>
Nack(
action,
reason,
Array.ConvertAll(resolvedEvents, resolvedEvent => resolvedEvent.OriginalEvent.EventId)
);

/// <inheritdoc />
public void Dispose() => SubscriptionDropped(SubscriptionDroppedReason.Disposed);
Expand All @@ -121,64 +137,94 @@ private async Task Subscribe() {

try {
while (await _enumerator.MoveNextAsync(_cts.Token).ConfigureAwait(false)) {
if (_enumerator.Current is not PersistentSubscriptionMessage.Event(var resolvedEvent, var retryCount)) {
if (_enumerator.Current is not
PersistentSubscriptionMessage.Event(var resolvedEvent, var retryCount)) {
continue;
}

if (_enumerator.Current is PersistentSubscriptionMessage.NotFound) {
if (_subscriptionDroppedInvoked != 0) {
return;
}
SubscriptionDropped(SubscriptionDroppedReason.ServerError,

SubscriptionDropped(
SubscriptionDroppedReason.ServerError,
new PersistentSubscriptionNotFoundException(
_persistentSubscriptionResult.StreamName, _persistentSubscriptionResult.GroupName));
_persistentSubscriptionResult.StreamName,
_persistentSubscriptionResult.GroupName
)
);

return;
}

_log.LogTrace(
"Persistent Subscription {subscriptionId} received event {streamName}@{streamRevision} {position}",
SubscriptionId, resolvedEvent.OriginalEvent.EventStreamId,
resolvedEvent.OriginalEvent.EventNumber, resolvedEvent.OriginalEvent.Position);
SubscriptionId,
resolvedEvent.OriginalEvent.EventStreamId,
resolvedEvent.OriginalEvent.EventNumber,
resolvedEvent.OriginalEvent.Position
);

try {
await _eventAppeared(
this,
resolvedEvent,
retryCount,
_cts.Token).ConfigureAwait(false);
_cts.Token
).ConfigureAwait(false);
} catch (Exception ex) when (ex is ObjectDisposedException or OperationCanceledException) {
if (_subscriptionDroppedInvoked != 0) {
return;
}

_log.LogWarning(ex,
_log.LogWarning(
ex,
"Persistent Subscription {subscriptionId} was dropped because cancellation was requested by another caller.",
SubscriptionId);
SubscriptionId
);

SubscriptionDropped(SubscriptionDroppedReason.Disposed);

return;
} catch (Exception ex) {
_log.LogError(ex,
_log.LogError(
ex,
"Persistent Subscription {subscriptionId} was dropped because the subscriber made an error.",
SubscriptionId);
SubscriptionId
);

SubscriptionDropped(SubscriptionDroppedReason.SubscriberError, ex);

return;
}
}
} catch (Exception ex) {
if (_subscriptionDroppedInvoked == 0) {
_log.LogError(ex,
"Persistent Subscription {subscriptionId} was dropped because an error occurred on the server.",
SubscriptionId);
SubscriptionDropped(SubscriptionDroppedReason.ServerError, ex);
if (_cts.Token.IsCancellationRequested) {
_log.LogInformation(
"Subscription {subscriptionId} was dropped because cancellation was requested.",
SubscriptionId
);

SubscriptionDropped(SubscriptionDroppedReason.Disposed, ex);
} else {
_log.LogError(
ex,
"Persistent Subscription {subscriptionId} was dropped because an error occurred on the server.",
SubscriptionId
);

SubscriptionDropped(SubscriptionDroppedReason.ServerError, ex);
}
}
} finally {
if (_subscriptionDroppedInvoked == 0) {
_log.LogError(
"Persistent Subscription {subscriptionId} was unexpectedly terminated.",
SubscriptionId);
SubscriptionId
);

SubscriptionDropped(SubscriptionDroppedReason.ServerError);
}
}
Expand Down
21 changes: 15 additions & 6 deletions src/EventStore.Client.Streams/StreamSubscription.cs
Original file line number Diff line number Diff line change
Expand Up @@ -135,13 +135,22 @@ await _checkpointReached(this, position, _cts.Token)
SubscriptionDropped(SubscriptionDroppedReason.Disposed, ex);
} catch (Exception ex) {
if (_subscriptionDroppedInvoked == 0) {
_log.LogError(
ex,
"Subscription {subscriptionId} was dropped because an error occurred on the server.",
SubscriptionId
);
if (_cts.IsCancellationRequested) {
_log.LogInformation(
"Subscription {subscriptionId} was dropped because cancellation was requested by the client.",
SubscriptionId
);

SubscriptionDropped(SubscriptionDroppedReason.Disposed, ex);
} else {
_log.LogError(
ex,
"Subscription {subscriptionId} was dropped because an error occurred on the server.",
SubscriptionId
);

SubscriptionDropped(SubscriptionDroppedReason.ServerError, ex);
SubscriptionDropped(SubscriptionDroppedReason.ServerError, ex);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// ReSharper disable InconsistentNaming

using EventStore.Client.Streams.Tests.Subscriptions;

namespace EventStore.Client.PersistentSubscriptions.Tests.SubscriptionToAll.Obsolete;

[Obsolete("Will be removed in future release when older subscriptions APIs are removed from the client")]
public class
PersistentSubscriptionDropsDueToCancellationToken(PersistentSubscriptionDropsDueToCancellationToken.Fixture fixture)
: IClassFixture<
PersistentSubscriptionDropsDueToCancellationToken.Fixture> {
static readonly string Group = Guid.NewGuid().ToString();
static readonly string Stream = Guid.NewGuid().ToString();

[SupportsPSToAll.Fact]
public async Task persistent_subscription_to_all_drops_due_to_cancellation_token() {
var subscriptionDropped = new TaskCompletionSource<SubscriptionDroppedResult>();

using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(2));

await fixture.Client.CreateToAllAsync(
Group,
cancellationToken: cts.Token,
settings: new PersistentSubscriptionSettings()
);

using var subscription = await fixture.Client.SubscribeToAllAsync(
Group,
async (s, e, r, ct) => await s.Ack(e),
(sub, reason, ex) => subscriptionDropped.SetResult(new SubscriptionDroppedResult(reason, ex)),
userCredentials: TestCredentials.Root,
cancellationToken: cts.Token
)
.WithTimeout();

// wait until the cancellation token cancels
await Task.Delay(TimeSpan.FromSeconds(3));

var result = await subscriptionDropped.Task.WithTimeout();
result.Reason.ShouldBe(SubscriptionDroppedReason.Disposed);
}

[SupportsPSToAll.Fact]
public async Task persistent_subscription_to_stream_drops_due_to_cancellation_token() {
var subscriptionDropped = new TaskCompletionSource<SubscriptionDroppedResult>();

using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(2));

await fixture.Client.CreateToStreamAsync(
Group,
Stream,
cancellationToken: cts.Token,
settings: new PersistentSubscriptionSettings()
);

using var subscription = await fixture.Client.SubscribeToStreamAsync(
Group,
Stream,
async (s, e, r, ct) => await s.Ack(e),
(sub, reason, ex) => subscriptionDropped.SetResult(new SubscriptionDroppedResult(reason, ex)),
userCredentials: TestCredentials.Root,
cancellationToken: cts.Token
)
.WithTimeout();

// wait until the cancellation token cancels
await Task.Delay(TimeSpan.FromSeconds(3));

var result = await subscriptionDropped.Task.WithTimeout();
result.Reason.ShouldBe(SubscriptionDroppedReason.Disposed);
}

public class Fixture : EventStoreClientFixture {
protected override Task Given() {
return Task.CompletedTask;
}

protected override Task When() => Task.CompletedTask;
}
}
Loading
Loading