free operation queue semaphore in process snapshot chunk and when hitting error in handleSnapshot that prevents us from calling process snapshot chunk (#1420)

* free operation queue semaphore in process snapshot chunk and when hitting error in handleSnapshot that prevents us from calling process snapshot chunk

* set log level to 1 for frequently seen log lines

* free operation queue semaphore if chunk is empty

* make StateSemaphoreLock an RW lock b/c GetStatus doesn't need a write lock

* don't hold chain lock in SetSnapshotChunk. Only get initial checksum bytes if checksum isn't disabled

* don't unlock mainDbMutex in SetSnapshotChunk

* acquire chain lock before calling LatestLocator

* Diamondhands/review free operation queue semaphore in process chunk (#1423)

* make GetUtxoViewAndUtxoOpsAtBlockHash public (#1422)

* review

* remove fixmes and add comments

---------

Co-authored-by: Lazy Nina <[email protected]>
Co-authored-by: Lazy Nina <[email protected]>

---------

Co-authored-by: diamondhands <[email protected]>
Co-authored-by: diamondhands0 <[email protected]>
3 people authored Oct 24, 2024
1 parent 5dfcd92 commit 3cc2234
Showing 6 changed files with 57 additions and 28 deletions.
2 changes: 1 addition & 1 deletion lib/block_producer.go
@@ -373,7 +373,7 @@ func (desoBlockProducer *DeSoBlockProducer) _getBlockTemplate(publicKey []byte)
return nil, nil, nil, errors.Wrapf(err, "DeSoBlockProducer._getBlockTemplate: Problem computing next difficulty: ")
}

glog.Infof("Produced block with %v txns with approx %v total txns in mempool",
glog.V(1).Infof("Produced block with %v txns with approx %v total txns in mempool",
len(blockRet.Txns), len(desoBlockProducer.mempool.readOnlyUniversalTransactionList))
return blockRet, diffTarget, lastNode, nil
}
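Several hunks in this commit downgrade routine log lines from glog.Infof to glog.V(1).Infof (the "set log level to 1 for frequently seen log lines" bullet). For context, a glog.V(1) call only emits when the process runs at verbosity 1 or higher (for example with -v=1), so these messages drop out of the default log output. A minimal standalone sketch of the pattern, not taken from this repository:

package main

import (
	"flag"

	"github.com/golang/glog"
)

func main() {
	// glog registers its flags (-v, -logtostderr, ...) on the standard flag set.
	flag.Parse()
	defer glog.Flush()

	// Always emitted, regardless of verbosity.
	glog.Infof("node started")

	// Only emitted when the process runs with -v=1 or higher; this is how the
	// frequently seen lines in this commit are gated.
	glog.V(1).Infof("produced block with %d txns", 42)
}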
12 changes: 11 additions & 1 deletion lib/blockchain.go
@@ -1255,6 +1255,11 @@ func (bc *Blockchain) LatestLocator(tip *BlockNode) []*BlockHash {
}

func (bc *Blockchain) HeaderLocatorWithNodeHash(blockHash *BlockHash) ([]*BlockHash, error) {
+ // We can acquire the ChainLock because the only place this is called currently is from
+ // _handleHeaderBundle, which doesn't have the lock.
+ // If we do not acquire the lock, we may hit a concurrent map read write error which causes panic.
+ bc.ChainLock.RLock()
+ defer bc.ChainLock.RUnlock()
node, exists := bc.blockIndexByHash.Get(*blockHash)
if !exists {
return nil, fmt.Errorf("Blockchain.HeaderLocatorWithNodeHash: Node for hash %v is not in our blockIndexByHash", blockHash)
@@ -1266,6 +1271,11 @@ func (bc *Blockchain) HeaderLocatorWithNodeHash(blockHash *BlockHash) ([]*BlockH
// LatestHeaderLocator calls LatestLocator in order to fetch a locator
// for the best header chain.
func (bc *Blockchain) LatestHeaderLocator() []*BlockHash {
+ // We can acquire the ChainLock here because all calls to this function happen in peer.go
+ // and server.go, which don't hold the lock.
+ // If we do not acquire the lock, we may hit a concurrent map read write error which causes panic.
+ bc.ChainLock.RLock()
+ defer bc.ChainLock.RUnlock()
headerTip := bc.headerTip()

return bc.LatestLocator(headerTip)
@@ -2574,7 +2584,7 @@ func (bc *Blockchain) processBlockPoW(desoBlock *MsgDeSoBlock, verifySignatures
if *bc.blockView.TipHash != *currentTip.Hash {
//return false, false, fmt.Errorf("ProcessBlock: Tip hash for utxo view (%v) is "+
// "not the current tip hash (%v)", utxoView.TipHash, currentTip.Hash)
glog.Infof("ProcessBlock: Tip hash for utxo view (%v) is "+
glog.V(1).Infof("ProcessBlock: Tip hash for utxo view (%v) is "+
"not the current tip hash (%v)", bc.blockView.TipHash, currentTip.Hash)
}

14 changes: 7 additions & 7 deletions lib/legacy_mempool.go
@@ -792,7 +792,7 @@ func (mp *DeSoMempool) OpenTempDBAndDumpTxns() error {
allTxns := mp.readOnlyUniversalTransactionList

tempMempoolDBDir := filepath.Join(mp.mempoolDir, "temp_mempool_dump")
glog.Infof("OpenTempDBAndDumpTxns: Opening new temp db %v", tempMempoolDBDir)
glog.V(1).Infof("OpenTempDBAndDumpTxns: Opening new temp db %v", tempMempoolDBDir)
// Make the top-level folder if it doesn't exist.
err := MakeDirIfNonExistent(mp.mempoolDir)
if err != nil {
@@ -817,7 +817,7 @@ func (mp *DeSoMempool) OpenTempDBAndDumpTxns() error {
// If we're at a multiple of 1k or we're at the end of the list
// then dump the txns to disk
if len(txnsToDump)%1000 == 0 || ii == len(allTxns)-1 {
glog.Infof("OpenTempDBAndDumpTxns: Dumping txns %v to %v", ii-len(txnsToDump)+1, ii)
glog.V(1).Infof("OpenTempDBAndDumpTxns: Dumping txns %v to %v", ii-len(txnsToDump)+1, ii)
err := tempMempoolDB.Update(func(txn *badger.Txn) error {
return FlushMempoolToDbWithTxn(txn, nil, blockHeight, txnsToDump, mp.bc.eventManager)
})
@@ -828,7 +828,7 @@ func (mp *DeSoMempool) OpenTempDBAndDumpTxns() error {
}
}
endTime := time.Now()
glog.Infof("OpenTempDBAndDumpTxns: Full txn dump of %v txns completed "+
glog.V(1).Infof("OpenTempDBAndDumpTxns: Full txn dump of %v txns completed "+
"in %v seconds. Safe to reboot node", len(allTxns), endTime.Sub(startTime).Seconds())
return nil
}
@@ -2606,7 +2606,7 @@ func (mp *DeSoMempool) InefficientRemoveTransaction(tx *MsgDeSoTxn) {
}

func (mp *DeSoMempool) StartReadOnlyUtxoViewRegenerator() {
glog.Info("Calling StartReadOnlyUtxoViewRegenerator...")
glog.V(1).Info("Calling StartReadOnlyUtxoViewRegenerator...")

go func() {
var oldSeqNum int64
@@ -2697,7 +2697,7 @@ func (mp *DeSoMempool) StartMempoolDBDumper() {
for {
select {
case <-time.After(30 * time.Second):
glog.Info("StartMempoolDBDumper: Waking up! Dumping txns now...")
glog.V(1).Info("StartMempoolDBDumper: Waking up! Dumping txns now...")

// Dump the txns and time it.
mp.DumpTxnsToDB()
@@ -2734,7 +2734,7 @@ func (mp *DeSoMempool) LoadTxnsFromDB() {
// If we make it this far, we found a mempool dump to load. Woohoo!
tempMempoolDBOpts := mp.getBadgerOptions(savedTxnsDir)
tempMempoolDBOpts.ValueDir = savedTxnsDir
glog.Infof("LoadTxnsFrom: Opening new temp db %v", savedTxnsDir)
glog.V(1).Infof("LoadTxnsFrom: Opening new temp db %v", savedTxnsDir)
tempMempoolDB, err := badger.Open(tempMempoolDBOpts)
if err != nil {
glog.Infof("LoadTxnsFrom: Could not open temp db to dump mempool: %v", err)
@@ -2759,7 +2759,7 @@ func (mp *DeSoMempool) LoadTxnsFromDB() {
}
}
endTime := time.Now()
glog.Infof("LoadTxnsFromDB: Loaded %v txns in %v seconds", len(dbMempoolTxnsOrderedByTime), endTime.Sub(startTime).Seconds())
glog.V(1).Infof("LoadTxnsFromDB: Loaded %v txns in %v seconds", len(dbMempoolTxnsOrderedByTime), endTime.Sub(startTime).Seconds())
}

func (mp *DeSoMempool) Stop() {
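The OpenTempDBAndDumpTxns hunks above keep the existing strategy of dumping mempool transactions in batches of 1,000 so that no single badger transaction grows too large. A rough, generic illustration of that flush-every-N pattern follows; the helper names and the badger import path are assumptions, not the repository's actual code:

package main

import (
	"log"

	"github.com/dgraph-io/badger/v3"
)

// writeItemWithTxn is a hypothetical per-item writer standing in for
// FlushMempoolToDbWithTxn-style helpers.
func writeItemWithTxn(txn *badger.Txn, key, val []byte) error {
	return txn.Set(key, val)
}

// dumpInBatches writes items in batches of batchSize, committing one badger
// transaction per batch (and once more for the final partial batch).
func dumpInBatches(db *badger.DB, keys, vals [][]byte, batchSize int) error {
	batchStart := 0
	for ii := range keys {
		atBatchBoundary := ii-batchStart+1 == batchSize
		atEnd := ii == len(keys)-1
		if !atBatchBoundary && !atEnd {
			continue
		}
		start, end := batchStart, ii+1
		err := db.Update(func(txn *badger.Txn) error {
			for jj := start; jj < end; jj++ {
				if err := writeItemWithTxn(txn, keys[jj], vals[jj]); err != nil {
					return err
				}
			}
			return nil
		})
		if err != nil {
			return err
		}
		log.Printf("dumped items %d through %d", start, end-1)
		batchStart = end
	}
	return nil
}

func main() {
	// In-memory badger instance just to make the sketch runnable.
	db, err := badger.Open(badger.DefaultOptions("").WithInMemory(true))
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	keys := [][]byte{[]byte("a"), []byte("b"), []byte("c")}
	vals := [][]byte{{1}, {2}, {3}}
	if err := dumpInBatches(db, keys, vals, 2); err != nil {
		log.Fatal(err)
	}
}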
2 changes: 1 addition & 1 deletion lib/peer.go
@@ -385,7 +385,7 @@ func (pp *Peer) HandleInv(msg *MsgDeSoInv) {
// Ignore invs while we're still syncing and before we've requested
// all mempool transactions from one of our peers to bootstrap.
if pp.srv.blockchain.isSyncing() {
glog.Infof("Server._handleInv: Ignoring INV while syncing from Peer %v", pp)
glog.V(1).Infof("Server._handleInv: Ignoring INV while syncing from Peer %v", pp)
return
}

16 changes: 14 additions & 2 deletions lib/server.go
@@ -1440,8 +1440,16 @@ func (srv *Server) _handleSnapshot(pp *Peer, msg *MsgDeSoSnapshotData) {
"<%v>, Last entry: <%v>), (number of entries: %v), metadata (%v), and isEmpty (%v), from Peer %v",
msg.SnapshotChunk[0].Key, msg.SnapshotChunk[len(msg.SnapshotChunk)-1].Key, len(msg.SnapshotChunk),
msg.SnapshotMetadata, msg.SnapshotChunk[0].IsEmpty(), pp)))
- // Free up a slot in the operationQueueSemaphore, now that a chunk has been processed.
- srv.snapshot.FreeOperationQueueSemaphore()

+ // This is ugly but the alternative is to meticulously call FreeOperationQueueSemaphore every time
+ // we return with an error, which is worse.
+ chunkProcessed := false
+ freeSempahoreIfChunkNotProcessed := func() {
+ if !chunkProcessed {
+ srv.snapshot.FreeOperationQueueSemaphore()
+ }
+ }
+ defer freeSempahoreIfChunkNotProcessed()

// There is a possibility that during hypersync the network entered a new snapshot epoch. We handle this case by
// restarting the node and starting hypersync from scratch.
@@ -1459,6 +1467,7 @@ func (srv *Server) _handleSnapshot(pp *Peer, msg *MsgDeSoSnapshotData) {
} else {
glog.Errorf(CLog(Red, "srv._handleSnapshot: Trying to restart the node but nodeMessageChannel is empty, "+
"this should never happen."))
+ return
}
}

@@ -1491,6 +1500,7 @@ func (srv *Server) _handleSnapshot(pp *Peer, msg *MsgDeSoSnapshotData) {
return
}

+ // TODO: disable checksum support?
// If we haven't yet set the epoch checksum bytes in the hyper sync progress, we'll do it now.
// If we did set the checksum bytes, we will verify that they match the one that peer has sent us.
prevChecksumBytes := make([]byte, len(srv.HyperSyncProgress.SnapshotMetadata.CurrentEpochChecksumBytes))
@@ -1570,6 +1580,7 @@ func (srv *Server) _handleSnapshot(pp *Peer, msg *MsgDeSoSnapshotData) {

// Process the DBEntries from the msg and add them to the db.
srv.timer.Start("Server._handleSnapshot Process Snapshot")
+ chunkProcessed = true
srv.snapshot.ProcessSnapshotChunk(srv.blockchain.db, &srv.blockchain.ChainLock, dbChunk,
srv.HyperSyncProgress.SnapshotMetadata.SnapshotBlockHeight)
srv.timer.End("Server._handleSnapshot Process Snapshot")
@@ -1679,6 +1690,7 @@ func (srv *Server) _handleSnapshot(pp *Peer, msg *MsgDeSoSnapshotData) {
//
// We split the db update into batches of 10,000 block nodes to avoid a single transaction
// being too large and possibly causing an error in badger.
+ glog.V(0).Infof("Server._handleSnapshot: Updating snapshot block nodes in the database")
var blockNodeBatch []*BlockNode
for ii := uint64(1); ii <= srv.HyperSyncProgress.SnapshotMetadata.SnapshotBlockHeight; ii++ {
currentNode := srv.blockchain.bestHeaderChain[ii]
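The _handleSnapshot hunks above replace the old unconditional FreeOperationQueueSemaphore call with a deferred release that fires only if the chunk never reaches ProcessSnapshotChunk; once the chunk is handed off, the snapshot side (see the snapshot.go hunks below) becomes responsible for freeing the slot. A self-contained sketch of that free-exactly-once pattern, using a buffered channel as a stand-in semaphore and illustrative names only:

package main

import (
	"errors"
	"fmt"
)

// opQueue is a toy stand-in for the snapshot operation queue semaphore:
// each incoming chunk occupies one slot until it has been fully handled.
type opQueue struct{ slots chan struct{} }

func newOpQueue(n int) *opQueue { return &opQueue{slots: make(chan struct{}, n)} }

func (q *opQueue) acquire() { q.slots <- struct{}{} }
func (q *opQueue) free()    { <-q.slots }

// handleChunk mirrors the shape of the change: if we error out before the
// chunk is handed off for processing, the deferred function releases the
// slot; otherwise processChunk is responsible for releasing it.
func handleChunk(q *opQueue, chunk []byte) error {
	chunkProcessed := false
	defer func() {
		if !chunkProcessed {
			q.free()
		}
	}()

	if len(chunk) == 0 {
		return errors.New("empty chunk") // early return: the defer frees the slot
	}

	chunkProcessed = true
	processChunk(q, chunk) // processChunk frees the slot when it finishes
	return nil
}

func processChunk(q *opQueue, chunk []byte) {
	defer q.free()
	fmt.Printf("processed %d bytes\n", len(chunk))
}

func main() {
	q := newOpQueue(1)

	q.acquire()
	_ = handleChunk(q, nil) // errors out, slot still released exactly once

	q.acquire()
	_ = handleChunk(q, []byte{1, 2, 3}) // processed, slot released by processChunk
}

The key property is that every acquired slot has exactly one owner responsible for releasing it: either the error path in the handler or the processing path, never both.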
39 changes: 23 additions & 16 deletions lib/snapshot.go
@@ -1211,15 +1211,17 @@ func (snap *Snapshot) SetSnapshotChunk(mainDb *badger.DB, mainDbMutex *deadlock.
dbFlushId := uuid.New()

snap.timer.Start("SetSnapshotChunk.Total")
- // If there's a problem retrieving the snapshot checksum, we'll reschedule this snapshot chunk set.
- initialChecksumBytes, err := snap.Checksum.ToBytes()
- if err != nil {
- glog.Errorf("Snapshot.SetSnapshotChunk: Problem retrieving checksum bytes, error: (%v)", err)
- snap.ProcessSnapshotChunk(mainDb, mainDbMutex, chunk, blockHeight)
- return err
+ var initialChecksumBytes []byte
+ if !snap.disableChecksum {
+ // If there's a problem retrieving the snapshot checksum, we'll reschedule this snapshot chunk set.
+ initialChecksumBytes, err = snap.Checksum.ToBytes()
+ if err != nil {
+ glog.Errorf("Snapshot.SetSnapshotChunk: Problem retrieving checksum bytes, error: (%v)", err)
+ snap.ProcessSnapshotChunk(mainDb, mainDbMutex, chunk, blockHeight)
+ return err
+ }
}

- mainDbMutex.Lock()
// We use badgerDb write batches as it's the fastest way to write multiple records to the db.
wb := mainDb.NewWriteBatch()
defer wb.Cancel()
@@ -1287,7 +1289,7 @@ func (snap *Snapshot) SetSnapshotChunk(mainDb *badger.DB, mainDbMutex *deadlock.
//snap.timer.End("SetSnapshotChunk.Checksum")
}()
syncGroup.Wait()
- mainDbMutex.Unlock()
+ //mainDbMutex.Unlock()

// If there's a problem setting the snapshot checksum, we'll reschedule this snapshot chunk set.
if err != nil {
@@ -1299,11 +1301,13 @@ func (snap *Snapshot) SetSnapshotChunk(mainDb *badger.DB, mainDbMutex *deadlock.
}
glog.Infof("Snapshot.SetSnapshotChunk: Problem setting the snapshot chunk, error (%v)", err)

- // We reset the snapshot checksum so its initial value, so we won't overlap with processing the next snapshot chunk.
- // If we've errored during a writeBatch set we'll redo this chunk in next SetSnapshotChunk so we're fine with overlaps.
- if err := snap.Checksum.FromBytes(initialChecksumBytes); err != nil {
- panic(fmt.Errorf("Snapshot.SetSnapshotChunk: Problem resetting checksum. This should never happen, "+
- "error: (%v)", err))
+ if !snap.disableChecksum {
+ // We reset the snapshot checksum so its initial value, so we won't overlap with processing the next snapshot chunk.
+ // If we've errored during a writeBatch set we'll redo this chunk in next SetSnapshotChunk so we're fine with overlaps.
+ if err = snap.Checksum.FromBytes(initialChecksumBytes); err != nil {
+ panic(fmt.Errorf("Snapshot.SetSnapshotChunk: Problem resetting checksum. This should never happen, "+
+ "error: (%v)", err))
+ }
}
snap.ProcessSnapshotChunk(mainDb, mainDbMutex, chunk, blockHeight)
return err
@@ -1315,6 +1319,9 @@ func (snap *Snapshot) SetSnapshotChunk(mainDb *badger.DB, mainDbMutex *deadlock.
Succeeded: true,
})
}
+ // If we get here, then we've successfully processed the snapshot chunk
+ // and can free one slot in the operation queue semaphore.
+ snap.FreeOperationQueueSemaphore()

snap.timer.End("SetSnapshotChunk.Total")

@@ -1914,7 +1921,7 @@ type SnapshotOperationChannel struct {
// from the MainDBSemaphore and the AncestralDBSemaphore which manage concurrency
// around flushes only.
StateSemaphore int32
- StateSemaphoreLock sync.Mutex
+ StateSemaphoreLock sync.RWMutex

mainDb *badger.DB
snapshotDbMutex *sync.Mutex
@@ -2028,8 +2035,8 @@ func (opChan *SnapshotOperationChannel) FinishOperation() {
}

func (opChan *SnapshotOperationChannel) GetStatus() int32 {
- opChan.StateSemaphoreLock.Lock()
- defer opChan.StateSemaphoreLock.Unlock()
+ opChan.StateSemaphoreLock.RLock()
+ defer opChan.StateSemaphoreLock.RUnlock()

return opChan.StateSemaphore
}
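The last two hunks change StateSemaphoreLock from sync.Mutex to sync.RWMutex so that GetStatus, which only reads StateSemaphore, takes a shared read lock while the mutating paths keep the exclusive lock. A minimal, self-contained illustration of that split, with simplified fields rather than the repository's types:

package main

import (
	"fmt"
	"sync"
)

type opChannel struct {
	mu    sync.RWMutex // readers share; writers exclude
	state int32
}

// bump mutates state, so it needs the exclusive (write) lock.
func (c *opChannel) bump(delta int32) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.state += delta
}

// status only reads state, so a shared (read) lock is enough and
// concurrent status calls no longer serialize behind one another.
func (c *opChannel) status() int32 {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.state
}

func main() {
	c := &opChannel{}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.bump(1)
			_ = c.status()
		}()
	}
	wg.Wait()
	fmt.Println("final state:", c.status())
}

The practical effect is the same as in GetStatus: frequent status polls stop contending with each other and only wait when a writer actually holds the lock.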