From c343cb5b34deb43570fe22c16af54e03cdecddf4 Mon Sep 17 00:00:00 2001 From: Petr Fedchenkov Date: Fri, 29 Sep 2023 11:08:44 +0300 Subject: [PATCH] Implement prune command In case of state store and load functions implemented we may prune objects in Adam to keep the logic to be as fast as possible. Signed-off-by: Petr Fedchenkov --- cmd/edenClean.go | 20 ++++++++++- cmd/root.go | 1 + pkg/controller/adam/adam.go | 65 ++++++++++++++++++++++++++++++++++++ pkg/controller/controller.go | 7 +++- pkg/openevec/eden.go | 45 +++++++++++++++++++++++++ 5 files changed, 136 insertions(+), 2 deletions(-) diff --git a/cmd/edenClean.go b/cmd/edenClean.go index 8e6d624f2..67db3dbc7 100644 --- a/cmd/edenClean.go +++ b/cmd/edenClean.go @@ -23,7 +23,7 @@ func newCleanCmd(configName, verbosity *string) *cobra.Command { PersistentPreRunE: preRunViperLoadFunction(cfg, configName, verbosity), Run: func(cmd *cobra.Command, args []string) { if err := openEVEC.EdenClean(*configName, configDist, vmName, currentContext); err != nil { - log.Fatalf("Setup eden failed: %s", err) + log.Fatalf("Clean eden failed: %s", err) } }, } @@ -54,3 +54,21 @@ func newCleanCmd(configName, verbosity *string) *cobra.Command { return cleanCmd } + +func newPruneCmd(configName, verbosity *string) *cobra.Command { + cfg := &openevec.EdenSetupArgs{} + + var pruneCmd = &cobra.Command{ + Use: "prune", + Short: "prune stored objects from the controller. Please save them before.", + Long: `Prune stored objects from the controller. Please save them before.`, + PersistentPreRunE: preRunViperLoadFunction(cfg, configName, verbosity), + Run: func(cmd *cobra.Command, args []string) { + if err := openEVEC.EdenPrune(); err != nil { + log.Fatalf("Prune eden failed: %s", err) + } + }, + } + + return pruneCmd +} diff --git a/cmd/root.go b/cmd/root.go index 864735a10..f575635e4 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -36,6 +36,7 @@ func NewEdenCommand() *cobra.Command { newCleanCmd(&configName, &verbosity), newConfigCmd(&configName, &verbosity), newSdnCmd(&configName, &verbosity), + newPruneCmd(&configName, &verbosity), }, }, { diff --git a/pkg/controller/adam/adam.go b/pkg/controller/adam/adam.go index 5f49b8eae..12147e65f 100644 --- a/pkg/controller/adam/adam.go +++ b/pkg/controller/adam/adam.go @@ -1,6 +1,7 @@ package adam import ( + "context" "encoding/json" "fmt" "net/url" @@ -10,6 +11,7 @@ import ( "strings" "time" + "github.com/go-redis/redis/v9" "github.com/lf-edge/eden/pkg/controller/cachers" "github.com/lf-edge/eden/pkg/controller/eapps" "github.com/lf-edge/eden/pkg/controller/eflowlog" @@ -405,3 +407,66 @@ func (adam *Ctx) GetGlobalOptions() (*types.GlobalOptions, error) { } return &globalOptions, nil } + +func (adam *Ctx) cleanRedisStream(stream string) error { + addr, password, databaseID, err := parseRedisURL(adam.AdamRedisURLEden) + if err != nil { + return err + } + client := redis.NewClient(&redis.Options{ + Addr: addr, + Password: password, + DB: databaseID, + MaxRetries: defaults.DefaultRepeatCount, + MinRetryBackoff: defaults.DefaultRepeatTimeout / 2, + MaxRetryBackoff: defaults.DefaultRepeatTimeout * 2, + }) + n, err := client.XTrimMaxLenApprox(context.TODO(), stream, 0, 0).Result() + log.Debugf("XTrimMaxLenApprox(%s): %d", stream, n) + return err +} + +// CleanInfo removes all info messages of device from controller +func (adam *Ctx) CleanInfo(devUUID uuid.UUID) (err error) { + if adam.AdamRemoteRedis { + stream := adam.getInfoRedisStream(devUUID) + return adam.cleanRedisStream(stream) + } + panic("implement me") +} + +// CleanMetrics removes all metric messages of device from controller +func (adam *Ctx) CleanMetrics(devUUID uuid.UUID) (err error) { + if adam.AdamRemoteRedis { + stream := adam.getMetricsRedisStream(devUUID) + return adam.cleanRedisStream(stream) + } + panic("implement me") +} + +// CleanLogs removes all logs messages of device from controller +func (adam *Ctx) CleanLogs(devUUID uuid.UUID) (err error) { + if adam.AdamRemoteRedis { + stream := adam.getLogsRedisStream(devUUID) + return adam.cleanRedisStream(stream) + } + panic("implement me") +} + +// CleanFlowLogs removes all flow logs messages of device from controller +func (adam *Ctx) CleanFlowLogs(devUUID uuid.UUID) (err error) { + if adam.AdamRemoteRedis { + stream := adam.getFlowLogRedisStream(devUUID) + return adam.cleanRedisStream(stream) + } + panic("implement me") +} + +// CleanAppLogs removes all app logs messages of app of device from controller +func (adam *Ctx) CleanAppLogs(devUUID uuid.UUID, appUUID uuid.UUID) (err error) { + if adam.AdamRemoteRedis { + stream := adam.getAppsLogsRedisStream(devUUID, appUUID) + return adam.cleanRedisStream(stream) + } + panic("implement me") +} diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 404ed6dbd..0c1ff8606 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -15,7 +15,7 @@ import ( uuid "github.com/satori/go.uuid" ) -//Controller is an interface of controller +// Controller is an interface of controller type Controller interface { CertsGet(devUUID uuid.UUID) (out string, err error) ConfigGet(devUUID uuid.UUID) (out string, err error) @@ -28,6 +28,11 @@ type Controller interface { FlowLogLastCallback(devUUID uuid.UUID, q map[string]string, handler eflowlog.HandlerFunc) (err error) InfoChecker(devUUID uuid.UUID, q map[string]string, handler einfo.HandlerFunc, mode einfo.InfoCheckerMode, timeout time.Duration) (err error) InfoLastCallback(devUUID uuid.UUID, q map[string]string, handler einfo.HandlerFunc) (err error) + CleanInfo(devUUID uuid.UUID) (err error) + CleanMetrics(devUUID uuid.UUID) (err error) + CleanLogs(devUUID uuid.UUID) (err error) + CleanFlowLogs(devUUID uuid.UUID) (err error) + CleanAppLogs(devUUID uuid.UUID, appUUID uuid.UUID) (err error) MetricChecker(devUUID uuid.UUID, q map[string]string, handler emetric.HandlerFunc, mode emetric.MetricCheckerMode, timeout time.Duration) (err error) MetricLastCallback(devUUID uuid.UUID, q map[string]string, handler emetric.HandlerFunc) (err error) RequestLastCallback(devUUID uuid.UUID, q map[string]string, handler erequest.HandlerFunc) (err error) diff --git a/pkg/openevec/eden.go b/pkg/openevec/eden.go index d42dcc26f..9fb9e8189 100644 --- a/pkg/openevec/eden.go +++ b/pkg/openevec/eden.go @@ -23,6 +23,7 @@ import ( "github.com/lf-edge/eden/pkg/controller/types" "github.com/lf-edge/eden/pkg/defaults" "github.com/lf-edge/eden/pkg/eden" + "github.com/lf-edge/eden/pkg/eve" "github.com/lf-edge/eden/pkg/models" "github.com/lf-edge/eden/pkg/utils" "github.com/lf-edge/eve/api/go/flowlog" @@ -827,6 +828,50 @@ func (openEVEC *OpenEVEC) EdenImport(tarFile string, rewriteRoot bool) error { return nil } +// EdenPrune removes data from the controller +// +//nolint:cyclop +func (openEVEC *OpenEVEC) EdenPrune() error { + changer := &adamChanger{} + ctrl, dev, err := changer.getControllerAndDevFromConfig(openEVEC.cfg) + if err != nil { + return fmt.Errorf("getControllerAndDevFromConfig: %w", err) + } + state := eve.Init(ctrl, dev) + if err := ctrl.InfoLastCallback(dev.GetID(), nil, state.InfoCallback()); err != nil { + return fmt.Errorf("fail in get InfoLastCallback: %w", err) + } + if err := ctrl.MetricLastCallback(dev.GetID(), nil, state.MetricCallback()); err != nil { + return fmt.Errorf("fail in get MetricLastCallback: %w", err) + } + err = state.Store() + if err != nil { + return fmt.Errorf("state.Store: %w", err) + } + if err := ctrl.CleanInfo(dev.GetID()); err != nil { + return fmt.Errorf("fail in ctrl CleanInfo: %w", err) + } + if err := ctrl.CleanMetrics(dev.GetID()); err != nil { + return fmt.Errorf("fail in ctrl CleanMetrics: %w", err) + } + if err := ctrl.CleanLogs(dev.GetID()); err != nil { + return fmt.Errorf("fail in ctrl CleanLogs: %w", err) + } + if err := ctrl.CleanFlowLogs(dev.GetID()); err != nil { + return fmt.Errorf("fail in ctrl CleanFlowLogs: %w", err) + } + for _, el := range dev.GetApplicationInstances() { + appUUID, err := uuid.FromString(el) + if err != nil { + return err + } + if err := ctrl.CleanAppLogs(dev.GetID(), appUUID); err != nil { + return fmt.Errorf("fail in ctrl CleanAppLogs: %w", err) + } + } + return nil +} + // ParseTemplateFile fills EdenSetupArgs variable into template stored in file and writes result to io.Writer func ParseTemplateFile(path string, cfg EdenSetupArgs, w io.Writer) error { t, err := ioutil.ReadFile(path)