diff --git a/spindle.go b/spindle.go index 8fa8035..736340b 100644 --- a/spindle.go +++ b/spindle.go @@ -51,17 +51,17 @@ type Lock struct { name string // lock name id string // unique id for this instance duration int64 // lock duration in ms - iter int64 + iter atomic.Int64 token *time.Time mtx *sync.Mutex logger *log.Logger - active int32 + active atomic.Int32 } // Run starts the main lock loop which can be canceled using the input context. You can // provide an optional done channel if you want to be notified when the loop is done. func (l *Lock) Run(ctx context.Context, done ...chan error) error { - var leader int32 // for heartbeat + var leader atomic.Int32 // for heartbeat go func() { min := (time.Millisecond * time.Duration(l.duration)) / 2 bo := gaxv2.Backoff{Max: time.Millisecond * time.Duration(l.duration)} @@ -71,7 +71,7 @@ func (l *Lock) Run(ctx context.Context, done ...chan error) error { } switch { - case atomic.LoadInt32(&leader) > 0: + case leader.Load() > 0: var tm time.Duration for { tm = bo.Pause() @@ -97,8 +97,8 @@ func (l *Lock) Run(ctx context.Context, done ...chan error) error { } if l.tokenString() != "" && l.tokenString() == tokenLocked { - atomic.AddInt32(&leader, 1) - if atomic.LoadInt32(&leader) == 1 { + leader.Add(1) + if leader.Load() == 1 { l.heartbeat() // only on 1 } @@ -120,7 +120,7 @@ func (l *Lock) Run(ctx context.Context, done ...chan error) error { if ok { l.logger.Println("leader active (not me)") - atomic.StoreInt32(&leader, 0) // reset heartbeat + leader.Store(0) // reset heartbeat return true } } @@ -128,25 +128,26 @@ func (l *Lock) Run(ctx context.Context, done ...chan error) error { return false // lock available } - var initial int32 = 1 - var active int32 + var initial atomic.Int32 + var active atomic.Int32 + initial.Store(1) attemptLeader := func() { defer func(begin time.Time) { l.logger.Printf("round %v took %v", l.Iterations(), time.Since(begin)) - atomic.StoreInt32(&l.active, 1) // global - atomic.StoreInt32(&active, 0) // local - atomic.AddInt64(&l.iter, 1) + l.active.Store(1) // global + active.Store(0) // local + l.iter.Add(1) }(time.Now()) - atomic.StoreInt32(&active, 1) // local + active.Store(1) // local if locked() { return } - // Attempt first ever lock. The return commit timestamp will be our fencing token. - // Only one node should be able to do this successfully. - if atomic.LoadInt32(&initial) == 1 { + // Attempt first ever lock. The return commit timestamp will be our fencing + // token. Only one node should be able to do this successfully. + if initial.Load() == 1 { prefix := "[init]" cts, err := l.db.ReadWriteTransaction(context.Background(), func(ctx context.Context, txn *spanner.ReadWriteTransaction) error { @@ -169,11 +170,11 @@ func (l *Lock) Run(ctx context.Context, done ...chan error) error { return } - atomic.StoreInt32(&initial, 0) + initial.Store(0) } // For the succeeding lock attempts. - if atomic.LoadInt32(&initial) == 0 { + if initial.Load() == 0 { prefix := "[next]" token, _, err := l.getCurrentTokenAndId() if err != nil { @@ -196,7 +197,8 @@ func (l *Lock) Run(ctx context.Context, done ...chan error) error { fmt.Fprintf(&q, "values ('%s')", nxt) _, err := txn.Update(ctx, spanner.Statement{SQL: q.String()}) return err - }) + }, + ) if err == nil { // We got the lock. Attempt to update the current token to this commit timestamp. @@ -244,7 +246,7 @@ func (l *Lock) Run(ctx context.Context, done ...chan error) error { case <-first: // immediately before 1st tick go attemptLeader() case <-ticker.C: // duration heartbeat - if atomic.LoadInt32(&active) == 1 { + if active.Load() == 1 { continue } @@ -254,7 +256,7 @@ func (l *Lock) Run(ctx context.Context, done ...chan error) error { done[0] <- nil } - atomic.StoreInt32(&l.active, 0) // global + l.active.Store(0) // global ticker.Stop() return } @@ -266,7 +268,7 @@ func (l *Lock) Run(ctx context.Context, done ...chan error) error { // HasLock returns true if this instance got the lock, together with the lock token. func (l *Lock) HasLock() (bool, string) { - if atomic.LoadInt32(&l.active) == 0 { + if l.active.Load() == 0 { return false, "" } @@ -284,7 +286,7 @@ func (l *Lock) HasLock() (bool, string) { // Leader returns the current leader id. func (l *Lock) Leader() (string, error) { - if atomic.LoadInt32(&l.active) == 0 { + if l.active.Load() == 0 { return "", ErrNotRunning } @@ -296,7 +298,7 @@ func (l *Lock) Leader() (string, error) { func (l *Lock) Duration() int64 { return l.duration } // Iterations returns the number of iterations done by the main loop. -func (l *Lock) Iterations() int64 { return atomic.LoadInt64(&l.iter) } +func (l *Lock) Iterations() int64 { return l.iter.Load() } // Client returns the Spanner client. func (l *Lock) Client() *spanner.Client { return l.db }