Merge pull request #593 from najeal/retry-exponential

add exponential retry mechanism for rpc requests using utility function

cam-schultz authored Jan 2, 2025
2 parents 4e00722 + eb5cab4 commit 7d1e095
Showing 14 changed files with 201 additions and 156 deletions.
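The new `utils.WithRetriesTimeout` helper referenced throughout this diff lives in the `utils` package, whose changes are not among the files reproduced below. As a rough orientation, here is a hedged sketch of what such a helper could look like: the signature (logger, `func() error` operation, timeout) is inferred from the call sites in this commit, and the exponential backoff is delegated to `github.com/cenkalti/backoff/v4`, which this commit bumps to v4.3.0. The body is an assumption, not the repository's actual implementation.

```go
// Hypothetical sketch only: signature inferred from the call sites in this
// commit, not copied from the utils package.
package utils

import (
	"context"
	"time"

	"github.com/ava-labs/avalanchego/utils/logging"
	"github.com/cenkalti/backoff/v4"
	"go.uber.org/zap"
)

// WithRetriesTimeout retries operation with exponential backoff until it
// succeeds, returns a backoff.Permanent error, or the timeout elapses.
func WithRetriesTimeout(logger logging.Logger, operation func() error, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	return backoff.Retry(
		func() error {
			err := operation()
			if err != nil {
				logger.Debug("operation failed, retrying", zap.Error(err))
			}
			return err
		},
		backoff.WithContext(backoff.NewExponentialBackOff(), ctx),
	)
}
```

Callers that need a return value capture it through a closure, as the message_handler.go and types.go hunks below show.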
2 changes: 1 addition & 1 deletion go.mod
@@ -49,7 +49,7 @@ require (
github.com/bits-and-blooms/bitset v1.10.0 // indirect
github.com/btcsuite/btcd/btcec/v2 v2.3.2 // indirect
github.com/btcsuite/btcd/btcutil v1.1.3 // indirect
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/cockroachdb/errors v1.9.1 // indirect
github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b // indirect
2 changes: 2 additions & 0 deletions go.sum
@@ -131,6 +131,8 @@ github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792/go.mod h1:ghJtE
github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46fmI40EZs=
github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=
github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/cp v0.1.0 h1:SE+dxFebS7Iik5LK0tsi1k9ZCxEaFX4AjQmoyA+1dJk=
github.com/cespare/cp v0.1.0/go.mod h1:SOGHArjBr4JWaSDEVpWpo/hNg6RoKrls6Oh40hiwW+s=
23 changes: 13 additions & 10 deletions messages/teleporter/message_handler.go
@@ -29,9 +29,12 @@ import (
"google.golang.org/grpc"
)

// The maximum gas limit that can be specified for a Teleporter message
// Based on the C-Chain 15_000_000 gas limit per block, with other Warp message gas overhead conservatively estimated.
const maxTeleporterGasLimit = 12_000_000
const (
// The maximum gas limit that can be specified for a Teleporter message
// Based on the C-Chain 15_000_000 gas limit per block, with other Warp message gas overhead conservatively estimated.
maxTeleporterGasLimit = 12_000_000
defaultBlockAcceptanceTimeout = 30 * time.Second
)

type factory struct {
messageConfig Config
@@ -362,14 +365,14 @@ func (m *messageHandler) waitForReceipt(
teleporterMessageID ids.ID,
) error {
destinationBlockchainID := destinationClient.DestinationBlockchainID()
callCtx, callCtxCancel := context.WithTimeout(context.Background(), 30*time.Second)
callCtx, callCtxCancel := context.WithTimeout(context.Background(), defaultBlockAcceptanceTimeout)
defer callCtxCancel()
receipt, err := utils.CallWithRetry[*types.Receipt](
callCtx,
func() (*types.Receipt, error) {
return destinationClient.Client().(ethclient.Client).TransactionReceipt(callCtx, txHash)
},
)
var receipt *types.Receipt
operation := func() (err error) {
receipt, err = destinationClient.Client().(ethclient.Client).TransactionReceipt(callCtx, txHash)
return err
}
err := utils.WithRetriesTimeout(m.logger, operation, defaultBlockAcceptanceTimeout)
if err != nil {
m.logger.Error(
"Failed to get transaction receipt",
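The message_handler.go change above replaces the generic `utils.CallWithRetry[*types.Receipt]` helper with a plain `func() error` closure that writes its result into the enclosing scope. The self-contained snippet below illustrates that pattern; `retryWithTimeout` is a simplified stand-in for `utils.WithRetriesTimeout` (fixed sleep instead of exponential backoff), and the flaky operation is simulated.

```go
package main

import (
	"errors"
	"fmt"
	"math/rand"
	"time"
)

// retryWithTimeout is a simplified stand-in for utils.WithRetriesTimeout:
// it retries with a fixed sleep rather than exponential backoff.
func retryWithTimeout(operation func() error, timeout time.Duration) error {
	deadline := time.Now().Add(timeout)
	for {
		err := operation()
		if err == nil {
			return nil
		}
		if time.Now().After(deadline) {
			return fmt.Errorf("retries timed out: %w", err)
		}
		time.Sleep(100 * time.Millisecond)
	}
}

func main() {
	// The retried closure assigns its result to a variable in the enclosing
	// scope, so the retry helper only needs to understand func() error.
	var receipt string
	operation := func() error {
		if rand.Intn(3) != 0 {
			return errors.New("transient RPC failure") // simulated flakiness
		}
		receipt = "0xabc123" // placeholder for the fetched value
		return nil
	}
	if err := retryWithTimeout(operation, 5*time.Second); err != nil {
		fmt.Println("giving up:", err)
		return
	}
	fmt.Println("got result:", receipt)
}
```

The trade-off versus a generic `CallWithRetry` is that the closure mutates outer state, but it keeps the retry utility down to a single `func() error` signature.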
68 changes: 22 additions & 46 deletions relayer/application_relayer.go
@@ -28,17 +28,11 @@ import (
)

const (
// Number of retries to collect signatures from validators
maxRelayerQueryAttempts = 5
// Maximum amount of time to spend waiting (in addition to network round trip time per attempt)
// during relayer signature query routine
signatureRequestRetryWaitPeriodMs = 10_000
retryTimeout = 10 * time.Second
)

var (
// Errors
errFailedToGetAggSig = errors.New("failed to get aggregate signature from node endpoint")
)
// Errors
var errFailedToGetAggSig = errors.New("failed to get aggregate signature from node endpoint")

// CheckpointManager stores committed heights in the database
type CheckpointManager interface {
@@ -276,53 +270,35 @@ func (r *ApplicationRelayer) createSignedMessage(
signedWarpMessageBytes hexutil.Bytes
err error
)
for attempt := 1; attempt <= maxRelayerQueryAttempts; attempt++ {
r.logger.Debug(
"Relayer collecting signatures from peers.",
zap.Int("attempt", attempt),
zap.String("sourceBlockchainID", r.sourceBlockchain.GetBlockchainID().String()),
zap.String("destinationBlockchainID", r.relayerID.DestinationBlockchainID.String()),
zap.String("signingSubnetID", r.signingSubnetID.String()),
)

err = r.sourceWarpSignatureClient.CallContext(
operation := func() error {
return r.sourceWarpSignatureClient.CallContext(
context.Background(),
&signedWarpMessageBytes,
"warp_getMessageAggregateSignature",
unsignedMessage.ID(),
r.warpConfig.QuorumNumerator,
r.signingSubnetID.String(),
)
if err == nil {
warpMsg, err := avalancheWarp.ParseMessage(signedWarpMessageBytes)
if err != nil {
r.logger.Error(
"Failed to parse signed warp message",
zap.Error(err),
)
return nil, err
}
return warpMsg, err
}
r.logger.Info(
"Failed to get aggregate signature from node endpoint. Retrying.",
zap.Int("attempt", attempt),
}
err = utils.WithRetriesTimeout(r.logger, operation, retryTimeout)
if err != nil {
r.logger.Error(
"Failed to get aggregate signature from node endpoint.",
zap.String("sourceBlockchainID", r.sourceBlockchain.GetBlockchainID().String()),
zap.String("destinationBlockchainID", r.relayerID.DestinationBlockchainID.String()),
zap.String("signingSubnetID", r.signingSubnetID.String()),
)
return nil, errFailedToGetAggSig
}
warpMsg, err := avalancheWarp.ParseMessage(signedWarpMessageBytes)
if err != nil {
r.logger.Error(
"Failed to parse signed warp message",
zap.Error(err),
)
if attempt != maxRelayerQueryAttempts {
// Sleep such that all retries are uniformly spread across totalRelayerQueryPeriodMs
// TODO: We may want to consider an exponential back off rather than a uniform sleep period.
time.Sleep(time.Duration(signatureRequestRetryWaitPeriodMs/maxRelayerQueryAttempts) * time.Millisecond)
}
return nil, err
}
r.logger.Warn(
"Failed to get aggregate signature from node endpoint",
zap.Int("attempts", maxRelayerQueryAttempts),
zap.String("sourceBlockchainID", r.sourceBlockchain.GetBlockchainID().String()),
zap.String("destinationBlockchainID", r.relayerID.DestinationBlockchainID.String()),
zap.String("signingSubnetID", r.signingSubnetID.String()),
)
return nil, errFailedToGetAggSig
return warpMsg, nil
}

//
9 changes: 5 additions & 4 deletions relayer/listener.go
@@ -8,6 +8,7 @@ import (
"fmt"
"math/big"
"math/rand"
"time"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/utils/logging"
@@ -20,10 +21,10 @@ import (
)

const (
maxSubscribeAttempts = 10
retrySubscribeTimeout = 10 * time.Second
// TODO attempt to resubscribe in perpetuity once we are able to process missed blocks and
// refresh the chain config on reconnect.
maxResubscribeAttempts = 10
retryResubscribeTimeout = 10 * time.Second
)

// Listener handles all messages sent from a given source chain
@@ -137,7 +138,7 @@ func newListener(

// Open the subscription. We must do this before processing any missed messages, otherwise we may
// miss an incoming message in between fetching the latest block and subscribing.
err = lstnr.Subscriber.Subscribe(maxSubscribeAttempts)
err = lstnr.Subscriber.Subscribe(retrySubscribeTimeout)
if err != nil {
logger.Error(
"Failed to subscribe to node",
@@ -228,7 +229,7 @@ func (lstnr *Listener) processLogs(ctx context.Context) error {
// Sets the listener health status to false while attempting to reconnect.
func (lstnr *Listener) reconnectToSubscriber() error {
// Attempt to reconnect the subscription
err := lstnr.Subscriber.Subscribe(maxResubscribeAttempts)
err := lstnr.Subscriber.Subscribe(retryResubscribeTimeout)
if err != nil {
return fmt.Errorf("failed to resubscribe to node: %w", err)
}
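listener.go now hands `Subscriber.Subscribe` a wall-clock timeout (`retrySubscribeTimeout`) instead of a maximum attempt count. The `Subscriber` interface itself is defined elsewhere in the repository and is not part of this excerpt, so the sketch below only illustrates the inferred shape of the signature change; the package and parameter names are assumptions.

```go
// Hypothetical sketch of the inferred Subscriber signature change; the real
// interface is defined outside this excerpt.
package vms

import "time"

type Subscriber interface {
	// Before this commit (assumed): Subscribe(maxResubscribeAttempts int) error
	// After: the retry budget is a timeout handled by the retry utility.
	Subscribe(retryTimeout time.Duration) error
}
```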
2 changes: 1 addition & 1 deletion relayer/message_coordinator.go
@@ -234,7 +234,7 @@ func (mc *MessageCoordinator) ProcessBlock(
zap.Stringer("blockchainID", blockchainID),
)
// Parse the logs in the block, and group by application relayer
block, err := relayerTypes.NewWarpBlockInfo(blockHeader, ethClient)
block, err := relayerTypes.NewWarpBlockInfo(mc.logger, blockHeader, ethClient)
if err != nil {
mc.logger.Error("Failed to create Warp block info", zap.Error(err))
errChan <- err
44 changes: 22 additions & 22 deletions signature-aggregator/aggregator/aggregator.go
@@ -28,18 +28,17 @@ import (
"github.com/ava-labs/icm-services/signature-aggregator/aggregator/cache"
"github.com/ava-labs/icm-services/signature-aggregator/metrics"
"github.com/ava-labs/icm-services/utils"
"github.com/cenkalti/backoff/v4"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
)

type blsSignatureBuf [bls.SignatureLen]byte

const (
// Number of retries to collect signatures from validators
maxRelayerQueryAttempts = 10
// Maximum amount of time to spend waiting (in addition to network round trip time per attempt)
// during relayer signature query routine
signatureRequestRetryWaitPeriodMs = 20_000
signatureRequestTimeout = 20 * time.Second
)

var (
@@ -215,12 +214,12 @@ func (s *SignatureAggregator) CreateSignedMessage(
return nil, fmt.Errorf("%s: %w", msg, err)
}

var signedMsg *avalancheWarp.Message
// Query the validators with retries. On each retry, query one node per unique BLS pubkey
for attempt := 1; attempt <= maxRelayerQueryAttempts; attempt++ {
operation := func() error {
responsesExpected := len(connectedValidators.ValidatorSet) - len(signatureMap)
s.logger.Debug(
"Aggregator collecting signatures from peers.",
zap.Int("attempt", attempt),
zap.String("sourceBlockchainID", unsignedMessage.SourceChainID.String()),
zap.String("signingSubnetID", signingSubnet.String()),
zap.Int("validatorSetSize", len(connectedValidators.ValidatorSet)),
@@ -287,7 +286,8 @@ func (s *SignatureAggregator) CreateSignedMessage(
zap.String("warpMessageID", unsignedMessage.ID().String()),
zap.String("sourceBlockchainID", unsignedMessage.SourceChainID.String()),
)
signedMsg, relevant, err := s.handleResponse(
var relevant bool
signedMsg, relevant, err = s.handleResponse(
response,
sentTo,
requestID,
@@ -300,10 +300,10 @@ func (s *SignatureAggregator) CreateSignedMessage(
if err != nil {
// don't increase node failures metric here, because we did
// it in handleResponse
return nil, fmt.Errorf(
return backoff.Permanent(fmt.Errorf(
"failed to handle response: %w",
err,
)
))
}
if relevant {
responseCount++
@@ -316,28 +316,28 @@ func (s *SignatureAggregator) CreateSignedMessage(
zap.Uint64("signatureWeight", accumulatedSignatureWeight.Uint64()),
zap.String("sourceBlockchainID", unsignedMessage.SourceChainID.String()),
)
return signedMsg, nil
return nil
}
// Break once we've had successful or unsuccessful responses from each requested node
if responseCount == responsesExpected {
break
}
}
}
if attempt != maxRelayerQueryAttempts {
// Sleep such that all retries are uniformly spread across totalRelayerQueryPeriodMs
// TODO: We may want to consider an exponential back off rather than a uniform sleep period.
time.Sleep(time.Duration(signatureRequestRetryWaitPeriodMs/maxRelayerQueryAttempts) * time.Millisecond)
}
return errNotEnoughSignatures
}
s.logger.Warn(
"Failed to collect a threshold of signatures",
zap.Int("attempts", maxRelayerQueryAttempts),
zap.String("warpMessageID", unsignedMessage.ID().String()),
zap.Uint64("accumulatedWeight", accumulatedSignatureWeight.Uint64()),
zap.String("sourceBlockchainID", unsignedMessage.SourceChainID.String()),
)
return nil, errNotEnoughSignatures

err = utils.WithRetriesTimeout(s.logger, operation, signatureRequestTimeout)
if err != nil {
s.logger.Warn(
"Failed to collect a threshold of signatures",
zap.String("warpMessageID", unsignedMessage.ID().String()),
zap.Uint64("accumulatedWeight", accumulatedSignatureWeight.Uint64()),
zap.String("sourceBlockchainID", unsignedMessage.SourceChainID.String()),
)
return nil, errNotEnoughSignatures
}
return signedMsg, nil
}

func (s *SignatureAggregator) getSubnetID(blockchainID ids.ID) (ids.ID, error) {
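In the aggregator.go hunks above, a failure from `handleResponse` is wrapped in `backoff.Permanent` before being returned from the retried operation, presumably so that the retry helper (which appears to be built on `github.com/cenkalti/backoff/v4`) aborts immediately on unrecoverable errors instead of retrying them. The standalone snippet below demonstrates that library behaviour; the error values and attempt counting are illustrative only.

```go
package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/cenkalti/backoff/v4"
)

var errBadResponse = errors.New("malformed response")

func main() {
	attempts := 0
	operation := func() error {
		attempts++
		if attempts == 1 {
			// An ordinary error is retried with exponential backoff.
			return errors.New("transient network error")
		}
		// Wrapping an error in backoff.Permanent stops retrying immediately,
		// which is how the aggregator treats handleResponse failures above.
		return backoff.Permanent(errBadResponse)
	}

	b := backoff.NewExponentialBackOff()
	b.MaxElapsedTime = 5 * time.Second
	err := backoff.Retry(operation, b)
	fmt.Printf("attempts=%d err=%v\n", attempts, err)
}
```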
12 changes: 5 additions & 7 deletions signature-aggregator/aggregator/aggregator_test.go
@@ -214,17 +214,15 @@ func TestCreateSignedMessageRetriesAndFailsWithoutP2PResponses(t *testing.T) {

appRequests := makeAppRequests(chainID, requestID, connectedValidators)
for _, appRequest := range appRequests {
mockNetwork.EXPECT().RegisterAppRequest(appRequest).Times(
maxRelayerQueryAttempts,
)
mockNetwork.EXPECT().RegisterAppRequest(appRequest).AnyTimes()
}

mockNetwork.EXPECT().RegisterRequestID(
requestID,
len(appRequests),
).Return(
make(chan message.InboundMessage, len(appRequests)),
).Times(maxRelayerQueryAttempts)
).AnyTimes()

var nodeIDs set.Set[ids.NodeID]
for _, appRequest := range appRequests {
@@ -235,7 +233,7 @@ func TestCreateSignedMessageSucceeds(t *testing.T) {
nodeIDs,
subnetID,
subnets.NoOpAllower,
).Times(maxRelayerQueryAttempts)
).AnyTimes()

_, err = aggregator.CreateSignedMessage(msg, nil, subnetID, 80)
require.ErrorIs(
Expand All @@ -257,7 +255,7 @@ func TestCreateSignedMessageSucceeds(t *testing.T) {
require.NoError(t, err)

// the signers:
var connectedValidators, validatorSecretKeys = makeConnectedValidators(5)
connectedValidators, validatorSecretKeys := makeConnectedValidators(5)

// prime the aggregator:

@@ -276,7 +274,7 @@

// prime the signers' responses:

var requestID = aggregator.currentRequestID.Load() + 1
requestID := aggregator.currentRequestID.Load() + 1

appRequests := makeAppRequests(chainID, requestID, connectedValidators)
for _, appRequest := range appRequests {
23 changes: 12 additions & 11 deletions types/types.go
@@ -7,6 +7,7 @@ import (
"context"
"errors"

"github.com/ava-labs/avalanchego/utils/logging"
avalancheWarp "github.com/ava-labs/avalanchego/vms/platformvm/warp"
"github.com/ava-labs/icm-services/utils"
"github.com/ava-labs/subnet-evm/core/types"
@@ -38,25 +39,25 @@ type WarpMessageInfo struct {
}

// Extract Warp logs from the block, if they exist
func NewWarpBlockInfo(header *types.Header, ethClient ethclient.Client) (*WarpBlockInfo, error) {
func NewWarpBlockInfo(logger logging.Logger, header *types.Header, ethClient ethclient.Client) (*WarpBlockInfo, error) {
var (
logs []types.Log
err error
)
// Check if the block contains warp logs, and fetch them from the client if it does
if header.Bloom.Test(WarpPrecompileLogFilter[:]) {
cctx, cancel := context.WithTimeout(context.Background(), utils.DefaultRPCRetryTimeout)
cctx, cancel := context.WithTimeout(context.Background(), utils.DefaultRPCTimeout)
defer cancel()
logs, err = utils.CallWithRetry[[]types.Log](
cctx,
func() ([]types.Log, error) {
return ethClient.FilterLogs(context.Background(), interfaces.FilterQuery{
Topics: [][]common.Hash{{WarpPrecompileLogFilter}},
Addresses: []common.Address{warp.ContractAddress},
FromBlock: header.Number,
ToBlock: header.Number,
})
operation := func() (err error) {
logs, err = ethClient.FilterLogs(cctx, interfaces.FilterQuery{
Topics: [][]common.Hash{{WarpPrecompileLogFilter}},
Addresses: []common.Address{warp.ContractAddress},
FromBlock: header.Number,
ToBlock: header.Number,
})
return err
}
err = utils.WithRetriesTimeout(logger, operation, utils.DefaultRPCTimeout)
if err != nil {
return nil, err
}