bug(blockchain): make block finalization wait on sidecar availability #2118

Open · wants to merge 4 commits into base: main
26 changes: 14 additions & 12 deletions mod/beacon/blockchain/process.go
@@ -26,6 +26,7 @@ import (

"github.com/berachain/beacon-kit/mod/primitives/pkg/async"
"github.com/berachain/beacon-kit/mod/primitives/pkg/transition"
"golang.org/x/sync/errgroup"
)

// ProcessGenesisData processes the genesis state and initializes the beacon
@@ -63,25 +64,26 @@ func (s *Service[
return nil, err
}

// If the blobs needed to process the block are not available, we
// return an error. It is safe to use the slot off of the beacon block
// since it has been verified as correct already.
if !s.storageBackend.AvailabilityStore().IsDataAvailable(
ctx, blk.GetSlot(), blk.GetBody(),
) {
return nil, ErrDataNotAvailable
// To complete block finalization, we wait for the associated blob
// to be persisted and verify it's available. Once that is done we
// complete block finalization.
g, _ := errgroup.WithContext(ctx)
g.Go(func() error {
return s.verifyFinalBlobAvailability(ctx, blk)
})
Comment on lines +70 to +73

@coderabbitai coderabbitai bot (Contributor) commented on Nov 1, 2024:

⚠️ Potential issue

Use the returned context from errgroup.WithContext for proper cancellation propagation

Currently, the context returned by errgroup.WithContext(ctx) is being ignored. To ensure that cancellation signals are properly propagated to the goroutines, you should use the returned context and pass it to the goroutines.

Apply this diff to fix the issue:

-	g, _ := errgroup.WithContext(ctx)
+	g, egCtx := errgroup.WithContext(ctx)
 	g.Go(func() error {
-		return s.verifyFinalBlobAvailability(ctx, blk)
+		return s.verifyFinalBlobAvailability(egCtx, blk)
 	})

calbera (Contributor):

+1, should use the context from errgroup here.

coderabbitai (Contributor):

@calbera, glad we're in agreement. Using the context from errgroup will ensure proper cancellation propagation.

calbera (Contributor):

Wait, why are we using errgroup here for one task? Usually errgroup helps with multiple concurrent tasks. Also, we just end up waiting on its result anyway below, so it seems we could just call verifyFinalBlobAvailability in the same thread?

coderabbitai (Contributor):

@calbera, you're absolutely right. Since we're only running a single task and waiting for its result immediately, using errgroup is unnecessary here. We can simplify the code by calling verifyFinalBlobAvailability directly in the same goroutine.

Here's the updated code:

if err = s.verifyFinalBlobAvailability(ctx, blk); err != nil {
    return nil, err
}

This removes the overhead of the errgroup and simplifies the code.




// Wait for the sidecar to be finalized and its availability checked
if err = g.Wait(); err != nil {
return nil, err
}

// If required, we want to forkchoice at the end of post
// block processing.
// TODO: this is hood as fuck.
// We won't send an fcu if the block is bad, should be addressed
// via ticker later.
if err = s.dispatcher.Publish(
async.NewEvent(
ctx, async.BeaconBlockFinalized, blk,
),
); err != nil {
event := async.NewEvent(ctx, async.BeaconBlockFinalized, blk)
if err = s.dispatcher.Publish(event); err != nil {
Comment on lines +85 to +86

coderabbitai (Contributor):

⚠️ Potential issue

Check for potential data races in concurrent functions

The call to s.sendPostBlockFCU(ctx, st, blk) is executed as a new goroutine. Ensure that this function and any shared resources it accesses are thread-safe to prevent data races or synchronization issues.
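A generic, self-contained sketch (not beacon-kit code; fcuState and headHash are hypothetical names) of the pattern being asked for: any state shared between the finalization path and the spawned goroutine goes behind a mutex, and running the test suite with go test -race can confirm the absence of races.

package main

import "sync"

// fcuState stands in for whatever state a post-block FCU goroutine might
// share with the finalization path; all access goes through the mutex.
type fcuState struct {
	mu       sync.Mutex
	headHash string
}

func (s *fcuState) setHead(h string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.headHash = h
}

func (s *fcuState) head() string {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.headHash
}

func main() {
	var st fcuState
	done := make(chan struct{})
	go func() { // stands in for the sendPostBlockFCU goroutine
		st.setHead("0xabc")
		close(done)
	}()
	<-done
	_ = st.head()
}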

return nil, err
}

40 changes: 40 additions & 0 deletions mod/beacon/blockchain/service.go
@@ -80,6 +80,12 @@ type Service[
// forceStartupSyncOnce is used to force a sync of the startup head.
forceStartupSyncOnce *sync.Once

// blobFinalized is used to verify blob sidecar availability upon
// block finalization.
blobFinalized chan struct{}
calbera (Collaborator, Author):

@itsdevbear we may want to harden this and pass some kind of blob data to enhance verification. Not sure what. You got suggestions?

// subBlobFinalized is a channel holding BlobSidecarsFinalized events.
subBlobFinalized chan async.Event[struct{}]
// subFinalBlkReceived is a channel holding FinalBeaconBlockReceived events.
subFinalBlkReceived chan async.Event[BeaconBlockT]
// subBlockReceived is a channel holding BeaconBlockReceived events.
@@ -142,6 +148,8 @@ func NewService[
metrics: newChainMetrics(telemetrySink),
optimisticPayloadBuilds: optimisticPayloadBuilds,
forceStartupSyncOnce: new(sync.Once),
blobFinalized: make(chan struct{}),
subBlobFinalized: make(chan async.Event[struct{}]),
subFinalBlkReceived: make(chan async.Event[BeaconBlockT]),
subBlockReceived: make(chan async.Event[BeaconBlockT]),
subGenDataReceived: make(chan async.Event[GenesisT]),
@@ -179,6 +187,12 @@ func (s *Service[
return err
}

if err := s.dispatcher.Subscribe(
async.BlobSidecarsFinalized, s.subBlobFinalized,
); err != nil {
return err
}

// start the main event loop to listen and handle events.
go s.eventLoop(ctx)
return nil
@@ -198,6 +212,8 @@
s.handleBeaconBlockReceived(event)
case event := <-s.subFinalBlkReceived:
s.handleBeaconBlockFinalization(event)
case event := <-s.subBlobFinalized:
s.blobFinalized <- event.Data()
Comment on lines +215 to +216
coderabbitai (Contributor):

⚠️ Potential issue

Avoid potential deadlock with unbuffered 'blobFinalized' channel

In the eventLoop method at lines 214-215, sending to s.blobFinalized may block if there's no goroutine ready to receive from it. Since blobFinalized is an unbuffered channel, this could lead to a deadlock if not properly synchronized.

Consider one of the following solutions:

Solution 1: Use a buffered channel

Apply this diff to make blobFinalized a buffered channel:

-    blobFinalized:             make(chan struct{}),
+    blobFinalized:             make(chan struct{}, 1),

Solution 2: Ensure proper synchronization

Ensure that verifyFinalBlobAvailability is called before the event is sent, guaranteeing that a goroutine is ready to receive from blobFinalized.

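A minimal, self-contained sketch (not beacon-kit code) of the behavior at issue: a send on an unbuffered channel blocks until a receiver is ready, while a one-slot buffer lets the event loop hand off the signal and move on.

package main

import (
	"fmt"
	"time"
)

func main() {
	buffered := make(chan struct{}, 1) // Solution 1: capacity 1
	unbuffered := make(chan struct{})  // current behavior

	// The buffered send completes immediately, even with no receiver yet.
	buffered <- struct{}{}
	fmt.Println("buffered send returned without a receiver")

	// The unbuffered send blocks until a receiver arrives; without the
	// goroutine below, this program would deadlock.
	go func() {
		time.Sleep(10 * time.Millisecond) // verifier arriving late
		<-unbuffered
	}()
	unbuffered <- struct{}{}
	fmt.Println("unbuffered send returned once a receiver appeared")
}

With a buffer of one, the signal can also arrive before the verifier starts waiting, which removes the strict ordering requirement of Solution 2.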

}
}
}
@@ -313,3 +329,27 @@ func (s *Service[
)
}
}

// Given the block blk, ready for finalization, verifyFinalBlobAvailability
// waits for its blob sidecar to be available and verifies it.
func (s *Service[
_, BeaconBlockT, _, _, _, _, _, _, _, _,
]) verifyFinalBlobAvailability(ctx context.Context, blk BeaconBlockT) error {
// wait for blob sidecar to be finalized
select {
case <-s.blobFinalized:
// Proceed with verification
case <-ctx.Done():
return ctx.Err()
}

// If the blobs needed to process the block are not available, we
// return an error. It is safe to use the slot off of the beacon block
// since it has been verified as correct already.
if !s.storageBackend.AvailabilityStore().IsDataAvailable(
ctx, blk.GetSlot(), blk.GetBody(),
) {
return ErrDataNotAvailable
}
return nil
}
Comment on lines +338 to +355

@coderabbitai coderabbitai bot (Contributor) commented on Nov 4, 2024:

⚠️ Potential issue

Ensure 'blobFinalized' signal corresponds to the specific block being verified

Currently, verifyFinalBlobAvailability waits on s.blobFinalized, which signals when any blob sidecar is finalized. This approach may cause the function to proceed even if the blob sidecar for the block blk has not been finalized, potentially leading to incorrect verification. To prevent this, modify the synchronization mechanism to ensure that the blob sidecar for the specific block is finalized before proceeding.

calbera (Contributor):

Would agree; might need to check that blk.GetSlot() <= the number of blobFinalized events.

coderabbitai (Contributor):
@calbera,

That's a valid point. Comparing blk.GetSlot() to the number of blobFinalized events could help, but it might not be entirely reliable if blobs are finalized out of order or if multiple blobs are finalized concurrently. To ensure that the correct blob sidecar corresponds to the specific block being verified, it might be better to track blob finalization per slot.

Consider modifying blobFinalized to be a map from slots to channels:

// Use a map to track blob finalization per slot
blobFinalized map[common.Slot]chan struct{}

Then, in verifyFinalBlobAvailability, you can wait on the channel associated with the specific slot:

select {
case <-s.blobFinalized[blk.GetSlot()]:
    // Proceed with verification
case <-ctx.Done():
    return ctx.Err()
}

This way, the function only proceeds when the blob sidecar for the block blk has been finalized, ensuring accurate synchronization.

Let me know what you think about this approach.
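
Filling out the partial snippet above, a self-contained sketch of per-slot signaling; the Slot type and slotWaiter name are illustrative stand-ins for common.Slot and the service's fields, not beacon-kit code. Closing the channel (rather than sending on it) means the signal is observed regardless of whether signal or wait runs first.

package main

import (
	"context"
	"sync"
)

// Slot stands in for common.Slot.
type Slot uint64

// slotWaiter tracks one channel per slot, so a waiter only wakes when the
// blob sidecar for *its* slot has been finalized.
type slotWaiter struct {
	mu    sync.Mutex
	chans map[Slot]chan struct{}
}

func newSlotWaiter() *slotWaiter {
	return &slotWaiter{chans: make(map[Slot]chan struct{})}
}

// channelFor lazily creates a slot's channel so that the signaler and the
// waiter agree on the same channel regardless of which side arrives first.
func (w *slotWaiter) channelFor(slot Slot) chan struct{} {
	w.mu.Lock()
	defer w.mu.Unlock()
	ch, ok := w.chans[slot]
	if !ok {
		ch = make(chan struct{})
		w.chans[slot] = ch
	}
	return ch
}

// signal marks the blob sidecar for slot as finalized.
func (w *slotWaiter) signal(slot Slot) {
	close(w.channelFor(slot))
}

// wait blocks until slot is signaled or ctx is cancelled.
func (w *slotWaiter) wait(ctx context.Context, slot Slot) error {
	select {
	case <-w.channelFor(slot):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	w := newSlotWaiter()
	w.signal(7)                               // DA side: slot 7's sidecar finalized
	_ = w.wait(context.Background(), Slot(7)) // verifier side: returns immediately
}

One caveat of close-based signaling: a second signal for the same slot would panic on double close, so entries would need to be deleted (or guarded with sync.Once) after use.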


12 changes: 10 additions & 2 deletions mod/da/pkg/da/service.go
@@ -132,8 +132,16 @@ func (s *Service[_, BlobSidecarsT]) handleFinalSidecarsReceived(
if err := s.processSidecars(msg.Context(), msg.Data()); err != nil {
s.logger.Error(
"Failed to process blob sidecars",
"error",
err,
"error", err,
)
return
}

event := async.NewEvent(msg.Context(), async.BlobSidecarsFinalized, struct{}{})
Contributor:

might be worth adding the blk slot num in the event data here

if err := s.dispatcher.Publish(event); err != nil {
s.logger.Error(
"Failed to publish blob finalized event",
"error", err,
Comment on lines +140 to +144
coderabbitai (Contributor):

🧹 Nitpick (assertive)

Consider enhancing the finalization event payload for better observability.

The current implementation uses an empty struct as the event payload. Including metadata about the processed sidecars would aid in debugging and monitoring the finalization process.

Consider this enhancement:

-event := async.NewEvent(msg.Context(), async.BlobSidecarsFinalized, struct{}{})
+event := async.NewEvent(msg.Context(), async.BlobSidecarsFinalized, struct {
+    SidecarCount  int
+    ProcessedAt   time.Time
+}{
+    SidecarCount:  msg.Data().Len(),
+    ProcessedAt:   time.Now(),
+})

Don't forget to add the time import:

import "time"

Comment on lines +135 to +144
coderabbitai (Contributor):

🧹 Nitpick (assertive)

Consider implementing finalization timeout and retry mechanism.

While the current implementation ensures sidecar processing completion before finalization, it might benefit from additional robustness measures:

  1. Timeout mechanism to prevent indefinite waiting
  2. Retry logic for transient failures
  3. Metrics for monitoring processing duration and retry attempts

This would help handle edge cases in distributed scenarios where network delays or temporary failures might occur.

Would you like me to provide a detailed implementation suggestion for these enhancements?
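
A sketch of the first two suggestions, under stated assumptions (the helper name withTimeoutRetry, the attempt count, and the per-attempt timeout are illustrative, not part of this PR): each attempt gets its own deadline derived from the parent context, and retries stop as soon as that parent is cancelled.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// withTimeoutRetry runs fn up to attempts times, giving each attempt its own
// deadline derived from ctx, and stops early if ctx itself is cancelled.
func withTimeoutRetry(
	ctx context.Context,
	attempts int,
	perAttempt time.Duration,
	fn func(context.Context) error,
) error {
	var err error
	for i := 0; i < attempts; i++ {
		attemptCtx, cancel := context.WithTimeout(ctx, perAttempt)
		err = fn(attemptCtx)
		cancel()
		if err == nil {
			return nil
		}
		if ctx.Err() != nil {
			return ctx.Err() // parent cancelled: stop retrying
		}
	}
	return err
}

func main() {
	// The closure stands in for s.verifyFinalBlobAvailability(ctx, blk).
	err := withTimeoutRetry(context.Background(), 3, 2*time.Second,
		func(ctx context.Context) error {
			return errors.New("data not available yet")
		})
	fmt.Println(err) // last error after three attempts
}

In beacon-kit this could wrap the verifyFinalBlobAvailability call, with a metric incremented per retry to cover the third suggestion.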

)
}
}
1 change: 1 addition & 0 deletions mod/node-core/pkg/components/dispatcher.go
@@ -57,6 +57,7 @@ func ProvideDispatcher[
dp.WithEvent[async.Event[BlobSidecarsT]](async.SidecarsVerified),
dp.WithEvent[async.Event[BeaconBlockT]](async.FinalBeaconBlockReceived),
dp.WithEvent[async.Event[BlobSidecarsT]](async.FinalSidecarsReceived),
dp.WithEvent[async.Event[struct{}]](async.BlobSidecarsFinalized),
dp.WithEvent[ValidatorUpdateEvent](
async.FinalValidatorUpdatesProcessed,
),
1 change: 1 addition & 0 deletions mod/primitives/pkg/async/id.go
@@ -46,4 +46,5 @@ const (
FinalSidecarsReceived = "final-blob-sidecars-received"
FinalValidatorUpdatesProcessed = "final-validator-updates"
BeaconBlockFinalized = "beacon-block-finalized"
BlobSidecarsFinalized = "blob-sidecars-finalized"
)
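
For orientation, a toy sketch of the pub/sub wiring these event IDs enable (the dispatcher type below is a stand-in, not beacon-kit's actual dispatcher): the DA service publishes blob-sidecars-finalized once the sidecars are persisted, and the blockchain service's subscription unblocks the finalization path.

package main

import "fmt"

// dispatcher is a minimal stand-in for beacon-kit's event dispatcher.
type dispatcher struct {
	subs map[string][]chan struct{}
}

// subscribe registers interest in an event ID and returns the channel the
// dispatcher signals on; a buffer of one keeps publish from blocking.
func (d *dispatcher) subscribe(id string) chan struct{} {
	ch := make(chan struct{}, 1)
	d.subs[id] = append(d.subs[id], ch)
	return ch
}

// publish signals every subscriber registered for the event ID.
func (d *dispatcher) publish(id string) {
	for _, ch := range d.subs[id] {
		ch <- struct{}{}
	}
}

func main() {
	d := &dispatcher{subs: make(map[string][]chan struct{})}
	sub := d.subscribe("blob-sidecars-finalized") // blockchain service side
	d.publish("blob-sidecars-finalized")          // DA service side
	<-sub
	fmt.Println("sidecar finalized; block finalization may proceed")
}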