Skip to content

Commit

Permalink
make top-bid websocket stream generic
Browse files Browse the repository at this point in the history
  • Loading branch information
metachris committed Jun 26, 2024
1 parent 4412b41 commit 34dcdf5
Show file tree
Hide file tree
Showing 13 changed files with 167 additions and 156 deletions.
1 change: 0 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ cover-html:
docker-image:
DOCKER_BUILDKIT=1 docker build --platform linux/amd64 --build-arg VERSION=${VERSION} . -t relayscan


generate-ssz:
rm -f common/ultrasoundbid_encoding.go
sszgen --path common --objs UltrasoundStreamBid
Expand Down
34 changes: 20 additions & 14 deletions cmd/service/bidcollect.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ import (
)

var (
collectUltrasoundStream bool
collectGetHeader bool
collectDataAPI bool
useAllRelays bool
collectTopBidWebsocketStream bool
collectGetHeader bool
collectDataAPI bool
useAllRelays bool

outDir string
outputTSV bool // by default: CSV, but can be changed to TSV with this setting
Expand All @@ -32,7 +32,7 @@ var (
)

func init() {
bidCollectCmd.Flags().BoolVar(&collectUltrasoundStream, "ultrasound-stream", false, "use ultrasound top-bid stream")
bidCollectCmd.Flags().BoolVar(&collectTopBidWebsocketStream, "top-bid-ws-stream", false, "use top-bid websocket streams")
bidCollectCmd.Flags().BoolVar(&collectGetHeader, "get-header", false, "use getHeader API")
bidCollectCmd.Flags().BoolVar(&collectDataAPI, "data-api", false, "use data API")
bidCollectCmd.Flags().BoolVar(&useAllRelays, "all-relays", false, "use all relays")
Expand Down Expand Up @@ -93,16 +93,22 @@ var bidCollectCmd = &cobra.Command{
log.Infof("- relay #%d: %s", index+1, relay.Hostname())
}

topBidRelays := []common.RelayEntry{}
for _, url := range vars.TopBidStreamURLs {
topBidRelays = append(topBidRelays, common.MustNewRelayEntry(url, false))
}

opts := bidcollect.BidCollectorOpts{
Log: log,
UID: uid,
Relays: relays,
CollectUltrasoundStream: collectUltrasoundStream,
CollectGetHeader: collectGetHeader,
CollectDataAPI: collectDataAPI,
BeaconNodeURI: beaconNodeURI,
OutDir: outDir,
OutputTSV: outputTSV,
Log: log,
UID: uid,
Relays: relays,
CollectTopBidWebsocketStream: collectTopBidWebsocketStream,
TopBidWebsocketRelays: topBidRelays,
CollectGetHeader: collectGetHeader,
CollectDataAPI: collectDataAPI,
BeaconNodeURI: beaconNodeURI,
OutDir: outDir,
OutputTSV: outputTSV,
}

bidCollector := bidcollect.NewBidCollector(&opts)
Expand Down
2 changes: 1 addition & 1 deletion common/relayentry.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func (r *RelayEntry) GetURI(path string) string {
// relayURL can be IP@PORT, PUBKEY@IP:PORT, https://IP, etc.
func NewRelayEntry(relayURL string, requireUser bool) (entry RelayEntry, err error) {
// Add protocol scheme prefix if it does not exist.
if !strings.HasPrefix(relayURL, "http") {
if !strings.Contains(relayURL, "://") {
relayURL = "https://" + relayURL
}

Expand Down
2 changes: 1 addition & 1 deletion common/ultrasoundbid.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func (n *U256) String() string {
return new(big.Int).SetBytes(ReverseBytes(n[:])).String()
}

type UltrasoundStreamBid struct {
type TopBidWebsocketStreamBid struct {
Timestamp uint64 `json:"timestamp"`
Slot uint64 `json:"slot"`
BlockNumber uint64 `json:"block_number"`
Expand Down
14 changes: 7 additions & 7 deletions common/ultrasoundbid_encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ import (
)

// MarshalSSZ ssz marshals the UltrasoundStreamBid object
func (u *UltrasoundStreamBid) MarshalSSZ() ([]byte, error) {
func (u *TopBidWebsocketStreamBid) MarshalSSZ() ([]byte, error) {
return ssz.MarshalSSZ(u)
}

// MarshalSSZTo ssz marshals the UltrasoundStreamBid object to a target array
func (u *UltrasoundStreamBid) MarshalSSZTo(buf []byte) (dst []byte, err error) {
func (u *TopBidWebsocketStreamBid) MarshalSSZTo(buf []byte) (dst []byte, err error) {
dst = buf

// Field (0) 'Timestamp'
Expand Down Expand Up @@ -41,7 +41,7 @@ func (u *UltrasoundStreamBid) MarshalSSZTo(buf []byte) (dst []byte, err error) {
}

// UnmarshalSSZ ssz unmarshals the UltrasoundStreamBid object
func (u *UltrasoundStreamBid) UnmarshalSSZ(buf []byte) error {
func (u *TopBidWebsocketStreamBid) UnmarshalSSZ(buf []byte) error {
var err error
size := uint64(len(buf))
if size != 188 {
Expand Down Expand Up @@ -76,18 +76,18 @@ func (u *UltrasoundStreamBid) UnmarshalSSZ(buf []byte) error {
}

// SizeSSZ returns the ssz encoded size in bytes for the UltrasoundStreamBid object
func (u *UltrasoundStreamBid) SizeSSZ() (size int) {
func (u *TopBidWebsocketStreamBid) SizeSSZ() (size int) {
size = 188
return
}

// HashTreeRoot ssz hashes the UltrasoundStreamBid object
func (u *UltrasoundStreamBid) HashTreeRoot() ([32]byte, error) {
func (u *TopBidWebsocketStreamBid) HashTreeRoot() ([32]byte, error) {
return ssz.HashWithDefaultHasher(u)
}

// HashTreeRootWith ssz hashes the UltrasoundStreamBid object with a hasher
func (u *UltrasoundStreamBid) HashTreeRootWith(hh ssz.HashWalker) (err error) {
func (u *TopBidWebsocketStreamBid) HashTreeRootWith(hh ssz.HashWalker) (err error) {
indx := hh.Index()

// Field (0) 'Timestamp'
Expand Down Expand Up @@ -119,6 +119,6 @@ func (u *UltrasoundStreamBid) HashTreeRootWith(hh ssz.HashWalker) (err error) {
}

// GetTree ssz hashes the UltrasoundStreamBid object
func (u *UltrasoundStreamBid) GetTree() (*ssz.Node, error) {
func (u *TopBidWebsocketStreamBid) GetTree() (*ssz.Node, error) {
return ssz.ProofTree(u)
}
2 changes: 1 addition & 1 deletion common/ultrasoundbid_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestValueDecoding(t *testing.T) {
func TestUltrasoundBidSSZDecoding(t *testing.T) {
hex := "0x704b87ce8f010000a94b8c0000000000b6043101000000002c02b28fd8fdb45fd6ac43dd04adad1449a35b64247b1ed23a723a1fcf6cac074d0668c9e0912134628c32a54854b952234ebb6c1fdd6b053566ac2d2a09498da03b00ddb78b2c111450a5417a8c368c40f1f140cdf97d95b7fa9565467e0bbbe27877d08e01c69b4e5b02b144e6a265df99a0839818b3f120ebac9b73f82b617dc6a5556c71794b1a9c5400000000000000000000000000000000000000000000000000"
bytes := hexutil.MustDecode(hex)
bid := new(UltrasoundStreamBid)
bid := new(TopBidWebsocketStreamBid)
err := bid.UnmarshalSSZ(bytes)
require.NoError(t, err)

Expand Down
12 changes: 6 additions & 6 deletions docs/2024-06_bidcollect.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ For every day, there are two CSV files:

- `0`: [GetHeader polling](https://ethereum.github.io/builder-specs/#/Builder/getHeader)
- `1`: [Data API polling](https://flashbots.github.io/relay-specs/#/Data/getReceivedBids)
- `2`: [Ultrasound top-bid websocket stream](https://github.com/ultrasoundmoney/docs/blob/main/top-bid-websocket.md)
- `2`: [Top-bid websocket stream (Ultrasound + Aestus)](https://github.com/ultrasoundmoney/docs/blob/main/top-bid-websocket.md)

### Collected fields

| Field | Description | Source Types |
| ------------------------ | ---------------------------------------------------------- | ------------ |
| `source_type` | 0: GetHeader, 1: Data API, 2: Ultrasound stream | all |
| `source_type` | 0: GetHeader, 1: Data API, 2: Top-Bid WS Stream | all |
| `received_at_ms` | When the bid was first received by the relayscan collector | all |
| `timestamp_ms` | When the bid was received by the relay | 1 + 2 |
| `slot` | Slot the bid was submitted for | all |
Expand Down Expand Up @@ -51,7 +51,7 @@ For every day, there are two CSV files:
Source types:
- `0`: `GetHeader` polling
- `1`: Data API polling
- `2`: Ultrasound top-bid Websockets stream
- `2`: Top-bid Websocket stream

Different data sources have different limitations:

Expand All @@ -66,7 +66,7 @@ Different data sources have different limitations:
- Has all the necessary information
- Due to rate limits, we only poll at specific times
- Polling at t-4, t-2, t-0.5, t+0.5, t+2 (see also [`/services/bidcollect/data-api-poller.go`](/services/bidcollect/data-api-poller.go#64-69))
- Ultrasound websocket stream ([code](/services/bidcollect/ultrasound-stream.go):
- Top-bid websocket stream ([code](/services/bidcollect/top-bid-websocket-stream.go):
- doesn't expose optimistic, thus that field is always `false`

## Other notes
Expand All @@ -82,8 +82,8 @@ Different data sources have different limitations:
By default, the collector will output CSV into `<outdir>/<date>/<filename>.csv`

```bash
# Start data API and ultrasound stream collectors
go run . service bidcollect --data-api --ultrasound-stream --all-relays
# Start data API and top-bid websocket stream collectors
go run . service bidcollect --data-api --top-bid-ws-stream --all-relays

# GetHeader needs a beacon node too
go run . service bidcollect --get-header --beacon-uri http://localhost:3500 --all-relays
Expand Down
37 changes: 21 additions & 16 deletions services/bidcollect/bidcollector.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@ type BidCollectorOpts struct {
Log *logrus.Entry
UID string

CollectUltrasoundStream bool
CollectGetHeader bool
CollectDataAPI bool
CollectTopBidWebsocketStream bool
CollectDataAPI bool
CollectGetHeader bool
BeaconNodeURI string // for getHeader

Relays []common.RelayEntry
BeaconNodeURI string // for getHeader
Relays []common.RelayEntry
TopBidWebsocketRelays []common.RelayEntry

OutDir string
OutputTSV bool
Expand All @@ -25,9 +26,9 @@ type BidCollector struct {
opts *BidCollectorOpts
log *logrus.Entry

ultrasoundBidC chan UltrasoundStreamBidsMsg
dataAPIBidC chan DataAPIPollerBidsMsg
getHeaderBidC chan GetHeaderPollerBidsMsg
topBidWebsocketC chan TopBidWebsocketStreamBidsMsg
dataAPIBidC chan DataAPIPollerBidsMsg
getHeaderBidC chan GetHeaderPollerBidsMsg

processor *BidProcessor
}
Expand All @@ -44,7 +45,7 @@ func NewBidCollector(opts *BidCollectorOpts) *BidCollector {

// inputs
c.dataAPIBidC = make(chan DataAPIPollerBidsMsg, bidCollectorInputChannelSize)
c.ultrasoundBidC = make(chan UltrasoundStreamBidsMsg, bidCollectorInputChannelSize)
c.topBidWebsocketC = make(chan TopBidWebsocketStreamBidsMsg, bidCollectorInputChannelSize)
c.getHeaderBidC = make(chan GetHeaderPollerBidsMsg, bidCollectorInputChannelSize)

// output
Expand Down Expand Up @@ -79,17 +80,21 @@ func (c *BidCollector) MustStart() {
go poller.Start()
}

if c.opts.CollectUltrasoundStream {
ultrasoundStream := NewUltrasoundStreamConnection(UltrasoundStreamOpts{
Log: c.log,
BidC: c.ultrasoundBidC,
})
go ultrasoundStream.Start()
if c.opts.CollectTopBidWebsocketStream {
for _, relay := range c.opts.TopBidWebsocketRelays {
c.log.Infof("Starting top bid websocket stream for %s...", relay.String())
topBidWebsocketStream := NewTopBidWebsocketStreamConnection(TopBidWebsocketStreamOpts{
Log: c.log,
Relay: relay,
BidC: c.topBidWebsocketC,
})
go topBidWebsocketStream.Start()
}
}

for {
select {
case bid := <-c.ultrasoundBidC:
case bid := <-c.topBidWebsocketC:
commonBid := UltrasoundStreamToCommonBid(&bid)
c.processor.processBids([]*CommonBid{commonBid})
case bids := <-c.dataAPIBidC:
Expand Down
10 changes: 2 additions & 8 deletions services/bidcollect/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,12 @@ const (
SourceTypeDataAPI = 1
SourceTypeUltrasoundStream = 2

ultrasoundStreamDefaultURL = "ws://relay-builders-eu.ultrasound.money/ws/v1/top_bid"
initialBackoffSec = 5
maxBackoffSec = 120
initialBackoffSec = 5
maxBackoffSec = 120

// bucketMinutes is the number of minutes to write into each CSV file (i.e. new file created for every X minutes bucket)
bucketMinutes = 60

// channel size for bid collector inputs
bidCollectorInputChannelSize = 1000
)

var (
// csvFileEnding = relaycommon.GetEnv("CSV_FILE_END", "tsv")
// csvSeparator = relaycommon.GetEnv("CSV_SEP", "\t")
)
98 changes: 98 additions & 0 deletions services/bidcollect/top-bid-websocket-stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package bidcollect

import (
"time"

"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/flashbots/relayscan/common"
"github.com/gorilla/websocket"
"github.com/sirupsen/logrus"
)

type TopBidWebsocketStreamBidsMsg struct {
Bid common.TopBidWebsocketStreamBid
Relay string
ReceivedAt time.Time
}

type TopBidWebsocketStreamOpts struct {
Log *logrus.Entry
Relay common.RelayEntry
BidC chan TopBidWebsocketStreamBidsMsg
}

type TopBidWebsocketStreamConnection struct {
log *logrus.Entry
relay common.RelayEntry
bidC chan TopBidWebsocketStreamBidsMsg
backoffSec int
}

func NewTopBidWebsocketStreamConnection(opts TopBidWebsocketStreamOpts) *TopBidWebsocketStreamConnection {
return &TopBidWebsocketStreamConnection{
log: opts.Log.WithField("uri", opts.Relay.String()),
relay: opts.Relay,
bidC: opts.BidC,
backoffSec: initialBackoffSec,
}
}

func (ustream *TopBidWebsocketStreamConnection) Start() {
ustream.connect()
}

func (ustream *TopBidWebsocketStreamConnection) reconnect() {
backoffDuration := time.Duration(ustream.backoffSec) * time.Second
ustream.log.Infof("[websocket-stream] reconnecting to websocket stream in %s sec ...", backoffDuration.String())
time.Sleep(backoffDuration)

// increase backoff timeout for next try
ustream.backoffSec *= 2
if ustream.backoffSec > maxBackoffSec {
ustream.backoffSec = maxBackoffSec
}

ustream.connect()
}

func (ustream *TopBidWebsocketStreamConnection) connect() {
ustream.log.Info("[websocket-stream] Starting bid stream...")

dialer := websocket.DefaultDialer
wsSubscriber, resp, err := dialer.Dial(ustream.relay.String(), nil)
if err != nil {
ustream.log.WithError(err).Error("[websocket-stream] failed to connect to websocket stream, reconnecting in a bit...")
go ustream.reconnect()
return
}
defer wsSubscriber.Close()
defer resp.Body.Close()

ustream.log.Info("[websocket-stream] stream connection successful")
ustream.backoffSec = initialBackoffSec // reset backoff timeout

bid := new(common.TopBidWebsocketStreamBid)

for {
_, nextNotification, err := wsSubscriber.ReadMessage()
if err != nil {
// Handle websocket errors, by closing and reconnecting. Errors seen previously:
ustream.log.WithError(err).Error("websocket stream websocket error")
go ustream.reconnect()
return
}

// Unmarshal SSZ
err = bid.UnmarshalSSZ(nextNotification)
if err != nil {
ustream.log.WithError(err).WithField("msg", hexutil.Encode(nextNotification)).Error("[websocket-stream] failed to unmarshal websocket stream message")
continue
}

ustream.bidC <- TopBidWebsocketStreamBidsMsg{
Bid: *bid,
Relay: ustream.relay.Hostname(),
ReceivedAt: time.Now().UTC(),
}
}
}
2 changes: 1 addition & 1 deletion services/bidcollect/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func boolToString(b bool) string {
return "false"
}

func UltrasoundStreamToCommonBid(bid *UltrasoundStreamBidsMsg) *CommonBid {
func UltrasoundStreamToCommonBid(bid *TopBidWebsocketStreamBidsMsg) *CommonBid {
blockHash := hexutil.Encode(bid.Bid.BlockHash[:])
parentHash := hexutil.Encode(bid.Bid.ParentHash[:])
builderPubkey := hexutil.Encode(bid.Bid.BuilderPubkey[:])
Expand Down
Loading

0 comments on commit 34dcdf5

Please sign in to comment.