Skip to content

Commit

Permalink
addressing comments
Browse files Browse the repository at this point in the history
  • Loading branch information
0xnogo committed Jul 4, 2024
1 parent 6ebc276 commit e5fe820
Show file tree
Hide file tree
Showing 10 changed files with 86 additions and 106 deletions.
5 changes: 0 additions & 5 deletions .changeset/lucky-hotels-sin.md

This file was deleted.

73 changes: 15 additions & 58 deletions core/services/ocr2/plugins/ccip/ccipexec/batching.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
type BatchContext struct {
report commitReportWithSendRequests
inflight []InflightInternalExecutionReport
inflightAggregateValue *big.Int
lggr logger.Logger
availableDataLen int
availableGas uint64
Expand All @@ -44,18 +45,13 @@ type BatchContext struct {
offchainConfig cciptypes.ExecOffchainConfig
}

type BaseBatchingStrategy struct{}

type BatchingStrategy interface {
BuildBatch(ctx context.Context, batchCtx *BatchContext) ([]ccip.ObservedMessage, []messageExecStatus)
}

type BestEffortBatchingStrategy struct {
BaseBatchingStrategy
}
type BestEffortBatchingStrategy struct{}

type ZKOverflowBatchingStrategy struct {
BaseBatchingStrategy
statuschecker statuschecker.CCIPTransactionStatusChecker
}

Expand All @@ -64,16 +60,10 @@ func (s *BestEffortBatchingStrategy) BuildBatch(
ctx context.Context,
batchCtx *BatchContext,
) ([]ccip.ObservedMessage, []messageExecStatus) {
inflightAggregateValue, err := getInflightAggregateRateLimit(batchCtx.lggr, batchCtx.inflight, batchCtx.destTokenPricesUSD, batchCtx.sourceToDestToken)
if err != nil {
batchCtx.lggr.Errorw("Unexpected error computing inflight values", "err", err)
return []ccip.ObservedMessage{}, nil
}

batchBuilder := newBatchBuildContainer(len(batchCtx.report.sendRequestsWithMeta))
for _, msg := range batchCtx.report.sendRequestsWithMeta {
msgLggr := batchCtx.lggr.With("messageID", hexutil.Encode(msg.MessageID[:]), "seqNr", msg.SequenceNumber)
shouldAdd, status, messageMaxGas, tokenData, msgValue, err := s.performCommonChecks(ctx, batchCtx, inflightAggregateValue, msg, msgLggr)
shouldAdd, status, messageMaxGas, tokenData, msgValue, err := performCommonChecks(ctx, batchCtx, msg, msgLggr)

if err != nil {
return []ccip.ObservedMessage{}, []messageExecStatus{}
Expand Down Expand Up @@ -108,16 +98,10 @@ func (s *BestEffortBatchingStrategy) BuildBatch(
// ZKOverflowBatchingStrategy is a batching strategy for ZK chains overflowing under certain conditions.
// It is a simple batching strategy that only allows one message to be added to the batch.
// TXM is used to perform the ZK check: if the message failed the check, it will be skipped.
func (bs *ZKOverflowBatchingStrategy) BuildBatch(
func (bs ZKOverflowBatchingStrategy) BuildBatch(
ctx context.Context,
batchCtx *BatchContext,
) ([]ccip.ObservedMessage, []messageExecStatus) {
inflightAggregateValue, err := getInflightAggregateRateLimit(batchCtx.lggr, batchCtx.inflight, batchCtx.destTokenPricesUSD, batchCtx.sourceToDestToken)
if err != nil {
batchCtx.lggr.Errorw("Unexpected error computing inflight values", "err", err)
return []ccip.ObservedMessage{}, nil
}

batchBuilder := newBatchBuildContainer(1)

for _, msg := range batchCtx.report.sendRequestsWithMeta {
Expand Down Expand Up @@ -145,28 +129,22 @@ func (bs *ZKOverflowBatchingStrategy) BuildBatch(
msgLggr.Infow("No status found for message, adding to batch")
} else {
// Status(es) found for message = check if any of them is final to decide if we should add it to the batch
finalStatus := false
haveFatalStatus := false
for _, s := range statuses {
if s == types.Fatal {
msgLggr.Infow("Skipping message - ZK check failed (fatal status)")
batchBuilder.skip(msg, TXMCheckFailed)
finalStatus = true
break
}
if s == types.Finalized {
msgLggr.Infow("Skipping message - ZK check failed (final status)")
batchBuilder.skip(msg, TXMCheckFailed)
finalStatus = true
msgLggr.Infow("Skipping message - TXM status is fatal")
batchBuilder.skip(msg, TXMFatalStatus)
haveFatalStatus = true
break
}
}
if finalStatus {
if haveFatalStatus {
continue
}
msgLggr.Infow("No final status found for message, adding to batch")
}

shouldAdd, status, messageMaxGas, tokenData, msgValue, err := bs.performCommonChecks(ctx, batchCtx, inflightAggregateValue, msg, msgLggr)
shouldAdd, status, messageMaxGas, tokenData, msgValue, err := performCommonChecks(ctx, batchCtx, msg, msgLggr)

if err != nil {
return []ccip.ObservedMessage{}, []messageExecStatus{}
Expand Down Expand Up @@ -198,10 +176,9 @@ func (bs *ZKOverflowBatchingStrategy) BuildBatch(
return batchBuilder.batch, batchBuilder.statuses
}

func (bs *BaseBatchingStrategy) performCommonChecks(
func performCommonChecks(
ctx context.Context,
batchCtx *BatchContext,
inflightAggregateValue *big.Int,
msg cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta,
msgLggr logger.Logger,
) (bool, messageStatus, uint64, [][]byte, *big.Int, error) {
Expand Down Expand Up @@ -255,12 +232,12 @@ func (bs *BaseBatchingStrategy) performCommonChecks(
}

// if token limit is smaller than message value skip message
if tokensLeft, hasCapacity := hasEnoughTokens(batchCtx.aggregateTokenLimit, msgValue, inflightAggregateValue); !hasCapacity {
if tokensLeft, hasCapacity := hasEnoughTokens(batchCtx.aggregateTokenLimit, msgValue, batchCtx.inflightAggregateValue); !hasCapacity {
msgLggr.Warnw("Skipping message - aggregate token limit exceeded", "aggregateTokenLimit", tokensLeft.String(), "msgValue", msgValue.String())
return false, AggregateTokenLimitExceeded, 0, nil, nil, nil
}

tokenData, elapsed, err1 := bs.getTokenDataWithTimeout(ctx, msg, batchCtx.tokenDataRemainingDuration, batchCtx.tokenDataWorker)
tokenData, elapsed, err1 := getTokenDataWithTimeout(ctx, msg, batchCtx.tokenDataRemainingDuration, batchCtx.tokenDataWorker)
batchCtx.tokenDataRemainingDuration -= elapsed
if err1 != nil {
if errors.Is(err1, tokendata.ErrNotReady) {
Expand Down Expand Up @@ -314,7 +291,7 @@ func (bs *BaseBatchingStrategy) performCommonChecks(

// getTokenDataWithCappedLatency gets the token data for the provided message.
// Stops and returns an error if more than allowedWaitingTime is passed.
func (bs *BaseBatchingStrategy) getTokenDataWithTimeout(
func getTokenDataWithTimeout(
ctx context.Context,
msg cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta,
timeout time.Duration,
Expand Down Expand Up @@ -383,26 +360,6 @@ func validateSendRequests(sendReqs []cciptypes.EVM2EVMMessageWithTxMeta, interva
return nil
}

func getInflightAggregateRateLimit(
lggr logger.Logger,
inflight []InflightInternalExecutionReport,
destTokenPrices map[cciptypes.Address]*big.Int,
sourceToDest map[cciptypes.Address]cciptypes.Address,
) (*big.Int, error) {
inflightAggregateValue := big.NewInt(0)

for _, rep := range inflight {
for _, message := range rep.messages {
msgValue, err := aggregateTokenValue(lggr, destTokenPrices, sourceToDest, message.TokenAmounts)
if err != nil {
return nil, err
}
inflightAggregateValue.Add(inflightAggregateValue, msgValue)
}
}
return inflightAggregateValue, nil
}

func getInflightSeqNums(inflight []InflightInternalExecutionReport) mapset.Set[uint64] {
seqNums := mapset.NewSet[uint64]()
for _, report := range inflight {
Expand Down Expand Up @@ -500,7 +457,7 @@ const (
InsufficientRemainingFee messageStatus = "insufficient_remaining_fee"
AddedToBatch messageStatus = "added_to_batch"
TXMCheckError messageStatus = "txm_check_error"
TXMCheckFailed messageStatus = "txm_check_failed"
TXMFatalStatus messageStatus = "txm_fatal_status"
SkippedInflight messageStatus = "skipped_inflight"
)

Expand Down
23 changes: 11 additions & 12 deletions core/services/ocr2/plugins/ccip/ccipexec/batching_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,14 +131,13 @@ func TestExecutionReportingPlugin_getTokenDataWithCappedLatency(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
bs := &BaseBatchingStrategy{}
tokenDataWorker := delayedTokenDataWorker{delay: tc.workerLatency}

msg := cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{
EVM2EVMMessage: cciptypes.EVM2EVMMessage{TokenAmounts: make([]cciptypes.TokenAmount, 1)},
}

_, _, err := bs.getTokenDataWithTimeout(ctx, msg, tc.allowedWaitingTime, tokenDataWorker)
_, _, err := getTokenDataWithTimeout(ctx, msg, tc.allowedWaitingTime, tokenDataWorker)
if tc.expErr {
assert.Error(t, err)
return
Expand Down Expand Up @@ -559,7 +558,7 @@ func TestBatchingStrategies(t *testing.T) {
dstPrices: map[cciptypes.Address]*big.Int{destNative: big.NewInt(1)},
offRampNoncesBySender: map[cciptypes.Address]uint64{sender1: 0},
expectedStates: []messageExecStatus{
newMessageExecState(zkMsg1.SequenceNumber, zkMsg1.MessageID, TXMCheckFailed),
newMessageExecState(zkMsg1.SequenceNumber, zkMsg1.MessageID, TXMFatalStatus),
},
statuschecker: func(m *mockstatuschecker.CCIPTransactionStatusChecker) {
m.Mock = mock.Mock{} // reset mock
Expand All @@ -578,7 +577,7 @@ func TestBatchingStrategies(t *testing.T) {
offRampNoncesBySender: map[cciptypes.Address]uint64{sender1: 0},
expectedSeqNrs: []ccip.ObservedMessage{{SeqNr: zkMsg2.SequenceNumber}},
expectedStates: []messageExecStatus{
newMessageExecState(zkMsg1.SequenceNumber, zkMsg1.MessageID, TXMCheckFailed),
newMessageExecState(zkMsg1.SequenceNumber, zkMsg1.MessageID, TXMFatalStatus),
newMessageExecState(zkMsg2.SequenceNumber, zkMsg2.MessageID, AddedToBatch),
},
statuschecker: func(m *mockstatuschecker.CCIPTransactionStatusChecker) {
Expand All @@ -597,8 +596,8 @@ func TestBatchingStrategies(t *testing.T) {
dstPrices: map[cciptypes.Address]*big.Int{destNative: big.NewInt(1)},
offRampNoncesBySender: map[cciptypes.Address]uint64{sender1: 0},
expectedStates: []messageExecStatus{
newMessageExecState(zkMsg1.SequenceNumber, zkMsg1.MessageID, TXMCheckFailed),
newMessageExecState(zkMsg2.SequenceNumber, zkMsg2.MessageID, TXMCheckFailed),
newMessageExecState(zkMsg1.SequenceNumber, zkMsg1.MessageID, TXMFatalStatus),
newMessageExecState(zkMsg2.SequenceNumber, zkMsg2.MessageID, TXMFatalStatus),
},
statuschecker: func(m *mockstatuschecker.CCIPTransactionStatusChecker) {
m.Mock = mock.Mock{} // reset mock
Expand Down Expand Up @@ -626,7 +625,7 @@ func TestBatchingStrategies(t *testing.T) {
},
},
{
name: "message snoozed when finalized",
name: "message snoozed when multiple statuses with fatal",
reqs: []cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{zkMsg1, zkMsg2},
inflight: []InflightInternalExecutionReport{},
tokenLimit: big.NewInt(0),
Expand All @@ -636,12 +635,12 @@ func TestBatchingStrategies(t *testing.T) {
offRampNoncesBySender: map[cciptypes.Address]uint64{sender1: 0},
expectedSeqNrs: []ccip.ObservedMessage{{SeqNr: zkMsg2.SequenceNumber}},
expectedStates: []messageExecStatus{
newMessageExecState(zkMsg1.SequenceNumber, zkMsg1.MessageID, TXMCheckFailed),
newMessageExecState(zkMsg1.SequenceNumber, zkMsg1.MessageID, TXMFatalStatus),
newMessageExecState(zkMsg2.SequenceNumber, zkMsg2.MessageID, AddedToBatch),
},
statuschecker: func(m *mockstatuschecker.CCIPTransactionStatusChecker) {
m.Mock = mock.Mock{} // reset mock
m.On("CheckMessageStatus", mock.Anything, zkMsg1.MessageID.String()).Return([]types.TransactionStatus{types.Unconfirmed, types.Failed, types.Finalized}, 2, nil)
m.On("CheckMessageStatus", mock.Anything, zkMsg1.MessageID.String()).Return([]types.TransactionStatus{types.Unconfirmed, types.Failed, types.Fatal}, 2, nil)
m.On("CheckMessageStatus", mock.Anything, zkMsg2.MessageID.String()).Return([]types.TransactionStatus{}, -1, nil)
},
},
Expand Down Expand Up @@ -698,7 +697,7 @@ func TestBatchingStrategies(t *testing.T) {
skipGasPriceEstimator: true,
},
{
name: "snooze when not inflight but txm returns final status",
name: "snooze when not inflight but txm returns fatal status",
reqs: []cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{zkMsg1},
inflight: []InflightInternalExecutionReport{},
tokenLimit: big.NewInt(0),
Expand All @@ -707,11 +706,11 @@ func TestBatchingStrategies(t *testing.T) {
dstPrices: map[cciptypes.Address]*big.Int{destNative: big.NewInt(1)},
offRampNoncesBySender: map[cciptypes.Address]uint64{sender1: 0},
expectedStates: []messageExecStatus{
newMessageExecState(zkMsg1.SequenceNumber, zkMsg1.MessageID, TXMCheckFailed),
newMessageExecState(zkMsg1.SequenceNumber, zkMsg1.MessageID, TXMFatalStatus),
},
statuschecker: func(m *mockstatuschecker.CCIPTransactionStatusChecker) {
m.Mock = mock.Mock{} // reset mock
m.On("CheckMessageStatus", mock.Anything, zkMsg1.MessageID.String()).Return([]types.TransactionStatus{types.Unconfirmed, types.Failed, types.Finalized}, 2, nil)
m.On("CheckMessageStatus", mock.Anything, zkMsg1.MessageID.String()).Return([]types.TransactionStatus{types.Unconfirmed, types.Failed, types.Fatal}, 2, nil)
},
skipGasPriceEstimator: true,
},
Expand Down
55 changes: 41 additions & 14 deletions core/services/ocr2/plugins/ccip/ccipexec/ocr2.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,9 +275,16 @@ func (r *ExecutionReportingPlugin) buildBatch(
return []ccip.ObservedMessage{}, []messageExecStatus{}
}

inflightAggregateValue, err := getInflightAggregateRateLimit(lggr, inflight, destTokenPricesUSD, sourceToDestToken)
if err != nil {
lggr.Errorw("Unexpected error computing inflight values", "err", err)
return []ccip.ObservedMessage{}, nil
}

batchCtx := &BatchContext{
report,
inflight,
inflightAggregateValue,
lggr,
MaxDataLenPerBatch,
uint64(r.offchainConfig.BatchGasLimit),
Expand Down Expand Up @@ -402,20 +409,6 @@ func (r *ExecutionReportingPlugin) getReportsWithSendRequests(
return reportsWithSendReqs, nil
}

func aggregateTokenValue(lggr logger.Logger, destTokenPricesUSD map[cciptypes.Address]*big.Int, sourceToDest map[cciptypes.Address]cciptypes.Address, tokensAndAmount []cciptypes.TokenAmount) (*big.Int, error) {
sum := big.NewInt(0)
for i := 0; i < len(tokensAndAmount); i++ {
price, ok := destTokenPricesUSD[sourceToDest[tokensAndAmount[i].Token]]
if !ok {
// If we don't have a price for the token, we will assume it's worth 0.
lggr.Infof("No price for token %s, assuming 0", tokensAndAmount[i].Token)
continue
}
sum.Add(sum, new(big.Int).Quo(new(big.Int).Mul(price, tokensAndAmount[i].Amount), big.NewInt(1e18)))
}
return sum, nil
}

// Assumes non-empty report. Messages to execute can span more than one report, but are assumed to be in order of increasing
// sequence number.
func (r *ExecutionReportingPlugin) buildReport(ctx context.Context, lggr logger.Logger, observedMessages []ccip.ObservedMessage) ([]byte, error) {
Expand Down Expand Up @@ -657,6 +650,40 @@ func (r *ExecutionReportingPlugin) Close() error {
return nil
}

func getInflightAggregateRateLimit(
lggr logger.Logger,
inflight []InflightInternalExecutionReport,
destTokenPrices map[cciptypes.Address]*big.Int,
sourceToDest map[cciptypes.Address]cciptypes.Address,
) (*big.Int, error) {
inflightAggregateValue := big.NewInt(0)

for _, rep := range inflight {
for _, message := range rep.messages {
msgValue, err := aggregateTokenValue(lggr, destTokenPrices, sourceToDest, message.TokenAmounts)
if err != nil {
return nil, err
}
inflightAggregateValue.Add(inflightAggregateValue, msgValue)
}
}
return inflightAggregateValue, nil
}

func aggregateTokenValue(lggr logger.Logger, destTokenPricesUSD map[cciptypes.Address]*big.Int, sourceToDest map[cciptypes.Address]cciptypes.Address, tokensAndAmount []cciptypes.TokenAmount) (*big.Int, error) {
sum := big.NewInt(0)
for i := 0; i < len(tokensAndAmount); i++ {
price, ok := destTokenPricesUSD[sourceToDest[tokensAndAmount[i].Token]]
if !ok {
// If we don't have a price for the token, we will assume it's worth 0.
lggr.Infof("No price for token %s, assuming 0", tokensAndAmount[i].Token)
continue
}
sum.Add(sum, new(big.Int).Quo(new(big.Int).Mul(price, tokensAndAmount[i].Amount), big.NewInt(1e18)))
}
return sum, nil
}

// getTokensPrices returns token prices of the given price registry,
// price values are USD per 1e18 of smallest token denomination, in base units 1e18 (e.g. 5$ = 5e18 USD per 1e18 units).
// this function is used for price registry of both source and destination chains.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ type ExecOffchainConfig struct {
InflightCacheExpiry config.Duration
// See [ccipdata.ExecOffchainConfig.RootSnoozeTime]
RootSnoozeTime config.Duration
// See [ccipdata.ExecOffchainConfig.BatchingStrategy]
BatchingStrategyId uint8
// See [ccipdata.ExecOffchainConfig.BatchingStrategyID]
BatchingStrategyID uint8
// See [ccipdata.ExecOffchainConfig.MessageVisibilityInterval]
MessageVisibilityInterval config.Duration
}
Expand Down Expand Up @@ -418,6 +418,7 @@ func (o *OffRamp) ChangeConfig(ctx context.Context, onchainConfigBytes []byte, o
InflightCacheExpiry: offchainConfigParsed.InflightCacheExpiry,
RootSnoozeTime: offchainConfigParsed.RootSnoozeTime,
MessageVisibilityInterval: offchainConfigParsed.MessageVisibilityInterval,
// BatchingStrategyID: offchainConfigParsed.BatchingStrategyID, // TODO: add to ExecOffchainConfig (chainlink-common) a BatchingStrategyID field
}
onchainConfig := cciptypes.ExecOnchainConfig{
PermissionLessExecutionThresholdSeconds: time.Second * time.Duration(onchainConfigParsed.PermissionLessExecutionThresholdSeconds),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func TestExecOffchainConfig100_AllFieldsRequired(t *testing.T) {
RelativeBoostPerWaitHour: 0.07,
InflightCacheExpiry: *config.MustNewDuration(64 * time.Second),
RootSnoozeTime: *config.MustNewDuration(128 * time.Minute),
BatchingStrategyId: 0,
BatchingStrategyID: 0,
}
encoded, err := ccipconfig.EncodeOffchainConfig(&cfg)
require.NoError(t, err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ type JSONExecOffchainConfig struct {
InflightCacheExpiry config.Duration
// See [ccipdata.ExecOffchainConfig.RootSnoozeTime]
RootSnoozeTime config.Duration
// See [ccipdata.ExecOffchainConfig.BatchingStrategy]
BatchingStrategyId uint8
// See [ccipdata.ExecOffchainConfig.BatchingStrategyID]
BatchingStrategyID uint8
// See [ccipdata.ExecOffchainConfig.MessageVisibilityInterval]
MessageVisibilityInterval config.Duration
}
Expand Down Expand Up @@ -174,6 +174,7 @@ func (o *OffRamp) ChangeConfig(ctx context.Context, onchainConfigBytes []byte, o
InflightCacheExpiry: offchainConfigParsed.InflightCacheExpiry,
RootSnoozeTime: offchainConfigParsed.RootSnoozeTime,
MessageVisibilityInterval: offchainConfigParsed.MessageVisibilityInterval,
// BatchingStrategyID: offchainConfigParsed.BatchingStrategyID, // TODO: uncomment when the field is added to the config
}
onchainConfig := cciptypes.ExecOnchainConfig{
PermissionLessExecutionThresholdSeconds: time.Second * time.Duration(onchainConfigParsed.PermissionLessExecutionThresholdSeconds),
Expand Down
Loading

0 comments on commit e5fe820

Please sign in to comment.