diff --git a/integration_testing/connection_controller_routines_test.go b/integration_testing/connection_controller_routines_test.go index 582031bd8..6eec830dd 100644 --- a/integration_testing/connection_controller_routines_test.go +++ b/integration_testing/connection_controller_routines_test.go @@ -85,6 +85,223 @@ func TestConnectionControllerInitiatePersistentConnections(t *testing.T) { t.Logf("Test #2 passed | Successfully run validator node6 with --connect-ips set to node2, node3, node4, node5") } +func TestConnectionControllerValidatorConnector(t *testing.T) { + require := require.New(t) + t.Cleanup(func() { + setGetActiveValidatorImpl(lib.BasicGetActiveValidators) + }) + + // Spawn 5 validators node1, node2, node3, node4, node5 and two non-validators node6 and node7. + // All the validators are initially in the validator set. And later, node1 and node2 will be removed from the + // validator set. Then, make node3 inactive, and node2 active again. Then, make all the validators inactive. + // Make node6, and node7 connect-ips to all the validators. + + blsPriv1, err := bls.NewPrivateKey() + require.NoError(err) + node1 := spawnValidatorNodeProtocol2(t, 18000, "node1", blsPriv1) + blsPriv2, err := bls.NewPrivateKey() + require.NoError(err) + node2 := spawnValidatorNodeProtocol2(t, 18001, "node2", blsPriv2) + blsPriv3, err := bls.NewPrivateKey() + require.NoError(err) + node3 := spawnValidatorNodeProtocol2(t, 18002, "node3", blsPriv3) + blsPriv4, err := bls.NewPrivateKey() + require.NoError(err) + node4 := spawnValidatorNodeProtocol2(t, 18003, "node4", blsPriv4) + blsPriv5, err := bls.NewPrivateKey() + require.NoError(err) + node5 := spawnValidatorNodeProtocol2(t, 18004, "node5", blsPriv5) + + node6 := spawnNonValidatorNodeProtocol2(t, 18005, "node6") + node7 := spawnNonValidatorNodeProtocol2(t, 18006, "node7") + + node1 = startNode(t, node1) + defer node1.Stop() + node2 = startNode(t, node2) + defer node2.Stop() + node3 = startNode(t, node3) + defer node3.Stop() + node4 = startNode(t, node4) + defer node4.Stop() + node5 = startNode(t, node5) + defer node5.Stop() + setGetActiveValidatorImplWithValidatorNodes(t, node1, node2, node3, node4, node5) + + node6.Config.ConnectIPs = []string{ + node1.Listeners[0].Addr().String(), + node2.Listeners[0].Addr().String(), + node3.Listeners[0].Addr().String(), + node4.Listeners[0].Addr().String(), + node5.Listeners[0].Addr().String(), + } + node7.Config.ConnectIPs = node6.Config.ConnectIPs + node6 = startNode(t, node6) + defer node6.Stop() + node7 = startNode(t, node7) + defer node7.Stop() + + // Verify full graph between active validators. + waitForValidatorFullGraph(t, node1, node2, node3, node4, node5) + // Verify connections of non-validators. + for _, nonValidator := range []*cmd.Node{node6, node7} { + waitForValidatorConnectionOneWay(t, nonValidator, node1, node2, node3, node4, node5) + } + // Verify connections of initial validators. + for _, validator := range []*cmd.Node{node1, node2, node3, node4, node5} { + waitForNonValidatorInboundConnection(t, validator, node6) + waitForNonValidatorInboundConnection(t, validator, node7) + } + // Verify connection counts of active validators. + for _, validator := range []*cmd.Node{node1, node2, node3, node4, node5} { + waitForMinNonValidatorCountRemoteNodeIndexer(t, validator, 6, 4, 0, 2) + } + // NOOP Verify connection counts of inactive validators. + // Verify connection counts of non-validators. + waitForCountRemoteNodeIndexer(t, node6, 5, 5, 0, 0) + waitForCountRemoteNodeIndexer(t, node7, 5, 5, 0, 0) + t.Logf("Test #1 passed | Successfully run validators node1, node2, node3, node4, node5; non-validators node6, node7") + + // Remove node1 and node2 from the validator set. + setGetActiveValidatorImplWithValidatorNodes(t, node3, node4, node5) + // Verify full graph between active validators. + waitForValidatorFullGraph(t, node3, node4, node5) + // Verify connections of non-validators. + for _, nonValidator := range []*cmd.Node{node1, node2, node6, node7} { + waitForValidatorConnectionOneWay(t, nonValidator, node3, node4, node5) + } + // Verify connections of initial validators. + for _, validator := range []*cmd.Node{node1, node2, node3, node4, node5} { + waitForNonValidatorInboundConnection(t, validator, node6) + waitForNonValidatorInboundConnection(t, validator, node7) + } + // Verify connections of active validators. + for _, validator := range []*cmd.Node{node3, node4, node5} { + waitForNonValidatorInboundXOROutboundConnection(t, validator, node1) + waitForNonValidatorInboundXOROutboundConnection(t, validator, node2) + waitForMinNonValidatorCountRemoteNodeIndexer(t, validator, 6, 2, 0, 2) + } + // Verify connection counts of inactive validators. + for _, validator := range []*cmd.Node{node1, node2} { + waitForMinNonValidatorCountRemoteNodeIndexer(t, validator, 6, 3, 0, 2) + } + // Verify connection counts of non-validators. + waitForCountRemoteNodeIndexer(t, node6, 5, 3, 2, 0) + waitForCountRemoteNodeIndexer(t, node7, 5, 3, 2, 0) + t.Logf("Test #2 passed | Successfully run validators node3, node4, node5; inactive-validators node1, node2; " + + "non-validators node6, node7") + + // Remove node3 from the validator set. Make node1 active again. + setGetActiveValidatorImplWithValidatorNodes(t, node1, node4, node5) + // Verify full graph between active validators. + waitForValidatorFullGraph(t, node1, node4, node5) + // Verify connections of non-validators. + for _, nonValidator := range []*cmd.Node{node2, node3, node6, node7} { + waitForValidatorConnectionOneWay(t, nonValidator, node1, node4, node5) + } + // Verify connections of initial validators. + for _, validator := range []*cmd.Node{node1, node2, node3, node4, node5} { + waitForNonValidatorInboundConnection(t, validator, node6) + waitForNonValidatorInboundConnection(t, validator, node7) + } + // Verify connections of active validators. + for _, validator := range []*cmd.Node{node1, node4, node5} { + waitForNonValidatorInboundXOROutboundConnection(t, validator, node2) + waitForNonValidatorInboundXOROutboundConnection(t, validator, node3) + waitForMinNonValidatorCountRemoteNodeIndexer(t, validator, 6, 2, 0, 2) + } + // Verify connection counts of inactive validators. + for _, validator := range []*cmd.Node{node2, node3} { + waitForMinNonValidatorCountRemoteNodeIndexer(t, validator, 6, 3, 0, 2) + } + // Verify connection counts of non-validators. + waitForCountRemoteNodeIndexer(t, node6, 5, 3, 2, 0) + waitForCountRemoteNodeIndexer(t, node7, 5, 3, 2, 0) + t.Logf("Test #3 passed | Successfully run validators node1, node4, node5; inactive validators node2, node3; " + + "non-validators node6, node7") + + // Make all validators inactive. + setGetActiveValidatorImplWithValidatorNodes(t) + // NOOP Verify full graph between active validators. + // NOOP Verify connections of non-validators. + // Verify connections of initial validators. + for _, validator := range []*cmd.Node{node1, node2, node3, node4, node5} { + waitForNonValidatorInboundConnection(t, validator, node6) + waitForNonValidatorInboundConnection(t, validator, node7) + } + // NOOP Verify connections of active validators. + // Verify connections and counts of inactive validators. + inactiveValidators := []*cmd.Node{node1, node2, node3, node4, node5} + for ii := 0; ii < len(inactiveValidators); ii++ { + for jj := ii + 1; jj < len(inactiveValidators); jj++ { + waitForNonValidatorInboundXOROutboundConnection(t, inactiveValidators[ii], inactiveValidators[jj]) + } + } + inactiveValidatorsRev := []*cmd.Node{node5, node4, node3, node2, node1} + for ii := 0; ii < len(inactiveValidatorsRev); ii++ { + for jj := ii + 1; jj < len(inactiveValidatorsRev); jj++ { + waitForNonValidatorInboundXOROutboundConnection(t, inactiveValidatorsRev[ii], inactiveValidatorsRev[jj]) + } + } + for _, validator := range inactiveValidators { + waitForMinNonValidatorCountRemoteNodeIndexer(t, validator, 6, 0, 0, 2) + } + // Verify connection counts of non-validators. + waitForCountRemoteNodeIndexer(t, node6, 5, 0, 5, 0) + waitForCountRemoteNodeIndexer(t, node7, 5, 0, 5, 0) + t.Logf("Test #4 passed | Successfully run inactive validators node1, node2, node3, node4, node5; " + + "non-validators node6, node7") +} + +func TestConnectionControllerValidatorInboundDeduplication(t *testing.T) { + require := require.New(t) + t.Cleanup(func() { + setGetActiveValidatorImpl(lib.BasicGetActiveValidators) + }) + + // Spawn a non-validator node1, and two validators node2, node3. The validator nodes will have the same public key. + // Node2 and node3 will not initially be in the validator set. First, node2 will start an outbound connection to + // node1. We wait until the node2 is re-indexed as non-validator by node1, and then we make node3 open an outbound + // connection to node1. We wait until node3 is re-indexed as non-validator by node1. Then, we make node2 and node3 + // join the validator set (i.e. add one entry with the duplicated public key). Now, node1 should disconnect from + // either node2 or node3 because of duplicate public key. + + node1 := spawnNonValidatorNodeProtocol2(t, 18000, "node1") + blsPriv2, err := bls.NewPrivateKey() + require.NoError(err) + node2 := spawnValidatorNodeProtocol2(t, 18001, "node2", blsPriv2) + node3 := spawnValidatorNodeProtocol2(t, 18002, "node3", blsPriv2) + + node1 = startNode(t, node1) + defer node1.Stop() + node2 = startNode(t, node2) + defer node2.Stop() + node3 = startNode(t, node3) + defer node3.Stop() + + cc2 := node2.Server.GetConnectionController() + require.NoError(cc2.CreateNonValidatorOutboundConnection(node1.Listeners[0].Addr().String())) + // First wait for node2 to be indexed as a validator by node1. + waitForValidatorConnection(t, node1, node2) + // Now wait for node2 to be re-indexed as a non-validator. + waitForNonValidatorInboundConnectionDynamic(t, node1, node2, true) + waitForNonValidatorOutboundConnection(t, node2, node1) + + // Now connect node3 to node1. + cc3 := node3.Server.GetConnectionController() + require.NoError(cc3.CreateNonValidatorOutboundConnection(node1.Listeners[0].Addr().String())) + // First wait for node3 to be indexed as a validator by node1. + waitForValidatorConnection(t, node1, node3) + // Now wait for node3 to be re-indexed as a non-validator. + waitForNonValidatorInboundConnectionDynamic(t, node1, node3, true) + waitForNonValidatorOutboundConnection(t, node3, node1) + + // Now add node2 and node3 to the validator set. + setGetActiveValidatorImplWithValidatorNodes(t, node2) + // Now wait for node1 to disconnect from either node2 or node3. + waitForCountRemoteNodeIndexer(t, node1, 1, 1, 0, 0) + t.Logf("Test #1 passed | Successfully run non-validator node1; validators node2, node3 with duplicate public key") +} + func TestConnectionControllerNonValidatorCircularConnectIps(t *testing.T) { node1 := spawnNonValidatorNodeProtocol2(t, 18000, "node1") node2 := spawnNonValidatorNodeProtocol2(t, 18001, "node2") diff --git a/integration_testing/connection_controller_utils_test.go b/integration_testing/connection_controller_utils_test.go index 74a33b943..43cf418bc 100644 --- a/integration_testing/connection_controller_utils_test.go +++ b/integration_testing/connection_controller_utils_test.go @@ -69,6 +69,14 @@ func waitForNonValidatorInboundConnection(t *testing.T, node1 *cmd.Node, node2 * waitForCondition(t, fmt.Sprintf("Waiting for Node (%s) to connect to inbound non-validator Node (%s)", userAgentN1, userAgentN2), condition) } +func waitForNonValidatorInboundConnectionDynamic(t *testing.T, node1 *cmd.Node, node2 *cmd.Node, inactiveValidator bool) { + userAgentN1 := node1.Params.UserAgent + userAgentN2 := node2.Params.UserAgent + condition := conditionNonValidatorInboundConnectionDynamic(t, node1, node2, inactiveValidator) + waitForCondition(t, fmt.Sprintf("Waiting for Node (%s) to connect to inbound non-validator Node (%s), "+ + "inactiveValidator (%v)", userAgentN1, userAgentN2, inactiveValidator), condition) +} + func conditionNonValidatorInboundConnection(t *testing.T, node1 *cmd.Node, node2 *cmd.Node) func() bool { return conditionNonValidatorInboundConnectionDynamic(t, node1, node2, false) } diff --git a/lib/connection_controller.go b/lib/connection_controller.go index 18d423f46..e025c24a2 100644 --- a/lib/connection_controller.go +++ b/lib/connection_controller.go @@ -87,11 +87,12 @@ func NewConnectionController(params *DeSoParams, cmgr *ConnectionManager, handsh } func (cc *ConnectionController) Start() { - cc.startGroup.Add(1) + cc.startGroup.Add(2) go cc.startPersistentConnector() + go cc.startValidatorConnector() cc.startGroup.Wait() - cc.exitGroup.Add(1) + cc.exitGroup.Add(2) } func (cc *ConnectionController) Stop() { @@ -116,6 +117,26 @@ func (cc *ConnectionController) startPersistentConnector() { } } +// startValidatorConnector is responsible for ensuring that the node is connected to all active validators. It does +// this in two steps. First, it looks through the already established connections and checks if any of these connections +// are validators. If they are, it adds them to the validator index. It also checks if any of the existing validators +// are no longer active and removes them from the validator index. Second, it checks if any of the active validators +// are missing from the validator index. If they are, it attempts to connect to them. +func (cc *ConnectionController) startValidatorConnector() { + cc.startGroup.Done() + for { + select { + case <-cc.exitChan: + cc.exitGroup.Done() + return + case <-time.After(1 * time.Second): + activeValidatorsMap := GetActiveValidatorImpl() + cc.refreshValidatorIndex(activeValidatorsMap) + cc.connectValidators(activeValidatorsMap) + } + } +} + // ########################### // ## Handlers (Peer, DeSoMessage) // ########################### @@ -231,6 +252,84 @@ func (cc *ConnectionController) refreshConnectIps() { } } +// ########################### +// ## Validator Connections +// ########################### + +// refreshValidatorIndex re-indexes validators based on the activeValidatorsMap. It is called periodically by the +// validator connector. +func (cc *ConnectionController) refreshValidatorIndex(activeValidatorsMap *collections.ConcurrentMap[bls.SerializedPublicKey, *ValidatorEntry]) { + // De-index inactive validators. We skip any checks regarding RemoteNodes connection status, nor do we verify whether + // de-indexing the validator would result in an excess number of outbound/inbound connections. Any excess connections + // will be cleaned up by the peer connector. + validatorRemoteNodeMap := cc.rnManager.GetValidatorIndex().Copy() + for pk, rn := range validatorRemoteNodeMap { + // If the validator is no longer active, de-index it. + if _, ok := activeValidatorsMap.Get(pk); !ok { + cc.rnManager.SetNonValidator(rn) + cc.rnManager.UnsetValidator(rn) + } + } + + // Look for validators in our existing outbound / inbound connections. + allNonValidators := cc.rnManager.GetAllNonValidators() + for _, rn := range allNonValidators { + // It is possible for a RemoteNode to be in the non-validator indices, and still have a public key. This can happen + // if the RemoteNode advertised support for the SFValidator service flag during handshake, and provided us + // with a public key, and a corresponding proof of possession signature. + pk := rn.GetValidatorPublicKey() + if pk == nil { + continue + } + // It is possible that through unlikely concurrence, and malevolence, two non-validators happen to have the same + // public key, which goes undetected during handshake. To prevent this from affecting the indexing of the validator + // set, we check that the non-validator's public key is not already present in the validator index. + if _, ok := cc.rnManager.GetValidatorIndex().Get(pk.Serialize()); ok { + cc.rnManager.Disconnect(rn) + continue + } + + // If the RemoteNode turns out to be in the validator set, index it. + if _, ok := activeValidatorsMap.Get(pk.Serialize()); ok { + cc.rnManager.SetValidator(rn) + cc.rnManager.UnsetNonValidator(rn) + } + } +} + +// connectValidators attempts to connect to all active validators that are not already connected. It is called +// periodically by the validator connector. +func (cc *ConnectionController) connectValidators(activeValidatorsMap *collections.ConcurrentMap[bls.SerializedPublicKey, *ValidatorEntry]) { + // Look through the active validators and connect to any that we're not already connected to. + if cc.blsKeystore == nil { + return + } + + validators := activeValidatorsMap.Copy() + for pk, validator := range validators { + _, exists := cc.rnManager.GetValidatorIndex().Get(pk) + // If we're already connected to the validator, continue. + if exists { + continue + } + if cc.blsKeystore.GetSigner().GetPublicKey().Serialize() == pk { + continue + } + + publicKey, err := pk.Deserialize() + if err != nil { + continue + } + + // For now, we only dial the first domain in the validator's domain list. + address := string(validator.Domains[0]) + if err := cc.CreateValidatorConnection(address, publicKey); err != nil { + // TODO: Do we want to log an error here? + continue + } + } +} + func (cc *ConnectionController) CreateValidatorConnection(ipStr string, publicKey *bls.PublicKey) error { netAddr, err := cc.ConvertIPStringToNetAddress(ipStr) if err != nil {