Skip to content

Commit

Permalink
Update alpha
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Jan 1, 2025
1 parent 5b27bea commit 966fbf1
Show file tree
Hide file tree
Showing 13 changed files with 55 additions and 23 deletions.
4 changes: 1 addition & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ The `Unreleased` section name is replaced by the expected version of next releas

- `eqx stats`: `-A` (all stats) is now the default unless you specify >=1 of the individual stats via `ESDNO` flags [#459](https://github.com/jet/equinox/pull/459)

### Fixed

<a name="4.0.4"></a>
## [4.0.4] - 2024-05-08

Expand Down Expand Up @@ -109,7 +107,7 @@ The `Unreleased` section name is replaced by the expected version of next releas
- `CosmosStore`: Only log `bytes` when log level is `Debug` [#305](https://github.com/jet/equinox/pull/305)
- `CosmosStore.AccessStrategy.MultiSnapshot`,`Custom`: Change `list` and `seq` types to `array` [#338](https://github.com/jet/equinox/pull/338)
- `CosmosStore.CosmosStoreCategory`: Generalize `compressUnfolds` to `shouldCompress` predicate [#436](https://github.com/jet/equinox/pull/436)
- `CosmosStore.CosmosStoreCategory.TryHydrateTip`: Generates a Stream State Momento based on externally loaded `u`nfold state [#434](https://github.com/jet/equinox/pull/434)
- `CosmosStore.CosmosStoreCategory.TryHydrateTip`: Generates a Stream State Memento based on externally loaded `u`nfold state [#434](https://github.com/jet/equinox/pull/434)
- `CosmosStore.CosmosStoreCategory.TryLoad`: Renders a `'state` based on an Unfold [#434](https://github.com/jet/equinox/pull/434)
- `CosmosStore.Core.Initialization.initAux`: Replace hard-coded manual 400 RU with `mode` parameter [#328](https://github.com/jet/equinox/pull/328) :pray: [@brihadish](https://github.com/brihadish)
- `CosmosStore.CosmosClientFactory`: Moved to Core [#430](https://github.com/jet/equinox/pull/430)
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ The components within this repository are delivered as multi-targeted Nuget pack

- `Equinox.Core` [![NuGet](https://img.shields.io/nuget/v/Equinox.Core.svg)](https://www.nuget.org/packages/Equinox.Core/): Hosts generic utility types frequently useful alongside Equinox: [`TaskCell`](https://github.com/jet/equinox/blob/master/src/Equinox.Core/TaskCell.fs#L36), [`Batcher`, `BatcherCache`, `BatcherDictionary`](https://github.com/jet/equinox/blob/master/src/Equinox.Core/Batching.fs#L44). ([depends](https://www.fuget.org/packages/Equinox.Core) on `System.Runtime.Caching`)
- `Equinox.MemoryStore` [![MemoryStore NuGet](https://img.shields.io/nuget/v/Equinox.MemoryStore.svg)](https://www.nuget.org/packages/Equinox.MemoryStore/): In-memory store for integration testing/performance base-lining/providing out-of-the-box zero dependency storage for examples. ([depends](https://www.fuget.org/packages/Equinox.MemoryStore) on `Equinox`)
- `Equinox.CosmosStore` [![CosmosStore NuGet](https://img.shields.io/nuget/v/Equinox.CosmosStore.svg)](https://www.nuget.org/packages/Equinox.CosmosStore/): Azure CosmosDB Adapter with integrated 'unfolds' feature, facilitating optimal read performance in terms of latency and RU costs, instrumented to meet Jet's production monitoring requirements. ([depends](https://www.fuget.org/packages/Equinox.CosmosStore) on `Equinox`, `Equinox`, `Microsoft.Azure.Cosmos` >= `3.43.0`, `System.Text.Json`, `FSharp.Control.TaskSeq`)
- `Equinox.CosmosStore` [![CosmosStore NuGet](https://img.shields.io/nuget/v/Equinox.CosmosStore.svg)](https://www.nuget.org/packages/Equinox.CosmosStore/): Azure CosmosDB Adapter with integrated 'unfolds' feature, facilitating optimal read performance in terms of latency and RU costs, instrumented to meet Jet's production monitoring requirements. ([depends](https://www.fuget.org/packages/Equinox.CosmosStore) on `Equinox`, `Equinox`, `Microsoft.Azure.Cosmos` >= `3.43.1`, `System.Text.Json`, `FSharp.Control.TaskSeq`)
- `Equinox.CosmosStore.Prometheus` [![CosmosStore.Prometheus NuGet](https://img.shields.io/nuget/v/Equinox.CosmosStore.Prometheus.svg)](https://www.nuget.org/packages/Equinox.CosmosStore.Prometheus/): Integration package providing a `Serilog.Core.ILogEventSink` that extracts detailed metrics information attached to the `LogEvent`s and feeds them to the `prometheus-net`'s `Prometheus.Metrics` static instance. ([depends](https://www.fuget.org/packages/Equinox.CosmosStore.Prometheus) on `Equinox.CosmosStore`, `prometheus-net >= 3.6.0`)
- `Equinox.DynamoStore` [![DynamoStore NuGet](https://img.shields.io/nuget/v/Equinox.DynamoStore.svg)](https://www.nuget.org/packages/Equinox.DynamoStore/): Amazon DynamoDB Adapter with integrated 'unfolds' feature, facilitating optimal read performance in terms of latency and RC costs, patterned after `Equinox.CosmosStore`. ([depends](https://www.fuget.org/packages/Equinox.DynamoStore) on `Equinox`, `FSharp.AWS.DynamoDB` >= `0.12.0-beta`, `FSharp.Control.TaskSeq`)
- `Equinox.DynamoStore.Prometheus` [![DynamoStore.Prometheus NuGet](https://img.shields.io/nuget/v/Equinox.DynamoStore.Prometheus.svg)](https://www.nuget.org/packages/Equinox.DynamoStore.Prometheus/): Integration package providing a `Serilog.Core.ILogEventSink` that extracts detailed metrics information attached to the `LogEvent`s and feeds them to the `prometheus-net`'s `Prometheus.Metrics` static instance. ([depends](https://www.fuget.org/packages/Equinox.CosmosStore.Prometheus) on `Equinox.DynamoStore`, `prometheus-net >= 3.6.0`)
Expand Down
2 changes: 1 addition & 1 deletion samples/Infrastructure/Services.fs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type Store(store) =
MemoryStore.MemoryStoreCategory(store, name, codec, fold, initial)
| Store.Config.Cosmos (store, caching, unfolds) ->
let accessStrategy = if unfolds then CosmosStore.AccessStrategy.Snapshot snapshot else CosmosStore.AccessStrategy.Unoptimized
CosmosStore.CosmosStoreCategory<'event,'state,_>(store, name, FsCodec.SystemTextJson.Compression.EncodeTryCompress codec, fold, initial, accessStrategy, caching)
CosmosStore.CosmosStoreCategory<'event,'state,_>(store, name, FsCodec.SystemTextJson.EncodedBody.EncodeTryCompress codec, fold, initial, accessStrategy, caching)
| Store.Config.Dynamo (store, caching, unfolds) ->
let accessStrategy = if unfolds then DynamoStore.AccessStrategy.Snapshot snapshot else DynamoStore.AccessStrategy.Unoptimized
DynamoStore.DynamoStoreCategory<'event,'state,_>(store, name, FsCodec.Compression.EncodeTryCompress codec, fold, initial, accessStrategy, caching)
Expand Down
3 changes: 2 additions & 1 deletion samples/Infrastructure/Store.fs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ module Cosmos =
member _.MaxRetryWaitTime = p.GetResult(RetriesWaitTimeS, 5.) |> TimeSpan.FromSeconds
member _.TipMaxEvents = p.GetResult(TipMaxEvents, 256)
member _.TipMaxJsonLength = p.GetResult(TipMaxJsonLength, 30_000)
member x.QueryMaxItems = p.GetResult(QueryMaxItems, 10)
member _.QueryMaxItemsOr(def: int) = p.GetResult(QueryMaxItems, def)
member x.QueryMaxItems = x.QueryMaxItemsOr 10

let logContainer (log: ILogger) role (mode, endpoint, db, container) =
log.Information("CosmosDB {role:l} {mode} {connection} Database {database} Container {container}",
Expand Down
2 changes: 1 addition & 1 deletion samples/Store/Domain/Domain.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<PackageReference Include="FSharp.Core" Version="6.0.7" ExcludeAssets="contentfiles" />

<PackageReference Include="FsCodec.NewtonsoftJson" Version="3.0.0" />
<PackageReference Include="FsCodec.SystemTextJson" Version="3.0.4-alpha.0.2" />
<PackageReference Include="FsCodec.SystemTextJson" Version="3.0.4-alpha.0.5" />

<ProjectReference Include="..\..\..\src\Equinox\Equinox.fsproj" />
</ItemGroup>
Expand Down
2 changes: 1 addition & 1 deletion samples/Store/Domain/Infrastructure.fs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ module EventCodec =

/// For CosmosStore - we encode to JsonElement as that's what the store talks
let genJsonElement<'t when 't :> TypeShape.UnionContract.IUnionContract> =
FsCodec.SystemTextJson.CodecJsonElement.Create<'t>() |> FsCodec.SystemTextJson.Compression.EncodeUncompressed
FsCodec.SystemTextJson.CodecJsonElement.Create<'t>() |> FsCodec.SystemTextJson.EncodedBody.EncodeUncompressed

/// For stores other than CosmosStore, we encode to UTF-8 and have the store do the right thing
let gen<'t when 't :> TypeShape.UnionContract.IUnionContract> =
Expand Down
2 changes: 1 addition & 1 deletion samples/Tutorial/Infrastructure.fs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ module EventCodec =

/// For CosmosStore - we encode to JsonElement as that's what the store talks
let genJsonElement<'t when 't :> TypeShape.UnionContract.IUnionContract> =
FsCodec.SystemTextJson.CodecJsonElement.Create<'t>() |> FsCodec.SystemTextJson.Compression.EncodeUncompressed
FsCodec.SystemTextJson.CodecJsonElement.Create<'t>() |> FsCodec.SystemTextJson.EncodedBody.EncodeUncompressed

/// For stores other than CosmosStore, we encode to UTF-8 and have the store do the right thing
let gen<'t when 't :> TypeShape.UnionContract.IUnionContract> =
Expand Down
2 changes: 1 addition & 1 deletion samples/Tutorial/Tutorial.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

<ItemGroup>
<PackageReference Include="MinVer" Version="5.0.0" PrivateAssets="All" />
<PackageReference Include="FsCodec.SystemTextJson" Version="3.0.4-alpha.0.2" />
<PackageReference Include="FsCodec.SystemTextJson" Version="3.0.4-alpha.0.5" />
<PackageReference Include="Serilog.Sinks.Console" Version="5.0.1" />
<PackageReference Include="Serilog.Sinks.Seq" Version="7.0.0" />
</ItemGroup>
Expand Down
10 changes: 9 additions & 1 deletion src/Equinox.CosmosStore/CosmosStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ type EncodedBody = (struct (int * JsonElement))
module internal EncodedBody =
let internal jsonRawText: EncodedBody -> string = ValueTuple.snd >> _.GetRawText()
let internal jsonUtf8Bytes = jsonRawText >> System.Text.Encoding.UTF8.GetByteCount
let [<Literal>] deflateEncoding = 1
// prior to the addition of the `D` field in 4.1.0, the integrated compression support
// was predicated entirely on a JSON String `d` value in the Unfold as implying it was UTF8->Deflate->Base64 encoded
let parseUnfold = function struct (0, e: JsonElement) when e.ValueKind = JsonValueKind.String -> struct (deflateEncoding, e) | x -> x

/// A single Domain Event from the array held in a Batch
[<NoEquality; NoComparison>]
Expand All @@ -28,12 +32,14 @@ type Event =
c: string

/// Event body
[<Serialization.JsonIgnore(Condition = Serialization.JsonIgnoreCondition.WhenWritingDefault)>]
d: JsonElement // Can be Json Null for Nullary cases
/// The encoding scheme used for `d`
[<Serialization.JsonIgnore(Condition = Serialization.JsonIgnoreCondition.WhenWritingDefault)>]
D: int

/// Optional metadata
[<Serialization.JsonIgnore(Condition = Serialization.JsonIgnoreCondition.WhenWritingDefault)>]
m: JsonElement
/// The encoding scheme used for `m`
[<Serialization.JsonIgnore(Condition = Serialization.JsonIgnoreCondition.WhenWritingDefault)>]
Expand Down Expand Up @@ -89,12 +95,14 @@ type Unfold =
/// The Case (Event Type) of this snapshot, used to drive deserialization
c: string // required

[<Serialization.JsonIgnore(Condition = Serialization.JsonIgnoreCondition.WhenWritingDefault)>]
d: JsonElement // required
/// The encoding scheme used for `d`
[<Serialization.JsonIgnore(Condition = Serialization.JsonIgnoreCondition.WhenWritingDefault)>]
D: int

/// Optional metadata, same encoding as `d` (can be null; not written if missing)
[<Serialization.JsonIgnore(Condition = Serialization.JsonIgnoreCondition.WhenWritingDefault)>]
m: JsonElement
/// The encoding scheme used for `m`
[<Serialization.JsonIgnore(Condition = Serialization.JsonIgnoreCondition.WhenWritingDefault)>]
Expand Down Expand Up @@ -169,7 +177,7 @@ type internal Enum private () =
static member internal Events(b: Batch, ?minIndex, ?maxIndex) =
Enum.Events(b.i, b.e, ?minIndex = minIndex, ?maxIndex = maxIndex)
static member Unfolds(xs: Unfold[]): ITimelineEvent<EncodedBody> seq = seq {
for x in xs -> FsCodec.Core.TimelineEvent.Create(x.i, x.c, (x.D, x.d), (x.M, x.m), Guid.Empty, null, null, x.t, isUnfold = true) }
for x in xs -> FsCodec.Core.TimelineEvent.Create(x.i, x.c, EncodedBody.parseUnfold (x.D, x.d), (x.M, x.m), Guid.Empty, null, null, x.t, isUnfold = true) }
static member EventsAndUnfolds(x: Tip, ?minIndex, ?maxIndex): ITimelineEvent<EncodedBody> seq =
Enum.Events(x, ?minIndex = minIndex, ?maxIndex = maxIndex)
|> Seq.append (Enum.Unfolds x.u)
Expand Down
2 changes: 1 addition & 1 deletion tests/Equinox.CosmosStore.Integration/AccessStrategies.fs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ module SequenceCheck =
#if STORE_DYNAMO
let codec = FsCodec.SystemTextJson.Codec.Create<Event>() |> FsCodec.Compression.EncodeTryCompress
#else
let codec = FsCodec.SystemTextJson.CodecJsonElement.Create<Event>() |> FsCodec.SystemTextJson.Compression.EncodeTryCompress
let codec = FsCodec.SystemTextJson.CodecJsonElement.Create<Event>() |> FsCodec.SystemTextJson.EncodedBody.EncodeTryCompress
#endif

module Fold =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ open System

type TestEvents() =
static member private Create(i, ?eventType, ?json) =
let enc = System.Text.Json.JsonSerializer.SerializeToElement >> FsCodec.SystemTextJson.Compression.Encode
let enc = System.Text.Json.JsonSerializer.SerializeToElement >> FsCodec.SystemTextJson.EncodedBody.Uncompressed
FsCodec.Core.EventData.Create
( sprintf "%s:%d" (defaultArg eventType "test_event") i,
enc (defaultArg json "{\"d\":\"d\"}"),
Expand Down Expand Up @@ -65,7 +65,7 @@ type Tests(testOutputHelper) =
test <@ match res with Choice2Of2 (:? InvalidOperationException as ex) -> ex.Message.StartsWith "Must write either events or unfolds." | x -> failwith $"%A{x}" @>
}
let stringOfEncodedBody (x: Equinox.CosmosStore.Core.EncodedBody) = FsCodec.SystemTextJson.Compression.DecodeToJsonElement(x).GetRawText()
let stringOfEncodedBody (x: Equinox.CosmosStore.Core.EncodedBody) = FsCodec.SystemTextJson.EncodedBody.ToJsonElement(x).GetRawText()
let jsonDiff (x: string) (y: string) =
match JsonDiffPatchDotNet.JsonDiffPatch().Diff(JToken.Parse x, JToken.Parse y) with
| null -> ""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ module Cart =
StoreCategory(context, Cart.CategoryName, codec, fold, initial, AccessStrategy.MultiSnapshot unfArgs, Equinox.CachingStrategy.NoCaching)
|> Equinox.Decider.forStream log
|> Cart.create
let createCategorySnapshot context = StoreCategory(context, Cart.CategoryName, codec, fold, initial, AccessStrategy.Snapshot Cart.Fold.Snapshot.config, Equinox.CachingStrategy.NoCaching)
let createServiceWithSnapshotStrategy log context =
StoreCategory(context, Cart.CategoryName, codec, fold, initial, AccessStrategy.Snapshot Cart.Fold.Snapshot.config, Equinox.CachingStrategy.NoCaching)
createCategorySnapshot context
|> Equinox.Decider.forStream log
|> Cart.create
let createServiceWithSnapshotStrategyAndCaching log context cache =
Expand Down
40 changes: 32 additions & 8 deletions tests/Equinox.CosmosStore.Integration/FsCodecCompressionTests.fs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
// Prior to version v 4.1.0, CosmosStore owned:
// - compression of snapshots (and APIs controlling conditionally of that)
// - inflation of snapshots
// This is now an external concern, fully implemented by APIs presented in FsCodec.SystemTextJson.Compression V 3.1.0 and later
// This is now an external concern, fully implemented by APIs presented in FsCodec.SystemTextJson.Compression v 3.1.0 and later
// These tests are a sanity check pinning the basic mechanisms that are now externalized; any more thorough tests should be maintained in FsCodec
// NOTE there is no strong dependency on FsCodec; CosmosStore is happy to roundtrip arbitrary pairs of D/d and M/m values
// However, it is recommended to use that implementation as it provides for interop with (Deflate-compressed) snapshots as written by CosmosStore pre 4.1.0
// NOTE prior to v 4.1.0, CosmosStore provided a System.Text.Json integration for Microsoft.Azure.Cosmos
// Version 4.1.0 and later lean on the integrated support provided from Microsoft.Azure.Cosmos v 3.43.0 onward
module Equinox.CosmosStore.Integration.FsCodecCompressionTests
Expand All @@ -24,30 +23,29 @@ type Union =
type CoreBehaviors() =
let defaultOptions = System.Text.Json.JsonSerializerOptions.Default // Rule out dependencies on any FsCodec conversions
let eventCodec = FsCodec.SystemTextJson.CodecJsonElement.Create(defaultOptions)
let nullElement = System.Text.Json.JsonSerializer.SerializeToElement null

let ser_ et struct (D, d) =
let e: Core.Unfold =
{ i = 42L
c = et
d = d
D = D
m = nullElement
m = Unchecked.defaultof<_>
M = Unchecked.defaultof<int>
t = DateTimeOffset.MinValue }
System.Text.Json.JsonSerializer.Serialize e
let ser (e: FsCodec.IEventData<Equinox.CosmosStore.Core.EncodedBody>) = ser_ e.EventType e.Data

[<Fact>]
let ``serializes, achieving expected compression`` () =
let encoded = eventCodec |> FsCodec.SystemTextJson.Compression.EncodeTryCompress |> _.Encode((), A { embed = String('x',5000) })
let encoded = eventCodec |> FsCodec.SystemTextJson.EncodedBody.EncodeTryCompress |> _.Encode((), A { embed = String('x',5000) })
let res = ser encoded
test <@ res.Contains "\"d\":\"" && res.Length < 138 && res.Contains "\"D\":2" @>

let codec compress =
let forceCompression: FsCodec.SystemTextJson.CompressionOptions = { minSize = 0; minGain = -1000 }
if compress then FsCodec.SystemTextJson.Compression.EncodeTryCompress(eventCodec, forceCompression)
else FsCodec.SystemTextJson.Compression.EncodeUncompressed eventCodec
if compress then FsCodec.SystemTextJson.EncodedBody.EncodeTryCompress(eventCodec, forceCompression)
else FsCodec.SystemTextJson.EncodedBody.EncodeUncompressed eventCodec

[<Property>]
let roundtrips compress value =
Expand All @@ -63,6 +61,32 @@ type CoreBehaviors() =

[<Fact>]
let handlesNulls () =
let ser = ser_ "et" (0, nullElement)
let ser = ser_ "et" (0, System.Text.Json.JsonSerializer.SerializeToElement null)
let des = System.Text.Json.JsonSerializer.Deserialize<Core.Unfold>(ser)
test <@ System.Text.Json.JsonValueKind.Null = let d = des.d in d.ValueKind @>

module Pre41IntegratedDeflate =
let private deflate (uncompressedBytes: byte[]) =
let output = new System.IO.MemoryStream()
let compressor = new System.IO.Compression.DeflateStream(output, System.IO.Compression.CompressionLevel.Optimal, leaveOpen = true)
compressor.Write(uncompressedBytes)
compressor.Flush() // Could `Close`, but not required
output.ToArray()
let encode (x: 't) = System.Text.Json.JsonSerializer.SerializeToUtf8Bytes x |> deflate |> Convert.ToBase64String |> System.Text.Json.JsonSerializer.SerializeToElement

type BackCompatTests() =

/// Validates that an Unfold with a compressed `d` value without an associated `D` value of `1` is correctly inflated via Core.EncodedBody.parseUnfold
[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_COSMOS")>]
let ```Can inflate pre v 4.1 Deflate encoded snapshots`` () = Async.RunSynchronously <| async {
let context = createPrimaryContext Serilog.Log.Logger 10
let cat = Equinox.Store.Integration.DocumentStoreIntegration.Cart.createCategorySnapshot context
let expected: Domain.Cart.Fold.State = {
items = [
{ skuId = SkuId Guid.Empty; quantity = 4; returnsWaived = Some true } ] }
TypeShape.Empty.register (fun () -> Unchecked.defaultof<System.Text.Json.JsonElement>)
let unfold = {
TypeShape.Empty.empty<Equinox.CosmosStore.Core.Unfold> with
c = nameof Domain.Cart.Events.Event.Snapshotted
d = Pre41IntegratedDeflate.encode expected }
Some expected =! cat.TryLoad [| unfold |] }

0 comments on commit 966fbf1

Please sign in to comment.