From c6765590910836964f988fe219d1904d0de255cf Mon Sep 17 00:00:00 2001 From: Chris Hager Date: Sun, 2 Jun 2024 10:05:03 +0200 Subject: [PATCH] simplify a bit --- docs/adrs/202405-bidcollect.md | 6 ++-- services/bidcollect/bid-processor.go | 48 ++++++++++++++------------ services/bidcollect/bidcollector.go | 4 +-- services/bidcollect/consts.go | 9 +++++ services/bidcollect/data-api-poller.go | 3 +- 5 files changed, 42 insertions(+), 28 deletions(-) diff --git a/docs/adrs/202405-bidcollect.md b/docs/adrs/202405-bidcollect.md index 07d4139..35a7ce4 100644 --- a/docs/adrs/202405-bidcollect.md +++ b/docs/adrs/202405-bidcollect.md @@ -34,10 +34,12 @@ go run . service bidcollect --out csv --data-api --ultrasound-stream ### Next up (must have) - Diagram showing the flow of data and the components involved +- Consider methodology of storing "relay" - File Output - Combine all individual files into a big file - Consider gzipped CSV output: https://gist.github.com/mchirico/6147687 (currently, an hour of bids is about 300MB) - Consider Parquet output files + - Upload to S3 + R2 (see also mempool dumpster scripts) ### Could have @@ -45,8 +47,8 @@ go run . service bidcollect --out csv --data-api --ultrasound-stream - consider improvements to timing - relay-specific rate limits? -**Output** -- Stream (websocket or SSE) +**Stream Output** +- Websockets or SSE subscription **getHeader polling** - some already implemented in [collect-live-bids.go](/cmd/service/collect-live-bids.go)) diff --git a/services/bidcollect/bid-processor.go b/services/bidcollect/bid-processor.go index 8178298..af0fb7d 100644 --- a/services/bidcollect/bid-processor.go +++ b/services/bidcollect/bid-processor.go @@ -63,7 +63,9 @@ func (c *BidProcessor) processBids(bids []*CommonBid) { c.bidCacheLock.Lock() defer c.bidCacheLock.Unlock() + var isTopBid, isNewBid bool for _, bid := range bids { + isNewBid, isTopBid = false, false if _, ok := c.bidCache[bid.Slot]; !ok { c.bidCache[bid.Slot] = make(map[string]*CommonBid) } @@ -71,11 +73,12 @@ func (c *BidProcessor) processBids(bids []*CommonBid) { // Check if bid is new top bid if topBid, ok := c.topBidCache[bid.Slot]; !ok { c.topBidCache[bid.Slot] = bid // first one for the slot + isTopBid = true } else { // if current bid has higher value, use it as new top bid if bid.ValueAsBigInt().Cmp(topBid.ValueAsBigInt()) == 1 { c.topBidCache[bid.Slot] = bid - c.exportTopBid(bid) + isTopBid = true } } @@ -83,34 +86,33 @@ func (c *BidProcessor) processBids(bids []*CommonBid) { if _, ok := c.bidCache[bid.Slot][bid.UniqueKey()]; !ok { // yet unknown bid, save it c.bidCache[bid.Slot][bid.UniqueKey()] = bid - c.exportBid(bid) + isNewBid = true } - } -} -func (c *BidProcessor) exportBid(bid *CommonBid) { - outF, _, err := c.getFiles(bid) - if err != nil { - c.log.WithError(err).Error("get get output file") - return - } - _, err = fmt.Fprint(outF, bid.ToCSVLine("\t")+"\n") - if err != nil { - c.log.WithError(err).Error("couldn't write bid to file") - return + // Write to CSV + c.writeBidToFile(bid, isNewBid, isTopBid) } } -func (c *BidProcessor) exportTopBid(bid *CommonBid) { - _, outF, err := c.getFiles(bid) +func (c *BidProcessor) writeBidToFile(bid *CommonBid, isNewBid, isTopBid bool) { + fAll, fTop, err := c.getFiles(bid) if err != nil { c.log.WithError(err).Error("get get output file") return } - _, err = fmt.Fprint(outF, bid.ToCSVLine("\t")+"\n") - if err != nil { - c.log.WithError(err).Error("couldn't write bid to file") - return + if isNewBid { + _, err = fmt.Fprint(fAll, bid.ToCSVLine(csvSeparator)+"\n") + if err != nil { + c.log.WithError(err).Error("couldn't write bid to file") + return + } + } + if isTopBid { + _, err = fmt.Fprint(fTop, bid.ToCSVLine(csvSeparator)+"\n") + if err != nil { + c.log.WithError(err).Error("couldn't write bid to file") + return + } } } @@ -129,7 +131,7 @@ func (c *BidProcessor) getFiles(bid *CommonBid) (fAll, fTop *os.File, err error) return outFiles.FAll, outFiles.FTop, nil } - // Create output files + // Create output directory dir := filepath.Join(c.opts.OutDir, t.Format(time.DateOnly)) err = os.MkdirAll(dir, os.ModePerm) if err != nil { @@ -147,7 +149,7 @@ func (c *BidProcessor) getFiles(bid *CommonBid) (fAll, fTop *os.File, err error) c.log.WithError(err).Fatal("failed stat on output file") } if fi.Size() == 0 { - _, err = fmt.Fprint(fAll, strings.Join(CommonBidCSVFields, "\t")+"\n") + _, err = fmt.Fprint(fAll, strings.Join(CommonBidCSVFields, csvSeparator)+"\n") if err != nil { c.log.WithError(err).Fatal("failed to write header to output file") } @@ -164,7 +166,7 @@ func (c *BidProcessor) getFiles(bid *CommonBid) (fAll, fTop *os.File, err error) c.log.WithError(err).Fatal("failed stat on output file") } if fi.Size() == 0 { - _, err = fmt.Fprint(fTop, strings.Join(CommonBidCSVFields, "\t")+"\n") + _, err = fmt.Fprint(fTop, strings.Join(CommonBidCSVFields, csvSeparator)+"\n") if err != nil { c.log.WithError(err).Fatal("failed to write header to output file") } diff --git a/services/bidcollect/bidcollector.go b/services/bidcollect/bidcollector.go index 02d52d2..917a99e 100644 --- a/services/bidcollect/bidcollector.go +++ b/services/bidcollect/bidcollector.go @@ -40,8 +40,8 @@ func NewBidCollector(opts *BidCollectorOpts) *BidCollector { } // inputs - c.dataAPIBidC = make(chan DataAPIPollerBidsMsg, 1000) - c.ultrasoundBidC = make(chan UltrasoundStreamBidsMsg, 1-00) + c.dataAPIBidC = make(chan DataAPIPollerBidsMsg, bidCollectorInputChannelSize) + c.ultrasoundBidC = make(chan UltrasoundStreamBidsMsg, bidCollectorInputChannelSize) // output c.processor = NewBidProcessor(&BidProcessorOpts{ diff --git a/services/bidcollect/consts.go b/services/bidcollect/consts.go index 57e222e..7cfff5e 100644 --- a/services/bidcollect/consts.go +++ b/services/bidcollect/consts.go @@ -1,5 +1,9 @@ package bidcollect +import ( + relaycommon "github.com/flashbots/mev-boost-relay/common" +) + const ( ultrasoundStreamDefaultURL = "ws://relay-builders-eu.ultrasound.money/ws/v1/top_bid" initialBackoffSec = 5 @@ -7,4 +11,9 @@ const ( // bucketMinutes is the number of minutes to write into each CSV file (i.e. new file created for every X minutes bucket) bucketMinutes = 60 + + // channel size for bid collector inputs + bidCollectorInputChannelSize = 1000 ) + +var csvSeparator = relaycommon.GetEnv("CSV_SEP", "\t") diff --git a/services/bidcollect/data-api-poller.go b/services/bidcollect/data-api-poller.go index 375a509..cf8eddb 100644 --- a/services/bidcollect/data-api-poller.go +++ b/services/bidcollect/data-api-poller.go @@ -46,8 +46,9 @@ func (poller *DataAPIPoller) Start() { nextSlot := slot + 1 tNextSlot := common.SlotToTime(nextSlot) untilNextSlot := tNextSlot.Sub(t) - time.Sleep(untilNextSlot) + poller.Log.Infof("[data-api poller] waiting until start of next slot (%d, %s from now)", nextSlot, untilNextSlot.String()) + time.Sleep(untilNextSlot) // then run polling loop for {