Skip to content

Commit

Permalink
Fixed styling issues according to code review.
Browse files Browse the repository at this point in the history
  • Loading branch information
DSilence committed Oct 13, 2019
1 parent 0986670 commit 4f96845
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 46 deletions.
20 changes: 9 additions & 11 deletions src/Equinox.Core/Cache.fs
Original file line number Diff line number Diff line change
Expand Up @@ -25,34 +25,32 @@ type ICache =
abstract member TryGet: key: string -> Async<(StreamToken * 'state) option>

namespace Equinox

open System.Runtime.Caching
open Equinox.Core

type Cache(name, sizeMb : int) =
let cache =
let config = System.Collections.Specialized.NameValueCollection(1)
config.Add("cacheMemoryLimitMegabytes", string sizeMb);
new MemoryCache(name, config)

let getPolicy (cacheItemOption: CacheItemOptions)=
let getPolicy (cacheItemOption: CacheItemOptions) =
match cacheItemOption with
| AbsoluteExpiration absolute -> new CacheItemPolicy(AbsoluteExpiration = absolute)
| RelativeExpiration relative -> new CacheItemPolicy(SlidingExpiration = relative)

interface ICache with

member this.UpdateIfNewer cacheItemOptions key entry =
member this.UpdateIfNewer cacheItemOptions key entry = async {
let policy = getPolicy cacheItemOptions
match cache.AddOrGetExisting(key, box entry, policy) with
| null ->
async.Return ()
| null -> ()
| :? CacheEntry<'state> as existingEntry -> existingEntry.UpdateIfNewer entry
async.Return ()
| x -> failwithf "UpdateIfNewer Incompatible cache entry %A" x
| x -> failwithf "UpdateIfNewer Incompatible cache entry %A" x }

member this.TryGet key =
async.Return (
member this.TryGet key = async {
return
match cache.Get key with
| null -> None
| :? CacheEntry<'state> as existingEntry -> Some existingEntry.Value
| x -> failwithf "TryGet Incompatible cache entry %A" x
)
| x -> failwithf "TryGet Incompatible cache entry %A" x }
2 changes: 1 addition & 1 deletion src/Equinox.Core/Equinox.Core.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

<PackageReference Include="FSharp.Core" Version="3.1.2.5" Condition=" '$(TargetFramework)' != 'netstandard2.0' " />
<PackageReference Include="FSharp.Core" Version="4.3.4" Condition=" '$(TargetFramework)' == 'netstandard2.0' " />
<PackageReference Include="System.Runtime.Caching" Version="4.6.0" />
<PackageReference Include="System.Runtime.Caching" Version="4.6.0" Condition=" '$(TargetFramework)' == 'netstandard2.0' "/>
</ItemGroup>

</Project>
31 changes: 13 additions & 18 deletions src/Equinox.Cosmos/Cosmos.fs
Original file line number Diff line number Diff line change
Expand Up @@ -775,7 +775,6 @@ open Microsoft.Azure.Documents
open Serilog
open System
open System.Collections.Concurrent
open System.Runtime.Caching

/// Defines policies for retrying with respect to transient failures calling CosmosDb (as opposed to application level concurrency conflicts)
type Connection(client: Client.DocumentClient, [<O; D(null)>]?readRetryPolicy: IRetryPolicy, [<O; D(null)>]?writeRetryPolicy) =
Expand Down Expand Up @@ -893,21 +892,19 @@ module Caching =
let! syncRes = inner.TrySync log (streamToken, state) events
match syncRes with
| SyncResult.Conflict resync -> return SyncResult.Conflict(interceptAsync resync stream)
| SyncResult.Written(token', state')
->
let! intercepted = intercept stream (token', state')
return SyncResult.Written(intercepted) }

| SyncResult.Written(token', state') ->
let! intercepted = intercept stream (token', state')
return SyncResult.Written(intercepted) }

let applyCacheUpdatesWithSlidingExpiration
(cache: ICache)
(prefix: string)
(slidingExpiration : TimeSpan)
(category: ICategory<'event, 'state, Container*string>)
: ICategory<'event, 'state, Container*string> =
let cacheEntryGenerator (initialToken: StreamToken, initialState: 'state) = new CacheEntry<'state>(initialToken, initialState, Token.supersedes)
let mkCacheEntry (initialToken: StreamToken, initialState: 'state) = new CacheEntry<'state>(initialToken, initialState, Token.supersedes)
let policy = CacheItemOptions.RelativeExpiration(slidingExpiration)
let addOrUpdateSlidingExpirationCacheEntry streamName = cacheEntryGenerator >> cache.UpdateIfNewer policy (prefix + streamName)
let addOrUpdateSlidingExpirationCacheEntry streamName = mkCacheEntry >> cache.UpdateIfNewer policy (prefix + streamName)
CategoryTee<'event,'state>(category, addOrUpdateSlidingExpirationCacheEntry) :> _

type private Folder<'event, 'state>
Expand All @@ -918,17 +915,15 @@ type private Folder<'event, 'state>
let inspectUnfolds = match mapUnfolds with Choice1Of3 () -> false | _ -> true
interface ICategory<'event, 'state, Container*string> with
member __.Load containerStream (log : ILogger): Async<StreamToken * 'state> = async {
let! batched = category.Load inspectUnfolds containerStream fold initial isOrigin log
let cached tokenAndState = category.LoadFromToken tokenAndState fold isOrigin log
match readCache with
let! batched = category.Load inspectUnfolds containerStream fold initial isOrigin log
let cached tokenAndState = category.LoadFromToken tokenAndState fold isOrigin log
match readCache with
| None -> return batched
| Some (cache : ICache, prefix : string) ->
match! cache.TryGet(prefix + snd containerStream) with
| None -> return batched
| Some (cache : ICache, prefix : string) ->
let! cacheItem = cache.TryGet(prefix + snd containerStream)
match cacheItem with
| None -> return batched
| Some tokenAndState -> return! cached tokenAndState
}
member __.TrySync (log : ILogger) (streamToken,state) (events : 'event list)
| Some tokenAndState -> return! cached tokenAndState }
member __.TrySync (log : ILogger) (streamToken,state) (events : 'event list)
: Async<SyncResult<'state>> = async {
let! res = category.Sync((streamToken,state), events, mapUnfolds, fold, isOrigin, log)
match res with
Expand Down
27 changes: 11 additions & 16 deletions src/Equinox.EventStore/EventStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -444,8 +444,7 @@ type private Category<'event, 'state>(context : Context, codec : FsCodec.IUnionE
module Caching =
/// Forwards all state changes in all streams of an ICategory to a `tee` function
type CategoryTee<'event, 'state>(inner: ICategory<'event, 'state, string>, tee : string -> StreamToken * 'state -> Async<unit>) =
let intercept streamName tokenAndState =
async{
let intercept streamName tokenAndState = async {
let! _ = tee streamName tokenAndState
return tokenAndState
}
Expand All @@ -461,31 +460,27 @@ module Caching =
| SyncResult.Conflict resync -> return SyncResult.Conflict (interceptAsync resync stream.name)
| SyncResult.Written (token', state') -> return SyncResult.Written (token', state') }


let applyCacheUpdatesWithSlidingExpiration
(cache: ICache)
(prefix: string)
(slidingExpiration : TimeSpan)
(category: ICategory<'event, 'state, string>)
: ICategory<'event, 'state, string> =
let cacheEntryGenerator (initialToken: StreamToken, initialState: 'state) = new CacheEntry<'state>(initialToken, initialState, Token.supersedes)
let mkCacheEntry (initialToken: StreamToken, initialState: 'state) = new CacheEntry<'state>(initialToken, initialState, Token.supersedes)
let policy = CacheItemOptions.RelativeExpiration(slidingExpiration)
let addOrUpdateSlidingExpirationCacheEntry streamName = cacheEntryGenerator >> cache.UpdateIfNewer policy (prefix + streamName)
let addOrUpdateSlidingExpirationCacheEntry streamName = mkCacheEntry >> cache.UpdateIfNewer policy (prefix + streamName)
CategoryTee<'event,'state>(category, addOrUpdateSlidingExpirationCacheEntry) :> _

type private Folder<'event, 'state>(category : Category<'event, 'state>, fold: 'state -> 'event seq -> 'state, initial: 'state, ?readCache) =
let loadAlgorithm streamName initial log =
async {
let! batched = category.Load fold initial streamName log
let cached token state = category.LoadFromToken fold state streamName token log
match readCache with
let loadAlgorithm streamName initial log = async {
let! batched = category.Load fold initial streamName log
let cached token state = category.LoadFromToken fold state streamName token log
match readCache with
| None -> return batched
| Some (cache : ICache, prefix : string) ->
match! cache.TryGet(prefix + streamName) with
| None -> return batched
| Some (cache : ICache, prefix : string) ->
let! cacheItem = cache.TryGet(prefix + streamName)
match cacheItem with
| None -> return batched
| Some (token, state) -> return! cached token state
}
| Some (token, state) -> return! cached token state }
interface ICategory<'event, 'state, string> with
member __.Load (streamName : string) (log : ILogger) : Async<StreamToken * 'state> =
loadAlgorithm streamName initial log
Expand Down

0 comments on commit 4f96845

Please sign in to comment.