Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Publish new bids to Redis #50

Merged
merged 6 commits into from
Aug 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 30 additions & 2 deletions cmd/service/bidcollect.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package service
import (
"github.com/flashbots/relayscan/common"
"github.com/flashbots/relayscan/services/bidcollect"
"github.com/flashbots/relayscan/services/bidcollect/webserver"
"github.com/flashbots/relayscan/services/bidcollect/website"
"github.com/flashbots/relayscan/vars"
"github.com/lithammer/shortuuid"
Expand All @@ -23,12 +24,18 @@ var (
outputTSV bool // by default: CSV, but can be changed to TSV with this setting
uid string // used in output filenames, to avoid collissions between multiple collector instances

useRedis bool
redisAddr string

runDevServerOnly bool // used to play with file listing website
devServerListenAddr string

buildWebsite bool
buildWebsiteUpload bool
buildWebsiteOutDir string

runWebserverOnly bool // provides a SSE stream of new bids
WebserverListenAddr string
)

func init() {
Expand All @@ -47,7 +54,15 @@ func init() {
// utils
bidCollectCmd.Flags().StringVar(&uid, "uid", "", "unique identifier for output files (to avoid collisions)")

// for dev purposes
// Redis for pushing bids to
bidCollectCmd.Flags().BoolVar(&useRedis, "redis", false, "Publish bids to Redis")
bidCollectCmd.Flags().StringVar(&redisAddr, "redis-addr", "localhost:6379", "Redis address for publishing bids (optional)")

// Webserver mode
bidCollectCmd.Flags().BoolVar(&runWebserverOnly, "webserver", false, "only run webserver for SSE stream")
bidCollectCmd.Flags().StringVar(&WebserverListenAddr, "webserver-addr", "localhost:8080", "listen address for webserver")

// devserver provides the file listing for playing with file HTML
bidCollectCmd.Flags().BoolVar(&runDevServerOnly, "devserver", false, "only run devserver to play with file listing website")
bidCollectCmd.Flags().StringVar(&devServerListenAddr, "devserver-addr", "localhost:8095", "listen address for devserver")

Expand All @@ -61,6 +76,15 @@ var bidCollectCmd = &cobra.Command{
Use: "bidcollect",
Short: "Collect bids",
Run: func(cmd *cobra.Command, args []string) {
if runWebserverOnly {
srv := webserver.New(&webserver.HTTPServerConfig{
ListenAddr: WebserverListenAddr,
RedisAddr: redisAddr,
Log: log,
})
srv.MustStart()
return
}
if runDevServerOnly {
log.Infof("Bidcollect %s devserver starting on %s ...", vars.Version, devServerListenAddr)
fileListingDevServer()
Expand Down Expand Up @@ -103,9 +127,13 @@ var bidCollectCmd = &cobra.Command{
BeaconNodeURI: beaconNodeURI,
OutDir: outDir,
OutputTSV: outputTSV,
RedisAddr: redisAddr,
}

bidCollector := bidcollect.NewBidCollector(&opts)
bidCollector, err := bidcollect.NewBidCollector(&opts)
if err != nil {
log.WithError(err).Fatal("failed to create bid collector")
}
bidCollector.MustStart()
},
}
Expand Down
26 changes: 26 additions & 0 deletions docs/2024-06_bidcollect.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ Different data sources have different limitations:
- Bids are deduplicated based on this key:
- `fmt.Sprintf("%d-%s-%s-%s-%s", bid.Slot, bid.BlockHash, bid.ParentHash, bid.BuilderPubkey, bid.Value)`
- this means only the first bid for a given key is stored, even if - for instance - other relays also deliver the same bid
- Bids can be published to Redis (to be consumed by whatever, i.e. a webserver). The channel is called `bidcollect/bids`.
- Enable publishing to Redis with the `--redis` flag
- You can start a webserver that publishes the data via a SSE stream with `--webserver`

---

Expand All @@ -89,6 +92,29 @@ go run . service bidcollect --data-api --ultrasound-stream --all-relays
go run . service bidcollect --get-header --beacon-uri http://localhost:3500 --all-relays
```

Publish new bids to Redis:

```bash
# Start Redis
docker run --name redis -d -p 6379:6379 redis

# Start the collector with the `--redis <addr>` flag:
go run . service bidcollect --data-api --ultrasound-stream --redis

# Subscribe to the `bidcollect/bids` channel
redis-cli SUBSCRIBE bidcollect/bids
```

SSE stream of bids via the built-in webserver:

```bash
# Start the webserver in another process to subscribe to Redis and publish bids as SSE stream:
go run . service bidcollect --webserver

# Check if it works by subscribing with curl
curl localhost:8080/v1/sse/bids
```

---

## Useful Clickhouse queries
Expand Down
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,16 @@ require (
github.com/flashbots/go-boost-utils v1.6.0
github.com/flashbots/go-utils v0.4.9
github.com/flashbots/mev-boost-relay v1.0.0-alpha4.0.20230519091033-0453fc247553
github.com/go-chi/chi/v5 v5.1.0
github.com/google/uuid v1.3.0
github.com/gorilla/mux v1.8.0
github.com/gorilla/websocket v1.5.1
github.com/jmoiron/sqlx v1.3.5
github.com/lib/pq v1.10.9
github.com/lithammer/shortuuid v3.0.0+incompatible
github.com/metachris/flashbotsrpc v0.5.0
github.com/olekukonko/tablewriter v0.0.5
github.com/redis/go-redis/v9 v9.6.1
github.com/rubenv/sql-migrate v1.7.0
github.com/sirupsen/logrus v1.9.2
github.com/spf13/cobra v1.7.0
Expand Down Expand Up @@ -48,6 +51,7 @@ require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/deckarep/golang-set/v2 v2.1.0 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/ethereum/c-kzg-4844 v0.4.0 // indirect
github.com/fatih/color v1.15.0 // indirect
github.com/gballet/go-verkle v0.1.1-0.20231031103413-a67434b50f46 // indirect
Expand All @@ -59,7 +63,6 @@ require (
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/holiman/uint256 v1.2.4 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jarcoal/httpmock v1.2.0 // indirect
Expand Down
10 changes: 10 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bits-and-blooms/bitset v1.10.0 h1:ePXTeiPEazB5+opbv5fr8umg2R/1NlzgDsyepwsSr88=
github.com/bits-and-blooms/bitset v1.10.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8=
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
github.com/btcsuite/btcd v0.23.0 h1:V2/ZgjfDFIygAX3ZapeigkVBoVUtOJKSwrhZdlpSvaA=
github.com/btcsuite/btcd/btcec/v2 v2.3.2 h1:5n0X6hX0Zk+6omWcihdYvdAlGf2DfasC0GMf7DClJ3U=
github.com/btcsuite/btcd/btcec/v2 v2.3.2/go.mod h1:zYzJ8etWJQIv1Ogk7OzpWjowwOdXY1W/17j2MW85J04=
Expand Down Expand Up @@ -79,6 +83,8 @@ github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 h1:8UrgZ3GkP4i/CLijOJx79Yu+etly
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0/go.mod h1:v57UDF4pDQJcEfFUCRop3lJL149eHGSe9Jvczhzjo/0=
github.com/dgraph-io/badger v1.6.0/go.mod h1:zwt7syl517jmP8s94KqSxTlM6IMsdhYy6psNgSztDR4=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
Expand Down Expand Up @@ -121,6 +127,8 @@ github.com/getsentry/sentry-go v0.21.0/go.mod h1:lc76E2QywIyW8WuBnwl8Lc4bkmQH4+w
github.com/gin-contrib/sse v0.0.0-20190301062529-5545eab6dad3/go.mod h1:VJ0WA2NBN22VlZ2dKZQPAPnyWw5XTlK1KymzLKsr59s=
github.com/gin-gonic/gin v1.4.0/go.mod h1:OW2EZn3DO8Ln9oIKOvM++LBO+5UPHJJDH72/q/3rZdM=
github.com/go-check/check v0.0.0-20180628173108-788fd7840127/go.mod h1:9ES+weclKsC9YodN5RgxqK/VD9HM9JsCSh7rNhMZE98=
github.com/go-chi/chi/v5 v5.1.0 h1:acVI1TYaD+hhedDJ3r54HyA6sExp3HfXq7QWEEY/xMw=
github.com/go-chi/chi/v5 v5.1.0/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8=
github.com/go-errors/errors v1.0.1/go.mod h1:f4zRHt4oKfwPJE5k8C9vpYG+aDHdBFUsgrm6/TyX73Q=
github.com/go-errors/errors v1.4.2 h1:J6MZopCL4uSllY1OfXM374weqZFFItUbrImctkmUxIA=
github.com/go-errors/errors v1.4.2/go.mod h1:sIVyrIiJhuEF+Pj9Ebtd6P/rEYROXFi3BopGUQ5a5Og=
Expand Down Expand Up @@ -351,6 +359,8 @@ github.com/prysmaticlabs/go-bitfield v0.0.0-20210809151128-385d8c5e3fb7 h1:0tVE4
github.com/prysmaticlabs/go-bitfield v0.0.0-20210809151128-385d8c5e3fb7/go.mod h1:wmuf/mdK4VMD+jA9ThwcUKjg3a2XWM9cVfFYjDyY4j4=
github.com/r3labs/sse/v2 v2.10.0 h1:hFEkLLFY4LDifoHdiCN/LlGBAdVJYsANaLqNYa1l/v0=
github.com/r3labs/sse/v2 v2.10.0/go.mod h1:Igau6Whc+F17QUgML1fYe1VPZzTV6EMCnYktEmkNJ7I=
github.com/redis/go-redis/v9 v9.6.1 h1:HHDteefn6ZkTtY5fGUE8tj8uy85AHk6zP7CpzIAM0y4=
github.com/redis/go-redis/v9 v9.6.1/go.mod h1:0C0c6ycQsdpVNQpxb1njEQIqkx5UcsM8FJCQLgE9+RA=
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/rivo/uniseg v0.4.4 h1:8TfxU8dW6PdqD27gjM8MVNuicgxIjxpm4K7x4jp8sis=
github.com/rivo/uniseg v0.4.4/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
Expand Down
54 changes: 40 additions & 14 deletions services/bidcollect/bid-processor.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package bidcollect

import (
"context"
"fmt"
"os"
"path/filepath"
Expand All @@ -9,6 +10,8 @@ import (
"time"

"github.com/flashbots/relayscan/common"
"github.com/flashbots/relayscan/services/bidcollect/types"
"github.com/redis/go-redis/v9"
"github.com/sirupsen/logrus"
)

Expand All @@ -23,6 +26,7 @@ type BidProcessorOpts struct {
UID string
OutDir string
OutputTSV bool
RedisAddr string
}

type OutFiles struct {
Expand All @@ -37,21 +41,23 @@ type BidProcessor struct {
outFiles map[int64]*OutFiles // map[slot][bidUniqueKey]Bid
outFilesLock sync.RWMutex

bidCache map[uint64]map[string]*CommonBid // map[slot][bidUniqueKey]Bid
topBidCache map[uint64]*CommonBid // map[slot]Bid
bidCache map[uint64]map[string]*types.CommonBid // map[slot][bidUniqueKey]Bid
topBidCache map[uint64]*types.CommonBid // map[slot]Bid
bidCacheLock sync.RWMutex

csvSeparator string
csvFileEnding string

redisClient *redis.Client
}

func NewBidProcessor(opts *BidProcessorOpts) *BidProcessor {
func NewBidProcessor(opts *BidProcessorOpts) (*BidProcessor, error) {
c := &BidProcessor{
log: opts.Log,
opts: opts,
outFiles: make(map[int64]*OutFiles),
bidCache: make(map[uint64]map[string]*CommonBid),
topBidCache: make(map[uint64]*CommonBid),
bidCache: make(map[uint64]map[string]*types.CommonBid),
topBidCache: make(map[uint64]*types.CommonBid),
}

if opts.OutputTSV {
Expand All @@ -62,7 +68,19 @@ func NewBidProcessor(opts *BidProcessorOpts) *BidProcessor {
c.csvFileEnding = "csv"
}

return c
if opts.RedisAddr != "" {
c.redisClient = redis.NewClient(&redis.Options{
Addr: opts.RedisAddr,
Password: "", // no password set
DB: 0, // use default DB
})

// Make sure we can connect to redis to connect to redis
if _, err := c.redisClient.Ping(context.Background()).Result(); err != nil {
return nil, err
}
}
return c, nil
}

func (c *BidProcessor) Start() {
Expand All @@ -72,15 +90,15 @@ func (c *BidProcessor) Start() {
}
}

func (c *BidProcessor) processBids(bids []*CommonBid) {
func (c *BidProcessor) processBids(bids []*types.CommonBid) {
c.bidCacheLock.Lock()
defer c.bidCacheLock.Unlock()

var isTopBid, isNewBid bool
for _, bid := range bids {
isNewBid, isTopBid = false, false
if _, ok := c.bidCache[bid.Slot]; !ok {
c.bidCache[bid.Slot] = make(map[string]*CommonBid)
c.bidCache[bid.Slot] = make(map[string]*types.CommonBid)
}

// Check if bid is new top bid
Expand All @@ -102,12 +120,20 @@ func (c *BidProcessor) processBids(bids []*CommonBid) {
isNewBid = true
}

// Send to Redis
if c.redisClient != nil {
err := c.redisClient.Publish(context.Background(), types.RedisChannel, bid.ToCSVLine(",")).Err()
if err != nil {
c.log.WithError(err).Error("failed to publish bid to redis")
}
}

// Write to CSV
c.writeBidToFile(bid, isNewBid, isTopBid)
}
}

func (c *BidProcessor) writeBidToFile(bid *CommonBid, isNewBid, isTopBid bool) {
func (c *BidProcessor) writeBidToFile(bid *types.CommonBid, isNewBid, isTopBid bool) {
fAll, fTop, err := c.getFiles(bid)
if err != nil {
c.log.WithError(err).Error("get get output file")
Expand All @@ -129,9 +155,9 @@ func (c *BidProcessor) writeBidToFile(bid *CommonBid, isNewBid, isTopBid bool) {
}
}

func (c *BidProcessor) getFiles(bid *CommonBid) (fAll, fTop *os.File, err error) {
func (c *BidProcessor) getFiles(bid *types.CommonBid) (fAll, fTop *os.File, err error) {
// hourlybucket
sec := int64(bucketMinutes * 60)
sec := int64(types.BucketMinutes * 60)
bucketTS := bid.ReceivedAtMs / 1000 / sec * sec // timestamp down-round to start of bucket
t := time.Unix(bucketTS, 0).UTC()

Expand Down Expand Up @@ -162,7 +188,7 @@ func (c *BidProcessor) getFiles(bid *CommonBid) (fAll, fTop *os.File, err error)
c.log.WithError(err).Fatal("failed stat on output file")
}
if fi.Size() == 0 {
_, err = fmt.Fprint(fAll, strings.Join(CommonBidCSVFields, c.csvSeparator)+"\n")
_, err = fmt.Fprint(fAll, strings.Join(types.CommonBidCSVFields, c.csvSeparator)+"\n")
if err != nil {
c.log.WithError(err).Fatal("failed to write header to output file")
}
Expand All @@ -179,7 +205,7 @@ func (c *BidProcessor) getFiles(bid *CommonBid) (fAll, fTop *os.File, err error)
c.log.WithError(err).Fatal("failed stat on output file")
}
if fi.Size() == 0 {
_, err = fmt.Fprint(fTop, strings.Join(CommonBidCSVFields, c.csvSeparator)+"\n")
_, err = fmt.Fprint(fTop, strings.Join(types.CommonBidCSVFields, c.csvSeparator)+"\n")
if err != nil {
c.log.WithError(err).Fatal("failed to write header to output file")
}
Expand Down Expand Up @@ -229,7 +255,7 @@ func (c *BidProcessor) housekeeping() {
filesBefore := len(c.outFiles)
c.outFilesLock.Lock()
for timestamp, outFiles := range c.outFiles {
usageSec := bucketMinutes * 60 * 2
usageSec := types.BucketMinutes * 60 * 2
if now-timestamp > int64(usageSec) { // remove all handles from 2x usage seconds ago
c.log.Info("closing output files", timestamp)
delete(c.outFiles, timestamp)
Expand Down
Loading
Loading