Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core/services/ccipcapability: add the ccip capability package #1024

Closed
wants to merge 12 commits into from
210 changes: 210 additions & 0 deletions core/services/ccipcapability/delegate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
package ccipcapability

import (
"context"
"fmt"

"github.com/pkg/errors"

"github.com/smartcontractkit/chainlink-common/pkg/loop"
"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"
"github.com/smartcontractkit/chainlink-common/pkg/types"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/ccipcapability/launcher"
"github.com/smartcontractkit/chainlink/v2/core/services/ccipcapability/oraclecreator"
cctypes "github.com/smartcontractkit/chainlink/v2/core/services/ccipcapability/types"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/keystore"
"github.com/smartcontractkit/chainlink/v2/core/services/keystore/chaintype"
"github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/ocr2key"
"github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/p2pkey"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2"
"github.com/smartcontractkit/chainlink/v2/core/services/ocrcommon"
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
"github.com/smartcontractkit/chainlink/v2/plugins"
)

type RelayGetter interface {
GetIDToRelayerMap() (map[types.RelayID]loop.Relayer, error)
}

type Delegate struct {
lggr logger.Logger
registrarConfig plugins.RegistrarConfig
pipelineRunner pipeline.Runner
relayGetter RelayGetter
capRegistry cctypes.CapabilityRegistry
keystore keystore.Master
ds sqlutil.DataSource
peerWrapper *ocrcommon.SingletonPeerWrapper

isNewlyCreatedJob bool
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you imagine this is going to be used for? It's currently passed into the oracle creator which effectively makes it a constant.

}

func NewDelegate(
lggr logger.Logger,
registrarConfig plugins.RegistrarConfig,
pipelineRunner pipeline.Runner,
relayGetter RelayGetter,
registrySyncer cctypes.CapabilityRegistry,
keystore keystore.Master,
ds sqlutil.DataSource,
peerWrapper *ocrcommon.SingletonPeerWrapper,
) *Delegate {
return &Delegate{
lggr: lggr,
registrarConfig: registrarConfig,
pipelineRunner: pipelineRunner,
relayGetter: relayGetter,
capRegistry: registrySyncer,
ds: ds,
keystore: keystore,
peerWrapper: peerWrapper,
}
}

func (d *Delegate) JobType() job.Type {
return job.CCIP
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wondering if the single job for both plugins is going to lead to any kind of limitations.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will find out ;-)

}

func (d *Delegate) BeforeJobCreated(job.Job) {
// This is only called first time the job is created
d.isNewlyCreatedJob = true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems weird (maybe after it starts being used it becomes clear though): for a newly created delegate newJob = false.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah this is just standard chainlink job stuff, other jobs do this as well

}

func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) (services []job.ServiceCtx, err error) {
// In general there should only be one P2P key but the node may have multiple.
// The job spec should specify the correct P2P key to use.
peerID, err := p2pkey.MakePeerID(spec.CCIPSpec.P2PKeyID)
if err != nil {
return nil, errors.Wrapf(err, "failed to make peer ID from provided spec p2p id: %s", spec.CCIPSpec.P2PKeyID)
makramkd marked this conversation as resolved.
Show resolved Hide resolved
}

p2pID, err := d.keystore.P2P().Get(peerID)
if err != nil {
return nil, errors.Wrap(err, "failed to get all p2p keys")
}

ocrKeys := make(map[chaintype.ChainType]ocr2key.KeyBundle)
makramkd marked this conversation as resolved.
Show resolved Hide resolved
for chainType, bundleAny := range spec.CCIPSpec.OCRKeyBundleIDs {
makramkd marked this conversation as resolved.
Show resolved Hide resolved
ct := chaintype.ChainType(chainType)
if !chaintype.IsSupportedChainType(ct) {
return nil, errors.Errorf("unsupported chain type: %s", chainType)
}

bundleID, ok := bundleAny.(string)
if !ok {
return nil, errors.New("OCRKeyBundleIDs must be a map of chain types to OCR key bundle IDs")
}

bundle, err2 := d.keystore.OCR2().Get(bundleID)
if err2 != nil {
return nil, errors.Wrapf(err2, "OCR key bundle with ID %s not found", bundleID)
}

ocrKeys[ct] = bundle
}

relayers, err := d.relayGetter.GetIDToRelayerMap()
if err != nil {
return nil, errors.Wrap(err, "failed to get all relayers")
}

transmitterKeys := make(map[types.RelayID][]string)
for relayID := range relayers {
switch relayID.Network {
case types.NetworkEVM:
makramkd marked this conversation as resolved.
Show resolved Hide resolved
ethKeys, err2 := d.keystore.Eth().GetAll(ctx)
if err2 != nil {
return nil, fmt.Errorf("error getting all eth keys: %w", err2)
}

transmitterKeys[relayID] = func() (r []string) {
for _, key := range ethKeys {
r = append(r, key.String())
}
return
}()
case types.NetworkCosmos:
cosmosKeys, err2 := d.keystore.Cosmos().GetAll()
if err2 != nil {
return nil, fmt.Errorf("error getting all cosmos keys: %w", err2)
}

transmitterKeys[relayID] = func() (r []string) {
for _, key := range cosmosKeys {
r = append(r, key.String())
}
return
}()
case types.NetworkSolana:
solKey, err2 := d.keystore.Solana().GetAll()
if err2 != nil {
return nil, fmt.Errorf("error getting all solana keys: %w", err2)
}

transmitterKeys[relayID] = func() (r []string) {
for _, key := range solKey {
r = append(r, key.String())
}
return
}()
case types.NetworkStarkNet:
starkKey, err2 := d.keystore.StarkNet().GetAll()
if err2 != nil {
return nil, fmt.Errorf("error getting all stark keys: %w", err2)
}

transmitterKeys[relayID] = func() (r []string) {
for _, key := range starkKey {
r = append(r, key.String())
}
return
}()
default:
return nil, errors.Errorf("unsupported network: %s", relayID.Network)
}
}

// NOTE: we can use the same DB for all plugin instances,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This DB is not used at all, correct?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or is it used internally by ocr?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is used by OCR, yes

// since all queries are scoped by config digest.
ocrDB := ocr2.NewDB(d.ds, spec.ID, 0, d.lggr)

// TODO: implement
hcr := &homeChainReader{}

oracleCreator := oraclecreator.New(
ocrKeys,
transmitterKeys,
relayers,
d.peerWrapper,
spec.ExternalJobID,
spec.ID,
d.isNewlyCreatedJob,
spec.CCIPSpec.RelayConfigs,
spec.CCIPSpec.PluginConfig,
ocrDB,
)

return []job.ServiceCtx{
hcr,
launcher.New(
spec.CCIPSpec.CapabilityVersion,
spec.CCIPBootstrapSpec.CapabilityLabelledName,
p2pID,
d.capRegistry,
d.lggr,
hcr,
oracleCreator,
),
}, nil
}

func (d *Delegate) AfterJobCreated(spec job.Job) {}

func (d *Delegate) BeforeJobDeleted(spec job.Job) {}

func (d *Delegate) OnDeleteJob(ctx context.Context, spec job.Job) error {
// TODO: shut down needed services?
return nil
}
53 changes: 53 additions & 0 deletions core/services/ccipcapability/home_chain_reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package ccipcapability

import (
"context"

"github.com/smartcontractkit/chainlink/v2/core/services"
cctypes "github.com/smartcontractkit/chainlink/v2/core/services/ccipcapability/types"
)

var _ cctypes.HomeChainReader = (*homeChainReader)(nil)
var _ services.ServiceCtx = (*homeChainReader)(nil)

type homeChainReader struct{}

// Close implements services.Service.
func (h *homeChainReader) Close() error {
panic("unimplemented")
}

// HealthReport implements services.Service.
func (h *homeChainReader) HealthReport() map[string]error {
panic("unimplemented")
}

// Name implements services.Service.
func (h *homeChainReader) Name() string {
panic("unimplemented")
}

// Ready implements services.Service.
func (h *homeChainReader) Ready() error {
panic("unimplemented")
}

// Start implements services.Service.
func (h *homeChainReader) Start(context.Context) error {
panic("unimplemented")
}

// GetAllChainConfigs implements HomeChainReader.
func (h *homeChainReader) GetAllChainConfigs(ctx context.Context) (map[uint64]cctypes.ChainConfig, error) {
panic("unimplemented")
}

// GetOCRConfigs implements HomeChainReader.
func (h *homeChainReader) GetOCRConfigs(ctx context.Context, donID uint32, pluginType cctypes.PluginType) ([]cctypes.OCRConfig, error) {
panic("unimplemented")
}

// IsHealthy implements HomeChainReader.
func (h *homeChainReader) IsHealthy() bool {
panic("unimplemented")
}
59 changes: 59 additions & 0 deletions core/services/ccipcapability/launcher/bluegreen.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package launcher

import (
"go.uber.org/multierr"

cctypes "github.com/smartcontractkit/chainlink/v2/core/services/ccipcapability/types"
)

// blueGreenDeployment represents a blue-green deployment of OCR instances.
type blueGreenDeployment struct {
Comment on lines +11 to +12
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where are capabilities actually deployed? Seems weird to have this built into the node.

Currently it seems like this only manages switching between configs. It might be nice to switch between LOOPPs as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Eventually it'll switch between LOOPPs, that LOOPP or not factor is hidden behind the OracleCreator interface.

Capabilities are created on the capability registry, DONs that support a capability are also created there, so the capability registry is the source of truth on that.

// blue is the blue OCR instance.
// blue must always be present.
blue cctypes.CCIPOracle

// green is the green OCR instance.
// green may or may not be present.
// green must never be present if blue is not present.
// TODO: should we enforce this invariant somehow?
green cctypes.CCIPOracle
}

// ccipDeployment represents blue-green deployments of both commit and exec
// OCR instances.
type ccipDeployment struct {
commit blueGreenDeployment
exec blueGreenDeployment
}

// Shutdown shuts down all OCR instances in the deployment.
func (c *ccipDeployment) Shutdown() error {
var err error

err = multierr.Append(err, c.commit.blue.Shutdown())
if c.commit.green != nil {
err = multierr.Append(err, c.commit.green.Shutdown())
}

err = multierr.Append(err, c.exec.blue.Shutdown())
if c.exec.green != nil {
err = multierr.Append(err, c.exec.green.Shutdown())
}
return err
}

// NumCommitInstances returns the number of commit OCR instances in the deployment.
func (c *ccipDeployment) NumCommitInstances() int {
makramkd marked this conversation as resolved.
Show resolved Hide resolved
if c.commit.green != nil {
return 2
}
return 1
}

// NumExecInstances returns the number of exec OCR instances in the deployment.
func (c *ccipDeployment) NumExecInstances() int {
if c.exec.green != nil {
return 2
}
return 1
}
Loading
Loading