diff --git a/api/main.go b/api/main.go index b49938a29..0a0d8b4bc 100644 --- a/api/main.go +++ b/api/main.go @@ -106,6 +106,10 @@ func main() { go com.ResponseQueueListen() } + // Start background signal handler + exitSignal := make(chan struct{}, 1) + go com.SignalHandler(&exitSignal) + // Load our self signed CA chain ourCAPool = x509.NewCertPool() certFile, err := os.ReadFile(com.Conf.DB4S.CAChain) @@ -164,14 +168,10 @@ func main() { // Start API server log.Printf("%s: listening on %s", com.Conf.Live.Nodename, server) - err = srv.ListenAndServeTLS(com.Conf.Api.Certificate, com.Conf.Api.CertificateKey) - - // Shut down nicely - com.DisconnectPostgreSQL() + go srv.ListenAndServeTLS(com.Conf.Api.Certificate, com.Conf.Api.CertificateKey) - if err != nil { - log.Fatal(err) - } + // Wait for exit signal + <-exitSignal } // checkAuth authenticates and logs the incoming request diff --git a/common/postgresql.go b/common/postgresql.go index cd8d272a6..7cac43675 100644 --- a/common/postgresql.go +++ b/common/postgresql.go @@ -1325,12 +1325,20 @@ func DeleteLicence(userName, licenceName string) (err error) { return nil } -// DisconnectPostgreSQL disconnects the PostgreSQL database connection +// DisconnectPostgreSQL disconnects the PostgreSQL database connections func DisconnectPostgreSQL() { - pdb.Close() + if pdb != nil { + pdb.Close() + } + + if !UseAMQP { + // Don't bother trying to close the job responses listener connection, as it just blocks + //JobListenConn.Close(context.Background()) - // Log successful disconnection - log.Printf("Disconnected from PostgreSQL server: %v:%v", Conf.Pg.Server, uint16(Conf.Pg.Port)) + if JobQueueConn != nil { + JobQueueConn.Close() + } + } } // Discussions returns the list of discussions or MRs for a given database @@ -3499,11 +3507,15 @@ func StatusUpdatesLoop() { timeStamp time.Time } for { + // Wait at the start of the loop (simpler code then adding a delay before each continue statement below) + time.Sleep(Conf.Event.Delay * time.Second) + // Begin a transaction var tx pgx.Tx tx, err = pdb.Begin(context.Background()) if err != nil { - log.Printf("Couldn't begin database transaction for status update processing loop: %s", err.Error()) + log.Printf("%s: couldn't begin database transaction for status update processing loop: %s", + Conf.Live.Nodename, err.Error()) continue } @@ -3727,9 +3739,6 @@ func StatusUpdatesLoop() { log.Printf("Could not commit transaction when processing status updates: %v", err.Error()) continue } - - // Wait before running the loop again - time.Sleep(Conf.Event.Delay * time.Second) } return } diff --git a/common/postgresql_live.go b/common/postgresql_live.go index 9c1a51842..3968c5736 100644 --- a/common/postgresql_live.go +++ b/common/postgresql_live.go @@ -545,6 +545,12 @@ func ResponseQueueListen() { } // Listen for notify events + if JobListenConn == nil { + log.Fatalf("%v: ERROR, couldn't start ResponseQueueListen() as JobListenConn IS NILL", Conf.Live.Nodename) + } + if JobListenConn.IsClosed() { + log.Fatalf("%v: ERROR, couldn't start ResponseQueueListen() as connection to job responses listener is NOT open", Conf.Live.Nodename) + } _, err := JobListenConn.Exec(context.Background(), "LISTEN job_responses_queue") if err != nil { log.Fatal(err) diff --git a/common/util.go b/common/util.go index 3b781cfbc..b57e4a0e3 100644 --- a/common/util.go +++ b/common/util.go @@ -11,8 +11,10 @@ import ( "math/rand" "net/http" "os" + "os/signal" "runtime" "strings" + "syscall" "time" "github.com/minio/minio-go" @@ -39,7 +41,7 @@ func (*FilteringErrorLogWriter) Write(msg []byte) (int, error) { return len(msg), nil } -// Filter out the copious 'TLS handshake error' messages we're getting +// HttpErrorLog is used to filter out the copious 'TLS handshake error' messages we're getting func HttpErrorLog() *log.Logger { httpErrorLogger = log.New(&FilteringErrorLogWriter{}, "", log.LstdFlags) return httpErrorLogger @@ -1107,6 +1109,51 @@ func RandomString(length int) string { return string(randomString) } +// SignalHandler is a background goroutine that exists to catch *nix termination signals then shut the daemon down cleanly +func SignalHandler(done *chan struct{}) { + // Catch signals + z := make(chan os.Signal, 1) + signal.Notify(z, syscall.SIGINT, syscall.SIGTERM) + sig := <-z + log.Printf("%s: received signal '%s', shutting down", Conf.Live.Nodename, sig) + + // On non-live nodes, wait for the job response queue to be empty. aka not be waiting on in-flight responses from the live nodes + if ResponseQueue != nil { + loop := 0 + ResponseQueue.RLock() + queueLength := len(ResponseQueue.receivers) + ResponseQueue.RUnlock() + for queueLength > 0 { + log.Printf("%s: response queue not empty (%d), waiting for 1/2 second then trying again", Conf.Live.Nodename, queueLength) + time.Sleep(500 * time.Millisecond) + loop++ + if loop >= 20 { + log.Printf("%s: response queue not empty (%d) after 10 seconds. Exiting anyway", Conf.Live.Nodename, queueLength) + break + } + ResponseQueue.RLock() + queueLength = len(ResponseQueue.receivers) + ResponseQueue.RUnlock() + if queueLength == 0 { + log.Printf("%s: response queue now empty, shutting down", Conf.Live.Nodename) + } + } + } + + // Shut down connections + DisconnectPostgreSQL() + if UseAMQP { + err := CloseMQChannel(AmqpChan) + if err != nil { + log.Fatal(err) + } + } + + // The application is ready to exit + *done <- struct{}{} + return +} + // StatusUpdateCheck checks if a status update for the user exists for a given discussion or MR, and if so then removes it func StatusUpdateCheck(dbOwner, dbName string, thisID int, userName string) (numStatusUpdates int, err error) { var lst map[string][]StatusUpdateEntry diff --git a/db4s/main.go b/db4s/main.go index d8269c3cc..1e923430b 100644 --- a/db4s/main.go +++ b/db4s/main.go @@ -59,6 +59,12 @@ func main() { log.Fatal(err) } + // Connect to job queue server + com.AmqpChan, err = com.ConnectQueue() + if err != nil { + log.Fatal(err) + } + // Connect to the Memcached server err = com.ConnectCache() if err != nil { @@ -77,6 +83,19 @@ func main() { log.Fatal(err) } + // Start background goroutines to handle job queue responses + if !com.UseAMQP { + com.ResponseQueue = com.NewResponseQueue() + com.CheckResponsesQueue = make(chan struct{}) + com.SubmitterInstance = com.RandomString(3) + go com.ResponseQueueCheck() + go com.ResponseQueueListen() + } + + // Start background signal handler + exitSignal := make(chan struct{}, 1) + go com.SignalHandler(&exitSignal) + // Load our self signed CA chain ourCAPool = x509.NewCertPool() certFile, err := os.ReadFile(com.Conf.DB4S.CAChain) @@ -123,7 +142,10 @@ func main() { // Start server log.Printf("%s: listening for requests on %s", com.Conf.Live.Nodename, server) - log.Fatal(newServer.ListenAndServeTLS(com.Conf.DB4S.Certificate, com.Conf.DB4S.CertificateKey)) + go newServer.ListenAndServeTLS(com.Conf.DB4S.Certificate, com.Conf.DB4S.CertificateKey) + + // Wait for exit signal + <-exitSignal } // Returns the list of branches for a database diff --git a/live/main.go b/live/main.go index a17356e04..b4fe050d3 100644 --- a/live/main.go +++ b/live/main.go @@ -67,6 +67,10 @@ func main() { log.Fatal(err) } + // Start background signal handler + exitSignal := make(chan struct{}, 1) + go com.SignalHandler(&exitSignal) + // Make sure the channel to the AMQP server is still open if com.UseAMQP { // Create queue for receiving new database creation requests @@ -440,12 +444,6 @@ func main() { log.Printf("%s: listening for requests", com.Conf.Live.Nodename) - // Endless loop - var forever chan struct{} - <-forever - - // Close the channel to the MQ server - if com.UseAMQP { - _ = com.CloseMQChannel(ch) - } + // Wait for exit signal + <- exitSignal } diff --git a/webui/main.go b/webui/main.go index 8da565ffb..955777600 100644 --- a/webui/main.go +++ b/webui/main.go @@ -3196,6 +3196,10 @@ func main() { go com.ResponseQueueListen() } + // Start background signal handler + exitSignal := make(chan struct{}, 1) + go com.SignalHandler(&exitSignal) + // Our pages http.Handle("/", gz.GzipHandler(logReq(mainHandler))) http.Handle("/about", gz.GzipHandler(logReq(aboutPage))) @@ -3498,18 +3502,10 @@ func main() { MinVersion: tls.VersionTLS12, // TLS 1.2 is now the lowest acceptable level }, } - err = srv.ListenAndServeTLS(com.Conf.Web.Certificate, com.Conf.Web.CertificateKey) - - // Shut down nicely - com.DisconnectPostgreSQL() - if err != nil { - log.Println(err) - } + go srv.ListenAndServeTLS(com.Conf.Web.Certificate, com.Conf.Web.CertificateKey) - err = com.CloseMQChannel(com.AmqpChan) - if err != nil { - log.Fatal(err) - } + // Wait for exit signal + <-exitSignal } func mainHandler(w http.ResponseWriter, r *http.Request) {