Skip to content

Commit

Permalink
hook up webserver to bid-processor
Browse files Browse the repository at this point in the history
  • Loading branch information
metachris committed Aug 1, 2024
1 parent 06d1777 commit 1e3ef6e
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 29 deletions.
23 changes: 19 additions & 4 deletions services/bidcollect/bid-processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/flashbots/relayscan/common"
"github.com/flashbots/relayscan/services/bidcollect/types"
"github.com/flashbots/relayscan/services/bidcollect/webserver"
"github.com/sirupsen/logrus"
)

Expand All @@ -20,10 +21,11 @@ import (
// - One CSV for top bids only

type BidProcessorOpts struct {
Log *logrus.Entry
UID string
OutDir string
OutputTSV bool
Log *logrus.Entry
UID string
OutDir string
OutputTSV bool
WebserverAddr string
}

type OutFiles struct {
Expand All @@ -44,6 +46,8 @@ type BidProcessor struct {

csvSeparator string
csvFileEnding string

webserver *webserver.Server
}

func NewBidProcessor(opts *BidProcessorOpts) *BidProcessor {
Expand All @@ -63,10 +67,20 @@ func NewBidProcessor(opts *BidProcessorOpts) *BidProcessor {
c.csvFileEnding = "csv"
}

if opts.WebserverAddr != "" {
c.webserver = webserver.New(&webserver.HTTPServerConfig{
ListenAddr: opts.WebserverAddr,
Log: opts.Log,
})
}
return c
}

func (c *BidProcessor) Start() {
if c.webserver != nil {
c.webserver.RunInBackground()
}

for {
time.Sleep(30 * time.Second)
c.housekeeping()
Expand Down Expand Up @@ -103,6 +117,7 @@ func (c *BidProcessor) processBids(bids []*types.CommonBid) {
isNewBid = true
}

// Send to subscribers
// Write to CSV
c.writeBidToFile(bid, isNewBid, isTopBid)
}
Expand Down
13 changes: 7 additions & 6 deletions services/bidcollect/webserver/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package webserver
import (
"fmt"
"net/http"
"strings"

"github.com/flashbots/relayscan/services/bidcollect/types"
"github.com/google/uuid"
)

Expand All @@ -30,7 +32,10 @@ func (srv *Server) handleSSESubscription(w http.ResponseWriter, r *http.Request)
}
srv.addSubscriber(&subscriber)

// pingTicker := time.NewTicker(5 * time.Second)
// Send CSV header
helloMsg := strings.Join(types.CommonBidCSVFields, "\t") + "\n"
fmt.Fprint(w, helloMsg)
w.(http.Flusher).Flush() //nolint:forcetypeassert

// Wait for txs or end of request...
for {
Expand All @@ -41,12 +46,8 @@ func (srv *Server) handleSSESubscription(w http.ResponseWriter, r *http.Request)
return

case msg := <-subscriber.msgC:
fmt.Fprintf(w, "data: %s\n\n", msg)
fmt.Fprintf(w, "%s\n", msg)
w.(http.Flusher).Flush() //nolint:forcetypeassert

// case <-pingTicker.C:
// fmt.Fprintf(w, ": ping\n\n")
// w.(http.Flusher).Flush() //nolint:forcetypeassert
}
}
}
31 changes: 12 additions & 19 deletions services/bidcollect/webserver/webserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,8 @@ type HTTPServerConfig struct {
ListenAddr string
Log *logrus.Entry

DrainDuration time.Duration
GracefulShutdownDuration time.Duration
ReadTimeout time.Duration
WriteTimeout time.Duration
ReadTimeout time.Duration
WriteTimeout time.Duration
}

type Server struct {
Expand Down Expand Up @@ -73,9 +71,7 @@ func (srv *Server) RunInBackground() {

func (srv *Server) Shutdown() {
// api
ctx, cancel := context.WithTimeout(context.Background(), srv.cfg.GracefulShutdownDuration)
defer cancel()
if err := srv.srv.Shutdown(ctx); err != nil {
if err := srv.srv.Shutdown(context.Background()); err != nil {
srv.log.WithField("err", err).Error("Graceful HTTP server shutdown failed")
} else {
srv.log.Info("HTTP server gracefully stopped")
Expand Down Expand Up @@ -107,18 +103,15 @@ func (srv *Server) SendBid(ctx context.Context, bid *types.CommonBid) error {
return nil
}

// txRLP, err := common.TxToRLPString(tx.Tx)
// if err != nil {
// return err
// }

// // Send tx to all subscribers (only if channel is not full)
// for _, sub := range s.sseConnectionMap {
// select {
// case sub.txC <- txRLP:
// default:
// }
// }
msg := bid.ToCSVLine("\t")

// Send tx to all subscribers (only if channel is not full)
for _, sub := range srv.sseConnectionMap {
select {
case sub.msgC <- msg:
default:
}
}

return nil
}

0 comments on commit 1e3ef6e

Please sign in to comment.