Skip to content

Commit

Permalink
Support for UUIDv7 values
Browse files Browse the repository at this point in the history
Signed-off-by: Tomasz Maruszak <[email protected]>
  • Loading branch information
zarusz committed Oct 6, 2024
1 parent c2cbd1f commit f0a2799
Show file tree
Hide file tree
Showing 40 changed files with 367 additions and 205 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,28 @@
using SlimMessageBus.Host.Outbox.Sql;
using SlimMessageBus.Host.Sql.Common;

public class DbContextOutboxRepository<TDbContext> : SqlOutboxRepository where TDbContext : DbContext
public class DbContextOutboxRepository<TDbContext> : SqlOutboxMessageRepository where TDbContext : DbContext
{
public TDbContext DbContext { get; }

public DbContextOutboxRepository(
ILogger<SqlOutboxRepository> logger,
ILogger<SqlOutboxMessageRepository> logger,
SqlOutboxSettings settings,
SqlOutboxTemplate sqlOutboxTemplate,
IGuidGenerator guidGenerator,
ICurrentTimeProvider currentTimeProvider,
IInstanceIdProvider instanceIdProvider,
TDbContext dbContext,
ISqlTransactionService transactionService)
: base(logger, settings, sqlOutboxTemplate, (SqlConnection)dbContext.Database.GetDbConnection(), transactionService)
: base(
logger,
settings,
sqlOutboxTemplate,
guidGenerator,
currentTimeProvider,
instanceIdProvider,
(SqlConnection)dbContext.Database.GetDbConnection(),
transactionService)
{
DbContext = dbContext;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public static MessageBusBuilder AddOutboxUsingSql<TOutboxRepository>(this Messag
services.TryAddScoped<ISqlTransactionService, SqlTransactionService>();
services.Replace(ServiceDescriptor.Scoped<ISqlOutboxRepository>(svp => svp.GetRequiredService<TOutboxRepository>()));
services.Replace(ServiceDescriptor.Scoped<IOutboxRepository>(svp => svp.GetRequiredService<TOutboxRepository>()));
services.Replace(ServiceDescriptor.Scoped<IOutboxMessageRepository>(svp => svp.GetRequiredService<TOutboxRepository>()));
services.TryAddSingleton<SqlOutboxTemplate>();
services.TryAddTransient<IOutboxMigrationService, SqlOutboxMigrationService>();
Expand All @@ -46,5 +46,5 @@ public static MessageBusBuilder AddOutboxUsingSql<TOutboxRepository>(this Messag
}

public static MessageBusBuilder AddOutboxUsingSql(this MessageBusBuilder mbb, Action<SqlOutboxSettings> configure)
=> mbb.AddOutboxUsingSql<SqlOutboxRepository>(configure);
=> mbb.AddOutboxUsingSql<SqlOutboxMessageRepository>(configure);
}
2 changes: 1 addition & 1 deletion src/SlimMessageBus.Host.Outbox.Sql/ISqlOutboxRepository.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
namespace SlimMessageBus.Host.Outbox.Sql;

public interface ISqlOutboxRepository : IOutboxRepository
public interface ISqlOutboxRepository : IOutboxMessageRepository
{
}
Original file line number Diff line number Diff line change
@@ -1,23 +1,58 @@
namespace SlimMessageBus.Host.Outbox.Sql;

public class SqlOutboxRepository : CommonSqlRepository, ISqlOutboxRepository
public class SqlOutboxMessageRepository : CommonSqlRepository, ISqlOutboxRepository
{
private static readonly JsonSerializerOptions _jsonOptions = new()
{
Converters = { new ObjectToInferredTypesConverter() }
};

private readonly SqlOutboxTemplate _sqlTemplate;
private readonly JsonSerializerOptions _jsonOptions;
private readonly IGuidGenerator _guidGenerator;
private readonly ICurrentTimeProvider _currentTimeProvider;
private readonly IInstanceIdProvider _instanceIdProvider;

protected SqlOutboxSettings Settings { get; }

public SqlOutboxRepository(ILogger<SqlOutboxRepository> logger, SqlOutboxSettings settings, SqlOutboxTemplate sqlOutboxTemplate, SqlConnection connection, ISqlTransactionService transactionService)
public SqlOutboxMessageRepository(
ILogger<SqlOutboxMessageRepository> logger,
SqlOutboxSettings settings,
SqlOutboxTemplate sqlOutboxTemplate,
IGuidGenerator guidGenerator,
ICurrentTimeProvider currentTimeProvider,
IInstanceIdProvider instanceIdProvider,
SqlConnection connection,
ISqlTransactionService transactionService)
: base(logger, settings.SqlSettings, connection, transactionService)
{
_sqlTemplate = sqlOutboxTemplate;
_jsonOptions = new();
_jsonOptions.Converters.Add(new ObjectToInferredTypesConverter());

_guidGenerator = guidGenerator;
_currentTimeProvider = currentTimeProvider;
_instanceIdProvider = instanceIdProvider;
Settings = settings;
}

public async virtual Task Save(OutboxMessage message, CancellationToken token)
public virtual async Task<Guid> Create(string busName, IDictionary<string, object> headers, string path, string messageType, byte[] messagePayload, CancellationToken cancellationToken)
{
var outboxMessage = new OutboxMessage
{
Id = _guidGenerator.NewGuid(),
Timestamp = _currentTimeProvider.CurrentTime.DateTime,
InstanceId = _instanceIdProvider.GetInstanceId(),

BusName = busName,
Headers = headers,
Path = path,
MessageType = messageType,
MessagePayload = messagePayload,
};

await Save(outboxMessage, cancellationToken);

return outboxMessage.Id;
}

protected async virtual Task Save(OutboxMessage message, CancellationToken cancellationToken)
{
await EnsureConnection();

Expand All @@ -36,10 +71,10 @@ await ExecuteNonQuery(Settings.SqlSettings.OperationRetry, _sqlTemplate.SqlOutbo
cmd.Parameters.Add("@DeliveryAttempt", SqlDbType.Int).Value = message.DeliveryAttempt;
cmd.Parameters.Add("@DeliveryComplete", SqlDbType.Bit).Value = message.DeliveryComplete;
cmd.Parameters.Add("@DeliveryAborted", SqlDbType.Bit).Value = message.DeliveryAborted;
}, token);
}, cancellationToken);
}

public async Task<IReadOnlyCollection<OutboxMessage>> LockAndSelect(string instanceId, int batchSize, bool tableLock, TimeSpan lockDuration, CancellationToken token)
public async Task<IReadOnlyCollection<OutboxMessage>> LockAndSelect(string instanceId, int batchSize, bool tableLock, TimeSpan lockDuration, CancellationToken cancellationToken)
{
await EnsureConnection();

Expand All @@ -49,10 +84,10 @@ public async Task<IReadOnlyCollection<OutboxMessage>> LockAndSelect(string insta
cmd.Parameters.Add("@BatchSize", SqlDbType.Int).Value = batchSize;
cmd.Parameters.Add("@LockDuration", SqlDbType.Int).Value = lockDuration.TotalSeconds;

return await ReadMessages(cmd, token).ConfigureAwait(false);
return await ReadMessages(cmd, cancellationToken).ConfigureAwait(false);
}

public async Task AbortDelivery(IReadOnlyCollection<Guid> ids, CancellationToken token)
public async Task AbortDelivery(IReadOnlyCollection<Guid> ids, CancellationToken cancellationToken)
{
if (ids.Count == 0)
{
Expand All @@ -67,15 +102,15 @@ public async Task AbortDelivery(IReadOnlyCollection<Guid> ids, CancellationToken
{
cmd.Parameters.AddWithValue("@Ids", ToIdsString(ids));
},
token: token);
token: cancellationToken);

if (affected != ids.Count)
{
throw new MessageBusException($"The number of affected rows was {affected}, but {ids.Count} was expected");
}
}

public async Task UpdateToSent(IReadOnlyCollection<Guid> ids, CancellationToken token)
public async Task UpdateToSent(IReadOnlyCollection<Guid> ids, CancellationToken cancellationToken)
{
if (ids.Count == 0)
{
Expand All @@ -90,7 +125,7 @@ public async Task UpdateToSent(IReadOnlyCollection<Guid> ids, CancellationToken
{
cmd.Parameters.AddWithValue("@Ids", ToIdsString(ids));
},
token: token);
token: cancellationToken);

if (affected != ids.Count)
{
Expand All @@ -100,7 +135,7 @@ public async Task UpdateToSent(IReadOnlyCollection<Guid> ids, CancellationToken

private string ToIdsString(IReadOnlyCollection<Guid> ids) => string.Join(_sqlTemplate.InIdsSeparator, ids);

public async Task IncrementDeliveryAttempt(IReadOnlyCollection<Guid> ids, int maxDeliveryAttempts, CancellationToken token)
public async Task IncrementDeliveryAttempt(IReadOnlyCollection<Guid> ids, int maxDeliveryAttempts, CancellationToken cancellationToken)
{
if (ids.Count == 0)
{
Expand All @@ -121,28 +156,28 @@ public async Task IncrementDeliveryAttempt(IReadOnlyCollection<Guid> ids, int ma
cmd.Parameters.AddWithValue("@Ids", ToIdsString(ids));
cmd.Parameters.AddWithValue("@MaxDeliveryAttempts", maxDeliveryAttempts);
},
token: token);
token: cancellationToken);

if (affected != ids.Count)
{
throw new MessageBusException($"The number of affected rows was {affected}, but {ids.Count} was expected");
}
}

public async Task DeleteSent(DateTime olderThan, CancellationToken token)
public async Task DeleteSent(DateTime olderThan, CancellationToken cancellationToken)
{
await EnsureConnection();

var affected = await ExecuteNonQuery(
Settings.SqlSettings.OperationRetry,
_sqlTemplate.SqlOutboxMessageDeleteSent,
cmd => cmd.Parameters.Add("@Timestamp", SqlDbType.DateTime2).Value = olderThan,
token);
cmd => cmd.Parameters.Add("@Timestamp", SqlDbType.DateTimeOffset).Value = olderThan,
cancellationToken);

Logger.Log(affected > 0 ? LogLevel.Information : LogLevel.Debug, "Removed {MessageCount} sent messages from outbox table", affected);
}

public async Task<bool> RenewLock(string instanceId, TimeSpan lockDuration, CancellationToken token)
public async Task<bool> RenewLock(string instanceId, TimeSpan lockDuration, CancellationToken cancellationToken)
{
await EnsureConnection();

Expand All @@ -151,7 +186,7 @@ public async Task<bool> RenewLock(string instanceId, TimeSpan lockDuration, Canc
cmd.Parameters.Add("@InstanceId", SqlDbType.NVarChar).Value = instanceId;
cmd.Parameters.Add("@LockDuration", SqlDbType.Int).Value = lockDuration.TotalSeconds;

return await cmd.ExecuteNonQueryAsync(token) > 0;
return await cmd.ExecuteNonQueryAsync(cancellationToken) > 0;
}

internal async Task<IReadOnlyCollection<OutboxMessage>> GetAllMessages(CancellationToken cancellationToken)
Expand Down Expand Up @@ -185,11 +220,10 @@ private async Task<IReadOnlyCollection<OutboxMessage>> ReadMessages(SqlCommand c
var items = new List<OutboxMessage>();
while (await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
{
var id = reader.GetGuid(idOrdinal);
var headers = reader.IsDBNull(headersOrdinal) ? null : reader.GetString(headersOrdinal);
var message = new OutboxMessage
{
Id = id,
Id = reader.GetGuid(idOrdinal),
Timestamp = reader.GetDateTime(timestampOrdinal),
BusName = reader.GetString(busNameOrdinal),
MessageType = reader.GetString(typeOrdinal),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,11 @@ public static MessageBusBuilder AddOutbox(this MessageBusBuilder mbb, Action<Out
services.AddSingleton<OutboxSendingTask>();
services.TryAddEnumerable(ServiceDescriptor.Singleton<IMessageBusLifecycleInterceptor, OutboxSendingTask>(sp => sp.GetRequiredService<OutboxSendingTask>()));
services.TryAdd(ServiceDescriptor.Singleton<IOutboxNotificationService, OutboxSendingTask>(sp => sp.GetRequiredService<OutboxSendingTask>()));
services.TryAddSingleton<IOutboxNotificationService>(sp => sp.GetRequiredService<OutboxSendingTask>());
services.TryAddSingleton<IInstanceIdProvider, DefaultInstanceIdProvider>();
services.TryAddSingleton<IOutboxLockRenewalTimerFactory, OutboxLockRenewalTimerFactory>();
services.TryAddScoped<IOutboxMessageFactory>(svp => svp.GetRequiredService<IOutboxMessageRepository>());
services.TryAddSingleton(svp =>
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,14 @@ public class OutboxSettings
/// Sent message cleanup settings.
/// </summary>
public OutboxMessageCleanupSettings MessageCleanup { get; set; } = new();

/// <summary>
/// Type resolver which is responsible for converting message type into the Outbox table column MessageType
/// </summary>
public IMessageTypeResolver MessageTypeResolver { get; set; } = new AssemblyQualifiedNameMessageTypeResolver();

// ToDo: Revisit this
/// <summary>
/// The type to resolve from MSDI for <see cref="IGuidGenerator"/>. Default is <see cref="IGuidGenerator"/>.
/// </summary>
public Type GuidGeneratorType { get; set; } = typeof(IGuidGenerator);
}
4 changes: 2 additions & 2 deletions src/SlimMessageBus.Host.Outbox/IOutboxNotificationService.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
namespace SlimMessageBus.Host.Outbox.Services;

namespace SlimMessageBus.Host.Outbox;

public interface IOutboxNotificationService
{
/// <summary>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
namespace SlimMessageBus.Host.Outbox;

using Microsoft.Extensions.Logging;

using SlimMessageBus.Host.Outbox.Services;

public abstract class OutboxForwardingPublishInterceptor
{
}
Expand All @@ -14,20 +10,14 @@ public abstract class OutboxForwardingPublishInterceptor
/// </remarks>
public sealed class OutboxForwardingPublishInterceptor<T>(
ILogger<OutboxForwardingPublishInterceptor> logger,
IOutboxRepository outboxRepository,
IInstanceIdProvider instanceIdProvider,
IOutboxMessageFactory outboxMessageFactory,
IOutboxNotificationService outboxNotificationService,
OutboxSettings outboxSettings)
: OutboxForwardingPublishInterceptor, IInterceptorWithOrder, IPublishInterceptor<T>, IDisposable where T : class
: OutboxForwardingPublishInterceptor, IInterceptorWithOrder, IPublishInterceptor<T>, IDisposable
where T : class
{
internal const string SkipOutboxHeader = "__SkipOutbox";

private readonly ILogger _logger = logger;
private readonly IOutboxRepository _outboxRepository = outboxRepository;
private readonly IInstanceIdProvider _instanceIdProvider = instanceIdProvider;
private readonly IOutboxNotificationService _outboxNotificationService = outboxNotificationService;
private readonly OutboxSettings _outboxSettings = outboxSettings;

private bool _notifyOutbox = false;

public int Order => int.MaxValue;
Expand All @@ -36,7 +26,7 @@ public void Dispose()
{
if (_notifyOutbox)
{
_outboxNotificationService.Notify();
outboxNotificationService.Notify();
}

GC.SuppressFinalize(this);
Expand All @@ -58,28 +48,25 @@ public async Task OnHandle(T message, Func<Task> next, IProducerContext context)
return;
}

// Forward to outbox
var messageType = message.GetType();

_logger.LogDebug("Forwarding published message of type {MessageType} to the outbox", messageType.Name);
// Forward to outbox - do not call next()

var messageType = message.GetType();
// Take the proper serializer (meant for the bus)
var messagePayload = busMaster.Serializer?.Serialize(messageType, message)
?? throw new PublishMessageBusException($"The {busMaster.Name} bus has no configured serializer, so it cannot be used with the outbox plugin");
?? throw new PublishMessageBusException($"The {busMaster.Name} bus has no configured serializer, so it cannot be used with the outbox plugin");

// Add message to the database, do not call next()
var outboxMessage = new OutboxMessage
{
BusName = busMaster.Name,
Headers = context.Headers,
Path = context.Path,
MessageType = _outboxSettings.MessageTypeResolver.ToName(messageType),
MessagePayload = messagePayload,
InstanceId = _instanceIdProvider.GetInstanceId()
};
await _outboxRepository.Save(outboxMessage, context.CancellationToken);
var outboxMessageId = await outboxMessageFactory.Create(
busName: busMaster.Name,
headers: context.Headers,
path: context.Path,
messageType: outboxSettings.MessageTypeResolver.ToName(messageType),
messagePayload: messagePayload,
cancellationToken: context.CancellationToken
);

logger.LogDebug("Forwarding published message of type {MessageType} to the outbox with Id {OutboxMessageId}", messageType.Name, outboxMessageId);

// a message was sent, notify outbox service to poll on dispose (post transaction)
// A message was sent, notify outbox service to poll on dispose (post transaction)
_notifyOutbox = true;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
namespace SlimMessageBus.Host.Outbox;

/// <summary>
/// Factory for creating outbox messages
/// </summary>
public interface IOutboxMessageFactory
{
/// <summary>
/// Create a new outbox message and store it using the underlying store
/// </summary>
/// <param name="busName"></param>
/// <param name="headers"></param>
/// <param name="path"></param>
/// <param name="messageType"></param>
/// <param name="messagePayload"></param>
/// <param name="cancellationToken"></param>
/// <returns>ID of the <see cref="OutboxMessage"/>.</returns>
Task<Guid> Create(
string busName,
IDictionary<string, object> headers,
string path,
string messageType,
byte[] messagePayload,
CancellationToken cancellationToken);
}
Loading

0 comments on commit f0a2799

Please sign in to comment.