Skip to content

Commit

Permalink
Calculate megabundle as soon as it's received (#112)
Browse files Browse the repository at this point in the history
* Calculate megabundle as soon as its received
* Make event loop non-blocking and queue tasks and interrupts
  • Loading branch information
Ruteri authored Feb 8, 2022
1 parent 1d7171c commit f66aad3
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 31 deletions.
24 changes: 16 additions & 8 deletions core/tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,13 +253,14 @@ type TxPool struct {
locals *accountSet // Set of local transaction to exempt from eviction rules
journal *txJournal // Journal of local transaction to back up to disk

pending map[common.Address]*txList // All currently processable transactions
queue map[common.Address]*txList // Queued but non-processable transactions
beats map[common.Address]time.Time // Last heartbeat from each known account
mevBundles []types.MevBundle
megabundles map[common.Address]types.MevBundle // One megabundle per each trusted relay
all *txLookup // All transactions to allow lookups
priced *txPricedList // All transactions sorted by price
pending map[common.Address]*txList // All currently processable transactions
queue map[common.Address]*txList // Queued but non-processable transactions
beats map[common.Address]time.Time // Last heartbeat from each known account
mevBundles []types.MevBundle
megabundles map[common.Address]types.MevBundle // One megabundle per each trusted relay
NewMegabundleHooks []func(common.Address, *types.MevBundle)
all *txLookup // All transactions to allow lookups
priced *txPricedList // All transactions sorted by price

chainHeadCh chan ChainHeadEvent
chainHeadSub event.Subscription
Expand Down Expand Up @@ -630,13 +631,20 @@ func (pool *TxPool) AddMegabundle(relayAddr common.Address, txs types.Transactio
return errors.New("megabundle from non-trusted address")
}

pool.megabundles[relayAddr] = types.MevBundle{
megabundle := types.MevBundle{
Txs: txs,
BlockNumber: blockNumber,
MinTimestamp: minTimestamp,
MaxTimestamp: maxTimestamp,
RevertingTxHashes: revertingTxHashes,
}

pool.megabundles[relayAddr] = megabundle

for _, hook := range pool.NewMegabundleHooks {
go hook(relayAddr, &megabundle)
}

return nil
}

Expand Down
29 changes: 22 additions & 7 deletions miner/multi_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,16 +105,31 @@ func newMultiWorker(config *Config, chainConfig *params.ChainConfig, engine cons
}))
}

relayWorkerMap := make(map[common.Address]*worker)

for i := 0; i < len(config.TrustedRelays); i++ {
workers = append(workers,
newWorker(config, chainConfig, engine, eth, mux, isLocalBlock, init, merger, &flashbotsData{
isFlashbots: true,
isMegabundleWorker: true,
queue: queue,
relayAddr: config.TrustedRelays[i],
}))
relayWorker := newWorker(config, chainConfig, engine, eth, mux, isLocalBlock, init, merger, &flashbotsData{
isFlashbots: true,
isMegabundleWorker: true,
queue: queue,
relayAddr: config.TrustedRelays[i],
})
workers = append(workers, relayWorker)
relayWorkerMap[config.TrustedRelays[i]] = relayWorker
}

eth.TxPool().NewMegabundleHooks = append(eth.TxPool().NewMegabundleHooks, func(relayAddr common.Address, megabundle *types.MevBundle) {
worker, found := relayWorkerMap[relayAddr]
if !found {
return
}

select {
case worker.newMegabundleCh <- megabundle:
default:
}
})

log.Info("creating multi worker", "config.MaxMergedBundles", config.MaxMergedBundles, "config.TrustedRelays", config.TrustedRelays, "worker", len(workers))
return &multiWorker{
regularWorker: regularWorker,
Expand Down
60 changes: 44 additions & 16 deletions miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ type worker struct {
exitCh chan struct{}
resubmitIntervalCh chan time.Duration
resubmitAdjustCh chan *intervalAdjust
newMegabundleCh chan *types.MevBundle

wg sync.WaitGroup

Expand Down Expand Up @@ -240,15 +241,17 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus
txsCh: make(chan core.NewTxsEvent, txChanSize),
chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize),
chainSideCh: make(chan core.ChainSideEvent, chainSideChanSize),
newWorkCh: make(chan *newWorkReq),
newWorkCh: make(chan *newWorkReq, 1),
taskCh: taskCh,
resultCh: make(chan *types.Block, resultQueueSize),
exitCh: exitCh,
startCh: make(chan struct{}, 1),
newMegabundleCh: make(chan *types.MevBundle),
resubmitIntervalCh: make(chan time.Duration),
resubmitAdjustCh: make(chan *intervalAdjust, resubmitAdjustChanSize),
flashbots: flashbots,
}

// Subscribe NewTxsEvent for tx pool
worker.txsSub = eth.TxPool().SubscribeNewTxsEvent(worker.txsCh)
// Subscribe events for blockchain
Expand Down Expand Up @@ -391,26 +394,38 @@ func recalcRecommit(minRecommit, prev time.Duration, target float64, inc bool) t
func (w *worker) newWorkLoop(recommit time.Duration) {
defer w.wg.Done()
var (
interrupt *int32
minRecommit = recommit // minimal resubmit interval specified by user.
timestamp int64 // timestamp for each round of mining.
runningInterrupt *int32 // Running task interrupt
queuedInterrupt *int32 // Queued task interrupt
minRecommit = recommit // minimal resubmit interval specified by user.
timestamp int64 // timestamp for each round of mining.
)

timer := time.NewTimer(0)
defer timer.Stop()
<-timer.C // discard the initial tick

// commit aborts in-flight transaction execution with given signal and resubmits a new one.
// commit aborts in-flight transaction execution with highest seen signal and resubmits a new one
commit := func(noempty bool, s int32) {
if interrupt != nil {
atomic.StoreInt32(interrupt, s)
}
interrupt = new(int32)
select {
case w.newWorkCh <- &newWorkReq{interrupt: interrupt, noempty: noempty, timestamp: timestamp}:
case <-w.exitCh:
return
case queuedRequest := <-w.newWorkCh:
// Previously queued request wasn't started yet, update the request and resubmit
queuedRequest.noempty = queuedRequest.noempty || noempty
queuedRequest.timestamp = timestamp
w.newWorkCh <- queuedRequest // guaranteed to be nonblocking
default:
// Previously queued request has already started, cycle interrupt pointer and submit new work
runningInterrupt = queuedInterrupt
queuedInterrupt = new(int32)

w.newWorkCh <- &newWorkReq{interrupt: queuedInterrupt, noempty: noempty, timestamp: timestamp} // guaranteed to be nonblocking
}

if runningInterrupt != nil && s > atomic.LoadInt32(runningInterrupt) {
atomic.StoreInt32(runningInterrupt, s)
}

timer.Reset(recommit)
atomic.StoreInt32(&w.newTxs, 0)
}
Expand All @@ -437,6 +452,11 @@ func (w *worker) newWorkLoop(recommit time.Duration) {
timestamp = time.Now().Unix()
commit(false, commitInterruptNewHead)

case <-w.newMegabundleCh:
if w.isRunning() {
commit(true, commitInterruptNone)
}

case <-timer.C:
// If mining is running resubmit a new work cycle periodically to pull in
// higher priced transactions. Disable this overhead for pending blocks.
Expand Down Expand Up @@ -500,7 +520,10 @@ func (w *worker) mainLoop() {
for {
select {
case req := <-w.newWorkCh:
w.commitNewWork(req.interrupt, req.noempty, req.timestamp)
// Don't start if the work has already been interrupted
if req.interrupt == nil || atomic.LoadInt32(req.interrupt) == commitInterruptNone {
w.commitNewWork(req.interrupt, req.noempty, req.timestamp)
}

case ev := <-w.chainSideCh:
// Short circuit for duplicate side blocks
Expand Down Expand Up @@ -761,10 +784,10 @@ func (w *worker) generateEnv(parent *types.Block, header *types.Header) (*enviro
// makeCurrent creates a new environment for the current cycle.
func (w *worker) makeCurrent(parent *types.Block, header *types.Header) error {
env, err := w.generateEnv(parent, header)
env.state.StartPrefetcher("miner")
if err != nil {
return err
}
env.state.StartPrefetcher("miner")

// Swap out the old work with the new one, terminating any leftover prefetcher
// processes in the mean time and starting a new one.
Expand Down Expand Up @@ -869,7 +892,6 @@ func (w *worker) commitBundle(txs types.Transactions, coinbase common.Address, i
// (2) worker start or restart, the interrupt signal is 1
// (3) worker recreate the mining block with any newly arrived transactions, the interrupt signal is 2.
// For the first two cases, the semi-finished work will be discarded.
// For the third case, the semi-finished work will be submitted to the consensus engine.
if interrupt != nil && atomic.LoadInt32(interrupt) != commitInterruptNone {
// Notify resubmit loop to increase resubmitting interval due to too frequent commits.
if atomic.LoadInt32(interrupt) == commitInterruptResubmit {
Expand All @@ -881,8 +903,11 @@ func (w *worker) commitBundle(txs types.Transactions, coinbase common.Address, i
ratio: ratio,
inc: true,
}
return false
}
return atomic.LoadInt32(interrupt) == commitInterruptNewHead

// Discard the work as new head is present
return true
}
// If we don't have enough gas for any further transactions then we're done
if w.current.gasPool.Gas() < params.TxGas {
Expand Down Expand Up @@ -982,7 +1007,6 @@ func (w *worker) commitTransactions(txs *types.TransactionsByPriceAndNonce, coin
// (2) worker start or restart, the interrupt signal is 1
// (3) worker recreate the mining block with any newly arrived transactions, the interrupt signal is 2.
// For the first two cases, the semi-finished work will be discarded.
// For the third case, the semi-finished work will be submitted to the consensus engine.
if interrupt != nil && atomic.LoadInt32(interrupt) != commitInterruptNone {
// Notify resubmit loop to increase resubmitting interval due to too frequent commits.
if atomic.LoadInt32(interrupt) == commitInterruptResubmit {
Expand All @@ -994,8 +1018,11 @@ func (w *worker) commitTransactions(txs *types.TransactionsByPriceAndNonce, coin
ratio: ratio,
inc: true,
}
return false
}
return atomic.LoadInt32(interrupt) == commitInterruptNewHead

// Discard the work as new head is present
return true
}
// If we don't have enough gas for any further transactions then we're done
if w.current.gasPool.Gas() < params.TxGas {
Expand Down Expand Up @@ -1223,6 +1250,7 @@ func (w *worker) commitNewWork(interrupt *int32, noempty bool, timestamp int64)
if err != nil {
return // no valid megabundle for this relay, nothing to do
}

// Flashbots bundle merging duplicates work by simulating TXes and then committing them once more.
// Megabundles API focuses on speed and runs everything in one cycle.
coinbaseBalanceBefore := w.current.state.GetBalance(w.coinbase)
Expand Down

0 comments on commit f66aad3

Please sign in to comment.