Skip to content

Commit

Permalink
Support for UUIDv7 values
Browse files Browse the repository at this point in the history
Signed-off-by: Tomasz Maruszak <[email protected]>
  • Loading branch information
zarusz committed Oct 10, 2024
1 parent c2cbd1f commit c2dd2d0
Show file tree
Hide file tree
Showing 49 changed files with 530 additions and 247 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
namespace SlimMessageBus.Host.Outbox.DbContext;

using Microsoft.Extensions.DependencyInjection.Extensions;

using SlimMessageBus.Host;
using SlimMessageBus.Host.Outbox.Sql;
using SlimMessageBus.Host.Sql.Common;
Expand All @@ -14,6 +12,20 @@ public static MessageBusBuilder AddOutboxUsingDbContext<TDbContext>(this Message
mbb.PostConfigurationActions.Add(services =>
{
services.TryAddScoped<ISqlTransactionService, DbContextTransactionService<TDbContext>>();
services.TryAddScoped(svp =>
{
var settings = svp.GetRequiredService<SqlOutboxSettings>();
return new DbContextOutboxRepository<TDbContext>(
svp.GetRequiredService<ILogger<DbContextOutboxRepository<TDbContext>>>(),
settings,
svp.GetRequiredService<SqlOutboxTemplate>(),
settings.IdGeneration.GuidGenerator ?? (IGuidGenerator)svp.GetRequiredService(settings.IdGeneration.GuidGeneratorType),
svp.GetRequiredService<ICurrentTimeProvider>(),
svp.GetRequiredService<IInstanceIdProvider>(),
svp.GetRequiredService<TDbContext>(),
svp.GetRequiredService<ISqlTransactionService>()
);
});
});
return mbb.AddOutboxUsingSql<DbContextOutboxRepository<TDbContext>>(configure);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,32 @@

using Microsoft.Data.SqlClient;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;

using SlimMessageBus.Host.Outbox.Sql;
using SlimMessageBus.Host.Sql.Common;

public class DbContextOutboxRepository<TDbContext> : SqlOutboxRepository where TDbContext : DbContext
public class DbContextOutboxRepository<TDbContext> : SqlOutboxMessageRepository where TDbContext : DbContext
{
public TDbContext DbContext { get; }

public DbContextOutboxRepository(
ILogger<SqlOutboxRepository> logger,
ILogger<DbContextOutboxRepository<TDbContext>> logger,
SqlOutboxSettings settings,
SqlOutboxTemplate sqlOutboxTemplate,
IGuidGenerator guidGenerator,
ICurrentTimeProvider currentTimeProvider,
IInstanceIdProvider instanceIdProvider,
TDbContext dbContext,
ISqlTransactionService transactionService)
: base(logger, settings, sqlOutboxTemplate, (SqlConnection)dbContext.Database.GetDbConnection(), transactionService)
: base(
logger,
settings,
sqlOutboxTemplate,
guidGenerator,
currentTimeProvider,
instanceIdProvider,
(SqlConnection)dbContext.Database.GetDbConnection(),
transactionService)
{
DbContext = dbContext;
}
Expand Down
3 changes: 3 additions & 0 deletions src/SlimMessageBus.Host.Outbox.DbContext/GlobalUsings.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
global using Microsoft.Extensions.DependencyInjection;
global using Microsoft.Extensions.DependencyInjection.Extensions;
global using Microsoft.Extensions.Logging;
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
public static class MessageBusBuilderExtensions
{
public static MessageBusBuilder AddOutboxUsingSql<TOutboxRepository>(this MessageBusBuilder mbb, Action<SqlOutboxSettings> configure)
where TOutboxRepository : class, ISqlOutboxRepository
where TOutboxRepository : class, ISqlMessageOutboxRepository
{
mbb.AddOutbox();

Expand Down Expand Up @@ -33,11 +33,10 @@ public static MessageBusBuilder AddOutboxUsingSql<TOutboxRepository>(this Messag
services.TryAddEnumerable(ServiceDescriptor.Transient(serviceType, implementationType));
}
services.TryAddScoped<TOutboxRepository>();
services.TryAddScoped<ISqlTransactionService, SqlTransactionService>();
services.Replace(ServiceDescriptor.Scoped<ISqlOutboxRepository>(svp => svp.GetRequiredService<TOutboxRepository>()));
services.Replace(ServiceDescriptor.Scoped<IOutboxRepository>(svp => svp.GetRequiredService<TOutboxRepository>()));
services.Replace(ServiceDescriptor.Scoped<ISqlMessageOutboxRepository>(svp => svp.GetRequiredService<TOutboxRepository>()));
services.Replace(ServiceDescriptor.Scoped<IOutboxMessageRepository>(svp => svp.GetRequiredService<TOutboxRepository>()));
services.TryAddSingleton<SqlOutboxTemplate>();
services.TryAddTransient<IOutboxMigrationService, SqlOutboxMigrationService>();
Expand All @@ -46,5 +45,24 @@ public static MessageBusBuilder AddOutboxUsingSql<TOutboxRepository>(this Messag
}

public static MessageBusBuilder AddOutboxUsingSql(this MessageBusBuilder mbb, Action<SqlOutboxSettings> configure)
=> mbb.AddOutboxUsingSql<SqlOutboxRepository>(configure);
{
mbb.PostConfigurationActions.Add(services =>
{
services.TryAddScoped(svp =>
{
var settings = svp.GetRequiredService<SqlOutboxSettings>();
return new SqlOutboxMessageRepository(
svp.GetRequiredService<ILogger<SqlOutboxMessageRepository>>(),
settings,
svp.GetRequiredService<SqlOutboxTemplate>(),
settings.IdGeneration.GuidGenerator ?? (IGuidGenerator)svp.GetRequiredService(settings.IdGeneration.GuidGeneratorType),
svp.GetRequiredService<ICurrentTimeProvider>(),
svp.GetRequiredService<IInstanceIdProvider>(),
svp.GetRequiredService<SqlConnection>(),
svp.GetRequiredService<ISqlTransactionService>()
);
});
});
return mbb.AddOutboxUsingSql<SqlOutboxMessageRepository>(configure);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
namespace SlimMessageBus.Host.Outbox.Sql;

public interface ISqlMessageOutboxRepository : IOutboxMessageRepository
{
}
5 changes: 0 additions & 5 deletions src/SlimMessageBus.Host.Outbox.Sql/ISqlOutboxRepository.cs

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,45 +1,94 @@
namespace SlimMessageBus.Host.Outbox.Sql;

public class SqlOutboxRepository : CommonSqlRepository, ISqlOutboxRepository
/// <summary>
/// The MS SQL implmentation of the <see cref="IOutboxMessageRepository"/>
/// </summary>
public class SqlOutboxMessageRepository : CommonSqlRepository, ISqlMessageOutboxRepository
{
/// <summary>
/// Used to serialize the headers dictionary to JSON
/// </summary>
private static readonly JsonSerializerOptions _jsonOptions = new()
{
Converters = { new ObjectToInferredTypesConverter() }
};

private readonly SqlOutboxTemplate _sqlTemplate;
private readonly JsonSerializerOptions _jsonOptions;
private readonly IGuidGenerator _guidGenerator;
private readonly ICurrentTimeProvider _currentTimeProvider;
private readonly IInstanceIdProvider _instanceIdProvider;
private readonly bool _idDatabaseGenerated;

protected SqlOutboxSettings Settings { get; }

public SqlOutboxRepository(ILogger<SqlOutboxRepository> logger, SqlOutboxSettings settings, SqlOutboxTemplate sqlOutboxTemplate, SqlConnection connection, ISqlTransactionService transactionService)
public SqlOutboxMessageRepository(
ILogger<SqlOutboxMessageRepository> logger,
SqlOutboxSettings settings,
SqlOutboxTemplate sqlOutboxTemplate,
IGuidGenerator guidGenerator,
ICurrentTimeProvider currentTimeProvider,
IInstanceIdProvider instanceIdProvider,
SqlConnection connection,
ISqlTransactionService transactionService)
: base(logger, settings.SqlSettings, connection, transactionService)
{
_sqlTemplate = sqlOutboxTemplate;
_jsonOptions = new();
_jsonOptions.Converters.Add(new ObjectToInferredTypesConverter());

_guidGenerator = guidGenerator;
_currentTimeProvider = currentTimeProvider;
_instanceIdProvider = instanceIdProvider;
_idDatabaseGenerated = settings.IdGeneration.Mode == OutboxMessageIdGenerationMode.DatabaseGeneratedSequentialGuid;
Settings = settings;
}

public async virtual Task Save(OutboxMessage message, CancellationToken token)
public virtual async Task<Guid> Create(string busName, IDictionary<string, object> headers, string path, string messageType, byte[] messagePayload, CancellationToken cancellationToken)
{
var om = new OutboxMessage
{
Timestamp = _currentTimeProvider.CurrentTime.DateTime,
InstanceId = _instanceIdProvider.GetInstanceId(),

BusName = busName,
Headers = headers,
Path = path,
MessageType = messageType,
MessagePayload = messagePayload,
};

if (!_idDatabaseGenerated)
{
om.Id = _guidGenerator.NewGuid();
}

var template = _idDatabaseGenerated
? _sqlTemplate.SqlOutboxMessageInsertWithDatabaseId
: _sqlTemplate.SqlOutboxMessageInsertWithClientId;

await EnsureConnection();

await ExecuteNonQuery(Settings.SqlSettings.OperationRetry, _sqlTemplate.SqlOutboxMessageInsert, cmd =>
om.Id = (Guid)await ExecuteScalarAsync(Settings.SqlSettings.OperationRetry, template, cmd =>
{
cmd.Parameters.Add("@Id", SqlDbType.UniqueIdentifier).Value = message.Id;
cmd.Parameters.Add("@Timestamp", SqlDbType.DateTime2).Value = message.Timestamp;
cmd.Parameters.Add("@BusName", SqlDbType.NVarChar).Value = message.BusName;
cmd.Parameters.Add("@MessageType", SqlDbType.NVarChar).Value = message.MessageType;
cmd.Parameters.Add("@MessagePayload", SqlDbType.VarBinary).Value = message.MessagePayload;
cmd.Parameters.Add("@Headers", SqlDbType.NVarChar).Value = message.Headers != null ? JsonSerializer.Serialize(message.Headers, _jsonOptions) : DBNull.Value;
cmd.Parameters.Add("@Path", SqlDbType.NVarChar).Value = message.Path;
cmd.Parameters.Add("@InstanceId", SqlDbType.NVarChar).Value = message.InstanceId;
cmd.Parameters.Add("@LockInstanceId", SqlDbType.NVarChar).Value = message.LockInstanceId;
cmd.Parameters.Add("@LockExpiresOn", SqlDbType.DateTime2).Value = message.LockExpiresOn ?? new DateTime(2000, 1, 1, 0, 0, 0, DateTimeKind.Utc);
cmd.Parameters.Add("@DeliveryAttempt", SqlDbType.Int).Value = message.DeliveryAttempt;
cmd.Parameters.Add("@DeliveryComplete", SqlDbType.Bit).Value = message.DeliveryComplete;
cmd.Parameters.Add("@DeliveryAborted", SqlDbType.Bit).Value = message.DeliveryAborted;
}, token);
if (!_idDatabaseGenerated)
{
cmd.Parameters.Add("@Id", SqlDbType.UniqueIdentifier).Value = om.Id;
}
cmd.Parameters.Add("@Timestamp", SqlDbType.DateTime2).Value = om.Timestamp;
cmd.Parameters.Add("@BusName", SqlDbType.NVarChar).Value = om.BusName;
cmd.Parameters.Add("@MessageType", SqlDbType.NVarChar).Value = om.MessageType;
cmd.Parameters.Add("@MessagePayload", SqlDbType.VarBinary).Value = om.MessagePayload;
cmd.Parameters.Add("@Headers", SqlDbType.NVarChar).Value = om.Headers != null ? JsonSerializer.Serialize(om.Headers, _jsonOptions) : DBNull.Value;
cmd.Parameters.Add("@Path", SqlDbType.NVarChar).Value = om.Path;
cmd.Parameters.Add("@InstanceId", SqlDbType.NVarChar).Value = om.InstanceId;
cmd.Parameters.Add("@LockInstanceId", SqlDbType.NVarChar).Value = om.LockInstanceId;
cmd.Parameters.Add("@LockExpiresOn", SqlDbType.DateTime2).Value = om.LockExpiresOn ?? new DateTime(2000, 1, 1, 0, 0, 0, DateTimeKind.Utc);
cmd.Parameters.Add("@DeliveryAttempt", SqlDbType.Int).Value = om.DeliveryAttempt;
cmd.Parameters.Add("@DeliveryComplete", SqlDbType.Bit).Value = om.DeliveryComplete;
cmd.Parameters.Add("@DeliveryAborted", SqlDbType.Bit).Value = om.DeliveryAborted;
}, cancellationToken);

return om.Id;
}

public async Task<IReadOnlyCollection<OutboxMessage>> LockAndSelect(string instanceId, int batchSize, bool tableLock, TimeSpan lockDuration, CancellationToken token)
public async Task<IReadOnlyCollection<OutboxMessage>> LockAndSelect(string instanceId, int batchSize, bool tableLock, TimeSpan lockDuration, CancellationToken cancellationToken)
{
await EnsureConnection();

Expand All @@ -49,10 +98,10 @@ public async Task<IReadOnlyCollection<OutboxMessage>> LockAndSelect(string insta
cmd.Parameters.Add("@BatchSize", SqlDbType.Int).Value = batchSize;
cmd.Parameters.Add("@LockDuration", SqlDbType.Int).Value = lockDuration.TotalSeconds;

return await ReadMessages(cmd, token).ConfigureAwait(false);
return await ReadMessages(cmd, cancellationToken).ConfigureAwait(false);
}

public async Task AbortDelivery(IReadOnlyCollection<Guid> ids, CancellationToken token)
public async Task AbortDelivery(IReadOnlyCollection<Guid> ids, CancellationToken cancellationToken)
{
if (ids.Count == 0)
{
Expand All @@ -67,15 +116,15 @@ public async Task AbortDelivery(IReadOnlyCollection<Guid> ids, CancellationToken
{
cmd.Parameters.AddWithValue("@Ids", ToIdsString(ids));
},
token: token);
token: cancellationToken);

if (affected != ids.Count)
{
throw new MessageBusException($"The number of affected rows was {affected}, but {ids.Count} was expected");
}
}

public async Task UpdateToSent(IReadOnlyCollection<Guid> ids, CancellationToken token)
public async Task UpdateToSent(IReadOnlyCollection<Guid> ids, CancellationToken cancellationToken)
{
if (ids.Count == 0)
{
Expand All @@ -90,7 +139,7 @@ public async Task UpdateToSent(IReadOnlyCollection<Guid> ids, CancellationToken
{
cmd.Parameters.AddWithValue("@Ids", ToIdsString(ids));
},
token: token);
token: cancellationToken);

if (affected != ids.Count)
{
Expand All @@ -100,7 +149,7 @@ public async Task UpdateToSent(IReadOnlyCollection<Guid> ids, CancellationToken

private string ToIdsString(IReadOnlyCollection<Guid> ids) => string.Join(_sqlTemplate.InIdsSeparator, ids);

public async Task IncrementDeliveryAttempt(IReadOnlyCollection<Guid> ids, int maxDeliveryAttempts, CancellationToken token)
public async Task IncrementDeliveryAttempt(IReadOnlyCollection<Guid> ids, int maxDeliveryAttempts, CancellationToken cancellationToken)
{
if (ids.Count == 0)
{
Expand All @@ -121,28 +170,28 @@ public async Task IncrementDeliveryAttempt(IReadOnlyCollection<Guid> ids, int ma
cmd.Parameters.AddWithValue("@Ids", ToIdsString(ids));
cmd.Parameters.AddWithValue("@MaxDeliveryAttempts", maxDeliveryAttempts);
},
token: token);
token: cancellationToken);

if (affected != ids.Count)
{
throw new MessageBusException($"The number of affected rows was {affected}, but {ids.Count} was expected");
}
}

public async Task DeleteSent(DateTime olderThan, CancellationToken token)
public async Task DeleteSent(DateTime olderThan, CancellationToken cancellationToken)
{
await EnsureConnection();

var affected = await ExecuteNonQuery(
Settings.SqlSettings.OperationRetry,
_sqlTemplate.SqlOutboxMessageDeleteSent,
cmd => cmd.Parameters.Add("@Timestamp", SqlDbType.DateTime2).Value = olderThan,
token);
cmd => cmd.Parameters.Add("@Timestamp", SqlDbType.DateTimeOffset).Value = olderThan,
cancellationToken);

Logger.Log(affected > 0 ? LogLevel.Information : LogLevel.Debug, "Removed {MessageCount} sent messages from outbox table", affected);
}

public async Task<bool> RenewLock(string instanceId, TimeSpan lockDuration, CancellationToken token)
public async Task<bool> RenewLock(string instanceId, TimeSpan lockDuration, CancellationToken cancellationToken)
{
await EnsureConnection();

Expand All @@ -151,7 +200,7 @@ public async Task<bool> RenewLock(string instanceId, TimeSpan lockDuration, Canc
cmd.Parameters.Add("@InstanceId", SqlDbType.NVarChar).Value = instanceId;
cmd.Parameters.Add("@LockDuration", SqlDbType.Int).Value = lockDuration.TotalSeconds;

return await cmd.ExecuteNonQueryAsync(token) > 0;
return await cmd.ExecuteNonQueryAsync(cancellationToken) > 0;
}

internal async Task<IReadOnlyCollection<OutboxMessage>> GetAllMessages(CancellationToken cancellationToken)
Expand All @@ -164,7 +213,7 @@ internal async Task<IReadOnlyCollection<OutboxMessage>> GetAllMessages(Cancellat
return await ReadMessages(cmd, cancellationToken).ConfigureAwait(false);
}

private async Task<IReadOnlyCollection<OutboxMessage>> ReadMessages(SqlCommand cmd, CancellationToken cancellationToken)
private static async Task<IReadOnlyCollection<OutboxMessage>> ReadMessages(SqlCommand cmd, CancellationToken cancellationToken)
{
using var reader = await cmd.ExecuteReaderAsync(cancellationToken);

Expand All @@ -185,20 +234,29 @@ private async Task<IReadOnlyCollection<OutboxMessage>> ReadMessages(SqlCommand c
var items = new List<OutboxMessage>();
while (await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
{
var id = reader.GetGuid(idOrdinal);
var headers = reader.IsDBNull(headersOrdinal) ? null : reader.GetString(headersOrdinal);
var headers = reader.IsDBNull(headersOrdinal)
? null
: reader.GetString(headersOrdinal);
var message = new OutboxMessage
{
Id = id,
Id = reader.GetGuid(idOrdinal),
Timestamp = reader.GetDateTime(timestampOrdinal),
BusName = reader.GetString(busNameOrdinal),
MessageType = reader.GetString(typeOrdinal),
MessagePayload = reader.GetSqlBinary(payloadOrdinal).Value,
Headers = headers == null ? null : JsonSerializer.Deserialize<IDictionary<string, object>>(headers, _jsonOptions),
Path = reader.IsDBNull(pathOrdinal) ? null : reader.GetString(pathOrdinal),
Headers = headers == null
? null
: JsonSerializer.Deserialize<IDictionary<string, object>>(headers, _jsonOptions),
Path = reader.IsDBNull(pathOrdinal)
? null
: reader.GetString(pathOrdinal),
InstanceId = reader.GetString(instanceIdOrdinal),
LockInstanceId = reader.IsDBNull(lockInstanceIdOrdinal) ? null : reader.GetString(lockInstanceIdOrdinal),
LockExpiresOn = reader.IsDBNull(lockExpiresOnOrdinal) ? null : reader.GetDateTime(lockExpiresOnOrdinal),
LockInstanceId = reader.IsDBNull(lockInstanceIdOrdinal)
? null
: reader.GetString(lockInstanceIdOrdinal),
LockExpiresOn = reader.IsDBNull(lockExpiresOnOrdinal)
? null
: reader.GetDateTime(lockExpiresOnOrdinal),
DeliveryAttempt = reader.GetInt32(deliveryAttemptOrdinal),
DeliveryComplete = reader.GetBoolean(deliveryCompleteOrdinal),
DeliveryAborted = reader.GetBoolean(deliveryAbortedOrdinal)
Expand Down
Loading

0 comments on commit c2dd2d0

Please sign in to comment.