Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Beat [2/4]: implement blockbeat #8894

Open
wants to merge 10 commits into
base: yy-prepare-blockbeat
Choose a base branch
from

Conversation

yyforyongyu
Copy link
Collaborator

Depends on #8893.

This PR introduces a minimal version of the new service Blockbeat as described in #7952, to handle block synchronization among different subsystems.

During startup, blockbeat consumers are registered in the BlockbeatDispatcher, a subservice that's responsible for dispatching new blockbeats to its consumers and waiting for its consumers to finish processing the blocks. If any of the consumers fail to process the block under 30s, or encounter an error during block processing, the system will shut down as it's critical to handle blocks.

This PR focuses on implementing blockbeat Consumer interface for ChainArb, UtxoSweeper and TxPublisher, the following PR focuses on finalizing blockbeat processing in ChainArb's subservices - ChannelArbitrator, chainWatcher, and ContractResolver.

Overview

The flow of the blockbeat process is shown below, whenever a new block arrives, it goes through the waterfall like this,

  1. blockbeat dispatcher receives a new block epoch and makes a blockbeat, and sends it to its consumers sequentially.
  2. ChainArb receives the blockbeat, processes it and signals back when done.
  3. UtxoSweeper receives the blockbeat, processes it and signals back when done.
  4. TxPublisher receives the blockbeat, processes it and signals back when done.
  5. This new block is now considered processed by the blockbeat dispatcher.
sequenceDiagram
		autonumber
		participant bb as BlockBeat
		participant cc as ChainArb
		participant us as UtxoSweeper
		participant tp as TxPublisher
		
		note left of bb: 0. received block x,<br>dispatching...
		
    note over bb,cc: 1. send block x to ChainArb,<br>wait for its done signal
		bb->>cc: block x
		rect rgba(165, 0, 85, 0.8)
      critical signal processed
        cc->>bb: processed block
      option Process error or timeout
        bb->>bb: error and exit
      end
    end

    note over bb,us: 2. send block x to UtxoSweeper, wait for its done signal
		bb->>us: block x
		rect rgba(165, 0, 85, 0.8)
      critical signal processed
        us->>bb: processed block
      option Process error or timeout
        bb->>bb: error and exit
      end
    end

    note over bb,tp: 3. send block x to TxPublisher, wait for its done signal
		bb->>tp: block x
		rect rgba(165, 0, 85, 0.8)
      critical signal processed
        tp->>bb: processed block
      option Process error or timeout
        bb->>bb: error and exit
      end
    end
Loading

NOTE: itests are failing in this PR and is fixed in the final PR.

TODO

  • add unit tests for new code
  • add readme

@yyforyongyu yyforyongyu added this to the v0.18.2 milestone Jul 4, 2024
@yyforyongyu yyforyongyu self-assigned this Jul 4, 2024
Copy link
Contributor

coderabbitai bot commented Jul 4, 2024

Important

Review skipped

Auto reviews are limited to specific labels.

Labels to auto review (1)
  • llm-review

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.


Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

Share
Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai generate interesting stats about this repository and render them as a table.
    • @coderabbitai show all the console.log statements in this repository.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (invoked as PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Additionally, you can add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Collaborator

@guggero guggero left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did a first pass to load context.
The sweeper commits towards the end seem to make sense, but I'm missing all the review context from the sweeper PR saga to make a good judgement call.

But I really like the concept of the block beat! This should help us quite a bit in synchronizing all subsystems.

I assume a follow-up PR that refactors all subsystems to use this will come once this lands?

@@ -0,0 +1 @@
# Chainio
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: remove file or add some basic info?
Your diagram from the PR body would fit here nicely I think. And the mermaid syntax should be supported too.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do!

// log is a logger that is initialized with no output filters. This
// means the package will not perform any logging by default until the caller
// requests it.
var clog btclog.Logger
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: any specific reason to rename this from the default log? Also, the Godoc comment no longer matches now.

chainio/log.go Outdated

// The default amount of logging is none.
func init() {
UseLogger(build.NewSubLogger("CHIO", nil))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should add this as a Subsystem variable in the package, so it can be used in the root log.go file without needing to re-use the value.

// epoch sequentially.
//
// NOTE: Part of the Blockbeat interface.
func (b Beat) DispatchSequential(consumers []Consumer) error {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are all these methods implemented on a non-pointer receiver on purpose?
I think for channels it's fine as they are pointers. But if we have other members, such as a wait group or locks, we would run into an issue if Beat was copied instead of being accessed as a pointer.

}

// Wait for all consumers in each queue to finish.
for name, errChan := range errChans {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the order important here? Otherwise we could use errgroup.Group instead.
Maybe that would also be a bit less aggressive in creating goroutines, as it would cap the number of concurrent executions to number of CPUs (while here we create one goroutine per consumer).

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1. Currently if the number of consumers is much larger than the number of CPUs, the chances of timeouts go up a lot.

go func(qid uint32, c []Consumer, b Beat) {
// Notify each consumer in this queue sequentially.
errChan <- b.DispatchSequential(c)
}(qid, consumers, b.beat)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here re errgroup.Group.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I guess being able to listen on b.quit below might make this a bit harder.
Or maybe it's time to also use the ContextGuard we've been using in tapd. So you would use errgroup.WithContext(b.WithCtxQuitNoTimeout()) assuming a context guard is embedded in BlockbeatDispatcher.

// NOTE: part of the `chainio.Consumer` interface.
func (b *BeatConsumer) SetCurrentBeat(beat Beat) {
beat.log.Tracef("set current height for [%s]", b.name)
b.currentBeat = beat
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since I assume this is also the main way for a subsystem to access its current block, shouldn't this either be atomic or be guarded by a mutex?

@@ -321,6 +322,8 @@ func (h HtlcSetKey) String() string {
// broadcasting to ensure that we avoid any possibility of race conditions, and
// sweep the output(s) without contest.
type ChannelArbitrator struct {
chainio.BeatConsumer
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this needs to come after the atomic variables, otherwise we might run into alignment issues in 32-bit systems.

log.go Outdated
@@ -164,6 +165,7 @@ func SetupLoggers(root *build.RotatingLogWriter, interceptor signal.Interceptor)
AddSubLogger(root, "CHFD", interceptor, chanfunding.UseLogger)
AddSubLogger(root, "PEER", interceptor, peer.UseLogger)
AddSubLogger(root, "CHCL", interceptor, chancloser.UseLogger)
AddSubLogger(root, "CHIO", interceptor, chainio.UseLogger)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: see my previous comment about defining a Subsystem variable instead.

@saubyk saubyk added the P1 MUST be fixed or reviewed label Jul 15, 2024
@yyforyongyu yyforyongyu force-pushed the yy-prepare-blockbeat branch 2 times, most recently from 16e2353 to 7183080 Compare July 15, 2024 18:02
Copy link
Collaborator

@ellemouton ellemouton left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Really liking this! Left some questions so long (pretty much all just questions for my understanding). Gonna need another round to really grok things

// concurrently.
DispatchConcurrent(consumers []Consumer) error

// DispatchConcurrent sends the blockbeat to the specified consumers
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: comment mismatch


// DispatchConcurrent sends the blockbeat to the specified consumers
// concurrently.
DispatchConcurrent(consumers []Consumer) error
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm so subsystems get control over when to send the beat to other subsystems? are consumers and subsystems the same thing?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are things cyclic? from the interfaces, it looks like Consumers get given a BlockBeat from which they can then call Dispatch* on other Consumers.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah ok - looks like this is how a subsystem can create consumers from within itself and use this to pass the beat to those consumers?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it correct then to say that Consumers must call NotifyBlockProcessed after any calls to Dispatch*?

Also: I assume only the first level consumer (ie the one registerd with the dispatcher_ should ever call NotifyBlockProcessed and not any of the consumers passed into the Dispatch* calls... if so - should we have two separate Consumer interfaces so that it isnt possible for the sub-consumers to call NotivyBlockProcessed?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah ok - looks like this is how a subsystem can create consumers from within itself and use this to pass the beat to those consumers?

Yes. Since sometimes there are subsystems living inside a subsystem, e.g., multiple ChannelArbitrators inside a ChainArbitrator, so I created this helper methods to let subsystem easily dispatch blockbeats.

is it correct then to say that Consumers must call NotifyBlockProcessed after any calls to Dispatch*?
I assume only the first level consumer...

Correct. As long as it's a Consumer, it must call NotifyBlockProcessed, which is intentionally by design.

// state change, and to provide them with the data they need to process it. In
// other words, subsystems must react to this state change and should consider
// being driven by the blockbeat in their own state machines.
type Blockbeat interface {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so the only info about the block carried by block beat is the height?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also, should the name be BlockBeat (ie 2 words)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also - does this need to be an interface? is there any other expected implementation of this other than Beat? looks like even the ProcessBlock(b Blockbeat) <-chan error method on the Consumer interface below is later changed to ProcessBlock(b Beat) <-chan error

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so the only info about the block carried by block beat is the height?

In this PR yes, but in the final PR it has more info f95fc39
Sorry if it's a bit confusing - this PR is created from check-picking commits from the final PR, so not all pieces fit together here. The ultimate plan is that we can remove chainntfns and use this package instead for all onchain data.

also, should the name be BlockBeat (ie 2 words)

So I got the inspiration from heartbeat, guess I'm inventing words here but lmk what you think.

is there any other expected implementation of this other than Beat?

Hopefully not🤓 Initially this is a concrete struct, but later on, it was changed because it allows unit tests to be written more easily.

// `lnd`'s subsystems. During startup, subsystems that are block-driven should
// implement the `Consumer` interface and register themselves via
// `RegisterQueue`. When two subsystems are independent of each other, they
// should be registered in differet queues so blocks are notified concurrently.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/differet/different

// consumerQueues is a map of consumers that will receive blocks. Each
// queue is notified concurrently, and consumers in the same queue is
// notified sequentially.
consumerQueues map[uint32][]Consumer
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe just note what the key is here

}

// Start listening to new block epochs.
blockEpochs, err := b.notifier.RegisterBlockEpochNtfn(nil)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we not maybe have a persisted block height/hash cursor so we subscribe from that height/hash instead of the tip? Ie, dont we want to guarantee that each subsystem is given each block regardless of downtime?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

then each subsystem also needs to be idempotent such that it is able to "process" each block more than once without any side-effects

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could also do: each consumer has a block cursor and then on registration tells the dispatcher the last block it successfully processed & then this registration starts from the lowest block height received from consumers

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also noting that it seems we currently send each consumer this first block twice right? Once at SetInitialBlock time and then again at Start time.

// error chan must be returned.
//
// NOTE: When implementing this, it's very important to send back the
// error or nil to the channel `b.errChan` immediately, otherwise
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's unclear from an interface perspective what b.errrChan here is

return b.epoch.Height
}

// NOTE: Part of the Blockbeat interface.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: comment start is missing

Comment on lines +139 to +151
err, timeout := fn.RecvOrTimeout(
c.ProcessBlock(beatCopy), DefaultProcessBlockTimeout,
)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we may need 2 separate timeout checks here:

  • one for actually getting back the error channel from c.ProcessBlock(beatCopy)
  • another waiting for a signal on that returned error channel.

This currently does the second part but I think what can happen is that c.ProcessBLock hangs and so then RecvOrTimeout never actually starts?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ie, I think we actually need a helper like this:

func RecvOrTimeout[T any](fn func() <-chan T, timeout time.Duration) (T, error) {
	var (
		c           chan T
		timeoutChan = time.After(timeout)
	)

	// Wait for the function to return the channel we want to listen on.
	select {
	case c = <-fn():
	case <-timeoutChan:
		var zero T
		return zero, fmt.Errorf("timeout hit")
	}

	// Now that we have the channel, wait to receive on it.
	select {
	case m := <-c:
		return m, nil

	case <-timeoutChan:
		var zero T
		return zero, fmt.Errorf("timeout hit")
	}
}

// Send the beat to the blockbeat channel. It's expected that the
// consumer will read from this channel and process the block. Once
// processed, it should return the error or nil to the beat.Err chan.
case b.BlockbeatChan <- beat:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should the BlockbeatChan be buffered so that ProcessBlock returns immediately with the errChan?

@yyforyongyu yyforyongyu force-pushed the yy-prepare-blockbeat branch 2 times, most recently from 755c7f3 to 9ec032a Compare July 18, 2024 07:32
@yyforyongyu yyforyongyu changed the title Beat [2/3]: implement blockbeat Beat [2/4]: implement blockbeat Jul 18, 2024
@saubyk saubyk modified the milestones: v0.18.3, v0.19.0 Jul 18, 2024
Copy link
Collaborator

@morehouse morehouse left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A couple high-level concerns:

  • The timeout might be too short, causing complete node shutdown under transient load when previously recovery would have been possible.
  • TxPublisher should receive beats before UtxoSweeper, as discussed previously.

Comment on lines +13 to +15
// DefaultProcessBlockTimeout is the timeout value used when waiting for one
// consumer to finish processing the new block epoch.
var DefaultProcessBlockTimeout = 30 * time.Second
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

30 seconds is quite aggressive IMO, especially for lower end setups. Something like #8889 could potentially keep LND occupied for that long, thereby forcing the entire node to shutdown over something transient.

// NOTE: When implementing this, it's very important to send back the
// error or nil to the channel `b.errChan` immediately, otherwise
// BlockbeatDispatcher will timeout and lnd will shutdown.
ProcessBlock(b Beat) <-chan error
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't these methods use the interface BlockBeat instead of the implemented Beat?

}

// Wait for all consumers in each queue to finish.
for name, errChan := range errChans {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1. Currently if the number of consumers is much larger than the number of CPUs, the chances of timeouts go up a lot.

c.Unlock()

// Iterate all the copied channels and send the blockbeat to them.
err := beat.DispatchConcurrent(channels)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I worry about timeouts in extreme cases here. handleBlockbeat has 30 seconds to complete, but every single channel arbitrator needs to process the block in that time. If there's thousands of channels, how long does it actually take to dispatch?

yyforyongyu and others added 10 commits October 15, 2024 17:17
This commit inits the package `chainio` and defines the interface
`Blockbeat` and `Consumer`. The `Consumer` must be implemented by other
subsystems if it requires block epoch subscription.
In this commit, a minimal implementation of `Blockbeat` is added to
synchronize block heights, which will be used in `ChainArb`, `Sweeper`,
and `TxPublisher` so blocks are processed sequentially among them.
This commit adds a blockbeat dispatcher which handles sending new blocks
to all subscribed consumers.
This commit implements `Consumer` on `TxPublisher`, `UtxoSweeper`,
`ChainArbitrator` and `ChannelArbitrator`. In addition, a
`BlockConsumer` is added to save code duplication.
This commit removes the independent block subscriptions in `UtxoSweeper`
and `TxPublisher`. These subsystems now listen to the `BlockbeatChan`
for new blocks.
The sweeper can handle the waiting so there's no need to wait for blocks
inside the resolvers. By offering the inputs prior to their mature
heights also guarantees the inputs with the same deadline are
aggregated.
This commit removes the block subscriptions used in `ChainArbitrator`
and `ChannelArbitrator`, replaced them with the blockbeat managed by
`BlockbeatDispatcher`.
This `immediate` flag was added as a hack so during a restart, the
pending resolvers would offer the inputs to the sweeper and ask it to
sweep them immediately. This is no longer need due to `blockbeat`, as
now during restart, a block is always sent to all subsystems via the
flow `ChainArb` -> `ChannelArb` -> resolvers -> sweeper. Thus, when
there are pending inputs offered, they will be processed by the sweeper
immediately.
@lightninglabs-deploy
Copy link

@yyforyongyu, remember to re-request review from reviewers when ready

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
P1 MUST be fixed or reviewed utxo sweeping
Projects
Status: In Progress
Development

Successfully merging this pull request may close these issues.

6 participants