Skip to content

Commit

Permalink
Clean up process lifecycle management in the entry package
Browse files Browse the repository at this point in the history
Before, the interrupt signalling logic (in the `wait_*.go` files) would stop the client and/or process directly.

We move this logic to the `RunMain` functions instead, and create a context that gets canceled under the same conditions.

This context is then passed to the `innerMain` function, which creates a client just like before, but binds the client's lifetime to the context.

This way, we have a more top-down approach, and remove the entry package's dependency on the client.

This allows an easy implementation of config reloading in subsequent contributions.

PiperOrigin-RevId: 666794830
  • Loading branch information
torsm authored and copybara-github committed Aug 23, 2024
1 parent 74cb012 commit f816f78
Show file tree
Hide file tree
Showing 6 changed files with 204 additions and 144 deletions.
24 changes: 13 additions & 11 deletions cmd/fleetspeak_client/fleetspeak_client.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package main

import (
"context"
"flag"
"fmt"
"os"

log "github.com/golang/glog"
"google.golang.org/protobuf/encoding/prototext"

"github.com/google/fleetspeak/fleetspeak/src/client"
Expand All @@ -22,23 +23,19 @@ import (
)

var configFile = flag.String("config", "", "Client configuration file, required.")
var profileDir = flag.String("profile-dir", "/tmp", "Profile directory.")

func innerMain() {
flag.Parse()

func innerMain(ctx context.Context) error {
b, err := os.ReadFile(*configFile)
if err != nil {
log.Exitf("Unable to read configuration file [%s]: %v", *configFile, err)
return fmt.Errorf("unable to read configuration file %q: %v", *configFile, err)
}
cfgPB := &gpb.Config{}
if err := prototext.Unmarshal(b, cfgPB); err != nil {
log.Exitf("Unable to parse configuration file [%s]: %v", *configFile, err)
return fmt.Errorf("unable to parse configuration file %q: %v", *configFile, err)
}

cfg, err := generic.MakeConfiguration(cfgPB)
if err != nil {
log.Exitf("Error in configuration file: %v", err)
return fmt.Errorf("error in configuration file: %v", err)
}

var com comms.Communicator
Expand All @@ -60,12 +57,17 @@ func innerMain() {
Stats: stats.NoopCollector{},
})
if err != nil {
log.Exitf("Error starting client: %v", err)
return fmt.Errorf("error starting client: %v", err)
}

entry.Wait(cl, *profileDir)
select {
case <-ctx.Done():
cl.Stop()
}
return nil
}

func main() {
flag.Parse()
entry.RunMain(innerMain, "FleetspeakService")
}
20 changes: 20 additions & 0 deletions fleetspeak/src/client/entry/entry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Package entry provides platform-specific wrappers around the application's
// entry points to manage its lifecycle.
package entry

import (
"context"
"time"
)

// Timeout for shutting down gracefully.
//
// If the [InnerMain] function does not return within this time, the process
// may be shut down ungracefully.
const shutdownTimeout = 10 * time.Second

// InnerMain is an inner entry function responsible for creating a
// [client.Client] and managing its configuration and lifecycle. It is called by
// [RunMain] which handles platform-specific mechanics to manage the passed
// Context.
type InnerMain func(ctx context.Context) error
94 changes: 91 additions & 3 deletions fleetspeak/src/client/entry/entry_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,95 @@

package entry

// RunMain starts the application.
func RunMain(innerMain func(), _ /* windowsServiceName */ string) {
innerMain()
import (
"context"
"flag"
"fmt"
"os"
"os/signal"
"path/filepath"
"runtime"
"runtime/pprof"
"sync"
"syscall"
"time"

log "github.com/golang/glog"
)

var (
profileDir = flag.String("profile_dir", "/tmp", "Directory to write profiling data to.")
)

// RunMain calls innerMain with a context that's influenced by signals this
// process receives.
// If innerMain does not return within shutdownTimeout after the context is
// canceled, the process will be stopped ungracefully.
func RunMain(innerMain InnerMain, _ /* windowsServiceName */ string) {
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer cancel()

context.AfterFunc(ctx, func() {
log.Info("Main context stopped, shutting down...")
time.AfterFunc(shutdownTimeout, func() {
if err := dumpProfile(*profileDir, "goroutine", 2); err != nil {
log.Errorf("Failed to dump goroutine profile: %v", err)
}
log.Exitf("Fleetspeak failed to shut down in %v. Exiting ungracefully.", shutdownTimeout)
})
})

cancelSignal := notifyFunc(func(si os.Signal) {
runtime.GC()
if err := dumpProfile(*profileDir, "heap", 0); err != nil {
log.Errorf("Failed to dump heap profile: %v", err)
}
}, syscall.SIGUSR1)
defer cancelSignal()

err := innerMain(ctx)
if err != nil {
log.Exitf("Stopped due to unrecoverable error: %v", err)
}
log.Info("Successfully stopped service.")
}

// notifyFunc is similar to other signal.Notify* functions, and calls the given
// callback each time one of the specified signals is received. It returns a
// cancelation function to reset signals and free resources.
func notifyFunc(callback func(os.Signal), signals ...os.Signal) func() {
wg := sync.WaitGroup{}
wg.Add(1)
ch := make(chan os.Signal, 1)
signal.Notify(ch, signals...)

go func() {
defer wg.Done()
for si := range ch {
callback(si)
}
}()

return func() {
signal.Stop(ch)
close(ch)
for range ch {
}
wg.Wait()
}
}

// dumpProfile writes the given pprof profile to disk with the given debug flag.
func dumpProfile(profileDir, profileName string, pprofDebugFlag int) error {
profileDumpPath := filepath.Join(profileDir, fmt.Sprintf("fleetspeakd-%s-pprof-%d-%v", profileName, os.Getpid(), time.Now().Format("2006-01-02-15-04-05.000")))
fileWriter, err := os.Create(profileDumpPath)
if err != nil {
return fmt.Errorf("unable to create profile file %q: %v", profileDumpPath, err)
}
defer fileWriter.Close()
if err := pprof.Lookup(profileName).WriteTo(fileWriter, pprofDebugFlag); err != nil {
return fmt.Errorf("unable to write %s profile %q: %v", profileName, profileDumpPath, err)
}
log.Infof("%s profile dumped to %q.", profileName, profileDumpPath)
return nil
}
108 changes: 80 additions & 28 deletions fleetspeak/src/client/entry/entry_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,57 +3,109 @@
package entry

import (
"context"
"os"
"os/signal"
"sync"
"syscall"
"time"

log "github.com/golang/glog"
"golang.org/x/sys/windows/svc"
)

type fleetspeakService struct {
innerMain func()
innerMain InnerMain
}

func (m *fleetspeakService) Execute(args []string, r <-chan svc.ChangeRequest, changes chan<- svc.Status) (svcSpecificEC bool, errno uint32) {
const cmdsAccepted = svc.AcceptStop | svc.AcceptShutdown
changes <- svc.Status{State: svc.StartPending}
changes <- svc.Status{State: svc.Running, Accepts: cmdsAccepted}
timer := time.Tick(2 * time.Second)
go m.innerMain()
loop:
for {
select {
case <-timer:
case c := <-r:
switch c.Cmd {
case svc.Interrogate:
changes <- c.CurrentStatus
case svc.Stop, svc.Shutdown:
break loop
case svc.Pause:
changes <- svc.Status{State: svc.Paused, Accepts: cmdsAccepted}
case svc.Continue:
changes <- svc.Status{State: svc.Running, Accepts: cmdsAccepted}
default:

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

enforceShutdownTimeout(ctx)

wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer func() {
changes <- svc.Status{State: svc.StopPending}
wg.Done()
}()

for {
select {
case <-ctx.Done():
return
case c := <-r:
switch c.Cmd {
case svc.Interrogate:
changes <- c.CurrentStatus
case svc.Stop, svc.Shutdown:
cancel()
return
default:
log.Warningf("Unsupported control request: %v", c.Cmd)
}
}
}
}()

err := m.innerMain(ctx)
cancel()
wg.Wait()
// Returning from this function tells Windows we're shutting down. Even if we
// return an error, Windows doesn't seem to consider this orderly-shutdown an
// error, so it doesn't restart us. Hence if there's an error we exit.
if err != nil {
// Don't use log.Exitf - it unconditionally writes to stderr which doesn't
// work here.
log.Errorf("Stopped due to unrecoverable error: %v", err)
os.Exit(1)
}
changes <- svc.Status{State: svc.StopPending}

log.Info("Stopping the service.")
os.Exit(2)
return
log.Info("Successfully stopped service.")
return false, 0
}

// RunMain starts the application.
func RunMain(innerMain func(), windowsServiceName string) {
isIntSess, err := svc.IsAnInteractiveSession()
func (m *fleetspeakService) ExecuteAsRegularProcess() {
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer cancel()

enforceShutdownTimeout(ctx)

err := m.innerMain(ctx)
if err != nil {
log.Exitf("Stopped due to unrecoverable error: %v", err)
}
}

// RunMain calls innerMain with a context that's influenced by signals or
// service requests, depending on whether we're running as a service or not.
// If innerMain does not return within shutdownTimeout after the context is
// canceled, the process will be stopped ungracefully.
func RunMain(innerMain InnerMain, windowsServiceName string) {
fs := &fleetspeakService{innerMain}

isService, err := svc.IsWindowsService()
if err != nil {
log.Fatalf("failed to determine if we are running in an interactive session: %v", err)
}
if isIntSess {
innerMain()
if isService {
svc.Run(windowsServiceName, fs)
} else {
svc.Run(windowsServiceName, &fleetspeakService{innerMain})
fs.ExecuteAsRegularProcess()
}
}

func enforceShutdownTimeout(ctx context.Context) {
context.AfterFunc(ctx, func() {
log.Info("Main context stopped, shutting down...")
time.AfterFunc(shutdownTimeout, func() {
log.Exitf("Fleetspeak failed to shut down in %v. Exiting ungracefully.", shutdownTimeout)
})
})
}
65 changes: 0 additions & 65 deletions fleetspeak/src/client/entry/wait_unix.go

This file was deleted.

Loading

0 comments on commit f816f78

Please sign in to comment.