diff --git a/integration_testing/connection_controller_routines_test.go b/integration_testing/connection_controller_routines_test.go
new file mode 100644
index 000000000..21fd30ff4
--- /dev/null
+++ b/integration_testing/connection_controller_routines_test.go
@@ -0,0 +1,96 @@
+package integration_testing
+
+import (
+	"github.com/deso-protocol/core/bls"
+	"github.com/deso-protocol/core/cmd"
+	"github.com/deso-protocol/core/lib"
+	"github.com/stretchr/testify/require"
+	"os"
+	"testing"
+)
+
+func TestConnectionControllerInitiatePersistentConnections(t *testing.T) {
+	require := require.New(t)
+
+	// NonValidator Node1 will set its --connect-ips to two non-validators node2 and node3,
+	// and two validators node4 and node5.
+	node1 := spawnNonValidatorNodeProtocol2(t, 18000, "node1")
+	node2 := spawnNonValidatorNodeProtocol2(t, 18001, "node2")
+	node3 := spawnNonValidatorNodeProtocol2(t, 18002, "node3")
+	blsPriv4, err := bls.NewPrivateKey()
+	require.NoError(err)
+	node4 := spawnValidatorNodeProtocol2(t, 18003, "node4", blsPriv4)
+	blsPriv5, err := bls.NewPrivateKey()
+	require.NoError(err)
+	node5 := spawnValidatorNodeProtocol2(t, 18004, "node5", blsPriv5)
+
+	node2 = startNode(t, node2)
+	node3 = startNode(t, node3)
+	node4 = startNode(t, node4)
+	node5 = startNode(t, node5)
+	defer node2.Stop()
+	defer node3.Stop()
+	defer node4.Stop()
+	defer node5.Stop()
+
+	oldGetActiveValidatorImpl := lib.GetActiveValidatorImpl
+	defer func() {
+		setGetActiveValidatorImpl(oldGetActiveValidatorImpl)
+	}()
+	setGetActiveValidatorImpl(func() map[bls.SerializedPublicKey]*lib.ValidatorEntry {
+		return map[bls.SerializedPublicKey]*lib.ValidatorEntry{
+			blsPriv4.PublicKey().Serialize(): createSimpleValidatorEntry(node4),
+			blsPriv5.PublicKey().Serialize(): createSimpleValidatorEntry(node5),
+		}
+	})
+
+	node1.Config.ConnectIPs = []string{
+		node2.Listeners[0].Addr().String(),
+		node3.Listeners[0].Addr().String(),
+		node4.Listeners[0].Addr().String(),
+		node5.Listeners[0].Addr().String(),
+	}
+	node1 = startNode(t, node1)
+	defer node1.Stop()
+	waitForNonValidatorOutboundConnection(t, node1, node2)
+	waitForNonValidatorOutboundConnection(t, node1, node3)
+	waitForValidatorConnection(t, node1, node4)
+	waitForValidatorConnection(t, node1, node5)
+}
+
+func spawnNonValidatorNodeProtocol2(t *testing.T, port uint32, id string) *cmd.Node {
+	dbDir := getDirectory(t)
+	t.Cleanup(func() {
+		os.RemoveAll(dbDir)
+	})
+	config := generateConfig(t, port, dbDir, 10)
+	config.SyncType = lib.NodeSyncTypeBlockSync
+	node := cmd.NewNode(config)
+	node.Params.UserAgent = id
+	node.Params.ProtocolVersion = lib.ProtocolVersion2
+	return node
+}
+
+func spawnValidatorNodeProtocol2(t *testing.T, port uint32, id string, blsPriv *bls.PrivateKey) *cmd.Node {
+	dbDir := getDirectory(t)
+	t.Cleanup(func() {
+		os.RemoveAll(dbDir)
+	})
+	config := generateConfig(t, port, dbDir, 10)
+	config.SyncType = lib.NodeSyncTypeBlockSync
+	config.PosValidatorSeed = blsPriv.ToString()
+	node := cmd.NewNode(config)
+	node.Params.UserAgent = id
+	node.Params.ProtocolVersion = lib.ProtocolVersion2
+	return node
+}
+
+func setGetActiveValidatorImpl(mapping func() map[bls.SerializedPublicKey]*lib.ValidatorEntry) {
+	lib.GetActiveValidatorImpl = mapping
+}
+
+func createSimpleValidatorEntry(node *cmd.Node) *lib.ValidatorEntry {
+	return &lib.ValidatorEntry{
+		Domains: [][]byte{[]byte(node.Listeners[0].Addr().String())},
+	}
+}
diff --git a/integration_testing/connection_controller_test.go b/integration_testing/connection_controller_test.go
index b1ed4971a..04e78428f 100644
--- a/integration_testing/connection_controller_test.go
+++ b/integration_testing/connection_controller_test.go
@@ -169,10 +169,7 @@ func TestConnectionControllerHandshakeDataErrors(t *testing.T) {
 	require := require.New(t)
 
 	dbDir1 := getDirectory(t)
-	dbDir2 := getDirectory(t)
 	defer os.RemoveAll(dbDir1)
-	defer os.RemoveAll(dbDir2)
-
 	config1 := generateConfig(t, 18000, dbDir1, 10)
 	config1.SyncType = lib.NodeSyncTypeBlockSync
 	blsPriv1, err := bls.NewPrivateKey()
@@ -180,6 +177,8 @@ func TestConnectionControllerHandshakeDataErrors(t *testing.T) {
 	config1.PosValidatorSeed = blsPriv1.ToString()
 
 	// This node should have ProtocolVersion2, but it has ProtocolVersion1 as we want it to disconnect.
+	dbDir2 := getDirectory(t)
+	defer os.RemoveAll(dbDir2)
 	config2 := generateConfig(t, 18001, dbDir2, 10)
 	config2.SyncType = lib.NodeSyncTypeBlockSync
 	blsPriv2, err := bls.NewPrivateKey()
diff --git a/integration_testing/tools.go b/integration_testing/tools.go
index 2f97e942d..4db913136 100644
--- a/integration_testing/tools.go
+++ b/integration_testing/tools.go
@@ -69,7 +69,7 @@ func generateConfig(t *testing.T, port uint32, dataDir string, maxPeers uint32)
 	config.MaxSyncBlockHeight = 0
 	config.ConnectIPs = []string{}
 	config.PrivateMode = true
-	config.GlogV = 0
+	config.GlogV = 2
 	config.GlogVmodule = "*bitcoin_manager*=0,*balance*=0,*view*=0,*frontend*=0,*peer*=0,*addr*=0,*network*=0,*utils*=0,*connection*=0,*main*=0,*server*=0,*mempool*=0,*miner*=0,*blockchain*=0"
 	config.MaxInboundPeers = maxPeers
 	config.TargetOutboundPeers = maxPeers
diff --git a/lib/connection_controller.go b/lib/connection_controller.go
index 5bf22c76a..81e2257ef 100644
--- a/lib/connection_controller.go
+++ b/lib/connection_controller.go
@@ -13,6 +13,12 @@ import (
 	"time"
 )
 
+type GetActiveValidatorsFunc func() map[bls.SerializedPublicKey]*ValidatorEntry
+
+var GetActiveValidatorImpl GetActiveValidatorsFunc = func() map[bls.SerializedPublicKey]*ValidatorEntry {
+	return make(map[bls.SerializedPublicKey]*ValidatorEntry)
+}
+
 // ConnectionController is a structure that oversees all connections to remote nodes. It is responsible for kicking off
 // the initial connections node makes to the network. It is also responsible for creating RemoteNodes from all
 // successful outbound and inbound connections. The ConnectionController also ensures that the node is connected to
@@ -29,8 +35,6 @@ type ConnectionController struct {
 
 	rnManager *RemoteNodeManager
 
-	getActiveValidators func() map[bls.SerializedPublicKey]*ValidatorEntry
-
 	// The address manager keeps track of peer addresses we're aware of. When
 	// we need to connect to a new outbound peer, it chooses one of the addresses
 	// it's aware of at random and provides it to us.
@@ -53,8 +57,8 @@ type ConnectionController struct {
 }
 
 func NewConnectionController(params *DeSoParams, cmgr *ConnectionManager, handshakeController *HandshakeController,
-	rnManager *RemoteNodeManager, blsKeystore *BLSKeystore, addrMgr *addrmgr.AddrManager, targetOutboundPeers uint32,
-	maxInboundPeers uint32, limitOneInboundConnectionPerIP bool) *ConnectionController {
+	rnManager *RemoteNodeManager, blsKeystore *BLSKeystore, addrMgr *addrmgr.AddrManager, connectIps []string,
+	targetOutboundPeers uint32, maxInboundPeers uint32, limitOneInboundConnectionPerIP bool) *ConnectionController {
 
 	return &ConnectionController{
 		params:                         params,
@@ -63,6 +67,7 @@ func NewConnectionController(params *DeSoParams, cmgr *ConnectionManager, handsh
 		handshake:                      handshakeController,
 		rnManager:                      rnManager,
 		AddrMgr:                        addrMgr,
+		connectIps:                     connectIps,
 		targetOutboundPeers:            targetOutboundPeers,
 		maxInboundPeers:                maxInboundPeers,
 		limitOneInboundConnectionPerIP: limitOneInboundConnectionPerIP,
@@ -95,6 +100,7 @@ func (cc *ConnectionController) initiatePersistentConnections() {
 	// disconnect from one, we will try to reconnect to the same one.
 	if len(cc.connectIps) > 0 {
 		for _, connectIp := range cc.connectIps {
+			glog.Infof("ConnectionController.initiatePersistentConnections: Connecting to connectIp: %v", connectIp)
 			if err := cc.CreateNonValidatorPersistentOutboundConnection(connectIp); err != nil {
 				glog.Errorf("ConnectionController.initiatePersistentConnections: Problem connecting "+
 					"to connectIp %v: %v", connectIp, err)
@@ -116,7 +122,7 @@ func (cc *ConnectionController) startValidatorConnector() {
 			cc.exitGroup.Done()
 			return
 		case <-time.After(1 * time.Second):
-			activeValidatorsMap := cc.getActiveValidators()
+			activeValidatorsMap := GetActiveValidatorImpl()
 			cc.refreshValidatorIndex(activeValidatorsMap)
 			cc.connectValidators(activeValidatorsMap)
 		}
@@ -136,7 +142,6 @@ func (cc *ConnectionController) startNonValidatorConnector() {
 			cc.exitGroup.Done()
 			return
 		case <-time.After(1 * time.Second):
-
 			cc.refreshNonValidatorOutboundIndex()
 			cc.refreshNonValidatorInboundIndex()
 			cc.connectNonValidators()
@@ -271,12 +276,20 @@ func (cc *ConnectionController) refreshValidatorIndex(activeValidatorsMap map[bl
 // periodically by the validator connector.
 func (cc *ConnectionController) connectValidators(activeValidatorsMap map[bls.SerializedPublicKey]*ValidatorEntry) {
 	// Look through the active validators and connect to any that we're not already connected to.
+	if cc.blsKeystore == nil {
+		return
+	}
+
 	for pk, validator := range activeValidatorsMap {
 		_, exists := cc.rnManager.GetValidatorIndex().Get(pk)
 		// If we're already connected to the validator, continue.
 		if exists {
 			continue
 		}
+		if cc.blsKeystore.GetSigner().GetPublicKey().Serialize() == pk {
+			continue
+		}
+
 		publicKey, err := pk.Deserialize()
 		if err != nil {
 			continue
@@ -295,32 +308,6 @@ func (cc *ConnectionController) connectValidators(activeValidatorsMap map[bls.Se
 // ## NonValidator Connections
 // ###########################
 
-func (cc *ConnectionController) connectNonValidators() {
-	// Only connect to addresses from the addrmgr if we don't specify --connect-ips. These addresses are *not* persistent,
-	// meaning if we disconnect from one we'll try a different one.
-	// TODO: We used this condition in the old code to prevent the node from connecting to non-connect-ips nodes.
-	// I'm not sure whether this is still necessary. I suppose the concern here was that the connect-ips nodes
-	// should be prioritized over the addrmgr nodes, especially during syncing. However, I think we can achieve the
-	// same result by defining another flag, like a boolean --sync-from-persistent-peers-only, which could indicate that
-	// we disregard non-persistent non-connect-ips nodes during syncing, if the flag is set to true.
-	if len(cc.connectIps) == 0 {
-		return
-	}
-
-	numOutboundPeers := uint32(cc.rnManager.GetNonValidatorOutboundIndex().Count())
-
-	remainingOutboundPeers := uint32(0)
-	if numOutboundPeers < cc.targetOutboundPeers {
-		remainingOutboundPeers = cc.targetOutboundPeers - numOutboundPeers
-	}
-	for ii := uint32(0); ii < remainingOutboundPeers; ii++ {
-		addr := cc.getRandomUnconnectedAddress()
-		cc.AddrMgr.Attempt(addr)
-		// FIXME: error handle
-		cc.rnManager.CreateNonValidatorOutboundConnection(addr)
-	}
-}
-
 // refreshNonValidatorOutboundIndex is called periodically by the peer connector. It is responsible for disconnecting excess
 // outbound remote nodes.
 func (cc *ConnectionController) refreshNonValidatorOutboundIndex() {
@@ -379,6 +366,36 @@ func (cc *ConnectionController) refreshNonValidatorInboundIndex() {
 	}
 }
 
+func (cc *ConnectionController) connectNonValidators() {
+	// Only connect to addresses from the addrmgr if we don't specify --connect-ips. These addresses are *not* persistent,
+	// meaning if we disconnect from one we'll try a different one.
+	// TODO: We used this condition in the old code to prevent the node from connecting to non-connect-ips nodes.
+	// I'm not sure whether this is still necessary. I suppose the concern here was that the connect-ips nodes
+	// should be prioritized over the addrmgr nodes, especially during syncing. However, I think we can achieve the
+	// same result by defining another flag, like a boolean --sync-from-persistent-peers-only, which could indicate that
+	// we disregard non-persistent non-connect-ips nodes during syncing, if the flag is set to true.
+	// FIXME: Think about uncommenting the condition below.
+	//if len(cc.connectIps) == 0 {
+	//	return
+	//}
+
+	numOutboundPeers := uint32(cc.rnManager.GetNonValidatorOutboundIndex().Count())
+
+	remainingOutboundPeers := uint32(0)
+	if numOutboundPeers < cc.targetOutboundPeers {
+		remainingOutboundPeers = cc.targetOutboundPeers - numOutboundPeers
+	}
+	for ii := uint32(0); ii < remainingOutboundPeers; ii++ {
+		addr := cc.getRandomUnconnectedAddress()
+		if addr == nil {
+			break
+		}
+		cc.AddrMgr.Attempt(addr)
+		// FIXME: error handle
+		cc.rnManager.CreateNonValidatorOutboundConnection(addr)
+	}
+}
+
 func (cc *ConnectionController) getRandomUnconnectedAddress() *wire.NetAddress {
 	for tries := 0; tries < 100; tries++ {
 		addr := cc.AddrMgr.GetAddress()
diff --git a/lib/handshake_controller.go b/lib/handshake_controller.go
index bde07745a..f355bad93 100644
--- a/lib/handshake_controller.go
+++ b/lib/handshake_controller.go
@@ -122,7 +122,7 @@ func (hc *HandshakeController) _handleVersionMessage(origin *Peer, desoMsg DeSoM
 	if hc.usedNonces.Contains(msgNonce) {
 		hc.usedNonces.Delete(msgNonce)
 		glog.Errorf("HandshakeController._handleVersionMessage: Disconnecting RemoteNode with id: (%v) "+
-			"nonce collision", origin.ID)
+			"nonce collision, nonce (%v)", origin.ID, msgNonce)
 		hc.rnManager.Disconnect(rn)
 		return
 	}
diff --git a/lib/server.go b/lib/server.go
index d76da571d..4d5138f77 100644
--- a/lib/server.go
+++ b/lib/server.go
@@ -499,8 +499,8 @@ func NewServer(
 	rnManager := NewRemoteNodeManager(srv, _chain, _cmgr, _blsKeystore, _params, _minFeeRateNanosPerKB, nodeServices)
 
 	srv.handshakeController = NewHandshakeController(rnManager)
-	srv.connectionController = NewConnectionController(_params, _cmgr, srv.handshakeController, rnManager,
-		_blsKeystore, _desoAddrMgr, _targetOutboundPeers, _maxInboundPeers, _limitOneInboundConnectionPerIP)
+	srv.connectionController = NewConnectionController(_params, _cmgr, srv.handshakeController, rnManager, _blsKeystore,
+		_desoAddrMgr, _connectIps, _targetOutboundPeers, _maxInboundPeers, _limitOneInboundConnectionPerIP)
 
 	if srv.stateChangeSyncer != nil {
 		srv.stateChangeSyncer.BlockHeight = uint64(_chain.headerTip().Height)
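
Reviewer note: the package-level GetActiveValidatorImpl hook introduced in lib/connection_controller.go is what lets the integration test above stub the active validator set from outside the lib package, instead of plumbing a getter through the NewConnectionController constructor. A minimal sketch of that override pattern, assuming a test that already holds a bls private key named blsPriv; the 127.0.0.1:18003 address is a hypothetical stand-in, whereas the real test derives Domains from node.Listeners[0].Addr():

	// Save the real implementation and restore it when the test exits, so the
	// stub does not leak into other tests.
	oldImpl := lib.GetActiveValidatorImpl
	defer func() { lib.GetActiveValidatorImpl = oldImpl }()

	// Serve a fixed one-validator set. Only the Domains field needs to be
	// populated for connectValidators to know where to dial.
	lib.GetActiveValidatorImpl = func() map[bls.SerializedPublicKey]*lib.ValidatorEntry {
		return map[bls.SerializedPublicKey]*lib.ValidatorEntry{
			blsPriv.PublicKey().Serialize(): {
				Domains: [][]byte{[]byte("127.0.0.1:18003")}, // hypothetical address
			},
		}
	}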