From 34dcdf571e0e0293ab566e97b3a98fa84d9f74bc Mon Sep 17 00:00:00 2001 From: Chris Hager Date: Wed, 26 Jun 2024 17:34:22 +0200 Subject: [PATCH 1/2] make top-bid websocket stream generic --- Makefile | 1 - cmd/service/bidcollect.go | 34 ++++--- common/relayentry.go | 2 +- common/ultrasoundbid.go | 2 +- common/ultrasoundbid_encoding.go | 14 +-- common/ultrasoundbid_test.go | 2 +- docs/2024-06_bidcollect.md | 12 +-- services/bidcollect/bidcollector.go | 37 ++++--- services/bidcollect/consts.go | 10 +- .../bidcollect/top-bid-websocket-stream.go | 98 ++++++++++++++++++ services/bidcollect/types.go | 2 +- services/bidcollect/ultrasound-stream.go | 99 ------------------- vars/relays.go | 10 +- 13 files changed, 167 insertions(+), 156 deletions(-) create mode 100644 services/bidcollect/top-bid-websocket-stream.go delete mode 100644 services/bidcollect/ultrasound-stream.go diff --git a/Makefile b/Makefile index e239df3..7c19d55 100644 --- a/Makefile +++ b/Makefile @@ -51,7 +51,6 @@ cover-html: docker-image: DOCKER_BUILDKIT=1 docker build --platform linux/amd64 --build-arg VERSION=${VERSION} . -t relayscan - generate-ssz: rm -f common/ultrasoundbid_encoding.go sszgen --path common --objs UltrasoundStreamBid diff --git a/cmd/service/bidcollect.go b/cmd/service/bidcollect.go index 3650a6f..a4fc8bd 100644 --- a/cmd/service/bidcollect.go +++ b/cmd/service/bidcollect.go @@ -14,10 +14,10 @@ import ( ) var ( - collectUltrasoundStream bool - collectGetHeader bool - collectDataAPI bool - useAllRelays bool + collectTopBidWebsocketStream bool + collectGetHeader bool + collectDataAPI bool + useAllRelays bool outDir string outputTSV bool // by default: CSV, but can be changed to TSV with this setting @@ -32,7 +32,7 @@ var ( ) func init() { - bidCollectCmd.Flags().BoolVar(&collectUltrasoundStream, "ultrasound-stream", false, "use ultrasound top-bid stream") + bidCollectCmd.Flags().BoolVar(&collectTopBidWebsocketStream, "top-bid-ws-stream", false, "use top-bid websocket streams") bidCollectCmd.Flags().BoolVar(&collectGetHeader, "get-header", false, "use getHeader API") bidCollectCmd.Flags().BoolVar(&collectDataAPI, "data-api", false, "use data API") bidCollectCmd.Flags().BoolVar(&useAllRelays, "all-relays", false, "use all relays") @@ -93,16 +93,22 @@ var bidCollectCmd = &cobra.Command{ log.Infof("- relay #%d: %s", index+1, relay.Hostname()) } + topBidRelays := []common.RelayEntry{} + for _, url := range vars.TopBidStreamURLs { + topBidRelays = append(topBidRelays, common.MustNewRelayEntry(url, false)) + } + opts := bidcollect.BidCollectorOpts{ - Log: log, - UID: uid, - Relays: relays, - CollectUltrasoundStream: collectUltrasoundStream, - CollectGetHeader: collectGetHeader, - CollectDataAPI: collectDataAPI, - BeaconNodeURI: beaconNodeURI, - OutDir: outDir, - OutputTSV: outputTSV, + Log: log, + UID: uid, + Relays: relays, + CollectTopBidWebsocketStream: collectTopBidWebsocketStream, + TopBidWebsocketRelays: topBidRelays, + CollectGetHeader: collectGetHeader, + CollectDataAPI: collectDataAPI, + BeaconNodeURI: beaconNodeURI, + OutDir: outDir, + OutputTSV: outputTSV, } bidCollector := bidcollect.NewBidCollector(&opts) diff --git a/common/relayentry.go b/common/relayentry.go index 23fddf8..39f9bc8 100644 --- a/common/relayentry.go +++ b/common/relayentry.go @@ -31,7 +31,7 @@ func (r *RelayEntry) GetURI(path string) string { // relayURL can be IP@PORT, PUBKEY@IP:PORT, https://IP, etc. func NewRelayEntry(relayURL string, requireUser bool) (entry RelayEntry, err error) { // Add protocol scheme prefix if it does not exist. - if !strings.HasPrefix(relayURL, "http") { + if !strings.Contains(relayURL, "://") { relayURL = "https://" + relayURL } diff --git a/common/ultrasoundbid.go b/common/ultrasoundbid.go index 7a54d7c..c728b0a 100644 --- a/common/ultrasoundbid.go +++ b/common/ultrasoundbid.go @@ -16,7 +16,7 @@ func (n *U256) String() string { return new(big.Int).SetBytes(ReverseBytes(n[:])).String() } -type UltrasoundStreamBid struct { +type TopBidWebsocketStreamBid struct { Timestamp uint64 `json:"timestamp"` Slot uint64 `json:"slot"` BlockNumber uint64 `json:"block_number"` diff --git a/common/ultrasoundbid_encoding.go b/common/ultrasoundbid_encoding.go index 6857eb6..fb08736 100644 --- a/common/ultrasoundbid_encoding.go +++ b/common/ultrasoundbid_encoding.go @@ -5,12 +5,12 @@ import ( ) // MarshalSSZ ssz marshals the UltrasoundStreamBid object -func (u *UltrasoundStreamBid) MarshalSSZ() ([]byte, error) { +func (u *TopBidWebsocketStreamBid) MarshalSSZ() ([]byte, error) { return ssz.MarshalSSZ(u) } // MarshalSSZTo ssz marshals the UltrasoundStreamBid object to a target array -func (u *UltrasoundStreamBid) MarshalSSZTo(buf []byte) (dst []byte, err error) { +func (u *TopBidWebsocketStreamBid) MarshalSSZTo(buf []byte) (dst []byte, err error) { dst = buf // Field (0) 'Timestamp' @@ -41,7 +41,7 @@ func (u *UltrasoundStreamBid) MarshalSSZTo(buf []byte) (dst []byte, err error) { } // UnmarshalSSZ ssz unmarshals the UltrasoundStreamBid object -func (u *UltrasoundStreamBid) UnmarshalSSZ(buf []byte) error { +func (u *TopBidWebsocketStreamBid) UnmarshalSSZ(buf []byte) error { var err error size := uint64(len(buf)) if size != 188 { @@ -76,18 +76,18 @@ func (u *UltrasoundStreamBid) UnmarshalSSZ(buf []byte) error { } // SizeSSZ returns the ssz encoded size in bytes for the UltrasoundStreamBid object -func (u *UltrasoundStreamBid) SizeSSZ() (size int) { +func (u *TopBidWebsocketStreamBid) SizeSSZ() (size int) { size = 188 return } // HashTreeRoot ssz hashes the UltrasoundStreamBid object -func (u *UltrasoundStreamBid) HashTreeRoot() ([32]byte, error) { +func (u *TopBidWebsocketStreamBid) HashTreeRoot() ([32]byte, error) { return ssz.HashWithDefaultHasher(u) } // HashTreeRootWith ssz hashes the UltrasoundStreamBid object with a hasher -func (u *UltrasoundStreamBid) HashTreeRootWith(hh ssz.HashWalker) (err error) { +func (u *TopBidWebsocketStreamBid) HashTreeRootWith(hh ssz.HashWalker) (err error) { indx := hh.Index() // Field (0) 'Timestamp' @@ -119,6 +119,6 @@ func (u *UltrasoundStreamBid) HashTreeRootWith(hh ssz.HashWalker) (err error) { } // GetTree ssz hashes the UltrasoundStreamBid object -func (u *UltrasoundStreamBid) GetTree() (*ssz.Node, error) { +func (u *TopBidWebsocketStreamBid) GetTree() (*ssz.Node, error) { return ssz.ProofTree(u) } diff --git a/common/ultrasoundbid_test.go b/common/ultrasoundbid_test.go index 8314b97..2b9ef42 100644 --- a/common/ultrasoundbid_test.go +++ b/common/ultrasoundbid_test.go @@ -19,7 +19,7 @@ func TestValueDecoding(t *testing.T) { func TestUltrasoundBidSSZDecoding(t *testing.T) { hex := "0x704b87ce8f010000a94b8c0000000000b6043101000000002c02b28fd8fdb45fd6ac43dd04adad1449a35b64247b1ed23a723a1fcf6cac074d0668c9e0912134628c32a54854b952234ebb6c1fdd6b053566ac2d2a09498da03b00ddb78b2c111450a5417a8c368c40f1f140cdf97d95b7fa9565467e0bbbe27877d08e01c69b4e5b02b144e6a265df99a0839818b3f120ebac9b73f82b617dc6a5556c71794b1a9c5400000000000000000000000000000000000000000000000000" bytes := hexutil.MustDecode(hex) - bid := new(UltrasoundStreamBid) + bid := new(TopBidWebsocketStreamBid) err := bid.UnmarshalSSZ(bytes) require.NoError(t, err) diff --git a/docs/2024-06_bidcollect.md b/docs/2024-06_bidcollect.md index 1873010..901be1c 100644 --- a/docs/2024-06_bidcollect.md +++ b/docs/2024-06_bidcollect.md @@ -16,13 +16,13 @@ For every day, there are two CSV files: - `0`: [GetHeader polling](https://ethereum.github.io/builder-specs/#/Builder/getHeader) - `1`: [Data API polling](https://flashbots.github.io/relay-specs/#/Data/getReceivedBids) -- `2`: [Ultrasound top-bid websocket stream](https://github.com/ultrasoundmoney/docs/blob/main/top-bid-websocket.md) +- `2`: [Top-bid websocket stream (Ultrasound + Aestus)](https://github.com/ultrasoundmoney/docs/blob/main/top-bid-websocket.md) ### Collected fields | Field | Description | Source Types | | ------------------------ | ---------------------------------------------------------- | ------------ | -| `source_type` | 0: GetHeader, 1: Data API, 2: Ultrasound stream | all | +| `source_type` | 0: GetHeader, 1: Data API, 2: Top-Bid WS Stream | all | | `received_at_ms` | When the bid was first received by the relayscan collector | all | | `timestamp_ms` | When the bid was received by the relay | 1 + 2 | | `slot` | Slot the bid was submitted for | all | @@ -51,7 +51,7 @@ For every day, there are two CSV files: Source types: - `0`: `GetHeader` polling - `1`: Data API polling -- `2`: Ultrasound top-bid Websockets stream +- `2`: Top-bid Websocket stream Different data sources have different limitations: @@ -66,7 +66,7 @@ Different data sources have different limitations: - Has all the necessary information - Due to rate limits, we only poll at specific times - Polling at t-4, t-2, t-0.5, t+0.5, t+2 (see also [`/services/bidcollect/data-api-poller.go`](/services/bidcollect/data-api-poller.go#64-69)) -- Ultrasound websocket stream ([code](/services/bidcollect/ultrasound-stream.go): +- Top-bid websocket stream ([code](/services/bidcollect/top-bid-websocket-stream.go): - doesn't expose optimistic, thus that field is always `false` ## Other notes @@ -82,8 +82,8 @@ Different data sources have different limitations: By default, the collector will output CSV into `//.csv` ```bash -# Start data API and ultrasound stream collectors -go run . service bidcollect --data-api --ultrasound-stream --all-relays +# Start data API and top-bid websocket stream collectors +go run . service bidcollect --data-api --top-bid-ws-stream --all-relays # GetHeader needs a beacon node too go run . service bidcollect --get-header --beacon-uri http://localhost:3500 --all-relays diff --git a/services/bidcollect/bidcollector.go b/services/bidcollect/bidcollector.go index 16c7625..45a8a20 100644 --- a/services/bidcollect/bidcollector.go +++ b/services/bidcollect/bidcollector.go @@ -10,12 +10,13 @@ type BidCollectorOpts struct { Log *logrus.Entry UID string - CollectUltrasoundStream bool - CollectGetHeader bool - CollectDataAPI bool + CollectTopBidWebsocketStream bool + CollectDataAPI bool + CollectGetHeader bool + BeaconNodeURI string // for getHeader - Relays []common.RelayEntry - BeaconNodeURI string // for getHeader + Relays []common.RelayEntry + TopBidWebsocketRelays []common.RelayEntry OutDir string OutputTSV bool @@ -25,9 +26,9 @@ type BidCollector struct { opts *BidCollectorOpts log *logrus.Entry - ultrasoundBidC chan UltrasoundStreamBidsMsg - dataAPIBidC chan DataAPIPollerBidsMsg - getHeaderBidC chan GetHeaderPollerBidsMsg + topBidWebsocketC chan TopBidWebsocketStreamBidsMsg + dataAPIBidC chan DataAPIPollerBidsMsg + getHeaderBidC chan GetHeaderPollerBidsMsg processor *BidProcessor } @@ -44,7 +45,7 @@ func NewBidCollector(opts *BidCollectorOpts) *BidCollector { // inputs c.dataAPIBidC = make(chan DataAPIPollerBidsMsg, bidCollectorInputChannelSize) - c.ultrasoundBidC = make(chan UltrasoundStreamBidsMsg, bidCollectorInputChannelSize) + c.topBidWebsocketC = make(chan TopBidWebsocketStreamBidsMsg, bidCollectorInputChannelSize) c.getHeaderBidC = make(chan GetHeaderPollerBidsMsg, bidCollectorInputChannelSize) // output @@ -79,17 +80,21 @@ func (c *BidCollector) MustStart() { go poller.Start() } - if c.opts.CollectUltrasoundStream { - ultrasoundStream := NewUltrasoundStreamConnection(UltrasoundStreamOpts{ - Log: c.log, - BidC: c.ultrasoundBidC, - }) - go ultrasoundStream.Start() + if c.opts.CollectTopBidWebsocketStream { + for _, relay := range c.opts.TopBidWebsocketRelays { + c.log.Infof("Starting top bid websocket stream for %s...", relay.String()) + topBidWebsocketStream := NewTopBidWebsocketStreamConnection(TopBidWebsocketStreamOpts{ + Log: c.log, + Relay: relay, + BidC: c.topBidWebsocketC, + }) + go topBidWebsocketStream.Start() + } } for { select { - case bid := <-c.ultrasoundBidC: + case bid := <-c.topBidWebsocketC: commonBid := UltrasoundStreamToCommonBid(&bid) c.processor.processBids([]*CommonBid{commonBid}) case bids := <-c.dataAPIBidC: diff --git a/services/bidcollect/consts.go b/services/bidcollect/consts.go index 16df1cd..f5142fc 100644 --- a/services/bidcollect/consts.go +++ b/services/bidcollect/consts.go @@ -5,9 +5,8 @@ const ( SourceTypeDataAPI = 1 SourceTypeUltrasoundStream = 2 - ultrasoundStreamDefaultURL = "ws://relay-builders-eu.ultrasound.money/ws/v1/top_bid" - initialBackoffSec = 5 - maxBackoffSec = 120 + initialBackoffSec = 5 + maxBackoffSec = 120 // bucketMinutes is the number of minutes to write into each CSV file (i.e. new file created for every X minutes bucket) bucketMinutes = 60 @@ -15,8 +14,3 @@ const ( // channel size for bid collector inputs bidCollectorInputChannelSize = 1000 ) - -var ( -// csvFileEnding = relaycommon.GetEnv("CSV_FILE_END", "tsv") -// csvSeparator = relaycommon.GetEnv("CSV_SEP", "\t") -) diff --git a/services/bidcollect/top-bid-websocket-stream.go b/services/bidcollect/top-bid-websocket-stream.go new file mode 100644 index 0000000..cefcc32 --- /dev/null +++ b/services/bidcollect/top-bid-websocket-stream.go @@ -0,0 +1,98 @@ +package bidcollect + +import ( + "time" + + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/flashbots/relayscan/common" + "github.com/gorilla/websocket" + "github.com/sirupsen/logrus" +) + +type TopBidWebsocketStreamBidsMsg struct { + Bid common.TopBidWebsocketStreamBid + Relay string + ReceivedAt time.Time +} + +type TopBidWebsocketStreamOpts struct { + Log *logrus.Entry + Relay common.RelayEntry + BidC chan TopBidWebsocketStreamBidsMsg +} + +type TopBidWebsocketStreamConnection struct { + log *logrus.Entry + relay common.RelayEntry + bidC chan TopBidWebsocketStreamBidsMsg + backoffSec int +} + +func NewTopBidWebsocketStreamConnection(opts TopBidWebsocketStreamOpts) *TopBidWebsocketStreamConnection { + return &TopBidWebsocketStreamConnection{ + log: opts.Log.WithField("uri", opts.Relay.String()), + relay: opts.Relay, + bidC: opts.BidC, + backoffSec: initialBackoffSec, + } +} + +func (ustream *TopBidWebsocketStreamConnection) Start() { + ustream.connect() +} + +func (ustream *TopBidWebsocketStreamConnection) reconnect() { + backoffDuration := time.Duration(ustream.backoffSec) * time.Second + ustream.log.Infof("[websocket-stream] reconnecting to websocket stream in %s sec ...", backoffDuration.String()) + time.Sleep(backoffDuration) + + // increase backoff timeout for next try + ustream.backoffSec *= 2 + if ustream.backoffSec > maxBackoffSec { + ustream.backoffSec = maxBackoffSec + } + + ustream.connect() +} + +func (ustream *TopBidWebsocketStreamConnection) connect() { + ustream.log.Info("[websocket-stream] Starting bid stream...") + + dialer := websocket.DefaultDialer + wsSubscriber, resp, err := dialer.Dial(ustream.relay.String(), nil) + if err != nil { + ustream.log.WithError(err).Error("[websocket-stream] failed to connect to websocket stream, reconnecting in a bit...") + go ustream.reconnect() + return + } + defer wsSubscriber.Close() + defer resp.Body.Close() + + ustream.log.Info("[websocket-stream] stream connection successful") + ustream.backoffSec = initialBackoffSec // reset backoff timeout + + bid := new(common.TopBidWebsocketStreamBid) + + for { + _, nextNotification, err := wsSubscriber.ReadMessage() + if err != nil { + // Handle websocket errors, by closing and reconnecting. Errors seen previously: + ustream.log.WithError(err).Error("websocket stream websocket error") + go ustream.reconnect() + return + } + + // Unmarshal SSZ + err = bid.UnmarshalSSZ(nextNotification) + if err != nil { + ustream.log.WithError(err).WithField("msg", hexutil.Encode(nextNotification)).Error("[websocket-stream] failed to unmarshal websocket stream message") + continue + } + + ustream.bidC <- TopBidWebsocketStreamBidsMsg{ + Bid: *bid, + Relay: ustream.relay.Hostname(), + ReceivedAt: time.Now().UTC(), + } + } +} diff --git a/services/bidcollect/types.go b/services/bidcollect/types.go index 25ecffa..31fd2ff 100644 --- a/services/bidcollect/types.go +++ b/services/bidcollect/types.go @@ -125,7 +125,7 @@ func boolToString(b bool) string { return "false" } -func UltrasoundStreamToCommonBid(bid *UltrasoundStreamBidsMsg) *CommonBid { +func UltrasoundStreamToCommonBid(bid *TopBidWebsocketStreamBidsMsg) *CommonBid { blockHash := hexutil.Encode(bid.Bid.BlockHash[:]) parentHash := hexutil.Encode(bid.Bid.ParentHash[:]) builderPubkey := hexutil.Encode(bid.Bid.BuilderPubkey[:]) diff --git a/services/bidcollect/ultrasound-stream.go b/services/bidcollect/ultrasound-stream.go deleted file mode 100644 index 97be61c..0000000 --- a/services/bidcollect/ultrasound-stream.go +++ /dev/null @@ -1,99 +0,0 @@ -package bidcollect - -import ( - "time" - - "github.com/ethereum/go-ethereum/common/hexutil" - "github.com/flashbots/relayscan/common" - "github.com/gorilla/websocket" - "github.com/sirupsen/logrus" -) - -type UltrasoundStreamBidsMsg struct { - Bid common.UltrasoundStreamBid - Relay string - ReceivedAt time.Time -} - -type UltrasoundStreamOpts struct { - Log *logrus.Entry - BidC chan UltrasoundStreamBidsMsg -} - -type UltrasoundStreamConnection struct { - log *logrus.Entry - url string - bidC chan UltrasoundStreamBidsMsg - backoffSec int -} - -func NewUltrasoundStreamConnection(opts UltrasoundStreamOpts) *UltrasoundStreamConnection { - return &UltrasoundStreamConnection{ - log: opts.Log, - url: ultrasoundStreamDefaultURL, - bidC: opts.BidC, - backoffSec: initialBackoffSec, - } -} - -func (ustream *UltrasoundStreamConnection) Start() { - ustream.connect() -} - -func (ustream *UltrasoundStreamConnection) reconnect() { - backoffDuration := time.Duration(ustream.backoffSec) * time.Second - ustream.log.Infof("[ultrasounds-stream] reconnecting to ultrasound stream in %s sec ...", backoffDuration.String()) - time.Sleep(backoffDuration) - - // increase backoff timeout for next try - ustream.backoffSec *= 2 - if ustream.backoffSec > maxBackoffSec { - ustream.backoffSec = maxBackoffSec - } - - ustream.connect() -} - -func (ustream *UltrasoundStreamConnection) connect() { - ustream.log.WithField("uri", ustream.url).Info("[ultrasounds-stream] Starting bid stream...") - - dialer := websocket.DefaultDialer - wsSubscriber, resp, err := dialer.Dial(ustream.url, nil) - if err != nil { - ustream.log.WithError(err).Error("[ultrasounds-stream] failed to connect to bloxroute, reconnecting in a bit...") - go ustream.reconnect() - return - } - defer wsSubscriber.Close() - defer resp.Body.Close() - - ustream.log.Info("[ultrasounds-stream] stream connection successful") - ustream.backoffSec = initialBackoffSec // reset backoff timeout - - bid := new(common.UltrasoundStreamBid) - - for { - _, nextNotification, err := wsSubscriber.ReadMessage() - if err != nil { - // Handle websocket errors, by closing and reconnecting. Errors seen previously: - ustream.log.WithError(err).Error("ultrasound stream websocket error") - go ustream.reconnect() - return - } - - // nc.log.WithField("msg", hexutil.Encode(nextNotification)).Info("got message from ultrasound stream") - - // Unmarshal SSZ - err = bid.UnmarshalSSZ(nextNotification) - if err != nil { - ustream.log.WithError(err).WithField("msg", hexutil.Encode(nextNotification)).Error("[ultrasounds-stream] failed to unmarshal ultrasound stream message") - continue - } - - ustream.bidC <- UltrasoundStreamBidsMsg{ - Bid: *bid, - Relay: "relay.ultrasound.money", - ReceivedAt: time.Now().UTC(), - } - } -} diff --git a/vars/relays.go b/vars/relays.go index a66fd87..878942d 100644 --- a/vars/relays.go +++ b/vars/relays.go @@ -13,9 +13,17 @@ var ( "https://0xa7ab7a996c8584251c8f925da3170bdfd6ebc75d50f5ddc4050a6fdc77f2a3b5fce2cc750d0865e05d7228af97d69561@agnostic-relay.net", "https://0xa15b52576bcbf1072f4a011c0f99f9fb6c66f3e1ff321f11f461d15e31b1cb359caa092c71bbded0bae5b5ea401aab7e@aestus.live", "https://0x8c7d33605ecef85403f8b7289c8058f440cbb6bf72b055dfe2f3e2c6695b6a1ea5a9cd0eb3a7982927a463feb4c3dae2@relay.wenmerge.com", + "https://0x8c4ed5e24fe5c6ae21018437bde147693f68cda427cd1122cf20819c30eda7ed74f72dece09bb313f2a1855595ab677d@titanrelay.xyz", // added 2024-02-22 + + // deprecated relays: // "https://0x95a0a6af2566fa7db732020bb2724be61963ac1eb760aa1046365eb443bd4e3cc0fba0265d40a2d81dd94366643e986a@blockspace.frontier.tech", // data API doesn't work anymore (as of June 1, 2024) // "https://0xad0a8bb54565c2211cee576363f3a347089d2f07cf72679d16911d740262694cadb62d7fd7483f27afd714ca0f1b9118@bloxroute.ethical.blxrbdn.com", // deactivated aug 2023: https://twitter.com/bloXrouteLabs/status/1690065892778926080 // "https://0x9000009807ed12c1f08bf4e81c6da3ba8e3fc3d953898ce0102433094e5f22f21102ec057841fcb81978ed1ea0fa8246@builder-relay-mainnet.blocknative.com", // deactivated sept. 27, 2023: https://twitter.com/blocknative/status/1706685103286485364 - "https://0x8c4ed5e24fe5c6ae21018437bde147693f68cda427cd1122cf20819c30eda7ed74f72dece09bb313f2a1855595ab677d@titanrelay.xyz", // added 2024-02-22 + } + + // Top Bid Stream URLs (SSZ): https://github.com/ultrasoundmoney/docs/blob/main/top-bid-websocket.md + TopBidStreamURLs = []string{ + "ws://relay-builders-eu.ultrasound.money/ws/v1/top_bid", + "wss://aestus.live/ws/v1/bids?format=ssz", } ) From 0b0fb588018165b630a7a1b3c90c97b4cf92f62c Mon Sep 17 00:00:00 2001 From: Chris Hager Date: Wed, 26 Jun 2024 20:31:19 +0200 Subject: [PATCH 2/2] cleanup --- Makefile | 4 +- cmd/service/bidcollect.go | 4 - .../{ultrasoundbid.go => top-bid-ws-bid.go} | 0 common/top-bid-ws-bid_encoding.go | 126 ++++++++++++++++++ ...oundbid_test.go => top-bid-ws-bid_test.go} | 2 +- common/ultrasoundbid_encoding.go | 124 ----------------- services/bidcollect/bidcollector.go | 2 +- services/bidcollect/consts.go | 6 +- services/bidcollect/types.go | 8 +- services/bidcollect/types_test.go | 2 +- 10 files changed, 138 insertions(+), 140 deletions(-) rename common/{ultrasoundbid.go => top-bid-ws-bid.go} (100%) create mode 100644 common/top-bid-ws-bid_encoding.go rename common/{ultrasoundbid_test.go => top-bid-ws-bid_test.go} (95%) delete mode 100644 common/ultrasoundbid_encoding.go diff --git a/Makefile b/Makefile index 7c19d55..855d6ed 100644 --- a/Makefile +++ b/Makefile @@ -52,8 +52,8 @@ docker-image: DOCKER_BUILDKIT=1 docker build --platform linux/amd64 --build-arg VERSION=${VERSION} . -t relayscan generate-ssz: - rm -f common/ultrasoundbid_encoding.go - sszgen --path common --objs UltrasoundStreamBid + rm -f common/top-bid-ws-bid_encoding.go + sszgen --path common --objs TopBidWebsocketStreamBid update-bids-website: go run . service bidcollect --build-website --build-website-upload diff --git a/cmd/service/bidcollect.go b/cmd/service/bidcollect.go index a4fc8bd..f6ff009 100644 --- a/cmd/service/bidcollect.go +++ b/cmd/service/bidcollect.go @@ -1,9 +1,5 @@ package service -/** - * https://github.com/ultrasoundmoney/docs/blob/main/top-bid-websocket.md - */ - import ( "github.com/flashbots/relayscan/common" "github.com/flashbots/relayscan/services/bidcollect" diff --git a/common/ultrasoundbid.go b/common/top-bid-ws-bid.go similarity index 100% rename from common/ultrasoundbid.go rename to common/top-bid-ws-bid.go diff --git a/common/top-bid-ws-bid_encoding.go b/common/top-bid-ws-bid_encoding.go new file mode 100644 index 0000000..7d01218 --- /dev/null +++ b/common/top-bid-ws-bid_encoding.go @@ -0,0 +1,126 @@ +// Code generated by fastssz. DO NOT EDIT. +// Hash: bf0af0358455608406d75e88b42f6733f2953e08fc502fe12bdb1a3a125a09e7 +package common + +import ( + ssz "github.com/ferranbt/fastssz" +) + +// MarshalSSZ ssz marshals the TopBidWebsocketStreamBid object +func (t *TopBidWebsocketStreamBid) MarshalSSZ() ([]byte, error) { + return ssz.MarshalSSZ(t) +} + +// MarshalSSZTo ssz marshals the TopBidWebsocketStreamBid object to a target array +func (t *TopBidWebsocketStreamBid) MarshalSSZTo(buf []byte) (dst []byte, err error) { + dst = buf + + // Field (0) 'Timestamp' + dst = ssz.MarshalUint64(dst, t.Timestamp) + + // Field (1) 'Slot' + dst = ssz.MarshalUint64(dst, t.Slot) + + // Field (2) 'BlockNumber' + dst = ssz.MarshalUint64(dst, t.BlockNumber) + + // Field (3) 'BlockHash' + dst = append(dst, t.BlockHash[:]...) + + // Field (4) 'ParentHash' + dst = append(dst, t.ParentHash[:]...) + + // Field (5) 'BuilderPubkey' + dst = append(dst, t.BuilderPubkey[:]...) + + // Field (6) 'FeeRecipient' + dst = append(dst, t.FeeRecipient[:]...) + + // Field (7) 'Value' + dst = append(dst, t.Value[:]...) + + return +} + +// UnmarshalSSZ ssz unmarshals the TopBidWebsocketStreamBid object +func (t *TopBidWebsocketStreamBid) UnmarshalSSZ(buf []byte) error { + var err error + size := uint64(len(buf)) + if size != 188 { + return ssz.ErrSize + } + + // Field (0) 'Timestamp' + t.Timestamp = ssz.UnmarshallUint64(buf[0:8]) + + // Field (1) 'Slot' + t.Slot = ssz.UnmarshallUint64(buf[8:16]) + + // Field (2) 'BlockNumber' + t.BlockNumber = ssz.UnmarshallUint64(buf[16:24]) + + // Field (3) 'BlockHash' + copy(t.BlockHash[:], buf[24:56]) + + // Field (4) 'ParentHash' + copy(t.ParentHash[:], buf[56:88]) + + // Field (5) 'BuilderPubkey' + copy(t.BuilderPubkey[:], buf[88:136]) + + // Field (6) 'FeeRecipient' + copy(t.FeeRecipient[:], buf[136:156]) + + // Field (7) 'Value' + copy(t.Value[:], buf[156:188]) + + return err +} + +// SizeSSZ returns the ssz encoded size in bytes for the TopBidWebsocketStreamBid object +func (t *TopBidWebsocketStreamBid) SizeSSZ() (size int) { + size = 188 + return +} + +// HashTreeRoot ssz hashes the TopBidWebsocketStreamBid object +func (t *TopBidWebsocketStreamBid) HashTreeRoot() ([32]byte, error) { + return ssz.HashWithDefaultHasher(t) +} + +// HashTreeRootWith ssz hashes the TopBidWebsocketStreamBid object with a hasher +func (t *TopBidWebsocketStreamBid) HashTreeRootWith(hh ssz.HashWalker) (err error) { + indx := hh.Index() + + // Field (0) 'Timestamp' + hh.PutUint64(t.Timestamp) + + // Field (1) 'Slot' + hh.PutUint64(t.Slot) + + // Field (2) 'BlockNumber' + hh.PutUint64(t.BlockNumber) + + // Field (3) 'BlockHash' + hh.PutBytes(t.BlockHash[:]) + + // Field (4) 'ParentHash' + hh.PutBytes(t.ParentHash[:]) + + // Field (5) 'BuilderPubkey' + hh.PutBytes(t.BuilderPubkey[:]) + + // Field (6) 'FeeRecipient' + hh.PutBytes(t.FeeRecipient[:]) + + // Field (7) 'Value' + hh.PutBytes(t.Value[:]) + + hh.Merkleize(indx) + return +} + +// GetTree ssz hashes the TopBidWebsocketStreamBid object +func (t *TopBidWebsocketStreamBid) GetTree() (*ssz.Node, error) { + return ssz.ProofTree(t) +} diff --git a/common/ultrasoundbid_test.go b/common/top-bid-ws-bid_test.go similarity index 95% rename from common/ultrasoundbid_test.go rename to common/top-bid-ws-bid_test.go index 2b9ef42..3e068c6 100644 --- a/common/ultrasoundbid_test.go +++ b/common/top-bid-ws-bid_test.go @@ -16,7 +16,7 @@ func TestValueDecoding(t *testing.T) { require.Equal(t, expected, value) } -func TestUltrasoundBidSSZDecoding(t *testing.T) { +func TestTopBidWSStreamBidSSZDecoding(t *testing.T) { hex := "0x704b87ce8f010000a94b8c0000000000b6043101000000002c02b28fd8fdb45fd6ac43dd04adad1449a35b64247b1ed23a723a1fcf6cac074d0668c9e0912134628c32a54854b952234ebb6c1fdd6b053566ac2d2a09498da03b00ddb78b2c111450a5417a8c368c40f1f140cdf97d95b7fa9565467e0bbbe27877d08e01c69b4e5b02b144e6a265df99a0839818b3f120ebac9b73f82b617dc6a5556c71794b1a9c5400000000000000000000000000000000000000000000000000" bytes := hexutil.MustDecode(hex) bid := new(TopBidWebsocketStreamBid) diff --git a/common/ultrasoundbid_encoding.go b/common/ultrasoundbid_encoding.go deleted file mode 100644 index fb08736..0000000 --- a/common/ultrasoundbid_encoding.go +++ /dev/null @@ -1,124 +0,0 @@ -package common - -import ( - ssz "github.com/ferranbt/fastssz" -) - -// MarshalSSZ ssz marshals the UltrasoundStreamBid object -func (u *TopBidWebsocketStreamBid) MarshalSSZ() ([]byte, error) { - return ssz.MarshalSSZ(u) -} - -// MarshalSSZTo ssz marshals the UltrasoundStreamBid object to a target array -func (u *TopBidWebsocketStreamBid) MarshalSSZTo(buf []byte) (dst []byte, err error) { - dst = buf - - // Field (0) 'Timestamp' - dst = ssz.MarshalUint64(dst, u.Timestamp) - - // Field (1) 'Slot' - dst = ssz.MarshalUint64(dst, u.Slot) - - // Field (2) 'BlockNumber' - dst = ssz.MarshalUint64(dst, u.BlockNumber) - - // Field (3) 'BlockHash' - dst = append(dst, u.BlockHash[:]...) - - // Field (4) 'ParentHash' - dst = append(dst, u.ParentHash[:]...) - - // Field (5) 'BuilderPubkey' - dst = append(dst, u.BuilderPubkey[:]...) - - // Field (6) 'FeeRecipient' - dst = append(dst, u.FeeRecipient[:]...) - - // Field (7) 'Value' - dst = append(dst, u.Value[:]...) - - return -} - -// UnmarshalSSZ ssz unmarshals the UltrasoundStreamBid object -func (u *TopBidWebsocketStreamBid) UnmarshalSSZ(buf []byte) error { - var err error - size := uint64(len(buf)) - if size != 188 { - return ssz.ErrSize - } - - // Field (0) 'Timestamp' - u.Timestamp = ssz.UnmarshallUint64(buf[0:8]) - - // Field (1) 'Slot' - u.Slot = ssz.UnmarshallUint64(buf[8:16]) - - // Field (2) 'BlockNumber' - u.BlockNumber = ssz.UnmarshallUint64(buf[16:24]) - - // Field (3) 'BlockHash' - copy(u.BlockHash[:], buf[24:56]) - - // Field (4) 'ParentHash' - copy(u.ParentHash[:], buf[56:88]) - - // Field (5) 'BuilderPubkey' - copy(u.BuilderPubkey[:], buf[88:136]) - - // Field (6) 'FeeRecipient' - copy(u.FeeRecipient[:], buf[136:156]) - - // Field (7) 'Value' - copy(u.Value[:], buf[156:188]) - - return err -} - -// SizeSSZ returns the ssz encoded size in bytes for the UltrasoundStreamBid object -func (u *TopBidWebsocketStreamBid) SizeSSZ() (size int) { - size = 188 - return -} - -// HashTreeRoot ssz hashes the UltrasoundStreamBid object -func (u *TopBidWebsocketStreamBid) HashTreeRoot() ([32]byte, error) { - return ssz.HashWithDefaultHasher(u) -} - -// HashTreeRootWith ssz hashes the UltrasoundStreamBid object with a hasher -func (u *TopBidWebsocketStreamBid) HashTreeRootWith(hh ssz.HashWalker) (err error) { - indx := hh.Index() - - // Field (0) 'Timestamp' - hh.PutUint64(u.Timestamp) - - // Field (1) 'Slot' - hh.PutUint64(u.Slot) - - // Field (2) 'BlockNumber' - hh.PutUint64(u.BlockNumber) - - // Field (3) 'BlockHash' - hh.PutBytes(u.BlockHash[:]) - - // Field (4) 'ParentHash' - hh.PutBytes(u.ParentHash[:]) - - // Field (5) 'BuilderPubkey' - hh.PutBytes(u.BuilderPubkey[:]) - - // Field (6) 'FeeRecipient' - hh.PutBytes(u.FeeRecipient[:]) - - // Field (7) 'Value' - hh.PutBytes(u.Value[:]) - - hh.Merkleize(indx) - return -} - -// GetTree ssz hashes the UltrasoundStreamBid object -func (u *TopBidWebsocketStreamBid) GetTree() (*ssz.Node, error) { - return ssz.ProofTree(u) -} diff --git a/services/bidcollect/bidcollector.go b/services/bidcollect/bidcollector.go index 45a8a20..67e76ad 100644 --- a/services/bidcollect/bidcollector.go +++ b/services/bidcollect/bidcollector.go @@ -95,7 +95,7 @@ func (c *BidCollector) MustStart() { for { select { case bid := <-c.topBidWebsocketC: - commonBid := UltrasoundStreamToCommonBid(&bid) + commonBid := TopBidWebsocketStreamToCommonBid(&bid) c.processor.processBids([]*CommonBid{commonBid}) case bids := <-c.dataAPIBidC: commonBids := DataAPIToCommonBids(bids) diff --git a/services/bidcollect/consts.go b/services/bidcollect/consts.go index f5142fc..b190a47 100644 --- a/services/bidcollect/consts.go +++ b/services/bidcollect/consts.go @@ -1,9 +1,9 @@ package bidcollect const ( - SourceTypeGetHeader = 0 - SourceTypeDataAPI = 1 - SourceTypeUltrasoundStream = 2 + SourceTypeGetHeader = 0 + SourceTypeDataAPI = 1 + SourceTypeTopBidWSStream = 2 initialBackoffSec = 5 maxBackoffSec = 120 diff --git a/services/bidcollect/types.go b/services/bidcollect/types.go index 31fd2ff..d72e66b 100644 --- a/services/bidcollect/types.go +++ b/services/bidcollect/types.go @@ -43,7 +43,7 @@ type CommonBid struct { BuilderPubkey string `json:"builder_pubkey"` Value string `json:"value"` - // Ultrasound top-bid stream - https://github.com/ultrasoundmoney/docs/blob/main/top-bid-websocket.md + // Top-bid WS stream - https://github.com/ultrasoundmoney/docs/blob/main/top-bid-websocket.md BlockFeeRecipient string `json:"block_fee_recipient"` // Data API @@ -101,7 +101,7 @@ func (bid *CommonBid) ToCSVFields() []string { bid.BuilderPubkey, fmt.Sprint(bid.BlockNumber), - // Ultrasound top-bid stream + // Top-bid WS stream bid.BlockFeeRecipient, // Relay is common too @@ -125,14 +125,14 @@ func boolToString(b bool) string { return "false" } -func UltrasoundStreamToCommonBid(bid *TopBidWebsocketStreamBidsMsg) *CommonBid { +func TopBidWebsocketStreamToCommonBid(bid *TopBidWebsocketStreamBidsMsg) *CommonBid { blockHash := hexutil.Encode(bid.Bid.BlockHash[:]) parentHash := hexutil.Encode(bid.Bid.ParentHash[:]) builderPubkey := hexutil.Encode(bid.Bid.BuilderPubkey[:]) blockFeeRecipient := hexutil.Encode(bid.Bid.FeeRecipient[:]) return &CommonBid{ - SourceType: SourceTypeUltrasoundStream, + SourceType: SourceTypeTopBidWSStream, ReceivedAtMs: bid.ReceivedAt.UnixMilli(), TimestampMs: int64(bid.Bid.Timestamp), diff --git a/services/bidcollect/types_test.go b/services/bidcollect/types_test.go index 2b61b4b..26697c6 100644 --- a/services/bidcollect/types_test.go +++ b/services/bidcollect/types_test.go @@ -9,5 +9,5 @@ import ( func TestSourceTypes(t *testing.T) { require.Equal(t, 0, SourceTypeGetHeader) require.Equal(t, 1, SourceTypeDataAPI) - require.Equal(t, 2, SourceTypeUltrasoundStream) + require.Equal(t, 2, SourceTypeTopBidWSStream) }