Skip to content

Commit

Permalink
Revert "Another split"
Browse files Browse the repository at this point in the history
This reverts commit eaeec58.
  • Loading branch information
AeonSw4n committed Jan 18, 2024
1 parent 1854db3 commit 28657e8
Showing 1 changed file with 209 additions and 2 deletions.
211 changes: 209 additions & 2 deletions lib/connection_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"fmt"
"github.com/btcsuite/btcd/addrmgr"
"github.com/btcsuite/btcd/wire"
"github.com/davecgh/go-spew/spew"
"github.com/deso-protocol/core/bls"
"github.com/deso-protocol/go-deadlock"
"github.com/golang/glog"
"github.com/pkg/errors"
"net"
Expand Down Expand Up @@ -40,6 +42,15 @@ type ConnectionController struct {
// it's aware of at random and provides it to us.
AddrMgr *addrmgr.AddrManager

// addrsToBroadcast is a list of all the addresses we've received from valid addr
// messages that we intend to broadcast to our peers. It is organized as:
// <recipient address> -> <list of addresses we received from that recipient>.
//
// It is organized in this way so that we can limit the number of addresses we
// are distributing for a single peer to avoid a DOS attack.
addrsToBroadcastLock deadlock.RWMutex
addrsToBroadcast map[string][]*SingleAddr

// When --connectips is set, we don't connect to anything from the addrmgr.
connectIps []string

Expand Down Expand Up @@ -68,6 +79,7 @@ func NewConnectionController(params *DeSoParams, cmgr *ConnectionManager, handsh
rnManager: rnManager,
AddrMgr: addrMgr,
connectIps: connectIps,
addrsToBroadcast: make(map[string][]*SingleAddr),
targetOutboundPeers: targetOutboundPeers,
maxInboundPeers: maxInboundPeers,
limitOneInboundConnectionPerIP: limitOneInboundConnectionPerIP,
Expand Down Expand Up @@ -149,6 +161,67 @@ func (cc *ConnectionController) startNonValidatorConnector() {
}
}

// Must be run inside a goroutine. Relays addresses to peers at regular intervals
// and relays our own address to peers once every 24 hours.
func (cc *ConnectionController) startAddressRelayer() {
cc.startGroup.Done()
numMinutesPassed := 0
for {
select {
case <-cc.exitChan:
cc.exitGroup.Done()
return
case <-time.After(AddrRelayIntervalSeconds * time.Second):
// For the first ten minutes after the connection controller starts, relay our address to all
// peers. After the first ten minutes, do it once every 24 hours.
glog.V(1).Infof("ConnectionController.startAddressRelayer: Relaying our own addr to peers")
if numMinutesPassed < 10 || numMinutesPassed%(RebroadcastNodeAddrIntervalMinutes) == 0 {
// TODO: Change to retrieve all RemoteNodes from the indexer.
for _, pp := range cc.cmgr.GetAllPeers() {
bestAddress := cc.AddrMgr.GetBestLocalAddress(pp.netAddr)
if bestAddress != nil {
glog.V(2).Infof("ConnectionController.startAddressRelayer: Relaying address %v to "+
"peer %v", bestAddress.IP.String(), pp)
if err := cc.cmgr.SendMessage(&MsgDeSoAddr{
AddrList: []*SingleAddr{
{
Timestamp: time.Now(),
IP: bestAddress.IP,
Port: bestAddress.Port,
Services: (ServiceFlag)(bestAddress.Services),
},
},
}, pp.ID); err != nil {
glog.Errorf("ConnectionController.startAddressRelayer: Problem sending "+
"MsgDeSoAddr to peer %v: %v", pp, err)
}
}
}
}

glog.V(2).Infof("ConnectionController.startAddressRelayer: Seeing if there are addrs to relay...")
// Broadcast the addrs we have to all of our peers.
addrsToBroadcast := cc.getAddrsToBroadcast()
if len(addrsToBroadcast) == 0 {
glog.V(2).Infof("ConnectionController.startAddressRelayer: No addrs to relay.")
time.Sleep(AddrRelayIntervalSeconds * time.Second)
continue
}

glog.V(2).Infof("ConnectionController.startAddressRelayer: Found %d addrs to "+
"relay: %v", len(addrsToBroadcast), spew.Sdump(addrsToBroadcast))
// Iterate over all our peers and broadcast the addrs to all of them.
for _, pp := range cc.cmgr.GetAllPeers() {
pp.AddDeSoMessage(&MsgDeSoAddr{
AddrList: addrsToBroadcast,
}, false)
}
time.Sleep(AddrRelayIntervalSeconds * time.Second)
numMinutesPassed++
}
}
}

// ###########################
// ## Handlers (Peer, DeSoMessage)
// ###########################
Expand All @@ -166,15 +239,110 @@ func (cc *ConnectionController) _handleAddrMessage(origin *Peer, desoMsg DeSoMes
return
}

// TODO
id := NewRemoteNodeId(origin.ID)
var msg *MsgDeSoAddr
var ok bool
if msg, ok = desoMsg.(*MsgDeSoAddr); !ok {
glog.Errorf("ConnectionController._handleAddrMessage: Problem decoding "+
"MsgDeSoAddr: %v", spew.Sdump(desoMsg))
cc.rnManager.DisconnectById(id)
return
}

cc.addrsToBroadcastLock.Lock()
defer cc.addrsToBroadcastLock.Unlock()

glog.V(1).Infof("ConnectionController._handleAddrMessage: Received Addr from peer %v with addrs %v", origin, spew.Sdump(msg.AddrList))

// If this addr message contains more than the maximum allowed number of addresses
// then disconnect this peer.
if len(msg.AddrList) > MaxAddrsPerAddrMsg {
glog.Errorf(fmt.Sprintf("ConnectionController._handleAddrMessage: Disconnecting "+
"Peer %v for sending us an addr message with %d transactions, which exceeds "+
"the max allowed %d",
origin, len(msg.AddrList), MaxAddrsPerAddrMsg))

cc.rnManager.DisconnectById(id)
return
}

// Add all the addresses we received to the addrmgr.
netAddrsReceived := []*wire.NetAddress{}
for _, addr := range msg.AddrList {
addrAsNetAddr := wire.NewNetAddressIPPort(addr.IP, addr.Port, (wire.ServiceFlag)(addr.Services))
if !addrmgr.IsRoutable(addrAsNetAddr) {
glog.V(1).Infof("Dropping address %v from peer %v because it is not routable", addr, origin)
continue
}

netAddrsReceived = append(
netAddrsReceived, addrAsNetAddr)
}
cc.AddrMgr.AddAddresses(netAddrsReceived, origin.netAddr)

// If the message had <= 10 addrs in it, then queue all the addresses for relaying
// on the next cycle.
if len(msg.AddrList) <= 10 {
glog.V(1).Infof("ConnectionController._handleAddrMessage: Queueing %d addrs for forwarding from "+
"peer %v", len(msg.AddrList), origin)
sourceAddr := &SingleAddr{
Timestamp: time.Now(),
IP: origin.netAddr.IP,
Port: origin.netAddr.Port,
Services: origin.serviceFlags,
}
listToAddTo, hasSeenSource := cc.addrsToBroadcast[sourceAddr.StringWithPort(false /*includePort*/)]
if !hasSeenSource {
listToAddTo = []*SingleAddr{}
}
// If this peer has been sending us a lot of little crap, evict a lot of their
// stuff but don't disconnect.
if len(listToAddTo) > MaxAddrsPerAddrMsg {
listToAddTo = listToAddTo[:MaxAddrsPerAddrMsg/2]
}
listToAddTo = append(listToAddTo, msg.AddrList...)
cc.addrsToBroadcast[sourceAddr.StringWithPort(false /*includePort*/)] = listToAddTo
}
}

func (cc *ConnectionController) _handleGetAddrMessage(origin *Peer, desoMsg DeSoMessage) {
if desoMsg.GetMsgType() != MsgTypeGetAddr {
return
}

// TODO
id := NewRemoteNodeId(origin.ID)
if _, ok := desoMsg.(*MsgDeSoGetAddr); !ok {
glog.Errorf("ConnectionController._handleAddrMessage: Problem decoding "+
"MsgDeSoAddr: %v", spew.Sdump(desoMsg))
cc.rnManager.DisconnectById(id)
return
}

glog.V(1).Infof("Server._handleGetAddrMessage: Received GetAddr from peer %v", origin)
// When we get a GetAddr message, choose MaxAddrsPerMsg from the AddrMgr
// and send them back to the peer.
netAddrsFound := cc.AddrMgr.AddressCache()
if len(netAddrsFound) > MaxAddrsPerAddrMsg {
netAddrsFound = netAddrsFound[:MaxAddrsPerAddrMsg]
}

// Convert the list to a SingleAddr list.
res := &MsgDeSoAddr{}
for _, netAddr := range netAddrsFound {
singleAddr := &SingleAddr{
Timestamp: time.Now(),
IP: netAddr.IP,
Port: netAddr.Port,
Services: (ServiceFlag)(netAddr.Services),
}
res.AddrList = append(res.AddrList, singleAddr)
}
rn := cc.rnManager.GetRemoteNodeById(id)
if err := cc.rnManager.SendMessage(rn, res); err != nil {
glog.Errorf("Server._handleGetAddrMessage: Problem sending addr message to peer %v: %v", origin, err)
cc.rnManager.DisconnectById(id)
return
}
}

// _handleNewConnectionMessage is called when a new outbound or inbound connection is established. It is responsible
Expand Down Expand Up @@ -604,3 +772,42 @@ func (cc *ConnectionController) isDuplicateInboundIPAddress(addr net.Addr) bool

return cc.cmgr.IsDuplicateInboundIPAddress(netAddr)
}

func (cc *ConnectionController) getAddrsToBroadcast() []*SingleAddr {
cc.addrsToBroadcastLock.Lock()
defer cc.addrsToBroadcastLock.Unlock()

// If there's nothing in the map, return.
if len(cc.addrsToBroadcast) == 0 {
return []*SingleAddr{}
}

// If we get here then we have some addresses to broadcast.
addrsToBroadcast := []*SingleAddr{}
for uint32(len(addrsToBroadcast)) < cc.params.MaxAddressesToBroadcast &&
len(cc.addrsToBroadcast) > 0 {
// Choose a key at random. This works because map iteration is random in golang.
bucket := ""
for kk := range cc.addrsToBroadcast {
bucket = kk
break
}

// Remove the last element from the slice for the given bucket.
currentAddrList := cc.addrsToBroadcast[bucket]
if len(currentAddrList) > 0 {
lastIndex := len(currentAddrList) - 1
currentAddr := currentAddrList[lastIndex]
currentAddrList = currentAddrList[:lastIndex]
if len(currentAddrList) == 0 {
delete(cc.addrsToBroadcast, bucket)
} else {
cc.addrsToBroadcast[bucket] = currentAddrList
}

addrsToBroadcast = append(addrsToBroadcast, currentAddr)
}
}

return addrsToBroadcast
}

0 comments on commit 28657e8

Please sign in to comment.