Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for custom indexes on events tables #2556

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/configuration/hostbuilder.md
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ public interface IConfigureMarten
void Configure(IServiceProvider services, StoreOptions options);
}
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/Marten/MartenServiceCollectionExtensions.cs#L732-L743' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_iconfiguremarten' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/Marten/MartenServiceCollectionExtensions.cs#L734-L745' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_iconfiguremarten' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

You could alternatively implement a custom `IConfigureMarten` class like so:
Expand Down
2 changes: 1 addition & 1 deletion docs/configuration/storeoptions.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public class MyStoreOptions: StoreOptions
}
}
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/Marten.Testing/Examples/ConfiguringDocumentStore.cs#L203-L221' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_custom-store-options' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/Marten.Testing/Examples/ConfiguringDocumentStore.cs#L223-L241' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_custom-store-options' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

This strategy might be beneficial if you need to share Marten configuration across different applications
Expand Down
2 changes: 1 addition & 1 deletion docs/events/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ var store = DocumentStore.For(opts =>
opts.Events.TenancyStyle = TenancyStyle.Conjoined;
});
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/Marten.Testing/Examples/ConfiguringDocumentStore.cs#L226-L236' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_making_the_events_multi_tenanted' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/Marten.Testing/Examples/ConfiguringDocumentStore.cs#L246-L256' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_making_the_events_multi_tenanted' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

By default, if you try to define projection with a single tenancy, Marten will throw an exception at runtime informing you about the mismatch.
Expand Down
2 changes: 1 addition & 1 deletion docs/events/multitenancy.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,5 @@ var store = DocumentStore.For(opts =>
opts.Events.TenancyStyle = TenancyStyle.Conjoined;
});
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/Marten.Testing/Examples/ConfiguringDocumentStore.cs#L226-L236' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_making_the_events_multi_tenanted' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/Marten.Testing/Examples/ConfiguringDocumentStore.cs#L246-L256' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_making_the_events_multi_tenanted' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->
16 changes: 8 additions & 8 deletions docs/events/projections/aggregate-projections.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public class TripProjection: SingleStreamProjection<Trip>
}
}
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/Marten.AsyncDaemon.Testing/TestingSupport/TripProjectionWithCustomName.cs#L44-L74' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_tripprojection_aggregate' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/Marten.AsyncDaemon.Testing/TestingSupport/TripProjectionWithCustomName.cs#L43-L73' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_tripprojection_aggregate' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

And register that projection like this:
Expand All @@ -116,7 +116,7 @@ var store = DocumentStore.For(opts =>
opts.Projections.Add<TripProjection>(ProjectionLifecycle.Async);
});
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/Marten.AsyncDaemon.Testing/TestingSupport/TripProjectionWithCustomName.cs#L17-L30' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_registering_an_aggregate_projection' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/Marten.AsyncDaemon.Testing/TestingSupport/TripProjectionWithCustomName.cs#L16-L29' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_registering_an_aggregate_projection' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

Any projection based on `SingleStreamProjection<T>` will allow you to define steps by event type to either create, delete, or mutate an aggregate
Expand Down Expand Up @@ -185,7 +185,7 @@ public class Trip
internal bool ShouldDelete(VacationOver e) => Traveled > 1000;
}
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/Marten.AsyncDaemon.Testing/TestingSupport/TripProjectionWithCustomName.cs#L113-L163' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_trip_stream_aggregation' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/Marten.AsyncDaemon.Testing/TestingSupport/TripProjectionWithCustomName.cs#L112-L162' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_trip_stream_aggregation' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

Or finally, you can use a method named `Create()` on a projection type as shown in this sample:
Expand Down Expand Up @@ -221,7 +221,7 @@ public class TripProjection: SingleStreamProjection<Trip>
}
}
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/Marten.AsyncDaemon.Testing/TestingSupport/TripProjectionWithCustomName.cs#L44-L74' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_tripprojection_aggregate' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/Marten.AsyncDaemon.Testing/TestingSupport/TripProjectionWithCustomName.cs#L43-L73' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_tripprojection_aggregate' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

The `Create()` method has to return either the aggregate document type or `Task<T>` where `T` is the aggregate document type. There must be an argument for the specific event type or `Event<T>` where `T` is the event type if you need access to event metadata. You can also take in an `IQuerySession` if you need to look up additional data as part of the transformation or `IEvent` in addition to the exact event type just to get at event metadata.
Expand Down Expand Up @@ -261,7 +261,7 @@ public class TripProjection: SingleStreamProjection<Trip>
}
}
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/Marten.AsyncDaemon.Testing/TestingSupport/TripProjectionWithCustomName.cs#L169-L194' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_using_projectevent_in_aggregate_projection' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/Marten.AsyncDaemon.Testing/TestingSupport/TripProjectionWithCustomName.cs#L168-L193' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_using_projectevent_in_aggregate_projection' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

I'm not personally that wild about using lots of inline Lambdas like the example above, and to that end, Marten now supports the `Apply()` method convention. Here's the same `TripProjection`, but this time using methods to mutate the `Trip` document:
Expand Down Expand Up @@ -297,7 +297,7 @@ public class TripProjection: SingleStreamProjection<Trip>
}
}
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/Marten.AsyncDaemon.Testing/TestingSupport/TripProjectionWithCustomName.cs#L44-L74' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_tripprojection_aggregate' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/Marten.AsyncDaemon.Testing/TestingSupport/TripProjectionWithCustomName.cs#L43-L73' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_tripprojection_aggregate' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

The `Apply()` methods can accept any combination of these arguments:
Expand Down Expand Up @@ -466,7 +466,7 @@ public class Trip
internal bool ShouldDelete(VacationOver e) => Traveled > 1000;
}
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/Marten.AsyncDaemon.Testing/TestingSupport/TripProjectionWithCustomName.cs#L113-L163' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_trip_stream_aggregation' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/Marten.AsyncDaemon.Testing/TestingSupport/TripProjectionWithCustomName.cs#L112-L162' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_trip_stream_aggregation' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

Here's an example of using the various ways of doing `Trip` stream aggregation:
Expand Down Expand Up @@ -500,7 +500,7 @@ internal async Task use_a_stream_aggregation()
var trip = await session.Events.AggregateStreamAsync<Trip>(tripId);
}
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/Marten.AsyncDaemon.Testing/TestingSupport/TripProjectionWithCustomName.cs#L81-L109' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_using_stream_aggregation' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/Marten.AsyncDaemon.Testing/TestingSupport/TripProjectionWithCustomName.cs#L80-L108' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_using_stream_aggregation' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

## Aggregate Versioning
Expand Down
8 changes: 4 additions & 4 deletions docs/events/projections/live-aggregates.md
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ await theSession.Events.AggregateStreamAsync(
fromVersion: baseStateVersion
);
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/Aggregation/aggregate_stream_into_samples.cs#L141-L147' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_aggregate-stream-into-state-default' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/Aggregation/aggregate_stream_into_samples.cs#L140-L148' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_aggregate-stream-into-state-default' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

It can be helpful, for instance, in snapshotting. Snapshot is a state of the stream at a specific point of time (version). It is a performance optimization that shouldn't be your first choice, but it's an option to consider for performance-critical computations. As you're optimizing your processing, you usually don't want to store a snapshot after each event not to increase the number of writes. Usually, you'd like to do a snapshot on the specific interval or specific event type.
Expand Down Expand Up @@ -360,7 +360,7 @@ public class CashRegisterRepository
}
}
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/Aggregation/aggregate_stream_into_samples.cs#L81-L131' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_aggregate-stream-into-state-wrapper' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/Aggregation/aggregate_stream_into_samples.cs#L80-L130' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_aggregate-stream-into-state-wrapper' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

Then append event and store snapshot on opening accounting month:
Expand Down Expand Up @@ -390,7 +390,7 @@ var repository = new CashRegisterRepository(theSession);

await repository.Store(openedCashierShift, cashierShiftOpened);
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/Aggregation/aggregate_stream_into_samples.cs#L164-L188' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_aggregate-stream-into-state-store' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/Aggregation/aggregate_stream_into_samples.cs#L181-L205' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_aggregate-stream-into-state-store' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

and read snapshot and following event with:
Expand All @@ -400,7 +400,7 @@ and read snapshot and following event with:
```cs
var currentState = await repository.Get(financialAccountId);
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/Aggregation/aggregate_stream_into_samples.cs#L207-L211' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_aggregate-stream-into-state-get' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/Aggregation/aggregate_stream_into_samples.cs#L224-L228' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_aggregate-stream-into-state-get' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

## Live Aggregation from Linq Queries
Expand Down
25 changes: 25 additions & 0 deletions docs/events/storage.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,28 @@ public Dictionary<string, object>? Headers { get; set; }
<!-- endSnippet -->

The full event data is available on `EventStream` and `IEvent` objects immediately after committing a transaction that involves event capture. See [diagnostics and instrumentation](/diagnostics) for more information on capturing event data in the instrumentation hooks.

## Adding indexes to event tables

Additional indexes can be added to the `mt_streams` and `mt_events` tables. These can be useful if you often need to perform queries directly against the event tables.

<!-- snippet: sample_setting_event_custom_indexes -->
<a id='snippet-sample_setting_event_custom_indexes'></a>
```cs
var store = DocumentStore.For(_ =>
{
_.Connection("some connection string");

// Add an index to the mt_streams table on "is_archived"
_.Events.AddIndexToStreamsTable(
new IndexDefinition("idx_mt_streams_is_archived")
.AgainstColumns("is_archived"));

// Add an index to the mt_events table on "type"
_.Events.AddIndexToEventsTable(
new IndexDefinition("idx_mt_events_type")
.AgainstColumns("type"));
});
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/Marten.Testing/Examples/ConfiguringDocumentStore.cs#L205-L220' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_setting_event_custom_indexes' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->
50 changes: 50 additions & 0 deletions src/EventSourcingTests/EventStoreCustomIndexesTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
using System.Linq;
using System.Threading.Tasks;
using Marten.Events;
using Marten.Testing;
using Marten.Testing.Harness;
using Weasel.Postgresql.Tables;
using Xunit;

namespace EventSourcingTests;

public class EventStoreCustomIndexesTests : OneOffConfigurationsContext
{
[Fact]
public async Task can_create_custom_indexes_on_event_tables()
{
const string streamsTypeIndexName = "idx_mt_streams_type";
const string eventsDataIndexName = "idx_mt_events_data_gin";
StoreOptions(options =>
{
var streamsTypeIndex = new IndexDefinition(streamsTypeIndexName).AgainstColumns("type");
options.Events.AddIndexToStreamsTable(streamsTypeIndex);

var eventsDataIndex = new IndexDefinition(eventsDataIndexName).AgainstColumns("data");
eventsDataIndex.Method = IndexMethod.gin;
options.Events.AddIndexToEventsTable(eventsDataIndex);
});

await theStore.EnsureStorageExistsAsync(typeof(IEvent));

Assert.True(await CheckIfIndexExists("mt_streams", streamsTypeIndexName));
Assert.True(await CheckIfIndexExists("mt_events", eventsDataIndexName));
}

private async Task<bool> CheckIfIndexExists(string tableName, string indexName)
{
var exists = await theSession.QueryAsync<bool>(@"
select exists(
select 1
from pg_catalog.pg_indexes
where schemaname = ?
and tablename = ?
and indexname = ?
)",
_schemaName,
tableName,
indexName);

return exists.FirstOrDefault(false);
}
}
24 changes: 22 additions & 2 deletions src/Marten.Testing/Examples/ConfiguringDocumentStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
using Marten.Testing.Harness;
using Newtonsoft.Json;
using Weasel.Core;
using Weasel.Postgresql;
using Weasel.Postgresql.Tables;

namespace Marten.Testing.Examples;
// Leave this commented out please, and always use the User
Expand Down Expand Up @@ -200,6 +200,26 @@ public void setting_event_schema()
#endregion
}

public void setting_event_custom_indexes()
{
#region sample_setting_event_custom_indexes
var store = DocumentStore.For(_ =>
{
_.Connection("some connection string");

// Add an index to the mt_streams table on "is_archived"
_.Events.AddIndexToStreamsTable(
new IndexDefinition("idx_mt_streams_is_archived")
.AgainstColumns("is_archived"));

// Add an index to the mt_events table on "type"
_.Events.AddIndexToEventsTable(
new IndexDefinition("idx_mt_events_type")
.AgainstColumns("type"));
});
#endregion
}

#region sample_custom-store-options
public class MyStoreOptions: StoreOptions
{
Expand Down Expand Up @@ -235,4 +255,4 @@ public void set_multi_tenancy_on_events()

#endregion
}
}
}
9 changes: 7 additions & 2 deletions src/Marten/Events/EventGraph.FeatureSchema.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
using System;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using JasperFx.Core;
using Marten.Events.Archiving;
using Marten.Events.Daemon;
using Marten.Events.Projections;
Expand Down Expand Up @@ -38,8 +39,12 @@ void IFeatureSchema.WritePermissions(Migrator rules, TextWriter writer)

private IEnumerable<ISchemaObject> createAllSchemaObjects()
{
yield return new StreamsTable(this);
var streamsTable = new StreamsTable(this);
streamsTable.Indexes.AddRange(_customStreamsTableIndexes);
yield return streamsTable;

var eventsTable = new EventsTable(this);
eventsTable.Indexes.AddRange(_customEventsTableIndexes);
yield return eventsTable;

#region sample_using-sequence
Expand Down
16 changes: 16 additions & 0 deletions src/Marten/Events/EventGraph.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
using Marten.Util;
using NpgsqlTypes;
using Weasel.Core;
using Weasel.Postgresql.Tables;
using static Marten.Events.EventMappingExtensions;

namespace Marten.Events;
Expand Down Expand Up @@ -217,6 +218,21 @@ public IEventStoreOptions Upcast(params IEventUpcaster[] upcasters)
return this;
}

private readonly IList<IndexDefinition> _customEventsTableIndexes = new List<IndexDefinition>();
private readonly IList<IndexDefinition> _customStreamsTableIndexes = new List<IndexDefinition>();

public IEventStoreOptions AddIndexToEventsTable(IndexDefinition index)
{
_customEventsTableIndexes.Add(index);
return this;
}

public IEventStoreOptions AddIndexToStreamsTable(IndexDefinition index)
{
_customStreamsTableIndexes.Add(index);
return this;
}

/// <summary>
/// Override the database schema name for event related tables. By default this
/// is the same schema as the document storage
Expand Down
Loading