From 8058f9132aaf372762ca203aaaf97fe17a137e1c Mon Sep 17 00:00:00 2001 From: Piotr Nojszewski <29924594+AeonSw4n@users.noreply.github.com> Date: Mon, 15 Jan 2024 07:43:24 -0800 Subject: [PATCH] revert addrmgr --- lib/server.go | 204 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 204 insertions(+) diff --git a/lib/server.go b/lib/server.go index 3fdbbacb3..d4c371955 100644 --- a/lib/server.go +++ b/lib/server.go @@ -4,6 +4,7 @@ import ( "bytes" "encoding/hex" "fmt" + "github.com/btcsuite/btcd/wire" "github.com/deso-protocol/core/consensus" "net" "reflect" @@ -121,6 +122,15 @@ type Server struct { // requested data but have not yet received a response. requestedTransactionsMap map[BlockHash]*GetDataRequestInfo + // addrsToBroadcast is a list of all the addresses we've received from valid addr + // messages that we intend to broadcast to our peers. It is organized as: + // -> . + // + // It is organized in this way so that we can limit the number of addresses we + // are distributing for a single peer to avoid a DOS attack. + addrsToBroadcastLock deadlock.RWMutex + addrsToBroadcastt map[string][]*SingleAddr + // When set to true, we disable the ConnectionManager DisableNetworking bool @@ -578,6 +588,9 @@ func NewServer( srv.StartStatsdReporter() } + // Initialize the addrs to broadcast map. + srv.addrsToBroadcastt = make(map[string][]*SingleAddr) + // This will initialize the request queues. srv.ResetRequestQueues() @@ -2155,6 +2168,89 @@ func (srv *Server) StartStatsdReporter() { }() } +func (srv *Server) _handleAddrMessage(pp *Peer, msg *MsgDeSoAddr) { + srv.addrsToBroadcastLock.Lock() + defer srv.addrsToBroadcastLock.Unlock() + + glog.V(1).Infof("Server._handleAddrMessage: Received Addr from peer %v with addrs %v", pp, spew.Sdump(msg.AddrList)) + + // If this addr message contains more than the maximum allowed number of addresses + // then disconnect this peer. + if len(msg.AddrList) > MaxAddrsPerAddrMsg { + glog.Errorf(fmt.Sprintf("Server._handleAddrMessage: Disconnecting "+ + "Peer %v for sending us an addr message with %d transactions, which exceeds "+ + "the max allowed %d", + pp, len(msg.AddrList), MaxAddrsPerAddrMsg)) + pp.Disconnect() + return + } + + // Add all the addresses we received to the addrmgr. + netAddrsReceived := []*wire.NetAddress{} + for _, addr := range msg.AddrList { + addrAsNetAddr := wire.NewNetAddressIPPort(addr.IP, addr.Port, (wire.ServiceFlag)(addr.Services)) + if !addrmgr.IsRoutable(addrAsNetAddr) { + glog.V(1).Infof("Dropping address %v from peer %v because it is not routable", addr, pp) + continue + } + + netAddrsReceived = append( + netAddrsReceived, addrAsNetAddr) + } + // TODO: temporary + addressMgr := addrmgr.New("", net.LookupIP) + addressMgr.AddAddresses(netAddrsReceived, pp.netAddr) + + // If the message had <= 10 addrs in it, then queue all the addresses for relaying + // on the next cycle. + if len(msg.AddrList) <= 10 { + glog.V(1).Infof("Server._handleAddrMessage: Queueing %d addrs for forwarding from "+ + "peer %v", len(msg.AddrList), pp) + sourceAddr := &SingleAddr{ + Timestamp: time.Now(), + IP: pp.netAddr.IP, + Port: pp.netAddr.Port, + Services: pp.serviceFlags, + } + listToAddTo, hasSeenSource := srv.addrsToBroadcastt[sourceAddr.StringWithPort(false /*includePort*/)] + if !hasSeenSource { + listToAddTo = []*SingleAddr{} + } + // If this peer has been sending us a lot of little crap, evict a lot of their + // stuff but don't disconnect. + if len(listToAddTo) > MaxAddrsPerAddrMsg { + listToAddTo = listToAddTo[:MaxAddrsPerAddrMsg/2] + } + listToAddTo = append(listToAddTo, msg.AddrList...) + srv.addrsToBroadcastt[sourceAddr.StringWithPort(false /*includePort*/)] = listToAddTo + } +} + +func (srv *Server) _handleGetAddrMessage(pp *Peer, msg *MsgDeSoGetAddr) { + glog.V(1).Infof("Server._handleGetAddrMessage: Received GetAddr from peer %v", pp) + // When we get a GetAddr message, choose MaxAddrsPerMsg from the AddrMgr + // and send them back to the peer. + // TODO: temporary + addressMgr := addrmgr.New("", net.LookupIP) + netAddrsFound := addressMgr.AddressCache() + if len(netAddrsFound) > MaxAddrsPerAddrMsg { + netAddrsFound = netAddrsFound[:MaxAddrsPerAddrMsg] + } + + // Convert the list to a SingleAddr list. + res := &MsgDeSoAddr{} + for _, netAddr := range netAddrsFound { + singleAddr := &SingleAddr{ + Timestamp: time.Now(), + IP: netAddr.IP, + Port: netAddr.Port, + Services: (ServiceFlag)(netAddr.Services), + } + res.AddrList = append(res.AddrList, singleAddr) + } + pp.AddDeSoMessage(res, false) +} + func (srv *Server) _handleControlMessages(serverMessage *ServerMessage) (_shouldQuit bool) { switch serverMessage.Msg.(type) { // Control messages used internally to signal to the server. @@ -2302,6 +2398,20 @@ func (srv *Server) _startConsensus() { glog.V(2).Infof("Server._startConsensus: Handling message of type %v from Peer %v", serverMessage.Msg.GetMsgType(), serverMessage.Peer) + + // If the message is an addr message we handle it independent of whether or + // not the BitcoinManager is synced. + if serverMessage.Msg.GetMsgType() == MsgTypeAddr { + srv._handleAddrMessage(serverMessage.Peer, serverMessage.Msg.(*MsgDeSoAddr)) + continue + } + // If the message is a GetAddr message we handle it independent of whether or + // not the BitcoinManager is synced. + if serverMessage.Msg.GetMsgType() == MsgTypeGetAddr { + srv._handleGetAddrMessage(serverMessage.Peer, serverMessage.Msg.(*MsgDeSoGetAddr)) + continue + } + srv._handlePeerMessages(serverMessage) // Always check for and handle control messages regardless of whether the @@ -2322,6 +2432,98 @@ func (srv *Server) _startConsensus() { glog.V(2).Info("Server.Start: Server done") } +func (srv *Server) _getAddrsToBroadcast() []*SingleAddr { + srv.addrsToBroadcastLock.Lock() + defer srv.addrsToBroadcastLock.Unlock() + + // If there's nothing in the map, return. + if len(srv.addrsToBroadcastt) == 0 { + return []*SingleAddr{} + } + + // If we get here then we have some addresses to broadcast. + addrsToBroadcast := []*SingleAddr{} + for len(addrsToBroadcast) < 10 && len(srv.addrsToBroadcastt) > 0 { + // Choose a key at random. This works because map iteration is random in golang. + bucket := "" + for kk := range srv.addrsToBroadcastt { + bucket = kk + break + } + + // Remove the last element from the slice for the given bucket. + currentAddrList := srv.addrsToBroadcastt[bucket] + if len(currentAddrList) > 0 { + lastIndex := len(currentAddrList) - 1 + currentAddr := currentAddrList[lastIndex] + currentAddrList = currentAddrList[:lastIndex] + if len(currentAddrList) == 0 { + delete(srv.addrsToBroadcastt, bucket) + } else { + srv.addrsToBroadcastt[bucket] = currentAddrList + } + + addrsToBroadcast = append(addrsToBroadcast, currentAddr) + } + } + + return addrsToBroadcast +} + +// Must be run inside a goroutine. Relays addresses to peers at regular intervals +// and relays our own address to peers once every 24 hours. +func (srv *Server) _startAddressRelayer() { + for numMinutesPassed := 0; ; numMinutesPassed++ { + if atomic.LoadInt32(&srv.shutdown) >= 1 { + break + } + // For the first ten minutes after the server starts, relay our address to all + // peers. After the first ten minutes, do it once every 24 hours. + // TODO: temporary + addressMgr := addrmgr.New("", net.LookupIP) + glog.V(1).Infof("Server.Start._startAddressRelayer: Relaying our own addr to peers") + if numMinutesPassed < 10 || numMinutesPassed%(RebroadcastNodeAddrIntervalMinutes) == 0 { + for _, pp := range srv.cmgr.GetAllPeers() { + bestAddress := addressMgr.GetBestLocalAddress(pp.netAddr) + if bestAddress != nil { + glog.V(2).Infof("Server.Start._startAddressRelayer: Relaying address %v to "+ + "peer %v", bestAddress.IP.String(), pp) + pp.AddDeSoMessage(&MsgDeSoAddr{ + AddrList: []*SingleAddr{ + { + Timestamp: time.Now(), + IP: bestAddress.IP, + Port: bestAddress.Port, + Services: (ServiceFlag)(bestAddress.Services), + }, + }, + }, false) + } + } + } + + glog.V(2).Infof("Server.Start._startAddressRelayer: Seeing if there are addrs to relay...") + // Broadcast the addrs we have to all of our peers. + addrsToBroadcast := srv._getAddrsToBroadcast() + if len(addrsToBroadcast) == 0 { + glog.V(2).Infof("Server.Start._startAddressRelayer: No addrs to relay.") + time.Sleep(AddrRelayIntervalSeconds * time.Second) + continue + } + + glog.V(2).Infof("Server.Start._startAddressRelayer: Found %d addrs to "+ + "relay: %v", len(addrsToBroadcast), spew.Sdump(addrsToBroadcast)) + // Iterate over all our peers and broadcast the addrs to all of them. + for _, pp := range srv.cmgr.GetAllPeers() { + pp.AddDeSoMessage(&MsgDeSoAddr{ + AddrList: addrsToBroadcast, + }, false) + } + time.Sleep(AddrRelayIntervalSeconds * time.Second) + continue + } +} + func (srv *Server) _startTransactionRelayer() { // If we've set a maximum sync height, we will not relay transactions. if srv.blockchain.MaxSyncBlockHeight > 0 { @@ -2414,6 +2616,8 @@ func (srv *Server) Start() { go srv._startConsensus() + go srv._startAddressRelayer() + go srv._startTransactionRelayer() // Once the ConnectionManager is started, peers will be found and connected to and