Skip to content

Commit

Permalink
chainHealthcheck + token background worker now managed by reporting p…
Browse files Browse the repository at this point in the history
…lugin factory
  • Loading branch information
patrickhuie19 committed Jul 9, 2024
1 parent 3512d93 commit 230a359
Show file tree
Hide file tree
Showing 8 changed files with 139 additions and 22 deletions.
44 changes: 35 additions & 9 deletions core/services/ocr2/plugins/ccip/ccipexec/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ import (
"fmt"
"sync"

"go.uber.org/multierr"

"github.com/smartcontractkit/chainlink-common/pkg/services"

"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/observability"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/tokendata"
Expand All @@ -27,21 +31,37 @@ type ExecutionReportingPluginFactory struct {
destPriceRegReader ccipdata.PriceRegistryReader
destPriceRegAddr cciptypes.Address
readersMu *sync.Mutex

services []services.Service
}

func (rf *ExecutionReportingPluginFactory) Name() string {
//TODO implement me
panic("implement me")
}

func (rf *ExecutionReportingPluginFactory) Start(ctx context.Context) error {
//TODO implement me
panic("implement me")
// Start is used to run chainHealthcheck and tokenDataWorker, which were previously passed
// back to the delegate as top level job.ServiceCtx to be managed in core alongside the reporting
// plugin factory
func (rf *ExecutionReportingPluginFactory) Start(ctx context.Context) (err error) {
rf.readersMu.Lock()
defer rf.readersMu.Unlock()
for _, service := range rf.services {
serviceErr := service.Start(ctx)
err = multierr.Append(err, serviceErr)
}
return
}

func (rf *ExecutionReportingPluginFactory) Close() error {
//TODO implement me
panic("implement me")
func (rf *ExecutionReportingPluginFactory) Close() (err error) {
rf.readersMu.Lock()
defer rf.readersMu.Unlock()
for _, service := range rf.services {
closeErr := service.Close()
err = multierr.Append(err, closeErr)
}

return
}

func (rf *ExecutionReportingPluginFactory) Ready() error {
Expand Down Expand Up @@ -161,6 +181,7 @@ func NewExecutionReportingPluginFactoryV2(ctx context.Context, lggr logger.Logge
chainHealthcheck: chainHealthcheck,
newReportingPluginRetryConfig: defaultNewReportingPluginRetryConfig,
},
services: []services.Service{chainHealthcheck, tokenBackgroundWorker},
readersMu: &sync.Mutex{},

// the fields below are initially empty and populated on demand
Expand All @@ -180,7 +201,7 @@ func NewExecutionReportingPluginFactory(config ExecutionPluginStaticConfig) *Exe
}
}

func (rf *ExecutionReportingPluginFactory) UpdateDynamicReaders(ctx context.Context, newPriceRegAddr cciptypes.Address) error {
func (rf *ExecutionReportingPluginFactory) UpdateDynamicReaders(_ context.Context, newPriceRegAddr cciptypes.Address) error {
rf.readersMu.Lock()
defer rf.readersMu.Unlock()
// TODO: Investigate use of Close() to cleanup.
Expand Down Expand Up @@ -216,7 +237,7 @@ func (rf *ExecutionReportingPluginFactory) NewReportingPlugin(config types.Repor
initialRetryDelay := rf.config.newReportingPluginRetryConfig.InitialDelay
maxDelay := rf.config.newReportingPluginRetryConfig.MaxDelay

pluginAndInfo, err := ccipcommon.RetryUntilSuccess(rf.NewReportingPluginFn(config), initialRetryDelay, maxDelay)
pluginAndInfo, err := ccipcommon.RetryUntilSuccess(rf.newReportingPluginFn(config), initialRetryDelay, maxDelay)
if err != nil {
return nil, types.ReportingPluginInfo{}, err
}
Expand All @@ -226,10 +247,15 @@ func (rf *ExecutionReportingPluginFactory) NewReportingPlugin(config types.Repor
// NewReportingPluginFn implements the NewReportingPlugin logic. It is defined as a function so that it can easily be
// retried via RetryUntilSuccess. NewReportingPlugin must return successfully in order for the Exec plugin to function,
// hence why we can only keep retrying it until it succeeds.
func (rf *ExecutionReportingPluginFactory) NewReportingPluginFn(config types.ReportingPluginConfig) func() (reportingPluginAndInfo, error) {
func (rf *ExecutionReportingPluginFactory) newReportingPluginFn(config types.ReportingPluginConfig) func() (reportingPluginAndInfo, error) {
return func() (reportingPluginAndInfo, error) {
ctx := context.Background() // todo: consider setting a timeout

// Start the chainHealthcheck and tokenDataWorker services
// Using Start, while a bit more obtuse, allows us to manage these services
// in the same process as the plugin factory in LOOP mode
err := rf.Start(ctx)

destPriceRegistry, destWrappedNative, err := rf.config.offRampReader.ChangeConfig(ctx, config.OnchainConfig, config.OffchainConfig)
if err != nil {
return reportingPluginAndInfo{}, err
Expand Down
12 changes: 4 additions & 8 deletions core/services/ocr2/plugins/ccip/ccipexec/initializers.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func NewExecServices(ctx context.Context, lggr logger.Logger, cfg plugins.Regist
loopCmd := env.CCIPExecPlugin.Cmd.Get()
loopEnabled := loopCmd != ""

var wrappedPluginFactory *ExecutionReportingPluginFactory
var pluginFactory types.ReportingPluginFactory
var err error
if loopEnabled {
// find loop command
Expand All @@ -71,15 +71,15 @@ func NewExecServices(ctx context.Context, lggr logger.Logger, cfg plugins.Regist
})
// get reporting plugin factory from loop
factoryServer := loop.NewExecutionService(lggr, grpcOpts, cmdFn, srcProvider, dstProvider, uint32(srcChainID), uint32(dstChainID), sourceTokenAddress)
// wrap into ExecutionReportingPluginFactory
pluginFactory = factoryServer
} else {
wrappedPluginFactory, err = NewExecutionReportingPluginFactoryV2(ctx, lggr, sourceTokenAddress, srcChainID, dstChainID, srcProvider, dstProvider)
pluginFactory, err = NewExecutionReportingPluginFactoryV2(ctx, lggr, sourceTokenAddress, srcChainID, dstChainID, srcProvider, dstProvider)
if err != nil {
return nil, err
}
}

argsNoPlugin.ReportingPluginFactory = promwrapper.NewPromFactory(wrappedPluginFactory, "CCIPExecution", jb.OCR2OracleSpec.Relay, big.NewInt(0).SetInt64(dstChainID))
argsNoPlugin.ReportingPluginFactory = promwrapper.NewPromFactory(pluginFactory, "CCIPExecution", jb.OCR2OracleSpec.Relay, big.NewInt(0).SetInt64(dstChainID))
argsNoPlugin.Logger = commonlogger.NewOCRWrapper(lggr, true, logError)
oracle, err := libocr2.NewOracle(argsNoPlugin)
if err != nil {
Expand All @@ -94,14 +94,10 @@ func NewExecServices(ctx context.Context, lggr logger.Logger, cfg plugins.Regist
dstProvider,
job.NewServiceAdapter(oracle),
),
wrappedPluginFactory.config.chainHealthcheck,
wrappedPluginFactory.config.tokenDataWorker,
}, nil
}
return []job.ServiceCtx{
job.NewServiceAdapter(oracle),
wrappedPluginFactory.config.chainHealthcheck,
wrappedPluginFactory.config.tokenDataWorker,
}, nil
}

Expand Down
13 changes: 11 additions & 2 deletions core/services/ocr2/plugins/ccip/internal/cache/chain_health.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/services"

"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata"
)

Expand All @@ -33,7 +32,7 @@ import (
//
//go:generate mockery --quiet --name ChainHealthcheck --filename chain_health_mock.go --case=underscore
type ChainHealthcheck interface {
job.ServiceCtx
services.Service
IsHealthy(ctx context.Context) (bool, error)
}

Expand Down Expand Up @@ -64,6 +63,16 @@ type chainHealthcheck struct {
backgroundCancel context.CancelFunc
}

func (c *chainHealthcheck) HealthReport() map[string]error {
//TODO implement me
panic("implement me")
}

func (c *chainHealthcheck) Name() string {
//TODO implement me
panic("implement me")
}

func NewChainHealthcheck(lggr logger.Logger, onRamp ccipdata.OnRampReader, commitStore ccipdata.CommitStoreReader) *chainHealthcheck {
ctx, cancel := context.WithCancel(context.Background())

Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,21 @@ type ObservedChainHealthcheck struct {
laneHealthStatus *prometheus.GaugeVec
}

func (o *ObservedChainHealthcheck) Ready() error {
//TODO implement me
panic("implement me")
}

func (o *ObservedChainHealthcheck) HealthReport() map[string]error {
//TODO implement me
panic("implement me")
}

func (o *ObservedChainHealthcheck) Name() string {
//TODO implement me
panic("implement me")
}

func NewObservedChainHealthCheck(
chainHealthcheck ChainHealthcheck,
plugin string,
Expand Down
13 changes: 11 additions & 2 deletions core/services/ocr2/plugins/ccip/tokendata/bgworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/services"

cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccip"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
)

type msgResult struct {
Expand All @@ -22,7 +21,7 @@ type msgResult struct {
}

type Worker interface {
job.ServiceCtx
services.Service
// AddJobsFromMsgs will include the provided msgs for background processing.
AddJobsFromMsgs(ctx context.Context, msgs []cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta)

Expand All @@ -46,6 +45,16 @@ type BackgroundWorker struct {
backgroundCancel context.CancelFunc
}

func (w *BackgroundWorker) HealthReport() map[string]error {
//TODO implement me
panic("implement me")
}

func (w *BackgroundWorker) Name() string {
//TODO implement me
panic("implement me")
}

func NewBackgroundWorker(
tokenDataReaders map[cciptypes.Address]Reader,
numWorkers int,
Expand Down
6 changes: 6 additions & 0 deletions core/services/relay/evm/exec_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,9 @@ func (d DstExecProvider) NewCommitStoreReader(ctx context.Context, addr cciptype
return
}

// NewOffRampReader constructs a reader for the offramp contract on the dest chain.
// The offramp address is known when the provider is constructed - by consuming it from the provider instead of at runtime
// we save ourselves wiring it through the execution reporting plugin factory grpc server + client
func (d DstExecProvider) NewOffRampReader(ctx context.Context, _ cciptypes.Address) (offRampReader cciptypes.OffRampReader, err error) {
offRampReader, err = ccip.NewOffRampReader(d.lggr, d.versionFinder, d.offRampAddress, d.client, d.lp, d.gasEstimator, &d.maxGasPrice, true)
return
Expand All @@ -299,6 +302,9 @@ func (d DstExecProvider) NewTokenDataReader(ctx context.Context, tokenAddress cc
return nil, fmt.Errorf("invalid: NewTokenDataReader called on DstExecProvider. It should only be called on SrcExecProvider")
}

// NewTokenPoolBatchedReader constructs a batched caller to read token prices from the destination pool.
// The offramp address is known when the provider is constructed - by consuming it from the provider instead of at runtime
// we save ourselves wiring it through the execution reporting plugin factory grpc server + client
func (d DstExecProvider) NewTokenPoolBatchedReader(ctx context.Context, _ cciptypes.Address, sourceChainSelector uint64) (tokenPoolBatchedReader cciptypes.TokenPoolBatchedReader, err error) {
batchCaller := ccip.NewDynamicLimitedBatchCaller(
d.lggr,
Expand Down
2 changes: 1 addition & 1 deletion plugins/cmd/chainlink-ccip-exec/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func main() {
defer close(stop)

plugin.Serve(&plugin.ServeConfig{
HandshakeConfig: loop.PluginMedianHandshakeConfig(),
HandshakeConfig: loop.PluginCCIPExecutionHandshakeConfig(),
Plugins: map[string]plugin.Plugin{
loop.CCIPExecutionLOOPName: &loop.ExecutionLoop{
PluginServer: p,
Expand Down

0 comments on commit 230a359

Please sign in to comment.