Skip to content

Commit

Permalink
Revert "Code split"
Browse files Browse the repository at this point in the history
This reverts commit 831096ac1d3008233868ac8b8f0eca4cd2b9553e.
  • Loading branch information
AeonSw4n committed Jan 24, 2024
1 parent 394464f commit efad766
Show file tree
Hide file tree
Showing 3 changed files with 326 additions and 2 deletions.
217 changes: 217 additions & 0 deletions integration_testing/connection_controller_routines_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,223 @@ func TestConnectionControllerInitiatePersistentConnections(t *testing.T) {
t.Logf("Test #2 passed | Successfully run validator node6 with --connect-ips set to node2, node3, node4, node5")
}

func TestConnectionControllerValidatorConnector(t *testing.T) {
require := require.New(t)
t.Cleanup(func() {
setGetActiveValidatorImpl(lib.BasicGetActiveValidators)
})

// Spawn 5 validators node1, node2, node3, node4, node5 and two non-validators node6 and node7.
// All the validators are initially in the validator set. And later, node1 and node2 will be removed from the
// validator set. Then, make node3 inactive, and node2 active again. Then, make all the validators inactive.
// Make node6, and node7 connect-ips to all the validators.

blsPriv1, err := bls.NewPrivateKey()
require.NoError(err)
node1 := spawnValidatorNodeProtocol2(t, 18000, "node1", blsPriv1)
blsPriv2, err := bls.NewPrivateKey()
require.NoError(err)
node2 := spawnValidatorNodeProtocol2(t, 18001, "node2", blsPriv2)
blsPriv3, err := bls.NewPrivateKey()
require.NoError(err)
node3 := spawnValidatorNodeProtocol2(t, 18002, "node3", blsPriv3)
blsPriv4, err := bls.NewPrivateKey()
require.NoError(err)
node4 := spawnValidatorNodeProtocol2(t, 18003, "node4", blsPriv4)
blsPriv5, err := bls.NewPrivateKey()
require.NoError(err)
node5 := spawnValidatorNodeProtocol2(t, 18004, "node5", blsPriv5)

node6 := spawnNonValidatorNodeProtocol2(t, 18005, "node6")
node7 := spawnNonValidatorNodeProtocol2(t, 18006, "node7")

node1 = startNode(t, node1)
defer node1.Stop()
node2 = startNode(t, node2)
defer node2.Stop()
node3 = startNode(t, node3)
defer node3.Stop()
node4 = startNode(t, node4)
defer node4.Stop()
node5 = startNode(t, node5)
defer node5.Stop()
setGetActiveValidatorImplWithValidatorNodes(t, node1, node2, node3, node4, node5)

node6.Config.ConnectIPs = []string{
node1.Listeners[0].Addr().String(),
node2.Listeners[0].Addr().String(),
node3.Listeners[0].Addr().String(),
node4.Listeners[0].Addr().String(),
node5.Listeners[0].Addr().String(),
}
node7.Config.ConnectIPs = node6.Config.ConnectIPs
node6 = startNode(t, node6)
defer node6.Stop()
node7 = startNode(t, node7)
defer node7.Stop()

// Verify full graph between active validators.
waitForValidatorFullGraph(t, node1, node2, node3, node4, node5)
// Verify connections of non-validators.
for _, nonValidator := range []*cmd.Node{node6, node7} {
waitForValidatorConnectionOneWay(t, nonValidator, node1, node2, node3, node4, node5)
}
// Verify connections of initial validators.
for _, validator := range []*cmd.Node{node1, node2, node3, node4, node5} {
waitForNonValidatorInboundConnection(t, validator, node6)
waitForNonValidatorInboundConnection(t, validator, node7)
}
// Verify connection counts of active validators.
for _, validator := range []*cmd.Node{node1, node2, node3, node4, node5} {
waitForMinNonValidatorCountRemoteNodeIndexer(t, validator, 6, 4, 0, 2)
}
// NOOP Verify connection counts of inactive validators.
// Verify connection counts of non-validators.
waitForCountRemoteNodeIndexer(t, node6, 5, 5, 0, 0)
waitForCountRemoteNodeIndexer(t, node7, 5, 5, 0, 0)
t.Logf("Test #1 passed | Successfully run validators node1, node2, node3, node4, node5; non-validators node6, node7")

// Remove node1 and node2 from the validator set.
setGetActiveValidatorImplWithValidatorNodes(t, node3, node4, node5)
// Verify full graph between active validators.
waitForValidatorFullGraph(t, node3, node4, node5)
// Verify connections of non-validators.
for _, nonValidator := range []*cmd.Node{node1, node2, node6, node7} {
waitForValidatorConnectionOneWay(t, nonValidator, node3, node4, node5)
}
// Verify connections of initial validators.
for _, validator := range []*cmd.Node{node1, node2, node3, node4, node5} {
waitForNonValidatorInboundConnection(t, validator, node6)
waitForNonValidatorInboundConnection(t, validator, node7)
}
// Verify connections of active validators.
for _, validator := range []*cmd.Node{node3, node4, node5} {
waitForNonValidatorInboundXOROutboundConnection(t, validator, node1)
waitForNonValidatorInboundXOROutboundConnection(t, validator, node2)
waitForMinNonValidatorCountRemoteNodeIndexer(t, validator, 6, 2, 0, 2)
}
// Verify connection counts of inactive validators.
for _, validator := range []*cmd.Node{node1, node2} {
waitForMinNonValidatorCountRemoteNodeIndexer(t, validator, 6, 3, 0, 2)
}
// Verify connection counts of non-validators.
waitForCountRemoteNodeIndexer(t, node6, 5, 3, 2, 0)
waitForCountRemoteNodeIndexer(t, node7, 5, 3, 2, 0)
t.Logf("Test #2 passed | Successfully run validators node3, node4, node5; inactive-validators node1, node2; " +
"non-validators node6, node7")

// Remove node3 from the validator set. Make node1 active again.
setGetActiveValidatorImplWithValidatorNodes(t, node1, node4, node5)
// Verify full graph between active validators.
waitForValidatorFullGraph(t, node1, node4, node5)
// Verify connections of non-validators.
for _, nonValidator := range []*cmd.Node{node2, node3, node6, node7} {
waitForValidatorConnectionOneWay(t, nonValidator, node1, node4, node5)
}
// Verify connections of initial validators.
for _, validator := range []*cmd.Node{node1, node2, node3, node4, node5} {
waitForNonValidatorInboundConnection(t, validator, node6)
waitForNonValidatorInboundConnection(t, validator, node7)
}
// Verify connections of active validators.
for _, validator := range []*cmd.Node{node1, node4, node5} {
waitForNonValidatorInboundXOROutboundConnection(t, validator, node2)
waitForNonValidatorInboundXOROutboundConnection(t, validator, node3)
waitForMinNonValidatorCountRemoteNodeIndexer(t, validator, 6, 2, 0, 2)
}
// Verify connection counts of inactive validators.
for _, validator := range []*cmd.Node{node2, node3} {
waitForMinNonValidatorCountRemoteNodeIndexer(t, validator, 6, 3, 0, 2)
}
// Verify connection counts of non-validators.
waitForCountRemoteNodeIndexer(t, node6, 5, 3, 2, 0)
waitForCountRemoteNodeIndexer(t, node7, 5, 3, 2, 0)
t.Logf("Test #3 passed | Successfully run validators node1, node4, node5; inactive validators node2, node3; " +
"non-validators node6, node7")

// Make all validators inactive.
setGetActiveValidatorImplWithValidatorNodes(t)
// NOOP Verify full graph between active validators.
// NOOP Verify connections of non-validators.
// Verify connections of initial validators.
for _, validator := range []*cmd.Node{node1, node2, node3, node4, node5} {
waitForNonValidatorInboundConnection(t, validator, node6)
waitForNonValidatorInboundConnection(t, validator, node7)
}
// NOOP Verify connections of active validators.
// Verify connections and counts of inactive validators.
inactiveValidators := []*cmd.Node{node1, node2, node3, node4, node5}
for ii := 0; ii < len(inactiveValidators); ii++ {
for jj := ii + 1; jj < len(inactiveValidators); jj++ {
waitForNonValidatorInboundXOROutboundConnection(t, inactiveValidators[ii], inactiveValidators[jj])
}
}
inactiveValidatorsRev := []*cmd.Node{node5, node4, node3, node2, node1}
for ii := 0; ii < len(inactiveValidatorsRev); ii++ {
for jj := ii + 1; jj < len(inactiveValidatorsRev); jj++ {
waitForNonValidatorInboundXOROutboundConnection(t, inactiveValidatorsRev[ii], inactiveValidatorsRev[jj])
}
}
for _, validator := range inactiveValidators {
waitForMinNonValidatorCountRemoteNodeIndexer(t, validator, 6, 0, 0, 2)
}
// Verify connection counts of non-validators.
waitForCountRemoteNodeIndexer(t, node6, 5, 0, 5, 0)
waitForCountRemoteNodeIndexer(t, node7, 5, 0, 5, 0)
t.Logf("Test #4 passed | Successfully run inactive validators node1, node2, node3, node4, node5; " +
"non-validators node6, node7")
}

func TestConnectionControllerValidatorInboundDeduplication(t *testing.T) {
require := require.New(t)
t.Cleanup(func() {
setGetActiveValidatorImpl(lib.BasicGetActiveValidators)
})

// Spawn a non-validator node1, and two validators node2, node3. The validator nodes will have the same public key.
// Node2 and node3 will not initially be in the validator set. First, node2 will start an outbound connection to
// node1. We wait until the node2 is re-indexed as non-validator by node1, and then we make node3 open an outbound
// connection to node1. We wait until node3 is re-indexed as non-validator by node1. Then, we make node2 and node3
// join the validator set (i.e. add one entry with the duplicated public key). Now, node1 should disconnect from
// either node2 or node3 because of duplicate public key.

node1 := spawnNonValidatorNodeProtocol2(t, 18000, "node1")
blsPriv2, err := bls.NewPrivateKey()
require.NoError(err)
node2 := spawnValidatorNodeProtocol2(t, 18001, "node2", blsPriv2)
node3 := spawnValidatorNodeProtocol2(t, 18002, "node3", blsPriv2)

node1 = startNode(t, node1)
defer node1.Stop()
node2 = startNode(t, node2)
defer node2.Stop()
node3 = startNode(t, node3)
defer node3.Stop()

cc2 := node2.Server.GetConnectionController()
require.NoError(cc2.CreateNonValidatorOutboundConnection(node1.Listeners[0].Addr().String()))
// First wait for node2 to be indexed as a validator by node1.
waitForValidatorConnection(t, node1, node2)
// Now wait for node2 to be re-indexed as a non-validator.
waitForNonValidatorInboundConnectionDynamic(t, node1, node2, true)
waitForNonValidatorOutboundConnection(t, node2, node1)

// Now connect node3 to node1.
cc3 := node3.Server.GetConnectionController()
require.NoError(cc3.CreateNonValidatorOutboundConnection(node1.Listeners[0].Addr().String()))
// First wait for node3 to be indexed as a validator by node1.
waitForValidatorConnection(t, node1, node3)
// Now wait for node3 to be re-indexed as a non-validator.
waitForNonValidatorInboundConnectionDynamic(t, node1, node3, true)
waitForNonValidatorOutboundConnection(t, node3, node1)

// Now add node2 and node3 to the validator set.
setGetActiveValidatorImplWithValidatorNodes(t, node2)
// Now wait for node1 to disconnect from either node2 or node3.
waitForCountRemoteNodeIndexer(t, node1, 1, 1, 0, 0)
t.Logf("Test #1 passed | Successfully run non-validator node1; validators node2, node3 with duplicate public key")
}

func TestConnectionControllerNonValidatorCircularConnectIps(t *testing.T) {
node1 := spawnNonValidatorNodeProtocol2(t, 18000, "node1")
node2 := spawnNonValidatorNodeProtocol2(t, 18001, "node2")
Expand Down
8 changes: 8 additions & 0 deletions integration_testing/connection_controller_utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,14 @@ func waitForNonValidatorInboundConnection(t *testing.T, node1 *cmd.Node, node2 *
waitForCondition(t, fmt.Sprintf("Waiting for Node (%s) to connect to inbound non-validator Node (%s)", userAgentN1, userAgentN2), condition)
}

func waitForNonValidatorInboundConnectionDynamic(t *testing.T, node1 *cmd.Node, node2 *cmd.Node, inactiveValidator bool) {
userAgentN1 := node1.Params.UserAgent
userAgentN2 := node2.Params.UserAgent
condition := conditionNonValidatorInboundConnectionDynamic(t, node1, node2, inactiveValidator)
waitForCondition(t, fmt.Sprintf("Waiting for Node (%s) to connect to inbound non-validator Node (%s), "+
"inactiveValidator (%v)", userAgentN1, userAgentN2, inactiveValidator), condition)
}

func conditionNonValidatorInboundConnection(t *testing.T, node1 *cmd.Node, node2 *cmd.Node) func() bool {
return conditionNonValidatorInboundConnectionDynamic(t, node1, node2, false)
}
Expand Down
103 changes: 101 additions & 2 deletions lib/connection_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,12 @@ func NewConnectionController(params *DeSoParams, cmgr *ConnectionManager, handsh
}

func (cc *ConnectionController) Start() {
cc.startGroup.Add(1)
cc.startGroup.Add(2)
go cc.startPersistentConnector()
go cc.startValidatorConnector()

cc.startGroup.Wait()
cc.exitGroup.Add(1)
cc.exitGroup.Add(2)
}

func (cc *ConnectionController) Stop() {
Expand All @@ -116,6 +117,26 @@ func (cc *ConnectionController) startPersistentConnector() {
}
}

// startValidatorConnector is responsible for ensuring that the node is connected to all active validators. It does
// this in two steps. First, it looks through the already established connections and checks if any of these connections
// are validators. If they are, it adds them to the validator index. It also checks if any of the existing validators
// are no longer active and removes them from the validator index. Second, it checks if any of the active validators
// are missing from the validator index. If they are, it attempts to connect to them.
func (cc *ConnectionController) startValidatorConnector() {
cc.startGroup.Done()
for {
select {
case <-cc.exitChan:
cc.exitGroup.Done()
return
case <-time.After(1 * time.Second):
activeValidatorsMap := GetActiveValidatorImpl()
cc.refreshValidatorIndex(activeValidatorsMap)
cc.connectValidators(activeValidatorsMap)
}
}
}

// ###########################
// ## Handlers (Peer, DeSoMessage)
// ###########################
Expand Down Expand Up @@ -231,6 +252,84 @@ func (cc *ConnectionController) refreshConnectIps() {
}
}

// ###########################
// ## Validator Connections
// ###########################

// refreshValidatorIndex re-indexes validators based on the activeValidatorsMap. It is called periodically by the
// validator connector.
func (cc *ConnectionController) refreshValidatorIndex(activeValidatorsMap *collections.ConcurrentMap[bls.SerializedPublicKey, *ValidatorEntry]) {
// De-index inactive validators. We skip any checks regarding RemoteNodes connection status, nor do we verify whether
// de-indexing the validator would result in an excess number of outbound/inbound connections. Any excess connections
// will be cleaned up by the peer connector.
validatorRemoteNodeMap := cc.rnManager.GetValidatorIndex().Copy()
for pk, rn := range validatorRemoteNodeMap {
// If the validator is no longer active, de-index it.
if _, ok := activeValidatorsMap.Get(pk); !ok {
cc.rnManager.SetNonValidator(rn)
cc.rnManager.UnsetValidator(rn)
}
}

// Look for validators in our existing outbound / inbound connections.
allNonValidators := cc.rnManager.GetAllNonValidators()
for _, rn := range allNonValidators {
// It is possible for a RemoteNode to be in the non-validator indices, and still have a public key. This can happen
// if the RemoteNode advertised support for the SFValidator service flag during handshake, and provided us
// with a public key, and a corresponding proof of possession signature.
pk := rn.GetValidatorPublicKey()
if pk == nil {
continue
}
// It is possible that through unlikely concurrence, and malevolence, two non-validators happen to have the same
// public key, which goes undetected during handshake. To prevent this from affecting the indexing of the validator
// set, we check that the non-validator's public key is not already present in the validator index.
if _, ok := cc.rnManager.GetValidatorIndex().Get(pk.Serialize()); ok {
cc.rnManager.Disconnect(rn)
continue
}

// If the RemoteNode turns out to be in the validator set, index it.
if _, ok := activeValidatorsMap.Get(pk.Serialize()); ok {
cc.rnManager.SetValidator(rn)
cc.rnManager.UnsetNonValidator(rn)
}
}
}

// connectValidators attempts to connect to all active validators that are not already connected. It is called
// periodically by the validator connector.
func (cc *ConnectionController) connectValidators(activeValidatorsMap *collections.ConcurrentMap[bls.SerializedPublicKey, *ValidatorEntry]) {
// Look through the active validators and connect to any that we're not already connected to.
if cc.blsKeystore == nil {
return
}

validators := activeValidatorsMap.Copy()
for pk, validator := range validators {
_, exists := cc.rnManager.GetValidatorIndex().Get(pk)
// If we're already connected to the validator, continue.
if exists {
continue
}
if cc.blsKeystore.GetSigner().GetPublicKey().Serialize() == pk {
continue
}

publicKey, err := pk.Deserialize()
if err != nil {
continue
}

// For now, we only dial the first domain in the validator's domain list.
address := string(validator.Domains[0])
if err := cc.CreateValidatorConnection(address, publicKey); err != nil {
// TODO: Do we want to log an error here?
continue
}
}
}

func (cc *ConnectionController) CreateValidatorConnection(ipStr string, publicKey *bls.PublicKey) error {
netAddr, err := cc.ConvertIPStringToNetAddress(ipStr)
if err != nil {
Expand Down

0 comments on commit efad766

Please sign in to comment.