diff --git a/v1/brokers/iface/interfaces.go b/v1/brokers/iface/interfaces.go index c15e8f7..b550aa4 100644 --- a/v1/brokers/iface/interfaces.go +++ b/v1/brokers/iface/interfaces.go @@ -17,6 +17,7 @@ type Broker interface { Publish(ctx context.Context, task *tasks.Signature) error GetPendingTasks(queue string) ([]*tasks.Signature, error) GetDelayedTasks() ([]*tasks.Signature, error) + CancelDelayedTask(taskUUID string) error AdjustRoutingKey(s *tasks.Signature) } diff --git a/v1/brokers/redis/redis.go b/v1/brokers/redis/redis.go index 1683f13..dc1a5f8 100644 --- a/v1/brokers/redis/redis.go +++ b/v1/brokers/redis/redis.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "encoding/json" + "errors" "fmt" "math" "runtime" @@ -273,6 +274,36 @@ func (b *Broker) GetDelayedTasks() ([]*tasks.Signature, error) { return taskSignatures, nil } +func (b *Broker) GetDelayedTask(taskUUID string) (*tasks.Signature, error) { + delayedTasks, err := b.GetDelayedTasks() + if err != nil { + return nil, err + } + + for _, task := range delayedTasks { + if task != nil && task.UUID == taskUUID { + return task, nil + } + } + + return nil, errors.New("Not found") +} + +func (b *Broker) CancelDelayedTask(taskUUID string) error { + conn := b.open() + defer conn.Close() + + task, err := b.GetDelayedTask(taskUUID) + if err != nil { + return err + } + + taskBytes, err := task.ToJSONByte() + _, err = conn.Do("ZREM", b.redisDelayedTasksKey, taskBytes) + + return err +} + // consume takes delivered messages from the channel and manages a worker pool // to process tasks concurrently func (b *Broker) consume(deliveries <-chan []byte, concurrency int, taskProcessor iface.TaskProcessor) error { diff --git a/v1/common/broker.go b/v1/common/broker.go index 1846eb2..d53d4fe 100644 --- a/v1/common/broker.go +++ b/v1/common/broker.go @@ -95,6 +95,11 @@ func (b *Broker) GetDelayedTasks() ([]*tasks.Signature, error) { return nil, errors.New("Not implemented") } +// CancelDelayedTask removes a scheduled task with given UUID, so that is not picked up by the queue later +func (b *Broker) CancelDelayedTask(taskUUID string) error { + return errors.New("Not implemented") +} + // StartConsuming is a common part of StartConsuming method func (b *Broker) StartConsuming(consumerTag string, concurrency int, taskProcessor iface.TaskProcessor) { if b.retryFunc == nil { diff --git a/v1/tasks/signature.go b/v1/tasks/signature.go index 7a90c4a..27affdb 100644 --- a/v1/tasks/signature.go +++ b/v1/tasks/signature.go @@ -1,6 +1,7 @@ package tasks import ( + "encoding/json" "fmt" "github.com/RichardKnop/machinery/v1/utils" "time" @@ -94,3 +95,7 @@ func CopySignature(signature *Signature) *Signature { _ = utils.DeepCopy(sig, signature) return sig } + +func (s Signature) ToJSONByte() ([]byte, error) { + return json.Marshal(&s) +}