From 1e3ef6e08a284f1a7874754da874ae5356661caa Mon Sep 17 00:00:00 2001 From: Chris Hager Date: Thu, 1 Aug 2024 20:43:48 +0200 Subject: [PATCH] hook up webserver to bid-processor --- services/bidcollect/bid-processor.go | 23 +++++++++++++--- services/bidcollect/webserver/handler.go | 13 ++++----- services/bidcollect/webserver/webserver.go | 31 +++++++++------------- 3 files changed, 38 insertions(+), 29 deletions(-) diff --git a/services/bidcollect/bid-processor.go b/services/bidcollect/bid-processor.go index 6228605..065d229 100644 --- a/services/bidcollect/bid-processor.go +++ b/services/bidcollect/bid-processor.go @@ -10,6 +10,7 @@ import ( "github.com/flashbots/relayscan/common" "github.com/flashbots/relayscan/services/bidcollect/types" + "github.com/flashbots/relayscan/services/bidcollect/webserver" "github.com/sirupsen/logrus" ) @@ -20,10 +21,11 @@ import ( // - One CSV for top bids only type BidProcessorOpts struct { - Log *logrus.Entry - UID string - OutDir string - OutputTSV bool + Log *logrus.Entry + UID string + OutDir string + OutputTSV bool + WebserverAddr string } type OutFiles struct { @@ -44,6 +46,8 @@ type BidProcessor struct { csvSeparator string csvFileEnding string + + webserver *webserver.Server } func NewBidProcessor(opts *BidProcessorOpts) *BidProcessor { @@ -63,10 +67,20 @@ func NewBidProcessor(opts *BidProcessorOpts) *BidProcessor { c.csvFileEnding = "csv" } + if opts.WebserverAddr != "" { + c.webserver = webserver.New(&webserver.HTTPServerConfig{ + ListenAddr: opts.WebserverAddr, + Log: opts.Log, + }) + } return c } func (c *BidProcessor) Start() { + if c.webserver != nil { + c.webserver.RunInBackground() + } + for { time.Sleep(30 * time.Second) c.housekeeping() @@ -103,6 +117,7 @@ func (c *BidProcessor) processBids(bids []*types.CommonBid) { isNewBid = true } + // Send to subscribers // Write to CSV c.writeBidToFile(bid, isNewBid, isTopBid) } diff --git a/services/bidcollect/webserver/handler.go b/services/bidcollect/webserver/handler.go index d90c35e..34aef46 100644 --- a/services/bidcollect/webserver/handler.go +++ b/services/bidcollect/webserver/handler.go @@ -3,7 +3,9 @@ package webserver import ( "fmt" "net/http" + "strings" + "github.com/flashbots/relayscan/services/bidcollect/types" "github.com/google/uuid" ) @@ -30,7 +32,10 @@ func (srv *Server) handleSSESubscription(w http.ResponseWriter, r *http.Request) } srv.addSubscriber(&subscriber) - // pingTicker := time.NewTicker(5 * time.Second) + // Send CSV header + helloMsg := strings.Join(types.CommonBidCSVFields, "\t") + "\n" + fmt.Fprint(w, helloMsg) + w.(http.Flusher).Flush() //nolint:forcetypeassert // Wait for txs or end of request... for { @@ -41,12 +46,8 @@ func (srv *Server) handleSSESubscription(w http.ResponseWriter, r *http.Request) return case msg := <-subscriber.msgC: - fmt.Fprintf(w, "data: %s\n\n", msg) + fmt.Fprintf(w, "%s\n", msg) w.(http.Flusher).Flush() //nolint:forcetypeassert - - // case <-pingTicker.C: - // fmt.Fprintf(w, ": ping\n\n") - // w.(http.Flusher).Flush() //nolint:forcetypeassert } } } diff --git a/services/bidcollect/webserver/webserver.go b/services/bidcollect/webserver/webserver.go index 5282077..51c19ab 100644 --- a/services/bidcollect/webserver/webserver.go +++ b/services/bidcollect/webserver/webserver.go @@ -20,10 +20,8 @@ type HTTPServerConfig struct { ListenAddr string Log *logrus.Entry - DrainDuration time.Duration - GracefulShutdownDuration time.Duration - ReadTimeout time.Duration - WriteTimeout time.Duration + ReadTimeout time.Duration + WriteTimeout time.Duration } type Server struct { @@ -73,9 +71,7 @@ func (srv *Server) RunInBackground() { func (srv *Server) Shutdown() { // api - ctx, cancel := context.WithTimeout(context.Background(), srv.cfg.GracefulShutdownDuration) - defer cancel() - if err := srv.srv.Shutdown(ctx); err != nil { + if err := srv.srv.Shutdown(context.Background()); err != nil { srv.log.WithField("err", err).Error("Graceful HTTP server shutdown failed") } else { srv.log.Info("HTTP server gracefully stopped") @@ -107,18 +103,15 @@ func (srv *Server) SendBid(ctx context.Context, bid *types.CommonBid) error { return nil } - // txRLP, err := common.TxToRLPString(tx.Tx) - // if err != nil { - // return err - // } - - // // Send tx to all subscribers (only if channel is not full) - // for _, sub := range s.sseConnectionMap { - // select { - // case sub.txC <- txRLP: - // default: - // } - // } + msg := bid.ToCSVLine("\t") + + // Send tx to all subscribers (only if channel is not full) + for _, sub := range srv.sseConnectionMap { + select { + case sub.msgC <- msg: + default: + } + } return nil }