diff --git a/core/services/ocr2/plugins/ccip/ccipexec/ocr2.go b/core/services/ocr2/plugins/ccip/ccipexec/ocr2.go index a941218ee2..e3deac74f4 100644 --- a/core/services/ocr2/plugins/ccip/ccipexec/ocr2.go +++ b/core/services/ocr2/plugins/ccip/ccipexec/ocr2.go @@ -210,11 +210,17 @@ func (r *ExecutionReportingPlugin) getExecutableObservations(ctx context.Context return nil, err } + inflightAggregateValue, err := getInflightAggregateRateLimit(lggr, inflight, tokenExecData.destTokenPrices, tokenExecData.sourceToDestTokens) + if err != nil { + lggr.Errorw("Unexpected error computing inflight values", "err", err) + return []ccip.ObservedMessage{}, nil + } + batch := r.buildBatch( ctx, rootLggr, rep, - inflight, + inflightAggregateValue, tokenExecData.rateLimiterTokenBucket.Tokens, tokenExecData.sourceTokenPrices, tokenExecData.destTokenPrices, @@ -256,18 +262,13 @@ func (r *ExecutionReportingPlugin) buildBatch( ctx context.Context, lggr logger.Logger, report commitReportWithSendRequests, - inflight []InflightInternalExecutionReport, + inflightAggregateValue *big.Int, aggregateTokenLimit *big.Int, sourceTokenPricesUSD map[cciptypes.Address]*big.Int, destTokenPricesUSD map[cciptypes.Address]*big.Int, gasPrice *big.Int, sourceToDestToken map[cciptypes.Address]cciptypes.Address, ) (executableMessages []ccip.ObservedMessage) { - inflightAggregateValue, err := inflightAggregates(inflight, destTokenPricesUSD, sourceToDestToken) - if err != nil { - lggr.Errorw("Unexpected error computing inflight values", "err", err) - return []ccip.ObservedMessage{} - } // We assume that next observation will start after previous epoch transmission so nonces should be already updated onchain. // Worst case scenario we will try to process the same message again, and it will be skipped but protocol would progress anyway. // We don't use inflightCache here to avoid cases in which inflight cache keeps progressing but due to transmission failures @@ -305,7 +306,7 @@ func (r *ExecutionReportingPlugin) buildBatch( continue } - msgValue, err := aggregateTokenValue(destTokenPricesUSD, sourceToDestToken, msg.TokenAmounts) + msgValue, err := aggregateTokenValue(lggr, destTokenPricesUSD, sourceToDestToken, msg.TokenAmounts) if err != nil { msgLggr.Errorw("Skipping message unable to compute aggregate value", "err", err) continue @@ -522,12 +523,14 @@ func (r *ExecutionReportingPlugin) getReportsWithSendRequests( return reportsWithSendReqs, nil } -func aggregateTokenValue(destTokenPricesUSD map[cciptypes.Address]*big.Int, sourceToDest map[cciptypes.Address]cciptypes.Address, tokensAndAmount []cciptypes.TokenAmount) (*big.Int, error) { +func aggregateTokenValue(lggr logger.Logger, destTokenPricesUSD map[cciptypes.Address]*big.Int, sourceToDest map[cciptypes.Address]cciptypes.Address, tokensAndAmount []cciptypes.TokenAmount) (*big.Int, error) { sum := big.NewInt(0) for i := 0; i < len(tokensAndAmount); i++ { price, ok := destTokenPricesUSD[sourceToDest[tokensAndAmount[i].Token]] if !ok { - return nil, errors.Errorf("do not have price for source token %v", tokensAndAmount[i].Token) + // If we don't have a price for the token, we will assume it's worth 0. + lggr.Infof("No price for token %s, assuming 0", tokensAndAmount[i].Token) + continue } sum.Add(sum, new(big.Int).Quo(new(big.Int).Mul(price, tokensAndAmount[i].Amount), big.NewInt(1e18))) } @@ -775,7 +778,8 @@ func (r *ExecutionReportingPlugin) Close() error { return nil } -func inflightAggregates( +func getInflightAggregateRateLimit( + lggr logger.Logger, inflight []InflightInternalExecutionReport, destTokenPrices map[cciptypes.Address]*big.Int, sourceToDest map[cciptypes.Address]cciptypes.Address, @@ -784,7 +788,7 @@ func inflightAggregates( for _, rep := range inflight { for _, message := range rep.messages { - msgValue, err := aggregateTokenValue(destTokenPrices, sourceToDest, message.TokenAmounts) + msgValue, err := aggregateTokenValue(lggr, destTokenPrices, sourceToDest, message.TokenAmounts) if err != nil { return nil, err } diff --git a/core/services/ocr2/plugins/ccip/ccipexec/ocr2_test.go b/core/services/ocr2/plugins/ccip/ccipexec/ocr2_test.go index 44e419e5af..a061416e8c 100644 --- a/core/services/ocr2/plugins/ccip/ccipexec/ocr2_test.go +++ b/core/services/ocr2/plugins/ccip/ccipexec/ocr2_test.go @@ -507,7 +507,7 @@ func TestExecutionReportingPlugin_buildBatch(t *testing.T) { var tt = []struct { name string reqs []cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta - inflight []InflightInternalExecutionReport + inflight *big.Int tokenLimit, destGasPrice *big.Int srcPrices, dstPrices map[cciptypes.Address]*big.Int offRampNoncesBySender map[cciptypes.Address]uint64 @@ -517,7 +517,7 @@ func TestExecutionReportingPlugin_buildBatch(t *testing.T) { { name: "single message no tokens", reqs: []cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{msg1}, - inflight: []InflightInternalExecutionReport{}, + inflight: big.NewInt(0), tokenLimit: big.NewInt(0), destGasPrice: big.NewInt(10), srcPrices: map[cciptypes.Address]*big.Int{srcNative: big.NewInt(1)}, @@ -528,7 +528,7 @@ func TestExecutionReportingPlugin_buildBatch(t *testing.T) { { name: "executed non finalized messages should be skipped", reqs: []cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{msg2}, - inflight: []InflightInternalExecutionReport{}, + inflight: big.NewInt(0), tokenLimit: big.NewInt(0), destGasPrice: big.NewInt(10), srcPrices: map[cciptypes.Address]*big.Int{srcNative: big.NewInt(1)}, @@ -539,7 +539,7 @@ func TestExecutionReportingPlugin_buildBatch(t *testing.T) { { name: "finalized executed log", reqs: []cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{msg3}, - inflight: []InflightInternalExecutionReport{}, + inflight: big.NewInt(0), tokenLimit: big.NewInt(0), destGasPrice: big.NewInt(10), srcPrices: map[cciptypes.Address]*big.Int{srcNative: big.NewInt(1)}, @@ -550,7 +550,7 @@ func TestExecutionReportingPlugin_buildBatch(t *testing.T) { { name: "dst token price does not exist", reqs: []cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{msg2}, - inflight: []InflightInternalExecutionReport{}, + inflight: big.NewInt(0), tokenLimit: big.NewInt(0), destGasPrice: big.NewInt(10), srcPrices: map[cciptypes.Address]*big.Int{srcNative: big.NewInt(1)}, @@ -561,7 +561,7 @@ func TestExecutionReportingPlugin_buildBatch(t *testing.T) { { name: "src token price does not exist", reqs: []cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{msg2}, - inflight: []InflightInternalExecutionReport{}, + inflight: big.NewInt(0), tokenLimit: big.NewInt(0), destGasPrice: big.NewInt(10), srcPrices: map[cciptypes.Address]*big.Int{}, @@ -572,7 +572,7 @@ func TestExecutionReportingPlugin_buildBatch(t *testing.T) { { name: "message with tokens is not executed if limit is reached", reqs: []cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{msg4}, - inflight: []InflightInternalExecutionReport{}, + inflight: big.NewInt(0), tokenLimit: big.NewInt(2), destGasPrice: big.NewInt(10), srcPrices: map[cciptypes.Address]*big.Int{srcNative: big.NewInt(1e18)}, @@ -584,14 +584,9 @@ func TestExecutionReportingPlugin_buildBatch(t *testing.T) { expectedSeqNrs: nil, }, { - name: "message with tokens is not executed if limit is reached when inflight is full", - reqs: []cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{msg5}, - inflight: []InflightInternalExecutionReport{ - { - createdAt: time.Now(), - messages: []cciptypes.EVM2EVMMessage{msg4.EVM2EVMMessage}, - }, - }, + name: "message with tokens is not executed if limit is reached when inflight is full", + reqs: []cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{msg5}, + inflight: new(big.Int).Mul(big.NewInt(1e18), big.NewInt(100)), tokenLimit: big.NewInt(19), destGasPrice: big.NewInt(10), srcPrices: map[cciptypes.Address]*big.Int{srcNative: big.NewInt(1e18)}, @@ -645,7 +640,7 @@ func TestExecutionReportingPlugin_buildBatch(t *testing.T) { BlockTimestamp: time.Date(2010, 1, 1, 12, 12, 12, 0, time.UTC), }, }, - inflight: []InflightInternalExecutionReport{}, + inflight: big.NewInt(0), tokenLimit: big.NewInt(0), destGasPrice: big.NewInt(10), srcPrices: map[cciptypes.Address]*big.Int{srcNative: big.NewInt(1)}, @@ -1141,6 +1136,7 @@ func Test_inflightAggregates(t *testing.T) { addrs[i] = cciptypes.Address(utils.RandomAddress().String()) tokenAddrs[i] = cciptypes.Address(utils.RandomAddress().String()) } + lggr := logger.TestLogger(t) testCases := []struct { name string @@ -1201,7 +1197,7 @@ func Test_inflightAggregates(t *testing.T) { expErr: false, }, { - name: "missing price", + name: "missing price should be 0", inflight: []InflightInternalExecutionReport{ { messages: []cciptypes.EVM2EVMMessage{ @@ -1222,7 +1218,8 @@ func Test_inflightAggregates(t *testing.T) { sourceToDest: map[cciptypes.Address]cciptypes.Address{ tokenAddrs[2]: tokenAddrs[3], }, - expErr: true, + expInflightAggrVal: big.NewInt(0), + expErr: false, }, { name: "nothing inflight", @@ -1237,8 +1234,12 @@ func Test_inflightAggregates(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - inflightAggrVal, err := inflightAggregates( - tc.inflight, tc.destTokenPrices, tc.sourceToDest) + inflightAggrVal, err := getInflightAggregateRateLimit( + lggr, + tc.inflight, + tc.destTokenPrices, + tc.sourceToDest, + ) if tc.expErr { assert.Error(t, err) diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_0_0/offramp.go b/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_0_0/offramp.go index c9b2697251..5511c1185f 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_0_0/offramp.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_0_0/offramp.go @@ -254,10 +254,6 @@ func (o *OffRamp) CurrentRateLimiterState(ctx context.Context) (cciptypes.TokenB }, nil } -func (o *OffRamp) GetDestinationToken(ctx context.Context, address common.Address) (common.Address, error) { - return o.offRampV100.GetDestinationToken(&bind.CallOpts{Context: ctx}, address) -} - func (o *OffRamp) getDestinationTokensFromSourceTokens(ctx context.Context, tokenAddresses []cciptypes.Address) ([]cciptypes.Address, error) { destTokens := make([]cciptypes.Address, len(tokenAddresses)) found := make(map[cciptypes.Address]bool)