Skip to content

Commit

Permalink
feat(Cosmos): Use Azure.Cosmos STJ impl
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Sep 7, 2024
1 parent 7d1e455 commit 0125145
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 23 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ The `Unreleased` section name is replaced by the expected version of next releas

### Added

- `Equinox.CosmosStore`: Use `Microsoft.Azure.Cosmos` integrated `System.Text.Json` support; added ability to specify `serializerOptions` [#467](https://github.com/jet/equinox/pull/467)
- `Equinox.CosmosStore`: Group metrics by Container Name [#449](https://github.com/jet/equinox/pull/449)
- `Equinox.CosmosStore`: Group metrics by Category; split out `Tip` activity [#453](https://github.com/jet/equinox/pull/453)
- `Equinox.CosmosStore`: Support Ingesting unfolds [#460](https://github.com/jet/equinox/pull/460)
Expand All @@ -20,6 +21,8 @@ The `Unreleased` section name is replaced by the expected version of next releas
### Changed

- `Equinox.*Store`,`Equinox.*Store.Prometheus`: Pin `Equinox` dependencies to `[4.0.0, 5.0.0)`] [#448](https://github.com/jet/equinox/pull/448)
- `Equinox.CosmosStore`: Minimum `Microsoft.Azure.Cosmos` requirement updated to `3.43.0` to avail of integrated `System.Text.Json` support [#467](https://github.com/jet/equinox/pull/467)
- `Equinox.CosmosStore.CosmosStoreConnector`: Removed mandatory `requestTimeout` argument [#467](https://github.com/jet/equinox/pull/467)
- `Equinox.CosmosStore`: Minimum `System.Text.Json` requirement updated to `8.0.4` as that's the lowest non-flagged version [#462](https://github.com/jet/equinox/pull/462)
- `Equinox.MessageDb`: Up min `Npgsql` to v `7.0.7` as `7.0.0` is on CVE blacklist

Expand Down
28 changes: 22 additions & 6 deletions src/Equinox.CosmosStore/CosmosStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -1164,12 +1164,16 @@ type DiscoveryMode =

/// Manages establishing a CosmosClient, which is used by CosmosStoreClient to read from the underlying Cosmos DB Container.
type CosmosClientFactory(options) =
static member CreateDefaultOptions(requestTimeout: TimeSpan, maxRetryAttemptsOnRateLimitedRequests: int, maxRetryWaitTimeOnRateLimitedRequests: TimeSpan) =
static member CreateDefaultOptions(maxRetryAttemptsOnRateLimitedRequests: int, maxRetryWaitTimeOnRateLimitedRequests: TimeSpan, serializerOptions) =
CosmosClientOptions(
MaxRetryAttemptsOnRateLimitedRequests = maxRetryAttemptsOnRateLimitedRequests,
MaxRetryWaitTimeOnRateLimitedRequests = maxRetryWaitTimeOnRateLimitedRequests,
RequestTimeout = requestTimeout,
Serializer = CosmosJsonSerializer(JsonSerializerOptions()))
UseSystemTextJsonSerializerWithOptions = serializerOptions)
[<Obsolete "Will be removed in V5; please use the overload that includes `serializerOptions`">]
static member CreateDefaultOptions(requestTimeout: TimeSpan, maxRetryAttemptsOnRateLimitedRequests: int, maxRetryWaitTimeOnRateLimitedRequests: TimeSpan) =
let o = CosmosClientFactory.CreateDefaultOptions(maxRetryAttemptsOnRateLimitedRequests, maxRetryWaitTimeOnRateLimitedRequests, JsonSerializerOptions.Default)
o.RequestTimeout <- requestTimeout
o
/// CosmosClientOptions for this CosmosClientFactory as configured (NOTE while the Options object is not immutable, it should not have setters called on it)
member val Options = options
/// Creates an instance of CosmosClient without actually validating or establishing the connection
Expand Down Expand Up @@ -1204,26 +1208,38 @@ type Discovery =
type CosmosStoreConnector
( // CosmosDB endpoint/credentials specification.
discovery: Discovery,
// Timeout to apply to individual reads/write round-trips going to CosmosDB. CosmosDB Default: 1m.
requestTimeout: TimeSpan,
// Maximum number of times to attempt when failure reason is a 429 from CosmosDB, signifying RU limits have been breached. CosmosDB default: 9
maxRetryAttemptsOnRateLimitedRequests: int,
// Maximum number of seconds to wait (especially if a higher wait delay is suggested by CosmosDB in the 429 response). CosmosDB default: 30s
maxRetryWaitTimeOnRateLimitedRequests: TimeSpan,
// Connection mode (default: ConnectionMode.Direct (best performance, same as Microsoft.Azure.Cosmos SDK default)
// NOTE: default for Equinox.Cosmos.Connector (i.e. V2) was Gateway (worst performance, least trouble, Microsoft.Azure.DocumentDb SDK default)
[<O; D null>] ?mode: ConnectionMode,
// System.Text.Json SerializerOptions (Defaults to default options)
[<O; D null>] ?serializerOptions: System.Text.Json.JsonSerializerOptions,
// consistency mode (default: use configuration specified for Database)
[<O; D null>] ?defaultConsistencyLevel: ConsistencyLevel,
// Timeout to apply to individual reads/write round-trips going to CosmosDB. CosmosDB Default: 6s
// NOTE Per CosmosDB Client guidance, it's recommended to leave this at its default
[<O; D null>] ?requestTimeout: TimeSpan,
[<O; D null>] ?customize: Action<CosmosClientOptions>) =
let discoveryMode = discovery.ToDiscoveryMode()
let serializerOptions = serializerOptions |> Option.defaultValue System.Text.Json.JsonSerializerOptions.Default
let factory =
let o = CosmosClientFactory.CreateDefaultOptions(requestTimeout, maxRetryAttemptsOnRateLimitedRequests, maxRetryWaitTimeOnRateLimitedRequests)
let o = CosmosClientFactory.CreateDefaultOptions(maxRetryAttemptsOnRateLimitedRequests, maxRetryWaitTimeOnRateLimitedRequests, serializerOptions)
mode |> Option.iter (fun x -> o.ConnectionMode <- x)
requestTimeout |> Option.iter (fun x -> o.RequestTimeout <- x)
defaultConsistencyLevel |> Option.iter (fun x -> o.ConsistencyLevel <- x)
customize |> Option.iter (fun c -> c.Invoke o)
CosmosClientFactory o

[<Obsolete "For backcompat only; will be removed in V5">]
new(discovery, requestTimeout: TimeSpan, maxRetryAttemptsOnRateLimitedRequests: int, maxRetryWaitTimeOnRateLimitedRequests,
?mode, ?defaultConsistencyLevel, ?customize) =
CosmosStoreConnector(discovery, maxRetryAttemptsOnRateLimitedRequests, maxRetryWaitTimeOnRateLimitedRequests, ?mode = mode,
?defaultConsistencyLevel = defaultConsistencyLevel, requestTimeout = requestTimeout, ?customize = customize,
?serializerOptions = None)

/// The <c>CosmosClientOptions</c> used when connecting to CosmosDB
member _.Options = factory.Options

Expand Down
16 changes: 0 additions & 16 deletions src/Equinox.CosmosStore/CosmosStoreSerialization.fs
Original file line number Diff line number Diff line change
Expand Up @@ -31,22 +31,6 @@ module JsonElement =
if value.ValueKind = JsonValueKind.Null then value
else value |> toUtf8Bytes |> Deflate.compress |> JsonSerializer.SerializeToElement

type CosmosJsonSerializer(options : JsonSerializerOptions) =
inherit Microsoft.Azure.Cosmos.CosmosSerializer()

override _.FromStream<'T>(stream) =
use _ = stream

if stream.Length = 0L then Unchecked.defaultof<'T>
elif typeof<Stream>.IsAssignableFrom(typeof<'T>) then box stream :?> 'T
else JsonSerializer.Deserialize<'T>(stream, options)

override _.ToStream<'T>(input : 'T) =
let memoryStream = new MemoryStream()
JsonSerializer.Serialize(memoryStream, input, input.GetType(), options)
memoryStream.Position <- 0L
memoryStream :> Stream

/// Manages inflating of the UTF-8 json bytes to make the index record minimal from the perspective of the writer stored proc
/// Only relevant for unfolds in the Tip
type JsonCompressedBase64Converter() =
Expand Down
2 changes: 1 addition & 1 deletion src/Equinox.CosmosStore/Equinox.CosmosStore.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
<PackageReference Include="FSharp.Core" Version="6.0.7" ExcludeAssets="contentfiles" />

<PackageReference Include="FSharp.Control.TaskSeq" Version="0.4.0" />
<PackageReference Include="Microsoft.Azure.Cosmos" Version="3.35.4" />
<PackageReference Include="Microsoft.Azure.Cosmos" Version="3.43.0" />
<PackageReference Include="System.Text.Json" Version="8.0.4" />
</ItemGroup>

Expand Down

0 comments on commit 0125145

Please sign in to comment.