From 70bc8a655d08810e9f341545e7a81c2770a000a1 Mon Sep 17 00:00:00 2001 From: Roshan Date: Wed, 21 Feb 2024 18:57:24 +0800 Subject: [PATCH] fix review comments --- core/txpool/bundlepool/bundlepool.go | 121 ++++++--------------------- core/txpool/errors.go | 6 +- core/txpool/subpool.go | 2 +- core/txpool/txpool.go | 25 +----- core/types/bid.go | 2 +- eth/api_backend.go | 9 -- eth/handler.go | 2 +- internal/ethapi/api_bundle.go | 10 +-- internal/ethapi/backend.go | 1 - les/api_backend.go | 4 - miner/bidder.go | 22 ++--- miner/worker_builder.go | 2 +- 12 files changed, 47 insertions(+), 159 deletions(-) diff --git a/core/txpool/bundlepool/bundlepool.go b/core/txpool/bundlepool/bundlepool.go index 9adcb86972..9ba3bd2257 100644 --- a/core/txpool/bundlepool/bundlepool.go +++ b/core/txpool/bundlepool/bundlepool.go @@ -7,7 +7,6 @@ import ( "time" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/common/prque" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/txpool" @@ -25,6 +24,8 @@ const ( // that validating a new bundle remains a constant operation (in reality // O(maxslots), where max slots are 4 currently). bundleSlotSize = 128 * 1024 // 128KB + + maxMinTimestampFromNow = int64(300) // 5 minutes ) var ( @@ -61,8 +62,7 @@ type BundlePool struct { slots uint64 // Number of slots currently allocated - bundleGasPricer *BundleGasPricer - simulator BundleSimulator + simulator BundleSimulator } func New(config Config) *BundlePool { @@ -70,10 +70,9 @@ func New(config Config) *BundlePool { config = (&config).sanitize() pool := &BundlePool{ - config: config, - bundles: make(map[common.Hash]*types.Bundle), - bundleHeap: make(BundleHeap, 0), - bundleGasPricer: NewBundleGasPricer(config.BundleGasPricerExpireTime), + config: config, + bundles: make(map[common.Hash]*types.Bundle), + bundleHeap: make(BundleHeap, 0), } return pool @@ -102,16 +101,18 @@ func (p *BundlePool) AddBundle(bundle *types.Bundle) error { return txpool.ErrSimulatorMissing } + if bundle.MinTimestamp > uint64(time.Now().Unix()+maxMinTimestampFromNow) { + return txpool.ErrBundleTimestampTooHigh + } + price, err := p.simulator.SimulateBundle(bundle) if err != nil { return err } - minimalGasPrice := p.bundleGasPricer.MinimalBundleGasPrice() - if price.Cmp(minimalGasPrice) < 0 && p.slots+numSlots(bundle) > p.config.GlobalSlots { + if price.Cmp(p.minimalBundleGasPrice()) < 0 && p.slots+numSlots(bundle) > p.config.GlobalSlots { return txpool.ErrBundleGasPriceLow } bundle.Price = price - p.bundleGasPricer.Push(price) hash := bundle.Hash() if _, ok := p.bundles[hash]; ok { @@ -144,7 +145,7 @@ func (p *BundlePool) PruneBundle(hash common.Hash) { p.deleteBundle(hash) } -func (p *BundlePool) PendingBundles(blockNumber *big.Int, blockTimestamp uint64) []*types.Bundle { +func (p *BundlePool) PendingBundles(blockNumber uint64, blockTimestamp uint64) []*types.Bundle { p.mu.Lock() defer p.mu.Unlock() @@ -152,7 +153,7 @@ func (p *BundlePool) PendingBundles(blockNumber *big.Int, blockTimestamp uint64) for hash, bundle := range p.bundles { // Prune outdated bundles if (bundle.MaxTimestamp != 0 && blockTimestamp > bundle.MaxTimestamp) || - blockNumber.Cmp(new(big.Int).SetUint64(bundle.MaxBlockNumber)) > 0 { + blockNumber > bundle.MaxBlockNumber { p.deleteBundle(hash) continue } @@ -245,22 +246,19 @@ func (p *BundlePool) Nonce(addr common.Address) uint64 { // Stats retrieves the current pool stats, namely the number of pending and the // number of queued (non-executable) transactions. func (p *BundlePool) Stats() (int, int) { - // TODO implement me - panic("implement me") + return 0, 0 } // Content retrieves the data content of the transaction pool, returning all the // pending as well as queued transactions, grouped by account and sorted by nonce. func (p *BundlePool) Content() (map[common.Address][]*types.Transaction, map[common.Address][]*types.Transaction) { - // TODO implement me - panic("implement me") + return make(map[common.Address][]*types.Transaction), make(map[common.Address][]*types.Transaction) } // ContentFrom retrieves the data content of the transaction pool, returning the // pending as well as queued transactions of this address, grouped by nonce. func (p *BundlePool) ContentFrom(addr common.Address) ([]*types.Transaction, []*types.Transaction) { - // TODO implement me - panic("implement me") + return []*types.Transaction{}, []*types.Transaction{} } // Locals retrieves the accounts currently considered local by the pool. @@ -271,8 +269,7 @@ func (p *BundlePool) Locals() []common.Address { // Status returns the known status (unknown/pending/queued) of a transaction // identified by their hashes. func (p *BundlePool) Status(hash common.Hash) txpool.TxStatus { - // TODO implement me - panic("implement me") + return txpool.TxStatusUnknown } func (p *BundlePool) filter(tx *types.Transaction) bool { @@ -320,85 +317,19 @@ func (p *BundlePool) drop() { } } -// ===================================================================================================================== - -// NewBundleGasPricer creates a new BundleGasPricer. -func NewBundleGasPricer(expire time.Duration) *BundleGasPricer { - return &BundleGasPricer{ - expire: expire, - queue: prque.New[int64, *gasPriceInfo](nil), - latest: common.Big0, - } -} - -// BundleGasPricer is a limited number of queues. -// In order to avoid too drastic gas price changes, the latest n gas prices are cached. -// Allowed as long as the user's Gas Price matches this range. -type BundleGasPricer struct { - mu sync.RWMutex - expire time.Duration - queue *prque.Prque[int64, *gasPriceInfo] - latest *big.Int -} - -type gasPriceInfo struct { - val *big.Int - time time.Time -} - -// Push is a method to cache a new gas price. -func (bgp *BundleGasPricer) Push(gasPrice *big.Int) { - bgp.mu.Lock() - defer bgp.mu.Unlock() - bgp.retire() - index := -gasPrice.Int64() - bgp.queue.Push(&gasPriceInfo{val: gasPrice, time: time.Now()}, index) - bgp.latest = gasPrice -} - -func (bgp *BundleGasPricer) retire() { - now := time.Now() - // make sure the first element is not expired - for !bgp.queue.Empty() { - v, _ := bgp.queue.Peek() - info := v - if info.time.Add(bgp.expire).After(now) { - break +// minimalBundleGasPrice return the lowest gas price from the pool. +func (p *BundlePool) minimalBundleGasPrice() *big.Int { + for len(p.bundleHeap) != 0 { + leastPriceBundleHash := p.bundleHeap[0].Hash() + if bundle, ok := p.bundles[leastPriceBundleHash]; ok { + return bundle.Price } - bgp.queue.Pop() + heap.Pop(&p.bundleHeap) } - // only keep one element - length := bgp.queue.Size() - for i := 1; i < length; i++ { - bgp.queue.Remove(i) - } -} - -// LatestBundleGasPrice is a method to get the latest-cached bundle gas price. -func (bgp *BundleGasPricer) LatestBundleGasPrice() *big.Int { - bgp.mu.RLock() - defer bgp.mu.RUnlock() - return bgp.latest + return new(big.Int) } -// MinimalBundleGasPrice is a method to get minimal cached bundle gas price. -func (bgp *BundleGasPricer) MinimalBundleGasPrice() *big.Int { - bgp.mu.Lock() - defer bgp.mu.Unlock() - if bgp.queue.Empty() { - return common.Big0 - } - bgp.retire() - v, _ := bgp.queue.Peek() - return v.val -} - -// Clear is a method to clear all caches. -func (bgp *BundleGasPricer) Clear() { - bgp.mu.Lock() - defer bgp.mu.Unlock() - bgp.queue.Reset() -} +// ===================================================================================================================== // numSlots calculates the number of slots needed for a single bundle. func numSlots(bundle *types.Bundle) uint64 { diff --git a/core/txpool/errors.go b/core/txpool/errors.go index 6245322508..c3e42bddd7 100644 --- a/core/txpool/errors.go +++ b/core/txpool/errors.go @@ -69,13 +69,13 @@ var ( // ErrSimulatorMissing is returned if the bundle simulator is missing. ErrSimulatorMissing = errors.New("bundle simulator is missing") + // ErrBundleTimestampTooHigh is returned if the bundle's MinTimestamp is too high. + ErrBundleTimestampTooHigh = errors.New("bundle MinTimestamp is too high") + // ErrBundleGasPriceLow is returned if the bundle gas price is too low. ErrBundleGasPriceLow = errors.New("bundle gas price is too low") // ErrBundleAlreadyExist is returned if the bundle is already contained // within the pool. ErrBundleAlreadyExist = errors.New("bundle already exist") - - // ErrBundlePoolOverflow is returned if the bundle pool is full - ErrBundlePoolOverflow = errors.New("bundle pool is full") ) diff --git a/core/txpool/subpool.go b/core/txpool/subpool.go index cd68c7b305..415be8f4db 100644 --- a/core/txpool/subpool.go +++ b/core/txpool/subpool.go @@ -150,7 +150,7 @@ type BundleSubpool interface { AddBundle(bundle *types.Bundle) error // PendingBundles retrieves all currently processable bundles. - PendingBundles(blockNumber *big.Int, blockTimestamp uint64) []*types.Bundle + PendingBundles(blockNumber uint64, blockTimestamp uint64) []*types.Bundle // AllBundles returns all the bundles currently in the pool. AllBundles() []*types.Bundle diff --git a/core/txpool/txpool.go b/core/txpool/txpool.go index 72b1991192..14ebba7663 100644 --- a/core/txpool/txpool.go +++ b/core/txpool/txpool.go @@ -57,11 +57,6 @@ type BlockChain interface { SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription } -type MevGasPricer interface { - MinimalBundleGasPrice() *big.Int - LatestBundleGasPrice() *big.Int -} - // TxPool is an aggregator for various transaction specific pools, collectively // tracking all the transactions deemed interesting by the node. Transactions // enter the pool when they are received from the network or submitted locally. @@ -330,24 +325,6 @@ func (p *TxPool) PruneBundle(hash common.Hash) { } } -func (p *TxPool) MinimalBundleGasPrice() *big.Int { - for _, subpool := range p.subpools { - if gasPricer, ok := subpool.(MevGasPricer); ok { - return gasPricer.MinimalBundleGasPrice() - } - } - return common.Big0 -} - -func (p *TxPool) LatestBundleGasPrice() *big.Int { - for _, subpool := range p.subpools { - if gasPricer, ok := subpool.(MevGasPricer); ok { - return gasPricer.LatestBundleGasPrice() - } - } - return common.Big0 -} - // Pending retrieves all currently processable transactions, grouped by origin // account and sorted by nonce. func (p *TxPool) Pending(enforceTips bool) map[common.Address][]*LazyTransaction { @@ -361,7 +338,7 @@ func (p *TxPool) Pending(enforceTips bool) map[common.Address][]*LazyTransaction } // PendingBundles retrieves all currently processable bundles. -func (p *TxPool) PendingBundles(blockNumber *big.Int, blockTimestamp uint64) []*types.Bundle { +func (p *TxPool) PendingBundles(blockNumber uint64, blockTimestamp uint64) []*types.Bundle { for _, subpool := range p.subpools { if bundleSubpool, ok := subpool.(BundleSubpool); ok { return bundleSubpool.PendingBundles(blockNumber, blockTimestamp) diff --git a/core/types/bid.go b/core/types/bid.go index a055aa78e6..b460a03bb1 100644 --- a/core/types/bid.go +++ b/core/types/bid.go @@ -16,7 +16,7 @@ type Bid struct { ParentHash common.Hash `json:"parentHash"` Txs []hexutil.Bytes `json:"txs,omitempty"` GasUsed uint64 `json:"gasUsed"` - GasFee uint64 `json:"gasFee"` + GasFee *big.Int `json:"gasFee"` BuilderFee *big.Int `json:"builderFee"` // caches diff --git a/eth/api_backend.go b/eth/api_backend.go index 8c515392f3..9d6169a456 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -295,15 +295,6 @@ func (b *EthAPIBackend) SendBundle(ctx context.Context, bundle *types.Bundle) er return b.eth.txPool.AddBundle(bundle) } -func (b *EthAPIBackend) MinimalBundleGasPrice() *big.Int { - gasFloor := big.NewInt(b.eth.config.Miner.MevGasPriceFloor) - gasPrice := b.eth.TxPool().MinimalBundleGasPrice() - if gasPrice.Cmp(gasFloor) < 0 { - return gasFloor - } - return gasPrice -} - func (b *EthAPIBackend) BundlePrice() *big.Int { bundles := b.eth.txPool.AllBundles() diff --git a/eth/handler.go b/eth/handler.go index 851acb8c37..421af80889 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -91,7 +91,7 @@ type txPool interface { Pending(enforceTips bool) map[common.Address][]*txpool.LazyTransaction // PendingBundles should return pending bundles. - PendingBundles(blockNumber *big.Int, blockTimestamp uint64) []*types.Bundle + PendingBundles(blockNumber uint64, blockTimestamp uint64) []*types.Bundle // SubscribeNewTxsEvent should return an event subscription of // NewTxsEvent and send events to the given channel. diff --git a/internal/ethapi/api_bundle.go b/internal/ethapi/api_bundle.go index 8996538206..3677d742e1 100644 --- a/internal/ethapi/api_bundle.go +++ b/internal/ethapi/api_bundle.go @@ -42,14 +42,8 @@ type SendBundleArgs struct { RevertingTxHashes []common.Hash `json:"revertingTxHashes"` } -// BundlePrice is the response for the API `eth_bundlePrice` -type BundlePrice struct { - BundlePrice *big.Int `json:"bundlePrice"` - MinimalGasPrice *big.Int `json:"minimalGasPrice"` -} - -func (s *PrivateTxBundleAPI) BundlePrice(ctx context.Context) (*BundlePrice, error) { - return &BundlePrice{BundlePrice: s.b.BundlePrice(), MinimalGasPrice: s.b.MinimalBundleGasPrice()}, nil +func (s *PrivateTxBundleAPI) BundlePrice(ctx context.Context) *big.Int { + return s.b.BundlePrice() } // SendBundle will add the signed transaction to the transaction pool. diff --git a/internal/ethapi/backend.go b/internal/ethapi/backend.go index b88a3844f0..217de4cd1d 100644 --- a/internal/ethapi/backend.go +++ b/internal/ethapi/backend.go @@ -78,7 +78,6 @@ type Backend interface { // Transaction pool API SendTx(ctx context.Context, signedTx *types.Transaction) error SendBundle(ctx context.Context, bundle *types.Bundle) error - MinimalBundleGasPrice() *big.Int BundlePrice() *big.Int UnregisterMevValidator(validator common.Address) GetTransaction(ctx context.Context, txHash common.Hash) (*types.Transaction, common.Hash, uint64, uint64, error) diff --git a/les/api_backend.go b/les/api_backend.go index 07ae5b56c9..d948342a8d 100644 --- a/les/api_backend.go +++ b/les/api_backend.go @@ -204,10 +204,6 @@ func (b *LesApiBackend) SendBundle(ctx context.Context, bundle *types.Bundle) er return b.eth.txPool.AddBundle(bundle) } -func (b *LesApiBackend) MinimalBundleGasPrice() *big.Int { - return nil -} - func (b *LesApiBackend) BundlePrice() *big.Int { return nil } diff --git a/miner/bidder.go b/miner/bidder.go index b86dd4aa77..b589f6bd58 100644 --- a/miner/bidder.go +++ b/miner/bidder.go @@ -2,7 +2,6 @@ package miner import ( "context" - "crypto/tls" "fmt" "net" "net/http" @@ -34,7 +33,6 @@ var ( MaxIdleConnsPerHost: 50, MaxConnsPerHost: 50, IdleConnTimeout: 90 * time.Second, - TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, } client = &http.Client{ @@ -134,14 +132,16 @@ func (b *Bidder) mainLoop() { b.bestWorks[work.header.Number.Int64()] = work } case <-timer.C: - if b.bestWorks[currentHeight] != nil { - b.bid(b.bestWorks[currentHeight]) - b.bestWorks[currentHeight] = nil - bidNum++ - if bidNum < maxBid && time.Now().Before(bidUntil) { - timer.Reset(bidUntil.Sub(time.Now()) / time.Duration(maxBid-bidNum)) + go func() { + if b.bestWorks[currentHeight] != nil { + b.bid(b.bestWorks[currentHeight]) + b.bestWorks[currentHeight] = nil + bidNum++ + if bidNum < maxBid && time.Now().Before(bidUntil) { + timer.Reset(bidUntil.Sub(time.Now()) / time.Duration(maxBid-bidNum)) + } } - } + }() case <-b.exitCh: return } @@ -189,7 +189,7 @@ func (b *Bidder) bid(work *environment) { ) if cli == nil { - log.Error("Bidder: invalid validator", "validator", work.coinbase) + log.Info("Bidder: validator not integrated", "validator", work.coinbase) return } @@ -211,7 +211,7 @@ func (b *Bidder) bid(work *environment) { BlockNumber: parent.Number.Uint64() + 1, ParentHash: parent.Hash(), GasUsed: work.header.GasUsed, - GasFee: work.profit.Uint64(), + GasFee: work.profit, Txs: txs, // TODO: decide builderFee according to realtime traffic and validator commission } diff --git a/miner/worker_builder.go b/miner/worker_builder.go index 3123094936..4189ac1952 100644 --- a/miner/worker_builder.go +++ b/miner/worker_builder.go @@ -51,7 +51,7 @@ func (w *worker) fillTransactionsAndBundles(interruptCh chan int32, env *environ } } - bundles = w.eth.TxPool().PendingBundles(env.header.Number, env.header.Time) + bundles = w.eth.TxPool().PendingBundles(env.header.Number.Uint64(), env.header.Time) log.Info("fill bundles and transactions", "bundles_count", len(bundles), "tx_count", len(pending))