Skip to content

Commit

Permalink
fix: NewScheduler incorrectly creates underlying Client, closing brok…
Browse files Browse the repository at this point in the history
…er properly (#977)

* fix: NewScheduler wrongly creates a client whose sharedConnection value is always true

* This is affecting the PeriodicManager as well as the Scheduler

* fix: closing the Client also closes the broker

* The error was also previously unhandled. For shared connections an error will be returned by the broker itself because the sharedConnection bool is also set on the client. This also means we can get rid of the sharedConnection flag on the Scheduler itself and let it work internally.
  • Loading branch information
kamikazechaser authored Dec 6, 2024
1 parent 106c07a commit 127fac2
Showing 1 changed file with 17 additions and 10 deletions.
27 changes: 17 additions & 10 deletions scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,29 +44,41 @@ type Scheduler struct {
// to avoid using cron.EntryID as the public API of
// the Scheduler.
idmap map[string]cron.EntryID
// When a Scheduler has been created with an existing Redis connection, we do
// not want to close it.
sharedConnection bool
}

const defaultHeartbeatInterval = 10 * time.Second

// NewScheduler returns a new Scheduler instance given the redis connection option.
// The parameter opts is optional, defaults will be used if opts is set to nil
func NewScheduler(r RedisConnOpt, opts *SchedulerOpts) *Scheduler {
scheduler := newScheduler(opts)

redisClient, ok := r.MakeRedisClient().(redis.UniversalClient)
if !ok {
panic(fmt.Sprintf("asynq: unsupported RedisConnOpt type %T", r))
}
scheduler := NewSchedulerFromRedisClient(redisClient, opts)
scheduler.sharedConnection = false

rdb := rdb.NewRDB(redisClient)

scheduler.rdb = rdb
scheduler.client = &Client{broker: rdb, sharedConnection: false}

return scheduler
}

// NewSchedulerFromRedisClient returns a new instance of Scheduler given a redis.UniversalClient
// The parameter opts is optional, defaults will be used if opts is set to nil.
// Warning: The underlying redis connection pool will not be closed by Asynq, you are responsible for closing it.
func NewSchedulerFromRedisClient(c redis.UniversalClient, opts *SchedulerOpts) *Scheduler {
scheduler := newScheduler(opts)

scheduler.rdb = rdb.NewRDB(c)
scheduler.client = NewClientFromRedisClient(c)

return scheduler
}

func newScheduler(opts *SchedulerOpts) *Scheduler {
if opts == nil {
opts = &SchedulerOpts{}
}
Expand All @@ -93,8 +105,6 @@ func NewSchedulerFromRedisClient(c redis.UniversalClient, opts *SchedulerOpts) *
state: &serverState{value: srvStateNew},
heartbeatInterval: heartbeatInterval,
logger: logger,
client: NewClientFromRedisClient(c),
rdb: rdb.NewRDB(c),
cron: cron.New(cron.WithLocation(loc)),
location: loc,
done: make(chan struct{}),
Expand Down Expand Up @@ -294,9 +304,6 @@ func (s *Scheduler) Shutdown() {
if err := s.client.Close(); err != nil {
s.logger.Errorf("Failed to close redis client connection: %v", err)
}
if !s.sharedConnection {
s.rdb.Close()
}
s.logger.Info("Scheduler stopped")
}

Expand Down

0 comments on commit 127fac2

Please sign in to comment.