Skip to content

Commit

Permalink
Ensure broker-originated channel closure completes
Browse files Browse the repository at this point in the history
Fixes #1749

* Ensure `Dispose` and `DisposeAsync` are idempotent and thread-safe.
* Use TaskCompletionSource when `HandleChannelCloseAsync` runs to allow dispose methods to wait.
* Use `Interlocked` for thread safety.
* I like `_isDisposing` better. So sue me!
* Move the `Interlocked.Exchange` code to a getter, for readability.
* Minor nullable change.

Co-authored-by: Daniel Marbach <[email protected]>
  • Loading branch information
lukebakken and danielmarbach committed Jan 9, 2025
1 parent 5b1c9cc commit 1b74f30
Show file tree
Hide file tree
Showing 6 changed files with 245 additions and 47 deletions.
38 changes: 31 additions & 7 deletions projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -48,6 +49,7 @@ internal sealed class AutorecoveringChannel : IChannel, IRecoverable
private AutorecoveringConnection _connection;
private RecoveryAwareChannel _innerChannel;
private bool _disposed;
private int _isDisposing;

private ushort _prefetchCountConsumer;
private ushort _prefetchCountGlobal;
Expand Down Expand Up @@ -256,19 +258,25 @@ public override string ToString()

public async ValueTask DisposeAsync()
{
if (_disposed)
if (IsDisposing)
{
return;
}

if (IsOpen)
try
{
await this.AbortAsync()
.ConfigureAwait(false);
}
if (IsOpen)
{
await this.AbortAsync()
.ConfigureAwait(false);
}

_recordedConsumerTags.Clear();
_disposed = true;
_recordedConsumerTags.Clear();
}
finally
{
_disposed = true;
}
}

public ValueTask<ulong> GetNextPublishSequenceNumberAsync(CancellationToken cancellationToken = default) => InnerChannel.GetNextPublishSequenceNumberAsync(cancellationToken);
Expand Down Expand Up @@ -482,7 +490,23 @@ private void ThrowIfDisposed()
ThrowDisposed();
}

return;

[DoesNotReturn]
static void ThrowDisposed() => throw new ObjectDisposedException(typeof(AutorecoveringChannel).FullName);
}

private bool IsDisposing
{
get
{
if (Interlocked.Exchange(ref _isDisposing, 1) != 0)
{
return true;
}

return false;
}
}
}
}
29 changes: 24 additions & 5 deletions projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -50,6 +51,7 @@ internal sealed partial class AutorecoveringConnection : IConnection

private Connection _innerConnection;
private bool _disposed;
private int _isDisposing;

private Connection InnerConnection
{
Expand Down Expand Up @@ -272,7 +274,7 @@ await RecordChannelAsync(autorecoveringChannel, channelsSemaphoreHeld: false, ca

public async ValueTask DisposeAsync()
{
if (_disposed)
if (IsDisposing)
{
return;
}
Expand All @@ -281,17 +283,18 @@ public async ValueTask DisposeAsync()
{
await _innerConnection.DisposeAsync()
.ConfigureAwait(false);

_channels.Clear();
_recordedEntitiesSemaphore.Dispose();
_channelsSemaphore.Dispose();
_recoveryCancellationTokenSource.Dispose();
}
catch (OperationInterruptedException)
{
// ignored, see rabbitmq/rabbitmq-dotnet-client#133
}
finally
{
_channels.Clear();
_recordedEntitiesSemaphore.Dispose();
_channelsSemaphore.Dispose();
_recoveryCancellationTokenSource.Dispose();
_disposed = true;
}
}
Expand All @@ -307,7 +310,23 @@ private void ThrowIfDisposed()
ThrowDisposed();
}

return;

[DoesNotReturn]
static void ThrowDisposed() => throw new ObjectDisposedException(typeof(AutorecoveringConnection).FullName);
}

private bool IsDisposing
{
get
{
if (Interlocked.Exchange(ref _isDisposing, 1) != 0)
{
return true;
}

return false;
}
}
}
}
132 changes: 103 additions & 29 deletions projects/RabbitMQ.Client/Impl/Channel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,13 @@ internal partial class Channel : IChannel, IRecoverable
private ShutdownEventArgs? _closeReason;
public ShutdownEventArgs? CloseReason => Volatile.Read(ref _closeReason);

private TaskCompletionSource<bool>? _serverOriginatedChannelCloseTcs;

internal readonly IConsumerDispatcher ConsumerDispatcher;

private bool _disposed;
private int _isDisposing;

public Channel(ISession session, CreateChannelOptions createChannelOptions)
{
ContinuationTimeout = createChannelOptions.ContinuationTimeout;
Expand Down Expand Up @@ -514,22 +519,41 @@ public override string ToString()

void IDisposable.Dispose()
{
if (_disposed)
{
return;
}

Dispose(true);
}

protected virtual void Dispose(bool disposing)
{
if (IsDisposing)
{
return;
}

if (disposing)
{
if (IsOpen)
try
{
this.AbortAsync().GetAwaiter().GetResult();
}
if (IsOpen)
{
this.AbortAsync().GetAwaiter().GetResult();
}

ConsumerDispatcher.Dispose();
_rpcSemaphore.Dispose();
_confirmSemaphore.Dispose();
_outstandingPublisherConfirmationsRateLimiter?.Dispose();
_serverOriginatedChannelCloseTcs?.Task.Wait(TimeSpan.FromSeconds(5));

ConsumerDispatcher.Dispose();
_rpcSemaphore.Dispose();
_confirmSemaphore.Dispose();
_outstandingPublisherConfirmationsRateLimiter?.Dispose();
}
finally
{
_disposed = true;
}
}
}

Expand All @@ -543,18 +567,37 @@ await DisposeAsyncCore()

protected virtual async ValueTask DisposeAsyncCore()
{
if (IsOpen)
if (IsDisposing)
{
await this.AbortAsync().ConfigureAwait(false);
return;
}

ConsumerDispatcher.Dispose();
_rpcSemaphore.Dispose();
_confirmSemaphore.Dispose();
if (_outstandingPublisherConfirmationsRateLimiter is not null)
try
{
await _outstandingPublisherConfirmationsRateLimiter.DisposeAsync()
.ConfigureAwait(false);
if (IsOpen)
{
await this.AbortAsync().ConfigureAwait(false);
}

if (_serverOriginatedChannelCloseTcs is not null)
{
await _serverOriginatedChannelCloseTcs.Task.WaitAsync(TimeSpan.FromSeconds(5))
.ConfigureAwait(false);
}

ConsumerDispatcher.Dispose();
_rpcSemaphore.Dispose();
_confirmSemaphore.Dispose();

if (_outstandingPublisherConfirmationsRateLimiter is not null)
{
await _outstandingPublisherConfirmationsRateLimiter.DisposeAsync()
.ConfigureAwait(false);
}
}
finally
{
_disposed = true;
}
}

Expand Down Expand Up @@ -651,23 +694,41 @@ protected virtual ulong AdjustDeliveryTag(ulong deliveryTag)

protected async Task<bool> HandleChannelCloseAsync(IncomingCommand cmd, CancellationToken cancellationToken)
{
var channelClose = new ChannelClose(cmd.MethodSpan);
SetCloseReason(new ShutdownEventArgs(ShutdownInitiator.Peer,
channelClose._replyCode,
channelClose._replyText,
channelClose._classId,
channelClose._methodId));
TaskCompletionSource<bool>? serverOriginatedChannelCloseTcs = _serverOriginatedChannelCloseTcs;
if (serverOriginatedChannelCloseTcs is null)
{
// Attempt to assign the new TCS only if _tcs is still null
_ = Interlocked.CompareExchange(ref _serverOriginatedChannelCloseTcs,
new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously), null);
}

await Session.CloseAsync(_closeReason, notify: false)
.ConfigureAwait(false);
try
{
var channelClose = new ChannelClose(cmd.MethodSpan);
SetCloseReason(new ShutdownEventArgs(ShutdownInitiator.Peer,
channelClose._replyCode,
channelClose._replyText,
channelClose._classId,
channelClose._methodId));

var method = new ChannelCloseOk();
await ModelSendAsync(in method, cancellationToken)
.ConfigureAwait(false);
await Session.CloseAsync(_closeReason, notify: false)
.ConfigureAwait(false);

await Session.NotifyAsync(cancellationToken)
.ConfigureAwait(false);
return true;
var method = new ChannelCloseOk();
await ModelSendAsync(in method, cancellationToken)
.ConfigureAwait(false);

await Session.NotifyAsync(cancellationToken)
.ConfigureAwait(false);

_serverOriginatedChannelCloseTcs?.TrySetResult(true);
return true;
}
catch (Exception ex)
{
_serverOriginatedChannelCloseTcs?.TrySetException(ex);
throw;
}
}

protected async Task<bool> HandleChannelCloseOkAsync(IncomingCommand cmd, CancellationToken cancellationToken)
Expand Down Expand Up @@ -1587,5 +1648,18 @@ private Task<bool> DispatchCommandAsync(IncomingCommand cmd, CancellationToken c
}
}
}

private bool IsDisposing
{
get
{
if (Interlocked.Exchange(ref _isDisposing, 1) != 0)
{
return true;
}

return false;
}
}
}
}
28 changes: 22 additions & 6 deletions projects/RabbitMQ.Client/Impl/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.IO;
using System.Runtime.CompilerServices;
using System.Threading;
Expand All @@ -46,6 +47,7 @@ namespace RabbitMQ.Client.Framing
internal sealed partial class Connection : IConnection
{
private bool _disposed;
private int _isDisposing;
private volatile bool _closed;

private readonly ConnectionConfig _config;
Expand Down Expand Up @@ -489,7 +491,7 @@ internal ValueTask WriteAsync(RentedMemory frames, CancellationToken cancellatio

public async ValueTask DisposeAsync()
{
if (_disposed)
if (IsDisposing)
{
return;
}
Expand Down Expand Up @@ -523,23 +525,37 @@ private void ThrowIfDisposed()
{
if (_disposed)
{
ThrowObjectDisposedException();
ThrowDisposed();
}

static void ThrowObjectDisposedException()
{
throw new ObjectDisposedException(typeof(Connection).FullName);
}
return;

[DoesNotReturn]
static void ThrowDisposed() => throw new ObjectDisposedException(typeof(Connection).FullName);
}

public override string ToString()
{
return $"Connection({_id},{Endpoint})";
}

[DoesNotReturn]
private static void ThrowAlreadyClosedException(ShutdownEventArgs closeReason)
{
throw new AlreadyClosedException(closeReason);
}

private bool IsDisposing
{
get
{
if (Interlocked.Exchange(ref _isDisposing, 1) != 0)
{
return true;
}

return false;
}
}
}
}
Loading

0 comments on commit 1b74f30

Please sign in to comment.