Skip to content

Commit

Permalink
Facts cache (#325)
Browse files Browse the repository at this point in the history
* Add facts cache package

* Add FactGathererWithCache interface

* Use cache in cibadmin gatherer

* Add a pure GetOrUpdate funcion to cache pkg

* Memoize the updateFunc

* Improve GetOrUpdate function docstring
  • Loading branch information
arbulu89 authored Mar 7, 2024
1 parent 1564aea commit a6d804e
Show file tree
Hide file tree
Showing 8 changed files with 387 additions and 8 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ require (
github.com/trento-project/contracts/go v0.0.0-20230823130307-95ed2147fa9d
github.com/vektra/mockery/v2 v2.40.1
github.com/wagslane/go-rabbitmq v0.10.0
golang.org/x/sync v0.4.0
golang.org/x/sync v0.6.0
google.golang.org/protobuf v1.31.0
)

Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,8 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.4.0 h1:zxkM55ReGkDlKSM+Fu41A+zmbZuaPVbGMzvvdUPznYQ=
golang.org/x/sync v0.4.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ=
golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down
102 changes: 102 additions & 0 deletions internal/factsengine/factscache/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package factscache

import (
"sync"

"golang.org/x/sync/singleflight"

log "github.com/sirupsen/logrus"
)

type UpdateCacheFunc func(args ...interface{}) (interface{}, error)

type FactsCache struct {
entries sync.Map
group singleflight.Group
}

type Entry struct {
content interface{}
err error
}

func NewFactsCache() *FactsCache {
return &FactsCache{
entries: sync.Map{},
group: singleflight.Group{},
}
}

// GetOrUpdate Runs FactsCache GetOrUpdate with a provided cache
// If the cache is nil, it runs the function, otherwise it returns
// from cache
func GetOrUpdate(
cache *FactsCache,
entry string,
udpateFunc UpdateCacheFunc,
updateFuncArgs ...interface{},
) (interface{}, error) {
if cache == nil {
return udpateFunc(updateFuncArgs...)
}

return cache.GetOrUpdate(
entry,
udpateFunc,
updateFuncArgs...,
)
}

// Entries returns the cached entries list
func (c *FactsCache) Entries() []string {
keys := []string{}
c.entries.Range(func(key, _ any) bool {
// nolint:forcetypeassert
keys = append(keys, key.(string))
return true
})
return keys
}

// GetOrUpdate returns the cached result providing an entry name
// or runs the updateFunc to generate the entry.
// It locks its usage for each used key, returning the same value of the
// first execution in the additional usages.
// If other function with a different key is asked, it runs in parallel
// without blocking.
func (c *FactsCache) GetOrUpdate(
entry string,
udpateFunc UpdateCacheFunc,
updateFuncArgs ...interface{},
) (interface{}, error) {
loadedEntry, hit := c.entries.Load(entry)
if hit {
// nolint:forcetypeassert
cacheEntry := loadedEntry.(Entry)
log.Debugf("Value for entry %s already cached", entry)
return cacheEntry.content, cacheEntry.err
}

// singleflight is used to avoid a duplicated function execution at
// the same moment for a given key (memoization).
// This way, the code only blocks the execution based on same keys,
// not blocking other keys execution
content, err, _ := c.group.Do(entry, func() (interface{}, error) {
content, err := udpateFunc(updateFuncArgs...)
newEntry := Entry{
content: content,
err: err,
}
c.entries.Store(entry, newEntry)

return content, err
})

if err != nil {
log.Debugf("New value with error set for entry %s", entry)
return content, err
}

log.Debugf("New value for entry %s set", entry)
return content, err
}
173 changes: 173 additions & 0 deletions internal/factsengine/factscache/cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
package factscache_test

import (
"fmt"
"testing"
"time"

"github.com/stretchr/testify/suite"
"github.com/trento-project/agent/internal/factsengine/factscache"
"golang.org/x/sync/errgroup"
)

type FactsCacheTestSuite struct {
suite.Suite
returnValue string
count int
}

func TestFactsCacheTestSuite(t *testing.T) {
suite.Run(t, new(FactsCacheTestSuite))
}

func (suite *FactsCacheTestSuite) SetupSuite() {
suite.returnValue = "value"
}

func (suite *FactsCacheTestSuite) SetupTest() {
suite.count = 0
}

// nolint:errcheck
func (suite *FactsCacheTestSuite) TestEntries() {
cache := factscache.NewFactsCache()
cache.GetOrUpdate("entry1", func(args ...interface{}) (interface{}, error) {
return "", nil
})
cache.GetOrUpdate("entry2", func(args ...interface{}) (interface{}, error) {
return "", nil
})
entries := cache.Entries()

suite.ElementsMatch([]string{"entry1", "entry2"}, entries)
}

func (suite *FactsCacheTestSuite) TestGetOrUpdate() {
cache := factscache.NewFactsCache()

updateFunc := func(args ...interface{}) (interface{}, error) {
return suite.returnValue, nil
}

value, err := cache.GetOrUpdate("entry1", updateFunc)

suite.Equal(suite.returnValue, value)
suite.NoError(err)
}

func (suite *FactsCacheTestSuite) TestGetOrUpdateWithError() {
cache := factscache.NewFactsCache()
someError := "some error"

updateFunc := func(args ...interface{}) (interface{}, error) {
return nil, fmt.Errorf(someError)
}

_, err := cache.GetOrUpdate("entry", updateFunc)

suite.EqualError(err, someError)
}

func (suite *FactsCacheTestSuite) TestGetOrUpdateCacheHit() {
cache := factscache.NewFactsCache()

updateFunc := func(args ...interface{}) (interface{}, error) {
suite.count++
return suite.returnValue, nil
}

// nolint:errcheck
cache.GetOrUpdate("entry", updateFunc)
value, err := cache.GetOrUpdate("entry", updateFunc)

suite.Equal(suite.returnValue, value)
suite.Equal(1, suite.count)
suite.NoError(err)
}

func (suite *FactsCacheTestSuite) TestGetOrUpdateWithArgs() {
cache := factscache.NewFactsCache()

// nolint:forcetypeassert
updateFunc := func(args ...interface{}) (interface{}, error) {
arg1 := args[0].(int)
arg2 := args[1].(string)
return fmt.Sprintf("%d_%s", arg1, arg2), nil
}

value, err := cache.GetOrUpdate("entry", updateFunc, 1, "text")

suite.Equal("1_text", value)
suite.NoError(err)
}

// nolint:errcheck
func (suite *FactsCacheTestSuite) TestGetOrUpdateCacheConcurrent() {
cache := factscache.NewFactsCache()
g := errgroup.Group{}

updateFunc := func(args ...interface{}) (interface{}, error) {
value, _ := args[0].(string)
time.Sleep(100 * time.Millisecond)
return value, nil
}

g.Go(func() error {
value, _ := cache.GetOrUpdate("entry1", updateFunc, "initialValueEntry1")
castedValue, _ := value.(string)
suite.Equal("initialValueEntry1", castedValue)
return nil
})
g.Go(func() error {
value, _ := cache.GetOrUpdate("entry2", updateFunc, "initialValueEntry2")
castedValue, _ := value.(string)
suite.Equal("initialValueEntry2", castedValue)
return nil
})
time.Sleep(50 * time.Millisecond)
// The next 2 calls return the memoized value
g.Go(func() error {
value, _ := cache.GetOrUpdate("entry1", updateFunc, "newValueEntry1")
castedValue, _ := value.(string)
suite.Equal("initialValueEntry1", castedValue)
return nil
})

g.Go(func() error {
value, _ := cache.GetOrUpdate("entry2", updateFunc, "newValueEntry2")
castedValue, _ := value.(string)
suite.Equal("initialValueEntry2", castedValue)
return nil
})
g.Wait()
}

func (suite *FactsCacheTestSuite) TestPureGetOrUpdate() {
updateFunc := func(args ...interface{}) (interface{}, error) {
suite.count++
return suite.returnValue, nil
}

value, err := factscache.GetOrUpdate(nil, "entry1", updateFunc)

suite.Equal(suite.returnValue, value)
suite.Equal(1, suite.count)
suite.NoError(err)
}

func (suite *FactsCacheTestSuite) TestPureGetOrUpdateCacheHit() {
cache := factscache.NewFactsCache()

updateFunc := func(args ...interface{}) (interface{}, error) {
suite.count++
return suite.returnValue, nil
}

// nolint:errcheck
factscache.GetOrUpdate(cache, "entry1", updateFunc)
value, err := factscache.GetOrUpdate(cache, "entry1", updateFunc)

suite.Equal(suite.returnValue, value)
suite.Equal(1, suite.count)
suite.NoError(err)
}
30 changes: 26 additions & 4 deletions internal/factsengine/gatherers/cibadmin.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package gatherers

import (
log "github.com/sirupsen/logrus"
"github.com/trento-project/agent/internal/factsengine/factscache"
"github.com/trento-project/agent/pkg/factsengine/entities"
"github.com/trento-project/agent/pkg/utils"
)

const (
CibAdminGathererName = "cibadmin"
CibAdminGathererName = "cibadmin"
CibAdminGathererCache = "cibadmin"
)

// nolint:gochecknoglobals
Expand All @@ -25,26 +27,46 @@ var (

type CibAdminGatherer struct {
executor utils.CommandExecutor
cache *factscache.FactsCache
}

func NewDefaultCibAdminGatherer() *CibAdminGatherer {
return NewCibAdminGatherer(utils.Executor{})
return NewCibAdminGatherer(utils.Executor{}, nil)
}

func NewCibAdminGatherer(executor utils.CommandExecutor) *CibAdminGatherer {
func NewCibAdminGatherer(executor utils.CommandExecutor, cache *factscache.FactsCache) *CibAdminGatherer {
return &CibAdminGatherer{
executor: executor,
cache: cache,
}
}

func (g *CibAdminGatherer) SetCache(cache *factscache.FactsCache) {
g.cache = cache
}

func memoizeCibAdmin(args ...interface{}) (interface{}, error) {
executor, ok := args[0].(utils.CommandExecutor)
if !ok {
return nil, ImplementationError.Wrap("error using memoizeCibAdmin. executor must be 1st argument")
}
return executor.Exec("cibadmin", "--query", "--local")
}

func (g *CibAdminGatherer) Gather(factsRequests []entities.FactRequest) ([]entities.Fact, error) {
log.Infof("Starting %s facts gathering process", CibAdminGathererName)

cibadmin, err := g.executor.Exec("cibadmin", "--query", "--local")
content, err := factscache.GetOrUpdate(g.cache, CibAdminGathererCache, memoizeCibAdmin, g.executor)

if err != nil {
return nil, CibAdminCommandError.Wrap(err.Error())
}

cibadmin, ok := content.([]byte)
if !ok {
return nil, CibAdminDecodingError.Wrap("error casting the command output")
}

elementsToList := map[string]bool{"primitive": true, "clone": true, "master": true, "group": true,
"nvpair": true, "op": true, "rsc_location": true, "rsc_order": true,
"rsc_colocation": true, "cluster_property_set": true, "meta_attributes": true}
Expand Down
Loading

0 comments on commit a6d804e

Please sign in to comment.