Skip to content

Commit

Permalink
Merge branch 'development' into feature/cancel-delayed-tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
wim.de.jager committed Feb 14, 2022
2 parents 4fbcdbb + 5d996ef commit 29059c0
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 8 deletions.
25 changes: 19 additions & 6 deletions v1/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type Worker struct {
preTaskHandler func(*tasks.Signature)
postTaskHandler func(*tasks.Signature)
preConsumeHandler func(*Worker) bool
timeOutFunc func(*tasks.Signature) int
}

var (
Expand Down Expand Up @@ -210,8 +211,12 @@ func (worker *Worker) taskRetry(signature *tasks.Signature) error {
// Decrement the retry counter, when it reaches 0, we won't retry again
signature.RetryCount--

// Increase retry timeout
signature.RetryTimeout = retry.FibonacciNext(signature.RetryTimeout)
if worker.timeOutFunc != nil {
signature.RetryTimeout = worker.timeOutFunc(signature)
} else {
// Increase retry timeout
signature.RetryTimeout = retry.FibonacciNext(signature.RetryTimeout)
}

// Delay task by signature.RetryTimeout seconds
eta := time.Now().UTC().Add(time.Second * time.Duration(signature.RetryTimeout))
Expand All @@ -231,13 +236,17 @@ func (worker *Worker) retryTaskIn(signature *tasks.Signature, retryIn time.Durat
return fmt.Errorf("Set state to 'retry' for task %s returned error: %s", signature.UUID, err)
}

// Decrement the retry counter, when it reaches 0, we won't retry again
signature.RetryCount--

// Delay task by retryIn duration
eta := time.Now().UTC().Add(retryIn)
signature.ETA = &eta

// Decrement the retry counter, when it reaches 0, we won't retry again
signature.RetryCount--

if signature.RetryCount == 0 {
return worker.taskFailed(signature, errors.New("Task failed too many times."))
}

log.WARNING.Printf("Task %s failed. Going to retry in %.0f seconds.", signature.UUID, retryIn.Seconds())

// Send the task back to the queue
Expand Down Expand Up @@ -418,12 +427,16 @@ func (worker *Worker) SetPreConsumeHandler(handler func(*Worker) bool) {
worker.preConsumeHandler = handler
}

//SetTimeoutFunc sets a timeout for the worker to determine fibonacci or custom
func (worker *Worker) SetTimeoutFunc(timeoutFunc func(*tasks.Signature) int) {
worker.timeOutFunc = timeoutFunc
}

//GetServer returns server
func (worker *Worker) GetServer() *Server {
return worker.server
}

//
func (worker *Worker) PreConsumeHandler() bool {
if worker.preConsumeHandler == nil {
return true
Expand Down
16 changes: 14 additions & 2 deletions v2/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type Worker struct {
preTaskHandler func(*tasks.Signature)
postTaskHandler func(*tasks.Signature)
preConsumeHandler func(*Worker) bool
timeOutFunc func(*tasks.Signature) int
}

var (
Expand Down Expand Up @@ -210,8 +211,12 @@ func (worker *Worker) taskRetry(signature *tasks.Signature) error {
// Decrement the retry counter, when it reaches 0, we won't retry again
signature.RetryCount--

// Increase retry timeout
signature.RetryTimeout = retry.FibonacciNext(signature.RetryTimeout)
if worker.timeOutFunc != nil {
signature.RetryTimeout = worker.timeOutFunc(signature)
} else {
// Increase retry timeout
signature.RetryTimeout = retry.FibonacciNext(signature.RetryTimeout)
}

// Delay task by signature.RetryTimeout seconds
eta := time.Now().UTC().Add(time.Second * time.Duration(signature.RetryTimeout))
Expand All @@ -235,6 +240,13 @@ func (worker *Worker) retryTaskIn(signature *tasks.Signature, retryIn time.Durat
eta := time.Now().UTC().Add(retryIn)
signature.ETA = &eta

// Decrement the retry counter, when it reaches 0, we won't retry again
signature.RetryCount--

if signature.RetryCount == 0 {
return worker.taskFailed(signature, errors.New("Task failed too many times."))
}

log.WARNING.Printf("Task %s failed. Going to retry in %.0f seconds.", signature.UUID, retryIn.Seconds())

// Send the task back to the queue
Expand Down

0 comments on commit 29059c0

Please sign in to comment.