diff --git a/integration_testing/blocksync_test.go b/integration_testing/blocksync_test.go index aeb989f47..b2dbaf03a 100644 --- a/integration_testing/blocksync_test.go +++ b/integration_testing/blocksync_test.go @@ -57,7 +57,7 @@ func TestSimpleSyncRestart(t *testing.T) { randomHeight := randomUint32Between(t, 10, node2.Config.MaxSyncBlockHeight) t.Logf("Random height for a restart (re-use if test failed): %v", randomHeight) // Reboot node2 at a specific height and reconnect it with node1 - restartAtHeight(t, node2, randomHeight) + node2 = restartAtHeight(t, node2, randomHeight) waitForNodeToFullySync(node2) compareNodesByDB(t, node1, node2, 0) diff --git a/integration_testing/tools.go b/integration_testing/tools.go index 00469cc2e..59de71663 100644 --- a/integration_testing/tools.go +++ b/integration_testing/tools.go @@ -69,7 +69,7 @@ func generateConfig(t *testing.T, port uint32, dataDir string, maxPeers uint32) config.MaxSyncBlockHeight = 0 config.ConnectIPs = []string{} config.PrivateMode = true - config.GlogV = 2 + config.GlogV = 0 config.GlogVmodule = "*bitcoin_manager*=0,*balance*=0,*view*=0,*frontend*=0,*peer*=0,*addr*=0,*network*=0,*utils*=0,*connection*=0,*main*=0,*server*=0,*mempool*=0,*miner*=0,*blockchain*=0" config.MaxInboundPeers = maxPeers config.TargetOutboundPeers = maxPeers diff --git a/lib/peer.go b/lib/peer.go index 1aa2e409b..245e0d704 100644 --- a/lib/peer.go +++ b/lib/peer.go @@ -48,7 +48,6 @@ type Peer struct { StatsMtx deadlock.RWMutex TimeOffsetSecs int64 TimeConnected time.Time - startingHeight uint32 ID uint64 // Ping-related fields. LastPingNonce uint64 @@ -70,6 +69,7 @@ type Peer struct { // Basic state. PeerInfoMtx deadlock.Mutex serviceFlags ServiceFlag + latestHeight uint64 addrStr string netAddr *wire.NetAddress minTxFeeRateNanosPerKB uint64 @@ -665,10 +665,10 @@ func (pp *Peer) MinFeeRateNanosPerKB() uint64 { } // StartingBlockHeight is the height of the peer's blockchain tip. -func (pp *Peer) StartingBlockHeight() uint32 { +func (pp *Peer) StartingBlockHeight() uint64 { pp.StatsMtx.RLock() defer pp.StatsMtx.RUnlock() - return pp.startingHeight + return pp.latestHeight } // NumBlocksToSend is the number of blocks the Peer has requested from @@ -893,6 +893,13 @@ func (pp *Peer) _setKnownAddressesMap(key string, val bool) { pp.knownAddressesMap[key] = val } +func (pp *Peer) SetLatestBlockHeight(height uint64) { + pp.StatsMtx.Lock() + defer pp.StatsMtx.Unlock() + + pp.latestHeight = height +} + func (pp *Peer) SetServiceFlag(sf ServiceFlag) { pp.PeerInfoMtx.Lock() defer pp.PeerInfoMtx.Unlock() diff --git a/lib/remote_node.go b/lib/remote_node.go index 42fe21521..2848eefc8 100644 --- a/lib/remote_node.go +++ b/lib/remote_node.go @@ -203,6 +203,10 @@ func (rn *RemoteNode) GetServiceFlag() ServiceFlag { return rn.handshakeMetadata.serviceFlag } +func (rn *RemoteNode) GetLatestBlockHeight() uint64 { + return rn.handshakeMetadata.latestBlockHeight +} + func (rn *RemoteNode) GetUserAgent() string { return rn.handshakeMetadata.userAgent } diff --git a/lib/server.go b/lib/server.go index 06609967c..62a087c81 100644 --- a/lib/server.go +++ b/lib/server.go @@ -62,9 +62,9 @@ type Server struct { eventManager *EventManager TxIndex *TXIndex - handshakeController *HandshakeManager + handshakeManager *HandshakeManager // fastHotStuffEventLoop consensus.FastHotStuffEventLoop - connectionController *NetworkManager + networkManager *NetworkManager // posMempool *PosMemPool TODO: Add the mempool later params *DeSoParams @@ -182,7 +182,7 @@ func (srv *Server) ResetRequestQueues() { } func (srv *Server) GetConnectionController() *NetworkManager { - return srv.connectionController + return srv.networkManager } // dataLock must be acquired for writing before calling this function. @@ -504,8 +504,8 @@ func NewServer( } rnManager := NewRemoteNodeManager(srv, _chain, _cmgr, _blsKeystore, _params, _minFeeRateNanosPerKB, nodeServices) - srv.handshakeController = NewHandshakeController(rnManager) - srv.connectionController = NewConnectionController(_params, _cmgr, srv.handshakeController, rnManager, _blsKeystore, + srv.handshakeManager = NewHandshakeController(rnManager) + srv.networkManager = NewConnectionController(_params, _cmgr, srv.handshakeManager, rnManager, _blsKeystore, _desoAddrMgr, _connectIps, _targetOutboundPeers, _maxInboundPeers, _limitOneInboundConnectionPerIP) if srv.stateChangeSyncer != nil { @@ -838,8 +838,8 @@ func (srv *Server) GetBlocks(pp *Peer, maxHeight int) { func (srv *Server) _handleHeaderBundle(pp *Peer, msg *MsgDeSoHeaderBundle) { printHeight := pp.StartingBlockHeight() - if srv.blockchain.headerTip().Height > printHeight { - printHeight = srv.blockchain.headerTip().Height + if uint64(srv.blockchain.headerTip().Height) > printHeight { + printHeight = uint64(srv.blockchain.headerTip().Height) } glog.Infof(CLog(Yellow, fmt.Sprintf("Received header bundle with %v headers "+ "in state %s from peer %v. Downloaded ( %v / %v ) total headers", @@ -1539,6 +1539,7 @@ func (srv *Server) _startSync() { // Find a peer with StartingHeight bigger than our best header tip. var bestPeer *Peer for _, peer := range srv.cmgr.GetAllPeers() { + if !peer.IsSyncCandidate() { glog.Infof("Peer is not sync candidate: %v (isOutbound: %v)", peer, peer.isOutbound) continue @@ -1546,7 +1547,7 @@ func (srv *Server) _startSync() { // Choose the peer with the best height out of everyone who's a // valid sync candidate. - if peer.StartingBlockHeight() < bestHeight { + if peer.StartingBlockHeight() < uint64(bestHeight) { continue } @@ -1602,6 +1603,7 @@ func (srv *Server) HandleAcceptedPeer(rn *RemoteNode) { } pp := rn.GetPeer() pp.SetServiceFlag(rn.GetServiceFlag()) + pp.SetLatestBlockHeight(rn.GetLatestBlockHeight()) isSyncCandidate := pp.IsSyncCandidate() isSyncing := srv.blockchain.isSyncing() @@ -2199,7 +2201,7 @@ func (srv *Server) _handleAddrMessage(origin *Peer, desoMsg DeSoMessage) { var ok bool if msg, ok = desoMsg.(*MsgDeSoAddr); !ok { glog.Errorf("Server._handleAddrMessage: Problem decoding MsgDeSoAddr: %v", spew.Sdump(desoMsg)) - srv.connectionController.rnManager.DisconnectById(id) + srv.networkManager.rnManager.DisconnectById(id) return } @@ -2215,7 +2217,7 @@ func (srv *Server) _handleAddrMessage(origin *Peer, desoMsg DeSoMessage) { "Peer id=%v for sending us an addr message with %d transactions, which exceeds "+ "the max allowed %d", origin.ID, len(msg.AddrList), MaxAddrsPerAddrMsg)) - srv.connectionController.rnManager.DisconnectById(id) + srv.networkManager.rnManager.DisconnectById(id) return } @@ -2266,7 +2268,7 @@ func (srv *Server) _handleGetAddrMessage(origin *Peer, desoMsg DeSoMessage) { if _, ok := desoMsg.(*MsgDeSoGetAddr); !ok { glog.Errorf("Server._handleAddrMessage: Problem decoding "+ "MsgDeSoAddr: %v", spew.Sdump(desoMsg)) - srv.connectionController.rnManager.DisconnectById(id) + srv.networkManager.rnManager.DisconnectById(id) return } @@ -2292,10 +2294,10 @@ func (srv *Server) _handleGetAddrMessage(origin *Peer, desoMsg DeSoMessage) { } res.AddrList = append(res.AddrList, singleAddr) } - rn := srv.connectionController.rnManager.GetRemoteNodeById(id) - if err := srv.connectionController.rnManager.SendMessage(rn, res); err != nil { + rn := srv.networkManager.rnManager.GetRemoteNodeById(id) + if err := srv.networkManager.rnManager.SendMessage(rn, res); err != nil { glog.Errorf("Server._handleGetAddrMessage: Problem sending addr message to peer %v: %v", origin, err) - srv.connectionController.rnManager.DisconnectById(id) + srv.networkManager.rnManager.DisconnectById(id) return } } @@ -2305,9 +2307,9 @@ func (srv *Server) _handleControlMessages(serverMessage *ServerMessage) (_should // Control messages used internally to signal to the server. case *MsgDeSoDisconnectedPeer: srv._handleDonePeer(serverMessage.Peer) - srv.connectionController._handleDonePeerMessage(serverMessage.Peer, serverMessage.Msg) + srv.networkManager._handleDonePeerMessage(serverMessage.Peer, serverMessage.Msg) case *MsgDeSoNewConnection: - srv.connectionController._handleNewConnectionMessage(serverMessage.Peer, serverMessage.Msg) + srv.networkManager._handleNewConnectionMessage(serverMessage.Peer, serverMessage.Msg) case *MsgDeSoQuit: return true } @@ -2346,9 +2348,9 @@ func (srv *Server) _handlePeerMessages(serverMessage *ServerMessage) { case *MsgDeSoInv: srv._handleInv(serverMessage.Peer, msg) case *MsgDeSoVersion: - srv.handshakeController.handleVersionMessage(serverMessage.Peer, serverMessage.Msg) + srv.handshakeManager.handleVersionMessage(serverMessage.Peer, serverMessage.Msg) case *MsgDeSoVerack: - srv.handshakeController.handleVerackMessage(serverMessage.Peer, serverMessage.Msg) + srv.handshakeManager.handleVerackMessage(serverMessage.Peer, serverMessage.Msg) } } @@ -2514,7 +2516,7 @@ func (srv *Server) _startAddressRelayer() { // For the first ten minutes after the connection controller starts, relay our address to all // peers. After the first ten minutes, do it once every 24 hours. glog.V(1).Infof("Server.startAddressRelayer: Relaying our own addr to peers") - remoteNodes := srv.connectionController.rnManager.GetAllRemoteNodes().GetAll() + remoteNodes := srv.networkManager.rnManager.GetAllRemoteNodes().GetAll() if numMinutesPassed < 10 || numMinutesPassed%(RebroadcastNodeAddrIntervalMinutes) == 0 { for _, rn := range remoteNodes { if !rn.IsHandshakeCompleted() { @@ -2596,7 +2598,7 @@ func (srv *Server) Stop() { srv.cmgr.Stop() glog.Infof(CLog(Yellow, "Server.Stop: Closed the ConnectionManger")) - srv.connectionController.Stop() + srv.networkManager.Stop() glog.Infof(CLog(Yellow, "Server.Stop: Closed the NetworkManager")) // Stop the miner if we have one running. @@ -2682,7 +2684,7 @@ func (srv *Server) Start() { go srv.miner.Start() } - srv.connectionController.Start() + srv.networkManager.Start() } // SyncPrefixProgress keeps track of sync progress on an individual prefix. It is used in