Skip to content

Commit

Permalink
Fetching only not executed messages
Browse files Browse the repository at this point in the history
  • Loading branch information
mateusz-sekara committed Nov 6, 2023
1 parent 5582391 commit 1966fe1
Show file tree
Hide file tree
Showing 17 changed files with 311 additions and 25 deletions.
4 changes: 4 additions & 0 deletions core/chains/evm/logpoller/disabled.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,7 @@ func (d disabled) LogsUntilBlockHashDataWordGreaterThan(eventSig common.Hash, ad
func (d disabled) FetchNotExecutedReports(commitStoreAddr common.Address, commitStoreEvent common.Hash, offrampAddress common.Address, offrampEventSig common.Hash, after time.Time, qopts ...pg.QOpt) ([]Log, error) {
return nil, ErrDisabled
}

func (d disabled) FetchNotExecutedMessages(onrampAddr common.Address, onrampEvent common.Hash, offrampAddress common.Address, offrampEventSig common.Hash, wordValueMin, wordValueMax common.Hash, qopts ...pg.QOpt) ([]Log, error) {
return nil, ErrDisabled
}
5 changes: 5 additions & 0 deletions core/chains/evm/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type LogPoller interface {
LogsDataWordGreaterThan(eventSig common.Hash, address common.Address, wordIndex int, wordValueMin common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error)
LogsUntilBlockHashDataWordGreaterThan(eventSig common.Hash, address common.Address, wordIndex int, wordValueMin common.Hash, untilBlockHash common.Hash, qopts ...pg.QOpt) ([]Log, error)
FetchNotExecutedReports(commitStoreAddr common.Address, commitStoreEvent common.Hash, offrampAddress common.Address, offrampEventSig common.Hash, after time.Time, qopts ...pg.QOpt) ([]Log, error)
FetchNotExecutedMessages(onrampAddr common.Address, onrampEvent common.Hash, offrampAddress common.Address, offrampEventSig common.Hash, wordValueMin, wordValueMax common.Hash, qopts ...pg.QOpt) ([]Log, error)
}

type LogPollerTest interface {
Expand Down Expand Up @@ -1159,6 +1160,10 @@ func (lp *logPoller) FetchNotExecutedReports(commitStoreAddr common.Address, com
return lp.orm.FetchNotExecutedReports(commitStoreAddr, commitStoreEvent, offrampAddress, offrampEventSig, after, qopts...)
}

func (lp *logPoller) FetchNotExecutedMessages(onrampAddr common.Address, onrampEvent common.Hash, offrampAddress common.Address, offrampEventSig common.Hash, wordValueMin, wordValueMax common.Hash, qopts ...pg.QOpt) ([]Log, error) {
return lp.orm.FetchNotExecutedMessages(onrampAddr, onrampEvent, offrampAddress, offrampEventSig, wordValueMin, wordValueMax, qopts...)
}

func EvmWord(i uint64) common.Hash {
var b = make([]byte, 8)
binary.BigEndian.PutUint64(b, i)
Expand Down
92 changes: 85 additions & 7 deletions core/chains/evm/logpoller/log_poller_ccip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
)

const (
numberOfReports = 10000
numberOfReports = 1000
numberOfMessagesPerReport = 100
numberOfMessages = numberOfReports * numberOfMessagesPerReport
)
Expand Down Expand Up @@ -108,11 +108,9 @@ func populateDbWithSomeExecuted(b *testing.B, o *logpoller.DbORM, chainID *big.I
func populateDbWithMessages(b *testing.B, o *logpoller.DbORM, chainID *big.Int, onrampAddress common.Address, onrampEvent common.Hash) {
var logs []logpoller.Log
for i := 1; i <= numberOfMessages; i++ {
data := make([]byte, 64)
// MinSeqNr
data = append(data, logpoller.EvmWord(uint64(numberOfMessagesPerReport*i+1)).Bytes()...)
// MaxSeqNr
data = append(data, logpoller.EvmWord(uint64(numberOfMessagesPerReport*(i+1))).Bytes()...)
data := make([]byte, 128)
// SeqNr
data = append(data, logpoller.EvmWord(uint64(i)).Bytes()...)

logs = append(logs, logpoller.Log{
EvmChainId: utils.NewBig(chainID),
Expand All @@ -131,7 +129,6 @@ func populateDbWithMessages(b *testing.B, o *logpoller.DbORM, chainID *big.Int,
}
require.NoError(b, o.InsertBlock(utils.RandomAddress().Hash(), int64(100_000), time.Now()))
require.NoError(b, o.InsertLogs(logs))

}

func Benchmark_CreatedAfter(b *testing.B) {
Expand Down Expand Up @@ -318,3 +315,84 @@ func Benchmark_SingleQuerySomeExecuted(b *testing.B) {
require.Len(b, logs, numberOfReports)
}
}

// V1
// 168 millis
// Benchmark_AllMessagesExecuted-12 1 10373778417 ns/op

// V2
// 93 millis
//
// Benchmark_AllMessagesExecuted
// Benchmark_AllMessagesExecuted-12 14 83441804 ns/op
// Benchmark_AllMessagesExecuted-12 14 80466869 ns/op
// Benchmark_AllMessagesExecuted-12 12 83875597 ns/op
// Benchmark_AllMessagesExecuted-12 13 79230596 ns/op
// Benchmark_AllMessagesExecuted-12 13 83327372 ns/op
func Benchmark_AllMessagesExecuted(b *testing.B) {
chainId := big.NewInt(137)
_, db := heavyweight.FullTestDBV2(b, "msgs", nil)
o := logpoller.NewORM(chainId, db, logger.TestLogger(b), pgtest.NewQConfig(false))

offrampAddress := utils.RandomAddress()
offrampExecuted := common.HexToHash("0xd4f851956a5d67c3997d1c9205045fef79bae2947fdee7e9e2641abc7391ef65")

onrampAddress := utils.RandomAddress()
onrampEvent := common.HexToHash("0xaffc45517195d6499808c643bd4a7b0ffeedf95bea5852840d7bfcf63f59e821")

populateDbWithMessages(b, o, chainId, onrampAddress, onrampEvent)
populateDbWithExecutionStateChanges(b, o, chainId, offrampAddress, offrampExecuted)
b.ResetTimer()

for i := 0; i < b.N; i++ {
//start := time.Now()
logs, err := o.FetchNotExecutedMessages(
onrampAddress,
onrampEvent,
offrampAddress,
offrampExecuted,
logpoller.EvmWord(0),
logpoller.EvmWord(numberOfMessages),
)
//fmt.Printf("%d millis\n", time.Since(start).Milliseconds())
require.NoError(b, err)
require.Len(b, logs, 0)
}
}

// V1
// 189 millis
// Benchmark_NoneMessagesExecuted-12 1 3135554125 ns/op
// V2
// 230 millis
// Benchmark_NoneMessagesExecuted-12 1 3809230583 ns/op

func Benchmark_NoneMessagesExecuted(b *testing.B) {
chainId := big.NewInt(137)
_, db := heavyweight.FullTestDBV2(b, "msgs", nil)
o := logpoller.NewORM(chainId, db, logger.TestLogger(b), pgtest.NewQConfig(false))

offrampAddress := utils.RandomAddress()
offrampExecuted := common.HexToHash("0xd4f851956a5d67c3997d1c9205045fef79bae2947fdee7e9e2641abc7391ef65")

onrampAddress := utils.RandomAddress()
onrampEvent := common.HexToHash("0xaffc45517195d6499808c643bd4a7b0ffeedf95bea5852840d7bfcf63f59e821")

populateDbWithMessages(b, o, chainId, onrampAddress, onrampEvent)
b.ResetTimer()

for i := 0; i < b.N; i++ {
start := time.Now()
logs, err := o.FetchNotExecutedMessages(
onrampAddress,
onrampEvent,
offrampAddress,
offrampExecuted,
logpoller.EvmWord(0),
logpoller.EvmWord(numberOfMessages),
)
fmt.Printf("%d millis\n", time.Since(start).Milliseconds())
require.NoError(b, err)
require.Len(b, logs, numberOfMessages)
}
}
33 changes: 33 additions & 0 deletions core/chains/evm/logpoller/mocks/log_poller.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions core/chains/evm/logpoller/observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,12 @@ func (o *ObservedORM) FetchNotExecutedReports(commitStoreAddr common.Address, co
})
}

func (o *ObservedORM) FetchNotExecutedMessages(onrampAddr common.Address, onrampEvent common.Hash, offrampAddress common.Address, offrampEventSig common.Hash, wordValueMin, wordValueMax common.Hash, qopts ...pg.QOpt) ([]Log, error) {
return withObservedQueryAndResults(o, "FetchNotExecutedMessages", func() ([]Log, error) {
return o.ORM.FetchNotExecutedMessages(onrampAddr, onrampEvent, offrampAddress, offrampEventSig, wordValueMin, wordValueMax, qopts...)
})
}

func withObservedQueryAndResults[T any](o *ObservedORM, queryName string, query func() ([]T, error)) ([]T, error) {
results, err := withObservedQuery(o, queryName, query)
if err == nil {
Expand Down
94 changes: 94 additions & 0 deletions core/chains/evm/logpoller/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type ORM interface {
SelectLogsDataWordGreaterThan(address common.Address, eventSig common.Hash, wordIndex int, wordValueMin common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error)
SelectLogsUntilBlockHashDataWordGreaterThan(address common.Address, eventSig common.Hash, wordIndex int, wordValueMin common.Hash, untilBlockHash common.Hash, qopts ...pg.QOpt) ([]Log, error)
FetchNotExecutedReports(commitStoreAddr common.Address, commitStoreEvent common.Hash, offrampAddress common.Address, offrampEventSig common.Hash, after time.Time, qopts ...pg.QOpt) ([]Log, error)
FetchNotExecutedMessages(onrampAddr common.Address, onrampEvent common.Hash, offrampAddress common.Address, offrampEventSig common.Hash, wordValueMin, wordValueMax common.Hash, qopts ...pg.QOpt) ([]Log, error)
}

type DbORM struct {
Expand Down Expand Up @@ -428,6 +429,99 @@ func (o *DbORM) FetchNotExecutedReportsV4(
return logs, err
}

func (o *DbORM) FetchNotExecutedMessages(
onrampAddr common.Address,
onrampEvent common.Hash,
offrampAddress common.Address,
offrampEventSig common.Hash,
wordValueMin, wordValueMax common.Hash,
qopts ...pg.QOpt,
) ([]Log, error) {
return o.FetchNotExecutedMessagesV2(onrampAddr, onrampEvent, offrampAddress, offrampEventSig, wordValueMin, wordValueMax, qopts...)
}

func (o *DbORM) FetchNotExecutedMessagesV1(
onrampAddr common.Address,
onrampEvent common.Hash,
offrampAddress common.Address,
offrampEventSig common.Hash,
wordValueMin, wordValueMax common.Hash,
qopts ...pg.QOpt,
) ([]Log, error) {
queryArgs := map[string]interface{}{
"chain_id": utils.NewBig(o.chainID),
"onramp_addr": onrampAddr,
"onramp_event": onrampEvent,
"offramp_addr": offrampAddress,
"offramp_event": offrampEventSig,
"topic_index": 2,
"seq_word_index": 4,
"min_seq_nr": wordValueMin,
"max_seq_nr": wordValueMax,
}

var logs []Log
q := o.q.WithOpts(qopts...)
err := q.SelectNamed(&logs, `SELECT messages.* FROM evm.logs messages
LEFT JOIN evm.logs executed
ON executed.topics[:topic_index] = substring(messages.data from 32 * :seq_word_index + 1 for 32)
AND executed.evm_chain_id = :chain_id
AND executed.address = :offramp_addr
AND executed.event_sig = :offramp_event
WHERE messages.evm_chain_id = :chain_id
AND messages.address = :onramp_addr
AND messages.event_sig = :onramp_event
AND substring(messages.data from 32*:seq_word_index+1 for 32) >= :min_seq_nr
AND substring(messages.data from 32*:seq_word_index+1 for 32) <= :max_seq_nr
AND executed.address IS NULL
ORDER BY (messages.block_number, messages.log_index)`, queryArgs)
if err != nil {
return nil, err
}
return logs, nil
}

func (o *DbORM) FetchNotExecutedMessagesV2(
onrampAddr common.Address,
onrampEvent common.Hash,
offrampAddress common.Address,
offrampEventSig common.Hash,
wordValueMin, wordValueMax common.Hash,
qopts ...pg.QOpt,
) ([]Log, error) {
queryArgs := map[string]interface{}{
"chain_id": utils.NewBig(o.chainID),
"onramp_addr": onrampAddr,
"onramp_event": onrampEvent,
"offramp_addr": offrampAddress,
"offramp_event": offrampEventSig,
"topic_index": 2,
"seq_word_index": 4,
"min_seq_nr": wordValueMin,
"max_seq_nr": wordValueMax,
}

var logs []Log
q := o.q.WithOpts(qopts...)
err := q.SelectNamed(&logs, `SELECT messages.* FROM evm.logs messages
WHERE messages.evm_chain_id = :chain_id
AND messages.address = :onramp_addr
AND messages.event_sig = :onramp_event
AND substring(messages.data from 32*:seq_word_index+1 for 32) BETWEEN :min_seq_nr AND :max_seq_nr
AND substring(messages.data from 32*:seq_word_index+1 for 32) NOT IN (
SELECT executed.topics[:topic_index] from evm.logs executed
WHERE executed.evm_chain_id = :chain_id
AND executed.address = :offramp_addr
AND executed.event_sig = :offramp_event
AND executed.topics[:topic_index] BETWEEN :min_seq_nr AND :max_seq_nr
)
ORDER BY (messages.block_number, messages.log_index)`, queryArgs)
if err != nil {
return nil, err
}
return logs, nil
}

// SelectLogsWithSigsByBlockRangeFilter finds the logs in the given block range with the given event signatures
// emitted from the given address.
func (o *DbORM) SelectLogsWithSigs(start, end int64, address common.Address, eventSigs []common.Hash, qopts ...pg.QOpt) (logs []Log, err error) {
Expand Down
2 changes: 1 addition & 1 deletion core/services/ocr2/plugins/ccip/commit_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func jobSpecToCommitPluginConfig(lggr logger.Logger, jb job.Job, pr pipeline.Run
}

// Load all the readers relevant for this plugin.
onRampReader, err := ccipdata.NewOnRampReader(commitLggr, staticConfig.SourceChainSelector, staticConfig.ChainSelector, staticConfig.OnRamp, sourceChain.LogPoller(), sourceChain.Client(), sourceChain.Config().EVM().FinalityTagEnabled(), qopts...)
onRampReader, err := ccipdata.NewOnRampReader(commitLggr, staticConfig.SourceChainSelector, staticConfig.ChainSelector, staticConfig.OnRamp, common.Address{}, sourceChain.LogPoller(), sourceChain.Client(), sourceChain.Config().EVM().FinalityTagEnabled(), qopts...)
if err != nil {
return nil, nil, errors.Wrap(err, "failed onramp reader")
}
Expand Down
2 changes: 1 addition & 1 deletion core/services/ocr2/plugins/ccip/execution_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func jobSpecToExecPluginConfig(lggr logger.Logger, jb job.Job, chainSet evm.Lega
return nil, nil, errors.Wrap(err, "could not load commitStoreReader reader")
}
onRampReader, err := ccipdata.NewOnRampReader(execLggr, offRampConfig.SourceChainSelector,
offRampConfig.ChainSelector, offRampConfig.OnRamp, sourceChain.LogPoller(), sourceChain.Client(), sourceChain.Config().EVM().FinalityTagEnabled())
offRampConfig.ChainSelector, offRampConfig.OnRamp, common.HexToAddress(spec.ContractID), sourceChain.LogPoller(), sourceChain.Client(), sourceChain.Config().EVM().FinalityTagEnabled())
if err != nil {
return nil, nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -772,7 +772,7 @@ func (r *ExecutionReportingPlugin) getReportsWithSendRequests(

var sendRequests []ccipdata.Event[internal.EVM2EVMMessage]
eg.Go(func() error {
sendReqs, err := r.config.onRampReader.GetSendRequestsBetweenSeqNums(
sendReqs, err := r.config.onRampReader.GetSendRequestsBetweenSeqNumsV2(
ctx,
intervalMin,
intervalMax,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,13 @@ type OnRampReader interface {
GetSendRequestsGteSeqNum(ctx context.Context, seqNum uint64, confs int) ([]Event[internal.EVM2EVMMessage], error)
// GetSendRequestsBetweenSeqNums returns all the message send requests in the provided sequence numbers range (inclusive).
GetSendRequestsBetweenSeqNums(ctx context.Context, seqNumMin, seqNumMax uint64, confs int) ([]Event[internal.EVM2EVMMessage], error)
GetSendRequestsBetweenSeqNumsV2(ctx context.Context, seqNumMin, seqNumMax uint64, confs int) ([]Event[internal.EVM2EVMMessage], error)
// Get router configured in the onRamp
RouterAddress() (common.Address, error)
}

// NewOnRampReader determines the appropriate version of the onramp and returns a reader for it
func NewOnRampReader(lggr logger.Logger, sourceSelector, destSelector uint64, onRampAddress common.Address, sourceLP logpoller.LogPoller, source client.Client, finalityTags bool, qopts ...pg.QOpt) (OnRampReader, error) {
func NewOnRampReader(lggr logger.Logger, sourceSelector, destSelector uint64, onRampAddress common.Address, offRampAddress common.Address, sourceLP logpoller.LogPoller, source client.Client, finalityTags bool, qopts ...pg.QOpt) (OnRampReader, error) {
contractType, version, err := ccipconfig.TypeAndVersion(onRampAddress, source)
if err != nil {
return nil, errors.Errorf("expected '%v' got '%v' (%v)", ccipconfig.EVM2EVMOnRamp, contractType, err)
Expand All @@ -50,7 +51,7 @@ func NewOnRampReader(lggr logger.Logger, sourceSelector, destSelector uint64, on
case V1_1_0:
return NewOnRampV1_1_0(lggr, sourceSelector, destSelector, onRampAddress, sourceLP, source, finalityTags)
case V1_2_0:
return NewOnRampV1_2_0(lggr, sourceSelector, destSelector, onRampAddress, sourceLP, source, finalityTags)
return NewOnRampV1_2_0(lggr, sourceSelector, destSelector, onRampAddress, offRampAddress, sourceLP, source, finalityTags)
default:
return nil, errors.Errorf("got unexpected version %v", version.String())
}
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 1966fe1

Please sign in to comment.