From 06a21a0a45d243119f6af4e5e5a4e4440c0f5162 Mon Sep 17 00:00:00 2001 From: Oskar Dudycz Date: Wed, 8 Dec 2021 15:12:58 +0100 Subject: [PATCH] Updated documentation of subscribing to all in CQRS flow sample. --- .../EventStoreDBSubscriptionToAll.cs | 19 +++++++++++++++---- CQRS_Flow/.NET/Core/Core/Events/EventBus.cs | 1 + CQRS_Flow/.NET/README.md | 6 +++--- 3 files changed, 19 insertions(+), 7 deletions(-) diff --git a/CQRS_Flow/.NET/Core/Core.EventStoreDB/Subscriptions/EventStoreDBSubscriptionToAll.cs b/CQRS_Flow/.NET/Core/Core.EventStoreDB/Subscriptions/EventStoreDBSubscriptionToAll.cs index 3167c53..c09dd52 100644 --- a/CQRS_Flow/.NET/Core/Core.EventStoreDB/Subscriptions/EventStoreDBSubscriptionToAll.cs +++ b/CQRS_Flow/.NET/Core/Core.EventStoreDB/Subscriptions/EventStoreDBSubscriptionToAll.cs @@ -97,9 +97,11 @@ private async Task HandleEvent(StreamSubscription subscription, ResolvedEvent re if (streamEvent == null) { - // that can happen if we're sharing database between modules - // if we're subscribing to all then we might get events that are from other module - // and we might not be able to deserialize them + // That can happen if we're sharing database between modules. + // If we're subscribing to all and not filtering out events from other modules, + // then we might get events that are from other module and we might not be able to deserialize them. + // In that case it's safe to ignore deserialization error. + // You may add more sophisticated logic checking if it should be ignored or not. logger.LogWarning("Couldn't deserialize event with id: {EventId}", resolvedEvent.Event.EventId); if (!subscriptionOptions.IgnoreDeserializationErrors) @@ -119,6 +121,8 @@ private async Task HandleEvent(StreamSubscription subscription, ResolvedEvent re { logger.LogError("Error consuming message: {ExceptionMessage}{ExceptionStackTrace}", e.Message, e.StackTrace); + // if you're fine with dropping some events instead of stopping subscription + // then you can add some logic if error should be ignored throw; } } @@ -137,6 +141,8 @@ private void HandleDrop(StreamSubscription _, SubscriptionDroppedReason reason, private void Resubscribe() { + // You may consider adding a max resubscribe count if you want to fail process + // instead of retrying until database is up while (true) { var resubscribed = false; @@ -144,6 +150,9 @@ private void Resubscribe() { Monitor.Enter(resubscribeLock); + // No synchronization context is needed to disable synchronization context. + // That enables running asynchronous method not causing deadlocks. + // As this is a background process then we don't need to have async context here. using (NoSynchronizationContextScope.Enter()) { SubscribeToAll(subscriptionOptions, cancellationToken).Wait(cancellationToken); @@ -165,7 +174,9 @@ private void Resubscribe() if (resubscribed) break; - Thread.Sleep(1000); + // Sleep between reconnections to not flood the database or not kill the CPU with infinite loop + // Randomness added to reduce the chance of multiple subscriptions trying to reconnect at the same time + Thread.Sleep(1000 + new Random((int)DateTime.UtcNow.Ticks).Next(1000)); } } diff --git a/CQRS_Flow/.NET/Core/Core/Events/EventBus.cs b/CQRS_Flow/.NET/Core/Core/Events/EventBus.cs index 5e1338f..0f524ac 100644 --- a/CQRS_Flow/.NET/Core/Core/Events/EventBus.cs +++ b/CQRS_Flow/.NET/Core/Core/Events/EventBus.cs @@ -22,6 +22,7 @@ IServiceProvider serviceProvider private async Task Publish(TEvent @event, CancellationToken ct) { + // You can consider adding here a retry policy for event handling using var scope = serviceProvider.CreateScope(); var eventHandlers = diff --git a/CQRS_Flow/.NET/README.md b/CQRS_Flow/.NET/README.md index 97ffa13..9abea73 100644 --- a/CQRS_Flow/.NET/README.md +++ b/CQRS_Flow/.NET/README.md @@ -41,17 +41,17 @@ It uses: ## Read Model - Read models are rebuilt with eventual consistency using subscribe to all EventStoreDB feature, -- Added hosted service [SubscribeToAllBackgroundWorker](./Core/Core.EventStoreDB/Subscriptions/SubscribeToAllBackgroundWorker.cs) to handle subscribing to all. It handles checkpointing and simple retries if the connection was dropped. +- Added class to manage subscriptions: [EventStoreDBSubscriptionToAll](./Core/Core.EventStoreDB/Subscriptions/EventStoreDBSubscriptionToAll.cs) to handle subscribing to all. It handles checkpointing and simple retries if the connection was dropped. It's run inside the [BackgroundWorker](./Core/Core/BackgroundWorkers/BackgroundWorker.cs) that provides the abstraction for starting and disposing hosted service. - Added [ISubscriptionCheckpointRepository](./Core/Core.EventStoreDB/Subscriptions/ISubscriptionCheckpointRepository.cs) for handling Subscription checkpointing. - Added checkpointing to EventStoreDB stream with [EventStoreDBSubscriptionCheckpointRepository](./Core/Core.EventStoreDB/Subscriptions/EventStoreDBSubscriptionCheckpointRepository.cs) and dummy in-memory checkpointer [InMemorySubscriptionCheckpointRepository](./Core/Core.EventStoreDB/Subscriptions/InMemorySubscriptionCheckpointRepository.cs), -- Added [ElasticSearchProjection](./Core/Core.ElasticSearch/Projections/ElasticSearchProjection.cs) as a sample how to project with [`left-fold`](https://en.wikipedia.org/wiki/Fold_(higher-order_function)) into external storage. Another (e.g. MongoDB, EntityFramework) can be implemented the same way. +- Added [ElasticSearchProjection](./Core/Core.ElasticSearch/Projections/ElasticSearchProjection.cs) as a sample how to project with [`left-fold`](https://en.wikipedia.org/wiki/Fold_(higher-order_function)) into external storage. It supports idempotency through "external version" ElasticSearch mechanism. Another (e.g. MongoDB, EntityFramework) can be implemented the same way. ## Tests - Added sample of unit testing in [`Carts.Tests`](./Carts/Carts.Tests): - [Aggregate unit tests](./Carts/Carts.Tests/Carts/InitializingCart/InitializeCartTests.cs) - [Command handler unit tests](./Carts/Carts.Tests/Carts/InitializingCart/InitializeCartCommandHandlerTests.cs) - Added sample of integration testing in [`Carts.Api.Tests`](./Carts/Carts.Api.Tests) - - [API acceptance tests](./Carts/Carts.Api.Tests/Carts/InitializingCart/InitializeCartTests.cs) + - [API acceptance tests](./Carts/Carts.Api.Tests/Carts/) ## Other - [EventTypeMapper](./Core/Core/Events/EventTypeMapper.cs) class to allow both convention-based mapping (by the .NET type name) and custom to handle event versioning,