You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Hello all,
I think I've run into this observation alluded to by Jeremy trying to implement event store async projections with dynamic tenancy. I have a session factory is basically a tenant id locator using information provided via scoping variables, http request, etc. After the id is found the tenancy is reverts to your exiting framework.
If the async projection coordination (or whatever you call it) functionality where encapsulated into a resolvable unit I would be able to create a new tenancy strategy that would be able to opt into event projections on the fly. I would love to know what your thoughts are or what alternatives I can use today to accomplish my goals.
Thanks in advance!
public class DynamicMultiTenancy: ITenancy
{
private IAsyncProjectionCoordination ProjectionCoordination { get; }
private readonly StoreOptions _options;
public DynamicMultiTenancy(string masterConnectionString, StoreOptions options, IAsyncProjectionCoordination projectionCoordination)
{
ProjectionCoordination = projectionCoordination;
_options = options;
Cleaner = new CompositeDocumentCleaner(this);
}
// Snippet from ITenancy implementation
public Tenant GetTenant(string tenantId)
{
if (_tenants.TryFind(tenantId, out var tenant))
{
return tenant;
}
if (!_tenantToDatabase.TryGetValue(tenantId, out var databaseName))
{
databaseName = tenantId;
}
var database = FindOrCreateDatabase(databaseName).AsTask()
.GetAwaiter().GetResult();
tenant = new Tenant(tenantId, database);
_tenants = _tenants.AddOrUpdate(tenantId, tenant);
// inform the projection coordination logic that a new database should be monitored.
ProjectionCoordination.AddAsync(database).ConfigureAwait(false).GetAwaiter().GetResult();
return tenant;
}
The only change here was moving the logic to a new singleton and adding one opt-in method for AddAsync
/// <summary>
/// Registered automatically by Marten if the async projection daemon is enabled
/// to start and stop asynchronous projections on application start and shutdown
/// </summary>
public class AsyncProjectionHostedService: IHostedService
{
private IAsyncProjectionCoordination ProjectionCoordination { get; }
public AsyncProjectionHostedService(IAsyncProjectionCoordination projectionCoordination)
{
ProjectionCoordination = projectionCoordination;
}
public async Task StartAsync(CancellationToken cancellationToken)
{
// turns the IHostedService into dumb wrapper
await ProjectionCoordination.StartAsync(cancellationToken).ConfigureAwait(false);
}
public async Task StopAsync(CancellationToken cancellationToken)
{
// turns the IHostedService into dumb wrapper
await ProjectionCoordination.StopAsync(cancellationToken).ConfigureAwait(false);
}
}
public interface IAsyncProjectionCoordination
{
Task StartAsync(CancellationToken cancellation);
Task StopAsync(CancellationToken cancellation);
Task AddAsync(IMartenDatabase database, CancellationToken cancellationToken = default);
}
/// <summary>
/// Register as a singleton along side the IHostedService.
/// </summary>
internal class AsyncProjectionCoordination: IAsyncProjectionCoordination
{
public AsyncProjectionCoordination(IDocumentStore store, ILogger<AsyncProjectionHostedService> logger)
{
Store = store.As<DocumentStore>();
_logger = logger;
}
private readonly List<INodeCoordinator> _coordinators = new();
private readonly ILogger<AsyncProjectionHostedService> _logger;
public IReadOnlyList<INodeCoordinator> Coordinators => _coordinators;
internal DocumentStore Store { get; }
public async Task StartAsync(CancellationToken cancellationToken)
{
// accounts for all of the static/known databases
var databases = await Store.Tenancy.BuildDatabases().ConfigureAwait(false);
foreach (var database in databases.OfType<MartenDatabase>())
{
await AddAsync(database, cancellationToken).ConfigureAwait(false);
}
}
public async Task StopAsync(CancellationToken cancellation)
{
if (Store.Options.Projections.AsyncMode == DaemonMode.Disabled)
{
return;
}
try
{
_logger.LogDebug("Stopping the asynchronous projection agent");
foreach (var coordinator in _coordinators)
await coordinator.Stop().ConfigureAwait(false);
}
catch (Exception e)
{
_logger.LogError(e, "Error when trying to stop the asynchronous projection agent");
}
}
public async Task AddAsync(IMartenDatabase database, CancellationToken cancellationToken = default)
{
if (database is not MartenDatabase martenDatabase)
{
return;
}
// accounts for dynamic databases
if (Store.Options.Projections.AsyncMode == DaemonMode.Disabled)
{
return;
}
INodeCoordinator coordinator = Store.Options.Projections.AsyncMode == DaemonMode.Solo
? new SoloCoordinator()
: new HotColdCoordinator(database, Store.Options.Projections, _logger);
try
{
var agent = martenDatabase.StartProjectionDaemon(Store, _logger);
await coordinator.Start(agent, cancellationToken).ConfigureAwait(false);
_coordinators.Add(coordinator);
}
catch (Exception e)
{
_logger.LogError(e, "Unable to start the asynchronous projection agent for database {Database}",
database.Identifier);
throw;
}
}
}
reacted with thumbs up emoji reacted with thumbs down emoji reacted with laugh emoji reacted with hooray emoji reacted with confused emoji reacted with heart emoji reacted with rocket emoji reacted with eyes emoji
-
Hello all,
I think I've run into this observation alluded to by Jeremy trying to implement event store async projections with dynamic tenancy. I have a session factory is basically a tenant id locator using information provided via scoping variables, http request, etc. After the id is found the tenancy is reverts to your exiting framework.
If the async projection coordination (or whatever you call it) functionality where encapsulated into a resolvable unit I would be able to create a new tenancy strategy that would be able to opt into event projections on the fly. I would love to know what your thoughts are or what alternatives I can use today to accomplish my goals.
Thanks in advance!
The only change here was moving the logic to a new singleton and adding one opt-in method for
AddAsync
Beta Was this translation helpful? Give feedback.
All reactions