From 4d28194802d00912c975f7916d6b7efc2b0f055e Mon Sep 17 00:00:00 2001 From: Carlo Xu Date: Mon, 29 Nov 2021 16:57:18 -0600 Subject: [PATCH] Private Transaction API Sample (v1.10.13) --- cmd/geth/main.go | 1 + cmd/geth/usage.go | 1 + cmd/utils/flags.go | 8 +++ core/tx_pool.go | 113 +++++++++++++++++++++++++++++++---- eth/api_backend.go | 8 ++- eth/handler.go | 4 ++ eth/handler_test.go | 5 ++ eth/protocols/eth/handler.go | 4 ++ eth/sync.go | 7 ++- graphql/graphql.go | 2 +- internal/ethapi/api.go | 25 ++++++-- internal/ethapi/backend.go | 2 +- internal/web3ext/web3ext.go | 6 ++ les/api_backend.go | 2 +- 14 files changed, 164 insertions(+), 24 deletions(-) diff --git a/cmd/geth/main.go b/cmd/geth/main.go index 03a6d72e608f..7e1bdaaed533 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -90,6 +90,7 @@ var ( utils.TxPoolAccountQueueFlag, utils.TxPoolGlobalQueueFlag, utils.TxPoolLifetimeFlag, + utils.TxPoolPrivateLifetimeFlag, utils.SyncModeFlag, utils.ExitWhenSyncedFlag, utils.GCModeFlag, diff --git a/cmd/geth/usage.go b/cmd/geth/usage.go index 9139c403d173..6514e00bed73 100644 --- a/cmd/geth/usage.go +++ b/cmd/geth/usage.go @@ -98,6 +98,7 @@ var AppHelpFlagGroups = []flags.FlagGroup{ utils.TxPoolAccountQueueFlag, utils.TxPoolGlobalQueueFlag, utils.TxPoolLifetimeFlag, + utils.TxPoolPrivateLifetimeFlag, }, }, { diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index f90be5d35404..db4bc8863412 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -400,6 +400,11 @@ var ( Usage: "Maximum amount of time non-executable transaction are queued", Value: ethconfig.Defaults.TxPool.Lifetime, } + TxPoolPrivateLifetimeFlag = cli.DurationFlag{ + Name: "txpool.privatelifetime", + Usage: "Maximum amount of time private transactions are withheld from public broadcasting", + Value: ethconfig.Defaults.TxPool.PrivateTxLifetime, + } // Performance tuning settings CacheFlag = cli.IntFlag{ Name: "cache", @@ -1466,6 +1471,9 @@ func setTxPool(ctx *cli.Context, cfg *core.TxPoolConfig) { if ctx.GlobalIsSet(TxPoolLifetimeFlag.Name) { cfg.Lifetime = ctx.GlobalDuration(TxPoolLifetimeFlag.Name) } + if ctx.GlobalIsSet(TxPoolPrivateLifetimeFlag.Name) { + cfg.PrivateTxLifetime = ctx.GlobalDuration(TxPoolPrivateLifetimeFlag.Name) + } addresses := strings.Split(ctx.GlobalString(MinerTrustedRelaysFlag.Name), ",") for _, address := range addresses { diff --git a/core/tx_pool.go b/core/tx_pool.go index 6440f021fe52..ae16d6b01b25 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -88,8 +88,9 @@ var ( ) var ( - evictionInterval = time.Minute // Time interval to check for evictable transactions - statsReportInterval = 8 * time.Second // Time interval to report transaction pool stats + evictionInterval = time.Minute // Time interval to check for evictable transactions + statsReportInterval = 8 * time.Second // Time interval to report transaction pool stats + privateTxCleanupInterval = 1 * time.Hour ) var ( @@ -164,7 +165,8 @@ type TxPoolConfig struct { AccountQueue uint64 // Maximum number of non-executable transaction slots permitted per account GlobalQueue uint64 // Maximum number of non-executable transaction slots for all accounts - Lifetime time.Duration // Maximum amount of time non-executable transaction are queued + Lifetime time.Duration // Maximum amount of time non-executable transaction are queued + PrivateTxLifetime time.Duration // Maximum amount of time to keep private transactions private TrustedRelays []common.Address // Trusted relay addresses. Duplicated from the miner config. } @@ -183,7 +185,8 @@ var DefaultTxPoolConfig = TxPoolConfig{ AccountQueue: 64, GlobalQueue: 1024, - Lifetime: 3 * time.Hour, + Lifetime: 3 * time.Hour, + PrivateTxLifetime: 3 * 24 * time.Hour, } // sanitize checks the provided user configurations and changes anything that's @@ -222,6 +225,10 @@ func (config *TxPoolConfig) sanitize() TxPoolConfig { log.Warn("Sanitizing invalid txpool lifetime", "provided", conf.Lifetime, "updated", DefaultTxPoolConfig.Lifetime) conf.Lifetime = DefaultTxPoolConfig.Lifetime } + if conf.PrivateTxLifetime < 1 { + log.Warn("Sanitizing invalid txpool private tx lifetime", "provided", conf.PrivateTxLifetime, "updated", DefaultTxPoolConfig.PrivateTxLifetime) + conf.PrivateTxLifetime = DefaultTxPoolConfig.PrivateTxLifetime + } return conf } @@ -261,6 +268,7 @@ type TxPool struct { NewMegabundleHooks []func(common.Address, *types.MevBundle) all *txLookup // All transactions to allow lookups priced *txPricedList // All transactions sorted by price + privateTxs *timestampedTxHashSet chainHeadCh chan ChainHeadEvent chainHeadSub event.Subscription @@ -296,6 +304,7 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block beats: make(map[common.Address]time.Time), megabundles: make(map[common.Address]types.MevBundle), all: newTxLookup(), + privateTxs: newExpiringTxHashSet(config.PrivateTxLifetime), chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize), reqResetCh: make(chan *txpoolResetRequest), reqPromoteCh: make(chan *accountSet), @@ -346,9 +355,10 @@ func (pool *TxPool) loop() { var ( prevPending, prevQueued, prevStales int // Start the stats reporting and transaction eviction tickers - report = time.NewTicker(statsReportInterval) - evict = time.NewTicker(evictionInterval) - journal = time.NewTicker(pool.config.Rejournal) + report = time.NewTicker(statsReportInterval) + evict = time.NewTicker(evictionInterval) + journal = time.NewTicker(pool.config.Rejournal) + privateTx = time.NewTicker(privateTxCleanupInterval) // Track the previous head headers for transaction reorgs head = pool.chain.CurrentBlock() ) @@ -412,6 +422,10 @@ func (pool *TxPool) loop() { } pool.mu.Unlock() } + + // Remove stale hashes that must be kept private + case <-privateTx.C: + pool.privateTxs.prune() } } } @@ -532,6 +546,11 @@ func (pool *TxPool) ContentFrom(addr common.Address) (types.Transactions, types. return pending, queued } +// IsPrivateTxHash indicates whether the transaction should be shared with peers +func (pool *TxPool) IsPrivateTxHash(hash common.Hash) bool { + return pool.privateTxs.Contains(hash) +} + // Pending retrieves all currently processable transactions, grouped by origin // account and sorted by nonce. The returned transaction set is a copy and can be // freely modified by calling code. @@ -958,7 +977,7 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T // This method is used to add transactions from the RPC API and performs synchronous pool // reorganization and event propagation. func (pool *TxPool) AddLocals(txs []*types.Transaction) []error { - return pool.addTxs(txs, !pool.config.NoLocals, true) + return pool.addTxs(txs, !pool.config.NoLocals, true, false) } // AddLocal enqueues a single local transaction into the pool if it is valid. This is @@ -974,12 +993,18 @@ func (pool *TxPool) AddLocal(tx *types.Transaction) error { // This method is used to add transactions from the p2p network and does not wait for pool // reorganization and internal event propagation. func (pool *TxPool) AddRemotes(txs []*types.Transaction) []error { - return pool.addTxs(txs, false, false) + return pool.addTxs(txs, false, false, false) +} + +// AddPrivateRemote adds transactions to the pool, but does not broadcast these transactions to any peers. +func (pool *TxPool) AddPrivateRemote(tx *types.Transaction) error { + errs := pool.addTxs([]*types.Transaction{tx}, false, false, true) + return errs[0] } // This is like AddRemotes, but waits for pool reorganization. Tests use this method. func (pool *TxPool) AddRemotesSync(txs []*types.Transaction) []error { - return pool.addTxs(txs, false, true) + return pool.addTxs(txs, false, true, false) } // This is like AddRemotes with a single transaction, but waits for pool reorganization. Tests use this method. @@ -998,7 +1023,7 @@ func (pool *TxPool) AddRemote(tx *types.Transaction) error { } // addTxs attempts to queue a batch of transactions if they are valid. -func (pool *TxPool) addTxs(txs []*types.Transaction, local, sync bool) []error { +func (pool *TxPool) addTxs(txs []*types.Transaction, local, sync, private bool) []error { // Filter out known ones without obtaining the pool lock or recovering signatures var ( errs = make([]error, len(txs)) @@ -1027,6 +1052,13 @@ func (pool *TxPool) addTxs(txs []*types.Transaction, local, sync bool) []error { return errs } + // Track private transactions, so they don't get leaked to the public mempool + if private { + for _, tx := range news { + pool.privateTxs.Add(tx.Hash()) + } + } + // Process all the new transaction and merge any errors into the original slice pool.mu.Lock() newErrs, dirtyAddrs := pool.addTxsLocked(news, local) @@ -1321,7 +1353,11 @@ func (pool *TxPool) runReorg(done chan struct{}, reset *txpoolResetRequest, dirt if len(events) > 0 { var txs []*types.Transaction for _, set := range events { - txs = append(txs, set.Flatten()...) + for _, tx := range set.Flatten() { + if !pool.IsPrivateTxHash(tx.Hash()) { + txs = append(txs, tx) + } + } } pool.txFeed.Send(NewTxsEvent{txs}) } @@ -1931,6 +1967,59 @@ func (t *txLookup) RemotesBelowTip(threshold *big.Int) types.Transactions { return found } +type timestampedTxHashSet struct { + lock sync.RWMutex + hashes []common.Hash + timestamps map[common.Hash]time.Time + ttl time.Duration +} + +func newExpiringTxHashSet(ttl time.Duration) *timestampedTxHashSet { + s := ×tampedTxHashSet{ + hashes: make([]common.Hash, 0), + timestamps: make(map[common.Hash]time.Time), + ttl: ttl, + } + + return s +} + +func (s *timestampedTxHashSet) Add(hash common.Hash) { + s.lock.Lock() + defer s.lock.Unlock() + + s.hashes = append(s.hashes, hash) + s.timestamps[hash] = time.Now().Add(s.ttl) +} + +func (s *timestampedTxHashSet) Contains(hash common.Hash) bool { + s.lock.RLock() + defer s.lock.RUnlock() + _, ok := s.timestamps[hash] + return ok +} + +func (s *timestampedTxHashSet) prune() { + s.lock.Lock() + defer s.lock.Unlock() + + var ( + count int + now = time.Now() + ) + for _, hash := range s.hashes { + ts := s.timestamps[hash] + if ts.After(now) { + break + } + + delete(s.timestamps, hash) + count += 1 + } + + s.hashes = s.hashes[count:] +} + // numSlots calculates the number of slots needed for a single transaction. func numSlots(tx *types.Transaction) int { return int((tx.Size() + txSlotSize - 1) / txSlotSize) diff --git a/eth/api_backend.go b/eth/api_backend.go index 16e4a2d8101b..d6cf6e923c39 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -242,8 +242,12 @@ func (b *EthAPIBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscri return b.eth.BlockChain().SubscribeLogsEvent(ch) } -func (b *EthAPIBackend) SendTx(ctx context.Context, signedTx *types.Transaction) error { - return b.eth.txPool.AddLocal(signedTx) +func (b *EthAPIBackend) SendTx(ctx context.Context, signedTx *types.Transaction, private bool) error { + if private { + return b.eth.txPool.AddPrivateRemote(signedTx) + } else { + return b.eth.txPool.AddLocal(signedTx) + } } func (b *EthAPIBackend) SendBundle(ctx context.Context, txs types.Transactions, blockNumber rpc.BlockNumber, minTimestamp uint64, maxTimestamp uint64, revertingTxHashes []common.Hash) error { diff --git a/eth/handler.go b/eth/handler.go index 54efe18d64a1..bfe32c922948 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -72,6 +72,10 @@ type txPool interface { // SubscribeNewTxsEvent should return an event subscription of // NewTxsEvent and send events to the given channel. SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription + + // IsPrivateTxHash indicates if the transaction hash should not + // be broadcast on public channels + IsPrivateTxHash(hash common.Hash) bool } // handlerConfig is the collection of initialization parameters to create a full diff --git a/eth/handler_test.go b/eth/handler_test.go index d967b6df935e..382bed491bd1 100644 --- a/eth/handler_test.go +++ b/eth/handler_test.go @@ -113,6 +113,11 @@ func (p *testTxPool) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subs return p.txFeed.Subscribe(ch) } +// IsPrivateTxHash always returns false in tests +func (p *testTxPool) IsPrivateTxHash(hash common.Hash) bool { + return false +} + // testHandler is a live implementation of the Ethereum protocol handler, just // preinitialized with some sane testing defaults and the transaction pool mocked // out. diff --git a/eth/protocols/eth/handler.go b/eth/protocols/eth/handler.go index 3a0b21c30bdb..b655e1e6d079 100644 --- a/eth/protocols/eth/handler.go +++ b/eth/protocols/eth/handler.go @@ -91,6 +91,10 @@ type Backend interface { type TxPool interface { // Get retrieves the transaction from the local txpool with the given hash. Get(hash common.Hash) *types.Transaction + + // IsPrivateTxHash indicates if the transaction hash should not + // be broadcast on public channels + IsPrivateTxHash(hash common.Hash) bool } // MakeProtocols constructs the P2P protocol definitions for `eth`. diff --git a/eth/sync.go b/eth/sync.go index d67d2311d0d9..ff72d88a4b1c 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -46,7 +46,12 @@ func (h *handler) syncTransactions(p *eth.Peer) { var txs types.Transactions pending := h.txpool.Pending(false) for _, batch := range pending { - txs = append(txs, batch...) + for _, tx := range batch { + // don't share any transactions marked as private + if !h.txpool.IsPrivateTxHash(tx.Hash()) { + txs = append(txs, tx) + } + } } if len(txs) == 0 { return diff --git a/graphql/graphql.go b/graphql/graphql.go index 0654fd1af388..b3ee7e6ebce9 100644 --- a/graphql/graphql.go +++ b/graphql/graphql.go @@ -1241,7 +1241,7 @@ func (r *Resolver) SendRawTransaction(ctx context.Context, args struct{ Data hex if err := tx.UnmarshalBinary(args.Data); err != nil { return common.Hash{}, err } - hash, err := ethapi.SubmitTransaction(ctx, r.backend, tx) + hash, err := ethapi.SubmitTransaction(ctx, r.backend, tx, false) return hash, err } diff --git a/internal/ethapi/api.go b/internal/ethapi/api.go index 824422e697a8..a9c3fbc95152 100644 --- a/internal/ethapi/api.go +++ b/internal/ethapi/api.go @@ -468,7 +468,7 @@ func (s *PrivateAccountAPI) SendTransaction(ctx context.Context, args Transactio log.Warn("Failed transaction send attempt", "from", args.from(), "to", args.To, "value", args.Value.ToInt(), "err", err) return common.Hash{}, err } - return SubmitTransaction(ctx, s.b, signed) + return SubmitTransaction(ctx, s.b, signed, false) } // SignTransaction will create a transaction from the given arguments and @@ -1655,7 +1655,7 @@ func (s *PublicTransactionPoolAPI) sign(addr common.Address, tx *types.Transacti } // SubmitTransaction is a helper function that submits tx to txPool and logs a message. -func SubmitTransaction(ctx context.Context, b Backend, tx *types.Transaction) (common.Hash, error) { +func SubmitTransaction(ctx context.Context, b Backend, tx *types.Transaction, private bool) (common.Hash, error) { // If the transaction fee cap is already specified, ensure the // fee of the given transaction is _reasonable_. if err := checkTxFee(tx.GasPrice(), tx.Gas(), b.RPCTxFeeCap()); err != nil { @@ -1665,7 +1665,7 @@ func SubmitTransaction(ctx context.Context, b Backend, tx *types.Transaction) (c // Ensure only eip155 signed transactions are submitted if EIP155Required is set. return common.Hash{}, errors.New("only replay-protected (EIP-155) transactions allowed over RPC") } - if err := b.SendTx(ctx, tx); err != nil { + if err := b.SendTx(ctx, tx, private); err != nil { return common.Hash{}, err } // Print a log with full tx details for manual investigations and interventions @@ -1713,7 +1713,7 @@ func (s *PublicTransactionPoolAPI) SendTransaction(ctx context.Context, args Tra if err != nil { return common.Hash{}, err } - return SubmitTransaction(ctx, s.b, signed) + return SubmitTransaction(ctx, s.b, signed, false) } // FillTransaction fills the defaults (nonce, gas, gasPrice or 1559 fields) @@ -1740,7 +1740,20 @@ func (s *PublicTransactionPoolAPI) SendRawTransaction(ctx context.Context, input if err := tx.UnmarshalBinary(input); err != nil { return common.Hash{}, err } - return SubmitTransaction(ctx, s.b, tx) + return SubmitTransaction(ctx, s.b, tx, false) +} + +// SendPrivateRawTransaction will add the signed transaction to the transaction pool, +// without broadcasting the transaction to its peers, and mark the transaction to avoid +// future syncs. +// +// See SendRawTransaction. +func (s *PublicTransactionPoolAPI) SendPrivateRawTransaction(ctx context.Context, input hexutil.Bytes) (common.Hash, error) { + tx := new(types.Transaction) + if err := tx.UnmarshalBinary(input); err != nil { + return common.Hash{}, err + } + return SubmitTransaction(ctx, s.b, tx, true) } // Sign calculates an ECDSA signature for: @@ -1873,7 +1886,7 @@ func (s *PublicTransactionPoolAPI) Resend(ctx context.Context, sendArgs Transact if err != nil { return common.Hash{}, err } - if err = s.b.SendTx(ctx, signedTx); err != nil { + if err = s.b.SendTx(ctx, signedTx, false); err != nil { return common.Hash{}, err } return signedTx.Hash(), nil diff --git a/internal/ethapi/backend.go b/internal/ethapi/backend.go index 41ac17704085..22e0701a85d2 100644 --- a/internal/ethapi/backend.go +++ b/internal/ethapi/backend.go @@ -74,7 +74,7 @@ type Backend interface { SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) event.Subscription // Transaction pool API - SendTx(ctx context.Context, signedTx *types.Transaction) error + SendTx(ctx context.Context, signedTx *types.Transaction, private bool) error SendBundle(ctx context.Context, txs types.Transactions, blockNumber rpc.BlockNumber, minTimestamp uint64, maxTimestamp uint64, revertingTxHashes []common.Hash) error SendMegabundle(ctx context.Context, txs types.Transactions, blockNumber rpc.BlockNumber, minTimestamp uint64, maxTimestamp uint64, revertingTxHashes []common.Hash, relayAddr common.Address) error GetTransaction(ctx context.Context, txHash common.Hash) (*types.Transaction, common.Hash, uint64, uint64, error) diff --git a/internal/web3ext/web3ext.go b/internal/web3ext/web3ext.go index 4ce1f84f22e9..de877f5159a4 100644 --- a/internal/web3ext/web3ext.go +++ b/internal/web3ext/web3ext.go @@ -530,6 +530,12 @@ web3._extend({ params: 1, inputFormatter: [web3._extend.formatters.inputTransactionFormatter] }), + new web3._extend.Method({ + name: 'sendPrivateRawTransaction', + call: 'eth_sendPrivateRawTransaction', + params: 1, + inputFormatter: [null] + }), new web3._extend.Method({ name: 'fillTransaction', call: 'eth_fillTransaction', diff --git a/les/api_backend.go b/les/api_backend.go index b910bd3e1f48..d0ead5ac8763 100644 --- a/les/api_backend.go +++ b/les/api_backend.go @@ -191,7 +191,7 @@ func (b *LesApiBackend) GetEVM(ctx context.Context, msg core.Message, state *sta return vm.NewEVM(context, txContext, state, b.eth.chainConfig, *vmConfig), state.Error, nil } -func (b *LesApiBackend) SendTx(ctx context.Context, signedTx *types.Transaction) error { +func (b *LesApiBackend) SendTx(ctx context.Context, signedTx *types.Transaction, private bool) error { return b.eth.txPool.Add(ctx, signedTx) }