Skip to content

Commit

Permalink
Add option to specify projection rebuild command timeout for CommandLine
Browse files Browse the repository at this point in the history
  • Loading branch information
mysticmind authored and oskardudycz committed Nov 8, 2022
1 parent 6148300 commit 449617f
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 12 deletions.
25 changes: 24 additions & 1 deletion src/Marten.CommandLine.Tests/ProjectionControllerTests.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using Baseline.Dates;
using Castle.Core;
using Marten.CommandLine.Commands.Projection;
using Marten.Events.Daemon;
Expand Down Expand Up @@ -64,7 +65,7 @@ void IProjectionHost.ListenForUserTriggeredExit()

protected readonly List<RebuildRecord> rebuilt = new ();

Task<RebuildStatus> IProjectionHost.TryRebuildShards(IProjectionDatabase database, IReadOnlyList<AsyncProjectionShard> asyncProjectionShards)
Task<RebuildStatus> IProjectionHost.TryRebuildShards(IProjectionDatabase database, IReadOnlyList<AsyncProjectionShard> asyncProjectionShards, TimeSpan? shardTimeout)
{
foreach (var shard in asyncProjectionShards)
{
Expand Down Expand Up @@ -327,6 +328,28 @@ public async Task rebuilds_all_databases_for_a_single_store()
rebuilt.ShouldHaveTheSameElementsAs(expectedRebuilds);
}

[Fact]
public async Task rebuilds_all_databases_for_a_single_store_with_shard_timeout()
{
var store = withStore("Marten", new("Foo:All", ProjectionLifecycle.Async),
new("Bar:All", ProjectionLifecycle.Inline));

var shards = store.Shards;

var databases = store.HasDatabases("One", "Two", "Three");

// NOTE: ShardTimeout is set in ProjectInput as a test
// but there is no means to assert this value being used by daemon
await theController.Execute(new ProjectionInput { RebuildFlag = true, ShardTimeout = 10.Minutes()});

var expectedRebuilds = databases.SelectMany(db =>
{
return shards.Select(shard => new RebuildRecord(store, db, shard));
}).ToArray();

rebuilt.ShouldHaveTheSameElementsAs(expectedRebuilds);
}

[Fact]
public async Task rebuild_database_that_is_empty()
{
Expand Down
6 changes: 4 additions & 2 deletions src/Marten.CommandLine/Commands/Projection/IProjectionHost.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Baseline.Dates;
using Marten.Events.Daemon;

namespace Marten.CommandLine.Commands.Projection;
Expand All @@ -14,7 +16,7 @@ public interface IProjectionHost
{
IReadOnlyList<IProjectionStore> AllStores();
void ListenForUserTriggeredExit();
Task<RebuildStatus> TryRebuildShards(IProjectionDatabase database, IReadOnlyList<AsyncProjectionShard> asyncProjectionShards);
Task<RebuildStatus> TryRebuildShards(IProjectionDatabase database, IReadOnlyList<AsyncProjectionShard> asyncProjectionShards, TimeSpan? shardTimeout=null);
Task StartShards(IProjectionDatabase database, IReadOnlyList<AsyncProjectionShard> shards);
Task WaitForExit();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public async Task<bool> Execute(ProjectionInput input)
_view.WriteHeader(database);
}

var status = await _host.TryRebuildShards(database, shards).ConfigureAwait(false);
var status = await _host.TryRebuildShards(database, shards, input.ShardTimeout).ConfigureAwait(false);

if (status == RebuildStatus.NoData)
{
Expand Down Expand Up @@ -175,4 +175,4 @@ public IReadOnlyList<IProjectionDatabase> FilterDatabases(ProjectionInput input,

return databases;
}
}
}
31 changes: 25 additions & 6 deletions src/Marten.CommandLine/Commands/Projection/ProjectionHost.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Runtime.Loader;
using System.Threading;
using System.Threading.Tasks;
using Baseline.Dates;
using Marten.Events.Daemon;
using Microsoft.Extensions.Hosting;

Expand Down Expand Up @@ -57,7 +58,7 @@ public void Shutdown()
_completion.TrySetResult(true);
}

public async Task<RebuildStatus> TryRebuildShards(IProjectionDatabase database, IReadOnlyList<AsyncProjectionShard> asyncProjectionShards)
public async Task<RebuildStatus> TryRebuildShards(IProjectionDatabase database, IReadOnlyList<AsyncProjectionShard> asyncProjectionShards, TimeSpan? shardTimeout=null)
{
using var daemon = database.BuildDaemon();
await daemon.StartDaemon().ConfigureAwait(false);
Expand All @@ -80,13 +81,31 @@ public async Task<RebuildStatus> TryRebuildShards(IProjectionDatabase database,

#if NET6_0_OR_GREATER
await Parallel.ForEachAsync(projectionNames, _cancellation.Token,
async (projectionName, token) =>
await daemon.RebuildProjection(projectionName, token).ConfigureAwait(false))
.ConfigureAwait(false);
async (projectionName, token) =>
{
if (shardTimeout == null)
{
await daemon.RebuildProjection(projectionName, token).ConfigureAwait(true);
}
else
{
await daemon.RebuildProjection(projectionName, shardTimeout.Value, token).ConfigureAwait(true);
}
})
.ConfigureAwait(false);

#else
var tasks = projectionNames
.Select(x => Task.Run(async () => await daemon.RebuildProjection(x, _cancellation.Token).ConfigureAwait(false), _cancellation.Token))
.Select(x => Task.Run(async () => {
if (shardTimeout == null)
{
await daemon.RebuildProjection(x, _cancellation.Token).ConfigureAwait(false);
}
else
{
await daemon.RebuildProjection(x, shardTimeout.Value, _cancellation.Token).ConfigureAwait(false);
}
}, _cancellation.Token))
.ToArray();

await Task.WhenAll(tasks).ConfigureAwait(false);
Expand Down Expand Up @@ -116,4 +135,4 @@ public Task WaitForExit()
{
return _completion.Task;
}
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System;
using System.Collections.Generic;
using System.Linq;
using Baseline;
Expand Down Expand Up @@ -33,6 +34,9 @@ public ProjectionInput()
[Description("If specified, only execute against the named Marten database within the specified store(s). Does not apply with only one store")]
public string DatabaseFlag { get; set; }

[Description("If specified, use this shard timeout value for daemon")]
public TimeSpan? ShardTimeout { get; set; }

internal IList<AsyncProjectionShard> BuildShards(DocumentStore store)
{
var projections = store
Expand Down Expand Up @@ -89,4 +93,4 @@ internal IList<IProjectionSource> SelectProjectionsForRebuild(DocumentStore stor

return projections;
}
}
}

0 comments on commit 449617f

Please sign in to comment.