Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pkg): add ticker package #2617

Merged
merged 6 commits into from
Aug 6, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 139 additions & 0 deletions pkg/ticker/ticker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// Package ticker provides a dynamic ticker that can change its interval at runtime.
// The ticker can be stopped gracefully and handles context-based termination.
//
// This package is useful for scenarios where periodic execution of a function is needed
// and the interval might need to change dynamically based on runtime conditions.
//
// It also invokes a first tick immediately after the ticker starts. It's safe to use it concurrently.
//
// It also terminates gracefully when the context is done (return ctx.Err()) or when the stop signal is received.
//
// Example usage:
//
// ticker := New(time.Second, func(ctx context.Context, t *Ticker) error {
// resp, err := client.GetPrice(ctx)
// if err != nil {
// logger.Err(err).Error().Msg("failed to get price")
// return nil
// }
//
// observer.SetPrice(resp.GasPrice)
// t.SetInterval(resp.GasPriceInterval)
//
// return nil
// })
//
// err := ticker.Run(ctx)
package ticker

import (
"context"
"fmt"
"sync"
"time"

"cosmossdk.io/errors"
)

// Ticker represents a ticker that will run a function periodically.
// It also invokes BEFORE ticker starts.
type Ticker struct {
interval time.Duration
ticker *time.Ticker
runner Runner
signalChan chan struct{}

// runnerMu is a mutex to prevent double run
runnerMu sync.Mutex

// stateMu is a mutex to prevent concurrent SetInterval calls
stateMu sync.Mutex

stopped bool
}

// Runner is a function that will be called by the Ticker
lumtis marked this conversation as resolved.
Show resolved Hide resolved
type Runner func(ctx context.Context, t *Ticker) error

// New creates a new Ticker.
func New(interval time.Duration, runner Runner) *Ticker {
return &Ticker{interval: interval, runner: runner}
}

// Run creates and runs a new Ticker.
func Run(ctx context.Context, interval time.Duration, runner Runner) error {
return New(interval, runner).Run(ctx)
}
swift1337 marked this conversation as resolved.
Show resolved Hide resolved

// SecondsFromUint64 converts uint64 to time.Duration in seconds.
func SecondsFromUint64(d uint64) time.Duration {
return time.Duration(d) * time.Second

Check warning on line 70 in pkg/ticker/ticker.go

View check run for this annotation

Codecov / codecov/patch

pkg/ticker/ticker.go#L69-L70

Added lines #L69 - L70 were not covered by tests
}

// Run runs the ticker by blocking current goroutine. It also invokes BEFORE ticker starts.
// Stops when (if any):
// - context is done (returns ctx.Err())
// - runner returns an error or panics
// - shutdown signal is received
func (t *Ticker) Run(ctx context.Context) (err error) {
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("panic during ticker run: %v", r)
}
}()

// prevent concurrent runs
t.runnerMu.Lock()
swift1337 marked this conversation as resolved.
Show resolved Hide resolved
defer t.runnerMu.Unlock()

// setup
t.ticker = time.NewTicker(t.interval)
swift1337 marked this conversation as resolved.
Show resolved Hide resolved
t.signalChan = make(chan struct{})
swift1337 marked this conversation as resolved.
Show resolved Hide resolved
t.stopped = false

// initial run
if err := t.runner(ctx, t); err != nil {
return errors.Wrap(err, "ticker runner failed")

Check warning on line 96 in pkg/ticker/ticker.go

View check run for this annotation

Codecov / codecov/patch

pkg/ticker/ticker.go#L96

Added line #L96 was not covered by tests
}

for {
select {
case <-ctx.Done():
return ctx.Err()
case <-t.ticker.C:
if err := t.runner(ctx, t); err != nil {
return errors.Wrap(err, "ticker runner failed")
}
case <-t.signalChan:
return nil
}
}
}
swift1337 marked this conversation as resolved.
Show resolved Hide resolved

// SetInterval updates the interval of the ticker.
func (t *Ticker) SetInterval(interval time.Duration) {
t.stateMu.Lock()
defer t.stateMu.Unlock()

// noop
if t.interval == interval || t.ticker == nil {
return

Check warning on line 120 in pkg/ticker/ticker.go

View check run for this annotation

Codecov / codecov/patch

pkg/ticker/ticker.go#L120

Added line #L120 was not covered by tests
}

t.interval = interval
t.ticker.Reset(interval)
}

// Stop stops the ticker. Safe to call concurrently or multiple times.
func (t *Ticker) Stop() {
t.stateMu.Lock()
defer t.stateMu.Unlock()

// noop
if t.stopped || t.signalChan == nil {
return
}

close(t.signalChan)
t.stopped = true
}
173 changes: 173 additions & 0 deletions pkg/ticker/ticker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
package ticker

import (
"context"
"fmt"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestTicker(t *testing.T) {
const (
dur = time.Millisecond * 100
durSmall = dur / 10
)

t.Run("Basic case with context", func(t *testing.T) {
// ARRANGE
// Given a counter
var counter int

// And a context
ctx, cancel := context.WithTimeout(context.Background(), dur+durSmall)
defer cancel()

// And a ticker
ticker := New(dur, func(_ context.Context, t *Ticker) error {
counter++

return nil
})

// ACT
err := ticker.Run(ctx)

// ASSERT
assert.ErrorIs(t, err, context.DeadlineExceeded)

// two runs: start run + 1 tick
assert.Equal(t, 2, counter)
})
swift1337 marked this conversation as resolved.
Show resolved Hide resolved

t.Run("Halts when error occurred", func(t *testing.T) {
// ARRANGE
// Given a counter
var counter int

ctx := context.Background()

// And a ticker func that returns an error after 10 runs
ticker := New(durSmall, func(_ context.Context, t *Ticker) error {
counter++
if counter > 9 {
return fmt.Errorf("oops")
}

return nil
})

// ACT
err := ticker.Run(ctx)

// ASSERT
assert.ErrorContains(t, err, "oops")
assert.Equal(t, 10, counter)
})
swift1337 marked this conversation as resolved.
Show resolved Hide resolved

t.Run("Dynamic interval update", func(t *testing.T) {
// ARRANGE
// Given a counter
var counter int

// Given duration
duration := dur * 10

ctx, cancel := context.WithTimeout(context.Background(), duration)
defer cancel()

// And a ticker what decreases the interval by 2 each time
ticker := New(durSmall, func(_ context.Context, ticker *Ticker) error {
t.Logf("Counter: %d, Duration: %s", counter, duration.String())

counter++
duration /= 2

ticker.SetInterval(duration)

return nil
})

// ACT
err := ticker.Run(ctx)

// ASSERT
assert.ErrorIs(t, err, context.DeadlineExceeded)

// It should have run at 2 times with ctxTimeout = tickerDuration (start + 1 tick),
// But it should have run more than that because of the interval decrease
assert.GreaterOrEqual(t, counter, 2)
})
swift1337 marked this conversation as resolved.
Show resolved Hide resolved

t.Run("Stop ticker", func(t *testing.T) {
// ARRANGE
// Given a counter
var counter int

// And a context
ctx := context.Background()

// And a ticker
ticker := New(durSmall, func(_ context.Context, _ *Ticker) error {
counter++
return nil
})

// And a function with a stop signal
go func() {
time.Sleep(dur)
ticker.Stop()
}()

// ACT
err := ticker.Run(ctx)

// ASSERT
assert.NoError(t, err)
assert.Greater(t, counter, 8)

t.Run("Stop ticker for the second time", func(t *testing.T) {
ticker.Stop()
})
})
swift1337 marked this conversation as resolved.
Show resolved Hide resolved

t.Run("Panic", func(t *testing.T) {
// ARRANGE
// Given a context
ctx := context.Background()

// And a ticker
ticker := New(durSmall, func(_ context.Context, _ *Ticker) error {
panic("oops")
})

// ACT
err := ticker.Run(ctx)

// ASSERT
assert.ErrorContains(t, err, "panic during ticker run: oops")
})
swift1337 marked this conversation as resolved.
Show resolved Hide resolved

t.Run("Run as a single call", func(t *testing.T) {
// ARRANGE
// Given a counter
var counter int

// Given a context
ctx, cancel := context.WithTimeout(context.Background(), dur+durSmall)
defer cancel()

tick := func(ctx context.Context, t *Ticker) error {
counter++
return nil
}

// ACT
err := Run(ctx, dur, tick)

// ASSERT
assert.ErrorIs(t, err, context.DeadlineExceeded)
assert.Equal(t, 2, counter)
})
swift1337 marked this conversation as resolved.
Show resolved Hide resolved
}
57 changes: 30 additions & 27 deletions zetaclient/chains/evm/observer/inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
"github.com/zeta-chain/protocol-contracts/pkg/contracts/evm/erc20custody.sol"
"github.com/zeta-chain/protocol-contracts/pkg/contracts/evm/zetaconnector.non-eth.sol"

"github.com/zeta-chain/zetacore/pkg/bg"
"github.com/zeta-chain/zetacore/pkg/chains"
"github.com/zeta-chain/zetacore/pkg/coin"
"github.com/zeta-chain/zetacore/pkg/constant"
"github.com/zeta-chain/zetacore/pkg/ticker"
"github.com/zeta-chain/zetacore/x/crosschain/types"
"github.com/zeta-chain/zetacore/zetaclient/chains/evm"
"github.com/zeta-chain/zetacore/zetaclient/compliance"
Expand All @@ -41,37 +43,38 @@
return err
}

ticker, err := clienttypes.NewDynamicTicker(
fmt.Sprintf("EVM_WatchInbound_%d", ob.Chain().ChainId),
ob.GetChainParams().InboundTicker,
)
if err != nil {
ob.Logger().Inbound.Error().Err(err).Msg("error creating ticker")
return err
}
defer ticker.Stop()

ob.Logger().Inbound.Info().Msgf("WatchInbound started for chain %d", ob.Chain().ChainId)
sampledLogger := ob.Logger().Inbound.Sample(&zerolog.BasicSampler{N: 10})

for {
select {
case <-ticker.C():
if !app.IsInboundObservationEnabled(ob.GetChainParams()) {
sampledLogger.Info().
Msgf("WatchInbound: inbound observation is disabled for chain %d", ob.Chain().ChainId)
continue
}
err := ob.ObserveInbound(ctx, sampledLogger)
if err != nil {
ob.Logger().Inbound.Err(err).Msg("WatchInbound: observeInbound error")
}
ticker.UpdateInterval(ob.GetChainParams().InboundTicker, ob.Logger().Inbound)
case <-ob.StopChannel():
ob.Logger().Inbound.Info().Msgf("WatchInbound stopped for chain %d", ob.Chain().ChainId)
var (
logger = ob.Logger().Inbound.Sample(&zerolog.BasicSampler{N: 10})
interval = ticker.SecondsFromUint64(ob.GetChainParams().InboundTicker)
)

Check warning on line 51 in zetaclient/chains/evm/observer/inbound.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/chains/evm/observer/inbound.go#L48-L51

Added lines #L48 - L51 were not covered by tests
swift1337 marked this conversation as resolved.
Show resolved Hide resolved

t := ticker.New(interval, func(ctx context.Context, t *ticker.Ticker) error {

Check warning on line 53 in zetaclient/chains/evm/observer/inbound.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/chains/evm/observer/inbound.go#L53

Added line #L53 was not covered by tests
lumtis marked this conversation as resolved.
Show resolved Hide resolved
// noop
if !app.IsInboundObservationEnabled(ob.GetChainParams()) {
logger.Info().Msg("WatchInbound: inbound observation is disabled")

Check warning on line 56 in zetaclient/chains/evm/observer/inbound.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/chains/evm/observer/inbound.go#L55-L56

Added lines #L55 - L56 were not covered by tests
return nil
}
}

if err := ob.ObserveInbound(ctx, logger); err != nil {
ob.Logger().Inbound.Err(err).Msg("WatchInbound: observeInbound error")

Check warning on line 61 in zetaclient/chains/evm/observer/inbound.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/chains/evm/observer/inbound.go#L60-L61

Added lines #L60 - L61 were not covered by tests
}

newInterval := ticker.SecondsFromUint64(ob.GetChainParams().InboundTicker)
t.SetInterval(newInterval)

Check warning on line 65 in zetaclient/chains/evm/observer/inbound.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/chains/evm/observer/inbound.go#L64-L65

Added lines #L64 - L65 were not covered by tests

return nil

Check warning on line 67 in zetaclient/chains/evm/observer/inbound.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/chains/evm/observer/inbound.go#L67

Added line #L67 was not covered by tests
})
swift1337 marked this conversation as resolved.
Show resolved Hide resolved

bg.Work(ctx, func(_ context.Context) error {
<-ob.StopChannel()
t.Stop()
ob.Logger().Inbound.Info().Msg("WatchInbound stopped")
return nil
})

Check warning on line 75 in zetaclient/chains/evm/observer/inbound.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/chains/evm/observer/inbound.go#L70-L75

Added lines #L70 - L75 were not covered by tests
swift1337 marked this conversation as resolved.
Show resolved Hide resolved

return t.Run(ctx)

Check warning on line 77 in zetaclient/chains/evm/observer/inbound.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/chains/evm/observer/inbound.go#L77

Added line #L77 was not covered by tests
swift1337 marked this conversation as resolved.
Show resolved Hide resolved
}

// WatchInboundTracker gets a list of Inbound tracker suggestions from zeta-core at each tick and tries to check if the in-tx was confirmed.
Expand Down
Loading