From 6812d0afc60b89fac4c3adc5a27c45910a2dde8e Mon Sep 17 00:00:00 2001 From: Tomasz Maruszak Date: Sun, 6 Oct 2024 15:07:56 +0200 Subject: [PATCH] Support for UUIDv7 values Signed-off-by: Tomasz Maruszak --- .../MessageBusBuilderExtensions.cs | 16 ++- .../DbContextOutboxRepository.cs | 18 ++- .../GlobalUsings.cs | 3 + .../MessageBusBuilderExtensions.cs | 28 ++++- .../ISqlMessageOutboxRepository.cs | 5 + .../ISqlOutboxRepository.cs | 5 - ...itory.cs => SqlOutboxMessageRepository.cs} | 110 +++++++++++++----- .../SqlOutboxMigrationService.cs | 2 +- .../MessageBusBuilderExtensions.cs | 3 +- .../Configuration/OutboxSettings.cs | 13 ++- .../IOutboxNotificationService.cs | 4 +- .../OutboxForwardingPublishInterceptor.cs | 49 +++----- .../Repositories/IOutboxMessageFactory.cs | 25 ++++ .../Repositories/IOutboxMessageRepository.cs | 11 ++ .../Repositories/IOutboxRepository.cs | 12 -- .../Repositories/OutboxMessage.cs | 4 +- .../Services/OutboxLockRenewalTimer.cs | 4 +- .../Services/OutboxSendingTask.cs | 25 ++-- .../SqlDialect.cs | 2 +- .../MessageProcessors/MessageProcessor.cs | 2 +- .../ServiceCollectionExtensions.cs | 7 ++ src/SlimMessageBus.Host/MessageBusBase.cs | 16 +-- .../Providers/GuidGenerator/GuidGenerator.cs | 9 ++ .../Providers/GuidGenerator/IGuidGenerator.cs | 10 ++ .../TimeProvider/CurrentTimeProvider.cs | 6 + .../TimeProvider}/ICurrentTimeProvider.cs | 0 .../ServiceBusMessageBusTests.cs | 1 + .../KafkaMessageBusTest.cs | 1 + .../MessageBusMock.cs | 15 ++- .../MemoryMessageBusTests.cs | 6 + .../OutboxBenchmarkTests.cs | 2 +- .../BaseSqlOutboxRepositoryTest.cs | 74 ++++++------ ...SlimMessageBus.Host.Outbox.Sql.Test.csproj | 1 + .../SqlOutboxRepositoryTests.cs | 31 +++-- ...OutboxForwardingPublishInterceptorTests.cs | 91 ++++++++++----- .../OutboxLockRenewalTimerTests.cs | 4 +- .../Services/OutboxSendingTaskTests.cs | 12 +- .../RedisMessageBusTest.cs | 1 + .../CurrentTimeProviderFake.cs | 6 + .../Consumer/MessageBusMock.cs | 4 +- .../Consumer/MessageHandlerTest.cs | 2 +- .../Hybrid/HybridMessageBusTest.cs | 1 + .../MessageBusBaseTests.cs | 12 +- .../MessageBusTested.cs | 9 +- 44 files changed, 440 insertions(+), 222 deletions(-) create mode 100644 src/SlimMessageBus.Host.Outbox.DbContext/GlobalUsings.cs create mode 100644 src/SlimMessageBus.Host.Outbox.Sql/ISqlMessageOutboxRepository.cs delete mode 100644 src/SlimMessageBus.Host.Outbox.Sql/ISqlOutboxRepository.cs rename src/SlimMessageBus.Host.Outbox.Sql/{SqlOutboxRepository.cs => SqlOutboxMessageRepository.cs} (67%) create mode 100644 src/SlimMessageBus.Host.Outbox/Repositories/IOutboxMessageFactory.cs create mode 100644 src/SlimMessageBus.Host.Outbox/Repositories/IOutboxMessageRepository.cs delete mode 100644 src/SlimMessageBus.Host.Outbox/Repositories/IOutboxRepository.cs create mode 100644 src/SlimMessageBus.Host/Providers/GuidGenerator/GuidGenerator.cs create mode 100644 src/SlimMessageBus.Host/Providers/GuidGenerator/IGuidGenerator.cs create mode 100644 src/SlimMessageBus.Host/Providers/TimeProvider/CurrentTimeProvider.cs rename src/SlimMessageBus.Host/{ => Providers/TimeProvider}/ICurrentTimeProvider.cs (100%) create mode 100644 src/Tests/SlimMessageBus.Host.Test.Common/CurrentTimeProviderFake.cs diff --git a/src/SlimMessageBus.Host.Outbox.DbContext/Configuration/MessageBusBuilderExtensions.cs b/src/SlimMessageBus.Host.Outbox.DbContext/Configuration/MessageBusBuilderExtensions.cs index 2ee8bbc0..2c15da78 100644 --- a/src/SlimMessageBus.Host.Outbox.DbContext/Configuration/MessageBusBuilderExtensions.cs +++ b/src/SlimMessageBus.Host.Outbox.DbContext/Configuration/MessageBusBuilderExtensions.cs @@ -1,7 +1,5 @@ namespace SlimMessageBus.Host.Outbox.DbContext; -using Microsoft.Extensions.DependencyInjection.Extensions; - using SlimMessageBus.Host; using SlimMessageBus.Host.Outbox.Sql; using SlimMessageBus.Host.Sql.Common; @@ -14,6 +12,20 @@ public static MessageBusBuilder AddOutboxUsingDbContext(this Message mbb.PostConfigurationActions.Add(services => { services.TryAddScoped>(); + services.TryAddScoped(svp => + { + var settings = svp.GetRequiredService(); + return new DbContextOutboxRepository( + svp.GetRequiredService>>(), + settings, + svp.GetRequiredService(), + settings.GuidGenerator ?? (IGuidGenerator)svp.GetRequiredService(settings.GuidGeneratorType), + svp.GetRequiredService(), + svp.GetRequiredService(), + svp.GetRequiredService(), + svp.GetRequiredService() + ); + }); }); return mbb.AddOutboxUsingSql>(configure); } diff --git a/src/SlimMessageBus.Host.Outbox.DbContext/DbContextOutboxRepository.cs b/src/SlimMessageBus.Host.Outbox.DbContext/DbContextOutboxRepository.cs index d9deb1b7..508775ce 100644 --- a/src/SlimMessageBus.Host.Outbox.DbContext/DbContextOutboxRepository.cs +++ b/src/SlimMessageBus.Host.Outbox.DbContext/DbContextOutboxRepository.cs @@ -2,22 +2,32 @@ using Microsoft.Data.SqlClient; using Microsoft.EntityFrameworkCore; -using Microsoft.Extensions.Logging; using SlimMessageBus.Host.Outbox.Sql; using SlimMessageBus.Host.Sql.Common; -public class DbContextOutboxRepository : SqlOutboxRepository where TDbContext : DbContext +public class DbContextOutboxRepository : SqlOutboxMessageRepository where TDbContext : DbContext { public TDbContext DbContext { get; } public DbContextOutboxRepository( - ILogger logger, + ILogger logger, SqlOutboxSettings settings, SqlOutboxTemplate sqlOutboxTemplate, + IGuidGenerator guidGenerator, + ICurrentTimeProvider currentTimeProvider, + IInstanceIdProvider instanceIdProvider, TDbContext dbContext, ISqlTransactionService transactionService) - : base(logger, settings, sqlOutboxTemplate, (SqlConnection)dbContext.Database.GetDbConnection(), transactionService) + : base( + logger, + settings, + sqlOutboxTemplate, + guidGenerator, + currentTimeProvider, + instanceIdProvider, + (SqlConnection)dbContext.Database.GetDbConnection(), + transactionService) { DbContext = dbContext; } diff --git a/src/SlimMessageBus.Host.Outbox.DbContext/GlobalUsings.cs b/src/SlimMessageBus.Host.Outbox.DbContext/GlobalUsings.cs new file mode 100644 index 00000000..912a0d02 --- /dev/null +++ b/src/SlimMessageBus.Host.Outbox.DbContext/GlobalUsings.cs @@ -0,0 +1,3 @@ +global using Microsoft.Extensions.DependencyInjection; +global using Microsoft.Extensions.DependencyInjection.Extensions; +global using Microsoft.Extensions.Logging; diff --git a/src/SlimMessageBus.Host.Outbox.Sql/Configuration/MessageBusBuilderExtensions.cs b/src/SlimMessageBus.Host.Outbox.Sql/Configuration/MessageBusBuilderExtensions.cs index 19676168..e9e86a16 100644 --- a/src/SlimMessageBus.Host.Outbox.Sql/Configuration/MessageBusBuilderExtensions.cs +++ b/src/SlimMessageBus.Host.Outbox.Sql/Configuration/MessageBusBuilderExtensions.cs @@ -3,7 +3,7 @@ public static class MessageBusBuilderExtensions { public static MessageBusBuilder AddOutboxUsingSql(this MessageBusBuilder mbb, Action configure) - where TOutboxRepository : class, ISqlOutboxRepository + where TOutboxRepository : class, ISqlMessageOutboxRepository { mbb.AddOutbox(); @@ -33,11 +33,10 @@ public static MessageBusBuilder AddOutboxUsingSql(this Messag services.TryAddEnumerable(ServiceDescriptor.Transient(serviceType, implementationType)); } - services.TryAddScoped(); services.TryAddScoped(); - services.Replace(ServiceDescriptor.Scoped(svp => svp.GetRequiredService())); - services.Replace(ServiceDescriptor.Scoped(svp => svp.GetRequiredService())); + services.Replace(ServiceDescriptor.Scoped(svp => svp.GetRequiredService())); + services.Replace(ServiceDescriptor.Scoped(svp => svp.GetRequiredService())); services.TryAddSingleton(); services.TryAddTransient(); @@ -46,5 +45,24 @@ public static MessageBusBuilder AddOutboxUsingSql(this Messag } public static MessageBusBuilder AddOutboxUsingSql(this MessageBusBuilder mbb, Action configure) - => mbb.AddOutboxUsingSql(configure); + { + mbb.PostConfigurationActions.Add(services => + { + services.TryAddScoped(svp => + { + var settings = svp.GetRequiredService(); + return new SqlOutboxMessageRepository( + svp.GetRequiredService>(), + settings, + svp.GetRequiredService(), + settings.GuidGenerator ?? (IGuidGenerator)svp.GetRequiredService(settings.GuidGeneratorType), + svp.GetRequiredService(), + svp.GetRequiredService(), + svp.GetRequiredService(), + svp.GetRequiredService() + ); + }); + }); + return mbb.AddOutboxUsingSql(configure); + } } diff --git a/src/SlimMessageBus.Host.Outbox.Sql/ISqlMessageOutboxRepository.cs b/src/SlimMessageBus.Host.Outbox.Sql/ISqlMessageOutboxRepository.cs new file mode 100644 index 00000000..9f1e1b91 --- /dev/null +++ b/src/SlimMessageBus.Host.Outbox.Sql/ISqlMessageOutboxRepository.cs @@ -0,0 +1,5 @@ +namespace SlimMessageBus.Host.Outbox.Sql; + +public interface ISqlMessageOutboxRepository : IOutboxMessageRepository +{ +} \ No newline at end of file diff --git a/src/SlimMessageBus.Host.Outbox.Sql/ISqlOutboxRepository.cs b/src/SlimMessageBus.Host.Outbox.Sql/ISqlOutboxRepository.cs deleted file mode 100644 index cb124ccb..00000000 --- a/src/SlimMessageBus.Host.Outbox.Sql/ISqlOutboxRepository.cs +++ /dev/null @@ -1,5 +0,0 @@ -namespace SlimMessageBus.Host.Outbox.Sql; - -public interface ISqlOutboxRepository : IOutboxRepository -{ -} \ No newline at end of file diff --git a/src/SlimMessageBus.Host.Outbox.Sql/SqlOutboxRepository.cs b/src/SlimMessageBus.Host.Outbox.Sql/SqlOutboxMessageRepository.cs similarity index 67% rename from src/SlimMessageBus.Host.Outbox.Sql/SqlOutboxRepository.cs rename to src/SlimMessageBus.Host.Outbox.Sql/SqlOutboxMessageRepository.cs index 6501bb99..22760490 100644 --- a/src/SlimMessageBus.Host.Outbox.Sql/SqlOutboxRepository.cs +++ b/src/SlimMessageBus.Host.Outbox.Sql/SqlOutboxMessageRepository.cs @@ -1,23 +1,64 @@ namespace SlimMessageBus.Host.Outbox.Sql; -public class SqlOutboxRepository : CommonSqlRepository, ISqlOutboxRepository +/// +/// The MS SQL implmentation of the +/// +public class SqlOutboxMessageRepository : CommonSqlRepository, ISqlMessageOutboxRepository { + /// + /// Used to serialize the headers dictionary to JSON + /// + private static readonly JsonSerializerOptions _jsonOptions = new() + { + Converters = { new ObjectToInferredTypesConverter() } + }; + private readonly SqlOutboxTemplate _sqlTemplate; - private readonly JsonSerializerOptions _jsonOptions; + private readonly IGuidGenerator _guidGenerator; + private readonly ICurrentTimeProvider _currentTimeProvider; + private readonly IInstanceIdProvider _instanceIdProvider; protected SqlOutboxSettings Settings { get; } - public SqlOutboxRepository(ILogger logger, SqlOutboxSettings settings, SqlOutboxTemplate sqlOutboxTemplate, SqlConnection connection, ISqlTransactionService transactionService) + public SqlOutboxMessageRepository( + ILogger logger, + SqlOutboxSettings settings, + SqlOutboxTemplate sqlOutboxTemplate, + IGuidGenerator guidGenerator, + ICurrentTimeProvider currentTimeProvider, + IInstanceIdProvider instanceIdProvider, + SqlConnection connection, + ISqlTransactionService transactionService) : base(logger, settings.SqlSettings, connection, transactionService) { _sqlTemplate = sqlOutboxTemplate; - _jsonOptions = new(); - _jsonOptions.Converters.Add(new ObjectToInferredTypesConverter()); - + _guidGenerator = guidGenerator; + _currentTimeProvider = currentTimeProvider; + _instanceIdProvider = instanceIdProvider; Settings = settings; } - public async virtual Task Save(OutboxMessage message, CancellationToken token) + public virtual async Task Create(string busName, IDictionary headers, string path, string messageType, byte[] messagePayload, CancellationToken cancellationToken) + { + var outboxMessage = new OutboxMessage + { + Id = _guidGenerator.NewGuid(), + Timestamp = _currentTimeProvider.CurrentTime.DateTime, + InstanceId = _instanceIdProvider.GetInstanceId(), + + BusName = busName, + Headers = headers, + Path = path, + MessageType = messageType, + MessagePayload = messagePayload, + }; + + await Save(outboxMessage, cancellationToken); + + return outboxMessage.Id; + } + + protected async virtual Task Save(OutboxMessage message, CancellationToken cancellationToken) { await EnsureConnection(); @@ -36,10 +77,10 @@ await ExecuteNonQuery(Settings.SqlSettings.OperationRetry, _sqlTemplate.SqlOutbo cmd.Parameters.Add("@DeliveryAttempt", SqlDbType.Int).Value = message.DeliveryAttempt; cmd.Parameters.Add("@DeliveryComplete", SqlDbType.Bit).Value = message.DeliveryComplete; cmd.Parameters.Add("@DeliveryAborted", SqlDbType.Bit).Value = message.DeliveryAborted; - }, token); + }, cancellationToken); } - public async Task> LockAndSelect(string instanceId, int batchSize, bool tableLock, TimeSpan lockDuration, CancellationToken token) + public async Task> LockAndSelect(string instanceId, int batchSize, bool tableLock, TimeSpan lockDuration, CancellationToken cancellationToken) { await EnsureConnection(); @@ -49,10 +90,10 @@ public async Task> LockAndSelect(string insta cmd.Parameters.Add("@BatchSize", SqlDbType.Int).Value = batchSize; cmd.Parameters.Add("@LockDuration", SqlDbType.Int).Value = lockDuration.TotalSeconds; - return await ReadMessages(cmd, token).ConfigureAwait(false); + return await SqlOutboxMessageRepository.ReadMessages(cmd, cancellationToken).ConfigureAwait(false); } - public async Task AbortDelivery(IReadOnlyCollection ids, CancellationToken token) + public async Task AbortDelivery(IReadOnlyCollection ids, CancellationToken cancellationToken) { if (ids.Count == 0) { @@ -67,7 +108,7 @@ public async Task AbortDelivery(IReadOnlyCollection ids, CancellationToken { cmd.Parameters.AddWithValue("@Ids", ToIdsString(ids)); }, - token: token); + token: cancellationToken); if (affected != ids.Count) { @@ -75,7 +116,7 @@ public async Task AbortDelivery(IReadOnlyCollection ids, CancellationToken } } - public async Task UpdateToSent(IReadOnlyCollection ids, CancellationToken token) + public async Task UpdateToSent(IReadOnlyCollection ids, CancellationToken cancellationToken) { if (ids.Count == 0) { @@ -90,7 +131,7 @@ public async Task UpdateToSent(IReadOnlyCollection ids, CancellationToken { cmd.Parameters.AddWithValue("@Ids", ToIdsString(ids)); }, - token: token); + token: cancellationToken); if (affected != ids.Count) { @@ -100,7 +141,7 @@ public async Task UpdateToSent(IReadOnlyCollection ids, CancellationToken private string ToIdsString(IReadOnlyCollection ids) => string.Join(_sqlTemplate.InIdsSeparator, ids); - public async Task IncrementDeliveryAttempt(IReadOnlyCollection ids, int maxDeliveryAttempts, CancellationToken token) + public async Task IncrementDeliveryAttempt(IReadOnlyCollection ids, int maxDeliveryAttempts, CancellationToken cancellationToken) { if (ids.Count == 0) { @@ -121,7 +162,7 @@ public async Task IncrementDeliveryAttempt(IReadOnlyCollection ids, int ma cmd.Parameters.AddWithValue("@Ids", ToIdsString(ids)); cmd.Parameters.AddWithValue("@MaxDeliveryAttempts", maxDeliveryAttempts); }, - token: token); + token: cancellationToken); if (affected != ids.Count) { @@ -129,20 +170,20 @@ public async Task IncrementDeliveryAttempt(IReadOnlyCollection ids, int ma } } - public async Task DeleteSent(DateTime olderThan, CancellationToken token) + public async Task DeleteSent(DateTime olderThan, CancellationToken cancellationToken) { await EnsureConnection(); var affected = await ExecuteNonQuery( Settings.SqlSettings.OperationRetry, _sqlTemplate.SqlOutboxMessageDeleteSent, - cmd => cmd.Parameters.Add("@Timestamp", SqlDbType.DateTime2).Value = olderThan, - token); + cmd => cmd.Parameters.Add("@Timestamp", SqlDbType.DateTimeOffset).Value = olderThan, + cancellationToken); Logger.Log(affected > 0 ? LogLevel.Information : LogLevel.Debug, "Removed {MessageCount} sent messages from outbox table", affected); } - public async Task RenewLock(string instanceId, TimeSpan lockDuration, CancellationToken token) + public async Task RenewLock(string instanceId, TimeSpan lockDuration, CancellationToken cancellationToken) { await EnsureConnection(); @@ -151,7 +192,7 @@ public async Task RenewLock(string instanceId, TimeSpan lockDuration, Canc cmd.Parameters.Add("@InstanceId", SqlDbType.NVarChar).Value = instanceId; cmd.Parameters.Add("@LockDuration", SqlDbType.Int).Value = lockDuration.TotalSeconds; - return await cmd.ExecuteNonQueryAsync(token) > 0; + return await cmd.ExecuteNonQueryAsync(cancellationToken) > 0; } internal async Task> GetAllMessages(CancellationToken cancellationToken) @@ -164,7 +205,7 @@ internal async Task> GetAllMessages(Cancellat return await ReadMessages(cmd, cancellationToken).ConfigureAwait(false); } - private async Task> ReadMessages(SqlCommand cmd, CancellationToken cancellationToken) + private static async Task> ReadMessages(SqlCommand cmd, CancellationToken cancellationToken) { using var reader = await cmd.ExecuteReaderAsync(cancellationToken); @@ -185,23 +226,34 @@ private async Task> ReadMessages(SqlCommand c var items = new List(); while (await reader.ReadAsync(cancellationToken).ConfigureAwait(false)) { - var id = reader.GetGuid(idOrdinal); - var headers = reader.IsDBNull(headersOrdinal) ? null : reader.GetString(headersOrdinal); + var headers = reader.IsDBNull(headersOrdinal) + ? null + : reader.GetString(headersOrdinal); var message = new OutboxMessage { - Id = id, +#pragma warning disable 1998 + Id = reader.GetGuid(idOrdinal), Timestamp = reader.GetDateTime(timestampOrdinal), BusName = reader.GetString(busNameOrdinal), MessageType = reader.GetString(typeOrdinal), MessagePayload = reader.GetSqlBinary(payloadOrdinal).Value, - Headers = headers == null ? null : JsonSerializer.Deserialize>(headers, _jsonOptions), - Path = reader.IsDBNull(pathOrdinal) ? null : reader.GetString(pathOrdinal), + Headers = headers == null + ? null + : JsonSerializer.Deserialize>(headers, _jsonOptions), + Path = reader.IsDBNull(pathOrdinal) + ? null + : reader.GetString(pathOrdinal), InstanceId = reader.GetString(instanceIdOrdinal), - LockInstanceId = reader.IsDBNull(lockInstanceIdOrdinal) ? null : reader.GetString(lockInstanceIdOrdinal), - LockExpiresOn = reader.IsDBNull(lockExpiresOnOrdinal) ? null : reader.GetDateTime(lockExpiresOnOrdinal), + LockInstanceId = reader.IsDBNull(lockInstanceIdOrdinal) + ? null + : reader.GetString(lockInstanceIdOrdinal), + LockExpiresOn = reader.IsDBNull(lockExpiresOnOrdinal) + ? null + : reader.GetDateTime(lockExpiresOnOrdinal), DeliveryAttempt = reader.GetInt32(deliveryAttemptOrdinal), DeliveryComplete = reader.GetBoolean(deliveryCompleteOrdinal), DeliveryAborted = reader.GetBoolean(deliveryAbortedOrdinal) +#pragma warning restore 1998 }; items.Add(message); diff --git a/src/SlimMessageBus.Host.Outbox.Sql/SqlOutboxMigrationService.cs b/src/SlimMessageBus.Host.Outbox.Sql/SqlOutboxMigrationService.cs index dbcea818..afca4874 100644 --- a/src/SlimMessageBus.Host.Outbox.Sql/SqlOutboxMigrationService.cs +++ b/src/SlimMessageBus.Host.Outbox.Sql/SqlOutboxMigrationService.cs @@ -2,7 +2,7 @@ public class SqlOutboxMigrationService : CommonSqlMigrationService, IOutboxMigrationService { - public SqlOutboxMigrationService(ILogger logger, ISqlOutboxRepository repository, ISqlTransactionService transactionService, SqlOutboxSettings settings) + public SqlOutboxMigrationService(ILogger logger, ISqlMessageOutboxRepository repository, ISqlTransactionService transactionService, SqlOutboxSettings settings) : base(logger, (CommonSqlRepository)repository, transactionService, settings.SqlSettings) { } diff --git a/src/SlimMessageBus.Host.Outbox/Configuration/MessageBusBuilderExtensions.cs b/src/SlimMessageBus.Host.Outbox/Configuration/MessageBusBuilderExtensions.cs index 0c9368db..6bded940 100644 --- a/src/SlimMessageBus.Host.Outbox/Configuration/MessageBusBuilderExtensions.cs +++ b/src/SlimMessageBus.Host.Outbox/Configuration/MessageBusBuilderExtensions.cs @@ -35,10 +35,11 @@ public static MessageBusBuilder AddOutbox(this MessageBusBuilder mbb, Action(); services.TryAddEnumerable(ServiceDescriptor.Singleton(sp => sp.GetRequiredService())); - services.TryAdd(ServiceDescriptor.Singleton(sp => sp.GetRequiredService())); + services.TryAddSingleton(sp => sp.GetRequiredService()); services.TryAddSingleton(); services.TryAddSingleton(); + services.TryAddScoped(svp => svp.GetRequiredService()); services.TryAddSingleton(svp => { diff --git a/src/SlimMessageBus.Host.Outbox/Configuration/OutboxSettings.cs b/src/SlimMessageBus.Host.Outbox/Configuration/OutboxSettings.cs index a1c46f86..eff3059b 100644 --- a/src/SlimMessageBus.Host.Outbox/Configuration/OutboxSettings.cs +++ b/src/SlimMessageBus.Host.Outbox/Configuration/OutboxSettings.cs @@ -36,9 +36,20 @@ public class OutboxSettings /// Sent message cleanup settings. /// public OutboxMessageCleanupSettings MessageCleanup { get; set; } = new(); - /// /// Type resolver which is responsible for converting message type into the Outbox table column MessageType /// public IMessageTypeResolver MessageTypeResolver { get; set; } = new AssemblyQualifiedNameMessageTypeResolver(); + /// + /// The type to resolve from MSDI that implementes the . + /// Default is . + /// Guid generator is used to generate unique identifiers for the outbox messages. + /// + public Type GuidGeneratorType { get; set; } = typeof(IGuidGenerator); + /// + /// The instance of to use (if specified). + /// Default is null. + /// Guid generator is used to generate unique identifiers for the outbox messages. + /// + public IGuidGenerator GuidGenerator { get; set; } = null; } diff --git a/src/SlimMessageBus.Host.Outbox/IOutboxNotificationService.cs b/src/SlimMessageBus.Host.Outbox/IOutboxNotificationService.cs index 1bc9414f..1ff002f0 100644 --- a/src/SlimMessageBus.Host.Outbox/IOutboxNotificationService.cs +++ b/src/SlimMessageBus.Host.Outbox/IOutboxNotificationService.cs @@ -1,5 +1,5 @@ -namespace SlimMessageBus.Host.Outbox.Services; - +namespace SlimMessageBus.Host.Outbox; + public interface IOutboxNotificationService { /// diff --git a/src/SlimMessageBus.Host.Outbox/Interceptors/OutboxForwardingPublishInterceptor.cs b/src/SlimMessageBus.Host.Outbox/Interceptors/OutboxForwardingPublishInterceptor.cs index 40921b8d..1d9ceef3 100644 --- a/src/SlimMessageBus.Host.Outbox/Interceptors/OutboxForwardingPublishInterceptor.cs +++ b/src/SlimMessageBus.Host.Outbox/Interceptors/OutboxForwardingPublishInterceptor.cs @@ -1,9 +1,5 @@ namespace SlimMessageBus.Host.Outbox; -using Microsoft.Extensions.Logging; - -using SlimMessageBus.Host.Outbox.Services; - public abstract class OutboxForwardingPublishInterceptor { } @@ -14,20 +10,14 @@ public abstract class OutboxForwardingPublishInterceptor /// public sealed class OutboxForwardingPublishInterceptor( ILogger logger, - IOutboxRepository outboxRepository, - IInstanceIdProvider instanceIdProvider, + IOutboxMessageFactory outboxMessageFactory, IOutboxNotificationService outboxNotificationService, OutboxSettings outboxSettings) - : OutboxForwardingPublishInterceptor, IInterceptorWithOrder, IPublishInterceptor, IDisposable where T : class + : OutboxForwardingPublishInterceptor, IInterceptorWithOrder, IPublishInterceptor, IDisposable + where T : class { internal const string SkipOutboxHeader = "__SkipOutbox"; - private readonly ILogger _logger = logger; - private readonly IOutboxRepository _outboxRepository = outboxRepository; - private readonly IInstanceIdProvider _instanceIdProvider = instanceIdProvider; - private readonly IOutboxNotificationService _outboxNotificationService = outboxNotificationService; - private readonly OutboxSettings _outboxSettings = outboxSettings; - private bool _notifyOutbox = false; public int Order => int.MaxValue; @@ -36,7 +26,7 @@ public void Dispose() { if (_notifyOutbox) { - _outboxNotificationService.Notify(); + outboxNotificationService.Notify(); } GC.SuppressFinalize(this); @@ -58,28 +48,25 @@ public async Task OnHandle(T message, Func next, IProducerContext context) return; } - // Forward to outbox - var messageType = message.GetType(); - - _logger.LogDebug("Forwarding published message of type {MessageType} to the outbox", messageType.Name); + // Forward to outbox - do not call next() + var messageType = message.GetType(); // Take the proper serializer (meant for the bus) var messagePayload = busMaster.Serializer?.Serialize(messageType, message) - ?? throw new PublishMessageBusException($"The {busMaster.Name} bus has no configured serializer, so it cannot be used with the outbox plugin"); + ?? throw new PublishMessageBusException($"The {busMaster.Name} bus has no configured serializer, so it cannot be used with the outbox plugin"); - // Add message to the database, do not call next() - var outboxMessage = new OutboxMessage - { - BusName = busMaster.Name, - Headers = context.Headers, - Path = context.Path, - MessageType = _outboxSettings.MessageTypeResolver.ToName(messageType), - MessagePayload = messagePayload, - InstanceId = _instanceIdProvider.GetInstanceId() - }; - await _outboxRepository.Save(outboxMessage, context.CancellationToken); + var outboxMessageId = await outboxMessageFactory.Create( + busName: busMaster.Name, + headers: context.Headers, + path: context.Path, + messageType: outboxSettings.MessageTypeResolver.ToName(messageType), + messagePayload: messagePayload, + cancellationToken: context.CancellationToken + ); + + logger.LogDebug("Forwarding published message of type {MessageType} to the outbox with Id {OutboxMessageId}", messageType.Name, outboxMessageId); - // a message was sent, notify outbox service to poll on dispose (post transaction) + // A message was sent, notify outbox service to poll on dispose (post transaction) _notifyOutbox = true; } } diff --git a/src/SlimMessageBus.Host.Outbox/Repositories/IOutboxMessageFactory.cs b/src/SlimMessageBus.Host.Outbox/Repositories/IOutboxMessageFactory.cs new file mode 100644 index 00000000..3b09717a --- /dev/null +++ b/src/SlimMessageBus.Host.Outbox/Repositories/IOutboxMessageFactory.cs @@ -0,0 +1,25 @@ +namespace SlimMessageBus.Host.Outbox; + +/// +/// Factory for creating outbox messages +/// +public interface IOutboxMessageFactory +{ + /// + /// Create a new outbox message and store it using the underlying store + /// + /// + /// + /// + /// + /// + /// + /// ID of the . + Task Create( + string busName, + IDictionary headers, + string path, + string messageType, + byte[] messagePayload, + CancellationToken cancellationToken); +} diff --git a/src/SlimMessageBus.Host.Outbox/Repositories/IOutboxMessageRepository.cs b/src/SlimMessageBus.Host.Outbox/Repositories/IOutboxMessageRepository.cs new file mode 100644 index 00000000..9f38a51e --- /dev/null +++ b/src/SlimMessageBus.Host.Outbox/Repositories/IOutboxMessageRepository.cs @@ -0,0 +1,11 @@ +namespace SlimMessageBus.Host.Outbox; + +public interface IOutboxMessageRepository : IOutboxMessageFactory +{ + Task> LockAndSelect(string instanceId, int batchSize, bool tableLock, TimeSpan lockDuration, CancellationToken cancellationToken); + Task AbortDelivery(IReadOnlyCollection ids, CancellationToken cancellationToken); + Task UpdateToSent(IReadOnlyCollection ids, CancellationToken cancellationToken); + Task IncrementDeliveryAttempt(IReadOnlyCollection ids, int maxDeliveryAttempts, CancellationToken cancellationToken); + Task DeleteSent(DateTime olderThan, CancellationToken cancellationToken); + Task RenewLock(string instanceId, TimeSpan lockDuration, CancellationToken cancellationToken); +} diff --git a/src/SlimMessageBus.Host.Outbox/Repositories/IOutboxRepository.cs b/src/SlimMessageBus.Host.Outbox/Repositories/IOutboxRepository.cs deleted file mode 100644 index a8772916..00000000 --- a/src/SlimMessageBus.Host.Outbox/Repositories/IOutboxRepository.cs +++ /dev/null @@ -1,12 +0,0 @@ -namespace SlimMessageBus.Host.Outbox; - -public interface IOutboxRepository -{ - Task Save(OutboxMessage message, CancellationToken token); - Task> LockAndSelect(string instanceId, int batchSize, bool tableLock, TimeSpan lockDuration, CancellationToken token); - Task AbortDelivery (IReadOnlyCollection ids, CancellationToken token); - Task UpdateToSent(IReadOnlyCollection ids, CancellationToken token); - Task IncrementDeliveryAttempt(IReadOnlyCollection ids, int maxDeliveryAttempts, CancellationToken token); - Task DeleteSent(DateTime olderThan, CancellationToken token); - Task RenewLock(string instanceId, TimeSpan lockDuration, CancellationToken token); -} diff --git a/src/SlimMessageBus.Host.Outbox/Repositories/OutboxMessage.cs b/src/SlimMessageBus.Host.Outbox/Repositories/OutboxMessage.cs index 01aedcdb..1a5d89bf 100644 --- a/src/SlimMessageBus.Host.Outbox/Repositories/OutboxMessage.cs +++ b/src/SlimMessageBus.Host.Outbox/Repositories/OutboxMessage.cs @@ -2,8 +2,8 @@ public class OutboxMessage { - public Guid Id { get; set; } = Guid.NewGuid(); - public DateTime Timestamp { get; set; } = DateTime.UtcNow; + public Guid Id { get; set; } + public DateTime Timestamp { get; set; } public string BusName { get; set; } public string MessageType { get; set; } public byte[] MessagePayload { get; set; } diff --git a/src/SlimMessageBus.Host.Outbox/Services/OutboxLockRenewalTimer.cs b/src/SlimMessageBus.Host.Outbox/Services/OutboxLockRenewalTimer.cs index 16ccae19..3ff024bf 100644 --- a/src/SlimMessageBus.Host.Outbox/Services/OutboxLockRenewalTimer.cs +++ b/src/SlimMessageBus.Host.Outbox/Services/OutboxLockRenewalTimer.cs @@ -6,13 +6,13 @@ public sealed class OutboxLockRenewalTimer : IOutboxLockRenewalTimer private readonly object _lock; private readonly Timer _timer; private readonly ILogger _logger; - private readonly IOutboxRepository _outboxRepository; + private readonly IOutboxMessageRepository _outboxRepository; private readonly CancellationToken _cancellationToken; private readonly Action _lockLost; private bool _active; private bool _renewingLock; - public OutboxLockRenewalTimer(ILogger logger, IOutboxRepository outboxRepository, IInstanceIdProvider instanceIdProvider, TimeSpan lockDuration, TimeSpan lockRenewalInterval, Action lockLost, CancellationToken cancellationToken) + public OutboxLockRenewalTimer(ILogger logger, IOutboxMessageRepository outboxRepository, IInstanceIdProvider instanceIdProvider, TimeSpan lockDuration, TimeSpan lockRenewalInterval, Action lockLost, CancellationToken cancellationToken) { Debug.Assert(lockRenewalInterval < lockDuration); diff --git a/src/SlimMessageBus.Host.Outbox/Services/OutboxSendingTask.cs b/src/SlimMessageBus.Host.Outbox/Services/OutboxSendingTask.cs index 843a3f0f..85513c52 100644 --- a/src/SlimMessageBus.Host.Outbox/Services/OutboxSendingTask.cs +++ b/src/SlimMessageBus.Host.Outbox/Services/OutboxSendingTask.cs @@ -1,12 +1,12 @@ namespace SlimMessageBus.Host.Outbox.Services; -using SlimMessageBus; using SlimMessageBus.Host; using SlimMessageBus.Host.Outbox; - + internal class OutboxSendingTask( ILoggerFactory loggerFactory, - OutboxSettings outboxSettings, + OutboxSettings outboxSettings, + ICurrentTimeProvider currentTimeProvider, IServiceProvider serviceProvider) : IMessageBusLifecycleInterceptor, IOutboxNotificationService, IAsyncDisposable { @@ -25,16 +25,17 @@ internal class OutboxSendingTask( private int _busStartCount; - private DateTime? _cleanupNextRun; + private DateTimeOffset? _cleanupNextRun; private bool ShouldRunCleanup() { if (_outboxSettings.MessageCleanup?.Enabled == true) - { - var trigger = !_cleanupNextRun.HasValue || DateTime.UtcNow > _cleanupNextRun.Value; + { + var currentTime = currentTimeProvider.CurrentTime; + var trigger = !_cleanupNextRun.HasValue || currentTime > _cleanupNextRun.Value; if (trigger) { - _cleanupNextRun = DateTime.UtcNow.Add(_outboxSettings.MessageCleanup.Interval); + _cleanupNextRun = currentTime.Add(_outboxSettings.MessageCleanup.Interval); } return trigger; @@ -160,7 +161,7 @@ private async Task Run() try { await EnsureMigrateSchema(scope.ServiceProvider, _loopCts.Token); - var outboxRepository = scope.ServiceProvider.GetRequiredService(); + var outboxRepository = scope.ServiceProvider.GetRequiredService(); do { if (_loopCts.Token.IsCancellationRequested) @@ -175,7 +176,7 @@ private async Task Run() if (!_loopCts.IsCancellationRequested && ShouldRunCleanup()) { _logger.LogTrace("Running cleanup of sent messages"); - await outboxRepository.DeleteSent(DateTime.UtcNow.Add(-_outboxSettings.MessageCleanup.Age), _loopCts.Token).ConfigureAwait(false); + await outboxRepository.DeleteSent(currentTimeProvider.CurrentTime.DateTime.Add(-_outboxSettings.MessageCleanup.Age), _loopCts.Token).ConfigureAwait(false); } } catch (Exception e) @@ -208,7 +209,7 @@ private async Task Run() } } - async internal Task SendMessages(IServiceProvider serviceProvider, IOutboxRepository outboxRepository, CancellationToken cancellationToken) + async internal Task SendMessages(IServiceProvider serviceProvider, IOutboxMessageRepository outboxRepository, CancellationToken cancellationToken) { var lockDuration = TimeSpan.FromSeconds(Math.Min(Math.Max(_outboxSettings.LockExpiration.TotalSeconds, 5), 30)); if (lockDuration != _outboxSettings.LockExpiration) @@ -252,7 +253,7 @@ async internal Task SendMessages(IServiceProvider serviceProvider, IOutboxR return count; } - async internal Task<(bool RunAgain, int Count)> ProcessMessages(IOutboxRepository outboxRepository, IReadOnlyCollection outboxMessages, ICompositeMessageBus compositeMessageBus, IMessageBusTarget messageBusTarget, CancellationToken cancellationToken) + async internal Task<(bool RunAgain, int Count)> ProcessMessages(IOutboxMessageRepository outboxRepository, IReadOnlyCollection outboxMessages, ICompositeMessageBus compositeMessageBus, IMessageBusTarget messageBusTarget, CancellationToken cancellationToken) { const int defaultBatchSize = 50; @@ -325,7 +326,7 @@ async internal Task SendMessages(IServiceProvider serviceProvider, IOutboxR return (runAgain, count); } - async internal Task<(bool Success, int Published)> DispatchBatch(IOutboxRepository outboxRepository, IMessageBusBulkProducer producer, IMessageBusTarget messageBusTarget, IReadOnlyCollection batch, string busName, string path, CancellationToken cancellationToken) + async internal Task<(bool Success, int Published)> DispatchBatch(IOutboxMessageRepository outboxRepository, IMessageBusBulkProducer producer, IMessageBusTarget messageBusTarget, IReadOnlyCollection batch, string busName, string path, CancellationToken cancellationToken) { _logger.LogDebug("Publishing batch of {MessageCount} messages to pathGroup {Path} on {BusName} bus", batch.Count, path, busName); diff --git a/src/SlimMessageBus.Host.Sql.Common/SqlDialect.cs b/src/SlimMessageBus.Host.Sql.Common/SqlDialect.cs index 268385a6..beb552d2 100644 --- a/src/SlimMessageBus.Host.Sql.Common/SqlDialect.cs +++ b/src/SlimMessageBus.Host.Sql.Common/SqlDialect.cs @@ -2,6 +2,6 @@ public enum SqlDialect { - SqlServer = 1 + SqlServer = 1, // More to come } \ No newline at end of file diff --git a/src/SlimMessageBus.Host/Consumer/MessageProcessors/MessageProcessor.cs b/src/SlimMessageBus.Host/Consumer/MessageProcessors/MessageProcessor.cs index dcf7af8a..ac37cad8 100644 --- a/src/SlimMessageBus.Host/Consumer/MessageProcessors/MessageProcessor.cs +++ b/src/SlimMessageBus.Host/Consumer/MessageProcessors/MessageProcessor.cs @@ -37,7 +37,7 @@ public MessageProcessor( messageTypeResolver: messageBus.MessageTypeResolver, messageHeadersFactory: messageBus, runtimeTypeCache: messageBus.RuntimeTypeCache, - currentTimeProvider: messageBus, + currentTimeProvider: messageBus.CurrentTimeProvider, path: path, consumerErrorHandlerOpenGenericType) { diff --git a/src/SlimMessageBus.Host/DependencyResolver/ServiceCollectionExtensions.cs b/src/SlimMessageBus.Host/DependencyResolver/ServiceCollectionExtensions.cs index d8fbaac4..8c658dd8 100644 --- a/src/SlimMessageBus.Host/DependencyResolver/ServiceCollectionExtensions.cs +++ b/src/SlimMessageBus.Host/DependencyResolver/ServiceCollectionExtensions.cs @@ -94,6 +94,13 @@ public static IServiceCollection AddSlimMessageBus(this IServiceCollection servi services.AddHostedService(); + // Register the default providers + services.TryAddSingleton(); + services.TryAddSingleton(svp => svp.GetRequiredService()); + + services.TryAddSingleton(); + services.TryAddSingleton(svp => svp.GetRequiredService()); + return services; } diff --git a/src/SlimMessageBus.Host/MessageBusBase.cs b/src/SlimMessageBus.Host/MessageBusBase.cs index b4b4cf00..3fc9223f 100644 --- a/src/SlimMessageBus.Host/MessageBusBase.cs +++ b/src/SlimMessageBus.Host/MessageBusBase.cs @@ -15,13 +15,13 @@ protected MessageBusBase(MessageBusSettings settings, TProviderSettings provider } } -public abstract class MessageBusBase : IDisposable, IAsyncDisposable, IMasterMessageBus, IMessageScopeFactory, IMessageHeadersFactory, ICurrentTimeProvider, IResponseProducer, IResponseConsumer, IMessageBusBulkProducer +public abstract class MessageBusBase : IDisposable, IAsyncDisposable, IMasterMessageBus, IMessageScopeFactory, IMessageHeadersFactory, IResponseProducer, IResponseConsumer, IMessageBusBulkProducer { private readonly ILogger _logger; private CancellationTokenSource _cancellationTokenSource = new(); private IMessageSerializer _serializer; private readonly MessageHeaderService _headerService; - private readonly List _consumers = []; + private readonly List _consumers = []; /// /// Special market reference that signifies a dummy producer settings for response types. @@ -101,7 +101,9 @@ protected MessageBusBase(MessageBusSettings settings) RuntimeTypeCache = new RuntimeTypeCache(); - MessageBusTarget = new MessageBusProxy(this, Settings.ServiceProvider); + MessageBusTarget = new MessageBusProxy(this, Settings.ServiceProvider); + + CurrentTimeProvider = settings.ServiceProvider.GetRequiredService(); } protected void AddInit(Task task) @@ -174,7 +176,7 @@ protected virtual void Build() protected virtual void BuildPendingRequestStore() { PendingRequestStore = new InMemoryPendingRequestStore(); - PendingRequestManager = new PendingRequestManager(PendingRequestStore, () => CurrentTime, TimeSpan.FromSeconds(1), LoggerFactory); + PendingRequestManager = new PendingRequestManager(PendingRequestStore, () => CurrentTimeProvider.CurrentTime, TimeSpan.FromSeconds(1), LoggerFactory); PendingRequestManager.Start(); } @@ -387,7 +389,7 @@ protected async virtual Task DestroyConsumers() protected void AddConsumer(AbstractConsumer consumer) => _consumers.Add(consumer); - public virtual DateTimeOffset CurrentTime => DateTimeOffset.UtcNow; + public ICurrentTimeProvider CurrentTimeProvider { get; protected set; } public virtual int? MaxMessagesPerTransaction => null; @@ -513,7 +515,7 @@ public virtual Task ProduceSend(object request, string pat path ??= GetDefaultPath(requestType, producerSettings); timeout ??= GetDefaultRequestTimeout(requestType, producerSettings); - var created = CurrentTime; + var created = CurrentTimeProvider.CurrentTime; var expires = created.Add(timeout.Value); // generate the request guid @@ -670,7 +672,7 @@ public virtual Task OnResponseArrived(byte[] responsePayload, string { if (_logger.IsEnabled(LogLevel.Debug)) { - var tookTimespan = CurrentTime.Subtract(requestState.Created); + var tookTimespan = CurrentTimeProvider.CurrentTime.Subtract(requestState.Created); _logger.LogDebug("Response arrived for {Request} on path {Path} (time: {RequestTime} ms)", requestState, path, tookTimespan); } diff --git a/src/SlimMessageBus.Host/Providers/GuidGenerator/GuidGenerator.cs b/src/SlimMessageBus.Host/Providers/GuidGenerator/GuidGenerator.cs new file mode 100644 index 00000000..d52495e4 --- /dev/null +++ b/src/SlimMessageBus.Host/Providers/GuidGenerator/GuidGenerator.cs @@ -0,0 +1,9 @@ +namespace SlimMessageBus.Host; + +public class GuidGenerator : IGuidGenerator +{ + /// + /// Generate a new Guid (UUID v4) using + /// + public Guid NewGuid() => Guid.NewGuid(); +} diff --git a/src/SlimMessageBus.Host/Providers/GuidGenerator/IGuidGenerator.cs b/src/SlimMessageBus.Host/Providers/GuidGenerator/IGuidGenerator.cs new file mode 100644 index 00000000..e891fc89 --- /dev/null +++ b/src/SlimMessageBus.Host/Providers/GuidGenerator/IGuidGenerator.cs @@ -0,0 +1,10 @@ +namespace SlimMessageBus.Host; + +public interface IGuidGenerator +{ + /// + /// Generate a new Guid + /// + /// + Guid NewGuid(); +} diff --git a/src/SlimMessageBus.Host/Providers/TimeProvider/CurrentTimeProvider.cs b/src/SlimMessageBus.Host/Providers/TimeProvider/CurrentTimeProvider.cs new file mode 100644 index 00000000..6ddd90fd --- /dev/null +++ b/src/SlimMessageBus.Host/Providers/TimeProvider/CurrentTimeProvider.cs @@ -0,0 +1,6 @@ +namespace SlimMessageBus.Host; + +public class CurrentTimeProvider : ICurrentTimeProvider +{ + public DateTimeOffset CurrentTime => DateTimeOffset.UtcNow; +} \ No newline at end of file diff --git a/src/SlimMessageBus.Host/ICurrentTimeProvider.cs b/src/SlimMessageBus.Host/Providers/TimeProvider/ICurrentTimeProvider.cs similarity index 100% rename from src/SlimMessageBus.Host/ICurrentTimeProvider.cs rename to src/SlimMessageBus.Host/Providers/TimeProvider/ICurrentTimeProvider.cs diff --git a/src/Tests/SlimMessageBus.Host.AzureServiceBus.Test/ServiceBusMessageBusTests.cs b/src/Tests/SlimMessageBus.Host.AzureServiceBus.Test/ServiceBusMessageBusTests.cs index cd52a4cb..5f50e183 100644 --- a/src/Tests/SlimMessageBus.Host.AzureServiceBus.Test/ServiceBusMessageBusTests.cs +++ b/src/Tests/SlimMessageBus.Host.AzureServiceBus.Test/ServiceBusMessageBusTests.cs @@ -24,6 +24,7 @@ public ServiceBusMessageBusTests() serviceProviderMock.Setup(x => x.GetService(typeof(IMessageSerializer))).Returns(new Mock().Object); serviceProviderMock.Setup(x => x.GetService(typeof(IMessageTypeResolver))).Returns(new AssemblyQualifiedNameMessageTypeResolver()); serviceProviderMock.Setup(x => x.GetService(typeof(IEnumerable))).Returns(Array.Empty()); + serviceProviderMock.Setup(x => x.GetService(typeof(ICurrentTimeProvider))).Returns(new CurrentTimeProvider()); BusBuilder.WithDependencyResolver(serviceProviderMock.Object); diff --git a/src/Tests/SlimMessageBus.Host.Kafka.Test/KafkaMessageBusTest.cs b/src/Tests/SlimMessageBus.Host.Kafka.Test/KafkaMessageBusTest.cs index 759ea2e3..278267f7 100644 --- a/src/Tests/SlimMessageBus.Host.Kafka.Test/KafkaMessageBusTest.cs +++ b/src/Tests/SlimMessageBus.Host.Kafka.Test/KafkaMessageBusTest.cs @@ -17,6 +17,7 @@ public KafkaMessageBusTest() var serviceProviderMock = new Mock(); serviceProviderMock.Setup(x => x.GetService(typeof(ILogger))).CallBase(); serviceProviderMock.Setup(x => x.GetService(typeof(IMessageTypeResolver))).Returns(new AssemblyQualifiedNameMessageTypeResolver()); + serviceProviderMock.Setup(x => x.GetService(typeof(ICurrentTimeProvider))).Returns(new CurrentTimeProvider()); MbSettings = new MessageBusSettings { diff --git a/src/Tests/SlimMessageBus.Host.Kafka.Test/MessageBusMock.cs b/src/Tests/SlimMessageBus.Host.Kafka.Test/MessageBusMock.cs index b11c5391..d100af1f 100644 --- a/src/Tests/SlimMessageBus.Host.Kafka.Test/MessageBusMock.cs +++ b/src/Tests/SlimMessageBus.Host.Kafka.Test/MessageBusMock.cs @@ -9,28 +9,31 @@ public class MessageBusMock public Mock SerializerMock { get; } public MessageBusSettings BusSettings { get; } - public DateTimeOffset CurrentTime { get; set; } + public CurrentTimeProviderFake CurrentTimeProvider { get; set; } public Mock BusMock { get; } public MessageBusBase Bus => BusMock.Object; public MessageBusMock() { + CurrentTimeProvider = new CurrentTimeProviderFake + { + CurrentTime = DateTimeOffset.UtcNow + }; + SerializerMock = new Mock(); ServiceProviderMock = new ServiceProviderMock(); ServiceProviderMock.ProviderMock.Setup(x => x.GetService(typeof(IMessageSerializer))).Returns(SerializerMock.Object); ServiceProviderMock.ProviderMock.Setup(x => x.GetService(typeof(IMessageTypeResolver))).Returns(new AssemblyQualifiedNameMessageTypeResolver()); + ServiceProviderMock.ProviderMock.Setup(x => x.GetService(typeof(ICurrentTimeProvider))).Returns(CurrentTimeProvider); BusSettings = new MessageBusSettings { ServiceProvider = ServiceProviderMock.ProviderMock.Object, }; - - CurrentTime = DateTimeOffset.UtcNow; - + BusMock = new Mock(BusSettings); - BusMock.SetupGet(x => x.Settings).Returns(BusSettings); - BusMock.SetupGet(x => x.CurrentTime).Returns(() => CurrentTime); + BusMock.SetupGet(x => x.Settings).Returns(BusSettings); } } diff --git a/src/Tests/SlimMessageBus.Host.Memory.Test/MemoryMessageBusTests.cs b/src/Tests/SlimMessageBus.Host.Memory.Test/MemoryMessageBusTests.cs index ca5952e7..2640acb2 100644 --- a/src/Tests/SlimMessageBus.Host.Memory.Test/MemoryMessageBusTests.cs +++ b/src/Tests/SlimMessageBus.Host.Memory.Test/MemoryMessageBusTests.cs @@ -36,6 +36,7 @@ public MemoryMessageBusTests() _serviceProviderMock.ProviderMock.Setup(x => x.GetService(typeof(IMessageSerializer))).Returns(_messageSerializerMock.Object); _serviceProviderMock.ProviderMock.Setup(x => x.GetService(typeof(IMessageTypeResolver))).Returns(new AssemblyQualifiedNameMessageTypeResolver()); _serviceProviderMock.ProviderMock.Setup(x => x.GetService(It.Is(t => t.IsGenericType && t.GetGenericTypeDefinition() == typeof(IEnumerable<>)))).Returns((Type t) => Enumerable.Empty()); + _serviceProviderMock.ProviderMock.Setup(x => x.GetService(typeof(ICurrentTimeProvider))).Returns(new CurrentTimeProvider()); _messageSerializerMock .Setup(x => x.Serialize(It.IsAny(), It.IsAny())) @@ -182,6 +183,7 @@ public async Task When_Publish_Given_PerMessageScopeEnabled_Then_TheScopeIsCreat _serviceProviderMock.ProviderMock.Verify(x => x.GetService(typeof(IEnumerable>)), Times.Once); _serviceProviderMock.ProviderMock.Verify(x => x.GetService(typeof(IEnumerable)), Times.Between(0, 2, Moq.Range.Inclusive)); _serviceProviderMock.ProviderMock.Verify(x => x.GetService(typeof(IMessageTypeResolver)), Times.Once); + _serviceProviderMock.ProviderMock.Verify(x => x.GetService(typeof(ICurrentTimeProvider)), Times.Once); _serviceProviderMock.ProviderMock.VerifyNoOtherCalls(); scopeProviderMock.Should().NotBeNull(); @@ -231,6 +233,7 @@ public async Task When_Publish_Given_PerMessageScopeDisabled_Then_TheScopeIsNotC _serviceProviderMock.ProviderMock.Verify(x => x.GetService(typeof(IEnumerable>)), Times.Once); _serviceProviderMock.ProviderMock.Verify(x => x.GetService(typeof(IEnumerable)), Times.Between(0, 2, Moq.Range.Inclusive)); _serviceProviderMock.ProviderMock.Verify(x => x.GetService(typeof(IMessageTypeResolver)), Times.Once); + _serviceProviderMock.ProviderMock.Verify(x => x.GetService(typeof(ICurrentTimeProvider)), Times.Once); _serviceProviderMock.ProviderMock.VerifyNoOtherCalls(); consumerMock.Verify(x => x.OnHandle(m, It.IsAny()), Times.Once); @@ -306,6 +309,7 @@ public async Task When_ProducePublish_Given_PerMessageScopeDisabledOrEnabled_And _serviceProviderMock.ProviderMock.Verify(x => x.GetService(typeof(ILoggerFactory)), Times.Once); _serviceProviderMock.ProviderMock.Verify(x => x.GetService(typeof(IEnumerable)), Times.Between(0, 2, Moq.Range.Inclusive)); _serviceProviderMock.ProviderMock.Verify(x => x.GetService(typeof(IMessageTypeResolver)), Times.Once); + _serviceProviderMock.ProviderMock.Verify(x => x.GetService(typeof(ICurrentTimeProvider)), Times.Once); _serviceProviderMock.ProviderMock.VerifyNoOtherCalls(); } @@ -341,6 +345,7 @@ public async Task When_Publish_Given_TwoConsumersOnSameTopic_Then_BothAreInvoked _serviceProviderMock.ProviderMock.Verify(x => x.GetService(typeof(IEnumerable>)), Times.Once); _serviceProviderMock.ProviderMock.Verify(x => x.GetService(typeof(IEnumerable)), Times.Between(0, 2, Moq.Range.Inclusive)); _serviceProviderMock.ProviderMock.Verify(x => x.GetService(typeof(IMessageTypeResolver)), Times.Once); + _serviceProviderMock.ProviderMock.Verify(x => x.GetService(typeof(ICurrentTimeProvider)), Times.Once); _serviceProviderMock.ProviderMock.VerifyNoOtherCalls(); consumer1Mock.VerifySet(x => x.Context = It.IsAny(), Times.Once); @@ -391,6 +396,7 @@ public async Task When_Send_Given_AConsumersAndHandlerOnSameTopic_Then_BothAreIn _serviceProviderMock.ProviderMock.Verify(x => x.GetService(typeof(IEnumerable>)), Times.Once); _serviceProviderMock.ProviderMock.Verify(x => x.GetService(typeof(IEnumerable)), Times.Between(0, 2, Moq.Range.Inclusive)); _serviceProviderMock.ProviderMock.Verify(x => x.GetService(typeof(IMessageTypeResolver)), Times.Once); + _serviceProviderMock.ProviderMock.Verify(x => x.GetService(typeof(ICurrentTimeProvider)), Times.Once); _serviceProviderMock.ProviderMock.VerifyNoOtherCalls(); consumer2Mock.Verify(x => x.OnHandle(m, It.IsAny()), Times.Once); diff --git a/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/OutboxBenchmarkTests.cs b/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/OutboxBenchmarkTests.cs index b8e291c1..12dedc0b 100644 --- a/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/OutboxBenchmarkTests.cs +++ b/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/OutboxBenchmarkTests.cs @@ -197,7 +197,7 @@ await PerformDbOperation(async (context, outboxMigrationService) => var outboxPublishTimerElapsed = TimeSpan.Zero; if (_useOutbox) { - var outputRepository = ServiceProvider.GetRequiredService(); + var outputRepository = ServiceProvider.GetRequiredService(); var outboxTimer = Stopwatch.StartNew(); var publishCount = await outboxSendingTask.SendMessages(ServiceProvider, outputRepository, CancellationToken.None); diff --git a/src/Tests/SlimMessageBus.Host.Outbox.Sql.Test/BaseSqlOutboxRepositoryTest.cs b/src/Tests/SlimMessageBus.Host.Outbox.Sql.Test/BaseSqlOutboxRepositoryTest.cs index cb51dc21..f0fc21b7 100644 --- a/src/Tests/SlimMessageBus.Host.Outbox.Sql.Test/BaseSqlOutboxRepositoryTest.cs +++ b/src/Tests/SlimMessageBus.Host.Outbox.Sql.Test/BaseSqlOutboxRepositoryTest.cs @@ -1,5 +1,7 @@ -namespace SlimMessageBus.Host.Outbox.Sql.Test; - +namespace SlimMessageBus.Host.Outbox.Sql.Test; + +using SlimMessageBus.Host.Test.Common; + public class BaseSqlOutboxRepositoryTest : BaseSqlTest { protected readonly Fixture _fixture = new(); @@ -7,9 +9,10 @@ public class BaseSqlOutboxRepositoryTest : BaseSqlTest protected SqlConnection _connection; protected SqlOutboxMigrationService _migrationService; protected SqlOutboxSettings _settings; - protected SqlOutboxRepository _target; + protected SqlOutboxMessageRepository _target; protected SqlOutboxTemplate _template; - protected ISqlTransactionService _transactionService; + protected ISqlTransactionService _transactionService; + protected CurrentTimeProviderFake _currentTimeProvider; public override async Task InitializeAsync() { @@ -19,8 +22,9 @@ public override async Task InitializeAsync() _connection = new SqlConnection(GetConnectionString()); _transactionService = new SqlTransactionService(_connection, _settings.SqlSettings); _template = new SqlOutboxTemplate(_settings); - _target = new SqlOutboxRepository(NullLogger.Instance, _settings, _template, _connection, _transactionService); - _migrationService = new SqlOutboxMigrationService(NullLogger.Instance, _target, _transactionService, _settings); + _currentTimeProvider = new(); + _target = new SqlOutboxMessageRepository(NullLogger.Instance, _settings, _template, new GuidGenerator(), _currentTimeProvider, new DefaultInstanceIdProvider(), _connection, _transactionService); + _migrationService = new SqlOutboxMigrationService(NullLogger.Instance, _target, _transactionService, _settings); await _migrationService.Migrate(CancellationToken.None); } @@ -38,42 +42,38 @@ protected async Task> SeedOutbox(int count, Action< { var message = messages[i]; action?.Invoke(i, message); - await _target.Save(message, cancellationToken); + message.Id = await _target.Create(message.BusName, message.Headers, message.Path, message.MessageType, message.MessagePayload, cancellationToken); } - return messages; } - protected IReadOnlyList CreateOutboxMessages(int count) - { - return Enumerable - .Range(0, count) - .Select(_ => - { - // Create a sample object for MessagePayload - var samplePayload = new { Key = _fixture.Create(), Number = _fixture.Create() }; - var jsonPayload = JsonSerializer.SerializeToUtf8Bytes(samplePayload); + protected IReadOnlyList CreateOutboxMessages(int count) => Enumerable + .Range(0, count) + .Select(_ => + { + // Create a sample object for MessagePayload + var samplePayload = new { Key = _fixture.Create(), Number = _fixture.Create() }; + var jsonPayload = JsonSerializer.SerializeToUtf8Bytes(samplePayload); - // Generate Headers dictionary with simple types - var headers = new Dictionary - { - { "Header1", _fixture.Create() }, - { "Header2", _fixture.Create() }, - { "Header3", _fixture.Create() } - }; + // Generate Headers dictionary with simple types + var headers = new Dictionary + { + { "Header1", _fixture.Create() }, + { "Header2", _fixture.Create() }, + { "Header3", _fixture.Create() } + }; - // Configure fixture to use the generated values - _fixture.Customize(om => om - .With(x => x.MessagePayload, jsonPayload) - .With(x => x.Headers, headers) - .With(x => x.LockExpiresOn, DateTime.MinValue) - .With(x => x.LockInstanceId, string.Empty) - .With(x => x.DeliveryAborted, false) - .With(x => x.DeliveryAttempt, 0) - .With(x => x.DeliveryComplete, false)); + // Configure fixture to use the generated values + _fixture.Customize(om => om + .With(x => x.MessagePayload, jsonPayload) + .With(x => x.Headers, headers) + .With(x => x.LockExpiresOn, DateTime.MinValue) + .With(x => x.LockInstanceId, string.Empty) + .With(x => x.DeliveryAborted, false) + .With(x => x.DeliveryAttempt, 0) + .With(x => x.DeliveryComplete, false)); - return _fixture.Create(); - }) - .ToList(); - } + return _fixture.Create(); + }) + .ToList(); } diff --git a/src/Tests/SlimMessageBus.Host.Outbox.Sql.Test/SlimMessageBus.Host.Outbox.Sql.Test.csproj b/src/Tests/SlimMessageBus.Host.Outbox.Sql.Test/SlimMessageBus.Host.Outbox.Sql.Test.csproj index c6a10cb4..9c2167a7 100644 --- a/src/Tests/SlimMessageBus.Host.Outbox.Sql.Test/SlimMessageBus.Host.Outbox.Sql.Test.csproj +++ b/src/Tests/SlimMessageBus.Host.Outbox.Sql.Test/SlimMessageBus.Host.Outbox.Sql.Test.csproj @@ -8,6 +8,7 @@ + diff --git a/src/Tests/SlimMessageBus.Host.Outbox.Sql.Test/SqlOutboxRepositoryTests.cs b/src/Tests/SlimMessageBus.Host.Outbox.Sql.Test/SqlOutboxRepositoryTests.cs index 381e1199..3f61d0b7 100644 --- a/src/Tests/SlimMessageBus.Host.Outbox.Sql.Test/SqlOutboxRepositoryTests.cs +++ b/src/Tests/SlimMessageBus.Host.Outbox.Sql.Test/SqlOutboxRepositoryTests.cs @@ -1,5 +1,4 @@ -namespace SlimMessageBus.Host.Outbox.Sql.Test; - +namespace SlimMessageBus.Host.Outbox.Sql.Test; public static class SqlOutboxRepositoryTests { public class SaveTests : BaseSqlOutboxRepositoryTest @@ -11,12 +10,18 @@ public async Task SavedMessage_IsPersisted() var message = CreateOutboxMessages(1).Single(); // act - await _target.Save(message, CancellationToken.None); - var actual = await _target.GetAllMessages(CancellationToken.None); - - // assert - actual.Count.Should().Be(1); - actual.Single().Should().BeEquivalentTo(message); + message.Id = await _target.Create(message.BusName, message.Headers, message.Path, message.MessageType, message.MessagePayload, CancellationToken.None); + var messages = await _target.GetAllMessages(CancellationToken.None); + + // assert + messages.Count.Should().Be(1); + var actual = messages.Single(); + actual.Id.Should().Be(message.Id); + actual.BusName.Should().Be(message.BusName); + actual.Headers.Should().BeEquivalentTo(message.Headers); + actual.Path.Should().Be(message.Path); + actual.MessageType.Should().Be(message.MessageType); + actual.MessagePayload.Should().BeEquivalentTo(message.MessagePayload); } } @@ -50,11 +55,13 @@ public async Task ExpiredItems_AreDeleted() var seedMessages = await SeedOutbox(10, (i, x) => { - x.DeliveryAttempt = 1; - x.DeliveryComplete = true; - x.Timestamp = i < 5 ? expired : active; + // affect the timestamp to make the message expired + _currentTimeProvider.CurrentTime = i < 5 ? expired : active; }); - + + // mark the first 5 messages as sent + await _target.UpdateToSent(seedMessages.Select(x => x.Id).Take(5).ToList(), CancellationToken.None); + // act await _target.DeleteSent(active, CancellationToken.None); var messages = await _target.GetAllMessages(CancellationToken.None); diff --git a/src/Tests/SlimMessageBus.Host.Outbox.Test/Interceptors/OutboxForwardingPublishInterceptorTests.cs b/src/Tests/SlimMessageBus.Host.Outbox.Test/Interceptors/OutboxForwardingPublishInterceptorTests.cs index b13f9dd8..1ca6b273 100644 --- a/src/Tests/SlimMessageBus.Host.Outbox.Test/Interceptors/OutboxForwardingPublishInterceptorTests.cs +++ b/src/Tests/SlimMessageBus.Host.Outbox.Test/Interceptors/OutboxForwardingPublishInterceptorTests.cs @@ -1,6 +1,6 @@ -namespace SlimMessageBus.Host.Outbox.Test.Interceptors; - -using SlimMessageBus.Host.Outbox.Services; +namespace SlimMessageBus.Host.Outbox.Test.Interceptors; + +using SlimMessageBus.Host.Outbox; using SlimMessageBus.Host.Serialization; public static class OutboxForwardingPublishInterceptorTests @@ -14,13 +14,13 @@ public void OutboxForwardingPublisher_MustBeLastInPipeline() var expected = int.MaxValue; var mockLogger = new Mock>(); - var mockOutboxRepository = new Mock(); - var mockInstanceIdProvider = new Mock(); + var mockOutboxRepository = new Mock(); var mockOutboxNotificationService = new Mock(); - var mockOutboxSettings = new Mock(); + var mockOutboxSettings = new Mock(); + var mockOutboxMessageFactory = new Mock(); // act - var target = new OutboxForwardingPublishInterceptor(mockLogger.Object, mockOutboxRepository.Object, mockInstanceIdProvider.Object, mockOutboxNotificationService.Object, mockOutboxSettings.Object); + var target = new OutboxForwardingPublishInterceptor(mockLogger.Object, mockOutboxMessageFactory.Object, mockOutboxNotificationService.Object, mockOutboxSettings.Object); var actual = target.Order; // assert @@ -41,20 +41,19 @@ public void OutboxForwardingPublisher_MustImplement_IInterceptorWithOrder() public class OnHandleTests { private readonly Mock> _mockLogger; - private readonly Mock _mockOutboxRepository; - private readonly Mock _mockInstanceIdProvider; + private readonly Mock _mockOutboxRepository; private readonly Mock _mockSerializer; private readonly Mock _mockMasterMessageBus; private readonly Mock _mockOutboxNotificationService; private readonly Mock _mockOutboxSettings; - private Mock _mockTargetBus; - private Mock _mockProducerContext; + private readonly Mock _mockTargetBus; + private readonly Mock _mockProducerContext; + private readonly Mock _mockOutboxMessageFactory; public OnHandleTests() { _mockLogger = new Mock>(); - _mockOutboxRepository = new Mock(); - _mockInstanceIdProvider = new Mock(); + _mockOutboxRepository = new Mock(); _mockOutboxNotificationService = new Mock(); _mockOutboxSettings = new Mock(); @@ -67,7 +66,12 @@ public OnHandleTests() _mockTargetBus.SetupGet(x => x.Target).Returns(_mockMasterMessageBus.Object); _mockProducerContext = new Mock(); - _mockProducerContext.SetupGet(x => x.Bus).Returns(_mockTargetBus.Object); + _mockProducerContext.SetupGet(x => x.Bus).Returns(_mockTargetBus.Object); + + _mockOutboxMessageFactory = new Mock(); + _mockOutboxMessageFactory + .Setup(x => x.Create(It.IsAny(), It.IsAny>(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) + .ReturnsAsync(Guid.NewGuid()); } [Fact] @@ -75,7 +79,8 @@ public async Task SkipOutboxHeader_IsPresent_PromoteToNext() { // arrange _mockProducerContext.SetupGet(x => x.Headers).Returns(new Dictionary { { OutboxForwardingPublishInterceptor.SkipOutboxHeader, true } }); - _mockOutboxRepository.Setup(x => x.Save(It.IsAny(), It.IsAny())).Verifiable(); + _mockOutboxRepository.Setup(x => x.Create(It.IsAny(), It.IsAny>(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())).Verifiable(); + var nextCalled = 0; var next = () => @@ -85,14 +90,22 @@ public async Task SkipOutboxHeader_IsPresent_PromoteToNext() }; // act - var target = new OutboxForwardingPublishInterceptor(_mockLogger.Object, _mockOutboxRepository.Object, _mockInstanceIdProvider.Object, _mockOutboxNotificationService.Object, _mockOutboxSettings.Object); + var target = new OutboxForwardingPublishInterceptor(_mockLogger.Object, _mockOutboxMessageFactory.Object, _mockOutboxNotificationService.Object, _mockOutboxSettings.Object); await target.OnHandle(new object(), next, _mockProducerContext.Object); target.Dispose(); // assert _mockProducerContext.VerifyGet(x => x.Bus, Times.AtLeastOnce); _mockProducerContext.VerifyGet(x => x.Headers, Times.AtLeastOnce); - _mockOutboxRepository.Verify(x => x.Save(It.IsAny(), It.IsAny()), Times.Never); + _mockOutboxRepository + .Verify(x => x.Create( + It.IsAny(), + It.IsAny>(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny()), + Times.Never); nextCalled.Should().Be(1); } @@ -103,8 +116,8 @@ public async Task SkipOutboxHeader_IsNotPresent_DoNotPromoteToNext() // arrange var message = new object(); - _mockSerializer.Setup(x => x.Serialize(typeof(object), message)).Verifiable(); - _mockOutboxRepository.Setup(x => x.Save(It.IsAny(), It.IsAny())).Verifiable(); + _mockSerializer.Setup(x => x.Serialize(typeof(object), message)).Verifiable(); + _mockOutboxRepository.Setup(x => x.Create(It.IsAny(), It.IsAny>(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())).Verifiable(); var nextCalled = 0; var next = () => @@ -114,13 +127,21 @@ public async Task SkipOutboxHeader_IsNotPresent_DoNotPromoteToNext() }; // act - var target = new OutboxForwardingPublishInterceptor(_mockLogger.Object, _mockOutboxRepository.Object, _mockInstanceIdProvider.Object, _mockOutboxNotificationService.Object, _mockOutboxSettings.Object); + var target = new OutboxForwardingPublishInterceptor(_mockLogger.Object, _mockOutboxMessageFactory.Object, _mockOutboxNotificationService.Object, _mockOutboxSettings.Object); await target.OnHandle(new object(), next, _mockProducerContext.Object); target.Dispose(); // assert nextCalled.Should().Be(0); - _mockOutboxRepository.Verify(x => x.Save(It.IsAny(), It.IsAny()), Times.Once); + _mockOutboxRepository + .Verify(x => x.Create( + It.IsAny(), + It.IsAny>(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny()), + Times.Never); } [Fact] @@ -128,16 +149,24 @@ public async Task SkipOutboxHeader_IsPresent_DoNotRaiseOutboxNotification() { // arrange _mockProducerContext.SetupGet(x => x.Headers).Returns(new Dictionary { { OutboxForwardingPublishInterceptor.SkipOutboxHeader, true } }); - _mockOutboxRepository.Setup(x => x.Save(It.IsAny(), It.IsAny())).Verifiable(); + _mockOutboxRepository.Setup(x => x.Create(It.IsAny(), It.IsAny>(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())).Verifiable(); _mockOutboxNotificationService.Setup(x => x.Notify()).Verifiable(); // act - var target = new OutboxForwardingPublishInterceptor(_mockLogger.Object, _mockOutboxRepository.Object, _mockInstanceIdProvider.Object, _mockOutboxNotificationService.Object, _mockOutboxSettings.Object); + var target = new OutboxForwardingPublishInterceptor(_mockLogger.Object, _mockOutboxMessageFactory.Object, _mockOutboxNotificationService.Object, _mockOutboxSettings.Object); await target.OnHandle(new object(), () => Task.CompletedTask, _mockProducerContext.Object); target.Dispose(); // assert - _mockOutboxRepository.Verify(x => x.Save(It.IsAny(), It.IsAny()), Times.Never); + _mockOutboxRepository + .Verify(x => x.Create( + It.IsAny(), + It.IsAny>(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny()), + Times.Never); _mockOutboxNotificationService.Verify(x => x.Notify(), Times.Never); } @@ -148,16 +177,24 @@ public async Task SkipOutboxHeader_IsNotPresent_RaiseOutboxNotification() var message = new object(); _mockSerializer.Setup(x => x.Serialize(typeof(object), message)).Verifiable(); - _mockOutboxRepository.Setup(x => x.Save(It.IsAny(), It.IsAny())).Verifiable(); + _mockOutboxRepository.Setup(x => x.Create(It.IsAny(), It.IsAny>(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())).Verifiable(); _mockOutboxNotificationService.Setup(x => x.Notify()).Verifiable(); // act - var target = new OutboxForwardingPublishInterceptor(_mockLogger.Object, _mockOutboxRepository.Object, _mockInstanceIdProvider.Object, _mockOutboxNotificationService.Object, _mockOutboxSettings.Object); + var target = new OutboxForwardingPublishInterceptor(_mockLogger.Object, _mockOutboxMessageFactory.Object, _mockOutboxNotificationService.Object, _mockOutboxSettings.Object); await target.OnHandle(new object(), () => Task.CompletedTask, _mockProducerContext.Object); target.Dispose(); // assert - _mockOutboxRepository.Verify(x => x.Save(It.IsAny(), It.IsAny()), Times.Once); + _mockOutboxRepository + .Verify(x => x.Create( + It.IsAny(), + It.IsAny>(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny()), + Times.Never); _mockOutboxNotificationService.Verify(x => x.Notify(), Times.Once); } } diff --git a/src/Tests/SlimMessageBus.Host.Outbox.Test/OutboxLockRenewalTimerTests.cs b/src/Tests/SlimMessageBus.Host.Outbox.Test/OutboxLockRenewalTimerTests.cs index ffc9d15d..8bb89646 100644 --- a/src/Tests/SlimMessageBus.Host.Outbox.Test/OutboxLockRenewalTimerTests.cs +++ b/src/Tests/SlimMessageBus.Host.Outbox.Test/OutboxLockRenewalTimerTests.cs @@ -3,7 +3,7 @@ public class OutboxLockRenewalTimerTests { private readonly Mock> _loggerMock; - private readonly Mock _outboxRepositoryMock; + private readonly Mock _outboxRepositoryMock; private readonly Mock _instanceIdProviderMock; private readonly CancellationTokenSource _cancellationTokenSource; private readonly Action _lockLostAction; @@ -14,7 +14,7 @@ public class OutboxLockRenewalTimerTests public OutboxLockRenewalTimerTests() { _loggerMock = new Mock>(); - _outboxRepositoryMock = new Mock(); + _outboxRepositoryMock = new Mock(); _instanceIdProviderMock = new Mock(); _cancellationTokenSource = new CancellationTokenSource(); _lockLostAction = Mock.Of>(); diff --git a/src/Tests/SlimMessageBus.Host.Outbox.Test/Services/OutboxSendingTaskTests.cs b/src/Tests/SlimMessageBus.Host.Outbox.Test/Services/OutboxSendingTaskTests.cs index e4f7a680..f713dadd 100644 --- a/src/Tests/SlimMessageBus.Host.Outbox.Test/Services/OutboxSendingTaskTests.cs +++ b/src/Tests/SlimMessageBus.Host.Outbox.Test/Services/OutboxSendingTaskTests.cs @@ -7,7 +7,7 @@ public sealed class OutboxSendingTaskTests public class DispatchBatchTests { private readonly ILoggerFactory _loggerFactory; - private readonly Mock _outboxRepositoryMock; + private readonly Mock _outboxRepositoryMock; private readonly Mock _producerMock; private readonly Mock _messageBusTargetMock; private readonly OutboxSettings _outboxSettings; @@ -16,14 +16,14 @@ public class DispatchBatchTests public DispatchBatchTests() { - _outboxRepositoryMock = new Mock(); + _outboxRepositoryMock = new Mock(); _producerMock = new Mock(); _messageBusTargetMock = new Mock(); _outboxSettings = new OutboxSettings { MaxDeliveryAttempts = 5 }; _serviceProvider = Mock.Of(); _loggerFactory = new NullLoggerFactory(); - _sut = new OutboxSendingTask(_loggerFactory, _outboxSettings, _serviceProvider); + _sut = new OutboxSendingTask(_loggerFactory, _outboxSettings, new CurrentTimeProvider(), _serviceProvider); } [Fact] @@ -88,7 +88,7 @@ public async Task DispatchBatch_ShouldIncrementDeliveryAttempts_WhenNotAllMessag public class ProcessMessagesTests { - private readonly Mock _mockOutboxRepository; + private readonly Mock _mockOutboxRepository; private readonly Mock _mockCompositeMessageBus; private readonly Mock _mockMessageBusTarget; private readonly Mock _mockMasterMessageBus; @@ -98,7 +98,7 @@ public class ProcessMessagesTests public ProcessMessagesTests() { - _mockOutboxRepository = new Mock(); + _mockOutboxRepository = new Mock(); _mockCompositeMessageBus = new Mock(); _mockMessageBusTarget = new Mock(); _mockMasterMessageBus = new Mock(); @@ -110,7 +110,7 @@ public ProcessMessagesTests() MessageTypeResolver = new Mock().Object }; - _sut = new OutboxSendingTask(NullLoggerFactory.Instance, _outboxSettings, null); + _sut = new OutboxSendingTask(NullLoggerFactory.Instance, _outboxSettings, new CurrentTimeProvider(), null); } [Fact] diff --git a/src/Tests/SlimMessageBus.Host.Redis.Test/RedisMessageBusTest.cs b/src/Tests/SlimMessageBus.Host.Redis.Test/RedisMessageBusTest.cs index fb1deae3..beced739 100644 --- a/src/Tests/SlimMessageBus.Host.Redis.Test/RedisMessageBusTest.cs +++ b/src/Tests/SlimMessageBus.Host.Redis.Test/RedisMessageBusTest.cs @@ -25,6 +25,7 @@ public RedisMessageBusTest() { _serviceProviderMock.Setup(x => x.GetService(typeof(IMessageSerializer))).Returns(_messageSerializerMock.Object); _serviceProviderMock.Setup(x => x.GetService(typeof(IMessageTypeResolver))).Returns(new AssemblyQualifiedNameMessageTypeResolver()); + _serviceProviderMock.Setup(x => x.GetService(typeof(ICurrentTimeProvider))).Returns(new CurrentTimeProvider()); _serviceProviderMock.Setup(x => x.GetService(It.Is(t => t.IsGenericType && t.GetGenericTypeDefinition() == typeof(IEnumerable<>)))).Returns(Enumerable.Empty()); _settings.ServiceProvider = _serviceProviderMock.Object; diff --git a/src/Tests/SlimMessageBus.Host.Test.Common/CurrentTimeProviderFake.cs b/src/Tests/SlimMessageBus.Host.Test.Common/CurrentTimeProviderFake.cs new file mode 100644 index 00000000..61c3a953 --- /dev/null +++ b/src/Tests/SlimMessageBus.Host.Test.Common/CurrentTimeProviderFake.cs @@ -0,0 +1,6 @@ +namespace SlimMessageBus.Host.Test.Common; + +public class CurrentTimeProviderFake : ICurrentTimeProvider +{ + public DateTimeOffset CurrentTime { get; set; } = DateTimeOffset.Now; +} \ No newline at end of file diff --git a/src/Tests/SlimMessageBus.Host.Test/Consumer/MessageBusMock.cs b/src/Tests/SlimMessageBus.Host.Test/Consumer/MessageBusMock.cs index b8f68ad7..60c1f7c6 100644 --- a/src/Tests/SlimMessageBus.Host.Test/Consumer/MessageBusMock.cs +++ b/src/Tests/SlimMessageBus.Host.Test/Consumer/MessageBusMock.cs @@ -5,7 +5,7 @@ namespace SlimMessageBus.Host.Test; using SlimMessageBus.Host.Interceptor; using SlimMessageBus.Host.Serialization; -public class MessageBusMock +public class MessageBusMock : ICurrentTimeProvider { public Mock ServiceProviderMock { get; } public IList> ChildDependencyResolverMocks { get; } @@ -63,6 +63,7 @@ void SetupDependencyResolver(Mock mock) where T : class, IServiceProvider ServiceProviderMock.Setup(x => x.GetService(typeof(IServiceScopeFactory))).Returns(serviceScopeFactoryMock.Object); ServiceProviderMock.Setup(x => x.GetService(typeof(IMessageSerializer))).Returns(SerializerMock.Object); + ServiceProviderMock.Setup(x => x.GetService(typeof(ICurrentTimeProvider))).Returns(this); var mbSettings = new MessageBusSettings { @@ -75,7 +76,6 @@ void SetupDependencyResolver(Mock mock) where T : class, IServiceProvider BusMock.SetupGet(x => x.Settings).Returns(mbSettings); BusMock.SetupGet(x => x.Serializer).CallBase(); BusMock.SetupGet(x => x.MessageBusTarget).CallBase(); - BusMock.SetupGet(x => x.CurrentTime).Returns(() => CurrentTime); BusMock.Setup(x => x.CreateHeaders()).CallBase(); BusMock.Setup(x => x.CreateMessageScope(It.IsAny(), It.IsAny(), It.IsAny>(), It.IsAny())).CallBase(); } diff --git a/src/Tests/SlimMessageBus.Host.Test/Consumer/MessageHandlerTest.cs b/src/Tests/SlimMessageBus.Host.Test/Consumer/MessageHandlerTest.cs index f80aa052..6c63213f 100644 --- a/src/Tests/SlimMessageBus.Host.Test/Consumer/MessageHandlerTest.cs +++ b/src/Tests/SlimMessageBus.Host.Test/Consumer/MessageHandlerTest.cs @@ -39,7 +39,7 @@ public MessageHandlerTest() messageTypeResolver: messageTypeResolverMock.Object, messageHeadersFactory: messageHeaderFactoryMock.Object, runtimeTypeCache: new Host.Collections.RuntimeTypeCache(), - currentTimeProvider: busMock.Bus, + currentTimeProvider: busMock, path: "topic1"); } diff --git a/src/Tests/SlimMessageBus.Host.Test/Hybrid/HybridMessageBusTest.cs b/src/Tests/SlimMessageBus.Host.Test/Hybrid/HybridMessageBusTest.cs index f6a18972..30140f62 100644 --- a/src/Tests/SlimMessageBus.Host.Test/Hybrid/HybridMessageBusTest.cs +++ b/src/Tests/SlimMessageBus.Host.Test/Hybrid/HybridMessageBusTest.cs @@ -39,6 +39,7 @@ public HybridMessageBusTest() _serviceProviderMock.Setup(x => x.GetService(typeof(IMessageTypeResolver))).Returns(new AssemblyQualifiedNameMessageTypeResolver()); _serviceProviderMock.Setup(x => x.GetService(typeof(ILoggerFactory))).Returns(_loggerFactoryMock.Object); _serviceProviderMock.Setup(x => x.GetService(typeof(IEnumerable))).Returns(Array.Empty()); + _serviceProviderMock.Setup(x => x.GetService(typeof(ICurrentTimeProvider))).Returns(new CurrentTimeProvider()); _loggerFactoryMock.Setup(x => x.CreateLogger(It.IsAny())).Returns(_loggerMock.Object); diff --git a/src/Tests/SlimMessageBus.Host.Test/MessageBusBaseTests.cs b/src/Tests/SlimMessageBus.Host.Test/MessageBusBaseTests.cs index 0b0fea6f..57c27fbb 100644 --- a/src/Tests/SlimMessageBus.Host.Test/MessageBusBaseTests.cs +++ b/src/Tests/SlimMessageBus.Host.Test/MessageBusBaseTests.cs @@ -1,7 +1,5 @@ namespace SlimMessageBus.Host.Test; -using System.Threading; - using Moq.Protected; using SlimMessageBus.Host.Test.Common; @@ -29,9 +27,13 @@ public MessageBusBaseTests() _producedMessages = []; + var currentTimeProviderMock = new Mock(); + currentTimeProviderMock.SetupGet(x => x.CurrentTime).Returns(() => _timeNow); + _serviceProviderMock = new Mock(); _serviceProviderMock.Setup(x => x.GetService(typeof(IMessageSerializer))).Returns(new JsonMessageSerializer()); _serviceProviderMock.Setup(x => x.GetService(typeof(IMessageTypeResolver))).Returns(new AssemblyQualifiedNameMessageTypeResolver()); + _serviceProviderMock.Setup(x => x.GetService(typeof(ICurrentTimeProvider))).Returns(new CurrentTimeProvider()); _serviceProviderMock.Setup(x => x.GetService(It.Is(t => t.IsGenericType && t.GetGenericTypeDefinition() == typeof(IEnumerable<>)))).Returns((Type t) => Array.CreateInstance(t.GetGenericArguments()[0], 0)); BusBuilder = MessageBusBuilder.Create() @@ -52,10 +54,9 @@ public MessageBusBaseTests() .WithDependencyResolver(_serviceProviderMock.Object) .WithProvider(s => { - return new MessageBusTested(s) + return new MessageBusTested(s, currentTimeProviderMock.Object) { // provide current time - CurrentTimeProvider = () => _timeNow, OnProduced = (mt, n, m) => _producedMessages.Add(new(mt, n, m)) }; }); @@ -497,6 +498,7 @@ public async Task When_Publish_Given_InterceptorsInDI_Then_InterceptorInfluenceI _serviceProviderMock.Verify(x => x.GetService(typeof(ILoggerFactory)), Times.Once); _serviceProviderMock.Verify(x => x.GetService(typeof(IMessageSerializer)), Times.Between(0, 1, Moq.Range.Inclusive)); _serviceProviderMock.Verify(x => x.GetService(typeof(IMessageTypeResolver)), Times.Once); + _serviceProviderMock.Verify(x => x.GetService(typeof(ICurrentTimeProvider)), Times.Once); _serviceProviderMock.VerifyNoOtherCalls(); if (producerInterceptorCallsNext != null) @@ -596,6 +598,7 @@ public async Task When_Send_Given_InterceptorsInDI_Then_InterceptorInfluenceIfTh _serviceProviderMock.Verify(x => x.GetService(typeof(ILoggerFactory)), Times.Once); _serviceProviderMock.Verify(x => x.GetService(typeof(IMessageSerializer)), Times.Between(0, 1, Moq.Range.Inclusive)); _serviceProviderMock.Verify(x => x.GetService(typeof(IMessageTypeResolver)), Times.Once); + _serviceProviderMock.Verify(x => x.GetService(typeof(ICurrentTimeProvider)), Times.Once); _serviceProviderMock.VerifyNoOtherCalls(); if (producerInterceptorCallsNext != null) @@ -679,6 +682,7 @@ public async Task When_Given_NoReplyToHeader_DoNothing() var mockServiceProvider = new Mock(); mockServiceProvider.Setup(x => x.GetService(typeof(IMessageTypeResolver))).Returns(mockMessageTypeResolver.Object); + mockServiceProvider.Setup(x => x.GetService(typeof(ICurrentTimeProvider))).Returns(new CurrentTimeProvider()); var mockMessageTypeConsumerInvokerSettings = new Mock(); mockMessageTypeConsumerInvokerSettings.SetupGet(x => x.ParentSettings).Returns(() => new ConsumerSettings() { ResponseType = response.GetType() }); diff --git a/src/Tests/SlimMessageBus.Host.Test/MessageBusTested.cs b/src/Tests/SlimMessageBus.Host.Test/MessageBusTested.cs index 0e3e427c..e2696828 100644 --- a/src/Tests/SlimMessageBus.Host.Test/MessageBusTested.cs +++ b/src/Tests/SlimMessageBus.Host.Test/MessageBusTested.cs @@ -5,12 +5,13 @@ public class MessageBusTested : MessageBusBase internal int _startedCount; internal int _stoppedCount; - public MessageBusTested(MessageBusSettings settings) + public MessageBusTested(MessageBusSettings settings, ICurrentTimeProvider currentTimeProvider) : base(settings) { // by default no responses will arrive OnReply = (type, payload, req) => null; - + + CurrentTimeProvider = currentTimeProvider; OnBuildProvider(); } @@ -78,12 +79,8 @@ protected override async Task> ProduceToTranspor return new(dispatched, null); } - public override DateTimeOffset CurrentTime => CurrentTimeProvider(); - #endregion - public Func CurrentTimeProvider { get; set; } = () => DateTimeOffset.UtcNow; - public void TriggerPendingRequestCleanup() { PendingRequestManager.CleanPendingRequests();