-
Notifications
You must be signed in to change notification settings - Fork 56
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* add an implementation for the job delegate * ocr instance launcher and blue/green deployment
- Loading branch information
Showing
18 changed files
with
2,141 additions
and
16 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,199 @@ | ||
package ccipcapability | ||
|
||
import ( | ||
"context" | ||
|
||
"github.com/pkg/errors" | ||
|
||
"github.com/smartcontractkit/chainlink-common/pkg/loop" | ||
"github.com/smartcontractkit/chainlink-common/pkg/sqlutil" | ||
"github.com/smartcontractkit/chainlink-common/pkg/types" | ||
"github.com/smartcontractkit/chainlink/v2/core/logger" | ||
"github.com/smartcontractkit/chainlink/v2/core/services/ccipcapability/launcher" | ||
"github.com/smartcontractkit/chainlink/v2/core/services/ccipcapability/oraclecreator" | ||
cctypes "github.com/smartcontractkit/chainlink/v2/core/services/ccipcapability/types" | ||
"github.com/smartcontractkit/chainlink/v2/core/services/job" | ||
"github.com/smartcontractkit/chainlink/v2/core/services/keystore" | ||
"github.com/smartcontractkit/chainlink/v2/core/services/keystore/chaintype" | ||
"github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/ocr2key" | ||
"github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/p2pkey" | ||
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2" | ||
"github.com/smartcontractkit/chainlink/v2/core/services/ocrcommon" | ||
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline" | ||
"github.com/smartcontractkit/chainlink/v2/plugins" | ||
) | ||
|
||
type RelayGetter interface { | ||
GetIDToRelayerMap() (map[types.RelayID]loop.Relayer, error) | ||
} | ||
|
||
type Delegate struct { | ||
lggr logger.Logger | ||
registrarConfig plugins.RegistrarConfig | ||
pipelineRunner pipeline.Runner | ||
relayGetter RelayGetter | ||
capRegistry cctypes.CapabilityRegistry | ||
keystore keystore.Master | ||
ds sqlutil.DataSource | ||
peerWrapper *ocrcommon.SingletonPeerWrapper | ||
|
||
isNewlyCreatedJob bool | ||
} | ||
|
||
func NewDelegate( | ||
lggr logger.Logger, | ||
registrarConfig plugins.RegistrarConfig, | ||
pipelineRunner pipeline.Runner, | ||
relayGetter RelayGetter, | ||
registrySyncer cctypes.CapabilityRegistry, | ||
keystore keystore.Master, | ||
ds sqlutil.DataSource, | ||
peerWrapper *ocrcommon.SingletonPeerWrapper, | ||
) *Delegate { | ||
return &Delegate{ | ||
lggr: lggr, | ||
registrarConfig: registrarConfig, | ||
pipelineRunner: pipelineRunner, | ||
relayGetter: relayGetter, | ||
capRegistry: registrySyncer, | ||
ds: ds, | ||
keystore: keystore, | ||
peerWrapper: peerWrapper, | ||
} | ||
} | ||
|
||
func (d *Delegate) JobType() job.Type { | ||
return job.CCIP | ||
} | ||
|
||
func (d *Delegate) BeforeJobCreated(job.Job) { | ||
// This is only called first time the job is created | ||
d.isNewlyCreatedJob = true | ||
} | ||
|
||
func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) (services []job.ServiceCtx, err error) { | ||
// In general there should only be one P2P key but the node may have multiple. | ||
// The job spec should specify the correct P2P key to use. | ||
peerID, err := p2pkey.MakePeerID(spec.CCIPSpec.P2PKeyID) | ||
if err != nil { | ||
return nil, errors.Wrapf(err, "failed to make peer ID from provided spec p2p id: %s", spec.CCIPSpec.P2PKeyID) | ||
} | ||
|
||
p2pID, err := d.keystore.P2P().Get(peerID) | ||
if err != nil { | ||
return nil, errors.Wrap(err, "failed to get all p2p keys") | ||
} | ||
|
||
ocrKeys := make(map[chaintype.ChainType]ocr2key.KeyBundle) | ||
for chainType, bundleAny := range spec.CCIPSpec.OCRKeyBundleIDs { | ||
ct := chaintype.ChainType(chainType) | ||
if !chaintype.IsSupportedChainType(ct) { | ||
return nil, errors.Errorf("unsupported chain type: %s", chainType) | ||
} | ||
|
||
bundleID, ok := bundleAny.(string) | ||
if !ok { | ||
return nil, errors.New("OCRKeyBundleIDs must be a map of chain types to OCR key bundle IDs") | ||
} | ||
|
||
bundle, err := d.keystore.OCR2().Get(bundleID) | ||
if err != nil { | ||
return nil, errors.Wrapf(err, "OCR key bundle with ID %s not found", bundleID) | ||
} | ||
|
||
ocrKeys[ct] = bundle | ||
} | ||
|
||
transmitterKeys := make(map[types.RelayID]string) | ||
for relayIDStr, transmitterIDAny := range spec.CCIPSpec.TransmitterIDs { | ||
var relayID types.RelayID | ||
if err := relayID.UnmarshalString(relayIDStr); err != nil { | ||
return nil, errors.Wrapf(err, "invalid relay ID specified in transmitter ids mapping: %s", relayIDStr) | ||
} | ||
|
||
transmitterID, ok := transmitterIDAny.(string) | ||
if !ok { | ||
return nil, errors.New("transmitter id is not a string") | ||
} | ||
|
||
switch relayID.Network { | ||
case types.NetworkEVM: | ||
ethKey, err := d.keystore.Eth().Get(ctx, transmitterID) | ||
if err != nil { | ||
return nil, errors.Wrapf(err, "eth transmitter key with ID %s not found", transmitterID) | ||
} | ||
|
||
transmitterKeys[relayID] = ethKey.String() | ||
case types.NetworkCosmos: | ||
cosmosKey, err := d.keystore.Cosmos().Get(transmitterID) | ||
if err != nil { | ||
return nil, errors.Wrapf(err, "cosmos transmitter key with ID %s not found", transmitterID) | ||
} | ||
|
||
transmitterKeys[relayID] = cosmosKey.String() | ||
case types.NetworkSolana: | ||
solKey, err := d.keystore.Solana().Get(transmitterID) | ||
if err != nil { | ||
return nil, errors.Wrapf(err, "solana transmitter key with ID %s not found", transmitterID) | ||
} | ||
|
||
transmitterKeys[relayID] = solKey.String() | ||
case types.NetworkStarkNet: | ||
starkKey, err := d.keystore.StarkNet().Get(transmitterID) | ||
if err != nil { | ||
return nil, errors.Wrapf(err, "starknet transmitter key with ID %s not found", transmitterID) | ||
} | ||
|
||
transmitterKeys[relayID] = starkKey.String() | ||
default: | ||
return nil, errors.Errorf("unsupported network: %s", relayID.Network) | ||
} | ||
} | ||
|
||
relayers, err := d.relayGetter.GetIDToRelayerMap() | ||
if err != nil { | ||
return nil, errors.Wrap(err, "failed to get all relayers") | ||
} | ||
|
||
// NOTE: we can use the same DB for all plugin instances, | ||
// since all queries are scoped by config digest. | ||
ocrDB := ocr2.NewDB(d.ds, spec.ID, 0, d.lggr) | ||
|
||
// TODO: implement | ||
hcr := &homeChainReader{} | ||
|
||
oracleCreator := oraclecreator.New( | ||
ocrKeys, | ||
transmitterKeys, | ||
relayers, | ||
d.peerWrapper, | ||
spec.ExternalJobID, | ||
spec.ID, | ||
d.isNewlyCreatedJob, | ||
spec.CCIPSpec.RelayConfigs, | ||
spec.CCIPSpec.PluginConfig, | ||
ocrDB, | ||
) | ||
|
||
return []job.ServiceCtx{ | ||
hcr, | ||
launcher.New( | ||
spec.CCIPSpec.CapabilityVersion, | ||
spec.CCIPBootstrapSpec.CapabilityLabelledName, | ||
p2pID, | ||
d.capRegistry, | ||
d.lggr, | ||
hcr, | ||
oracleCreator, | ||
), | ||
}, nil | ||
} | ||
|
||
func (d *Delegate) AfterJobCreated(spec job.Job) {} | ||
|
||
func (d *Delegate) BeforeJobDeleted(spec job.Job) {} | ||
|
||
func (d *Delegate) OnDeleteJob(ctx context.Context, spec job.Job) error { | ||
// TODO: shut down needed services? | ||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
package ccipcapability | ||
|
||
import ( | ||
"context" | ||
|
||
"github.com/smartcontractkit/chainlink/v2/core/services" | ||
cctypes "github.com/smartcontractkit/chainlink/v2/core/services/ccipcapability/types" | ||
) | ||
|
||
var _ cctypes.HomeChainReader = (*homeChainReader)(nil) | ||
var _ services.ServiceCtx = (*homeChainReader)(nil) | ||
|
||
type homeChainReader struct{} | ||
|
||
// Close implements services.Service. | ||
func (h *homeChainReader) Close() error { | ||
panic("unimplemented") | ||
} | ||
|
||
// HealthReport implements services.Service. | ||
func (h *homeChainReader) HealthReport() map[string]error { | ||
panic("unimplemented") | ||
} | ||
|
||
// Name implements services.Service. | ||
func (h *homeChainReader) Name() string { | ||
panic("unimplemented") | ||
} | ||
|
||
// Ready implements services.Service. | ||
func (h *homeChainReader) Ready() error { | ||
panic("unimplemented") | ||
} | ||
|
||
// Start implements services.Service. | ||
func (h *homeChainReader) Start(context.Context) error { | ||
panic("unimplemented") | ||
} | ||
|
||
// GetAllChainConfigs implements HomeChainReader. | ||
func (h *homeChainReader) GetAllChainConfigs(ctx context.Context) (map[uint64]cctypes.ChainConfig, error) { | ||
panic("unimplemented") | ||
} | ||
|
||
// GetOCRConfigs implements HomeChainReader. | ||
func (h *homeChainReader) GetOCRConfigs(ctx context.Context, donID uint32, pluginType cctypes.PluginType) ([]cctypes.OCRConfig, error) { | ||
panic("unimplemented") | ||
} | ||
|
||
// IsHealthy implements HomeChainReader. | ||
func (h *homeChainReader) IsHealthy() bool { | ||
panic("unimplemented") | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
package launcher | ||
|
||
import ( | ||
cctypes "github.com/smartcontractkit/chainlink/v2/core/services/ccipcapability/types" | ||
"go.uber.org/multierr" | ||
) | ||
|
||
// blueGreenDeployment represents a blue-green deployment of OCR instances. | ||
type blueGreenDeployment struct { | ||
// blue is the blue OCR instance. | ||
// blue must always be present. | ||
blue cctypes.CCIPOracle | ||
|
||
// green is the green OCR instance. | ||
// green may or may not be present. | ||
// green must never be present if blue is not present. | ||
// TODO: should we enforce this invariant somehow? | ||
green cctypes.CCIPOracle | ||
} | ||
|
||
// ccipDeployment represents blue-green deployments of both commit and exec | ||
// OCR instances. | ||
type ccipDeployment struct { | ||
commit blueGreenDeployment | ||
exec blueGreenDeployment | ||
} | ||
|
||
// Shutdown shuts down all OCR instances in the deployment. | ||
func (c *ccipDeployment) Shutdown() error { | ||
var err error | ||
|
||
err = multierr.Append(err, c.commit.blue.Shutdown()) | ||
if c.commit.green != nil { | ||
err = multierr.Append(err, c.commit.green.Shutdown()) | ||
} | ||
|
||
err = multierr.Append(err, c.exec.blue.Shutdown()) | ||
if c.exec.green != nil { | ||
err = multierr.Append(err, c.exec.green.Shutdown()) | ||
} | ||
return err | ||
} | ||
|
||
// NumCommitInstances returns the number of commit OCR instances in the deployment. | ||
func (c *ccipDeployment) NumCommitInstances() int { | ||
if c.commit.green != nil { | ||
return 2 | ||
} | ||
return 1 | ||
} | ||
|
||
// NumExecInstances returns the number of exec OCR instances in the deployment. | ||
func (c *ccipDeployment) NumExecInstances() int { | ||
if c.exec.green != nil { | ||
return 2 | ||
} | ||
return 1 | ||
} |
Oops, something went wrong.