Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Z/update state change entry encoder #1195

Draft
wants to merge 26 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
9340a83
Updates to encode and decode for state change entry
superzordon Mar 1, 2024
8711334
Reduce sleep time
superzordon Mar 1, 2024
2ba3264
Update state syncer mempool reversion procedure, add temporary logging
superzordon Mar 5, 2024
83b4596
Update state syncer
superzordon Mar 7, 2024
7d4822d
Update reversion procedure
superzordon Mar 8, 2024
f7b9b04
Update decoder to handle block height error
superzordon Mar 8, 2024
13147ad
Add logging
superzordon Mar 13, 2024
e461586
Updates to syncing logic
superzordon Mar 13, 2024
54b3059
Update state change unflushed bytes logic
superzordon Mar 17, 2024
ef37161
Add caching to mempool, update logging
superzordon Mar 19, 2024
381d4ae
Update revert logic
superzordon Mar 19, 2024
0c77aae
Updates to transaction caching
superzordon Mar 20, 2024
67e8113
Add back erroring to mempool
superzordon Mar 21, 2024
eee0bfe
Update block height issue
superzordon Mar 27, 2024
d5a9d1c
Add timing logs
superzordon Apr 2, 2024
b4ea9b1
Add caching to mempool routine
superzordon Apr 3, 2024
29ccc73
Update caching routine
superzordon Apr 3, 2024
0bafa59
Add logging
superzordon Apr 3, 2024
a0cb66c
Update error handling
superzordon Apr 3, 2024
63cd51c
Remove mempool transaction syncing to debug
superzordon Apr 3, 2024
4b2dfad
Update mempool caching routine
superzordon Apr 3, 2024
c0eb640
Uncomment transaction connecting
superzordon Apr 4, 2024
0844bac
Add logging
superzordon Apr 4, 2024
ed3d4aa
Add additional messaging index to core state
superzordon Apr 11, 2024
1fc31fe
Go fmt
superzordon Apr 11, 2024
6cfaa1d
Change state change entry encoder
superzordon May 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 17 additions & 9 deletions lib/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -2185,17 +2185,15 @@ func (bc *Blockchain) ProcessBlock(desoBlock *MsgDeSoBlock, verifySignatures boo
}
bc.timer.End("Blockchain.ProcessBlock: Transactions Db snapshot & operations")
if innerErr = bc.blockView.FlushToDbWithTxn(txn, blockHeight); innerErr != nil {
// If we're in the middle of a sync, we should notify the event manager that we failed to sync the block.
if bc.eventManager != nil && !bc.eventManager.isMempoolManager {
bc.eventManager.stateSyncerFlushed(&StateSyncerFlushedEvent{
FlushId: uuid.Nil,
Succeeded: false,
})
}
return errors.Wrapf(innerErr, "ProcessBlock: Problem writing utxo view to db on simple add to tip")
}
// Immediately after the utxo view is flushed to badger, emit a state syncer flushed event, so that
// state syncer maintains a consistent view of the blockchain.
// Note: We ignore the mempool manager here, as that process handles state syncer flush events itself.
if bc.eventManager != nil && !bc.eventManager.isMempoolManager {
bc.eventManager.stateSyncerFlushed(&StateSyncerFlushedEvent{
FlushId: uuid.Nil,
Succeeded: innerErr == nil,
})
}

bc.timer.End("Blockchain.ProcessBlock: Transactions Db utxo flush")
bc.timer.Start("Blockchain.ProcessBlock: Transactions Db snapshot & operations")
Expand Down Expand Up @@ -2548,6 +2546,16 @@ func (bc *Blockchain) ProcessBlock(desoBlock *MsgDeSoBlock, verifySignatures boo
// Signal the server that we've accepted this block in some way.
if bc.eventManager != nil {
bc.eventManager.blockAccepted(&BlockEvent{Block: desoBlock})
// Immediately after the utxo view is flushed to badger, emit a state syncer flushed event, so that
// state syncer maintains a consistent view of the blockchain.
// Note: We ignore the mempool manager here, as that process handles state syncer flush events itself.
if !bc.eventManager.isMempoolManager {
fmt.Printf("Emitting state syncer flushed event for synced block\n")
bc.eventManager.stateSyncerFlushed(&StateSyncerFlushedEvent{
FlushId: uuid.Nil,
Succeeded: true,
})
}
}

bc.timer.Print("Blockchain.ProcessBlock: Initial")
Expand Down
4 changes: 3 additions & 1 deletion lib/db_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ type DBPrefixes struct {
// The Minor/Major distinction is used to deterministically map the two accessGroupIds of message's sender/recipient
// into a single pair based on the lexicographical ordering of the two accessGroupIds. This is done to ensure that
// both sides of the conversation have the same key for the same conversation, and we can store just a single message.
PrefixDmMessagesIndex []byte `prefix_id:"[75]" is_state:"true"`
PrefixDmMessagesIndex []byte `prefix_id:"[75]" is_state:"true" core_state:"true"`

// PrefixDmThreadIndex is modified by the NewMessage transaction and is used to store a DmThreadEntry
// for each existing dm thread. It answers the question: "Give me all the threads for a particular user."
Expand Down Expand Up @@ -989,6 +989,7 @@ func DBSetWithTxn(txn *badger.Txn, snap *Snapshot, key []byte, value []byte, eve
KeyBytes: key,
EncoderBytes: value,
AncestralRecordBytes: ancestralValue,
IsReverted: false,
},
FlushId: uuid.Nil,
IsMempoolTxn: eventManager.isMempoolManager,
Expand Down Expand Up @@ -1091,6 +1092,7 @@ func DBDeleteWithTxn(txn *badger.Txn, snap *Snapshot, key []byte, eventManager *
KeyBytes: key,
EncoderBytes: nil,
AncestralRecordBytes: ancestralValue,
IsReverted: false,
},
FlushId: uuid.Nil,
IsMempoolTxn: eventManager.isMempoolManager,
Expand Down
4 changes: 3 additions & 1 deletion lib/event_manager.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package lib

import "github.com/google/uuid"
import (
"github.com/google/uuid"
)

type TransactionEventFunc func(event *TransactionEvent)
type StateSyncerOperationEventFunc func(event *StateSyncerOperationEvent)
Expand Down
3 changes: 3 additions & 0 deletions lib/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,7 @@ func NewServer(

// Only initialize state change syncer if the directories are defined.
var stateChangeSyncer *StateChangeSyncer
fmt.Printf("State Change Dir: %v\n", _stateChangeDir)
if _stateChangeDir != "" {
// Create the state change syncer to handle syncing state changes to disk, and assign some of its methods
// to the event manager.
Expand Down Expand Up @@ -469,12 +470,14 @@ func NewServer(
srv.stateChangeSyncer.BlockHeight = uint64(_chain.headerTip().Height)
}

fmt.Printf("Before mempool\n")
// Create a mempool to store transactions until they're ready to be mined into
// blocks.
_mempool := NewDeSoMempool(_chain, _rateLimitFeerateNanosPerKB,
_minFeeRateNanosPerKB, _blockCypherAPIKey, _runReadOnlyUtxoViewUpdater, _dataDir,
_mempoolDumpDir, false)

fmt.Printf("After mempool\n")
// Initialize state syncer mempool job, if needed.
if srv.stateChangeSyncer != nil {
srv.stateChangeSyncer.StartMempoolSyncRoutine(srv)
Expand Down
1 change: 1 addition & 0 deletions lib/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -1265,6 +1265,7 @@ func (snap *Snapshot) SetSnapshotChunk(mainDb *badger.DB, mainDbMutex *deadlock.
OperationType: DbOperationTypeInsert,
KeyBytes: dbEntry.Key,
EncoderBytes: dbEntry.Value,
IsReverted: false,
},
FlushId: dbFlushId,
})
Expand Down
Loading
Loading