Skip to content

Commit

Permalink
fix(host): reconnect qga on failed exec qga commands (#21357)
Browse files Browse the repository at this point in the history
  • Loading branch information
wanyaoqi authored Oct 8, 2024
1 parent 211a5de commit f426935
Showing 1 changed file with 28 additions and 9 deletions.
37 changes: 28 additions & 9 deletions pkg/hostman/monitor/qga/qga.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"encoding/json"
"fmt"
"net"
"runtime/debug"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -66,8 +67,6 @@ func NewQemuGuestAgent(id, qgaSocketPath string) (*QemuGuestAgent, error) {
qgaSocketPath: qgaSocketPath,
mutex: &sync.Mutex{},
readTimeout: QGA_DEFAULT_READ_TIMEOUT_SECOND,
commandQueue: make([]string, 0),
callbackQueue: make([]QGACallback, 0),
}
err := qga.connect()
if err != nil {
Expand All @@ -84,6 +83,9 @@ func (qga *QemuGuestAgent) connect() error {
if err != nil {
return errors.Wrap(err, "dial qga socket")
}

qga.commandQueue = make([]string, 0)
qga.callbackQueue = make([]QGACallback, 0)
qga.rwc = conn
qga.scanner = bufio.NewScanner(conn)

Expand All @@ -92,17 +94,23 @@ func (qga *QemuGuestAgent) connect() error {
}

// read drains the guest-agent scanner: every non-empty token is handed to
// qga.callBack on its own goroutine, and once scanning stops the end of the
// scan and any scanner error are logged.
//
// NOTE(review): this span looks like a rendered diff whose +/- markers were
// lost, so superseded lines and their replacements appear next to each other;
// it is not compilable exactly as shown.
func (qga *QemuGuestAgent) read() {
// Presumably the superseded loop header and token read (they go through
// qga.scanner directly); the local-copy variants follow below.
for qga.scanner.Scan() {
res := qga.scanner.Bytes()
// Recover from a panic raised in this function so it is logged instead of
// propagating further.
// NOTE(review): debug.Stack() returns a []byte; formatted with %v it prints as
// a list of byte values, whereas %s would print the stack text.
defer func() {
if r := recover(); r != nil {
log.Errorf("QemuGuestAgent read %v %v", r, debug.Stack())
}
}()

// Work on a local copy of the scanner field so the loop and the Err() call
// below do not re-read qga.scanner.
// NOTE(review): presumably this guards against the field being reset
// elsewhere while this function is running — confirm against Close.
scanner := qga.scanner
for scanner.Scan() {
res := scanner.Bytes()
// Skip empty tokens.
if len(res) == 0 {
continue
}
// NOTE(review): bufio.Scanner documents that the slice returned by Bytes may
// be overwritten by a later Scan; res is passed to a goroutine without a
// copy — confirm callBack is finished with it before the next Scan.
go qga.callBack(res)
}
log.Infof("QGA Scan over %s ...", qga.id)
// Two variants of the error lookup: via the struct field, then via the local
// copy (presumably the old and new lines of the diff, respectively).
err := qga.scanner.Err()
err := scanner.Err()
if err != nil {
// Two variants of the disconnect log at different levels (presumably the
// Infof line is the superseded one and Debugf its replacement).
log.Infof("QGA Disconnected %s: %s", qga.id, err)
log.Debugf("QGA Disconnected %s: %s", qga.id, err)
}
}

Expand Down Expand Up @@ -132,21 +140,27 @@ func (qga *QemuGuestAgent) Close() error {
return err
}

qga.commandQueue = nil
qga.callbackQueue = nil
qga.scanner = nil
qga.rwc = nil
return nil
}

// Query enqueues cmd together with its callback cb and, when qga.writing is
// false, starts qga.query on a new goroutine. In the int-returning variant it
// reports the length of the callback queue measured right after the push.
//
// NOTE(review): two signature lines appear back to back; this looks like a
// rendered diff without +/- markers, where the first (no return value) is
// presumably the superseded one and the second its replacement.
func (qga *QemuGuestAgent) Query(cmd string, cb QGACallback) {
func (qga *QemuGuestAgent) Query(cmd string, cb QGACallback) int {
// push: append the command and its callback to their queues under the mutex.
var cbQueueLength int
qga.mutex.Lock()
qga.commandQueue = append(qga.commandQueue, cmd)
qga.callbackQueue = append(qga.callbackQueue, cb)
// Capture the queue length while the lock is still held.
cbQueueLength = len(qga.callbackQueue)
qga.mutex.Unlock()

// NOTE(review): qga.writing is read here after the mutex has been released;
// confirm how that field is synchronized with the goroutine running query.
if !qga.writing {
go qga.query()
}

return cbQueueLength
}

func (m *QemuGuestAgent) checkWriting() bool {
Expand Down Expand Up @@ -244,7 +258,7 @@ func (qga *QemuGuestAgent) execCmd(cmd *monitor.Command, expectResp bool, readTi
var cb = qga.getQGACallback(expectResp, resChan)

rawCmd := jsonutils.Marshal(cmd).String()
qga.Query(rawCmd, cb)
cbQueueLength := qga.Query(rawCmd, cb)

if !expectResp {
return nil, nil
Expand All @@ -256,6 +270,11 @@ func (qga *QemuGuestAgent) execCmd(cmd *monitor.Command, expectResp bool, readTi
}
select {
case <-time.After(time.Duration(readTimeoutSecond) * time.Second):
if cbQueueLength > 30 {
if err := qga.Close(); err != nil {
log.Errorf("failed close qga connection %s", err)
}
}
return nil, errors.Errorf("qga read timeout")
case res = <-resChan:
break
Expand Down

0 comments on commit f426935

Please sign in to comment.