Replies: 7 comments
-
I assume you mean core publishing versus JetStream since for JS you can use the PubAck to know if something is published. I suppose you could treat the library as internal and access the connection data structures and manually look through the queue. But otherwise I'm not seeing a solution to your question. |
Beta Was this translation helpful? Give feedback.
-
Best location would be to extend the The main issue is not that messages can be lost, Once the connection instance is successfully created and registered in an DI container. |
Beta Was this translation helpful? Give feedback.
-
The client will try to reconnect to other servers in the cluster, and when it does it will send any messages in it's queue. It will also try to reconnect to single servers, but it might happen so fast, before the server is back up. |
Beta Was this translation helpful? Give feedback.
-
The connection state after a permission valuation is "closed" I made an implantation with an Its not ideal but better then be stuck with an closed connection. readonly ConcurrentDictionary<string, IConnection> _connections = new (1, 8, StringComparer.OrdinalIgnoreCase);
Subject<(string Name, IConnection? Connection)> _subject = default!;
//...
public IConnection GetConnection(string? name = null)
{
if (!_running)
throw new InvalidOperationException("nats service not running");
if (string.IsNullOrEmpty(name))
name = "Default";
return _connections.GetOrAdd(name, CreateConnection);
}
public IObservable<IConnection> GetLatestConnection(string? name = null)
{
if (!_running)
throw new InvalidOperationException("nats service not running");
if (string.IsNullOrEmpty(name))
name = "Default";
var connection = GetConnection(name);
return _subject.Where(n => StringComparer.OrdinalIgnoreCase.Equals(name, n.Name))
.Select(n => n.Connection is null ? GetConnection(name) : n.Connection)
.Append(connection)
.DistinctUntilChanged();
}
private IConnection CreateConnection(string name)
{
if (!_running)
throw new InvalidOperationException("nats service not running");
var opt = _optionsMonitor.Get(StringComparer.OrdinalIgnoreCase.Equals(name, "Default")
? Microsoft.Extensions.Options.Options.DefaultName
: name);
var connection = MyNatsUtils.CreateConnection(opt, o =>
{
o.Name = name;
o.DisconnectedEventHandler += (s, a) =>
{
if(a.Error is not null)
_logger.LogError(a.Error, "Connection[{name}] disconnected", a.Conn.Opts.Name);
else if(_logger.IsEnabled(LogLevel.Debug))
_logger.LogDebug("Connection[{name}] disconnected", a.Conn.Opts.Name);
};
o.ClosedEventHandler += (s, a) =>
{
var removed = _connections.TryRemove(new KeyValuePair<string, IConnection>(a.Conn.Opts.Name, a.Conn));
if (a.Error is not null)
_logger.LogError(a.Error, "Connection[{name}] closed", a.Conn.Opts.Name);
else if (_logger.IsEnabled(LogLevel.Debug))
_logger.LogDebug("Connection[{name}] closed", a.Conn.Opts.Name);
if(removed)
_subject.OnNext((name, null));
};
});
_subject.OnNext((name, connection));
return connection;
} |
Beta Was this translation helpful? Give feedback.
-
I think we need to start over. Can you elaborate on "a not allowed subject"? |
Beta Was this translation helpful? Give feedback.
-
As far as DI, maybe don't inject the connection, maybe wrap the connection in a class with event awareness sort of a proxy or for publishing, you could even use something like a pool of connections. |
Beta Was this translation helpful? Give feedback.
-
Its fine, its only a logical misalignment with the nats It behaves for the user as auto recovery, always active connection Example would be on an `Error from processErr(): permissions violation for publish to "msg.notallowed.subject" The users of the instance would need to take it into account before each publish. Yes, everything can be resolved with one extra layer of abstraction. But its not always a good idea, beside the normal Better solution would be to "update" the nats.net client, Improvments:
At the end I tested the nats.net now for 2 weeks and there was no issue, The performance does not really matter for a message rate under 200msg/s |
Beta Was this translation helpful? Give feedback.
-
Is it possible to get all unpublished messages from the buffer after the connection gets closed (by the server) ?
I run into this problem when a component tried to publish a message on a not allowed subject with a shared connection.
The shared connection got closed server-side and it seams there is no why to retrieve/rescue buffered message.
Beta Was this translation helpful? Give feedback.
All reactions