diff --git a/src/Elastic.Channels/BufferedChannelBase.cs b/src/Elastic.Channels/BufferedChannelBase.cs index 395848c..fac03e5 100644 --- a/src/Elastic.Channels/BufferedChannelBase.cs +++ b/src/Elastic.Channels/BufferedChannelBase.cs @@ -155,6 +155,9 @@ await ConsumeInboundEventsAsync(maxOut, BufferOptions.OutboundBufferMaxLifetime) /// An overall cancellation token that may be externally provided protected CancellationTokenSource TokenSource { get; } + /// Internal cancellation token for signalling that all publishing activity has completed. + private readonly CancellationTokenSource _exitCancelSource = new CancellationTokenSource(); + private Channel> OutChannel { get; } private Channel InChannel { get; } private BufferOptions BufferOptions => Options.BufferOptions; @@ -236,7 +239,7 @@ private async Task ConsumeOutboundEventsAsync() var taskList = new List(_maxConcurrency); while (await OutChannel.Reader.WaitToReadAsync().ConfigureAwait(false)) - // ReSharper disable once RemoveRedundantBraces + // ReSharper disable once RemoveRedundantBraces { if (TokenSource.Token.IsCancellationRequested) break; if (_signal is { IsSet: true }) break; @@ -257,6 +260,7 @@ private async Task ConsumeOutboundEventsAsync() } } await Task.WhenAll(taskList).ConfigureAwait(false); + _exitCancelSource.Cancel(); _callbacks.OutboundChannelExitedCallback?.Invoke(); } @@ -278,7 +282,7 @@ private async Task ExportBufferAsync(ArraySegment items, IOutboundBuffer { response = await ExportAsync(items, TokenSource.Token).ConfigureAwait(false); _callbacks.ExportResponseCallback?.Invoke(response, - new WriteTrackingBufferEventData { Count = outboundBuffer.Count, DurationSinceFirstWrite = outboundBuffer.DurationSinceFirstWrite }); + new WriteTrackingBufferEventData { Count = outboundBuffer.Count, DurationSinceFirstWrite = outboundBuffer.DurationSinceFirstWrite }); } catch (Exception e) { @@ -372,12 +376,21 @@ public override string ToString() => /// public virtual void Dispose() { - InboundBuffer.Dispose(); try { - TokenSource.Cancel(); + // Mark inchannel completed to flush buffer and end task, signalling end to outchannel InChannel.Writer.TryComplete(); - OutChannel.Writer.TryComplete(); + // Wait a reasonable duration for the outchannel to complete before disposing the rest + if (!_exitCancelSource.IsCancellationRequested) + { + // Allow one retry before we exit + var maxwait = Options.BufferOptions.ExportBackoffPeriod(1); + _exitCancelSource.Token.WaitHandle.WaitOne(maxwait); + } + _exitCancelSource.Dispose(); + InboundBuffer.Dispose(); + TokenSource.Cancel(); + TokenSource.Dispose(); } catch {