From 966fbf18f67e6a6ab344bc599eb4b64ea460c789 Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Wed, 1 Jan 2025 22:24:13 +0000 Subject: [PATCH] Update alpha --- CHANGELOG.md | 4 +- README.md | 2 +- samples/Infrastructure/Services.fs | 2 +- samples/Infrastructure/Store.fs | 3 +- samples/Store/Domain/Domain.fsproj | 2 +- samples/Store/Domain/Infrastructure.fs | 2 +- samples/Tutorial/Infrastructure.fs | 2 +- samples/Tutorial/Tutorial.fsproj | 2 +- src/Equinox.CosmosStore/CosmosStore.fs | 10 ++++- .../AccessStrategies.fs | 2 +- .../CosmosCoreIntegration.fs | 4 +- .../DocumentStoreIntegration.fs | 3 +- .../FsCodecCompressionTests.fs | 40 +++++++++++++++---- 13 files changed, 55 insertions(+), 23 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 45c31bbe6..35c060b76 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -30,8 +30,6 @@ The `Unreleased` section name is replaced by the expected version of next releas - `eqx stats`: `-A` (all stats) is now the default unless you specify >=1 of the individual stats via `ESDNO` flags [#459](https://github.com/jet/equinox/pull/459) -### Fixed - ## [4.0.4] - 2024-05-08 @@ -109,7 +107,7 @@ The `Unreleased` section name is replaced by the expected version of next releas - `CosmosStore`: Only log `bytes` when log level is `Debug` [#305](https://github.com/jet/equinox/pull/305) - `CosmosStore.AccessStrategy.MultiSnapshot`,`Custom`: Change `list` and `seq` types to `array` [#338](https://github.com/jet/equinox/pull/338) - `CosmosStore.CosmosStoreCategory`: Generalize `compressUnfolds` to `shouldCompress` predicate [#436](https://github.com/jet/equinox/pull/436) -- `CosmosStore.CosmosStoreCategory.TryHydrateTip`: Generates a Stream State Momento based on externally loaded `u`nfold state [#434](https://github.com/jet/equinox/pull/434) +- `CosmosStore.CosmosStoreCategory.TryHydrateTip`: Generates a Stream State Memento based on externally loaded `u`nfold state [#434](https://github.com/jet/equinox/pull/434) - `CosmosStore.CosmosStoreCategory.TryLoad`: Renders a `'state` based on an Unfold [#434](https://github.com/jet/equinox/pull/434) - `CosmosStore.Core.Initialization.initAux`: Replace hard-coded manual 400 RU with `mode` parameter [#328](https://github.com/jet/equinox/pull/328) :pray: [@brihadish](https://github.com/brihadish) - `CosmosStore.CosmosClientFactory`: Moved to Core [#430](https://github.com/jet/equinox/pull/430) diff --git a/README.md b/README.md index 0da772f3d..0a70f4e82 100644 --- a/README.md +++ b/README.md @@ -170,7 +170,7 @@ The components within this repository are delivered as multi-targeted Nuget pack - `Equinox.Core` [![NuGet](https://img.shields.io/nuget/v/Equinox.Core.svg)](https://www.nuget.org/packages/Equinox.Core/): Hosts generic utility types frequently useful alongside Equinox: [`TaskCell`](https://github.com/jet/equinox/blob/master/src/Equinox.Core/TaskCell.fs#L36), [`Batcher`, `BatcherCache`, `BatcherDictionary`](https://github.com/jet/equinox/blob/master/src/Equinox.Core/Batching.fs#L44). ([depends](https://www.fuget.org/packages/Equinox.Core) on `System.Runtime.Caching`) - `Equinox.MemoryStore` [![MemoryStore NuGet](https://img.shields.io/nuget/v/Equinox.MemoryStore.svg)](https://www.nuget.org/packages/Equinox.MemoryStore/): In-memory store for integration testing/performance base-lining/providing out-of-the-box zero dependency storage for examples. ([depends](https://www.fuget.org/packages/Equinox.MemoryStore) on `Equinox`) -- `Equinox.CosmosStore` [![CosmosStore NuGet](https://img.shields.io/nuget/v/Equinox.CosmosStore.svg)](https://www.nuget.org/packages/Equinox.CosmosStore/): Azure CosmosDB Adapter with integrated 'unfolds' feature, facilitating optimal read performance in terms of latency and RU costs, instrumented to meet Jet's production monitoring requirements. ([depends](https://www.fuget.org/packages/Equinox.CosmosStore) on `Equinox`, `Equinox`, `Microsoft.Azure.Cosmos` >= `3.43.0`, `System.Text.Json`, `FSharp.Control.TaskSeq`) +- `Equinox.CosmosStore` [![CosmosStore NuGet](https://img.shields.io/nuget/v/Equinox.CosmosStore.svg)](https://www.nuget.org/packages/Equinox.CosmosStore/): Azure CosmosDB Adapter with integrated 'unfolds' feature, facilitating optimal read performance in terms of latency and RU costs, instrumented to meet Jet's production monitoring requirements. ([depends](https://www.fuget.org/packages/Equinox.CosmosStore) on `Equinox`, `Equinox`, `Microsoft.Azure.Cosmos` >= `3.43.1`, `System.Text.Json`, `FSharp.Control.TaskSeq`) - `Equinox.CosmosStore.Prometheus` [![CosmosStore.Prometheus NuGet](https://img.shields.io/nuget/v/Equinox.CosmosStore.Prometheus.svg)](https://www.nuget.org/packages/Equinox.CosmosStore.Prometheus/): Integration package providing a `Serilog.Core.ILogEventSink` that extracts detailed metrics information attached to the `LogEvent`s and feeds them to the `prometheus-net`'s `Prometheus.Metrics` static instance. ([depends](https://www.fuget.org/packages/Equinox.CosmosStore.Prometheus) on `Equinox.CosmosStore`, `prometheus-net >= 3.6.0`) - `Equinox.DynamoStore` [![DynamoStore NuGet](https://img.shields.io/nuget/v/Equinox.DynamoStore.svg)](https://www.nuget.org/packages/Equinox.DynamoStore/): Amazon DynamoDB Adapter with integrated 'unfolds' feature, facilitating optimal read performance in terms of latency and RC costs, patterned after `Equinox.CosmosStore`. ([depends](https://www.fuget.org/packages/Equinox.DynamoStore) on `Equinox`, `FSharp.AWS.DynamoDB` >= `0.12.0-beta`, `FSharp.Control.TaskSeq`) - `Equinox.DynamoStore.Prometheus` [![DynamoStore.Prometheus NuGet](https://img.shields.io/nuget/v/Equinox.DynamoStore.Prometheus.svg)](https://www.nuget.org/packages/Equinox.DynamoStore.Prometheus/): Integration package providing a `Serilog.Core.ILogEventSink` that extracts detailed metrics information attached to the `LogEvent`s and feeds them to the `prometheus-net`'s `Prometheus.Metrics` static instance. ([depends](https://www.fuget.org/packages/Equinox.CosmosStore.Prometheus) on `Equinox.DynamoStore`, `prometheus-net >= 3.6.0`) diff --git a/samples/Infrastructure/Services.fs b/samples/Infrastructure/Services.fs index fd96e1966..69ec58174 100644 --- a/samples/Infrastructure/Services.fs +++ b/samples/Infrastructure/Services.fs @@ -17,7 +17,7 @@ type Store(store) = MemoryStore.MemoryStoreCategory(store, name, codec, fold, initial) | Store.Config.Cosmos (store, caching, unfolds) -> let accessStrategy = if unfolds then CosmosStore.AccessStrategy.Snapshot snapshot else CosmosStore.AccessStrategy.Unoptimized - CosmosStore.CosmosStoreCategory<'event,'state,_>(store, name, FsCodec.SystemTextJson.Compression.EncodeTryCompress codec, fold, initial, accessStrategy, caching) + CosmosStore.CosmosStoreCategory<'event,'state,_>(store, name, FsCodec.SystemTextJson.EncodedBody.EncodeTryCompress codec, fold, initial, accessStrategy, caching) | Store.Config.Dynamo (store, caching, unfolds) -> let accessStrategy = if unfolds then DynamoStore.AccessStrategy.Snapshot snapshot else DynamoStore.AccessStrategy.Unoptimized DynamoStore.DynamoStoreCategory<'event,'state,_>(store, name, FsCodec.Compression.EncodeTryCompress codec, fold, initial, accessStrategy, caching) diff --git a/samples/Infrastructure/Store.fs b/samples/Infrastructure/Store.fs index 9b5344c67..4dfc46709 100644 --- a/samples/Infrastructure/Store.fs +++ b/samples/Infrastructure/Store.fs @@ -90,7 +90,8 @@ module Cosmos = member _.MaxRetryWaitTime = p.GetResult(RetriesWaitTimeS, 5.) |> TimeSpan.FromSeconds member _.TipMaxEvents = p.GetResult(TipMaxEvents, 256) member _.TipMaxJsonLength = p.GetResult(TipMaxJsonLength, 30_000) - member x.QueryMaxItems = p.GetResult(QueryMaxItems, 10) + member _.QueryMaxItemsOr(def: int) = p.GetResult(QueryMaxItems, def) + member x.QueryMaxItems = x.QueryMaxItemsOr 10 let logContainer (log: ILogger) role (mode, endpoint, db, container) = log.Information("CosmosDB {role:l} {mode} {connection} Database {database} Container {container}", diff --git a/samples/Store/Domain/Domain.fsproj b/samples/Store/Domain/Domain.fsproj index 9887680ab..979837f82 100644 --- a/samples/Store/Domain/Domain.fsproj +++ b/samples/Store/Domain/Domain.fsproj @@ -19,7 +19,7 @@ - + diff --git a/samples/Store/Domain/Infrastructure.fs b/samples/Store/Domain/Infrastructure.fs index 4ddb0d75f..cb490a7ef 100644 --- a/samples/Store/Domain/Infrastructure.fs +++ b/samples/Store/Domain/Infrastructure.fs @@ -92,7 +92,7 @@ module EventCodec = /// For CosmosStore - we encode to JsonElement as that's what the store talks let genJsonElement<'t when 't :> TypeShape.UnionContract.IUnionContract> = - FsCodec.SystemTextJson.CodecJsonElement.Create<'t>() |> FsCodec.SystemTextJson.Compression.EncodeUncompressed + FsCodec.SystemTextJson.CodecJsonElement.Create<'t>() |> FsCodec.SystemTextJson.EncodedBody.EncodeUncompressed /// For stores other than CosmosStore, we encode to UTF-8 and have the store do the right thing let gen<'t when 't :> TypeShape.UnionContract.IUnionContract> = diff --git a/samples/Tutorial/Infrastructure.fs b/samples/Tutorial/Infrastructure.fs index c1cb20740..816003ac9 100644 --- a/samples/Tutorial/Infrastructure.fs +++ b/samples/Tutorial/Infrastructure.fs @@ -23,7 +23,7 @@ module EventCodec = /// For CosmosStore - we encode to JsonElement as that's what the store talks let genJsonElement<'t when 't :> TypeShape.UnionContract.IUnionContract> = - FsCodec.SystemTextJson.CodecJsonElement.Create<'t>() |> FsCodec.SystemTextJson.Compression.EncodeUncompressed + FsCodec.SystemTextJson.CodecJsonElement.Create<'t>() |> FsCodec.SystemTextJson.EncodedBody.EncodeUncompressed /// For stores other than CosmosStore, we encode to UTF-8 and have the store do the right thing let gen<'t when 't :> TypeShape.UnionContract.IUnionContract> = diff --git a/samples/Tutorial/Tutorial.fsproj b/samples/Tutorial/Tutorial.fsproj index 195a29ee5..c7e797d9f 100644 --- a/samples/Tutorial/Tutorial.fsproj +++ b/samples/Tutorial/Tutorial.fsproj @@ -28,7 +28,7 @@ - + diff --git a/src/Equinox.CosmosStore/CosmosStore.fs b/src/Equinox.CosmosStore/CosmosStore.fs index 1c26413d2..b0b1b209e 100644 --- a/src/Equinox.CosmosStore/CosmosStore.fs +++ b/src/Equinox.CosmosStore/CosmosStore.fs @@ -17,6 +17,10 @@ type EncodedBody = (struct (int * JsonElement)) module internal EncodedBody = let internal jsonRawText: EncodedBody -> string = ValueTuple.snd >> _.GetRawText() let internal jsonUtf8Bytes = jsonRawText >> System.Text.Encoding.UTF8.GetByteCount + let [] deflateEncoding = 1 + // prior to the addition of the `D` field in 4.1.0, the integrated compression support + // was predicated entirely on a JSON String `d` value in the Unfold as implying it was UTF8->Deflate->Base64 encoded + let parseUnfold = function struct (0, e: JsonElement) when e.ValueKind = JsonValueKind.String -> struct (deflateEncoding, e) | x -> x /// A single Domain Event from the array held in a Batch [] @@ -28,12 +32,14 @@ type Event = c: string /// Event body + [] d: JsonElement // Can be Json Null for Nullary cases /// The encoding scheme used for `d` [] D: int /// Optional metadata + [] m: JsonElement /// The encoding scheme used for `m` [] @@ -89,12 +95,14 @@ type Unfold = /// The Case (Event Type) of this snapshot, used to drive deserialization c: string // required + [] d: JsonElement // required /// The encoding scheme used for `d` [] D: int /// Optional metadata, same encoding as `d` (can be null; not written if missing) + [] m: JsonElement /// The encoding scheme used for `m` [] @@ -169,7 +177,7 @@ type internal Enum private () = static member internal Events(b: Batch, ?minIndex, ?maxIndex) = Enum.Events(b.i, b.e, ?minIndex = minIndex, ?maxIndex = maxIndex) static member Unfolds(xs: Unfold[]): ITimelineEvent seq = seq { - for x in xs -> FsCodec.Core.TimelineEvent.Create(x.i, x.c, (x.D, x.d), (x.M, x.m), Guid.Empty, null, null, x.t, isUnfold = true) } + for x in xs -> FsCodec.Core.TimelineEvent.Create(x.i, x.c, EncodedBody.parseUnfold (x.D, x.d), (x.M, x.m), Guid.Empty, null, null, x.t, isUnfold = true) } static member EventsAndUnfolds(x: Tip, ?minIndex, ?maxIndex): ITimelineEvent seq = Enum.Events(x, ?minIndex = minIndex, ?maxIndex = maxIndex) |> Seq.append (Enum.Unfolds x.u) diff --git a/tests/Equinox.CosmosStore.Integration/AccessStrategies.fs b/tests/Equinox.CosmosStore.Integration/AccessStrategies.fs index 8b33f90a0..8a5e9c0ab 100644 --- a/tests/Equinox.CosmosStore.Integration/AccessStrategies.fs +++ b/tests/Equinox.CosmosStore.Integration/AccessStrategies.fs @@ -44,7 +44,7 @@ module SequenceCheck = #if STORE_DYNAMO let codec = FsCodec.SystemTextJson.Codec.Create() |> FsCodec.Compression.EncodeTryCompress #else - let codec = FsCodec.SystemTextJson.CodecJsonElement.Create() |> FsCodec.SystemTextJson.Compression.EncodeTryCompress + let codec = FsCodec.SystemTextJson.CodecJsonElement.Create() |> FsCodec.SystemTextJson.EncodedBody.EncodeTryCompress #endif module Fold = diff --git a/tests/Equinox.CosmosStore.Integration/CosmosCoreIntegration.fs b/tests/Equinox.CosmosStore.Integration/CosmosCoreIntegration.fs index d34693f59..8b5e52525 100644 --- a/tests/Equinox.CosmosStore.Integration/CosmosCoreIntegration.fs +++ b/tests/Equinox.CosmosStore.Integration/CosmosCoreIntegration.fs @@ -10,7 +10,7 @@ open System type TestEvents() = static member private Create(i, ?eventType, ?json) = - let enc = System.Text.Json.JsonSerializer.SerializeToElement >> FsCodec.SystemTextJson.Compression.Encode + let enc = System.Text.Json.JsonSerializer.SerializeToElement >> FsCodec.SystemTextJson.EncodedBody.Uncompressed FsCodec.Core.EventData.Create ( sprintf "%s:%d" (defaultArg eventType "test_event") i, enc (defaultArg json "{\"d\":\"d\"}"), @@ -65,7 +65,7 @@ type Tests(testOutputHelper) = test <@ match res with Choice2Of2 (:? InvalidOperationException as ex) -> ex.Message.StartsWith "Must write either events or unfolds." | x -> failwith $"%A{x}" @> } - let stringOfEncodedBody (x: Equinox.CosmosStore.Core.EncodedBody) = FsCodec.SystemTextJson.Compression.DecodeToJsonElement(x).GetRawText() + let stringOfEncodedBody (x: Equinox.CosmosStore.Core.EncodedBody) = FsCodec.SystemTextJson.EncodedBody.ToJsonElement(x).GetRawText() let jsonDiff (x: string) (y: string) = match JsonDiffPatchDotNet.JsonDiffPatch().Diff(JToken.Parse x, JToken.Parse y) with | null -> "" diff --git a/tests/Equinox.CosmosStore.Integration/DocumentStoreIntegration.fs b/tests/Equinox.CosmosStore.Integration/DocumentStoreIntegration.fs index b7b3bbb07..88682f3ea 100644 --- a/tests/Equinox.CosmosStore.Integration/DocumentStoreIntegration.fs +++ b/tests/Equinox.CosmosStore.Integration/DocumentStoreIntegration.fs @@ -29,8 +29,9 @@ module Cart = StoreCategory(context, Cart.CategoryName, codec, fold, initial, AccessStrategy.MultiSnapshot unfArgs, Equinox.CachingStrategy.NoCaching) |> Equinox.Decider.forStream log |> Cart.create + let createCategorySnapshot context = StoreCategory(context, Cart.CategoryName, codec, fold, initial, AccessStrategy.Snapshot Cart.Fold.Snapshot.config, Equinox.CachingStrategy.NoCaching) let createServiceWithSnapshotStrategy log context = - StoreCategory(context, Cart.CategoryName, codec, fold, initial, AccessStrategy.Snapshot Cart.Fold.Snapshot.config, Equinox.CachingStrategy.NoCaching) + createCategorySnapshot context |> Equinox.Decider.forStream log |> Cart.create let createServiceWithSnapshotStrategyAndCaching log context cache = diff --git a/tests/Equinox.CosmosStore.Integration/FsCodecCompressionTests.fs b/tests/Equinox.CosmosStore.Integration/FsCodecCompressionTests.fs index 76e1ee918..a5b90109a 100644 --- a/tests/Equinox.CosmosStore.Integration/FsCodecCompressionTests.fs +++ b/tests/Equinox.CosmosStore.Integration/FsCodecCompressionTests.fs @@ -1,10 +1,9 @@ // Prior to version v 4.1.0, CosmosStore owned: // - compression of snapshots (and APIs controlling conditionally of that) // - inflation of snapshots -// This is now an external concern, fully implemented by APIs presented in FsCodec.SystemTextJson.Compression V 3.1.0 and later +// This is now an external concern, fully implemented by APIs presented in FsCodec.SystemTextJson.Compression v 3.1.0 and later // These tests are a sanity check pinning the basic mechanisms that are now externalized; any more thorough tests should be maintained in FsCodec // NOTE there is no strong dependency on FsCodec; CosmosStore is happy to roundtrip arbitrary pairs of D/d and M/m values -// However, it is recommended to use that implementation as it provides for interop with (Deflate-compressed) snapshots as written by CosmosStore pre 4.1.0 // NOTE prior to v 4.1.0, CosmosStore provided a System.Text.Json integration for Microsoft.Azure.Cosmos // Version 4.1.0 and later lean on the integrated support provided from Microsoft.Azure.Cosmos v 3.43.0 onward module Equinox.CosmosStore.Integration.FsCodecCompressionTests @@ -24,7 +23,6 @@ type Union = type CoreBehaviors() = let defaultOptions = System.Text.Json.JsonSerializerOptions.Default // Rule out dependencies on any FsCodec conversions let eventCodec = FsCodec.SystemTextJson.CodecJsonElement.Create(defaultOptions) - let nullElement = System.Text.Json.JsonSerializer.SerializeToElement null let ser_ et struct (D, d) = let e: Core.Unfold = @@ -32,7 +30,7 @@ type CoreBehaviors() = c = et d = d D = D - m = nullElement + m = Unchecked.defaultof<_> M = Unchecked.defaultof t = DateTimeOffset.MinValue } System.Text.Json.JsonSerializer.Serialize e @@ -40,14 +38,14 @@ type CoreBehaviors() = [] let ``serializes, achieving expected compression`` () = - let encoded = eventCodec |> FsCodec.SystemTextJson.Compression.EncodeTryCompress |> _.Encode((), A { embed = String('x',5000) }) + let encoded = eventCodec |> FsCodec.SystemTextJson.EncodedBody.EncodeTryCompress |> _.Encode((), A { embed = String('x',5000) }) let res = ser encoded test <@ res.Contains "\"d\":\"" && res.Length < 138 && res.Contains "\"D\":2" @> let codec compress = let forceCompression: FsCodec.SystemTextJson.CompressionOptions = { minSize = 0; minGain = -1000 } - if compress then FsCodec.SystemTextJson.Compression.EncodeTryCompress(eventCodec, forceCompression) - else FsCodec.SystemTextJson.Compression.EncodeUncompressed eventCodec + if compress then FsCodec.SystemTextJson.EncodedBody.EncodeTryCompress(eventCodec, forceCompression) + else FsCodec.SystemTextJson.EncodedBody.EncodeUncompressed eventCodec [] let roundtrips compress value = @@ -63,6 +61,32 @@ type CoreBehaviors() = [] let handlesNulls () = - let ser = ser_ "et" (0, nullElement) + let ser = ser_ "et" (0, System.Text.Json.JsonSerializer.SerializeToElement null) let des = System.Text.Json.JsonSerializer.Deserialize(ser) test <@ System.Text.Json.JsonValueKind.Null = let d = des.d in d.ValueKind @> + +module Pre41IntegratedDeflate = + let private deflate (uncompressedBytes: byte[]) = + let output = new System.IO.MemoryStream() + let compressor = new System.IO.Compression.DeflateStream(output, System.IO.Compression.CompressionLevel.Optimal, leaveOpen = true) + compressor.Write(uncompressedBytes) + compressor.Flush() // Could `Close`, but not required + output.ToArray() + let encode (x: 't) = System.Text.Json.JsonSerializer.SerializeToUtf8Bytes x |> deflate |> Convert.ToBase64String |> System.Text.Json.JsonSerializer.SerializeToElement + +type BackCompatTests() = + + /// Validates that an Unfold with a compressed `d` value without an associated `D` value of `1` is correctly inflated via Core.EncodedBody.parseUnfold + [] + let ```Can inflate pre v 4.1 Deflate encoded snapshots`` () = Async.RunSynchronously <| async { + let context = createPrimaryContext Serilog.Log.Logger 10 + let cat = Equinox.Store.Integration.DocumentStoreIntegration.Cart.createCategorySnapshot context + let expected: Domain.Cart.Fold.State = { + items = [ + { skuId = SkuId Guid.Empty; quantity = 4; returnsWaived = Some true } ] } + TypeShape.Empty.register (fun () -> Unchecked.defaultof) + let unfold = { + TypeShape.Empty.empty with + c = nameof Domain.Cart.Events.Event.Snapshotted + d = Pre41IntegratedDeflate.encode expected } + Some expected =! cat.TryLoad [| unfold |] }