diff --git a/go.mod b/go.mod index dc4e6f59..fe3e248d 100644 --- a/go.mod +++ b/go.mod @@ -22,7 +22,7 @@ require ( github.com/trento-project/contracts/go v0.0.0-20230823130307-95ed2147fa9d github.com/vektra/mockery/v2 v2.40.1 github.com/wagslane/go-rabbitmq v0.10.0 - golang.org/x/sync v0.4.0 + golang.org/x/sync v0.6.0 google.golang.org/protobuf v1.31.0 ) diff --git a/go.sum b/go.sum index c75df715..64bf6c55 100644 --- a/go.sum +++ b/go.sum @@ -371,6 +371,8 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.4.0 h1:zxkM55ReGkDlKSM+Fu41A+zmbZuaPVbGMzvvdUPznYQ= golang.org/x/sync v0.4.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= +golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ= +golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/internal/factsengine/factscache/cache.go b/internal/factsengine/factscache/cache.go new file mode 100644 index 00000000..cdefdfb9 --- /dev/null +++ b/internal/factsengine/factscache/cache.go @@ -0,0 +1,102 @@ +package factscache + +import ( + "sync" + + "golang.org/x/sync/singleflight" + + log "github.com/sirupsen/logrus" +) + +type UpdateCacheFunc func(args ...interface{}) (interface{}, error) + +type FactsCache struct { + entries sync.Map + group singleflight.Group +} + +type Entry struct { + content interface{} + err error +} + +func NewFactsCache() *FactsCache { + return &FactsCache{ + entries: sync.Map{}, + group: singleflight.Group{}, + } +} + +// GetOrUpdate Runs FactsCache GetOrUpdate with a provided cache +// If the cache is nil, it runs the function, otherwise it returns +// from cache +func GetOrUpdate( + cache *FactsCache, + entry string, + udpateFunc UpdateCacheFunc, + updateFuncArgs ...interface{}, +) (interface{}, error) { + if cache == nil { + return udpateFunc(updateFuncArgs...) + } + + return cache.GetOrUpdate( + entry, + udpateFunc, + updateFuncArgs..., + ) +} + +// Entries returns the cached entries list +func (c *FactsCache) Entries() []string { + keys := []string{} + c.entries.Range(func(key, _ any) bool { + // nolint:forcetypeassert + keys = append(keys, key.(string)) + return true + }) + return keys +} + +// GetOrUpdate returns the cached result providing an entry name +// or runs the updateFunc to generate the entry. +// It locks its usage for each used key, returning the same value of the +// first execution in the additional usages. +// If other function with a different key is asked, it runs in parallel +// without blocking. +func (c *FactsCache) GetOrUpdate( + entry string, + udpateFunc UpdateCacheFunc, + updateFuncArgs ...interface{}, +) (interface{}, error) { + loadedEntry, hit := c.entries.Load(entry) + if hit { + // nolint:forcetypeassert + cacheEntry := loadedEntry.(Entry) + log.Debugf("Value for entry %s already cached", entry) + return cacheEntry.content, cacheEntry.err + } + + // singleflight is used to avoid a duplicated function execution at + // the same moment for a given key (memoization). + // This way, the code only blocks the execution based on same keys, + // not blocking other keys execution + content, err, _ := c.group.Do(entry, func() (interface{}, error) { + content, err := udpateFunc(updateFuncArgs...) + newEntry := Entry{ + content: content, + err: err, + } + c.entries.Store(entry, newEntry) + + return content, err + }) + + if err != nil { + log.Debugf("New value with error set for entry %s", entry) + return content, err + } + + log.Debugf("New value for entry %s set", entry) + return content, err +} diff --git a/internal/factsengine/factscache/cache_test.go b/internal/factsengine/factscache/cache_test.go new file mode 100644 index 00000000..b1d888b4 --- /dev/null +++ b/internal/factsengine/factscache/cache_test.go @@ -0,0 +1,173 @@ +package factscache_test + +import ( + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/suite" + "github.com/trento-project/agent/internal/factsengine/factscache" + "golang.org/x/sync/errgroup" +) + +type FactsCacheTestSuite struct { + suite.Suite + returnValue string + count int +} + +func TestFactsCacheTestSuite(t *testing.T) { + suite.Run(t, new(FactsCacheTestSuite)) +} + +func (suite *FactsCacheTestSuite) SetupSuite() { + suite.returnValue = "value" +} + +func (suite *FactsCacheTestSuite) SetupTest() { + suite.count = 0 +} + +// nolint:errcheck +func (suite *FactsCacheTestSuite) TestEntries() { + cache := factscache.NewFactsCache() + cache.GetOrUpdate("entry1", func(args ...interface{}) (interface{}, error) { + return "", nil + }) + cache.GetOrUpdate("entry2", func(args ...interface{}) (interface{}, error) { + return "", nil + }) + entries := cache.Entries() + + suite.ElementsMatch([]string{"entry1", "entry2"}, entries) +} + +func (suite *FactsCacheTestSuite) TestGetOrUpdate() { + cache := factscache.NewFactsCache() + + updateFunc := func(args ...interface{}) (interface{}, error) { + return suite.returnValue, nil + } + + value, err := cache.GetOrUpdate("entry1", updateFunc) + + suite.Equal(suite.returnValue, value) + suite.NoError(err) +} + +func (suite *FactsCacheTestSuite) TestGetOrUpdateWithError() { + cache := factscache.NewFactsCache() + someError := "some error" + + updateFunc := func(args ...interface{}) (interface{}, error) { + return nil, fmt.Errorf(someError) + } + + _, err := cache.GetOrUpdate("entry", updateFunc) + + suite.EqualError(err, someError) +} + +func (suite *FactsCacheTestSuite) TestGetOrUpdateCacheHit() { + cache := factscache.NewFactsCache() + + updateFunc := func(args ...interface{}) (interface{}, error) { + suite.count++ + return suite.returnValue, nil + } + + // nolint:errcheck + cache.GetOrUpdate("entry", updateFunc) + value, err := cache.GetOrUpdate("entry", updateFunc) + + suite.Equal(suite.returnValue, value) + suite.Equal(1, suite.count) + suite.NoError(err) +} + +func (suite *FactsCacheTestSuite) TestGetOrUpdateWithArgs() { + cache := factscache.NewFactsCache() + + // nolint:forcetypeassert + updateFunc := func(args ...interface{}) (interface{}, error) { + arg1 := args[0].(int) + arg2 := args[1].(string) + return fmt.Sprintf("%d_%s", arg1, arg2), nil + } + + value, err := cache.GetOrUpdate("entry", updateFunc, 1, "text") + + suite.Equal("1_text", value) + suite.NoError(err) +} + +// nolint:errcheck +func (suite *FactsCacheTestSuite) TestGetOrUpdateCacheConcurrent() { + cache := factscache.NewFactsCache() + g := errgroup.Group{} + + updateFunc := func(args ...interface{}) (interface{}, error) { + value, _ := args[0].(string) + time.Sleep(100 * time.Millisecond) + return value, nil + } + + g.Go(func() error { + value, _ := cache.GetOrUpdate("entry1", updateFunc, "initialValueEntry1") + castedValue, _ := value.(string) + suite.Equal("initialValueEntry1", castedValue) + return nil + }) + g.Go(func() error { + value, _ := cache.GetOrUpdate("entry2", updateFunc, "initialValueEntry2") + castedValue, _ := value.(string) + suite.Equal("initialValueEntry2", castedValue) + return nil + }) + time.Sleep(50 * time.Millisecond) + // The next 2 calls return the memoized value + g.Go(func() error { + value, _ := cache.GetOrUpdate("entry1", updateFunc, "newValueEntry1") + castedValue, _ := value.(string) + suite.Equal("initialValueEntry1", castedValue) + return nil + }) + + g.Go(func() error { + value, _ := cache.GetOrUpdate("entry2", updateFunc, "newValueEntry2") + castedValue, _ := value.(string) + suite.Equal("initialValueEntry2", castedValue) + return nil + }) + g.Wait() +} + +func (suite *FactsCacheTestSuite) TestPureGetOrUpdate() { + updateFunc := func(args ...interface{}) (interface{}, error) { + suite.count++ + return suite.returnValue, nil + } + + value, err := factscache.GetOrUpdate(nil, "entry1", updateFunc) + + suite.Equal(suite.returnValue, value) + suite.Equal(1, suite.count) + suite.NoError(err) +} + +func (suite *FactsCacheTestSuite) TestPureGetOrUpdateCacheHit() { + cache := factscache.NewFactsCache() + + updateFunc := func(args ...interface{}) (interface{}, error) { + suite.count++ + return suite.returnValue, nil + } + + // nolint:errcheck + factscache.GetOrUpdate(cache, "entry1", updateFunc) + value, err := factscache.GetOrUpdate(cache, "entry1", updateFunc) + + suite.Equal(suite.returnValue, value) + suite.Equal(1, suite.count) + suite.NoError(err) +} diff --git a/internal/factsengine/gatherers/cibadmin.go b/internal/factsengine/gatherers/cibadmin.go index 921d9090..c2dfddab 100644 --- a/internal/factsengine/gatherers/cibadmin.go +++ b/internal/factsengine/gatherers/cibadmin.go @@ -2,12 +2,14 @@ package gatherers import ( log "github.com/sirupsen/logrus" + "github.com/trento-project/agent/internal/factsengine/factscache" "github.com/trento-project/agent/pkg/factsengine/entities" "github.com/trento-project/agent/pkg/utils" ) const ( - CibAdminGathererName = "cibadmin" + CibAdminGathererName = "cibadmin" + CibAdminGathererCache = "cibadmin" ) // nolint:gochecknoglobals @@ -25,26 +27,46 @@ var ( type CibAdminGatherer struct { executor utils.CommandExecutor + cache *factscache.FactsCache } func NewDefaultCibAdminGatherer() *CibAdminGatherer { - return NewCibAdminGatherer(utils.Executor{}) + return NewCibAdminGatherer(utils.Executor{}, nil) } -func NewCibAdminGatherer(executor utils.CommandExecutor) *CibAdminGatherer { +func NewCibAdminGatherer(executor utils.CommandExecutor, cache *factscache.FactsCache) *CibAdminGatherer { return &CibAdminGatherer{ executor: executor, + cache: cache, } } +func (g *CibAdminGatherer) SetCache(cache *factscache.FactsCache) { + g.cache = cache +} + +func memoizeCibAdmin(args ...interface{}) (interface{}, error) { + executor, ok := args[0].(utils.CommandExecutor) + if !ok { + return nil, ImplementationError.Wrap("error using memoizeCibAdmin. executor must be 1st argument") + } + return executor.Exec("cibadmin", "--query", "--local") +} + func (g *CibAdminGatherer) Gather(factsRequests []entities.FactRequest) ([]entities.Fact, error) { log.Infof("Starting %s facts gathering process", CibAdminGathererName) - cibadmin, err := g.executor.Exec("cibadmin", "--query", "--local") + content, err := factscache.GetOrUpdate(g.cache, CibAdminGathererCache, memoizeCibAdmin, g.executor) + if err != nil { return nil, CibAdminCommandError.Wrap(err.Error()) } + cibadmin, ok := content.([]byte) + if !ok { + return nil, CibAdminDecodingError.Wrap("error casting the command output") + } + elementsToList := map[string]bool{"primitive": true, "clone": true, "master": true, "group": true, "nvpair": true, "op": true, "rsc_location": true, "rsc_order": true, "rsc_colocation": true, "cluster_property_set": true, "meta_attributes": true} diff --git a/internal/factsengine/gatherers/cibadmin_test.go b/internal/factsengine/gatherers/cibadmin_test.go index d76561c5..21c844e1 100644 --- a/internal/factsengine/gatherers/cibadmin_test.go +++ b/internal/factsengine/gatherers/cibadmin_test.go @@ -7,6 +7,7 @@ import ( "testing" "github.com/stretchr/testify/suite" + "github.com/trento-project/agent/internal/factsengine/factscache" "github.com/trento-project/agent/internal/factsengine/gatherers" "github.com/trento-project/agent/pkg/factsengine/entities" utilsMocks "github.com/trento-project/agent/pkg/utils/mocks" @@ -38,7 +39,7 @@ func (suite *CibAdminTestSuite) TestCibAdminGatherCmdNotFound() { suite.mockExecutor.On("Exec", "cibadmin", "--query", "--local").Return( suite.cibAdminOutput, errors.New("cibadmin not found")) - p := gatherers.NewCibAdminGatherer(suite.mockExecutor) + p := gatherers.NewCibAdminGatherer(suite.mockExecutor, nil) factRequests := []entities.FactRequest{ { @@ -59,7 +60,7 @@ func (suite *CibAdminTestSuite) TestCibAdminInvalidXML() { suite.mockExecutor.On("Exec", "cibadmin", "--query", "--local").Return( []byte("invalid"), nil) - p := gatherers.NewCibAdminGatherer(suite.mockExecutor) + p := gatherers.NewCibAdminGatherer(suite.mockExecutor, nil) factRequests := []entities.FactRequest{ { @@ -80,7 +81,7 @@ func (suite *CibAdminTestSuite) TestCibAdminGather() { suite.mockExecutor.On("Exec", "cibadmin", "--query", "--local").Return( suite.cibAdminOutput, nil) - p := gatherers.NewCibAdminGatherer(suite.mockExecutor) + p := gatherers.NewCibAdminGatherer(suite.mockExecutor, nil) factRequests := []entities.FactRequest{ { @@ -206,3 +207,64 @@ func (suite *CibAdminTestSuite) TestCibAdminGather() { suite.NoError(err) suite.ElementsMatch(expectedResults, factResults) } + +func (suite *CibAdminTestSuite) TestCibAdminGatherWithCache() { + suite.mockExecutor.On("Exec", "cibadmin", "--query", "--local"). + Return(suite.cibAdminOutput, nil). + Once() + + cache := factscache.NewFactsCache() + + p := gatherers.NewCibAdminGatherer(suite.mockExecutor, cache) + + factRequests := []entities.FactRequest{ + { + Name: "sid", + Gatherer: "cibadmin", + Argument: "cib.configuration.resources.master.0.primitive.0.instance_attributes.nvpair.0.value", + CheckID: "check1", + }, + } + + expectedResults := []entities.Fact{ + { + Name: "sid", + Value: &entities.FactValueString{Value: "PRD"}, + CheckID: "check1", + }, + } + + factResults, err := p.Gather(factRequests) + suite.NoError(err) + suite.ElementsMatch(expectedResults, factResults) + + _, err = p.Gather(factRequests) + suite.NoError(err) + + entries := cache.Entries() + suite.ElementsMatch([]string{"cibadmin"}, entries) +} + +func (suite *CibAdminTestSuite) TestCibAdminGatherCacheCastingError() { + cache := factscache.NewFactsCache() + _, err := cache.GetOrUpdate("cibadmin", func(args ...interface{}) (interface{}, error) { + return 1, nil + }) + suite.NoError(err) + + p := gatherers.NewCibAdminGatherer(suite.mockExecutor, cache) + + factRequests := []entities.FactRequest{ + { + Name: "sid", + Gatherer: "cibadmin", + Argument: "", + CheckID: "check1", + }, + } + + _, err = p.Gather(factRequests) + + suite.EqualError(err, "fact gathering error: cibadmin-decoding-error - "+ + "error decoding cibadmin output: error casting the command output") +} diff --git a/internal/factsengine/gatherers/gatherer.go b/internal/factsengine/gatherers/gatherer.go index 0b195403..f724b633 100644 --- a/internal/factsengine/gatherers/gatherer.go +++ b/internal/factsengine/gatherers/gatherer.go @@ -1,13 +1,24 @@ package gatherers import ( + "github.com/trento-project/agent/internal/factsengine/factscache" "github.com/trento-project/agent/pkg/factsengine/entities" ) +// nolint:gochecknoglobals +var ImplementationError = entities.FactGatheringError{ + Type: "implemetation-error", + Message: "implementation error", +} + type FactGatherer interface { Gather(factsRequests []entities.FactRequest) ([]entities.Fact, error) } +type FactGathererWithCache interface { + SetCache(cache *factscache.FactsCache) +} + func StandardGatherers() FactGatherersTree { return FactGatherersTree{ CibAdminGathererName: map[string]FactGatherer{ diff --git a/internal/factsengine/gathering.go b/internal/factsengine/gathering.go index a3113617..db784eef 100644 --- a/internal/factsengine/gathering.go +++ b/internal/factsengine/gathering.go @@ -3,6 +3,7 @@ package factsengine import ( "github.com/pkg/errors" log "github.com/sirupsen/logrus" + "github.com/trento-project/agent/internal/factsengine/factscache" "github.com/trento-project/agent/internal/factsengine/gatherers" "github.com/trento-project/agent/pkg/factsengine/entities" "golang.org/x/sync/errgroup" @@ -24,6 +25,7 @@ func gatherFacts( groupedFactsRequest := groupFactsRequestByGatherer(agentFacts) factsCh := make(chan []entities.Fact, len(groupedFactsRequest.FactRequests)) g := new(errgroup.Group) + cache := factscache.NewFactsCache() log.Infof("Starting facts gathering process") @@ -37,6 +39,11 @@ func gatherFacts( continue } + // Check if the gatherer implements FactGathererWithCache to set cache + if gathererWithCache, ok := gatherer.(gatherers.FactGathererWithCache); ok { + gathererWithCache.SetCache(cache) + } + // Execute the fact gathering asynchronously and in parallel g.Go(func() error { var gatheringError *entities.FactGatheringError