Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add watchdog to refresh the lock atomatically #73

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,29 @@ func main() {
}
```

## Watchdog

If you want a long lock, you can use watchdog to refresh in background atomatically.
Set an interval for watchdog shorter than TTL, it would refresh the lock before expiration,
therefore your lock won't be released until you release it explicitly.

```go
lock, err := locker.Obtain(ctx, "my-key", 100*time.Millisecond, &redislock.Options{
Watchdog: redislock.NewTickWatchdog(50*time.Millisecond),
})
```

## Stats

Sometimes you need statistics for monitoring, telemetry, debugging, etc.

```go
stats := redislock.GetStats()
```

If you want prometheus metrics, see [redislock-prometheus](https://github.com/WqyJh/redislock-prometheus).


## Documentation

Full documentation is available on [GoDoc](http://godoc.org/github.com/bsm/redislock)
133 changes: 119 additions & 14 deletions redislock.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,11 @@ func (c *Client) Obtain(ctx context.Context, key string, ttl time.Duration, opt
// If any of requested key are already locked, no additional keys are
// locked and ErrNotObtained is returned.
// May return ErrNotObtained if not successful.
func (c *Client) ObtainMulti(ctx context.Context, keys []string, ttl time.Duration, opt *Options) (*Lock, error) {
func (c *Client) ObtainMulti(ctx context.Context, keys []string, ttl time.Duration, opt *Options) (_ *Lock, err error) {
defer func() {
recordStatus(&stats.Obtain, err)
}()

token := opt.getToken()
// Create a random token
if token == "" {
Expand All @@ -83,20 +87,18 @@ func (c *Client) ObtainMulti(ctx context.Context, keys []string, ttl time.Durati
ttlVal := strconv.FormatInt(int64(ttl/time.Millisecond), 10)
retry := opt.getRetryStrategy()

// make sure we don't retry forever
if _, ok := ctx.Deadline(); !ok {
var cancel context.CancelFunc
ctx, cancel = context.WithDeadline(ctx, time.Now().Add(ttl))
defer cancel()
}

var ticker *time.Ticker
for {
ok, err := c.obtain(ctx, keys, value, len(token), ttlVal)
if err != nil {
return nil, err
} else if ok {
return &Lock{Client: c, keys: keys, value: value, tokenLen: len(token)}, nil
lock := &Lock{Client: c, keys: keys, value: value, tokenLen: len(token)}
watchdog := opt.getWatchdog()
if watchdog != nil {
go watchdog.Start(ctx, lock, ttl)
}
return lock, nil
}

backoff := retry.NextBackoff()
Expand Down Expand Up @@ -152,6 +154,7 @@ type Lock struct {
keys []string
value string
tokenLen int
opt *Options
}

// Obtain is a short-cut for New(...).Obtain(...).
Expand Down Expand Up @@ -206,12 +209,17 @@ func (l *Lock) TTL(ctx context.Context) (time.Duration, error) {

// Refresh extends the lock with a new TTL.
// May return ErrNotObtained if refresh is unsuccessful.
func (l *Lock) Refresh(ctx context.Context, ttl time.Duration, opt *Options) error {
func (l *Lock) Refresh(ctx context.Context, ttl time.Duration, opt *Options) (err error) {
defer func() {
recordStatus(&stats.Refresh, err)
}()

if l == nil {
return ErrNotObtained
}

ttlVal := strconv.FormatInt(int64(ttl/time.Millisecond), 10)
_, err := luaRefresh.Run(ctx, l.client, l.keys, l.value, ttlVal).Result()
_, err = luaRefresh.Run(ctx, l.client, l.keys, l.value, ttlVal).Result()
if err != nil {
if errors.Is(err, redis.Nil) {
return ErrNotObtained
Expand All @@ -223,11 +231,21 @@ func (l *Lock) Refresh(ctx context.Context, ttl time.Duration, opt *Options) err

// Release manually releases the lock.
// May return ErrLockNotHeld.
func (l *Lock) Release(ctx context.Context) error {
func (l *Lock) Release(ctx context.Context) (err error) {
defer func() {
recordStatus(&stats.Release, err)
}()

if l == nil {
return ErrLockNotHeld
}
_, err := luaRelease.Run(ctx, l.client, l.keys, l.value).Result()

dog := l.opt.getWatchdog()
if dog != nil {
dog.Stop()
}

_, err = luaRelease.Run(ctx, l.client, l.keys, l.value).Result()
if err != nil {
if errors.Is(err, redis.Nil) {
return ErrLockNotHeld
Expand All @@ -245,12 +263,15 @@ type Options struct {
// Default: do not retry
RetryStrategy RetryStrategy

// Metadata string.
// Metadata string is appended to the lock token.
Metadata string

// Token is a unique value that is used to identify the lock. By default, a random tokens are generated. Use this
// option to provide a custom token instead.
Token string

// Watchdog allows to refresh atomatically.
Watchdog Watchdog
}

func (o *Options) getMetadata() string {
Expand All @@ -267,6 +288,13 @@ func (o *Options) getToken() string {
return ""
}

func (o *Options) getWatchdog() Watchdog {
if o != nil && o.Watchdog != nil {
return o.Watchdog
}
return nil
}

func (o *Options) getRetryStrategy() RetryStrategy {
if o != nil && o.RetryStrategy != nil {
return o.RetryStrategy
Expand Down Expand Up @@ -345,3 +373,80 @@ func (r *exponentialBackoff) NextBackoff() time.Duration {
return d
}
}

func isCanceled(err error) bool {
return errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)
}

func recordStatus(s *Status, err error) {
if err == nil {
atomic.AddInt64(&s.Success, 1)
}
if err == ErrNotObtained {
atomic.AddInt64(&s.Failed, 1)
} else if isCanceled(err) {
atomic.AddInt64(&s.Cancel, 1)
} else {
atomic.AddInt64(&s.Error, 1)
}
}

// --------------------------------------------------------------------

// Watchdog allows to refresh atomatically.
type Watchdog interface {
// Start starts the watchdog.
Start(ctx context.Context, lock *Lock, ttl time.Duration)
// Stop stops and waits the watchdog.
Stop()
}

// TickWatchdog refreshes the lock at regular intervals.
type TickWatchdog struct {
ctx context.Context
cancel context.CancelFunc
interval time.Duration
ch chan struct{}
}

// NewTickWatchdog creates a new watchdog that refreshes the lock at regular intervals.
func NewTickWatchdog(interval time.Duration) *TickWatchdog {
return &TickWatchdog{interval: interval, ch: make(chan struct{})}
}

// Start starts the watchdog.
func (w *TickWatchdog) Start(ctx context.Context, lock *Lock, ttl time.Duration) {
defer close(w.ch)
atomic.AddInt64(&stats.Watchdog, 1)

w.ctx, w.cancel = context.WithCancel(ctx)

ticker := time.NewTicker(w.interval)
defer ticker.Stop()

for {
select {
case <-w.ctx.Done():
atomic.AddInt64(&stats.WatchdogDone, 1)
return
case <-ticker.C:
atomic.AddInt64(&stats.WatchdogTick, 1)

err := lock.Refresh(w.ctx, ttl, nil)
if err != nil {
if err == ErrNotObtained {
return
}
// continue on other errors
}
}
}
}

// Stop stops and waits the watchdog.
func (w *TickWatchdog) Stop() {
if w.cancel != nil {
w.cancel()
}
<-w.ch
}
52 changes: 52 additions & 0 deletions stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package redislock

import "sync/atomic"

type Status struct {
Success int64
Failed int64
Error int64
Cancel int64
}

type Stats struct {
Obtain Status
Release Status
Refresh Status
Backoff int64
Watchdog int64
WatchdogDone int64
WatchdogTick int64
}

var (
stats Stats
)

func GetStats() Stats {
s := Stats{
Obtain: Status{
Success: atomic.LoadInt64(&stats.Obtain.Success),
Failed: atomic.LoadInt64(&stats.Obtain.Failed),
Error: atomic.LoadInt64(&stats.Obtain.Error),
Cancel: atomic.LoadInt64(&stats.Obtain.Cancel),
},
Release: Status{
Success: atomic.LoadInt64(&stats.Release.Success),
Failed: atomic.LoadInt64(&stats.Release.Failed),
Error: atomic.LoadInt64(&stats.Release.Error),
Cancel: atomic.LoadInt64(&stats.Release.Cancel),
},
Refresh: Status{
Success: atomic.LoadInt64(&stats.Refresh.Success),
Failed: atomic.LoadInt64(&stats.Refresh.Failed),
Error: atomic.LoadInt64(&stats.Refresh.Error),
Cancel: atomic.LoadInt64(&stats.Refresh.Cancel),
},
Backoff: atomic.LoadInt64(&stats.Backoff),
Watchdog: atomic.LoadInt64(&stats.Watchdog),
WatchdogDone: atomic.LoadInt64(&stats.WatchdogDone),
WatchdogTick: atomic.LoadInt64(&stats.WatchdogTick),
}
return s
}