Skip to content

Commit

Permalink
Remove error on missing value in destTokenPricesUSD (#692)
Browse files Browse the repository at this point in the history
## Motivation
Self serve will not offer a full mapping of source to dest tokens. This
PR enables optimistic aggregate rate limiting, meaning it will only
calculate the rate limit value for tokens which are mapped. For all
lanes before 1.5, this means no change. For the 1.5+ lanes it means no
token will be included. This will be fixed in a future PR, before 1.5
will ever be deployed.
  • Loading branch information
RensR authored Apr 8, 2024
1 parent 264b952 commit 53940f2
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 36 deletions.
28 changes: 16 additions & 12 deletions core/services/ocr2/plugins/ccip/ccipexec/ocr2.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,11 +210,17 @@ func (r *ExecutionReportingPlugin) getExecutableObservations(ctx context.Context
return nil, err
}

inflightAggregateValue, err := getInflightAggregateRateLimit(lggr, inflight, tokenExecData.destTokenPrices, tokenExecData.sourceToDestTokens)
if err != nil {
lggr.Errorw("Unexpected error computing inflight values", "err", err)
return []ccip.ObservedMessage{}, nil
}

batch := r.buildBatch(
ctx,
rootLggr,
rep,
inflight,
inflightAggregateValue,
tokenExecData.rateLimiterTokenBucket.Tokens,
tokenExecData.sourceTokenPrices,
tokenExecData.destTokenPrices,
Expand Down Expand Up @@ -256,18 +262,13 @@ func (r *ExecutionReportingPlugin) buildBatch(
ctx context.Context,
lggr logger.Logger,
report commitReportWithSendRequests,
inflight []InflightInternalExecutionReport,
inflightAggregateValue *big.Int,
aggregateTokenLimit *big.Int,
sourceTokenPricesUSD map[cciptypes.Address]*big.Int,
destTokenPricesUSD map[cciptypes.Address]*big.Int,
gasPrice *big.Int,
sourceToDestToken map[cciptypes.Address]cciptypes.Address,
) (executableMessages []ccip.ObservedMessage) {
inflightAggregateValue, err := inflightAggregates(inflight, destTokenPricesUSD, sourceToDestToken)
if err != nil {
lggr.Errorw("Unexpected error computing inflight values", "err", err)
return []ccip.ObservedMessage{}
}
// We assume that next observation will start after previous epoch transmission so nonces should be already updated onchain.
// Worst case scenario we will try to process the same message again, and it will be skipped but protocol would progress anyway.
// We don't use inflightCache here to avoid cases in which inflight cache keeps progressing but due to transmission failures
Expand Down Expand Up @@ -305,7 +306,7 @@ func (r *ExecutionReportingPlugin) buildBatch(
continue
}

msgValue, err := aggregateTokenValue(destTokenPricesUSD, sourceToDestToken, msg.TokenAmounts)
msgValue, err := aggregateTokenValue(lggr, destTokenPricesUSD, sourceToDestToken, msg.TokenAmounts)
if err != nil {
msgLggr.Errorw("Skipping message unable to compute aggregate value", "err", err)
continue
Expand Down Expand Up @@ -522,12 +523,14 @@ func (r *ExecutionReportingPlugin) getReportsWithSendRequests(
return reportsWithSendReqs, nil
}

func aggregateTokenValue(destTokenPricesUSD map[cciptypes.Address]*big.Int, sourceToDest map[cciptypes.Address]cciptypes.Address, tokensAndAmount []cciptypes.TokenAmount) (*big.Int, error) {
func aggregateTokenValue(lggr logger.Logger, destTokenPricesUSD map[cciptypes.Address]*big.Int, sourceToDest map[cciptypes.Address]cciptypes.Address, tokensAndAmount []cciptypes.TokenAmount) (*big.Int, error) {
sum := big.NewInt(0)
for i := 0; i < len(tokensAndAmount); i++ {
price, ok := destTokenPricesUSD[sourceToDest[tokensAndAmount[i].Token]]
if !ok {
return nil, errors.Errorf("do not have price for source token %v", tokensAndAmount[i].Token)
// If we don't have a price for the token, we will assume it's worth 0.
lggr.Infof("No price for token %s, assuming 0", tokensAndAmount[i].Token)
continue
}
sum.Add(sum, new(big.Int).Quo(new(big.Int).Mul(price, tokensAndAmount[i].Amount), big.NewInt(1e18)))
}
Expand Down Expand Up @@ -775,7 +778,8 @@ func (r *ExecutionReportingPlugin) Close() error {
return nil
}

func inflightAggregates(
func getInflightAggregateRateLimit(
lggr logger.Logger,
inflight []InflightInternalExecutionReport,
destTokenPrices map[cciptypes.Address]*big.Int,
sourceToDest map[cciptypes.Address]cciptypes.Address,
Expand All @@ -784,7 +788,7 @@ func inflightAggregates(

for _, rep := range inflight {
for _, message := range rep.messages {
msgValue, err := aggregateTokenValue(destTokenPrices, sourceToDest, message.TokenAmounts)
msgValue, err := aggregateTokenValue(lggr, destTokenPrices, sourceToDest, message.TokenAmounts)
if err != nil {
return nil, err
}
Expand Down
41 changes: 21 additions & 20 deletions core/services/ocr2/plugins/ccip/ccipexec/ocr2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@ func TestExecutionReportingPlugin_buildBatch(t *testing.T) {
var tt = []struct {
name string
reqs []cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta
inflight []InflightInternalExecutionReport
inflight *big.Int
tokenLimit, destGasPrice *big.Int
srcPrices, dstPrices map[cciptypes.Address]*big.Int
offRampNoncesBySender map[cciptypes.Address]uint64
Expand All @@ -517,7 +517,7 @@ func TestExecutionReportingPlugin_buildBatch(t *testing.T) {
{
name: "single message no tokens",
reqs: []cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{msg1},
inflight: []InflightInternalExecutionReport{},
inflight: big.NewInt(0),
tokenLimit: big.NewInt(0),
destGasPrice: big.NewInt(10),
srcPrices: map[cciptypes.Address]*big.Int{srcNative: big.NewInt(1)},
Expand All @@ -528,7 +528,7 @@ func TestExecutionReportingPlugin_buildBatch(t *testing.T) {
{
name: "executed non finalized messages should be skipped",
reqs: []cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{msg2},
inflight: []InflightInternalExecutionReport{},
inflight: big.NewInt(0),
tokenLimit: big.NewInt(0),
destGasPrice: big.NewInt(10),
srcPrices: map[cciptypes.Address]*big.Int{srcNative: big.NewInt(1)},
Expand All @@ -539,7 +539,7 @@ func TestExecutionReportingPlugin_buildBatch(t *testing.T) {
{
name: "finalized executed log",
reqs: []cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{msg3},
inflight: []InflightInternalExecutionReport{},
inflight: big.NewInt(0),
tokenLimit: big.NewInt(0),
destGasPrice: big.NewInt(10),
srcPrices: map[cciptypes.Address]*big.Int{srcNative: big.NewInt(1)},
Expand All @@ -550,7 +550,7 @@ func TestExecutionReportingPlugin_buildBatch(t *testing.T) {
{
name: "dst token price does not exist",
reqs: []cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{msg2},
inflight: []InflightInternalExecutionReport{},
inflight: big.NewInt(0),
tokenLimit: big.NewInt(0),
destGasPrice: big.NewInt(10),
srcPrices: map[cciptypes.Address]*big.Int{srcNative: big.NewInt(1)},
Expand All @@ -561,7 +561,7 @@ func TestExecutionReportingPlugin_buildBatch(t *testing.T) {
{
name: "src token price does not exist",
reqs: []cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{msg2},
inflight: []InflightInternalExecutionReport{},
inflight: big.NewInt(0),
tokenLimit: big.NewInt(0),
destGasPrice: big.NewInt(10),
srcPrices: map[cciptypes.Address]*big.Int{},
Expand All @@ -572,7 +572,7 @@ func TestExecutionReportingPlugin_buildBatch(t *testing.T) {
{
name: "message with tokens is not executed if limit is reached",
reqs: []cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{msg4},
inflight: []InflightInternalExecutionReport{},
inflight: big.NewInt(0),
tokenLimit: big.NewInt(2),
destGasPrice: big.NewInt(10),
srcPrices: map[cciptypes.Address]*big.Int{srcNative: big.NewInt(1e18)},
Expand All @@ -584,14 +584,9 @@ func TestExecutionReportingPlugin_buildBatch(t *testing.T) {
expectedSeqNrs: nil,
},
{
name: "message with tokens is not executed if limit is reached when inflight is full",
reqs: []cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{msg5},
inflight: []InflightInternalExecutionReport{
{
createdAt: time.Now(),
messages: []cciptypes.EVM2EVMMessage{msg4.EVM2EVMMessage},
},
},
name: "message with tokens is not executed if limit is reached when inflight is full",
reqs: []cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{msg5},
inflight: new(big.Int).Mul(big.NewInt(1e18), big.NewInt(100)),
tokenLimit: big.NewInt(19),
destGasPrice: big.NewInt(10),
srcPrices: map[cciptypes.Address]*big.Int{srcNative: big.NewInt(1e18)},
Expand Down Expand Up @@ -645,7 +640,7 @@ func TestExecutionReportingPlugin_buildBatch(t *testing.T) {
BlockTimestamp: time.Date(2010, 1, 1, 12, 12, 12, 0, time.UTC),
},
},
inflight: []InflightInternalExecutionReport{},
inflight: big.NewInt(0),
tokenLimit: big.NewInt(0),
destGasPrice: big.NewInt(10),
srcPrices: map[cciptypes.Address]*big.Int{srcNative: big.NewInt(1)},
Expand Down Expand Up @@ -1141,6 +1136,7 @@ func Test_inflightAggregates(t *testing.T) {
addrs[i] = cciptypes.Address(utils.RandomAddress().String())
tokenAddrs[i] = cciptypes.Address(utils.RandomAddress().String())
}
lggr := logger.TestLogger(t)

testCases := []struct {
name string
Expand Down Expand Up @@ -1201,7 +1197,7 @@ func Test_inflightAggregates(t *testing.T) {
expErr: false,
},
{
name: "missing price",
name: "missing price should be 0",
inflight: []InflightInternalExecutionReport{
{
messages: []cciptypes.EVM2EVMMessage{
Expand All @@ -1222,7 +1218,8 @@ func Test_inflightAggregates(t *testing.T) {
sourceToDest: map[cciptypes.Address]cciptypes.Address{
tokenAddrs[2]: tokenAddrs[3],
},
expErr: true,
expInflightAggrVal: big.NewInt(0),
expErr: false,
},
{
name: "nothing inflight",
Expand All @@ -1237,8 +1234,12 @@ func Test_inflightAggregates(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
inflightAggrVal, err := inflightAggregates(
tc.inflight, tc.destTokenPrices, tc.sourceToDest)
inflightAggrVal, err := getInflightAggregateRateLimit(
lggr,
tc.inflight,
tc.destTokenPrices,
tc.sourceToDest,
)

if tc.expErr {
assert.Error(t, err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,10 +254,6 @@ func (o *OffRamp) CurrentRateLimiterState(ctx context.Context) (cciptypes.TokenB
}, nil
}

func (o *OffRamp) GetDestinationToken(ctx context.Context, address common.Address) (common.Address, error) {
return o.offRampV100.GetDestinationToken(&bind.CallOpts{Context: ctx}, address)
}

func (o *OffRamp) getDestinationTokensFromSourceTokens(ctx context.Context, tokenAddresses []cciptypes.Address) ([]cciptypes.Address, error) {
destTokens := make([]cciptypes.Address, len(tokenAddresses))
found := make(map[cciptypes.Address]bool)
Expand Down

0 comments on commit 53940f2

Please sign in to comment.