From eb7887feb7212846d09dc45d4601df81872f523d Mon Sep 17 00:00:00 2001 From: Nathan Haim Date: Thu, 12 Dec 2024 22:45:52 +0100 Subject: [PATCH 1/7] add exponential retry mechanism for rpc requests using utility function --- go.mod | 2 +- go.sum | 2 + relayer/application_relayer.go | 78 ++++++++++------------------ utils/backoff.go | 44 ++++++++++++++++ vms/evm/subscriber.go | 95 ++++++++++++++-------------------- 5 files changed, 113 insertions(+), 108 deletions(-) create mode 100644 utils/backoff.go diff --git a/go.mod b/go.mod index 479be9e3..305407e8 100644 --- a/go.mod +++ b/go.mod @@ -49,7 +49,7 @@ require ( github.com/bits-and-blooms/bitset v1.10.0 // indirect github.com/btcsuite/btcd/btcec/v2 v2.3.2 // indirect github.com/btcsuite/btcd/btcutil v1.1.3 // indirect - github.com/cenkalti/backoff/v4 v4.2.1 // indirect + github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/cockroachdb/errors v1.9.1 // indirect github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b // indirect diff --git a/go.sum b/go.sum index 14f9660a..bfa6459b 100644 --- a/go.sum +++ b/go.sum @@ -131,6 +131,8 @@ github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792/go.mod h1:ghJtE github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46fmI40EZs= github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= +github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= +github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/cp v0.1.0 h1:SE+dxFebS7Iik5LK0tsi1k9ZCxEaFX4AjQmoyA+1dJk= github.com/cespare/cp v0.1.0/go.mod h1:SOGHArjBr4JWaSDEVpWpo/hNg6RoKrls6Oh40hiwW+s= diff --git a/relayer/application_relayer.go b/relayer/application_relayer.go index f6fd2588..7f2f451a 100644 --- a/relayer/application_relayer.go +++ b/relayer/application_relayer.go @@ -30,15 +30,10 @@ import ( const ( // Number of retries to collect signatures from validators maxRelayerQueryAttempts = 5 - // Maximum amount of time to spend waiting (in addition to network round trip time per attempt) - // during relayer signature query routine - signatureRequestRetryWaitPeriodMs = 10_000 ) -var ( - // Errors - errFailedToGetAggSig = errors.New("failed to get aggregate signature from node endpoint") -) +// Errors +var errFailedToGetAggSig = errors.New("failed to get aggregate signature from node endpoint") // CheckpointManager stores committed heights in the database type CheckpointManager interface { @@ -276,53 +271,36 @@ func (r *ApplicationRelayer) createSignedMessage( signedWarpMessageBytes hexutil.Bytes err error ) - for attempt := 1; attempt <= maxRelayerQueryAttempts; attempt++ { - r.logger.Debug( - "Relayer collecting signatures from peers.", - zap.Int("attempt", attempt), - zap.String("sourceBlockchainID", r.sourceBlockchain.GetBlockchainID().String()), - zap.String("destinationBlockchainID", r.relayerID.DestinationBlockchainID.String()), - zap.String("signingSubnetID", r.signingSubnetID.String()), - ) - - err = r.sourceWarpSignatureClient.CallContext( - context.Background(), - &signedWarpMessageBytes, - "warp_getMessageAggregateSignature", - unsignedMessage.ID(), - r.warpConfig.QuorumNumerator, - r.signingSubnetID.String(), - ) - if err == nil { - warpMsg, err := avalancheWarp.ParseMessage(signedWarpMessageBytes) - if err != nil { - r.logger.Error( - "Failed to parse signed warp message", - zap.Error(err), - ) - return nil, err - } - return warpMsg, err - } - r.logger.Info( - "Failed to get aggregate signature from node endpoint. Retrying.", - zap.Int("attempt", attempt), - zap.Error(err), - ) - if attempt != maxRelayerQueryAttempts { - // Sleep such that all retries are uniformly spread across totalRelayerQueryPeriodMs - // TODO: We may want to consider an exponential back off rather than a uniform sleep period. - time.Sleep(time.Duration(signatureRequestRetryWaitPeriodMs/maxRelayerQueryAttempts) * time.Millisecond) - } - } - r.logger.Warn( - "Failed to get aggregate signature from node endpoint", - zap.Int("attempts", maxRelayerQueryAttempts), + err = utils.WithMaxRetriesLog( + func() error { + return r.sourceWarpSignatureClient.CallContext( + context.Background(), + &signedWarpMessageBytes, + "warp_getMessageAggregateSignature", + unsignedMessage.ID(), + r.warpConfig.QuorumNumerator, + r.signingSubnetID.String(), + ) + }, + maxRelayerQueryAttempts, + r.logger, + "Failed to get aggregate signature from node endpoint.", zap.String("sourceBlockchainID", r.sourceBlockchain.GetBlockchainID().String()), zap.String("destinationBlockchainID", r.relayerID.DestinationBlockchainID.String()), zap.String("signingSubnetID", r.signingSubnetID.String()), ) - return nil, errFailedToGetAggSig + if err != nil { + return nil, errFailedToGetAggSig + } + warpMsg, err := avalancheWarp.ParseMessage(signedWarpMessageBytes) + if err != nil { + r.logger.Error( + "Failed to parse signed warp message", + zap.Error(err), + ) + return nil, err + } + return warpMsg, nil } // diff --git a/utils/backoff.go b/utils/backoff.go new file mode 100644 index 00000000..5b5b1a89 --- /dev/null +++ b/utils/backoff.go @@ -0,0 +1,44 @@ +package utils + +import ( + "time" + + "github.com/ava-labs/avalanchego/utils/logging" + "github.com/cenkalti/backoff/v4" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +// WithMaxRetriesLog runs the operation until it succeeds or max retries has been reached. +// It uses exponential back off. +// It optionally logs information if logger is set. +func WithMaxRetriesLog( + operation backoff.Operation, + max uint64, + logger logging.Logger, + msg string, + fields ...zapcore.Field, +) error { + attempt := uint(1) + expBackOff := backoff.WithMaxRetries(backoff.NewExponentialBackOff(), max) + notify := func(err error, duration time.Duration) { + if logger == nil { + return + } + fields := append(fields, zap.Uint("attempt", attempt), zap.Error(err), zap.Duration("backoff", duration)) + logger.Warn(msg, fields...) + attempt++ + } + err := backoff.RetryNotify(operation, expBackOff, notify) + if err != nil && logger != nil { + fields := append(fields, zap.Uint64("attempts", uint64(attempt)), zap.Error(err)) + logger.Error(msg, fields...) + } + return err +} + +// WithMaxRetries rens the operation until it succeeds or max retries has been reached. +// It uses exponential back off. +func WithMaxRetries(operation backoff.Operation, max uint64) error { + return WithMaxRetriesLog(operation, max, nil, "") +} diff --git a/vms/evm/subscriber.go b/vms/evm/subscriber.go index 672e9290..e0176edd 100644 --- a/vms/evm/subscriber.go +++ b/vms/evm/subscriber.go @@ -11,6 +11,7 @@ import ( "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/utils/logging" + "github.com/ava-labs/icm-services/utils" "github.com/ava-labs/subnet-evm/core/types" "github.com/ava-labs/subnet-evm/ethclient" "github.com/ava-labs/subnet-evm/interfaces" @@ -22,7 +23,7 @@ const ( maxClientSubscriptionBuffer = 20000 subscribeRetryTimeout = 1 * time.Second MaxBlocksPerRequest = 200 - rpcMaxRetries = 5 + rpcMaxTries = 5 ) // subscriber implements Subscriber @@ -128,75 +129,55 @@ func (s *subscriber) processBlockRange( func (s *subscriber) getHeaderByNumberRetryable(headerNumber *big.Int) (*types.Header, error) { var err error var header *types.Header - attempt := 1 - for { - header, err = s.rpcClient.HeaderByNumber(context.Background(), headerNumber) - if err == nil { - return header, nil - } - s.logger.Warn( - "Failed to get header by number", - zap.String("blockchainID", s.blockchainID.String()), - zap.Int("attempt", attempt), - zap.Error(err), - ) - if attempt >= rpcMaxRetries { - return nil, err - } - time.Sleep(subscribeRetryTimeout) - attempt++ + err = utils.WithMaxRetriesLog( + func() (err error) { + header, err = s.rpcClient.HeaderByNumber(context.Background(), headerNumber) + return err + }, + rpcMaxTries, + s.logger, + "Failed to get header by number", + zap.String("blockchainID", s.blockchainID.String()), + zap.Error(err), + ) + if err != nil { + return nil, err } + return header, nil } // Loops forever iff maxResubscribeAttempts == 0 func (s *subscriber) Subscribe(maxResubscribeAttempts int) error { - // Retry subscribing until successful. Attempt to resubscribe maxResubscribeAttempts times - attempt := 1 - for { - // Unsubscribe before resubscribing - // s.sub should only be nil on the first call to Subscribe - if s.sub != nil { - s.sub.Unsubscribe() - } - err := s.subscribe() - if err == nil { - s.logger.Info( - "Successfully subscribed", - zap.String("blockchainID", s.blockchainID.String()), - ) - return nil - } - - s.logger.Warn( - "Failed to subscribe to node", - zap.Int("attempt", attempt), - zap.String("blockchainID", s.blockchainID.String()), - zap.Error(err), - ) - - if attempt == maxResubscribeAttempts { - break - } - - time.Sleep(subscribeRetryTimeout) - attempt++ + // Unsubscribe before resubscribing + // s.sub should only be nil on the first call to Subscribe + if s.sub != nil { + s.sub.Unsubscribe() } - return fmt.Errorf("failed to subscribe to node with all %d attempts", maxResubscribeAttempts) + err := s.subscribe(uint64(maxResubscribeAttempts)) + if err != nil { + return fmt.Errorf("failed to subscribe to node with all %d attempts", maxResubscribeAttempts) + } + return nil } -func (s *subscriber) subscribe() error { - sub, err := s.wsClient.SubscribeNewHead(context.Background(), s.headers) +// subscribe until it succeeds or reached maxSubscribeAttempts. +func (s *subscriber) subscribe(maxSubscribeAttempts uint64) error { + var sub interfaces.Subscription + err := utils.WithMaxRetriesLog( + func() (err error) { + sub, err = s.wsClient.SubscribeNewHead(context.Background(), s.headers) + return err + }, + maxSubscribeAttempts, + s.logger, + "Failed to subscribe to node", + zap.String("blockchainID", s.blockchainID.String()), + ) if err != nil { - s.logger.Error( - "Failed to subscribe to logs", - zap.String("blockchainID", s.blockchainID.String()), - zap.Error(err), - ) return err } s.sub = sub - return nil } From 430fcaf61c0fc47e0ab94bbc138c768a6fb047b4 Mon Sep 17 00:00:00 2001 From: Nathan Haim Date: Thu, 12 Dec 2024 23:02:18 +0100 Subject: [PATCH 2/7] add tests to retry mechanism --- utils/backoff_test.go | 55 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 55 insertions(+) create mode 100644 utils/backoff_test.go diff --git a/utils/backoff_test.go b/utils/backoff_test.go new file mode 100644 index 00000000..e6b9f038 --- /dev/null +++ b/utils/backoff_test.go @@ -0,0 +1,55 @@ +package utils + +import ( + "errors" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestWithMaxRetries(t *testing.T) { + t.Run("NotEnoughRetry", func(t *testing.T) { + retryable := newMockRetryableFn(3) + err := WithMaxRetries( + func() (err error) { + _, err = retryable.Run() + return err + }, + 2, + ) + require.Error(t, err) + }) + t.Run("EnoughRetry", func(t *testing.T) { + retryable := newMockRetryableFn(2) + var res bool + err := WithMaxRetries( + func() (err error) { + res, err = retryable.Run() + return err + }, + 2, + ) + require.NoError(t, err) + require.True(t, res) + }) +} + +type mockRetryableFn struct { + counter uint64 + trigger uint64 +} + +func newMockRetryableFn(trigger uint64) mockRetryableFn { + return mockRetryableFn{ + counter: 0, + trigger: trigger, + } +} + +func (m *mockRetryableFn) Run() (bool, error) { + if m.counter == m.trigger { + return true, nil + } + m.counter++ + return false, errors.New("error") +} From 43fc8b2c1b623ff22fca939cd3f84c2b962782f3 Mon Sep 17 00:00:00 2001 From: Nathan Haim Date: Fri, 20 Dec 2024 17:01:57 +0100 Subject: [PATCH 3/7] update backoff mechanism + use it in additional places --- messages/teleporter/message_handler.go | 12 +++--- relayer/application_relayer.go | 38 ++++++++-------- relayer/message_coordinator.go | 2 +- signature-aggregator/aggregator/aggregator.go | 43 ++++++++++--------- types/types.go | 22 +++++----- utils/backoff.go | 35 ++++----------- utils/backoff_test.go | 12 ++++-- vms/evm/subscriber.go | 43 +++++++++---------- 8 files changed, 97 insertions(+), 110 deletions(-) diff --git a/messages/teleporter/message_handler.go b/messages/teleporter/message_handler.go index befc720c..01db70af 100644 --- a/messages/teleporter/message_handler.go +++ b/messages/teleporter/message_handler.go @@ -364,12 +364,12 @@ func (m *messageHandler) waitForReceipt( destinationBlockchainID := destinationClient.DestinationBlockchainID() callCtx, callCtxCancel := context.WithTimeout(context.Background(), 30*time.Second) defer callCtxCancel() - receipt, err := utils.CallWithRetry[*types.Receipt]( - callCtx, - func() (*types.Receipt, error) { - return destinationClient.Client().(ethclient.Client).TransactionReceipt(callCtx, txHash) - }, - ) + var receipt *types.Receipt + operation := func() (err error) { + receipt, err = destinationClient.Client().(ethclient.Client).TransactionReceipt(callCtx, txHash) + return err + } + err := utils.WithMaxRetries(operation, 30*time.Second, m.logger) if err != nil { m.logger.Error( "Failed to get transaction receipt", diff --git a/relayer/application_relayer.go b/relayer/application_relayer.go index 7f2f451a..4f236f49 100644 --- a/relayer/application_relayer.go +++ b/relayer/application_relayer.go @@ -28,8 +28,7 @@ import ( ) const ( - // Number of retries to collect signatures from validators - maxRelayerQueryAttempts = 5 + retryMaxElapsedTime = 10 * time.Second ) // Errors @@ -271,25 +270,24 @@ func (r *ApplicationRelayer) createSignedMessage( signedWarpMessageBytes hexutil.Bytes err error ) - err = utils.WithMaxRetriesLog( - func() error { - return r.sourceWarpSignatureClient.CallContext( - context.Background(), - &signedWarpMessageBytes, - "warp_getMessageAggregateSignature", - unsignedMessage.ID(), - r.warpConfig.QuorumNumerator, - r.signingSubnetID.String(), - ) - }, - maxRelayerQueryAttempts, - r.logger, - "Failed to get aggregate signature from node endpoint.", - zap.String("sourceBlockchainID", r.sourceBlockchain.GetBlockchainID().String()), - zap.String("destinationBlockchainID", r.relayerID.DestinationBlockchainID.String()), - zap.String("signingSubnetID", r.signingSubnetID.String()), - ) + operation := func() error { + return r.sourceWarpSignatureClient.CallContext( + context.Background(), + &signedWarpMessageBytes, + "warp_getMessageAggregateSignature", + unsignedMessage.ID(), + r.warpConfig.QuorumNumerator, + r.signingSubnetID.String(), + ) + } + err = utils.WithMaxRetries(operation, retryMaxElapsedTime, r.logger) if err != nil { + r.logger.Error( + "Failed to get aggregate signature from node endpoint.", + zap.String("sourceBlockchainID", r.sourceBlockchain.GetBlockchainID().String()), + zap.String("destinationBlockchainID", r.relayerID.DestinationBlockchainID.String()), + zap.String("signingSubnetID", r.signingSubnetID.String()), + ) return nil, errFailedToGetAggSig } warpMsg, err := avalancheWarp.ParseMessage(signedWarpMessageBytes) diff --git a/relayer/message_coordinator.go b/relayer/message_coordinator.go index 482991e7..69653620 100644 --- a/relayer/message_coordinator.go +++ b/relayer/message_coordinator.go @@ -234,7 +234,7 @@ func (mc *MessageCoordinator) ProcessBlock( zap.Stringer("blockchainID", blockchainID), ) // Parse the logs in the block, and group by application relayer - block, err := relayerTypes.NewWarpBlockInfo(blockHeader, ethClient) + block, err := relayerTypes.NewWarpBlockInfo(mc.logger, blockHeader, ethClient) if err != nil { mc.logger.Error("Failed to create Warp block info", zap.Error(err)) errChan <- err diff --git a/signature-aggregator/aggregator/aggregator.go b/signature-aggregator/aggregator/aggregator.go index f2bdc939..1d733147 100644 --- a/signature-aggregator/aggregator/aggregator.go +++ b/signature-aggregator/aggregator/aggregator.go @@ -31,6 +31,7 @@ import ( "github.com/ava-labs/icm-services/signature-aggregator/metrics" "github.com/ava-labs/icm-services/utils" msg "github.com/ava-labs/subnet-evm/plugin/evm/message" + "github.com/cenkalti/backoff/v4" "go.uber.org/zap" "google.golang.org/protobuf/proto" ) @@ -42,7 +43,7 @@ const ( maxRelayerQueryAttempts = 10 // Maximum amount of time to spend waiting (in addition to network round trip time per attempt) // during relayer signature query routine - signatureRequestRetryWaitPeriodMs = 20_000 + signatureRequestMaxElapsedTime = 20 * time.Second ) var ( @@ -224,12 +225,12 @@ func (s *SignatureAggregator) CreateSignedMessage( return nil, fmt.Errorf("%s: %w", msg, err) } + var signedMsg *avalancheWarp.Message // Query the validators with retries. On each retry, query one node per unique BLS pubkey - for attempt := 1; attempt <= maxRelayerQueryAttempts; attempt++ { + operation := func() error { responsesExpected := len(connectedValidators.ValidatorSet) - len(signatureMap) s.logger.Debug( "Aggregator collecting signatures from peers.", - zap.Int("attempt", attempt), zap.String("sourceBlockchainID", unsignedMessage.SourceChainID.String()), zap.String("signingSubnetID", signingSubnet.String()), zap.Int("validatorSetSize", len(connectedValidators.ValidatorSet)), @@ -296,7 +297,8 @@ func (s *SignatureAggregator) CreateSignedMessage( zap.String("warpMessageID", unsignedMessage.ID().String()), zap.String("sourceBlockchainID", unsignedMessage.SourceChainID.String()), ) - signedMsg, relevant, err := s.handleResponse( + var relevant bool + signedMsg, relevant, err = s.handleResponse( response, sentTo, requestID, @@ -309,10 +311,10 @@ func (s *SignatureAggregator) CreateSignedMessage( if err != nil { // don't increase node failures metric here, because we did // it in handleResponse - return nil, fmt.Errorf( + return backoff.Permanent(fmt.Errorf( "failed to handle response: %w", err, - ) + )) } if relevant { responseCount++ @@ -325,7 +327,7 @@ func (s *SignatureAggregator) CreateSignedMessage( zap.Uint64("signatureWeight", accumulatedSignatureWeight.Uint64()), zap.String("sourceBlockchainID", unsignedMessage.SourceChainID.String()), ) - return signedMsg, nil + return nil } // Break once we've had successful or unsuccessful responses from each requested node if responseCount == responsesExpected { @@ -333,20 +335,21 @@ func (s *SignatureAggregator) CreateSignedMessage( } } } - if attempt != maxRelayerQueryAttempts { - // Sleep such that all retries are uniformly spread across totalRelayerQueryPeriodMs - // TODO: We may want to consider an exponential back off rather than a uniform sleep period. - time.Sleep(time.Duration(signatureRequestRetryWaitPeriodMs/maxRelayerQueryAttempts) * time.Millisecond) - } + return errNotEnoughSignatures } - s.logger.Warn( - "Failed to collect a threshold of signatures", - zap.Int("attempts", maxRelayerQueryAttempts), - zap.String("warpMessageID", unsignedMessage.ID().String()), - zap.Uint64("accumulatedWeight", accumulatedSignatureWeight.Uint64()), - zap.String("sourceBlockchainID", unsignedMessage.SourceChainID.String()), - ) - return nil, errNotEnoughSignatures + + err = utils.WithMaxRetries(operation, signatureRequestMaxElapsedTime, s.logger) + if err != nil { + s.logger.Warn( + "Failed to collect a threshold of signatures", + zap.Int("attempts", maxRelayerQueryAttempts), + zap.String("warpMessageID", unsignedMessage.ID().String()), + zap.Uint64("accumulatedWeight", accumulatedSignatureWeight.Uint64()), + zap.String("sourceBlockchainID", unsignedMessage.SourceChainID.String()), + ) + return nil, errNotEnoughSignatures + } + return signedMsg, nil } func (s *SignatureAggregator) getSubnetID(blockchainID ids.ID) (ids.ID, error) { diff --git a/types/types.go b/types/types.go index cbb00b35..d4a07bed 100644 --- a/types/types.go +++ b/types/types.go @@ -6,7 +6,9 @@ package types import ( "context" "errors" + "time" + "github.com/ava-labs/avalanchego/utils/logging" avalancheWarp "github.com/ava-labs/avalanchego/vms/platformvm/warp" "github.com/ava-labs/icm-services/utils" "github.com/ava-labs/subnet-evm/core/types" @@ -38,7 +40,7 @@ type WarpMessageInfo struct { } // Extract Warp logs from the block, if they exist -func NewWarpBlockInfo(header *types.Header, ethClient ethclient.Client) (*WarpBlockInfo, error) { +func NewWarpBlockInfo(logger logging.Logger, header *types.Header, ethClient ethclient.Client) (*WarpBlockInfo, error) { var ( logs []types.Log err error @@ -47,16 +49,16 @@ func NewWarpBlockInfo(header *types.Header, ethClient ethclient.Client) (*WarpBl if header.Bloom.Test(WarpPrecompileLogFilter[:]) { cctx, cancel := context.WithTimeout(context.Background(), utils.DefaultRPCRetryTimeout) defer cancel() - logs, err = utils.CallWithRetry[[]types.Log]( - cctx, - func() ([]types.Log, error) { - return ethClient.FilterLogs(context.Background(), interfaces.FilterQuery{ - Topics: [][]common.Hash{{WarpPrecompileLogFilter}}, - Addresses: []common.Address{warp.ContractAddress}, - FromBlock: header.Number, - ToBlock: header.Number, - }) + operation := func() (err error) { + logs, err = ethClient.FilterLogs(cctx, interfaces.FilterQuery{ + Topics: [][]common.Hash{{WarpPrecompileLogFilter}}, + Addresses: []common.Address{warp.ContractAddress}, + FromBlock: header.Number, + ToBlock: header.Number, }) + return err + } + err = utils.WithMaxRetries(operation, 5*time.Second, logger) if err != nil { return nil, err } diff --git a/utils/backoff.go b/utils/backoff.go index 5b5b1a89..70879a30 100644 --- a/utils/backoff.go +++ b/utils/backoff.go @@ -5,40 +5,21 @@ import ( "github.com/ava-labs/avalanchego/utils/logging" "github.com/cenkalti/backoff/v4" - "go.uber.org/zap" - "go.uber.org/zap/zapcore" ) -// WithMaxRetriesLog runs the operation until it succeeds or max retries has been reached. -// It uses exponential back off. -// It optionally logs information if logger is set. -func WithMaxRetriesLog( +// WithMaxRetries uses an exponential backoff to run the operation until it +// succeeds or max elapsed time has been reached. +func WithMaxRetries( operation backoff.Operation, - max uint64, + maxElapsedTime time.Duration, logger logging.Logger, - msg string, - fields ...zapcore.Field, ) error { - attempt := uint(1) - expBackOff := backoff.WithMaxRetries(backoff.NewExponentialBackOff(), max) + expBackOff := backoff.NewExponentialBackOff( + backoff.WithMaxElapsedTime(maxElapsedTime), + ) notify := func(err error, duration time.Duration) { - if logger == nil { - return - } - fields := append(fields, zap.Uint("attempt", attempt), zap.Error(err), zap.Duration("backoff", duration)) - logger.Warn(msg, fields...) - attempt++ + logger.Warn("operation failed, retrying...") } err := backoff.RetryNotify(operation, expBackOff, notify) - if err != nil && logger != nil { - fields := append(fields, zap.Uint64("attempts", uint64(attempt)), zap.Error(err)) - logger.Error(msg, fields...) - } return err } - -// WithMaxRetries rens the operation until it succeeds or max retries has been reached. -// It uses exponential back off. -func WithMaxRetries(operation backoff.Operation, max uint64) error { - return WithMaxRetriesLog(operation, max, nil, "") -} diff --git a/utils/backoff_test.go b/utils/backoff_test.go index e6b9f038..9e2ce412 100644 --- a/utils/backoff_test.go +++ b/utils/backoff_test.go @@ -3,7 +3,9 @@ package utils import ( "errors" "testing" + "time" + "github.com/ava-labs/avalanchego/utils/logging" "github.com/stretchr/testify/require" ) @@ -15,7 +17,9 @@ func TestWithMaxRetries(t *testing.T) { _, err = retryable.Run() return err }, - 2, + // using default values: we want to run max 2 tries. + 624*time.Millisecond, + logging.NoLog{}, ) require.Error(t, err) }) @@ -27,7 +31,9 @@ func TestWithMaxRetries(t *testing.T) { res, err = retryable.Run() return err }, - 2, + // using default values we want to run 3 tries. + 2000*time.Millisecond, + logging.NoLog{}, ) require.NoError(t, err) require.True(t, res) @@ -47,7 +53,7 @@ func newMockRetryableFn(trigger uint64) mockRetryableFn { } func (m *mockRetryableFn) Run() (bool, error) { - if m.counter == m.trigger { + if m.counter >= m.trigger { return true, nil } m.counter++ diff --git a/vms/evm/subscriber.go b/vms/evm/subscriber.go index e0176edd..bf6b00bd 100644 --- a/vms/evm/subscriber.go +++ b/vms/evm/subscriber.go @@ -21,9 +21,8 @@ import ( const ( // Max buffer size for ethereum subscription channels maxClientSubscriptionBuffer = 20000 - subscribeRetryTimeout = 1 * time.Second MaxBlocksPerRequest = 200 - rpcMaxTries = 5 + retryMaxElapsedTime = 5 * time.Second ) // subscriber implements Subscriber @@ -129,18 +128,17 @@ func (s *subscriber) processBlockRange( func (s *subscriber) getHeaderByNumberRetryable(headerNumber *big.Int) (*types.Header, error) { var err error var header *types.Header - err = utils.WithMaxRetriesLog( - func() (err error) { - header, err = s.rpcClient.HeaderByNumber(context.Background(), headerNumber) - return err - }, - rpcMaxTries, - s.logger, - "Failed to get header by number", - zap.String("blockchainID", s.blockchainID.String()), - zap.Error(err), - ) + operation := func() (err error) { + header, err = s.rpcClient.HeaderByNumber(context.Background(), headerNumber) + return err + } + err = utils.WithMaxRetries(operation, retryMaxElapsedTime, s.logger) if err != nil { + s.logger.Error( + "Failed to get header by number", + zap.String("blockchainID", s.blockchainID.String()), + zap.Error(err), + ) return nil, err } return header, nil @@ -164,17 +162,16 @@ func (s *subscriber) Subscribe(maxResubscribeAttempts int) error { // subscribe until it succeeds or reached maxSubscribeAttempts. func (s *subscriber) subscribe(maxSubscribeAttempts uint64) error { var sub interfaces.Subscription - err := utils.WithMaxRetriesLog( - func() (err error) { - sub, err = s.wsClient.SubscribeNewHead(context.Background(), s.headers) - return err - }, - maxSubscribeAttempts, - s.logger, - "Failed to subscribe to node", - zap.String("blockchainID", s.blockchainID.String()), - ) + operation := func() (err error) { + sub, err = s.wsClient.SubscribeNewHead(context.Background(), s.headers) + return err + } + err := utils.WithMaxRetries(operation, retryMaxElapsedTime, s.logger) if err != nil { + s.logger.Error( + "Failed to subscribe to node", + zap.String("blockchainID", s.blockchainID.String()), + ) return err } s.sub = sub From b266ed677f3c7480ccc9327aed4841d5c8a0e516 Mon Sep 17 00:00:00 2001 From: Nathan Haim Date: Fri, 20 Dec 2024 17:28:24 +0100 Subject: [PATCH 4/7] backoff fix test --- signature-aggregator/aggregator/aggregator.go | 3 --- signature-aggregator/aggregator/aggregator_test.go | 12 +++++------- 2 files changed, 5 insertions(+), 10 deletions(-) diff --git a/signature-aggregator/aggregator/aggregator.go b/signature-aggregator/aggregator/aggregator.go index 1d733147..077ca319 100644 --- a/signature-aggregator/aggregator/aggregator.go +++ b/signature-aggregator/aggregator/aggregator.go @@ -39,8 +39,6 @@ import ( type blsSignatureBuf [bls.SignatureLen]byte const ( - // Number of retries to collect signatures from validators - maxRelayerQueryAttempts = 10 // Maximum amount of time to spend waiting (in addition to network round trip time per attempt) // during relayer signature query routine signatureRequestMaxElapsedTime = 20 * time.Second @@ -342,7 +340,6 @@ func (s *SignatureAggregator) CreateSignedMessage( if err != nil { s.logger.Warn( "Failed to collect a threshold of signatures", - zap.Int("attempts", maxRelayerQueryAttempts), zap.String("warpMessageID", unsignedMessage.ID().String()), zap.Uint64("accumulatedWeight", accumulatedSignatureWeight.Uint64()), zap.String("sourceBlockchainID", unsignedMessage.SourceChainID.String()), diff --git a/signature-aggregator/aggregator/aggregator_test.go b/signature-aggregator/aggregator/aggregator_test.go index 35ac56de..f512bd01 100644 --- a/signature-aggregator/aggregator/aggregator_test.go +++ b/signature-aggregator/aggregator/aggregator_test.go @@ -217,9 +217,7 @@ func TestCreateSignedMessageRetriesAndFailsWithoutP2PResponses(t *testing.T) { appRequests := makeAppRequests(chainID, requestID, connectedValidators) for _, appRequest := range appRequests { - mockNetwork.EXPECT().RegisterAppRequest(appRequest).Times( - maxRelayerQueryAttempts, - ) + mockNetwork.EXPECT().RegisterAppRequest(appRequest).AnyTimes() } mockNetwork.EXPECT().RegisterRequestID( @@ -227,7 +225,7 @@ func TestCreateSignedMessageRetriesAndFailsWithoutP2PResponses(t *testing.T) { len(appRequests), ).Return( make(chan message.InboundMessage, len(appRequests)), - ).Times(maxRelayerQueryAttempts) + ).AnyTimes() var nodeIDs set.Set[ids.NodeID] for _, appRequest := range appRequests { @@ -238,7 +236,7 @@ func TestCreateSignedMessageRetriesAndFailsWithoutP2PResponses(t *testing.T) { nodeIDs, subnetID, subnets.NoOpAllower, - ).Times(maxRelayerQueryAttempts) + ).AnyTimes() _, err = aggregator.CreateSignedMessage(msg, nil, subnetID, 80) require.ErrorIs( @@ -260,7 +258,7 @@ func TestCreateSignedMessageSucceeds(t *testing.T) { require.NoError(t, err) // the signers: - var connectedValidators, validatorSecretKeys = makeConnectedValidators(5) + connectedValidators, validatorSecretKeys := makeConnectedValidators(5) // prime the aggregator: @@ -279,7 +277,7 @@ func TestCreateSignedMessageSucceeds(t *testing.T) { // prime the signers' responses: - var requestID = aggregator.currentRequestID.Load() + 1 + requestID := aggregator.currentRequestID.Load() + 1 appRequests := makeAppRequests(chainID, requestID, connectedValidators) for _, appRequest := range appRequests { From 89dc5b5b8157c345642e2c857bd64ccfd01aaab0 Mon Sep 17 00:00:00 2001 From: Nathan Haim Date: Mon, 23 Dec 2024 13:09:47 +0100 Subject: [PATCH 5/7] fix: nits --- messages/teleporter/message_handler.go | 13 ++++++++----- relayer/application_relayer.go | 2 +- relayer/listener.go | 9 +++++---- signature-aggregator/aggregator/aggregator.go | 2 +- types/types.go | 5 ++--- utils/backoff.go | 6 +++--- utils/backoff_test.go | 8 ++++---- utils/utils.go | 2 +- vms/evm/subscriber.go | 16 ++++++++-------- vms/subscriber.go | 3 ++- 10 files changed, 35 insertions(+), 31 deletions(-) diff --git a/messages/teleporter/message_handler.go b/messages/teleporter/message_handler.go index 01db70af..2918a79d 100644 --- a/messages/teleporter/message_handler.go +++ b/messages/teleporter/message_handler.go @@ -29,9 +29,12 @@ import ( "google.golang.org/grpc" ) -// The maximum gas limit that can be specified for a Teleporter message -// Based on the C-Chain 15_000_000 gas limit per block, with other Warp message gas overhead conservatively estimated. -const maxTeleporterGasLimit = 12_000_000 +const ( + // The maximum gas limit that can be specified for a Teleporter message + // Based on the C-Chain 15_000_000 gas limit per block, with other Warp message gas overhead conservatively estimated. + maxTeleporterGasLimit = 12_000_000 + defaultBlockAcceptanceTimeout = 30 * time.Second +) type factory struct { messageConfig Config @@ -362,14 +365,14 @@ func (m *messageHandler) waitForReceipt( teleporterMessageID ids.ID, ) error { destinationBlockchainID := destinationClient.DestinationBlockchainID() - callCtx, callCtxCancel := context.WithTimeout(context.Background(), 30*time.Second) + callCtx, callCtxCancel := context.WithTimeout(context.Background(), defaultBlockAcceptanceTimeout) defer callCtxCancel() var receipt *types.Receipt operation := func() (err error) { receipt, err = destinationClient.Client().(ethclient.Client).TransactionReceipt(callCtx, txHash) return err } - err := utils.WithMaxRetries(operation, 30*time.Second, m.logger) + err := utils.WithRetriesTimeout(m.logger, operation, defaultBlockAcceptanceTimeout) if err != nil { m.logger.Error( "Failed to get transaction receipt", diff --git a/relayer/application_relayer.go b/relayer/application_relayer.go index 4f236f49..8fdd00b8 100644 --- a/relayer/application_relayer.go +++ b/relayer/application_relayer.go @@ -280,7 +280,7 @@ func (r *ApplicationRelayer) createSignedMessage( r.signingSubnetID.String(), ) } - err = utils.WithMaxRetries(operation, retryMaxElapsedTime, r.logger) + err = utils.WithRetriesTimeout(r.logger, operation, retryMaxElapsedTime) if err != nil { r.logger.Error( "Failed to get aggregate signature from node endpoint.", diff --git a/relayer/listener.go b/relayer/listener.go index 2d973609..0b833b4b 100644 --- a/relayer/listener.go +++ b/relayer/listener.go @@ -8,6 +8,7 @@ import ( "fmt" "math/big" "math/rand" + "time" "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/utils/logging" @@ -20,10 +21,10 @@ import ( ) const ( - maxSubscribeAttempts = 10 + retryMaxSubscribeElapsedTime = 10 * time.Second // TODO attempt to resubscribe in perpetuity once we are able to process missed blocks and // refresh the chain config on reconnect. - maxResubscribeAttempts = 10 + retryMaxResubscribeElapsedTime = 10 * time.Second ) // Listener handles all messages sent from a given source chain @@ -137,7 +138,7 @@ func newListener( // Open the subscription. We must do this before processing any missed messages, otherwise we may // miss an incoming message in between fetching the latest block and subscribing. - err = lstnr.Subscriber.Subscribe(maxSubscribeAttempts) + err = lstnr.Subscriber.Subscribe(retryMaxSubscribeElapsedTime) if err != nil { logger.Error( "Failed to subscribe to node", @@ -228,7 +229,7 @@ func (lstnr *Listener) processLogs(ctx context.Context) error { // Sets the listener health status to false while attempting to reconnect. func (lstnr *Listener) reconnectToSubscriber() error { // Attempt to reconnect the subscription - err := lstnr.Subscriber.Subscribe(maxResubscribeAttempts) + err := lstnr.Subscriber.Subscribe(retryMaxResubscribeElapsedTime) if err != nil { return fmt.Errorf("failed to resubscribe to node: %w", err) } diff --git a/signature-aggregator/aggregator/aggregator.go b/signature-aggregator/aggregator/aggregator.go index 14b04400..6a368998 100644 --- a/signature-aggregator/aggregator/aggregator.go +++ b/signature-aggregator/aggregator/aggregator.go @@ -327,7 +327,7 @@ func (s *SignatureAggregator) CreateSignedMessage( return errNotEnoughSignatures } - err = utils.WithMaxRetries(operation, signatureRequestMaxElapsedTime, s.logger) + err = utils.WithRetriesTimeout(s.logger, operation, signatureRequestMaxElapsedTime) if err != nil { s.logger.Warn( "Failed to collect a threshold of signatures", diff --git a/types/types.go b/types/types.go index d4a07bed..fdc5be43 100644 --- a/types/types.go +++ b/types/types.go @@ -6,7 +6,6 @@ package types import ( "context" "errors" - "time" "github.com/ava-labs/avalanchego/utils/logging" avalancheWarp "github.com/ava-labs/avalanchego/vms/platformvm/warp" @@ -47,7 +46,7 @@ func NewWarpBlockInfo(logger logging.Logger, header *types.Header, ethClient eth ) // Check if the block contains warp logs, and fetch them from the client if it does if header.Bloom.Test(WarpPrecompileLogFilter[:]) { - cctx, cancel := context.WithTimeout(context.Background(), utils.DefaultRPCRetryTimeout) + cctx, cancel := context.WithTimeout(context.Background(), utils.DefaultRPCTimeout) defer cancel() operation := func() (err error) { logs, err = ethClient.FilterLogs(cctx, interfaces.FilterQuery{ @@ -58,7 +57,7 @@ func NewWarpBlockInfo(logger logging.Logger, header *types.Header, ethClient eth }) return err } - err = utils.WithMaxRetries(operation, 5*time.Second, logger) + err = utils.WithRetriesTimeout(logger, operation, utils.DefaultRPCTimeout) if err != nil { return nil, err } diff --git a/utils/backoff.go b/utils/backoff.go index 70879a30..7a40efc5 100644 --- a/utils/backoff.go +++ b/utils/backoff.go @@ -7,12 +7,12 @@ import ( "github.com/cenkalti/backoff/v4" ) -// WithMaxRetries uses an exponential backoff to run the operation until it +// WithRetriesTimeout uses an exponential backoff to run the operation until it // succeeds or max elapsed time has been reached. -func WithMaxRetries( +func WithRetriesTimeout( + logger logging.Logger, operation backoff.Operation, maxElapsedTime time.Duration, - logger logging.Logger, ) error { expBackOff := backoff.NewExponentialBackOff( backoff.WithMaxElapsedTime(maxElapsedTime), diff --git a/utils/backoff_test.go b/utils/backoff_test.go index 9e2ce412..110cfc01 100644 --- a/utils/backoff_test.go +++ b/utils/backoff_test.go @@ -12,28 +12,28 @@ import ( func TestWithMaxRetries(t *testing.T) { t.Run("NotEnoughRetry", func(t *testing.T) { retryable := newMockRetryableFn(3) - err := WithMaxRetries( + err := WithRetriesTimeout( + logging.NoLog{}, func() (err error) { _, err = retryable.Run() return err }, // using default values: we want to run max 2 tries. 624*time.Millisecond, - logging.NoLog{}, ) require.Error(t, err) }) t.Run("EnoughRetry", func(t *testing.T) { retryable := newMockRetryableFn(2) var res bool - err := WithMaxRetries( + err := WithRetriesTimeout( + logging.NoLog{}, func() (err error) { res, err = retryable.Run() return err }, // using default values we want to run 3 tries. 2000*time.Millisecond, - logging.NoLog{}, ) require.NoError(t, err) require.True(t, res) diff --git a/utils/utils.go b/utils/utils.go index e41f0888..afa49f26 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -28,7 +28,7 @@ var ( ) const ( - DefaultRPCRetryTimeout = 5 * time.Second + DefaultRPCTimeout = 5 * time.Second ) // diff --git a/vms/evm/subscriber.go b/vms/evm/subscriber.go index bf6b00bd..1be1acbc 100644 --- a/vms/evm/subscriber.go +++ b/vms/evm/subscriber.go @@ -5,7 +5,7 @@ package evm import ( "context" - "fmt" + "errors" "math/big" "time" @@ -22,7 +22,7 @@ const ( // Max buffer size for ethereum subscription channels maxClientSubscriptionBuffer = 20000 MaxBlocksPerRequest = 200 - retryMaxElapsedTime = 5 * time.Second + resubscribeMaxElapsedTime = 5 * time.Second ) // subscriber implements Subscriber @@ -132,7 +132,7 @@ func (s *subscriber) getHeaderByNumberRetryable(headerNumber *big.Int) (*types.H header, err = s.rpcClient.HeaderByNumber(context.Background(), headerNumber) return err } - err = utils.WithMaxRetries(operation, retryMaxElapsedTime, s.logger) + err = utils.WithRetriesTimeout(s.logger, operation, utils.DefaultRPCTimeout) if err != nil { s.logger.Error( "Failed to get header by number", @@ -145,28 +145,28 @@ func (s *subscriber) getHeaderByNumberRetryable(headerNumber *big.Int) (*types.H } // Loops forever iff maxResubscribeAttempts == 0 -func (s *subscriber) Subscribe(maxResubscribeAttempts int) error { +func (s *subscriber) Subscribe(retryMaxElapsedTime time.Duration) error { // Unsubscribe before resubscribing // s.sub should only be nil on the first call to Subscribe if s.sub != nil { s.sub.Unsubscribe() } - err := s.subscribe(uint64(maxResubscribeAttempts)) + err := s.subscribe(retryMaxElapsedTime) if err != nil { - return fmt.Errorf("failed to subscribe to node with all %d attempts", maxResubscribeAttempts) + return errors.New("failed to subscribe to node") } return nil } // subscribe until it succeeds or reached maxSubscribeAttempts. -func (s *subscriber) subscribe(maxSubscribeAttempts uint64) error { +func (s *subscriber) subscribe(retryMaxElapsedTime time.Duration) error { var sub interfaces.Subscription operation := func() (err error) { sub, err = s.wsClient.SubscribeNewHead(context.Background(), s.headers) return err } - err := utils.WithMaxRetries(operation, retryMaxElapsedTime, s.logger) + err := utils.WithRetriesTimeout(s.logger, operation, retryMaxElapsedTime) if err != nil { s.logger.Error( "Failed to subscribe to node", diff --git a/vms/subscriber.go b/vms/subscriber.go index 7a4db649..8b01266f 100644 --- a/vms/subscriber.go +++ b/vms/subscriber.go @@ -5,6 +5,7 @@ package vms import ( "math/big" + "time" "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/utils/logging" @@ -25,7 +26,7 @@ type Subscriber interface { // Subscribe registers a subscription. After Subscribe is called, // log events that match [filter] are written to the channel returned // by Logs - Subscribe(maxResubscribeAttempts int) error + Subscribe(retryMaxElapsedTime time.Duration) error // Headers returns the channel that the subscription writes block headers to Headers() <-chan *types.Header From e91355d4500824f4e88c4cbc91ff5ee3618157d3 Mon Sep 17 00:00:00 2001 From: Nathan Haim Date: Fri, 27 Dec 2024 13:46:55 +0100 Subject: [PATCH 6/7] rename maxElapsedTime consts has timeout --- relayer/application_relayer.go | 4 ++-- relayer/listener.go | 8 ++++---- signature-aggregator/aggregator/aggregator.go | 4 ++-- utils/backoff.go | 6 +++--- vms/evm/subscriber.go | 12 ++++++------ vms/subscriber.go | 2 +- 6 files changed, 18 insertions(+), 18 deletions(-) diff --git a/relayer/application_relayer.go b/relayer/application_relayer.go index 8fdd00b8..a45a0742 100644 --- a/relayer/application_relayer.go +++ b/relayer/application_relayer.go @@ -28,7 +28,7 @@ import ( ) const ( - retryMaxElapsedTime = 10 * time.Second + retryTimeout = 10 * time.Second ) // Errors @@ -280,7 +280,7 @@ func (r *ApplicationRelayer) createSignedMessage( r.signingSubnetID.String(), ) } - err = utils.WithRetriesTimeout(r.logger, operation, retryMaxElapsedTime) + err = utils.WithRetriesTimeout(r.logger, operation, retryTimeout) if err != nil { r.logger.Error( "Failed to get aggregate signature from node endpoint.", diff --git a/relayer/listener.go b/relayer/listener.go index 0b833b4b..bb17bd66 100644 --- a/relayer/listener.go +++ b/relayer/listener.go @@ -21,10 +21,10 @@ import ( ) const ( - retryMaxSubscribeElapsedTime = 10 * time.Second + retrySubscribeTimeout = 10 * time.Second // TODO attempt to resubscribe in perpetuity once we are able to process missed blocks and // refresh the chain config on reconnect. - retryMaxResubscribeElapsedTime = 10 * time.Second + retryResubscribeTimeout = 10 * time.Second ) // Listener handles all messages sent from a given source chain @@ -138,7 +138,7 @@ func newListener( // Open the subscription. We must do this before processing any missed messages, otherwise we may // miss an incoming message in between fetching the latest block and subscribing. - err = lstnr.Subscriber.Subscribe(retryMaxSubscribeElapsedTime) + err = lstnr.Subscriber.Subscribe(retrySubscribeTimeout) if err != nil { logger.Error( "Failed to subscribe to node", @@ -229,7 +229,7 @@ func (lstnr *Listener) processLogs(ctx context.Context) error { // Sets the listener health status to false while attempting to reconnect. func (lstnr *Listener) reconnectToSubscriber() error { // Attempt to reconnect the subscription - err := lstnr.Subscriber.Subscribe(retryMaxResubscribeElapsedTime) + err := lstnr.Subscriber.Subscribe(retryResubscribeTimeout) if err != nil { return fmt.Errorf("failed to resubscribe to node: %w", err) } diff --git a/signature-aggregator/aggregator/aggregator.go b/signature-aggregator/aggregator/aggregator.go index 6a368998..5249160e 100644 --- a/signature-aggregator/aggregator/aggregator.go +++ b/signature-aggregator/aggregator/aggregator.go @@ -38,7 +38,7 @@ type blsSignatureBuf [bls.SignatureLen]byte const ( // Maximum amount of time to spend waiting (in addition to network round trip time per attempt) // during relayer signature query routine - signatureRequestMaxElapsedTime = 20 * time.Second + signatureRequestTimeout = 20 * time.Second ) var ( @@ -327,7 +327,7 @@ func (s *SignatureAggregator) CreateSignedMessage( return errNotEnoughSignatures } - err = utils.WithRetriesTimeout(s.logger, operation, signatureRequestMaxElapsedTime) + err = utils.WithRetriesTimeout(s.logger, operation, signatureRequestTimeout) if err != nil { s.logger.Warn( "Failed to collect a threshold of signatures", diff --git a/utils/backoff.go b/utils/backoff.go index 7a40efc5..55bb5298 100644 --- a/utils/backoff.go +++ b/utils/backoff.go @@ -8,14 +8,14 @@ import ( ) // WithRetriesTimeout uses an exponential backoff to run the operation until it -// succeeds or max elapsed time has been reached. +// succeeds or timeout limit has been reached. func WithRetriesTimeout( logger logging.Logger, operation backoff.Operation, - maxElapsedTime time.Duration, + timeout time.Duration, ) error { expBackOff := backoff.NewExponentialBackOff( - backoff.WithMaxElapsedTime(maxElapsedTime), + backoff.WithMaxElapsedTime(timeout), ) notify := func(err error, duration time.Duration) { logger.Warn("operation failed, retrying...") diff --git a/vms/evm/subscriber.go b/vms/evm/subscriber.go index 1be1acbc..fa7289d8 100644 --- a/vms/evm/subscriber.go +++ b/vms/evm/subscriber.go @@ -22,7 +22,7 @@ const ( // Max buffer size for ethereum subscription channels maxClientSubscriptionBuffer = 20000 MaxBlocksPerRequest = 200 - resubscribeMaxElapsedTime = 5 * time.Second + resubscribeTimeout = 5 * time.Second ) // subscriber implements Subscriber @@ -145,28 +145,28 @@ func (s *subscriber) getHeaderByNumberRetryable(headerNumber *big.Int) (*types.H } // Loops forever iff maxResubscribeAttempts == 0 -func (s *subscriber) Subscribe(retryMaxElapsedTime time.Duration) error { +func (s *subscriber) Subscribe(retryTimeout time.Duration) error { // Unsubscribe before resubscribing // s.sub should only be nil on the first call to Subscribe if s.sub != nil { s.sub.Unsubscribe() } - err := s.subscribe(retryMaxElapsedTime) + err := s.subscribe(retryTimeout) if err != nil { return errors.New("failed to subscribe to node") } return nil } -// subscribe until it succeeds or reached maxSubscribeAttempts. -func (s *subscriber) subscribe(retryMaxElapsedTime time.Duration) error { +// subscribe until it succeeds or reached timeout. +func (s *subscriber) subscribe(retryTimeout time.Duration) error { var sub interfaces.Subscription operation := func() (err error) { sub, err = s.wsClient.SubscribeNewHead(context.Background(), s.headers) return err } - err := utils.WithRetriesTimeout(s.logger, operation, retryMaxElapsedTime) + err := utils.WithRetriesTimeout(s.logger, operation, retryTimeout) if err != nil { s.logger.Error( "Failed to subscribe to node", diff --git a/vms/subscriber.go b/vms/subscriber.go index 8b01266f..aa5615c6 100644 --- a/vms/subscriber.go +++ b/vms/subscriber.go @@ -26,7 +26,7 @@ type Subscriber interface { // Subscribe registers a subscription. After Subscribe is called, // log events that match [filter] are written to the channel returned // by Logs - Subscribe(retryMaxElapsedTime time.Duration) error + Subscribe(retryTimeout time.Duration) error // Headers returns the channel that the subscription writes block headers to Headers() <-chan *types.Header From 4108b2d2604f7c3ed8458d5ecb17d360d257e074 Mon Sep 17 00:00:00 2001 From: Nathan Haim Date: Thu, 2 Jan 2025 11:30:27 +0100 Subject: [PATCH 7/7] delete unused const --- vms/evm/subscriber.go | 1 - 1 file changed, 1 deletion(-) diff --git a/vms/evm/subscriber.go b/vms/evm/subscriber.go index fa7289d8..486dfec3 100644 --- a/vms/evm/subscriber.go +++ b/vms/evm/subscriber.go @@ -22,7 +22,6 @@ const ( // Max buffer size for ethereum subscription channels maxClientSubscriptionBuffer = 20000 MaxBlocksPerRequest = 200 - resubscribeTimeout = 5 * time.Second ) // subscriber implements Subscriber