Skip to content

Commit

Permalink
PMM-13292 Add INITIALIZATION_ERROR statue for process initialization …
Browse files Browse the repository at this point in the history
…failure (#2935)

* add process init fail status

* minor fix

* fixed the test

* lint and add timer

* lint

* lint

---------

Co-authored-by: Jiří Čtvrtka <[email protected]>
Co-authored-by: Roman Novikov <[email protected]>
Co-authored-by: Nurlan Moldomurov <[email protected]>
Co-authored-by: Alex Demidoff <[email protected]>
  • Loading branch information
5 people authored Aug 27, 2024
1 parent 7c580b4 commit 070cc40
Show file tree
Hide file tree
Showing 45 changed files with 1,016 additions and 469 deletions.
55 changes: 40 additions & 15 deletions agent/agents/process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"strings"
"time"

"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/sys/unix"

Expand Down Expand Up @@ -53,12 +54,14 @@ const (
// implements its own logic, and then switches to then next state via "go toXXX()". "go" statement is used
// only to avoid stack overflow; there are no extra goroutines for states.
type Process struct {
params *Params
l *logrus.Entry
pl *processLogger
changes chan inventorypb.AgentStatus
backoff *backoff.Backoff
ctxDone chan struct{}
params *Params
l *logrus.Entry
pl *processLogger
changes chan inventorypb.AgentStatus
backoff *backoff.Backoff
ctxDone chan struct{}
err error
initialized chan bool

// recreated on each restart
cmd *exec.Cmd
Expand Down Expand Up @@ -88,15 +91,26 @@ func (p *Params) String() string {
// New creates new process.
func New(params *Params, redactWords []string, l *logrus.Entry) *Process {
return &Process{
params: params,
l: l,
pl: newProcessLogger(l, keepLogLines, redactWords),
changes: make(chan inventorypb.AgentStatus, 10),
backoff: backoff.New(backoffMinDelay, backoffMaxDelay),
ctxDone: make(chan struct{}),
params: params,
l: l,
pl: newProcessLogger(l, keepLogLines, redactWords),
changes: make(chan inventorypb.AgentStatus, 10),
backoff: backoff.New(backoffMinDelay, backoffMaxDelay),
ctxDone: make(chan struct{}),
initialized: make(chan bool, 1),
}
}

// IsInitialized returns a chan of bool. True can be received if the process is initialized.
func (p *Process) IsInitialized() <-chan bool {
return p.initialized
}

// GetError returns the error thrown when initializing the process.
func (p *Process) GetError() error {
return p.err
}

// Run starts process and runs until ctx is canceled.
func (p *Process) Run(ctx context.Context) {
go p.toStarting()
Expand All @@ -107,7 +121,7 @@ func (p *Process) Run(ctx context.Context) {
}

// STARTING -> RUNNING.
// STARTING -> WAITING.
// STARTING -> FAILING.
func (p *Process) toStarting() {
p.l.Tracef("Process: starting.")
p.changes <- inventorypb.AgentStatus_STARTING
Expand All @@ -128,7 +142,7 @@ func (p *Process) toStarting() {

if err := p.cmd.Start(); err != nil {
p.l.Warnf("Process: failed to start: %s.", err)
go p.toWaiting()
go p.toFailing(err)
return
}

Expand All @@ -142,10 +156,11 @@ func (p *Process) toStarting() {
defer t.Stop()
select {
case <-t.C:
p.initialized <- true
go p.toRunning()
case <-p.cmdDone:
p.l.Warnf("Process: exited early: %s.", p.cmd.ProcessState)
go p.toWaiting()
go p.toFailing(errors.New("exited early"))
}
}

Expand Down Expand Up @@ -192,6 +207,16 @@ func (p *Process) toWaiting() {
}
}

// FAILING -> DONE.
func (p *Process) toFailing(err error) {
p.l.Tracef("Process: failing")
p.changes <- inventorypb.AgentStatus_INITIALIZATION_ERROR
p.l.Infof("Process: exited: %s.", p.cmd.ProcessState)
go p.toDone()
p.err = err
p.initialized <- false
}

// STOPPING -> DONE.
func (p *Process) toStopping() {
p.l.Tracef("Process: stopping (sending SIGTERM)...")
Expand Down
25 changes: 6 additions & 19 deletions agent/agents/process/process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,35 +80,22 @@ func TestProcess(t *testing.T) {
})

t.Run("FailedToStart", func(t *testing.T) {
ctx, cancel, l := setup(t)
ctx, _, l := setup(t)
p := New(&Params{Path: "no_such_command"}, nil, l)
go p.Run(ctx)

assertStates(t, p, inventorypb.AgentStatus_STARTING, inventorypb.AgentStatus_WAITING, inventorypb.AgentStatus_STARTING, inventorypb.AgentStatus_WAITING)
cancel()
assertStates(t, p, inventorypb.AgentStatus_DONE, inventorypb.AgentStatus_AGENT_STATUS_INVALID)
assertStates(t, p, inventorypb.AgentStatus_STARTING, inventorypb.AgentStatus_INITIALIZATION_ERROR,
inventorypb.AgentStatus_DONE, inventorypb.AgentStatus_AGENT_STATUS_INVALID)
})

t.Run("ExitedEarly", func(t *testing.T) {
sleep := strconv.FormatFloat(runningT.Seconds()-0.5, 'f', -1, 64)
ctx, cancel, l := setup(t)
p := New(&Params{Path: "sleep", Args: []string{sleep}}, nil, l)
go p.Run(ctx)

assertStates(t, p, inventorypb.AgentStatus_STARTING, inventorypb.AgentStatus_WAITING, inventorypb.AgentStatus_STARTING, inventorypb.AgentStatus_WAITING)
cancel()
assertStates(t, p, inventorypb.AgentStatus_DONE, inventorypb.AgentStatus_AGENT_STATUS_INVALID)
})

t.Run("CancelStarting", func(t *testing.T) {
sleep := strconv.FormatFloat(runningT.Seconds()-0.5, 'f', -1, 64)
ctx, cancel, l := setup(t)
ctx, _, l := setup(t)
p := New(&Params{Path: "sleep", Args: []string{sleep}}, nil, l)
go p.Run(ctx)

assertStates(t, p, inventorypb.AgentStatus_STARTING, inventorypb.AgentStatus_WAITING, inventorypb.AgentStatus_STARTING)
cancel()
assertStates(t, p, inventorypb.AgentStatus_WAITING, inventorypb.AgentStatus_DONE, inventorypb.AgentStatus_AGENT_STATUS_INVALID)
assertStates(t, p, inventorypb.AgentStatus_STARTING, inventorypb.AgentStatus_INITIALIZATION_ERROR,
inventorypb.AgentStatus_DONE, inventorypb.AgentStatus_AGENT_STATUS_INVALID)
})

t.Run("Exited", func(t *testing.T) {
Expand Down
52 changes: 40 additions & 12 deletions agent/agents/supervisor/supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"sort"
"strings"
"sync"
"time"

"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -237,7 +238,7 @@ func (s *Supervisor) RestartAgents() {
agent.cancel()
<-agent.done

if err := s.startProcess(id, agent.requestedState, agent.listenPort); err != nil {
if err := s.tryStartProcess(id, agent.requestedState, agent.listenPort); err != nil {
s.l.Errorf("Failed to restart Agent: %s.", err)
}
}
Expand Down Expand Up @@ -310,22 +311,15 @@ func (s *Supervisor) setAgentProcesses(agentProcesses map[string]*agentpb.SetSta
agent.cancel()
<-agent.done

if err := s.startProcess(agentID, agentProcesses[agentID], agent.listenPort); err != nil {
if err := s.tryStartProcess(agentID, agentProcesses[agentID], agent.listenPort); err != nil {
s.l.Errorf("Failed to start Agent: %s.", err)
// TODO report that error to server
}
}

// start new agents
for _, agentID := range toStart {
port, err := s.portsRegistry.Reserve()
if err != nil {
s.l.Errorf("Failed to reserve port: %s.", err)
// TODO report that error to server
continue
}

if err := s.startProcess(agentID, agentProcesses[agentID], port); err != nil {
if err := s.tryStartProcess(agentID, agentProcesses[agentID], 0); err != nil {
s.l.Errorf("Failed to start Agent: %s.", err)
// TODO report that error to server
}
Expand Down Expand Up @@ -427,10 +421,33 @@ func filter(existing, ap map[string]agentpb.AgentParams) ([]string, []string, []

//nolint:golint,stylecheck,revive
const (
type_TEST_SLEEP inventorypb.AgentType = 998 // process
type_TEST_NOOP inventorypb.AgentType = 999 // built-in
type_TEST_SLEEP inventorypb.AgentType = 998 // process
type_TEST_NOOP inventorypb.AgentType = 999 // built-in
process_Retry_Time int = 3
start_Process_Waiting = 2 * time.Second
)

func (s *Supervisor) tryStartProcess(agentID string, agentProcess *agentpb.SetStateRequest_AgentProcess, port uint16) error {
var err error
for i := 0; i < process_Retry_Time; i++ {
if port == 0 {
_port, err := s.portsRegistry.Reserve()
if err != nil {
s.l.Errorf("Failed to reserve port: %s.", err)
continue
}
port = _port
}

if err = s.startProcess(agentID, agentProcess, port); err == nil {
return nil
}

port = 0
}
return err
}

// startProcess starts Agent's process.
// Must be called with s.rw held for writing.
func (s *Supervisor) startProcess(agentID string, agentProcess *agentpb.SetStateRequest_AgentProcess, port uint16) error {
Expand Down Expand Up @@ -473,6 +490,17 @@ func (s *Supervisor) startProcess(agentID string, agentProcess *agentpb.SetState
close(done)
}()

t := time.NewTimer(start_Process_Waiting)
defer t.Stop()
select {
case isInitialized := <-process.IsInitialized():
if !isInitialized {
defer cancel()
return process.GetError()
}
case <-t.C:
}

//nolint:forcetypeassert
s.agentProcesses[agentID] = &agentProcessInfo{
cancel: cancel,
Expand Down
58 changes: 48 additions & 10 deletions agent/agents/supervisor/supervisor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,16 +73,16 @@ func TestSupervisor(t *testing.T) {

assertChanges(t, s,
&agentpb.StateChangedRequest{AgentId: "noop3", Status: inventorypb.AgentStatus_STARTING},
&agentpb.StateChangedRequest{AgentId: "sleep1", Status: inventorypb.AgentStatus_STARTING, ListenPort: 65000, ProcessExecPath: "sleep"})
&agentpb.StateChangedRequest{AgentId: "sleep1", Status: inventorypb.AgentStatus_STARTING, ListenPort: 65000, ProcessExecPath: "sleep"},
&agentpb.StateChangedRequest{AgentId: "sleep1", Status: inventorypb.AgentStatus_RUNNING, ListenPort: 65000, ProcessExecPath: "sleep"})
expectedList = []*agentlocalpb.AgentInfo{
{AgentType: type_TEST_NOOP, AgentId: "noop3", Status: inventorypb.AgentStatus_STARTING},
{AgentType: type_TEST_SLEEP, AgentId: "sleep1", Status: inventorypb.AgentStatus_STARTING, ListenPort: 65000, ProcessExecPath: "sleep"},
{AgentType: type_TEST_SLEEP, AgentId: "sleep1", Status: inventorypb.AgentStatus_RUNNING, ListenPort: 65000, ProcessExecPath: "sleep"},
}
assert.Equal(t, expectedList, s.AgentsList())

assertChanges(t, s,
&agentpb.StateChangedRequest{AgentId: "noop3", Status: inventorypb.AgentStatus_RUNNING},
&agentpb.StateChangedRequest{AgentId: "sleep1", Status: inventorypb.AgentStatus_RUNNING, ListenPort: 65000, ProcessExecPath: "sleep"})
&agentpb.StateChangedRequest{AgentId: "noop3", Status: inventorypb.AgentStatus_RUNNING})
expectedList = []*agentlocalpb.AgentInfo{
{AgentType: type_TEST_NOOP, AgentId: "noop3", Status: inventorypb.AgentStatus_RUNNING},
{AgentType: type_TEST_SLEEP, AgentId: "sleep1", Status: inventorypb.AgentStatus_RUNNING, ListenPort: 65000, ProcessExecPath: "sleep"},
Expand Down Expand Up @@ -114,17 +114,17 @@ func TestSupervisor(t *testing.T) {

assertChanges(t, s,

Check failure on line 115 in agent/agents/supervisor/supervisor_test.go

View workflow job for this annotation

GitHub Actions / Checks

arg list parens: align `)` to a same line with last argument
&agentpb.StateChangedRequest{AgentId: "sleep1", Status: inventorypb.AgentStatus_STARTING, ListenPort: 65000, ProcessExecPath: "sleep"},
&agentpb.StateChangedRequest{AgentId: "sleep2", Status: inventorypb.AgentStatus_STARTING, ListenPort: 65001, ProcessExecPath: "sleep"})
&agentpb.StateChangedRequest{AgentId: "sleep1", Status: inventorypb.AgentStatus_RUNNING, ListenPort: 65000, ProcessExecPath: "sleep"},
&agentpb.StateChangedRequest{AgentId: "sleep2", Status: inventorypb.AgentStatus_STARTING, ListenPort: 65001, ProcessExecPath: "sleep"},
&agentpb.StateChangedRequest{AgentId: "sleep2", Status: inventorypb.AgentStatus_RUNNING, ListenPort: 65001, ProcessExecPath: "sleep"},
)
expectedList = []*agentlocalpb.AgentInfo{
{AgentType: type_TEST_NOOP, AgentId: "noop3", Status: inventorypb.AgentStatus_RUNNING},
{AgentType: type_TEST_SLEEP, AgentId: "sleep1", Status: inventorypb.AgentStatus_STARTING, ListenPort: 65000, ProcessExecPath: "sleep"},
{AgentType: type_TEST_SLEEP, AgentId: "sleep2", Status: inventorypb.AgentStatus_STARTING, ListenPort: 65001, ProcessExecPath: "sleep"},
{AgentType: type_TEST_SLEEP, AgentId: "sleep1", Status: inventorypb.AgentStatus_RUNNING, ListenPort: 65000, ProcessExecPath: "sleep"},
{AgentType: type_TEST_SLEEP, AgentId: "sleep2", Status: inventorypb.AgentStatus_RUNNING, ListenPort: 65001, ProcessExecPath: "sleep"},
}
assert.Equal(t, expectedList, s.AgentsList())

assertChanges(t, s,
&agentpb.StateChangedRequest{AgentId: "sleep1", Status: inventorypb.AgentStatus_RUNNING, ListenPort: 65000, ProcessExecPath: "sleep"},
&agentpb.StateChangedRequest{AgentId: "sleep2", Status: inventorypb.AgentStatus_RUNNING, ListenPort: 65001, ProcessExecPath: "sleep"})
expectedList = []*agentlocalpb.AgentInfo{
{AgentType: type_TEST_NOOP, AgentId: "noop3", Status: inventorypb.AgentStatus_RUNNING},
{AgentType: type_TEST_SLEEP, AgentId: "sleep1", Status: inventorypb.AgentStatus_RUNNING, ListenPort: 65000, ProcessExecPath: "sleep"},
Expand Down Expand Up @@ -259,6 +259,44 @@ func TestSupervisor(t *testing.T) {
})
}

func TestStartProcessFail(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
tempDir := t.TempDir()
cfgStorage := config.NewStorage(&config.Config{
Paths: config.Paths{TempDir: tempDir},
Ports: config.Ports{Min: 65000, Max: 65099},
Server: config.Server{Address: "localhost:443"},
LogLinesCount: 1,
})
s := NewSupervisor(ctx, nil, cfgStorage)
go s.Run(ctx)

t.Run("Start", func(t *testing.T) {
expectedList := []*agentlocalpb.AgentInfo{}
require.Equal(t, expectedList, s.AgentsList())

s.SetState(&agentpb.SetStateRequest{
AgentProcesses: map[string]*agentpb.SetStateRequest_AgentProcess{
"sleep1": {Type: type_TEST_SLEEP, Args: []string{"wrong format"}},
},
})

assertChanges(t, s,
&agentpb.StateChangedRequest{AgentId: "sleep1", Status: inventorypb.AgentStatus_STARTING, ListenPort: 65000, ProcessExecPath: "sleep"},
&agentpb.StateChangedRequest{AgentId: "sleep1", Status: inventorypb.AgentStatus_INITIALIZATION_ERROR, ListenPort: 65000, ProcessExecPath: "sleep"},
&agentpb.StateChangedRequest{AgentId: "sleep1", Status: inventorypb.AgentStatus_DONE, ListenPort: 65000, ProcessExecPath: "sleep"},
&agentpb.StateChangedRequest{AgentId: "sleep1", Status: inventorypb.AgentStatus_STARTING, ListenPort: 65001, ProcessExecPath: "sleep"},
&agentpb.StateChangedRequest{AgentId: "sleep1", Status: inventorypb.AgentStatus_INITIALIZATION_ERROR, ListenPort: 65001, ProcessExecPath: "sleep"},
&agentpb.StateChangedRequest{AgentId: "sleep1", Status: inventorypb.AgentStatus_DONE, ListenPort: 65001, ProcessExecPath: "sleep"},
&agentpb.StateChangedRequest{AgentId: "sleep1", Status: inventorypb.AgentStatus_STARTING, ListenPort: 65002, ProcessExecPath: "sleep"},
&agentpb.StateChangedRequest{AgentId: "sleep1", Status: inventorypb.AgentStatus_INITIALIZATION_ERROR, ListenPort: 65002, ProcessExecPath: "sleep"},
&agentpb.StateChangedRequest{AgentId: "sleep1", Status: inventorypb.AgentStatus_DONE, ListenPort: 65002, ProcessExecPath: "sleep"})
expectedList = []*agentlocalpb.AgentInfo{}
require.Equal(t, expectedList, s.AgentsList())
})
}

func TestFilter(t *testing.T) {
t.Parallel()

Expand Down
6 changes: 4 additions & 2 deletions api/agentlocalpb/json/agentlocalpb.json
Original file line number Diff line number Diff line change
Expand Up @@ -149,12 +149,13 @@
"x-order": 4
},
"status": {
"description": "AgentStatus represents actual Agent status.\n\n - STARTING: Agent is starting.\n - RUNNING: Agent is running.\n - WAITING: Agent encountered error and will be restarted automatically soon.\n - STOPPING: Agent is stopping.\n - DONE: Agent finished.\n - UNKNOWN: Agent is not connected, we don't know anything about it's state.",
"description": "AgentStatus represents actual Agent status.\n\n - STARTING: Agent is starting.\n - INITIALIZATION_ERROR: Agent encountered error when starting.\n - RUNNING: Agent is running.\n - WAITING: Agent encountered error when running and will be restarted automatically soon.\n - STOPPING: Agent is stopping.\n - DONE: Agent finished.\n - UNKNOWN: Agent is not connected, we don't know anything about it's state.",
"type": "string",
"default": "AGENT_STATUS_INVALID",
"enum": [
"AGENT_STATUS_INVALID",
"STARTING",
"INITIALIZATION_ERROR",
"RUNNING",
"WAITING",
"STOPPING",
Expand Down Expand Up @@ -342,12 +343,13 @@
"x-order": 4
},
"status": {
"description": "AgentStatus represents actual Agent status.\n\n - STARTING: Agent is starting.\n - RUNNING: Agent is running.\n - WAITING: Agent encountered error and will be restarted automatically soon.\n - STOPPING: Agent is stopping.\n - DONE: Agent finished.\n - UNKNOWN: Agent is not connected, we don't know anything about it's state.",
"description": "AgentStatus represents actual Agent status.\n\n - STARTING: Agent is starting.\n - INITIALIZATION_ERROR: Agent encountered error when starting.\n - RUNNING: Agent is running.\n - WAITING: Agent encountered error when running and will be restarted automatically soon.\n - STOPPING: Agent is stopping.\n - DONE: Agent finished.\n - UNKNOWN: Agent is not connected, we don't know anything about it's state.",
"type": "string",
"default": "AGENT_STATUS_INVALID",
"enum": [
"AGENT_STATUS_INVALID",
"STARTING",
"INITIALIZATION_ERROR",
"RUNNING",
"WAITING",
"STOPPING",
Expand Down
Loading

0 comments on commit 070cc40

Please sign in to comment.