Skip to content

Commit

Permalink
usdc: Add self rate limiting to the usdc attestation API. (#666)
Browse files Browse the repository at this point in the history
## Motivation

Circle's USDC attestation API has a 5 minute IP ban if rate limits are
exceeded. In order to avoid the 5 minute ban we should proactively limit
our requests to make sure this doesn't happen.

## Solution

Use `golang.org/x/time/rate` to add rate limit calls to the USDC
TokenDataReader.

This PR makes the following changes to behavior:
* If a 429 occurs and there is no Retry-After header, a 5 minute
cooldown is triggered. This is to match the circle documentation.
* If `usdc.ReadTokenData` is called back to back or concurrently, the
first call will execute and the rest will delay according to an
interval. The interval between calls is configurable.

A new Json Spec parameter is introduced named
`USDCConfig.AttestationAPIIntervalMilliseconds`
  • Loading branch information
winder authored Apr 5, 2024
2 parents 8db474f + 0dd85a3 commit 2aa84ec
Show file tree
Hide file tree
Showing 7 changed files with 172 additions and 22 deletions.
6 changes: 4 additions & 2 deletions core/services/ocr2/plugins/ccip/ccipexec/initializers.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ import (
"github.com/Masterminds/semver/v3"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/pkg/errors"
"go.uber.org/multierr"

chainselectors "github.com/smartcontractkit/chain-selectors"
libocr2 "github.com/smartcontractkit/libocr/offchainreporting2plus"
"go.uber.org/multierr"

commonlogger "github.com/smartcontractkit/chainlink-common/pkg/logger"

Expand Down Expand Up @@ -158,8 +159,9 @@ func initTokenDataProviders(lggr logger.Logger, jobID string, pluginConfig ccipc
lggr,
usdcReader,
attestationURI,
pluginConfig.USDCConfig.AttestationAPITimeoutSeconds,
int(pluginConfig.USDCConfig.AttestationAPITimeoutSeconds),
pluginConfig.USDCConfig.SourceTokenAddress,
time.Duration(pluginConfig.USDCConfig.AttestationAPIIntervalMilliseconds)*time.Millisecond,
)
}

Expand Down
8 changes: 5 additions & 3 deletions core/services/ocr2/plugins/ccip/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,15 +101,17 @@ type USDCConfig struct {
SourceTokenAddress common.Address
SourceMessageTransmitterAddress common.Address
AttestationAPI string
AttestationAPITimeoutSeconds int
AttestationAPITimeoutSeconds uint
// AttestationAPIIntervalMilliseconds can be set to -1 to disable or 0 to use a default interval.
AttestationAPIIntervalMilliseconds int
}

func (uc *USDCConfig) ValidateUSDCConfig() error {
if uc.AttestationAPI == "" {
return errors.New("AttestationAPI is required")
}
if uc.AttestationAPITimeoutSeconds < 0 {
return errors.New("AttestationAPITimeoutSeconds must be non-negative")
if uc.AttestationAPIIntervalMilliseconds < -1 {
return errors.New("AttestationAPIIntervalMilliseconds must be -1 to disable, 0 for default or greater to define the exact interval")
}
if uc.SourceTokenAddress == utils.ZeroAddress {
return errors.New("SourceTokenAddress is required")
Expand Down
9 changes: 0 additions & 9 deletions core/services/ocr2/plugins/ccip/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,15 +189,6 @@ func TestUSDCValidate(t *testing.T) {
},
err: "",
},
{
config: USDCConfig{
AttestationAPI: "api",
SourceTokenAddress: utils.RandomAddress(),
SourceMessageTransmitterAddress: utils.RandomAddress(),
AttestationAPITimeoutSeconds: -1,
},
err: "AttestationAPITimeoutSeconds must be non-negative",
},
}

for _, tc := range testcases {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ func testMonitoring(t *testing.T, name string, server *httptest.Server, requests
// Service with monitored http client.
usdcTokenAddr := utils.RandomAddress()
observedHttpClient := http2.NewObservedIHttpClientWithMetric(&http2.HttpClient{}, histogram)
tokenDataReaderDefault := usdc.NewUSDCTokenDataReader(log, usdcReader, attestationURI, 0, usdcTokenAddr)
tokenDataReader := usdc.NewUSDCTokenDataReaderWithHttpClient(*tokenDataReaderDefault, observedHttpClient, usdcTokenAddr)
tokenDataReaderDefault := usdc.NewUSDCTokenDataReader(log, usdcReader, attestationURI, 0, usdcTokenAddr, usdc.APIIntervalRateLimitDisabled)
tokenDataReader := usdc.NewUSDCTokenDataReaderWithHttpClient(*tokenDataReaderDefault, observedHttpClient, usdcTokenAddr, usdc.APIIntervalRateLimitDisabled)
require.NotNil(t, tokenDataReader)

for i := 0; i < requests; i++ {
Expand Down
36 changes: 35 additions & 1 deletion core/services/ocr2/plugins/ccip/tokendata/usdc/usdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"time"

"github.com/pkg/errors"
"golang.org/x/time/rate"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
Expand All @@ -33,10 +34,19 @@ const (

// defaultCoolDownDurationSec defines the default time to wait after getting rate limited.
// this value is only used if the 429 response does not contain the Retry-After header
defaultCoolDownDuration = 60 * time.Second
defaultCoolDownDuration = 5 * time.Minute

// maxCoolDownDuration defines the maximum duration we can wait till firing the next request
maxCoolDownDuration = 10 * time.Minute

// defaultRequestInterval defines the rate in requests per second that the attestation API can be called.
// this is set according to the APIs documentated 10 requests per second rate limit.
defaultRequestInterval = 100 * time.Millisecond

// APIIntervalRateLimitDisabled is a special value to disable the rate limiting.
APIIntervalRateLimitDisabled = -1
// APIIntervalRateLimitDefault is a special value to select the default rate limit interval.
APIIntervalRateLimitDefault = 0
)

type attestationStatus string
Expand Down Expand Up @@ -85,6 +95,7 @@ type TokenDataReader struct {
attestationApi *url.URL
attestationApiTimeout time.Duration
usdcTokenAddress common.Address
rate *rate.Limiter

// coolDownUntil defines whether requests are blocked or not.
coolDownUntil time.Time
Expand All @@ -105,11 +116,19 @@ func NewUSDCTokenDataReader(
usdcAttestationApi *url.URL,
usdcAttestationApiTimeoutSeconds int,
usdcTokenAddress common.Address,
requestInterval time.Duration,
) *TokenDataReader {
timeout := time.Duration(usdcAttestationApiTimeoutSeconds) * time.Second
if usdcAttestationApiTimeoutSeconds == 0 {
timeout = defaultAttestationTimeout
}

if requestInterval == APIIntervalRateLimitDisabled {
requestInterval = 0
} else if requestInterval == APIIntervalRateLimitDefault {
requestInterval = defaultRequestInterval
}

return &TokenDataReader{
lggr: lggr,
usdcReader: usdcReader,
Expand All @@ -118,13 +137,15 @@ func NewUSDCTokenDataReader(
attestationApiTimeout: timeout,
usdcTokenAddress: usdcTokenAddress,
coolDownMu: &sync.RWMutex{},
rate: rate.NewLimiter(rate.Every(requestInterval), 1),
}
}

func NewUSDCTokenDataReaderWithHttpClient(
origin TokenDataReader,
httpClient http.IHttpClient,
usdcTokenAddress common.Address,
requestInterval time.Duration,
) *TokenDataReader {
return &TokenDataReader{
lggr: origin.lggr,
Expand All @@ -134,9 +155,14 @@ func NewUSDCTokenDataReaderWithHttpClient(
attestationApiTimeout: origin.attestationApiTimeout,
coolDownMu: origin.coolDownMu,
usdcTokenAddress: usdcTokenAddress,
rate: rate.NewLimiter(rate.Every(requestInterval), 1),
}
}

// ReadTokenData queries the USDC attestation API to construct a message and
// attestation response. When called back to back, or multiple times
// concurrently, responses are delayed according how the request interval is
// configured.
func (s *TokenDataReader) ReadTokenData(ctx context.Context, msg cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta, tokenIndex int) (messageAndAttestation []byte, err error) {
if tokenIndex < 0 || tokenIndex >= len(msg.TokenAmounts) {
return nil, fmt.Errorf("token index out of bounds")
Expand All @@ -147,6 +173,14 @@ func (s *TokenDataReader) ReadTokenData(ctx context.Context, msg cciptypes.EVM2E
return nil, tokendata.ErrRequestsBlocked
}

if s.rate != nil {
// Wait blocks until it the attestation API can be called or the
// context is Done.
if waitErr := s.rate.Wait(ctx); err != nil {
return nil, fmt.Errorf("usdc rate limiting error: %w", waitErr)
}
}

messageBody, err := s.getUSDCMessageBody(ctx, msg, tokenIndex)
if err != nil {
return []byte{}, errors.Wrap(err, "failed getting the USDC message body")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func TestUSDCReader_ReadTokenData(t *testing.T) {
require.NoError(t, err)

addr := utils.RandomAddress()
usdcService := usdc.NewUSDCTokenDataReader(lggr, &usdcReader, attestationURI, 0, addr)
usdcService := usdc.NewUSDCTokenDataReader(lggr, &usdcReader, attestationURI, 0, addr, usdc.APIIntervalRateLimitDisabled)
msgAndAttestation, err := usdcService.ReadTokenData(context.Background(), cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{
EVM2EVMMessage: cciptypes.EVM2EVMMessage{
SequenceNumber: seqNum,
Expand Down
129 changes: 125 additions & 4 deletions core/services/ocr2/plugins/ccip/tokendata/usdc/usdc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"net/http"
"net/http/httptest"
"net/url"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -38,7 +40,7 @@ func TestUSDCReader_callAttestationApi(t *testing.T) {
require.NoError(t, err)
lggr := logger.TestLogger(t)
usdcReader, _ := ccipdata.NewUSDCReader(lggr, "job_123", mockMsgTransmitter, nil, false)
usdcService := NewUSDCTokenDataReader(lggr, usdcReader, attestationURI, 0, common.Address{})
usdcService := NewUSDCTokenDataReader(lggr, usdcReader, attestationURI, 0, common.Address{}, APIIntervalRateLimitDisabled)

attestation, err := usdcService.callAttestationApi(context.Background(), [32]byte(common.FromHex(usdcMessageHash)))
require.NoError(t, err)
Expand All @@ -61,7 +63,7 @@ func TestUSDCReader_callAttestationApiMock(t *testing.T) {
lggr := logger.TestLogger(t)
lp := mocks.NewLogPoller(t)
usdcReader, _ := ccipdata.NewUSDCReader(lggr, "job_123", mockMsgTransmitter, lp, false)
usdcService := NewUSDCTokenDataReader(lggr, usdcReader, attestationURI, 0, common.Address{})
usdcService := NewUSDCTokenDataReader(lggr, usdcReader, attestationURI, 0, common.Address{}, APIIntervalRateLimitDisabled)
attestation, err := usdcService.callAttestationApi(context.Background(), utils.RandomBytes32())
require.NoError(t, err)

Expand Down Expand Up @@ -196,7 +198,7 @@ func TestUSDCReader_callAttestationApiMockError(t *testing.T) {
lggr := logger.TestLogger(t)
lp := mocks.NewLogPoller(t)
usdcReader, _ := ccipdata.NewUSDCReader(lggr, "job_123", mockMsgTransmitter, lp, false)
usdcService := NewUSDCTokenDataReader(lggr, usdcReader, attestationURI, test.customTimeoutSeconds, common.Address{})
usdcService := NewUSDCTokenDataReader(lggr, usdcReader, attestationURI, test.customTimeoutSeconds, common.Address{}, APIIntervalRateLimitDisabled)
lp.On("RegisterFilter", mock.Anything).Return(nil)
require.NoError(t, usdcReader.RegisterFilters())

Expand Down Expand Up @@ -232,7 +234,7 @@ func TestGetUSDCMessageBody(t *testing.T) {

usdcTokenAddr := utils.RandomAddress()
lggr := logger.TestLogger(t)
usdcService := NewUSDCTokenDataReader(lggr, &usdcReader, nil, 0, usdcTokenAddr)
usdcService := NewUSDCTokenDataReader(lggr, &usdcReader, nil, 0, usdcTokenAddr, APIIntervalRateLimitDisabled)

// Make the first call and assert the underlying function is called
body, err := usdcService.getUSDCMessageBody(context.Background(), cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{
Expand Down Expand Up @@ -302,3 +304,122 @@ func TestTokenDataReader_getUsdcTokenEndOffset(t *testing.T) {
})
}
}

func TestUSDCReader_rateLimiting(t *testing.T) {
testCases := []struct {
name string
requests uint64
rateConfig time.Duration
testDuration time.Duration
timeout time.Duration
err string
}{
{
name: "no rate limit when disabled",
requests: 10,
rateConfig: APIIntervalRateLimitDisabled,
testDuration: 1 * time.Millisecond,
},
{
name: "yes rate limited with default config",
requests: 5,
rateConfig: APIIntervalRateLimitDefault,
testDuration: 4 * defaultRequestInterval,
},
{
name: "yes rate limited with config",
requests: 10,
rateConfig: 50 * time.Millisecond,
testDuration: 9 * 50 * time.Millisecond,
},
{
name: "timeout after first request",
requests: 5,
rateConfig: 100 * time.Millisecond,
testDuration: 1 * time.Millisecond,
timeout: 1 * time.Millisecond,
err: "usdc rate limiting error: rate: Wait(n=1) would exceed context deadline",
},
{
name: "timeout after second request",
requests: 5,
rateConfig: 100 * time.Millisecond,
testDuration: 100 * time.Millisecond,
timeout: 150 * time.Millisecond,
err: "usdc rate limiting error: rate: Wait(n=1) would exceed context deadline",
},
}

for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()

response := attestationResponse{
Status: attestationStatusSuccess,
Attestation: "720502893578a89a8a87982982ef781c18b193",
}

ts := getMockUSDCEndpoint(t, response)
defer ts.Close()
attestationURI, err := url.ParseRequestURI(ts.URL)
require.NoError(t, err)

lggr := logger.TestLogger(t)
lp := mocks.NewLogPoller(t)
usdcReader, _ := ccipdata.NewUSDCReader(lggr, "job_123", mockMsgTransmitter, lp, false)
usdcService := NewUSDCTokenDataReader(lggr, usdcReader, attestationURI, 0, utils.RandomAddress(), tc.rateConfig)

ctx := context.Background()
if tc.timeout > 0 {
var cf context.CancelFunc
ctx, cf = context.WithTimeout(ctx, tc.timeout)
defer cf()
}

trigger := make(chan struct{})
errorChan := make(chan error, tc.requests)
wg := sync.WaitGroup{}
for i := 0; i < int(tc.requests); i++ {
wg.Add(1)
go func() {
defer wg.Done()

<-trigger
_, err := usdcService.ReadTokenData(ctx, cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{
EVM2EVMMessage: cciptypes.EVM2EVMMessage{
TokenAmounts: []cciptypes.TokenAmount{{Token: ccipcalc.EvmAddrToGeneric(utils.ZeroAddress), Amount: nil}}, // trigger failure due to wrong address
},
}, 0)

errorChan <- err
}()
}

// Start the test
start := time.Now()
close(trigger)

// Wait for requests to complete
wg.Wait()
finish := time.Now()
close(errorChan)

// Collect errors
errorFound := false
for err := range errorChan {
if tc.err != "" && !strings.Contains(err.Error(), tc.err) {
errorFound = true
} else if err != nil && !strings.Contains(err.Error(), "get usdc token 0 end offset") {
// Ignore that one error, it's expected because of how mocking is used.
// Anything else is unexpected.
require.Fail(t, "unexpected error", err)
}
}
if tc.err != "" {
assert.True(t, errorFound)
}
assert.WithinDuration(t, start.Add(tc.testDuration), finish, 50*time.Millisecond)
})
}
}

0 comments on commit 2aa84ec

Please sign in to comment.