Skip to content

Commit

Permalink
Add JetStream NATS Client Extensions (#598)
Browse files Browse the repository at this point in the history
* Expose subscription manager

* Separate subscription manager internal usage

* Use INatsConnection in JS

* Revert subscription manager internal

* Tidy up connection interface

* Expose base sub for extension

* Connection docs

* JetStream client extensions

* dotnet format

* Format

* XMLDocs

* Expose connection
  • Loading branch information
mtmk authored Sep 18, 2024
1 parent 5fb5a92 commit 442db86
Show file tree
Hide file tree
Showing 29 changed files with 316 additions and 98 deletions.
8 changes: 7 additions & 1 deletion sandbox/Example.Client/Program.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// See https://aka.ms/new-console-template for more information

using System.Text;
using NATS.Client.JetStream;
using NATS.Net;

CancellationTokenSource cts = new();
Expand Down Expand Up @@ -88,7 +89,12 @@
}

// Use JetStream by referencing NATS.Client.JetStream package
// var js = client.GetJetStream();
var js = client.CreateJetStreamContext();
await foreach (var stream in js.ListStreamsAsync())
{
Console.WriteLine($"JetStream Stream: {stream.Info.Config.Name}");
}

await cts.CancelAsync();

await Task.WhenAll(tasks);
Expand Down
90 changes: 90 additions & 0 deletions src/NATS.Client.Core/INatsConnection.cs
Original file line number Diff line number Diff line change
@@ -1,23 +1,56 @@
using System.Diagnostics.CodeAnalysis;
using System.Threading.Channels;

namespace NATS.Client.Core;

public interface INatsConnection : INatsClient
{
/// <summary>
/// Event that is raised when the connection to the NATS server is disconnected.
/// </summary>
event AsyncEventHandler<NatsEventArgs>? ConnectionDisconnected;

/// <summary>
/// Event that is raised when the connection to the NATS server is opened.
/// </summary>
event AsyncEventHandler<NatsEventArgs>? ConnectionOpened;

/// <summary>
/// Event that is raised when a reconnect attempt is failed.
/// </summary>
event AsyncEventHandler<NatsEventArgs>? ReconnectFailed;

/// <summary>
/// Event that is raised when a message is dropped for a subscription.
/// </summary>
event AsyncEventHandler<NatsMessageDroppedEventArgs>? MessageDropped;

/// <summary>
/// Server information received from the NATS server.
/// </summary>
INatsServerInfo? ServerInfo { get; }

/// <summary>
/// Options used to configure the NATS connection.
/// </summary>
NatsOpts Opts { get; }

/// <summary>
/// Connection state of the NATS connection.
/// </summary>
NatsConnectionState ConnectionState { get; }

/// <summary>
/// Subscription manager used to manage subscriptions for the NATS connection.
/// </summary>
INatsSubscriptionManager SubscriptionManager { get; }

/// <summary>
/// Singleton instance of the NATS header parser used to parse message headers
/// used by the NATS connection.
/// </summary>
NatsHeaderParser HeaderParser { get; }

/// <summary>
/// Publishes a serializable message payload to the given subject name, optionally supplying a reply subject.
/// </summary>
Expand Down Expand Up @@ -87,4 +120,61 @@ IAsyncEnumerable<NatsMsg<TReply>> RequestManyAsync<TRequest, TReply>(
NatsPubOpts? requestOpts = default,
NatsSubOpts? replyOpts = default,
CancellationToken cancellationToken = default);

/// <summary>
/// Adds a subscription to the NATS connection for a given <see cref="NatsSubBase"/> object.
/// Subscriptions are managed by the connection and are automatically removed when the connection is closed.
/// </summary>
/// <param name="sub">The <see cref="NatsSubBase"/> object representing the subscription details.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the operation.</param>
/// <returns>A <see cref="ValueTask"/> that represents the asynchronous subscription operation.</returns>
ValueTask AddSubAsync(NatsSubBase sub, CancellationToken cancellationToken = default);

/// <summary>
/// Creates a subscription with appropriate request and reply subjects publishing the request.
/// It's the caller's responsibility to retrieve the reply messages and complete the subscription.
/// </summary>
/// <typeparam name="TRequest">The type of the request data.</typeparam>
/// <typeparam name="TReply">The type of the expected reply.</typeparam>
/// <param name="subject">The subject to subscribe to.</param>
/// <param name="data">The optional request data.</param>
/// <param name="headers">The optional headers to include with the request.</param>
/// <param name="requestSerializer">The optional serializer for the request data.</param>
/// <param name="replySerializer">The optional deserializer for the reply data.</param>
/// <param name="requestOpts">The optional publishing options for the request.</param>
/// <param name="replyOpts">The optional subscription options for the reply.</param>
/// <param name="cancellationToken">The optional cancellation token.</param>
/// <returns>A <see cref="ValueTask{T}"/> representing the asynchronous operation of creating the request subscription.</returns>
ValueTask<NatsSub<TReply>> CreateRequestSubAsync<TRequest, TReply>(
string subject,
TRequest? data,
NatsHeaders? headers = default,
INatsSerialize<TRequest>? requestSerializer = default,
INatsDeserialize<TReply>? replySerializer = default,
NatsPubOpts? requestOpts = default,
NatsSubOpts? replyOpts = default,
CancellationToken cancellationToken = default);

/// <summary>
/// Retrieves the bounded channel options for creating a channel used by a subscription.
/// Options are built from the connection's configuration and the subscription channel options.
/// Used to aid in custom message handling when building a subscription channel.
/// </summary>
/// <param name="subChannelOpts">The options for configuring the subscription channel.</param>
/// <returns>The bounded channel options used for creating the subscription channel.</returns>
BoundedChannelOptions GetBoundedChannelOpts(NatsSubChannelOpts? subChannelOpts);

/// <summary>
/// Called when a message is dropped for a subscription.
/// Used to aid in custom message handling when a subscription's message channel is full.
/// </summary>
/// <param name="natsSub">The <see cref="NatsSubBase"/> representing the subscription.</param>
/// <param name="pending">The number of pending messages at the time the drop occurred.</param>
/// <param name="msg">The dropped message represented by <see cref="NatsMsg{T}"/>.</param>
/// <typeparam name="T">Specifies the type of data in the dropped message.</typeparam>
/// <remarks>
/// This method is expected to complete quickly to avoid further delays in processing;
/// if complex work is required, it is recommended to offload to a channel or other out-of-band processor.
/// </remarks>
void OnMessageDropped<T>(NatsSubBase natsSub, int pending, NatsMsg<T> msg);
}
19 changes: 19 additions & 0 deletions src/NATS.Client.Core/INatsSubscriptionManager.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
namespace NATS.Client.Core;

/// <summary>
/// Subscription manager interface.
/// </summary>
/// <remarks>
/// This interface is used to manage subscriptions. However, it is not intended to be used directly.
/// You can implement this interface if you are using low-level APIs and implement your own
/// subscription manager.
/// </remarks>
public interface INatsSubscriptionManager
{
/// <summary>
/// Remove a subscription.
/// </summary>
/// <param name="sub">Subscription to remove.</param>
/// <returns>A value task that represents the asynchronous remove operation.</returns>
public ValueTask RemoveAsync(NatsSubBase sub);
}
6 changes: 3 additions & 3 deletions src/NATS.Client.Core/Internal/InboxSub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public InboxSub(
string subject,
NatsSubOpts? opts,
NatsConnection connection,
ISubscriptionManager manager)
INatsSubscriptionManager manager)
: base(connection, manager, subject, queueGroup: default, opts)
{
_inbox = inbox;
Expand All @@ -35,7 +35,7 @@ protected override void TryComplete()
}
}

internal class InboxSubBuilder : ISubscriptionManager
internal class InboxSubBuilder : INatsSubscriptionManager
{
private readonly ILogger<InboxSubBuilder> _logger;
#if NETSTANDARD2_0
Expand All @@ -46,7 +46,7 @@ internal class InboxSubBuilder : ISubscriptionManager

public InboxSubBuilder(ILogger<InboxSubBuilder> logger) => _logger = logger;

public InboxSub Build(string subject, NatsSubOpts? opts, NatsConnection connection, ISubscriptionManager manager)
public InboxSub Build(string subject, NatsSubOpts? opts, NatsConnection connection, INatsSubscriptionManager manager)
{
return new InboxSub(this, subject, opts, connection, manager);
}
Expand Down
11 changes: 3 additions & 8 deletions src/NATS.Client.Core/Internal/SubscriptionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,11 @@

namespace NATS.Client.Core.Internal;

internal interface ISubscriptionManager
{
public ValueTask RemoveAsync(NatsSubBase sub);
}

internal record struct SidMetadata(string Subject, WeakReference<NatsSubBase> WeakReference);

internal sealed record SubscriptionMetadata(int Sid);

internal sealed class SubscriptionManager : ISubscriptionManager, IAsyncDisposable
internal sealed class SubscriptionManager : INatsSubscriptionManager, IAsyncDisposable
{
private readonly ILogger<SubscriptionManager> _logger;
private readonly bool _trace;
Expand Down Expand Up @@ -192,7 +187,7 @@ public ValueTask RemoveAsync(NatsSubBase sub)
/// Commands returned form all the subscriptions will be run as a priority right after reconnection is established.
/// </remarks>
/// <returns>Enumerable list of commands</returns>
public async ValueTask WriteReconnectCommandsAsync(CommandWriter commandWriter)
internal async ValueTask WriteReconnectCommandsAsync(CommandWriter commandWriter)
{
if (_debug)
{
Expand Down Expand Up @@ -226,7 +221,7 @@ public async ValueTask WriteReconnectCommandsAsync(CommandWriter commandWriter)
}
}

public ISubscriptionManager GetManagerFor(string subject)
internal INatsSubscriptionManager GetManagerFor(string subject)
{
if (IsInboxSubject(subject))
return InboxSubBuilder;
Expand Down
7 changes: 4 additions & 3 deletions src/NATS.Client.Core/NatsConnection.LowLevelApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ namespace NATS.Client.Core;

public partial class NatsConnection
{
internal ValueTask SubAsync(NatsSubBase sub, CancellationToken cancellationToken = default) =>
/// <inheritdoc />
public ValueTask AddSubAsync(NatsSubBase sub, CancellationToken cancellationToken = default) =>
ConnectionState != NatsConnectionState.Open
? ConnectAndSubAsync(sub, cancellationToken)
: SubscriptionManager.SubscribeAsync(sub, cancellationToken);
: _subscriptionManager.SubscribeAsync(sub, cancellationToken);

private async ValueTask ConnectAndSubAsync(NatsSubBase sub, CancellationToken cancellationToken = default)
{
await ConnectAsync().AsTask().WaitAsync(cancellationToken).ConfigureAwait(false);
await SubscriptionManager.SubscribeAsync(sub, cancellationToken).ConfigureAwait(false);
await _subscriptionManager.SubscribeAsync(sub, cancellationToken).ConfigureAwait(false);
}
}
6 changes: 3 additions & 3 deletions src/NATS.Client.Core/NatsConnection.RequestReply.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public async ValueTask<NatsMsg<TReply>> RequestAsync<TRequest, TReply>(
try
{
replyOpts = SetReplyOptsDefaults(replyOpts);
await using var sub1 = await RequestSubAsync<TRequest, TReply>(subject, data, headers, requestSerializer, replySerializer, requestOpts, replyOpts, cancellationToken)
await using var sub1 = await CreateRequestSubAsync<TRequest, TReply>(subject, data, headers, requestSerializer, replySerializer, requestOpts, replyOpts, cancellationToken)
.ConfigureAwait(false);

await foreach (var msg in sub1.Msgs.ReadAllAsync(cancellationToken).ConfigureAwait(false))
Expand All @@ -56,7 +56,7 @@ public async ValueTask<NatsMsg<TReply>> RequestAsync<TRequest, TReply>(
}

replyOpts = SetReplyOptsDefaults(replyOpts);
await using var sub = await RequestSubAsync<TRequest, TReply>(subject, data, headers, requestSerializer, replySerializer, requestOpts, replyOpts, cancellationToken)
await using var sub = await CreateRequestSubAsync<TRequest, TReply>(subject, data, headers, requestSerializer, replySerializer, requestOpts, replyOpts, cancellationToken)
.ConfigureAwait(false);

await foreach (var msg in sub.Msgs.ReadAllAsync(cancellationToken).ConfigureAwait(false))
Expand Down Expand Up @@ -95,7 +95,7 @@ public async IAsyncEnumerable<NatsMsg<TReply>> RequestManyAsync<TRequest, TReply
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
replyOpts = SetReplyManyOptsDefaults(replyOpts);
await using var sub = await RequestSubAsync<TRequest, TReply>(subject, data, headers, requestSerializer, replySerializer, requestOpts, replyOpts, cancellationToken)
await using var sub = await CreateRequestSubAsync<TRequest, TReply>(subject, data, headers, requestSerializer, replySerializer, requestOpts, replyOpts, cancellationToken)
.ConfigureAwait(false);

await foreach (var msg in sub.Msgs.ReadAllAsync(cancellationToken).ConfigureAwait(false))
Expand Down
7 changes: 4 additions & 3 deletions src/NATS.Client.Core/NatsConnection.RequestSub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ namespace NATS.Client.Core;

public partial class NatsConnection
{
internal async ValueTask<NatsSub<TReply>> RequestSubAsync<TRequest, TReply>(
/// <inheritdoc />
public async ValueTask<NatsSub<TReply>> CreateRequestSubAsync<TRequest, TReply>(
string subject,
TRequest? data,
NatsHeaders? headers = default,
Expand All @@ -15,8 +16,8 @@ internal async ValueTask<NatsSub<TReply>> RequestSubAsync<TRequest, TReply>(
var replyTo = NewInbox();

replySerializer ??= Opts.SerializerRegistry.GetDeserializer<TReply>();
var sub = new NatsSub<TReply>(this, SubscriptionManager.InboxSubBuilder, replyTo, queueGroup: default, replyOpts, replySerializer);
await SubAsync(sub, cancellationToken).ConfigureAwait(false);
var sub = new NatsSub<TReply>(this, _subscriptionManager.InboxSubBuilder, replyTo, queueGroup: default, replyOpts, replySerializer);
await AddSubAsync(sub, cancellationToken).ConfigureAwait(false);

requestSerializer ??= Opts.SerializerRegistry.GetSerializer<TRequest>();
await PublishAsync(subject, data, headers, replyTo, requestSerializer, requestOpts, cancellationToken).ConfigureAwait(false);
Expand Down
8 changes: 4 additions & 4 deletions src/NATS.Client.Core/NatsConnection.Subscribe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ public async IAsyncEnumerable<NatsMsg<T>> SubscribeAsync<T>(string subject, stri
{
serializer ??= Opts.SerializerRegistry.GetDeserializer<T>();

await using var sub = new NatsSub<T>(this, SubscriptionManager.GetManagerFor(subject), subject, queueGroup, opts, serializer, cancellationToken);
await SubAsync(sub, cancellationToken: cancellationToken).ConfigureAwait(false);
await using var sub = new NatsSub<T>(this, _subscriptionManager.GetManagerFor(subject), subject, queueGroup, opts, serializer, cancellationToken);
await AddSubAsync(sub, cancellationToken: cancellationToken).ConfigureAwait(false);

// We don't cancel the channel reader here because we want to keep reading until the subscription
// channel writer completes so that messages left in the channel can be consumed before exit the loop.
Expand All @@ -25,8 +25,8 @@ public async IAsyncEnumerable<NatsMsg<T>> SubscribeAsync<T>(string subject, stri
public async ValueTask<INatsSub<T>> SubscribeCoreAsync<T>(string subject, string? queueGroup = default, INatsDeserialize<T>? serializer = default, NatsSubOpts? opts = default, CancellationToken cancellationToken = default)
{
serializer ??= Opts.SerializerRegistry.GetDeserializer<T>();
var sub = new NatsSub<T>(this, SubscriptionManager.GetManagerFor(subject), subject, queueGroup, opts, serializer, cancellationToken);
await SubAsync(sub, cancellationToken).ConfigureAwait(false);
var sub = new NatsSub<T>(this, _subscriptionManager.GetManagerFor(subject), subject, queueGroup, opts, serializer, cancellationToken);
await AddSubAsync(sub, cancellationToken).ConfigureAwait(false);
return sub;
}
}
Loading

0 comments on commit 442db86

Please sign in to comment.