Skip to content

Commit

Permalink
p2p: add default timeouts for dht.FindPeer to prevent goroutines stuck, increase default reconnect interval from 10s to 20s
Browse files Browse the repository at this point in the history
  • Loading branch information
pymq committed Oct 28, 2024
1 parent 17fedca commit f1b056f
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 9 deletions.
2 changes: 1 addition & 1 deletion config/other.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func setDefaults(conf *Config, bus awlevent.Bus) {
conf.P2pNode.BootstrapPeers = make([]string, 0)
}
if conf.P2pNode.ReconnectionIntervalSec == 0 {
conf.P2pNode.ReconnectionIntervalSec = 10
conf.P2pNode.ReconnectionIntervalSec = 20
}
if conf.P2pNode.ParallelSendingStreamsCount == 0 {
conf.P2pNode.ParallelSendingStreamsCount = 1
Expand Down
12 changes: 9 additions & 3 deletions p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,11 @@ func (p *P2p) ConnectPeer(ctx context.Context, peerID peer.ID) error {
if p.IsConnected(peerID) {
return nil
}

// FindPeer runs until the peer is found in the DHT or the context is cancelled, so a timeout is mandatory.
ctx, cancel := context.WithTimeout(p.ctx, 10*time.Second)

Check failure on line 198 in p2p/p2p.go

View workflow job for this annotation

GitHub Actions / lint

SA4009(related information): assignment to ctx (staticcheck)
defer cancel()

peerInfo, err := p.FindPeer(ctx, peerID)
if err != nil {
return fmt.Errorf("could not find peer %s: %v", peerID.String(), err)
Expand Down Expand Up @@ -299,17 +304,18 @@ func (p *P2p) MaintainBackgroundConnections(ctx context.Context, interval time.D
}
p.connectToKnownPeers(ctx, interval, knownPeersIdsFunc())

t := time.NewTicker(interval)
defer t.Stop()
ticker := time.NewTicker(interval)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-t.C:
case <-ticker.C:
}

p.connectToKnownPeers(ctx, interval, knownPeersIdsFunc())
ticker.Reset(interval)
}
}

Expand Down
11 changes: 6 additions & 5 deletions service/auth_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package service
import (
"context"
"fmt"
"maps"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -91,7 +92,7 @@ func (s *AuthStatus) StatusStreamHandler(stream network.Stream) {
s.logger.Errorf("sending status info to %s as an answer: %v", peerID, err)
}

s.logger.Infof("successfully exchanged status info with %s (%s)", knownPeer.DisplayName(), peerID)
s.logger.Infof("successfully exchanged status info (inbound) with %s (%s)", knownPeer.DisplayName(), peerID)
if isBlocked {
return
}
Expand Down Expand Up @@ -133,6 +134,7 @@ func (s *AuthStatus) ExchangeNewStatusInfo(ctx context.Context, remotePeerID pee
return fmt.Errorf("receiving status info: %v", err)
}

s.logger.Infof("successfully exchanged status info (outbound) with %s (%s)", knownPeer.DisplayName(), remotePeerID.String())
if isBlocked {
return nil
}
Expand Down Expand Up @@ -346,10 +348,7 @@ func (s *AuthStatus) ExchangeStatusInfoWithAllKnownPeers(ctx context.Context) {
func (s *AuthStatus) BackgroundRetryAuthRequests(ctx context.Context) {
f := func() {
s.authsLock.RLock()
outgoingAuthsCopy := make(map[peer.ID]protocol.AuthPeer, len(s.outgoingAuths))
for key, val := range s.outgoingAuths {
outgoingAuthsCopy[key] = val
}
outgoingAuthsCopy := maps.Clone(s.outgoingAuths)
s.authsLock.RUnlock()

for peerID, auth := range outgoingAuthsCopy {
Expand All @@ -366,6 +365,7 @@ func (s *AuthStatus) BackgroundRetryAuthRequests(ctx context.Context) {
return
case <-ticker.C:
f()
ticker.Reset(backgroundRetryAuthRequests)
}
}
}
Expand All @@ -380,6 +380,7 @@ func (s *AuthStatus) BackgroundExchangeStatusInfo(ctx context.Context) {
return
case <-ticker.C:
s.ExchangeStatusInfoWithAllKnownPeers(ctx)
ticker.Reset(backgroundExchangeStatusInfoInterval)
}
}
}
Expand Down

0 comments on commit f1b056f

Please sign in to comment.