Skip to content

Commit

Permalink
ConnectIps test
Browse files Browse the repository at this point in the history
  • Loading branch information
AeonSw4n committed Jan 18, 2024
1 parent c1f1b47 commit 1854db3
Show file tree
Hide file tree
Showing 6 changed files with 151 additions and 39 deletions.
96 changes: 96 additions & 0 deletions integration_testing/connection_controller_routines_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package integration_testing

import (
"github.com/deso-protocol/core/bls"
"github.com/deso-protocol/core/cmd"
"github.com/deso-protocol/core/lib"
"github.com/stretchr/testify/require"
"os"
"testing"
)

// TestConnectionControllerInitiatePersistentConnections verifies that a node configured
// with --connect-ips establishes a persistent outbound connection to every listed peer,
// and classifies each remote as validator or non-validator according to the mocked
// active-validator set.
func TestConnectionControllerInitiatePersistentConnections(t *testing.T) {
	require := require.New(t)

	// NonValidator Node1 will set its --connect-ips to two non-validators node2 and node3,
	// and two validators node4 and node5.
	node1 := spawnNonValidatorNodeProtocol2(t, 18000, "node1")
	node2 := spawnNonValidatorNodeProtocol2(t, 18001, "node2")
	node3 := spawnNonValidatorNodeProtocol2(t, 18002, "node3")
	// Each validator needs a BLS key; the same keys are used below to register the
	// validators in the mocked active-validator map.
	blsPriv4, err := bls.NewPrivateKey()
	require.NoError(err)
	node4 := spawnValidatorNodeProtocol2(t, 18003, "node4", blsPriv4)
	blsPriv5, err := bls.NewPrivateKey()
	require.NoError(err)
	node5 := spawnValidatorNodeProtocol2(t, 18004, "node5", blsPriv5)

	// Start the peer nodes first so their listeners are live before node1 dials out.
	node2 = startNode(t, node2)
	node3 = startNode(t, node3)
	node4 = startNode(t, node4)
	node5 = startNode(t, node5)
	defer node2.Stop()
	defer node3.Stop()
	defer node4.Stop()
	defer node5.Stop()

	// Override the package-level active-validator lookup so node1 treats node4 and node5
	// as validators; restore the original implementation when the test ends.
	oldGetActiveValidatorImpl := lib.GetActiveValidatorImpl
	defer func() {
		setGetActiveValidatorImpl(oldGetActiveValidatorImpl)
	}()
	setGetActiveValidatorImpl(func() map[bls.SerializedPublicKey]*lib.ValidatorEntry {
		return map[bls.SerializedPublicKey]*lib.ValidatorEntry{
			blsPriv4.PublicKey().Serialize(): createSimpleValidatorEntry(node4),
			blsPriv5.PublicKey().Serialize(): createSimpleValidatorEntry(node5),
		}
	})

	// Point node1's --connect-ips at the live listener addresses of the four peers.
	node1.Config.ConnectIPs = []string{
		node2.Listeners[0].Addr().String(),
		node3.Listeners[0].Addr().String(),
		node4.Listeners[0].Addr().String(),
		node5.Listeners[0].Addr().String(),
	}
	node1 = startNode(t, node1)
	defer node1.Stop()
	// node2/node3 should surface as non-validator outbound peers, node4/node5 as
	// validator connections.
	waitForNonValidatorOutboundConnection(t, node1, node2)
	waitForNonValidatorOutboundConnection(t, node1, node3)
	waitForValidatorConnection(t, node1, node4)
	waitForValidatorConnection(t, node1, node5)
}

// spawnNonValidatorNodeProtocol2 builds (but does not start) a ProtocolVersion2
// non-validator node listening on the given port. The id is used as the node's user
// agent so tests can tell nodes apart. The node's data directory is removed when the
// test finishes.
func spawnNonValidatorNodeProtocol2(t *testing.T, port uint32, id string) *cmd.Node {
	dataDir := getDirectory(t)
	t.Cleanup(func() { os.RemoveAll(dataDir) })

	nodeConfig := generateConfig(t, port, dataDir, 10)
	nodeConfig.SyncType = lib.NodeSyncTypeBlockSync

	newNode := cmd.NewNode(nodeConfig)
	newNode.Params.UserAgent = id
	newNode.Params.ProtocolVersion = lib.ProtocolVersion2
	return newNode
}

// spawnValidatorNodeProtocol2 builds (but does not start) a ProtocolVersion2 validator
// node listening on the given port. The supplied BLS private key becomes the node's PoS
// validator seed, and id is used as the user agent so tests can tell nodes apart. The
// node's data directory is removed when the test finishes.
func spawnValidatorNodeProtocol2(t *testing.T, port uint32, id string, blsPriv *bls.PrivateKey) *cmd.Node {
	dataDir := getDirectory(t)
	t.Cleanup(func() { os.RemoveAll(dataDir) })

	nodeConfig := generateConfig(t, port, dataDir, 10)
	nodeConfig.SyncType = lib.NodeSyncTypeBlockSync
	nodeConfig.PosValidatorSeed = blsPriv.ToString()

	validatorNode := cmd.NewNode(nodeConfig)
	validatorNode.Params.UserAgent = id
	validatorNode.Params.ProtocolVersion = lib.ProtocolVersion2
	return validatorNode
}

// setGetActiveValidatorImpl replaces lib's package-level active-validator lookup with
// the provided mapping. Callers are responsible for restoring the previous
// implementation (tests do this with a defer).
func setGetActiveValidatorImpl(mapping func() map[bls.SerializedPublicKey]*lib.ValidatorEntry) {
	lib.GetActiveValidatorImpl = mapping
}

// createSimpleValidatorEntry returns a minimal ValidatorEntry whose only populated
// field is Domains, set to the node's first listener address.
func createSimpleValidatorEntry(node *cmd.Node) *lib.ValidatorEntry {
	listenerDomain := []byte(node.Listeners[0].Addr().String())
	return &lib.ValidatorEntry{
		Domains: [][]byte{listenerDomain},
	}
}
5 changes: 2 additions & 3 deletions integration_testing/connection_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,17 +169,16 @@ func TestConnectionControllerHandshakeDataErrors(t *testing.T) {
require := require.New(t)

dbDir1 := getDirectory(t)
dbDir2 := getDirectory(t)
defer os.RemoveAll(dbDir1)
defer os.RemoveAll(dbDir2)

config1 := generateConfig(t, 18000, dbDir1, 10)
config1.SyncType = lib.NodeSyncTypeBlockSync
blsPriv1, err := bls.NewPrivateKey()
require.NoError(err)
config1.PosValidatorSeed = blsPriv1.ToString()

// This node should have ProtocolVersion2, but it has ProtocolVersion1 as we want it to disconnect.
dbDir2 := getDirectory(t)
defer os.RemoveAll(dbDir2)
config2 := generateConfig(t, 18001, dbDir2, 10)
config2.SyncType = lib.NodeSyncTypeBlockSync
blsPriv2, err := bls.NewPrivateKey()
Expand Down
2 changes: 1 addition & 1 deletion integration_testing/tools.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func generateConfig(t *testing.T, port uint32, dataDir string, maxPeers uint32)
config.MaxSyncBlockHeight = 0
config.ConnectIPs = []string{}
config.PrivateMode = true
config.GlogV = 0
config.GlogV = 2
config.GlogVmodule = "*bitcoin_manager*=0,*balance*=0,*view*=0,*frontend*=0,*peer*=0,*addr*=0,*network*=0,*utils*=0,*connection*=0,*main*=0,*server*=0,*mempool*=0,*miner*=0,*blockchain*=0"
config.MaxInboundPeers = maxPeers
config.TargetOutboundPeers = maxPeers
Expand Down
81 changes: 49 additions & 32 deletions lib/connection_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ import (
"time"
)

// GetActiveValidatorsFunc returns the current set of active validators, keyed by their
// serialized BLS public keys.
type GetActiveValidatorsFunc func() map[bls.SerializedPublicKey]*ValidatorEntry

// GetActiveValidatorImpl is a package-level, swappable hook for looking up the active
// validator set; it defaults to an empty map. Integration tests override it via
// setGetActiveValidatorImpl. NOTE(review): package-level mutable state — confirm where
// production code installs the real implementation and whether concurrent writes are
// possible.
var GetActiveValidatorImpl GetActiveValidatorsFunc = func() map[bls.SerializedPublicKey]*ValidatorEntry {
	return make(map[bls.SerializedPublicKey]*ValidatorEntry)
}

// ConnectionController is a structure that oversees all connections to remote nodes. It is responsible for kicking off
// the initial connections node makes to the network. It is also responsible for creating RemoteNodes from all
// successful outbound and inbound connections. The ConnectionController also ensures that the node is connected to
Expand All @@ -29,8 +35,6 @@ type ConnectionController struct {

rnManager *RemoteNodeManager

getActiveValidators func() map[bls.SerializedPublicKey]*ValidatorEntry

// The address manager keeps track of peer addresses we're aware of. When
// we need to connect to a new outbound peer, it chooses one of the addresses
// it's aware of at random and provides it to us.
Expand All @@ -53,8 +57,8 @@ type ConnectionController struct {
}

func NewConnectionController(params *DeSoParams, cmgr *ConnectionManager, handshakeController *HandshakeController,
rnManager *RemoteNodeManager, blsKeystore *BLSKeystore, addrMgr *addrmgr.AddrManager, targetOutboundPeers uint32,
maxInboundPeers uint32, limitOneInboundConnectionPerIP bool) *ConnectionController {
rnManager *RemoteNodeManager, blsKeystore *BLSKeystore, addrMgr *addrmgr.AddrManager, connectIps []string,
targetOutboundPeers uint32, maxInboundPeers uint32, limitOneInboundConnectionPerIP bool) *ConnectionController {

return &ConnectionController{
params: params,
Expand All @@ -63,6 +67,7 @@ func NewConnectionController(params *DeSoParams, cmgr *ConnectionManager, handsh
handshake: handshakeController,
rnManager: rnManager,
AddrMgr: addrMgr,
connectIps: connectIps,
targetOutboundPeers: targetOutboundPeers,
maxInboundPeers: maxInboundPeers,
limitOneInboundConnectionPerIP: limitOneInboundConnectionPerIP,
Expand Down Expand Up @@ -95,6 +100,7 @@ func (cc *ConnectionController) initiatePersistentConnections() {
// disconnect from one, we will try to reconnect to the same one.
if len(cc.connectIps) > 0 {
for _, connectIp := range cc.connectIps {
glog.Infof("ConnectionController.initiatePersistentConnections: Connecting to connectIp: %v", connectIp)
if err := cc.CreateNonValidatorPersistentOutboundConnection(connectIp); err != nil {
glog.Errorf("ConnectionController.initiatePersistentConnections: Problem connecting "+
"to connectIp %v: %v", connectIp, err)
Expand All @@ -116,7 +122,7 @@ func (cc *ConnectionController) startValidatorConnector() {
cc.exitGroup.Done()
return
case <-time.After(1 * time.Second):
activeValidatorsMap := cc.getActiveValidators()
activeValidatorsMap := GetActiveValidatorImpl()
cc.refreshValidatorIndex(activeValidatorsMap)
cc.connectValidators(activeValidatorsMap)
}
Expand All @@ -136,7 +142,6 @@ func (cc *ConnectionController) startNonValidatorConnector() {
cc.exitGroup.Done()
return
case <-time.After(1 * time.Second):

cc.refreshNonValidatorOutboundIndex()
cc.refreshNonValidatorInboundIndex()
cc.connectNonValidators()
Expand Down Expand Up @@ -271,12 +276,20 @@ func (cc *ConnectionController) refreshValidatorIndex(activeValidatorsMap map[bl
// periodically by the validator connector.
func (cc *ConnectionController) connectValidators(activeValidatorsMap map[bls.SerializedPublicKey]*ValidatorEntry) {
// Look through the active validators and connect to any that we're not already connected to.
if cc.blsKeystore == nil {
return
}

for pk, validator := range activeValidatorsMap {
_, exists := cc.rnManager.GetValidatorIndex().Get(pk)
// If we're already connected to the validator, continue.
if exists {
continue
}
if cc.blsKeystore.GetSigner().GetPublicKey().Serialize() == pk {
continue
}

publicKey, err := pk.Deserialize()
if err != nil {
continue
Expand All @@ -295,32 +308,6 @@ func (cc *ConnectionController) connectValidators(activeValidatorsMap map[bls.Se
// ## NonValidator Connections
// ###########################

// connectNonValidators tops up the pool of non-validator outbound peers. It dials
// random, not-yet-connected addresses from the address manager until the target
// outbound peer count is reached.
func (cc *ConnectionController) connectNonValidators() {
	// Only connect to addresses from the addrmgr if we don't specify --connect-ips. These addresses are *not* persistent,
	// meaning if we disconnect from one we'll try a different one.
	// TODO: We used this condition in the old code to prevent the node from connecting to non-connect-ips nodes.
	// I'm not sure whether this is still necessary. I suppose the concern here was that the connect-ips nodes
	// should be prioritized over the addrmgr nodes, especially during syncing. However, I think we can achieve the
	// same result by defining another flag, like a boolean --sync-from-persistent-peers-only, which could indicate that
	// we disregard non-persistent non-connect-ips nodes during syncing, if the flag is set to true.
	if len(cc.connectIps) == 0 {
		return
	}

	numOutboundPeers := uint32(cc.rnManager.GetNonValidatorOutboundIndex().Count())

	// Compute how many more outbound peers are needed, guarding against uint32
	// underflow when we already exceed the target.
	remainingOutboundPeers := uint32(0)
	if numOutboundPeers < cc.targetOutboundPeers {
		remainingOutboundPeers = cc.targetOutboundPeers - numOutboundPeers
	}
	for ii := uint32(0); ii < remainingOutboundPeers; ii++ {
		addr := cc.getRandomUnconnectedAddress()
		// getRandomUnconnectedAddress gives up after a bounded number of tries and
		// returns nil; bail out of this round rather than passing nil downstream.
		if addr == nil {
			break
		}
		cc.AddrMgr.Attempt(addr)
		// FIXME: error handle
		cc.rnManager.CreateNonValidatorOutboundConnection(addr)
	}
}

// refreshNonValidatorOutboundIndex is called periodically by the peer connector. It is responsible for disconnecting excess
// outbound remote nodes.
func (cc *ConnectionController) refreshNonValidatorOutboundIndex() {
Expand Down Expand Up @@ -379,6 +366,36 @@ func (cc *ConnectionController) refreshNonValidatorInboundIndex() {
}
}

// connectNonValidators tops up the pool of non-validator outbound peers. It is called
// periodically by the non-validator connector and dials random, not-yet-connected
// addresses from the address manager until the target outbound peer count is reached.
func (cc *ConnectionController) connectNonValidators() {
	// Only connect to addresses from the addrmgr if we don't specify --connect-ips. These addresses are *not* persistent,
	// meaning if we disconnect from one we'll try a different one.
	// TODO: We used this condition in the old code to prevent the node from connecting to non-connect-ips nodes.
	// I'm not sure whether this is still necessary. I suppose the concern here was that the connect-ips nodes
	// should be prioritized over the addrmgr nodes, especially during syncing. However, I think we can achieve the
	// same result by defining another flag, like a boolean --sync-from-persistent-peers-only, which could indicate that
	// we disregard non-persistent non-connect-ips nodes during syncing, if the flag is set to true.
	// FIXME: Think about uncommenting the below condition.
	//if len(cc.connectIps) == 0 {
	//	return
	//}

	numOutboundPeers := uint32(cc.rnManager.GetNonValidatorOutboundIndex().Count())

	// Compute how many more outbound peers are needed, guarding against uint32
	// underflow when we already exceed the target.
	remainingOutboundPeers := uint32(0)
	if numOutboundPeers < cc.targetOutboundPeers {
		remainingOutboundPeers = cc.targetOutboundPeers - numOutboundPeers
	}
	for ii := uint32(0); ii < remainingOutboundPeers; ii++ {
		addr := cc.getRandomUnconnectedAddress()
		// The address manager may have no suitable address to offer; stop this round.
		if addr == nil {
			break
		}
		cc.AddrMgr.Attempt(addr)
		// FIXME: error handle
		cc.rnManager.CreateNonValidatorOutboundConnection(addr)
	}
}

func (cc *ConnectionController) getRandomUnconnectedAddress() *wire.NetAddress {
for tries := 0; tries < 100; tries++ {
addr := cc.AddrMgr.GetAddress()
Expand Down
2 changes: 1 addition & 1 deletion lib/handshake_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (hc *HandshakeController) _handleVersionMessage(origin *Peer, desoMsg DeSoM
if hc.usedNonces.Contains(msgNonce) {
hc.usedNonces.Delete(msgNonce)
glog.Errorf("HandshakeController._handleVersionMessage: Disconnecting RemoteNode with id: (%v) "+
"nonce collision", origin.ID)
"nonce collision, nonce (%v)", origin.ID, msgNonce)
hc.rnManager.Disconnect(rn)
return
}
Expand Down
4 changes: 2 additions & 2 deletions lib/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,8 +499,8 @@ func NewServer(
rnManager := NewRemoteNodeManager(srv, _chain, _cmgr, _blsKeystore, _params, _minFeeRateNanosPerKB, nodeServices)

srv.handshakeController = NewHandshakeController(rnManager)
srv.connectionController = NewConnectionController(_params, _cmgr, srv.handshakeController, rnManager,
_blsKeystore, _desoAddrMgr, _targetOutboundPeers, _maxInboundPeers, _limitOneInboundConnectionPerIP)
srv.connectionController = NewConnectionController(_params, _cmgr, srv.handshakeController, rnManager, _blsKeystore,
_desoAddrMgr, _connectIps, _targetOutboundPeers, _maxInboundPeers, _limitOneInboundConnectionPerIP)

if srv.stateChangeSyncer != nil {
srv.stateChangeSyncer.BlockHeight = uint64(_chain.headerTip().Height)
Expand Down

0 comments on commit 1854db3

Please sign in to comment.