Skip to content

Commit

Permalink
SlimMessageBus.Host.Outbox.DbContext that is an relational DB abstrac…
Browse files Browse the repository at this point in the history
…tion via EntityFrameworkCore.Relational

Signed-off-by: Tomasz Maruszak <[email protected]>
  • Loading branch information
zarusz committed Oct 16, 2024
1 parent eb6bd33 commit 19118b9
Show file tree
Hide file tree
Showing 18 changed files with 338 additions and 23 deletions.
1 change: 1 addition & 0 deletions build/tasks.ps1
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ $projects = @(
"SlimMessageBus.Host.FluentValidation",

"SlimMessageBus.Host.Outbox",
"SlimMessageBus.Host.Outbox.DbContext",
"SlimMessageBus.Host.Outbox.Sql",
"SlimMessageBus.Host.Outbox.Sql.DbContext",

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
namespace SlimMessageBus.Host.Outbox.DbContext;

public static class BuilderExtensions
{
static readonly internal string PropertyDbContextTransactionEnabled = "DbContextTransaction_Enabled";
static readonly internal string PropertyDbContextTransactionFilter = "DbContextTransaction_Filter";

/// <summary>
/// Enables the creation of <see cref="System.Data.IDbTransaction"/> for every message consumed by this bus.
/// </summary>
/// <param name="builder"></param>
/// <param name="enabled"></param>
/// <param name="messageTypeFilter">When enabled, allows to decide if the transaction should be created.</param>
/// <returns></returns>
public static MessageBusBuilder UseDbContextTransaction(this MessageBusBuilder builder, bool enabled = true, Func<Type, bool> messageTypeFilter = null)
{
SetTransactionProps(builder.Settings, enabled, messageTypeFilter);
return builder;
}

/// <summary>
/// Enables the creation of <see cref="System.Data.IDbTransaction"/> for every message on this consumer.
/// </summary>
/// <param name="builder"></param>
/// <param name="enabled"></param>
/// <param name="messageTypeFilter">When enabled, allows to decide if the transaction should be created.</param>
/// <returns></returns>
public static ConsumerBuilder<T> UseDbContextTransaction<T>(this ConsumerBuilder<T> builder, bool enabled = true, Func<Type, bool> messageTypeFilter = null)
{
SetTransactionProps(builder.Settings, enabled, messageTypeFilter);
return builder;
}

/// <summary>
/// Enables the creation of <see cref="System.Data.IDbTransaction"/> every messages on this handler.
/// </summary>
/// <param name="builder"></param>
/// <param name="enabled"></param>
/// <param name="messageTypeFilter">When enabled, allows to decide if the transaction should be created.</param>
/// <returns></returns>
public static HandlerBuilder<T, R> UseDbContextTransaction<T, R>(this HandlerBuilder<T, R> builder, bool enabled = true, Func<Type, bool> messageTypeFilter = null)
{
SetTransactionProps(builder.Settings, enabled, messageTypeFilter);
return builder;
}

private static void SetTransactionProps(HasProviderExtensions hasProviderExtensions, bool enabled, Func<Type, bool> messageTypeFilter = null)
{
hasProviderExtensions.Properties[PropertyDbContextTransactionEnabled] = enabled;
hasProviderExtensions.Properties[PropertyDbContextTransactionFilter] = messageTypeFilter;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace SlimMessageBus.Host.Outbox.DbContext;

public class DbContextOutboxSettings : OutboxSettings
{
public string DatabaseSchemaName { get; set; } = "dbo";
public string DatabaseTableName { get; set; } = "Outbox";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
namespace SlimMessageBus.Host.Outbox.DbContext;

using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;

public static class MessageBusBuilderExtensions
{
public static MessageBusBuilder AddOutboxUsingDbContext<TDbContext>(this MessageBusBuilder mbb, Action<DbContextOutboxSettings> configure)
where TDbContext : Microsoft.EntityFrameworkCore.DbContext
{
mbb.AddOutbox();

mbb.PostConfigurationActions.Add(services =>
{
var settings = new[] { mbb.Settings }.Concat(mbb.Children.Values.Select(x => x.Settings)).ToList();
services.TryAddSingleton(svp =>
{
var settings = new DbContextOutboxSettings();
configure?.Invoke(settings);
return settings;
});
services.Replace(ServiceDescriptor.Transient<OutboxSettings>(svp => svp.GetRequiredService<DbContextOutboxSettings>()));
// Optimization: only register generic interceptors in the DI for particular message types that have opted in for transaction scope
foreach (var consumerMessageType in settings
.SelectMany(x => x.Consumers
.SelectMany(c => c.Invokers)
.Where(ci => ci.ParentSettings.IsEnabledForMessageType(x, BuilderExtensions.PropertyDbContextTransactionEnabled, BuilderExtensions.PropertyDbContextTransactionFilter, ci.MessageType)))
.Select(x => x.MessageType))
{
var serviceType = typeof(IConsumerInterceptor<>).MakeGenericType(consumerMessageType);
var implementationType = typeof(DbContextTransactionConsumerInterceptor).MakeGenericType(consumerMessageType);
services.TryAddEnumerable(ServiceDescriptor.Transient(serviceType, implementationType));
}
services.TryAddScoped<IOutboxMessageRepository>(svp =>
{
var settings = svp.GetRequiredService<DbContextOutboxSettings>();
return new DbContextOutboxMessageRepository(
svp.GetRequiredService<TDbContext>()
/*
svp.GetRequiredService<ILogger<DbContextOutboxMessageRepository<TDbContext>>>(),
settings,
svp.GetRequiredService<SqlOutboxTemplate>(),
settings.IdGeneration.GuidGenerator ?? (IGuidGenerator)svp.GetRequiredService(settings.IdGeneration.GuidGeneratorType),
svp.GetRequiredService<ICurrentTimeProvider>(),
svp.GetRequiredService<IInstanceIdProvider>(),
svp.GetRequiredService<SqlConnection>(),
svp.GetRequiredService<ISqlTransactionService>()
*/
);
});
});
return mbb;
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
namespace SlimMessageBus.Host.Outbox.DbContext;

public class DbContextOutboxMessageRepository(Microsoft.EntityFrameworkCore.DbContext dbContext) : IOutboxMessageRepository, IHasDbContext
{
public Microsoft.EntityFrameworkCore.DbContext DbContext { get; } = dbContext;

protected readonly DbSet<OutboxMessage> _outboxMessages = dbContext.Set<OutboxMessage>();

public Task<Guid> Create(string busName, IDictionary<string, object> headers, string path, string messageType, byte[] messagePayload, CancellationToken cancellationToken)
{
throw new NotImplementedException();
}

public Task AbortDelivery(IReadOnlyCollection<Guid> ids, CancellationToken cancellationToken)
{
throw new NotImplementedException();
}

public Task DeleteSent(DateTime olderThan, CancellationToken cancellationToken)
{
throw new NotImplementedException();
}

public Task IncrementDeliveryAttempt(IReadOnlyCollection<Guid> ids, int maxDeliveryAttempts, CancellationToken cancellationToken)
{
throw new NotImplementedException();
}

public Task<IReadOnlyCollection<OutboxMessage>> LockAndSelect(string instanceId, int batchSize, bool tableLock, TimeSpan lockDuration, CancellationToken cancellationToken)
{
throw new NotImplementedException();
}

public Task<bool> RenewLock(string instanceId, TimeSpan lockDuration, CancellationToken cancellationToken)
{
throw new NotImplementedException();
}

public Task UpdateToSent(IReadOnlyCollection<Guid> ids, CancellationToken cancellationToken)
{
throw new NotImplementedException();
}
}
7 changes: 7 additions & 0 deletions src/SlimMessageBus.Host.Outbox.DbContext/IHasDbContext.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace SlimMessageBus.Host.Outbox.DbContext;

public interface IHasDbContext
{
Microsoft.EntityFrameworkCore.DbContext DbContext { get; }
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
namespace SlimMessageBus.Host.Outbox.DbContext;

public abstract class DbContextTransactionConsumerInterceptor
{
}

/// <summary>
/// Wraps the consumer in an <see cref="DbTransaction"/> (conditionally) obtained from <see cref="Microsoft.EntityFrameworkCore.DbContext.Database"/>.
/// </summary>
/// <typeparam name="T"></typeparam>
public class DbContextTransactionConsumerInterceptor<T>(ILogger<DbContextTransactionConsumerInterceptor> logger, IOutboxMessageRepository outboxMessageRepository)
: IConsumerInterceptor<T> where T : class
{
public async Task<object> OnHandle(T message, Func<Task<object>> next, IConsumerContext context)
{
logger.LogTrace("SqlTransaction - creating...");
var hasDbContext = (IHasDbContext)outboxMessageRepository;
var dbTransaction = await hasDbContext.DbContext.Database.BeginTransactionAsync(context.CancellationToken);
try
{
logger.LogDebug("SqlTransaction - created");

var result = await next();

logger.LogTrace("SqlTransaction - committing...");
await dbTransaction.CommitAsync(context.CancellationToken);
logger.LogDebug("SqlTransaction - committed");
return result;
}
catch
{
logger.LogTrace("SqlTransaction - rolling back...");
await dbTransaction.RollbackAsync(context.CancellationToken);
logger.LogDebug("SqlTransaction - rolled back");
throw;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
namespace SlimMessageBus.Host.Outbox.DbContext;

using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Metadata.Builders;

public class OutboxMessageEntityTypeConfiguration(DbContextOutboxSettings outboxSettings) : IEntityTypeConfiguration<OutboxMessage>
{
public void Configure(EntityTypeBuilder<OutboxMessage> builder)
{
builder.ToTable(outboxSettings.DatabaseTableName, outboxSettings.DatabaseSchemaName);

builder.HasKey(x => x.Id);
builder.Property(x => x.Id).ValueGeneratedOnAdd();

builder.Property(x => x.Timestamp)
.IsRequired();

builder.Property(x => x.BusName)
.HasMaxLength(64)
.IsRequired(false);

builder.Property(x => x.MessageType)
.HasMaxLength(256)
.IsRequired();

builder.Property(x => x.MessagePayload)
.IsRequired();

builder.Property(x => x.Headers)
.IsRequired(false);

builder.Property(x => x.Path)
.HasMaxLength(128)
.IsRequired(false);

builder.Property(x => x.InstanceId)
.HasMaxLength(128)
.IsRequired();

builder.Property(x => x.LockInstanceId)
.HasMaxLength(128)
.IsRequired();

builder.Property(x => x.LockExpiresOn)
.IsRequired();

builder.Property(x => x.DeliveryAttempt)
.IsRequired();

builder.Property(x => x.DeliveryComplete)
.IsRequired();

builder.Property(x => x.DeliveryAborted)
.HasDefaultValue(false)
.IsRequired();

builder.HasIndex(x => new { x.Timestamp, x.LockInstanceId, x.LockExpiresOn })
.HasFilter("DeliveryComplete = 0 and DeliveryAborted = 0")
.HasNameInternal("IX_Outbox_Timestamp_LockInstanceId_LockExpiresOn_DeliveryComplete_0_DeliveryAborted_0");

builder.HasIndex(x => new { x.LockExpiresOn, x.LockInstanceId })
.HasFilter("DeliveryComplete = 0 and DeliveryAborted = 0")
.HasNameInternal("IX_Outbox_LockExpiresOn_LockInstanceId_DeliveryComplete_0_DeliveryAborted_0");

builder.HasIndex(x => new { x.Timestamp })
.HasFilter("DeliveryComplete = 1 and DeliveryAborted = 0")
.HasNameInternal("IX_Outbox_Timestamp_DeliveryComplete_1_DeliveryAborted_0");
}
}

static internal class IndexBuilderExtensions
{
static internal IndexBuilder<TEntity> HasNameInternal<TEntity>(this IndexBuilder<TEntity> indexBuilder, string name)
{
#if NETSTANDARD2_0
return indexBuilder.HasName(name);
#else
return indexBuilder.HasDatabaseName(name);
#endif
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<Project Sdk="Microsoft.NET.Sdk">

<Import Project="../Host.Plugin.Properties.xml" />

<PropertyGroup>
<Description>Plugin for SlimMessageBus that adds Transactional Outbox pattern support using Entity Framework</Description>
<PackageTags>SlimMessageBus MessageBus Transactional Outbox SQL Entity Framework EF</PackageTags>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\SlimMessageBus.Host.Outbox\SlimMessageBus.Host.Outbox.csproj" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="3.1.0" Condition="'$(TargetFramework)' == 'netstandard2.0'" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="6.0.0" Condition="'$(TargetFramework)' == 'net6.0'" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="8.0.0" Condition="'$(TargetFramework)' == 'net8.0'" />
</ItemGroup>


</Project>
4 changes: 4 additions & 0 deletions src/SlimMessageBus.Host.Outbox.DbContext/Usings.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
global using Microsoft.EntityFrameworkCore;
global using Microsoft.Extensions.Logging;

global using SlimMessageBus.Host.Interceptor;
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
namespace SlimMessageBus.Host.Outbox.Sql;

using SlimMessageBus.Host;

public static class BuilderExtensions
{
static readonly internal string PropertySqlTransactionEnabled = "SqlTransaction_Enabled";
Expand All @@ -16,7 +14,7 @@ public static class BuilderExtensions
/// <returns></returns>
public static MessageBusBuilder UseSqlTransaction(this MessageBusBuilder builder, bool enabled = true, Func<Type, bool> messageTypeFilter = null)
{
SetTransactionScopeProps(builder.Settings, enabled, messageTypeFilter);
SetTransactionProps(builder.Settings, enabled, messageTypeFilter);
return builder;
}

Expand All @@ -29,7 +27,7 @@ public static MessageBusBuilder UseSqlTransaction(this MessageBusBuilder builder
/// <returns></returns>
public static ConsumerBuilder<T> UseSqlTransaction<T>(this ConsumerBuilder<T> builder, bool enabled = true, Func<Type, bool> messageTypeFilter = null)
{
SetTransactionScopeProps(builder.Settings, enabled, messageTypeFilter);
SetTransactionProps(builder.Settings, enabled, messageTypeFilter);
return builder;
}

Expand All @@ -42,11 +40,11 @@ public static ConsumerBuilder<T> UseSqlTransaction<T>(this ConsumerBuilder<T> bu
/// <returns></returns>
public static HandlerBuilder<T, R> UseSqlTransaction<T, R>(this HandlerBuilder<T, R> builder, bool enabled = true, Func<Type, bool> messageTypeFilter = null)
{
SetTransactionScopeProps(builder.Settings, enabled, messageTypeFilter);
SetTransactionProps(builder.Settings, enabled, messageTypeFilter);
return builder;
}

private static void SetTransactionScopeProps(HasProviderExtensions hasProviderExtensions, bool enabled, Func<Type, bool> messageTypeFilter = null)
private static void SetTransactionProps(HasProviderExtensions hasProviderExtensions, bool enabled, Func<Type, bool> messageTypeFilter = null)
{
hasProviderExtensions.Properties[PropertySqlTransactionEnabled] = enabled;
hasProviderExtensions.Properties[PropertySqlTransactionFilter] = messageTypeFilter;
Expand Down
2 changes: 1 addition & 1 deletion src/SlimMessageBus.Host.Outbox/IEnumerableExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
namespace SlimMessageBus.Host.Outbox;

internal static class IEnumerableExtensions
static internal class IEnumerableExtensions
{
public static IEnumerable<IReadOnlyCollection<T>> Batch<T>(this IEnumerable<T> items, int batchSize)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,11 @@ public abstract class TransactionScopeConsumerInterceptor
/// Wraps the consumer in an <see cref="TransactionScope"/> (conditionally).
/// </summary>
/// <typeparam name="T"></typeparam>
public class TransactionScopeConsumerInterceptor<T>(ILogger<TransactionScopeConsumerInterceptor> logger, OutboxSettings settings)
: TransactionScopeConsumerInterceptor, IConsumerInterceptor<T> where T : class
public class TransactionScopeConsumerInterceptor<T>(
ILogger<TransactionScopeConsumerInterceptor> logger,
OutboxSettings settings)
: TransactionScopeConsumerInterceptor, IConsumerInterceptor<T>
where T : class
{
public async Task<object> OnHandle(T message, Func<Task<object>> next, IConsumerContext context)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
namespace SlimMessageBus.Host.Outbox.Services;

using System.Threading;

public interface IOutboxLockRenewalTimerFactory
{
IOutboxLockRenewalTimer CreateRenewalTimer(TimeSpan lockDuration, TimeSpan interval, Action<Exception> lockLost, CancellationToken cancellationToken);
Expand Down
Loading

0 comments on commit 19118b9

Please sign in to comment.