Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Catch termination signals and shut down the daemons nicely #233

Merged
merged 1 commit into from
Dec 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ func main() {
go com.ResponseQueueListen()
}

// Start background signal handler
exitSignal := make(chan struct{}, 1)
go com.SignalHandler(&exitSignal)

// Load our self signed CA chain
ourCAPool = x509.NewCertPool()
certFile, err := os.ReadFile(com.Conf.DB4S.CAChain)
Expand Down Expand Up @@ -164,14 +168,10 @@ func main() {

// Start API server
log.Printf("%s: listening on %s", com.Conf.Live.Nodename, server)
err = srv.ListenAndServeTLS(com.Conf.Api.Certificate, com.Conf.Api.CertificateKey)

// Shut down nicely
com.DisconnectPostgreSQL()
go srv.ListenAndServeTLS(com.Conf.Api.Certificate, com.Conf.Api.CertificateKey)

if err != nil {
log.Fatal(err)
}
// Wait for exit signal
<-exitSignal
}

// checkAuth authenticates and logs the incoming request
Expand Down
25 changes: 17 additions & 8 deletions common/postgresql.go
Original file line number Diff line number Diff line change
Expand Up @@ -1325,12 +1325,20 @@ func DeleteLicence(userName, licenceName string) (err error) {
return nil
}

// DisconnectPostgreSQL disconnects the PostgreSQL database connection
// DisconnectPostgreSQL disconnects the PostgreSQL database connections
func DisconnectPostgreSQL() {
pdb.Close()
if pdb != nil {
pdb.Close()
}

if !UseAMQP {
// Don't bother trying to close the job responses listener connection, as it just blocks
//JobListenConn.Close(context.Background())

// Log successful disconnection
log.Printf("Disconnected from PostgreSQL server: %v:%v", Conf.Pg.Server, uint16(Conf.Pg.Port))
if JobQueueConn != nil {
JobQueueConn.Close()
}
}
}

// Discussions returns the list of discussions or MRs for a given database
Expand Down Expand Up @@ -3499,11 +3507,15 @@ func StatusUpdatesLoop() {
timeStamp time.Time
}
for {
// Wait at the start of the loop (simpler code then adding a delay before each continue statement below)
time.Sleep(Conf.Event.Delay * time.Second)

// Begin a transaction
var tx pgx.Tx
tx, err = pdb.Begin(context.Background())
if err != nil {
log.Printf("Couldn't begin database transaction for status update processing loop: %s", err.Error())
log.Printf("%s: couldn't begin database transaction for status update processing loop: %s",
Conf.Live.Nodename, err.Error())
continue
}

Expand Down Expand Up @@ -3727,9 +3739,6 @@ func StatusUpdatesLoop() {
log.Printf("Could not commit transaction when processing status updates: %v", err.Error())
continue
}

// Wait before running the loop again
time.Sleep(Conf.Event.Delay * time.Second)
}
return
}
Expand Down
6 changes: 6 additions & 0 deletions common/postgresql_live.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,12 @@ func ResponseQueueListen() {
}

// Listen for notify events
if JobListenConn == nil {
log.Fatalf("%v: ERROR, couldn't start ResponseQueueListen() as JobListenConn IS NILL", Conf.Live.Nodename)
}
if JobListenConn.IsClosed() {
log.Fatalf("%v: ERROR, couldn't start ResponseQueueListen() as connection to job responses listener is NOT open", Conf.Live.Nodename)
}
_, err := JobListenConn.Exec(context.Background(), "LISTEN job_responses_queue")
if err != nil {
log.Fatal(err)
Expand Down
49 changes: 48 additions & 1 deletion common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ import (
"math/rand"
"net/http"
"os"
"os/signal"
"runtime"
"strings"
"syscall"
"time"

"github.com/minio/minio-go"
Expand All @@ -39,7 +41,7 @@ func (*FilteringErrorLogWriter) Write(msg []byte) (int, error) {
return len(msg), nil
}

// Filter out the copious 'TLS handshake error' messages we're getting
// HttpErrorLog is used to filter out the copious 'TLS handshake error' messages we're getting
func HttpErrorLog() *log.Logger {
httpErrorLogger = log.New(&FilteringErrorLogWriter{}, "", log.LstdFlags)
return httpErrorLogger
Expand Down Expand Up @@ -1107,6 +1109,51 @@ func RandomString(length int) string {
return string(randomString)
}

// SignalHandler is a background goroutine that exists to catch *nix termination signals then shut the daemon down cleanly
func SignalHandler(done *chan struct{}) {
// Catch signals
z := make(chan os.Signal, 1)
signal.Notify(z, syscall.SIGINT, syscall.SIGTERM)
sig := <-z
log.Printf("%s: received signal '%s', shutting down", Conf.Live.Nodename, sig)

// On non-live nodes, wait for the job response queue to be empty. aka not be waiting on in-flight responses from the live nodes
if ResponseQueue != nil {
loop := 0
ResponseQueue.RLock()
queueLength := len(ResponseQueue.receivers)
ResponseQueue.RUnlock()
for queueLength > 0 {
log.Printf("%s: response queue not empty (%d), waiting for 1/2 second then trying again", Conf.Live.Nodename, queueLength)
time.Sleep(500 * time.Millisecond)
loop++
if loop >= 20 {
log.Printf("%s: response queue not empty (%d) after 10 seconds. Exiting anyway", Conf.Live.Nodename, queueLength)
break
}
ResponseQueue.RLock()
queueLength = len(ResponseQueue.receivers)
ResponseQueue.RUnlock()
if queueLength == 0 {
log.Printf("%s: response queue now empty, shutting down", Conf.Live.Nodename)
}
}
}

// Shut down connections
DisconnectPostgreSQL()
if UseAMQP {
err := CloseMQChannel(AmqpChan)
if err != nil {
log.Fatal(err)
}
}

// The application is ready to exit
*done <- struct{}{}
return
}

// StatusUpdateCheck checks if a status update for the user exists for a given discussion or MR, and if so then removes it
func StatusUpdateCheck(dbOwner, dbName string, thisID int, userName string) (numStatusUpdates int, err error) {
var lst map[string][]StatusUpdateEntry
Expand Down
24 changes: 23 additions & 1 deletion db4s/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,12 @@ func main() {
log.Fatal(err)
}

// Connect to job queue server
com.AmqpChan, err = com.ConnectQueue()
if err != nil {
log.Fatal(err)
}

// Connect to the Memcached server
err = com.ConnectCache()
if err != nil {
Expand All @@ -77,6 +83,19 @@ func main() {
log.Fatal(err)
}

// Start background goroutines to handle job queue responses
if !com.UseAMQP {
com.ResponseQueue = com.NewResponseQueue()
com.CheckResponsesQueue = make(chan struct{})
com.SubmitterInstance = com.RandomString(3)
go com.ResponseQueueCheck()
go com.ResponseQueueListen()
}

// Start background signal handler
exitSignal := make(chan struct{}, 1)
go com.SignalHandler(&exitSignal)

// Load our self signed CA chain
ourCAPool = x509.NewCertPool()
certFile, err := os.ReadFile(com.Conf.DB4S.CAChain)
Expand Down Expand Up @@ -123,7 +142,10 @@ func main() {

// Start server
log.Printf("%s: listening for requests on %s", com.Conf.Live.Nodename, server)
log.Fatal(newServer.ListenAndServeTLS(com.Conf.DB4S.Certificate, com.Conf.DB4S.CertificateKey))
go newServer.ListenAndServeTLS(com.Conf.DB4S.Certificate, com.Conf.DB4S.CertificateKey)

// Wait for exit signal
<-exitSignal
}

// Returns the list of branches for a database
Expand Down
14 changes: 6 additions & 8 deletions live/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ func main() {
log.Fatal(err)
}

// Start background signal handler
exitSignal := make(chan struct{}, 1)
go com.SignalHandler(&exitSignal)

// Make sure the channel to the AMQP server is still open
if com.UseAMQP {
// Create queue for receiving new database creation requests
Expand Down Expand Up @@ -440,12 +444,6 @@ func main() {

log.Printf("%s: listening for requests", com.Conf.Live.Nodename)

// Endless loop
var forever chan struct{}
<-forever

// Close the channel to the MQ server
if com.UseAMQP {
_ = com.CloseMQChannel(ch)
}
// Wait for exit signal
<- exitSignal
}
18 changes: 7 additions & 11 deletions webui/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3196,6 +3196,10 @@ func main() {
go com.ResponseQueueListen()
}

// Start background signal handler
exitSignal := make(chan struct{}, 1)
go com.SignalHandler(&exitSignal)

// Our pages
http.Handle("/", gz.GzipHandler(logReq(mainHandler)))
http.Handle("/about", gz.GzipHandler(logReq(aboutPage)))
Expand Down Expand Up @@ -3498,18 +3502,10 @@ func main() {
MinVersion: tls.VersionTLS12, // TLS 1.2 is now the lowest acceptable level
},
}
err = srv.ListenAndServeTLS(com.Conf.Web.Certificate, com.Conf.Web.CertificateKey)

// Shut down nicely
com.DisconnectPostgreSQL()
if err != nil {
log.Println(err)
}
go srv.ListenAndServeTLS(com.Conf.Web.Certificate, com.Conf.Web.CertificateKey)

err = com.CloseMQChannel(com.AmqpChan)
if err != nil {
log.Fatal(err)
}
// Wait for exit signal
<-exitSignal
}

func mainHandler(w http.ResponseWriter, r *http.Request) {
Expand Down