Skip to content

Commit

Permalink
multi: Automatically close non-reestablished channels
Browse files Browse the repository at this point in the history
This adds automation for closing channels that haven't been
reestablished for some time using the recently introduced metric.

Channels with peers that have been online for some time but which have
NOT been reestablished through the use of ChannelReestablish messages are
good candidates for being force closed, because it is likely the remote
peer has lost the ability to use them.

Given that the metric tracks the total time across restarts and only
while the remote peer is online, we use a default of 72h for the
threshold to auto close the channel.  This should be a reasonable
compromise between not closing too fast on hubs (that are online 24/7)
and ephemeral nodes (that may be online only for an hour or two a day).

A config parameter is added to control the threshold time used for
autoclosing and an itest is added that asserts the correct behavior.
  • Loading branch information
matheusd committed Dec 4, 2023
1 parent 678f2cb commit 2e6ca2c
Show file tree
Hide file tree
Showing 11 changed files with 316 additions and 2 deletions.
119 changes: 119 additions & 0 deletions automation/automation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package automation

import (
"context"
"time"

"github.com/decred/dcrlnd/channeldb"
"github.com/decred/dcrlnd/lncfg"
"github.com/decred/dcrlnd/lnrpc"
)

// Config holds the config parameters and dependencies of the automation
// server.
type Config struct {
// Automation is embedded so that its options (for example
// ForceCloseChanReestablishWait) can be read directly from the Config.
*lncfg.Automation

// CloseChannel should be set to the rpc server function that allows
// closing a channel. The automation server calls it with a nil
// updateStream, so the function must tolerate that.
CloseChannel func(in *lnrpc.CloseChannelRequest,
updateStream lnrpc.Lightning_CloseChannelServer) error

// DB is the channel database used to list the open channels and to read
// each channel's ChannelReestablish wait time.
DB *channeldb.DB
}

// Server is a set of automation services for dcrlnd nodes.
type Server struct {
// cfg is the configuration passed to NewServer.
cfg *Config
// ctx is the server context; the force close loop exits once it is done.
ctx context.Context
// cancelCtx cancels ctx. It is called by Stop.
cancelCtx func()
}

// NewServer builds an automation server around the given config. The returned
// server owns a fresh cancelable context derived from context.Background();
// no background work is started until Start is called.
func NewServer(cfg *Config) *Server {
	serverCtx, cancelFn := context.WithCancel(context.Background())
	return &Server{
		cfg:       cfg,
		ctx:       serverCtx,
		cancelCtx: cancelFn,
	}
}

// runForceCloseStaleChanReestablish autocloses channels where a remote peer
// has been online without sending ChannelReestablish messages.
//
// On every tick it lists the open channels from the DB, reads each channel's
// ChannelReestablish wait time and, when that time is at least the configured
// ForceCloseChanReestablishWait (interpreted as seconds), starts a force close
// through cfg.CloseChannel in a separate goroutine.
//
// The loop only returns when the server context is canceled, so it is meant to
// be run in its own goroutine.
func (s *Server) runForceCloseStaleChanReestablish() {
	forceCloseInterval := time.Duration(s.cfg.ForceCloseChanReestablishWait) * time.Second

	// time.NewTicker panics when given a non-positive interval, so refuse
	// to run rather than crash on a zero or negative threshold (which may
	// also be the result of an overflow in the conversion above).
	if forceCloseInterval <= 0 {
		log.Errorf("Not starting force close check for stale channels: "+
			"invalid wait time %s", forceCloseInterval)
		return
	}

	// Use a default ticker for 1 hour, but reduce if the threshold is lower
	// than that (useful for tests).
	tickerInterval := time.Hour
	if forceCloseInterval < tickerInterval {
		tickerInterval = forceCloseInterval
	}
	log.Debugf("Performing force close check for stale channels based on "+
		"ChannelReestablish every %s", tickerInterval)

	ticker := time.NewTicker(tickerInterval)
	defer ticker.Stop()

	for {
		select {
		case <-s.ctx.Done():
			return
		case <-ticker.C:
		}

		log.Debugf("Time to check channels for force close due to stale " +
			"chan reestablish messages")

		chans, err := s.cfg.DB.FetchAllOpenChannels()
		if err != nil {
			log.Errorf("Unable to list open channels: %v", err)
			continue
		}

		for _, c := range chans {
			sid := c.ShortChannelID
			waitTime, err := s.cfg.DB.GetChanReestablishWaitTime(sid)
			if err != nil {
				log.Errorf("Unable to get chan reestablish msg "+
					"times for %s: %v", sid, err)
				continue
			}

			if waitTime < forceCloseInterval {
				log.Tracef("Skipping autoclose of %s due to low "+
					"wait time %s", sid, waitTime)
				continue
			}

			// Start force close.
			chanPoint := c.FundingOutpoint
			log.Infof("Starting force-close attempt of channel %s (%s) "+
				"due to channel reestablish msg wait time %s greater "+
				"than max interval %s", chanPoint,
				sid, waitTime, forceCloseInterval)
			go func() {
				req := &lnrpc.CloseChannelRequest{
					ChannelPoint: lnrpc.OutpointToChanPoint(&chanPoint),
					Force:        true,
				}

				// Keep the error local to this goroutine instead of
				// writing to the variable owned by the scanning loop.
				if err := s.cfg.CloseChannel(req, nil); err != nil {
					log.Errorf("Unable to force-close channel %s: %v",
						sid, err)
				}
			}()
		}
	}
}

// Start launches the automation services. The stale ChannelReestablish force
// close loop is started in its own goroutine only when
// ForceCloseChanReestablishWait is greater than zero; otherwise nothing is
// started. It always returns nil.
func (s *Server) Start() error {
	if s.cfg.ForceCloseChanReestablishWait <= 0 {
		return nil
	}

	go s.runForceCloseStaleChanReestablish()
	return nil
}

// Stop cancels the server context, which makes the force close loop return.
// It always returns nil.
func (s *Server) Stop() error {
	cancel := s.cancelCtx
	cancel()
	return nil
}
29 changes: 29 additions & 0 deletions automation/log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package automation

import (
"github.com/decred/dcrlnd/build"
"github.com/decred/slog"
)

// log is the package-level logger used by the automation package. It is
// replaced through UseLogger; init installs the "AUTO" sublogger created by
// the build package as its initial value.
var log slog.Logger

// The default amount of logging is none.
func init() {
UseLogger(build.NewSubLogger("AUTO", nil))
}

// DisableLog disables all library log output. Logging output is disabled
// by default until UseLogger is called.
func DisableLog() {
UseLogger(slog.Disabled)
}

// UseLogger sets the logger used for all logging output of this package,
// replacing whichever logger was previously installed.
func UseLogger(logger slog.Logger) {
log = logger
}
5 changes: 5 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,8 @@ type Config struct {

Autopilot *lncfg.AutoPilot `group:"Autopilot" namespace:"autopilot"`

Automation *lncfg.Automation `group:"Automation" namespace:"automation"`

Tor *lncfg.Tor `group:"Tor" namespace:"tor"`

SubRPCServers *subRPCServerConfigs `group:"subrpc"`
Expand Down Expand Up @@ -439,6 +441,9 @@ func DefaultConfig() Config {
"top_centrality": 1.0,
},
},
Automation: &lncfg.Automation{
ForceCloseChanReestablishWait: 60 * 60 * 24 * 3, // 3 days
},
PaymentsExpirationGracePeriod: defaultPaymentsExpirationGracePeriod,
TrickleDelay: defaultTrickleDelay,
ChanStatusSampleInterval: defaultChanStatusSampleInterval,
Expand Down
9 changes: 9 additions & 0 deletions lncfg/automation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package lncfg

// Automation holds some server level automation config options.
type Automation struct {
// ForceCloseChanReestablishWait is the time, in seconds, after which the
// automation server force closes a channel where the local peer has sent
// ChannelReestablish messages but the remote peer has not. A value less
// than or equal to zero disables this automation.
ForceCloseChanReestablishWait int64 `long:"closechanreestablishwait" description:"Force close a channel if the difference between time channel reestablish msgs were sent and received is higher than the specified one"`
}
12 changes: 12 additions & 0 deletions lnd.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"gopkg.in/macaroon.v2"

"github.com/decred/dcrd/txscript/v4/stdaddr"
"github.com/decred/dcrlnd/automation"
"github.com/decred/dcrlnd/autopilot"
"github.com/decred/dcrlnd/build"
"github.com/decred/dcrlnd/cert"
Expand Down Expand Up @@ -988,6 +989,17 @@ func Main(cfg *Config, lisCfg ListenerCfg, interceptor signal.Interceptor) error
defer tower.Stop()
}

// Start the dcrlnd specific automation services.
autoServer := automation.NewServer(&automation.Config{
Automation: cfg.Automation,
CloseChannel: rpcServer.CloseChannel,
DB: remoteChanDB,
})
if err := autoServer.Start(); err != nil {
return err
}
defer autoServer.Stop()

// Wait for shutdown signal from either a graceful server stop or from
// the interrupt handler.
<-interceptor.ShutdownChannel()
Expand Down
12 changes: 12 additions & 0 deletions lnrpc/rpc_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"sort"

"github.com/decred/dcrd/chaincfg/chainhash"
"github.com/decred/dcrd/wire"
"github.com/decred/dcrlnd/lnwallet"
)

Expand Down Expand Up @@ -90,6 +91,17 @@ func ExtractMinConfs(minConfs int32, spendUnconfirmed bool) (int32, error) {
}
}

// OutpointToChanPoint converts a standard wire outpoint (that identifies a
// channel) into the equivalent ChannelPoint, using the raw-bytes form of the
// funding txid.
//
// The FundingTxidBytes slice is taken directly from out.Hash without copying,
// so it shares memory with the outpoint passed in.
func OutpointToChanPoint(out *wire.OutPoint) *ChannelPoint {
	fundingTxid := &ChannelPoint_FundingTxidBytes{
		FundingTxidBytes: out.Hash[:],
	}

	chanPoint := &ChannelPoint{
		FundingTxid: fundingTxid,
		OutputIndex: out.Index,
	}
	return chanPoint
}

// GetChanPointFundingTxid returns the given channel point's funding txid in
// raw bytes.
func GetChanPointFundingTxid(chanPoint *ChannelPoint) (*chainhash.Hash, error) {
Expand Down
107 changes: 107 additions & 0 deletions lntest/itest/dcrlnd_automation_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package itest

import (
"time"

"github.com/decred/dcrd/dcrutil/v4"
"github.com/decred/dcrlnd/lnrpc"
"github.com/decred/dcrlnd/lntest"
"github.com/stretchr/testify/require"
"matheusd.com/testctx"
)

// testMissingChanReestablishAutoClosesChan asserts that a node force-closes a
// channel when its peer stays connected without reestablishing the channel.
//
// Scenario: Carol runs with --automation.closechanreestablishwait=5 and opens
// a channel to Dave. After a payment over the channel, Dave is shut down and
// restored from his seed, and the test asserts the restored node has no
// channels. The restored Dave is then connected to Carol, and the test checks
// that Carol first keeps the channel, then reports a reestablish wait time
// above 2000ms, and finally ends up with zero open channels and one pending
// channel, which is cleaned up as a force close.
func testMissingChanReestablishAutoClosesChan(net *lntest.NetworkHarness, t *harnessTest) {
const (
chanAmt = dcrutil.Amount(10000000)
pushAmt = dcrutil.Amount(5000000)
)
password := []byte("El Psy Kongroo")
var err error

// Create a new restore scenario. Carol is started with a 5 second
// auto-close threshold so the test does not need to wait for long.
carolArgs := []string{"--automation.closechanreestablishwait=5"}
carol := net.NewNode(t.t, "carol", carolArgs)
defer shutdownAndAssert(net, t, carol)

// Dave is created from a seed so that he can be restored from it later.
daveArgs := []string{"--nolisten", "--minbackoff=1h"}
dave, mnemonic, _, err := net.NewNodeWithSeed(
"dave", daveArgs, password, false,
)
require.Nil(t.t, err)
defer shutdownAndAssert(net, t, dave)

net.SendCoins(testctx.New(t), t.t, dcrutil.AtomsPerCoin, carol)
net.SendCoins(testctx.New(t), t.t, dcrutil.AtomsPerCoin, dave)
net.EnsureConnected(testctx.New(t), t.t, dave, carol)

chanPoint := openChannelAndAssert(
testctx.New(t), t, net, carol, dave,
lntest.OpenChannelParams{
Amt: chanAmt,
PushAmt: pushAmt,
},
)

// Wait for both sides to see the opened channel.
err = dave.WaitForNetworkChannelOpen(testctx.New(t), chanPoint)
require.Nil(t.t, err)
err = carol.WaitForNetworkChannelOpen(testctx.New(t), chanPoint)
require.Nil(t.t, err)

// Perform a payment to assert channel is working.
invoice := &lnrpc.Invoice{
Memo: "testing",
Value: 100000,
}
invoiceResp, err := carol.AddInvoice(testctx.New(t), invoice)
require.Nil(t.t, err)
err = completePaymentRequests(
testctx.New(t), dave, dave.RouterClient,
[]string{invoiceResp.PaymentRequest}, true,
)
require.Nil(t.t, err)

// Recreate Dave without the channel.
err = net.ShutdownNode(dave)
require.Nil(t.t, err)
time.Sleep(time.Second)

daveRestored, err := net.RestoreNodeWithSeed(
"dave", nil, password, mnemonic, 1000,
nil, copyPorts(dave),
)
require.Nil(t.t, err)
assertNumPendingChannels(t, daveRestored, 0, 0, 0, 0)
assertNodeNumChannels(t, daveRestored, 0)
// ht.AssertNumEdges(daveRestored, 0, true)

// Assert Carol does not autoclose and Dave does not have the channel.
net.EnsureConnected(testctx.New(t), t.t, daveRestored, carol)
time.Sleep(time.Second)
assertNumPendingChannels(t, daveRestored, 0, 0, 0, 0)
assertNodeNumChannels(t, daveRestored, 0)
assertNumPendingChannels(t, carol, 0, 0, 0, 0)
assertNodeNumChannels(t, carol, 1)

// Assert Carol is tracking the time Dave has been online without
// reestablishing the channel.
require.Nil(t.t, net.DisconnectNodes(testctx.New(t), carol, daveRestored))
time.Sleep(time.Second)
chanInfo, err := carol.ListChannels(testctx.New(t), &lnrpc.ListChannelsRequest{})
require.Nil(t.t, err)
require.Len(t.t, chanInfo.Channels, 1)
require.Greater(t.t, chanInfo.Channels[0].ChanReestablishWaitTimeMs, int64(2000))

// Wait long enough for Carol's automation to want to force-close the
// channel.
net.EnsureConnected(testctx.New(t), t.t, daveRestored, carol)
time.Sleep(time.Second * 3)
err = net.ShutdownNode(daveRestored)
require.Nil(t.t, err)

// Assert Carol force-closes the channel.
assertNumPendingChannels(t, carol, 1, 0, 0, 0)
assertNodeNumChannels(t, carol, 0)
mineBlocks(t, net, 1, 1)
cleanupForceClose(t, net, carol, chanPoint)
}
4 changes: 4 additions & 0 deletions lntest/itest/lnd_test_list_on_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,10 @@ var allTestCases = []*testCase{
name: "connection timeout",
test: testNetworkConnectionTimeout,
},
{
name: "missing channelreestablish auto closes channel",
test: testMissingChanReestablishAutoClosesChan,
},
{
name: "stateless init",
test: testStatelessInit,
Expand Down
2 changes: 2 additions & 0 deletions log.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package dcrlnd

import (
"github.com/decred/dcrd/connmgr"
"github.com/decred/dcrlnd/automation"
"github.com/decred/dcrlnd/autopilot"
"github.com/decred/dcrlnd/build"
"github.com/decred/dcrlnd/chainntnfs"
Expand Down Expand Up @@ -134,6 +135,7 @@ func SetupLoggers(root *build.RotatingLogWriter, interceptor signal.Interceptor)
AddSubLogger(root, "KCHN", interceptor, keychain.UseLogger)
AddSubLogger(root, "CSCN", interceptor, chainscan.UseLogger)
AddSubLogger(root, "CSDR", interceptor, csdrivers.UseLogger)
AddSubLogger(root, "AUTO", interceptor, automation.UseLogger)

AddSubLogger(root, "LNWL", interceptor, lnwallet.UseLogger)
AddSubLogger(root, "DISC", interceptor, discovery.UseLogger)
Expand Down
6 changes: 4 additions & 2 deletions rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -2266,8 +2266,10 @@ out:
rpcsLog.Tracef("[closechannel] sending update: %v",
rpcClosingUpdate)

if err := updateStream.Send(rpcClosingUpdate); err != nil {
return err
if updateStream != nil {
if err := updateStream.Send(rpcClosingUpdate); err != nil {
return err
}
}

// If a final channel closing updates is being sent,
Expand Down
13 changes: 13 additions & 0 deletions sample-dcrlnd.conf
Original file line number Diff line number Diff line change
Expand Up @@ -611,6 +611,19 @@
; 3)
; autopilot.conftarget=2

[automation]

; The maximum value, in seconds, that the total difference between sent and received
; ChannelReestablish messages may reach before a channel is force-closed. When this difference
; is large, it means the local node has connected to the remote peer and has sent
; ChannelReestablish messages, but the remote peer has not done the same, which
; is a hint that the channel is not usable anymore. Note that this is the
; total time a peer has been connected without reestablishing the channel
; across restarts and reconnections.
;
; If this is <= 0, then autoclosing due to this heuristic is disabled.
; automation.closechanreestablishwait=259200

[tor]
; Allow outbound and inbound connections to be routed through Tor
; tor.active=true
Expand Down

0 comments on commit 2e6ca2c

Please sign in to comment.