From 4c03984dab599150d6fbddbc7f512fd8f61cbc58 Mon Sep 17 00:00:00 2001 From: rian Date: Fri, 11 Oct 2024 14:11:49 +0300 Subject: [PATCH] fail to write error - wip.. --- migration/migration.go | 29 +++++++++++++++++++++++++---- 1 file changed, 25 insertions(+), 4 deletions(-) diff --git a/migration/migration.go b/migration/migration.go index 2576c100a0..e6f5a0f4e0 100644 --- a/migration/migration.go +++ b/migration/migration.go @@ -9,6 +9,7 @@ import ( "fmt" "maps" "runtime" + "sync" "github.com/NethermindEth/juno/adapters/sn2core" "github.com/NethermindEth/juno/blockchain" @@ -436,7 +437,7 @@ func (m *changeTrieNodeEncoding) Migrate(_ context.Context, txn db.Transaction, return nil, iterator.Close() } -func processBlocks(txn db.Transaction, processBlock func(uint64) error) error { +func processBlocks(txn db.Transaction, processBlock func(uint64, *sync.Mutex) error) error { numOfWorkers := runtime.GOMAXPROCS(0) workerPool := pool.New().WithErrors().WithMaxGoroutines(numOfWorkers) @@ -454,10 +455,11 @@ func processBlocks(txn db.Transaction, processBlock func(uint64) error) error { } close(blockNumbers) }() + var txnLock sync.Mutex for range numOfWorkers { workerPool.Go(func() error { for bNumber := range blockNumbers { - if err := processBlock(bNumber); err != nil { + if err := processBlock(bNumber, &txnLock); err != nil { return err } } @@ -465,30 +467,49 @@ func processBlocks(txn db.Transaction, processBlock func(uint64) error) error { }) } return workerPool.Wait() + // This causes the transaction to be discarded + // err = workerPool.Wait() + // if err != nil { + // return err + // } + // txnLock.Lock() + // err = txn.Commit() + // txnLock.Unlock() + // return err } // calculateBlockCommitments calculates the txn and event commitments for each block and stores them separately func calculateBlockCommitments(txn db.Transaction, network *utils.Network) error { - processBlockFunc := func(blockNumber uint64) error { + processBlockFunc := func(blockNumber uint64, txnLock *sync.Mutex) error { + txnLock.Lock() block, err := blockchain.BlockByNumber(txn, blockNumber) + txnLock.Unlock() if err != nil { return err } + txnLock.Lock() commitments, err := core.VerifyBlockHash(block, network, nil) + txnLock.Unlock() if err != nil { return err } + txnLock.Lock() + defer txnLock.Unlock() return blockchain.StoreBlockCommitments(txn, block.Number, commitments) } return processBlocks(txn, processBlockFunc) } func calculateL1MsgHashes(txn db.Transaction, n *utils.Network) error { - processBlockFunc := func(blockNumber uint64) error { + processBlockFunc := func(blockNumber uint64, txnLock *sync.Mutex) error { + txnLock.Lock() txns, err := blockchain.TransactionsByBlockNumber(txn, blockNumber) + txnLock.Unlock() if err != nil { return err } + txnLock.Lock() + defer txnLock.Unlock() return blockchain.StoreL1HandlerMsgHashes(txn, txns) } return processBlocks(txn, processBlockFunc)