Skip to content

Commit

Permalink
V3 cleanup and signature changes (#169)
Browse files Browse the repository at this point in the history
Lots of restructuring prompted by
a) signature changes driven by profiling
b) implementation of FeedSourceBase.AwaitCompletion

See also #116:

cleanup naming and signatures for Sinks
rename ProjectorPipeline<'T> -> Sink
name Ingester batch tuples
Restructure Scheduler modules
rename Feed.Internal to Feed.Core
convert all Tuples to struct tuples
change StreamSpan from a record to a type alias
Linting e.g. remove ., ref etc
Extensive profiling-driven impl changes
  • Loading branch information
bartelink authored Aug 25, 2022
1 parent 406c3ac commit 8ed429c
Show file tree
Hide file tree
Showing 46 changed files with 1,497 additions and 1,502 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ The `Unreleased` section name is replaced by the expected version of next releas
### Changed

- Targeted `net6.0` with `6.0.300` SDK, `FSharp.Core` v `6.0.0`, `FSharp.Control.AsyncSeq` v `3.2.1`, `MathNet.Numerics` v `4.15.0`
- Changed all `Tuple` types to `struct` tuples (`System.ValueTuple`) [#169](https://github.com/jet/propulsion/pull/169)
- Changed dominant `ITimelineEvent` EventBody type from `byte array` to `System.ReadOnlyMemory<byte>` [#169](https://github.com/jet/propulsion/pull/169)
- `StreamSpan`: Changed from a record to a type alias [#169](https://github.com/jet/propulsion/pull/169)
- `Propulsion.CosmosStore`: Changed to target `Equinox.CosmosStore` v `4.0.0` [#139](https://github.com/jet/propulsion/pull/139)
- `Propulsion.CosmosStore.CosmosSource`: Changed parsing to use `System.Text.Json` [#139](https://github.com/jet/propulsion/pull/139)
- `Propulsion.EventStore`: Pinned to target `Equinox.EventStore` v `[3.0.7`-`3.99.0]` **Deprecated; Please migrate to `Propulsion.EventStoreDb`** [#139](https://github.com/jet/propulsion/pull/139)
Expand Down
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ The components within this repository are delivered as a multi-targeted Nuget pa

- `Propulsion.MemoryStore` [![NuGet](https://img.shields.io/nuget/v/Propulsion.MemoryStore.svg)](https://www.nuget.org/packages/Propulsion.MemoryStore/). Provides bindings to `Equinox.MemoryStore`. [Depends](https://www.fuget.org/packages/Propulsion.MemoryStore) on `Equinox.MemoryStore` v `4.0.0`, `FsCodec.Box`, `Propulsion`

1. `MemoryStoreSource`: Forwarding from an `Equinox.MemoryStore` into a `Propulsion.ProjectorPipeline`, in order to enable maximum speed integration testing.
1. `MemoryStoreSource`: Forwarding from an `Equinox.MemoryStore` into a `Propulsion.Sink`, in order to enable maximum speed integration testing.

- `Propulsion.CosmosStore` [![NuGet](https://img.shields.io/nuget/v/Propulsion.CosmosStore.svg)](https://www.nuget.org/packages/Propulsion.CosmosStore/) Provides bindings to Azure CosmosDB. [Depends](https://www.fuget.org/packages/Propulsion.CosmosStore) on `Equinox.CosmosStore` v `4.0.0`

Expand Down Expand Up @@ -45,7 +45,7 @@ The components within this repository are delivered as a multi-targeted Nuget pa
- `Propulsion.DynamoStore.Constructs` [![NuGet](https://img.shields.io/nuget/v/Propulsion.DynamoStore.Constructs.svg)](https://www.nuget.org/packages/Propulsion.DynamoStore.Constructs/) AWS Lambda CDK deploy logic. [Depends](https://www.fuget.org/packages/Propulsion.DynamoStore.Constructs) on `Amazon.CDK.Lib` (and, indirectly, on the binary assets included as content in `Propulsion.DynamoStore.Lambda`)

- `Propulsion.EventStoreDb` [![NuGet](https://img.shields.io/nuget/v/Propulsion.EventStoreDb.svg)](https://www.nuget.org/packages/Propulsion.EventStoreDb/). Provides bindings to [EventStore](https://www.eventstore.org), writing via `Propulsion.EventStore.EventStoreSink` [Depends](https://www.fuget.org/packages/Propulsion.EventStoreDb) on `Equinox.EventStoreDb` v `4.0.0`, `Serilog`
1. `EventStoreSource`: reading from an EventStoreDB >= `20.10` `$all` stream into a `Propulsion.ProjectorPipeline` using the gRPC interface. Provides throughput metrics via `Propulsion.Feed.Prometheus`
1. `EventStoreSource`: reading from an EventStoreDB >= `20.10` `$all` stream into a `Propulsion.Sink` using the gRPC interface. Provides throughput metrics via `Propulsion.Feed.Prometheus`
2. `EventStoreSink`: writing to `Equinox.EventStoreDb` v `4.0.0`

(Reading and position metrics are exposed via `Propulsion.Feed.Prometheus`)
Expand All @@ -60,7 +60,7 @@ The components within this repository are delivered as a multi-targeted Nuget pa

- `Propulsion.SqlStreamStore` [![NuGet](https://img.shields.io/nuget/v/Propulsion.SqlStreamStore.svg)](https://www.nuget.org/packages/Propulsion.SqlStreamStore/). Provides bindings to [SqlStreamStore](https://github.com/SQLStreamStore/SQLStreamStore), maintaining checkpoints in a SQL table using Dapper [Depends](https://www.fuget.org/packages/Propulsion.SqlStreamStore) on `Propulsion.Feed`, `SqlStreamStore`, `Dapper` v `2.0`, `Microsoft.Data.SqlClient` v `1.1.3`, `Serilog`

1. `SqlStreamStoreSource`: reading from a SqlStreamStore `$all` stream into a `Propulsion.ProjectorPipeline`
1. `SqlStreamStoreSource`: reading from a SqlStreamStore `$all` stream into a `Propulsion.Sink`
2. `ReaderCheckpoint`: checkpoint storage for `Propulsion.Feed`/`SqlStreamSteamStore`/`EventStoreDb` using `Dapper`, `Microsoft.Data.SqlClient`

(Reading and position metrics are exposed via `Propulsion.Feed.Prometheus`)
Expand Down
36 changes: 16 additions & 20 deletions src/Propulsion.Cosmos/CosmosPruner.fs
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,9 @@ module Pruner =
Equinox.Cosmos.Store.Log.InternalMetrics.dump log

// Per set of accumulated events per stream (selected via `selectExpired`), attempt to prune up to the high water mark
let handle pruneUntil (stream, span: StreamSpan<_>) = async {
let handle pruneUntil struct (stream, span: StreamSpan<_>) = async {
// The newest event eligible for deletion defines the cutoff point
let untilIndex = span.events[span.events.Length - 1].Index
let untilIndex = span[span.Length - 1].Index
// Depending on the way the events are batched, requests break into three groupings:
// 1. All requested events already deleted, no writes took place
// (if trimmedPos is beyond requested Index, Propulsion will discard the requests via the OverrideWritePosition)
Expand All @@ -91,20 +91,20 @@ module Pruner =
// in this case, we mark the event as handled and await a successor event triggering another attempt
let! deleted, deferred, trimmedPos = pruneUntil (FsCodec.StreamName.toString stream) untilIndex
// Categorize the outcome so the stats handler can summarize the work being carried out
let res = if deleted = 0 && deferred = 0 then Nop span.events.Length else Ok (deleted, deferred)
let res = if deleted = 0 && deferred = 0 then Nop span.Length else Ok (deleted, deferred)
// For case where we discover events have already been deleted beyond our requested position, signal to reader to drop events
let writePos = max trimmedPos (untilIndex + 1L)
return writePos, res
return struct (writePos, res)
}

type StreamSchedulingEngine =

static member Create(pruneUntil, itemDispatcher, stats : Stats, dumpStreams, ?maxBatches, ?purgeInterval, ?wakeForResults, ?idleDelay)
: Scheduling.StreamSchedulingEngine<_, _, _> =
let interpret (stream, span) =
let stats = Buffering.StreamSpan.stats span
stats, (stream, span)
let dispatcher = Scheduling.MultiDispatcher<_, _, _>.Create(itemDispatcher, handle pruneUntil, interpret, (fun _ -> id), stats, dumpStreams)
: Scheduling.StreamSchedulingEngine<_, _, _, _> =
let interpret struct (stream, span) =
let metrics = StreamSpan.metrics Default.eventSize span
struct (metrics, struct (stream, span))
let dispatcher = Scheduling.Dispatcher.MultiDispatcher<_, _, _, _>.Create(itemDispatcher, handle pruneUntil, interpret, (fun _ -> id), stats, dumpStreams)
Scheduling.StreamSchedulingEngine(
dispatcher,
?maxBatches = maxBatches, ?purgeInterval = purgeInterval, ?wakeForResults = wakeForResults, ?idleDelay = idleDelay,
Expand All @@ -114,32 +114,28 @@ module Pruner =
type CosmosPruner =

/// DANGER: this API DELETES events - use with care
/// Starts a <c>StreamsProjectorPipeline</c> that prunes _all submitted events from the supplied <c>context</c>_
/// Starts a <c>Sink</c> that prunes _all submitted events from the supplied <c>context</c>_
static member Start
( log : ILogger, maxReadAhead, context, maxConcurrentStreams,
// Default 5m
?statsInterval,
// Default 5m
?stateInterval,
?maxSubmissionsPerPartition,
?maxBatches, ?purgeInterval, ?wakeForResults,
// Delay when no items available. Default 10ms.
?idleDelay,
?maxBatches, ?purgeInterval, ?wakeForResults, ?idleDelay,
// Defaults to statsInterval
?ingesterStatsInterval)
: Propulsion.ProjectorPipeline<_> =
let idleDelay = defaultArg idleDelay (TimeSpan.FromMilliseconds 10.)
: Default.Sink =
let statsInterval, stateInterval = defaultArg statsInterval (TimeSpan.FromMinutes 5.), defaultArg stateInterval (TimeSpan.FromMinutes 5.)
let stats = Pruner.Stats(log.ForContext<Pruner.Stats>(), statsInterval, stateInterval)
let dispatcher = Scheduling.ItemDispatcher<_>(maxConcurrentStreams)
let dumpStreams struct (s : Scheduling.StreamStates<_>, totalPruned) log =
s.Dump(log, totalPruned, Buffering.StreamState.eventsSize)
let dispatcher = Dispatch.ItemDispatcher<_, _>(maxConcurrentStreams)
let dumpStreams logStreamStates _log = logStreamStates Default.eventSize
let pruneUntil stream index = Equinox.Cosmos.Core.Events.pruneUntil context stream index
let streamScheduler =
Pruner.StreamSchedulingEngine.Create(
pruneUntil, dispatcher, stats, dumpStreams,
?maxBatches = maxBatches, ?purgeInterval = purgeInterval, ?wakeForResults = wakeForResults, idleDelay = idleDelay)
Projector.StreamsProjectorPipeline.Start(
?maxBatches = maxBatches, ?purgeInterval = purgeInterval, ?wakeForResults = wakeForResults, ?idleDelay = idleDelay)
Projector.Pipeline.Start(
log, dispatcher.Pump, streamScheduler.Pump, maxReadAhead, streamScheduler.Submit, statsInterval,
?maxSubmissionsPerPartition = maxSubmissionsPerPartition,
?ingesterStatsInterval = ingesterStatsInterval)
Loading

0 comments on commit 8ed429c

Please sign in to comment.