Can I have a small sample of sending 40k messages/second? #530
-
Thank you for using NATS! I'm not aware of any issues that would slow down publishing that much. You're likely encountering a lot of overhead from the additional tasks being created. I'd suggest publishing NATS messages inline in your request handler instead of creating a Task for each request. Internally, the NATS Publish API buffers data and another thread flushes the messages, which makes the publish APIs extremely efficient. Also, the publish API is thread-safe, so the parallelized publish calls would end up serialized anyhow.

Here's a suggested change:

    [HttpPost("notification")]
    public ActionResult PublishMessage(IEnumerable<Notification> payload)
    {
        if (payload == null)
            return BadRequest("Empty payload");

        var notificationBatches = payload.Batch(5, true);
        foreach (var notifications in notificationBatches)
        {
            // `subject` is a placeholder for the NATS subject you publish to.
            _connection.Publish(subject, Encoding.UTF8.GetBytes(System.Text.Json.JsonSerializer.Serialize(notifications)));
        }

        _connection.FlushBuffer(); // Optional to reduce latency
        return Ok();
    }

As you progress, you'll also want to be aware that the publish API can throw exceptions, and you could respond with an error in that case. You'll also want to take care that you don't miss notifications in the batches if there are more than 5. I hope this helps, let us know how it goes!
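To illustrate the point about not missing notifications when a request carries more than 5, here is a minimal sketch. It assumes .NET 6 or later and uses the built-in `Enumerable.Chunk` as a stand-in for the `Batch` extension in the snippet above, whose exact behavior isn't shown in this thread. The integers stand in for notifications, and the counter stands in for the publish call.

```csharp
using System;
using System.Linq;

// 12 stand-in notifications: more than two full batches of 5.
var notifications = Enumerable.Range(1, 12).ToArray();
var publishedCount = 0;

// Chunk(5) yields arrays of up to 5 items; the last one holds the remainder.
foreach (int[] batch in notifications.Chunk(5))
{
    foreach (var notification in batch)
    {
        publishedCount++; // a real handler would publish the notification here
    }
}

Console.WriteLine($"{publishedCount} of {notifications.Length} notifications handled");
```

Iterating every batch and every item in it is what guarantees the trailing partial batch isn't dropped.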
-
Thanks for the quick reply, I will try it now. About the payload size: it may be up to 40 or 50 notifications, so I split it into smaller chunks (in this sample, 5 notifications per batch). About the task: I saw a sample like this (I thought this was best practice)
-
@ColinSullivan1
Here's the code of the pool:

    public class NatsConnectionPool
    {
        private readonly LinkedList<IConnection> _connections;
        private readonly bool _useRoundRobbin;
        private LinkedListNode<IConnection> _connetionNode;

        /// <exception cref="T:NATS.Client.NATSNoServersException">No connection to a NATS Server could be established.</exception>
        /// <exception cref="T:NATS.Client.NATSConnectionException">
        ///     <para>A timeout occurred connecting to a NATS Server.</para>
        ///     <para>-or-</para>
        ///     <para>
        ///         An exception was encountered while connecting to a NATS Server. See
        ///         <see cref="P:System.Exception.InnerException" /> for more
        ///         details.
        ///     </para>
        /// </exception>
        public NatsConnectionPool(Configuration configuration)
        {
            _useRoundRobbin = configuration.UseConnectionPool;
            var cf = new ConnectionFactory();
            var opts = ConnectionFactory.GetDefaultOptions();
            opts.AllowReconnect = true;
            opts.MaxReconnect = Options.ReconnectForever;
            _connections = new LinkedList<IConnection>();
            if (_useRoundRobbin)
            {
                for (var i = 0; i < configuration.NatsConnectionPoolSize; i++)
                    foreach (var serverConfig in configuration.NatsServers)
                    {
                        opts.Servers = serverConfig.Split(',');
                        _connections.AddLast(cf.CreateConnection(opts));
                    }
            }
            else
            {
                opts.Servers = configuration.NatsServers[0].Split(',');
                _connections.AddLast(cf.CreateConnection(opts));
            }
        }

        public IConnection Connection
        {
            get
            {
                _connetionNode ??= _connections.First;
                if (_useRoundRobbin)
                    _connetionNode = _connetionNode.Next ?? _connections.First;
                // ReSharper disable once PossibleNullReferenceException
                return _connetionNode.Value;
            }
        }
    }

And here is the actual code I'm testing (yesterday I posted from home, now I'm at the office):

    [HttpPost("{customer}")]
    public IActionResult PublishNotificationAsync([FromRoute] [Required] string customer,
        [FromBody] IEnumerable<DatabaseNotification> databaseNotifications)
    {
        if (databaseNotifications == null)
            return BadRequest("Empty payload");

        var connection = _connectionPool.Connection; //get a connection
        var notificationGroups = databaseNotifications.Batch(_configuration.BatchSize, true);
        foreach (var notificationGroup in notificationGroups)
        foreach (var notification in notificationGroup)
        {
            var subject =
                $"{_configuration.NatsSubject}.{customer.ToLower()}.{notification.Table?.ToLower()}.{notification.Action}";
            notification.Customer = customer;
            var serializedData =
                JsonSerializer.Serialize(notification, _configuration.JsonSerializerOptions);
            var payload = Encoding.UTF8.GetBytes(serializedData);
            for (var i = 0; i < 3; i++)
                try
                {
                    connection.Publish(subject, payload);
                    break;
                }
                // ReSharper disable once CatchAllClause
                catch (Exception e)
                {
                    if (i < 2)
                        continue;
                    LogError(customer,
                        $"Fail to publishing for 3 times: [{serializedData}], error: {e.GetBaseException().Message}");
                }
        }

        connection.FlushBuffer();
        return Ok();
    }

And the configuration:

    {
      "ConnectionStrings": {
        "NATSServers": [
          "nats://0.0.0.178:4222,nats://0.0.0.179:4222,nats://0.0.0.180:4222",
          "nats://0.0.0.179:4222,nats://0.0.0.180:4222,nats://0.0.0.178:4222",
          "nats://0.0.0.180:4222,nats://0.0.0.178:4222,nats://0.0.0.179:4222"
        ] // configured like this to support the round-robin logic
      },
      "Kestrel": {
        "Limits": {
          "MaxConcurrentConnections": null,
          "MaxConcurrentUpgradedConnections": null,
          "MaxRequestBufferSize": null,
          "MaxRequestHeaderCount": 4096,
          "MaxRequestHeadersTotalSize": 28672,
          "MaxRequestBodySize": null,
          "MaxResponseBufferSize": null,
          "AddServerHeader": false
        },
        "EndPoints": {
          "Http": {
            "Url": "http://*:7500"
          }
        }
      },
      "Settings": {
        "Subject": "subject to send",
        "Batch": 5,
        "NatsConnectionPoolSize": 2, // There will be a total of [configValue] * (number of ConnectionStrings:NATSServers entries) connections
        "UseConnectionPool": true // Use a connection pool or a single connection; in both cases the pool always holds at least one connection
      }
    }

Please help me try this setup :)
-
Your response times are not apples to apples in these cases. In your In any case, Two notes:
Given:

    [HttpPost("notification")]
    public async Task<ActionResult> PublishMessage(
        [FromServices] IMessageQueue messageQueue,
        [FromBody] IEnumerable<Notification> payload)
    {
        if (payload == null)
            return BadRequest("Empty payload");

        // Hand each message to the queue; the background service publishes it.
        foreach (var notification in payload)
        {
            await messageQueue.EnqueueAsync(CreateMessage(notification));
        }

        return Ok();
    }

With a background service like:

    public class MessageSenderBackgroundService : BackgroundService
    {
        private readonly IMessageQueue _messageQueue;

        public MessageSenderBackgroundService(IMessageQueue messageQueue)
        {
            _messageQueue = messageQueue;
        }

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            // Create the NATS connection here, or inject your own instead.
            using (var connection = new ConnectionFactory().CreateConnection())
            {
                while (!stoppingToken.IsCancellationRequested)
                {
                    Message message = await _messageQueue.DequeueAsync(stoppingToken);
                    connection.Publish(message.Subject, message.Payload);
                    // pick some heuristic to flush if the default isn't reasonable
                }
            }
        }
    }
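The `IMessageQueue` above isn't defined in this thread. One way to back such a queue is `System.Threading.Channels`. The sketch below is an assumption about how that could look, not the author's implementation: the capacity of 10,000, the tuple message shape, the `ConsumeAsync` helper, and the subject names are all made up for illustration, and the publish delegate is a stub standing in for `connection.Publish(subject, payload)`.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Bounded, so producers wait (backpressure) instead of growing memory without limit.
var channel = Channel.CreateBounded<(string Subject, byte[] Payload)>(
    new BoundedChannelOptions(10_000)
    {
        FullMode = BoundedChannelFullMode.Wait,
        SingleReader = true
    });

var published = new List<string>();

// Consumer: plays the role of the background service loop.
async Task ConsumeAsync(Action<string, byte[]> publish, CancellationToken ct)
{
    await foreach (var (subject, payload) in channel.Reader.ReadAllAsync(ct))
    {
        publish(subject, payload);
    }
}

// Stub publisher that only records subjects; swap in the real NATS publish call.
var consumer = ConsumeAsync((subject, _) => published.Add(subject), CancellationToken.None);

// Producer: what the request handler would do for each notification.
for (var i = 0; i < 3; i++)
{
    await channel.Writer.WriteAsync(($"demo.subject.{i}", Encoding.UTF8.GetBytes($"payload {i}")));
}

channel.Writer.Complete(); // lets ReadAllAsync finish once the queue drains
await consumer;
Console.WriteLine(string.Join(", ", published));
```

A bounded channel with `FullMode = Wait` slows request handlers down when the publisher falls behind. An unbounded channel keeps responses fast but hides the backlog in memory.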
-
We'll also want to check that the test isn't running up against network bandwidth limits. How big is a request in bytes, and how many bytes are the NATS message payloads? Also, what are the network links you are testing this over between the HTTP client, HTTP server, and NATS server? (e.g. 10 Mbps Wi-Fi, 100 Mbps Ethernet, etc.)
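As a rough way to answer the bandwidth question, the arithmetic can be sketched as below. The 256-byte payload and 64-byte per-message overhead (subject plus protocol framing) are illustrative assumptions, not measurements from this thread. Substitute your real sizes.

```csharp
using System;
using System.Globalization;

// Bandwidth needed for a given publish rate, in megabits per second.
static double RequiredMegabitsPerSecond(int messagesPerSecond, int payloadBytes, int overheadBytes)
{
    long bytesPerSecond = (long)messagesPerSecond * (payloadBytes + overheadBytes);
    return bytesPerSecond * 8 / 1_000_000.0;
}

// 40,000 msgs/sec * (256 + 64) bytes = 12,800,000 bytes/sec = 102.4 Mbit/s
double mbps = RequiredMegabitsPerSecond(40_000, 256, 64);
Console.WriteLine(mbps.ToString("F1", CultureInfo.InvariantCulture) + " Mbit/s");
```

Under those assumed sizes, 40k messages/second would already exceed a 100 Mbps link before counting the HTTP request traffic.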
-
Thank you, I will try @watfordgnf's suggestion and report back with the result.

    public class DatabaseNotification
    {
        [JsonPropertyName("id")] public long? Id { get; set; }
        [JsonPropertyName("table")] public string Table { get; set; }
        [JsonPropertyName("createdDate")] public DateTime? CreatedDate { get; set; }
        [JsonPropertyName("modifiedDate")] public DateTime? ModifiedDate { get; set; }
        [JsonPropertyName("deletedDate")] public DateTime? DeletedDate { get; set; }
    }
-
@haiduong87 so the ActixWeb setup used nats.rs as its NATS client? I'd be curious to know whether it called Publish in a blocking loop or if it put it into a background queue and returned immediately.
-
@watfordgnf Controller:

    [HttpPost("{customer}")]
    public IActionResult PublishNotificationAsync([FromRoute][Required] string customer,
        [FromBody] IEnumerable<DatabaseNotification> databaseNotifications)
    {
        if (databaseNotifications == null)
            return BadRequest("Empty payload");

        foreach (var notification in databaseNotifications)
        {
            notification.Customer = customer;
            _notificationChannelWriter.WriteAsync(notification);
        }

        return Ok();
    }

Worker class:

    public class NatsWorker : BackgroundService
    {
        private readonly Configuration _configuration;
        private readonly NatsConnectionPool _connectionPool;
        private readonly ILogger _logger;
        private readonly ChannelReader<DatabaseNotification> _notificationChannelReader;

        public NatsWorker(Channel<DatabaseNotification> notificationChannel, Configuration configuration,
            ILogger<NatsWorker> logger, NatsConnectionPool connectionPool)
        {
            _notificationChannelReader = notificationChannel;
            _logger = logger;
            _configuration = configuration;
            _connectionPool = connectionPool;
        }

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            await foreach (var notification in _notificationChannelReader.ReadAllAsync(stoppingToken)
                .ConfigureAwait(false))
            {
                PushNotification(notification);
            }
        }

        private void PushNotification(DatabaseNotification notification)
        {
            var subject =
                $"{_configuration.NatsSubject}.{notification.Customer.ToLower()}.{notification.Table?.ToLower()}.{notification.Action}";
            var payload = Encoding.UTF8.GetBytes(notification.SerializedString);
            for (var i = 0; i < 3; i++)
                try
                {
                    _connectionPool.Connection.Publish(subject, payload);
                    _configuration.LogObject.Success(notification.Customer);
                    break;
                }
                // ReSharper disable once CatchAllClause
                catch (Exception e)
                {
                    if (i < 2)
                        continue;
                    LogError(notification.Customer,
                        $"Fail to publishing for 3 times: [{notification.SerializedString}], error: {e.GetBaseException().Message}");
                    _configuration.LogObject.Fail(notification.Customer);
                }
        }

        private void LogError(string customer, string message)
        {
            using (_logger.BeginScope(new Dictionary<string, object>
            {
                ["customer"] = customer
            }))
            {
                _logger.LogError(message);
            }
        }
    }

The response time is very fast, about 300~400 ms at most. But as for the publish process in the NATS client, can you look into my code and give me advice? About the Actix WebApp.
-
That looks more like what we would recommend. I'm not certain where the holdup is as we can easily push 100k 1KB messages/sec using just Publish. It would be interesting to see profiling data to find out where the holdup is.
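Short of a full profiler run, a crude first measurement is to time the publish loop on its own, away from the HTTP layer. The sketch below is an assumption about how that could be done. `MeasureMessagesPerSecond` is a hypothetical helper, and the publisher passed in here is a stub that only counts bytes. To measure the real client you would pass a delegate that calls `Publish` on your connection instead.

```csharp
using System;
using System.Diagnostics;

// Times `count` calls to the supplied publish delegate and returns messages/second.
static double MeasureMessagesPerSecond(Action<byte[]> publish, int count, int payloadBytes)
{
    var payload = new byte[payloadBytes];
    var stopwatch = Stopwatch.StartNew();
    for (var i = 0; i < count; i++)
    {
        publish(payload);
    }
    stopwatch.Stop();
    // Guard against a zero elapsed time on very fast stubs.
    return count / Math.Max(stopwatch.Elapsed.TotalSeconds, 1e-9);
}

long bytesSeen = 0;
double rate = MeasureMessagesPerSecond(p => bytesSeen += p.Length, 100_000, 1024);
Console.WriteLine($"{rate:N0} msgs/sec with a stub publisher ({bytesSeen} bytes)");
```

Comparing the stub's rate with the rate for the real publish delegate shows whether the time goes into publishing or elsewhere, such as serialization, logging, or the HTTP pipeline.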
-
@watfordgnf @ColinSullivan1 Please help me check my usage of the NATS client. Thank you!
-
I have a simple ASP.NET Core API project, hosted as a Windows service.
I've tried to optimize everything I can (that I know of), but sending 1000 requests (each request publishes 20 messages via the NATS client) isn't as fast as expected.
Here's a sample of what I'm doing:
Then I created a small Windows app that makes the requests.
I tried 1000 requests, each request with 30 notifications.
Running that test gives some responses with a high elapsed time (up to 2000 ms).
I think the problem is with the connection: when I try the above code without publishing messages (just a Task.Delay(500)), the highest elapsed time is 600 ms.
My target: create an ASP.NET Core Web API that can handle up to 1000 requests, where each request has a payload of up to 40 messages.
Please help me.
Duong Nguyen