Skip to content

Commit

Permalink
Revert "Another split"
Browse files Browse the repository at this point in the history
This reverts commit eaeec58.
  • Loading branch information
AeonSw4n committed Jan 29, 2024
1 parent 9b56979 commit de6cf0d
Show file tree
Hide file tree
Showing 9 changed files with 403 additions and 397 deletions.
6 changes: 2 additions & 4 deletions integration_testing/connection_bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,12 @@ func (bridge *ConnectionBridge) createInboundConnection(node *cmd.Node) *lib.Pee

// This channel is redundant in our setting.
messagesFromPeer := make(chan *lib.ServerMessage, 100)
newPeerChan := make(chan *lib.Peer, 100)
donePeerChan := make(chan *lib.Peer, 100)
// Because it is an inbound Peer of the node, it is simultaneously a "fake" outbound Peer of the bridge.
// Hence, we will mark the _isOutbound parameter as "true" in NewPeer.
peer := lib.NewPeer(uint64(lib.RandInt64(math.MaxInt64)), conn, true,
netAddress, true, 10000, 0, &lib.DeSoMainnetParams,
messagesFromPeer, nil, nil, lib.NodeSyncTypeAny, newPeerChan, donePeerChan)
messagesFromPeer, nil, nil, lib.NodeSyncTypeAny, donePeerChan)
return peer
}

Expand All @@ -144,11 +143,10 @@ func (bridge *ConnectionBridge) createOutboundConnection(node *cmd.Node, otherNo
addrMgr := addrmgr.New("", net.LookupIP)
na, err := lib.IPToNetAddr(conn.RemoteAddr().String(), addrMgr, otherNode.Params)
messagesFromPeer := make(chan *lib.ServerMessage, 100)
newPeerChan := make(chan *lib.Peer, 100)
donePeerChan := make(chan *lib.Peer, 100)
peer := lib.NewPeer(uint64(lib.RandInt64(math.MaxInt64)), conn,
false, na, false, 10000, 0, bridge.nodeB.Params,
messagesFromPeer, nil, nil, lib.NodeSyncTypeAny, newPeerChan, donePeerChan)
messagesFromPeer, nil, nil, lib.NodeSyncTypeAny, donePeerChan)
bridge.newPeerChan <- peer
//}
}(ll)
Expand Down
68 changes: 35 additions & 33 deletions lib/block_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"math"
"sync"
"sync/atomic"
"time"

"github.com/btcsuite/btcd/wire"
Expand All @@ -19,6 +18,10 @@ import (
)

type DeSoBlockProducer struct {
startGroup sync.WaitGroup
exitChan chan struct{}
exitGroup sync.WaitGroup

// The minimum amount of time we wait before trying to produce a new block
// template. If this value is set low enough then we will produce a block template
// continuously.
Expand Down Expand Up @@ -47,11 +50,6 @@ type DeSoBlockProducer struct {

// producerWaitGroup allows us to wait until the producer has properly closed.
producerWaitGroup sync.WaitGroup
// exit is used to signal that DeSoBlockProducer routines should be terminated.
exit int32
// isAsleep is a helper variable for quitting that indicates whether the DeSoBlockProducer is asleep. While producing
// blocks, we sleep for a few seconds. Instead of waiting for the sleep to finish, we use this variable to quit immediately.
isAsleep int32
}

type BlockTemplateStats struct {
Expand Down Expand Up @@ -102,6 +100,7 @@ func NewDeSoBlockProducer(
chain: chain,
params: params,
postgres: postgres,
exitChan: make(chan struct{}),
}, nil
}

Expand Down Expand Up @@ -353,10 +352,8 @@ func (desoBlockProducer *DeSoBlockProducer) _getBlockTemplate(publicKey []byte)
}

func (desoBlockProducer *DeSoBlockProducer) Stop() {
atomic.AddInt32(&desoBlockProducer.exit, 1)
if atomic.LoadInt32(&desoBlockProducer.isAsleep) == 0 {
desoBlockProducer.producerWaitGroup.Wait()
}
close(desoBlockProducer.exitChan)
desoBlockProducer.exitGroup.Wait()
}

func (desoBlockProducer *DeSoBlockProducer) GetRecentBlock(blockHash *BlockHash) *MsgDeSoBlock {
Expand Down Expand Up @@ -591,35 +588,40 @@ func (desoBlockProducer *DeSoBlockProducer) Start() {
return
}

desoBlockProducer.startGroup.Add(1)
desoBlockProducer.exitGroup.Add(1)
go desoBlockProducer.start()
desoBlockProducer.startGroup.Wait()
}

func (desoBlockProducer *DeSoBlockProducer) start() {
// Set the time to a nil value so we run on the first iteration of the loop.
var lastBlockUpdate time.Time
desoBlockProducer.producerWaitGroup.Add(1)

sleepDuration := 0 * time.Second
for {
if atomic.LoadInt32(&desoBlockProducer.exit) >= 0 {
desoBlockProducer.producerWaitGroup.Done()
select {
case <-desoBlockProducer.exitChan:
desoBlockProducer.exitGroup.Done()
return
}

secondsLeft := float64(desoBlockProducer.minBlockUpdateIntervalSeconds) - time.Since(lastBlockUpdate).Seconds()
if !lastBlockUpdate.IsZero() && secondsLeft > 0 {
glog.V(1).Infof("Sleeping for %v seconds before producing next block template...", secondsLeft)
atomic.AddInt32(&desoBlockProducer.isAsleep, 1)
time.Sleep(time.Duration(math.Ceil(secondsLeft)) * time.Second)
atomic.AddInt32(&desoBlockProducer.isAsleep, -1)
continue
}
case <-time.After(sleepDuration):
secondsLeft := float64(desoBlockProducer.minBlockUpdateIntervalSeconds) - time.Since(lastBlockUpdate).Seconds()
if !lastBlockUpdate.IsZero() && secondsLeft > 0 {
glog.V(1).Infof("Sleeping for %v seconds before producing next block template...", secondsLeft)
sleepDuration = time.Duration(math.Ceil(secondsLeft)) * time.Second
continue
}

// Update the time so start the clock for the next iteration.
lastBlockUpdate = time.Now()
// Update the time so start the clock for the next iteration.
lastBlockUpdate = time.Now()

glog.V(1).Infof("Producing block template...")
err := desoBlockProducer.UpdateLatestBlockTemplate()
if err != nil {
// If we hit an error, log it and sleep for a second. This could happen due to us
// being in the middle of processing a block or something.
glog.Errorf("Error producing block template: %v", err)
time.Sleep(time.Second)
glog.V(1).Infof("Producing block template...")
err := desoBlockProducer.UpdateLatestBlockTemplate()
if err != nil {
// If we hit an error, log it and sleep for a second. This could happen due to us
// being in the middle of processing a block or something.
glog.Errorf("Error producing block template: %v", err)
sleepDuration = time.Second
}
}
}
}
18 changes: 1 addition & 17 deletions lib/connection_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,11 @@ func (cc *ConnectionController) Start() {
go cc.startRemoteNodeCleanup()

cc.startGroup.Wait()
cc.exitGroup.Add(4)
}

func (cc *ConnectionController) Stop() {
if !DisableNetworkManagerRoutines {
cc.exitGroup.Add(4)
close(cc.exitChan)
cc.exitGroup.Wait()
}
Expand Down Expand Up @@ -201,22 +201,6 @@ func (cc *ConnectionController) _handleDonePeerMessage(origin *Peer, desoMsg DeS
}
}

func (cc *ConnectionController) _handleAddrMessage(origin *Peer, desoMsg DeSoMessage) {
if desoMsg.GetMsgType() != MsgTypeAddr {
return
}

// TODO
}

func (cc *ConnectionController) _handleGetAddrMessage(origin *Peer, desoMsg DeSoMessage) {
if desoMsg.GetMsgType() != MsgTypeGetAddr {
return
}

// TODO
}

// _handleNewConnectionMessage is called when a new outbound or inbound connection is established. It is responsible
// for creating a RemoteNode from the connection and initiating the handshake. The incoming DeSoMessage is a control message.
func (cc *ConnectionController) _handleNewConnectionMessage(origin *Peer, desoMsg DeSoMessage) {
Expand Down
Loading

0 comments on commit de6cf0d

Please sign in to comment.