From f2d8fcedd92eabbe77e5677da3d296db77e3eeac Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Sat, 7 Sep 2024 10:01:57 +0100 Subject: [PATCH 1/4] feat(Cosmos): Use Azure.Cosmos STJ impl --- CHANGELOG.md | 3 ++ src/Equinox.CosmosStore/CosmosStore.fs | 28 +++++++++++++++---- .../CosmosStoreSerialization.fs | 16 ----------- .../Equinox.CosmosStore.fsproj | 2 +- 4 files changed, 26 insertions(+), 23 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ae0c4cb17..45c31bbe6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ The `Unreleased` section name is replaced by the expected version of next releas ### Added +- `Equinox.CosmosStore`: Use `Microsoft.Azure.Cosmos` integrated `System.Text.Json` support; added ability to specify `serializerOptions` [#467](https://github.com/jet/equinox/pull/467) - `Equinox.CosmosStore`: Group metrics by Container Name [#449](https://github.com/jet/equinox/pull/449) - `Equinox.CosmosStore`: Group metrics by Category; split out `Tip` activity [#453](https://github.com/jet/equinox/pull/453) - `Equinox.CosmosStore`: Support Ingesting unfolds [#460](https://github.com/jet/equinox/pull/460) @@ -21,6 +22,8 @@ The `Unreleased` section name is replaced by the expected version of next releas - `Equinox.*Store`,`Equinox.*Store.Prometheus`: Pin `Equinox` dependencies to `[4.0.0, 5.0.0)`] [#448](https://github.com/jet/equinox/pull/448) - `Equinox.CosmosStore`: Update `System.Text.Json` dep to `6.0.10` per [CVE-2024-43485](https://github.com/advisories/GHSA-8g4q-xg66-9fp4) [#470](https://github.com/jet/equinox/pull/470) +- `Equinox.CosmosStore`: Minimum `Microsoft.Azure.Cosmos` requirement updated to `3.43.0` to avail of integrated `System.Text.Json` support [#467](https://github.com/jet/equinox/pull/467) +- `Equinox.CosmosStore.CosmosStoreConnector`: Removed mandatory `requestTimeout` argument [#467](https://github.com/jet/equinox/pull/467) - `Equinox.MessageDb`: Up min `Npgsql` to v `7.0.7` as `7.0.0` is on CVE blacklist ### Removed diff --git a/src/Equinox.CosmosStore/CosmosStore.fs b/src/Equinox.CosmosStore/CosmosStore.fs index 417b5330f..c2ab7b4a6 100644 --- a/src/Equinox.CosmosStore/CosmosStore.fs +++ b/src/Equinox.CosmosStore/CosmosStore.fs @@ -1163,12 +1163,16 @@ type DiscoveryMode = /// Manages establishing a CosmosClient, which is used by CosmosStoreClient to read from the underlying Cosmos DB Container. type CosmosClientFactory(options) = - static member CreateDefaultOptions(requestTimeout: TimeSpan, maxRetryAttemptsOnRateLimitedRequests: int, maxRetryWaitTimeOnRateLimitedRequests: TimeSpan) = + static member CreateDefaultOptions(maxRetryAttemptsOnRateLimitedRequests: int, maxRetryWaitTimeOnRateLimitedRequests: TimeSpan, serializerOptions) = CosmosClientOptions( MaxRetryAttemptsOnRateLimitedRequests = maxRetryAttemptsOnRateLimitedRequests, MaxRetryWaitTimeOnRateLimitedRequests = maxRetryWaitTimeOnRateLimitedRequests, - RequestTimeout = requestTimeout, - Serializer = CosmosJsonSerializer(JsonSerializerOptions())) + UseSystemTextJsonSerializerWithOptions = serializerOptions) + [] + static member CreateDefaultOptions(requestTimeout: TimeSpan, maxRetryAttemptsOnRateLimitedRequests: int, maxRetryWaitTimeOnRateLimitedRequests: TimeSpan) = + let o = CosmosClientFactory.CreateDefaultOptions(maxRetryAttemptsOnRateLimitedRequests, maxRetryWaitTimeOnRateLimitedRequests, JsonSerializerOptions.Default) + o.RequestTimeout <- requestTimeout + o /// CosmosClientOptions for this CosmosClientFactory as configured (NOTE while the Options object is not immutable, it should not have setters called on it) member val Options = options /// Creates an instance of CosmosClient without actually validating or establishing the connection @@ -1203,8 +1207,6 @@ type Discovery = type CosmosStoreConnector ( // CosmosDB endpoint/credentials specification. discovery: Discovery, - // Timeout to apply to individual reads/write round-trips going to CosmosDB. CosmosDB Default: 1m. - requestTimeout: TimeSpan, // Maximum number of times to attempt when failure reason is a 429 from CosmosDB, signifying RU limits have been breached. CosmosDB default: 9 maxRetryAttemptsOnRateLimitedRequests: int, // Maximum number of seconds to wait (especially if a higher wait delay is suggested by CosmosDB in the 429 response). CosmosDB default: 30s @@ -1212,17 +1214,31 @@ type CosmosStoreConnector // Connection mode (default: ConnectionMode.Direct (best performance, same as Microsoft.Azure.Cosmos SDK default) // NOTE: default for Equinox.Cosmos.Connector (i.e. V2) was Gateway (worst performance, least trouble, Microsoft.Azure.DocumentDb SDK default) [] ?mode: ConnectionMode, + // System.Text.Json SerializerOptions (Defaults to default options) + [] ?serializerOptions: System.Text.Json.JsonSerializerOptions, // consistency mode (default: use configuration specified for Database) [] ?defaultConsistencyLevel: ConsistencyLevel, + // Timeout to apply to individual reads/write round-trips going to CosmosDB. CosmosDB Default: 6s + // NOTE Per CosmosDB Client guidance, it's recommended to leave this at its default + [] ?requestTimeout: TimeSpan, [] ?customize: Action) = let discoveryMode = discovery.ToDiscoveryMode() + let serializerOptions = serializerOptions |> Option.defaultValue System.Text.Json.JsonSerializerOptions.Default let factory = - let o = CosmosClientFactory.CreateDefaultOptions(requestTimeout, maxRetryAttemptsOnRateLimitedRequests, maxRetryWaitTimeOnRateLimitedRequests) + let o = CosmosClientFactory.CreateDefaultOptions(maxRetryAttemptsOnRateLimitedRequests, maxRetryWaitTimeOnRateLimitedRequests, serializerOptions) mode |> Option.iter (fun x -> o.ConnectionMode <- x) + requestTimeout |> Option.iter (fun x -> o.RequestTimeout <- x) defaultConsistencyLevel |> Option.iter (fun x -> o.ConsistencyLevel <- x) customize |> Option.iter (fun c -> c.Invoke o) CosmosClientFactory o + [] + new(discovery, requestTimeout: TimeSpan, maxRetryAttemptsOnRateLimitedRequests: int, maxRetryWaitTimeOnRateLimitedRequests, + ?mode, ?defaultConsistencyLevel, ?customize) = + CosmosStoreConnector(discovery, maxRetryAttemptsOnRateLimitedRequests, maxRetryWaitTimeOnRateLimitedRequests, ?mode = mode, + ?defaultConsistencyLevel = defaultConsistencyLevel, requestTimeout = requestTimeout, ?customize = customize, + ?serializerOptions = None) + /// The CosmosClientOptions used when connecting to CosmosDB member _.Options = factory.Options diff --git a/src/Equinox.CosmosStore/CosmosStoreSerialization.fs b/src/Equinox.CosmosStore/CosmosStoreSerialization.fs index 1dca4ecdd..79d198d5a 100644 --- a/src/Equinox.CosmosStore/CosmosStoreSerialization.fs +++ b/src/Equinox.CosmosStore/CosmosStoreSerialization.fs @@ -31,22 +31,6 @@ module JsonElement = if value.ValueKind = JsonValueKind.Null then value else value |> toUtf8Bytes |> Deflate.compress |> JsonSerializer.SerializeToElement -type CosmosJsonSerializer(options : JsonSerializerOptions) = - inherit Microsoft.Azure.Cosmos.CosmosSerializer() - - override _.FromStream<'T>(stream) = - use _ = stream - - if stream.Length = 0L then Unchecked.defaultof<'T> - elif typeof.IsAssignableFrom(typeof<'T>) then box stream :?> 'T - else JsonSerializer.Deserialize<'T>(stream, options) - - override _.ToStream<'T>(input : 'T) = - let memoryStream = new MemoryStream() - JsonSerializer.Serialize(memoryStream, input, input.GetType(), options) - memoryStream.Position <- 0L - memoryStream :> Stream - /// Manages inflating of the UTF-8 json bytes to make the index record minimal from the perspective of the writer stored proc /// Only relevant for unfolds in the Tip type JsonCompressedBase64Converter() = diff --git a/src/Equinox.CosmosStore/Equinox.CosmosStore.fsproj b/src/Equinox.CosmosStore/Equinox.CosmosStore.fsproj index 085735e50..ba4c22200 100644 --- a/src/Equinox.CosmosStore/Equinox.CosmosStore.fsproj +++ b/src/Equinox.CosmosStore/Equinox.CosmosStore.fsproj @@ -27,7 +27,7 @@ - + From 9e7142fe4ebffecaf67eab29c0f2470015987407 Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Sat, 7 Sep 2024 11:06:06 +0100 Subject: [PATCH 2/4] Cleanup --- README.md | 2 +- samples/Infrastructure/Store.fs | 2 +- src/Equinox.CosmosStore/CosmosStore.fs | 83 ++++++++++--------- .../CosmosFixtures.fs | 3 +- 4 files changed, 45 insertions(+), 45 deletions(-) diff --git a/README.md b/README.md index 0810d8bd0..0da772f3d 100644 --- a/README.md +++ b/README.md @@ -170,7 +170,7 @@ The components within this repository are delivered as multi-targeted Nuget pack - `Equinox.Core` [![NuGet](https://img.shields.io/nuget/v/Equinox.Core.svg)](https://www.nuget.org/packages/Equinox.Core/): Hosts generic utility types frequently useful alongside Equinox: [`TaskCell`](https://github.com/jet/equinox/blob/master/src/Equinox.Core/TaskCell.fs#L36), [`Batcher`, `BatcherCache`, `BatcherDictionary`](https://github.com/jet/equinox/blob/master/src/Equinox.Core/Batching.fs#L44). ([depends](https://www.fuget.org/packages/Equinox.Core) on `System.Runtime.Caching`) - `Equinox.MemoryStore` [![MemoryStore NuGet](https://img.shields.io/nuget/v/Equinox.MemoryStore.svg)](https://www.nuget.org/packages/Equinox.MemoryStore/): In-memory store for integration testing/performance base-lining/providing out-of-the-box zero dependency storage for examples. ([depends](https://www.fuget.org/packages/Equinox.MemoryStore) on `Equinox`) -- `Equinox.CosmosStore` [![CosmosStore NuGet](https://img.shields.io/nuget/v/Equinox.CosmosStore.svg)](https://www.nuget.org/packages/Equinox.CosmosStore/): Azure CosmosDB Adapter with integrated 'unfolds' feature, facilitating optimal read performance in terms of latency and RU costs, instrumented to meet Jet's production monitoring requirements. ([depends](https://www.fuget.org/packages/Equinox.CosmosStore) on `Equinox`, `Equinox`, `Microsoft.Azure.Cosmos` >= `3.35.4`, `System.Text.Json`, `FSharp.Control.TaskSeq`) +- `Equinox.CosmosStore` [![CosmosStore NuGet](https://img.shields.io/nuget/v/Equinox.CosmosStore.svg)](https://www.nuget.org/packages/Equinox.CosmosStore/): Azure CosmosDB Adapter with integrated 'unfolds' feature, facilitating optimal read performance in terms of latency and RU costs, instrumented to meet Jet's production monitoring requirements. ([depends](https://www.fuget.org/packages/Equinox.CosmosStore) on `Equinox`, `Equinox`, `Microsoft.Azure.Cosmos` >= `3.43.0`, `System.Text.Json`, `FSharp.Control.TaskSeq`) - `Equinox.CosmosStore.Prometheus` [![CosmosStore.Prometheus NuGet](https://img.shields.io/nuget/v/Equinox.CosmosStore.Prometheus.svg)](https://www.nuget.org/packages/Equinox.CosmosStore.Prometheus/): Integration package providing a `Serilog.Core.ILogEventSink` that extracts detailed metrics information attached to the `LogEvent`s and feeds them to the `prometheus-net`'s `Prometheus.Metrics` static instance. ([depends](https://www.fuget.org/packages/Equinox.CosmosStore.Prometheus) on `Equinox.CosmosStore`, `prometheus-net >= 3.6.0`) - `Equinox.DynamoStore` [![DynamoStore NuGet](https://img.shields.io/nuget/v/Equinox.DynamoStore.svg)](https://www.nuget.org/packages/Equinox.DynamoStore/): Amazon DynamoDB Adapter with integrated 'unfolds' feature, facilitating optimal read performance in terms of latency and RC costs, patterned after `Equinox.CosmosStore`. ([depends](https://www.fuget.org/packages/Equinox.DynamoStore) on `Equinox`, `FSharp.AWS.DynamoDB` >= `0.12.0-beta`, `FSharp.Control.TaskSeq`) - `Equinox.DynamoStore.Prometheus` [![DynamoStore.Prometheus NuGet](https://img.shields.io/nuget/v/Equinox.DynamoStore.Prometheus.svg)](https://www.nuget.org/packages/Equinox.DynamoStore.Prometheus/): Integration package providing a `Serilog.Core.ILogEventSink` that extracts detailed metrics information attached to the `LogEvent`s and feeds them to the `prometheus-net`'s `Prometheus.Metrics` static instance. ([depends](https://www.fuget.org/packages/Equinox.CosmosStore.Prometheus) on `Equinox.DynamoStore`, `prometheus-net >= 3.6.0`) diff --git a/samples/Infrastructure/Store.fs b/samples/Infrastructure/Store.fs index c47850809..9b5344c67 100644 --- a/samples/Infrastructure/Store.fs +++ b/samples/Infrastructure/Store.fs @@ -101,7 +101,7 @@ module Cosmos = // - In hot-warm scenarios, the Archive Container will frequently be within the same account and hence can share a CosmosClient // For these typical purposes, CosmosStoreClient.Connect should be used to establish the Client, not custom wiring as we have here let createConnector (a: Arguments) connectionString = - CosmosStoreConnector(Discovery.ConnectionString connectionString, a.Timeout, a.Retries, a.MaxRetryWaitTime, ?mode = a.Mode) + CosmosStoreConnector(Discovery.ConnectionString connectionString, a.Retries, a.MaxRetryWaitTime, ?mode = a.Mode, timeout = a.Timeout) let connect (log: ILogger) (a: Arguments) = let primaryConnector, primaryDatabase, primaryContainer as primary = createConnector a a.Connection, a.Database, a.Container logContainer log "Primary" (a.Mode, primaryConnector.Endpoint, primaryDatabase, primaryContainer) diff --git a/src/Equinox.CosmosStore/CosmosStore.fs b/src/Equinox.CosmosStore/CosmosStore.fs index c2ab7b4a6..7b83e884e 100644 --- a/src/Equinox.CosmosStore/CosmosStore.fs +++ b/src/Equinox.CosmosStore/CosmosStore.fs @@ -13,24 +13,24 @@ type EventBody = JsonElement /// A single Domain Event from the array held in a Batch [] -type Event = // TODO for STJ v5: All fields required unless explicitly optional +type Event = { /// Creation datetime (as opposed to system-defined _lastUpdated which is touched by triggers, replication etc.) - t: DateTimeOffset // ISO 8601 + t: DateTimeOffset // Will be saved ISO 8601 formatted - /// The Case (Event Type); used to drive deserialization - c: string // required + /// The Case (Event Type); used to drive deserialization (Required) + c: string /// Event body, as UTF-8 encoded json ready to be injected into the Json being rendered for CosmosDB - d: EventBody // TODO for STJ v5: Required, but can be null so Nullary cases can work + d: EventBody // Can be Json Null for Nullary cases /// Optional metadata, as UTF-8 encoded json, ready to emit directly - m: EventBody // TODO for STJ v5: Optional, not serialized if missing + m: EventBody /// Optional correlationId - correlationId: string // TODO for STJ v5: Optional, not serialized if missing + correlationId: string /// Optional causationId - causationId: string } // TODO for STJ v5: Optional, not serialized if missing + causationId: string } interface IEventData with member x.EventType = x.c member x.Data = x.d @@ -42,20 +42,20 @@ type Event = // TODO for STJ v5: All fields required unless explicitly optional /// A 'normal' (frozen, not Tip) Batch of Events (without any Unfolds) [] -type Batch = // TODO for STJ v5: All fields required unless explicitly optional +type Batch = { /// CosmosDB-mandated Partition Key, must be maintained within the document - /// Not actually required if running in single partition mode, but for simplicity, we always write it - p: string // "{streamName}" TODO for STJ v5: Optional, not requested in queries + /// Technically not required if running in single partition mode, but being over simplistic would be more confusing + p: string // "{streamName}" /// CosmosDB-mandated unique row key; needs to be unique within any partition it is maintained; must be string - /// At the present time, one can't perform an ORDER BY on this field, hence we also have i shadowing it - /// NB Tip uses a well known value here while it's actively 'open' + /// There's no way to usefully ORDER BY on it; hence we have i shadowing it and use that instead + /// NB Tip uses a well known value ("-1") for the `id`; that document lives for the life of the stream id: string // "{index}" /// When we read, we need to capture the value so we can retain it for caching purposes /// NB this is not relevant to fill in when we pass it to the writing stored procedure /// as it will do: 1. read 2. merge 3. write merged version contingent on the _etag not having changed - _etag: string // TODO for STJ v5: Optional, not serialized if missing + _etag: string /// base 'i' value for the Events held herein i: int64 // {index} @@ -90,7 +90,7 @@ type Unfold = /// Optional metadata, same encoding as `d` (can be null; not written if missing) [)>] - m: EventBody } // TODO for STJ v5: Optional, not serialized if missing + m: EventBody } // Arrays are not indexed by default. 1. enable filtering by `c`ase 2. index uncompressed fields within unfolds for filtering static member internal IndexedPaths = [| "/u/[]/c/?"; "/u/[]/d/*" |] @@ -98,17 +98,17 @@ type Unfold = /// Stored representation has the following diffs vs a 'normal' (frozen/completed) Batch: a) `id` = `-1` b) contains unfolds (`u`) /// NB the type does double duty as a) model for when we read it b) encoding a batch being sent to the stored proc [] -type Tip = // TODO for STJ v5: All fields required unless explicitly optional +type Tip = { /// Partition key, as per Batch - p: string // "{streamName}" TODO for STJ v5: Optional, not requested in queries + p: string // "{streamName}" /// Document Id within partition, as per Batch - id: string // "{-1}" - Well known IdConstant used while this remains the pending batch + id: string // "{-1}" - Well known Id Constant used for the tail document (the only one that get mutated) /// When we read, we need to capture the value so we can retain it for caching purposes /// NB this is not relevant to fill in when we pass it to the writing stored procedure /// as it will do: 1. read 2. merge 3. write merged version contingent on the _etag not having changed - _etag: string // TODO for STJ v5: Optional, not serialized if missing + _etag: string /// base 'i' value for the Events held herein i: int64 @@ -1161,18 +1161,17 @@ type DiscoveryMode = | DiscoveryMode.AccountUriAndKey (u, _k) -> u | DiscoveryMode.ConnectionString (ConnectionString.AccountEndpoint e) -> e -/// Manages establishing a CosmosClient, which is used by CosmosStoreClient to read from the underlying Cosmos DB Container. +/// Manages establishing a CosmosClient, which is used by CosmosStoreClient to read from the underlying Cosmos DB Container type CosmosClientFactory(options) = - static member CreateDefaultOptions(maxRetryAttemptsOnRateLimitedRequests: int, maxRetryWaitTimeOnRateLimitedRequests: TimeSpan, serializerOptions) = + [] + static member CreateDefaultOptions(requestTimeout: TimeSpan, maxRetryAttemptsOnRateLimitedRequests: int, maxRetryWaitTimeOnRateLimitedRequests: TimeSpan) = CosmosClientOptions( MaxRetryAttemptsOnRateLimitedRequests = maxRetryAttemptsOnRateLimitedRequests, MaxRetryWaitTimeOnRateLimitedRequests = maxRetryWaitTimeOnRateLimitedRequests, - UseSystemTextJsonSerializerWithOptions = serializerOptions) - [] - static member CreateDefaultOptions(requestTimeout: TimeSpan, maxRetryAttemptsOnRateLimitedRequests: int, maxRetryWaitTimeOnRateLimitedRequests: TimeSpan) = - let o = CosmosClientFactory.CreateDefaultOptions(maxRetryAttemptsOnRateLimitedRequests, maxRetryWaitTimeOnRateLimitedRequests, JsonSerializerOptions.Default) - o.RequestTimeout <- requestTimeout - o + RequestTimeout = requestTimeout, + UseSystemTextJsonSerializerWithOptions = JsonSerializerOptions.Default) + /// Default when rendering/parsing Batch/Tip/Event/Unfold - omitting null values + static member val DefaultJsonSerializerOptions = JsonSerializerOptions(DefaultIgnoreCondition = Serialization.JsonIgnoreCondition.WhenWritingNull) /// CosmosClientOptions for this CosmosClientFactory as configured (NOTE while the Options object is not immutable, it should not have setters called on it) member val Options = options /// Creates an instance of CosmosClient without actually validating or establishing the connection @@ -1204,8 +1203,9 @@ type Discovery = | Discovery.ConnectionString c -> DiscoveryMode.ConnectionString c /// Manages establishing a CosmosClient, which is used by CosmosStoreClient to read from the underlying Cosmos DB Container. -type CosmosStoreConnector - ( // CosmosDB endpoint/credentials specification. +type CosmosStoreConnector(discovery: Discovery, factory: CosmosClientFactory) = + let discoveryMode = discovery.ToDiscoveryMode() + new(// CosmosDB endpoint/credentials specification. discovery: Discovery, // Maximum number of times to attempt when failure reason is a 429 from CosmosDB, signifying RU limits have been breached. CosmosDB default: 9 maxRetryAttemptsOnRateLimitedRequests: int, @@ -1214,30 +1214,31 @@ type CosmosStoreConnector // Connection mode (default: ConnectionMode.Direct (best performance, same as Microsoft.Azure.Cosmos SDK default) // NOTE: default for Equinox.Cosmos.Connector (i.e. V2) was Gateway (worst performance, least trouble, Microsoft.Azure.DocumentDb SDK default) [] ?mode: ConnectionMode, - // System.Text.Json SerializerOptions (Defaults to default options) + // Timeout to apply to individual reads/write round-trips going to CosmosDB. CosmosDB Default: 6s + // NOTE Per CosmosDB Client guidance, it's recommended to leave this at its default + [] ?timeout: TimeSpan, + // System.Text.Json SerializerOptions used for rendering Batches, Events and Unfolds internally + // NOTE as Events and/or Unfolds are serialized to `JsonElement`, there should rarely be a need to control the options at this level [] ?serializerOptions: System.Text.Json.JsonSerializerOptions, // consistency mode (default: use configuration specified for Database) [] ?defaultConsistencyLevel: ConsistencyLevel, - // Timeout to apply to individual reads/write round-trips going to CosmosDB. CosmosDB Default: 6s - // NOTE Per CosmosDB Client guidance, it's recommended to leave this at its default - [] ?requestTimeout: TimeSpan, [] ?customize: Action) = - let discoveryMode = discovery.ToDiscoveryMode() - let serializerOptions = serializerOptions |> Option.defaultValue System.Text.Json.JsonSerializerOptions.Default - let factory = - let o = CosmosClientFactory.CreateDefaultOptions(maxRetryAttemptsOnRateLimitedRequests, maxRetryWaitTimeOnRateLimitedRequests, serializerOptions) + let o = + CosmosClientOptions( + MaxRetryAttemptsOnRateLimitedRequests = maxRetryAttemptsOnRateLimitedRequests, + MaxRetryWaitTimeOnRateLimitedRequests = maxRetryWaitTimeOnRateLimitedRequests, + UseSystemTextJsonSerializerWithOptions = defaultArg serializerOptions CosmosClientFactory.DefaultJsonSerializerOptions) mode |> Option.iter (fun x -> o.ConnectionMode <- x) - requestTimeout |> Option.iter (fun x -> o.RequestTimeout <- x) + timeout |> Option.iter (fun x -> o.RequestTimeout <- x) defaultConsistencyLevel |> Option.iter (fun x -> o.ConsistencyLevel <- x) customize |> Option.iter (fun c -> c.Invoke o) - CosmosClientFactory o + CosmosStoreConnector(discovery, CosmosClientFactory o) [] new(discovery, requestTimeout: TimeSpan, maxRetryAttemptsOnRateLimitedRequests: int, maxRetryWaitTimeOnRateLimitedRequests, ?mode, ?defaultConsistencyLevel, ?customize) = CosmosStoreConnector(discovery, maxRetryAttemptsOnRateLimitedRequests, maxRetryWaitTimeOnRateLimitedRequests, ?mode = mode, - ?defaultConsistencyLevel = defaultConsistencyLevel, requestTimeout = requestTimeout, ?customize = customize, - ?serializerOptions = None) + ?defaultConsistencyLevel = defaultConsistencyLevel, timeout = requestTimeout, ?customize = customize) /// The CosmosClientOptions used when connecting to CosmosDB member _.Options = factory.Options diff --git a/tests/Equinox.CosmosStore.Integration/CosmosFixtures.fs b/tests/Equinox.CosmosStore.Integration/CosmosFixtures.fs index a8dca8baf..31f30a4ae 100644 --- a/tests/Equinox.CosmosStore.Integration/CosmosFixtures.fs +++ b/tests/Equinox.CosmosStore.Integration/CosmosFixtures.fs @@ -88,8 +88,7 @@ let discoverConnection () = let createConnector (log: Serilog.ILogger) = let name, discovery = discoverConnection () - let connector = CosmosStoreConnector(discovery, requestTimeout = TimeSpan.FromSeconds 3., - maxRetryAttemptsOnRateLimitedRequests = 2, maxRetryWaitTimeOnRateLimitedRequests = TimeSpan.FromMinutes 1.) + let connector = CosmosStoreConnector(discovery, 9, TimeSpan.FromMinutes 1.) log.Information("CosmosStore {name} {endpoint}", name, connector.Endpoint) connector From d67254c3b5072ef1b039d97e34810bf858e88628 Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Sat, 7 Sep 2024 17:58:27 +0100 Subject: [PATCH 3/4] Restore binary compatibility --- .../CosmosStoreSerialization.fs | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/src/Equinox.CosmosStore/CosmosStoreSerialization.fs b/src/Equinox.CosmosStore/CosmosStoreSerialization.fs index 79d198d5a..fd1075932 100644 --- a/src/Equinox.CosmosStore/CosmosStoreSerialization.fs +++ b/src/Equinox.CosmosStore/CosmosStoreSerialization.fs @@ -31,6 +31,23 @@ module JsonElement = if value.ValueKind = JsonValueKind.Null then value else value |> toUtf8Bytes |> Deflate.compress |> JsonSerializer.SerializeToElement +[] +type CosmosJsonSerializer(options : JsonSerializerOptions) = + inherit Microsoft.Azure.Cosmos.CosmosSerializer() + + override _.FromStream<'T>(stream) = + use _ = stream + + if stream.Length = 0L then Unchecked.defaultof<'T> + elif typeof.IsAssignableFrom(typeof<'T>) then box stream :?> 'T + else JsonSerializer.Deserialize<'T>(stream, options) + + override _.ToStream<'T>(input : 'T) = + let memoryStream = new MemoryStream() + JsonSerializer.Serialize(memoryStream, input, input.GetType(), options) + memoryStream.Position <- 0L + memoryStream :> Stream + /// Manages inflating of the UTF-8 json bytes to make the index record minimal from the perspective of the writer stored proc /// Only relevant for unfolds in the Tip type JsonCompressedBase64Converter() = From be4efe37a8ebd70c3555cd19eb72089d3a2542b7 Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Wed, 30 Oct 2024 08:32:24 +0000 Subject: [PATCH 4/4] Handle drop to STJ 6 --- src/Equinox.CosmosStore/CosmosStore.fs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Equinox.CosmosStore/CosmosStore.fs b/src/Equinox.CosmosStore/CosmosStore.fs index 7b83e884e..2d255ed15 100644 --- a/src/Equinox.CosmosStore/CosmosStore.fs +++ b/src/Equinox.CosmosStore/CosmosStore.fs @@ -1169,7 +1169,7 @@ type CosmosClientFactory(options) = MaxRetryAttemptsOnRateLimitedRequests = maxRetryAttemptsOnRateLimitedRequests, MaxRetryWaitTimeOnRateLimitedRequests = maxRetryWaitTimeOnRateLimitedRequests, RequestTimeout = requestTimeout, - UseSystemTextJsonSerializerWithOptions = JsonSerializerOptions.Default) + UseSystemTextJsonSerializerWithOptions = JsonSerializerOptions()) /// Default when rendering/parsing Batch/Tip/Event/Unfold - omitting null values static member val DefaultJsonSerializerOptions = JsonSerializerOptions(DefaultIgnoreCondition = Serialization.JsonIgnoreCondition.WhenWritingNull) /// CosmosClientOptions for this CosmosClientFactory as configured (NOTE while the Options object is not immutable, it should not have setters called on it)