diff --git a/sandbox/Example.Client/Program.cs b/sandbox/Example.Client/Program.cs index 4bac3d56a..c72d89ba8 100644 --- a/sandbox/Example.Client/Program.cs +++ b/sandbox/Example.Client/Program.cs @@ -2,6 +2,9 @@ using System.Text; using NATS.Client.JetStream; +using NATS.Client.KeyValueStore; +using NATS.Client.ObjectStore; +using NATS.Client.Services; using NATS.Net; CancellationTokenSource cts = new(); @@ -95,6 +98,22 @@ Console.WriteLine($"JetStream Stream: {stream.Info.Config.Name}"); } +// Use KeyValueStore by referencing NATS.Client.KeyValueStore package +var kv1 = client.CreateKeyValueStoreContext(); +var kv2 = js.CreateKeyValueStoreContext(); +await kv1.CreateStoreAsync("store1"); +await kv2.CreateStoreAsync("store1"); + +// Use ObjectStore by referencing NATS.Client.ObjectStore package +var obj1 = client.CreateObjectStoreContext(); +var obj2 = js.CreateObjectStoreContext(); +await obj1.CreateObjectStoreAsync("store1"); +await obj2.CreateObjectStoreAsync("store1"); + +// Use Services by referencing NATS.Client.Services package +var svc = client.CreateServicesContext(); +await svc.AddServiceAsync("service1", "1.0.0"); + await cts.CancelAsync(); await Task.WhenAll(tasks); diff --git a/src/NATS.Client.JetStream/INatsJSContext.cs b/src/NATS.Client.JetStream/INatsJSContext.cs index 2c51a20cb..a23eff2c8 100644 --- a/src/NATS.Client.JetStream/INatsJSContext.cs +++ b/src/NATS.Client.JetStream/INatsJSContext.cs @@ -11,6 +11,11 @@ public interface INatsJSContext /// INatsConnection Connection { get; } + /// + /// Provides configuration options for the JetStream context. + /// + NatsJSOpts Opts { get; } + /// /// Creates new ordered consumer. /// @@ -296,4 +301,26 @@ ValueTask PublishConcurrentAsync( NatsJSPubOpts? opts = default, NatsHeaders? headers = default, CancellationToken cancellationToken = default); + + /// + /// Generates a new base inbox string using the connection's inbox prefix. + /// + /// A new inbox string. + string NewBaseInbox(); + + /// + /// Sends a request message to a JetStream subject and waits for a response. + /// + /// The JetStream API subject to send the request to. + /// The request message object. + /// A used to cancel the API call. + /// The type of the request message. + /// The type of the response message. + /// A task representing the asynchronous operation, with a result of type . + ValueTask JSRequestResponseAsync( + string subject, + TRequest? request, + CancellationToken cancellationToken = default) + where TRequest : class + where TResponse : class; } diff --git a/src/NATS.Client.JetStream/Internal/NatsJSOrderedPushConsumer.cs b/src/NATS.Client.JetStream/Internal/NatsJSOrderedPushConsumer.cs index 03e8e51bf..d10b1ac09 100644 --- a/src/NATS.Client.JetStream/Internal/NatsJSOrderedPushConsumer.cs +++ b/src/NATS.Client.JetStream/Internal/NatsJSOrderedPushConsumer.cs @@ -45,7 +45,7 @@ internal class NatsJSOrderedPushConsumer { private readonly ILogger _logger; private readonly bool _debug; - private readonly NatsJSContext _context; + private readonly INatsJSContext _context; private readonly string _stream; private readonly string _filter; private readonly INatsDeserialize _serializer; @@ -68,7 +68,7 @@ internal class NatsJSOrderedPushConsumer private int _done; public NatsJSOrderedPushConsumer( - NatsJSContext context, + INatsJSContext context, string stream, string filter, INatsDeserialize serializer, @@ -417,7 +417,7 @@ private void CreateSub(string origin) internal class NatsJSOrderedPushConsumerSub : NatsSubBase { - private readonly NatsJSContext _context; + private readonly INatsJSContext _context; private readonly CancellationToken _cancellationToken; private readonly INatsConnection _nats; private readonly NatsHeaderParser _headerParser; @@ -425,7 +425,7 @@ internal class NatsJSOrderedPushConsumerSub : NatsSubBase private readonly ChannelWriter> _commands; public NatsJSOrderedPushConsumerSub( - NatsJSContext context, + INatsJSContext context, Channel> commandChannel, INatsDeserialize serializer, NatsSubOpts? opts, @@ -433,7 +433,7 @@ public NatsJSOrderedPushConsumerSub( : base( connection: context.Connection, manager: context.Connection.SubscriptionManager, - subject: context.NewInbox(), + subject: context.NewBaseInbox(), queueGroup: default, opts) { diff --git a/src/NATS.Client.JetStream/NatsClientExtensions.cs b/src/NATS.Client.JetStream/NatsClientExtensions.cs index 31e4e9809..3c8c0f75b 100644 --- a/src/NATS.Client.JetStream/NatsClientExtensions.cs +++ b/src/NATS.Client.JetStream/NatsClientExtensions.cs @@ -1,12 +1,24 @@ using NATS.Client.Core; +using NATS.Client.JetStream; -namespace NATS.Client.JetStream; +// ReSharper disable once CheckNamespace +namespace NATS.Net; public static class NatsClientExtensions { + /// + /// Creates a JetStream context using the provided NATS client. + /// + /// The NATS client used to create the JetStream context. + /// Returns an instance of for interacting with JetStream. public static INatsJSContext CreateJetStreamContext(this INatsClient client) => CreateJetStreamContext(client.Connection); + /// + /// Creates a JetStream context using the provided NATS connection. + /// + /// The NATS connection used to create the JetStream context. + /// Returns an instance of for interacting with JetStream. public static INatsJSContext CreateJetStreamContext(this INatsConnection connection) => new NatsJSContext(connection); } diff --git a/src/NATS.Client.JetStream/NatsJSConsumer.cs b/src/NATS.Client.JetStream/NatsJSConsumer.cs index 9f21017ec..6d8fd736f 100644 --- a/src/NATS.Client.JetStream/NatsJSConsumer.cs +++ b/src/NATS.Client.JetStream/NatsJSConsumer.cs @@ -284,7 +284,7 @@ internal async ValueTask> ConsumeInternalAsync(INatsDeserial opts ??= new NatsJSConsumeOpts(); serializer ??= _context.Connection.Opts.SerializerRegistry.GetDeserializer(); - var inbox = _context.NewInbox(); + var inbox = _context.NewBaseInbox(); var max = NatsJSOptsDefaults.SetMax(opts.MaxMsgs, opts.MaxBytes, opts.ThresholdMsgs, opts.ThresholdBytes); var timeouts = NatsJSOptsDefaults.SetTimeouts(opts.Expires, opts.IdleHeartbeat); @@ -332,7 +332,7 @@ internal async ValueTask> OrderedConsumeInternalAsync ThrowIfDeleted(); serializer ??= _context.Connection.Opts.SerializerRegistry.GetDeserializer(); - var inbox = _context.NewInbox(); + var inbox = _context.NewBaseInbox(); var max = NatsJSOptsDefaults.SetMax(opts.MaxMsgs, opts.MaxBytes, opts.ThresholdMsgs, opts.ThresholdBytes); var timeouts = NatsJSOptsDefaults.SetTimeouts(opts.Expires, opts.IdleHeartbeat); @@ -382,7 +382,7 @@ internal async ValueTask> FetchInternalAsync( ThrowIfDeleted(); serializer ??= _context.Connection.Opts.SerializerRegistry.GetDeserializer(); - var inbox = _context.NewInbox(); + var inbox = _context.NewBaseInbox(); var max = NatsJSOptsDefaults.SetMax(opts.MaxMsgs, opts.MaxBytes); var timeouts = NatsJSOptsDefaults.SetTimeouts(opts.Expires, opts.IdleHeartbeat); diff --git a/src/NATS.Client.JetStream/NatsJSContext.cs b/src/NATS.Client.JetStream/NatsJSContext.cs index c0c6f9094..8076dccec 100644 --- a/src/NATS.Client.JetStream/NatsJSContext.cs +++ b/src/NATS.Client.JetStream/NatsJSContext.cs @@ -32,7 +32,8 @@ public NatsJSContext(INatsConnection connection, NatsJSOpts opts) public INatsConnection Connection { get; } - internal NatsJSOpts Opts { get; } + /// + public NatsJSOpts Opts { get; } /// /// Calls JetStream Account Info API. @@ -238,6 +239,22 @@ public async ValueTask PublishConcurrentAsync( return new NatsJSPublishConcurrentFuture(sub); } + /// + public string NewBaseInbox() => NatsConnection.NewInbox(Connection.Opts.InboxPrefix); + + /// + public async ValueTask JSRequestResponseAsync( + string subject, + TRequest? request, + CancellationToken cancellationToken = default) + where TRequest : class + where TResponse : class + { + var response = await JSRequestAsync(subject, request, cancellationToken); + response.EnsureSuccess(); + return response.Response!; + } + internal static void ThrowIfInvalidStreamName([NotNull] string? name, [CallerArgumentExpression("name")] string? paramName = null) { #if NETSTANDARD @@ -262,20 +279,6 @@ internal static void ThrowIfInvalidStreamName([NotNull] string? name, [CallerArg } } - internal string NewInbox() => NatsConnection.NewInbox(Connection.Opts.InboxPrefix); - - internal async ValueTask JSRequestResponseAsync( - string subject, - TRequest? request, - CancellationToken cancellationToken = default) - where TRequest : class - where TResponse : class - { - var response = await JSRequestAsync(subject, request, cancellationToken); - response.EnsureSuccess(); - return response.Response!; - } - internal async ValueTask> JSRequestAsync( string subject, TRequest? request, diff --git a/src/NATS.Client.JetStream/NatsJSMsg.cs b/src/NATS.Client.JetStream/NatsJSMsg.cs index e5a0bd151..cc6830054 100644 --- a/src/NATS.Client.JetStream/NatsJSMsg.cs +++ b/src/NATS.Client.JetStream/NatsJSMsg.cs @@ -140,11 +140,11 @@ public interface INatsJSMsg /// User message type public readonly struct NatsJSMsg : INatsJSMsg { - private readonly NatsJSContext _context; + private readonly INatsJSContext _context; private readonly NatsMsg _msg; private readonly Lazy _replyToDateTimeAndSeq; - public NatsJSMsg(NatsMsg msg, NatsJSContext context) + public NatsJSMsg(NatsMsg msg, INatsJSContext context) { _msg = msg; _context = context; diff --git a/src/NATS.Client.KeyValueStore/INatsKVContext.cs b/src/NATS.Client.KeyValueStore/INatsKVContext.cs index 2c00bf951..cdf371037 100644 --- a/src/NATS.Client.KeyValueStore/INatsKVContext.cs +++ b/src/NATS.Client.KeyValueStore/INatsKVContext.cs @@ -4,6 +4,11 @@ namespace NATS.Client.KeyValueStore; public interface INatsKVContext { + /// + /// Provides access to the JetStream context associated with the Key-Value Store operations. + /// + INatsJSContext JetStreamContext { get; } + /// /// Create a new Key Value Store or get an existing one /// diff --git a/src/NATS.Client.KeyValueStore/INatsKVStore.cs b/src/NATS.Client.KeyValueStore/INatsKVStore.cs index a1ed8ce37..1c99a0a3c 100644 --- a/src/NATS.Client.KeyValueStore/INatsKVStore.cs +++ b/src/NATS.Client.KeyValueStore/INatsKVStore.cs @@ -1,9 +1,15 @@ using NATS.Client.Core; +using NATS.Client.JetStream; namespace NATS.Client.KeyValueStore; public interface INatsKVStore { + /// + /// Provides access to the JetStream context associated with the Object Store operations. + /// + INatsJSContext JetStreamContext { get; } + /// /// Name of the Key Value Store bucket /// diff --git a/src/NATS.Client.KeyValueStore/Internal/NatsKVWatchSub.cs b/src/NATS.Client.KeyValueStore/Internal/NatsKVWatchSub.cs index e2f7f1d3b..7b7f59335 100644 --- a/src/NATS.Client.KeyValueStore/Internal/NatsKVWatchSub.cs +++ b/src/NATS.Client.KeyValueStore/Internal/NatsKVWatchSub.cs @@ -7,7 +7,7 @@ namespace NATS.Client.KeyValueStore.Internal; internal class NatsKVWatchSub : NatsSubBase { - private readonly NatsJSContext _context; + private readonly INatsJSContext _context; private readonly CancellationToken _cancellationToken; private readonly INatsConnection _nats; private readonly NatsHeaderParser _headerParser; @@ -15,7 +15,7 @@ internal class NatsKVWatchSub : NatsSubBase private readonly ChannelWriter> _commands; public NatsKVWatchSub( - NatsJSContext context, + INatsJSContext context, Channel> commandChannel, INatsDeserialize serializer, NatsSubOpts? opts, @@ -23,7 +23,7 @@ public NatsKVWatchSub( : base( connection: context.Connection, manager: context.Connection.SubscriptionManager, - subject: context.NewInbox(), + subject: context.NewBaseInbox(), queueGroup: default, opts) { diff --git a/src/NATS.Client.KeyValueStore/Internal/NatsKVWatcher.cs b/src/NATS.Client.KeyValueStore/Internal/NatsKVWatcher.cs index 30f54dd00..ad2812eeb 100644 --- a/src/NATS.Client.KeyValueStore/Internal/NatsKVWatcher.cs +++ b/src/NATS.Client.KeyValueStore/Internal/NatsKVWatcher.cs @@ -28,7 +28,7 @@ internal sealed class NatsKVWatcher : IAsyncDisposable { private readonly ILogger _logger; private readonly bool _debug; - private readonly NatsJSContext _context; + private readonly INatsJSContext _context; private readonly string _bucket; private readonly INatsDeserialize _serializer; private readonly NatsKVWatchOpts _opts; @@ -53,7 +53,7 @@ internal sealed class NatsKVWatcher : IAsyncDisposable private INatsJSConsumer? _initialConsumer; public NatsKVWatcher( - NatsJSContext context, + INatsJSContext context, string bucket, IEnumerable keys, INatsDeserialize serializer, diff --git a/src/NATS.Client.KeyValueStore/NatsClientExtensions.cs b/src/NATS.Client.KeyValueStore/NatsClientExtensions.cs new file mode 100644 index 000000000..3c616065a --- /dev/null +++ b/src/NATS.Client.KeyValueStore/NatsClientExtensions.cs @@ -0,0 +1,33 @@ +using NATS.Client.Core; +using NATS.Client.JetStream; +using NATS.Client.KeyValueStore; + +// ReSharper disable once CheckNamespace +namespace NATS.Net; + +public static class NatsClientExtensions +{ + /// + /// Creates a NATS Key-Value Store context using the specified NATS client. + /// + /// The NATS client instance. + /// An instance of which can be used to interact with the Key-Value Store. + public static INatsKVContext CreateKeyValueStoreContext(this INatsClient client) + => CreateKeyValueStoreContext(client.CreateJetStreamContext()); + + /// + /// Creates a NATS Key-Value Store context using the specified NATS connection. + /// + /// The NATS connection instance. + /// An instance of which can be used to interact with the Key-Value Store. + public static INatsKVContext CreateKeyValueStoreContext(this INatsConnection connection) + => CreateKeyValueStoreContext(connection.CreateJetStreamContext()); + + /// + /// Creates a NATS Key-Value Store context using the specified NATS JetStream context. + /// + /// The NATS JetStream context instance. + /// An instance of which can be used to interact with the Key-Value Store. + public static INatsKVContext CreateKeyValueStoreContext(this INatsJSContext context) + => new NatsKVContext(context); +} diff --git a/src/NATS.Client.KeyValueStore/NatsKVContext.cs b/src/NATS.Client.KeyValueStore/NatsKVContext.cs index cd0b1871c..501a261aa 100644 --- a/src/NATS.Client.KeyValueStore/NatsKVContext.cs +++ b/src/NATS.Client.KeyValueStore/NatsKVContext.cs @@ -23,59 +23,37 @@ public class NatsKVContext : INatsKVContext private static readonly int KvStreamNamePrefixLen = KvStreamNamePrefix.Length; private static readonly Regex ValidBucketRegex = new(pattern: @"\A[a-zA-Z0-9_-]+\z", RegexOptions.Compiled); - private readonly NatsJSContext _context; - /// /// Create a new Key Value Store context /// /// JetStream context - public NatsKVContext(NatsJSContext context) => _context = context; + public NatsKVContext(INatsJSContext context) => JetStreamContext = context; - /// - /// Create a new Key Value Store or get an existing one - /// - /// Name of the bucket - /// A used to cancel the API call. - /// Key Value Store - /// There was an issue retrieving the response. - /// Server responded with an error. + /// + public INatsJSContext JetStreamContext { get; } + + /// public ValueTask CreateStoreAsync(string bucket, CancellationToken cancellationToken = default) => CreateStoreAsync(new NatsKVConfig(bucket), cancellationToken); - /// - /// Create a new Key Value Store or get an existing one - /// - /// Key Value Store configuration - /// A used to cancel the API call. - /// Key Value Store - /// There was an issue with configuration - /// There was an issue retrieving the response. - /// Server responded with an error. + /// public async ValueTask CreateStoreAsync(NatsKVConfig config, CancellationToken cancellationToken = default) { ValidateBucketName(config.Bucket); var streamConfig = NatsKVContext.CreateStreamConfig(config); - var stream = await _context.CreateStreamAsync(streamConfig, cancellationToken); + var stream = await JetStreamContext.CreateStreamAsync(streamConfig, cancellationToken); - return new NatsKVStore(config.Bucket, _context, stream); + return new NatsKVStore(config.Bucket, JetStreamContext, stream); } - /// - /// Get a Key Value Store - /// - /// Name of the bucjet - /// A used to cancel the API call. - /// Key Value Store - /// There was an issue with configuration - /// There was an issue retrieving the response. - /// Server responded with an error. + /// public async ValueTask GetStoreAsync(string bucket, CancellationToken cancellationToken = default) { ValidateBucketName(bucket); - var stream = await _context.GetStreamAsync(BucketToStream(bucket), cancellationToken: cancellationToken); + var stream = await JetStreamContext.GetStreamAsync(BucketToStream(bucket), cancellationToken: cancellationToken); if (stream.Info.Config.MaxMsgsPerSubject < 1) { @@ -83,53 +61,32 @@ public async ValueTask GetStoreAsync(string bucket, CancellationTo } // TODO: KV mirror - return new NatsKVStore(bucket, _context, stream); + return new NatsKVStore(bucket, JetStreamContext, stream); } - /// - /// Update a key value store configuration. Storage type cannot change. - /// - /// Key Value Store configuration - /// used to cancel the API call. - /// Key Value Store - /// There was an issue with configuration - /// There was an issue retrieving the response. - /// Server responded with an error. + /// public async ValueTask UpdateStoreAsync(NatsKVConfig config, CancellationToken cancellationToken = default) { ValidateBucketName(config.Bucket); var streamConfig = NatsKVContext.CreateStreamConfig(config); - var stream = await _context.UpdateStreamAsync(streamConfig, cancellationToken); + var stream = await JetStreamContext.UpdateStreamAsync(streamConfig, cancellationToken); - return new NatsKVStore(config.Bucket, _context, stream); + return new NatsKVStore(config.Bucket, JetStreamContext, stream); } - /// - /// Delete a Key Value Store - /// - /// Name of the bucket - /// A used to cancel the API call. - /// True for success - /// There was an issue retrieving the response. - /// Server responded with an error. + /// public ValueTask DeleteStoreAsync(string bucket, CancellationToken cancellationToken = default) { ValidateBucketName(bucket); - return _context.DeleteStreamAsync(BucketToStream(bucket), cancellationToken); + return JetStreamContext.DeleteStreamAsync(BucketToStream(bucket), cancellationToken); } - /// - /// Get a list of bucket names - /// - /// used to cancel the API call. - /// Async enumerable of bucket names. Can be used in a await foreach loop. - /// There was an issue retrieving the response. - /// Server responded with an error. + /// public async IAsyncEnumerable GetBucketNamesAsync([EnumeratorCancellation] CancellationToken cancellationToken = default) { - await foreach (var name in _context.ListStreamNamesAsync(cancellationToken: cancellationToken)) + await foreach (var name in JetStreamContext.ListStreamNamesAsync(cancellationToken: cancellationToken)) { if (!name.StartsWith(KvStreamNamePrefix)) { @@ -140,18 +97,12 @@ public async IAsyncEnumerable GetBucketNamesAsync([EnumeratorCancellatio } } - /// - /// Gets the status for all buckets - /// - /// used to cancel the API call. - /// Async enumerable of Key/Value statuses. Can be used in a await foreach loop. - /// There was an issue retrieving the response. - /// Server responded with an error. + /// public async IAsyncEnumerable GetStatusesAsync([EnumeratorCancellation] CancellationToken cancellationToken = default) { - await foreach (var name in _context.ListStreamNamesAsync(cancellationToken: cancellationToken)) + await foreach (var name in JetStreamContext.ListStreamNamesAsync(cancellationToken: cancellationToken)) { - var stream = await _context.GetStreamAsync(name, cancellationToken: cancellationToken); + var stream = await JetStreamContext.GetStreamAsync(name, cancellationToken: cancellationToken); var isCompressed = stream.Info.Config.Compression != StreamConfigCompression.None; yield return new NatsKVStatus(name, isCompressed, stream.Info); } diff --git a/src/NATS.Client.KeyValueStore/NatsKVStore.cs b/src/NATS.Client.KeyValueStore/NatsKVStore.cs index d42c9b6f2..cd2a32fc9 100644 --- a/src/NATS.Client.KeyValueStore/NatsKVStore.cs +++ b/src/NATS.Client.KeyValueStore/NatsKVStore.cs @@ -44,23 +44,26 @@ public class NatsKVStore : INatsKVStore private const string NatsSequence = "Nats-Sequence"; private const string NatsTimeStamp = "Nats-Time-Stamp"; private static readonly Regex ValidKeyRegex = new(pattern: @"\A[-/_=\.a-zA-Z0-9]+\z", RegexOptions.Compiled); - private readonly NatsJSContext _context; private readonly INatsJSStream _stream; - internal NatsKVStore(string bucket, NatsJSContext context, INatsJSStream stream) + internal NatsKVStore(string bucket, INatsJSContext context, INatsJSStream stream) { Bucket = bucket; - _context = context; + JetStreamContext = context; _stream = stream; } + /// + public INatsJSContext JetStreamContext { get; } + + /// public string Bucket { get; } /// public async ValueTask PutAsync(string key, T value, INatsSerialize? serializer = default, CancellationToken cancellationToken = default) { ValidateKey(key); - var ack = await _context.PublishAsync($"$KV.{Bucket}.{key}", value, serializer: serializer, cancellationToken: cancellationToken); + var ack = await JetStreamContext.PublishAsync($"$KV.{Bucket}.{key}", value, serializer: serializer, cancellationToken: cancellationToken); ack.EnsureSuccess(); return ack.Seq; } @@ -100,7 +103,7 @@ public async ValueTask UpdateAsync(string key, T value, ulong revision try { - var ack = await _context.PublishAsync($"$KV.{Bucket}.{key}", value, headers: headers, serializer: serializer, cancellationToken: cancellationToken); + var ack = await JetStreamContext.PublishAsync($"$KV.{Bucket}.{key}", value, headers: headers, serializer: serializer, cancellationToken: cancellationToken); ack.EnsureSuccess(); return ack.Seq; @@ -143,7 +146,7 @@ public async ValueTask DeleteAsync(string key, NatsKVDeleteOpts? opts = default, try { - var ack = await _context.PublishAsync(subject, null, headers: headers, cancellationToken: cancellationToken); + var ack = await JetStreamContext.PublishAsync(subject, null, headers: headers, cancellationToken: cancellationToken); ack.EnsureSuccess(); } catch (NatsJSApiException e) @@ -157,6 +160,7 @@ public async ValueTask DeleteAsync(string key, NatsKVDeleteOpts? opts = default, } } + /// public ValueTask PurgeAsync(string key, NatsKVDeleteOpts? opts = default, CancellationToken cancellationToken = default) => DeleteAsync(key, (opts ?? new NatsKVDeleteOpts()) with { Purge = true }, cancellationToken); @@ -164,7 +168,7 @@ public ValueTask PurgeAsync(string key, NatsKVDeleteOpts? opts = default, Cancel public async ValueTask> GetEntryAsync(string key, ulong revision = default, INatsDeserialize? serializer = default, CancellationToken cancellationToken = default) { ValidateKey(key); - serializer ??= _context.Connection.Opts.SerializerRegistry.GetDeserializer(); + serializer ??= JetStreamContext.Connection.Opts.SerializerRegistry.GetDeserializer(); var request = new StreamMsgGetRequest(); var keySubject = $"$KV.{Bucket}.{key}"; @@ -425,12 +429,12 @@ public async IAsyncEnumerable GetKeysAsync(IEnumerable filters, internal async ValueTask> WatchInternalAsync(IEnumerable keys, INatsDeserialize? serializer = default, NatsKVWatchOpts? opts = default, CancellationToken cancellationToken = default) { opts ??= NatsKVWatchOpts.Default; - serializer ??= _context.Connection.Opts.SerializerRegistry.GetDeserializer(); + serializer ??= JetStreamContext.Connection.Opts.SerializerRegistry.GetDeserializer(); opts.ThrowIfInvalid(); var watcher = new NatsKVWatcher( - context: _context, + context: JetStreamContext, bucket: Bucket, keys: keys, opts: opts, diff --git a/src/NATS.Client.ObjectStore/INatsObjContext.cs b/src/NATS.Client.ObjectStore/INatsObjContext.cs index 7d31337cc..f5dfd00a0 100644 --- a/src/NATS.Client.ObjectStore/INatsObjContext.cs +++ b/src/NATS.Client.ObjectStore/INatsObjContext.cs @@ -1,3 +1,5 @@ +using NATS.Client.JetStream; + namespace NATS.Client.ObjectStore; /// @@ -5,6 +7,11 @@ namespace NATS.Client.ObjectStore; /// public interface INatsObjContext { + /// + /// Provides access to the JetStream context associated with the Object Store operations. + /// + INatsJSContext JetStreamContext { get; } + /// /// Create a new object store. /// diff --git a/src/NATS.Client.ObjectStore/INatsObjStore.cs b/src/NATS.Client.ObjectStore/INatsObjStore.cs index 255a5618b..68bb23e6d 100644 --- a/src/NATS.Client.ObjectStore/INatsObjStore.cs +++ b/src/NATS.Client.ObjectStore/INatsObjStore.cs @@ -8,6 +8,11 @@ namespace NATS.Client.ObjectStore; /// public interface INatsObjStore { + /// + /// Provides access to the JetStream context associated with the Object Store operations. + /// + INatsJSContext JetStreamContext { get; } + /// /// Object store bucket name. /// diff --git a/src/NATS.Client.ObjectStore/NatsClientExtensions.cs b/src/NATS.Client.ObjectStore/NatsClientExtensions.cs new file mode 100644 index 000000000..104c7a2e0 --- /dev/null +++ b/src/NATS.Client.ObjectStore/NatsClientExtensions.cs @@ -0,0 +1,33 @@ +using NATS.Client.Core; +using NATS.Client.JetStream; +using NATS.Client.ObjectStore; + +// ReSharper disable once CheckNamespace +namespace NATS.Net; + +public static class NatsClientExtensions +{ + /// + /// Creates a NATS Object Store context for the given NATS client. + /// + /// The NATS client instance. + /// An instance of used for interacting with the NATS Object Store. + public static INatsObjContext CreateObjectStoreContext(this INatsClient client) + => CreateObjectStoreContext(client.CreateJetStreamContext()); + + /// + /// Creates a NATS Object Store context for the given NATS connection. + /// + /// The NATS connection instance. + /// An instance of used for interacting with the NATS Object Store. + public static INatsObjContext CreateObjectStoreContext(this INatsConnection connection) + => CreateObjectStoreContext(connection.CreateJetStreamContext()); + + /// + /// Creates a NATS Object Store context for the given NATS JetStream context. + /// + /// The NATS JetStream context instance. + /// An instance of used for interacting with the NATS Object Store. + public static INatsObjContext CreateObjectStoreContext(this INatsJSContext context) + => new NatsObjContext(context); +} diff --git a/src/NATS.Client.ObjectStore/NatsObjContext.cs b/src/NATS.Client.ObjectStore/NatsObjContext.cs index c2afb850f..969239e17 100644 --- a/src/NATS.Client.ObjectStore/NatsObjContext.cs +++ b/src/NATS.Client.ObjectStore/NatsObjContext.cs @@ -12,29 +12,20 @@ public class NatsObjContext : INatsObjContext { private static readonly Regex ValidBucketRegex = new(pattern: @"\A[a-zA-Z0-9_-]+\z", RegexOptions.Compiled); - private readonly NatsJSContext _context; - /// /// Create a new object store context. /// /// JetStream context. - public NatsObjContext(NatsJSContext context) => _context = context; + public NatsObjContext(INatsJSContext context) => JetStreamContext = context; - /// - /// Create a new object store. - /// - /// Bucket name. - /// A used to cancel the API call. - /// Object store object. + /// + public INatsJSContext JetStreamContext { get; } + + /// public ValueTask CreateObjectStoreAsync(string bucket, CancellationToken cancellationToken = default) => CreateObjectStoreAsync(new NatsObjConfig(bucket), cancellationToken); - /// - /// Create a new object store. - /// - /// Object store configuration. - /// A used to cancel the API call. - /// Object store object. + /// public async ValueTask CreateObjectStoreAsync(NatsObjConfig config, CancellationToken cancellationToken = default) { ValidateBucketName(config.Bucket); @@ -61,33 +52,23 @@ public async ValueTask CreateObjectStoreAsync(NatsObjConfig confi Compression = config.Compression ? StreamConfigCompression.S2 : StreamConfigCompression.None, }; - var stream = await _context.CreateStreamAsync(streamConfig, cancellationToken); - return new NatsObjStore(config, this, _context, stream); + var stream = await JetStreamContext.CreateStreamAsync(streamConfig, cancellationToken); + return new NatsObjStore(config, this, JetStreamContext, stream); } - /// - /// Get an existing object store. - /// - /// Bucket name - /// A used to cancel the API call. - /// The Object Store object + /// public async ValueTask GetObjectStoreAsync(string bucket, CancellationToken cancellationToken = default) { ValidateBucketName(bucket); - var stream = await _context.GetStreamAsync($"OBJ_{bucket}", cancellationToken: cancellationToken); - return new NatsObjStore(new NatsObjConfig(bucket), this, _context, stream); + var stream = await JetStreamContext.GetStreamAsync($"OBJ_{bucket}", cancellationToken: cancellationToken); + return new NatsObjStore(new NatsObjConfig(bucket), this, JetStreamContext, stream); } - /// - /// Delete an object store. - /// - /// Name of the bucket. - /// A used to cancel the API call. - /// Whether delete was successful or not. + /// public ValueTask DeleteObjectStore(string bucket, CancellationToken cancellationToken) { ValidateBucketName(bucket); - return _context.DeleteStreamAsync($"OBJ_{bucket}", cancellationToken); + return JetStreamContext.DeleteStreamAsync($"OBJ_{bucket}", cancellationToken); } private void ValidateBucketName(string bucket) diff --git a/src/NATS.Client.ObjectStore/NatsObjStore.cs b/src/NATS.Client.ObjectStore/NatsObjStore.cs index 5e6c8006a..7fdc28f67 100644 --- a/src/NATS.Client.ObjectStore/NatsObjStore.cs +++ b/src/NATS.Client.ObjectStore/NatsObjStore.cs @@ -28,28 +28,23 @@ public class NatsObjStore : INatsObjStore private static readonly NatsHeaders NatsRollupHeaders = new() { { NatsRollup, RollupSubject } }; private readonly NatsObjContext _objContext; - private readonly NatsJSContext _context; private readonly INatsJSStream _stream; - internal NatsObjStore(NatsObjConfig config, NatsObjContext objContext, NatsJSContext context, INatsJSStream stream) + internal NatsObjStore(NatsObjConfig config, NatsObjContext objContext, INatsJSContext context, INatsJSStream stream) { Bucket = config.Bucket; _objContext = objContext; - _context = context; + JetStreamContext = context; _stream = stream; } - /// - /// Object store bucket name. - /// + /// + public INatsJSContext JetStreamContext { get; } + + /// public string Bucket { get; } - /// - /// Get object by key. - /// - /// Object key. - /// A used to cancel the API call. - /// Object value as a byte array. + /// public async ValueTask GetBytesAsync(string key, CancellationToken cancellationToken = default) { using var memoryStream = new MemoryStream(); @@ -57,15 +52,7 @@ public async ValueTask GetBytesAsync(string key, CancellationToken cance return memoryStream.ToArray(); } - /// - /// Get object by key. - /// - /// Object key. - /// Stream to write the object value to. - /// true to not close the underlying stream when async method returns; otherwise, false - /// A used to cancel the API call. - /// Object metadata. - /// Metadata didn't match the value retrieved e.g. the SHA digest. + /// public async ValueTask GetAsync(string key, Stream stream, bool leaveOpen = false, CancellationToken cancellationToken = default) { ValidateObjectName(key); @@ -84,7 +71,7 @@ public async ValueTask GetAsync(string key, Stream stream, bool } await using var pushConsumer = new NatsJSOrderedPushConsumer>( - context: _context, + context: JetStreamContext, stream: $"OBJ_{Bucket}", filter: GetChunkSubject(info.Nuid), serializer: NatsDefaultSerializer>.Default, @@ -158,39 +145,15 @@ public async ValueTask GetAsync(string key, Stream stream, bool return info; } - /// - /// Put an object by key. - /// - /// Object key. - /// Object value as a byte array. - /// A used to cancel the API call. - /// Object metadata. + /// public ValueTask PutAsync(string key, byte[] value, CancellationToken cancellationToken = default) => PutAsync(new ObjectMetadata { Name = key }, new MemoryStream(value), cancellationToken: cancellationToken); - /// - /// Put an object by key. - /// - /// Object key. - /// Stream to read the value from. - /// true to not close the underlying stream when async method returns; otherwise, false - /// A used to cancel the API call. - /// Object metadata. - /// There was an error calculating SHA digest. - /// Server responded with an error. + /// public ValueTask PutAsync(string key, Stream stream, bool leaveOpen = false, CancellationToken cancellationToken = default) => PutAsync(new ObjectMetadata { Name = key }, stream, leaveOpen, cancellationToken); - /// - /// Put an object by key. - /// - /// Object metadata. - /// Stream to read the value from. - /// true to not close the underlying stream when async method returns; otherwise, false - /// A used to cancel the API call. - /// Object metadata. - /// There was an error calculating SHA digest. - /// Server responded with an error. + /// public async ValueTask PutAsync(ObjectMetadata meta, Stream stream, bool leaveOpen = false, CancellationToken cancellationToken = default) { ValidateObjectName(meta.Name); @@ -294,7 +257,7 @@ public async ValueTask PutAsync(ObjectMetadata meta, Stream stre var buffer = memoryOwner.Slice(0, currentChunkSize); // Chunks - var ack = await _context.PublishAsync(GetChunkSubject(nuid), buffer, serializer: NatsRawSerializer>.Default, cancellationToken: cancellationToken); + var ack = await JetStreamContext.PublishAsync(GetChunkSubject(nuid), buffer, serializer: NatsRawSerializer>.Default, cancellationToken: cancellationToken); ack.EnsureSuccess(); if (eof) @@ -320,8 +283,8 @@ public async ValueTask PutAsync(ObjectMetadata meta, Stream stre { try { - await _context.JSRequestResponseAsync( - subject: $"{_context.Opts.Prefix}.STREAM.PURGE.OBJ_{Bucket}", + await JetStreamContext.JSRequestResponseAsync( + subject: $"{JetStreamContext.Opts.Prefix}.STREAM.PURGE.OBJ_{Bucket}", request: new StreamPurgeRequest { Filter = GetChunkSubject(info.Nuid), @@ -338,14 +301,7 @@ await _context.JSRequestResponseAsync( return meta; } - /// - /// Update object metadata - /// - /// Object key - /// Object metadata - /// A used to cancel the API call. - /// Object metadata - /// There is already an object with the same name + /// public async ValueTask UpdateMetaAsync(string key, ObjectMetadata meta, CancellationToken cancellationToken = default) { ValidateObjectName(meta.Name); @@ -375,23 +331,11 @@ public async ValueTask UpdateMetaAsync(string key, ObjectMetadat return info; } - /// - /// Add a link to another object - /// - /// Link name - /// Target object's name - /// A used to cancel the API call. - /// Metadata of the new link object + /// public ValueTask AddLinkAsync(string link, string target, CancellationToken cancellationToken = default) => AddLinkAsync(link, new ObjectMetadata { Name = target, Bucket = Bucket }, cancellationToken); - /// - /// Add a link to another object - /// - /// Link name - /// Target object's metadata - /// A used to cancel the API call. - /// Metadata of the new link object + /// public async ValueTask AddLinkAsync(string link, ObjectMetadata target, CancellationToken cancellationToken = default) { ValidateObjectName(link); @@ -444,14 +388,7 @@ public async ValueTask AddLinkAsync(string link, ObjectMetadata return info; } - /// - /// Add a link to another object store - /// - /// Object's name to be linked - /// Target object store - /// A used to cancel the API call. - /// Metadata of the new link object - /// Object with the same name already exists + /// public async ValueTask AddBucketLinkAsync(string link, INatsObjStore target, CancellationToken cancellationToken = default) { ValidateObjectName(link); @@ -488,23 +425,19 @@ public async ValueTask AddBucketLinkAsync(string link, INatsObjS return info; } - /// - /// Seal the object store. No further modifications will be allowed. - /// - /// A used to cancel the API call. - /// Update operation failed + /// public async ValueTask SealAsync(CancellationToken cancellationToken = default) { - var info = await _context.JSRequestResponseAsync( - subject: $"{_context.Opts.Prefix}.STREAM.INFO.{_stream.Info.Config.Name}", + var info = await JetStreamContext.JSRequestResponseAsync( + subject: $"{JetStreamContext.Opts.Prefix}.STREAM.INFO.{_stream.Info.Config.Name}", request: null, cancellationToken).ConfigureAwait(false); var config = info.Config; config.Sealed = true; - var response = await _context.JSRequestResponseAsync( - subject: $"{_context.Opts.Prefix}.STREAM.UPDATE.{_stream.Info.Config.Name}", + var response = await JetStreamContext.JSRequestResponseAsync( + subject: $"{JetStreamContext.Opts.Prefix}.STREAM.UPDATE.{_stream.Info.Config.Name}", request: config, cancellationToken); @@ -514,14 +447,7 @@ public async ValueTask SealAsync(CancellationToken cancellationToken = default) } } - /// - /// Get object metadata by key. - /// - /// Object key. - /// Also retrieve deleted objects. - /// A used to cancel the API call. - /// Object metadata. - /// Object was not found. + /// public async ValueTask GetInfoAsync(string key, bool showDeleted = false, CancellationToken cancellationToken = default) { ValidateObjectName(key); @@ -562,12 +488,7 @@ public async ValueTask GetInfoAsync(string key, bool showDeleted } } - /// - /// List all the objects in this store. - /// - /// List options - /// A used to cancel the API call. - /// An async enumerable object metadata to be used in an await foreach + /// public IAsyncEnumerable ListAsync(NatsObjListOpts? opts = default, CancellationToken cancellationToken = default) { opts ??= new NatsObjListOpts(); @@ -581,11 +502,7 @@ public IAsyncEnumerable ListAsync(NatsObjListOpts? opts = defaul return WatchAsync(watchOpts, cancellationToken); } - /// - /// Retrieves run-time status about the backing store of the bucket. - /// - /// A used to cancel the API call. - /// Object store status + /// public async ValueTask GetStatusAsync(CancellationToken cancellationToken = default) { await _stream.RefreshAsync(cancellationToken); @@ -593,12 +510,7 @@ public async ValueTask GetStatusAsync(CancellationToken cancellat return new NatsObjStatus(Bucket, isCompressed, _stream.Info); } - /// - /// Watch for changes in the underlying store and receive meta information updates. - /// - /// Watch options - /// A used to cancel the API call. - /// An async enumerable object metadata to be used in an await foreach + /// public async IAsyncEnumerable WatchAsync(NatsObjWatchOpts? opts = default, [EnumeratorCancellation] CancellationToken cancellationToken = default) { opts ??= new NatsObjWatchOpts(); @@ -616,7 +528,7 @@ public async IAsyncEnumerable WatchAsync(NatsObjWatchOpts? opts } await using var pushConsumer = new NatsJSOrderedPushConsumer>( - context: _context, + context: JetStreamContext, stream: $"OBJ_{Bucket}", filter: $"$O.{Bucket}.M.>", serializer: NatsDefaultSerializer>.Default, @@ -662,12 +574,7 @@ public async IAsyncEnumerable WatchAsync(NatsObjWatchOpts? opts } } - /// - /// Delete an object by key. - /// - /// Object key. - /// A used to cancel the API call. - /// Object metadata was invalid or chunks can't be purged. + /// public async ValueTask DeleteAsync(string key, CancellationToken cancellationToken = default) { ValidateObjectName(key); @@ -696,7 +603,7 @@ public async ValueTask DeleteAsync(string key, CancellationToken cancellationTok private async ValueTask PublishMeta(ObjectMetadata meta, CancellationToken cancellationToken) { - var ack = await _context.PublishAsync(GetMetaSubject(meta.Name), meta, serializer: NatsObjJsonSerializer.Default, headers: NatsRollupHeaders, cancellationToken: cancellationToken); + var ack = await JetStreamContext.PublishAsync(GetMetaSubject(meta.Name), meta, serializer: NatsObjJsonSerializer.Default, headers: NatsRollupHeaders, cancellationToken: cancellationToken); ack.EnsureSuccess(); } diff --git a/src/NATS.Client.Services/INatsSvcContext.cs b/src/NATS.Client.Services/INatsSvcContext.cs index b06513dcf..1e8a683fa 100644 --- a/src/NATS.Client.Services/INatsSvcContext.cs +++ b/src/NATS.Client.Services/INatsSvcContext.cs @@ -1,3 +1,5 @@ +using NATS.Client.Core; + namespace NATS.Client.Services; /// @@ -5,6 +7,11 @@ namespace NATS.Client.Services; /// public interface INatsSvcContext { + /// + /// Gets the associated NATS connection. + /// + INatsConnection Connection { get; } + /// /// Adds a new service. /// diff --git a/src/NATS.Client.Services/Internal/SvcListener.cs b/src/NATS.Client.Services/Internal/SvcListener.cs index 6b3edb220..72ce5fd16 100644 --- a/src/NATS.Client.Services/Internal/SvcListener.cs +++ b/src/NATS.Client.Services/Internal/SvcListener.cs @@ -7,7 +7,7 @@ namespace NATS.Client.Services.Internal; internal class SvcListener : IAsyncDisposable { private readonly ILogger _logger; - private readonly NatsConnection _nats; + private readonly INatsConnection _nats; private readonly Channel _channel; private readonly SvcMsgType _type; private readonly string _subject; @@ -16,7 +16,7 @@ internal class SvcListener : IAsyncDisposable private INatsSub>? _sub; private Task? _readLoop; - public SvcListener(ILogger logger, NatsConnection nats, Channel channel, SvcMsgType type, string subject, string? queueGroup, CancellationToken cancellationToken) + public SvcListener(ILogger logger, INatsConnection nats, Channel channel, SvcMsgType type, string subject, string? queueGroup, CancellationToken cancellationToken) { _logger = logger; _nats = nats; diff --git a/src/NATS.Client.Services/NatsClientExtensions.cs b/src/NATS.Client.Services/NatsClientExtensions.cs new file mode 100644 index 000000000..ba0015af1 --- /dev/null +++ b/src/NATS.Client.Services/NatsClientExtensions.cs @@ -0,0 +1,24 @@ +using NATS.Client.Core; +using NATS.Client.Services; + +// ReSharper disable once CheckNamespace +namespace NATS.Net; + +public static class NatsClientExtensions +{ + /// + /// Creates a NATS Services context for the given NATS client. + /// + /// The NATS client for which to create the services context. + /// An instance of used for interacting with the NATS Services. + public static INatsSvcContext CreateServicesContext(this INatsClient client) + => CreateServicesContext(client.Connection); + + /// + /// Creates a NATS Services context for the given NATS connection. + /// + /// The NATS connection for which to create the services context. + /// An instance of used for interacting with the NATS Services. + public static INatsSvcContext CreateServicesContext(this INatsConnection connection) + => new NatsSvcContext(connection); +} diff --git a/src/NATS.Client.Services/NatsSvcContext.cs b/src/NATS.Client.Services/NatsSvcContext.cs index cfcfb7eb0..9a450c31e 100644 --- a/src/NATS.Client.Services/NatsSvcContext.cs +++ b/src/NATS.Client.Services/NatsSvcContext.cs @@ -7,13 +7,14 @@ namespace NATS.Client.Services; /// public class NatsSvcContext : INatsSvcContext { - private readonly NatsConnection _nats; - /// /// Creates a new instance of . /// /// NATS connection. - public NatsSvcContext(NatsConnection nats) => _nats = nats; + public NatsSvcContext(INatsConnection nats) => Connection = nats; + + /// + public INatsConnection Connection { get; } /// /// Adds a new service. @@ -34,7 +35,7 @@ public ValueTask AddServiceAsync(string name, string version, st /// NATS Service instance. public async ValueTask AddServiceAsync(NatsSvcConfig config, CancellationToken cancellationToken = default) { - var service = new NatsSvcServer(_nats, config, cancellationToken); + var service = new NatsSvcServer(Connection, config, cancellationToken); await service.StartAsync().ConfigureAwait(false); return service; } diff --git a/src/NATS.Client.Services/NatsSvcEndPoint.cs b/src/NATS.Client.Services/NatsSvcEndPoint.cs index 8e6947c2f..6db64067a 100644 --- a/src/NATS.Client.Services/NatsSvcEndPoint.cs +++ b/src/NATS.Client.Services/NatsSvcEndPoint.cs @@ -69,7 +69,7 @@ public interface INatsSvcEndpoint : IAsyncDisposable /// public abstract class NatsSvcEndpointBase : NatsSubBase, INatsSvcEndpoint { - protected NatsSvcEndpointBase(NatsConnection connection, string subject, string? queueGroup, NatsSubOpts? opts) + protected NatsSvcEndpointBase(INatsConnection connection, string subject, string? queueGroup, NatsSubOpts? opts) : base(connection, connection.SubscriptionManager, subject, queueGroup, opts) { } @@ -108,7 +108,7 @@ public class NatsSvcEndpoint : NatsSvcEndpointBase { private readonly ILogger _logger; private readonly Func, ValueTask> _handler; - private readonly NatsConnection _nats; + private readonly INatsConnection _nats; private readonly CancellationToken _cancellationToken; private readonly Channel> _channel; private readonly INatsDeserialize _serializer; @@ -131,7 +131,7 @@ public class NatsSvcEndpoint : NatsSvcEndpointBase /// Serializer to use for the message type. /// Subscription options. /// A used to cancel the API call. - public NatsSvcEndpoint(NatsConnection nats, string? queueGroup, string name, Func, ValueTask> handler, string subject, IDictionary? metadata, INatsDeserialize serializer, NatsSubOpts? opts, CancellationToken cancellationToken) + public NatsSvcEndpoint(INatsConnection nats, string? queueGroup, string name, Func, ValueTask> handler, string subject, IDictionary? metadata, INatsDeserialize serializer, NatsSubOpts? opts, CancellationToken cancellationToken) : base(nats, subject, queueGroup, opts) { _logger = nats.Opts.LoggerFactory.CreateLogger>(); diff --git a/src/NATS.Client.Services/NatsSvcServer.cs b/src/NATS.Client.Services/NatsSvcServer.cs index 08f4379b4..1b30c21c8 100644 --- a/src/NATS.Client.Services/NatsSvcServer.cs +++ b/src/NATS.Client.Services/NatsSvcServer.cs @@ -16,7 +16,7 @@ public class NatsSvcServer : INatsSvcServer { private readonly ILogger _logger; private readonly string _id; - private readonly NatsConnection _nats; + private readonly INatsConnection _nats; private readonly NatsSvcConfig _config; private readonly Channel _channel; private readonly Task _taskMsgLoop; @@ -31,7 +31,7 @@ public class NatsSvcServer : INatsSvcServer /// NATS connection. /// Service configuration. /// A used to cancel the service creation requests. - public NatsSvcServer(NatsConnection nats, NatsSvcConfig config, CancellationToken cancellationToken) + public NatsSvcServer(INatsConnection nats, NatsSvcConfig config, CancellationToken cancellationToken) { _logger = nats.Opts.LoggerFactory.CreateLogger(); _id = Nuid.NewNuid(); diff --git a/src/NATS.Client.Simplified/NatsClientDefaultSerializer.cs b/src/NATS.Client.Simplified/NatsClientDefaultSerializer.cs index 811c9571c..7dee8b0cb 100644 --- a/src/NATS.Client.Simplified/NatsClientDefaultSerializer.cs +++ b/src/NATS.Client.Simplified/NatsClientDefaultSerializer.cs @@ -14,13 +14,9 @@ public static class NatsClientDefaultSerializer /// public static readonly INatsSerializer Default; - static NatsClientDefaultSerializer() - { - Default = new NatsSerializerBuilder() - .Add(new NatsRawSerializer()) - .Add(new NatsUtf8PrimitivesSerializer()) - .Add(new NatsJsonSerializer()) - .Build(); - Console.WriteLine($"Default serializer for {typeof(T).Name} is {Default.GetType().Name}"); - } + static NatsClientDefaultSerializer() => Default = new NatsSerializerBuilder() + .Add(new NatsRawSerializer()) + .Add(new NatsUtf8PrimitivesSerializer()) + .Add(new NatsJsonSerializer()) + .Build(); } diff --git a/tests/NATS.Client.KeyValueStore.Tests/NatsKVContextFactoryTest.cs b/tests/NATS.Client.KeyValueStore.Tests/NatsKVContextFactoryTest.cs index 9f63aeada..f3700d222 100644 --- a/tests/NATS.Client.KeyValueStore.Tests/NatsKVContextFactoryTest.cs +++ b/tests/NATS.Client.KeyValueStore.Tests/NatsKVContextFactoryTest.cs @@ -93,5 +93,12 @@ public class MockJsContext : INatsJSContext public IAsyncEnumerable ListStreamNamesAsync(string? subject = default, CancellationToken cancellationToken = default) => throw new NotImplementedException(); public ValueTask PublishConcurrentAsync(string subject, T? data, INatsSerialize? serializer = default, NatsJSPubOpts? opts = default, NatsHeaders? headers = default, CancellationToken cancellationToken = default) => throw new NotImplementedException(); + + public string NewBaseInbox() => throw new NotImplementedException(); + + public ValueTask JSRequestResponseAsync(string subject, TRequest? request, CancellationToken cancellationToken = default) + where TRequest : class + where TResponse : class + => throw new NotImplementedException(); } } diff --git a/tests/NATS.Client.ObjectStore.Tests/NatsObjContextFactoryTest.cs b/tests/NATS.Client.ObjectStore.Tests/NatsObjContextFactoryTest.cs index 1ac2c470b..adc5eef32 100644 --- a/tests/NATS.Client.ObjectStore.Tests/NatsObjContextFactoryTest.cs +++ b/tests/NATS.Client.ObjectStore.Tests/NatsObjContextFactoryTest.cs @@ -93,5 +93,12 @@ public class MockJsContext : INatsJSContext public IAsyncEnumerable ListStreamNamesAsync(string? subject = default, CancellationToken cancellationToken = default) => throw new NotImplementedException(); public ValueTask PublishConcurrentAsync(string subject, T? data, INatsSerialize? serializer = default, NatsJSPubOpts? opts = default, NatsHeaders? headers = default, CancellationToken cancellationToken = default) => throw new NotImplementedException(); + + public string NewBaseInbox() => throw new NotImplementedException(); + + public ValueTask JSRequestResponseAsync(string subject, TRequest? request, CancellationToken cancellationToken = default) + where TRequest : class + where TResponse : class + => throw new NotImplementedException(); } }