Skip to content

Commit

Permalink
Revert "revert addrmgr"
Browse files Browse the repository at this point in the history
This reverts commit 8058f91.
  • Loading branch information
AeonSw4n committed Jan 18, 2024
1 parent 28657e8 commit a66cc02
Showing 1 changed file with 0 additions and 204 deletions.
204 changes: 0 additions & 204 deletions lib/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"bytes"
"encoding/hex"
"fmt"
"github.com/btcsuite/btcd/wire"
"github.com/deso-protocol/core/consensus"
"net"
"reflect"
Expand Down Expand Up @@ -122,15 +121,6 @@ type Server struct {
// requested data but have not yet received a response.
requestedTransactionsMap map[BlockHash]*GetDataRequestInfo

// addrsToBroadcast is a list of all the addresses we've received from valid addr
// messages that we intend to broadcast to our peers. It is organized as:
// <recipient address> -> <list of addresses we received from that recipient>.
//
// It is organized in this way so that we can limit the number of addresses we
// are distributing for a single peer to avoid a DOS attack.
addrsToBroadcastLock deadlock.RWMutex
addrsToBroadcastt map[string][]*SingleAddr

// When set to true, we disable the ConnectionManager
DisableNetworking bool

Expand Down Expand Up @@ -588,9 +578,6 @@ func NewServer(
srv.StartStatsdReporter()
}

// Initialize the addrs to broadcast map.
srv.addrsToBroadcastt = make(map[string][]*SingleAddr)

// This will initialize the request queues.
srv.ResetRequestQueues()

Expand Down Expand Up @@ -2168,89 +2155,6 @@ func (srv *Server) StartStatsdReporter() {
}()
}

func (srv *Server) _handleAddrMessage(pp *Peer, msg *MsgDeSoAddr) {
srv.addrsToBroadcastLock.Lock()
defer srv.addrsToBroadcastLock.Unlock()

glog.V(1).Infof("Server._handleAddrMessage: Received Addr from peer %v with addrs %v", pp, spew.Sdump(msg.AddrList))

// If this addr message contains more than the maximum allowed number of addresses
// then disconnect this peer.
if len(msg.AddrList) > MaxAddrsPerAddrMsg {
glog.Errorf(fmt.Sprintf("Server._handleAddrMessage: Disconnecting "+
"Peer %v for sending us an addr message with %d transactions, which exceeds "+
"the max allowed %d",
pp, len(msg.AddrList), MaxAddrsPerAddrMsg))
pp.Disconnect()
return
}

// Add all the addresses we received to the addrmgr.
netAddrsReceived := []*wire.NetAddress{}
for _, addr := range msg.AddrList {
addrAsNetAddr := wire.NewNetAddressIPPort(addr.IP, addr.Port, (wire.ServiceFlag)(addr.Services))
if !addrmgr.IsRoutable(addrAsNetAddr) {
glog.V(1).Infof("Dropping address %v from peer %v because it is not routable", addr, pp)
continue
}

netAddrsReceived = append(
netAddrsReceived, addrAsNetAddr)
}
// TODO: temporary
addressMgr := addrmgr.New("", net.LookupIP)
addressMgr.AddAddresses(netAddrsReceived, pp.netAddr)

// If the message had <= 10 addrs in it, then queue all the addresses for relaying
// on the next cycle.
if len(msg.AddrList) <= 10 {
glog.V(1).Infof("Server._handleAddrMessage: Queueing %d addrs for forwarding from "+
"peer %v", len(msg.AddrList), pp)
sourceAddr := &SingleAddr{
Timestamp: time.Now(),
IP: pp.netAddr.IP,
Port: pp.netAddr.Port,
Services: pp.serviceFlags,
}
listToAddTo, hasSeenSource := srv.addrsToBroadcastt[sourceAddr.StringWithPort(false /*includePort*/)]
if !hasSeenSource {
listToAddTo = []*SingleAddr{}
}
// If this peer has been sending us a lot of little crap, evict a lot of their
// stuff but don't disconnect.
if len(listToAddTo) > MaxAddrsPerAddrMsg {
listToAddTo = listToAddTo[:MaxAddrsPerAddrMsg/2]
}
listToAddTo = append(listToAddTo, msg.AddrList...)
srv.addrsToBroadcastt[sourceAddr.StringWithPort(false /*includePort*/)] = listToAddTo
}
}

func (srv *Server) _handleGetAddrMessage(pp *Peer, msg *MsgDeSoGetAddr) {
glog.V(1).Infof("Server._handleGetAddrMessage: Received GetAddr from peer %v", pp)
// When we get a GetAddr message, choose MaxAddrsPerMsg from the AddrMgr
// and send them back to the peer.
// TODO: temporary
addressMgr := addrmgr.New("", net.LookupIP)
netAddrsFound := addressMgr.AddressCache()
if len(netAddrsFound) > MaxAddrsPerAddrMsg {
netAddrsFound = netAddrsFound[:MaxAddrsPerAddrMsg]
}

// Convert the list to a SingleAddr list.
res := &MsgDeSoAddr{}
for _, netAddr := range netAddrsFound {
singleAddr := &SingleAddr{
Timestamp: time.Now(),
IP: netAddr.IP,
Port: netAddr.Port,
Services: (ServiceFlag)(netAddr.Services),
}
res.AddrList = append(res.AddrList, singleAddr)
}
pp.AddDeSoMessage(res, false)
}

func (srv *Server) _handleControlMessages(serverMessage *ServerMessage) (_shouldQuit bool) {
switch serverMessage.Msg.(type) {
// Control messages used internally to signal to the server.
Expand Down Expand Up @@ -2398,20 +2302,6 @@ func (srv *Server) _startConsensus() {

glog.V(2).Infof("Server._startConsensus: Handling message of type %v from Peer %v",
serverMessage.Msg.GetMsgType(), serverMessage.Peer)

// If the message is an addr message we handle it independent of whether or
// not the BitcoinManager is synced.
if serverMessage.Msg.GetMsgType() == MsgTypeAddr {
srv._handleAddrMessage(serverMessage.Peer, serverMessage.Msg.(*MsgDeSoAddr))
continue
}
// If the message is a GetAddr message we handle it independent of whether or
// not the BitcoinManager is synced.
if serverMessage.Msg.GetMsgType() == MsgTypeGetAddr {
srv._handleGetAddrMessage(serverMessage.Peer, serverMessage.Msg.(*MsgDeSoGetAddr))
continue
}

srv._handlePeerMessages(serverMessage)

// Always check for and handle control messages regardless of whether the
Expand All @@ -2432,98 +2322,6 @@ func (srv *Server) _startConsensus() {
glog.V(2).Info("Server.Start: Server done")
}

func (srv *Server) _getAddrsToBroadcast() []*SingleAddr {
srv.addrsToBroadcastLock.Lock()
defer srv.addrsToBroadcastLock.Unlock()

// If there's nothing in the map, return.
if len(srv.addrsToBroadcastt) == 0 {
return []*SingleAddr{}
}

// If we get here then we have some addresses to broadcast.
addrsToBroadcast := []*SingleAddr{}
for len(addrsToBroadcast) < 10 && len(srv.addrsToBroadcastt) > 0 {
// Choose a key at random. This works because map iteration is random in golang.
bucket := ""
for kk := range srv.addrsToBroadcastt {
bucket = kk
break
}

// Remove the last element from the slice for the given bucket.
currentAddrList := srv.addrsToBroadcastt[bucket]
if len(currentAddrList) > 0 {
lastIndex := len(currentAddrList) - 1
currentAddr := currentAddrList[lastIndex]
currentAddrList = currentAddrList[:lastIndex]
if len(currentAddrList) == 0 {
delete(srv.addrsToBroadcastt, bucket)
} else {
srv.addrsToBroadcastt[bucket] = currentAddrList
}

addrsToBroadcast = append(addrsToBroadcast, currentAddr)
}
}

return addrsToBroadcast
}

// Must be run inside a goroutine. Relays addresses to peers at regular intervals
// and relays our own address to peers once every 24 hours.
func (srv *Server) _startAddressRelayer() {
for numMinutesPassed := 0; ; numMinutesPassed++ {
if atomic.LoadInt32(&srv.shutdown) >= 1 {
break
}
// For the first ten minutes after the server starts, relay our address to all
// peers. After the first ten minutes, do it once every 24 hours.
// TODO: temporary
addressMgr := addrmgr.New("", net.LookupIP)
glog.V(1).Infof("Server.Start._startAddressRelayer: Relaying our own addr to peers")
if numMinutesPassed < 10 || numMinutesPassed%(RebroadcastNodeAddrIntervalMinutes) == 0 {
for _, pp := range srv.cmgr.GetAllPeers() {
bestAddress := addressMgr.GetBestLocalAddress(pp.netAddr)
if bestAddress != nil {
glog.V(2).Infof("Server.Start._startAddressRelayer: Relaying address %v to "+
"peer %v", bestAddress.IP.String(), pp)
pp.AddDeSoMessage(&MsgDeSoAddr{
AddrList: []*SingleAddr{
{
Timestamp: time.Now(),
IP: bestAddress.IP,
Port: bestAddress.Port,
Services: (ServiceFlag)(bestAddress.Services),
},
},
}, false)
}
}
}

glog.V(2).Infof("Server.Start._startAddressRelayer: Seeing if there are addrs to relay...")
// Broadcast the addrs we have to all of our peers.
addrsToBroadcast := srv._getAddrsToBroadcast()
if len(addrsToBroadcast) == 0 {
glog.V(2).Infof("Server.Start._startAddressRelayer: No addrs to relay.")
time.Sleep(AddrRelayIntervalSeconds * time.Second)
continue
}

glog.V(2).Infof("Server.Start._startAddressRelayer: Found %d addrs to "+
"relay: %v", len(addrsToBroadcast), spew.Sdump(addrsToBroadcast))
// Iterate over all our peers and broadcast the addrs to all of them.
for _, pp := range srv.cmgr.GetAllPeers() {
pp.AddDeSoMessage(&MsgDeSoAddr{
AddrList: addrsToBroadcast,
}, false)
}
time.Sleep(AddrRelayIntervalSeconds * time.Second)
continue
}
}

func (srv *Server) _startTransactionRelayer() {
// If we've set a maximum sync height, we will not relay transactions.
if srv.blockchain.MaxSyncBlockHeight > 0 {
Expand Down Expand Up @@ -2616,8 +2414,6 @@ func (srv *Server) Start() {

go srv._startConsensus()

go srv._startAddressRelayer()

go srv._startTransactionRelayer()

// Once the ConnectionManager is started, peers will be found and connected to and
Expand Down

0 comments on commit a66cc02

Please sign in to comment.