Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a CL proxy to allow multiple EL nodes #29

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ To stop the playground, press `Ctrl+C`.
The `EL` instance is deployed with this deterministic enode address:

```
enode://3479db4d9217fb5d7a8ed4d61ac36e120b05d36c2eefb795dc42ff2e971f251a2315f5649ea1833271e020b9adc98d5db9973c7ed92d6b2f1f2223088c3d852f@127.0.0.1:8545
enode://3479db4d9217fb5d7a8ed4d61ac36e120b05d36c2eefb795dc42ff2e971f251a2315f5649ea1833271e020b9adc98d5db9973c7ed92d6b2f1f2223088c3d852f@127.0.0.1:30303
```

Options:
Expand Down
193 changes: 193 additions & 0 deletions cl-proxy/cl-proxy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
package clproxy

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"strings"
"time"

"github.com/flashbots/mev-boost-relay/common"
"github.com/sirupsen/logrus"
)

type Config struct {
LogOutput io.Writer
Port uint64
Primary string
Secondary string
}

func DefaultConfig() *Config {
return &Config{
LogOutput: os.Stdout,
Port: 5656,
}
}

type ClProxy struct {
config *Config
log *logrus.Entry
server *http.Server
}

func New(config *Config) (*ClProxy, error) {
log := common.LogSetup(false, "info")
log.Logger.SetOutput(config.LogOutput)

proxy := &ClProxy{
config: config,
log: log,
}

return proxy, nil
}

// Run starts the HTTP server
func (s *ClProxy) Run() error {
mux := http.NewServeMux()
s.server = &http.Server{
Addr: fmt.Sprintf(":%d", s.config.Port),
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
Handler: mux,
}

mux.HandleFunc("/", s.handleRequest)

s.log.Infof("Starting server on port %d", s.config.Port)
if err := s.server.ListenAndServe(); err != http.ErrServerClosed {
return fmt.Errorf("server error: %v", err)
}
return nil
}

// Close gracefully shuts down the server
func (s *ClProxy) Close() error {
s.log.Info("Shutting down server...")

// Create a context with timeout for shutdown
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

// Attempt graceful shutdown
if err := s.server.Shutdown(ctx); err != nil {
return fmt.Errorf("server shutdown error: %v", err)
}

return nil
}

type jsonrpcMessage struct {
Version string `json:"jsonrpc,omitempty"`
ID json.RawMessage `json:"id,omitempty"`
Method string `json:"method,omitempty"`
Params []json.RawMessage `json:"params,omitempty"`
Result json.RawMessage `json:"result,omitempty"`
}

func (s *ClProxy) handleRequest(w http.ResponseWriter, r *http.Request) {
// Only accept POST requests
if r.Method != http.MethodPost {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}

data, err := io.ReadAll(r.Body)
if err != nil {
http.Error(w, "Bad request", http.StatusBadRequest)
return
}

// Multiplex all the request to both primary and secondary but omit the
// block building requests (this is, remove 'params' field from FCU and omit get payload).
// There are two reasons for this:
// - The secondary builder does not use the Engine API to build blocks but the relayer so these requests are not necessary.
// - The CL->EL setup is not configured anyway to handle two block builders throught the Engine API.
// Note that we still have to relay this request to the primary EL node since we need
// to have a fallback node in the CL.
var jsonRPCRequest jsonrpcMessage
if err := json.Unmarshal(data, &jsonRPCRequest); err != nil {
http.Error(w, "Bad request", http.StatusBadRequest)
return
}

s.log.Info(fmt.Sprintf("Received request: method=%s", jsonRPCRequest.Method))

// proxy to primary and consider its response as the final response to send back to the CL
resp, err := s.proxy(s.config.Primary, r, data)
if err != nil {
s.log.Errorf("Error multiplexing to primary: %v", err)
http.Error(w, "Internal server error", http.StatusInternalServerError)
return
}
defer resp.Body.Close()

respData, err := io.ReadAll(resp.Body)
if err != nil {
s.log.Errorf("Error reading response from primary: %v", err)
http.Error(w, "Internal server error", http.StatusInternalServerError)
return
}

w.Header().Set("Content-Type", "application/json")
w.WriteHeader(resp.StatusCode)
w.Write(respData)

if s.config.Secondary == "" {
return
}

if strings.HasPrefix(jsonRPCRequest.Method, "engine_getPayload") {
// the only request we do not send since the secondary builder does not have the payload id
// and it will always fail
return
}

if strings.HasPrefix(jsonRPCRequest.Method, "engine_forkchoiceUpdated") {
// set to nil the second parameter of the forkchoiceUpdated call
if len(jsonRPCRequest.Params) == 1 {
// not expected
s.log.Warn("ForkchoiceUpdated call with only one parameter")
} else {
jsonRPCRequest.Params[1] = nil

data, err = json.Marshal(jsonRPCRequest)
if err != nil {
s.log.Errorf("Error marshalling forkchoiceUpdated request: %v", err)
return
}
}
}

// proxy to secondary
s.log.Info(fmt.Sprintf("Multiplexing request to secondary: method=%s", jsonRPCRequest.Method))
if _, err := s.proxy(s.config.Secondary, r, data); err != nil {
s.log.Errorf("Error multiplexing to secondary: %v", err)
}
}

func (s *ClProxy) proxy(dst string, r *http.Request, data []byte) (*http.Response, error) {
// Create a new request
req, err := http.NewRequest(http.MethodPost, dst, bytes.NewBuffer(data))
if err != nil {
return nil, err
}

// Copy headers. It is important since we have to copy
// the JWT header from the CL
req.Header = r.Header

// Perform the request
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return nil, err
}

return resp, nil
}
38 changes: 36 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
ecrypto "github.com/ethereum/go-ethereum/crypto"

"github.com/ferranbt/builder-playground/artifacts"
clproxy "github.com/ferranbt/builder-playground/cl-proxy"
mevboostrelay "github.com/ferranbt/builder-playground/mev-boost-relay"

"github.com/hashicorp/go-uuid"
Expand Down Expand Up @@ -61,6 +62,7 @@ var genesisDelayFlag uint64
var watchPayloadsFlag bool
var latestForkFlag bool
var useRethForValidation bool
var secondaryBuilderPort uint64

var rootCmd = &cobra.Command{
Use: "playground",
Expand Down Expand Up @@ -168,6 +170,7 @@ func main() {
rootCmd.Flags().BoolVar(&watchPayloadsFlag, "watch-payloads", false, "")
rootCmd.Flags().BoolVar(&latestForkFlag, "electra", false, "")
rootCmd.Flags().BoolVar(&useRethForValidation, "use-reth-for-validation", false, "enable flashbots_validateBuilderSubmissionV* on reth and use them for validation")
rootCmd.Flags().Uint64Var(&secondaryBuilderPort, "secondary", 1234, "port to use for the secondary builder")

downloadArtifactsCmd.Flags().BoolVar(&validateFlag, "validate", false, "")
validateCmd.Flags().Uint64Var(&numBlocksValidate, "num-blocks", 5, "")
Expand Down Expand Up @@ -369,6 +372,31 @@ func setupServices(svcManager *serviceManager, out *output) error {
return err
}

// Start the cl proxy
{
cfg := clproxy.DefaultConfig()
cfg.Primary = "http://localhost:8551"

if secondaryBuilderPort != 0 {
cfg.Secondary = fmt.Sprintf("http://localhost:%d", secondaryBuilderPort)
}

var err error
if cfg.LogOutput, err = out.LogOutput("cl-proxy"); err != nil {
return err
}
clproxy, err := clproxy.New(cfg)
if err != nil {
return fmt.Errorf("failed to create cl proxy: %w", err)
}

go func() {
if err := clproxy.Run(); err != nil {
svcManager.emitError()
}
}()
}

rethVersion := func() string {
cmd := exec.Command(rethBin, "--version")
out, err := cmd.Output()
Expand Down Expand Up @@ -404,13 +432,14 @@ func setupServices(svcManager *serviceManager, out *output) error {
"--p2p-secret-key", defaultRethDiscoveryPrivKeyLoc,
"--addr", "127.0.0.1",
"--port", "30303",
"--disable-discovery",
// "--disable-discovery",
// http config
"--http",
"--http.api", "admin,eth,net,web3",
"--http.port", "8545",
"--authrpc.port", "8551",
"--authrpc.jwtsecret", "{{.Dir}}/jwtsecret",
"-vvvv",
).
If(useRethForValidation, func(s *service) *service {
return s.WithReplacementArgs("--http.api", "admin,eth,web3,net,rpc,flashbots")
Expand Down Expand Up @@ -470,7 +499,7 @@ func setupServices(svcManager *serviceManager, out *output) error {
"--http-port", "3500",
"--disable-packet-filter",
"--target-peers", "0",
"--execution-endpoint", "http://localhost:8551",
"--execution-endpoint", "http://localhost:5656",
"--execution-jwt", "{{.Dir}}/jwtsecret",
"--builder", "http://localhost:5555",
"--builder-fallback-epochs-since-finalization", "0",
Expand Down Expand Up @@ -538,6 +567,11 @@ func setupServices(svcManager *serviceManager, out *output) error {
ports: []*port{
{name: "http", port: 5555},
},
}, &service{
name: "cl-proxy",
ports: []*port{
{name: "jsonrpc", port: 5656},
},
})

// print services info
Expand Down
Loading