Skip to content

Commit

Permalink
Test
Browse files Browse the repository at this point in the history
  • Loading branch information
mateusz-sekara committed Aug 6, 2024
1 parent a5c94c6 commit 3aa8224
Show file tree
Hide file tree
Showing 18 changed files with 189 additions and 61 deletions.
2 changes: 1 addition & 1 deletion core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ require (
github.com/shopspring/decimal v1.4.0
github.com/smartcontractkit/chain-selectors v1.0.19
github.com/smartcontractkit/chainlink-automation v1.0.4
github.com/smartcontractkit/chainlink-common v0.2.1-0.20240717132349-ee5af9b79834
github.com/smartcontractkit/chainlink-common v0.2.1-0.20240806072316-c2a953bab92e
github.com/smartcontractkit/chainlink/v2 v2.0.0-00010101000000-000000000000
github.com/smartcontractkit/libocr v0.0.0-20240717100443-f6226e09bee7
github.com/spf13/cobra v1.8.0
Expand Down
4 changes: 2 additions & 2 deletions core/scripts/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1072,8 +1072,8 @@ github.com/smartcontractkit/chainlink-automation v1.0.4 h1:iyW181JjKHLNMnDleI8um
github.com/smartcontractkit/chainlink-automation v1.0.4/go.mod h1:u4NbPZKJ5XiayfKHD/v3z3iflQWqvtdhj13jVZXj/cM=
github.com/smartcontractkit/chainlink-ccip v0.0.0-20240729211818-87f9bb698f6e h1:aFYsUujFGiVwPAlvG94GHMIaLm8AFQ0V+d10AQD9ogY=
github.com/smartcontractkit/chainlink-ccip v0.0.0-20240729211818-87f9bb698f6e/go.mod h1:K67FldZZmg+//7yyI0yBMXEw6uvYevTj/ErBEvXaBXM=
github.com/smartcontractkit/chainlink-common v0.2.1-0.20240717132349-ee5af9b79834 h1:pTf4xdcmiWBqWZ6rTy2RMTDBzhHk89VC1pM7jXKQztI=
github.com/smartcontractkit/chainlink-common v0.2.1-0.20240717132349-ee5af9b79834/go.mod h1:fh9eBbrReCmv31bfz52ENCAMa7nTKQbdhb2B3+S2VGo=
github.com/smartcontractkit/chainlink-common v0.2.1-0.20240806072316-c2a953bab92e h1:WeZo5uVofbnE/HsHJPDKyE9F4iL1c2yKXInOBRFMdoU=
github.com/smartcontractkit/chainlink-common v0.2.1-0.20240806072316-c2a953bab92e/go.mod h1:fh9eBbrReCmv31bfz52ENCAMa7nTKQbdhb2B3+S2VGo=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240710121324-3ed288aa9b45 h1:NBQLtqk8zsyY4qTJs+NElI3aDFTcAo83JHvqD04EvB0=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240710121324-3ed288aa9b45/go.mod h1:LV0h7QBQUpoC2UUi6TcUvcIFm1xjP/DtEcqV8+qeLUs=
github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240718160222-2dc0c8136bfa h1:g75H8oh2ws52s8BekwvGQ9XvBVu3E7WM1rfiA0PN0zk=
Expand Down
10 changes: 3 additions & 7 deletions core/services/ocr2/plugins/ccip/ccipexec/batching.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,17 +459,13 @@ func validateSeqNumbers(serviceCtx context.Context, commitStore ccipdata.CommitS
}

// Gets the commit report from the saved logs for a given sequence number.
func getCommitReportForSeqNum(ctx context.Context, commitStoreReader ccipdata.CommitStoreReader, seqNum uint64) (cciptypes.CommitStoreReport, error) {
acceptedReports, err := commitStoreReader.GetCommitReportMatchingSeqNum(ctx, seqNum, 0)
func getCommitReportForRoot(ctx context.Context, commitStoreReader ccipdata.CommitStoreReader, root [32]byte) (cciptypes.CommitStoreReport, error) {
acceptedReports, err := commitStoreReader.GetCommitReport(ctx, root)
if err != nil {
return cciptypes.CommitStoreReport{}, err
}

if len(acceptedReports) == 0 {
return cciptypes.CommitStoreReport{}, errors.Errorf("seq number not committed")
}

return acceptedReports[0].CommitStoreReport, nil
return acceptedReports.CommitStoreReport, nil
}

type messageStatus string
Expand Down
48 changes: 30 additions & 18 deletions core/services/ocr2/plugins/ccip/ccipexec/ocr2.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

"github.com/smartcontractkit/chainlink-common/pkg/hashutil"
cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccip"

"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/cache"
Expand Down Expand Up @@ -118,14 +119,14 @@ func (r *ExecutionReportingPlugin) Observation(ctx context.Context, timestamp ty
r.inflightReports.expire(lggr)
inFlight := r.inflightReports.getAll()

executableObservations, err := r.getExecutableObservations(ctx, lggr, inFlight)
executableObservations, merkleRoot, err := r.getExecutableObservations(ctx, lggr, inFlight)
if err != nil {
return nil, err
}
// cap observations which fits MaxObservationLength (after serialized)
capped := sort.Search(len(executableObservations), func(i int) bool {
var encoded []byte
encoded, err = ccip.NewExecutionObservation(executableObservations[:i+1]).Marshal()
encoded, err = ccip.NewExecutionObservation(executableObservations[:i+1], merkleRoot).Marshal()
if err != nil {
// false makes Search keep looking to the right, always including any "erroring" ObservedMessage and allowing us to detect in the bottom
return false
Expand All @@ -139,18 +140,18 @@ func (r *ExecutionReportingPlugin) Observation(ctx context.Context, timestamp ty
r.metricsCollector.NumberOfMessagesProcessed(ccip.Observation, len(executableObservations))
lggr.Infow("Observation", "executableMessages", executableObservations)
// Note can be empty
return ccip.NewExecutionObservation(executableObservations).Marshal()
return ccip.NewExecutionObservation(executableObservations, merkleRoot).Marshal()
}

func (r *ExecutionReportingPlugin) getExecutableObservations(ctx context.Context, lggr logger.Logger, inflight []InflightInternalExecutionReport) ([]ccip.ObservedMessage, error) {
func (r *ExecutionReportingPlugin) getExecutableObservations(ctx context.Context, lggr logger.Logger, inflight []InflightInternalExecutionReport) ([]ccip.ObservedMessage, [32]byte, error) {
unexpiredReports, err := r.commitRootsCache.RootsEligibleForExecution(ctx)
if err != nil {
return nil, err
return nil, [32]byte{}, err
}
r.metricsCollector.UnexpiredCommitRoots(len(unexpiredReports))

if len(unexpiredReports) == 0 {
return []ccip.ObservedMessage{}, nil
return []ccip.ObservedMessage{}, [32]byte{}, nil
}

getExecTokenData := cache.LazyFunction[execTokenData](func() (execTokenData, error) {
Expand All @@ -163,7 +164,7 @@ func (r *ExecutionReportingPlugin) getExecutableObservations(ctx context.Context

unexpiredReportsWithSendReqs, err := r.getReportsWithSendRequests(ctx, unexpiredReportsPart)
if err != nil {
return nil, err
return nil, [32]byte{}, err
}

for _, unexpiredReport := range unexpiredReportsWithSendReqs {
Expand Down Expand Up @@ -198,7 +199,7 @@ func (r *ExecutionReportingPlugin) getExecutableObservations(ctx context.Context

blessed, err := r.commitStoreReader.IsBlessed(ctx, merkleRoot)
if err != nil {
return nil, err
return nil, [32]byte{}, err
}
if !blessed {
rootLggr.Infow("Report is accepted but not blessed")
Expand All @@ -207,7 +208,7 @@ func (r *ExecutionReportingPlugin) getExecutableObservations(ctx context.Context

tokenExecData, err := getExecTokenData()
if err != nil {
return nil, err
return nil, [32]byte{}, err
}

batch, msgExecStates := r.buildBatch(
Expand All @@ -222,12 +223,12 @@ func (r *ExecutionReportingPlugin) getExecutableObservations(ctx context.Context
tokenExecData.sourceToDestTokens)
if len(batch) != 0 {
lggr.Infow("Execution batch created", "batchSize", len(batch), "messageStates", msgExecStates)
return batch, nil
return batch, merkleRoot, nil
}
r.commitRootsCache.Snooze(merkleRoot)
}
}
return []ccip.ObservedMessage{}, nil
return []ccip.ObservedMessage{}, [32]byte{}, nil
}

// Calculates a map that indicates whether a sequence number has already been executed.
Expand Down Expand Up @@ -406,11 +407,11 @@ func (r *ExecutionReportingPlugin) getReportsWithSendRequests(

// Assumes non-empty report. Messages to execute can span more than one report, but are assumed to be in order of increasing
// sequence number.
func (r *ExecutionReportingPlugin) buildReport(ctx context.Context, lggr logger.Logger, observedMessages []ccip.ObservedMessage) ([]byte, error) {
func (r *ExecutionReportingPlugin) buildReport(ctx context.Context, lggr logger.Logger, observedMessages []ccip.ObservedMessage, root [32]byte) ([]byte, error) {
if err := validateSeqNumbers(ctx, r.commitStoreReader, observedMessages); err != nil {
return nil, err
}
commitReport, err := getCommitReportForSeqNum(ctx, r.commitStoreReader, observedMessages[0].SeqNr)
commitReport, err := getCommitReportForRoot(ctx, r.commitStoreReader, root)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -482,15 +483,15 @@ func (r *ExecutionReportingPlugin) Report(ctx context.Context, timestamp types.R
return false, nil, nil
}

observedMessages, err := calculateObservedMessagesConsensus(parsableObservations, r.F)
observedMessages, root, err := calculateObservedMessagesConsensus(parsableObservations, r.F)
if err != nil {
return false, nil, err
}
if len(observedMessages) == 0 {
return false, nil, nil
}

report, err := r.buildReport(ctx, lggr, observedMessages)
report, err := r.buildReport(ctx, lggr, observedMessages, root)
if err != nil {
return false, nil, err
}
Expand All @@ -508,13 +509,13 @@ type tallyVal struct {
tokenData [][]byte
}

func calculateObservedMessagesConsensus(observations []ccip.ExecutionObservation, f int) ([]ccip.ObservedMessage, error) {
func calculateObservedMessagesConsensus(observations []ccip.ExecutionObservation, f int) ([]ccip.ObservedMessage, [32]byte, error) {
tally := make(map[tallyKey]tallyVal)
for _, obs := range observations {
for seqNr, msgData := range obs.Messages {
tokenDataHash, err := hashutil.BytesOfBytesKeccak(msgData.TokenData)
if err != nil {
return nil, fmt.Errorf("bytes of bytes keccak: %w", err)
return nil, [32]byte{}, fmt.Errorf("bytes of bytes keccak: %w", err)
}

key := tallyKey{seqNr: seqNr, tokenDataHash: tokenDataHash}
Expand Down Expand Up @@ -555,7 +556,18 @@ func calculateObservedMessagesConsensus(observations []ccip.ExecutionObservation
sort.Slice(finalSequenceNumbers, func(i, j int) bool {
return finalSequenceNumbers[i].SeqNr < finalSequenceNumbers[j].SeqNr
})
return finalSequenceNumbers, nil

root := [32]byte{}
for _, seqNr := range finalSequenceNumbers {
for _, obs := range observations {
if _, ok := obs.Messages[seqNr.SeqNr]; ok {
root = obs.Root
break
}
}
}

return finalSequenceNumbers, root, nil
}

func (r *ExecutionReportingPlugin) ShouldAcceptFinalizedReport(ctx context.Context, timestamp types.ReportTimestamp, report types.Report) (bool, error) {
Expand Down
18 changes: 8 additions & 10 deletions core/services/ocr2/plugins/ccip/ccipexec/ocr2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,14 +414,12 @@ func TestExecutionReportingPlugin_buildReport(t *testing.T) {
commitStore.On("VerifyExecutionReport", mock.Anything, mock.Anything, mock.Anything).Return(true, nil)
commitStore.On("GetExpectedNextSequenceNumber", mock.Anything).
Return(executionReport.Messages[len(executionReport.Messages)-1].SequenceNumber+1, nil)
commitStore.On("GetCommitReportMatchingSeqNum", ctx, observations[0].SeqNr, 0).
Return([]cciptypes.CommitStoreReportWithTxMeta{
{
CommitStoreReport: cciptypes.CommitStoreReport{
Interval: cciptypes.CommitStoreInterval{
Min: observations[0].SeqNr,
Max: observations[len(observations)-1].SeqNr,
},
commitStore.On("GetCommitReport", ctx, mock.Anything).
Return(cciptypes.CommitStoreReportWithTxMeta{
CommitStoreReport: cciptypes.CommitStoreReport{
Interval: cciptypes.CommitStoreInterval{
Min: observations[0].SeqNr,
Max: observations[len(observations)-1].SeqNr,
},
},
}, nil)
Expand Down Expand Up @@ -456,7 +454,7 @@ func TestExecutionReportingPlugin_buildReport(t *testing.T) {
ctx, observations[0].SeqNr, observations[len(observations)-1].SeqNr, false).Return(sendReqs, nil)
p.onRampReader = sourceReader

execReport, err := p.buildReport(ctx, p.lggr, observations)
execReport, err := p.buildReport(ctx, p.lggr, observations, utils.RandomBytes32())
assert.NoError(t, err)
assert.LessOrEqual(t, len(execReport), MaxExecutionReportLength, "built execution report length")
}
Expand Down Expand Up @@ -689,7 +687,7 @@ func Test_calculateObservedMessagesConsensus(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
res, err := calculateObservedMessagesConsensus(
res, _, err := calculateObservedMessagesConsensus(
tt.args.observations,
tt.args.f,
)
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,10 @@ func (c *CommitStore) parseReport(log types.Log) (*cciptypes.CommitStoreReport,
}, nil
}

func (c *CommitStore) GetCommitReport(ctx context.Context, root [32]byte) (cciptypes.CommitStoreReportWithTxMeta, error) {
return cciptypes.CommitStoreReportWithTxMeta{}, nil
}

func (c *CommitStore) GetCommitReportMatchingSeqNum(ctx context.Context, seqNr uint64, confs int) ([]cciptypes.CommitStoreReportWithTxMeta, error) {
logs, err := c.lp.LogsDataWordBetween(
ctx,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,54 @@ func (c *CommitStore) parseReport(log types.Log) (*cciptypes.CommitStoreReport,
}, nil
}

func (c *CommitStore) GetCommitReport(ctx context.Context, root [32]byte) (cciptypes.CommitStoreReportWithTxMeta, error) {
reportsQuery, err := query.Where(
c.address.String(),
logpoller.NewAddressFilter(c.address),
logpoller.NewEventSigFilter(c.reportAcceptedSig),
logpoller.NewEventByTopicFilter(uint64(c.reportAcceptedMaxSeqIndex+1), []primitives.ValueComparator{
{Value: common.BytesToHash(root[:]).Hex(), Operator: primitives.Eq},
}),
)
if err != nil {
return cciptypes.CommitStoreReportWithTxMeta{}, err
}

logs, err := c.lp.FilteredLogs(
ctx,
reportsQuery,
query.NewLimitAndSort(query.Limit{}, query.NewSortBySequence(query.Asc)),
"GetCommitReport",
)
if err != nil {
return cciptypes.CommitStoreReportWithTxMeta{}, err
}

parsedLogs, err := ccipdata.ParseLogs[cciptypes.CommitStoreReport](
logs,
c.lggr,
c.parseReport,
)
if err != nil {
return cciptypes.CommitStoreReportWithTxMeta{}, err
}

res := make([]cciptypes.CommitStoreReportWithTxMeta, 0, len(parsedLogs))
for _, log := range parsedLogs {
res = append(res, cciptypes.CommitStoreReportWithTxMeta{
TxMeta: log.TxMeta,
CommitStoreReport: log.Data,
})
}

if len(res) == 0 {
return cciptypes.CommitStoreReportWithTxMeta{}, fmt.Errorf("no roots found %x", root)
} else if len(res) == 1 {
return res[0], nil
}
return cciptypes.CommitStoreReportWithTxMeta{}, fmt.Errorf("more than one report found for root %x", root)
}

func (c *CommitStore) GetCommitReportMatchingSeqNum(ctx context.Context, seqNr uint64, confs int) ([]cciptypes.CommitStoreReportWithTxMeta, error) {
logs, err := c.lp.LogsDataWordBetween(
ctx,
Expand Down
5 changes: 3 additions & 2 deletions core/services/ocr2/plugins/ccip/observations.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func (o CommitObservation) Marshal() ([]byte, error) {
// know what you are doing.
type ExecutionObservation struct {
Messages map[uint64]MsgData `json:"messages"`
Root [32]byte `json:"root"`
}

type MsgData struct {
Expand All @@ -63,12 +64,12 @@ type ObservedMessage struct {
MsgData
}

func NewExecutionObservation(observations []ObservedMessage) ExecutionObservation {
func NewExecutionObservation(observations []ObservedMessage, root [32]byte) ExecutionObservation {
denormalized := make(map[uint64]MsgData, len(observations))
for _, o := range observations {
denormalized[o.SeqNr] = MsgData{TokenData: o.TokenData}
}
return ExecutionObservation{Messages: denormalized}
return ExecutionObservation{Messages: denormalized, Root: root}
}

func NewObservedMessage(seqNr uint64, tokenData [][]byte) ObservedMessage {
Expand Down
Loading

0 comments on commit 3aa8224

Please sign in to comment.