diff --git a/lib/connection_controller.go b/lib/connection_controller.go index a95bac0db..9a2a780b7 100644 --- a/lib/connection_controller.go +++ b/lib/connection_controller.go @@ -10,6 +10,7 @@ import ( "net" "strconv" "sync" + "time" ) // ConnectionController is a structure that oversees all connections to remote nodes. It is responsible for kicking off @@ -46,6 +47,10 @@ type ConnectionController struct { // When true, only one connection per IP is allowed. Prevents eclipse attacks // among other things. limitOneInboundConnectionPerIP bool + + startGroup sync.WaitGroup + exitChan chan struct{} + exitGroup sync.WaitGroup } func NewConnectionController(params *DeSoParams, cmgr *ConnectionManager, handshakeController *HandshakeController, @@ -62,13 +67,87 @@ func NewConnectionController(params *DeSoParams, cmgr *ConnectionManager, handsh targetOutboundPeers: targetOutboundPeers, maxInboundPeers: maxInboundPeers, limitOneInboundConnectionPerIP: limitOneInboundConnectionPerIP, + exitChan: make(chan struct{}), } } +func (cc *ConnectionController) Start() { + cc.startGroup.Add(3) + // Start the validator connector + go cc.startValidatorConnector() + + cc.startGroup.Wait() + cc.exitGroup.Add(3) +} + +func (cc *ConnectionController) Stop() { + close(cc.exitChan) + cc.exitGroup.Wait() +} + func (cc *ConnectionController) GetRemoteNodeManager() *RemoteNodeManager { return cc.rnManager } +func (cc *ConnectionController) initiatePersistentConnections() { + // This is a hack to make outbound connections go away. + if cc.targetOutboundPeers == 0 { + return + } + if len(cc.connectIps) > 0 { + // Connect to addresses passed via the --connect-ips flag. These addresses + // are persistent in the sense that if we disconnect from one, we will + // try to reconnect to the same one. + for _, connectIp := range cc.connectIps { + if err := cc.CreateNonValidatorPersistentOutboundConnection(connectIp); err != nil { + glog.Errorf("ConnectionController.initiatePersistentConnections: Problem connecting "+ + "to connectIp %v: %v", connectIp, err) + } + } + } +} + +func (cc *ConnectionController) startValidatorConnector() { + cc.startGroup.Done() + for { + select { + case <-cc.exitChan: + cc.exitGroup.Done() + return + case <-time.After(1 * time.Minute): + cc.validatorMapLock.Lock() + activeValidatorsMap := cc.getActiveValidators() + cc.refreshValidatorIndex(activeValidatorsMap) + cc.connectValidators(activeValidatorsMap) + cc.validatorMapLock.Unlock() + } + } +} + +func (cc *ConnectionController) startPeerConnector() { + cc.startGroup.Done() + + for { + select { + case <-cc.exitChan: + cc.exitGroup.Done() + return + case <-time.After(1 * time.Second): + // Only connect to addresses from the addrmgr if we don't specify --connect-ips. + // These addresses are *not* persistent, meaning if we disconnect from one we'll + // try a different one. + // TODO: Do we still want this? + if len(cc.connectIps) == 0 { + continue + } + + cc.refreshOutboundIndex() + cc.refreshInboundIndex() + cc.connectPeers() + } + } +} + // ########################### // ## Handlers (Peer, DeSoMessage) // ########################### @@ -147,10 +226,157 @@ func (cc *ConnectionController) cleanupFailedOutboundConnection(connection Conne cc.cmgr.RemoveAttemptedOutboundAddrs(oc.address) } +// ########################### +// ## Validator Connections +// ########################### + +func (cc *ConnectionController) refreshValidatorIndex(activeValidatorsMap map[bls.SerializedPublicKey]*ValidatorEntry) { + // De-index inactive validators. Doesn't matter if the RemoteNodes are connected or not, if we exceed the number of + // outbound/inbound peers, these RemoteNodes will be Disconnected later. + // FIXME: Should we care about never-connecting persistent dials? + validatorRemoteNodeMap := cc.rnManager.GetValidatorIndex().Copy() + for pk, rn := range validatorRemoteNodeMap { + if _, ok := activeValidatorsMap[pk]; !ok { + cc.rnManager.SetNonValidator(rn) + cc.rnManager.UnsetValidator(rn) + } + } + + // Look for validators in our existing outbound / inbound connections. + allNonValidators := cc.rnManager.GetAllNonValidators() + for _, rn := range allNonValidators { + pk := rn.GetValidatorPublicKey() + if pk == nil { + continue + } + if _, ok := activeValidatorsMap[pk.Serialize()]; ok { + cc.rnManager.SetValidator(rn) + cc.rnManager.UnsetNonValidator(rn) + } + } +} + +func (cc *ConnectionController) connectValidators(activeValidatorsMap map[bls.SerializedPublicKey]*ValidatorEntry) { + for pk, validator := range activeValidatorsMap { + _, exists := cc.rnManager.GetValidatorIndex().Get(pk) + if !exists { + // FIXME: for now we'll only use the first address in the ValidatorEntry + publicKey, err := pk.Deserialize() + if err != nil { + continue + } + + address := string(validator.Domains[0]) + if err := cc.CreateValidatorConnection(address, publicKey); err != nil { + // TODO: Do we want to log an error here? + continue + } + } + } +} + // ########################### // ## Connections // ########################### +func (cc *ConnectionController) connectPeers() { + numOutboundPeers := uint32(cc.rnManager.GetNonValidatorOutboundIndex().Count()) + + remainingOutboundPeers := uint32(0) + if numOutboundPeers < cc.targetOutboundPeers { + remainingOutboundPeers = cc.targetOutboundPeers - numOutboundPeers + } + for ii := uint32(0); ii < remainingOutboundPeers; ii++ { + addr := cc.getRandomUnconnectedAddress() + cc.AddrMgr.Attempt(addr) + // FIXME: error handle + cc.rnManager.CreateNonValidatorOutboundConnection(addr) + } +} + +func (cc *ConnectionController) refreshOutboundIndex() { + numOutboundPeers := uint32(cc.rnManager.GetNonValidatorOutboundIndex().Count()) + + excessiveOutboundPeers := uint32(0) + if numOutboundPeers > cc.targetOutboundPeers { + excessiveOutboundPeers = numOutboundPeers - cc.targetOutboundPeers + } + // Disconnect random outbound peers if we have too many peers. + allOutboundRemoteNodes := cc.rnManager.GetNonValidatorOutboundIndex().GetAll() + var attemptedOutboundRemoteNodes, connectedOutboundRemoteNodes []*RemoteNode + + for _, rn := range allOutboundRemoteNodes { + if rn.IsHandshakeCompleted() { + connectedOutboundRemoteNodes = append(connectedOutboundRemoteNodes, rn) + } else { + attemptedOutboundRemoteNodes = append(attemptedOutboundRemoteNodes, rn) + } + } + for _, rn := range attemptedOutboundRemoteNodes { + if excessiveOutboundPeers == 0 { + break + } + cc.rnManager.Disconnect(rn) + excessiveOutboundPeers-- + } + for _, rn := range connectedOutboundRemoteNodes { + if excessiveOutboundPeers == 0 { + break + } + cc.rnManager.Disconnect(rn) + excessiveOutboundPeers-- + } +} + +func (cc *ConnectionController) refreshInboundIndex() { + numConnectedInboundPeers := uint32(cc.rnManager.GetNonValidatorInboundIndex().Count()) + + excessiveInboundPeers := uint32(0) + if numConnectedInboundPeers > cc.maxInboundPeers { + excessiveInboundPeers = numConnectedInboundPeers - cc.maxInboundPeers + } + // Disconnect random inbound peers if we have too many peers. + inboundRemoteNodes := cc.rnManager.GetNonValidatorInboundIndex().GetAll() + for _, rn := range inboundRemoteNodes { + if excessiveInboundPeers == 0 { + break + } + cc.rnManager.Disconnect(rn) + excessiveInboundPeers-- + } +} + +func (cc *ConnectionController) getRandomUnconnectedAddress() *wire.NetAddress { + for tries := 0; tries < 100; tries++ { + addr := cc.AddrMgr.GetAddress() + if addr == nil { + //glog.V(2).Infof("ConnectionManager.getRandomUnconnectedAddress: addr from GetAddressWithExclusions was nil") + break + } + + if cc.cmgr.IsConnectedOutboundIpAddress(addr.NetAddress()) { + //glog.V(2).Infof("ConnectionManager.getRandomUnconnectedAddress: Not choosing address due to redundancy %v:%v", addr.NetAddress().IP, addr.NetAddress().Port) + continue + } + + if cc.cmgr.IsAttemptedOutboundIpAddress(addr.NetAddress()) { + continue + } + + // We can only have one outbound address per /16. This is similar to + // Bitcoin and we do it to prevent Sybil attacks. + if cc.cmgr.IsFromRedundantOutboundIPAddress(addr.NetAddress()) { + //glog.V(2).Infof("ConnectionManager.getRandomUnconnectedAddress: Not choosing address due to redundant group key %v:%v", addr.NetAddress().IP, addr.NetAddress().Port) + continue + } + + return addr.NetAddress() + } + + //glog.V(2).Infof("ConnectionManager.getRandomAddr: Returning nil") + return nil +} + func (cc *ConnectionController) CreateValidatorConnection(ipStr string, publicKey *bls.PublicKey) (_err error) { netAddr, err := cc.ConvertIPStringToNetAddress(ipStr) if err != nil {