Skip to content

Commit

Permalink
Merge pull request #1 from klippa-app/feature/cancel-delayed-tasks
Browse files Browse the repository at this point in the history
Cancel delayed tasks (for now only implemented for Redis)
  • Loading branch information
jerbob92 authored Feb 14, 2022
2 parents 5d996ef + 29059c0 commit 08072ad
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 0 deletions.
1 change: 1 addition & 0 deletions v1/brokers/iface/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type Broker interface {
Publish(ctx context.Context, task *tasks.Signature) error
GetPendingTasks(queue string) ([]*tasks.Signature, error)
GetDelayedTasks() ([]*tasks.Signature, error)
CancelDelayedTask(taskUUID string) error
AdjustRoutingKey(s *tasks.Signature)
}

Expand Down
31 changes: 31 additions & 0 deletions v1/brokers/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"math"
"runtime"
Expand Down Expand Up @@ -273,6 +274,36 @@ func (b *Broker) GetDelayedTasks() ([]*tasks.Signature, error) {
return taskSignatures, nil
}

func (b *Broker) GetDelayedTask(taskUUID string) (*tasks.Signature, error) {
delayedTasks, err := b.GetDelayedTasks()
if err != nil {
return nil, err
}

for _, task := range delayedTasks {
if task != nil && task.UUID == taskUUID {
return task, nil
}
}

return nil, errors.New("Not found")
}

func (b *Broker) CancelDelayedTask(taskUUID string) error {
conn := b.open()
defer conn.Close()

task, err := b.GetDelayedTask(taskUUID)
if err != nil {
return err
}

taskBytes, err := task.ToJSONByte()
_, err = conn.Do("ZREM", b.redisDelayedTasksKey, taskBytes)

return err
}

// consume takes delivered messages from the channel and manages a worker pool
// to process tasks concurrently
func (b *Broker) consume(deliveries <-chan []byte, concurrency int, taskProcessor iface.TaskProcessor) error {
Expand Down
5 changes: 5 additions & 0 deletions v1/common/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ func (b *Broker) GetDelayedTasks() ([]*tasks.Signature, error) {
return nil, errors.New("Not implemented")
}

// CancelDelayedTask removes a scheduled task with given UUID, so that is not picked up by the queue later
func (b *Broker) CancelDelayedTask(taskUUID string) error {
return errors.New("Not implemented")
}

// StartConsuming is a common part of StartConsuming method
func (b *Broker) StartConsuming(consumerTag string, concurrency int, taskProcessor iface.TaskProcessor) {
if b.retryFunc == nil {
Expand Down
5 changes: 5 additions & 0 deletions v1/tasks/signature.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package tasks

import (
"encoding/json"
"fmt"
"github.com/RichardKnop/machinery/v1/utils"
"time"
Expand Down Expand Up @@ -94,3 +95,7 @@ func CopySignature(signature *Signature) *Signature {
_ = utils.DeepCopy(sig, signature)
return sig
}

func (s Signature) ToJSONByte() ([]byte, error) {
return json.Marshal(&s)
}

0 comments on commit 08072ad

Please sign in to comment.