Skip to content

Commit

Permalink
(choria-io#2209) Add a signal API
Browse files Browse the repository at this point in the history
This adds a signal action to the executor agent.
Signal is only allowed if the caller was authorized
to communicate with the agent#action that started the
process

Additionally the status action will provide the command
and arguments if similarly authorized

Signed-off-by: R.I.Pienaar <[email protected]>
  • Loading branch information
ripienaar committed Jan 23, 2025
1 parent 630fe8a commit 4d7e9f4
Show file tree
Hide file tree
Showing 7 changed files with 233 additions and 18 deletions.
48 changes: 48 additions & 0 deletions internal/fs/ddl/cache/agent/executor.ddl
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,39 @@ metadata :name => "executor",
:timeout => 20


action "signal", :description => "Sends a signal to a process" do
display :always

input :id,
:prompt => "Job ID",
:description => "The unique ID for the job",
:type => :string,
:validation => '.',
:maxlength => 20,
:optional => false


input :signal,
:prompt => "Signal",
:description => "The signal to send",
:type => :integer,
:optional => false




output :pid,
:description => "The PID that was signalled",
:type => "integer",
:display_as => "PID"

output :running,
:description => "If the process was running after signalling",
:type => "boolean",
:display_as => "Running"

end

action "status", :description => "Requests the status of a job by ID" do
display :always

Expand All @@ -32,16 +65,31 @@ action "status", :description => "Requests the status of a job by ID" do
:type => "string",
:display_as => "Agent"

output :args,
:description => "The command arguments, if the caller has access",
:type => "string",
:display_as => "Arguments"

output :caller,
:description => "The Caller ID who started the process",
:type => "string",
:display_as => "Caller"

output :command,
:description => "The command being executed, if the caller has access",
:type => "string",
:display_as => "Command"

output :exit_code,
:description => "The exit code the process terminated with",
:type => "integer",
:display_as => "Exit Code"

output :exit_reason,
:description => "If the process failed, the reason for th failure",
:type => "string",
:display_as => "Exit Reason"

output :pid,
:description => "The OS Process ID",
:type => "integer",
Expand Down
53 changes: 50 additions & 3 deletions internal/fs/ddl/cache/agent/executor.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,42 @@
"provider": "golang"
},
"actions": [
{
"action": "signal",
"description": "Sends a signal to a process",
"display": "always",
"input": {
"id": {
"prompt": "Job ID",
"description": "The unique ID for the job",
"type": "string",
"maxlength": 20,
"validation": ".",
"optional": false
},
"signal": {
"prompt": "Signal",
"description": "The signal to send",
"type": "integer"
}
},
"output": {
"pid": {
"description": "The PID that was signalled",
"type": "integer",
"display_as": "PID"
},
"running": {
"description": "If the process was running after signalling",
"type": "boolean",
"display_as": "Running"
}
}
},
{
"action": "status",
"display": "always",
"description": "Requests the status of a job by ID",
"input": {
"id": {
"prompt": "Job ID",
Expand All @@ -24,6 +58,16 @@
}
},
"output": {
"command": {
"description": "The command being executed, if the caller has access",
"type": "string",
"display_as": "Command"
},
"args": {
"description": "The command arguments, if the caller has access",
"type": "string",
"display_as": "Arguments"
},
"action": {
"description": "The RPC Action that started the process",
"display_as": "Action",
Expand All @@ -44,6 +88,11 @@
"display_as": "Exit Code",
"type": "integer"
},
"exit_reason": {
"description": "If the process failed, the reason for th failure",
"display_as": "Exit Reason",
"type": "string"
},
"pid": {
"description": "The OS Process ID",
"display_as": "Pid",
Expand Down Expand Up @@ -84,9 +133,7 @@
"display_as": "STDERR Bytes",
"type": "integer"
}
},
"display": "always",
"description": "Requests the status of a job by ID"
}
}
]
}
1 change: 1 addition & 0 deletions providers/agent/mcorpc/golang/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func New(mgr server.AgentManager) (*mcorpc.Agent, error) {
})

agent.MustRegisterAction("status", statusAction)
agent.MustRegisterAction("signal", signalAction)

return agent, nil
}
82 changes: 82 additions & 0 deletions providers/agent/mcorpc/golang/executor/signal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright (c) 2025, R.I. Pienaar and the Choria Project contributors
//
// SPDX-License-Identifier: Apache-2.0

package executor

import (
"context"
"syscall"

"github.com/choria-io/go-choria/inter"
"github.com/choria-io/go-choria/providers/agent/mcorpc"
"github.com/choria-io/go-choria/providers/execution"
)

type SignalRequest struct {
JobID string `json:"id"`
Signal int `json:"signal"`
}

type SignalResponse struct {
Pid int `json:"pid"`
Running bool `json:"running"`
}

func signalAction(ctx context.Context, req *mcorpc.Request, reply *mcorpc.Reply, agent *mcorpc.Agent, conn inter.ConnectorInfo) {
spool := agent.Config.Choria.ExecutorSpool
if spool == "" {
abort(reply, "Executor spool is not configured")
return
}

args := &SignalRequest{}

if !mcorpc.ParseRequestData(args, req, reply) {
return
}

if args.JobID == "" {
abort(reply, "ID is required")
}

if args.Signal < 0 {
abort(reply, "Signal is required")
}

resp := &SignalResponse{}

p, err := execution.Load(spool, args.JobID)
if err != nil {
abort(reply, "Could not load job: %v", err.Error())
return
}

if proxyAuthorize(p, req, agent) {
agent.Log.Warnf("Denying %s access to process created by %s#%s based on authorization policy for request %s", req.CallerID, p.Agent, p.Action, req.RequestID)
abort(reply, "You are not authorized to call this %s#%s", p.Agent, p.Action)
return
}

resp.Running = p.IsRunning()
if !resp.Running {
abort(reply, "Job %s is not running", args.JobID)
return
}

resp.Pid, err = p.ParsePid()
if err != nil {
abort(reply, "Could not parse pid file: %v", err.Error())
return
}

err = p.Signal(syscall.Signal(args.Signal))
if err != nil {
abort(reply, "Could not send signal: %v", err.Error())
return
}

resp.Running = p.IsRunning()

reply.Data = resp
}
27 changes: 21 additions & 6 deletions providers/agent/mcorpc/golang/executor/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,27 @@ package executor

import (
"context"
"errors"
"strings"
"time"

"github.com/choria-io/go-choria/inter"
"github.com/choria-io/go-choria/providers/agent/mcorpc"
"github.com/choria-io/go-choria/providers/execution"
"time"
)

type StatusRequest struct {
JobID string `json:"id"`
}

type StatusResponse struct {
Command string `json:"command"`
Args string `json:"args"`
Started bool `json:"started"`
StartTime time.Time `json:"start_time"`
TerminateTime time.Time `json:"terminate_time"`
ExitCode int `json:"exit_code"`
ExitReason string `json:"exit_reason"`
Running bool `json:"running"`
Agent string `json:"agent"`
Action string `json:"action"`
Expand All @@ -33,7 +39,6 @@ type StatusResponse struct {

func statusAction(ctx context.Context, req *mcorpc.Request, reply *mcorpc.Reply, agent *mcorpc.Agent, conn inter.ConnectorInfo) {
spool := agent.Config.Choria.ExecutorSpool

if spool == "" {
abort(reply, "Executor spool is not configured")
return
Expand All @@ -44,15 +49,18 @@ func statusAction(ctx context.Context, req *mcorpc.Request, reply *mcorpc.Reply,
return
}

if args.JobID == "" {
abort(reply, "ID is required")
}

p, err := execution.Load(spool, args.JobID)
if err != nil {
reply.Statuscode = mcorpc.Aborted
reply.Statusmsg = err.Error()
abort(reply, "Could not load job: %v", err)
abort(reply, "Could not load job: %v", err.Error())
return
}

resp := &StatusResponse{
Command: "Not authorized",
Running: p.IsRunning(),
StartTime: p.StartTime,
TerminateTime: p.TerminateTime,
Expand All @@ -64,6 +72,11 @@ func statusAction(ctx context.Context, req *mcorpc.Request, reply *mcorpc.Reply,
ExitCode: -1,
}

if proxyAuthorize(p, req, agent) {
resp.Command = p.Command
resp.Args = strings.Join(p.Args, " ")
}

resp.Started, err = p.HasStarted()
if err != nil {
abort(reply, "Could not check if job is started: %v", err)
Expand All @@ -80,7 +93,9 @@ func statusAction(ctx context.Context, req *mcorpc.Request, reply *mcorpc.Reply,

if !resp.Running && resp.Started {
resp.ExitCode, err = p.ParseExitCode()
if err != nil {
if errors.Is(err, execution.ErrProcessFailed) {
resp.ExitReason = err.Error()
} else if err != nil {
abort(reply, "Could not parse exit code: %v", err)
return
}
Expand Down
21 changes: 21 additions & 0 deletions providers/agent/mcorpc/golang/executor/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,31 @@ package executor

import (
"fmt"
"time"

"github.com/choria-io/go-choria/providers/agent/mcorpc"
"github.com/choria-io/go-choria/providers/execution"
)

func abort(reply *mcorpc.Reply, format string, a ...any) {
reply.Statuscode = mcorpc.Aborted
reply.Statusmsg = fmt.Sprintf(format, a...)
}

func proxyAuthorize(p *execution.Process, req *mcorpc.Request, agent *mcorpc.Agent) bool {
processRequest := &mcorpc.Request{
Agent: p.Agent,
Action: p.Action,
RequestID: p.RequestID,
SenderID: req.SenderID,
CallerID: req.CallerID,
Collective: req.Collective,
TTL: req.TTL,
Time: time.Time{},
Filter: req.Filter,
CallerPublicData: req.CallerPublicData,
SignerPublicData: req.SignerPublicData,
}

return mcorpc.AuthorizeRequest(agent.Choria, processRequest, agent.Config, agent.ServerInfoSource, agent.Log)
}
Loading

0 comments on commit 4d7e9f4

Please sign in to comment.