diff --git a/cmd/service/bidcollect.go b/cmd/service/bidcollect.go index e400ccc..c5897a8 100644 --- a/cmd/service/bidcollect.go +++ b/cmd/service/bidcollect.go @@ -23,7 +23,7 @@ var ( outputTSV bool // by default: CSV, but can be changed to TSV with this setting uid string // used in output filenames, to avoid collissions between multiple collector instances - webserverListenAddr string + redisAddr string runDevServerOnly bool // used to play with file listing website devServerListenAddr string @@ -49,8 +49,8 @@ func init() { // utils bidCollectCmd.Flags().StringVar(&uid, "uid", "", "unique identifier for output files (to avoid collisions)") - // webserver for SSE - bidCollectCmd.Flags().StringVar(&webserverListenAddr, "webserver-addr", "", "listen address for SSE subscription webserver") + // Redis for pushing bids to + bidCollectCmd.Flags().StringVar(&redisAddr, "redis", "", "Redis address for publishing bids (optional)") // devserver provides the file listing for playing with file HTML bidCollectCmd.Flags().BoolVar(&runDevServerOnly, "devserver", false, "only run devserver to play with file listing website") @@ -108,10 +108,13 @@ var bidCollectCmd = &cobra.Command{ BeaconNodeURI: beaconNodeURI, OutDir: outDir, OutputTSV: outputTSV, - WebserverAddr: webserverListenAddr, + RedisAddr: redisAddr, } - bidCollector := bidcollect.NewBidCollector(&opts) + bidCollector, err := bidcollect.NewBidCollector(&opts) + if err != nil { + log.WithError(err).Fatal("failed to create bid collector") + } bidCollector.MustStart() }, } diff --git a/docs/2024-06_bidcollect.md b/docs/2024-06_bidcollect.md index 1873010..f76597e 100644 --- a/docs/2024-06_bidcollect.md +++ b/docs/2024-06_bidcollect.md @@ -89,6 +89,18 @@ go run . service bidcollect --data-api --ultrasound-stream --all-relays go run . service bidcollect --get-header --beacon-uri http://localhost:3500 --all-relays ``` +To enable the SSE server, first run Redis: + +``` +docker run --name redis -d -p 6379:6379 redis +``` + +Then start the collector with the `--redis ` flag: + +```bash +go run . service bidcollect --data-api --ultrasound-stream --redis localhost:6379 +``` + --- ## Useful Clickhouse queries diff --git a/go.mod b/go.mod index 8aa707b..f3e239e 100644 --- a/go.mod +++ b/go.mod @@ -49,6 +49,7 @@ require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/deckarep/golang-set/v2 v2.1.0 // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/ethereum/c-kzg-4844 v0.4.0 // indirect github.com/fatih/color v1.15.0 // indirect github.com/gballet/go-verkle v0.1.1-0.20231031103413-a67434b50f46 // indirect @@ -85,6 +86,7 @@ require ( github.com/prometheus/procfs v0.9.0 // indirect github.com/prysmaticlabs/go-bitfield v0.0.0-20210809151128-385d8c5e3fb7 // indirect github.com/r3labs/sse/v2 v2.10.0 // indirect + github.com/redis/go-redis/v9 v9.6.1 // indirect github.com/rivo/uniseg v0.4.4 // indirect github.com/rogpeppe/go-internal v1.10.0 // indirect github.com/shirou/gopsutil v3.21.11+incompatible // indirect diff --git a/go.sum b/go.sum index 678dd98..98963ea 100644 --- a/go.sum +++ b/go.sum @@ -79,6 +79,8 @@ github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 h1:8UrgZ3GkP4i/CLijOJx79Yu+etly github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0/go.mod h1:v57UDF4pDQJcEfFUCRop3lJL149eHGSe9Jvczhzjo/0= github.com/dgraph-io/badger v1.6.0/go.mod h1:zwt7syl517jmP8s94KqSxTlM6IMsdhYy6psNgSztDR4= github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= @@ -353,6 +355,8 @@ github.com/prysmaticlabs/go-bitfield v0.0.0-20210809151128-385d8c5e3fb7 h1:0tVE4 github.com/prysmaticlabs/go-bitfield v0.0.0-20210809151128-385d8c5e3fb7/go.mod h1:wmuf/mdK4VMD+jA9ThwcUKjg3a2XWM9cVfFYjDyY4j4= github.com/r3labs/sse/v2 v2.10.0 h1:hFEkLLFY4LDifoHdiCN/LlGBAdVJYsANaLqNYa1l/v0= github.com/r3labs/sse/v2 v2.10.0/go.mod h1:Igau6Whc+F17QUgML1fYe1VPZzTV6EMCnYktEmkNJ7I= +github.com/redis/go-redis/v9 v9.6.1 h1:HHDteefn6ZkTtY5fGUE8tj8uy85AHk6zP7CpzIAM0y4= +github.com/redis/go-redis/v9 v9.6.1/go.mod h1:0C0c6ycQsdpVNQpxb1njEQIqkx5UcsM8FJCQLgE9+RA= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rivo/uniseg v0.4.4 h1:8TfxU8dW6PdqD27gjM8MVNuicgxIjxpm4K7x4jp8sis= github.com/rivo/uniseg v0.4.4/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= diff --git a/services/bidcollect/bid-processor.go b/services/bidcollect/bid-processor.go index 18de860..00d3337 100644 --- a/services/bidcollect/bid-processor.go +++ b/services/bidcollect/bid-processor.go @@ -1,6 +1,7 @@ package bidcollect import ( + "context" "fmt" "os" "path/filepath" @@ -10,7 +11,7 @@ import ( "github.com/flashbots/relayscan/common" "github.com/flashbots/relayscan/services/bidcollect/types" - "github.com/flashbots/relayscan/services/bidcollect/webserver" + "github.com/redis/go-redis/v9" "github.com/sirupsen/logrus" ) @@ -21,11 +22,11 @@ import ( // - One CSV for top bids only type BidProcessorOpts struct { - Log *logrus.Entry - UID string - OutDir string - OutputTSV bool - WebserverAddr string + Log *logrus.Entry + UID string + OutDir string + OutputTSV bool + RedisAddr string } type OutFiles struct { @@ -47,10 +48,10 @@ type BidProcessor struct { csvSeparator string csvFileEnding string - webserver *webserver.Server + redisClient *redis.Client } -func NewBidProcessor(opts *BidProcessorOpts) *BidProcessor { +func NewBidProcessor(opts *BidProcessorOpts) (*BidProcessor, error) { c := &BidProcessor{ log: opts.Log, opts: opts, @@ -67,20 +68,22 @@ func NewBidProcessor(opts *BidProcessorOpts) *BidProcessor { c.csvFileEnding = "csv" } - if opts.WebserverAddr != "" { - c.webserver = webserver.New(&webserver.HTTPServerConfig{ - ListenAddr: opts.WebserverAddr, - Log: opts.Log, + if opts.RedisAddr != "" { + c.redisClient = redis.NewClient(&redis.Options{ + Addr: opts.RedisAddr, + Password: "", // no password set + DB: 0, // use default DB }) + + // Make sure we can connect to redis to connect to redis + if _, err := c.redisClient.Ping(context.Background()).Result(); err != nil { + return nil, err + } } - return c + return c, nil } func (c *BidProcessor) Start() { - if c.webserver != nil { - c.webserver.RunInBackground() - } - for { time.Sleep(30 * time.Second) c.housekeeping() @@ -117,9 +120,12 @@ func (c *BidProcessor) processBids(bids []*types.CommonBid) { isNewBid = true } - // Send to subscribers - if c.webserver != nil { - c.webserver.SendBid(bid) + // Send to Redis + if c.redisClient != nil { + err := c.redisClient.Publish(context.Background(), "bidcollect/bids", bid.ToCSVLine(",")).Err() + if err != nil { + c.log.WithError(err).Error("failed to publish bid to redis") + } } // Write to CSV diff --git a/services/bidcollect/bidcollector.go b/services/bidcollect/bidcollector.go index f357b9e..44aef66 100644 --- a/services/bidcollect/bidcollector.go +++ b/services/bidcollect/bidcollector.go @@ -21,7 +21,7 @@ type BidCollectorOpts struct { OutDir string OutputTSV bool - WebserverAddr string + RedisAddr string } type BidCollector struct { @@ -35,8 +35,8 @@ type BidCollector struct { processor *BidProcessor } -func NewBidCollector(opts *BidCollectorOpts) *BidCollector { - c := &BidCollector{ +func NewBidCollector(opts *BidCollectorOpts) (c *BidCollector, err error) { + c = &BidCollector{ log: opts.Log, opts: opts, } @@ -51,14 +51,14 @@ func NewBidCollector(opts *BidCollectorOpts) *BidCollector { c.getHeaderBidC = make(chan GetHeaderPollerBidsMsg, types.BidCollectorInputChannelSize) // output - c.processor = NewBidProcessor(&BidProcessorOpts{ - Log: opts.Log, - UID: opts.UID, - OutDir: opts.OutDir, - OutputTSV: opts.OutputTSV, - WebserverAddr: opts.WebserverAddr, + c.processor, err = NewBidProcessor(&BidProcessorOpts{ + Log: opts.Log, + UID: opts.UID, + OutDir: opts.OutDir, + OutputTSV: opts.OutputTSV, + RedisAddr: opts.RedisAddr, }) - return c + return c, err } func (c *BidCollector) MustStart() {