Skip to content

Commit

Permalink
refactor: move various duties from ChannelRouter to graph.Builder
Browse files Browse the repository at this point in the history
This commit is a large refactor that moves over various responsibilities
from the ChannelRouter to the graph.Builder. These include all graph
related tasks such as:
- graph pruning
- validation of new network updates & persisting new updates
- notifying topology update clients of any changes.

This is a large commit but:
- many of the files are purely moved from `routing` to `graph`
- the business logic put in the graph Builder is copied exactly as is
  from the ChannelRouter with one exception:
- The ChannelRouter just needs to be able to call the Builder's
  `ApplyChannelUpdate` method. So this is now exported and provided to
the ChannelRouter as a config option.
- The trickiest part was just moving over the test code since quite a
  bit had to be duplicated.
  • Loading branch information
ellemouton committed Jul 15, 2024
1 parent 0b7364f commit 7f1be39
Show file tree
Hide file tree
Showing 26 changed files with 5,741 additions and 4,482 deletions.
4 changes: 2 additions & 2 deletions autopilot/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import (

"github.com/btcsuite/btcd/btcec/v2"
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/graph"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/routing"
)

// ManagerCfg houses a set of values and methods that is passed to the Manager
Expand Down Expand Up @@ -36,7 +36,7 @@ type ManagerCfg struct {

// SubscribeTopology is used to get a subscription for topology changes
// on the network.
SubscribeTopology func() (*routing.TopologyClient, error)
SubscribeTopology func() (*graph.TopologyClient, error)
}

// Manager is struct that manages an autopilot agent, making it possible to
Expand Down
6 changes: 3 additions & 3 deletions discovery/chan_series.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ import (

"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/graph"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/netann"
"github.com/lightningnetwork/lnd/routing"
"github.com/lightningnetwork/lnd/routing/route"
)

Expand Down Expand Up @@ -136,7 +136,7 @@ func (c *ChanSeries) UpdatesInHorizon(chain chainhash.Hash,
if edge1 != nil {
// We don't want to send channel updates that don't
// conform to the spec (anymore).
err := routing.ValidateChannelUpdateFields(0, edge1)
err := graph.ValidateChannelUpdateFields(0, edge1)
if err != nil {
log.Errorf("not sending invalid channel "+
"update %v: %v", edge1, err)
Expand All @@ -145,7 +145,7 @@ func (c *ChanSeries) UpdatesInHorizon(chain chainhash.Hash,
}
}
if edge2 != nil {
err := routing.ValidateChannelUpdateFields(0, edge2)
err := graph.ValidateChannelUpdateFields(0, edge2)
if err != nil {
log.Errorf("not sending invalid channel "+
"update %v: %v", edge2, err)
Expand Down
43 changes: 21 additions & 22 deletions discovery/gossiper.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/multimutex"
"github.com/lightningnetwork/lnd/netann"
"github.com/lightningnetwork/lnd/routing"
"github.com/lightningnetwork/lnd/routing/route"
"github.com/lightningnetwork/lnd/ticker"
"golang.org/x/time/rate"
Expand Down Expand Up @@ -1361,7 +1360,7 @@ func (d *AuthenticatedGossiper) networkHandler() {

// We'll use this validation to ensure that we process jobs in their
// dependency order during parallel validation.
validationBarrier := routing.NewValidationBarrier(1000, d.quit)
validationBarrier := graph.NewValidationBarrier(1000, d.quit)

for {
select {
Expand Down Expand Up @@ -1486,7 +1485,7 @@ func (d *AuthenticatedGossiper) networkHandler() {
//
// NOTE: must be run as a goroutine.
func (d *AuthenticatedGossiper) handleNetworkMessages(nMsg *networkMsg,
deDuped *deDupedAnnouncements, vb *routing.ValidationBarrier) {
deDuped *deDupedAnnouncements, vb *graph.ValidationBarrier) {

defer d.wg.Done()
defer vb.CompleteJob()
Expand All @@ -1502,10 +1501,10 @@ func (d *AuthenticatedGossiper) handleNetworkMessages(nMsg *networkMsg,
log.Debugf("Validating network message %s got err: %v",
nMsg.msg.MsgType(), err)

if !routing.IsError(
if !graph.IsError(
err,
routing.ErrVBarrierShuttingDown,
routing.ErrParentValidationFailed,
graph.ErrVBarrierShuttingDown,
graph.ErrParentValidationFailed,
) {

log.Warnf("unexpected error during validation "+
Expand Down Expand Up @@ -1861,7 +1860,7 @@ func (d *AuthenticatedGossiper) processRejectedEdge(
if err != nil {
return nil, err
}
err = routing.ValidateChannelAnn(chanAnn)
err = graph.ValidateChannelAnn(chanAnn)
if err != nil {
err := fmt.Errorf("assembled channel announcement proof "+
"for shortChanID=%v isn't valid: %v",
Expand Down Expand Up @@ -1910,7 +1909,7 @@ func (d *AuthenticatedGossiper) processRejectedEdge(
func (d *AuthenticatedGossiper) addNode(msg *lnwire.NodeAnnouncement,
op ...batch.SchedulerOption) error {

if err := routing.ValidateNodeAnn(msg); err != nil {
if err := graph.ValidateNodeAnn(msg); err != nil {
return fmt.Errorf("unable to validate node announcement: %w",
err)
}
Expand Down Expand Up @@ -2064,7 +2063,7 @@ func (d *AuthenticatedGossiper) processZombieUpdate(
"with chan_id=%v", msg.ShortChannelID)
}

err := routing.VerifyChannelUpdateSignature(msg, pubKey)
err := graph.VerifyChannelUpdateSignature(msg, pubKey)
if err != nil {
return fmt.Errorf("unable to verify channel "+
"update signature: %v", err)
Expand Down Expand Up @@ -2201,7 +2200,7 @@ func (d *AuthenticatedGossiper) updateChannel(info *models.ChannelEdgeInfo,

// To ensure that our signature is valid, we'll verify it ourself
// before committing it to the slice returned.
err = routing.ValidateChannelUpdateAnn(d.selfKey, info.Capacity, chanUpdate)
err = graph.ValidateChannelUpdateAnn(d.selfKey, info.Capacity, chanUpdate)
if err != nil {
return nil, nil, fmt.Errorf("generated invalid channel "+
"update sig: %v", err)
Expand Down Expand Up @@ -2338,11 +2337,11 @@ func (d *AuthenticatedGossiper) handleNodeAnnouncement(nMsg *networkMsg,
log.Debugf("Adding node: %x got error: %v", nodeAnn.NodeID,
err)

if !routing.IsError(
if !graph.IsError(
err,
routing.ErrOutdated,
routing.ErrIgnored,
routing.ErrVBarrierShuttingDown,
graph.ErrOutdated,
graph.ErrIgnored,
graph.ErrVBarrierShuttingDown,
) {

log.Error(err)
Expand Down Expand Up @@ -2457,7 +2456,7 @@ func (d *AuthenticatedGossiper) handleChanAnnouncement(nMsg *networkMsg,
// the signatures within the proof as it should be well formed.
var proof *models.ChannelAuthProof
if nMsg.isRemote {
if err := routing.ValidateChannelAnn(ann); err != nil {
if err := graph.ValidateChannelAnn(ann); err != nil {
err := fmt.Errorf("unable to validate announcement: "+
"%v", err)

Expand Down Expand Up @@ -2538,7 +2537,7 @@ func (d *AuthenticatedGossiper) handleChanAnnouncement(nMsg *networkMsg,
// If the edge was rejected due to already being known, then it
// may be the case that this new message has a fresh channel
// proof, so we'll check.
if routing.IsError(err, routing.ErrIgnored) {
if graph.IsError(err, graph.ErrIgnored) {
// Attempt to process the rejected message to see if we
// get any new announcements.
anns, rErr := d.processRejectedEdge(ann, proof)
Expand Down Expand Up @@ -2862,7 +2861,7 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg,
// Validate the channel announcement with the expected public key and
// channel capacity. In the case of an invalid channel update, we'll
// return an error to the caller and exit early.
err = routing.ValidateChannelUpdateAnn(pubKey, chanInfo.Capacity, upd)
err = graph.ValidateChannelUpdateAnn(pubKey, chanInfo.Capacity, upd)
if err != nil {
rErr := fmt.Errorf("unable to validate channel update "+
"announcement for short_chan_id=%v: %v",
Expand Down Expand Up @@ -2947,10 +2946,10 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg,
}

if err := d.cfg.Router.UpdateEdge(update, ops...); err != nil {
if routing.IsError(
err, routing.ErrOutdated,
routing.ErrIgnored,
routing.ErrVBarrierShuttingDown,
if graph.IsError(
err, graph.ErrOutdated,
graph.ErrIgnored,
graph.ErrVBarrierShuttingDown,
) {

log.Debugf("Update edge for short_chan_id(%v) got: %v",
Expand Down Expand Up @@ -3268,7 +3267,7 @@ func (d *AuthenticatedGossiper) handleAnnSig(nMsg *networkMsg,

// With all the necessary components assembled validate the full
// channel announcement proof.
if err := routing.ValidateChannelAnn(chanAnn); err != nil {
if err := graph.ValidateChannelAnn(chanAnn); err != nil {
err := fmt.Errorf("channel announcement proof for "+
"short_chan_id=%v isn't valid: %v", shortChanID, err)

Expand Down
5 changes: 2 additions & 3 deletions discovery/gossiper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"github.com/lightningnetwork/lnd/lntest/wait"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/netann"
"github.com/lightningnetwork/lnd/routing"
"github.com/lightningnetwork/lnd/routing/route"
"github.com/lightningnetwork/lnd/ticker"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -351,7 +350,7 @@ func (r *mockGraphSource) IsStaleEdgePolicy(chanID lnwire.ShortChannelID,
// Since it exists within our zombie index, we'll check that it
// respects the router's live edge horizon to determine whether
// it is stale or not.
return time.Since(timestamp) > routing.DefaultChannelPruneExpiry
return time.Since(timestamp) > graph.DefaultChannelPruneExpiry
}

switch {
Expand Down Expand Up @@ -2258,7 +2257,7 @@ func TestProcessZombieEdgeNowLive(t *testing.T) {

// We'll generate a channel update with a timestamp far enough in the
// past to consider it a zombie.
zombieTimestamp := time.Now().Add(-routing.DefaultChannelPruneExpiry)
zombieTimestamp := time.Now().Add(-graph.DefaultChannelPruneExpiry)
batch.chanUpdAnn2.Timestamp = uint32(zombieTimestamp.Unix())
if err := signUpdate(remoteKeyPriv2, batch.chanUpdAnn2); err != nil {
t.Fatalf("unable to sign update with new timestamp: %v", err)
Expand Down
26 changes: 13 additions & 13 deletions funding/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/channeldb/models"
"github.com/lightningnetwork/lnd/discovery"
"github.com/lightningnetwork/lnd/graph"
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/keychain"
"github.com/lightningnetwork/lnd/labels"
Expand All @@ -33,7 +34,6 @@ import (
"github.com/lightningnetwork/lnd/lnwallet/chainfee"
"github.com/lightningnetwork/lnd/lnwallet/chanfunding"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/routing"
"golang.org/x/crypto/salsa20"
)

Expand Down Expand Up @@ -3415,10 +3415,10 @@ func (f *Manager) addToRouterGraph(completeChan *channeldb.OpenChannel,
select {
case err := <-errChan:
if err != nil {
if routing.IsError(err, routing.ErrOutdated,
routing.ErrIgnored) {
if graph.IsError(err, graph.ErrOutdated,
graph.ErrIgnored) {

log.Debugf("Router rejected "+
log.Debugf("Graph rejected "+
"ChannelAnnouncement: %v", err)
} else {
return fmt.Errorf("error sending channel "+
Expand All @@ -3435,10 +3435,10 @@ func (f *Manager) addToRouterGraph(completeChan *channeldb.OpenChannel,
select {
case err := <-errChan:
if err != nil {
if routing.IsError(err, routing.ErrOutdated,
routing.ErrIgnored) {
if graph.IsError(err, graph.ErrOutdated,
graph.ErrIgnored) {

log.Debugf("Router rejected "+
log.Debugf("Graph rejected "+
"ChannelUpdate: %v", err)
} else {
return fmt.Errorf("error sending channel "+
Expand Down Expand Up @@ -4354,10 +4354,10 @@ func (f *Manager) announceChannel(localIDKey, remoteIDKey *btcec.PublicKey,
select {
case err := <-errChan:
if err != nil {
if routing.IsError(err, routing.ErrOutdated,
routing.ErrIgnored) {
if graph.IsError(err, graph.ErrOutdated,
graph.ErrIgnored) {

log.Debugf("Router rejected "+
log.Debugf("Graph rejected "+
"AnnounceSignatures: %v", err)
} else {
log.Errorf("Unable to send channel "+
Expand All @@ -4384,10 +4384,10 @@ func (f *Manager) announceChannel(localIDKey, remoteIDKey *btcec.PublicKey,
select {
case err := <-errChan:
if err != nil {
if routing.IsError(err, routing.ErrOutdated,
routing.ErrIgnored) {
if graph.IsError(err, graph.ErrOutdated,
graph.ErrIgnored) {

log.Debugf("Router rejected "+
log.Debugf("Graph rejected "+
"NodeAnnouncement: %v", err)
} else {
log.Errorf("Unable to send node "+
Expand Down
2 changes: 1 addition & 1 deletion routing/ann_validation.go → graph/ann_validation.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package routing
package graph

import (
"bytes"
Expand Down
Loading

0 comments on commit 7f1be39

Please sign in to comment.