From 724616af3c93ba3c192bcfbb9329af6ea8f542b2 Mon Sep 17 00:00:00 2001 From: Seokho Son Date: Thu, 12 Oct 2023 23:00:44 +0900 Subject: [PATCH] Apply channel for GoRoutine in MC-infra control --- src/api/rest/server/mcis/control.go | 2 +- src/core/mcis/control.go | 310 +++++++--------------------- src/core/mcis/manageInfo.go | 18 +- 3 files changed, 87 insertions(+), 243 deletions(-) diff --git a/src/api/rest/server/mcis/control.go b/src/api/rest/server/mcis/control.go index a075205e6..0000934fb 100644 --- a/src/api/rest/server/mcis/control.go +++ b/src/api/rest/server/mcis/control.go @@ -87,7 +87,7 @@ func RestGetControlMcisVm(c echo.Context) error { if action == "suspend" || action == "resume" || action == "reboot" || action == "terminate" { - result, err := mcis.CoreGetMcisVmAction(nsId, mcisId, vmId, action) + result, err := mcis.HandleMcisVmAction(nsId, mcisId, vmId, action) if err != nil { mapA := map[string]string{"message": err.Error()} return c.JSON(http.StatusInternalServerError, &mapA) diff --git a/src/core/mcis/control.go b/src/core/mcis/control.go index f51ffd456..a1e14ce55 100644 --- a/src/core/mcis/control.go +++ b/src/core/mcis/control.go @@ -19,22 +19,21 @@ import ( "encoding/json" "fmt" - "io/ioutil" //"log" - "strconv" + "strings" "time" //csv file handling // REST API (echo) - "net/http" "sync" "github.com/cloud-barista/cb-tumblebug/src/core/common" "github.com/cloud-barista/cb-tumblebug/src/core/mcir" + "github.com/go-resty/resty/v2" ) // MCIS Control @@ -164,8 +163,8 @@ func HandleMcisAction(nsId string, mcisId string, action string, force bool) (st } } -// CoreGetMcisVmAction is func to Get McisVm Action -func CoreGetMcisVmAction(nsId string, mcisId string, vmId string, action string) (string, error) { +// HandleMcisVmAction is func to Get McisVm Action +func HandleMcisVmAction(nsId string, mcisId string, vmId string, action string) (string, error) { err := common.CheckString(nsId) if err != nil { @@ -191,29 +190,29 @@ func CoreGetMcisVmAction(nsId string, mcisId string, vmId string, action string) return err.Error(), err } - fmt.Println("[Get VM requested action: " + action) - if action == "suspend" { - fmt.Println("[suspend VM]") - ControlVm(nsId, mcisId, vmId, ActionSuspend) - return "Suspending the VM", nil - - } else if action == "resume" { - fmt.Println("[resume VM]") - ControlVm(nsId, mcisId, vmId, ActionResume) - return "Resuming the VM", nil - - } else if action == "reboot" { - fmt.Println("[reboot VM]") - ControlVm(nsId, mcisId, vmId, ActionReboot) - return "Rebooting the VM", nil - - } else if action == "terminate" { - fmt.Println("[terminate VM]") - ControlVm(nsId, mcisId, vmId, ActionTerminate) - return "Terminated the VM", nil + fmt.Println("[VM action: " + action) + var wg sync.WaitGroup + results := make(chan ControlVmResult, 1) + wg.Add(1) + if strings.EqualFold(action, ActionSuspend) { + go ControlVmAsync(&wg, nsId, mcisId, vmId, ActionSuspend, results) + } else if strings.EqualFold(action, ActionResume) { + go ControlVmAsync(&wg, nsId, mcisId, vmId, ActionResume, results) + } else if strings.EqualFold(action, ActionReboot) { + go ControlVmAsync(&wg, nsId, mcisId, vmId, ActionReboot, results) + } else if strings.EqualFold(action, ActionTerminate) { + go ControlVmAsync(&wg, nsId, mcisId, vmId, ActionTerminate, results) } else { - return "", fmt.Errorf(action + " not supported") + close(results) + wg.Done() + return "", fmt.Errorf("not supported action: " + action) + } + checkErr := <-results + if checkErr.Error != nil { + return checkErr.Error.Error(), checkErr.Error } + close(results) + return "Working on " + action, nil } // ControlMcisAsync is func to control MCIS async @@ -285,7 +284,7 @@ func ControlMcisAsync(nsId string, mcisId string, action string, force bool) err //goroutin sync wg var wg sync.WaitGroup - var results ControlVmResultWrapper + results := make(chan ControlVmResult, len(vmList)) for _, v := range vmList { wg.Add(1) @@ -293,18 +292,23 @@ func ControlMcisAsync(nsId string, mcisId string, action string, force bool) err // Avoid concurrent requests to CSP. time.Sleep(time.Duration(3) * time.Second) - go ControlVmAsync(&wg, nsId, mcisId, v, action, &results) + go ControlVmAsync(&wg, nsId, mcisId, v, action, results) } - wg.Wait() //goroutine sync wg + go func() { + wg.Wait() + close(results) + }() checkErrFlag := "" - for _, v := range results.ResultArray { - if v.Error != nil { + for result := range results { + fmt.Println("Result:", result) + if result.Error != nil { checkErrFlag += "[" - checkErrFlag += v.Error.Error() + checkErrFlag += result.Error.Error() checkErrFlag += "]" } } + if checkErrFlag != "" { return fmt.Errorf(checkErrFlag) } @@ -316,15 +320,14 @@ func ControlMcisAsync(nsId string, mcisId string, action string, force bool) err } // ControlVmAsync is func to control VM async -func ControlVmAsync(wg *sync.WaitGroup, nsId string, mcisId string, vmId string, action string, results *ControlVmResultWrapper) error { +func ControlVmAsync(wg *sync.WaitGroup, nsId string, mcisId string, vmId string, action string, results chan<- ControlVmResult) { defer wg.Done() //goroutine sync done - var errTmp error var err error - var err2 error - resultTmp := ControlVmResult{} - resultTmp.VmId = vmId - resultTmp.Status = "" + + callResult := ControlVmResult{} + callResult.VmId = vmId + callResult.Status = "" temp := TbVmInfo{} key := common.GenMcisKey(nsId, mcisId, vmId) @@ -333,33 +336,27 @@ func ControlVmAsync(wg *sync.WaitGroup, nsId string, mcisId string, vmId string, keyValue, err := common.CBStore.Get(key) if keyValue == nil || err != nil { - - resultTmp.Error = fmt.Errorf("CBStoreGetErr. keyValue == nil || err != nil. key[" + key + "]") - results.ResultArray = append(results.ResultArray, resultTmp) - common.PrintJsonPretty(resultTmp) - return resultTmp.Error - + callResult.Error = fmt.Errorf("CBStoreGetErr in ControlVmAsync. key[" + key + "]") + common.PrintJsonPretty(callResult) + results <- callResult + return } else { - fmt.Println("<" + keyValue.Key + "> \n" + keyValue.Value) - fmt.Println("===============================================") unmarshalErr := json.Unmarshal([]byte(keyValue.Value), &temp) if unmarshalErr != nil { fmt.Println("Unmarshal error:", unmarshalErr) } - fmt.Println("\n[Calling SPIDER]START vmControl") - cspVmId := temp.CspViewVmDetail.IId.NameId common.PrintJsonPretty(temp.CspViewVmDetail) // Prevent malformed cspVmId if cspVmId == "" || common.CheckString(cspVmId) != nil { - resultTmp.Error = fmt.Errorf("Not valid requested CSPNativeVmId: [" + cspVmId + "]") + callResult.Error = fmt.Errorf("Not valid requested CSPNativeVmId: [" + cspVmId + "]") temp.Status = StatusFailed - temp.SystemMessage = resultTmp.Error.Error() + temp.SystemMessage = callResult.Error.Error() UpdateVmInfo(nsId, mcisId, temp) - //return err + return } else { url := "" @@ -398,70 +395,41 @@ func ControlVmAsync(wg *sync.WaitGroup, nsId string, mcisId string, vmId string, url = common.SpiderRestUrl + "/controlvm/" + cspVmId + "?action=resume" method = "GET" default: - return errors.New(action + " is invalid actionType") + callResult.Error = fmt.Errorf(action + " is invalid actionType") + results <- callResult + return } UpdateVmInfo(nsId, mcisId, temp) - type ControlVMReqInfo struct { - ConnectionName string - } - requestBody := ControlVMReqInfo{} - requestBody.ConnectionName = temp.ConnectionName - payload, _ := json.MarshalIndent(requestBody, "", " ") + client := resty.New() + client.SetTimeout(10 * time.Minute) - client := &http.Client{ - CheckRedirect: func(req *http.Request, via []*http.Request) error { - return http.ErrUseLastResponse - }, - } - req, err := http.NewRequest(method, url, strings.NewReader(string(payload))) - - if err != nil { - common.CBLog.Error(err) - return err - } - req.Header.Add("Content-Type", "application/json") + requestBody := common.SpiderConnectionName{} + requestBody.ConnectionName = temp.ConnectionName - res, err := client.Do(req) - if err != nil { - common.CBLog.Error(err) - return err - } - body, err := ioutil.ReadAll(res.Body) + err = common.ExecuteHttpRequest( + client, + method, + url, + nil, + common.SetUseBody(requestBody), + &requestBody, + &callResult, + common.MediumDuration, + ) if err != nil { common.CBLog.Error(err) - return err - } - defer res.Body.Close() - - fmt.Println("HTTP Status code: " + strconv.Itoa(res.StatusCode)) - switch { - case res.StatusCode >= 400 || res.StatusCode < 200: - err := fmt.Errorf(string(body)) - common.CBLog.Error(err) - errTmp = err - } - - err2 = json.Unmarshal(body, &resultTmp) - - if err2 != nil { - fmt.Println(err2) - common.CBLog.Error(err) - errTmp = err - } - if errTmp != nil { - resultTmp.Error = errTmp - temp.Status = StatusFailed - temp.SystemMessage = errTmp.Error() + temp.SystemMessage = err.Error() UpdateVmInfo(nsId, mcisId, temp) - } - results.ResultArray = append(results.ResultArray, resultTmp) - common.PrintJsonPretty(resultTmp) + callResult.Error = err + results <- callResult + return + } - fmt.Println("[Calling SPIDER]END vmControl") + common.PrintJsonPretty(callResult) if action != ActionTerminate { //When VM is restared, temporal PublicIP will be chanaged. Need update. @@ -484,142 +452,12 @@ func ControlVmAsync(wg *sync.WaitGroup, nsId string, mcisId string, vmId string, mcir.UpdateAssociatedObjectList(nsId, common.StrDataDisk, v, common.StrDelete, key) } } - } - - } - - return nil - -} - -// ControlVm is func to control VM -func ControlVm(nsId string, mcisId string, vmId string, action string) error { - - var content struct { - CloudId string `json:"cloudId"` - CspVmId string `json:"cspVmId"` - } - - key := common.GenMcisKey(nsId, mcisId, vmId) - fmt.Println("[ControlVm] " + key) - - keyValue, err := common.CBStore.Get(key) - if err != nil { - err = fmt.Errorf("In ControlVm(); CBStore.Get() returned an error.") - common.CBLog.Error(err) - return err - } - - fmt.Println("<" + keyValue.Key + "> \n" + keyValue.Value) - fmt.Println("===============================================") - - json.Unmarshal([]byte(keyValue.Value), &content) - - temp := TbVmInfo{} - unmarshalErr := json.Unmarshal([]byte(keyValue.Value), &temp) - if unmarshalErr != nil { - fmt.Println("unmarshalErr:", unmarshalErr) - } - - fmt.Println("\n[Calling SPIDER]START vmControl") - - cspVmId := temp.CspViewVmDetail.IId.NameId - common.PrintJsonPretty(temp.CspViewVmDetail) - - url := "" - method := "" - switch action { - case ActionTerminate: - - temp.TargetAction = ActionTerminate - temp.TargetStatus = StatusTerminated - temp.Status = StatusTerminating - - url = common.SpiderRestUrl + "/vm/" + cspVmId - method = "DELETE" - case ActionReboot: - - temp.TargetAction = ActionReboot - temp.TargetStatus = StatusRunning - temp.Status = StatusRebooting - - url = common.SpiderRestUrl + "/controlvm/" + cspVmId + "?action=reboot" - method = "GET" - case ActionSuspend: - - temp.TargetAction = ActionSuspend - temp.TargetStatus = StatusSuspended - temp.Status = StatusSuspending - - url = common.SpiderRestUrl + "/controlvm/" + cspVmId + "?action=suspend" - method = "GET" - case ActionResume: - - temp.TargetAction = ActionResume - temp.TargetStatus = StatusRunning - temp.Status = StatusResuming - url = common.SpiderRestUrl + "/controlvm/" + cspVmId + "?action=resume" - method = "GET" - default: - return errors.New(action + "is invalid actionType") - } - - UpdateVmInfo(nsId, mcisId, temp) - - type ControlVMReqInfo struct { - ConnectionName string - } - requestBody := ControlVMReqInfo{} - requestBody.ConnectionName = temp.ConnectionName - payload, _ := json.MarshalIndent(requestBody, "", " ") - - client := &http.Client{ - CheckRedirect: func(req *http.Request, via []*http.Request) error { - return http.ErrUseLastResponse - }, - } - req, err := http.NewRequest(method, url, strings.NewReader(string(payload))) - - if err != nil { - fmt.Println(err) - return err - } - req.Header.Add("Content-Type", "application/json") - - res, err := client.Do(req) - if err != nil { - fmt.Println(err) - return err - } - - body, err := ioutil.ReadAll(res.Body) - if err != nil { - fmt.Println(err) - return err - } - defer res.Body.Close() - - fmt.Println(string(body)) - - fmt.Println("[Calling SPIDER] END vmControl\n") - /* - if strings.Compare(content.CspVmId, "Not assigned yet") == 0 { - return nil - } - if strings.Compare(content.CloudId, "aws") == 0 { - controlVmAws(content.CspVmId) - } else if strings.Compare(content.CloudId, "gcp") == 0 { - controlVmGcp(content.CspVmId) - } else if strings.Compare(content.CloudId, "azure") == 0 { - controlVmAzure(content.CspVmId) - } else { - fmt.Println("==============ERROR=no matched providerId=================") + results <- callResult } - */ - - return nil + } + return } // CheckAllowedTransition is func to check status transition is acceptable diff --git a/src/core/mcis/manageInfo.go b/src/core/mcis/manageInfo.go index 6236b20d8..bf7d81ebe 100644 --- a/src/core/mcis/manageInfo.go +++ b/src/core/mcis/manageInfo.go @@ -1127,7 +1127,7 @@ func FetchVmStatus(nsId string, mcisId string, vmId string) (TbVmStatusInfo, err errorInfo.Location = temp.Location errorInfo.MonAgentStatus = temp.MonAgentStatus errorInfo.CreatedTime = temp.CreatedTime - errorInfo.SystemMessage = "Error in GetVmStatus" + errorInfo.SystemMessage = "Error in FetchVmStatus" cspVmId := temp.CspViewVmDetail.IId.NameId @@ -1857,14 +1857,20 @@ func DelMcisVm(nsId string, mcisId string, vmId string, option string) error { fmt.Println("[Delete VM] " + vmId) // ControlVm first - err = ControlVm(nsId, mcisId, vmId, ActionTerminate) - - if err != nil { - common.CBLog.Error(err) + var wg sync.WaitGroup + results := make(chan ControlVmResult, 1) + wg.Add(1) + go ControlVmAsync(&wg, nsId, mcisId, vmId, ActionTerminate, results) + checkErr := <-results + wg.Wait() + close(results) + if checkErr.Error != nil { + common.CBLog.Info(checkErr.Error) if option != "force" { - return err + return checkErr.Error } } + // for deletion, need to wait until termination is finished // Sleep for 5 seconds fmt.Printf("\n\n[Info] Sleep for 20 seconds for safe VM termination.\n\n")