Skip to content

Commit

Permalink
fail to write error - wip..
Browse files Browse the repository at this point in the history
  • Loading branch information
rianhughes committed Oct 22, 2024
1 parent 04ec793 commit 2501c70
Showing 1 changed file with 25 additions and 4 deletions.
29 changes: 25 additions & 4 deletions migration/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"
"maps"
"runtime"
"sync"

"github.com/NethermindEth/juno/adapters/sn2core"
"github.com/NethermindEth/juno/blockchain"
Expand Down Expand Up @@ -436,7 +437,7 @@ func (m *changeTrieNodeEncoding) Migrate(_ context.Context, txn db.Transaction,
return nil, iterator.Close()
}

func processBlocks(txn db.Transaction, processBlock func(uint64) error) error {
func processBlocks(txn db.Transaction, processBlock func(uint64, *sync.Mutex) error) error {
numOfWorkers := runtime.GOMAXPROCS(0)
workerPool := pool.New().WithErrors().WithMaxGoroutines(numOfWorkers)

Expand All @@ -454,41 +455,61 @@ func processBlocks(txn db.Transaction, processBlock func(uint64) error) error {
}
close(blockNumbers)
}()
var txnLock sync.Mutex
for range numOfWorkers {
workerPool.Go(func() error {
for bNumber := range blockNumbers {
if err := processBlock(bNumber); err != nil {
if err := processBlock(bNumber, &txnLock); err != nil {
return err
}
}
return nil
})
}
return workerPool.Wait()
// This causes the transaction to be discarded
// err = workerPool.Wait()
// if err != nil {
// return err
// }
// txnLock.Lock()
// err = txn.Commit()
// txnLock.Unlock()
// return err
}

// calculateBlockCommitments calculates the txn and event commitments for each block and stores them separately
func calculateBlockCommitments(txn db.Transaction, network *utils.Network) error {
processBlockFunc := func(blockNumber uint64) error {
processBlockFunc := func(blockNumber uint64, txnLock *sync.Mutex) error {
txnLock.Lock()
block, err := blockchain.BlockByNumber(txn, blockNumber)
txnLock.Unlock()
if err != nil {
return err
}
txnLock.Lock()
commitments, err := core.VerifyBlockHash(block, network, nil)
txnLock.Unlock()
if err != nil {
return err
}
txnLock.Lock()
defer txnLock.Unlock()
return blockchain.StoreBlockCommitments(txn, block.Number, commitments)
}
return processBlocks(txn, processBlockFunc)
}

func calculateL1MsgHashes(txn db.Transaction, n *utils.Network) error {
processBlockFunc := func(blockNumber uint64) error {
processBlockFunc := func(blockNumber uint64, txnLock *sync.Mutex) error {
txnLock.Lock()
txns, err := blockchain.TransactionsByBlockNumber(txn, blockNumber)
txnLock.Unlock()
if err != nil {
return err
}
txnLock.Lock()
defer txnLock.Unlock()
return blockchain.StoreL1HandlerMsgHashes(txn, txns)
}
return processBlocks(txn, processBlockFunc)
Expand Down

0 comments on commit 2501c70

Please sign in to comment.