Change payload to byte slice
hibiken committed Jun 29, 2021
1 parent 7af3981 commit 4768124
Showing 21 changed files with 254 additions and 1,156 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Changed

- `NewTask` now takes a byte slice (`[]byte`) as its payload (see the sketch below).
- Task `Type` and `Payload` are now accessed via the `Type()` and `Payload()` methods.
- Requires Redis v4.0+ for multiple field/value pair support.
- Renamed pending key (TODO: need migration script).
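
For orientation, a minimal sketch of the API after this change; the task type name and payload fields here are illustrative, and JSON encoding mirrors the README examples below.

```go
package main

import (
	"encoding/json"
	"log"

	"github.com/hibiken/asynq"
)

func main() {
	// Payloads are now raw bytes; the caller picks the encoding (JSON here).
	b, err := json.Marshal(map[string]interface{}{"user_id": 42})
	if err != nil {
		log.Fatal(err)
	}

	// NewTask now takes a []byte payload instead of a map.
	t := asynq.NewTask("email:deliver", b)

	// Type and Payload are now accessor methods, not exported fields.
	log.Printf("type = %s, payload = %s", t.Type(), t.Payload())
}
```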

73 changes: 45 additions & 28 deletions README.md
Expand Up @@ -12,8 +12,8 @@ Asynq is a Go library for queueing tasks and processing them asynchronously with

High-level overview of how Asynq works:

- Client puts task on a queue
- Server pulls task off queues and starts a worker goroutine for each task
- Client puts tasks on a queue
- Server pulls tasks off queues and starts a worker goroutine for each task
- Tasks are processed concurrently by multiple workers

Task queues are used as a mechanism to distribute work across multiple machines. A system can consist of multiple worker servers and brokers, enabling high availability and horizontal scaling.
Expand Down Expand Up @@ -77,19 +77,36 @@ const (
TypeImageResize = "image:resize"
)

type EmailDeliveryPayload struct {
UserID int
TemplateID string
}

type ImageResizePayload struct {
SourceURL string
}

//----------------------------------------------
// Write a function NewXXXTask to create a task.
// A task consists of a type and a payload.
//----------------------------------------------

func NewEmailDeliveryTask(userID int, tmplID string) *asynq.Task {
payload := map[string]interface{}{"user_id": userID, "template_id": tmplID}
return asynq.NewTask(TypeEmailDelivery, payload)
func NewEmailDeliveryTask(userID int, tmplID string) (*asynq.Task, error) {
payload := EmailDeliveryPayload{UserID: userID, TemplateID: tmplID}
bytes, err := json.Marshal(payload)
if err != nil {
return nil, err
}
return asynq.NewTask(TypeEmailDelivery, bytes), nil
}

func NewImageResizeTask(src string) *asynq.Task {
payload := map[string]interface{}{"src": src}
return asynq.NewTask(TypeImageResize, payload)
func NewImageResizeTask(src string) (*asynq.Task, error) {
payload := ImageResizePayload{SourceURL: src}
bytes, err := json.Marshal(payload)
if err != nil {
return nil, err
}
return asynq.NewTask(TypeImageResize, bytes), nil
}

//---------------------------------------------------------------
@@ -101,15 +118,11 @@ func NewImageResizeTask(src string) *asynq.Task {
//---------------------------------------------------------------

func HandleEmailDeliveryTask(ctx context.Context, t *asynq.Task) error {
userID, err := t.Payload.GetInt("user_id")
if err != nil {
return err
var p EmailDeliveryPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry)
}
tmplID, err := t.Payload.GetString("template_id")
if err != nil {
return err
}
fmt.Printf("Send Email to User: user_id = %d, template_id = %s\n", userID, tmplID)
log.Printf("Sending Email to User: user_id = %d, template_id = %s\n", p.UserID, p.TemplateID)
// Email delivery code ...
return nil
}
@@ -120,11 +133,11 @@ type ImageProcessor struct {
}

func (processor *ImageProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error {
src, err := t.Payload.GetString("src")
if err != nil {
return err
var p ImageResizePayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry)
}
fmt.Printf("Resize image: src = %s\n", src)
log.Printf("Resizing image: src = %s\n", p.SourceURL)
// Image resizing code ...
return nil
}
@@ -160,10 +173,13 @@ func main() {
// Use (*Client).Enqueue method.
// ------------------------------------------------------

t := tasks.NewEmailDeliveryTask(42, "some:template:id")
t, err := tasks.NewEmailDeliveryTask(42, "some:template:id")
if err != nil {
log.Fatalf("could not create task: %v", err)
}
res, err := c.Enqueue(t)
if err != nil {
log.Fatal("could not enqueue task: %v", err)
log.Fatalf("could not enqueue task: %v", err)
}
fmt.Printf("Enqueued Result: %+v\n", res)

@@ -173,10 +189,9 @@ func main() {
// Use ProcessIn or ProcessAt option.
// ------------------------------------------------------------

t = tasks.NewEmailDeliveryTask(42, "other:template:id")
res, err = c.Enqueue(t, asynq.ProcessIn(24*time.Hour))
if err != nil {
log.Fatal("could not schedule task: %v", err)
log.Fatalf("could not schedule task: %v", err)
}
fmt.Printf("Enqueued Result: %+v\n", res)

@@ -188,19 +203,21 @@ func main() {

c.SetDefaultOptions(tasks.TypeImageResize, asynq.MaxRetry(10), asynq.Timeout(3*time.Minute))

t = tasks.NewImageResizeTask("some/blobstore/path")
t, err = tasks.NewImageResizeTask("https://example.com/myassets/image.jpg")
if err != nil {
log.Fatalf("could not create task: %v", err)
}
res, err = c.Enqueue(t)
if err != nil {
log.Fatal("could not enqueue task: %v", err)
log.Fatalf("could not enqueue task: %v", err)
}
fmt.Printf("Enqueued Result: %+v\n", res)

// ---------------------------------------------------------------------------
// Example 4: Pass options to tune task processing behavior at enqueue time.
// Options passed at enqueue time override default ones, if any.
// Options passed at enqueue time override default ones.
// ---------------------------------------------------------------------------

t = tasks.NewImageResizeTask("some/blobstore/path")
res, err = c.Enqueue(t, asynq.Queue("critical"), asynq.Timeout(30*time.Second))
if err != nil {
log.Fatal("could not enqueue task: %v", err)
19 changes: 10 additions & 9 deletions asynq.go
@@ -17,20 +17,21 @@ import (

// Task represents a unit of work to be performed.
type Task struct {
// Type indicates the type of task to be performed.
Type string
// typename indicates the type of task to be performed.
typename string

// Payload holds data needed to perform the task.
Payload Payload
// payload holds data needed to perform the task.
payload []byte
}

func (t *Task) Type() string { return t.typename }
func (t *Task) Payload() []byte { return t.payload }

// NewTask returns a new Task given a type name and payload data.
//
// The payload values must be serializable.
func NewTask(typename string, payload map[string]interface{}) *Task {
func NewTask(typename string, payload []byte) *Task {
return &Task{
Type: typename,
Payload: Payload{payload},
typename: typename,
payload: payload,
}
}
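
Since the payload is now an opaque byte slice, the encoding is entirely up to the caller. As a sketch (not part of this commit), the same API works with encoding/gob instead of JSON; the task type name and payload struct below are made up for illustration.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"log"

	"github.com/hibiken/asynq"
)

type ReportPayload struct {
	Month int
	Year  int
}

func main() {
	// Any serialization works now that Task carries raw bytes.
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(ReportPayload{Month: 6, Year: 2021}); err != nil {
		log.Fatal(err)
	}
	t := asynq.NewTask("report:generate", buf.Bytes())

	// The decoding side (e.g., inside a handler) mirrors the encoding choice.
	var p ReportPayload
	if err := gob.NewDecoder(bytes.NewReader(t.Payload())).Decode(&p); err != nil {
		log.Fatal(err)
	}
	log.Printf("type = %s, month = %d, year = %d", t.Type(), p.Month, p.Year)
}
```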

2 changes: 1 addition & 1 deletion asynq_test.go
@@ -85,7 +85,7 @@ func getRedisConnOpt(tb testing.TB) RedisConnOpt {
var sortTaskOpt = cmp.Transformer("SortMsg", func(in []*Task) []*Task {
out := append([]*Task(nil), in...) // Copy input to avoid mutating it
sort.Slice(out, func(i, j int) bool {
return out[i].Type < out[j].Type
return out[i].Type() < out[j].Type()
})
return out
})
49 changes: 29 additions & 20 deletions benchmark_test.go
@@ -6,12 +6,24 @@ package asynq

import (
"context"
"encoding/json"
"fmt"
"sync"
"testing"
"time"

h "github.com/hibiken/asynq/internal/asynqtest"
)

// Creates a new task of type "task<n>" with payload {"data": n}.
func makeTask(n int) *Task {
b, err := json.Marshal(map[string]int{"data": n})
if err != nil {
panic(err)
}
return NewTask(fmt.Sprintf("task%d", n), b)
}

// Simple E2E Benchmark testing with no scheduled tasks and retries.
func BenchmarkEndToEndSimple(b *testing.B) {
const count = 100000
Expand All @@ -29,8 +41,7 @@ func BenchmarkEndToEndSimple(b *testing.B) {
})
// Create a bunch of tasks
for i := 0; i < count; i++ {
t := NewTask(fmt.Sprintf("task%d", i), map[string]interface{}{"data": i})
if _, err := client.Enqueue(t); err != nil {
if _, err := client.Enqueue(makeTask(i)); err != nil {
b.Fatalf("could not enqueue a task: %v", err)
}
}
@@ -70,14 +81,12 @@ func BenchmarkEndToEnd(b *testing.B) {
})
// Create a bunch of tasks
for i := 0; i < count; i++ {
t := NewTask(fmt.Sprintf("task%d", i), map[string]interface{}{"data": i})
if _, err := client.Enqueue(t); err != nil {
if _, err := client.Enqueue(makeTask(i)); err != nil {
b.Fatalf("could not enqueue a task: %v", err)
}
}
for i := 0; i < count; i++ {
t := NewTask(fmt.Sprintf("scheduled%d", i), map[string]interface{}{"data": i})
if _, err := client.Enqueue(t, ProcessIn(1*time.Second)); err != nil {
if _, err := client.Enqueue(makeTask(i), ProcessIn(1*time.Second)); err != nil {
b.Fatalf("could not enqueue a task: %v", err)
}
}
@@ -86,13 +95,18 @@ func BenchmarkEndToEnd(b *testing.B) {
var wg sync.WaitGroup
wg.Add(count * 2)
handler := func(ctx context.Context, t *Task) error {
n, err := t.Payload.GetInt("data")
if err != nil {
var p map[string]int
if err := json.Unmarshal(t.Payload(), &p); err != nil {
b.Logf("internal error: %v", err)
}
n, ok := p["data"]
if !ok {
n = 1
b.Logf("internal error: could not get data from payload")
}
retried, ok := GetRetryCount(ctx)
if !ok {
b.Logf("internal error: %v", err)
b.Logf("internal error: could not get retry count from context")
}
// Fail 1% of tasks for the first attempt.
if retried == 0 && n%100 == 0 {
@@ -136,20 +150,17 @@ func BenchmarkEndToEndMultipleQueues(b *testing.B) {
})
// Create a bunch of tasks
for i := 0; i < highCount; i++ {
t := NewTask(fmt.Sprintf("task%d", i), map[string]interface{}{"data": i})
if _, err := client.Enqueue(t, Queue("high")); err != nil {
if _, err := client.Enqueue(makeTask(i), Queue("high")); err != nil {
b.Fatalf("could not enqueue a task: %v", err)
}
}
for i := 0; i < defaultCount; i++ {
t := NewTask(fmt.Sprintf("task%d", i), map[string]interface{}{"data": i})
if _, err := client.Enqueue(t); err != nil {
if _, err := client.Enqueue(makeTask(i)); err != nil {
b.Fatalf("could not enqueue a task: %v", err)
}
}
for i := 0; i < lowCount; i++ {
t := NewTask(fmt.Sprintf("task%d", i), map[string]interface{}{"data": i})
if _, err := client.Enqueue(t, Queue("low")); err != nil {
if _, err := client.Enqueue(makeTask(i), Queue("low")); err != nil {
b.Fatalf("could not enqueue a task: %v", err)
}
}
@@ -190,15 +201,13 @@ func BenchmarkClientWhileServerRunning(b *testing.B) {
})
// Enqueue 10,000 tasks.
for i := 0; i < count; i++ {
t := NewTask(fmt.Sprintf("task%d", i), map[string]interface{}{"data": i})
if _, err := client.Enqueue(t); err != nil {
if _, err := client.Enqueue(makeTask(i)); err != nil {
b.Fatalf("could not enqueue a task: %v", err)
}
}
// Schedule 10,000 tasks.
for i := 0; i < count; i++ {
t := NewTask(fmt.Sprintf("scheduled%d", i), map[string]interface{}{"data": i})
if _, err := client.Enqueue(t, ProcessIn(1*time.Second)); err != nil {
if _, err := client.Enqueue(makeTask(i), ProcessIn(1*time.Second)); err != nil {
b.Fatalf("could not enqueue a task: %v", err)
}
}
@@ -213,7 +222,7 @@ func BenchmarkClientWhileServerRunning(b *testing.B) {
b.Log("Starting enqueueing")
enqueued := 0
for enqueued < 100000 {
t := NewTask(fmt.Sprintf("enqueued%d", enqueued), map[string]interface{}{"data": enqueued})
t := NewTask(fmt.Sprintf("enqueued%d", enqueued), h.JSON(map[string]interface{}{"data": enqueued}))
if _, err := client.Enqueue(t); err != nil {
b.Logf("could not enqueue task %d: %v", enqueued, err)
continue
9 changes: 4 additions & 5 deletions client.go
@@ -176,7 +176,6 @@ func (d processInOption) String() string { return fmt.Sprintf("ProcessIn(%v)
func (d processInOption) Type() OptionType { return ProcessInOpt }
func (d processInOption) Value() interface{} { return time.Duration(d) }


// ErrDuplicateTask indicates that the given task could not be enqueued since it's a duplicate of another task.
//
// ErrDuplicateTask error only applies to tasks enqueued with a Unique option.
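
To illustrate the comment above, a minimal sketch (not part of this diff) of how a duplicate surfaces to the caller, assuming the `Unique` option and `errors.Is` matching described in the asynq docs; the task type, payload, and Redis address are made up.

```go
package main

import (
	"errors"
	"log"
	"time"

	"github.com/hibiken/asynq"
)

func main() {
	client := asynq.NewClient(asynq.RedisClientOpt{Addr: "127.0.0.1:6379"})
	defer client.Close()

	// Hypothetical task; any []byte payload works after this commit.
	t := asynq.NewTask("email:deliver", []byte(`{"user_id": 42}`))

	// With Unique, a second enqueue of an identical task within the TTL
	// is rejected with ErrDuplicateTask.
	if _, err := client.Enqueue(t, asynq.Unique(time.Hour)); err != nil {
		log.Fatalf("first enqueue failed: %v", err)
	}
	if _, err := client.Enqueue(t, asynq.Unique(time.Hour)); err != nil {
		if errors.Is(err, asynq.ErrDuplicateTask) {
			log.Println("skipped: duplicate of a task enqueued within the last hour")
		} else {
			log.Fatalf("second enqueue failed: %v", err)
		}
	}
}
```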
@@ -305,7 +304,7 @@ func (c *Client) Close() error {
// If no ProcessAt or ProcessIn options are passed, the task will be processed immediately.
func (c *Client) Enqueue(task *Task, opts ...Option) (*Result, error) {
c.mu.Lock()
if defaults, ok := c.opts[task.Type]; ok {
if defaults, ok := c.opts[task.Type()]; ok {
opts = append(defaults, opts...)
}
c.mu.Unlock()
@@ -327,12 +326,12 @@ func (c *Client) Enqueue(task *Task, opts ...Option) (*Result, error) {
}
var uniqueKey string
if opt.uniqueTTL > 0 {
uniqueKey = base.UniqueKey(opt.queue, task.Type, task.Payload.data)
uniqueKey = base.UniqueKey(opt.queue, task.Type(), task.Payload())
}
msg := &base.TaskMessage{
ID: uuid.New(),
Type: task.Type,
Payload: task.Payload.data,
Type: task.Type(),
Payload: task.Payload(),
Queue: opt.queue,
Retry: opt.retry,
Deadline: deadline.Unix(),