Skip to content

Commit

Permalink
Merge pull request #8848 from ellemouton/graphManager
Browse files Browse the repository at this point in the history
refactor: move graph responsibilities from routing.ChannelRouter to new graph.Builder
  • Loading branch information
guggero authored Jul 16, 2024
2 parents 09b38aa + b112e10 commit fae7e0c
Show file tree
Hide file tree
Showing 45 changed files with 6,341 additions and 4,577 deletions.
8 changes: 5 additions & 3 deletions autopilot/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (d *dbNode) Addrs() []net.Addr {
//
// NOTE: Part of the autopilot.Node interface.
func (d *dbNode) ForEachChannel(cb func(ChannelEdge) error) error {
return d.db.ForEachNodeChannel(d.tx, d.node.PubKeyBytes,
return d.db.ForEachNodeChannelTx(d.tx, d.node.PubKeyBytes,
func(tx kvdb.RTx, ei *models.ChannelEdgeInfo, ep,
_ *models.ChannelEdgePolicy) error {

Expand All @@ -105,7 +105,9 @@ func (d *dbNode) ForEachChannel(cb func(ChannelEdge) error) error {
return nil
}

node, err := d.db.FetchLightningNode(tx, ep.ToNode)
node, err := d.db.FetchLightningNodeTx(
tx, ep.ToNode,
)
if err != nil {
return err
}
Expand Down Expand Up @@ -164,7 +166,7 @@ func (d *databaseChannelGraph) addRandChannel(node1, node2 *btcec.PublicKey,
return nil, err
}

dbNode, err := d.db.FetchLightningNode(nil, vertex)
dbNode, err := d.db.FetchLightningNode(vertex)
switch {
case err == channeldb.ErrGraphNodeNotFound:
fallthrough
Expand Down
4 changes: 2 additions & 2 deletions autopilot/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import (

"github.com/btcsuite/btcd/btcec/v2"
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/graph"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/routing"
)

// ManagerCfg houses a set of values and methods that is passed to the Manager
Expand Down Expand Up @@ -36,7 +36,7 @@ type ManagerCfg struct {

// SubscribeTopology is used to get a subscription for topology changes
// on the network.
SubscribeTopology func() (*routing.TopologyClient, error)
SubscribeTopology func() (*graph.TopologyClient, error)
}

// Manager is struct that manages an autopilot agent, making it possible to
Expand Down
2 changes: 1 addition & 1 deletion channeldb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -1351,7 +1351,7 @@ func (d *DB) AddrsForNode(nodePub *btcec.PublicKey) ([]net.Addr,
if err != nil {
return nil, err
}
graphNode, err := d.graph.FetchLightningNode(nil, pubKey)
graphNode, err := d.graph.FetchLightningNode(pubKey)
if err != nil && err != ErrGraphNodeNotFound {
return nil, err
} else if err == ErrGraphNodeNotFound {
Expand Down
64 changes: 54 additions & 10 deletions channeldb/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,7 @@ func (c *ChannelGraph) FetchNodeFeatures(
}

// Fallback that uses the database.
targetNode, err := c.FetchLightningNode(nil, node)
targetNode, err := c.FetchLightningNode(node)
switch err {
// If the node exists and has features, return them directly.
case nil:
Expand Down Expand Up @@ -565,7 +565,7 @@ func (c *ChannelGraph) ForEachNodeCached(cb func(node route.Vertex,
return c.ForEachNode(func(tx kvdb.RTx, node *LightningNode) error {
channels := make(map[uint64]*DirectedChannel)

err := c.ForEachNodeChannel(tx, node.PubKeyBytes,
err := c.ForEachNodeChannelTx(tx, node.PubKeyBytes,
func(tx kvdb.RTx, e *models.ChannelEdgeInfo,
p1 *models.ChannelEdgePolicy,
p2 *models.ChannelEdgePolicy) error {
Expand Down Expand Up @@ -2374,10 +2374,19 @@ func (c *ChannelGraph) FilterChannelRange(startHeight,
// skipped and the result will contain only those edges that exist at the time
// of the query. This can be used to respond to peer queries that are seeking to
// fill in gaps in their view of the channel graph.
func (c *ChannelGraph) FetchChanInfos(chanIDs []uint64) ([]ChannelEdge, error) {
return c.fetchChanInfos(nil, chanIDs)
}

// fetchChanInfos returns the set of channel edges that correspond to the passed
// channel ID's. If an edge is the query is unknown to the database, it will
// skipped and the result will contain only those edges that exist at the time
// of the query. This can be used to respond to peer queries that are seeking to
// fill in gaps in their view of the channel graph.
//
// NOTE: An optional transaction may be provided. If none is provided, then a
// new one will be created.
func (c *ChannelGraph) FetchChanInfos(tx kvdb.RTx, chanIDs []uint64) (
func (c *ChannelGraph) fetchChanInfos(tx kvdb.RTx, chanIDs []uint64) (
[]ChannelEdge, error) {
// TODO(roasbeef): sort cids?

Expand Down Expand Up @@ -2922,7 +2931,7 @@ func (c *ChannelGraph) isPublic(tx kvdb.RTx, nodePub route.Vertex,
// used to terminate the check early.
nodeIsPublic := false
errDone := errors.New("done")
err := c.ForEachNodeChannel(tx, nodePub, func(tx kvdb.RTx,
err := c.ForEachNodeChannelTx(tx, nodePub, func(tx kvdb.RTx,
info *models.ChannelEdgeInfo, _ *models.ChannelEdgePolicy,
_ *models.ChannelEdgePolicy) error {

Expand Down Expand Up @@ -2954,12 +2963,31 @@ func (c *ChannelGraph) isPublic(tx kvdb.RTx, nodePub route.Vertex,
return nodeIsPublic, nil
}

// FetchLightningNodeTx attempts to look up a target node by its identity
// public key. If the node isn't found in the database, then
// ErrGraphNodeNotFound is returned. An optional transaction may be provided.
// If none is provided, then a new one will be created.
func (c *ChannelGraph) FetchLightningNodeTx(tx kvdb.RTx, nodePub route.Vertex) (
*LightningNode, error) {

return c.fetchLightningNode(tx, nodePub)
}

// FetchLightningNode attempts to look up a target node by its identity public
// key. If the node isn't found in the database, then ErrGraphNodeNotFound is
// returned.
func (c *ChannelGraph) FetchLightningNode(nodePub route.Vertex) (*LightningNode,
error) {

return c.fetchLightningNode(nil, nodePub)
}

// fetchLightningNode attempts to look up a target node by its identity public
// key. If the node isn't found in the database, then ErrGraphNodeNotFound is
// returned. An optional transaction may be provided. If none is provided, then
// a new one will be created.
func (c *ChannelGraph) FetchLightningNode(tx kvdb.RTx, nodePub route.Vertex) (
*LightningNode, error) {
func (c *ChannelGraph) fetchLightningNode(tx kvdb.RTx,
nodePub route.Vertex) (*LightningNode, error) {

var node *LightningNode
fetch := func(tx kvdb.RTx) error {
Expand Down Expand Up @@ -3196,13 +3224,29 @@ func nodeTraversal(tx kvdb.RTx, nodePub []byte, db kvdb.Backend,
// halted with the error propagated back up to the caller.
//
// Unknown policies are passed into the callback as nil values.
func (c *ChannelGraph) ForEachNodeChannel(nodePub route.Vertex,
cb func(kvdb.RTx, *models.ChannelEdgeInfo, *models.ChannelEdgePolicy,
*models.ChannelEdgePolicy) error) error {

return nodeTraversal(nil, nodePub[:], c.db, cb)
}

// ForEachNodeChannelTx iterates through all channels of the given node,
// executing the passed callback with an edge info structure and the policies
// of each end of the channel. The first edge policy is the outgoing edge *to*
// the connecting node, while the second is the incoming edge *from* the
// connecting node. If the callback returns an error, then the iteration is
// halted with the error propagated back up to the caller.
//
// Unknown policies are passed into the callback as nil values.
//
// If the caller wishes to re-use an existing boltdb transaction, then it
// should be passed as the first argument. Otherwise the first argument should
// should be passed as the first argument. Otherwise, the first argument should
// be nil and a fresh transaction will be created to execute the graph
// traversal.
func (c *ChannelGraph) ForEachNodeChannel(tx kvdb.RTx, nodePub route.Vertex,
cb func(kvdb.RTx, *models.ChannelEdgeInfo, *models.ChannelEdgePolicy,
func (c *ChannelGraph) ForEachNodeChannelTx(tx kvdb.RTx,
nodePub route.Vertex, cb func(kvdb.RTx, *models.ChannelEdgeInfo,
*models.ChannelEdgePolicy,
*models.ChannelEdgePolicy) error) error {

return nodeTraversal(tx, nodePub[:], c.db, cb)
Expand Down Expand Up @@ -3705,7 +3749,7 @@ func (c *ChannelGraph) markEdgeLiveUnsafe(tx kvdb.RwTx, chanID uint64) error {
// We need to add the channel back into our graph cache, otherwise we
// won't use it for path finding.
if c.graphCache != nil {
edgeInfos, err := c.FetchChanInfos(tx, []uint64{chanID})
edgeInfos, err := c.fetchChanInfos(tx, []uint64{chanID})
if err != nil {
return err
}
Expand Down
20 changes: 10 additions & 10 deletions channeldb/graph_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func TestNodeInsertionAndDeletion(t *testing.T) {

// Next, fetch the node from the database to ensure everything was
// serialized properly.
dbNode, err := graph.FetchLightningNode(nil, testPub)
dbNode, err := graph.FetchLightningNode(testPub)
require.NoError(t, err, "unable to locate node")

if _, exists, err := graph.HasLightningNode(dbNode.PubKeyBytes); err != nil {
Expand All @@ -164,7 +164,7 @@ func TestNodeInsertionAndDeletion(t *testing.T) {

// Finally, attempt to fetch the node again. This should fail as the
// node should have been deleted from the database.
_, err = graph.FetchLightningNode(nil, testPub)
_, err = graph.FetchLightningNode(testPub)
if err != ErrGraphNodeNotFound {
t.Fatalf("fetch after delete should fail!")
}
Expand Down Expand Up @@ -192,7 +192,7 @@ func TestPartialNode(t *testing.T) {

// Next, fetch the node from the database to ensure everything was
// serialized properly.
dbNode, err := graph.FetchLightningNode(nil, testPub)
dbNode, err := graph.FetchLightningNode(testPub)
require.NoError(t, err, "unable to locate node")

if _, exists, err := graph.HasLightningNode(dbNode.PubKeyBytes); err != nil {
Expand Down Expand Up @@ -222,7 +222,7 @@ func TestPartialNode(t *testing.T) {

// Finally, attempt to fetch the node again. This should fail as the
// node should have been deleted from the database.
_, err = graph.FetchLightningNode(nil, testPub)
_, err = graph.FetchLightningNode(testPub)
if err != ErrGraphNodeNotFound {
t.Fatalf("fetch after delete should fail!")
}
Expand Down Expand Up @@ -1055,7 +1055,7 @@ func TestGraphTraversal(t *testing.T) {
// outgoing channels for a particular node.
numNodeChans := 0
firstNode, secondNode := nodeList[0], nodeList[1]
err = graph.ForEachNodeChannel(nil, firstNode.PubKeyBytes,
err = graph.ForEachNodeChannel(firstNode.PubKeyBytes,
func(_ kvdb.RTx, _ *models.ChannelEdgeInfo, outEdge,
inEdge *models.ChannelEdgePolicy) error {

Expand Down Expand Up @@ -2685,7 +2685,7 @@ func TestFetchChanInfos(t *testing.T) {
// We'll now attempt to query for the range of channel ID's we just
// inserted into the database. We should get the exact same set of
// edges back.
resp, err := graph.FetchChanInfos(nil, edgeQuery)
resp, err := graph.FetchChanInfos(edgeQuery)
require.NoError(t, err, "unable to fetch chan edges")
if len(resp) != len(edges) {
t.Fatalf("expected %v edges, instead got %v", len(edges),
Expand Down Expand Up @@ -2737,7 +2737,7 @@ func TestIncompleteChannelPolicies(t *testing.T) {
// Ensure that channel is reported with unknown policies.
checkPolicies := func(node *LightningNode, expectedIn, expectedOut bool) {
calls := 0
err := graph.ForEachNodeChannel(nil, node.PubKeyBytes,
err := graph.ForEachNodeChannel(node.PubKeyBytes,
func(_ kvdb.RTx, _ *models.ChannelEdgeInfo, outEdge,
inEdge *models.ChannelEdgePolicy) error {

Expand Down Expand Up @@ -3014,7 +3014,7 @@ func TestPruneGraphNodes(t *testing.T) {

// Finally, we'll ensure that node3, the only fully unconnected node as
// properly deleted from the graph and not another node in its place.
_, err = graph.FetchLightningNode(nil, node3.PubKeyBytes)
_, err = graph.FetchLightningNode(node3.PubKeyBytes)
if err == nil {
t.Fatalf("node 3 should have been deleted!")
}
Expand Down Expand Up @@ -3048,13 +3048,13 @@ func TestAddChannelEdgeShellNodes(t *testing.T) {

// Ensure that node1 was inserted as a full node, while node2 only has
// a shell node present.
node1, err = graph.FetchLightningNode(nil, node1.PubKeyBytes)
node1, err = graph.FetchLightningNode(node1.PubKeyBytes)
require.NoError(t, err, "unable to fetch node1")
if !node1.HaveNodeAnnouncement {
t.Fatalf("have shell announcement for node1, shouldn't")
}

node2, err = graph.FetchLightningNode(nil, node2.PubKeyBytes)
node2, err = graph.FetchLightningNode(node2.PubKeyBytes)
require.NoError(t, err, "unable to fetch node2")
if node2.HaveNodeAnnouncement {
t.Fatalf("should have shell announcement for node2, but is full")
Expand Down
Loading

0 comments on commit fae7e0c

Please sign in to comment.