Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

routing: shutdown chanrouter correctly. #8497

Merged
merged 6 commits into from
Aug 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion chainntnfs/bitcoindnotify/bitcoind.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,12 @@ func (b *BitcoindNotifier) Stop() error {

close(epochClient.epochChan)
}
b.txNotifier.TearDown()

// The txNotifier is only initialized in the start method, so we need to
// make sure we don't access a nil pointer here.
if b.txNotifier != nil {
b.txNotifier.TearDown()
ziggie1984 marked this conversation as resolved.
Show resolved Hide resolved
}

// Stop the mempool notifier.
b.memNotifier.TearDown()
Expand Down
7 changes: 6 additions & 1 deletion chainntnfs/neutrinonotify/neutrino.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,12 @@ func (n *NeutrinoNotifier) Stop() error {

close(epochClient.epochChan)
}
n.txNotifier.TearDown()

// The txNotifier is only initialized in the start method, so we need to
// make sure we don't access a nil pointer here.
if n.txNotifier != nil {
n.txNotifier.TearDown()
}

return nil
}
Expand Down
32 changes: 28 additions & 4 deletions chanfitness/chaneventstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ package chanfitness

import (
"errors"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/btcsuite/btcd/wire"
Expand Down Expand Up @@ -48,6 +50,9 @@ var (
// ChannelEventStore maintains a set of event logs for the node's channels to
// provide insight into the performance and health of channels.
type ChannelEventStore struct {
started atomic.Bool
stopped atomic.Bool

cfg *Config

// peers tracks all of our currently monitored peers and their channels.
Expand Down Expand Up @@ -142,7 +147,11 @@ func NewChannelEventStore(config *Config) *ChannelEventStore {
// information from the store. If this function fails, it cancels its existing
// subscriptions and returns an error.
func (c *ChannelEventStore) Start() error {
log.Info("ChannelEventStore starting")
log.Info("ChannelEventStore starting...")

if c.started.Swap(true) {
ziggie1984 marked this conversation as resolved.
Show resolved Hide resolved
return fmt.Errorf("ChannelEventStore started more than once")
}

// Create a subscription to channel events.
channelClient, err := c.cfg.SubscribeChannelEvents()
Expand Down Expand Up @@ -198,21 +207,36 @@ func (c *ChannelEventStore) Start() error {
cancel: cancel,
})

log.Debug("ChannelEventStore started")

return nil
}

// Stop terminates all goroutines started by the event store.
func (c *ChannelEventStore) Stop() {
func (c *ChannelEventStore) Stop() error {
log.Info("ChannelEventStore shutting down...")
defer log.Debug("ChannelEventStore shutdown complete")

if c.stopped.Swap(true) {
return fmt.Errorf("ChannelEventStore stopped more than once")
}

// Stop the consume goroutine.
close(c.quit)
c.wg.Wait()

// Stop the ticker after the goroutine reading from it has exited, to
// avoid a race.
c.cfg.FlapCountTicker.Stop()
var err error
if c.cfg.FlapCountTicker == nil {
ziggie1984 marked this conversation as resolved.
Show resolved Hide resolved
err = fmt.Errorf("ChannelEventStore FlapCountTicker not " +
"initialized")
} else {
c.cfg.FlapCountTicker.Stop()
}

log.Debugf("ChannelEventStore shutdown complete")

return err
}

// addChannel checks whether we are already tracking a channel's peer, creates a
Expand Down
7 changes: 6 additions & 1 deletion discovery/gossiper.go
Original file line number Diff line number Diff line change
Expand Up @@ -753,7 +753,12 @@ func (d *AuthenticatedGossiper) stop() {
log.Debug("Authenticated Gossiper is stopping")
defer log.Debug("Authenticated Gossiper stopped")

d.blockEpochs.Cancel()
// `blockEpochs` is only initialized in the start routine so we make
// sure we don't panic here in the case where the `Stop` method is
// called when the `Start` method does not complete.
if d.blockEpochs != nil {
ziggie1984 marked this conversation as resolved.
Show resolved Hide resolved
d.blockEpochs.Cancel()
}

d.syncMgr.Stop()

Expand Down
4 changes: 4 additions & 0 deletions docs/release-notes/release-notes-0.18.3.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@

* [Fixed a bug](https://github.com/lightningnetwork/lnd/pull/8896) that caused
LND to use a default fee rate for the batch channel opening flow.

* [Fixed](https://github.com/lightningnetwork/lnd/pull/8497) a case where LND
ziggie1984 marked this conversation as resolved.
Show resolved Hide resolved
would not shut down properly when interrupted via e.g. SIGTERM. Moreover, LND
now shuts down correctly in case one subsystem fails to start up.

* The fee limit for payments [was made
compatible](https://github.com/lightningnetwork/lnd/pull/8941) with inbound
Expand Down
5 changes: 4 additions & 1 deletion graph/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,8 @@ func (b *Builder) Start() error {
b.wg.Add(1)
go b.networkHandler()

log.Debug("Builder started")

return nil
}

Expand All @@ -312,7 +314,6 @@ func (b *Builder) Stop() error {
}

log.Info("Builder shutting down...")
defer log.Debug("Builder shutdown complete")
ziggie1984 marked this conversation as resolved.
Show resolved Hide resolved

// Our filtered chain view could've only been started if
// AssumeChannelValid isn't present.
Expand All @@ -325,6 +326,8 @@ func (b *Builder) Stop() error {
close(b.quit)
b.wg.Wait()

log.Debug("Builder shutdown complete")

return nil
}

Expand Down
26 changes: 25 additions & 1 deletion htlcswitch/interceptable_switch.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"crypto/sha256"
"fmt"
"sync"
"sync/atomic"

"github.com/go-errors/errors"
"github.com/lightningnetwork/lnd/chainntnfs"
Expand Down Expand Up @@ -33,6 +34,9 @@ var (
// Settle - routes UpdateFulfillHTLC to the originating link.
// Fail - routes UpdateFailHTLC to the originating link.
type InterceptableSwitch struct {
started atomic.Bool
stopped atomic.Bool

// htlcSwitch is the underlying switch.
htlcSwitch *Switch

Expand Down Expand Up @@ -201,6 +205,12 @@ func (s *InterceptableSwitch) SetInterceptor(
}

func (s *InterceptableSwitch) Start() error {
log.Info("InterceptableSwitch starting...")

if s.started.Swap(true) {
return fmt.Errorf("InterceptableSwitch started more than once")
}

blockEpochStream, err := s.notifier.RegisterBlockEpochNtfn(nil)
if err != nil {
return err
Expand All @@ -217,14 +227,28 @@ func (s *InterceptableSwitch) Start() error {
}
}()

log.Debug("InterceptableSwitch started")

return nil
}

func (s *InterceptableSwitch) Stop() error {
log.Info("InterceptableSwitch shutting down...")

if s.stopped.Swap(true) {
return fmt.Errorf("InterceptableSwitch stopped more than once")
}

close(s.quit)
s.wg.Wait()

s.blockEpochStream.Cancel()
// We need to check whether the start routine ran and initialized the
// `blockEpochStream`.
if s.blockEpochStream != nil {
s.blockEpochStream.Cancel()
}

log.Debug("InterceptableSwitch shutdown complete")

return nil
}
Expand Down
14 changes: 9 additions & 5 deletions htlcswitch/link.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,14 +562,18 @@ func (l *channelLink) Stop() {
}

// Ensure the channel for the timer is drained.
if !l.updateFeeTimer.Stop() {
select {
case <-l.updateFeeTimer.C:
default:
if l.updateFeeTimer != nil {
if !l.updateFeeTimer.Stop() {
select {
case <-l.updateFeeTimer.C:
default:
}
}
}

l.hodlQueue.Stop()
if l.hodlQueue != nil {
l.hodlQueue.Stop()
}

close(l.quit)
l.wg.Wait()
Expand Down
53 changes: 40 additions & 13 deletions invoices/invoiceregistry.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ func (r *htlcReleaseEvent) Less(other queue.PriorityQueueItem) bool {
// created by the daemon. The registry is a thin wrapper around a map in order
// to ensure that all updates/reads are thread safe.
type InvoiceRegistry struct {
started atomic.Bool
stopped atomic.Bool

sync.RWMutex

nextClientID uint32 // must be used atomically
Expand Down Expand Up @@ -213,42 +216,66 @@ func (i *InvoiceRegistry) scanInvoicesOnStart(ctx context.Context) error {

// Start starts the registry and all goroutines it needs to carry out its task.
func (i *InvoiceRegistry) Start() error {
// Start InvoiceExpiryWatcher and prepopulate it with existing active
// invoices.
err := i.expiryWatcher.Start(func(hash lntypes.Hash, force bool) error {
return i.cancelInvoiceImpl(context.Background(), hash, force)
})
var err error

log.Info("InvoiceRegistry starting...")

if i.started.Swap(true) {
return fmt.Errorf("InvoiceRegistry started more than once")
}
// Start InvoiceExpiryWatcher and prepopulate it with existing
// active invoices.
err = i.expiryWatcher.Start(
ziggie1984 marked this conversation as resolved.
Show resolved Hide resolved
func(hash lntypes.Hash, force bool) error {
return i.cancelInvoiceImpl(
context.Background(), hash, force,
)
})
if err != nil {
return err
}

log.Info("InvoiceRegistry starting")

i.wg.Add(1)
go i.invoiceEventLoop()

// Now scan all pending and removable invoices to the expiry watcher or
// delete them.
// Now scan all pending and removable invoices to the expiry
// watcher or delete them.
err = i.scanInvoicesOnStart(context.Background())
if err != nil {
_ = i.Stop()
return err
}

return nil
log.Debug("InvoiceRegistry started")

return err
}

// Stop signals the registry for a graceful shutdown.
func (i *InvoiceRegistry) Stop() error {
log.Info("InvoiceRegistry shutting down...")

if i.stopped.Swap(true) {
return fmt.Errorf("InvoiceRegistry stopped more than once")
}

log.Info("InvoiceRegistry shutting down...")
defer log.Debug("InvoiceRegistry shutdown complete")

i.expiryWatcher.Stop()
var err error
if i.expiryWatcher == nil {
err = fmt.Errorf("InvoiceRegistry expiryWatcher is not " +
"initialized")
} else {
i.expiryWatcher.Stop()
}

close(i.quit)

i.wg.Wait()
return nil

log.Debug("InvoiceRegistry shutdown complete")

return err
}

// invoiceEvent represents a new event that has modified an invoice on disk.
Expand Down
28 changes: 25 additions & 3 deletions lnd.go
Original file line number Diff line number Diff line change
Expand Up @@ -674,11 +674,33 @@ func Main(cfg *Config, lisCfg ListenerCfg, implCfg *ImplementationCfg,
bestHeight)

// With all the relevant chains initialized, we can finally start the
// server itself.
if err := server.Start(); err != nil {
// server itself. We start the server in an asynchronous goroutine so
// that we are able to interrupt and shutdown the daemon gracefully in
// case the startup of the subservers does not behave as expected.
errChan := make(chan error)
ziggie1984 marked this conversation as resolved.
Show resolved Hide resolved
go func() {
errChan <- server.Start()
}()

defer func() {
err := server.Stop()
if err != nil {
ltndLog.Warnf("Stopping the server including all "+
"its subsystems failed with %v", err)
}
}()

select {
case err := <-errChan:
if err == nil {
break
}

return mkErr("unable to start server: %v", err)

case <-interceptor.ShutdownChannel():
return nil
}
defer server.Stop()

// We transition the server state to Active, as the server is up.
interceptorChain.SetServerActive()
Expand Down
4 changes: 3 additions & 1 deletion lnwallet/chainfee/estimator.go
Original file line number Diff line number Diff line change
Expand Up @@ -884,7 +884,9 @@ func (w *WebAPIEstimator) Stop() error {
return nil
}

w.updateFeeTicker.Stop()
if w.updateFeeTicker != nil {
w.updateFeeTicker.Stop()
}

close(w.quit)
w.wg.Wait()
Expand Down
Loading
Loading