Skip to content

Commit

Permalink
fix review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
pythonberg1997 committed Feb 21, 2024
1 parent 2e15dd6 commit 70bc8a6
Show file tree
Hide file tree
Showing 12 changed files with 47 additions and 159 deletions.
121 changes: 26 additions & 95 deletions core/txpool/bundlepool/bundlepool.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/prque"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/txpool"
Expand All @@ -25,6 +24,8 @@ const (
// that validating a new bundle remains a constant operation (in reality
// O(maxslots), where max slots are 4 currently).
bundleSlotSize = 128 * 1024 // 128KB

maxMinTimestampFromNow = int64(300) // 5 minutes
)

var (
Expand Down Expand Up @@ -61,19 +62,17 @@ type BundlePool struct {

slots uint64 // Number of slots currently allocated

bundleGasPricer *BundleGasPricer
simulator BundleSimulator
simulator BundleSimulator
}

func New(config Config) *BundlePool {
// Sanitize the input to ensure no vulnerable gas prices are set
config = (&config).sanitize()

pool := &BundlePool{
config: config,
bundles: make(map[common.Hash]*types.Bundle),
bundleHeap: make(BundleHeap, 0),
bundleGasPricer: NewBundleGasPricer(config.BundleGasPricerExpireTime),
config: config,
bundles: make(map[common.Hash]*types.Bundle),
bundleHeap: make(BundleHeap, 0),
}

return pool
Expand Down Expand Up @@ -102,16 +101,18 @@ func (p *BundlePool) AddBundle(bundle *types.Bundle) error {
return txpool.ErrSimulatorMissing
}

if bundle.MinTimestamp > uint64(time.Now().Unix()+maxMinTimestampFromNow) {
return txpool.ErrBundleTimestampTooHigh
}

price, err := p.simulator.SimulateBundle(bundle)
if err != nil {
return err
}
minimalGasPrice := p.bundleGasPricer.MinimalBundleGasPrice()
if price.Cmp(minimalGasPrice) < 0 && p.slots+numSlots(bundle) > p.config.GlobalSlots {
if price.Cmp(p.minimalBundleGasPrice()) < 0 && p.slots+numSlots(bundle) > p.config.GlobalSlots {
return txpool.ErrBundleGasPriceLow
}
bundle.Price = price
p.bundleGasPricer.Push(price)

hash := bundle.Hash()
if _, ok := p.bundles[hash]; ok {
Expand Down Expand Up @@ -144,15 +145,15 @@ func (p *BundlePool) PruneBundle(hash common.Hash) {
p.deleteBundle(hash)
}

func (p *BundlePool) PendingBundles(blockNumber *big.Int, blockTimestamp uint64) []*types.Bundle {
func (p *BundlePool) PendingBundles(blockNumber uint64, blockTimestamp uint64) []*types.Bundle {
p.mu.Lock()
defer p.mu.Unlock()

ret := make([]*types.Bundle, 0)
for hash, bundle := range p.bundles {
// Prune outdated bundles
if (bundle.MaxTimestamp != 0 && blockTimestamp > bundle.MaxTimestamp) ||
blockNumber.Cmp(new(big.Int).SetUint64(bundle.MaxBlockNumber)) > 0 {
blockNumber > bundle.MaxBlockNumber {
p.deleteBundle(hash)
continue
}
Expand Down Expand Up @@ -245,22 +246,19 @@ func (p *BundlePool) Nonce(addr common.Address) uint64 {
// Stats retrieves the current pool stats, namely the number of pending and the
// number of queued (non-executable) transactions.
func (p *BundlePool) Stats() (int, int) {
// TODO implement me
panic("implement me")
return 0, 0
}

// Content retrieves the data content of the transaction pool, returning all the
// pending as well as queued transactions, grouped by account and sorted by nonce.
func (p *BundlePool) Content() (map[common.Address][]*types.Transaction, map[common.Address][]*types.Transaction) {
// TODO implement me
panic("implement me")
return make(map[common.Address][]*types.Transaction), make(map[common.Address][]*types.Transaction)
}

// ContentFrom retrieves the data content of the transaction pool, returning the
// pending as well as queued transactions of this address, grouped by nonce.
func (p *BundlePool) ContentFrom(addr common.Address) ([]*types.Transaction, []*types.Transaction) {
// TODO implement me
panic("implement me")
return []*types.Transaction{}, []*types.Transaction{}
}

// Locals retrieves the accounts currently considered local by the pool.
Expand All @@ -271,8 +269,7 @@ func (p *BundlePool) Locals() []common.Address {
// Status returns the known status (unknown/pending/queued) of a transaction
// identified by their hashes.
func (p *BundlePool) Status(hash common.Hash) txpool.TxStatus {
// TODO implement me
panic("implement me")
return txpool.TxStatusUnknown
}

func (p *BundlePool) filter(tx *types.Transaction) bool {
Expand Down Expand Up @@ -320,85 +317,19 @@ func (p *BundlePool) drop() {
}
}

// =====================================================================================================================

// NewBundleGasPricer creates a new BundleGasPricer.
func NewBundleGasPricer(expire time.Duration) *BundleGasPricer {
return &BundleGasPricer{
expire: expire,
queue: prque.New[int64, *gasPriceInfo](nil),
latest: common.Big0,
}
}

// BundleGasPricer is a limited number of queues.
// In order to avoid too drastic gas price changes, the latest n gas prices are cached.
// Allowed as long as the user's Gas Price matches this range.
type BundleGasPricer struct {
mu sync.RWMutex
expire time.Duration
queue *prque.Prque[int64, *gasPriceInfo]
latest *big.Int
}

type gasPriceInfo struct {
val *big.Int
time time.Time
}

// Push is a method to cache a new gas price.
func (bgp *BundleGasPricer) Push(gasPrice *big.Int) {
bgp.mu.Lock()
defer bgp.mu.Unlock()
bgp.retire()
index := -gasPrice.Int64()
bgp.queue.Push(&gasPriceInfo{val: gasPrice, time: time.Now()}, index)
bgp.latest = gasPrice
}

func (bgp *BundleGasPricer) retire() {
now := time.Now()
// make sure the first element is not expired
for !bgp.queue.Empty() {
v, _ := bgp.queue.Peek()
info := v
if info.time.Add(bgp.expire).After(now) {
break
// minimalBundleGasPrice return the lowest gas price from the pool.
func (p *BundlePool) minimalBundleGasPrice() *big.Int {
for len(p.bundleHeap) != 0 {
leastPriceBundleHash := p.bundleHeap[0].Hash()
if bundle, ok := p.bundles[leastPriceBundleHash]; ok {
return bundle.Price
}
bgp.queue.Pop()
heap.Pop(&p.bundleHeap)
}
// only keep one element
length := bgp.queue.Size()
for i := 1; i < length; i++ {
bgp.queue.Remove(i)
}
}

// LatestBundleGasPrice is a method to get the latest-cached bundle gas price.
func (bgp *BundleGasPricer) LatestBundleGasPrice() *big.Int {
bgp.mu.RLock()
defer bgp.mu.RUnlock()
return bgp.latest
return new(big.Int)
}

// MinimalBundleGasPrice is a method to get minimal cached bundle gas price.
func (bgp *BundleGasPricer) MinimalBundleGasPrice() *big.Int {
bgp.mu.Lock()
defer bgp.mu.Unlock()
if bgp.queue.Empty() {
return common.Big0
}
bgp.retire()
v, _ := bgp.queue.Peek()
return v.val
}

// Clear is a method to clear all caches.
func (bgp *BundleGasPricer) Clear() {
bgp.mu.Lock()
defer bgp.mu.Unlock()
bgp.queue.Reset()
}
// =====================================================================================================================

// numSlots calculates the number of slots needed for a single bundle.
func numSlots(bundle *types.Bundle) uint64 {
Expand Down
6 changes: 3 additions & 3 deletions core/txpool/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,13 @@ var (
// ErrSimulatorMissing is returned if the bundle simulator is missing.
ErrSimulatorMissing = errors.New("bundle simulator is missing")

// ErrBundleTimestampTooHigh is returned if the bundle's MinTimestamp is too high.
ErrBundleTimestampTooHigh = errors.New("bundle MinTimestamp is too high")

// ErrBundleGasPriceLow is returned if the bundle gas price is too low.
ErrBundleGasPriceLow = errors.New("bundle gas price is too low")

// ErrBundleAlreadyExist is returned if the bundle is already contained
// within the pool.
ErrBundleAlreadyExist = errors.New("bundle already exist")

// ErrBundlePoolOverflow is returned if the bundle pool is full
ErrBundlePoolOverflow = errors.New("bundle pool is full")
)
2 changes: 1 addition & 1 deletion core/txpool/subpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ type BundleSubpool interface {
AddBundle(bundle *types.Bundle) error

// PendingBundles retrieves all currently processable bundles.
PendingBundles(blockNumber *big.Int, blockTimestamp uint64) []*types.Bundle
PendingBundles(blockNumber uint64, blockTimestamp uint64) []*types.Bundle

// AllBundles returns all the bundles currently in the pool.
AllBundles() []*types.Bundle
Expand Down
25 changes: 1 addition & 24 deletions core/txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,6 @@ type BlockChain interface {
SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription
}

type MevGasPricer interface {
MinimalBundleGasPrice() *big.Int
LatestBundleGasPrice() *big.Int
}

// TxPool is an aggregator for various transaction specific pools, collectively
// tracking all the transactions deemed interesting by the node. Transactions
// enter the pool when they are received from the network or submitted locally.
Expand Down Expand Up @@ -330,24 +325,6 @@ func (p *TxPool) PruneBundle(hash common.Hash) {
}
}

func (p *TxPool) MinimalBundleGasPrice() *big.Int {
for _, subpool := range p.subpools {
if gasPricer, ok := subpool.(MevGasPricer); ok {
return gasPricer.MinimalBundleGasPrice()
}
}
return common.Big0
}

func (p *TxPool) LatestBundleGasPrice() *big.Int {
for _, subpool := range p.subpools {
if gasPricer, ok := subpool.(MevGasPricer); ok {
return gasPricer.LatestBundleGasPrice()
}
}
return common.Big0
}

// Pending retrieves all currently processable transactions, grouped by origin
// account and sorted by nonce.
func (p *TxPool) Pending(enforceTips bool) map[common.Address][]*LazyTransaction {
Expand All @@ -361,7 +338,7 @@ func (p *TxPool) Pending(enforceTips bool) map[common.Address][]*LazyTransaction
}

// PendingBundles retrieves all currently processable bundles.
func (p *TxPool) PendingBundles(blockNumber *big.Int, blockTimestamp uint64) []*types.Bundle {
func (p *TxPool) PendingBundles(blockNumber uint64, blockTimestamp uint64) []*types.Bundle {
for _, subpool := range p.subpools {
if bundleSubpool, ok := subpool.(BundleSubpool); ok {
return bundleSubpool.PendingBundles(blockNumber, blockTimestamp)
Expand Down
2 changes: 1 addition & 1 deletion core/types/bid.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type Bid struct {
ParentHash common.Hash `json:"parentHash"`
Txs []hexutil.Bytes `json:"txs,omitempty"`
GasUsed uint64 `json:"gasUsed"`
GasFee uint64 `json:"gasFee"`
GasFee *big.Int `json:"gasFee"`
BuilderFee *big.Int `json:"builderFee"`

// caches
Expand Down
9 changes: 0 additions & 9 deletions eth/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,15 +295,6 @@ func (b *EthAPIBackend) SendBundle(ctx context.Context, bundle *types.Bundle) er
return b.eth.txPool.AddBundle(bundle)
}

func (b *EthAPIBackend) MinimalBundleGasPrice() *big.Int {
gasFloor := big.NewInt(b.eth.config.Miner.MevGasPriceFloor)
gasPrice := b.eth.TxPool().MinimalBundleGasPrice()
if gasPrice.Cmp(gasFloor) < 0 {
return gasFloor
}
return gasPrice
}

func (b *EthAPIBackend) BundlePrice() *big.Int {
bundles := b.eth.txPool.AllBundles()

Expand Down
2 changes: 1 addition & 1 deletion eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ type txPool interface {
Pending(enforceTips bool) map[common.Address][]*txpool.LazyTransaction

// PendingBundles should return pending bundles.
PendingBundles(blockNumber *big.Int, blockTimestamp uint64) []*types.Bundle
PendingBundles(blockNumber uint64, blockTimestamp uint64) []*types.Bundle

// SubscribeNewTxsEvent should return an event subscription of
// NewTxsEvent and send events to the given channel.
Expand Down
10 changes: 2 additions & 8 deletions internal/ethapi/api_bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,8 @@ type SendBundleArgs struct {
RevertingTxHashes []common.Hash `json:"revertingTxHashes"`
}

// BundlePrice is the response for the API `eth_bundlePrice`
type BundlePrice struct {
BundlePrice *big.Int `json:"bundlePrice"`
MinimalGasPrice *big.Int `json:"minimalGasPrice"`
}

func (s *PrivateTxBundleAPI) BundlePrice(ctx context.Context) (*BundlePrice, error) {
return &BundlePrice{BundlePrice: s.b.BundlePrice(), MinimalGasPrice: s.b.MinimalBundleGasPrice()}, nil
func (s *PrivateTxBundleAPI) BundlePrice(ctx context.Context) *big.Int {
return s.b.BundlePrice()
}

// SendBundle will add the signed transaction to the transaction pool.
Expand Down
1 change: 0 additions & 1 deletion internal/ethapi/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ type Backend interface {
// Transaction pool API
SendTx(ctx context.Context, signedTx *types.Transaction) error
SendBundle(ctx context.Context, bundle *types.Bundle) error
MinimalBundleGasPrice() *big.Int
BundlePrice() *big.Int
UnregisterMevValidator(validator common.Address)
GetTransaction(ctx context.Context, txHash common.Hash) (*types.Transaction, common.Hash, uint64, uint64, error)
Expand Down
4 changes: 0 additions & 4 deletions les/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,10 +204,6 @@ func (b *LesApiBackend) SendBundle(ctx context.Context, bundle *types.Bundle) er
return b.eth.txPool.AddBundle(bundle)
}

func (b *LesApiBackend) MinimalBundleGasPrice() *big.Int {
return nil
}

func (b *LesApiBackend) BundlePrice() *big.Int {
return nil
}
Expand Down
Loading

0 comments on commit 70bc8a6

Please sign in to comment.