From abcdde292028929568f4d73c6b4bfe6c52f53b54 Mon Sep 17 00:00:00 2001 From: Justin Clift Date: Wed, 13 Dec 2023 06:12:27 +1000 Subject: [PATCH] Use RWMutex instead of Mutex for the job queue Should be a safe optimisation for the job queue code. Also renamed a variable for better clarity. --- api/main.go | 2 +- common/live.go | 4 ++-- common/live_types.go | 8 ++++---- common/postgresql_live.go | 13 ++++++------- webui/main.go | 2 +- 5 files changed, 14 insertions(+), 15 deletions(-) diff --git a/api/main.go b/api/main.go index 7b5c939d5..b49938a29 100644 --- a/api/main.go +++ b/api/main.go @@ -99,7 +99,7 @@ func main() { // Start background goroutines to handle job queue responses if !com.UseAMQP { - com.ResponseWaiters = com.NewResponseReceiver() + com.ResponseQueue = com.NewResponseQueue() com.CheckResponsesQueue = make(chan struct{}) com.SubmitterInstance = com.RandomString(3) go com.ResponseQueueCheck() diff --git a/common/live.go b/common/live.go index ff252c22e..69d24e10f 100644 --- a/common/live.go +++ b/common/live.go @@ -691,13 +691,13 @@ func RemoveLiveDB(dbOwner, dbName string) (err error) { func WaitForResponse[T any](jobID int, resp *T) (err error) { // Add the response receiver responseChan := make(chan ResponseInfo) - ResponseWaiters.AddReceiver(jobID, &responseChan) + ResponseQueue.AddReceiver(jobID, &responseChan) // Wait for a response response := <-responseChan // Remove the response receiver - ResponseWaiters.RemoveReceiver(jobID) + ResponseQueue.RemoveReceiver(jobID) // Update the response status to 'processed' (should be fine done async) go ResponseComplete(response.responseID) diff --git a/common/live_types.go b/common/live_types.go index ceada8db5..f66880262 100644 --- a/common/live_types.go +++ b/common/live_types.go @@ -98,14 +98,14 @@ type ResponseInfo struct { // ResponseReceivers is a simple structure used for matching up job queue responses to the caller who submitted the job type ResponseReceivers struct { - sync.Mutex + sync.RWMutex receivers map[int]*chan ResponseInfo } -// NewResponseReceiver is the constructor function for creating new ResponseReceivers -func NewResponseReceiver() *ResponseReceivers { +// NewResponseQueue is the constructor function for creating a new ResponseReceivers +func NewResponseQueue() *ResponseReceivers { z := ResponseReceivers{ - Mutex: sync.Mutex{}, + RWMutex: sync.RWMutex{}, receivers: nil, } z.receivers = make(map[int]*chan ResponseInfo) diff --git a/common/postgresql_live.go b/common/postgresql_live.go index aadf85833..9c1a51842 100644 --- a/common/postgresql_live.go +++ b/common/postgresql_live.go @@ -21,8 +21,8 @@ var ( // CheckResponsesQueue is used by the non-live daemons for triggering a check of the job responses queue CheckResponsesQueue chan struct{} - // ResponseWaiters is used to direct job queue responses back to the appropriate callers - ResponseWaiters *ResponseReceivers + // ResponseQueue is used to direct job queue responses back to the appropriate callers + ResponseQueue *ResponseReceivers // SubmitterInstance is a random string generated at server start for identification purposes SubmitterInstance string @@ -49,8 +49,7 @@ func JobQueueCheck() { continue } - // TODO: a) we're in a loop so can't use defer, and - // b) we'll likely need to update the job state to 'error' on failure instead of rolling back + // TODO: should we update the job state to 'error' on failure? dbQuery := ` SELECT job_id, operation, submitter_node, details @@ -523,12 +522,12 @@ func ResponseQueueCheck() { log.Printf("%s: picked up response %d for jobID %d", Conf.Live.Nodename, responseID, jobID) } - ResponseWaiters.Lock() - receiverChan, ok := ResponseWaiters.receivers[jobID] + ResponseQueue.RLock() + receiverChan, ok := ResponseQueue.receivers[jobID] if ok { *receiverChan <- ResponseInfo{jobID: jobID, responseID: responseID, payload: details} } - ResponseWaiters.Unlock() + ResponseQueue.RUnlock() return nil }) if err != nil { diff --git a/webui/main.go b/webui/main.go index c8214723f..8da565ffb 100644 --- a/webui/main.go +++ b/webui/main.go @@ -3189,7 +3189,7 @@ func main() { // Start background goroutines to handle job queue responses if !com.UseAMQP { - com.ResponseWaiters = com.NewResponseReceiver() + com.ResponseQueue = com.NewResponseQueue() com.CheckResponsesQueue = make(chan struct{}) com.SubmitterInstance = com.RandomString(3) go com.ResponseQueueCheck()