Skip to content

Commit

Permalink
processing ConsensusSidecars in da-processor
Browse files Browse the repository at this point in the history
  • Loading branch information
abi87 committed Oct 28, 2024
1 parent 5276920 commit 214069d
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 31 deletions.
7 changes: 4 additions & 3 deletions beacond/cmd/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ func DefaultComponents() []any {
// *BeaconStateMarshallable, *BlockStore, *KVStore, *StorageBackend,
// ],
components.ProvideDAService[
*AvailabilityStore, *BeaconBlockBody, *BlobSidecar,
*BlobSidecars, *Logger,
*AvailabilityStore, *BeaconBlockBody, *BeaconBlockHeader,
*ConsensusSidecars, *BlobSidecar, *BlobSidecars, *Logger,
],
components.ProvideDBManager[*AvailabilityStore, *DepositStore, *Logger],
components.ProvideDepositPruner[
Expand Down Expand Up @@ -113,7 +113,8 @@ func DefaultComponents() []any {
*AvailabilityStore,
*ConsensusBlock, *BeaconBlock, *BeaconBlockBody,
*BeaconBlockHeader, *BlockStore, *BeaconState,
*BeaconStateMarshallable, *BlobSidecar, *BlobSidecars,
*BeaconStateMarshallable,
*ConsensusSidecars, *BlobSidecar, *BlobSidecars,
*Deposit, *DepositStore, *ExecutionPayload, *ExecutionPayloadHeader,
*Genesis, *KVStore, *Logger,
NodeAPIContext,
Expand Down
9 changes: 6 additions & 3 deletions beacond/cmd/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,10 @@ type (
CometBFTService = cometbft.Service[*Logger]

// DAService is a type alias for the DA service.
DAService = da.Service[*AvailabilityStore, *BlobSidecars]
DAService = da.Service[
*AvailabilityStore,
*ConsensusSidecars, *BlobSidecars, *BeaconBlockHeader,
]

// DBManager is a type alias for the database manager.
DBManager = manager.DBManager
Expand Down Expand Up @@ -269,8 +272,8 @@ type (

// BlobSidecars type aliases.
ConsensusSidecars = consruntimetypes.ConsensusSidecars[
BlobSidecars,
BeaconBlockHeader,
*BlobSidecars,
*BeaconBlockHeader,
]
BlobSidecar = datypes.BlobSidecar
BlobSidecars = datypes.BlobSidecars
Expand Down
51 changes: 29 additions & 22 deletions mod/da/pkg/da/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ import (

type Service[
AvailabilityStoreT any,
ConsensusSidecarsT ConsensusSidecars[BlobSidecarsT, BeaconBlockHeaderT],
BlobSidecarsT BlobSidecar,
BeaconBlockHeaderT any,
] struct {
avs AvailabilityStoreT
bp BlobProcessor[
Expand All @@ -44,15 +46,17 @@ type Service[
dispatcher asynctypes.EventDispatcher
logger log.Logger
// subSidecarsReceived is a channel holding SidecarsReceived events.
subSidecarsReceived chan async.Event[BlobSidecarsT]
subSidecarsReceived chan async.Event[ConsensusSidecarsT]
// subFinalBlobSidecars is a channel holding FinalSidecarsReceived events.
subFinalBlobSidecars chan async.Event[BlobSidecarsT]
}

// NewService returns a new DA service.
func NewService[
AvailabilityStoreT any,
ConsensusSidecarsT ConsensusSidecars[BlobSidecarsT, BeaconBlockHeaderT],
BlobSidecarsT BlobSidecar,
BeaconBlockHeaderT any,
](
avs AvailabilityStoreT,
bp BlobProcessor[
Expand All @@ -61,28 +65,29 @@ func NewService[
dispatcher asynctypes.EventDispatcher,
logger log.Logger,
) *Service[
AvailabilityStoreT, BlobSidecarsT,
AvailabilityStoreT, ConsensusSidecarsT, BlobSidecarsT, BeaconBlockHeaderT,
] {
return &Service[
AvailabilityStoreT, BlobSidecarsT,
AvailabilityStoreT,
ConsensusSidecarsT, BlobSidecarsT, BeaconBlockHeaderT,
]{
avs: avs,
bp: bp,
dispatcher: dispatcher,
logger: logger,
subSidecarsReceived: make(chan async.Event[BlobSidecarsT]),
subSidecarsReceived: make(chan async.Event[ConsensusSidecarsT]),
subFinalBlobSidecars: make(chan async.Event[BlobSidecarsT]),
}
}

// Name returns the name of the service.
func (s *Service[_, _]) Name() string {
func (s *Service[_, _, _, _]) Name() string {
return "da"
}

// Start subscribes the DA service to SidecarsReceived and FinalSidecarsReceived
// events and begins the main event loop to handle them accordingly.
func (s *Service[_, _]) Start(ctx context.Context) error {
func (s *Service[_, _, _, _]) Start(ctx context.Context) error {
var err error

// subscribe to SidecarsReceived events
Expand All @@ -106,7 +111,7 @@ func (s *Service[_, _]) Start(ctx context.Context) error {

// eventLoop listens and handles SidecarsReceived and FinalSidecarsReceived
// events.
func (s *Service[_, _]) eventLoop(ctx context.Context) {
func (s *Service[_, _, _, _]) eventLoop(ctx context.Context) {
for {
select {
case <-ctx.Done():
Expand All @@ -126,7 +131,7 @@ func (s *Service[_, _]) eventLoop(ctx context.Context) {
// handleFinalSidecarsReceived handles the BlobSidecarsProcessRequest
// event.
// It processes the sidecars and publishes a BlobSidecarsProcessed event.
func (s *Service[_, BlobSidecarsT]) handleFinalSidecarsReceived(
func (s *Service[_, _, BlobSidecarsT, _]) handleFinalSidecarsReceived(
msg async.Event[BlobSidecarsT],
) {
if err := s.processSidecars(msg.Context(), msg.Data()); err != nil {
Expand All @@ -140,25 +145,26 @@ func (s *Service[_, BlobSidecarsT]) handleFinalSidecarsReceived(

// handleSidecarsReceived handles the SidecarsVerifyRequest event.
// It verifies the sidecars and publishes a SidecarsVerified event.
func (s *Service[_, BlobSidecarsT]) handleSidecarsReceived(
msg async.Event[BlobSidecarsT],
func (s *Service[_, ConsensusSidecarsT, _, _]) handleSidecarsReceived(
msg async.Event[ConsensusSidecarsT],
) {
var sidecarsErr error
// verify the sidecars.
if sidecarsErr = s.verifySidecars(msg.Data()); sidecarsErr != nil {
sidecarsErr := s.verifySidecars(msg.Data())
if sidecarsErr != nil {
s.logger.Error(
"Failed to receive blob sidecars",
"error",
sidecarsErr,
"error", sidecarsErr,
)
}

// emit the sidecars verification event with error from verifySidecars
if err := s.dispatcher.Publish(
async.NewEvent(
msg.Context(), async.SidecarsVerified, msg.Data(), sidecarsErr,
),
); err != nil {
event := async.NewEvent(
msg.Context(),
async.SidecarsVerified,
msg.Data().GetSidecars(),
sidecarsErr,
)
if err := s.dispatcher.Publish(event); err != nil {
s.logger.Error("failed to publish event", "err", err)
}
}
Expand All @@ -168,7 +174,7 @@ func (s *Service[_, BlobSidecarsT]) handleSidecarsReceived(
/* -------------------------------------------------------------------------- */

// ProcessSidecars processes the blob sidecars.
func (s *Service[_, BlobSidecarsT]) processSidecars(
func (s *Service[_, _, BlobSidecarsT, _]) processSidecars(
_ context.Context,
sidecars BlobSidecarsT,
) error {
Expand All @@ -181,9 +187,10 @@ func (s *Service[_, BlobSidecarsT]) processSidecars(
}

// VerifyIncomingBlobs receives blobs from the network and processes them.
func (s *Service[_, BlobSidecarsT]) verifySidecars(
sidecars BlobSidecarsT,
func (s *Service[_, ConsensusSidecarsT, _, _]) verifySidecars(
cs ConsensusSidecarsT,
) error {
sidecars := cs.GetSidecars()
// If there are no blobs to verify, return early.
if sidecars.IsNil() || sidecars.Len() == 0 {
return nil
Expand Down
5 changes: 5 additions & 0 deletions mod/da/pkg/da/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ type BlobProcessor[AvailabilityStoreT any, BlobSidecarsT any] interface {
) error
}

type ConsensusSidecars[BlobSidecarsT any, BeaconBlockHeaderT any] interface {
GetSidecars() BlobSidecarsT
GetHeader() BeaconBlockHeaderT
}

// BlobSidecar is the interface for the blob sidecar.
type BlobSidecar interface {
// Len returns the length of the sidecar.
Expand Down
9 changes: 8 additions & 1 deletion mod/node-core/pkg/components/blobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,17 +142,24 @@ type DAServiceIn[
func ProvideDAService[
AvailabilityStoreT AvailabilityStore[BeaconBlockBodyT, BlobSidecarsT],
BeaconBlockBodyT any,
BeaconBlockHeaderT any,
ConsensusSidecarsT ConsensusSidecars[BlobSidecarsT, BeaconBlockHeaderT],
BlobSidecarT any,
BlobSidecarsT BlobSidecars[BlobSidecarsT, BlobSidecarT],
LoggerT log.AdvancedLogger[LoggerT],
](
in DAServiceIn[
AvailabilityStoreT, BeaconBlockBodyT, BlobSidecarsT, LoggerT,
],
) *da.Service[AvailabilityStoreT, BlobSidecarsT] {
) *da.Service[
AvailabilityStoreT,
ConsensusSidecarsT, BlobSidecarsT, BeaconBlockHeaderT,
] {
return da.NewService[
AvailabilityStoreT,
ConsensusSidecarsT,
BlobSidecarsT,
BeaconBlockHeaderT,
](
in.AvailabilityStore,
in.BlobProcessor,
Expand Down
8 changes: 8 additions & 0 deletions mod/node-core/pkg/components/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,14 @@ type (
GetKzgCommitment() eip4844.KZGCommitment
}

ConsensusSidecars[
BlobSidecarsT any,
BeaconBlockHeaderT any,
] interface {
GetSidecars() BlobSidecarsT
GetHeader() BeaconBlockHeaderT
}

// BlobSidecars is the interface for blobs sidecars.
BlobSidecars[T, BlobSidecarT any] interface {
constraints.Nillable
Expand Down
10 changes: 8 additions & 2 deletions mod/node-core/pkg/components/service_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type ServiceRegistryInput[
*Validator, Validators, WithdrawalT,
],
BeaconStateMarshallableT any,
ConsensusSidecarsT ConsensusSidecars[BlobSidecarsT, BeaconBlockHeaderT],
BlobSidecarT any,
BlobSidecarsT BlobSidecars[BlobSidecarsT, BlobSidecarT],
DepositT Deposit[DepositT, *ForkData, WithdrawalCredentials],
Expand Down Expand Up @@ -84,7 +85,10 @@ type ServiceRegistryInput[
ExecutionPayloadHeaderT, GenesisT,
*engineprimitives.PayloadAttributes[WithdrawalT],
]
DAService *da.Service[AvailabilityStoreT, BlobSidecarsT]
DAService *da.Service[
AvailabilityStoreT,
ConsensusSidecarsT, BlobSidecarsT, BeaconBlockHeaderT,
]
DBManager *DBManager
DepositService *deposit.Service[
BeaconBlockT, BeaconBlockBodyT, DepositT,
Expand Down Expand Up @@ -126,6 +130,7 @@ func ProvideServiceRegistry[
*Validator, Validators, WithdrawalT,
],
BeaconStateMarshallableT any,
ConsensusSidecarsT ConsensusSidecars[BlobSidecarsT, BeaconBlockHeaderT],
BlobSidecarT any,
BlobSidecarsT BlobSidecars[BlobSidecarsT, BlobSidecarT],
DepositT Deposit[DepositT, *ForkData, WithdrawalCredentials],
Expand All @@ -144,7 +149,8 @@ func ProvideServiceRegistry[
AvailabilityStoreT,
ConsensusBlockT, BeaconBlockT, BeaconBlockBodyT,
BeaconBlockHeaderT, BeaconBlockStoreT, BeaconStateT,
BeaconStateMarshallableT, BlobSidecarT, BlobSidecarsT,
BeaconStateMarshallableT,
ConsensusSidecarsT, BlobSidecarT, BlobSidecarsT,
DepositT, DepositStoreT, ExecutionPayloadT, ExecutionPayloadHeaderT,
GenesisT, KVStoreT, LoggerT, NodeAPIContextT, WithdrawalT, WithdrawalsT,
],
Expand Down

0 comments on commit 214069d

Please sign in to comment.