Skip to content

Commit

Permalink
add basic zetaclient implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
gartnera committed Dec 16, 2024
1 parent 29dab39 commit dc84716
Show file tree
Hide file tree
Showing 6 changed files with 177 additions and 6 deletions.
5 changes: 5 additions & 0 deletions cmd/zetaclientd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,11 @@ func Start(_ *cobra.Command, _ []string) error {
graceful.ShutdownNow()
})

maintenance.NewShutdownListener(zetacoreClient, logger.Std).Listen(ctx, func() {
logger.Std.Info().Msg("Shutdown listener received an action to shutdown zetaclientd.")
graceful.ShutdownNow()
})

// CreateSignerMap: This creates a map of all signers for each chain.
// Each signer is responsible for signing transactions for a particular chain
signerMap, err := orchestrator.CreateSignerMap(ctx, tss, logger)
Expand Down
16 changes: 10 additions & 6 deletions e2e/e2etests/test_operational_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,29 +10,33 @@ import (

// TestOperationalFlags tests the functionality of operations flags.
func TestOperationalFlags(r *runner.E2ERunner, _ []string) {
operationalFlagsRes, err := r.Clients.Zetacore.Observer.OperationalFlags(
_, err := r.Clients.Zetacore.Observer.OperationalFlags(
r.Ctx,
&observertypes.QueryOperationalFlagsRequest{},
)
require.NoError(r, err)

// always set to low height so it's ignored by zetaclient
nextRestartHeight := operationalFlagsRes.OperationalFlags.RestartHeight + 1
currentHeight, err := r.Clients.Zetacore.GetBlockHeight(r.Ctx)
require.NoError(r, err)

// schedule a restart for 5 blocks in the future
restartHeight := currentHeight + 5
updateMsg := observertypes.NewMsgUpdateOperationalFlags(
r.ZetaTxServer.MustGetAccountAddressFromName(utils.OperationalPolicyName),
observertypes.OperationalFlags{
RestartHeight: nextRestartHeight,
RestartHeight: restartHeight,
},
)

_, err = r.ZetaTxServer.BroadcastTx(utils.OperationalPolicyName, updateMsg)
require.NoError(r, err)

operationalFlagsRes, err = r.Clients.Zetacore.Observer.OperationalFlags(
operationalFlagsRes, err := r.Clients.Zetacore.Observer.OperationalFlags(
r.Ctx,
&observertypes.QueryOperationalFlagsRequest{},
)
require.NoError(r, err)
require.Equal(r, nextRestartHeight, operationalFlagsRes.OperationalFlags.RestartHeight)
require.Equal(r, restartHeight, operationalFlagsRes.OperationalFlags.RestartHeight)

// TODO: wait for restart height + 2 then test that start timestamp metric has increased
}
1 change: 1 addition & 0 deletions zetaclient/chains/interfaces/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ type ZetacoreClient interface {
) ([]crosschaintypes.OutboundTracker, error)
GetCrosschainFlags(ctx context.Context) (observertypes.CrosschainFlags, error)
GetRateLimiterFlags(ctx context.Context) (crosschaintypes.RateLimiterFlags, error)
GetOperationalFlags(ctx context.Context) (observertypes.OperationalFlags, error)
GetObserverList(ctx context.Context) ([]string, error)
GetBTCTSSAddress(ctx context.Context, chainID int64) (string, error)
GetZetaHotKeyBalance(ctx context.Context) (sdkmath.Int, error)
Expand Down
125 changes: 125 additions & 0 deletions zetaclient/maintenance/shutdown_listener.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package maintenance

import (
"context"
"time"

"cosmossdk.io/errors"
"github.com/rs/zerolog"

"github.com/zeta-chain/node/pkg/bg"
"github.com/zeta-chain/node/pkg/retry"
observertypes "github.com/zeta-chain/node/x/observer/types"
"github.com/zeta-chain/node/zetaclient/chains/interfaces"
)

const restartListenerTicker = 10 * time.Second

// ShutdownListener is a struct that listens for scheduled shutdown notices via the observer
// operational flags
type ShutdownListener struct {
client interfaces.ZetacoreClient
logger zerolog.Logger

lastRestartHeightMissed int64
}

// NewShutdownListener creates a new ShutdownListener.
func NewShutdownListener(client interfaces.ZetacoreClient, logger zerolog.Logger) *ShutdownListener {
log := logger.With().Str("module", "shutdown_listener").Logger()
return &ShutdownListener{
client: client,
logger: log,
}
}

func (o *ShutdownListener) Listen(ctx context.Context, action func()) {
var (
withLogger = bg.WithLogger(o.logger)
onComplete = bg.OnComplete(action)
)

bg.Work(ctx, o.waitForUpdate, bg.WithName("shutdown_listener.wait_for_update"), withLogger, onComplete)
}

func (o *ShutdownListener) waitForUpdate(ctx context.Context) error {
operationalFlags, err := retry.DoTypedWithBackoffAndRetry(
func() (observertypes.OperationalFlags, error) { return o.client.GetOperationalFlags(ctx) },
retry.DefaultConstantBackoff(),
)
if err != nil {
return errors.Wrap(err, "unable to get initial operational flags")
}
if o.handleNewFlags(ctx, operationalFlags) {
return nil
}

ticker := time.NewTicker(restartListenerTicker)
defer ticker.Stop()

for {
select {
case <-ticker.C:
operationalFlags, err = o.client.GetOperationalFlags(ctx)
if err != nil {
return errors.Wrap(err, "unable to get operational flags")
}
if o.handleNewFlags(ctx, operationalFlags) {
return nil
}
case <-ctx.Done():
o.logger.Info().Msg("waitForUpdate (shutdown listener) stopped")
return nil
}
}
}

// handleNewFlags processes the flags and returns true if a shutdown should be signaled
func (o *ShutdownListener) handleNewFlags(ctx context.Context, f observertypes.OperationalFlags) bool {
if f.RestartHeight < 1 {
return false
}

currentHeight, err := o.client.GetBlockHeight(ctx)
if err != nil {
o.logger.Error().Err(err).Msg("unable to get block height")
return false
}

if f.RestartHeight > currentHeight {
// only log restart height misseed once
if o.lastRestartHeightMissed != f.RestartHeight {
o.logger.Error().
Int64("restart_height", f.RestartHeight).
Int64("current_height", currentHeight).
Msg("restart height missed")
}
o.lastRestartHeightMissed = f.RestartHeight
return false
}

o.logger.Warn().
Int64("restart_height", f.RestartHeight).
Int64("current_height", currentHeight).
Msg("restart scheduled")

newBlockChan, err := o.client.NewBlockSubscriber(ctx)
if err != nil {
o.logger.Error().Err(err).Msg("unable to subscribe to new blocks")
return false
}
for {
select {
case newBlock := <-newBlockChan:
if newBlock.Block.Height >= f.RestartHeight {
o.logger.Warn().
Int64("restart_height", f.RestartHeight).
Int64("current_height", newBlock.Block.Height).
Msg("restart height reached")
return true
}
case <-ctx.Done():
return false
}
}
}
24 changes: 24 additions & 0 deletions zetaclient/testutils/mocks/zetacore_client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions zetaclient/zetacore/client_operationalflags.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package zetacore

import (
"context"

observertypes "github.com/zeta-chain/node/x/observer/types"
)

func (c *Client) GetOperationalFlags(ctx context.Context) (observertypes.OperationalFlags, error) {
res, err := c.Observer.OperationalFlags(ctx, &observertypes.QueryOperationalFlagsRequest{})
return res.OperationalFlags, err
}

0 comments on commit dc84716

Please sign in to comment.