diff --git a/core/chains/evm/logpoller/disabled.go b/core/chains/evm/logpoller/disabled.go index 9da54227a7..14f13a2b52 100644 --- a/core/chains/evm/logpoller/disabled.go +++ b/core/chains/evm/logpoller/disabled.go @@ -112,3 +112,7 @@ func (d disabled) LogsUntilBlockHashDataWordGreaterThan(eventSig common.Hash, ad func (d disabled) FetchNotExecutedReports(commitStoreAddr common.Address, commitStoreEvent common.Hash, offrampAddress common.Address, offrampEventSig common.Hash, after time.Time, qopts ...pg.QOpt) ([]Log, error) { return nil, ErrDisabled } + +func (d disabled) FetchNotExecutedMessages(onrampAddr common.Address, onrampEvent common.Hash, offrampAddress common.Address, offrampEventSig common.Hash, wordValueMin, wordValueMax common.Hash, qopts ...pg.QOpt) ([]Log, error) { + return nil, ErrDisabled +} diff --git a/core/chains/evm/logpoller/log_poller.go b/core/chains/evm/logpoller/log_poller.go index 472bba1c2c..c02e463d05 100644 --- a/core/chains/evm/logpoller/log_poller.go +++ b/core/chains/evm/logpoller/log_poller.go @@ -60,6 +60,7 @@ type LogPoller interface { LogsDataWordGreaterThan(eventSig common.Hash, address common.Address, wordIndex int, wordValueMin common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error) LogsUntilBlockHashDataWordGreaterThan(eventSig common.Hash, address common.Address, wordIndex int, wordValueMin common.Hash, untilBlockHash common.Hash, qopts ...pg.QOpt) ([]Log, error) FetchNotExecutedReports(commitStoreAddr common.Address, commitStoreEvent common.Hash, offrampAddress common.Address, offrampEventSig common.Hash, after time.Time, qopts ...pg.QOpt) ([]Log, error) + FetchNotExecutedMessages(onrampAddr common.Address, onrampEvent common.Hash, offrampAddress common.Address, offrampEventSig common.Hash, wordValueMin, wordValueMax common.Hash, qopts ...pg.QOpt) ([]Log, error) } type LogPollerTest interface { @@ -1159,6 +1160,10 @@ func (lp *logPoller) FetchNotExecutedReports(commitStoreAddr common.Address, com return lp.orm.FetchNotExecutedReports(commitStoreAddr, commitStoreEvent, offrampAddress, offrampEventSig, after, qopts...) } +func (lp *logPoller) FetchNotExecutedMessages(onrampAddr common.Address, onrampEvent common.Hash, offrampAddress common.Address, offrampEventSig common.Hash, wordValueMin, wordValueMax common.Hash, qopts ...pg.QOpt) ([]Log, error) { + return lp.orm.FetchNotExecutedMessages(onrampAddr, onrampEvent, offrampAddress, offrampEventSig, wordValueMin, wordValueMax, qopts...) +} + func EvmWord(i uint64) common.Hash { var b = make([]byte, 8) binary.BigEndian.PutUint64(b, i) diff --git a/core/chains/evm/logpoller/log_poller_ccip_test.go b/core/chains/evm/logpoller/log_poller_ccip_test.go index 6a49641141..3c9e333fef 100644 --- a/core/chains/evm/logpoller/log_poller_ccip_test.go +++ b/core/chains/evm/logpoller/log_poller_ccip_test.go @@ -18,7 +18,7 @@ import ( ) const ( - numberOfReports = 10000 + numberOfReports = 1000 numberOfMessagesPerReport = 100 numberOfMessages = numberOfReports * numberOfMessagesPerReport ) @@ -108,11 +108,9 @@ func populateDbWithSomeExecuted(b *testing.B, o *logpoller.DbORM, chainID *big.I func populateDbWithMessages(b *testing.B, o *logpoller.DbORM, chainID *big.Int, onrampAddress common.Address, onrampEvent common.Hash) { var logs []logpoller.Log for i := 1; i <= numberOfMessages; i++ { - data := make([]byte, 64) - // MinSeqNr - data = append(data, logpoller.EvmWord(uint64(numberOfMessagesPerReport*i+1)).Bytes()...) - // MaxSeqNr - data = append(data, logpoller.EvmWord(uint64(numberOfMessagesPerReport*(i+1))).Bytes()...) + data := make([]byte, 128) + // SeqNr + data = append(data, logpoller.EvmWord(uint64(i)).Bytes()...) logs = append(logs, logpoller.Log{ EvmChainId: utils.NewBig(chainID), @@ -131,7 +129,6 @@ func populateDbWithMessages(b *testing.B, o *logpoller.DbORM, chainID *big.Int, } require.NoError(b, o.InsertBlock(utils.RandomAddress().Hash(), int64(100_000), time.Now())) require.NoError(b, o.InsertLogs(logs)) - } func Benchmark_CreatedAfter(b *testing.B) { @@ -318,3 +315,84 @@ func Benchmark_SingleQuerySomeExecuted(b *testing.B) { require.Len(b, logs, numberOfReports) } } + +// V1 +// 168 millis +// Benchmark_AllMessagesExecuted-12 1 10373778417 ns/op + +// V2 +// 93 millis +// +// Benchmark_AllMessagesExecuted +// Benchmark_AllMessagesExecuted-12 14 83441804 ns/op +// Benchmark_AllMessagesExecuted-12 14 80466869 ns/op +// Benchmark_AllMessagesExecuted-12 12 83875597 ns/op +// Benchmark_AllMessagesExecuted-12 13 79230596 ns/op +// Benchmark_AllMessagesExecuted-12 13 83327372 ns/op +func Benchmark_AllMessagesExecuted(b *testing.B) { + chainId := big.NewInt(137) + _, db := heavyweight.FullTestDBV2(b, "msgs", nil) + o := logpoller.NewORM(chainId, db, logger.TestLogger(b), pgtest.NewQConfig(false)) + + offrampAddress := utils.RandomAddress() + offrampExecuted := common.HexToHash("0xd4f851956a5d67c3997d1c9205045fef79bae2947fdee7e9e2641abc7391ef65") + + onrampAddress := utils.RandomAddress() + onrampEvent := common.HexToHash("0xaffc45517195d6499808c643bd4a7b0ffeedf95bea5852840d7bfcf63f59e821") + + populateDbWithMessages(b, o, chainId, onrampAddress, onrampEvent) + populateDbWithExecutionStateChanges(b, o, chainId, offrampAddress, offrampExecuted) + b.ResetTimer() + + for i := 0; i < b.N; i++ { + //start := time.Now() + logs, err := o.FetchNotExecutedMessages( + onrampAddress, + onrampEvent, + offrampAddress, + offrampExecuted, + logpoller.EvmWord(0), + logpoller.EvmWord(numberOfMessages), + ) + //fmt.Printf("%d millis\n", time.Since(start).Milliseconds()) + require.NoError(b, err) + require.Len(b, logs, 0) + } +} + +// V1 +// 189 millis +// Benchmark_NoneMessagesExecuted-12 1 3135554125 ns/op +// V2 +// 230 millis +// Benchmark_NoneMessagesExecuted-12 1 3809230583 ns/op + +func Benchmark_NoneMessagesExecuted(b *testing.B) { + chainId := big.NewInt(137) + _, db := heavyweight.FullTestDBV2(b, "msgs", nil) + o := logpoller.NewORM(chainId, db, logger.TestLogger(b), pgtest.NewQConfig(false)) + + offrampAddress := utils.RandomAddress() + offrampExecuted := common.HexToHash("0xd4f851956a5d67c3997d1c9205045fef79bae2947fdee7e9e2641abc7391ef65") + + onrampAddress := utils.RandomAddress() + onrampEvent := common.HexToHash("0xaffc45517195d6499808c643bd4a7b0ffeedf95bea5852840d7bfcf63f59e821") + + populateDbWithMessages(b, o, chainId, onrampAddress, onrampEvent) + b.ResetTimer() + + for i := 0; i < b.N; i++ { + start := time.Now() + logs, err := o.FetchNotExecutedMessages( + onrampAddress, + onrampEvent, + offrampAddress, + offrampExecuted, + logpoller.EvmWord(0), + logpoller.EvmWord(numberOfMessages), + ) + fmt.Printf("%d millis\n", time.Since(start).Milliseconds()) + require.NoError(b, err) + require.Len(b, logs, numberOfMessages) + } +} diff --git a/core/chains/evm/logpoller/mocks/log_poller.go b/core/chains/evm/logpoller/mocks/log_poller.go index e35d4fd4c6..bc528539af 100644 --- a/core/chains/evm/logpoller/mocks/log_poller.go +++ b/core/chains/evm/logpoller/mocks/log_poller.go @@ -35,6 +35,39 @@ func (_m *LogPoller) Close() error { return r0 } +// FetchNotExecutedMessages provides a mock function with given fields: onrampAddr, onrampEvent, offrampAddress, offrampEventSig, wordValueMin, wordValueMax, qopts +func (_m *LogPoller) FetchNotExecutedMessages(onrampAddr common.Address, onrampEvent common.Hash, offrampAddress common.Address, offrampEventSig common.Hash, wordValueMin common.Hash, wordValueMax common.Hash, qopts ...pg.QOpt) ([]logpoller.Log, error) { + _va := make([]interface{}, len(qopts)) + for _i := range qopts { + _va[_i] = qopts[_i] + } + var _ca []interface{} + _ca = append(_ca, onrampAddr, onrampEvent, offrampAddress, offrampEventSig, wordValueMin, wordValueMax) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 []logpoller.Log + var r1 error + if rf, ok := ret.Get(0).(func(common.Address, common.Hash, common.Address, common.Hash, common.Hash, common.Hash, ...pg.QOpt) ([]logpoller.Log, error)); ok { + return rf(onrampAddr, onrampEvent, offrampAddress, offrampEventSig, wordValueMin, wordValueMax, qopts...) + } + if rf, ok := ret.Get(0).(func(common.Address, common.Hash, common.Address, common.Hash, common.Hash, common.Hash, ...pg.QOpt) []logpoller.Log); ok { + r0 = rf(onrampAddr, onrampEvent, offrampAddress, offrampEventSig, wordValueMin, wordValueMax, qopts...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]logpoller.Log) + } + } + + if rf, ok := ret.Get(1).(func(common.Address, common.Hash, common.Address, common.Hash, common.Hash, common.Hash, ...pg.QOpt) error); ok { + r1 = rf(onrampAddr, onrampEvent, offrampAddress, offrampEventSig, wordValueMin, wordValueMax, qopts...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // FetchNotExecutedReports provides a mock function with given fields: commitStoreAddr, commitStoreEvent, offrampAddress, offrampEventSig, after, qopts func (_m *LogPoller) FetchNotExecutedReports(commitStoreAddr common.Address, commitStoreEvent common.Hash, offrampAddress common.Address, offrampEventSig common.Hash, after time.Time, qopts ...pg.QOpt) ([]logpoller.Log, error) { _va := make([]interface{}, len(qopts)) diff --git a/core/chains/evm/logpoller/observability.go b/core/chains/evm/logpoller/observability.go index 983cc23bac..fb180e7792 100644 --- a/core/chains/evm/logpoller/observability.go +++ b/core/chains/evm/logpoller/observability.go @@ -246,6 +246,12 @@ func (o *ObservedORM) FetchNotExecutedReports(commitStoreAddr common.Address, co }) } +func (o *ObservedORM) FetchNotExecutedMessages(onrampAddr common.Address, onrampEvent common.Hash, offrampAddress common.Address, offrampEventSig common.Hash, wordValueMin, wordValueMax common.Hash, qopts ...pg.QOpt) ([]Log, error) { + return withObservedQueryAndResults(o, "FetchNotExecutedMessages", func() ([]Log, error) { + return o.ORM.FetchNotExecutedMessages(onrampAddr, onrampEvent, offrampAddress, offrampEventSig, wordValueMin, wordValueMax, qopts...) + }) +} + func withObservedQueryAndResults[T any](o *ObservedORM, queryName string, query func() ([]T, error)) ([]T, error) { results, err := withObservedQuery(o, queryName, query) if err == nil { diff --git a/core/chains/evm/logpoller/orm.go b/core/chains/evm/logpoller/orm.go index 0aafffbc0e..36fbd74719 100644 --- a/core/chains/evm/logpoller/orm.go +++ b/core/chains/evm/logpoller/orm.go @@ -55,6 +55,7 @@ type ORM interface { SelectLogsDataWordGreaterThan(address common.Address, eventSig common.Hash, wordIndex int, wordValueMin common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error) SelectLogsUntilBlockHashDataWordGreaterThan(address common.Address, eventSig common.Hash, wordIndex int, wordValueMin common.Hash, untilBlockHash common.Hash, qopts ...pg.QOpt) ([]Log, error) FetchNotExecutedReports(commitStoreAddr common.Address, commitStoreEvent common.Hash, offrampAddress common.Address, offrampEventSig common.Hash, after time.Time, qopts ...pg.QOpt) ([]Log, error) + FetchNotExecutedMessages(onrampAddr common.Address, onrampEvent common.Hash, offrampAddress common.Address, offrampEventSig common.Hash, wordValueMin, wordValueMax common.Hash, qopts ...pg.QOpt) ([]Log, error) } type DbORM struct { @@ -428,6 +429,99 @@ func (o *DbORM) FetchNotExecutedReportsV4( return logs, err } +func (o *DbORM) FetchNotExecutedMessages( + onrampAddr common.Address, + onrampEvent common.Hash, + offrampAddress common.Address, + offrampEventSig common.Hash, + wordValueMin, wordValueMax common.Hash, + qopts ...pg.QOpt, +) ([]Log, error) { + return o.FetchNotExecutedMessagesV2(onrampAddr, onrampEvent, offrampAddress, offrampEventSig, wordValueMin, wordValueMax, qopts...) +} + +func (o *DbORM) FetchNotExecutedMessagesV1( + onrampAddr common.Address, + onrampEvent common.Hash, + offrampAddress common.Address, + offrampEventSig common.Hash, + wordValueMin, wordValueMax common.Hash, + qopts ...pg.QOpt, +) ([]Log, error) { + queryArgs := map[string]interface{}{ + "chain_id": utils.NewBig(o.chainID), + "onramp_addr": onrampAddr, + "onramp_event": onrampEvent, + "offramp_addr": offrampAddress, + "offramp_event": offrampEventSig, + "topic_index": 2, + "seq_word_index": 4, + "min_seq_nr": wordValueMin, + "max_seq_nr": wordValueMax, + } + + var logs []Log + q := o.q.WithOpts(qopts...) + err := q.SelectNamed(&logs, `SELECT messages.* FROM evm.logs messages + LEFT JOIN evm.logs executed + ON executed.topics[:topic_index] = substring(messages.data from 32 * :seq_word_index + 1 for 32) + AND executed.evm_chain_id = :chain_id + AND executed.address = :offramp_addr + AND executed.event_sig = :offramp_event + WHERE messages.evm_chain_id = :chain_id + AND messages.address = :onramp_addr + AND messages.event_sig = :onramp_event + AND substring(messages.data from 32*:seq_word_index+1 for 32) >= :min_seq_nr + AND substring(messages.data from 32*:seq_word_index+1 for 32) <= :max_seq_nr + AND executed.address IS NULL + ORDER BY (messages.block_number, messages.log_index)`, queryArgs) + if err != nil { + return nil, err + } + return logs, nil +} + +func (o *DbORM) FetchNotExecutedMessagesV2( + onrampAddr common.Address, + onrampEvent common.Hash, + offrampAddress common.Address, + offrampEventSig common.Hash, + wordValueMin, wordValueMax common.Hash, + qopts ...pg.QOpt, +) ([]Log, error) { + queryArgs := map[string]interface{}{ + "chain_id": utils.NewBig(o.chainID), + "onramp_addr": onrampAddr, + "onramp_event": onrampEvent, + "offramp_addr": offrampAddress, + "offramp_event": offrampEventSig, + "topic_index": 2, + "seq_word_index": 4, + "min_seq_nr": wordValueMin, + "max_seq_nr": wordValueMax, + } + + var logs []Log + q := o.q.WithOpts(qopts...) + err := q.SelectNamed(&logs, `SELECT messages.* FROM evm.logs messages + WHERE messages.evm_chain_id = :chain_id + AND messages.address = :onramp_addr + AND messages.event_sig = :onramp_event + AND substring(messages.data from 32*:seq_word_index+1 for 32) BETWEEN :min_seq_nr AND :max_seq_nr + AND substring(messages.data from 32*:seq_word_index+1 for 32) NOT IN ( + SELECT executed.topics[:topic_index] from evm.logs executed + WHERE executed.evm_chain_id = :chain_id + AND executed.address = :offramp_addr + AND executed.event_sig = :offramp_event + AND executed.topics[:topic_index] BETWEEN :min_seq_nr AND :max_seq_nr + ) + ORDER BY (messages.block_number, messages.log_index)`, queryArgs) + if err != nil { + return nil, err + } + return logs, nil +} + // SelectLogsWithSigsByBlockRangeFilter finds the logs in the given block range with the given event signatures // emitted from the given address. func (o *DbORM) SelectLogsWithSigs(start, end int64, address common.Address, eventSigs []common.Hash, qopts ...pg.QOpt) (logs []Log, err error) { diff --git a/core/services/ocr2/plugins/ccip/commit_plugin.go b/core/services/ocr2/plugins/ccip/commit_plugin.go index f8eaa01f1a..4e8d0dbc63 100644 --- a/core/services/ocr2/plugins/ccip/commit_plugin.go +++ b/core/services/ocr2/plugins/ccip/commit_plugin.go @@ -81,7 +81,7 @@ func jobSpecToCommitPluginConfig(lggr logger.Logger, jb job.Job, pr pipeline.Run } // Load all the readers relevant for this plugin. - onRampReader, err := ccipdata.NewOnRampReader(commitLggr, staticConfig.SourceChainSelector, staticConfig.ChainSelector, staticConfig.OnRamp, sourceChain.LogPoller(), sourceChain.Client(), sourceChain.Config().EVM().FinalityTagEnabled(), qopts...) + onRampReader, err := ccipdata.NewOnRampReader(commitLggr, staticConfig.SourceChainSelector, staticConfig.ChainSelector, staticConfig.OnRamp, common.Address{}, sourceChain.LogPoller(), sourceChain.Client(), sourceChain.Config().EVM().FinalityTagEnabled(), qopts...) if err != nil { return nil, nil, errors.Wrap(err, "failed onramp reader") } diff --git a/core/services/ocr2/plugins/ccip/execution_plugin.go b/core/services/ocr2/plugins/ccip/execution_plugin.go index ae3a42c2af..26dd778640 100644 --- a/core/services/ocr2/plugins/ccip/execution_plugin.go +++ b/core/services/ocr2/plugins/ccip/execution_plugin.go @@ -102,7 +102,7 @@ func jobSpecToExecPluginConfig(lggr logger.Logger, jb job.Job, chainSet evm.Lega return nil, nil, errors.Wrap(err, "could not load commitStoreReader reader") } onRampReader, err := ccipdata.NewOnRampReader(execLggr, offRampConfig.SourceChainSelector, - offRampConfig.ChainSelector, offRampConfig.OnRamp, sourceChain.LogPoller(), sourceChain.Client(), sourceChain.Config().EVM().FinalityTagEnabled()) + offRampConfig.ChainSelector, offRampConfig.OnRamp, common.HexToAddress(spec.ContractID), sourceChain.LogPoller(), sourceChain.Client(), sourceChain.Config().EVM().FinalityTagEnabled()) if err != nil { return nil, nil, err } diff --git a/core/services/ocr2/plugins/ccip/execution_reporting_plugin.go b/core/services/ocr2/plugins/ccip/execution_reporting_plugin.go index fad06f43ed..d7a9870c70 100644 --- a/core/services/ocr2/plugins/ccip/execution_reporting_plugin.go +++ b/core/services/ocr2/plugins/ccip/execution_reporting_plugin.go @@ -772,7 +772,7 @@ func (r *ExecutionReportingPlugin) getReportsWithSendRequests( var sendRequests []ccipdata.Event[internal.EVM2EVMMessage] eg.Go(func() error { - sendReqs, err := r.config.onRampReader.GetSendRequestsBetweenSeqNums( + sendReqs, err := r.config.onRampReader.GetSendRequestsBetweenSeqNumsV2( ctx, intervalMin, intervalMax, diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdata/onramp_reader.go b/core/services/ocr2/plugins/ccip/internal/ccipdata/onramp_reader.go index 4e27b808d5..191e870016 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdata/onramp_reader.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdata/onramp_reader.go @@ -34,12 +34,13 @@ type OnRampReader interface { GetSendRequestsGteSeqNum(ctx context.Context, seqNum uint64, confs int) ([]Event[internal.EVM2EVMMessage], error) // GetSendRequestsBetweenSeqNums returns all the message send requests in the provided sequence numbers range (inclusive). GetSendRequestsBetweenSeqNums(ctx context.Context, seqNumMin, seqNumMax uint64, confs int) ([]Event[internal.EVM2EVMMessage], error) + GetSendRequestsBetweenSeqNumsV2(ctx context.Context, seqNumMin, seqNumMax uint64, confs int) ([]Event[internal.EVM2EVMMessage], error) // Get router configured in the onRamp RouterAddress() (common.Address, error) } // NewOnRampReader determines the appropriate version of the onramp and returns a reader for it -func NewOnRampReader(lggr logger.Logger, sourceSelector, destSelector uint64, onRampAddress common.Address, sourceLP logpoller.LogPoller, source client.Client, finalityTags bool, qopts ...pg.QOpt) (OnRampReader, error) { +func NewOnRampReader(lggr logger.Logger, sourceSelector, destSelector uint64, onRampAddress common.Address, offRampAddress common.Address, sourceLP logpoller.LogPoller, source client.Client, finalityTags bool, qopts ...pg.QOpt) (OnRampReader, error) { contractType, version, err := ccipconfig.TypeAndVersion(onRampAddress, source) if err != nil { return nil, errors.Errorf("expected '%v' got '%v' (%v)", ccipconfig.EVM2EVMOnRamp, contractType, err) @@ -50,7 +51,7 @@ func NewOnRampReader(lggr logger.Logger, sourceSelector, destSelector uint64, on case V1_1_0: return NewOnRampV1_1_0(lggr, sourceSelector, destSelector, onRampAddress, sourceLP, source, finalityTags) case V1_2_0: - return NewOnRampV1_2_0(lggr, sourceSelector, destSelector, onRampAddress, sourceLP, source, finalityTags) + return NewOnRampV1_2_0(lggr, sourceSelector, destSelector, onRampAddress, offRampAddress, sourceLP, source, finalityTags) default: return nil, errors.Errorf("got unexpected version %v", version.String()) } diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdata/onramp_reader_mock.go b/core/services/ocr2/plugins/ccip/internal/ccipdata/onramp_reader_mock.go index 7ff71a50b4..51192027f2 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdata/onramp_reader_mock.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdata/onramp_reader_mock.go @@ -64,6 +64,32 @@ func (_m *MockOnRampReader) GetSendRequestsBetweenSeqNums(ctx context.Context, s return r0, r1 } +// GetSendRequestsBetweenSeqNumsV2 provides a mock function with given fields: ctx, seqNumMin, seqNumMax, confs +func (_m *MockOnRampReader) GetSendRequestsBetweenSeqNumsV2(ctx context.Context, seqNumMin uint64, seqNumMax uint64, confs int) ([]Event[internal.EVM2EVMMessage], error) { + ret := _m.Called(ctx, seqNumMin, seqNumMax, confs) + + var r0 []Event[internal.EVM2EVMMessage] + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, uint64, uint64, int) ([]Event[internal.EVM2EVMMessage], error)); ok { + return rf(ctx, seqNumMin, seqNumMax, confs) + } + if rf, ok := ret.Get(0).(func(context.Context, uint64, uint64, int) []Event[internal.EVM2EVMMessage]); ok { + r0 = rf(ctx, seqNumMin, seqNumMax, confs) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]Event[internal.EVM2EVMMessage]) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, uint64, uint64, int) error); ok { + r1 = rf(ctx, seqNumMin, seqNumMax, confs) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // GetSendRequestsGteSeqNum provides a mock function with given fields: ctx, seqNum, confs func (_m *MockOnRampReader) GetSendRequestsGteSeqNum(ctx context.Context, seqNum uint64, confs int) ([]Event[internal.EVM2EVMMessage], error) { ret := _m.Called(ctx, seqNum, confs) diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdata/onramp_reader_test.go b/core/services/ocr2/plugins/ccip/internal/ccipdata/onramp_reader_test.go index b44f56fbd2..c0db5ef7de 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdata/onramp_reader_test.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdata/onramp_reader_test.go @@ -36,6 +36,7 @@ func TestNewOnRampReader_noContractAtAddress(t *testing.T) { logger.TestLogger(t), testutils.SimulatedChainID.Uint64(), testutils.SimulatedChainID.Uint64(), common.Address{}, + common.Address{}, lp, bc, true) @@ -94,7 +95,7 @@ func setupOnRampReaderTH(t *testing.T, version string) onRampReaderTH { } // Create the version-specific reader. - reader, err := ccipdata.NewOnRampReader(log, testutils.SimulatedChainID.Uint64(), testutils.SimulatedChainID.Uint64(), onRampAddress, lp, bc, false) + reader, err := ccipdata.NewOnRampReader(log, testutils.SimulatedChainID.Uint64(), testutils.SimulatedChainID.Uint64(), onRampAddress, common.Address{}, lp, bc, false) require.NoError(t, err) return onRampReaderTH{ diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdata/onramp_v1_0_0.go b/core/services/ocr2/plugins/ccip/internal/ccipdata/onramp_v1_0_0.go index 99fe0857ec..550519f47e 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdata/onramp_v1_0_0.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdata/onramp_v1_0_0.go @@ -234,6 +234,22 @@ func (o *OnRampV1_0_0) GetSendRequestsBetweenSeqNums(ctx context.Context, seqNum return parseLogs[internal.EVM2EVMMessage](logs, o.lggr, o.logToMessage) } +func (o *OnRampV1_0_0) GetSendRequestsBetweenSeqNumsV2(ctx context.Context, seqNumMin, seqNumMax uint64, confs int) ([]Event[internal.EVM2EVMMessage], error) { + logs, err := o.lp.FetchNotExecutedMessages( + o.address, + o.sendRequestedEventSig, + common.Address{}, + common.Hash{}, + abihelpers.EvmWord(seqNumMin), + abihelpers.EvmWord(seqNumMax), + pg.WithParentCtx(ctx), + ) + if err != nil { + return nil, err + } + return parseLogs[internal.EVM2EVMMessage](logs, o.lggr, o.logToMessage) +} + func (o *OnRampV1_0_0) Close(qopts ...pg.QOpt) error { return o.lp.UnregisterFilter(o.filterName, qopts...) } diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdata/onramp_v1_2_0.go b/core/services/ocr2/plugins/ccip/internal/ccipdata/onramp_v1_2_0.go index 5cc16aa905..22d2efc05d 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdata/onramp_v1_2_0.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdata/onramp_v1_2_0.go @@ -206,6 +206,8 @@ type OnRampV1_2_0 struct { filterName string sendRequestedEventSig common.Hash sendRequestedSeqNumberWord int + offrampAddress common.Address + offrampEventSig common.Hash } func (o *OnRampV1_2_0) logToMessage(log types.Log) (*internal.EVM2EVMMessage, error) { @@ -291,6 +293,22 @@ func (o *OnRampV1_2_0) GetSendRequestsBetweenSeqNums(ctx context.Context, seqNum return parseLogs[internal.EVM2EVMMessage](logs, o.lggr, o.logToMessage) } +func (o *OnRampV1_2_0) GetSendRequestsBetweenSeqNumsV2(ctx context.Context, seqNumMin, seqNumMax uint64, confs int) ([]Event[internal.EVM2EVMMessage], error) { + logs, err := o.lp.FetchNotExecutedMessages( + o.address, + o.sendRequestedEventSig, + o.offrampAddress, + o.offrampEventSig, + abihelpers.EvmWord(seqNumMin), + abihelpers.EvmWord(seqNumMax), + pg.WithParentCtx(ctx), + ) + if err != nil { + return nil, err + } + return parseLogs[internal.EVM2EVMMessage](logs, o.lggr, o.logToMessage) +} + func (o *OnRampV1_2_0) RouterAddress() (common.Address, error) { config, err := o.onRamp.GetDynamicConfig(nil) if err != nil { @@ -303,15 +321,7 @@ func (o *OnRampV1_2_0) Close(qopts ...pg.QOpt) error { return o.lp.UnregisterFilter(o.filterName, qopts...) } -func NewOnRampV1_2_0( - lggr logger.Logger, - sourceSelector, - destSelector uint64, - onRampAddress common.Address, - sourceLP logpoller.LogPoller, - source client.Client, - finalityTags bool, -) (*OnRampV1_2_0, error) { +func NewOnRampV1_2_0(lggr logger.Logger, sourceSelector, destSelector uint64, onRampAddress, offrampAddress common.Address, sourceLP logpoller.LogPoller, source client.Client, finalityTags bool) (*OnRampV1_2_0, error) { onRamp, err := evm_2_evm_onramp.NewEVM2EVMOnRamp(onRampAddress, source) if err != nil { return nil, err @@ -326,6 +336,7 @@ func NewOnRampV1_2_0( }); err != nil { return nil, err } + return &OnRampV1_2_0{ finalityTags: finalityTags, lggr: lggr, @@ -337,5 +348,7 @@ func NewOnRampV1_2_0( address: onRampAddress, sendRequestedSeqNumberWord: CCIPSendRequestSeqNumIndexV1_2_0, sendRequestedEventSig: CCIPSendRequestEventSigV1_2_0, + offrampAddress: offrampAddress, + offrampEventSig: abihelpers.MustGetEventID("ExecutionStateChanged", abihelpers.MustParseABI(evm_2_evm_offramp.EVM2EVMOffRampABI)), }, nil } diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdata/onramp_v1_2_0_test.go b/core/services/ocr2/plugins/ccip/internal/ccipdata/onramp_v1_2_0_test.go index ac7e0bcd03..a44d7e92bd 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdata/onramp_v1_2_0_test.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdata/onramp_v1_2_0_test.go @@ -92,7 +92,8 @@ func TestLogPollerClient_GetSendRequestsGteSeqNum(t *testing.T) { t.Run("using confs", func(t *testing.T) { lp := mocks.NewLogPoller(t) lp.On("RegisterFilter", mock.Anything).Return(nil) - onRampV2, err := NewOnRampV1_2_0(lggr, 1, 1, onRampAddr, lp, nil, false) + _, err := NewOnRampV1_0_0(lggr, 1, 1, onRampAddr, lp, nil, false) + onRampV2, err := NewOnRampV1_2_0(lggr, 1, 1, onRampAddr, common.Address{}, lp, nil, false) require.NoError(t, err) lp.On("LogsDataWordGreaterThan", onRampV2.sendRequestedEventSig, @@ -120,7 +121,7 @@ func TestLogPollerClient_GetSendRequestsGteSeqNum(t *testing.T) { cl.On("HeaderByNumber", mock.Anything, mock.Anything).Return(h, nil) lp := mocks.NewLogPoller(t) lp.On("RegisterFilter", mock.Anything).Return(nil) - onRampV2, err := NewOnRampV1_2_0(lggr, 1, 1, onRampAddr, lp, cl, true) + onRampV2, err := NewOnRampV1_2_0(lggr, 1, 1, onRampAddr, common.Address{}, lp, cl, true) require.NoError(t, err) lp.On("LogsUntilBlockHashDataWordGreaterThan", onRampV2.sendRequestedEventSig, diff --git a/core/store/migrate/migrations/0199_unlimited_power.sql b/core/store/migrate/migrations/0199_unlimited_power.sql new file mode 100644 index 0000000000..d20b59fc74 --- /dev/null +++ b/core/store/migrate/migrations/0199_unlimited_power.sql @@ -0,0 +1,8 @@ +-- +goose Up +CREATE INDEX logs_idx_data_word_four ON evm.logs (substring(data from 97 for 32)); +CREATE INDEX logs_idx_data_word_five ON evm.logs (substring(data from 129 for 32)); + + +-- +goose Down +DROP INDEX IF EXISTS evm.logs_idx_data_word_five; +DROP INDEX IF EXISTS evm.logs_idx_data_word_four; \ No newline at end of file diff --git a/integration-tests/ccip-tests/testconfig/override/override.toml b/integration-tests/ccip-tests/testconfig/override/override.toml index c22c85fff5..f68fb87e61 100644 --- a/integration-tests/ccip-tests/testconfig/override/override.toml +++ b/integration-tests/ccip-tests/testconfig/override/override.toml @@ -16,7 +16,7 @@ KeepEnvAlive = true NumberOfCommitNodes = 16 MsgType = 'WithoutToken' PhaseTimeout = '50m' -TestDuration = '2h' +TestDuration = '4h' NodeFunding = 1000.0 RequestPerUnitTime = [1] TimeUnit = '1s'