Skip to content

Commit

Permalink
Use RWMutex instead of Mutex for the job queue
Browse files Browse the repository at this point in the history
Should be a safe optimisation for the job queue code.  Also renamed
a variable for better clarity.
  • Loading branch information
justinclift committed Dec 12, 2023
1 parent 54d8bb5 commit abcdde2
Show file tree
Hide file tree
Showing 5 changed files with 14 additions and 15 deletions.
2 changes: 1 addition & 1 deletion api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func main() {

// Start background goroutines to handle job queue responses
if !com.UseAMQP {
com.ResponseWaiters = com.NewResponseReceiver()
com.ResponseQueue = com.NewResponseQueue()
com.CheckResponsesQueue = make(chan struct{})
com.SubmitterInstance = com.RandomString(3)
go com.ResponseQueueCheck()
Expand Down
4 changes: 2 additions & 2 deletions common/live.go
Original file line number Diff line number Diff line change
Expand Up @@ -691,13 +691,13 @@ func RemoveLiveDB(dbOwner, dbName string) (err error) {
func WaitForResponse[T any](jobID int, resp *T) (err error) {
// Add the response receiver
responseChan := make(chan ResponseInfo)
ResponseWaiters.AddReceiver(jobID, &responseChan)
ResponseQueue.AddReceiver(jobID, &responseChan)

// Wait for a response
response := <-responseChan

// Remove the response receiver
ResponseWaiters.RemoveReceiver(jobID)
ResponseQueue.RemoveReceiver(jobID)

// Update the response status to 'processed' (should be fine done async)
go ResponseComplete(response.responseID)
Expand Down
8 changes: 4 additions & 4 deletions common/live_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,14 @@ type ResponseInfo struct {

// ResponseReceivers is a simple structure used for matching up job queue responses to the caller who submitted the job
type ResponseReceivers struct {
sync.Mutex
sync.RWMutex
receivers map[int]*chan ResponseInfo
}

// NewResponseReceiver is the constructor function for creating new ResponseReceivers
func NewResponseReceiver() *ResponseReceivers {
// NewResponseQueue is the constructor function for creating a new ResponseReceivers
func NewResponseQueue() *ResponseReceivers {
z := ResponseReceivers{
Mutex: sync.Mutex{},
RWMutex: sync.RWMutex{},
receivers: nil,
}
z.receivers = make(map[int]*chan ResponseInfo)
Expand Down
13 changes: 6 additions & 7 deletions common/postgresql_live.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ var (
// CheckResponsesQueue is used by the non-live daemons for triggering a check of the job responses queue
CheckResponsesQueue chan struct{}

// ResponseWaiters is used to direct job queue responses back to the appropriate callers
ResponseWaiters *ResponseReceivers
// ResponseQueue is used to direct job queue responses back to the appropriate callers
ResponseQueue *ResponseReceivers

// SubmitterInstance is a random string generated at server start for identification purposes
SubmitterInstance string
Expand All @@ -49,8 +49,7 @@ func JobQueueCheck() {
continue
}

// TODO: a) we're in a loop so can't use defer, and
// b) we'll likely need to update the job state to 'error' on failure instead of rolling back
// TODO: should we update the job state to 'error' on failure?

dbQuery := `
SELECT job_id, operation, submitter_node, details
Expand Down Expand Up @@ -523,12 +522,12 @@ func ResponseQueueCheck() {
log.Printf("%s: picked up response %d for jobID %d", Conf.Live.Nodename, responseID, jobID)
}

ResponseWaiters.Lock()
receiverChan, ok := ResponseWaiters.receivers[jobID]
ResponseQueue.RLock()
receiverChan, ok := ResponseQueue.receivers[jobID]
if ok {
*receiverChan <- ResponseInfo{jobID: jobID, responseID: responseID, payload: details}
}
ResponseWaiters.Unlock()
ResponseQueue.RUnlock()
return nil
})
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion webui/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3189,7 +3189,7 @@ func main() {

// Start background goroutines to handle job queue responses
if !com.UseAMQP {
com.ResponseWaiters = com.NewResponseReceiver()
com.ResponseQueue = com.NewResponseQueue()
com.CheckResponsesQueue = make(chan struct{})
com.SubmitterInstance = com.RandomString(3)
go com.ResponseQueueCheck()
Expand Down

0 comments on commit abcdde2

Please sign in to comment.