Skip to content

Commit

Permalink
simplify a bit
Browse files Browse the repository at this point in the history
  • Loading branch information
metachris committed Jun 2, 2024
1 parent 93defa8 commit c676559
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 28 deletions.
6 changes: 4 additions & 2 deletions docs/adrs/202405-bidcollect.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,21 @@ go run . service bidcollect --out csv --data-api --ultrasound-stream
### Next up (must have)

- Diagram showing the flow of data and the components involved
- Consider methodology of storing "relay"
- File Output
- Combine all individual files into a big file
- Consider gzipped CSV output: https://gist.github.com/mchirico/6147687 (currently, an hour of bids is about 300MB)
- Consider Parquet output files
- Upload to S3 + R2 (see also mempool dumpster scripts)

### Could have

**Data API polling**
- consider improvements to timing
- relay-specific rate limits?

**Output**
- Stream (websocket or SSE)
**Stream Output**
- Websockets or SSE subscription

**getHeader polling**
- some already implemented in [collect-live-bids.go](/cmd/service/collect-live-bids.go))
Expand Down
48 changes: 25 additions & 23 deletions services/bidcollect/bid-processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,54 +63,56 @@ func (c *BidProcessor) processBids(bids []*CommonBid) {
c.bidCacheLock.Lock()
defer c.bidCacheLock.Unlock()

var isTopBid, isNewBid bool
for _, bid := range bids {
isNewBid, isTopBid = false, false
if _, ok := c.bidCache[bid.Slot]; !ok {
c.bidCache[bid.Slot] = make(map[string]*CommonBid)
}

// Check if bid is new top bid
if topBid, ok := c.topBidCache[bid.Slot]; !ok {
c.topBidCache[bid.Slot] = bid // first one for the slot
isTopBid = true
} else {
// if current bid has higher value, use it as new top bid
if bid.ValueAsBigInt().Cmp(topBid.ValueAsBigInt()) == 1 {
c.topBidCache[bid.Slot] = bid
c.exportTopBid(bid)
isTopBid = true
}
}

// process regular bids only once per unique key (slot+blockhash+parenthash+builderpubkey+value)
if _, ok := c.bidCache[bid.Slot][bid.UniqueKey()]; !ok {
// yet unknown bid, save it
c.bidCache[bid.Slot][bid.UniqueKey()] = bid
c.exportBid(bid)
isNewBid = true
}
}
}

func (c *BidProcessor) exportBid(bid *CommonBid) {
outF, _, err := c.getFiles(bid)
if err != nil {
c.log.WithError(err).Error("get get output file")
return
}
_, err = fmt.Fprint(outF, bid.ToCSVLine("\t")+"\n")
if err != nil {
c.log.WithError(err).Error("couldn't write bid to file")
return
// Write to CSV
c.writeBidToFile(bid, isNewBid, isTopBid)
}
}

func (c *BidProcessor) exportTopBid(bid *CommonBid) {
_, outF, err := c.getFiles(bid)
func (c *BidProcessor) writeBidToFile(bid *CommonBid, isNewBid, isTopBid bool) {
fAll, fTop, err := c.getFiles(bid)
if err != nil {
c.log.WithError(err).Error("get get output file")
return
}
_, err = fmt.Fprint(outF, bid.ToCSVLine("\t")+"\n")
if err != nil {
c.log.WithError(err).Error("couldn't write bid to file")
return
if isNewBid {
_, err = fmt.Fprint(fAll, bid.ToCSVLine(csvSeparator)+"\n")
if err != nil {
c.log.WithError(err).Error("couldn't write bid to file")
return
}
}
if isTopBid {
_, err = fmt.Fprint(fTop, bid.ToCSVLine(csvSeparator)+"\n")
if err != nil {
c.log.WithError(err).Error("couldn't write bid to file")
return
}
}
}

Expand All @@ -129,7 +131,7 @@ func (c *BidProcessor) getFiles(bid *CommonBid) (fAll, fTop *os.File, err error)
return outFiles.FAll, outFiles.FTop, nil
}

// Create output files
// Create output directory
dir := filepath.Join(c.opts.OutDir, t.Format(time.DateOnly))
err = os.MkdirAll(dir, os.ModePerm)
if err != nil {
Expand All @@ -147,7 +149,7 @@ func (c *BidProcessor) getFiles(bid *CommonBid) (fAll, fTop *os.File, err error)
c.log.WithError(err).Fatal("failed stat on output file")
}
if fi.Size() == 0 {
_, err = fmt.Fprint(fAll, strings.Join(CommonBidCSVFields, "\t")+"\n")
_, err = fmt.Fprint(fAll, strings.Join(CommonBidCSVFields, csvSeparator)+"\n")
if err != nil {
c.log.WithError(err).Fatal("failed to write header to output file")
}
Expand All @@ -164,7 +166,7 @@ func (c *BidProcessor) getFiles(bid *CommonBid) (fAll, fTop *os.File, err error)
c.log.WithError(err).Fatal("failed stat on output file")
}
if fi.Size() == 0 {
_, err = fmt.Fprint(fTop, strings.Join(CommonBidCSVFields, "\t")+"\n")
_, err = fmt.Fprint(fTop, strings.Join(CommonBidCSVFields, csvSeparator)+"\n")
if err != nil {
c.log.WithError(err).Fatal("failed to write header to output file")
}
Expand Down
4 changes: 2 additions & 2 deletions services/bidcollect/bidcollector.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ func NewBidCollector(opts *BidCollectorOpts) *BidCollector {
}

// inputs
c.dataAPIBidC = make(chan DataAPIPollerBidsMsg, 1000)
c.ultrasoundBidC = make(chan UltrasoundStreamBidsMsg, 1-00)
c.dataAPIBidC = make(chan DataAPIPollerBidsMsg, bidCollectorInputChannelSize)
c.ultrasoundBidC = make(chan UltrasoundStreamBidsMsg, bidCollectorInputChannelSize)

// output
c.processor = NewBidProcessor(&BidProcessorOpts{
Expand Down
9 changes: 9 additions & 0 deletions services/bidcollect/consts.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,19 @@
package bidcollect

import (
relaycommon "github.com/flashbots/mev-boost-relay/common"
)

const (
ultrasoundStreamDefaultURL = "ws://relay-builders-eu.ultrasound.money/ws/v1/top_bid"
initialBackoffSec = 5
maxBackoffSec = 120

// bucketMinutes is the number of minutes to write into each CSV file (i.e. new file created for every X minutes bucket)
bucketMinutes = 60

// channel size for bid collector inputs
bidCollectorInputChannelSize = 1000
)

var csvSeparator = relaycommon.GetEnv("CSV_SEP", "\t")
3 changes: 2 additions & 1 deletion services/bidcollect/data-api-poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,9 @@ func (poller *DataAPIPoller) Start() {
nextSlot := slot + 1
tNextSlot := common.SlotToTime(nextSlot)
untilNextSlot := tNextSlot.Sub(t)
time.Sleep(untilNextSlot)

poller.Log.Infof("[data-api poller] waiting until start of next slot (%d, %s from now)", nextSlot, untilNextSlot.String())
time.Sleep(untilNextSlot)

// then run polling loop
for {
Expand Down

0 comments on commit c676559

Please sign in to comment.