Skip to content

Commit

Permalink
Implement prune command
Browse files Browse the repository at this point in the history
In case of state store and load functions implemented we may prune
objects in Adam to keep the logic to be as fast as possible.

Signed-off-by: Petr Fedchenkov <[email protected]>
  • Loading branch information
giggsoff committed Sep 29, 2023
1 parent fdf746e commit f1b4581
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 2 deletions.
20 changes: 19 additions & 1 deletion cmd/edenClean.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func newCleanCmd(configName, verbosity *string) *cobra.Command {
PersistentPreRunE: preRunViperLoadFunction(cfg, configName, verbosity),
Run: func(cmd *cobra.Command, args []string) {
if err := openevec.EdenClean(*cfg, *configName, configDist, vmName, currentContext); err != nil {
log.Fatalf("Setup eden failed: %s", err)
log.Fatalf("Clean eden failed: %s", err)
}
},
}
Expand Down Expand Up @@ -54,3 +54,21 @@ func newCleanCmd(configName, verbosity *string) *cobra.Command {

return cleanCmd
}

func newPruneCmd(configName, verbosity *string) *cobra.Command {
cfg := &openevec.EdenSetupArgs{}

var pruneCmd = &cobra.Command{
Use: "prune",
Short: "prune stored objects from the controller. Please save them before.",
Long: `Prune stored objects from the controller. Please save them before.`,
PersistentPreRunE: preRunViperLoadFunction(cfg, configName, verbosity),
Run: func(cmd *cobra.Command, args []string) {
if err := openevec.EdenPrune(*cfg); err != nil {
log.Fatalf("Prune eden failed: %s", err)
}
},
}

return pruneCmd
}
1 change: 1 addition & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func NewEdenCommand() *cobra.Command {
newCleanCmd(&configName, &verbosity),
newConfigCmd(&configName, &verbosity),
newSdnCmd(&configName, &verbosity),
newPruneCmd(&configName, &verbosity),
},
},
{
Expand Down
65 changes: 65 additions & 0 deletions pkg/controller/adam/adam.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package adam

import (
"context"
"encoding/json"
"fmt"
"net/url"
Expand All @@ -10,6 +11,7 @@ import (
"strings"
"time"

"github.com/go-redis/redis/v9"
"github.com/lf-edge/eden/pkg/controller/cachers"
"github.com/lf-edge/eden/pkg/controller/eapps"
"github.com/lf-edge/eden/pkg/controller/eflowlog"
Expand Down Expand Up @@ -405,3 +407,66 @@ func (adam *Ctx) GetGlobalOptions() (*types.GlobalOptions, error) {
}
return &globalOptions, nil
}

func (adam *Ctx) cleanRedisStream(stream string) error {
addr, password, databaseID, err := parseRedisURL(adam.AdamRedisURLEden)
if err != nil {
return err
}
client := redis.NewClient(&redis.Options{
Addr: addr,
Password: password,

Check failure on line 418 in pkg/controller/adam/adam.go

View workflow job for this annotation

GitHub Actions / yetus

detsecrets:5baa61e4c9b93f3f0682250b6cf8331b7ee68fd8:Secret Keyword
DB: databaseID,
MaxRetries: defaults.DefaultRepeatCount,
MinRetryBackoff: defaults.DefaultRepeatTimeout / 2,
MaxRetryBackoff: defaults.DefaultRepeatTimeout * 2,
})
n, err := client.XTrimMaxLenApprox(context.TODO(), stream, 0, 0).Result()
log.Debugf("XTrimMaxLenApprox(%s): %d", stream, n)
return err
}

// CleanInfo removes all info messages of device from controller
func (adam *Ctx) CleanInfo(devUUID uuid.UUID) (err error) {
if adam.AdamRemoteRedis {
stream := adam.getInfoRedisStream(devUUID)
return adam.cleanRedisStream(stream)
}
panic("implement me")
}

// CleanMetrics removes all metric messages of device from controller
func (adam *Ctx) CleanMetrics(devUUID uuid.UUID) (err error) {
if adam.AdamRemoteRedis {
stream := adam.getMetricsRedisStream(devUUID)
return adam.cleanRedisStream(stream)
}
panic("implement me")
}

// CleanLogs removes all logs messages of device from controller
func (adam *Ctx) CleanLogs(devUUID uuid.UUID) (err error) {
if adam.AdamRemoteRedis {
stream := adam.getLogsRedisStream(devUUID)
return adam.cleanRedisStream(stream)
}
panic("implement me")
}

// CleanFlowLogs removes all flow logs messages of device from controller
func (adam *Ctx) CleanFlowLogs(devUUID uuid.UUID) (err error) {
if adam.AdamRemoteRedis {
stream := adam.getFlowLogRedisStream(devUUID)
return adam.cleanRedisStream(stream)
}
panic("implement me")
}

// CleanAppLogs removes all app logs messages of app of device from controller
func (adam *Ctx) CleanAppLogs(devUUID uuid.UUID, appUUID uuid.UUID) (err error) {
if adam.AdamRemoteRedis {
stream := adam.getAppsLogsRedisStream(devUUID, appUUID)
return adam.cleanRedisStream(stream)
}
panic("implement me")
}
7 changes: 6 additions & 1 deletion pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
uuid "github.com/satori/go.uuid"
)

//Controller is an interface of controller
// Controller is an interface of controller
type Controller interface {
CertsGet(devUUID uuid.UUID) (out string, err error)
ConfigGet(devUUID uuid.UUID) (out string, err error)
Expand All @@ -28,6 +28,11 @@ type Controller interface {
FlowLogLastCallback(devUUID uuid.UUID, q map[string]string, handler eflowlog.HandlerFunc) (err error)
InfoChecker(devUUID uuid.UUID, q map[string]string, handler einfo.HandlerFunc, mode einfo.InfoCheckerMode, timeout time.Duration) (err error)
InfoLastCallback(devUUID uuid.UUID, q map[string]string, handler einfo.HandlerFunc) (err error)
CleanInfo(devUUID uuid.UUID) (err error)
CleanMetrics(devUUID uuid.UUID) (err error)
CleanLogs(devUUID uuid.UUID) (err error)
CleanFlowLogs(devUUID uuid.UUID) (err error)
CleanAppLogs(devUUID uuid.UUID, appUUID uuid.UUID) (err error)
MetricChecker(devUUID uuid.UUID, q map[string]string, handler emetric.HandlerFunc, mode emetric.MetricCheckerMode, timeout time.Duration) (err error)
MetricLastCallback(devUUID uuid.UUID, q map[string]string, handler emetric.HandlerFunc) (err error)
RequestLastCallback(devUUID uuid.UUID, q map[string]string, handler erequest.HandlerFunc) (err error)
Expand Down
45 changes: 45 additions & 0 deletions pkg/openevec/eden.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/lf-edge/eden/pkg/controller/types"
"github.com/lf-edge/eden/pkg/defaults"
"github.com/lf-edge/eden/pkg/eden"
"github.com/lf-edge/eden/pkg/eve"
"github.com/lf-edge/eden/pkg/models"
"github.com/lf-edge/eden/pkg/utils"
"github.com/lf-edge/eve/api/go/flowlog"
Expand Down Expand Up @@ -841,3 +842,47 @@ func EdenImport(tarFile string, rewriteRoot bool, cfg *EdenSetupArgs) error {

return nil
}

// EdenPrune removes data from the controller
//
//nolint:cyclop
func EdenPrune(_ EdenSetupArgs) error {
changer := &adamChanger{}
ctrl, dev, err := changer.getControllerAndDev()
if err != nil {
return fmt.Errorf("getControllerAndDev: %w", err)
}
state := eve.Init(ctrl, dev)
if err := ctrl.InfoLastCallback(dev.GetID(), nil, state.InfoCallback()); err != nil {
return fmt.Errorf("fail in get InfoLastCallback: %w", err)
}
if err := ctrl.MetricLastCallback(dev.GetID(), nil, state.MetricCallback()); err != nil {
return fmt.Errorf("fail in get MetricLastCallback: %w", err)
}
err = state.Store()
if err != nil {
return fmt.Errorf("state.Store: %w", err)
}
if err := ctrl.CleanInfo(dev.GetID()); err != nil {
return fmt.Errorf("fail in ctrl CleanInfo: %w", err)
}
if err := ctrl.CleanMetrics(dev.GetID()); err != nil {
return fmt.Errorf("fail in ctrl CleanMetrics: %w", err)
}
if err := ctrl.CleanLogs(dev.GetID()); err != nil {
return fmt.Errorf("fail in ctrl CleanLogs: %w", err)
}
if err := ctrl.CleanFlowLogs(dev.GetID()); err != nil {
return fmt.Errorf("fail in ctrl CleanFlowLogs: %w", err)
}
for _, el := range dev.GetApplicationInstances() {
appUUID, err := uuid.FromString(el)
if err != nil {
return err
}
if err := ctrl.CleanAppLogs(dev.GetID(), appUUID); err != nil {
return fmt.Errorf("fail in ctrl CleanAppLogs: %w", err)
}
}
return nil
}

0 comments on commit f1b4581

Please sign in to comment.