Skip to content

Commit

Permalink
rm offchain check for token pool rate limits (#690)
Browse files Browse the repository at this point in the history
## Motivation
Self serve will allow for token pools that do not have all the functions
we currently call offchain

After this PR we assume it will go through due to the same rate limits
being enforced onchain on send
  • Loading branch information
RensR authored Apr 8, 2024
1 parent 2aa84ec commit 264b952
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 435 deletions.
161 changes: 7 additions & 154 deletions core/services/ocr2/plugins/ccip/ccipexec/ocr2.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (r *ExecutionReportingPlugin) Observation(ctx context.Context, timestamp ty
r.inflightReports.expire(lggr)
inFlight := r.inflightReports.getAll()

executableObservations, err := r.getExecutableObservations(ctx, lggr, timestamp, inFlight)
executableObservations, err := r.getExecutableObservations(ctx, lggr, inFlight)
if err != nil {
return nil, err
}
Expand All @@ -138,7 +138,7 @@ func (r *ExecutionReportingPlugin) Observation(ctx context.Context, timestamp ty
return ccip.NewExecutionObservation(executableObservations).Marshal()
}

func (r *ExecutionReportingPlugin) getExecutableObservations(ctx context.Context, lggr logger.Logger, timestamp types.ReportTimestamp, inflight []InflightInternalExecutionReport) ([]ccip.ObservedMessage, error) {
func (r *ExecutionReportingPlugin) getExecutableObservations(ctx context.Context, lggr logger.Logger, inflight []InflightInternalExecutionReport) ([]ccip.ObservedMessage, error) {
unexpiredReports, err := r.getUnexpiredCommitReports(
ctx,
r.commitStoreReader,
Expand Down Expand Up @@ -166,14 +166,6 @@ func (r *ExecutionReportingPlugin) getExecutableObservations(ctx context.Context
return nil, err
}

getDestPoolRateLimits := cache.LazyFetch(func() (map[cciptypes.Address]*big.Int, error) {
tokenExecData, err1 := getExecTokenData()
if err1 != nil {
return nil, err1
}
return r.destPoolRateLimits(ctx, unexpiredReportsWithSendReqs, tokenExecData.sourceToDestTokens)
})

for _, unexpiredReport := range unexpiredReportsWithSendReqs {
r.tokenDataWorker.AddJobsFromMsgs(ctx, unexpiredReport.sendRequestsWithMeta)
}
Expand Down Expand Up @@ -218,11 +210,6 @@ func (r *ExecutionReportingPlugin) getExecutableObservations(ctx context.Context
return nil, err
}

destPoolRateLimits, err := getDestPoolRateLimits()
if err != nil {
return nil, err
}

batch := r.buildBatch(
ctx,
rootLggr,
Expand All @@ -232,8 +219,7 @@ func (r *ExecutionReportingPlugin) getExecutableObservations(ctx context.Context
tokenExecData.sourceTokenPrices,
tokenExecData.destTokenPrices,
tokenExecData.gasPrice,
tokenExecData.sourceToDestTokens,
destPoolRateLimits)
tokenExecData.sourceToDestTokens)
if len(batch) != 0 {
return batch, nil
}
Expand All @@ -243,71 +229,6 @@ func (r *ExecutionReportingPlugin) getExecutableObservations(ctx context.Context
return []ccip.ObservedMessage{}, nil
}

// destPoolRateLimits returns a map that consists of the rate limits of each destination token of the provided reports.
// If a token is missing from the returned map it either means that token was not found or token pool is disabled for this token.
func (r *ExecutionReportingPlugin) destPoolRateLimits(ctx context.Context, commitReports []commitReportWithSendRequests, sourceToDestToken map[cciptypes.Address]cciptypes.Address) (map[cciptypes.Address]*big.Int, error) {
tokens, err := r.offRampReader.GetTokens(ctx)
if err != nil {
return nil, fmt.Errorf("get cached token pools: %w", err)
}

dstTokenToPool := make(map[cciptypes.Address]cciptypes.Address)
dstPoolToToken := make(map[cciptypes.Address]cciptypes.Address)
dstPoolAddresses := make([]cciptypes.Address, 0)

for _, msg := range commitReports {
for _, req := range msg.sendRequestsWithMeta {
for _, tk := range req.TokenAmounts {
dstToken, exists := sourceToDestToken[tk.Token]
if !exists {
r.lggr.Warnw("token not found on destination chain", "sourceToken", tk)
continue
}

// another message with the same token exists in the report
// we skip it since we don't want to query for the rate limit twice
if _, seen := dstTokenToPool[dstToken]; seen {
continue
}

poolAddress, exists := tokens.DestinationPool[dstToken]
if !exists {
return nil, fmt.Errorf("pool for token '%s' does not exist", dstToken)
}

if tokenAddr, seen := dstPoolToToken[poolAddress]; seen {
return nil, fmt.Errorf("pool is already seen for token %s", tokenAddr)
}

dstTokenToPool[dstToken] = poolAddress
dstPoolToToken[poolAddress] = dstToken
dstPoolAddresses = append(dstPoolAddresses, poolAddress)
}
}
}

rateLimits, err := r.tokenPoolBatchedReader.GetInboundTokenPoolRateLimits(ctx, dstPoolAddresses)
if err != nil {
return nil, fmt.Errorf("fetch pool rate limits: %w", err)
}

res := make(map[cciptypes.Address]*big.Int, len(dstTokenToPool))
for i, rateLimit := range rateLimits {
// if the rate limit is disabled for this token pool then we omit it from the result
if !rateLimit.IsEnabled {
continue
}

tokenAddr, exists := dstPoolToToken[dstPoolAddresses[i]]
if !exists {
return nil, fmt.Errorf("pool to token mapping does not contain %s", dstPoolAddresses[i])
}
res[tokenAddr] = rateLimit.Tokens
}

return res, nil
}

// Calculates a map that indicates whether a sequence number has already been executed.
// It doesn't matter if the execution succeeded, since we don't retry previous
// attempts even if they failed. Value in the map indicates whether the log is finalized or not.
Expand Down Expand Up @@ -341,9 +262,8 @@ func (r *ExecutionReportingPlugin) buildBatch(
destTokenPricesUSD map[cciptypes.Address]*big.Int,
gasPrice *big.Int,
sourceToDestToken map[cciptypes.Address]cciptypes.Address,
destTokenPoolRateLimits map[cciptypes.Address]*big.Int,
) (executableMessages []ccip.ObservedMessage) {
inflightAggregateValue, inflightTokenAmounts, err := inflightAggregates(inflight, destTokenPricesUSD, sourceToDestToken)
inflightAggregateValue, err := inflightAggregates(inflight, destTokenPricesUSD, sourceToDestToken)
if err != nil {
lggr.Errorw("Unexpected error computing inflight values", "err", err)
return []ccip.ObservedMessage{}
Expand Down Expand Up @@ -385,11 +305,6 @@ func (r *ExecutionReportingPlugin) buildBatch(
continue
}

if !r.isRateLimitEnoughForTokenPool(destTokenPoolRateLimits, msg.TokenAmounts, inflightTokenAmounts, sourceToDestToken) {
msgLggr.Warnw("Skipping message token pool rate limit hit")
continue
}

msgValue, err := aggregateTokenValue(destTokenPricesUSD, sourceToDestToken, msg.TokenAmounts)
if err != nil {
msgLggr.Errorw("Skipping message unable to compute aggregate value", "err", err)
Expand Down Expand Up @@ -470,16 +385,6 @@ func (r *ExecutionReportingPlugin) buildBatch(
}
availableGas -= messageMaxGas
aggregateTokenLimit.Sub(aggregateTokenLimit, msgValue)
for _, tk := range msg.TokenAmounts {
dstToken, exists := sourceToDestToken[tk.Token]
if !exists {
msgLggr.Warnw("destination token does not exist", "token", tk.Token)
continue
}
if rl, exists := destTokenPoolRateLimits[dstToken]; exists {
destTokenPoolRateLimits[dstToken] = rl.Sub(rl, tk.Amount)
}
}

msgLggr.Infow("Adding msg to batch", "seqNr", msg.SequenceNumber, "nonce", msg.Nonce,
"value", msgValue, "aggregateTokenLimit", aggregateTokenLimit)
Expand Down Expand Up @@ -513,49 +418,6 @@ func (r *ExecutionReportingPlugin) getTokenDataWithTimeout(
return tokenData, tDur, err
}

func (r *ExecutionReportingPlugin) isRateLimitEnoughForTokenPool(
destTokenPoolRateLimits map[cciptypes.Address]*big.Int,
sourceTokenAmounts []cciptypes.TokenAmount,
inflightTokenAmounts map[cciptypes.Address]*big.Int,
sourceToDestToken map[cciptypes.Address]cciptypes.Address,
) bool {
rateLimitsCopy := make(map[cciptypes.Address]*big.Int)
for destToken, rl := range destTokenPoolRateLimits {
rateLimitsCopy[destToken] = new(big.Int).Set(rl)
}

for sourceToken, amount := range inflightTokenAmounts {
if destToken, exists := sourceToDestToken[sourceToken]; exists {
if rl, exists := rateLimitsCopy[destToken]; exists {
rateLimitsCopy[destToken] = rl.Sub(rl, amount)
}
}
}

for _, sourceToken := range sourceTokenAmounts {
destToken, exists := sourceToDestToken[sourceToken.Token]
if !exists {
r.lggr.Warnw("dest token not found", "sourceToken", sourceToken.Token)
continue
}

rl, exists := rateLimitsCopy[destToken]
if !exists {
r.lggr.Debugw("rate limit not applied to token", "token", destToken)
continue
}

if rl.Cmp(sourceToken.Amount) < 0 {
r.lggr.Warnw("token pool rate limit reached",
"token", sourceToken.Token, "destToken", destToken, "amount", sourceToken.Amount, "rateLimit", rl)
return false
}
rateLimitsCopy[destToken] = rl.Sub(rl, sourceToken.Amount)
}

return true
}

func hasEnoughTokens(tokenLimit *big.Int, msgValue *big.Int, inflightValue *big.Int) (*big.Int, bool) {
tokensLeft := big.NewInt(0).Sub(tokenLimit, inflightValue)
return tokensLeft, tokensLeft.Cmp(msgValue) >= 0
Expand Down Expand Up @@ -917,28 +779,19 @@ func inflightAggregates(
inflight []InflightInternalExecutionReport,
destTokenPrices map[cciptypes.Address]*big.Int,
sourceToDest map[cciptypes.Address]cciptypes.Address,
) (*big.Int, map[cciptypes.Address]*big.Int, error) {
) (*big.Int, error) {
inflightAggregateValue := big.NewInt(0)
inflightTokenAmounts := make(map[cciptypes.Address]*big.Int)

for _, rep := range inflight {
for _, message := range rep.messages {
msgValue, err := aggregateTokenValue(destTokenPrices, sourceToDest, message.TokenAmounts)
if err != nil {
return nil, nil, err
return nil, err
}
inflightAggregateValue.Add(inflightAggregateValue, msgValue)

for _, tk := range message.TokenAmounts {
if rl, exists := inflightTokenAmounts[tk.Token]; exists {
inflightTokenAmounts[tk.Token] = rl.Add(rl, tk.Amount)
} else {
inflightTokenAmounts[tk.Token] = new(big.Int).Set(tk.Amount)
}
}
}
}
return inflightAggregateValue, inflightTokenAmounts, nil
return inflightAggregateValue, nil
}

// getTokensPrices returns token prices of the given price registry,
Expand Down
Loading

0 comments on commit 264b952

Please sign in to comment.