From e4058b8729bfc66c79589946a977e3d195e218e9 Mon Sep 17 00:00:00 2001 From: Petr Fedchenkov Date: Fri, 29 Sep 2023 11:56:34 +0300 Subject: [PATCH 1/4] Format files Seems we have quite outdated format for several files Signed-off-by: Petr Fedchenkov --- pkg/projects/state.go | 62 ++++++++++++++++++------------------- pkg/projects/testContext.go | 58 +++++++++++++++++----------------- tests/app/app_test.go | 6 ++-- tests/network/nw_test.go | 6 ++-- tests/volume/vol_test.go | 6 ++-- 5 files changed, 69 insertions(+), 69 deletions(-) diff --git a/pkg/projects/state.go b/pkg/projects/state.go index 273f4f127..3854bab1d 100644 --- a/pkg/projects/state.go +++ b/pkg/projects/state.go @@ -28,13 +28,13 @@ type infoState struct { LastInfoMessageTime *timestamppb.Timestamp } -//State aggregates device state +// State aggregates device state type State struct { device *device.Ctx deviceInfo *infoState } -//InitState init State object for device +// InitState init State object for device func InitState(device *device.Ctx) *State { return &State{device: device, deviceInfo: &infoState{}} } @@ -107,7 +107,7 @@ func (state *State) getProcessorInfo() einfo.HandlerFunc { } } -//GetInfoProcessingFunction returns processing function for ZInfoMsg +// GetInfoProcessingFunction returns processing function for ZInfoMsg func (state *State) GetInfoProcessingFunction() ProcInfoFunc { return func(infoMsg *info.ZInfoMsg) error { return state.processInfo(infoMsg) @@ -133,89 +133,89 @@ func (state *State) getProcessorMetric() emetric.HandlerFunc { } } -//GetMetricProcessingFunction returns processing function for ZMetricMsg +// GetMetricProcessingFunction returns processing function for ZMetricMsg func (state *State) GetMetricProcessingFunction() ProcMetricFunc { return func(metricMsg *metrics.ZMetricMsg) error { return state.processMetric(metricMsg) } } -//GetDinfo get *info.ZInfoDevice from obtained info +// GetDinfo get *info.ZInfoDevice from obtained info func (state *State) GetDinfo() *info.ZInfoDevice { return state.deviceInfo.Dinfo } -//GetAinfoSlice get []*info.ZInfoApp from obtained info +// GetAinfoSlice get []*info.ZInfoApp from obtained info func (state *State) GetAinfoSlice() []*info.ZInfoApp { return state.deviceInfo.Ainfo } -//GetNiinfoSlice get []*info.ZInfoNetworkInstance from obtained info +// GetNiinfoSlice get []*info.ZInfoNetworkInstance from obtained info func (state *State) GetNiinfoSlice() []*info.ZInfoNetworkInstance { return state.deviceInfo.Niinfo } -//GetVinfoSlice get []*info.ZInfoVolume from obtained info +// GetVinfoSlice get []*info.ZInfoVolume from obtained info func (state *State) GetVinfoSlice() []*info.ZInfoVolume { return state.deviceInfo.Vinfo } -//GetCinfoSlice get []*info.ZInfoContentTree from obtained info +// GetCinfoSlice get []*info.ZInfoContentTree from obtained info func (state *State) GetCinfoSlice() []*info.ZInfoContentTree { return state.deviceInfo.Cinfo } -//GetBinfoSlice get []*info.ZInfoBlob from obtained info +// GetBinfoSlice get []*info.ZInfoBlob from obtained info func (state *State) GetBinfoSlice() []*info.ZInfoBlob { return state.deviceInfo.Binfo } -//GetAppMetrics get []*metrics.AppMetric from obtained metrics +// GetAppMetrics get []*metrics.AppMetric from obtained metrics func (state *State) GetAppMetrics() []*metrics.AppMetric { return state.deviceInfo.AppMetrics } -//GetNetworkInstanceMetrics get []*metrics.ZMetricNetworkInstance from obtained metrics +// GetNetworkInstanceMetrics get []*metrics.ZMetricNetworkInstance from obtained metrics func (state *State) GetNetworkInstanceMetrics() []*metrics.ZMetricNetworkInstance { return state.deviceInfo.NetworkInstanceMetrics } -//GetVolumeMetrics get []*metrics.ZMetricVolume from obtained metrics +// GetVolumeMetrics get []*metrics.ZMetricVolume from obtained metrics func (state *State) GetVolumeMetrics() []*metrics.ZMetricVolume { return state.deviceInfo.VolumeMetrics } -//GetDeviceMetrics get *metrics.DeviceMetric from obtained metrics +// GetDeviceMetrics get *metrics.DeviceMetric from obtained metrics func (state *State) GetDeviceMetrics() *metrics.DeviceMetric { return state.deviceInfo.DeviceMetrics } -//GetLastInfoTime get *timestamp.Timestamp for last received info +// GetLastInfoTime get *timestamp.Timestamp for last received info func (state *State) GetLastInfoTime() *timestamppb.Timestamp { return state.deviceInfo.LastInfoMessageTime } -//LookUp access fields of State objects by path -//path contains address to lookup -//for example: LookUp("Dinfo.Network[0].IPAddrs[0]") will return first IP of first network of EVE -//All top fields to lookup in: -//Dinfo *info.ZInfoDevice -//Ainfo []*info.ZInfoApp -//Niinfo []*info.ZInfoNetworkInstance -//Vinfo []*info.ZInfoVolume -//Cinfo []*info.ZInfoContentTree -//Binfo []*info.ZInfoBlob -//Cipherinfo []*info.ZInfoCipher -//AppMetrics []*metrics.AppMetric -//NetworkInstanceMetrics []*metrics.ZMetricNetworkInstance -//VolumeMetrics []*metrics.ZMetricVolume -//DeviceMetrics *metrics.DeviceMetric +// LookUp access fields of State objects by path +// path contains address to lookup +// for example: LookUp("Dinfo.Network[0].IPAddrs[0]") will return first IP of first network of EVE +// All top fields to lookup in: +// Dinfo *info.ZInfoDevice +// Ainfo []*info.ZInfoApp +// Niinfo []*info.ZInfoNetworkInstance +// Vinfo []*info.ZInfoVolume +// Cinfo []*info.ZInfoContentTree +// Binfo []*info.ZInfoBlob +// Cipherinfo []*info.ZInfoCipher +// AppMetrics []*metrics.AppMetric +// NetworkInstanceMetrics []*metrics.ZMetricNetworkInstance +// VolumeMetrics []*metrics.ZMetricVolume +// DeviceMetrics *metrics.DeviceMetric func (state *State) LookUp(path string) (value reflect.Value, err error) { value, err = utils.LookUp(state.deviceInfo, path) return } -//CheckReady returns true in all needed information obtained from controller +// CheckReady returns true in all needed information obtained from controller func (state *State) CheckReady() bool { if state.deviceInfo.Dinfo == nil { return false diff --git a/pkg/projects/testContext.go b/pkg/projects/testContext.go index 93e235ff6..9b9281c20 100644 --- a/pkg/projects/testContext.go +++ b/pkg/projects/testContext.go @@ -21,7 +21,7 @@ import ( "github.com/spf13/viper" ) -//GetControllerMode parse url with controller +// GetControllerMode parse url with controller func GetControllerMode(controllerMode string) (modeType, modeURL string, err error) { params := utils.GetParams(controllerMode, defaults.DefaultControllerModePattern) if len(params) == 0 { @@ -37,7 +37,7 @@ func GetControllerMode(controllerMode string) (modeType, modeURL string, err err return } -//TestContext is main structure for running tests +// TestContext is main structure for running tests type TestContext struct { cloud controller.Cloud project *Project @@ -51,7 +51,7 @@ type TestContext struct { addTime time.Duration } -//NewTestContext creates new TestContext +// NewTestContext creates new TestContext func NewTestContext() *TestContext { var ( err error @@ -127,7 +127,7 @@ func NewTestContext() *TestContext { return tstCtx } -//GetNodeDescriptions returns list of nodes from config +// GetNodeDescriptions returns list of nodes from config func (tc *TestContext) GetNodeDescriptions() (nodes []*EdgeNodeDescription) { if eveList := viper.GetStringMap("test.eve"); len(eveList) > 0 { for name := range eveList { @@ -147,7 +147,7 @@ func (tc *TestContext) GetNodeDescriptions() (nodes []*EdgeNodeDescription) { return } -//GetController returns current controller +// GetController returns current controller func (tc *TestContext) GetController() controller.Cloud { if tc.cloud == nil { log.Fatal("Controller not initialized") @@ -155,12 +155,12 @@ func (tc *TestContext) GetController() controller.Cloud { return tc.cloud } -//InitProject init project object with defined name +// InitProject init project object with defined name func (tc *TestContext) InitProject(name string) { tc.project = &Project{name: name} } -//AddEdgeNodesFromDescription adds EdgeNodes from description in test.eve param +// AddEdgeNodesFromDescription adds EdgeNodes from description in test.eve param func (tc *TestContext) AddEdgeNodesFromDescription() { for _, node := range tc.GetNodeDescriptions() { edgeNode := node.GetEdgeNode(tc) @@ -180,10 +180,10 @@ func (tc *TestContext) AddEdgeNodesFromDescription() { } } -//GetEdgeNodeOpts pattern to pass device modifications +// GetEdgeNodeOpts pattern to pass device modifications type GetEdgeNodeOpts func(*device.Ctx) bool -//WithTest assign *testing.T for device +// WithTest assign *testing.T for device func (tc *TestContext) WithTest(t *testing.T) GetEdgeNodeOpts { return func(d *device.Ctx) bool { tc.tests[d] = t @@ -191,7 +191,7 @@ func (tc *TestContext) WithTest(t *testing.T) GetEdgeNodeOpts { } } -//GetEdgeNode return node from context +// GetEdgeNode return node from context func (tc *TestContext) GetEdgeNode(opts ...GetEdgeNodeOpts) *device.Ctx { Node: for _, el := range tc.nodes { @@ -205,12 +205,12 @@ Node: return nil } -//AddNode add node to test context +// AddNode add node to test context func (tc *TestContext) AddNode(node *device.Ctx) { tc.nodes = append(tc.nodes, node) } -//UpdateEdgeNode update edge node +// UpdateEdgeNode update edge node func (tc *TestContext) UpdateEdgeNode(edgeNode *device.Ctx, opts ...EdgeNodeOption) { for _, opt := range opts { opt(edgeNode) @@ -218,7 +218,7 @@ func (tc *TestContext) UpdateEdgeNode(edgeNode *device.Ctx, opts ...EdgeNodeOpti tc.ConfigSync(edgeNode) } -//NewEdgeNode creates edge node +// NewEdgeNode creates edge node func (tc *TestContext) NewEdgeNode(opts ...EdgeNodeOption) *device.Ctx { d := device.CreateEdgeNode() for _, opt := range opts { @@ -231,7 +231,7 @@ func (tc *TestContext) NewEdgeNode(opts ...EdgeNodeOption) *device.Ctx { return d } -//ConfigSync send config to controller +// ConfigSync send config to controller func (tc *TestContext) ConfigSync(edgeNode *device.Ctx) { if edgeNode.GetState() == device.NotOnboarded { if err := tc.GetController().OnBoardDev(edgeNode); err != nil { @@ -245,13 +245,13 @@ func (tc *TestContext) ConfigSync(edgeNode *device.Ctx) { } } -//ExpandOnSuccess adds additional time to global timeout on every success check +// ExpandOnSuccess adds additional time to global timeout on every success check func (tc *TestContext) ExpandOnSuccess(secs int) { tc.addTime = time.Duration(secs) * time.Second } -//WaitForProcWithErrorCallback blocking execution until the time elapses or all Procs gone -//and fires callback in case of timeout +// WaitForProcWithErrorCallback blocking execution until the time elapses or all Procs gone +// and fires callback in case of timeout func (tc *TestContext) WaitForProcWithErrorCallback(secs int, callback Callback) { defer func() { tc.addTime = 0 }() //reset addTime on exit defer tc.procBus.clean() @@ -290,8 +290,8 @@ func (tc *TestContext) WaitForProcWithErrorCallback(secs int, callback Callback) } } -//WaitForProc blocking execution until the time elapses or all Procs gone -//returns error on timeout +// WaitForProc blocking execution until the time elapses or all Procs gone +// returns error on timeout func (tc *TestContext) WaitForProc(secs int) { timeout := time.Duration(secs) * time.Second callback := func() { @@ -305,38 +305,38 @@ func (tc *TestContext) WaitForProc(secs int) { tc.WaitForProcWithErrorCallback(secs, callback) } -//AddProcLog add processFunction, that will get all logs for edgeNode +// AddProcLog add processFunction, that will get all logs for edgeNode func (tc *TestContext) AddProcLog(edgeNode *device.Ctx, processFunction ProcLogFunc) { tc.procBus.addProc(edgeNode, processFunction) } -//AddProcAppLog add processFunction, that will get all app logs for edgeNode +// AddProcAppLog add processFunction, that will get all app logs for edgeNode func (tc *TestContext) AddProcAppLog(edgeNode *device.Ctx, appUUID uuid.UUID, processFunction ProcAppLogFunc) { tc.procBus.addAppProc(edgeNode, appUUID, processFunction) } -//AddProcFlowLog add processFunction, that will get all FlowLogs for edgeNode +// AddProcFlowLog add processFunction, that will get all FlowLogs for edgeNode func (tc *TestContext) AddProcFlowLog(edgeNode *device.Ctx, processFunction ProcLogFlowFunc) { tc.procBus.addProc(edgeNode, processFunction) } -//AddProcInfo add processFunction, that will get all info for edgeNode +// AddProcInfo add processFunction, that will get all info for edgeNode func (tc *TestContext) AddProcInfo(edgeNode *device.Ctx, processFunction ProcInfoFunc) { tc.procBus.addProc(edgeNode, processFunction) } -//AddProcMetric add processFunction, that will get all metrics for edgeNode +// AddProcMetric add processFunction, that will get all metrics for edgeNode func (tc *TestContext) AddProcMetric(edgeNode *device.Ctx, processFunction ProcMetricFunc) { tc.procBus.addProc(edgeNode, processFunction) } -//AddProcTimer add processFunction, that will fire with time intervals for edgeNode +// AddProcTimer add processFunction, that will fire with time intervals for edgeNode func (tc *TestContext) AddProcTimer(edgeNode *device.Ctx, processFunction ProcTimerFunc) { tc.procBus.addProc(edgeNode, processFunction) } -//StartTrackingState init function for State monitoring -//if onlyNewElements set no use old information from controller +// StartTrackingState init function for State monitoring +// if onlyNewElements set no use old information from controller func (tc *TestContext) StartTrackingState(onlyNewElements bool) { tc.states = map[*device.Ctx]*State{} for _, dev := range tc.nodes { @@ -355,7 +355,7 @@ func (tc *TestContext) StartTrackingState(onlyNewElements bool) { } } -//WaitForState wait for State initialization from controller +// WaitForState wait for State initialization from controller func (tc *TestContext) WaitForState(edgeNode *device.Ctx, secs int) { state, isOk := tc.states[edgeNode] if !isOk { @@ -391,7 +391,7 @@ func (tc *TestContext) WaitForState(edgeNode *device.Ctx, secs int) { } } -//GetState returns State object for edgeNode +// GetState returns State object for edgeNode func (tc *TestContext) GetState(edgeNode *device.Ctx) *State { return tc.states[edgeNode] } diff --git a/tests/app/app_test.go b/tests/app/app_test.go index 2cb992625..69feb32a1 100644 --- a/tests/app/app_test.go +++ b/tests/app/app_test.go @@ -123,7 +123,7 @@ func checkState(eveState *eve.State, state string, appNames []string) error { return nil } -//checkApp wait for info of ZInfoApp type with state +// checkApp wait for info of ZInfoApp type with state func checkApp(state string, appNames []string) projects.ProcInfoFunc { return func(msg *info.ZInfoMsg) error { eveState.InfoCallback()(msg) //feed state with new info @@ -131,8 +131,8 @@ func checkApp(state string, appNames []string) projects.ProcInfoFunc { } } -//TestAppStatus wait for application reaching the selected state -//with a timewait +// TestAppStatus wait for application reaching the selected state +// with a timewait func TestAppStatus(t *testing.T) { edgeNode := tc.GetEdgeNode(tc.WithTest(t)) diff --git a/tests/network/nw_test.go b/tests/network/nw_test.go index 956610c50..47a8a36a7 100644 --- a/tests/network/nw_test.go +++ b/tests/network/nw_test.go @@ -110,7 +110,7 @@ func checkState(eveState *eve.State, state string, netNames []string) error { return nil } -//checkNet wait for info of ZInfoApp type with state +// checkNet wait for info of ZInfoApp type with state func checkNet(state string, volNames []string) projects.ProcInfoFunc { return func(msg *info.ZInfoMsg) error { eveState.InfoCallback()(msg) //feed state with new info @@ -118,8 +118,8 @@ func checkNet(state string, volNames []string) projects.ProcInfoFunc { } } -//TestNetworkStatus wait for networks reaching the selected state -//with a timewait +// TestNetworkStatus wait for networks reaching the selected state +// with a timewait func TestNetworkStatus(t *testing.T) { edgeNode := tc.GetEdgeNode(tc.WithTest(t)) diff --git a/tests/volume/vol_test.go b/tests/volume/vol_test.go index 5a49b6a37..34dfa3c4f 100644 --- a/tests/volume/vol_test.go +++ b/tests/volume/vol_test.go @@ -114,7 +114,7 @@ func checkState(eveState *eve.State, state string, volNames []string) error { return nil } -//checkVol wait for info of ZInfoApp type with state +// checkVol wait for info of ZInfoApp type with state func checkVol(state string, volNames []string) projects.ProcInfoFunc { return func(msg *info.ZInfoMsg) error { eveState.InfoCallback()(msg) //feed state with new info @@ -122,8 +122,8 @@ func checkVol(state string, volNames []string) projects.ProcInfoFunc { } } -//TestVolStatus wait for application reaching the selected state -//with a timewait +// TestVolStatus wait for application reaching the selected state +// with a timewait func TestVolStatus(t *testing.T) { edgeNode := tc.GetEdgeNode(tc.WithTest(t)) From 8c81107e8b586979f6bc05ad6c2ac6ecc695906c Mon Sep 17 00:00:00 2001 From: Petr Fedchenkov Date: Fri, 28 Jul 2023 19:30:41 +0300 Subject: [PATCH 2/4] Implement detachable state We try to calculate state of EdgeNode and objects inside of it. We construct the state based on info and metric messages from the controller. Info and metric messages are great for debugging, but in case of long time work we may hit the problem where state calculation consume more and more time. Let's slightly refactor the state logic to be able to store it locally and reuse. Signed-off-by: Petr Fedchenkov --- pkg/eve/applications.go | 85 +++++++++++++++-------- pkg/eve/eve.go | 66 ++++++++++++++++++ pkg/eve/networks.go | 42 ++++++++--- pkg/eve/state.go | 134 ++++++++++++++++++++++++++---------- pkg/eve/volumes.go | 48 +++++++++---- pkg/openevec/eve.go | 44 +++++------- pkg/openevec/status.go | 25 +++---- pkg/projects/state.go | 14 +++- pkg/projects/testContext.go | 2 +- tests/app/app_test.go | 32 +++------ tests/network/nw_test.go | 30 +++----- tests/reboot/reboot_test.go | 32 ++++----- tests/volume/vol_test.go | 30 +++----- 13 files changed, 370 insertions(+), 214 deletions(-) create mode 100644 pkg/eve/eve.go diff --git a/pkg/eve/applications.go b/pkg/eve/applications.go index 7240ce0bd..92858c6f4 100644 --- a/pkg/eve/applications.go +++ b/pkg/eve/applications.go @@ -30,16 +30,17 @@ type AppInstState struct { ExternalIP string InternalPort string ExternalPort string + Metadata string MemoryUsed uint32 MemoryAvail uint32 CPUUsage int Macs []string Volumes map[string]uint32 - prevCPUNS uint64 - prevCPUNSTime time.Time - deleted bool - infoTime time.Time + PrevCPUNS uint64 + PrevCPUNSTime time.Time + Deleted bool + InfoTime time.Time } func appStateHeader() string { @@ -117,7 +118,7 @@ func getPortMapping(appConfig *config.AppInstanceConfig, qemuPorts map[string]st } func (ctx *State) initApplications(ctrl controller.Cloud, dev *device.Ctx) error { - ctx.applications = make(map[string]*AppInstState) + ctx.Applications = make(map[string]*AppInstState) for _, el := range dev.GetApplicationInstances() { app, err := ctrl.GetApplicationInstanceConfig(el) if err != nil { @@ -144,24 +145,44 @@ func (ctx *State) initApplications(ctrl controller.Cloud, dev *device.Ctx) error Volumes: volumes, UUID: app.Uuidandversion.Uuid, } - ctx.applications[app.Uuidandversion.Uuid] = appStateObj + ctx.Applications[app.Uuidandversion.Uuid] = appStateObj } return nil } +func (ctx *State) applyOldStateApps(state *State) { + for stateID, stateEL := range state.Applications { + found := false + for id := range ctx.Applications { + if id != stateID { + continue + } + ctx.Applications[id] = stateEL + found = true + } + if !found { + if stateEL.Deleted { + continue + } + stateEL.AdamState = notInControllerConfig + ctx.Applications[stateID] = stateEL + } + } +} + func (ctx *State) processApplicationsByMetric(msg *metrics.ZMetricMsg) { if appMetrics := msg.GetAm(); appMetrics != nil { for _, appMetric := range appMetrics { - for _, el := range ctx.applications { + for _, el := range ctx.Applications { if appMetric.AppID == el.UUID { el.MemoryAvail = appMetric.Memory.GetAvailMem() el.MemoryUsed = appMetric.Memory.GetUsedMem() // if not restarted - if el.prevCPUNS < appMetric.Cpu.TotalNs { - el.CPUUsage = int(float32(appMetric.Cpu.TotalNs-el.prevCPUNS) / float32(msg.GetAtTimeStamp().AsTime().Sub(el.prevCPUNSTime).Nanoseconds()) * 100.0) + if el.PrevCPUNS < appMetric.Cpu.TotalNs { + el.CPUUsage = int(float32(appMetric.Cpu.TotalNs-el.PrevCPUNS) / float32(msg.GetAtTimeStamp().AsTime().Sub(el.PrevCPUNSTime).Nanoseconds()) * 100.0) } - el.prevCPUNS = appMetric.Cpu.TotalNs - el.prevCPUNSTime = msg.GetAtTimeStamp().AsTime() + el.PrevCPUNS = appMetric.Cpu.TotalNs + el.PrevCPUNSTime = msg.GetAtTimeStamp().AsTime() break } } @@ -169,10 +190,11 @@ func (ctx *State) processApplicationsByMetric(msg *metrics.ZMetricMsg) { } } +//nolint:cyclop func (ctx *State) processApplicationsByInfo(im *info.ZInfoMsg) { switch im.GetZtype() { case info.ZInfoTypes_ZiVolume: - for _, app := range ctx.applications { + for _, app := range ctx.Applications { if len(app.Volumes) == 0 { continue } @@ -188,8 +210,15 @@ func (ctx *State) processApplicationsByInfo(im *info.ZInfoMsg) { app.EVEState = fmt.Sprintf("%s (%d%%)", info.ZSwState_DOWNLOAD_STARTED.String(), int(percent)/len(app.Volumes)) } } + case info.ZInfoTypes_ZiAppInstMetaData: + for _, app := range ctx.Applications { + if im.GetAmdinfo().Uuid == app.UUID { + app.Metadata = string(im.GetAmdinfo().Data) + break + } + } case info.ZInfoTypes_ZiApp: - appStateObj, ok := ctx.applications[im.GetAinfo().AppID] + appStateObj, ok := ctx.Applications[im.GetAinfo().AppID] if !ok { appStateObj = &AppInstState{ Name: im.GetAinfo().AppName, @@ -197,7 +226,7 @@ func (ctx *State) processApplicationsByInfo(im *info.ZInfoMsg) { AdamState: notInControllerConfig, UUID: im.GetAinfo().AppID, } - ctx.applications[im.GetAinfo().AppID] = appStateObj + ctx.Applications[im.GetAinfo().AppID] = appStateObj } appStateObj.EVEState = im.GetAinfo().State.String() if len(im.GetAinfo().AppErr) > 0 { @@ -227,20 +256,20 @@ func (ctx *State) processApplicationsByInfo(im *info.ZInfoMsg) { //check appStateObj not defined in adam if appStateObj.AdamState != inControllerConfig { if im.GetAinfo().AppID == appStateObj.UUID { - appStateObj.deleted = false //if in recent ZInfoTypes_ZiApp, then not deleted + appStateObj.Deleted = false //if in recent ZInfoTypes_ZiApp, then not deleted } } if im.GetAinfo().State == info.ZSwState_INVALID { - appStateObj.deleted = true + appStateObj.Deleted = true } - appStateObj.infoTime = im.AtTimeStamp.AsTime() + appStateObj.InfoTime = im.AtTimeStamp.AsTime() case info.ZInfoTypes_ZiNetworkInstance: //try to find ips from NetworkInstances for _, el := range im.GetNiinfo().IpAssignments { // nothing to show if no IpAddress received if len(el.IpAddress) == 0 { continue } - for _, appStateObj := range ctx.applications { + for _, appStateObj := range ctx.Applications { for ind, mac := range appStateObj.Macs { if mac == el.MacAddress { appStateObj.InternalIP[ind] = el.IpAddress[0] @@ -250,7 +279,7 @@ func (ctx *State) processApplicationsByInfo(im *info.ZInfoMsg) { } case info.ZInfoTypes_ZiDevice: for _, el := range im.GetDinfo().AppInstances { - if _, ok := ctx.applications[el.Uuid]; !ok { + if _, ok := ctx.Applications[el.Uuid]; !ok { appStateObj := &AppInstState{ Name: el.Name, Image: "-", @@ -258,10 +287,10 @@ func (ctx *State) processApplicationsByInfo(im *info.ZInfoMsg) { EVEState: "UNKNOWN", UUID: el.Uuid, } - ctx.applications[el.Uuid] = appStateObj + ctx.Applications[el.Uuid] = appStateObj } } - for _, appStateObj := range ctx.applications { + for _, appStateObj := range ctx.Applications { seen := false for _, el := range im.GetDinfo().AppInstances { if appStateObj.UUID == el.Uuid { @@ -289,12 +318,12 @@ func (ctx *State) processApplicationsByInfo(im *info.ZInfoMsg) { appStateObj.ExternalIP = "127.0.0.1" } //check appStateObj not defined in adam - if appStateObj.AdamState != inControllerConfig && appStateObj.infoTime.Before(im.AtTimeStamp.AsTime()) { - appStateObj.deleted = true + if appStateObj.AdamState != inControllerConfig && appStateObj.InfoTime.Before(im.AtTimeStamp.AsTime()) { + appStateObj.Deleted = true for _, el := range im.GetDinfo().AppInstances { - //if in recent ZInfoTypes_ZiDevice with timestamp after ZInfoTypes_ZiApp, than not deleted + //if in recent ZInfoTypes_ZiDevice with timestamp after ZInfoTypes_ZiApp, then not deleted if el.Uuid == appStateObj.UUID { - appStateObj.deleted = false + appStateObj.Deleted = false } } } @@ -308,8 +337,8 @@ func (ctx *State) printPodListLines() error { if _, err := fmt.Fprintln(w, appStateHeader()); err != nil { return err } - appStatesSlice := make([]*AppInstState, 0, len(ctx.Applications())) - appStatesSlice = append(appStatesSlice, ctx.Applications()...) + appStatesSlice := make([]*AppInstState, 0, len(ctx.NotDeletedApplications())) + appStatesSlice = append(appStatesSlice, ctx.NotDeletedApplications()...) sort.SliceStable(appStatesSlice, func(i, j int) bool { return appStatesSlice[i].Name < appStatesSlice[j].Name }) @@ -322,7 +351,7 @@ func (ctx *State) printPodListLines() error { } func (ctx *State) printPodListJSON() error { - result, err := json.MarshalIndent(ctx.Applications(), "", " ") + result, err := json.MarshalIndent(ctx.NotDeletedApplications(), "", " ") if err != nil { return err } diff --git a/pkg/eve/eve.go b/pkg/eve/eve.go new file mode 100644 index 000000000..250623f9f --- /dev/null +++ b/pkg/eve/eve.go @@ -0,0 +1,66 @@ +package eve + +import ( + "time" + + "github.com/lf-edge/eden/pkg/controller" + "github.com/lf-edge/eden/pkg/device" + "github.com/lf-edge/eve/api/go/info" + "github.com/lf-edge/eve/api/go/metrics" +) + +// NodeState describes state of edge node +type NodeState struct { + UsedMem uint32 + AvailMem uint32 + UsedPercentageMem float64 + + LastRebootTime time.Time + LastRebootReason string + + // interface to ip mapping + RemoteIPs map[string][]string + + LastSeen time.Time + + Version string +} + +func (ctx *State) initNodeState(_ controller.Cloud, _ *device.Ctx) error { + ctx.EveState = &NodeState{} + return nil +} + +func (ctx *State) applyOldStateNodeState(state *State) { + ctx.EveState = state.EveState +} + +func (ctx *State) processNodeStateByInfo(msg *info.ZInfoMsg) { + infoTime := msg.AtTimeStamp.AsTime() + if infoTime.After(ctx.EveState.LastSeen) { + ctx.EveState.LastSeen = infoTime + } + if deviceInfo := msg.GetDinfo(); deviceInfo != nil { + ctx.EveState.RemoteIPs = make(map[string][]string) + for _, nw := range deviceInfo.Network { + ctx.EveState.RemoteIPs[nw.LocalName] = nw.IPAddrs + } + ctx.EveState.LastRebootTime = deviceInfo.LastRebootTime.AsTime() + ctx.EveState.LastRebootReason = deviceInfo.LastRebootReason + if len(deviceInfo.SwList) > 0 { + ctx.EveState.Version = deviceInfo.SwList[0].ShortVersion + } + } +} + +func (ctx *State) processNodeStateByMetric(msg *metrics.ZMetricMsg) { + metricTime := msg.AtTimeStamp.AsTime() + if metricTime.After(ctx.EveState.LastSeen) { + ctx.EveState.LastSeen = metricTime + } + if deviceMetric := msg.GetDm(); deviceMetric != nil { + ctx.EveState.AvailMem = deviceMetric.Memory.GetAvailMem() + ctx.EveState.UsedMem = deviceMetric.Memory.GetUsedMem() + ctx.EveState.UsedPercentageMem = deviceMetric.Memory.GetUsedPercentage() + } +} diff --git a/pkg/eve/networks.go b/pkg/eve/networks.go index d03904934..b9b46b76a 100644 --- a/pkg/eve/networks.go +++ b/pkg/eve/networks.go @@ -25,7 +25,7 @@ type NetInstState struct { AdamState string EveState string Activated bool - deleted bool + Deleted bool } func netInstStateHeader() string { @@ -40,7 +40,7 @@ func (netInstStateObj *NetInstState) toString() string { } func (ctx *State) initNetworks(ctrl controller.Cloud, dev *device.Ctx) error { - ctx.networks = make(map[string]*NetInstState) + ctx.Networks = make(map[string]*NetInstState) for _, el := range dev.GetNetworkInstances() { ni, err := ctrl.GetNetworkInstanceConfig(el) if err != nil { @@ -55,15 +55,35 @@ func (ctx *State) initNetworks(ctrl controller.Cloud, dev *device.Ctx) error { CIDR: ni.Ip.Subnet, NetworkType: ni.InstType.String(), } - ctx.networks[ni.Uuidandversion.Uuid] = netInstStateObj + ctx.Networks[ni.Uuidandversion.Uuid] = netInstStateObj } return nil } +func (ctx *State) applyOldStateNetworks(state *State) { + for stateID, stateEL := range state.Networks { + found := false + for id := range ctx.Networks { + if id != stateID { + continue + } + ctx.Networks[id] = stateEL + found = true + } + if !found { + if stateEL.Deleted { + continue + } + stateEL.AdamState = notInControllerConfig + ctx.Networks[stateID] = stateEL + } + } +} + func (ctx *State) processNetworksByInfo(im *info.ZInfoMsg) { switch im.GetZtype() { case info.ZInfoTypes_ZiNetworkInstance: - netInstStateObj, ok := ctx.networks[im.GetNiinfo().GetNetworkID()] + netInstStateObj, ok := ctx.Networks[im.GetNiinfo().GetNetworkID()] if !ok { netInstStateObj = &NetInstState{ Name: im.GetNiinfo().GetDisplayname(), @@ -73,7 +93,7 @@ func (ctx *State) processNetworksByInfo(im *info.ZInfoMsg) { EveState: "UNKNOWN", NetworkType: (config.ZNetworkInstType)(int32(im.GetNiinfo().InstType)).String(), } - ctx.networks[im.GetNiinfo().GetNetworkID()] = netInstStateObj + ctx.Networks[im.GetNiinfo().GetNetworkID()] = netInstStateObj } netInstStateObj.EveState = im.GetNiinfo().State.String() netInstStateObj.Activated = im.GetNiinfo().Activated @@ -98,12 +118,12 @@ func (ctx *State) processNetworksByInfo(im *info.ZInfoMsg) { if !netInstStateObj.Activated && im.GetNiinfo().State != info.ZNetworkInstanceState_ZNETINST_STATE_INIT && netInstStateObj.AdamState == notInControllerConfig { - netInstStateObj.deleted = true + netInstStateObj.Deleted = true } if im.GetNiinfo().State == info.ZNetworkInstanceState_ZNETINST_STATE_UNSPECIFIED && netInstStateObj.AdamState == notInControllerConfig { - netInstStateObj.deleted = true + netInstStateObj.Deleted = true } } } @@ -112,7 +132,7 @@ func (ctx *State) processNetworksByMetric(msg *metrics.ZMetricMsg) { if networkMetrics := msg.GetNm(); networkMetrics != nil { for _, networkMetric := range networkMetrics { // XXX use [uuid] instead of loop - for _, el := range ctx.networks { + for _, el := range ctx.Networks { if networkMetric.NetworkID == el.UUID { el.Stats = networkMetric.GetNetworkStats().String() break @@ -128,8 +148,8 @@ func (ctx *State) printNetListLines() error { if _, err := fmt.Fprintln(w, netInstStateHeader()); err != nil { return err } - netInstStatesSlice := make([]*NetInstState, 0, len(ctx.Networks())) - netInstStatesSlice = append(netInstStatesSlice, ctx.Networks()...) + netInstStatesSlice := make([]*NetInstState, 0, len(ctx.NotDeletedNetworks())) + netInstStatesSlice = append(netInstStatesSlice, ctx.NotDeletedNetworks()...) sort.SliceStable(netInstStatesSlice, func(i, j int) bool { return netInstStatesSlice[i].Name < netInstStatesSlice[j].Name }) @@ -142,7 +162,7 @@ func (ctx *State) printNetListLines() error { } func (ctx *State) printNetListJSON() error { - result, err := json.MarshalIndent(ctx.Networks(), "", " ") + result, err := json.MarshalIndent(ctx.NotDeletedNetworks(), "", " ") if err != nil { return err } diff --git a/pkg/eve/state.go b/pkg/eve/state.go index cbf447951..d00929c97 100644 --- a/pkg/eve/state.go +++ b/pkg/eve/state.go @@ -1,11 +1,17 @@ package eve import ( + "encoding/json" + "errors" + "fmt" + "os" + "path/filepath" + "github.com/lf-edge/eden/pkg/controller" "github.com/lf-edge/eden/pkg/controller/einfo" "github.com/lf-edge/eden/pkg/controller/emetric" "github.com/lf-edge/eden/pkg/device" - "github.com/lf-edge/eden/pkg/projects" + "github.com/lf-edge/eden/pkg/utils" "github.com/lf-edge/eve/api/go/info" "github.com/lf-edge/eve/api/go/metrics" log "github.com/sirupsen/logrus" @@ -14,23 +20,22 @@ import ( const ( inControllerConfig = "IN_CONFIG" notInControllerConfig = "NOT_IN_CONFIG" + stateFileTemplate = "state_store_%s.json" ) -//State stores representation of EVE state -//we should assign InfoCallback and MetricCallback to update state +// State stores representation of EVE state +// we should assign InfoCallback and MetricCallback to update state type State struct { - applications map[string]*AppInstState - networks map[string]*NetInstState - volumes map[string]*VolInstState - infoAndMetrics *projects.State - device *device.Ctx + Applications map[string]*AppInstState + Networks map[string]*NetInstState + Volumes map[string]*VolInstState + EveState *NodeState + device *device.Ctx } -//Init State object with controller and device +// Init State object with controller and device func Init(ctrl controller.Cloud, dev *device.Ctx) (ctx *State) { - ctx = &State{device: dev, infoAndMetrics: projects.InitState(dev)} - ctx.applications = make(map[string]*AppInstState) - ctx.networks = make(map[string]*NetInstState) + ctx = &State{device: dev} if err := ctx.initApplications(ctrl, dev); err != nil { log.Fatalf("EVE State initApplications error: %s", err) } @@ -40,69 +45,122 @@ func Init(ctrl controller.Cloud, dev *device.Ctx) (ctx *State) { if err := ctx.initNetworks(ctrl, dev); err != nil { log.Fatalf("EVE State initNetworks error: %s", err) } + if err := ctx.initNodeState(ctrl, dev); err != nil { + log.Fatalf("EVE State initNodeState error: %s", err) + } + if err := ctx.Load(); err != nil { + log.Fatalf("EVE State Load error: %s", err) + } return } -//InfoAndMetrics returns last info and metric objects -func (ctx *State) InfoAndMetrics() *projects.State { - return ctx.infoAndMetrics +func (ctx *State) getStateFile() (string, error) { + edenDir, err := utils.DefaultEdenDir() + if err != nil { + return "", err + } + return filepath.Join(edenDir, fmt.Sprintf(stateFileTemplate, ctx.device.GetID().String())), nil +} + +// Store state into file +func (ctx *State) Store() error { + data, err := json.Marshal(ctx) + if err != nil { + return err + } + stateFile, err := ctx.getStateFile() + if err != nil { + return err + } + return os.WriteFile(stateFile, data, 0600) +} + +// Load state from file +func (ctx *State) Load() error { + stateFile, err := ctx.getStateFile() + if err != nil { + return err + } + data, err := os.ReadFile(stateFile) + if errors.Is(err, os.ErrNotExist) { + return nil + } + if err != nil { + return err + } + var obj *State + err = json.Unmarshal(data, &obj) + if err != nil { + return err + } + ctx.applyOldStateApps(obj) + ctx.applyOldStateNetworks(obj) + ctx.applyOldStateVolumes(obj) + ctx.applyOldStateNodeState(obj) + return nil +} + +// Prepared returns true if we have enough info to work +func (ctx *State) Prepared() bool { + return ctx.EveState.LastSeen.Unix() != 0 } -//Applications extracts applications states -func (ctx *State) Applications() []*AppInstState { - v := make([]*AppInstState, 0, len(ctx.applications)) - for _, value := range ctx.applications { - if !value.deleted { +// NotDeletedApplications extracts AppInstState which are not marked as deleted +func (ctx *State) NotDeletedApplications() []*AppInstState { + v := make([]*AppInstState, 0, len(ctx.Applications)) + for _, value := range ctx.Applications { + if !value.Deleted { v = append(v, value) } } return v } -//Networks extracts networks states -func (ctx *State) Networks() []*NetInstState { - v := make([]*NetInstState, 0, len(ctx.networks)) - for _, value := range ctx.networks { - if !value.deleted { +// NotDeletedNetworks extracts NetInstState which are not marked as deleted +func (ctx *State) NotDeletedNetworks() []*NetInstState { + v := make([]*NetInstState, 0, len(ctx.Networks)) + for _, value := range ctx.Networks { + if !value.Deleted { v = append(v, value) } } return v } -//Volumes extracts volumes states -func (ctx *State) Volumes() []*VolInstState { - v := make([]*VolInstState, 0, len(ctx.volumes)) - for _, value := range ctx.volumes { - if !value.deleted { +// NotDeletedVolumes extracts VolInstState which are not marked as deleted +func (ctx *State) NotDeletedVolumes() []*VolInstState { + v := make([]*VolInstState, 0, len(ctx.Volumes)) + for _, value := range ctx.Volumes { + if !value.Deleted { v = append(v, value) } } return v } -//InfoCallback should be assigned to feed new values from info messages into state +// NodeState returns NodeState +func (ctx *State) NodeState() *NodeState { + return ctx.EveState +} + +// InfoCallback should be assigned to feed new values from info messages into state func (ctx *State) InfoCallback() einfo.HandlerFunc { return func(msg *info.ZInfoMsg) bool { ctx.processVolumesByInfo(msg) ctx.processApplicationsByInfo(msg) ctx.processNetworksByInfo(msg) - if err := ctx.infoAndMetrics.GetInfoProcessingFunction()(msg); err != nil { - log.Fatalf("EVE State GetInfoProcessingFunction error: %s", err) - } + ctx.processNodeStateByInfo(msg) return false } } -//MetricCallback should be assigned to feed new values from metric messages into state +// MetricCallback should be assigned to feed new values from metric messages into state func (ctx *State) MetricCallback() emetric.HandlerFunc { return func(msg *metrics.ZMetricMsg) bool { ctx.processVolumesByMetric(msg) ctx.processApplicationsByMetric(msg) ctx.processNetworksByMetric(msg) - if err := ctx.infoAndMetrics.GetMetricProcessingFunction()(msg); err != nil { - log.Fatalf("EVE State GetMetricProcessingFunction error: %s", err) - } + ctx.processNodeStateByMetric(msg) return false } } diff --git a/pkg/eve/volumes.go b/pkg/eve/volumes.go index 70deafafd..234f596bc 100644 --- a/pkg/eve/volumes.go +++ b/pkg/eve/volumes.go @@ -29,10 +29,10 @@ type VolInstState struct { EveState string LastError string Ref string - contentTreeID string + ContentTreeID string MountPoint string OriginType string - deleted bool + Deleted bool } func volInstStateHeader() string { @@ -51,7 +51,7 @@ func (volInstStateObj *VolInstState) toString() string { } func (ctx *State) initVolumes(ctrl controller.Cloud, dev *device.Ctx) error { - ctx.volumes = make(map[string]*VolInstState) + ctx.Volumes = make(map[string]*VolInstState) for _, el := range dev.GetVolumes() { vi, err := ctrl.GetVolume(el) if err != nil { @@ -95,19 +95,39 @@ func (ctx *State) initVolumes(ctrl controller.Cloud, dev *device.Ctx) error { MaxSize: "-", MountPoint: strings.Join(mountPoint, ";"), Ref: strings.Join(ref, ";"), - contentTreeID: contentTreeID, + ContentTreeID: contentTreeID, OriginType: vi.GetOrigin().GetType().String(), } - ctx.volumes[vi.GetUuid()] = volInstStateObj + ctx.Volumes[vi.GetUuid()] = volInstStateObj } return nil } +func (ctx *State) applyOldStateVolumes(state *State) { + for stateID, stateEL := range state.Volumes { + found := false + for id := range ctx.Volumes { + if id != stateID { + continue + } + ctx.Volumes[id] = stateEL + found = true + } + if !found { + if stateEL.Deleted { + continue + } + stateEL.AdamState = notInControllerConfig + ctx.Volumes[stateID] = stateEL + } + } +} + func (ctx *State) processVolumesByInfo(im *info.ZInfoMsg) { switch im.GetZtype() { case info.ZInfoTypes_ZiVolume: infoObject := im.GetVinfo() - volInstStateObj, ok := ctx.volumes[infoObject.GetUuid()] + volInstStateObj, ok := ctx.Volumes[infoObject.GetUuid()] if !ok { volInstStateObj = &VolInstState{ Name: infoObject.GetDisplayName(), @@ -119,9 +139,9 @@ func (ctx *State) processVolumesByInfo(im *info.ZInfoMsg) { MountPoint: "-", Ref: "-", } - ctx.volumes[infoObject.GetUuid()] = volInstStateObj + ctx.Volumes[infoObject.GetUuid()] = volInstStateObj } - volInstStateObj.deleted = + volInstStateObj.Deleted = infoObject.DisplayName == "" || infoObject.State == info.ZSwState_INVALID if volInstStateObj.VolumeType != config.Format_FmtUnknown.String() && volInstStateObj.VolumeType != config.Format_CONTAINER.String() { @@ -143,8 +163,8 @@ func (ctx *State) processVolumesByInfo(im *info.ZInfoMsg) { } case info.ZInfoTypes_ZiContentTree: infoObject := im.GetCinfo() - for _, el := range ctx.volumes { - if infoObject.Uuid == el.contentTreeID { + for _, el := range ctx.Volumes { + if infoObject.Uuid == el.ContentTreeID { el.EveState = infoObject.GetState().String() if infoObject.GetErr() != nil { el.LastError = infoObject.GetErr().String() @@ -162,7 +182,7 @@ func (ctx *State) processVolumesByInfo(im *info.ZInfoMsg) { func (ctx *State) processVolumesByMetric(msg *metrics.ZMetricMsg) { if volumeMetrics := msg.GetVm(); volumeMetrics != nil { for _, volumeMetric := range volumeMetrics { - volInstStateObj, ok := ctx.volumes[volumeMetric.GetUuid()] + volInstStateObj, ok := ctx.Volumes[volumeMetric.GetUuid()] if ok { volInstStateObj.Size = humanize.Bytes(volumeMetric.GetUsedBytes()) } @@ -175,8 +195,8 @@ func (ctx *State) printVolumeListLines() error { if _, err := fmt.Fprintln(w, volInstStateHeader()); err != nil { return err } - volInstStatesSlice := make([]*VolInstState, 0, len(ctx.Volumes())) - volInstStatesSlice = append(volInstStatesSlice, ctx.Volumes()...) + volInstStatesSlice := make([]*VolInstState, 0, len(ctx.NotDeletedVolumes())) + volInstStatesSlice = append(volInstStatesSlice, ctx.NotDeletedVolumes()...) sort.SliceStable(volInstStatesSlice, func(i, j int) bool { return volInstStatesSlice[i].Name < volInstStatesSlice[j].Name }) @@ -189,7 +209,7 @@ func (ctx *State) printVolumeListLines() error { } func (ctx *State) printVolumeListJSON() error { - result, err := json.MarshalIndent(ctx.Volumes(), "", " ") + result, err := json.MarshalIndent(ctx.NotDeletedVolumes(), "", " ") if err != nil { return err } diff --git a/pkg/openevec/eve.go b/pkg/openevec/eve.go index e063c3b4e..1962c2a73 100644 --- a/pkg/openevec/eve.go +++ b/pkg/openevec/eve.go @@ -16,7 +16,6 @@ import ( "github.com/lf-edge/eden/pkg/eve" "github.com/lf-edge/eden/pkg/utils" sdnapi "github.com/lf-edge/eden/sdn/vm/api" - "github.com/lf-edge/eve/api/go/info" log "github.com/sirupsen/logrus" ) @@ -221,25 +220,17 @@ func (openEVEC *OpenEVEC) VersionEve() error { changer := &adamChanger{} ctrl, dev, err := changer.getControllerAndDevFromConfig(openEVEC.cfg) if err != nil { - log.Debugf("getControllerAndDevFromConfig: %s", err.Error()) - fmt.Println("EVE status: undefined (no onboarded EVE)") - } else { - var lastDInfo *info.ZInfoMsg - var handleInfo = func(im *info.ZInfoMsg) bool { - if im.GetZtype() == info.ZInfoTypes_ZiDevice { - lastDInfo = im - } - return false - } - if err = ctrl.InfoLastCallback(dev.GetID(), map[string]string{"devId": dev.GetID().String()}, handleInfo); err != nil { - return fmt.Errorf("fail in get InfoLastCallback: %w", err) - } - if lastDInfo == nil { - log.Info("no info messages") - } else { - fmt.Println(lastDInfo.GetDinfo().SwList[0].ShortVersion) - } + return fmt.Errorf("getControllerAndDev: %w", err) + } + state := eve.Init(ctrl, dev) + if err := ctrl.InfoLastCallback(dev.GetID(), nil, state.InfoCallback()); err != nil { + return fmt.Errorf("fail in get InfoLastCallback: %w", err) } + if err := ctrl.MetricLastCallback(dev.GetID(), nil, state.MetricCallback()); err != nil { + return fmt.Errorf("fail in get MetricLastCallback: %w", err) + } + //nolint:forbidigo + fmt.Println(state.NodeState().Version) return nil } @@ -291,12 +282,12 @@ func (openEVEC *OpenEVEC) GetEveIP(ifName string) string { log.Error(err) return "" } - for _, nw := range networks { - if nw.LocalName == ifName { - if len(nw.IPAddrs) == 0 { + for ifNameKey, ips := range networks { + if ifNameKey == ifName { + if len(ips) == 0 { return "" } - return nw.IPAddrs[0] + return ips[0] } } return "" @@ -509,7 +500,7 @@ func (openEVEC *OpenEVEC) NewLinkEve(command, eveInterfaceName, vmName string) e return nil } -func (openEVEC *OpenEVEC) getEveNetworkInfo() (networks []*info.ZInfoNetwork, err error) { +func (openEVEC *OpenEVEC) getEveNetworkInfo() (ips map[string][]string, err error) { changer := &adamChanger{} ctrl, dev, err := changer.getControllerAndDevFromConfig(openEVEC.cfg) if err != nil { @@ -522,8 +513,5 @@ func (openEVEC *OpenEVEC) getEveNetworkInfo() (networks []*info.ZInfoNetwork, er if err = ctrl.MetricLastCallback(dev.GetID(), nil, eveState.MetricCallback()); err != nil { return nil, fmt.Errorf("MetricLastCallback failed: %w", err) } - if lastDInfo := eveState.InfoAndMetrics().GetDinfo(); lastDInfo != nil { - networks = append(networks, lastDInfo.Network...) - } - return networks, nil + return eveState.NodeState().RemoteIPs, nil } diff --git a/pkg/openevec/status.go b/pkg/openevec/status.go index b3b322aca..47a485b07 100644 --- a/pkg/openevec/status.go +++ b/pkg/openevec/status.go @@ -143,34 +143,31 @@ func (openEVEC *OpenEVEC) eveStatusRemote() error { if err = ctrl.MetricLastCallback(dev.GetID(), nil, eveState.MetricCallback()); err != nil { return fmt.Errorf("fail in get InfoLastCallback: %w", err) } - if lastDInfo := eveState.InfoAndMetrics().GetDinfo(); lastDInfo != nil { + if eveState.NodeState().LastSeen.Unix() == 0 { + fmt.Printf("%s EVE REMOTE IPs: %s\n", statusWarn(), "waiting for info...") + fmt.Printf("%s EVE memory: %s\n", statusWarn(), "waiting for info...") + } else { var ips []string - for _, nw := range lastDInfo.Network { - ips = append(ips, nw.IPAddrs...) + for _, v := range eveState.NodeState().RemoteIPs { + ips = append(ips, v...) } fmt.Printf("%s EVE REMOTE IPs: %s\n", statusOK(), strings.Join(ips, "; ")) - var lastseen = time.Unix(eveState.InfoAndMetrics().GetLastInfoTime().GetSeconds(), 0) + var lastseen = eveState.NodeState().LastSeen var timenow = time.Now().Unix() fmt.Printf("\tLast info received time: %s\n", lastseen) if (timenow - lastseen.Unix()) > 600 { fmt.Printf("\t EVE MIGHT BE DOWN OR CONNECTIVITY BETWEEN EVE AND ADAM WAS LOST\n") } - } else { - fmt.Printf("%s EVE REMOTE IPs: %s\n", statusWarn(), "waiting for info...") - } - if lastDMetric := eveState.InfoAndMetrics().GetDeviceMetrics(); lastDMetric != nil { status := statusOK() - if lastDMetric.Memory.GetUsedPercentage() >= 70 { + if eveState.NodeState().UsedPercentageMem >= 70 { status = statusWarn() } - if lastDMetric.Memory.GetUsedPercentage() >= 90 { + if eveState.NodeState().UsedPercentageMem >= 90 { status = statusBad() } fmt.Printf("%s EVE memory: %s/%s\n", status, - humanize.Bytes((uint64)(lastDMetric.Memory.GetUsedMem()*humanize.MByte)), - humanize.Bytes((uint64)(lastDMetric.Memory.GetAvailMem()*humanize.MByte))) - } else { - fmt.Printf("%s EVE memory: %s\n", statusWarn(), "waiting for info...") + humanize.Bytes((uint64)(eveState.NodeState().UsedMem*humanize.MByte)), + humanize.Bytes((uint64)(eveState.NodeState().AvailMem*humanize.MByte))) } return nil } diff --git a/pkg/projects/state.go b/pkg/projects/state.go index 3854bab1d..3f7f19b38 100644 --- a/pkg/projects/state.go +++ b/pkg/projects/state.go @@ -3,9 +3,11 @@ package projects import ( "reflect" + "github.com/lf-edge/eden/pkg/controller" "github.com/lf-edge/eden/pkg/controller/einfo" "github.com/lf-edge/eden/pkg/controller/emetric" "github.com/lf-edge/eden/pkg/device" + "github.com/lf-edge/eden/pkg/eve" "github.com/lf-edge/eden/pkg/utils" "github.com/lf-edge/eve/api/go/info" "github.com/lf-edge/eve/api/go/metrics" @@ -32,17 +34,19 @@ type infoState struct { type State struct { device *device.Ctx deviceInfo *infoState + eveState *eve.State } // InitState init State object for device -func InitState(device *device.Ctx) *State { - return &State{device: device, deviceInfo: &infoState{}} +func InitState(cloud controller.Cloud, device *device.Ctx) *State { + return &State{device: device, deviceInfo: &infoState{}, eveState: eve.Init(cloud, device)} } func (state *State) processInfo(infoMsg *info.ZInfoMsg) error { if infoMsg.DevId != state.device.GetID().String() { return nil } + state.eveState.InfoCallback()(infoMsg) state.deviceInfo.LastInfoMessageTime = infoMsg.AtTimeStamp switch infoMsg.GetZtype() { case info.ZInfoTypes_ZiDevice: @@ -118,6 +122,7 @@ func (state *State) processMetric(metricMsg *metrics.ZMetricMsg) error { if metricMsg.DevID != state.device.GetID().String() { return nil } + state.eveState.MetricCallback()(metricMsg) state.deviceInfo.AppMetrics = metricMsg.GetAm() state.deviceInfo.NetworkInstanceMetrics = metricMsg.GetNm() state.deviceInfo.VolumeMetrics = metricMsg.GetVm() @@ -225,3 +230,8 @@ func (state *State) CheckReady() bool { } return true } + +// GetEVEState returns state of edge node +func (state *State) GetEVEState() *eve.State { + return state.eveState +} diff --git a/pkg/projects/testContext.go b/pkg/projects/testContext.go index 9b9281c20..289f7cc14 100644 --- a/pkg/projects/testContext.go +++ b/pkg/projects/testContext.go @@ -340,7 +340,7 @@ func (tc *TestContext) AddProcTimer(edgeNode *device.Ctx, processFunction ProcTi func (tc *TestContext) StartTrackingState(onlyNewElements bool) { tc.states = map[*device.Ctx]*State{} for _, dev := range tc.nodes { - curState := InitState(dev) + curState := InitState(tc.cloud, dev) tc.states[dev] = curState if !onlyNewElements { //process all events from controller diff --git a/tests/app/app_test.go b/tests/app/app_test.go index 69feb32a1..c0289fc24 100644 --- a/tests/app/app_test.go +++ b/tests/app/app_test.go @@ -9,6 +9,7 @@ import ( "github.com/lf-edge/eden/pkg/controller/eapps" "github.com/lf-edge/eden/pkg/controller/types" + "github.com/lf-edge/eden/pkg/device" "github.com/lf-edge/eden/pkg/eve" "github.com/lf-edge/eden/pkg/projects" "github.com/lf-edge/eden/pkg/tests" @@ -25,10 +26,9 @@ type appState struct { // This test wait for the app's state with a timewait. var ( timewait = flag.Duration("timewait", 10*time.Minute, "Timewait for items waiting") - newitems = flag.Bool("check-new", false, "Check only new info messages") + _ = flag.Bool("check-new", false, "Check only new info messages") tc *projects.TestContext states map[string][]appState - eveState *eve.State ) // TestMain is used to provide setup and teardown for the rest of the @@ -48,9 +48,7 @@ func TestMain(m *testing.M) { tc.AddEdgeNodesFromDescription() - eveState = eve.Init(tc.GetController(), tc.GetEdgeNode()) - - tc.StartTrackingState(true) + tc.StartTrackingState(false) res := m.Run() @@ -83,11 +81,11 @@ func checkState(eveState *eve.State, state string, appNames []string) error { out := "\n" if state == "-" { foundAny := false - if eveState.InfoAndMetrics().GetDinfo() == nil { + if !eveState.Prepared() { //we need to wait for info return nil } - for _, app := range eveState.Applications() { + for _, app := range eveState.NotDeletedApplications() { if _, inSlice := utils.FindEleInSlice(appNames, app.Name); inSlice { checkAndAppendState(app.Name, app.EVEState) foundAny = true @@ -103,7 +101,7 @@ func checkState(eveState *eve.State, state string, appNames []string) error { } return fmt.Errorf(out) } - for _, app := range eveState.Applications() { + for _, app := range eveState.NotDeletedApplications() { if _, inSlice := utils.FindEleInSlice(appNames, app.Name); inSlice { checkAndAppendState(app.Name, app.EVEState) } @@ -124,10 +122,9 @@ func checkState(eveState *eve.State, state string, appNames []string) error { } // checkApp wait for info of ZInfoApp type with state -func checkApp(state string, appNames []string) projects.ProcInfoFunc { +func checkApp(edgeNode *device.Ctx, state string, appNames []string) projects.ProcInfoFunc { return func(msg *info.ZInfoMsg) error { - eveState.InfoCallback()(msg) //feed state with new info - return checkState(eveState, state, appNames) + return checkState(tc.GetState(edgeNode).GetEVEState(), state, appNames) } } @@ -156,16 +153,9 @@ func TestAppStatus(t *testing.T) { timestamp: time.Now()}} } - if !*newitems { - // observe existing info object and feed them into eveState object - if err := tc.GetController().InfoLastCallback(edgeNode.GetID(), nil, eveState.InfoCallback()); err != nil { - t.Fatal(err) - } - } - - if ready := checkState(eveState, state, apps); ready == nil { + if ready := checkState(tc.GetState(edgeNode).GetEVEState(), state, apps); ready == nil { - tc.AddProcInfo(edgeNode, checkApp(state, apps)) + tc.AddProcInfo(edgeNode, checkApp(edgeNode, state, apps)) callback := func() { t.Errorf("ASSERTION FAILED (%s): expected apps %s in %s state", time.Now().Format(time.RFC3339Nano), apps, state) @@ -177,7 +167,7 @@ func TestAppStatus(t *testing.T) { t.Errorf("\t\tstate: %s received in: %s", st.state, st.timestamp.Format(time.RFC3339Nano)) } } - for _, app := range eveState.Applications() { + for _, app := range tc.GetState(edgeNode).GetEVEState().NotDeletedApplications() { if app.Name == k { appID, err := uuid.FromString(app.UUID) if err != nil { diff --git a/tests/network/nw_test.go b/tests/network/nw_test.go index 47a8a36a7..92dcb6876 100644 --- a/tests/network/nw_test.go +++ b/tests/network/nw_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/lf-edge/eden/pkg/device" "github.com/lf-edge/eden/pkg/eve" "github.com/lf-edge/eden/pkg/projects" "github.com/lf-edge/eden/pkg/utils" @@ -21,10 +22,9 @@ type nwState struct { // This test wait for the network's state with a timewait. var ( timewait = flag.Duration("timewait", time.Minute, "Timewait for items waiting") - newitems = flag.Bool("check-new", false, "Check only new info messages") + _ = flag.Bool("check-new", false, "Check only new info messages") tc *projects.TestContext states map[string][]nwState - eveState *eve.State ) // TestMain is used to provide setup and teardown for the rest of the @@ -42,9 +42,7 @@ func TestMain(m *testing.M) { tc.AddEdgeNodesFromDescription() - eveState = eve.Init(tc.GetController(), tc.GetEdgeNode()) - - tc.StartTrackingState(true) + tc.StartTrackingState(false) res := m.Run() @@ -74,7 +72,7 @@ func checkState(eveState *eve.State, state string, netNames []string) error { out := "\n" if state == "-" { foundAny := false - for _, net := range eveState.Networks() { + for _, net := range eveState.NotDeletedNetworks() { if _, inSlice := utils.FindEleInSlice(netNames, net.Name); inSlice { checkAndAppendState(net.Name, net.EveState) foundAny = true @@ -90,7 +88,7 @@ func checkState(eveState *eve.State, state string, netNames []string) error { } return fmt.Errorf(out) } - for _, net := range eveState.Networks() { + for _, net := range eveState.NotDeletedNetworks() { if _, inSlice := utils.FindEleInSlice(netNames, net.Name); inSlice { checkAndAppendState(net.Name, net.EveState) } @@ -111,10 +109,9 @@ func checkState(eveState *eve.State, state string, netNames []string) error { } // checkNet wait for info of ZInfoApp type with state -func checkNet(state string, volNames []string) projects.ProcInfoFunc { - return func(msg *info.ZInfoMsg) error { - eveState.InfoCallback()(msg) //feed state with new info - return checkState(eveState, state, volNames) +func checkNet(edgeNode *device.Ctx, state string, volNames []string) projects.ProcInfoFunc { + return func(_ *info.ZInfoMsg) error { + return checkState(tc.GetState(edgeNode).GetEVEState(), state, volNames) } } @@ -141,17 +138,10 @@ func TestNetworkStatus(t *testing.T) { states[el] = []nwState{{state: "no info from controller", timestamp: time.Now()}} } - if !*newitems { - // observe existing info object and feed them into eveState object - if err := tc.GetController().InfoLastCallback(edgeNode.GetID(), nil, eveState.InfoCallback()); err != nil { - t.Fatal(err) - } - } - // we are done if our eveState object is in required state - if ready := checkState(eveState, state, nws); ready == nil { + if ready := checkState(tc.GetState(edgeNode).GetEVEState(), state, nws); ready == nil { - tc.AddProcInfo(edgeNode, checkNet(state, nws)) + tc.AddProcInfo(edgeNode, checkNet(edgeNode, state, nws)) callback := func() { t.Errorf("ASSERTION FAILED (%s): expected networks %s in %s state", time.Now().Format(time.RFC3339Nano), nws, state) diff --git a/tests/reboot/reboot_test.go b/tests/reboot/reboot_test.go index 419bcdd6d..9b9f1cd9b 100644 --- a/tests/reboot/reboot_test.go +++ b/tests/reboot/reboot_test.go @@ -13,8 +13,6 @@ import ( "github.com/lf-edge/eden/pkg/tests" "github.com/lf-edge/eden/pkg/utils" "github.com/lf-edge/eve/api/go/info" - "google.golang.org/protobuf/proto" - "google.golang.org/protobuf/types/known/timestamppb" ) // This context holds all the configuration items in the same @@ -40,25 +38,23 @@ var ( tc *projects.TestContext - lastRebootTime *timestamppb.Timestamp + lastRebootTime time.Time ) func checkReboot(t *testing.T, edgeNode *device.Ctx) projects.ProcInfoFunc { - return func(im *info.ZInfoMsg) error { - if im.GetZtype() != info.ZInfoTypes_ZiDevice { - return nil - } - currentLastRebootTime := im.GetDinfo().LastRebootTime - if !proto.Equal(lastRebootTime, currentLastRebootTime) { - if im.GetDinfo().LastRebootReason == "" && - currentLastRebootTime.AsTime().Unix() == 0 { + return func(_ *info.ZInfoMsg) error { + eveState := tc.GetState(edgeNode).GetEVEState() + currentLastRebootTime := eveState.NodeState().LastRebootTime + if !lastRebootTime.Equal(currentLastRebootTime) { + if eveState.NodeState().LastRebootReason == "" && + currentLastRebootTime.Unix() == 0 { // device may not fill the info return nil } lastRebootTime = currentLastRebootTime - fmt.Printf("rebooted with reason %s at %s/n", im.GetDinfo().LastRebootReason, lastRebootTime.AsTime()) - if !strings.Contains(im.GetDinfo().LastRebootReason, "NORMAL") { - err := fmt.Errorf("abnormal reboot: %s", im.GetDinfo().LastRebootReason) + fmt.Printf("rebooted with reason %s at %s/n", eveState.NodeState().LastRebootReason, lastRebootTime) + if !strings.Contains(eveState.NodeState().LastRebootReason, "NORMAL") { + err := fmt.Errorf("abnormal reboot: %s", eveState.NodeState().LastRebootReason) if *reboot { //if we use this test to do reboot, abnormal one must errored the test t.Fatal(err) @@ -167,11 +163,13 @@ func TestReboot(t *testing.T) { t.Log(utils.AddTimestamp(fmt.Sprintf("reboot: %t", *reboot))) t.Log(utils.AddTimestamp(fmt.Sprintf("count: %d", *count))) - lastRebootTime = tc.GetState(edgeNode).GetDinfo().LastRebootTime + nodeState := tc.GetState(edgeNode).GetEVEState().NodeState() + + t.Log(utils.AddTimestamp(fmt.Sprintf("LastRebootTime: %s", nodeState.LastRebootTime))) - t.Log(utils.AddTimestamp(fmt.Sprintf("LastRebootTime: %s", lastRebootTime.AsTime()))) + t.Log(utils.AddTimestamp(fmt.Sprintf("LastRebootReason: %s", nodeState.LastRebootReason))) - t.Log(utils.AddTimestamp(fmt.Sprintf("LastRebootReason: %s", tc.GetState(edgeNode).GetDinfo().LastRebootReason))) + lastRebootTime = nodeState.LastRebootTime tc.AddProcInfo(edgeNode, checkReboot(t, edgeNode)) diff --git a/tests/volume/vol_test.go b/tests/volume/vol_test.go index 34dfa3c4f..4a80525ce 100644 --- a/tests/volume/vol_test.go +++ b/tests/volume/vol_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/lf-edge/eden/pkg/device" "github.com/lf-edge/eden/pkg/eve" "github.com/lf-edge/eden/pkg/projects" "github.com/lf-edge/eden/pkg/utils" @@ -21,10 +22,9 @@ type volState struct { // This test wait for the volume's state with a timewait. var ( timewait = flag.Duration("timewait", time.Minute, "Timewait for items waiting") - newitems = flag.Bool("check-new", false, "Check only new info messages") + _ = flag.Bool("check-new", false, "Check only new info messages") tc *projects.TestContext states map[string][]volState - eveState *eve.State ) // TestMain is used to provide setup and teardown for the rest of the @@ -42,8 +42,6 @@ func TestMain(m *testing.M) { tc.AddEdgeNodesFromDescription() - eveState = eve.Init(tc.GetController(), tc.GetEdgeNode()) - tc.StartTrackingState(false) res := m.Run() @@ -74,11 +72,11 @@ func checkState(eveState *eve.State, state string, volNames []string) error { out := "\n" if state == "-" { foundAny := false - if eveState.InfoAndMetrics().GetDinfo() == nil { + if !eveState.Prepared() { //we need to wait for info return nil } - for _, vol := range eveState.Volumes() { + for _, vol := range eveState.NotDeletedVolumes() { if _, inSlice := utils.FindEleInSlice(volNames, vol.Name); inSlice { checkAndAppendState(vol.Name, vol.EveState) foundAny = true @@ -94,7 +92,7 @@ func checkState(eveState *eve.State, state string, volNames []string) error { } return fmt.Errorf(out) } - for _, vol := range eveState.Volumes() { + for _, vol := range eveState.NotDeletedVolumes() { if _, inSlice := utils.FindEleInSlice(volNames, vol.Name); inSlice { checkAndAppendState(vol.Name, vol.EveState) } @@ -115,10 +113,9 @@ func checkState(eveState *eve.State, state string, volNames []string) error { } // checkVol wait for info of ZInfoApp type with state -func checkVol(state string, volNames []string) projects.ProcInfoFunc { - return func(msg *info.ZInfoMsg) error { - eveState.InfoCallback()(msg) //feed state with new info - return checkState(eveState, state, volNames) +func checkVol(edgeNode *device.Ctx, state string, volNames []string) projects.ProcInfoFunc { + return func(_ *info.ZInfoMsg) error { + return checkState(tc.GetState(edgeNode).GetEVEState(), state, volNames) } } @@ -148,17 +145,10 @@ func TestVolStatus(t *testing.T) { states[el] = []volState{{state: "no info from controller", timestamp: time.Now()}} } - if !*newitems { - // observe existing info object and feed them into eveState object - if err := tc.GetController().InfoLastCallback(edgeNode.GetID(), nil, eveState.InfoCallback()); err != nil { - t.Fatal(err) - } - } - // we are done if our eveState object is in required state - if ready := checkState(eveState, state, vols); ready == nil { + if ready := checkState(tc.GetState(edgeNode).GetEVEState(), state, vols); ready == nil { - tc.AddProcInfo(edgeNode, checkVol(state, vols)) + tc.AddProcInfo(edgeNode, checkVol(edgeNode, state, vols)) callback := func() { t.Errorf("ASSERTION FAILED (%s): expected volumes %s in %s state", time.Now().Format(time.RFC3339Nano), vols, state) From c343cb5b34deb43570fe22c16af54e03cdecddf4 Mon Sep 17 00:00:00 2001 From: Petr Fedchenkov Date: Fri, 29 Sep 2023 11:08:44 +0300 Subject: [PATCH 3/4] Implement prune command In case of state store and load functions implemented we may prune objects in Adam to keep the logic to be as fast as possible. Signed-off-by: Petr Fedchenkov --- cmd/edenClean.go | 20 ++++++++++- cmd/root.go | 1 + pkg/controller/adam/adam.go | 65 ++++++++++++++++++++++++++++++++++++ pkg/controller/controller.go | 7 +++- pkg/openevec/eden.go | 45 +++++++++++++++++++++++++ 5 files changed, 136 insertions(+), 2 deletions(-) diff --git a/cmd/edenClean.go b/cmd/edenClean.go index 8e6d624f2..67db3dbc7 100644 --- a/cmd/edenClean.go +++ b/cmd/edenClean.go @@ -23,7 +23,7 @@ func newCleanCmd(configName, verbosity *string) *cobra.Command { PersistentPreRunE: preRunViperLoadFunction(cfg, configName, verbosity), Run: func(cmd *cobra.Command, args []string) { if err := openEVEC.EdenClean(*configName, configDist, vmName, currentContext); err != nil { - log.Fatalf("Setup eden failed: %s", err) + log.Fatalf("Clean eden failed: %s", err) } }, } @@ -54,3 +54,21 @@ func newCleanCmd(configName, verbosity *string) *cobra.Command { return cleanCmd } + +func newPruneCmd(configName, verbosity *string) *cobra.Command { + cfg := &openevec.EdenSetupArgs{} + + var pruneCmd = &cobra.Command{ + Use: "prune", + Short: "prune stored objects from the controller. Please save them before.", + Long: `Prune stored objects from the controller. Please save them before.`, + PersistentPreRunE: preRunViperLoadFunction(cfg, configName, verbosity), + Run: func(cmd *cobra.Command, args []string) { + if err := openEVEC.EdenPrune(); err != nil { + log.Fatalf("Prune eden failed: %s", err) + } + }, + } + + return pruneCmd +} diff --git a/cmd/root.go b/cmd/root.go index 864735a10..f575635e4 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -36,6 +36,7 @@ func NewEdenCommand() *cobra.Command { newCleanCmd(&configName, &verbosity), newConfigCmd(&configName, &verbosity), newSdnCmd(&configName, &verbosity), + newPruneCmd(&configName, &verbosity), }, }, { diff --git a/pkg/controller/adam/adam.go b/pkg/controller/adam/adam.go index 5f49b8eae..12147e65f 100644 --- a/pkg/controller/adam/adam.go +++ b/pkg/controller/adam/adam.go @@ -1,6 +1,7 @@ package adam import ( + "context" "encoding/json" "fmt" "net/url" @@ -10,6 +11,7 @@ import ( "strings" "time" + "github.com/go-redis/redis/v9" "github.com/lf-edge/eden/pkg/controller/cachers" "github.com/lf-edge/eden/pkg/controller/eapps" "github.com/lf-edge/eden/pkg/controller/eflowlog" @@ -405,3 +407,66 @@ func (adam *Ctx) GetGlobalOptions() (*types.GlobalOptions, error) { } return &globalOptions, nil } + +func (adam *Ctx) cleanRedisStream(stream string) error { + addr, password, databaseID, err := parseRedisURL(adam.AdamRedisURLEden) + if err != nil { + return err + } + client := redis.NewClient(&redis.Options{ + Addr: addr, + Password: password, + DB: databaseID, + MaxRetries: defaults.DefaultRepeatCount, + MinRetryBackoff: defaults.DefaultRepeatTimeout / 2, + MaxRetryBackoff: defaults.DefaultRepeatTimeout * 2, + }) + n, err := client.XTrimMaxLenApprox(context.TODO(), stream, 0, 0).Result() + log.Debugf("XTrimMaxLenApprox(%s): %d", stream, n) + return err +} + +// CleanInfo removes all info messages of device from controller +func (adam *Ctx) CleanInfo(devUUID uuid.UUID) (err error) { + if adam.AdamRemoteRedis { + stream := adam.getInfoRedisStream(devUUID) + return adam.cleanRedisStream(stream) + } + panic("implement me") +} + +// CleanMetrics removes all metric messages of device from controller +func (adam *Ctx) CleanMetrics(devUUID uuid.UUID) (err error) { + if adam.AdamRemoteRedis { + stream := adam.getMetricsRedisStream(devUUID) + return adam.cleanRedisStream(stream) + } + panic("implement me") +} + +// CleanLogs removes all logs messages of device from controller +func (adam *Ctx) CleanLogs(devUUID uuid.UUID) (err error) { + if adam.AdamRemoteRedis { + stream := adam.getLogsRedisStream(devUUID) + return adam.cleanRedisStream(stream) + } + panic("implement me") +} + +// CleanFlowLogs removes all flow logs messages of device from controller +func (adam *Ctx) CleanFlowLogs(devUUID uuid.UUID) (err error) { + if adam.AdamRemoteRedis { + stream := adam.getFlowLogRedisStream(devUUID) + return adam.cleanRedisStream(stream) + } + panic("implement me") +} + +// CleanAppLogs removes all app logs messages of app of device from controller +func (adam *Ctx) CleanAppLogs(devUUID uuid.UUID, appUUID uuid.UUID) (err error) { + if adam.AdamRemoteRedis { + stream := adam.getAppsLogsRedisStream(devUUID, appUUID) + return adam.cleanRedisStream(stream) + } + panic("implement me") +} diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 404ed6dbd..0c1ff8606 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -15,7 +15,7 @@ import ( uuid "github.com/satori/go.uuid" ) -//Controller is an interface of controller +// Controller is an interface of controller type Controller interface { CertsGet(devUUID uuid.UUID) (out string, err error) ConfigGet(devUUID uuid.UUID) (out string, err error) @@ -28,6 +28,11 @@ type Controller interface { FlowLogLastCallback(devUUID uuid.UUID, q map[string]string, handler eflowlog.HandlerFunc) (err error) InfoChecker(devUUID uuid.UUID, q map[string]string, handler einfo.HandlerFunc, mode einfo.InfoCheckerMode, timeout time.Duration) (err error) InfoLastCallback(devUUID uuid.UUID, q map[string]string, handler einfo.HandlerFunc) (err error) + CleanInfo(devUUID uuid.UUID) (err error) + CleanMetrics(devUUID uuid.UUID) (err error) + CleanLogs(devUUID uuid.UUID) (err error) + CleanFlowLogs(devUUID uuid.UUID) (err error) + CleanAppLogs(devUUID uuid.UUID, appUUID uuid.UUID) (err error) MetricChecker(devUUID uuid.UUID, q map[string]string, handler emetric.HandlerFunc, mode emetric.MetricCheckerMode, timeout time.Duration) (err error) MetricLastCallback(devUUID uuid.UUID, q map[string]string, handler emetric.HandlerFunc) (err error) RequestLastCallback(devUUID uuid.UUID, q map[string]string, handler erequest.HandlerFunc) (err error) diff --git a/pkg/openevec/eden.go b/pkg/openevec/eden.go index d42dcc26f..9fb9e8189 100644 --- a/pkg/openevec/eden.go +++ b/pkg/openevec/eden.go @@ -23,6 +23,7 @@ import ( "github.com/lf-edge/eden/pkg/controller/types" "github.com/lf-edge/eden/pkg/defaults" "github.com/lf-edge/eden/pkg/eden" + "github.com/lf-edge/eden/pkg/eve" "github.com/lf-edge/eden/pkg/models" "github.com/lf-edge/eden/pkg/utils" "github.com/lf-edge/eve/api/go/flowlog" @@ -827,6 +828,50 @@ func (openEVEC *OpenEVEC) EdenImport(tarFile string, rewriteRoot bool) error { return nil } +// EdenPrune removes data from the controller +// +//nolint:cyclop +func (openEVEC *OpenEVEC) EdenPrune() error { + changer := &adamChanger{} + ctrl, dev, err := changer.getControllerAndDevFromConfig(openEVEC.cfg) + if err != nil { + return fmt.Errorf("getControllerAndDevFromConfig: %w", err) + } + state := eve.Init(ctrl, dev) + if err := ctrl.InfoLastCallback(dev.GetID(), nil, state.InfoCallback()); err != nil { + return fmt.Errorf("fail in get InfoLastCallback: %w", err) + } + if err := ctrl.MetricLastCallback(dev.GetID(), nil, state.MetricCallback()); err != nil { + return fmt.Errorf("fail in get MetricLastCallback: %w", err) + } + err = state.Store() + if err != nil { + return fmt.Errorf("state.Store: %w", err) + } + if err := ctrl.CleanInfo(dev.GetID()); err != nil { + return fmt.Errorf("fail in ctrl CleanInfo: %w", err) + } + if err := ctrl.CleanMetrics(dev.GetID()); err != nil { + return fmt.Errorf("fail in ctrl CleanMetrics: %w", err) + } + if err := ctrl.CleanLogs(dev.GetID()); err != nil { + return fmt.Errorf("fail in ctrl CleanLogs: %w", err) + } + if err := ctrl.CleanFlowLogs(dev.GetID()); err != nil { + return fmt.Errorf("fail in ctrl CleanFlowLogs: %w", err) + } + for _, el := range dev.GetApplicationInstances() { + appUUID, err := uuid.FromString(el) + if err != nil { + return err + } + if err := ctrl.CleanAppLogs(dev.GetID(), appUUID); err != nil { + return fmt.Errorf("fail in ctrl CleanAppLogs: %w", err) + } + } + return nil +} + // ParseTemplateFile fills EdenSetupArgs variable into template stored in file and writes result to io.Writer func ParseTemplateFile(path string, cfg EdenSetupArgs, w io.Writer) error { t, err := ioutil.ReadFile(path) From 4130e0a4be1e8f73d5c1da76f77728ebafae4537 Mon Sep 17 00:00:00 2001 From: Petr Fedchenkov Date: Fri, 20 Oct 2023 20:09:31 +0300 Subject: [PATCH 4/4] Check new state implementation We want to use check-new option to check options after transition, thus the next state after initial Signed-off-by: Petr Fedchenkov --- tests/app/app_test.go | 17 ++++++++++++++++- tests/network/nw_test.go | 17 ++++++++++++++++- tests/volume/vol_test.go | 17 ++++++++++++++++- 3 files changed, 48 insertions(+), 3 deletions(-) diff --git a/tests/app/app_test.go b/tests/app/app_test.go index c0289fc24..5b35a6575 100644 --- a/tests/app/app_test.go +++ b/tests/app/app_test.go @@ -26,9 +26,11 @@ type appState struct { // This test wait for the app's state with a timewait. var ( timewait = flag.Duration("timewait", 10*time.Minute, "Timewait for items waiting") - _ = flag.Bool("check-new", false, "Check only new info messages") + checkNew = flag.Bool("check-new", false, "Check for the new state after state transition") tc *projects.TestContext states map[string][]appState + + lastRebootTime time.Time ) // TestMain is used to provide setup and teardown for the rest of the @@ -109,6 +111,17 @@ func checkState(eveState *eve.State, state string, appNames []string) error { if len(states) == len(appNames) { for _, appName := range appNames { if !checkNewLastState(appName, state) { + currentLastRebootTime := eveState.NodeState().LastRebootTime + // if we rebooted we may miss state transition + if *checkNew && !currentLastRebootTime.After(lastRebootTime) { + // first one is no info from controller + // the second is initial state + // we want to wait for the third or later, thus new state + if len(states[appName]) <= 2 { + fmt.Println(utils.AddTimestamp(fmt.Sprintf("\tappName %s wait for new state", appName))) + return nil + } + } out += utils.AddTimestamp(fmt.Sprintf( "app %s state %s\n", appName, state)) @@ -133,6 +146,8 @@ func checkApp(edgeNode *device.Ctx, state string, appNames []string) projects.Pr func TestAppStatus(t *testing.T) { edgeNode := tc.GetEdgeNode(tc.WithTest(t)) + lastRebootTime = tc.GetState(edgeNode).GetEVEState().NodeState().LastRebootTime + args := flag.Args() if len(args) == 0 { t.Fatalf("Usage: %s [options] state app_name...\n", os.Args[0]) diff --git a/tests/network/nw_test.go b/tests/network/nw_test.go index 92dcb6876..fc199b00a 100644 --- a/tests/network/nw_test.go +++ b/tests/network/nw_test.go @@ -22,9 +22,11 @@ type nwState struct { // This test wait for the network's state with a timewait. var ( timewait = flag.Duration("timewait", time.Minute, "Timewait for items waiting") - _ = flag.Bool("check-new", false, "Check only new info messages") + checkNew = flag.Bool("check-new", false, "Check for the new state after state transition") tc *projects.TestContext states map[string][]nwState + + lastRebootTime time.Time ) // TestMain is used to provide setup and teardown for the rest of the @@ -96,6 +98,17 @@ func checkState(eveState *eve.State, state string, netNames []string) error { if len(states) == len(netNames) { for _, netName := range netNames { if !checkNewLastState(netName, state) { + currentLastRebootTime := eveState.NodeState().LastRebootTime + // if we rebooted we may miss state transition + if *checkNew && !currentLastRebootTime.After(lastRebootTime) { + // first one is no info from controller + // the second is initial state + // we want to wait for the third or later, thus new state + if len(states[netName]) <= 2 { + fmt.Println(utils.AddTimestamp(fmt.Sprintf("\tnetName %s wait for new state", netName))) + return nil + } + } out += fmt.Sprintf( "network %s state %s\n", netName, state) @@ -120,6 +133,8 @@ func checkNet(edgeNode *device.Ctx, state string, volNames []string) projects.Pr func TestNetworkStatus(t *testing.T) { edgeNode := tc.GetEdgeNode(tc.WithTest(t)) + lastRebootTime = tc.GetState(edgeNode).GetEVEState().NodeState().LastRebootTime + args := flag.Args() if len(args) == 0 { t.Fatalf("Usage: %s [options] state vol_name...\n", os.Args[0]) diff --git a/tests/volume/vol_test.go b/tests/volume/vol_test.go index 4a80525ce..8086d24a4 100644 --- a/tests/volume/vol_test.go +++ b/tests/volume/vol_test.go @@ -22,9 +22,11 @@ type volState struct { // This test wait for the volume's state with a timewait. var ( timewait = flag.Duration("timewait", time.Minute, "Timewait for items waiting") - _ = flag.Bool("check-new", false, "Check only new info messages") + checkNew = flag.Bool("check-new", false, "Check for the new state after state transition") tc *projects.TestContext states map[string][]volState + + lastRebootTime time.Time ) // TestMain is used to provide setup and teardown for the rest of the @@ -100,6 +102,17 @@ func checkState(eveState *eve.State, state string, volNames []string) error { if len(states) == len(volNames) { for _, volName := range volNames { if !checkNewLastState(volName, state) { + currentLastRebootTime := eveState.NodeState().LastRebootTime + // if we rebooted we may miss state transition + if *checkNew && !currentLastRebootTime.After(lastRebootTime) { + // first one is no info from controller + // the second is initial state + // we want to wait for the third or later, thus new state + if len(states[volName]) <= 2 { + fmt.Println(utils.AddTimestamp(fmt.Sprintf("\tvolName %s wait for new state", volName))) + return nil + } + } out += fmt.Sprintf( "volume %s state %s\n", volName, state) @@ -124,6 +137,8 @@ func checkVol(edgeNode *device.Ctx, state string, volNames []string) projects.Pr func TestVolStatus(t *testing.T) { edgeNode := tc.GetEdgeNode(tc.WithTest(t)) + lastRebootTime = tc.GetState(edgeNode).GetEVEState().NodeState().LastRebootTime + args := flag.Args() if len(args) == 0 { t.Fatalf("Usage: %s [options] state vol_name...\n", os.Args[0])