diff --git a/CHANGELOG.md b/CHANGELOG.md index 22bfd2f8b..37d80850b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -49,6 +49,7 @@ The `Unreleased` section name is replaced by the expected version of next releas ### Removed - Remove explicit `net461` handling; minimum target now `net6.0` / `FSharp.Core` v `6.0.0` [#310](https://github.com/jet/equinox/pull/310) [#323](https://github.com/jet/equinox/pull/323) [#354](https://github.com/jet/equinox/pull/354) +- Remove `Equinox.Core.ICache` (there is/was only one impl, and the interface has changed as part of [#386](https://github.com/jet/equinox/pull/386)) [#389](https://github.com/jet/equinox/pull/389) ### Fixed diff --git a/src/Equinox.Core/Cache.fs b/src/Equinox.Core/Cache.fs index 82be8225f..d5633329e 100755 --- a/src/Equinox.Core/Cache.fs +++ b/src/Equinox.Core/Cache.fs @@ -1,38 +1,10 @@ -namespace Equinox.Core - -open System -open System.Runtime.Caching -open System.Threading.Tasks - -type [] CacheItemOptions = - | AbsoluteExpiration of ae: DateTimeOffset - | RelativeExpiration of re: TimeSpan -module internal CacheItemOptions = - let toPolicy = function - | AbsoluteExpiration absolute -> CacheItemPolicy(AbsoluteExpiration = absolute) - | RelativeExpiration relative -> CacheItemPolicy(SlidingExpiration = relative) - -type ICache = - abstract member Load: key: string - * maxAge: TimeSpan - * isStale: Func - * options: CacheItemOptions - * loadOrReload: (struct (StreamToken * 'state) voption -> Task) - -> Task - abstract member Save: key: string - * isStale: Func - * options: CacheItemOptions - * timestamp: int64 - * token: StreamToken * state: 'state - -> unit - namespace Equinox open Equinox.Core open Equinox.Core.Tracing open System -type internal CacheEntry<'state>(initialToken: StreamToken, initialState: 'state, initialTimestamp: int64) = +type private CacheEntry<'state>(initialToken: StreamToken, initialState: 'state, initialTimestamp: int64) = let mutable currentToken = initialToken let mutable currentState = initialState let mutable verifiedTimestamp = initialTimestamp @@ -79,13 +51,13 @@ type Cache private (inner: System.Runtime.Caching.MemoryCache) = | null -> ValueNone | :? CacheEntry<'state> as existingEntry -> existingEntry.TryGetValue() | x -> failwith $"tryLoad Incompatible cache entry %A{x}" - let addOrGet key options entry = - match inner.AddOrGetExisting(key, entry, CacheItemOptions.toPolicy options) with + let addOrGet key policy entry = + match inner.AddOrGetExisting(key, entry, policy = policy) with | null -> Ok entry | :? CacheEntry<'state> as existingEntry -> Error existingEntry | x -> failwith $"addOrGet Incompatible cache entry %A{x}" - let getElseAddEmptyEntry key options = - match addOrGet key options (CacheEntry<'state>.CreateEmpty()) with + let getElseAddEmptyEntry key policy = + match addOrGet key policy (CacheEntry<'state>.CreateEmpty()) with | Ok fresh -> fresh | Error existingEntry -> existingEntry let addOrMergeCacheEntry isStale key options timestamp struct (token, state) = @@ -97,37 +69,36 @@ type Cache private (inner: System.Runtime.Caching.MemoryCache) = let config = System.Collections.Specialized.NameValueCollection(1) config.Add("cacheMemoryLimitMegabytes", string sizeMb); Cache(new System.Runtime.Caching.MemoryCache(name, config)) - interface ICache with - // if there's a non-zero maxAge, concurrent read attempts share the roundtrip (and its fate, if it throws) - member _.Load(key, maxAge, isStale, options, loadOrReload) = task { - let loadOrReload maybeBaseState () = task { - let act = System.Diagnostics.Activity.Current - if act <> null then act.AddCacheHit(ValueOption.isSome maybeBaseState) |> ignore - let ts = System.Diagnostics.Stopwatch.GetTimestamp() - let! res = loadOrReload maybeBaseState - return struct (ts, res) } - if maxAge = TimeSpan.Zero then // Boring algorithm that has each caller independently load/reload the data and then cache it - let maybeBaseState = tryLoad key - let! timestamp, res = loadOrReload maybeBaseState () - addOrMergeCacheEntry isStale key options timestamp res - return res - else // ensure we have an entry in the cache for this key; coordinate retrieval through that - let cacheSlot = getElseAddEmptyEntry key options - return! cacheSlot.ReadThrough(maxAge, isStale, loadOrReload) } - // Newer values get saved; equal values update the last retrieval timestamp - member _.Save(key, isStale, options, timestamp, token, state) = - addOrMergeCacheEntry isStale key options timestamp (token, state) + // if there's a non-zero maxAge, concurrent read attempts share the roundtrip (and its fate, if it throws) + member _.Load(key, maxAge, isStale, policy, loadOrReload) = task { + let loadOrReload maybeBaseState () = task { + let act = System.Diagnostics.Activity.Current + if act <> null then act.AddCacheHit(ValueOption.isSome maybeBaseState) |> ignore + let ts = System.Diagnostics.Stopwatch.GetTimestamp() + let! res = loadOrReload maybeBaseState + return struct (ts, res) } + if maxAge = TimeSpan.Zero then // Boring algorithm that has each caller independently load/reload the data and then cache it + let maybeBaseState = tryLoad key + let! timestamp, res = loadOrReload maybeBaseState () + addOrMergeCacheEntry isStale key policy timestamp res + return res + else // ensure we have an entry in the cache for this key; coordinate retrieval through that + let cacheSlot = getElseAddEmptyEntry key policy + return! cacheSlot.ReadThrough(maxAge, isStale, loadOrReload) } + // Newer values get saved; equal values update the last retrieval timestamp + member _.Save(key, isStale, policy, timestamp, token, state) = + addOrMergeCacheEntry isStale key policy timestamp (token, state) type [] CachingStrategy = /// Retain a single 'state per streamName. /// Each cache hit for a stream renews the retention period for the defined window. /// Upon expiration of the defined window from the point at which the cache was entry was last used, a full reload is triggered. /// Unless a LoadOption is used, cache hits still incur a roundtrip to load any subsequently-added events. - | SlidingWindow of ICache * window: TimeSpan + | SlidingWindow of Cache * window: TimeSpan /// Retain a single 'state per streamName. /// Upon expiration of the defined period, a full reload is triggered. /// Unless a LoadOption is used, cache hits still incur a roundtrip to load any subsequently-added events. - | FixedTimeSpan of ICache * period: TimeSpan + | FixedTimeSpan of Cache * period: TimeSpan /// Prefix is used to segregate multiple folded states per stream when they are stored in the cache. /// Semantics are otherwise identical to SlidingWindow. - | SlidingWindowPrefixed of ICache * window: TimeSpan * prefix: string + | SlidingWindowPrefixed of Cache * window: TimeSpan * prefix: string diff --git a/src/Equinox.Core/Caching.fs b/src/Equinox.Core/Caching.fs index 081a43682..1e7f57097 100644 --- a/src/Equinox.Core/Caching.fs +++ b/src/Equinox.Core/Caching.fs @@ -15,7 +15,7 @@ let private tee f (inner: CancellationToken -> Task ICategory<'event, 'state, 'context> and 'cat :> IReloadable<'state> > - (category: 'cat, cache: ICache, isStale, createKey, createOptions) = + (category: 'cat, cache: Equinox.Cache, isStale, createKey, createOptions) = interface ICategory<'event, 'state, 'context> with member _.Load(log, categoryName, streamId, streamName, maxAge, requireLeader, ct) = task { let loadOrReload = function @@ -35,16 +35,15 @@ type private Decorator<'event, 'state, 'context, 'cat when 'cat :> ICategory<'ev let private mkKey prefix streamName = prefix + streamName -let private optionsSlidingExpiration (slidingExpiration: TimeSpan) () = - CacheItemOptions.RelativeExpiration slidingExpiration -let private optionsFixedTimeSpan (period: TimeSpan) () = +let private policySlidingExpiration (slidingExpiration: TimeSpan) () = + System.Runtime.Caching.CacheItemPolicy(SlidingExpiration = slidingExpiration) +let private policyFixedTimeSpan (period: TimeSpan) () = let expirationPoint = let creationDate = DateTimeOffset.UtcNow in creationDate.Add period - CacheItemOptions.AbsoluteExpiration expirationPoint - + System.Runtime.Caching.CacheItemPolicy(AbsoluteExpiration = expirationPoint) let private mapStrategy = function - | Equinox.CachingStrategy.FixedTimeSpan (cache, period) -> struct ( cache, mkKey null, optionsFixedTimeSpan period) - | Equinox.CachingStrategy.SlidingWindow (cache, window) -> cache, mkKey null, optionsSlidingExpiration window - | Equinox.CachingStrategy.SlidingWindowPrefixed (cache, window, prefix) -> cache, mkKey prefix, optionsSlidingExpiration window + | Equinox.CachingStrategy.FixedTimeSpan (cache, period) -> struct ( cache, mkKey null, policyFixedTimeSpan period) + | Equinox.CachingStrategy.SlidingWindow (cache, window) -> cache, mkKey null, policySlidingExpiration window + | Equinox.CachingStrategy.SlidingWindowPrefixed (cache, window, prefix) -> cache, mkKey prefix, policySlidingExpiration window let apply isStale x (cat: 'cat when 'cat :> ICategory<'event, 'state, 'context> and 'cat :> IReloadable<'state>): ICategory<_, _, _> = match x with diff --git a/src/Equinox.CosmosStore/CosmosStore.fs b/src/Equinox.CosmosStore/CosmosStore.fs index 466f0a909..f9663965f 100644 --- a/src/Equinox.CosmosStore/CosmosStore.fs +++ b/src/Equinox.CosmosStore/CosmosStore.fs @@ -1331,12 +1331,12 @@ type CachingStrategy = /// Unless LoadOption.AnyCachedValue or AllowStale are used, cache hits still incurs an etag-contingent Tip read (at a cost of a roundtrip with a 1RU charge if unmodified). // NB while a strategy like EventStore.Caching.SlidingWindowPrefixed is obviously easy to implement, the recommended approach is to // track all relevant data in the state, and/or have the `unfold` function ensure _all_ relevant events get held in the `u`nfolds in Tip - | SlidingWindow of ICache * window: TimeSpan + | SlidingWindow of Equinox.Cache * window: TimeSpan /// Retain a single 'state per streamName, together with the associated etag. /// Upon expiration of the defined period, a full reload is triggered. /// Typically combined with an `Equinox.LoadOption` to minimize loads. /// Unless LoadOption.AnyCachedValue or AllowStale are used, cache hits still incurs an etag-contingent Tip read (at a cost of a roundtrip with a 1RU charge if unmodified). - | FixedTimeSpan of ICache * period: TimeSpan + | FixedTimeSpan of Equinox.Cache * period: TimeSpan [] type AccessStrategy<'event, 'state> = diff --git a/src/Equinox.DynamoStore/DynamoStore.fs b/src/Equinox.DynamoStore/DynamoStore.fs index 6306158fc..7560f6fe6 100644 --- a/src/Equinox.DynamoStore/DynamoStore.fs +++ b/src/Equinox.DynamoStore/DynamoStore.fs @@ -1287,12 +1287,12 @@ type CachingStrategy = /// Unless a LoadOption is used, each cache hit still involves a read roundtrip (RU charges incurred, transport latency) though deserialization is skipped due to etag match // NB while a strategy like EventStore.Caching.SlidingWindowPrefixed is obviously easy to implement, the recommended approach is to // track all relevant data in the state, and/or have the `unfold` function ensure _all_ relevant events get held in the unfolds in Tip - | SlidingWindow of ICache * window: TimeSpan + | SlidingWindow of Equinox.Cache * window: TimeSpan /// Retain a single 'state per streamName, together with the associated etag. /// Upon expiration of the defined period, a full reload is triggered. /// Typically combined with an `Equinox.LoadOption` to minimize loads. /// Unless a LoadOption is used, each cache hit still involves a read roundtrip (RU charges incurred, transport latency) though deserialization is skipped due to etag match - | FixedTimeSpan of ICache * period: TimeSpan + | FixedTimeSpan of Equinox.Cache * period: TimeSpan [] type AccessStrategy<'event, 'state> =