Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: background worker routines to shutdown client for migration #2538

Merged
merged 26 commits into from
Aug 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
0bc8952
add background threads
kingpinXD Jul 19, 2024
3338f44
add cancel for child context
kingpinXD Jul 24, 2024
625259b
add comments
kingpinXD Jul 24, 2024
ee16242
remove commented code
kingpinXD Jul 24, 2024
a0e0df5
add default constant backoff
kingpinXD Jul 24, 2024
731b03e
generate files
kingpinXD Jul 24, 2024
b62938a
resolve comments 1
kingpinXD Jul 24, 2024
2cce32c
resolve comments 1
kingpinXD Jul 24, 2024
31e854c
resolve comments 2
kingpinXD Jul 24, 2024
cae80f2
rename to callback to clarify terminology
kingpinXD Jul 24, 2024
6834fa8
remove cancel cause
kingpinXD Jul 24, 2024
476ee0a
rebase develop
kingpinXD Jul 25, 2024
55a3c82
Merge branch 'develop' into restart-thread-zetaclient
kingpinXD Jul 27, 2024
6b058e2
generate files
kingpinXD Jul 28, 2024
0c30b3d
move changelog to unreleased
kingpinXD Jul 31, 2024
f993da5
Add bg.OnComplete
swift1337 Jul 31, 2024
7c113a5
Add maintenance package. Move TSS listener to maintenance
swift1337 Jul 31, 2024
8e93dee
Merge branch 'develop' into restart-thread-zetaclient
swift1337 Aug 2, 2024
a665e67
Fix merge conflicts
swift1337 Aug 2, 2024
b93f067
fix tss migration test
kingpinXD Aug 2, 2024
4d8c689
Merge remote-tracking branch 'origin/restart-thread-zetaclient' into …
kingpinXD Aug 2, 2024
42a57a7
add structured logging
kingpinXD Aug 2, 2024
1c132de
remove check for nonce 0
kingpinXD Aug 6, 2024
b5c0d77
fix OutboundID generation to make the identifier more unique
ws4charlie Aug 6, 2024
c5da6e1
rebase develop
kingpinXD Aug 6, 2024
3895d4f
Merge branch 'develop' into restart-thread-zetaclient
kingpinXD Aug 6, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

* [2578](https://github.com/zeta-chain/node/pull/2578) - Add Gateway address in protocol contract list
* [2597](https://github.com/zeta-chain/node/pull/2597) - Add generic rpc metrics to zetaclient
* [2538](https://github.com/zeta-chain/node/pull/2538) - add background worker routines to shut down zetaclientd when needed for tss migration

## v19.0.0

Expand Down
133 changes: 5 additions & 128 deletions cmd/zetaclientd-supervisor/lib.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,15 @@ import (
"runtime"
"strings"
"sync"
"syscall"
"time"

"github.com/cosmos/cosmos-sdk/client/grpc/tmservice"
upgradetypes "github.com/cosmos/cosmos-sdk/x/upgrade/types"
"github.com/hashicorp/go-getter"
"github.com/rs/zerolog"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

observertypes "github.com/zeta-chain/zetacore/x/observer/types"
"github.com/zeta-chain/zetacore/zetaclient/config"
)

Expand Down Expand Up @@ -79,7 +78,7 @@ func newZetaclientdSupervisor(
logger = logger.With().Str("module", "zetaclientdSupervisor").Logger()
conn, err := grpc.Dial(
fmt.Sprintf("%s:9090", zetaCoreURL),
grpc.WithInsecure(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
return nil, fmt.Errorf("grpc dial: %w", err)
Expand All @@ -99,9 +98,6 @@ func newZetaclientdSupervisor(
func (s *zetaclientdSupervisor) Start(ctx context.Context) {
go s.watchForVersionChanges(ctx)
go s.handleCoreUpgradePlan(ctx)
go s.handleNewKeygen(ctx)
go s.handleNewTSSKeyGeneration(ctx)
go s.handleTSSUpdate(ctx)
}

func (s *zetaclientdSupervisor) WaitForReloadSignal(ctx context.Context) {
Expand Down Expand Up @@ -177,125 +173,6 @@ func (s *zetaclientdSupervisor) watchForVersionChanges(ctx context.Context) {
}
}

// handleTSSUpdate polls zetacore for the current TSS and requests a zetaclientd
// restart (SIGHUP on restartChan) every time the TSS pubkey changes.
//
// The initial TSS is fetched with up to maxRetries attempts, retryInterval apart.
// Once it has been fetched, the function keeps polling and only returns when ctx
// is cancelled. Every wait and the restart request itself are ctx-aware so that
// cancellation stops the goroutine promptly.
func (s *zetaclientdSupervisor) handleTSSUpdate(ctx context.Context) {
	const (
		maxRetries    = 11
		retryInterval = 5 * time.Second
		pollInterval  = time.Second
		restartDelay  = 6 * time.Second
	)

	client := observertypes.NewQueryClient(s.zetacoredConn)

	// TODO : use retry library under pkg/retry
	// https://github.com/zeta-chain/node/issues/2492
	for i := 0; i < maxRetries; i++ {
		tss, err := client.TSS(ctx, &observertypes.QueryGetTSSRequest{})
		if err != nil {
			s.logger.Warn().Err(err).Msg("unable to get original tss")
			select {
			case <-time.After(retryInterval):
			case <-ctx.Done():
				return
			}
			continue
		}

		for {
			select {
			case <-time.After(pollInterval):
			case <-ctx.Done():
				return
			}

			tssNew, err := client.TSS(ctx, &observertypes.QueryGetTSSRequest{})
			if err != nil {
				s.logger.Warn().Err(err).Msg("unable to get tss")
				continue
			}

			if tssNew.TSS.TssPubkey == tss.TSS.TssPubkey {
				continue
			}

			// Log before replacing the baseline, otherwise both placeholders
			// would print the new pubkey.
			s.logger.Info().
				Msgf("tss address is updated from %s to %s", tss.TSS.TssPubkey, tssNew.TSS.TssPubkey)
			tss = tssNew

			// Delay the restart request, but give up if we are shutting down.
			select {
			case <-time.After(restartDelay):
			case <-ctx.Done():
				return
			}

			s.logger.Info().Msg("restarting zetaclientd to update tss address")
			select {
			case s.restartChan <- syscall.SIGHUP:
			case <-ctx.Done():
				return
			}
		}
	}

	// Reached only when the initial TSS could not be fetched in maxRetries attempts.
	s.logger.Warn().Msg("handleTSSUpdate exiting without success")
}

// handleNewTSSKeyGeneration polls the TSS history on zetacore and requests a
// zetaclientd restart (SIGHUP on restartChan) every time the history grows.
//
// The initial history is fetched with up to maxRetries attempts, retryInterval
// apart. Once it has been fetched, the function keeps polling and only returns
// when ctx is cancelled. Every wait and the restart request itself are ctx-aware
// so that cancellation stops the goroutine promptly.
func (s *zetaclientdSupervisor) handleNewTSSKeyGeneration(ctx context.Context) {
	const (
		maxRetries    = 11
		retryInterval = 5 * time.Second
		pollInterval  = time.Second
		restartDelay  = 5 * time.Second
	)

	client := observertypes.NewQueryClient(s.zetacoredConn)

	// TODO : use retry library under pkg/retry
	for i := 0; i < maxRetries; i++ {
		alltss, err := client.TssHistory(ctx, &observertypes.QueryTssHistoryRequest{})
		if err != nil {
			s.logger.Warn().Err(err).Msg("unable to get tss original history")
			select {
			case <-time.After(retryInterval):
			case <-ctx.Done():
				return
			}
			continue
		}

		tssLenCurrent := len(alltss.TssList)
		for {
			select {
			case <-time.After(pollInterval):
			case <-ctx.Done():
				return
			}

			tssListNew, err := client.TssHistory(ctx, &observertypes.QueryTssHistoryRequest{})
			if err != nil {
				s.logger.Warn().Err(err).Msg("unable to get tss new history")
				continue
			}

			tssLenUpdated := len(tssListNew.TssList)
			if tssLenUpdated <= tssLenCurrent {
				// Nothing new. A shorter list only moves the baseline down.
				tssLenCurrent = tssLenUpdated
				continue
			}

			// Log before replacing the baseline, otherwise both placeholders
			// would print the new length.
			s.logger.Info().Msgf("tss list updated from %d to %d", tssLenCurrent, tssLenUpdated)
			tssLenCurrent = tssLenUpdated

			// Delay the restart request, but give up if we are shutting down.
			select {
			case <-time.After(restartDelay):
			case <-ctx.Done():
				return
			}

			s.logger.Info().Msg("restarting zetaclientd to update tss list")
			select {
			case s.restartChan <- syscall.SIGHUP:
			case <-ctx.Done():
				return
			}
		}
	}

	// Reached only when the initial history could not be fetched in maxRetries attempts.
	s.logger.Warn().Msg("handleNewTSSKeyGeneration exiting without success")
}

// handleNewKeygen polls the keygen state on zetacore once per second and requests
// a zetaclientd restart (SIGHUP on restartChan) when a keygen in PendingKeygen
// status is seen at a block number different from the last one acted upon.
//
// It returns when ctx is cancelled.
func (s *zetaclientdSupervisor) handleNewKeygen(ctx context.Context) {
	client := observertypes.NewQueryClient(s.zetacoredConn)
	prevKeygenBlock := int64(0)

	for {
		select {
		case <-time.After(time.Second):
		case <-ctx.Done():
			return
		}

		resp, err := client.Keygen(ctx, &observertypes.QueryGetKeygenRequest{})
		if err != nil {
			s.logger.Warn().Err(err).Msg("unable to get keygen")
			continue
		}
		if resp.Keygen == nil {
			// err is nil on this path, so there is no error to attach to the event.
			s.logger.Warn().Msg("keygen is nil")
			continue
		}

		if resp.Keygen.Status != observertypes.KeygenStatus_PendingKeygen {
			continue
		}

		keygenBlock := resp.Keygen.BlockNumber
		if prevKeygenBlock == keygenBlock {
			continue
		}
		prevKeygenBlock = keygenBlock

		s.logger.Info().Msgf("got new keygen at block %d", keygenBlock)

		// Do not block forever on the restart request if we are shutting down.
		select {
		case s.restartChan <- syscall.SIGHUP:
		case <-ctx.Done():
			return
		}
	}
}
func (s *zetaclientdSupervisor) handleCoreUpgradePlan(ctx context.Context) {
client := upgradetypes.NewQueryClient(s.zetacoredConn)

Expand Down Expand Up @@ -345,16 +222,16 @@ func (s *zetaclientdSupervisor) downloadZetaclientd(ctx context.Context, plan *u
if plan.Info == "" {
return errors.New("upgrade info empty")
}
var config upgradeConfig
err := json.Unmarshal([]byte(plan.Info), &config)
var cfg upgradeConfig
err := json.Unmarshal([]byte(plan.Info), &cfg)
if err != nil {
return fmt.Errorf("unmarshal upgrade config: %w", err)
}

s.logger.Info().Msg("downloading zetaclientd")

binKey := fmt.Sprintf("%s-%s/%s", zetaclientdBinaryName, runtime.GOOS, runtime.GOARCH)
binURL, ok := config.Binaries[binKey]
binURL, ok := cfg.Binaries[binKey]
if !ok {
return fmt.Errorf("no binary found for: %s", binKey)
}
Expand Down
12 changes: 10 additions & 2 deletions cmd/zetaclientd-supervisor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"syscall"
"time"

"cosmossdk.io/errors"
"golang.org/x/sync/errgroup"

"github.com/zeta-chain/zetacore/app"
Expand Down Expand Up @@ -69,10 +70,17 @@ func main() {
cmd.Stdin = &passwordInputBuffer

eg, ctx := errgroup.WithContext(ctx)
eg.Go(cmd.Run)
eg.Go(func() error {
defer cancel()
if err := cmd.Run(); err != nil {
return errors.Wrap(err, "zetaclient process failed")
}

logger.Info().Msg("zetaclient process exited")
return nil
})
eg.Go(func() error {
supervisor.WaitForReloadSignal(ctx)
cancel()
return nil
})
eg.Go(func() error {
Expand Down
22 changes: 16 additions & 6 deletions cmd/zetaclientd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/zeta-chain/zetacore/zetaclient/chains/base"
"github.com/zeta-chain/zetacore/zetaclient/config"
zctx "github.com/zeta-chain/zetacore/zetaclient/context"
"github.com/zeta-chain/zetacore/zetaclient/maintenance"
"github.com/zeta-chain/zetacore/zetaclient/metrics"
"github.com/zeta-chain/zetacore/zetaclient/orchestrator"
mc "github.com/zeta-chain/zetacore/zetaclient/tss"
Expand Down Expand Up @@ -206,6 +207,16 @@ func start(_ *cobra.Command, _ []string) error {
// Set P2P ID for telemetry
telemetryServer.SetP2PID(server.GetLocalPeerID())

// Creating a channel to listen for os signals (or other signals)
signalChannel := make(chan os.Signal, 1)
signal.Notify(signalChannel, syscall.SIGINT, syscall.SIGTERM)

// Maintenance workers ============
maintenance.NewTSSListener(zetacoreClient, masterLogger).Listen(ctx, func() {
masterLogger.Info().Msg("TSS listener received an action to shutdown zetaclientd.")
signalChannel <- syscall.SIGTERM
})

// Generate a new TSS if keygen is set and add it into the tss server
// If TSS has already been generated, and keygen was successful ; we use the existing TSS
err = GenerateTSS(ctx, masterLogger, zetacoreClient, server)
Expand Down Expand Up @@ -257,7 +268,7 @@ func start(_ *cobra.Command, _ []string) error {
// Update Current TSS value from zetacore, if TSS keygen is successful, the TSS address is set on zeta-core
// Returns err if the RPC call fails as zeta client needs the current TSS address to be set
// This is only needed in case of a new Keygen , as the TSS address is set on zetacore only after the keygen is successful i.e enough votes have been broadcast
currentTss, err := zetacoreClient.GetCurrentTSS(ctx)
currentTss, err := zetacoreClient.GetTSS(ctx)
if err != nil {
startLogger.Error().Err(err).Msg("GetCurrentTSS error")
return err
Expand Down Expand Up @@ -350,11 +361,10 @@ func start(_ *cobra.Command, _ []string) error {
// defer zetaSupplyChecker.Stop()
//}

startLogger.Info().Msgf("awaiting the os.Interrupt, syscall.SIGTERM signals...")
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
sig := <-ch
startLogger.Info().Msgf("stop signal received: %s", sig)
startLogger.Info().Msgf("Zetaclientd is running")

sig := <-signalChannel
startLogger.Info().Msgf("Stop signal received: %q", sig)

zetacoreClient.Stop()
kingpinXD marked this conversation as resolved.
Show resolved Hide resolved

Expand Down
2 changes: 2 additions & 0 deletions contrib/localnet/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ services:
- ETHDEV_ENDPOINT=http://eth:8545
- HOTKEY_BACKEND=file
- HOTKEY_PASSWORD=password # test purposes only
restart: always
volumes:
- ssh:/root/.ssh
- preparams:/root/preparams
Expand All @@ -142,6 +143,7 @@ services:
- ETHDEV_ENDPOINT=http://eth:8545
- HOTKEY_BACKEND=file
- HOTKEY_PASSWORD=password # test purposes only
restart: always
volumes:
- ssh:/root/.ssh
- preparams:/root/preparams
Expand Down
38 changes: 28 additions & 10 deletions pkg/bg/bg.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ import (
)

type config struct {
name string
logger zerolog.Logger
name string
logger zerolog.Logger
onComplete func()
}

type Opt func(*config)
Expand All @@ -20,15 +21,22 @@ func WithName(name string) Opt {
return func(cfg *config) { cfg.name = name }
}

// OnComplete returns an option that registers fn as the completion callback
// of a background task. The callback is invoked only when the task finishes
// without returning an error.
func OnComplete(fn func()) Opt {
	register := func(c *config) {
		c.onComplete = fn
	}

	return register
}

// WithLogger returns an option that sets the logger used by a background task.
func WithLogger(logger zerolog.Logger) Opt {
	assign := func(c *config) {
		c.logger = logger
	}

	return assign
}

// Work emits a new task in the background
func Work(ctx context.Context, f func(context.Context) error, opts ...Opt) {
cfg := config{
name: "",
logger: zerolog.Nop(),
name: "",
logger: zerolog.Nop(),
onComplete: nil,
}

for _, opt := range opts {
Expand All @@ -45,10 +53,25 @@ func Work(ctx context.Context, f func(context.Context) error, opts ...Opt) {

if err := f(ctx); err != nil {
logError(err, cfg, false)
return
}

if cfg.onComplete != nil {
cfg.onComplete()
}

cfg.logger.Info().Str("worker.name", cfg.getName()).Msg("Background task completed")
}()
}

// getName returns the configured worker name,
// or "unknown" when no name has been set.
func (c config) getName() string {
	if c.name == "" {
		return "unknown"
	}

	return c.name
}

func logError(err error, cfg config, isPanic bool) {
if err == nil {
return
Expand All @@ -71,10 +94,5 @@ func logError(err error, cfg config, isPanic bool) {
evt.Bytes("stack_trace", buf)
}

name := cfg.name
if name == "" {
name = "unknown"
}

evt.Str("worker.name", name).Msg("Background task failed")
evt.Str("worker.name", cfg.getName()).Msg("Background task failed")
}
Loading
Loading