Skip to content

Commit

Permalink
Added relay websocket support
Browse files Browse the repository at this point in the history
  • Loading branch information
Taarush Vemulapalli authored and jparyani committed May 18, 2021
1 parent b36e0f0 commit cb4764f
Show file tree
Hide file tree
Showing 6 changed files with 293 additions and 3 deletions.
5 changes: 2 additions & 3 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,12 @@ jobs:
uses: actions/checkout@v2
with:
repository: flashbots/mev-geth-demo
ref: jason/v0.2
path: e2e

- run: cd e2e && yarn install
- run: |
cd e2e
GETH=`pwd`/../build/bin/geth ./run.sh &
sleep 15
yarn run demo-simple
yarn run demo-contract
yarn run demo-ws-simple
yarn run demo-ws-contract
4 changes: 4 additions & 0 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,10 @@ var (
utils.EWASMInterpreterFlag,
utils.EVMInterpreterFlag,
utils.MinerNotifyFullFlag,
utils.RelayWSURL,
utils.RelayWSSigningKey,
utils.RelayWSSigningKeystoreFile,
utils.RelayWSKeystorePW,
configFileFlag,
utils.CatalystFlag,
}
Expand Down
48 changes: 48 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,24 @@ var (
Name: "allow-insecure-unlock",
Usage: "Allow insecure account unlocking when account-related RPCs are exposed by http",
}
// Related to the relay websocket options
RelayWSURL = cli.StringFlag{
Name: "relaywsurl",
Usage: "URL of the websocket relay sending bundles",
Value: "wss://miner-relay.flashbots.net",
}
RelayWSSigningKey = cli.StringFlag{
Name: "relaywssigningkey",
Usage: "Access key to authenticate with the relay websocket",
}
RelayWSSigningKeystoreFile = cli.StringFlag{
Name: "relaywssigningkeystorefile",
Usage: "Directory to keystore required to authenticate with the relay websocket",
}
RelayWSKeystorePW = cli.StringFlag{
Name: "relaywskeystorepw",
Usage: "Password of keystore required to authenticate with the relay websocket",
}
RPCGlobalGasCapFlag = cli.Uint64Flag{
Name: "rpc.gascap",
Usage: "Sets a cap on gas that can be used in eth_call/estimateGas (0=infinite)",
Expand Down Expand Up @@ -1344,6 +1362,36 @@ func setTxPool(ctx *cli.Context, cfg *core.TxPoolConfig) {
if ctx.GlobalIsSet(TxPoolLifetimeFlag.Name) {
cfg.Lifetime = ctx.GlobalDuration(TxPoolLifetimeFlag.Name)
}
WSURL := ctx.GlobalString(RelayWSURL.Name)
if WSURL == "" {
log.Warn("Relay websocket URL has not been provided, cannot receive bundles")
} else {
cfg.RelayWSURL = WSURL
}
WSKeystoreFile := ctx.GlobalString(RelayWSSigningKeystoreFile.Name)
WSKeystorePW := ctx.GlobalString(RelayWSKeystorePW.Name)
WSKey := ctx.GlobalString(RelayWSSigningKey.Name)
// Defaults to using the keystore
if WSKeystoreFile != "" && WSKeystorePW != "" {
jsonBytes, err := ioutil.ReadFile(WSKeystoreFile)
if err != nil {
log.Warn("Error decoding keystore file, only supports valid ethereum keystore with a password")
} else {
key, keyErr := keystore.DecryptKey(jsonBytes, WSKeystorePW)
if keyErr != nil {
log.Warn("Error decrypting keystore file with given password")
} else {
cfg.RelayWSSigningKey = common.Bytes2Hex(crypto.FromECDSA(key.PrivateKey))
}
}
} else { // If keystore is not provided, try the pk flag
if WSKey == "" {
log.Warn("Relay websocket auth config (private key or keystore) not found, cannot receive bundles")
} else {
cfg.RelayWSSigningKey = WSKey
}
}
cfg.Etherbase = ctx.GlobalString(MinerEtherbaseFlag.Name)
}

func setEthash(ctx *cli.Context, cfg *ethconfig.Config) {
Expand Down
226 changes: 226 additions & 0 deletions core/tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,28 @@
package core

import (
"encoding/json"
"errors"
"math"
"math/big"
"net/http"
"sort"
"strconv"
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/common/prque"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rpc"
"github.com/gorilla/websocket"
)

const (
Expand Down Expand Up @@ -153,6 +160,10 @@ type TxPoolConfig struct {
GlobalQueue uint64 // Maximum number of non-executable transaction slots for all accounts

Lifetime time.Duration // Maximum amount of time non-executable transaction are queued

RelayWSURL string // Relay websocket url
RelayWSSigningKey string // Relay websocket access key
Etherbase string
}

// DefaultTxPoolConfig contains the default configurations for the transaction
Expand Down Expand Up @@ -211,6 +222,197 @@ func (config *TxPoolConfig) sanitize() TxPoolConfig {
return conf
}

// Connection to receive bundles from the relay via websocket
// From Gorilla websocket docs:
// Connections support one concurrent reader and one concurrent writer.
type wsConn struct {
conn *websocket.Conn
wsConnected bool
pingPeriod time.Duration
rlock sync.Mutex
wlock sync.Mutex
}

type relayAbstractMessage struct {
Data *json.RawMessage `json:"data"`
Type string `json:"type"`
}

type relaySuccessMessage struct {
Data string `json:"data"`
Type string `json:"type"`
}

type relayBundleMessage struct {
Data bundleData `json:"data"`
Type string `json:"type"`
}

type bundleData struct {
EncodedTxs []hexutil.Bytes `json:"txs"`
BlockNumber rpc.BlockNumber `json:"blockNumber"`
MinTimestamp uint64 `json:"minTimestamp"`
MaxTimestamp uint64 `json:"maxTimestamp"`
RevertingTxHashes []common.Hash `json:"revertingTxHashes"`
}

// Connect to the Relay WS to receive bundles
func (pool *TxPool) connectWS() {
log.Info("Attempting websocket connection")
authMessage := map[string]string{"timestamp": pool.getUTCTimestamp(), "signature": pool.getWSAuthSignature(), "coinbase": pool.config.Etherbase}
encodedAuthMessage, _ := json.Marshal(authMessage)
conn, _, err := websocket.DefaultDialer.Dial(pool.config.RelayWSURL, http.Header{"X-Auth-Message": {string(encodedAuthMessage)}})
if err != nil {
log.Error("Relay websocket connection error", "err", err)
pool.wsConnection.wsConnected = false
} else {
pool.wsConnection.wsConnected = true
pool.wsConnection.conn = conn
log.Info("Initiated a successful WS connection")
// Setup a pong handler, for client side heartbeat
pool.wsConnection.conn.SetPongHandler(func(string) error {
pool.wsConnection.wsConnected = true
return nil
})
}
}

// WS Status
func (pool *TxPool) WebsocketStatus() bool {
return pool.wsConnection.wsConnected
}

// WriteJSON wraps corresponding method on the websocket but is safe for concurrent calling
func (wsConnection *wsConn) WriteMessage(messageType int, data []byte) error {
wsConnection.wlock.Lock()
defer wsConnection.wlock.Unlock()

return wsConnection.conn.WriteMessage(messageType, data)
}

// ReadMessage wraps corresponding method on the websocket but is safe for concurrent calling
func (wsConnection *wsConn) ReadMessage() (int, string, error) {
wsConnection.rlock.Lock()
defer wsConnection.rlock.Unlock()

messageType, message, err := wsConnection.conn.ReadMessage()
return messageType, string(message), err
}

// Close wraps corresponding method on the websocket but is safe for concurrent calling
func (wsConnection *wsConn) CloseWS() error {
// The Close and WriteControl methods can be called concurrently with other methods
return wsConnection.conn.Close()
}

// Go routine that looks for messages from the relay
func (pool *TxPool) readWSMessages() {
for {
if pool.wsConnection.wsConnected {
messageType, message, err := pool.wsConnection.ReadMessage()
if err != nil {
if messageType == -1 { // message type emitted when relay ws connection is closed
log.Error("WS connection with relay closed")
pool.wsConnection.wsConnected = false
continue
} else {
log.Error("WS error while reading the relay message: ", "err", err.Error(), "messageType", messageType)
continue
}
}
var abstractMessage relayAbstractMessage
if err := json.Unmarshal([]byte(message), &abstractMessage); err != nil {
log.Error("Error while decoding relay message: ", "err", err.Error())
continue
}
// If relay message is of type "success", log the success message from relay
if abstractMessage.Type == "success" {
var successMessage relaySuccessMessage
if err := json.Unmarshal([]byte(message), &successMessage); err != nil {
log.Error("Error while decoding relay success message: ", "err", err.Error())
continue
}
log.Info(successMessage.Data)
}
// If relay message is of type "bundle", decode the payload and add bundle
// The sender is responsible for signing the transaction and using the correct nonce and ensuring validity
if abstractMessage.Type == "bundle" {
var bundleMessage relayBundleMessage
if err := json.Unmarshal([]byte(message), &bundleMessage); err != nil {
log.Error("Error while decoding relay bundle message: ", "err", err.Error())
continue
}

var txs types.Transactions
parseFailure := false

for _, encodedTx := range bundleMessage.Data.EncodedTxs {
tx := new(types.Transaction)
if err := tx.UnmarshalBinary(encodedTx); err != nil {
log.Error("Error while decoding bundle transactions: ", "err", err.Error())
parseFailure = true
break
}
txs = append(txs, tx)
}
if parseFailure {
continue
}

// Finally, we add the bundle sent to the tx pool
pool.AddMevBundle(txs, big.NewInt(bundleMessage.Data.BlockNumber.Int64()), bundleMessage.Data.MinTimestamp, bundleMessage.Data.MaxTimestamp, bundleMessage.Data.RevertingTxHashes)
}
} else {
time.Sleep(500 * time.Millisecond)
}
}
}

func (pool *TxPool) getUTCTimestamp() string {
now := time.Now()
seconds := now.Unix()
return strconv.FormatInt(seconds, 10)
}

func (pool *TxPool) getWSAuthSignature() string {
privateKey, err := crypto.HexToECDSA(pool.config.RelayWSSigningKey)
if err != nil {
log.Warn(err.Error())
}
message := pool.getUTCTimestamp()
hash := crypto.Keccak256Hash([]byte(message))
signature, err := crypto.Sign(hash.Bytes(), privateKey)
if err != nil {
log.Warn(err.Error())
}
return hexutil.Encode(signature)
}

// Go routine to ping the ws server periodically to ensure a live connection
func (pool *TxPool) ping() {
pool.wsConnection.wlock.Lock()
defer pool.wsConnection.wlock.Unlock()
ticker := time.NewTicker(pool.wsConnection.pingPeriod)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if !pool.wsConnection.wsConnected && pool.wsEnabled {
// If a ws connection hasn't already been established, attempt it
pool.connectWS()
} else {
// If it has already been connected, test with a ping message
if err := pool.wsConnection.conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(pool.wsConnection.pingPeriod)); err != nil {
pool.wsConnection.wsConnected = false
log.Info("Error while sending a ping to the relay WS, attempting reconnection")
pool.connectWS()
}
}

}
}
}

// TxPool contains all currently known transactions. Transactions
// enter the pool when they are received from the network or submitted
// locally. They exit the pool when they are included in the blockchain.
Expand All @@ -228,6 +430,9 @@ type TxPool struct {
signer types.Signer
mu sync.RWMutex

wsConnection wsConn // Websocket connection
wsEnabled bool

istanbul bool // Fork indicator whether we are in the istanbul stage.
eip2718 bool // Fork indicator whether we are using EIP-2718 type transactions.

Expand Down Expand Up @@ -312,6 +517,17 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block
pool.wg.Add(1)
go pool.loop()

// Create WS connection with the relay server
if config.RelayWSURL != "" && config.RelayWSSigningKey != "" {
pool.wsEnabled = true
pool.wsConnection.wsConnected = false
pool.wsConnection.pingPeriod = 10 * time.Second
go pool.ping()
go pool.readWSMessages()
} else {
pool.wsEnabled = false
log.Warn("Provide --relayWSURL and --relayWSSigningKey flags to receive bundles from the relay ws")
}
return pool
}

Expand Down Expand Up @@ -404,6 +620,16 @@ func (pool *TxPool) Stop() {
if pool.journal != nil {
pool.journal.close()
}
pool.wsConnection.rlock.Lock()
defer pool.wsConnection.rlock.Unlock()
if pool.wsEnabled && pool.wsConnection.wsConnected {
err := pool.wsConnection.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
if err != nil {
log.Error("Error closing WS connection:", err)
return
}
pool.wsConnection.CloseWS()
}
log.Info("Transaction pool stopped")
}

Expand Down
4 changes: 4 additions & 0 deletions eth/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,10 @@ func (b *EthAPIBackend) SendTx(ctx context.Context, signedTx *types.Transaction)
}

func (b *EthAPIBackend) SendBundle(ctx context.Context, txs types.Transactions, blockNumber rpc.BlockNumber, minTimestamp uint64, maxTimestamp uint64, revertingTxHashes []common.Hash) error {
// If the client is already connected to the relay websocket, ignore bundles from the proxy
if b.eth.txPool.WebsocketStatus() {
return nil
}
return b.eth.txPool.AddMevBundle(txs, big.NewInt(blockNumber.Int64()), minTimestamp, maxTimestamp, revertingTxHashes)
}

Expand Down
9 changes: 9 additions & 0 deletions node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,15 @@ type Config struct {
// exposed.
WSModules []string

// RelayWSURL is the url of the relay websocket which sends bundles
RelayWSURL string
// RelayWSSigningKey is the ethereum private key of a whitelisted eoa, required to authenticate with the relay ws server
RelayWSSigningKey string
// RelayWSSigningKeystoreFile allows WS authentication signing via keystore files
RelayWSSigningKeystoreFile string
// RelayWSKeystorePW is the password of the WS auth keystore
RelayWSKeystorePW string

// WSExposeAll exposes all API modules via the WebSocket RPC interface rather
// than just the public ones.
//
Expand Down

0 comments on commit cb4764f

Please sign in to comment.