Skip to content

Commit

Permalink
Revert "Code split"
Browse files Browse the repository at this point in the history
This reverts commit 104c3ec.
  • Loading branch information
AeonSw4n committed Jan 15, 2024
1 parent 8058f91 commit 1638758
Showing 1 changed file with 226 additions and 0 deletions.
226 changes: 226 additions & 0 deletions lib/connection_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"net"
"strconv"
"sync"
"time"
)

// ConnectionController is a structure that oversees all connections to remote nodes. It is responsible for kicking off
Expand Down Expand Up @@ -46,6 +47,10 @@ type ConnectionController struct {
// When true, only one connection per IP is allowed. Prevents eclipse attacks
// among other things.
limitOneInboundConnectionPerIP bool

startGroup sync.WaitGroup
exitChan chan struct{}
exitGroup sync.WaitGroup
}

func NewConnectionController(params *DeSoParams, cmgr *ConnectionManager, handshakeController *HandshakeController,
Expand All @@ -62,13 +67,87 @@ func NewConnectionController(params *DeSoParams, cmgr *ConnectionManager, handsh
targetOutboundPeers: targetOutboundPeers,
maxInboundPeers: maxInboundPeers,
limitOneInboundConnectionPerIP: limitOneInboundConnectionPerIP,
exitChan: make(chan struct{}),
}
}

func (cc *ConnectionController) Start() {
cc.startGroup.Add(3)
// Start the validator connector
go cc.startValidatorConnector()

cc.startGroup.Wait()
cc.exitGroup.Add(3)
}

func (cc *ConnectionController) Stop() {
close(cc.exitChan)
cc.exitGroup.Wait()
}

func (cc *ConnectionController) GetRemoteNodeManager() *RemoteNodeManager {
return cc.rnManager
}

func (cc *ConnectionController) initiatePersistentConnections() {
// This is a hack to make outbound connections go away.
if cc.targetOutboundPeers == 0 {
return
}
if len(cc.connectIps) > 0 {
// Connect to addresses passed via the --connect-ips flag. These addresses
// are persistent in the sense that if we disconnect from one, we will
// try to reconnect to the same one.
for _, connectIp := range cc.connectIps {
if err := cc.CreateNonValidatorPersistentOutboundConnection(connectIp); err != nil {
glog.Errorf("ConnectionController.initiatePersistentConnections: Problem connecting "+
"to connectIp %v: %v", connectIp, err)
}
}
}
}

func (cc *ConnectionController) startValidatorConnector() {
cc.startGroup.Done()
for {
select {
case <-cc.exitChan:
cc.exitGroup.Done()
return
case <-time.After(1 * time.Minute):
cc.validatorMapLock.Lock()
activeValidatorsMap := cc.getActiveValidators()
cc.refreshValidatorIndex(activeValidatorsMap)
cc.connectValidators(activeValidatorsMap)
cc.validatorMapLock.Unlock()
}
}
}

func (cc *ConnectionController) startPeerConnector() {
cc.startGroup.Done()

for {
select {
case <-cc.exitChan:
cc.exitGroup.Done()
return
case <-time.After(1 * time.Second):
// Only connect to addresses from the addrmgr if we don't specify --connect-ips.
// These addresses are *not* persistent, meaning if we disconnect from one we'll
// try a different one.
// TODO: Do we still want this?
if len(cc.connectIps) == 0 {
continue
}

cc.refreshOutboundIndex()
cc.refreshInboundIndex()
cc.connectPeers()
}
}
}

// ###########################
// ## Handlers (Peer, DeSoMessage)
// ###########################
Expand Down Expand Up @@ -147,10 +226,157 @@ func (cc *ConnectionController) cleanupFailedOutboundConnection(connection Conne
cc.cmgr.RemoveAttemptedOutboundAddrs(oc.address)
}

// ###########################
// ## Validator Connections
// ###########################

func (cc *ConnectionController) refreshValidatorIndex(activeValidatorsMap map[bls.SerializedPublicKey]*ValidatorEntry) {
// De-index inactive validators. Doesn't matter if the RemoteNodes are connected or not, if we exceed the number of
// outbound/inbound peers, these RemoteNodes will be Disconnected later.
// FIXME: Should we care about never-connecting persistent dials?
validatorRemoteNodeMap := cc.rnManager.GetValidatorIndex().Copy()
for pk, rn := range validatorRemoteNodeMap {
if _, ok := activeValidatorsMap[pk]; !ok {
cc.rnManager.SetNonValidator(rn)
cc.rnManager.UnsetValidator(rn)
}
}

// Look for validators in our existing outbound / inbound connections.
allNonValidators := cc.rnManager.GetAllNonValidators()
for _, rn := range allNonValidators {
pk := rn.GetValidatorPublicKey()
if pk == nil {
continue
}
if _, ok := activeValidatorsMap[pk.Serialize()]; ok {
cc.rnManager.SetValidator(rn)
cc.rnManager.UnsetNonValidator(rn)
}
}
}

func (cc *ConnectionController) connectValidators(activeValidatorsMap map[bls.SerializedPublicKey]*ValidatorEntry) {
for pk, validator := range activeValidatorsMap {
_, exists := cc.rnManager.GetValidatorIndex().Get(pk)
if !exists {
// FIXME: for now we'll only use the first address in the ValidatorEntry
publicKey, err := pk.Deserialize()
if err != nil {
continue
}

address := string(validator.Domains[0])
if err := cc.CreateValidatorConnection(address, publicKey); err != nil {
// TODO: Do we want to log an error here?
continue
}
}
}
}

// ###########################
// ## Connections
// ###########################

func (cc *ConnectionController) connectPeers() {
numOutboundPeers := uint32(cc.rnManager.GetNonValidatorOutboundIndex().Count())

remainingOutboundPeers := uint32(0)
if numOutboundPeers < cc.targetOutboundPeers {
remainingOutboundPeers = cc.targetOutboundPeers - numOutboundPeers
}
for ii := uint32(0); ii < remainingOutboundPeers; ii++ {
addr := cc.getRandomUnconnectedAddress()
cc.AddrMgr.Attempt(addr)
// FIXME: error handle
cc.rnManager.CreateNonValidatorOutboundConnection(addr)
}
}

func (cc *ConnectionController) refreshOutboundIndex() {
numOutboundPeers := uint32(cc.rnManager.GetNonValidatorOutboundIndex().Count())

excessiveOutboundPeers := uint32(0)
if numOutboundPeers > cc.targetOutboundPeers {
excessiveOutboundPeers = numOutboundPeers - cc.targetOutboundPeers
}
// Disconnect random outbound peers if we have too many peers.
allOutboundRemoteNodes := cc.rnManager.GetNonValidatorOutboundIndex().GetAll()
var attemptedOutboundRemoteNodes, connectedOutboundRemoteNodes []*RemoteNode

for _, rn := range allOutboundRemoteNodes {
if rn.IsHandshakeCompleted() {
connectedOutboundRemoteNodes = append(connectedOutboundRemoteNodes, rn)
} else {
attemptedOutboundRemoteNodes = append(attemptedOutboundRemoteNodes, rn)
}
}
for _, rn := range attemptedOutboundRemoteNodes {
if excessiveOutboundPeers == 0 {
break
}
cc.rnManager.Disconnect(rn)
excessiveOutboundPeers--
}
for _, rn := range connectedOutboundRemoteNodes {
if excessiveOutboundPeers == 0 {
break
}
cc.rnManager.Disconnect(rn)
excessiveOutboundPeers--
}
}

func (cc *ConnectionController) refreshInboundIndex() {
numConnectedInboundPeers := uint32(cc.rnManager.GetNonValidatorInboundIndex().Count())

excessiveInboundPeers := uint32(0)
if numConnectedInboundPeers > cc.maxInboundPeers {
excessiveInboundPeers = numConnectedInboundPeers - cc.maxInboundPeers
}
// Disconnect random inbound peers if we have too many peers.
inboundRemoteNodes := cc.rnManager.GetNonValidatorInboundIndex().GetAll()
for _, rn := range inboundRemoteNodes {
if excessiveInboundPeers == 0 {
break
}
cc.rnManager.Disconnect(rn)
excessiveInboundPeers--
}
}

func (cc *ConnectionController) getRandomUnconnectedAddress() *wire.NetAddress {
for tries := 0; tries < 100; tries++ {
addr := cc.AddrMgr.GetAddress()
if addr == nil {
//glog.V(2).Infof("ConnectionManager.getRandomUnconnectedAddress: addr from GetAddressWithExclusions was nil")
break
}

if cc.cmgr.IsConnectedOutboundIpAddress(addr.NetAddress()) {
//glog.V(2).Infof("ConnectionManager.getRandomUnconnectedAddress: Not choosing address due to redundancy %v:%v", addr.NetAddress().IP, addr.NetAddress().Port)
continue
}

if cc.cmgr.IsAttemptedOutboundIpAddress(addr.NetAddress()) {
continue
}

// We can only have one outbound address per /16. This is similar to
// Bitcoin and we do it to prevent Sybil attacks.
if cc.cmgr.IsFromRedundantOutboundIPAddress(addr.NetAddress()) {
//glog.V(2).Infof("ConnectionManager.getRandomUnconnectedAddress: Not choosing address due to redundant group key %v:%v", addr.NetAddress().IP, addr.NetAddress().Port)
continue
}

return addr.NetAddress()
}

//glog.V(2).Infof("ConnectionManager.getRandomAddr: Returning nil")
return nil
}

func (cc *ConnectionController) CreateValidatorConnection(ipStr string, publicKey *bls.PublicKey) (_err error) {
netAddr, err := cc.ConvertIPStringToNetAddress(ipStr)
if err != nil {
Expand Down

0 comments on commit 1638758

Please sign in to comment.