Skip to content

Commit

Permalink
Support for UUIDv7 values
Browse files Browse the repository at this point in the history
Signed-off-by: Tomasz Maruszak <[email protected]>
  • Loading branch information
zarusz committed Oct 9, 2024
1 parent c2cbd1f commit 6812d0a
Show file tree
Hide file tree
Showing 44 changed files with 440 additions and 222 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
namespace SlimMessageBus.Host.Outbox.DbContext;

using Microsoft.Extensions.DependencyInjection.Extensions;

using SlimMessageBus.Host;
using SlimMessageBus.Host.Outbox.Sql;
using SlimMessageBus.Host.Sql.Common;
Expand All @@ -14,6 +12,20 @@ public static MessageBusBuilder AddOutboxUsingDbContext<TDbContext>(this Message
mbb.PostConfigurationActions.Add(services =>
{
services.TryAddScoped<ISqlTransactionService, DbContextTransactionService<TDbContext>>();
services.TryAddScoped(svp =>
{
var settings = svp.GetRequiredService<SqlOutboxSettings>();
return new DbContextOutboxRepository<TDbContext>(
svp.GetRequiredService<ILogger<DbContextOutboxRepository<TDbContext>>>(),
settings,
svp.GetRequiredService<SqlOutboxTemplate>(),
settings.GuidGenerator ?? (IGuidGenerator)svp.GetRequiredService(settings.GuidGeneratorType),
svp.GetRequiredService<ICurrentTimeProvider>(),
svp.GetRequiredService<IInstanceIdProvider>(),
svp.GetRequiredService<TDbContext>(),
svp.GetRequiredService<ISqlTransactionService>()
);
});
});
return mbb.AddOutboxUsingSql<DbContextOutboxRepository<TDbContext>>(configure);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,32 @@

using Microsoft.Data.SqlClient;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;

using SlimMessageBus.Host.Outbox.Sql;
using SlimMessageBus.Host.Sql.Common;

public class DbContextOutboxRepository<TDbContext> : SqlOutboxRepository where TDbContext : DbContext
public class DbContextOutboxRepository<TDbContext> : SqlOutboxMessageRepository where TDbContext : DbContext
{
public TDbContext DbContext { get; }

public DbContextOutboxRepository(
ILogger<SqlOutboxRepository> logger,
ILogger<SqlOutboxMessageRepository> logger,
SqlOutboxSettings settings,
SqlOutboxTemplate sqlOutboxTemplate,
IGuidGenerator guidGenerator,
ICurrentTimeProvider currentTimeProvider,
IInstanceIdProvider instanceIdProvider,
TDbContext dbContext,
ISqlTransactionService transactionService)
: base(logger, settings, sqlOutboxTemplate, (SqlConnection)dbContext.Database.GetDbConnection(), transactionService)
: base(
logger,
settings,
sqlOutboxTemplate,
guidGenerator,
currentTimeProvider,
instanceIdProvider,
(SqlConnection)dbContext.Database.GetDbConnection(),
transactionService)
{
DbContext = dbContext;
}
Expand Down
3 changes: 3 additions & 0 deletions src/SlimMessageBus.Host.Outbox.DbContext/GlobalUsings.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
global using Microsoft.Extensions.DependencyInjection;
global using Microsoft.Extensions.DependencyInjection.Extensions;
global using Microsoft.Extensions.Logging;
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
public static class MessageBusBuilderExtensions
{
public static MessageBusBuilder AddOutboxUsingSql<TOutboxRepository>(this MessageBusBuilder mbb, Action<SqlOutboxSettings> configure)
where TOutboxRepository : class, ISqlOutboxRepository
where TOutboxRepository : class, ISqlMessageOutboxRepository
{
mbb.AddOutbox();

Expand Down Expand Up @@ -33,11 +33,10 @@ public static MessageBusBuilder AddOutboxUsingSql<TOutboxRepository>(this Messag
services.TryAddEnumerable(ServiceDescriptor.Transient(serviceType, implementationType));
}
services.TryAddScoped<TOutboxRepository>();
services.TryAddScoped<ISqlTransactionService, SqlTransactionService>();
services.Replace(ServiceDescriptor.Scoped<ISqlOutboxRepository>(svp => svp.GetRequiredService<TOutboxRepository>()));
services.Replace(ServiceDescriptor.Scoped<IOutboxRepository>(svp => svp.GetRequiredService<TOutboxRepository>()));
services.Replace(ServiceDescriptor.Scoped<ISqlMessageOutboxRepository>(svp => svp.GetRequiredService<TOutboxRepository>()));
services.Replace(ServiceDescriptor.Scoped<IOutboxMessageRepository>(svp => svp.GetRequiredService<TOutboxRepository>()));
services.TryAddSingleton<SqlOutboxTemplate>();
services.TryAddTransient<IOutboxMigrationService, SqlOutboxMigrationService>();
Expand All @@ -46,5 +45,24 @@ public static MessageBusBuilder AddOutboxUsingSql<TOutboxRepository>(this Messag
}

public static MessageBusBuilder AddOutboxUsingSql(this MessageBusBuilder mbb, Action<SqlOutboxSettings> configure)
=> mbb.AddOutboxUsingSql<SqlOutboxRepository>(configure);
{
mbb.PostConfigurationActions.Add(services =>
{
services.TryAddScoped(svp =>
{
var settings = svp.GetRequiredService<SqlOutboxSettings>();
return new SqlOutboxMessageRepository(
svp.GetRequiredService<ILogger<SqlOutboxMessageRepository>>(),
settings,
svp.GetRequiredService<SqlOutboxTemplate>(),
settings.GuidGenerator ?? (IGuidGenerator)svp.GetRequiredService(settings.GuidGeneratorType),
svp.GetRequiredService<ICurrentTimeProvider>(),
svp.GetRequiredService<IInstanceIdProvider>(),
svp.GetRequiredService<SqlConnection>(),
svp.GetRequiredService<ISqlTransactionService>()
);
});
});
return mbb.AddOutboxUsingSql<SqlOutboxMessageRepository>(configure);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
namespace SlimMessageBus.Host.Outbox.Sql;

public interface ISqlMessageOutboxRepository : IOutboxMessageRepository
{
}
5 changes: 0 additions & 5 deletions src/SlimMessageBus.Host.Outbox.Sql/ISqlOutboxRepository.cs

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,23 +1,64 @@
namespace SlimMessageBus.Host.Outbox.Sql;

public class SqlOutboxRepository : CommonSqlRepository, ISqlOutboxRepository
/// <summary>
/// The MS SQL implmentation of the <see cref="IOutboxMessageRepository"/>
/// </summary>
public class SqlOutboxMessageRepository : CommonSqlRepository, ISqlMessageOutboxRepository
{
/// <summary>
/// Used to serialize the headers dictionary to JSON
/// </summary>
private static readonly JsonSerializerOptions _jsonOptions = new()
{
Converters = { new ObjectToInferredTypesConverter() }
};

private readonly SqlOutboxTemplate _sqlTemplate;
private readonly JsonSerializerOptions _jsonOptions;
private readonly IGuidGenerator _guidGenerator;
private readonly ICurrentTimeProvider _currentTimeProvider;
private readonly IInstanceIdProvider _instanceIdProvider;

protected SqlOutboxSettings Settings { get; }

public SqlOutboxRepository(ILogger<SqlOutboxRepository> logger, SqlOutboxSettings settings, SqlOutboxTemplate sqlOutboxTemplate, SqlConnection connection, ISqlTransactionService transactionService)
public SqlOutboxMessageRepository(
ILogger<SqlOutboxMessageRepository> logger,
SqlOutboxSettings settings,
SqlOutboxTemplate sqlOutboxTemplate,
IGuidGenerator guidGenerator,
ICurrentTimeProvider currentTimeProvider,
IInstanceIdProvider instanceIdProvider,
SqlConnection connection,
ISqlTransactionService transactionService)
: base(logger, settings.SqlSettings, connection, transactionService)
{
_sqlTemplate = sqlOutboxTemplate;
_jsonOptions = new();
_jsonOptions.Converters.Add(new ObjectToInferredTypesConverter());

_guidGenerator = guidGenerator;
_currentTimeProvider = currentTimeProvider;
_instanceIdProvider = instanceIdProvider;
Settings = settings;
}

public async virtual Task Save(OutboxMessage message, CancellationToken token)
public virtual async Task<Guid> Create(string busName, IDictionary<string, object> headers, string path, string messageType, byte[] messagePayload, CancellationToken cancellationToken)
{
var outboxMessage = new OutboxMessage
{
Id = _guidGenerator.NewGuid(),
Timestamp = _currentTimeProvider.CurrentTime.DateTime,
InstanceId = _instanceIdProvider.GetInstanceId(),

BusName = busName,
Headers = headers,
Path = path,
MessageType = messageType,
MessagePayload = messagePayload,
};

await Save(outboxMessage, cancellationToken);

return outboxMessage.Id;
}

protected async virtual Task Save(OutboxMessage message, CancellationToken cancellationToken)
{
await EnsureConnection();

Expand All @@ -36,10 +77,10 @@ await ExecuteNonQuery(Settings.SqlSettings.OperationRetry, _sqlTemplate.SqlOutbo
cmd.Parameters.Add("@DeliveryAttempt", SqlDbType.Int).Value = message.DeliveryAttempt;
cmd.Parameters.Add("@DeliveryComplete", SqlDbType.Bit).Value = message.DeliveryComplete;
cmd.Parameters.Add("@DeliveryAborted", SqlDbType.Bit).Value = message.DeliveryAborted;
}, token);
}, cancellationToken);
}

public async Task<IReadOnlyCollection<OutboxMessage>> LockAndSelect(string instanceId, int batchSize, bool tableLock, TimeSpan lockDuration, CancellationToken token)
public async Task<IReadOnlyCollection<OutboxMessage>> LockAndSelect(string instanceId, int batchSize, bool tableLock, TimeSpan lockDuration, CancellationToken cancellationToken)
{
await EnsureConnection();

Expand All @@ -49,10 +90,10 @@ public async Task<IReadOnlyCollection<OutboxMessage>> LockAndSelect(string insta
cmd.Parameters.Add("@BatchSize", SqlDbType.Int).Value = batchSize;
cmd.Parameters.Add("@LockDuration", SqlDbType.Int).Value = lockDuration.TotalSeconds;

return await ReadMessages(cmd, token).ConfigureAwait(false);
return await SqlOutboxMessageRepository.ReadMessages(cmd, cancellationToken).ConfigureAwait(false);
}

public async Task AbortDelivery(IReadOnlyCollection<Guid> ids, CancellationToken token)
public async Task AbortDelivery(IReadOnlyCollection<Guid> ids, CancellationToken cancellationToken)
{
if (ids.Count == 0)
{
Expand All @@ -67,15 +108,15 @@ public async Task AbortDelivery(IReadOnlyCollection<Guid> ids, CancellationToken
{
cmd.Parameters.AddWithValue("@Ids", ToIdsString(ids));
},
token: token);
token: cancellationToken);

if (affected != ids.Count)
{
throw new MessageBusException($"The number of affected rows was {affected}, but {ids.Count} was expected");
}
}

public async Task UpdateToSent(IReadOnlyCollection<Guid> ids, CancellationToken token)
public async Task UpdateToSent(IReadOnlyCollection<Guid> ids, CancellationToken cancellationToken)
{
if (ids.Count == 0)
{
Expand All @@ -90,7 +131,7 @@ public async Task UpdateToSent(IReadOnlyCollection<Guid> ids, CancellationToken
{
cmd.Parameters.AddWithValue("@Ids", ToIdsString(ids));
},
token: token);
token: cancellationToken);

if (affected != ids.Count)
{
Expand All @@ -100,7 +141,7 @@ public async Task UpdateToSent(IReadOnlyCollection<Guid> ids, CancellationToken

private string ToIdsString(IReadOnlyCollection<Guid> ids) => string.Join(_sqlTemplate.InIdsSeparator, ids);

public async Task IncrementDeliveryAttempt(IReadOnlyCollection<Guid> ids, int maxDeliveryAttempts, CancellationToken token)
public async Task IncrementDeliveryAttempt(IReadOnlyCollection<Guid> ids, int maxDeliveryAttempts, CancellationToken cancellationToken)
{
if (ids.Count == 0)
{
Expand All @@ -121,28 +162,28 @@ public async Task IncrementDeliveryAttempt(IReadOnlyCollection<Guid> ids, int ma
cmd.Parameters.AddWithValue("@Ids", ToIdsString(ids));
cmd.Parameters.AddWithValue("@MaxDeliveryAttempts", maxDeliveryAttempts);
},
token: token);
token: cancellationToken);

if (affected != ids.Count)
{
throw new MessageBusException($"The number of affected rows was {affected}, but {ids.Count} was expected");
}
}

public async Task DeleteSent(DateTime olderThan, CancellationToken token)
public async Task DeleteSent(DateTime olderThan, CancellationToken cancellationToken)
{
await EnsureConnection();

var affected = await ExecuteNonQuery(
Settings.SqlSettings.OperationRetry,
_sqlTemplate.SqlOutboxMessageDeleteSent,
cmd => cmd.Parameters.Add("@Timestamp", SqlDbType.DateTime2).Value = olderThan,
token);
cmd => cmd.Parameters.Add("@Timestamp", SqlDbType.DateTimeOffset).Value = olderThan,
cancellationToken);

Logger.Log(affected > 0 ? LogLevel.Information : LogLevel.Debug, "Removed {MessageCount} sent messages from outbox table", affected);
}

public async Task<bool> RenewLock(string instanceId, TimeSpan lockDuration, CancellationToken token)
public async Task<bool> RenewLock(string instanceId, TimeSpan lockDuration, CancellationToken cancellationToken)
{
await EnsureConnection();

Expand All @@ -151,7 +192,7 @@ public async Task<bool> RenewLock(string instanceId, TimeSpan lockDuration, Canc
cmd.Parameters.Add("@InstanceId", SqlDbType.NVarChar).Value = instanceId;
cmd.Parameters.Add("@LockDuration", SqlDbType.Int).Value = lockDuration.TotalSeconds;

return await cmd.ExecuteNonQueryAsync(token) > 0;
return await cmd.ExecuteNonQueryAsync(cancellationToken) > 0;
}

internal async Task<IReadOnlyCollection<OutboxMessage>> GetAllMessages(CancellationToken cancellationToken)
Expand All @@ -164,7 +205,7 @@ internal async Task<IReadOnlyCollection<OutboxMessage>> GetAllMessages(Cancellat
return await ReadMessages(cmd, cancellationToken).ConfigureAwait(false);
}

private async Task<IReadOnlyCollection<OutboxMessage>> ReadMessages(SqlCommand cmd, CancellationToken cancellationToken)
private static async Task<IReadOnlyCollection<OutboxMessage>> ReadMessages(SqlCommand cmd, CancellationToken cancellationToken)
{
using var reader = await cmd.ExecuteReaderAsync(cancellationToken);

Expand All @@ -185,23 +226,34 @@ private async Task<IReadOnlyCollection<OutboxMessage>> ReadMessages(SqlCommand c
var items = new List<OutboxMessage>();
while (await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
{
var id = reader.GetGuid(idOrdinal);
var headers = reader.IsDBNull(headersOrdinal) ? null : reader.GetString(headersOrdinal);
var headers = reader.IsDBNull(headersOrdinal)
? null
: reader.GetString(headersOrdinal);
var message = new OutboxMessage
{
Id = id,
#pragma warning disable 1998
Id = reader.GetGuid(idOrdinal),
Timestamp = reader.GetDateTime(timestampOrdinal),
BusName = reader.GetString(busNameOrdinal),
MessageType = reader.GetString(typeOrdinal),
MessagePayload = reader.GetSqlBinary(payloadOrdinal).Value,
Headers = headers == null ? null : JsonSerializer.Deserialize<IDictionary<string, object>>(headers, _jsonOptions),
Path = reader.IsDBNull(pathOrdinal) ? null : reader.GetString(pathOrdinal),
Headers = headers == null
? null
: JsonSerializer.Deserialize<IDictionary<string, object>>(headers, _jsonOptions),
Path = reader.IsDBNull(pathOrdinal)
? null
: reader.GetString(pathOrdinal),
InstanceId = reader.GetString(instanceIdOrdinal),
LockInstanceId = reader.IsDBNull(lockInstanceIdOrdinal) ? null : reader.GetString(lockInstanceIdOrdinal),
LockExpiresOn = reader.IsDBNull(lockExpiresOnOrdinal) ? null : reader.GetDateTime(lockExpiresOnOrdinal),
LockInstanceId = reader.IsDBNull(lockInstanceIdOrdinal)
? null
: reader.GetString(lockInstanceIdOrdinal),
LockExpiresOn = reader.IsDBNull(lockExpiresOnOrdinal)
? null
: reader.GetDateTime(lockExpiresOnOrdinal),
DeliveryAttempt = reader.GetInt32(deliveryAttemptOrdinal),
DeliveryComplete = reader.GetBoolean(deliveryCompleteOrdinal),
DeliveryAborted = reader.GetBoolean(deliveryAbortedOrdinal)
#pragma warning restore 1998
};

items.Add(message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

public class SqlOutboxMigrationService : CommonSqlMigrationService<CommonSqlRepository, SqlSettings>, IOutboxMigrationService
{
public SqlOutboxMigrationService(ILogger<SqlOutboxMigrationService> logger, ISqlOutboxRepository repository, ISqlTransactionService transactionService, SqlOutboxSettings settings)
public SqlOutboxMigrationService(ILogger<SqlOutboxMigrationService> logger, ISqlMessageOutboxRepository repository, ISqlTransactionService transactionService, SqlOutboxSettings settings)
: base(logger, (CommonSqlRepository)repository, transactionService, settings.SqlSettings)
{
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,11 @@ public static MessageBusBuilder AddOutbox(this MessageBusBuilder mbb, Action<Out
services.AddSingleton<OutboxSendingTask>();
services.TryAddEnumerable(ServiceDescriptor.Singleton<IMessageBusLifecycleInterceptor, OutboxSendingTask>(sp => sp.GetRequiredService<OutboxSendingTask>()));
services.TryAdd(ServiceDescriptor.Singleton<IOutboxNotificationService, OutboxSendingTask>(sp => sp.GetRequiredService<OutboxSendingTask>()));
services.TryAddSingleton<IOutboxNotificationService>(sp => sp.GetRequiredService<OutboxSendingTask>());
services.TryAddSingleton<IInstanceIdProvider, DefaultInstanceIdProvider>();
services.TryAddSingleton<IOutboxLockRenewalTimerFactory, OutboxLockRenewalTimerFactory>();
services.TryAddScoped<IOutboxMessageFactory>(svp => svp.GetRequiredService<IOutboxMessageRepository>());
services.TryAddSingleton(svp =>
{
Expand Down
Loading

0 comments on commit 6812d0a

Please sign in to comment.