diff --git a/core/services/ocr2/plugins/ccip/ccipexec/initializers.go b/core/services/ocr2/plugins/ccip/ccipexec/initializers.go index 390d215b9f..bed7b593c4 100644 --- a/core/services/ocr2/plugins/ccip/ccipexec/initializers.go +++ b/core/services/ocr2/plugins/ccip/ccipexec/initializers.go @@ -12,9 +12,10 @@ import ( "github.com/Masterminds/semver/v3" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/pkg/errors" + "go.uber.org/multierr" + chainselectors "github.com/smartcontractkit/chain-selectors" libocr2 "github.com/smartcontractkit/libocr/offchainreporting2plus" - "go.uber.org/multierr" commonlogger "github.com/smartcontractkit/chainlink-common/pkg/logger" @@ -158,8 +159,9 @@ func initTokenDataProviders(lggr logger.Logger, jobID string, pluginConfig ccipc lggr, usdcReader, attestationURI, - pluginConfig.USDCConfig.AttestationAPITimeoutSeconds, + int(pluginConfig.USDCConfig.AttestationAPITimeoutSeconds), pluginConfig.USDCConfig.SourceTokenAddress, + time.Duration(pluginConfig.USDCConfig.AttestationAPIIntervalMilliseconds)*time.Millisecond, ) } diff --git a/core/services/ocr2/plugins/ccip/config/config.go b/core/services/ocr2/plugins/ccip/config/config.go index 635c47ac2d..cfc637a6c3 100644 --- a/core/services/ocr2/plugins/ccip/config/config.go +++ b/core/services/ocr2/plugins/ccip/config/config.go @@ -101,15 +101,17 @@ type USDCConfig struct { SourceTokenAddress common.Address SourceMessageTransmitterAddress common.Address AttestationAPI string - AttestationAPITimeoutSeconds int + AttestationAPITimeoutSeconds uint + // AttestationAPIIntervalMilliseconds can be set to -1 to disable or 0 to use a default interval. + AttestationAPIIntervalMilliseconds int } func (uc *USDCConfig) ValidateUSDCConfig() error { if uc.AttestationAPI == "" { return errors.New("AttestationAPI is required") } - if uc.AttestationAPITimeoutSeconds < 0 { - return errors.New("AttestationAPITimeoutSeconds must be non-negative") + if uc.AttestationAPIIntervalMilliseconds < -1 { + return errors.New("AttestationAPIIntervalMilliseconds must be -1 to disable, 0 for default or greater to define the exact interval") } if uc.SourceTokenAddress == utils.ZeroAddress { return errors.New("SourceTokenAddress is required") diff --git a/core/services/ocr2/plugins/ccip/config/config_test.go b/core/services/ocr2/plugins/ccip/config/config_test.go index 94260906b6..659586b0ab 100644 --- a/core/services/ocr2/plugins/ccip/config/config_test.go +++ b/core/services/ocr2/plugins/ccip/config/config_test.go @@ -189,15 +189,6 @@ func TestUSDCValidate(t *testing.T) { }, err: "", }, - { - config: USDCConfig{ - AttestationAPI: "api", - SourceTokenAddress: utils.RandomAddress(), - SourceMessageTransmitterAddress: utils.RandomAddress(), - AttestationAPITimeoutSeconds: -1, - }, - err: "AttestationAPITimeoutSeconds must be non-negative", - }, } for _, tc := range testcases { diff --git a/core/services/ocr2/plugins/ccip/tokendata/observability/usdc_client_test.go b/core/services/ocr2/plugins/ccip/tokendata/observability/usdc_client_test.go index 37410c9677..da269d58f3 100644 --- a/core/services/ocr2/plugins/ccip/tokendata/observability/usdc_client_test.go +++ b/core/services/ocr2/plugins/ccip/tokendata/observability/usdc_client_test.go @@ -89,8 +89,8 @@ func testMonitoring(t *testing.T, name string, server *httptest.Server, requests // Service with monitored http client. usdcTokenAddr := utils.RandomAddress() observedHttpClient := http2.NewObservedIHttpClientWithMetric(&http2.HttpClient{}, histogram) - tokenDataReaderDefault := usdc.NewUSDCTokenDataReader(log, usdcReader, attestationURI, 0, usdcTokenAddr) - tokenDataReader := usdc.NewUSDCTokenDataReaderWithHttpClient(*tokenDataReaderDefault, observedHttpClient, usdcTokenAddr) + tokenDataReaderDefault := usdc.NewUSDCTokenDataReader(log, usdcReader, attestationURI, 0, usdcTokenAddr, usdc.APIIntervalRateLimitDisabled) + tokenDataReader := usdc.NewUSDCTokenDataReaderWithHttpClient(*tokenDataReaderDefault, observedHttpClient, usdcTokenAddr, usdc.APIIntervalRateLimitDisabled) require.NotNil(t, tokenDataReader) for i := 0; i < requests; i++ { diff --git a/core/services/ocr2/plugins/ccip/tokendata/usdc/usdc.go b/core/services/ocr2/plugins/ccip/tokendata/usdc/usdc.go index 424882ff26..e058a2a7fd 100644 --- a/core/services/ocr2/plugins/ccip/tokendata/usdc/usdc.go +++ b/core/services/ocr2/plugins/ccip/tokendata/usdc/usdc.go @@ -12,6 +12,7 @@ import ( "time" "github.com/pkg/errors" + "golang.org/x/time/rate" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" @@ -33,10 +34,19 @@ const ( // defaultCoolDownDurationSec defines the default time to wait after getting rate limited. // this value is only used if the 429 response does not contain the Retry-After header - defaultCoolDownDuration = 60 * time.Second + defaultCoolDownDuration = 5 * time.Minute // maxCoolDownDuration defines the maximum duration we can wait till firing the next request maxCoolDownDuration = 10 * time.Minute + + // defaultRequestInterval defines the rate in requests per second that the attestation API can be called. + // this is set according to the APIs documentated 10 requests per second rate limit. + defaultRequestInterval = 100 * time.Millisecond + + // APIIntervalRateLimitDisabled is a special value to disable the rate limiting. + APIIntervalRateLimitDisabled = -1 + // APIIntervalRateLimitDefault is a special value to select the default rate limit interval. + APIIntervalRateLimitDefault = 0 ) type attestationStatus string @@ -85,6 +95,7 @@ type TokenDataReader struct { attestationApi *url.URL attestationApiTimeout time.Duration usdcTokenAddress common.Address + rate *rate.Limiter // coolDownUntil defines whether requests are blocked or not. coolDownUntil time.Time @@ -105,11 +116,19 @@ func NewUSDCTokenDataReader( usdcAttestationApi *url.URL, usdcAttestationApiTimeoutSeconds int, usdcTokenAddress common.Address, + requestInterval time.Duration, ) *TokenDataReader { timeout := time.Duration(usdcAttestationApiTimeoutSeconds) * time.Second if usdcAttestationApiTimeoutSeconds == 0 { timeout = defaultAttestationTimeout } + + if requestInterval == APIIntervalRateLimitDisabled { + requestInterval = 0 + } else if requestInterval == APIIntervalRateLimitDefault { + requestInterval = defaultRequestInterval + } + return &TokenDataReader{ lggr: lggr, usdcReader: usdcReader, @@ -118,6 +137,7 @@ func NewUSDCTokenDataReader( attestationApiTimeout: timeout, usdcTokenAddress: usdcTokenAddress, coolDownMu: &sync.RWMutex{}, + rate: rate.NewLimiter(rate.Every(requestInterval), 1), } } @@ -125,6 +145,7 @@ func NewUSDCTokenDataReaderWithHttpClient( origin TokenDataReader, httpClient http.IHttpClient, usdcTokenAddress common.Address, + requestInterval time.Duration, ) *TokenDataReader { return &TokenDataReader{ lggr: origin.lggr, @@ -134,9 +155,14 @@ func NewUSDCTokenDataReaderWithHttpClient( attestationApiTimeout: origin.attestationApiTimeout, coolDownMu: origin.coolDownMu, usdcTokenAddress: usdcTokenAddress, + rate: rate.NewLimiter(rate.Every(requestInterval), 1), } } +// ReadTokenData queries the USDC attestation API to construct a message and +// attestation response. When called back to back, or multiple times +// concurrently, responses are delayed according how the request interval is +// configured. func (s *TokenDataReader) ReadTokenData(ctx context.Context, msg cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta, tokenIndex int) (messageAndAttestation []byte, err error) { if tokenIndex < 0 || tokenIndex >= len(msg.TokenAmounts) { return nil, fmt.Errorf("token index out of bounds") @@ -147,6 +173,14 @@ func (s *TokenDataReader) ReadTokenData(ctx context.Context, msg cciptypes.EVM2E return nil, tokendata.ErrRequestsBlocked } + if s.rate != nil { + // Wait blocks until it the attestation API can be called or the + // context is Done. + if waitErr := s.rate.Wait(ctx); err != nil { + return nil, fmt.Errorf("usdc rate limiting error: %w", waitErr) + } + } + messageBody, err := s.getUSDCMessageBody(ctx, msg, tokenIndex) if err != nil { return []byte{}, errors.Wrap(err, "failed getting the USDC message body") diff --git a/core/services/ocr2/plugins/ccip/tokendata/usdc/usdc_blackbox_test.go b/core/services/ocr2/plugins/ccip/tokendata/usdc/usdc_blackbox_test.go index dc506aeed4..95b309ff74 100644 --- a/core/services/ocr2/plugins/ccip/tokendata/usdc/usdc_blackbox_test.go +++ b/core/services/ocr2/plugins/ccip/tokendata/usdc/usdc_blackbox_test.go @@ -97,7 +97,7 @@ func TestUSDCReader_ReadTokenData(t *testing.T) { require.NoError(t, err) addr := utils.RandomAddress() - usdcService := usdc.NewUSDCTokenDataReader(lggr, &usdcReader, attestationURI, 0, addr) + usdcService := usdc.NewUSDCTokenDataReader(lggr, &usdcReader, attestationURI, 0, addr, usdc.APIIntervalRateLimitDisabled) msgAndAttestation, err := usdcService.ReadTokenData(context.Background(), cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{ EVM2EVMMessage: cciptypes.EVM2EVMMessage{ SequenceNumber: seqNum, diff --git a/core/services/ocr2/plugins/ccip/tokendata/usdc/usdc_test.go b/core/services/ocr2/plugins/ccip/tokendata/usdc/usdc_test.go index 1b859ca37c..f9f97b5abe 100644 --- a/core/services/ocr2/plugins/ccip/tokendata/usdc/usdc_test.go +++ b/core/services/ocr2/plugins/ccip/tokendata/usdc/usdc_test.go @@ -8,6 +8,8 @@ import ( "net/http" "net/http/httptest" "net/url" + "strings" + "sync" "testing" "time" @@ -38,7 +40,7 @@ func TestUSDCReader_callAttestationApi(t *testing.T) { require.NoError(t, err) lggr := logger.TestLogger(t) usdcReader, _ := ccipdata.NewUSDCReader(lggr, "job_123", mockMsgTransmitter, nil, false) - usdcService := NewUSDCTokenDataReader(lggr, usdcReader, attestationURI, 0, common.Address{}) + usdcService := NewUSDCTokenDataReader(lggr, usdcReader, attestationURI, 0, common.Address{}, APIIntervalRateLimitDisabled) attestation, err := usdcService.callAttestationApi(context.Background(), [32]byte(common.FromHex(usdcMessageHash))) require.NoError(t, err) @@ -61,7 +63,7 @@ func TestUSDCReader_callAttestationApiMock(t *testing.T) { lggr := logger.TestLogger(t) lp := mocks.NewLogPoller(t) usdcReader, _ := ccipdata.NewUSDCReader(lggr, "job_123", mockMsgTransmitter, lp, false) - usdcService := NewUSDCTokenDataReader(lggr, usdcReader, attestationURI, 0, common.Address{}) + usdcService := NewUSDCTokenDataReader(lggr, usdcReader, attestationURI, 0, common.Address{}, APIIntervalRateLimitDisabled) attestation, err := usdcService.callAttestationApi(context.Background(), utils.RandomBytes32()) require.NoError(t, err) @@ -196,7 +198,7 @@ func TestUSDCReader_callAttestationApiMockError(t *testing.T) { lggr := logger.TestLogger(t) lp := mocks.NewLogPoller(t) usdcReader, _ := ccipdata.NewUSDCReader(lggr, "job_123", mockMsgTransmitter, lp, false) - usdcService := NewUSDCTokenDataReader(lggr, usdcReader, attestationURI, test.customTimeoutSeconds, common.Address{}) + usdcService := NewUSDCTokenDataReader(lggr, usdcReader, attestationURI, test.customTimeoutSeconds, common.Address{}, APIIntervalRateLimitDisabled) lp.On("RegisterFilter", mock.Anything).Return(nil) require.NoError(t, usdcReader.RegisterFilters()) @@ -232,7 +234,7 @@ func TestGetUSDCMessageBody(t *testing.T) { usdcTokenAddr := utils.RandomAddress() lggr := logger.TestLogger(t) - usdcService := NewUSDCTokenDataReader(lggr, &usdcReader, nil, 0, usdcTokenAddr) + usdcService := NewUSDCTokenDataReader(lggr, &usdcReader, nil, 0, usdcTokenAddr, APIIntervalRateLimitDisabled) // Make the first call and assert the underlying function is called body, err := usdcService.getUSDCMessageBody(context.Background(), cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{ @@ -302,3 +304,122 @@ func TestTokenDataReader_getUsdcTokenEndOffset(t *testing.T) { }) } } + +func TestUSDCReader_rateLimiting(t *testing.T) { + testCases := []struct { + name string + requests uint64 + rateConfig time.Duration + testDuration time.Duration + timeout time.Duration + err string + }{ + { + name: "no rate limit when disabled", + requests: 10, + rateConfig: APIIntervalRateLimitDisabled, + testDuration: 1 * time.Millisecond, + }, + { + name: "yes rate limited with default config", + requests: 5, + rateConfig: APIIntervalRateLimitDefault, + testDuration: 4 * defaultRequestInterval, + }, + { + name: "yes rate limited with config", + requests: 10, + rateConfig: 50 * time.Millisecond, + testDuration: 9 * 50 * time.Millisecond, + }, + { + name: "timeout after first request", + requests: 5, + rateConfig: 100 * time.Millisecond, + testDuration: 1 * time.Millisecond, + timeout: 1 * time.Millisecond, + err: "usdc rate limiting error: rate: Wait(n=1) would exceed context deadline", + }, + { + name: "timeout after second request", + requests: 5, + rateConfig: 100 * time.Millisecond, + testDuration: 100 * time.Millisecond, + timeout: 150 * time.Millisecond, + err: "usdc rate limiting error: rate: Wait(n=1) would exceed context deadline", + }, + } + + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + response := attestationResponse{ + Status: attestationStatusSuccess, + Attestation: "720502893578a89a8a87982982ef781c18b193", + } + + ts := getMockUSDCEndpoint(t, response) + defer ts.Close() + attestationURI, err := url.ParseRequestURI(ts.URL) + require.NoError(t, err) + + lggr := logger.TestLogger(t) + lp := mocks.NewLogPoller(t) + usdcReader, _ := ccipdata.NewUSDCReader(lggr, "job_123", mockMsgTransmitter, lp, false) + usdcService := NewUSDCTokenDataReader(lggr, usdcReader, attestationURI, 0, utils.RandomAddress(), tc.rateConfig) + + ctx := context.Background() + if tc.timeout > 0 { + var cf context.CancelFunc + ctx, cf = context.WithTimeout(ctx, tc.timeout) + defer cf() + } + + trigger := make(chan struct{}) + errorChan := make(chan error, tc.requests) + wg := sync.WaitGroup{} + for i := 0; i < int(tc.requests); i++ { + wg.Add(1) + go func() { + defer wg.Done() + + <-trigger + _, err := usdcService.ReadTokenData(ctx, cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{ + EVM2EVMMessage: cciptypes.EVM2EVMMessage{ + TokenAmounts: []cciptypes.TokenAmount{{Token: ccipcalc.EvmAddrToGeneric(utils.ZeroAddress), Amount: nil}}, // trigger failure due to wrong address + }, + }, 0) + + errorChan <- err + }() + } + + // Start the test + start := time.Now() + close(trigger) + + // Wait for requests to complete + wg.Wait() + finish := time.Now() + close(errorChan) + + // Collect errors + errorFound := false + for err := range errorChan { + if tc.err != "" && !strings.Contains(err.Error(), tc.err) { + errorFound = true + } else if err != nil && !strings.Contains(err.Error(), "get usdc token 0 end offset") { + // Ignore that one error, it's expected because of how mocking is used. + // Anything else is unexpected. + require.Fail(t, "unexpected error", err) + } + } + if tc.err != "" { + assert.True(t, errorFound) + } + assert.WithinDuration(t, start.Add(tc.testDuration), finish, 50*time.Millisecond) + }) + } +}