Skip to content

Commit

Permalink
Keep sub alive when reading channel (#506)
Browse files Browse the repository at this point in the history
* keep sub alive when reading channel

Signed-off-by: Caleb Lloyd <[email protected]>

* refactor memory tests to use Task.Run

Signed-off-by: Caleb Lloyd <[email protected]>

* subscribe benchmark

Signed-off-by: Caleb Lloyd <[email protected]>

---------

Signed-off-by: Caleb Lloyd <[email protected]>
  • Loading branch information
caleblloyd authored Jun 4, 2024
1 parent 177112b commit 2ff424c
Show file tree
Hide file tree
Showing 7 changed files with 385 additions and 55 deletions.
3 changes: 3 additions & 0 deletions sandbox/MicroBenchmark/PublishParallelBench.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ public async Task Setup()
await _nats.ConnectAsync();
}

[GlobalCleanup]
public async Task Cleanup() => await _nats.DisposeAsync();

[Benchmark]
public async Task PublishParallelAsync()
{
Expand Down
3 changes: 3 additions & 0 deletions sandbox/MicroBenchmark/PublishSerialBench.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ public async Task SetupAsync()
await _nats.ConnectAsync();
}

[GlobalCleanup]
public async Task Cleanup() => await _nats.DisposeAsync();

[Benchmark]
public async Task PublishAsync()
{
Expand Down
132 changes: 132 additions & 0 deletions sandbox/MicroBenchmark/Subscribe.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
using System.Diagnostics;
using BenchmarkDotNet.Attributes;
using NATS.Client.Core;

#pragma warning disable CS8618 // Non-nullable field must contain a non-null value when exiting constructor. Consider declaring as nullable.

namespace MicroBenchmark;

[MemoryDiagnoser]
[ShortRunJob]
[PlainExporter]
public class Subscribe
{
private const int TotalMsgs = 500_000;
private NatsConnection _nats;
private CancellationTokenSource _cts;
private Task _pubTask;

[GlobalSetup]
public async Task Setup()
{
_nats = new NatsConnection(NatsOpts.Default);
await _nats.ConnectAsync();
}

[GlobalCleanup]
public async Task Cleanup() => await _nats.DisposeAsync();

[IterationSetup]
public void IterSetup()
{
_cts = new CancellationTokenSource();
_pubTask = PubTask(_cts);
}

[IterationCleanup]
public void IterCleanup()
{
_cts.Cancel();
_pubTask.GetAwaiter().GetResult();
}

[Benchmark]
public async Task SubscribeAsync()
{
var count = 0;
#pragma warning disable SA1312
await foreach (var _ in _nats.SubscribeAsync<string>("test"))
#pragma warning restore SA1312
{
if (++count >= TotalMsgs)
{
return;
}
}
}

[Benchmark]
public async Task CoreWait()
{
var count = 0;
await using var sub = await _nats.SubscribeCoreAsync<string>("test");
while (await sub.Msgs.WaitToReadAsync())
{
while (sub.Msgs.TryRead(out _))
{
if (++count >= TotalMsgs)
{
return;
}
}
}
}

[Benchmark]
public async Task CoreRead()
{
var count = 0;
await using var sub = await _nats.SubscribeCoreAsync<string>("test");
while (true)
{
await sub.Msgs.ReadAsync();
if (++count >= TotalMsgs)
{
return;
}
}
}

[Benchmark]
public async Task CoreReadAll()
{
var count = 0;
await using var sub = await _nats.SubscribeCoreAsync<string>("test");
#pragma warning disable SA1312
await foreach (var _ in sub.Msgs.ReadAllAsync())
#pragma warning restore SA1312
{
if (++count >= TotalMsgs)
{
return;
}
}
}

// limit pub to the same rate across benchmarks
// pub in batches so that groups of messages are available
private Task PubTask(CancellationTokenSource cts) =>
Task.Run(async () =>
{
const long pubMaxPerSecond = TotalMsgs;
const long batchSize = 100;
const long ticksBetweenBatches = TimeSpan.TicksPerSecond / pubMaxPerSecond * batchSize;
var sw = new Stopwatch();
sw.Start();
var lastTick = sw.ElapsedTicks;
var i = 0L;
while (!cts.IsCancellationRequested)
{
await _nats.PublishAsync("test", "data");
if (++i % batchSize == 0)
{
while (sw.ElapsedTicks - lastTick < ticksBetweenBatches)
{
}
lastTick = sw.ElapsedTicks;
}
}
});
}
112 changes: 101 additions & 11 deletions src/NATS.Client.Core/Internal/ActivityEndingMsgReader.cs
Original file line number Diff line number Diff line change
@@ -1,21 +1,66 @@
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Threading.Channels;

namespace NATS.Client.Core.Internal;

// ActivityEndingMsgReader servers 2 purposes
// 1. End activity for OpenTelemetry
// 2. Keep the INatsSub<T> from being garbage collected as long as calls interacting
// with the _inner channel are being made
// To achieve (1):
// Calls that result in a read from the _inner channel should msg.Headers?.Activity?.Dispose()
// To achieve (2):
// Synchronous calls should call GC.KeepAlive(_sub); immediately before returning
// Asynchronous calls should allocate a GCHandle.Alloc(_sub) at the start of the method,
// and then free it in a try/finally block
internal sealed class ActivityEndingMsgReader<T> : ChannelReader<NatsMsg<T>>
{
private readonly ChannelReader<NatsMsg<T>> _inner;

public ActivityEndingMsgReader(ChannelReader<NatsMsg<T>> inner) => _inner = inner;
private readonly INatsSub<T> _sub;

public override bool CanCount => _inner.CanCount;
public ActivityEndingMsgReader(ChannelReader<NatsMsg<T>> inner, INatsSub<T> sub)
{
_inner = inner;
_sub = sub;
}

public override bool CanPeek => _inner.CanPeek;
public override bool CanCount
{
get
{
GC.KeepAlive(_sub);
return _inner.CanCount;
}
}

public override int Count => _inner.Count;
public override bool CanPeek
{
get
{
GC.KeepAlive(_sub);
return _inner.CanPeek;
}
}

public override Task Completion => _inner.Completion;
public override int Count
{
get
{
GC.KeepAlive(_sub);
return _inner.Count;
}
}

public override Task Completion
{
get
{
GC.KeepAlive(_sub);
return _inner.Completion;
}
}

/// <inheritdoc/>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
Expand All @@ -26,16 +71,61 @@ public override bool TryRead(out NatsMsg<T> item)

item.Headers?.Activity?.Dispose();

GC.KeepAlive(_sub);
return true;
}

/// <inheritdoc/>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public override ValueTask<bool> WaitToReadAsync(CancellationToken cancellationToken = default) => _inner.WaitToReadAsync(cancellationToken);
public override async ValueTask<bool> WaitToReadAsync(CancellationToken cancellationToken = default)
{
var handle = GCHandle.Alloc(_sub);
try
{
return await _inner.WaitToReadAsync(cancellationToken).ConfigureAwait(false);
}
finally
{
handle.Free();
}
}

public override ValueTask<NatsMsg<T>> ReadAsync(CancellationToken cancellationToken = default) => _inner.ReadAsync(cancellationToken);
public override async ValueTask<NatsMsg<T>> ReadAsync(CancellationToken cancellationToken = default)
{
var handle = GCHandle.Alloc(_sub);
try
{
var msg = await _inner.ReadAsync(cancellationToken).ConfigureAwait(false);
msg.Headers?.Activity?.Dispose();
return msg;
}
finally
{
handle.Free();
}
}

public override bool TryPeek(out NatsMsg<T> item) => _inner.TryPeek(out item);
public override bool TryPeek(out NatsMsg<T> item)
{
GC.KeepAlive(_sub);
return _inner.TryPeek(out item);
}

public override IAsyncEnumerable<NatsMsg<T>> ReadAllAsync(CancellationToken cancellationToken = default) => _inner.ReadAllAsync(cancellationToken);
public override async IAsyncEnumerable<NatsMsg<T>> ReadAllAsync([EnumeratorCancellation] CancellationToken cancellationToken = default)
{
var handle = GCHandle.Alloc(_sub);
try
{
while (await _inner.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
{
while (_inner.TryRead(out var msg))
{
msg.Headers?.Activity?.Dispose();
yield return msg;
}
}
}
finally
{
handle.Free();
}
}
}
10 changes: 3 additions & 7 deletions src/NATS.Client.Core/NatsConnection.Subscribe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,15 @@ public async IAsyncEnumerable<NatsMsg<T>> SubscribeAsync<T>(string subject, stri
{
serializer ??= Opts.SerializerRegistry.GetDeserializer<T>();

// call to RegisterSubAnchor is no longer needed; sub is kept alive in ActivityEndingMsgReader
await using var sub = new NatsSub<T>(this, SubscriptionManager.GetManagerFor(subject), subject, queueGroup, opts, serializer, cancellationToken);
using var anchor = RegisterSubAnchor(sub);

await SubAsync(sub, cancellationToken: cancellationToken).ConfigureAwait(false);

// We don't cancel the channel reader here because we want to keep reading until the subscription
// channel writer completes so that messages left in the channel can be consumed before exit the loop.
while (await sub.Msgs.WaitToReadAsync(CancellationToken.None).ConfigureAwait(false))
await foreach (var msg in sub.Msgs.ReadAllAsync(CancellationToken.None).ConfigureAwait(false))
{
while (sub.Msgs.TryRead(out var msg))
{
yield return msg;
}
yield return msg;
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/NATS.Client.Core/NatsSub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ internal NatsSub(
connection.GetChannelOpts(connection.Opts, opts?.ChannelOpts),
msg => Connection.OnMessageDropped(this, _msgs?.Reader.Count ?? 0, msg));

Msgs = new ActivityEndingMsgReader<T>(_msgs.Reader);
Msgs = new ActivityEndingMsgReader<T>(_msgs.Reader, this);

Serializer = serializer;
}
Expand Down
Loading

0 comments on commit 2ff424c

Please sign in to comment.