From f1f585066d0593a1f6ef794daf7043345c3a984f Mon Sep 17 00:00:00 2001 From: "R.I.Pienaar" Date: Fri, 17 Jan 2025 11:59:16 +0100 Subject: [PATCH] (#2209) Adds a basic long running execution framework Signed-off-by: R.I.Pienaar --- choria/framework.go | 27 +- cmd/buildinfo.go | 13 +- cmd/exec-supervisor.go | 68 +++ config/choria.go | 4 +- providers/execution/exec.go | 541 ++++++++++++++++++++ providers/execution/exec_test.go | 191 +++++++ providers/governor/streams/governor_test.go | 18 +- 7 files changed, 828 insertions(+), 34 deletions(-) create mode 100644 cmd/exec-supervisor.go create mode 100644 providers/execution/exec.go create mode 100644 providers/execution/exec_test.go diff --git a/choria/framework.go b/choria/framework.go index 74c9b12e1..1f46ae4ca 100644 --- a/choria/framework.go +++ b/choria/framework.go @@ -1,4 +1,4 @@ -// Copyright (c) 2017-2023, R.I. Pienaar and the Choria Project contributors +// Copyright (c) 2017-2025, R.I. Pienaar and the Choria Project contributors // // SPDX-License-Identifier: Apache-2.0 @@ -479,25 +479,30 @@ func (fw *Framework) SetLogWriter(out io.Writer) { } func (fw *Framework) commonLogOpener() error { + return CommonLogOpener(fw.Config.LogFile, fw.log) +} + +// CommonLogOpener opens a logfile +func CommonLogOpener(logFile string, logger *log.Logger) error { switch { - case strings.ToLower(fw.Config.LogFile) == "discard": - fw.log.SetOutput(io.Discard) + case strings.ToLower(logFile) == "discard": + log.SetOutput(io.Discard) - case strings.ToLower(fw.Config.LogFile) == "stdout": - fw.log.SetOutput(os.Stdout) + case strings.ToLower(logFile) == "stdout": + log.SetOutput(os.Stdout) - case strings.ToLower(fw.Config.LogFile) == "stderr": - fw.log.SetOutput(os.Stderr) + case strings.ToLower(logFile) == "stderr": + log.SetOutput(os.Stderr) - case fw.Config.LogFile != "": - fw.log.Formatter = &log.JSONFormatter{} + case logFile != "": + logger.Formatter = &log.JSONFormatter{} - file, err := os.OpenFile(fw.Config.LogFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0666) + file, err := os.OpenFile(logFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0666) if err != nil { return fmt.Errorf("could not set up logging: %s", err) } - fw.log.SetOutput(file) + logger.SetOutput(file) } return nil diff --git a/cmd/buildinfo.go b/cmd/buildinfo.go index e59461905..8b5e2cdd8 100644 --- a/cmd/buildinfo.go +++ b/cmd/buildinfo.go @@ -1,4 +1,4 @@ -// Copyright (c) 2017-2024, R.I. Pienaar and the Choria Project contributors +// Copyright (c) 2017-2025, R.I. Pienaar and the Choria Project contributors // // SPDX-License-Identifier: Apache-2.0 @@ -6,16 +6,17 @@ package cmd import ( "fmt" - "github.com/choria-io/go-choria/config" - iu "github.com/choria-io/go-choria/internal/util" - "github.com/choria-io/go-choria/protocol" - "github.com/choria-io/go-choria/providers/provtarget" - gnatsd "github.com/nats-io/nats-server/v2/server" "runtime" rd "runtime/debug" "sort" "strings" "sync" + + "github.com/choria-io/go-choria/config" + iu "github.com/choria-io/go-choria/internal/util" + "github.com/choria-io/go-choria/protocol" + "github.com/choria-io/go-choria/providers/provtarget" + gnatsd "github.com/nats-io/nats-server/v2/server" ) type buildinfoCommand struct { diff --git a/cmd/exec-supervisor.go b/cmd/exec-supervisor.go new file mode 100644 index 000000000..6dcd7422f --- /dev/null +++ b/cmd/exec-supervisor.go @@ -0,0 +1,68 @@ +// Copyright (c) 2025, R.I. Pienaar and the Choria Project contributors +// +// SPDX-License-Identifier: Apache-2.0 + +package cmd + +import ( + "fmt" + "sync" + "time" + + "github.com/choria-io/go-choria/providers/execution" + "github.com/choria-io/go-choria/submission" +) + +type execSupervisorCommand struct { + command + + hb time.Duration + output bool + cmdID string + env map[string]string +} + +func init() { + cli.commands = append(cli.commands, &execSupervisorCommand{}) +} + +func (b *execSupervisorCommand) Setup() (err error) { + b.env = map[string]string{} + + b.cmd = cli.app.Command("exec-supervisor", "Executes and supervises shell commands").Hidden() + b.cmd.Flag("config", "Config file to use").PlaceHolder("FILE").ExistingFileVar(&configFile) + b.cmd.Flag("heartbeat", "Interval to heartbeat about running commands").Default("5m").DurationVar(&b.hb) + b.cmd.Flag("track-output", "Tracks command output and Submit to Choria").UnNegatableBoolVar(&b.output) + b.cmd.Flag("process", "Unique ID for this command").Required().StringVar(&b.cmdID) + + return +} + +func (b *execSupervisorCommand) Configure() (err error) { + return commonConfigure() +} + +func (b *execSupervisorCommand) Run(wg *sync.WaitGroup) (err error) { + defer wg.Done() + + log := c.Logger("exec-supervisor") + proc, err := execution.Load(c, b.cmdID) + if err != nil { + log.Errorf("Could not start supervisor: %s", err) + return fmt.Errorf("could not start supervisor: %s", err) + } + + submit, err := submission.NewFromChoria(c, submission.Directory) + if err != nil { + log.Errorf("Could not start supervisor: %s", err) + return fmt.Errorf("could not start supervisor: %s", err) + } + + err = proc.StartSupervised(ctx, cfg.Choria.ExecutorSpool, submit, b.hb, b.output, log) + if err != nil { + log.Errorf("Could not start supervisor: %s", err) + return fmt.Errorf("could not start supervisor: %s", err) + } + + return nil +} diff --git a/config/choria.go b/config/choria.go index 238ed3c5b..849997f02 100644 --- a/config/choria.go +++ b/config/choria.go @@ -1,4 +1,4 @@ -// Copyright (c) 2018-2023, R.I. Pienaar and the Choria Project contributors +// Copyright (c) 2018-2025, R.I. Pienaar and the Choria Project contributors // // SPDX-License-Identifier: Apache-2.0 @@ -164,6 +164,8 @@ type ChoriaPluginConfig struct { RPCAuditLogfileGroup string `confkey:"plugin.rpcaudit.logfile.group"` // User group to set file ownership to RPCAuditLogFileMode string `confkey:"plugin.rpcaudit.logfile.mode" default:"0600"` // File mode to apply to the file + ExecutorSpool string `confkey:"plugin.choria.executor.spool" type:"path_string"` // Path where the command executor writes state + AutonomousAgentsDownload bool `confkey:"plugin.machines.download"` // Activate run-time installation of Autonomous Agents AutonomousAgentsBucket string `confkey:"plugin.machines.bucket" default:"CHORIA_PLUGINS"` // The KV bucket to query for plugins to install AutonomousAgentsKey string `confkey:"plugin.machines.key" default:"machines"` // The Key to query in KV bucket for plugins to install diff --git a/providers/execution/exec.go b/providers/execution/exec.go new file mode 100644 index 000000000..4ba42d1f4 --- /dev/null +++ b/providers/execution/exec.go @@ -0,0 +1,541 @@ +// Copyright (c) 2025, R.I. Pienaar and the Choria Project contributors +// +// SPDX-License-Identifier: Apache-2.0 + +package execution + +import ( + "bufio" + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "os" + "os/exec" + "path/filepath" + "strconv" + "sync" + "syscall" + "time" + + "github.com/choria-io/go-choria/inter" + iu "github.com/choria-io/go-choria/internal/util" + "github.com/choria-io/go-choria/submission" + "github.com/sirupsen/logrus" +) + +// Process describes a process managed by the execution provider +type Process struct { + Command string `json:"command"` + Args []string `json:"args"` + Environment map[string]string `json:"environment"` + StdoutFile string `json:"stdout"` + StderrFile string `json:"stderr"` + PidFile string `json:"pid"` + HeartBeat time.Duration `json:"heartbeat"` + ID string `json:"id"` + Identity string `json:"identity"` + Caller string `json:"caller"` + Agent string `json:"agent"` + Action string `json:"action"` + RequestID string `json:"requestid"` + StartTime time.Time `json:"start,omitempty"` +} + +type Submitter interface { + NewMessage() *submission.Message + Submit(msg *submission.Message) error +} + +type watchedLine struct { + origin string + line []byte +} + +var ( + ErrSpoolNotConfigured = errors.New("spool not configured") + ErrSpoolNotFound = errors.New("spool not found") + ErrSpecificationNotFound = errors.New("specification not found") + ErrSpecificationLoadError = errors.New("specification could not be loaded") + ErrDuplicateJob = errors.New("duplicate job") + ErrStartFailed = errors.New("start failed") + ErrInvalidProcess = errors.New("invalid process") + ErrWritingPidFailed = errors.New("writing pid file failed") + ErrInvalidPid = errors.New("invalid pid file") + ErrAlreadyStarted = errors.New("already started") + ErrSpoolCreationFailed = errors.New("spool creation failed") +) + +func New(caller string, agent string, action string, reqID string, identity string, id string, command string, args []string, env map[string]string) (*Process, error) { + if caller == "" { + return nil, fmt.Errorf("%w: no caller", ErrInvalidProcess) + } + if reqID == "" { + return nil, fmt.Errorf("%w: no request id", ErrInvalidProcess) + } + if id == "" { + return nil, fmt.Errorf("%w: no id", ErrInvalidProcess) + } + if identity == "" { + return nil, fmt.Errorf("%w: no identity", ErrInvalidProcess) + } + if command == "" { + return nil, fmt.Errorf("%w: no command", ErrInvalidProcess) + } + + return &Process{ + Command: command, + Args: args, + Environment: env, + Identity: identity, + ID: id, + Caller: caller, + RequestID: reqID, + Agent: agent, + Action: action, + }, nil +} + +func Load(spool string, id string) (*Process, error) { + jobSpec := specPath(spool, id) + + if spool == "" { + return nil, ErrSpoolNotConfigured + } + + if !iu.FileIsDir(spool) { + return nil, ErrSpoolNotFound + } + + if !iu.FileExist(jobSpec) { + return nil, ErrSpecificationNotFound + } + + j, err := os.ReadFile(jobSpec) + if err != nil { + return nil, fmt.Errorf("%w: %s", ErrSpecificationLoadError, err) + } + + var p Process + err = json.Unmarshal(j, &p) + if err != nil { + return nil, fmt.Errorf("%w: %s", ErrSpecificationLoadError, err) + } + + return &p, nil +} + +func LoadWithChoria(fw inter.Framework, id string) (*Process, error) { + return Load(fw.Configuration().Choria.ExecutorSpool, id) +} + +// CreateSpool creates the spool and saves the spec, fails if already created +func (p *Process) CreateSpool(spool string) (json.RawMessage, error) { + if spool == "" { + return nil, ErrSpoolNotConfigured + } + + has, err := hasJob(spool, p.ID) + if err != nil { + return nil, fmt.Errorf("%w: %w", ErrInvalidProcess, err) + } + if has { + return nil, fmt.Errorf("%w: spool already exists", ErrDuplicateJob) + } + + jobDir := filepath.Join(spool, p.ID) + err = os.Mkdir(jobDir, 0700) + if err != nil { + return nil, fmt.Errorf("%w: %w", ErrSpoolCreationFailed, err) + } + + return saveJobSpec(spool, p) +} + +// StartSupervised starts a process attached to the calling process with status, heartbeats and optionally output published to Choria Submission +func (p *Process) StartSupervised(ctx context.Context, spool string, submit Submitter, heartbeat time.Duration, publishOutput bool, log *logrus.Entry) error { + if spool == "" { + return ErrSpoolNotConfigured + } + + if !p.StartTime.IsZero() { + return ErrAlreadyStarted + } + + log = log.WithFields(logrus.Fields{ + "id": p.ID, + "command": p.Command, + "caller": p.Caller, + "request": p.RequestID, + }) + + log.Infof("Starting supervised process") + + p.StdoutFile = stdOutPath(spool, p.ID) + p.StderrFile = stdErrPath(spool, p.ID) + p.PidFile = pidPath(spool, p.ID) + p.StartTime = time.Now().UTC() + p.HeartBeat = heartbeat + + prefix := fmt.Sprintf("choria.executor.%s.%s", p.RequestID, p.ID) + + jProc, err := saveJobSpec(spool, p) + if err != nil { + return fmt.Errorf("%w: %w", ErrStartFailed, err) + } + + msg := newSubmissionMessage(submit, fmt.Sprintf("%s.spec", prefix)) + msg.Payload = jProc + err = submit.Submit(msg) + if err != nil { + return fmt.Errorf("%w: %w", ErrStartFailed, err) + } + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + wg := &sync.WaitGroup{} + + var closers []io.Closer + var stdout, stderr io.WriteCloser + + if publishOutput { + var stdoutReader, stderrReader *io.PipeReader + var stdoutFile, stderrFile *os.File + + stdoutFile, err = os.Create(p.StdoutFile) + if err != nil { + return fmt.Errorf("%w: %s", ErrStartFailed, err) + } + stderrFile, err = os.Create(p.StderrFile) + if err != nil { + return fmt.Errorf("%w: %s", ErrStartFailed, err) + } + + stdoutReader, stdout = io.Pipe() + stderrReader, stderr = io.Pipe() + + closers = append(closers, stdout, stderr, stdoutFile, stderrFile, stdoutReader, stderrReader) + + wg.Add(1) + go watchOutput(wg, io.TeeReader(stdoutReader, stdoutFile), io.TeeReader(stderrReader, stderrFile), submit, prefix, p, log) + } else { + stdout, err = os.Create(p.StdoutFile) + if err != nil { + return fmt.Errorf("%w: %s", ErrStartFailed, err) + } + stderr, err = os.Create(p.StderrFile) + if err != nil { + return fmt.Errorf("%w: %s", ErrStartFailed, err) + } + + closers = append(closers, stdout, stderr) + } + + env := createEnv(p.Environment) + cmd := exec.CommandContext(ctx, p.Command, p.Args...) + cmd.Dir = "/" + cmd.Env = env + cmd.Stdout = stdout + cmd.Stderr = stderr + + err = cmd.Start() + if err != nil { + return fmt.Errorf("%w: %w", ErrStartFailed, err) + } + + // this could fail and the command could be running already... + err = os.WriteFile(p.PidFile, []byte(strconv.Itoa(cmd.Process.Pid)), 0700) + if err != nil { + return fmt.Errorf("%w: %s", ErrWritingPidFailed, err) + } + + if heartbeat > 0 { + wg.Add(1) + go hb(ctx, wg, heartbeat, submit, prefix, p, log) + } + + msg = newSubmissionMessage(submit, fmt.Sprintf("%s.pid", prefix)) + msg.Payload = []byte(strconv.Itoa(cmd.Process.Pid)) + err = submit.Submit(msg) + if err != nil { + log.Errorf("Failed to publish start update: %v", err) + } + + err = cmd.Wait() + if err != nil { + var exiterr *exec.ExitError + if errors.As(err, &exiterr) { + msg := newSubmissionMessage(submit, fmt.Sprintf("%s.exit", prefix)) + msg.Payload = []byte(fmt.Sprintf("%d %v", exiterr.ExitCode(), err.Error())) + err = submit.Submit(msg) + if err != nil { + log.Errorf("Failed to publish exit update: %v", err) + } + } + + return fmt.Errorf("%w: %w", ErrStartFailed, err) + } + + cancel() + + for _, closer := range closers { + closer.Close() + } + + wg.Wait() + + msg = newSubmissionMessage(submit, fmt.Sprintf("%s.exit", prefix)) + msg.Payload = []byte("0") + err = submit.Submit(msg) + if err != nil { + log.Errorf("Failed to publish exit update: %v", err) + } + + log.Infof("Finished supervised process") + + return nil +} + +// HasStarted determines if the command was started by the presence of the PID file +func (p *Process) HasStarted() (bool, error) { + if p.PidFile == "" { + return false, nil + } + + stat, err := os.Stat(p.PidFile) + if errors.Is(err, os.ErrNotExist) { + return false, nil + } else if err != nil { + return false, err + } + + return !stat.IsDir(), nil +} + +// IsRunning checks if the process is running +func (p *Process) IsRunning() bool { + err := p.Signal(syscall.Signal(0)) + + return err == nil +} + +// Signal sends a signal to the process +func (p *Process) Signal(sig syscall.Signal) error { + pid, err := p.ParsePid() + if err != nil { + return err + } + + proc, err := os.FindProcess(pid) + if err != nil { + return err + } + + return proc.Signal(sig) +} + +// Stderr reads the stderr output +func (p *Process) Stderr() ([]byte, error) { + if p.StderrFile == "" { + return nil, fmt.Errorf("%w: no stderr file configured", ErrInvalidProcess) + } + + return os.ReadFile(p.StderrFile) +} + +// Stdout reads the stdout output +func (p *Process) Stdout() ([]byte, error) { + if p.StdoutFile == "" { + return nil, fmt.Errorf("%w: no stdout file configured", ErrInvalidProcess) + } + + return os.ReadFile(p.StdoutFile) +} + +// ParsePid loads and parses the pid file, returns -1 on error +func (p *Process) ParsePid() (int, error) { + if p.PidFile == "" { + return -1, fmt.Errorf("%w: no pid file configured", ErrInvalidProcess) + } + + pidBytes, err := os.ReadFile(p.PidFile) + if err != nil { + return -1, fmt.Errorf("%w: %w", ErrInvalidPid, err) + } + + if len(pidBytes) == 0 { + return -1, fmt.Errorf("%w: 0 length pid", ErrInvalidPid) + } + + pid, err := strconv.Atoi(string(pidBytes)) + if err != nil { + return -1, fmt.Errorf("%w: %w", ErrInvalidPid, err) + } + + if pid == 1 { + return -1, fmt.Errorf("%w: impossible pid", ErrInvalidPid) + } + + return pid, nil +} + +func watchOutputReader(wg *sync.WaitGroup, r io.Reader, origin string, out chan watchedLine, log *logrus.Entry) { + defer wg.Done() + + scanner := bufio.NewScanner(r) + for scanner.Scan() { + out <- watchedLine{origin: origin, line: scanner.Bytes()} + } + if err := scanner.Err(); err != nil { + log.Warnf("Failed to read %s: %v", origin, err) + } + log.Infof("Finished watching %q output", origin) +} + +func hb(ctx context.Context, wg *sync.WaitGroup, heartbeat time.Duration, submit Submitter, prefix string, p *Process, log *logrus.Entry) { + defer wg.Done() + + ticker := time.NewTicker(heartbeat) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + isRunning := p.IsRunning() + msg := newSubmissionMessage(submit, fmt.Sprintf("%s.hb", prefix)) + msg.Payload = []byte(fmt.Sprintf("%t", isRunning)) + + err := submit.Submit(msg) + if err != nil { + log.Errorf("Failed to publish heartbeat: %v", err) + } + case <-ctx.Done(): + return + } + } +} + +func createEnv(env map[string]string) []string { + tenv := map[string]string{ + "PATH": os.Getenv("PATH"), + } + + for k, v := range env { + tenv[k] = v + } + + var res []string + for k, v := range tenv { + res = append(res, fmt.Sprintf("%s=%s", k, v)) + } + + return res +} + +func watchOutput(wg *sync.WaitGroup, stdout io.Reader, stderr io.Reader, submit Submitter, prefix string, p *Process, log *logrus.Entry) { + defer wg.Done() + + lines := make(chan watchedLine, 10) + + owg := &sync.WaitGroup{} + owg.Add(2) + go watchOutputReader(owg, stderr, "stderr", lines, log) + go watchOutputReader(owg, stdout, "stdout", lines, log) + + // closing the files will stop the watcher routines which + // this one is waiting on, when thats done it closes the + // channel which stops the range below + go func() { + owg.Wait() + close(lines) + }() + + publish := func(prev string, buff *bytes.Buffer) { + msg := newSubmissionMessage(submit, "") + + switch prev { + case "stdout": + msg.Subject = fmt.Sprintf("%s.out.stdout", prefix) + case "stderr": + msg.Subject = fmt.Sprintf("%s.out.stderr", prefix) + } + msg.Payload = buff.Bytes() + err := submit.Submit(msg) + if err != nil { + log.Errorf("Failed to publish log update: %v", err) + } else { + buff.Reset() + } + } + + ticker := time.NewTicker(time.Second) + buff := bytes.NewBuffer([]byte{}) + prev := "" + + for { + select { + case line, ok := <-lines: + if !ok { + publish(prev, buff) + return + } + + if prev == "" { + prev = line.origin + } + if line.origin != prev { + publish(prev, buff) + } + if buff.Len() > 1024 { + publish(prev, buff) + } + prev = line.origin + + buff.Write(line.line) + buff.Write([]byte("\n")) + + case <-ticker.C: + publish(prev, buff) + } + } +} + +func saveJobSpec(spool string, proc *Process) (json.RawMessage, error) { + j, err := json.Marshal(proc) + if err != nil { + return nil, err + } + + return j, os.WriteFile(specPath(spool, proc.ID), j, 0700) +} + +func hasJob(spool string, id string) (bool, error) { + return iu.FileExist(specPath(spool, id)), nil +} + +func pidPath(spool string, id string) string { + return filepath.Join(spool, id, "pid") +} + +func stdOutPath(spool string, id string) string { + return filepath.Join(spool, id, "stdout") +} + +func stdErrPath(spool string, id string) string { + return filepath.Join(spool, id, "stderr") +} + +func specPath(spool string, id string) string { + return filepath.Join(spool, id, "spec.json") +} + +func newSubmissionMessage(submit Submitter, subject string) *submission.Message { + msg := submit.NewMessage() + msg.Subject = subject + msg.Priority = 1 + msg.Reliable = true + + return msg +} diff --git a/providers/execution/exec_test.go b/providers/execution/exec_test.go new file mode 100644 index 000000000..120d09d44 --- /dev/null +++ b/providers/execution/exec_test.go @@ -0,0 +1,191 @@ +// Copyright (c) 2025, R.I. Pienaar and the Choria Project contributors +// +// SPDX-License-Identifier: Apache-2.0 + +package execution + +import ( + "encoding/json" + iu "github.com/choria-io/go-choria/internal/util" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "os" + "path/filepath" + "strconv" + "testing" +) + +func TestGovernor(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Execution") +} + +var _ = Describe("Execution", func() { + var ( + td string + jobId string + p *Process + err error + ) + + BeforeEach(func() { + jobId = iu.UniqueID() + td = GinkgoT().TempDir() + + p, err = New("ginkgo", "agent", "action", iu.UniqueID(), "ginkgo.example.net", jobId, "echo", []string{"hello", "world"}, nil) + Expect(err).ToNot(HaveOccurred()) + }) + + Describe("New", func() { + It("Should validate the input", func() { + _, err := New("", "X", "x", "", "x", "x", "x", nil, nil) + Expect(err).To(MatchError(ContainSubstring("no caller"))) + + _, err = New("x", "X", "x", "", "x", "x", "x", nil, nil) + Expect(err).To(MatchError(ContainSubstring("no request id"))) + + _, err = New("x", "X", "x", "x", "x", "", "x", nil, nil) + Expect(err).To(MatchError(ContainSubstring("no id"))) + + _, err = New("x", "X", "x", "x", "", "x", "x", nil, nil) + Expect(err).To(MatchError(ContainSubstring("no identity"))) + + _, err = New("x", "X", "x", "x", "x", "x", "", nil, nil) + Expect(err).To(MatchError(ContainSubstring("no command"))) + }) + }) + + Describe("Load", func() { + It("Should load the correct process", func() { + _, err := Load(td, "x") + Expect(err).To(MatchError(ErrSpecificationNotFound)) + + _, err = p.CreateSpool(td) + Expect(err).ToNot(HaveOccurred()) + + lp, err := Load(td, jobId) + Expect(err).ToNot(HaveOccurred()) + + Expect(lp).To(Equal(p)) + }) + }) + + Describe("CreateSpool", func() { + It("Should create a spool", func() { + j, err := p.CreateSpool(td) + Expect(err).ToNot(HaveOccurred()) + + Expect(filepath.Join(td, jobId, "spec.json")).To(BeAnExistingFile()) + + f, err := os.ReadFile(filepath.Join(td, jobId, "spec.json")) + Expect(err).ToNot(HaveOccurred()) + Expect(json.RawMessage(f)).To(Equal(j)) + }) + + It("Should detect duplicates", func() { + _, err = p.CreateSpool(td) + Expect(err).ToNot(HaveOccurred()) + _, err = p.CreateSpool(td) + Expect(err).To(MatchError(ErrDuplicateJob)) + }) + }) + + Describe("HasStarted", func() { + It("Should correctly detect if the process was started", func() { + Expect(p.PidFile).To(BeEmpty()) + Expect(p.HasStarted()).To(BeFalse()) + + p.PidFile = filepath.Join(td, "pid") + Expect(p.HasStarted()).To(BeFalse()) + + Expect(os.WriteFile(p.PidFile, []byte("10"), 0700)).To(Succeed()) + Expect(p.HasStarted()).To(BeTrue()) + }) + }) + + Describe("IsRunning", func() { + It("Should correctly detect if the process is running", func() { + p.PidFile = filepath.Join(td, "pid") + + Expect(os.WriteFile(p.PidFile, []byte(strconv.Itoa(os.Getpid())), 0700)).To(Succeed()) + Expect(p.IsRunning()).To(BeTrue()) + + Expect(os.WriteFile(p.PidFile, []byte("0"), 0700)).To(Succeed()) + Expect(p.IsRunning()).To(BeFalse()) + }) + }) + + Describe("Stderr", func() { + It("Should handle missing paths", func() { + _, err := p.Stderr() + Expect(err).To(MatchError(ErrInvalidProcess)) + }) + + It("Should read the file", func() { + p.StderrFile = filepath.Join(td, "stderr") + body := []byte("stderr") + err = os.WriteFile(p.StderrFile, body, 0700) + Expect(err).ToNot(HaveOccurred()) + Expect(p.Stderr()).To(Equal(body)) + }) + }) + + Describe("Stdout", func() { + It("Should handle missing paths", func() { + _, err := p.Stdout() + Expect(err).To(MatchError(ErrInvalidProcess)) + }) + + It("Should read the file", func() { + p.StdoutFile = filepath.Join(td, "stdout") + body := []byte("stdout") + err = os.WriteFile(p.StdoutFile, body, 0700) + Expect(err).ToNot(HaveOccurred()) + Expect(p.Stdout()).To(Equal(body)) + }) + }) + + Describe("ParsePid", func() { + It("Should detect no pid file set", func() { + pid, err := p.ParsePid() + Expect(pid).To(Equal(-1)) + Expect(err).To(MatchError(ContainSubstring("no pid file configured"))) + }) + + It("Should handle read errors", func() { + p.PidFile = filepath.Join(td, "pid") + pid, err := p.ParsePid() + Expect(pid).To(Equal(-1)) + Expect(err).To(MatchError(ContainSubstring("no such file or directory"))) + }) + + It("Should handle empty pid files", func() { + p.PidFile = filepath.Join(td, "pid") + f, err := os.Create(p.PidFile) + Expect(err).ToNot(HaveOccurred()) + f.Close() + + pid, err := p.ParsePid() + Expect(pid).To(Equal(-1)) + Expect(err).To(MatchError(ContainSubstring("0 length pid"))) + }) + + It("Should handle corrupt pid files", func() { + p.PidFile = filepath.Join(td, "pid") + err := os.WriteFile(p.PidFile, []byte("a"), 0700) + Expect(err).ToNot(HaveOccurred()) + pid, err := p.ParsePid() + Expect(pid).To(Equal(-1)) + Expect(err).To(MatchError(ContainSubstring("invalid syntax"))) + }) + + It("Should correctly parse the pid file", func() { + p.PidFile = filepath.Join(td, "pid") + err := os.WriteFile(p.PidFile, []byte("10"), 0700) + Expect(err).ToNot(HaveOccurred()) + pid, err := p.ParsePid() + Expect(pid).To(Equal(10)) + Expect(err).ToNot(HaveOccurred()) + }) + }) +}) diff --git a/providers/governor/streams/governor_test.go b/providers/governor/streams/governor_test.go index be9908edb..8a43bce10 100644 --- a/providers/governor/streams/governor_test.go +++ b/providers/governor/streams/governor_test.go @@ -1,18 +1,4 @@ -// Copyright 2020-2022 The NATS Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// -// Copyright (c) 2022, R.I. Pienaar and the Choria Project contributors +// Copyright (c) 2022-2025, R.I. Pienaar and the Choria Project contributors // // SPDX-License-Identifier: Apache-2.0 @@ -36,7 +22,7 @@ import ( func TestGovernor(t *testing.T) { RegisterFailHandler(Fail) - RunSpecs(t, "Provtarget") + RunSpecs(t, "Governor") } func startJSServer() (*natsd.Server, *nats.Conn) {