Skip to content

Commit

Permalink
Exec NewReportingPlugin must not error (#894)
Browse files Browse the repository at this point in the history
## Motivation
Previously, any single error during the call to NewReportingPlugin for
the Exec plugin would cause the OCR instance to not start. Even if the
error was transient and would be resolved if NewReportingPlugin was
called again, the OCR instance would remain down.

## Solution
In order to be robust to transient errors, NewReportingPlugin is retried
until it succeeds.
  • Loading branch information
rstout authored Jun 5, 2024
1 parent 6b93217 commit 9eb2708
Show file tree
Hide file tree
Showing 17 changed files with 219 additions and 61 deletions.
5 changes: 5 additions & 0 deletions .changeset/honest-pens-flash.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"ccip": patch
---

#bugfix Exec NewReportingPlugin retries until it succeeds
2 changes: 1 addition & 1 deletion core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ require (
github.com/VictoriaMetrics/fastcache v1.12.1 // indirect
github.com/XSAM/otelsql v0.27.0 // indirect
github.com/armon/go-metrics v0.4.1 // indirect
github.com/avast/retry-go/v4 v4.5.1 // indirect
github.com/avast/retry-go/v4 v4.6.0 // indirect
github.com/bahlo/generic-list-go v0.2.0 // indirect
github.com/benbjohnson/clock v1.3.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
Expand Down
4 changes: 2 additions & 2 deletions core/scripts/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,8 @@ github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmV
github.com/armon/go-metrics v0.4.1 h1:hR91U9KYmb6bLBYLQjyM+3j+rcd/UhE+G78SFnF8gJA=
github.com/armon/go-metrics v0.4.1/go.mod h1:E6amYzXo6aW1tqzoZGT755KkbgrJsSdpwZ+3JqfkOG4=
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
github.com/avast/retry-go/v4 v4.5.1 h1:AxIx0HGi4VZ3I02jr78j5lZ3M6x1E0Ivxa6b0pUUh7o=
github.com/avast/retry-go/v4 v4.5.1/go.mod h1:/sipNsvNB3RRuT5iNcb6h73nw3IBmXJ/H3XrCQYSOpc=
github.com/avast/retry-go/v4 v4.6.0 h1:K9xNA+KeB8HHc2aWFuLb25Offp+0iVRXEvFx8IinRJA=
github.com/avast/retry-go/v4 v4.6.0/go.mod h1:gvWlPhBVsvBbLkVGDg/KwvBv0bEkCOLRRSHKIr2PyOE=
github.com/aws/aws-sdk-go v1.22.1/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/aws/aws-sdk-go v1.23.20/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/aws/aws-sdk-go v1.45.25 h1:c4fLlh5sLdK2DCRTY1z0hyuJZU4ygxX8m1FswL6/nF4=
Expand Down
73 changes: 50 additions & 23 deletions core/services/ocr2/plugins/ccip/ccipexec/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/smartcontractkit/libocr/offchainreporting2plus/types"

cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccip"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipcommon"

"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/cache"
Expand Down Expand Up @@ -60,36 +61,58 @@ func (rf *ExecutionReportingPluginFactory) UpdateDynamicReaders(ctx context.Cont
return nil
}

// reportingPluginAndInfo pairs a ReportingPlugin with its ReportingPluginInfo so that both can be carried as the
// single result value of a function retried via ccipcommon.RetryUntilSuccess.
type reportingPluginAndInfo struct {
// plugin is the constructed reporting plugin.
plugin types.ReportingPlugin
// pluginInfo is the metadata returned alongside plugin.
pluginInfo types.ReportingPluginInfo
}

// NewReportingPlugin registers a new ReportingPlugin
func (rf *ExecutionReportingPluginFactory) NewReportingPlugin(config types.ReportingPluginConfig) (types.ReportingPlugin, types.ReportingPluginInfo, error) {
ctx := context.Background() // todo: consider setting a timeout
initialRetryDelay := rf.config.newReportingPluginRetryConfig.InitialDelay
maxDelay := rf.config.newReportingPluginRetryConfig.MaxDelay

destPriceRegistry, destWrappedNative, err := rf.config.offRampReader.ChangeConfig(ctx, config.OnchainConfig, config.OffchainConfig)
if err != nil {
return nil, types.ReportingPluginInfo{}, err
}
// Open dynamic readers
err = rf.UpdateDynamicReaders(ctx, destPriceRegistry)
pluginAndInfo, err := ccipcommon.RetryUntilSuccess(rf.NewReportingPluginFn(config), initialRetryDelay, maxDelay)
if err != nil {
return nil, types.ReportingPluginInfo{}, err
}
return pluginAndInfo.plugin, pluginAndInfo.pluginInfo, err
}

offchainConfig, err := rf.config.offRampReader.OffchainConfig(ctx)
if err != nil {
return nil, types.ReportingPluginInfo{}, fmt.Errorf("get offchain config from offramp: %w", err)
}
// NewReportingPluginFn implements the NewReportingPlugin logic. It is defined as a function so that it can easily be
// retried via RetryUntilSuccess. The Exec plugin cannot function unless NewReportingPlugin returns successfully, so
// the only option is to keep retrying it until it succeeds.
func (rf *ExecutionReportingPluginFactory) NewReportingPluginFn(config types.ReportingPluginConfig) func() (reportingPluginAndInfo, error) {
return func() (reportingPluginAndInfo, error) {
ctx := context.Background() // todo: consider setting a timeout

gasPriceEstimator, err := rf.config.offRampReader.GasPriceEstimator(ctx)
if err != nil {
return nil, types.ReportingPluginInfo{}, fmt.Errorf("get gas price estimator from offramp: %w", err)
}
destPriceRegistry, destWrappedNative, err := rf.config.offRampReader.ChangeConfig(ctx, config.OnchainConfig, config.OffchainConfig)
if err != nil {
return reportingPluginAndInfo{}, err
}

onchainConfig, err := rf.config.offRampReader.OnchainConfig(ctx)
if err != nil {
return nil, types.ReportingPluginInfo{}, fmt.Errorf("get onchain config from offramp: %w", err)
}
// Open dynamic readers
err = rf.UpdateDynamicReaders(ctx, destPriceRegistry)
if err != nil {
return reportingPluginAndInfo{}, err
}

offchainConfig, err := rf.config.offRampReader.OffchainConfig(ctx)
if err != nil {
return reportingPluginAndInfo{}, fmt.Errorf("get offchain config from offramp: %w", err)
}

gasPriceEstimator, err := rf.config.offRampReader.GasPriceEstimator(ctx)
if err != nil {
return reportingPluginAndInfo{}, fmt.Errorf("get gas price estimator from offramp: %w", err)
}

lggr := rf.config.lggr.Named("ExecutionReportingPlugin")
return &ExecutionReportingPlugin{
onchainConfig, err := rf.config.offRampReader.OnchainConfig(ctx)
if err != nil {
return reportingPluginAndInfo{}, fmt.Errorf("get onchain config from offramp: %w", err)
}

lggr := rf.config.lggr.Named("ExecutionReportingPlugin")
plugin := &ExecutionReportingPlugin{
F: config.F,
lggr: lggr,
offchainConfig: offchainConfig,
Expand All @@ -109,7 +132,8 @@ func (rf *ExecutionReportingPluginFactory) NewReportingPlugin(config types.Repor
commitRootsCache: cache.NewCommitRootsCache(lggr, onchainConfig.PermissionLessExecutionThresholdSeconds, offchainConfig.RootSnoozeTime.Duration()),
metricsCollector: rf.config.metricsCollector,
chainHealthcheck: rf.config.chainHealthcheck,
}, types.ReportingPluginInfo{
}
pluginInfo := types.ReportingPluginInfo{
Name: "CCIPExecution",
// Setting this to false saves on calldata since OffRamp doesn't require agreement between NOPs
// (OffRamp is only able to execute committed messages).
Expand All @@ -118,5 +142,8 @@ func (rf *ExecutionReportingPluginFactory) NewReportingPlugin(config types.Repor
MaxObservationLength: ccip.MaxObservationLength,
MaxReportLength: MaxExecutionReportLength,
},
}, nil
}

return reportingPluginAndInfo{plugin, pluginInfo}, nil
}
}
67 changes: 67 additions & 0 deletions core/services/ocr2/plugins/ccip/ccipexec/factory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package ccipexec

import (
"errors"
"testing"
"time"

"github.com/smartcontractkit/libocr/offchainreporting2plus/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

"github.com/smartcontractkit/chainlink-common/pkg/types/ccip"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata"
ccipdataprovidermocks "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata/ccipdataprovider/mocks"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata/mocks"
)

// Assert that NewReportingPlugin keeps retrying until it succeeds.
//
// NewReportingPlugin makes several calls (e.g. OffRampReader.ChangeConfig()) that can fail. We use mocks to cause the
// first call to each of these functions to fail, then all subsequent calls succeed. We assert that NewReportingPlugin
// retries a sufficient number of times to get through the transient errors and eventually succeed.
func TestNewReportingPluginRetriesUntilSuccess(t *testing.T) {
execConfig := ExecutionPluginStaticConfig{}

// For this unit test, ensure that there is no delay between retries
execConfig.newReportingPluginRetryConfig = ccipdata.RetryConfig{
InitialDelay: 0 * time.Nanosecond,
MaxDelay: 0 * time.Nanosecond,
}

// Set up the OffRampReader mock
mockOffRampReader := new(mocks.OffRampReader)

// The first call is set to return an error, the following calls return a nil error
mockOffRampReader.On("ChangeConfig", mock.Anything, mock.Anything, mock.Anything).Return(ccip.Address(""), ccip.Address(""), errors.New("")).Once()
mockOffRampReader.On("ChangeConfig", mock.Anything, mock.Anything, mock.Anything).Return(ccip.Address("addr1"), ccip.Address("addr2"), nil).Times(5)

mockOffRampReader.On("OffchainConfig", mock.Anything).Return(ccip.ExecOffchainConfig{}, errors.New("")).Once()
mockOffRampReader.On("OffchainConfig", mock.Anything).Return(ccip.ExecOffchainConfig{}, nil).Times(3)

mockOffRampReader.On("GasPriceEstimator", mock.Anything).Return(nil, errors.New("")).Once()
mockOffRampReader.On("GasPriceEstimator", mock.Anything).Return(nil, nil).Times(2)

mockOffRampReader.On("OnchainConfig", mock.Anything).Return(ccip.ExecOnchainConfig{}, errors.New("")).Once()
mockOffRampReader.On("OnchainConfig", mock.Anything).Return(ccip.ExecOnchainConfig{}, nil).Times(1)

execConfig.offRampReader = mockOffRampReader

// Set up the PriceRegistry mock
priceRegistryProvider := new(ccipdataprovidermocks.PriceRegistry)
priceRegistryProvider.On("NewPriceRegistryReader", mock.Anything, mock.Anything).Return(nil, errors.New("")).Once()
priceRegistryProvider.On("NewPriceRegistryReader", mock.Anything, mock.Anything).Return(nil, nil).Once()
execConfig.priceRegistryProvider = priceRegistryProvider

execConfig.lggr, _ = logger.NewLogger()

factory := NewExecutionReportingPluginFactory(execConfig)
reportingConfig := types.ReportingPluginConfig{}
reportingConfig.OnchainConfig = []byte{1, 2, 3}
reportingConfig.OffchainConfig = []byte{1, 2, 3}

// Assert that NewReportingPlugin succeeds despite many transient internal failures (mocked out above)
_, _, err := factory.NewReportingPlugin(reportingConfig)
assert.Equal(t, nil, err)
}
3 changes: 1 addition & 2 deletions core/services/ocr2/plugins/ccip/ccipexec/helpers.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
package ccipexec

import (
"github.com/pkg/errors"

mapset "github.com/deckarep/golang-set/v2"
"github.com/pkg/errors"

cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccip"
)
Expand Down
27 changes: 15 additions & 12 deletions core/services/ocr2/plugins/ccip/ccipexec/initializers.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ import (

const numTokenDataWorkers = 5

// defaultNewReportingPluginRetryConfig is the retry configuration handed to the Exec plugin's static config as
// newReportingPluginRetryConfig: an initial delay of one second and a maximum delay of five minutes.
var defaultNewReportingPluginRetryConfig = ccipdata.RetryConfig{
	InitialDelay: time.Second,
	MaxDelay:     5 * time.Minute,
}

func NewExecutionServices(ctx context.Context, lggr logger.Logger, jb job.Job, chainSet legacyevm.LegacyChainContainer, new bool, argsNoPlugin libocr2.OCR2OracleArgs, logError func(string)) ([]job.ServiceCtx, error) {
execPluginConfig, backfillArgs, chainHealthcheck, tokenWorker, err := jobSpecToExecPluginConfig(ctx, lggr, jb, chainSet)
if err != nil {
Expand Down Expand Up @@ -292,18 +294,19 @@ func jobSpecToExecPluginConfig(ctx context.Context, lggr logger.Logger, jb job.J
onchainConfig.PermissionLessExecutionThresholdSeconds,
)
return &ExecutionPluginStaticConfig{
lggr: execLggr,
onRampReader: onRampReader,
commitStoreReader: commitStoreReader,
offRampReader: offRampReader,
sourcePriceRegistryProvider: ccipdataprovider.NewEvmPriceRegistry(params.sourceChain.LogPoller(), params.sourceChain.Client(), execLggr, ccip.ExecPluginLabel),
sourceWrappedNativeToken: cciptypes.Address(sourceWrappedNative.String()),
destChainSelector: destChainSelector,
priceRegistryProvider: ccipdataprovider.NewEvmPriceRegistry(params.destChain.LogPoller(), params.destChain.Client(), execLggr, ccip.ExecPluginLabel),
tokenPoolBatchedReader: tokenPoolBatchedReader,
tokenDataWorker: tokenBackgroundWorker,
metricsCollector: metricsCollector,
chainHealthcheck: chainHealthcheck,
lggr: execLggr,
onRampReader: onRampReader,
commitStoreReader: commitStoreReader,
offRampReader: offRampReader,
sourcePriceRegistryProvider: ccipdataprovider.NewEvmPriceRegistry(params.sourceChain.LogPoller(), params.sourceChain.Client(), execLggr, ccip.ExecPluginLabel),
sourceWrappedNativeToken: cciptypes.Address(sourceWrappedNative.String()),
destChainSelector: destChainSelector,
priceRegistryProvider: ccipdataprovider.NewEvmPriceRegistry(params.destChain.LogPoller(), params.destChain.Client(), execLggr, ccip.ExecPluginLabel),
tokenPoolBatchedReader: tokenPoolBatchedReader,
tokenDataWorker: tokenBackgroundWorker,
metricsCollector: metricsCollector,
chainHealthcheck: chainHealthcheck,
newReportingPluginRetryConfig: defaultNewReportingPluginRetryConfig,
}, &ccipcommon.BackfillArgs{
SourceLP: params.sourceChain.LogPoller(),
DestLP: params.destChain.LogPoller(),
Expand Down
25 changes: 13 additions & 12 deletions core/services/ocr2/plugins/ccip/ccipexec/ocr2.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,18 +50,19 @@ var (
)

type ExecutionPluginStaticConfig struct {
lggr logger.Logger
onRampReader ccipdata.OnRampReader
offRampReader ccipdata.OffRampReader
commitStoreReader ccipdata.CommitStoreReader
sourcePriceRegistryProvider ccipdataprovider.PriceRegistry
sourceWrappedNativeToken cciptypes.Address
tokenDataWorker tokendata.Worker
destChainSelector uint64
priceRegistryProvider ccipdataprovider.PriceRegistry // destination price registry provider.
tokenPoolBatchedReader batchreader.TokenPoolBatchedReader
metricsCollector ccip.PluginMetricsCollector
chainHealthcheck cache.ChainHealthcheck
lggr logger.Logger
onRampReader ccipdata.OnRampReader
offRampReader ccipdata.OffRampReader
commitStoreReader ccipdata.CommitStoreReader
sourcePriceRegistryProvider ccipdataprovider.PriceRegistry
sourceWrappedNativeToken cciptypes.Address
tokenDataWorker tokendata.Worker
destChainSelector uint64
priceRegistryProvider ccipdataprovider.PriceRegistry // destination price registry provider.
tokenPoolBatchedReader batchreader.TokenPoolBatchedReader
metricsCollector ccip.PluginMetricsCollector
chainHealthcheck cache.ChainHealthcheck
newReportingPluginRetryConfig ccipdata.RetryConfig
}

type ExecutionReportingPlugin struct {
Expand Down
15 changes: 15 additions & 0 deletions core/services/ocr2/plugins/ccip/internal/ccipcommon/shortcuts.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import (
"fmt"
"sort"
"strings"
"time"

"github.com/avast/retry-go/v4"

"golang.org/x/sync/errgroup"

Expand Down Expand Up @@ -123,3 +126,15 @@ func SelectorToBytes(chainSelector uint64) [32]byte {
binary.BigEndian.PutUint64(b[:], chainSelector)
return b
}

// RetryUntilSuccess invokes fn over and over and returns fn's value once a call comes back with a nil error. Each
// failed call is followed by an exponential backoff wait, between initialDelay and maxDelay.
func RetryUntilSuccess[T any](fn func() (T, error), initialDelay time.Duration, maxDelay time.Duration) (T, error) {
	// Retry policy: exponential backoff starting at initialDelay, bounded by maxDelay, with no limit on attempts.
	policy := []retry.Option{
		retry.Delay(initialDelay),
		retry.MaxDelay(maxDelay),
		retry.DelayType(retry.BackOffDelay),
		retry.UntilSucceeded(),
	}
	return retry.DoWithData(fn, policy...)
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"sort"
"strconv"
"testing"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -163,3 +164,34 @@ func TestIsTxRevertError(t *testing.T) {
})
}
}

// TestRetryUntilSuccess checks that RetryUntilSuccess keeps calling a failing function until it succeeds and then
// returns that function's value.
func TestRetryUntilSuccess(t *testing.T) {
	// Zero delays so the test never waits between attempts.
	noDelay := 0 * time.Nanosecond

	var (
		calls        int // how many times flaky has been invoked in the current scenario
		attemptsLeft int // flaky succeeds on the call that brings this down to zero
	)

	// flaky fails until it has been called attemptsLeft times, and always reports the running call count.
	flaky := func() (int, error) {
		calls++
		attemptsLeft--
		if attemptsLeft > 0 {
			return calls, fmt.Errorf("")
		}
		return calls, nil
	}

	// RetryUntilSuccess must return the call count of the first successful attempt: the 5th, then the 8th.
	for _, succeedOn := range []int{5, 8} {
		calls, attemptsLeft = 0, succeedOn

		got, err := RetryUntilSuccess(flaky, noDelay, noDelay)
		assert.Nil(t, err)
		assert.Equal(t, succeedOn, got)
	}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package ccipdata

import "time"

// RetryConfig describes how long to wait between retries: InitialDelay is the initial delay between retries and
// MaxDelay is the max delay between retries.
type RetryConfig struct {
	InitialDelay, MaxDelay time.Duration
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ require (
github.com/NethermindEth/juno v0.3.1
github.com/NethermindEth/starknet.go v0.6.1-0.20231218140327-915109ab5bc1
github.com/XSAM/otelsql v0.27.0
github.com/avast/retry-go/v4 v4.5.1
github.com/avast/retry-go/v4 v4.6.0
github.com/btcsuite/btcd/btcec/v2 v2.3.2
github.com/cometbft/cometbft v0.37.2
github.com/cosmos/cosmos-sdk v0.47.4
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,8 @@ github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmV
github.com/armon/go-metrics v0.4.1 h1:hR91U9KYmb6bLBYLQjyM+3j+rcd/UhE+G78SFnF8gJA=
github.com/armon/go-metrics v0.4.1/go.mod h1:E6amYzXo6aW1tqzoZGT755KkbgrJsSdpwZ+3JqfkOG4=
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
github.com/avast/retry-go/v4 v4.5.1 h1:AxIx0HGi4VZ3I02jr78j5lZ3M6x1E0Ivxa6b0pUUh7o=
github.com/avast/retry-go/v4 v4.5.1/go.mod h1:/sipNsvNB3RRuT5iNcb6h73nw3IBmXJ/H3XrCQYSOpc=
github.com/avast/retry-go/v4 v4.6.0 h1:K9xNA+KeB8HHc2aWFuLb25Offp+0iVRXEvFx8IinRJA=
github.com/avast/retry-go/v4 v4.6.0/go.mod h1:gvWlPhBVsvBbLkVGDg/KwvBv0bEkCOLRRSHKIr2PyOE=
github.com/aws/aws-sdk-go v1.22.1/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/aws/aws-sdk-go v1.23.20/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/aws/aws-sdk-go v1.45.25 h1:c4fLlh5sLdK2DCRTY1z0hyuJZU4ygxX8m1FswL6/nF4=
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ require (
github.com/armon/go-metrics v0.4.1 // indirect
github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 // indirect
github.com/avast/retry-go v3.0.0+incompatible // indirect
github.com/avast/retry-go/v4 v4.5.1 // indirect
github.com/avast/retry-go/v4 v4.6.0 // indirect
github.com/aws/aws-sdk-go v1.45.25 // indirect
github.com/aws/constructs-go/constructs/v10 v10.1.255 // indirect
github.com/aws/jsii-runtime-go v1.75.0 // indirect
Expand Down
Loading

0 comments on commit 9eb2708

Please sign in to comment.