Skip to content

Commit

Permalink
Fix another integration test
Browse files Browse the repository at this point in the history
  • Loading branch information
AeonSw4n committed Jan 29, 2024
1 parent eda2d3e commit a31e152
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 26 deletions.
2 changes: 1 addition & 1 deletion integration_testing/blocksync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func TestSimpleSyncRestart(t *testing.T) {
randomHeight := randomUint32Between(t, 10, node2.Config.MaxSyncBlockHeight)
t.Logf("Random height for a restart (re-use if test failed): %v", randomHeight)
// Reboot node2 at a specific height and reconnect it with node1
restartAtHeight(t, node2, randomHeight)
node2 = restartAtHeight(t, node2, randomHeight)
waitForNodeToFullySync(node2)

compareNodesByDB(t, node1, node2, 0)
Expand Down
2 changes: 1 addition & 1 deletion integration_testing/tools.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func generateConfig(t *testing.T, port uint32, dataDir string, maxPeers uint32)
config.MaxSyncBlockHeight = 0
config.ConnectIPs = []string{}
config.PrivateMode = true
config.GlogV = 2
config.GlogV = 0
config.GlogVmodule = "*bitcoin_manager*=0,*balance*=0,*view*=0,*frontend*=0,*peer*=0,*addr*=0,*network*=0,*utils*=0,*connection*=0,*main*=0,*server*=0,*mempool*=0,*miner*=0,*blockchain*=0"
config.MaxInboundPeers = maxPeers
config.TargetOutboundPeers = maxPeers
Expand Down
13 changes: 10 additions & 3 deletions lib/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ type Peer struct {
StatsMtx deadlock.RWMutex
TimeOffsetSecs int64
TimeConnected time.Time
startingHeight uint32
ID uint64
// Ping-related fields.
LastPingNonce uint64
Expand All @@ -70,6 +69,7 @@ type Peer struct {
// Basic state.
PeerInfoMtx deadlock.Mutex
serviceFlags ServiceFlag
latestHeight uint64
addrStr string
netAddr *wire.NetAddress
minTxFeeRateNanosPerKB uint64
Expand Down Expand Up @@ -665,10 +665,10 @@ func (pp *Peer) MinFeeRateNanosPerKB() uint64 {
}

// StartingBlockHeight is the height of the peer's blockchain tip.
func (pp *Peer) StartingBlockHeight() uint32 {
func (pp *Peer) StartingBlockHeight() uint64 {
pp.StatsMtx.RLock()
defer pp.StatsMtx.RUnlock()
return pp.startingHeight
return pp.latestHeight
}

// NumBlocksToSend is the number of blocks the Peer has requested from
Expand Down Expand Up @@ -893,6 +893,13 @@ func (pp *Peer) _setKnownAddressesMap(key string, val bool) {
pp.knownAddressesMap[key] = val
}

func (pp *Peer) SetLatestBlockHeight(height uint64) {
pp.StatsMtx.Lock()
defer pp.StatsMtx.Unlock()

pp.latestHeight = height
}

func (pp *Peer) SetServiceFlag(sf ServiceFlag) {
pp.PeerInfoMtx.Lock()
defer pp.PeerInfoMtx.Unlock()
Expand Down
4 changes: 4 additions & 0 deletions lib/remote_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,10 @@ func (rn *RemoteNode) GetServiceFlag() ServiceFlag {
return rn.handshakeMetadata.serviceFlag
}

func (rn *RemoteNode) GetLatestBlockHeight() uint64 {
return rn.handshakeMetadata.latestBlockHeight
}

func (rn *RemoteNode) GetUserAgent() string {
return rn.handshakeMetadata.userAgent
}
Expand Down
44 changes: 23 additions & 21 deletions lib/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ type Server struct {
eventManager *EventManager
TxIndex *TXIndex

handshakeController *HandshakeManager
handshakeManager *HandshakeManager
// fastHotStuffEventLoop consensus.FastHotStuffEventLoop
connectionController *NetworkManager
networkManager *NetworkManager
// posMempool *PosMemPool TODO: Add the mempool later

params *DeSoParams
Expand Down Expand Up @@ -182,7 +182,7 @@ func (srv *Server) ResetRequestQueues() {
}

func (srv *Server) GetConnectionController() *NetworkManager {
return srv.connectionController
return srv.networkManager
}

// dataLock must be acquired for writing before calling this function.
Expand Down Expand Up @@ -504,8 +504,8 @@ func NewServer(
}
rnManager := NewRemoteNodeManager(srv, _chain, _cmgr, _blsKeystore, _params, _minFeeRateNanosPerKB, nodeServices)

srv.handshakeController = NewHandshakeController(rnManager)
srv.connectionController = NewConnectionController(_params, _cmgr, srv.handshakeController, rnManager, _blsKeystore,
srv.handshakeManager = NewHandshakeController(rnManager)
srv.networkManager = NewConnectionController(_params, _cmgr, srv.handshakeManager, rnManager, _blsKeystore,
_desoAddrMgr, _connectIps, _targetOutboundPeers, _maxInboundPeers, _limitOneInboundConnectionPerIP)

if srv.stateChangeSyncer != nil {
Expand Down Expand Up @@ -838,8 +838,8 @@ func (srv *Server) GetBlocks(pp *Peer, maxHeight int) {

func (srv *Server) _handleHeaderBundle(pp *Peer, msg *MsgDeSoHeaderBundle) {
printHeight := pp.StartingBlockHeight()
if srv.blockchain.headerTip().Height > printHeight {
printHeight = srv.blockchain.headerTip().Height
if uint64(srv.blockchain.headerTip().Height) > printHeight {
printHeight = uint64(srv.blockchain.headerTip().Height)
}
glog.Infof(CLog(Yellow, fmt.Sprintf("Received header bundle with %v headers "+
"in state %s from peer %v. Downloaded ( %v / %v ) total headers",
Expand Down Expand Up @@ -1539,14 +1539,15 @@ func (srv *Server) _startSync() {
// Find a peer with StartingHeight bigger than our best header tip.
var bestPeer *Peer
for _, peer := range srv.cmgr.GetAllPeers() {

if !peer.IsSyncCandidate() {
glog.Infof("Peer is not sync candidate: %v (isOutbound: %v)", peer, peer.isOutbound)
continue
}

// Choose the peer with the best height out of everyone who's a
// valid sync candidate.
if peer.StartingBlockHeight() < bestHeight {
if peer.StartingBlockHeight() < uint64(bestHeight) {
continue
}

Expand Down Expand Up @@ -1602,6 +1603,7 @@ func (srv *Server) HandleAcceptedPeer(rn *RemoteNode) {
}
pp := rn.GetPeer()
pp.SetServiceFlag(rn.GetServiceFlag())
pp.SetLatestBlockHeight(rn.GetLatestBlockHeight())

isSyncCandidate := pp.IsSyncCandidate()
isSyncing := srv.blockchain.isSyncing()
Expand Down Expand Up @@ -2199,7 +2201,7 @@ func (srv *Server) _handleAddrMessage(origin *Peer, desoMsg DeSoMessage) {
var ok bool
if msg, ok = desoMsg.(*MsgDeSoAddr); !ok {
glog.Errorf("Server._handleAddrMessage: Problem decoding MsgDeSoAddr: %v", spew.Sdump(desoMsg))
srv.connectionController.rnManager.DisconnectById(id)
srv.networkManager.rnManager.DisconnectById(id)
return
}

Expand All @@ -2215,7 +2217,7 @@ func (srv *Server) _handleAddrMessage(origin *Peer, desoMsg DeSoMessage) {
"Peer id=%v for sending us an addr message with %d transactions, which exceeds "+
"the max allowed %d",
origin.ID, len(msg.AddrList), MaxAddrsPerAddrMsg))
srv.connectionController.rnManager.DisconnectById(id)
srv.networkManager.rnManager.DisconnectById(id)
return
}

Expand Down Expand Up @@ -2266,7 +2268,7 @@ func (srv *Server) _handleGetAddrMessage(origin *Peer, desoMsg DeSoMessage) {
if _, ok := desoMsg.(*MsgDeSoGetAddr); !ok {
glog.Errorf("Server._handleAddrMessage: Problem decoding "+
"MsgDeSoAddr: %v", spew.Sdump(desoMsg))
srv.connectionController.rnManager.DisconnectById(id)
srv.networkManager.rnManager.DisconnectById(id)
return
}

Expand All @@ -2292,10 +2294,10 @@ func (srv *Server) _handleGetAddrMessage(origin *Peer, desoMsg DeSoMessage) {
}
res.AddrList = append(res.AddrList, singleAddr)
}
rn := srv.connectionController.rnManager.GetRemoteNodeById(id)
if err := srv.connectionController.rnManager.SendMessage(rn, res); err != nil {
rn := srv.networkManager.rnManager.GetRemoteNodeById(id)
if err := srv.networkManager.rnManager.SendMessage(rn, res); err != nil {
glog.Errorf("Server._handleGetAddrMessage: Problem sending addr message to peer %v: %v", origin, err)
srv.connectionController.rnManager.DisconnectById(id)
srv.networkManager.rnManager.DisconnectById(id)
return
}
}
Expand All @@ -2305,9 +2307,9 @@ func (srv *Server) _handleControlMessages(serverMessage *ServerMessage) (_should
// Control messages used internally to signal to the server.
case *MsgDeSoDisconnectedPeer:
srv._handleDonePeer(serverMessage.Peer)
srv.connectionController._handleDonePeerMessage(serverMessage.Peer, serverMessage.Msg)
srv.networkManager._handleDonePeerMessage(serverMessage.Peer, serverMessage.Msg)
case *MsgDeSoNewConnection:
srv.connectionController._handleNewConnectionMessage(serverMessage.Peer, serverMessage.Msg)
srv.networkManager._handleNewConnectionMessage(serverMessage.Peer, serverMessage.Msg)
case *MsgDeSoQuit:
return true
}
Expand Down Expand Up @@ -2346,9 +2348,9 @@ func (srv *Server) _handlePeerMessages(serverMessage *ServerMessage) {
case *MsgDeSoInv:
srv._handleInv(serverMessage.Peer, msg)
case *MsgDeSoVersion:
srv.handshakeController.handleVersionMessage(serverMessage.Peer, serverMessage.Msg)
srv.handshakeManager.handleVersionMessage(serverMessage.Peer, serverMessage.Msg)
case *MsgDeSoVerack:
srv.handshakeController.handleVerackMessage(serverMessage.Peer, serverMessage.Msg)
srv.handshakeManager.handleVerackMessage(serverMessage.Peer, serverMessage.Msg)
}
}

Expand Down Expand Up @@ -2514,7 +2516,7 @@ func (srv *Server) _startAddressRelayer() {
// For the first ten minutes after the connection controller starts, relay our address to all
// peers. After the first ten minutes, do it once every 24 hours.
glog.V(1).Infof("Server.startAddressRelayer: Relaying our own addr to peers")
remoteNodes := srv.connectionController.rnManager.GetAllRemoteNodes().GetAll()
remoteNodes := srv.networkManager.rnManager.GetAllRemoteNodes().GetAll()
if numMinutesPassed < 10 || numMinutesPassed%(RebroadcastNodeAddrIntervalMinutes) == 0 {
for _, rn := range remoteNodes {
if !rn.IsHandshakeCompleted() {
Expand Down Expand Up @@ -2596,7 +2598,7 @@ func (srv *Server) Stop() {
srv.cmgr.Stop()
glog.Infof(CLog(Yellow, "Server.Stop: Closed the ConnectionManger"))

srv.connectionController.Stop()
srv.networkManager.Stop()
glog.Infof(CLog(Yellow, "Server.Stop: Closed the NetworkManager"))

// Stop the miner if we have one running.
Expand Down Expand Up @@ -2682,7 +2684,7 @@ func (srv *Server) Start() {
go srv.miner.Start()
}

srv.connectionController.Start()
srv.networkManager.Start()
}

// SyncPrefixProgress keeps track of sync progress on an individual prefix. It is used in
Expand Down

0 comments on commit a31e152

Please sign in to comment.