From c8d2698a9b9ff2bea924925198d48bd92a46f2a9 Mon Sep 17 00:00:00 2001 From: Mateusz Morusiewicz <11313015+Ruteri@users.noreply.github.com> Date: Wed, 30 Mar 2022 23:39:29 +0200 Subject: [PATCH] Discard reverting megabundle blocks and head change interrupted blocks (#123) * Discard reverting megabundle blocks and head change interrupted blocks * Discard all blocks with incomplete bundles * Run reverting megabundles regression test separately from bundle tests --- .github/workflows/go.yml | 23 +++++++++++++++++- miner/worker.go | 52 ++++++++++++++++++++++++---------------- 2 files changed, 54 insertions(+), 21 deletions(-) diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index 47c3013398fe..d198d152cd40 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -56,10 +56,31 @@ jobs: path: e2e - run: cd e2e && yarn install - - run: | + - name: Run single node e2e + run: | cd e2e GETH=`pwd`/../build/bin/geth ./run.sh & sleep 15 yarn run demo-simple + yarn run e2e-reverting-bundles yarn run demo-contract + pkill -9 geth || true + - name: Run private tx with two nodes + run: | + cd e2e + GETH=`pwd`/../build/bin/geth ./run.sh & + # Second node, not mining + P2P_PORT=30302 DATADIR=datadir2 HTTP_PORT=8546 MINER_ARGS='--nodiscover' GETH=`pwd`/../build/bin/geth ./run.sh & + sleep 15 + DATADIR1=datadir DATADIR2=datadir2 GETH=`pwd`/../build/bin/geth ./peer_nodes.sh + sleep 15 yarn run demo-private-tx + pkill -9 geth || true + - name: Run megabundle-only node checking for reverts + run: | + cd e2e + # Disable bundle workers + MINER_ARGS='--miner.etherbase=0xd912aecb07e9f4e1ea8e6b4779e7fb6aa1c3e4d8 --miner.trustedrelays=0xfb11e78C4DaFec86237c2862441817701fdf197F --mine --miner.threads=2 --miner.maxmergedbundles=0' GETH=`pwd`/../build/bin/geth ./run.sh & + sleep 15 + yarn run e2e-reverting-megabundle + pkill -9 geth || true diff --git a/miner/worker.go b/miner/worker.go index 3f1d7c2dfc2b..2ef4d04c63f5 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -925,6 +925,7 @@ func (w *worker) commitTransaction(env *environment, tx *types.Transaction) ([]* return receipt.Logs, nil } +// Returns whether the block should be discarded func (w *worker) commitBundle(env *environment, txs types.Transactions, interrupt *int32) bool { gasLimit := env.header.GasLimit if env.gasPool == nil { @@ -938,8 +939,7 @@ func (w *worker) commitBundle(env *environment, txs types.Transactions, interrup // (1) new head block event arrival, the interrupt signal is 1 // (2) worker start or restart, the interrupt signal is 1 // (3) worker recreate the sealing block with any newly arrived transactions, the interrupt signal is 2. - // For the first two cases, the semi-finished work will be discarded. - // For the third case, the semi-finished work will be submitted to the consensus engine. + // Discard the interrupted work, since it is incomplete and contains partial bundles if interrupt != nil && atomic.LoadInt32(interrupt) != commitInterruptNone { // Notify resubmit loop to increase resubmitting interval due to too frequent commits. if atomic.LoadInt32(interrupt) == commitInterruptResubmit { @@ -953,12 +953,13 @@ func (w *worker) commitBundle(env *environment, txs types.Transactions, interrup } } - return atomic.LoadInt32(interrupt) == commitInterruptNewHead + return true } - // If we don't have enough gas for any further transactions then we're done + // If we don't have enough gas for any further transactions discard the block + // since not all bundles of the were applied if env.gasPool.Gas() < params.TxGas { log.Trace("Not enough gas for further transactions", "have", env.gasPool, "want", params.TxGas) - break + return true } // Error may be ignored here. The error has already been checked @@ -1245,7 +1246,8 @@ func (w *worker) prepareWork(genParams *generateParams) (*environment, error) { // fillTransactions retrieves the pending transactions from the txpool and fills them // into the given sealing block. The transaction selection and ordering strategy can // be customized with the plugin in the future. -func (w *worker) fillTransactions(interrupt *int32, env *environment) { +// Returns whether the block should be discarded. +func (w *worker) fillTransactions(interrupt *int32, env *environment) bool { // Split the pending transactions into locals and remotes // Fill the block with all available pending transactions. pending := w.eth.TxPool().Pending(true) @@ -1260,20 +1262,20 @@ func (w *worker) fillTransactions(interrupt *int32, env *environment) { bundles, err := w.eth.TxPool().MevBundles(env.header.Number, env.header.Time) if err != nil { log.Error("Failed to fetch pending transactions", "err", err) - return + return true } bundleTxs, bundle, numBundles, err := w.generateFlashbotsBundle(env, bundles, pending) if err != nil { log.Error("Failed to generate flashbots bundle", "err", err) - return + return true } log.Info("Flashbots bundle", "ethToCoinbase", ethIntToFloat(bundle.totalEth), "gasUsed", bundle.totalGasUsed, "bundleScore", bundle.mevGasPrice, "bundleLength", len(bundleTxs), "numBundles", numBundles, "worker", w.flashbots.maxMergedBundles) if len(bundleTxs) == 0 { - return + return true } if w.commitBundle(env, bundleTxs, interrupt) { - return + return true } env.profit.Add(env.profit, bundle.ethSentToCoinbase) } @@ -1281,7 +1283,7 @@ func (w *worker) fillTransactions(interrupt *int32, env *environment) { megabundle, err := w.eth.TxPool().GetMegabundle(w.flashbots.relayAddr, env.header.Number, env.header.Time) log.Info("Starting to process a Megabundle", "relay", w.flashbots.relayAddr, "megabundle", megabundle, "error", err) if err != nil { - return // no valid megabundle for this relay, nothing to do + return true // no valid megabundle for this relay, nothing to do } // Flashbots bundle merging duplicates work by simulating TXes and then committing them once more. @@ -1289,7 +1291,7 @@ func (w *worker) fillTransactions(interrupt *int32, env *environment) { coinbaseBalanceBefore := env.state.GetBalance(env.coinbase) if w.commitBundle(env, megabundle.Txs, interrupt) { log.Info("Could not commit a Megabundle", "relay", w.flashbots.relayAddr, "megabundle", megabundle) - return + return true } var txStatuses = map[common.Hash]bool{} for _, receipt := range env.receipts { @@ -1299,11 +1301,11 @@ func (w *worker) fillTransactions(interrupt *int32, env *environment) { status, ok := txStatuses[tx.Hash()] if !ok { log.Error("No TX receipt after megabundle simulation", "TxHash", tx.Hash()) - return + return true } if !status && !containsHash(megabundle.RevertingTxHashes, tx.Hash()) { log.Info("Ignoring megabundle because of failing TX", "relay", w.flashbots.relayAddr, "TxHash", tx.Hash()) - return + return true } } coinbaseBalanceAfter := env.state.GetBalance(env.coinbase) @@ -1315,15 +1317,17 @@ func (w *worker) fillTransactions(interrupt *int32, env *environment) { if len(localTxs) > 0 { txs := types.NewTransactionsByPriceAndNonce(env.signer, localTxs, env.header.BaseFee) if w.commitTransactions(env, txs, interrupt) { - return + return true } } if len(remoteTxs) > 0 { txs := types.NewTransactionsByPriceAndNonce(env.signer, remoteTxs, env.header.BaseFee) if w.commitTransactions(env, txs, interrupt) { - return + return true } } + + return false } // generateWork generates a sealing block based on the given parameters. @@ -1334,7 +1338,11 @@ func (w *worker) generateWork(params *generateParams) (*types.Block, error) { } defer work.discard() - w.fillTransactions(nil, work) + shouldDiscard := w.fillTransactions(nil, work) + if shouldDiscard { + return nil, errors.New("could not generate valid block") + } + return w.engine.FinalizeAndAssemble(w.chain, work.header, work.state, work.txs, work.unclelist(), work.receipts) } @@ -1365,7 +1373,11 @@ func (w *worker) commitWork(interrupt *int32, noempty bool, timestamp int64) { w.commit(work.copy(), nil, false, start) } // Fill pending transactions from the txpool - w.fillTransactions(interrupt, work) + shouldDiscard := w.fillTransactions(interrupt, work) + if shouldDiscard { + return + } + w.commit(work.copy(), w.fullTaskHook, true, start) // Swap out the old work with the new one, terminating any leftover @@ -1525,11 +1537,11 @@ func (w *worker) simulateBundles(env *environment, bundles []types.MevBundle, pe simulatedBundles := []simulatedBundle{} for _, bundle := range bundles { - state := env.state.Copy() - gasPool := new(core.GasPool).AddGas(env.header.GasLimit) if len(bundle.Txs) == 0 { continue } + state := env.state.Copy() + gasPool := new(core.GasPool).AddGas(env.header.GasLimit) simmed, err := w.computeBundleGas(env, bundle, state, gasPool, pendingTxs, 0) if err != nil {