Skip to content

Commit

Permalink
Merge upstream (#689)
Browse files Browse the repository at this point in the history
## Motivation


## Solution
  • Loading branch information
RensR authored Apr 9, 2024
2 parents 53940f2 + 02ecbf6 commit 3396054
Show file tree
Hide file tree
Showing 47 changed files with 421 additions and 233 deletions.
15 changes: 12 additions & 3 deletions core/services/ocr2/plugins/ccip/ccipcommit/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/smartcontractkit/libocr/offchainreporting2plus/types"

cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccip"

"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipcalc"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata"
Expand Down Expand Up @@ -67,7 +68,7 @@ func (rf *CommitReportingPluginFactory) UpdateDynamicReaders(ctx context.Context
func (rf *CommitReportingPluginFactory) NewReportingPlugin(config types.ReportingPluginConfig) (types.ReportingPlugin, types.ReportingPluginInfo, error) {
ctx := context.Background() // todo: consider adding some timeout

destPriceReg, err := rf.config.commitStore.ChangeConfig(config.OnchainConfig, config.OffchainConfig)
destPriceReg, err := rf.config.commitStore.ChangeConfig(ctx, config.OnchainConfig, config.OffchainConfig)
if err != nil {
return nil, types.ReportingPluginInfo{}, err
}
Expand All @@ -80,7 +81,15 @@ func (rf *CommitReportingPluginFactory) NewReportingPlugin(config types.Reportin
return nil, types.ReportingPluginInfo{}, err
}

pluginOffChainConfig := rf.config.commitStore.OffchainConfig()
pluginOffChainConfig, err := rf.config.commitStore.OffchainConfig(ctx)
if err != nil {
return nil, types.ReportingPluginInfo{}, err
}

gasPriceEstimator, err := rf.config.commitStore.GasPriceEstimator(ctx)
if err != nil {
return nil, types.ReportingPluginInfo{}, err
}

lggr := rf.config.lggr.Named("CommitReportingPlugin")
return &CommitReportingPlugin{
Expand All @@ -93,7 +102,7 @@ func (rf *CommitReportingPluginFactory) NewReportingPlugin(config types.Reportin
lggr: lggr,
destPriceRegistryReader: rf.destPriceRegReader,
offRampReaders: rf.config.offRamps,
gasPriceEstimator: rf.config.commitStore.GasPriceEstimator(),
gasPriceEstimator: gasPriceEstimator,
offchainConfig: pluginOffChainConfig,
metricsCollector: rf.config.metricsCollector,
chainHealthcheck: rf.config.chainHealthcheck,
Expand Down
2 changes: 1 addition & 1 deletion core/services/ocr2/plugins/ccip/ccipcommit/initializers.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func jobSpecToCommitPluginConfig(ctx context.Context, lggr logger.Logger, jb job
destOffRampReaders = append(destOffRampReaders, destOffRampReader)
}

onRampRouterAddr, err := onRampReader.RouterAddress()
onRampRouterAddr, err := onRampReader.RouterAddress(ctx)
if err != nil {
return nil, nil, nil, err
}
Expand Down
6 changes: 3 additions & 3 deletions core/services/ocr2/plugins/ccip/ccipcommit/ocr2.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ func (r *CommitReportingPlugin) Report(ctx context.Context, epochAndRound types.
if err != nil {
return false, nil, err
}
encodedReport, err := r.commitStoreReader.EncodeCommitReport(report)
encodedReport, err := r.commitStoreReader.EncodeCommitReport(ctx, report)
if err != nil {
return false, nil, err
}
Expand Down Expand Up @@ -632,7 +632,7 @@ func (r *CommitReportingPlugin) buildReport(ctx context.Context, lggr logger.Log
}

func (r *CommitReportingPlugin) ShouldAcceptFinalizedReport(ctx context.Context, reportTimestamp types.ReportTimestamp, report types.Report) (bool, error) {
parsedReport, err := r.commitStoreReader.DecodeCommitReport(report)
parsedReport, err := r.commitStoreReader.DecodeCommitReport(ctx, report)
if err != nil {
return false, err
}
Expand Down Expand Up @@ -669,7 +669,7 @@ func (r *CommitReportingPlugin) ShouldAcceptFinalizedReport(ctx context.Context,
// ShouldTransmitAcceptedReport checks if the report is stale, if it is it should not be transmitted.
func (r *CommitReportingPlugin) ShouldTransmitAcceptedReport(ctx context.Context, reportTimestamp types.ReportTimestamp, report types.Report) (bool, error) {
lggr := r.lggr.Named("CommitShouldTransmitAcceptedReport")
parsedReport, err := r.commitStoreReader.DecodeCommitReport(report)
parsedReport, err := r.commitStoreReader.DecodeCommitReport(ctx, report)
if err != nil {
return false, err
}
Expand Down
15 changes: 8 additions & 7 deletions core/services/ocr2/plugins/ccip/ccipcommit/ocr2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/config"

cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccip"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/gas/mocks"
mocks2 "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller/mocks"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils"
Expand Down Expand Up @@ -614,7 +615,7 @@ func TestCommitReportingPlugin_ShouldAcceptFinalizedReport(t *testing.T) {

commitStoreReader := ccipdatamocks.NewCommitStoreReader(t)
p.commitStoreReader = commitStoreReader
commitStoreReader.On("DecodeCommitReport", encodedReport).
commitStoreReader.On("DecodeCommitReport", mock.Anything, encodedReport).
Return(cciptypes.CommitStoreReport{}, errors.New("unable to decode report"))

_, err := p.ShouldAcceptFinalizedReport(ctx, types.ReportTimestamp{}, encodedReport)
Expand All @@ -628,7 +629,7 @@ func TestCommitReportingPlugin_ShouldAcceptFinalizedReport(t *testing.T) {

commitStoreReader := ccipdatamocks.NewCommitStoreReader(t)
p.commitStoreReader = commitStoreReader
commitStoreReader.On("DecodeCommitReport", mock.Anything).Return(report, nil)
commitStoreReader.On("DecodeCommitReport", mock.Anything, mock.Anything).Return(report, nil)

chainHealthCheck := ccipcachemocks.NewChainHealthcheck(t)
chainHealthCheck.On("IsHealthy", ctx).Return(true, nil).Maybe()
Expand All @@ -654,7 +655,7 @@ func TestCommitReportingPlugin_ShouldAcceptFinalizedReport(t *testing.T) {
MerkleRoot: [32]byte{123}, // this report is considered non-empty since it has a merkle root
}

commitStoreReader.On("DecodeCommitReport", mock.Anything).Return(report, nil)
commitStoreReader.On("DecodeCommitReport", mock.Anything, mock.Anything).Return(report, nil)
commitStoreReader.On("GetExpectedNextSequenceNumber", mock.Anything).Return(onChainSeqNum, nil)

chainHealthCheck := ccipcachemocks.NewChainHealthcheck(t)
Expand Down Expand Up @@ -702,7 +703,7 @@ func TestCommitReportingPlugin_ShouldAcceptFinalizedReport(t *testing.T) {
},
MerkleRoot: [32]byte{123},
}
commitStoreReader.On("DecodeCommitReport", mock.Anything).Return(report, nil)
commitStoreReader.On("DecodeCommitReport", mock.Anything, mock.Anything).Return(report, nil)
commitStoreReader.On("GetExpectedNextSequenceNumber", mock.Anything).Return(onChainSeqNum, nil)

// non-stale since report interval is not behind on-chain seq num
Expand Down Expand Up @@ -752,7 +753,7 @@ func TestCommitReportingPlugin_ShouldTransmitAcceptedReport(t *testing.T) {
report.Interval = cciptypes.CommitStoreInterval{Min: onChainSeqNum, Max: onChainSeqNum + 10}
encodedReport, err := encodeCommitReport(report)
assert.NoError(t, err)
commitStoreReader.On("DecodeCommitReport", encodedReport).Return(report, nil).Once()
commitStoreReader.On("DecodeCommitReport", mock.Anything, encodedReport).Return(report, nil).Once()
shouldTransmit, err := p.ShouldTransmitAcceptedReport(ctx, types.ReportTimestamp{}, encodedReport)
assert.NoError(t, err)
assert.True(t, shouldTransmit)
Expand All @@ -763,15 +764,15 @@ func TestCommitReportingPlugin_ShouldTransmitAcceptedReport(t *testing.T) {
report.Interval = cciptypes.CommitStoreInterval{Min: onChainSeqNum - 2, Max: onChainSeqNum + 10}
encodedReport, err := encodeCommitReport(report)
assert.NoError(t, err)
commitStoreReader.On("DecodeCommitReport", encodedReport).Return(report, nil).Once()
commitStoreReader.On("DecodeCommitReport", mock.Anything, encodedReport).Return(report, nil).Once()
shouldTransmit, err := p.ShouldTransmitAcceptedReport(ctx, types.ReportTimestamp{}, encodedReport)
assert.NoError(t, err)
assert.False(t, shouldTransmit)
})

t.Run("error when report cannot be decoded", func(t *testing.T) {
reportBytes := []byte("whatever")
commitStoreReader.On("DecodeCommitReport", reportBytes).
commitStoreReader.On("DecodeCommitReport", mock.Anything, reportBytes).
Return(cciptypes.CommitStoreReport{}, errors.New("decode error")).Once()
_, err := p.ShouldTransmitAcceptedReport(ctx, types.ReportTimestamp{}, reportBytes)
assert.Error(t, err)
Expand Down
3 changes: 2 additions & 1 deletion core/services/ocr2/plugins/ccip/ccipexec/initializers.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
commonlogger "github.com/smartcontractkit/chainlink-common/pkg/logger"

cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccip"

"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/cache"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipcalc"

Expand Down Expand Up @@ -194,7 +195,7 @@ func jobSpecToExecPluginConfig(ctx context.Context, lggr logger.Logger, jb job.J
if err != nil {
return nil, nil, nil, nil, errors.Wrap(err, "create onramp reader")
}
dynamicOnRampConfig, err := onRampReader.GetDynamicConfig()
dynamicOnRampConfig, err := onRampReader.GetDynamicConfig(ctx)
if err != nil {
return nil, nil, nil, nil, errors.Wrap(err, "get onramp dynamic config")
}
Expand Down
5 changes: 2 additions & 3 deletions core/services/ocr2/plugins/ccip/internal/cache/autosync.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/pkg/errors"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
)

type AutoSync[T any] interface {
Expand Down Expand Up @@ -90,7 +89,7 @@ func (c *LogpollerEventsBased[T]) hasExpired(ctx context.Context) (expired bool,
// NOTE: latest block should be fetched before LatestBlockByEventSigsAddrsWithConfs
// Otherwise there might be new events between LatestBlockByEventSigsAddrsWithConfs and
// latestBlock which will be missed.
latestBlock, err := c.logPoller.LatestBlock(pg.WithParentCtx(ctx))
latestBlock, err := c.logPoller.LatestBlock(ctx)
latestFinalizedBlock := int64(0)
if err != nil && !errors.Is(err, sql.ErrNoRows) {
return false, 0, fmt.Errorf("get latest log poller block: %w", err)
Expand All @@ -106,11 +105,11 @@ func (c *LogpollerEventsBased[T]) hasExpired(ctx context.Context) (expired bool,
}

blockOfLatestEvent, err = c.logPoller.LatestBlockByEventSigsAddrsWithConfs(
ctx,
blockOfCurrentValue,
c.observedEvents,
[]common.Address{c.address},
logpoller.Finalized,
pg.WithParentCtx(ctx),
)
if err != nil {
return false, 0, fmt.Errorf("get latest events form lp: %w", err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,11 @@ func TestLogpollerEventsBased(t *testing.T) {
if round.stateLatestBlock > 0 {
lp.On(
"LatestBlockByEventSigsAddrsWithConfs",
mock.Anything,
round.stateLatestBlock,
observedEvents,
[]common.Address{contractAddress},
logpoller.Finalized,
mock.Anything,
).Return(round.latestEventBlock, nil).Once()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,10 @@ func (f *fakeStatusWrapper) IsSourceCursed(context.Context) (bool, error) {
return !f.healthy, f.err
}

func (f *fakeStatusWrapper) Close() error {
return nil
}

func (f *fakeStatusWrapper) set(healthy bool, err error) {
f.mu.Lock()
defer f.mu.Unlock()
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -186,3 +186,7 @@ func getBatchedTypeAndVersion(ctx context.Context, evmBatchCaller rpclib.EvmBatc
}
return result, nil
}

func (br *EVMTokenPoolBatchedReader) Close() error {
return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/config"

cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccip"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/assets"
evmclientmocks "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client/mocks"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/gas"
Expand Down Expand Up @@ -139,6 +140,7 @@ func TestCommitOnchainConfig(t *testing.T) {

func TestCommitStoreReaders(t *testing.T) {
user, ec := newSim(t)
ctx := testutils.Context(t)
lggr := logger.TestLogger(t)
lpOpts := logpoller.Opts{
PollPeriod: 100 * time.Millisecond,
Expand All @@ -147,7 +149,7 @@ func TestCommitStoreReaders(t *testing.T) {
RpcBatchSize: 2,
KeepFinalizedBlocksDepth: 1000,
}
lp := logpoller.NewLogPoller(logpoller.NewORM(testutils.SimulatedChainID, pgtest.NewSqlxDB(t), lggr, pgtest.NewQConfig(true)), ec, lggr, lpOpts)
lp := logpoller.NewLogPoller(logpoller.NewORM(testutils.SimulatedChainID, pgtest.NewSqlxDB(t), lggr), ec, lggr, lpOpts)

// Deploy 2 commit store versions
onramp1 := utils.RandomAddress()
Expand Down Expand Up @@ -242,11 +244,11 @@ func TestCommitStoreReaders(t *testing.T) {
commitAndGetBlockTs(ec)

// Apply report
b, err := c10r.EncodeCommitReport(rep)
b, err := c10r.EncodeCommitReport(ctx, rep)
require.NoError(t, err)
_, err = ch.Report(user, b, er)
require.NoError(t, err)
b, err = c12r.EncodeCommitReport(rep)
b, err = c12r.EncodeCommitReport(ctx, rep)
require.NoError(t, err)
_, err = ch2.Report(user, b, er)
require.NoError(t, err)
Expand Down Expand Up @@ -282,9 +284,9 @@ func TestCommitStoreReaders(t *testing.T) {
require.NotNil(t, cfg)

// Assert encoding
b, err := cr.EncodeCommitReport(rep)
b, err := cr.EncodeCommitReport(ctx, rep)
require.NoError(t, err)
d, err := cr.DecodeCommitReport(b)
d, err := cr.DecodeCommitReport(ctx, b)
require.NoError(t, err)
assert.Equal(t, d, rep)

Expand Down Expand Up @@ -345,13 +347,20 @@ func TestCommitStoreReaders(t *testing.T) {
assert.Equal(t, reps[0].TokenPrices, rep.TokenPrices)

// Until we detect the config, we'll have empty offchain config
assert.Equal(t, cr.OffchainConfig(), cciptypes.CommitOffchainConfig{})
newPr, err := cr.ChangeConfig(configs[v][0], configs[v][1])
c1, err := cr.OffchainConfig(ctx)
require.NoError(t, err)
assert.Equal(t, c1, cciptypes.CommitOffchainConfig{})
newPr, err := cr.ChangeConfig(ctx, configs[v][0], configs[v][1])
require.NoError(t, err)
assert.Equal(t, ccipcalc.EvmAddrToGeneric(prs[v]), newPr)
assert.Equal(t, commonOffchain, cr.OffchainConfig())

c2, err := cr.OffchainConfig(ctx)
require.NoError(t, err)
assert.Equal(t, commonOffchain, c2)
// We should be able to query for gas prices now.
gp, err := cr.GasPriceEstimator().GetGasPrice(context.Background())
gpe, err := cr.GasPriceEstimator(ctx)
require.NoError(t, err)
gp, err := gpe.GetGasPrice(context.Background())
require.NoError(t, err)
assert.True(t, gp.Cmp(big.NewInt(0)) > 0)
})
Expand Down Expand Up @@ -389,7 +398,7 @@ func TestNewCommitStoreReader(t *testing.T) {
addr := ccipcalc.EvmAddrToGeneric(utils.RandomAddress())
lp := lpmocks.NewLogPoller(t)
if tc.expectedErr == "" {
lp.On("RegisterFilter", mock.Anything).Return(nil)
lp.On("RegisterFilter", mock.Anything, mock.Anything).Return(nil)
}
_, err = factory.NewCommitStoreReader(logger.TestLogger(t), factory.NewEvmVersionFinder(), addr, c, lp, nil, nil)
if tc.expectedErr != "" {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/pkg/errors"

cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccip"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/client"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/gas"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
Expand Down Expand Up @@ -55,7 +56,7 @@ func initOrCloseCommitStoreReader(lggr logger.Logger, versionFinder VersionFinde
return nil, err
}
if closeReader {
return nil, cs.Close(pgOpts...)
return nil, cs.Close()
}
return cs, cs.RegisterFilters(pgOpts...)
case ccipdata.V1_2_0:
Expand All @@ -64,9 +65,9 @@ func initOrCloseCommitStoreReader(lggr logger.Logger, versionFinder VersionFinde
return nil, err
}
if closeReader {
return nil, cs.Close(pgOpts...)
return nil, cs.Close()
}
return cs, cs.RegisterFilters(pgOpts...)
return cs, cs.RegisterFilters()
default:
return nil, errors.Errorf("unsupported commit store version %v", version.String())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/stretchr/testify/mock"

cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccip"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
mocks2 "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller/mocks"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils"
Expand All @@ -23,13 +24,13 @@ func TestCommitStore(t *testing.T) {
addr := cciptypes.Address(utils.RandomAddress().String())
lp := mocks2.NewLogPoller(t)

lp.On("RegisterFilter", mock.Anything).Return(nil)
lp.On("RegisterFilter", mock.Anything, mock.Anything).Return(nil)
versionFinder := newMockVersionFinder(ccipconfig.CommitStore, *semver.MustParse(versionStr), nil)
_, err := NewCommitStoreReader(lggr, versionFinder, addr, nil, lp, nil, nil)
assert.NoError(t, err)

expFilterName := logpoller.FilterName(v1_0_0.EXEC_REPORT_ACCEPTS, addr)
lp.On("UnregisterFilter", expFilterName).Return(nil)
lp.On("UnregisterFilter", mock.Anything, expFilterName).Return(nil)
err = CloseCommitStoreReader(lggr, versionFinder, addr, nil, lp, nil, nil)
assert.NoError(t, err)
}
Expand Down
Loading

0 comments on commit 3396054

Please sign in to comment.