Skip to content

Commit

Permalink
Updated documentation of subscribing to all in CQRS flow sample.
Browse files Browse the repository at this point in the history
  • Loading branch information
oskardudycz committed Dec 8, 2021
1 parent a3a1f62 commit 06a21a0
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,11 @@ private async Task HandleEvent(StreamSubscription subscription, ResolvedEvent re

if (streamEvent == null)
{
// that can happen if we're sharing database between modules
// if we're subscribing to all then we might get events that are from other module
// and we might not be able to deserialize them
// That can happen if we're sharing database between modules.
// If we're subscribing to all and not filtering out events from other modules,
// then we might get events that are from other module and we might not be able to deserialize them.
// In that case it's safe to ignore deserialization error.
// You may add more sophisticated logic checking if it should be ignored or not.
logger.LogWarning("Couldn't deserialize event with id: {EventId}", resolvedEvent.Event.EventId);

if (!subscriptionOptions.IgnoreDeserializationErrors)
Expand All @@ -119,6 +121,8 @@ private async Task HandleEvent(StreamSubscription subscription, ResolvedEvent re
{
logger.LogError("Error consuming message: {ExceptionMessage}{ExceptionStackTrace}", e.Message,
e.StackTrace);
// if you're fine with dropping some events instead of stopping subscription
// then you can add some logic if error should be ignored
throw;
}
}
Expand All @@ -137,13 +141,18 @@ private void HandleDrop(StreamSubscription _, SubscriptionDroppedReason reason,

private void Resubscribe()
{
// You may consider adding a max resubscribe count if you want to fail process
// instead of retrying until database is up
while (true)
{
var resubscribed = false;
try
{
Monitor.Enter(resubscribeLock);

// No synchronization context is needed to disable synchronization context.
// That enables running asynchronous method not causing deadlocks.
// As this is a background process then we don't need to have async context here.
using (NoSynchronizationContextScope.Enter())
{
SubscribeToAll(subscriptionOptions, cancellationToken).Wait(cancellationToken);
Expand All @@ -165,7 +174,9 @@ private void Resubscribe()
if (resubscribed)
break;

Thread.Sleep(1000);
// Sleep between reconnections to not flood the database or not kill the CPU with infinite loop
// Randomness added to reduce the chance of multiple subscriptions trying to reconnect at the same time
Thread.Sleep(1000 + new Random((int)DateTime.UtcNow.Ticks).Next(1000));
}
}

Expand Down
1 change: 1 addition & 0 deletions CQRS_Flow/.NET/Core/Core/Events/EventBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ IServiceProvider serviceProvider

private async Task Publish<TEvent>(TEvent @event, CancellationToken ct)
{
// You can consider adding here a retry policy for event handling
using var scope = serviceProvider.CreateScope();

var eventHandlers =
Expand Down
6 changes: 3 additions & 3 deletions CQRS_Flow/.NET/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,17 @@ It uses:

## Read Model
- Read models are rebuilt with eventual consistency using subscribe to all EventStoreDB feature,
- Added hosted service [SubscribeToAllBackgroundWorker](./Core/Core.EventStoreDB/Subscriptions/SubscribeToAllBackgroundWorker.cs) to handle subscribing to all. It handles checkpointing and simple retries if the connection was dropped.
- Added class to manage subscriptions: [EventStoreDBSubscriptionToAll](./Core/Core.EventStoreDB/Subscriptions/EventStoreDBSubscriptionToAll.cs) to handle subscribing to all. It handles checkpointing and simple retries if the connection was dropped. It's run inside the [BackgroundWorker](./Core/Core/BackgroundWorkers/BackgroundWorker.cs) that provides the abstraction for starting and disposing hosted service.
- Added [ISubscriptionCheckpointRepository](./Core/Core.EventStoreDB/Subscriptions/ISubscriptionCheckpointRepository.cs) for handling Subscription checkpointing.
- Added checkpointing to EventStoreDB stream with [EventStoreDBSubscriptionCheckpointRepository](./Core/Core.EventStoreDB/Subscriptions/EventStoreDBSubscriptionCheckpointRepository.cs) and dummy in-memory checkpointer [InMemorySubscriptionCheckpointRepository](./Core/Core.EventStoreDB/Subscriptions/InMemorySubscriptionCheckpointRepository.cs),
- Added [ElasticSearchProjection](./Core/Core.ElasticSearch/Projections/ElasticSearchProjection.cs) as a sample how to project with [`left-fold`](https://en.wikipedia.org/wiki/Fold_(higher-order_function)) into external storage. Another (e.g. MongoDB, EntityFramework) can be implemented the same way.
- Added [ElasticSearchProjection](./Core/Core.ElasticSearch/Projections/ElasticSearchProjection.cs) as a sample how to project with [`left-fold`](https://en.wikipedia.org/wiki/Fold_(higher-order_function)) into external storage. It supports idempotency through "external version" ElasticSearch mechanism. Another (e.g. MongoDB, EntityFramework) can be implemented the same way.

## Tests
- Added sample of unit testing in [`Carts.Tests`](./Carts/Carts.Tests):
- [Aggregate unit tests](./Carts/Carts.Tests/Carts/InitializingCart/InitializeCartTests.cs)
- [Command handler unit tests](./Carts/Carts.Tests/Carts/InitializingCart/InitializeCartCommandHandlerTests.cs)
- Added sample of integration testing in [`Carts.Api.Tests`](./Carts/Carts.Api.Tests)
- [API acceptance tests](./Carts/Carts.Api.Tests/Carts/InitializingCart/InitializeCartTests.cs)
- [API acceptance tests](./Carts/Carts.Api.Tests/Carts/)

## Other
- [EventTypeMapper](./Core/Core/Events/EventTypeMapper.cs) class to allow both convention-based mapping (by the .NET type name) and custom to handle event versioning,
Expand Down

0 comments on commit 06a21a0

Please sign in to comment.