Skip to content

Commit

Permalink
Merge pull request #40 from Del-sama/master
Browse files Browse the repository at this point in the history
Add poll strategy
  • Loading branch information
vgarvardt authored Oct 11, 2021
2 parents 959444b + 7428f88 commit 32b91d7
Show file tree
Hide file tree
Showing 5 changed files with 248 additions and 30 deletions.
51 changes: 51 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ VALUES
// If a job is found, it will be locked on the transactional level, so other workers
// will be skipping it. If no job is found, nil will be returned instead of an error.
//
// This function cares about the priority first to lock top priority jobs first even if there are available ones that
// should be executed earlier but with the lower priority.
//
// Because Gue uses transaction-level locks, we have to hold the
// same transaction throughout the process of getting a job, working it,
// deleting it, and releasing the lock.
Expand Down Expand Up @@ -177,6 +180,54 @@ WHERE job_id = $1 FOR UPDATE SKIP LOCKED`, id).Scan(
return nil, fmt.Errorf("could not lock the job (rollback result: %v): %w", rbErr, err)
}

// LockNextScheduledJob attempts to retrieve the earliest scheduled Job from the database in the specified queue.
// If a job is found, it will be locked on the transactional level, so other workers
// will be skipping it. If no job is found, nil will be returned instead of an error.
//
// This function cares about the scheduled time first to lock earliest to execute jobs first even if there are ones
// with a higher priority scheduled to a later time but already eligible for execution
//
// Because Gue uses transaction-level locks, we have to hold the
// same transaction throughout the process of getting a job, working it,
// deleting it, and releasing the lock.
//
// After the Job has been worked, you must call either Done() or Error() on it
// in order to commit transaction to persist Job changes (remove or update it).
func (c *Client) LockNextScheduledJob(ctx context.Context, queue string) (*Job, error) {
tx, err := c.pool.Begin(ctx)
if err != nil {
return nil, err
}

now := time.Now().UTC()

j := Job{pool: c.pool, tx: tx, backoff: c.backoff}

err = tx.QueryRow(ctx, `SELECT job_id, queue, priority, run_at, job_type, args, error_count
FROM gue_jobs
WHERE queue = $1 AND run_at <= $2
ORDER BY run_at, priority ASC
LIMIT 1 FOR UPDATE SKIP LOCKED`, queue, now).Scan(
&j.ID,
&j.Queue,
&j.Priority,
&j.RunAt,
&j.Type,
(*json.RawMessage)(&j.Args),
&j.ErrorCount,
)
if err == nil {
return &j, nil
}

rbErr := tx.Rollback(ctx)
if err == adapter.ErrNoRows {
return nil, rbErr
}

return nil, fmt.Errorf("could not lock a job (rollback result: %v): %w", rbErr, err)
}

func newID() string {
hasher := md5.New()
// nolint:errcheck
Expand Down
108 changes: 108 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,114 @@ func testLockJobByIDNoJob(t *testing.T, connPool adapter.ConnPool) {
require.Nil(t, j)
}

func TestLockNextScheduledJob(t *testing.T) {
t.Run("pgx/v3", func(t *testing.T) {
testLockNextScheduledJob(t, adapterTesting.OpenTestPoolPGXv3(t))
})
t.Run("pgx/v4", func(t *testing.T) {
testLockNextScheduledJob(t, adapterTesting.OpenTestPoolPGXv4(t))
})
t.Run("lib/pq", func(t *testing.T) {
testLockNextScheduledJob(t, adapterTesting.OpenTestPoolLibPQ(t))
})
t.Run("go-pg/v10", func(t *testing.T) {
testLockNextScheduledJob(t, adapterTesting.OpenTestPoolGoPGv10(t))
})
}

func testLockNextScheduledJob(t *testing.T, connPool adapter.ConnPool) {
c := NewClient(connPool)
ctx := context.Background()

newJob := &Job{
Type: "MyJob",
RunAt: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
}
err := c.Enqueue(ctx, newJob)
require.NoError(t, err)
require.Greater(t, newJob.ID, int64(0))

j, err := c.LockNextScheduledJob(ctx, "")
require.NoError(t, err)

require.NotNil(t, j.tx)
require.NotNil(t, j.pool)
defer func() {
err := j.Done(ctx)
assert.NoError(t, err)
}()

// check values of returned Job
assert.Equal(t, newJob.ID, j.ID)
assert.Equal(t, defaultQueueName, j.Queue)
assert.Equal(t, int16(0), j.Priority)
assert.False(t, j.RunAt.IsZero())
assert.Equal(t, newJob.Type, j.Type)
assert.Equal(t, []byte(`[]`), j.Args)
assert.Equal(t, int32(0), j.ErrorCount)
assert.NotEqual(t, pgtype.Present, j.LastError.Status)
}

func TestLockNextScheduledJobAlreadyLocked(t *testing.T) {
t.Run("pgx/v3", func(t *testing.T) {
testLockNextScheduledJobAlreadyLocked(t, adapterTesting.OpenTestPoolPGXv3(t))
})
t.Run("pgx/v4", func(t *testing.T) {
testLockNextScheduledJobAlreadyLocked(t, adapterTesting.OpenTestPoolPGXv4(t))
})
t.Run("lib/pq", func(t *testing.T) {
testLockNextScheduledJobAlreadyLocked(t, adapterTesting.OpenTestPoolLibPQ(t))
})
t.Run("go-pg/v10", func(t *testing.T) {
testLockNextScheduledJobAlreadyLocked(t, adapterTesting.OpenTestPoolGoPGv10(t))
})
}

func testLockNextScheduledJobAlreadyLocked(t *testing.T, connPool adapter.ConnPool) {
c := NewClient(connPool)
ctx := context.Background()

err := c.Enqueue(ctx, &Job{Type: "MyJob"})
require.NoError(t, err)

j, err := c.LockNextScheduledJob(ctx, "")
require.NoError(t, err)

defer func() {
err := j.Done(ctx)
assert.NoError(t, err)
}()
require.NotNil(t, j)

j2, err := c.LockNextScheduledJob(ctx, "")
require.NoError(t, err)
require.Nil(t, j2)
}

func TestLockNextScheduledJobNoJob(t *testing.T) {
t.Run("pgx/v3", func(t *testing.T) {
testLockNextScheduledJobNoJob(t, adapterTesting.OpenTestPoolPGXv3(t))
})
t.Run("pgx/v4", func(t *testing.T) {
testLockNextScheduledJobNoJob(t, adapterTesting.OpenTestPoolPGXv4(t))
})
t.Run("lib/pq", func(t *testing.T) {
testLockNextScheduledJobNoJob(t, adapterTesting.OpenTestPoolLibPQ(t))
})
t.Run("go-pg/v10", func(t *testing.T) {
testLockNextScheduledJobNoJob(t, adapterTesting.OpenTestPoolGoPGv10(t))
})
}

func testLockNextScheduledJobNoJob(t *testing.T, connPool adapter.ConnPool) {
c := NewClient(connPool)
ctx := context.Background()

j, err := c.LockNextScheduledJob(ctx, "")
require.NoError(t, err)
require.Nil(t, j)
}

func TestJobTx(t *testing.T) {
t.Run("pgx/v3", func(t *testing.T) {
testJobTx(t, adapterTesting.OpenTestPoolPGXv3(t))
Expand Down
85 changes: 55 additions & 30 deletions worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,18 @@ import (
"github.com/vgarvardt/gue/v3/adapter"
)

// PollStrategy determines how the DB is queried for the next job to work on
type PollStrategy string

const (
defaultPollInterval = 5 * time.Second
defaultQueueName = ""
// PriorityPollStrategy cares about the priority first to lock top priority jobs first even if there are available
//ones that should be executed earlier but with lower priority.
PriorityPollStrategy PollStrategy = "OrderByPriority"
// RunAtPollStrategy cares about the scheduled time first to lock earliest to execute jobs first even if there
// are ones with a higher priority scheduled to a later time but already eligible for execution
RunAtPollStrategy PollStrategy = "OrderByRunAtPriority"
)

// WorkFunc is a function that performs a Job. If an error is returned, the job
Expand All @@ -26,17 +35,22 @@ type WorkFunc func(ctx context.Context, j *Job) error
// given type.
type WorkMap map[string]WorkFunc

// pollFunc is a function that queries the DB for the next job to work on
type pollFunc func(context.Context, string) (*Job, error)

// Worker is a single worker that pulls jobs off the specified queue. If no Job
// is found, the Worker will sleep for interval seconds.
type Worker struct {
wm WorkMap
interval time.Duration
queue string
c *Client
id string
logger adapter.Logger
mu sync.Mutex
running bool
wm WorkMap
interval time.Duration
queue string
c *Client
id string
logger adapter.Logger
mu sync.Mutex
running bool
pollStrategy PollStrategy
pollFunc pollFunc
}

// NewWorker returns a Worker that fetches Jobs from the Client and executes
Expand All @@ -49,11 +63,12 @@ type Worker struct {
// WithWorkerQueue option.
func NewWorker(c *Client, wm WorkMap, options ...WorkerOption) *Worker {
w := Worker{
interval: defaultPollInterval,
queue: defaultQueueName,
c: c,
wm: wm,
logger: adapter.NoOpLogger{},
interval: defaultPollInterval,
queue: defaultQueueName,
c: c,
wm: wm,
logger: adapter.NoOpLogger{},
pollStrategy: PriorityPollStrategy,
}

for _, option := range options {
Expand All @@ -64,6 +79,13 @@ func NewWorker(c *Client, wm WorkMap, options ...WorkerOption) *Worker {
w.id = newID()
}

switch w.pollStrategy {
case RunAtPollStrategy:
w.pollFunc = w.c.LockNextScheduledJob
default:
w.pollFunc = w.c.LockJob
}

w.logger = w.logger.With(adapter.F("worker-id", w.id))

return &w
Expand Down Expand Up @@ -150,7 +172,8 @@ func (w *Worker) runLoop(ctx context.Context) error {

// WorkOne tries to consume single message from the queue.
func (w *Worker) WorkOne(ctx context.Context) (didWork bool) {
j, err := w.c.LockJob(ctx, w.queue)
j, err := w.pollFunc(ctx, w.queue)

if err != nil {
w.logger.Error("Worker failed to lock a job", adapter.Err(err))
return
Expand Down Expand Up @@ -217,15 +240,16 @@ func recoverPanic(ctx context.Context, logger adapter.Logger, j *Job) {
// WorkerPool is a pool of Workers, each working jobs from the queue queue
// at the specified interval using the WorkMap.
type WorkerPool struct {
wm WorkMap
interval time.Duration
queue string
c *Client
workers []*Worker
id string
logger adapter.Logger
mu sync.Mutex
running bool
wm WorkMap
interval time.Duration
queue string
c *Client
workers []*Worker
id string
logger adapter.Logger
mu sync.Mutex
running bool
pollStrategy PollStrategy
}

// NewWorkerPool creates a new WorkerPool with count workers using the Client c.
Expand All @@ -235,12 +259,13 @@ type WorkerPool struct {
// nameless queue "", which can be overridden by WithPoolQueue option.
func NewWorkerPool(c *Client, wm WorkMap, poolSize int, options ...WorkerPoolOption) *WorkerPool {
w := WorkerPool{
wm: wm,
interval: defaultPollInterval,
queue: defaultQueueName,
c: c,
workers: make([]*Worker, poolSize),
logger: adapter.NoOpLogger{},
wm: wm,
interval: defaultPollInterval,
queue: defaultQueueName,
c: c,
workers: make([]*Worker, poolSize),
logger: adapter.NoOpLogger{},
pollStrategy: PriorityPollStrategy,
}

for _, option := range options {
Expand All @@ -261,9 +286,9 @@ func NewWorkerPool(c *Client, wm WorkMap, poolSize int, options ...WorkerPoolOpt
WithWorkerQueue(w.queue),
WithWorkerID(fmt.Sprintf("%s/worker-%d", w.id, i)),
WithWorkerLogger(w.logger),
WithWorkerPollStrategy(w.pollStrategy),
)
}

return &w
}

Expand Down
14 changes: 14 additions & 0 deletions worker_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@ func WithWorkerLogger(logger adapter.Logger) WorkerOption {
}
}

// WithWorkerPollStrategy overrides default poll strategy with given value
func WithWorkerPollStrategy(s PollStrategy) WorkerOption {
return func(w *Worker) {
w.pollStrategy = s
}
}

// WithPoolPollInterval overrides default poll interval with the given value.
// Poll interval is the "sleep" duration if there were no jobs found in the DB.
func WithPoolPollInterval(d time.Duration) WorkerPoolOption {
Expand Down Expand Up @@ -69,3 +76,10 @@ func WithPoolLogger(logger adapter.Logger) WorkerPoolOption {
w.logger = logger
}
}

// WithPoolPollStrategy overrides default poll strategy with given value
func WithPoolPollStrategy(s PollStrategy) WorkerPoolOption {
return func(w *WorkerPool) {
w.pollStrategy = s
}
}
20 changes: 20 additions & 0 deletions worker_option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,16 @@ func TestWithWorkerLogger(t *testing.T) {
l.AssertExpectations(t)
}

func TestWithWorkerPollStrategy(t *testing.T) {
wm := WorkMap{
"MyJob": func(ctx context.Context, j *Job) error {
return nil
},
}
workerWithWorkerPollStrategy := NewWorker(nil, wm, WithWorkerPollStrategy(RunAtPollStrategy))
assert.Equal(t, RunAtPollStrategy, workerWithWorkerPollStrategy.pollStrategy)
}

func TestWithPoolPollInterval(t *testing.T) {
wm := WorkMap{
"MyJob": func(ctx context.Context, j *Job) error {
Expand Down Expand Up @@ -167,3 +177,13 @@ func TestWithPoolLogger(t *testing.T) {

l.AssertExpectations(t)
}

func TestWithPoolPollStrategy(t *testing.T) {
wm := WorkMap{
"MyJob": func(ctx context.Context, j *Job) error {
return nil
},
}
workerPoolWithPoolPollStrategy := NewWorkerPool(nil, wm, 2, WithPoolPollStrategy(RunAtPollStrategy))
assert.Equal(t, RunAtPollStrategy, workerPoolWithPoolPollStrategy.pollStrategy)
}

0 comments on commit 32b91d7

Please sign in to comment.