Skip to content

Commit

Permalink
Catch termination signals and shut down the daemons nicely
Browse files Browse the repository at this point in the history
This should avoid a potential race condition where a daemon shuts
down after submitting a job to the live nodes, but before processing
the response.
  • Loading branch information
justinclift committed Dec 13, 2023
1 parent 9c34acb commit 3f5a94e
Show file tree
Hide file tree
Showing 7 changed files with 114 additions and 36 deletions.
14 changes: 7 additions & 7 deletions api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ func main() {
go com.ResponseQueueListen()
}

// Start background signal handler
exitSignal := make(chan struct{}, 1)
go com.SignalHandler(&exitSignal)

// Load our self signed CA chain
ourCAPool = x509.NewCertPool()
certFile, err := os.ReadFile(com.Conf.DB4S.CAChain)
Expand Down Expand Up @@ -164,14 +168,10 @@ func main() {

// Start API server
log.Printf("%s: listening on %s", com.Conf.Live.Nodename, server)
err = srv.ListenAndServeTLS(com.Conf.Api.Certificate, com.Conf.Api.CertificateKey)

// Shut down nicely
com.DisconnectPostgreSQL()
go srv.ListenAndServeTLS(com.Conf.Api.Certificate, com.Conf.Api.CertificateKey)

if err != nil {
log.Fatal(err)
}
// Wait for exit signal
<-exitSignal
}

// checkAuth authenticates and logs the incoming request
Expand Down
25 changes: 17 additions & 8 deletions common/postgresql.go
Original file line number Diff line number Diff line change
Expand Up @@ -1325,12 +1325,20 @@ func DeleteLicence(userName, licenceName string) (err error) {
return nil
}

// DisconnectPostgreSQL disconnects the PostgreSQL database connection
// DisconnectPostgreSQL disconnects the PostgreSQL database connections
func DisconnectPostgreSQL() {
pdb.Close()
if pdb != nil {
pdb.Close()
}

if !UseAMQP {
// Don't bother trying to close the job responses listener connection, as it just blocks
//JobListenConn.Close(context.Background())

// Log successful disconnection
log.Printf("Disconnected from PostgreSQL server: %v:%v", Conf.Pg.Server, uint16(Conf.Pg.Port))
if JobQueueConn != nil {
JobQueueConn.Close()
}
}
}

// Discussions returns the list of discussions or MRs for a given database
Expand Down Expand Up @@ -3499,11 +3507,15 @@ func StatusUpdatesLoop() {
timeStamp time.Time
}
for {
// Wait at the start of the loop (simpler code than adding a delay before each continue statement below)
time.Sleep(Conf.Event.Delay * time.Second)

// Begin a transaction
var tx pgx.Tx
tx, err = pdb.Begin(context.Background())
if err != nil {
log.Printf("Couldn't begin database transaction for status update processing loop: %s", err.Error())
log.Printf("%s: couldn't begin database transaction for status update processing loop: %s",
Conf.Live.Nodename, err.Error())
continue
}

Expand Down Expand Up @@ -3727,9 +3739,6 @@ func StatusUpdatesLoop() {
log.Printf("Could not commit transaction when processing status updates: %v", err.Error())
continue
}

// Wait before running the loop again
time.Sleep(Conf.Event.Delay * time.Second)
}
return
}
Expand Down
6 changes: 6 additions & 0 deletions common/postgresql_live.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,12 @@ func ResponseQueueListen() {
}

// Listen for notify events
if JobListenConn == nil {
log.Fatalf("%v: ERROR, couldn't start ResponseQueueListen() as JobListenConn IS NILL", Conf.Live.Nodename)
}
if JobListenConn.IsClosed() {
log.Fatalf("%v: ERROR, couldn't start ResponseQueueListen() as connection to job responses listener is NOT open", Conf.Live.Nodename)
}
_, err := JobListenConn.Exec(context.Background(), "LISTEN job_responses_queue")
if err != nil {
log.Fatal(err)
Expand Down
49 changes: 48 additions & 1 deletion common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ import (
"math/rand"
"net/http"
"os"
"os/signal"
"runtime"
"strings"
"syscall"
"time"

"github.com/minio/minio-go"
Expand All @@ -39,7 +41,7 @@ func (*FilteringErrorLogWriter) Write(msg []byte) (int, error) {
return len(msg), nil
}

// Filter out the copious 'TLS handshake error' messages we're getting
// HttpErrorLog is used to filter out the copious 'TLS handshake error' messages we're getting
func HttpErrorLog() *log.Logger {
httpErrorLogger = log.New(&FilteringErrorLogWriter{}, "", log.LstdFlags)
return httpErrorLogger
Expand Down Expand Up @@ -1107,6 +1109,51 @@ func RandomString(length int) string {
return string(randomString)
}

// SignalHandler is a background goroutine that exists to catch *nix termination signals (SIGINT, SIGTERM) then shut
// the daemon down cleanly.
//
// It blocks until one of those signals arrives.  If ResponseQueue is set, it then polls the queue every half second,
// for up to 10 seconds, until no receivers remain, giving in-flight job responses a chance to be processed.  After
// that it calls DisconnectPostgreSQL(), closes the AMQP channel when UseAMQP is set, and finally sends on *done to
// tell the caller the application is ready to exit.
//
// done must point to a channel that is buffered or has a waiting receiver, otherwise the final send blocks.  If
// closing the AMQP channel fails, the process is terminated via log.Fatal() and *done is never signalled.
func SignalHandler(done *chan struct{}) {
	// Catch signals.  The channel is buffered because the signal package doesn't block when delivering to it
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
	sig := <-sigChan
	log.Printf("%s: received signal '%s', shutting down", Conf.Live.Nodename, sig)

	// On non-live nodes, wait for the job response queue to be empty. aka not be waiting on in-flight responses from the live nodes
	if ResponseQueue != nil {
		const (
			pollInterval = 500 * time.Millisecond
			maxPolls     = 20 // 20 polls * 500ms = the 10 second limit mentioned in the log message below
		)

		// pending returns the current number of receivers still waiting on a response
		pending := func() int {
			ResponseQueue.RLock()
			defer ResponseQueue.RUnlock()
			return len(ResponseQueue.receivers)
		}

		queueLength := pending()
		for polls := 0; queueLength > 0; {
			log.Printf("%s: response queue not empty (%d), waiting for 1/2 second then trying again", Conf.Live.Nodename, queueLength)
			time.Sleep(pollInterval)
			polls++

			// Re-read the queue length *before* checking the timeout, so the final decision (and the number
			// reported in the log) reflects the state of the queue after the sleep rather than before it
			queueLength = pending()
			if queueLength == 0 {
				log.Printf("%s: response queue now empty, shutting down", Conf.Live.Nodename)
				break
			}
			if polls >= maxPolls {
				log.Printf("%s: response queue not empty (%d) after 10 seconds. Exiting anyway", Conf.Live.Nodename, queueLength)
				break
			}
		}
	}

	// Shut down connections
	DisconnectPostgreSQL()
	if UseAMQP {
		err := CloseMQChannel(AmqpChan)
		if err != nil {
			log.Fatal(err)
		}
	}

	// The application is ready to exit
	*done <- struct{}{}
}

// StatusUpdateCheck checks if a status update for the user exists for a given discussion or MR, and if so then removes it
func StatusUpdateCheck(dbOwner, dbName string, thisID int, userName string) (numStatusUpdates int, err error) {
var lst map[string][]StatusUpdateEntry
Expand Down
24 changes: 23 additions & 1 deletion db4s/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,12 @@ func main() {
log.Fatal(err)
}

// Connect to job queue server
com.AmqpChan, err = com.ConnectQueue()
if err != nil {
log.Fatal(err)
}

// Connect to the Memcached server
err = com.ConnectCache()
if err != nil {
Expand All @@ -77,6 +83,19 @@ func main() {
log.Fatal(err)
}

// Start background goroutines to handle job queue responses
if !com.UseAMQP {
com.ResponseQueue = com.NewResponseQueue()
com.CheckResponsesQueue = make(chan struct{})
com.SubmitterInstance = com.RandomString(3)
go com.ResponseQueueCheck()
go com.ResponseQueueListen()
}

// Start background signal handler
exitSignal := make(chan struct{}, 1)
go com.SignalHandler(&exitSignal)

// Load our self signed CA chain
ourCAPool = x509.NewCertPool()
certFile, err := os.ReadFile(com.Conf.DB4S.CAChain)
Expand Down Expand Up @@ -123,7 +142,10 @@ func main() {

// Start server
log.Printf("%s: listening for requests on %s", com.Conf.Live.Nodename, server)
log.Fatal(newServer.ListenAndServeTLS(com.Conf.DB4S.Certificate, com.Conf.DB4S.CertificateKey))
go newServer.ListenAndServeTLS(com.Conf.DB4S.Certificate, com.Conf.DB4S.CertificateKey)

// Wait for exit signal
<-exitSignal
}

// Returns the list of branches for a database
Expand Down
14 changes: 6 additions & 8 deletions live/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ func main() {
log.Fatal(err)
}

// Start background signal handler
exitSignal := make(chan struct{}, 1)
go com.SignalHandler(&exitSignal)

// Make sure the channel to the AMQP server is still open
if com.UseAMQP {
// Create queue for receiving new database creation requests
Expand Down Expand Up @@ -440,12 +444,6 @@ func main() {

log.Printf("%s: listening for requests", com.Conf.Live.Nodename)

// Endless loop
var forever chan struct{}
<-forever

// Close the channel to the MQ server
if com.UseAMQP {
_ = com.CloseMQChannel(ch)
}
// Wait for exit signal
<- exitSignal
}
18 changes: 7 additions & 11 deletions webui/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3196,6 +3196,10 @@ func main() {
go com.ResponseQueueListen()
}

// Start background signal handler
exitSignal := make(chan struct{}, 1)
go com.SignalHandler(&exitSignal)

// Our pages
http.Handle("/", gz.GzipHandler(logReq(mainHandler)))
http.Handle("/about", gz.GzipHandler(logReq(aboutPage)))
Expand Down Expand Up @@ -3498,18 +3502,10 @@ func main() {
MinVersion: tls.VersionTLS12, // TLS 1.2 is now the lowest acceptable level
},
}
err = srv.ListenAndServeTLS(com.Conf.Web.Certificate, com.Conf.Web.CertificateKey)

// Shut down nicely
com.DisconnectPostgreSQL()
if err != nil {
log.Println(err)
}
go srv.ListenAndServeTLS(com.Conf.Web.Certificate, com.Conf.Web.CertificateKey)

err = com.CloseMQChannel(com.AmqpChan)
if err != nil {
log.Fatal(err)
}
// Wait for exit signal
<-exitSignal
}

func mainHandler(w http.ResponseWriter, r *http.Request) {
Expand Down

0 comments on commit 3f5a94e

Please sign in to comment.