diff --git a/cmd/main.go b/cmd/main.go index fdaad45..c1eafa4 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -68,10 +68,9 @@ var ( }) optionL1RPCUrl = altsrc.NewStringFlag(&cli.StringFlag{ - Name: "l1-rpc-url", - Usage: "URL for L1 RPC", - EnvVars: []string{"MEV_ORACLE_L1_RPC_URL"}, - Required: true, + Name: "l1-rpc-url", + Usage: "URL for L1 RPC", + EnvVars: []string{"MEV_ORACLE_L1_RPC_URL"}, }) optionSettlementRPCUrl = altsrc.NewStringFlag(&cli.StringFlag{ diff --git a/integrationtest/config.yaml b/integrationtest/config.yaml index 572d263..b1e59ab 100644 --- a/integrationtest/config.yaml +++ b/integrationtest/config.yaml @@ -1,18 +1,17 @@ -priv_key_file: /key -keystore_path: /keystore -keystore_password: primev -log_level: debug -l1_rpc_url: -settlement_rpc_url: http://sl-bootnode:8545 -oracle_contract_addr: 0xF82E60097df8A1c2c58dC95ce08fe00F30717beB -preconf_contract_addr: 0x451656c1E7eDf82397EBE04f38819c9970AA3658 -pg_host: oracle-db -pg_port: 5432 -pg_user: oracle_user -pg_password: oracle_pass -pg_dbname: oracle_db -laggerd_mode: 10 -override_winners: +priv-key-file: /key +keystore-path: /keystore +keystore-password: primev +log-level: debug +l1-rpc-url: +settlement-rpc-url: http://sl-bootnode:8545 +pg-host: oracle-db +pg-port: 5432 +pg-user: oracle_user +pg-password: oracle_pass +pg-dbname: oracle_db +laggerd-mode: 10 +override-winners: - 0x48ddC642514370bdaFAd81C91e23759B0302C915 - 0x972eb4Fc3c457da4C957306bE7Fa1976BB8F39A6 - 0xA1e8FDB3bb6A0DB7aA5Db49a3512B01671686DCB + - 0xdeadbeef00000000000000000000000000000000 diff --git a/pkg/settler/settler.go b/pkg/settler/settler.go index a1e4fcd..e9d70e8 100644 --- a/pkg/settler/settler.go +++ b/pkg/settler/settler.go @@ -3,7 +3,10 @@ package settler import ( "context" "errors" + "fmt" "math/big" + "strings" + "sync" "time" "github.com/ethereum/go-ethereum/accounts/abi/bind" @@ -12,6 +15,7 @@ import ( "github.com/primevprotocol/mev-oracle/pkg/keysigner" "github.com/prometheus/client_golang/prometheus" "github.com/rs/zerolog/log" + "golang.org/x/sync/errgroup" ) var ( @@ -33,13 +37,28 @@ type Settlement struct { BlockNum int64 Builder string Amount uint64 + BidID []byte Type SettlementType } +type Return struct { + BidIDs [][32]byte +} + +func (r Return) String() string { + strs := make([]string, len(r.BidIDs)) + for idx, bidID := range r.BidIDs { + strs[idx] = fmt.Sprintf("%x", bidID) + } + + return fmt.Sprintf("[%s]", strings.Join(strs, ", ")) +} + type SettlerRegister interface { LastNonce() (int64, error) PendingTxnCount() (int, error) SubscribeSettlements(ctx context.Context) <-chan Settlement + SubscribeReturns(ctx context.Context, limit int) <-chan Return SettlementInitiated(ctx context.Context, commitmentIdx [][]byte, txHash common.Hash, nonce uint64) error MarkSettlementComplete(ctx context.Context, nonce uint64) (int, error) } @@ -71,6 +90,7 @@ type Settler struct { settlerRegister SettlerRegister client Transactor metrics *metrics + txMtx sync.Mutex } func NewSettler( @@ -132,176 +152,255 @@ func (s *Settler) Metrics() []prometheus.Collector { return s.metrics.Collectors() } -func (s *Settler) Start(ctx context.Context) <-chan struct{} { - doneChan := make(chan struct{}) +func (s *Settler) settlementUpdater(ctx context.Context) error { + queryTicker := time.NewTicker(500 * time.Millisecond) + defer queryTicker.Stop() - go func() { - queryTicker := time.NewTicker(500 * time.Millisecond) - defer queryTicker.Stop() - - lastBlock := uint64(0) - for { - select { - case <-ctx.Done(): - return - case <-queryTicker.C: - } + lastBlock := uint64(0) + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-queryTicker.C: + } - currentBlock, err := s.client.BlockNumber(ctx) - if err != nil { - log.Error().Err(err).Msg("failed to get block number") - continue - } + currentBlock, err := s.client.BlockNumber(ctx) + if err != nil { + log.Error().Err(err).Msg("failed to get block number") + continue + } - if currentBlock <= lastBlock { - continue - } + if currentBlock <= lastBlock { + continue + } - lastNonce, err := s.client.NonceAt( - ctx, - s.owner, - new(big.Int).SetUint64(currentBlock), - ) - if err != nil { - log.Error().Err(err).Msg("failed to get nonce") - continue + lastNonce, err := s.client.NonceAt( + ctx, + s.owner, + new(big.Int).SetUint64(currentBlock), + ) + if err != nil { + log.Error().Err(err).Msg("failed to get nonce") + continue + } + + count, err := s.settlerRegister.MarkSettlementComplete(ctx, lastNonce) + if err != nil { + log.Error().Err(err).Msg("failed to mark settlement complete") + continue + } + + s.metrics.LastConfirmedNonce.Set(float64(lastNonce)) + s.metrics.LastConfirmedBlock.Set(float64(currentBlock)) + s.metrics.SettlementsConfirmedCount.Add(float64(count)) + + if count > 0 { + log.Info().Int("count", count).Msg("marked settlement complete") + } + + lastBlock = currentBlock + } +} + +func (s *Settler) settlementExecutor(ctx context.Context) error { +RESTART: + cctx, unsub := context.WithCancel(ctx) + settlementChan := s.settlerRegister.SubscribeSettlements(cctx) + + for { + select { + case <-ctx.Done(): + unsub() + return ctx.Err() + case settlement, more := <-settlementChan: + if !more { + unsub() + goto RESTART } - count, err := s.settlerRegister.MarkSettlementComplete(ctx, lastNonce) + err := func() error { + s.txMtx.Lock() + defer s.txMtx.Unlock() + + if settlement.Type == SettlementTypeReturn { + log.Warn(). + Str("commitmentIdx", fmt.Sprintf("%x", settlement.CommitmentIdx)). + Msg("return settlement") + return nil + } + + pendingTxns, err := s.settlerRegister.PendingTxnCount() + if err != nil { + return err + } + + if pendingTxns > allowedPendingTxnCount { + return errors.New("too many pending txns") + } + + var ( + commitmentIdx [32]byte + ) + + opts, err := s.getTransactOpts(ctx) + if err != nil { + return err + } + + copy(commitmentIdx[:], settlement.CommitmentIdx) + + commitmentPostingTxn, err := s.rollupClient.ProcessBuilderCommitmentForBlockNumber( + opts, + commitmentIdx, + big.NewInt(settlement.BlockNum), + settlement.Builder, + settlement.Type == SettlementTypeSlash, + ) + if err != nil { + return fmt.Errorf("process commitment: %w nonce %d", err, opts.Nonce.Uint64()) + } + + err = s.settlerRegister.SettlementInitiated( + ctx, + [][]byte{settlement.BidID}, + commitmentPostingTxn.Hash(), + commitmentPostingTxn.Nonce(), + ) + if err != nil { + return fmt.Errorf("failed to mark settlement initiated: %w", err) + } + + s.metrics.LastUsedNonce.Set(float64(commitmentPostingTxn.Nonce())) + s.metrics.SettlementsPostedCount.Inc() + s.metrics.CurrentSettlementL1Block.Set(float64(settlement.BlockNum)) + + log.Info(). + Int64("blockNum", settlement.BlockNum). + Str("txHash", commitmentPostingTxn.Hash().Hex()). + Str("builder", settlement.Builder). + Str("settlementType", string(settlement.Type)). + Uint64("nonce", commitmentPostingTxn.Nonce()). + Msg("builder commitment processed") + + return nil + }() if err != nil { - log.Error().Err(err).Msg("failed to mark settlement complete") - continue + log.Error().Err(err).Msg("failed to process builder commitment") + unsub() + time.Sleep(5 * time.Second) + goto RESTART } + } + } +} - s.metrics.LastConfirmedNonce.Set(float64(lastNonce)) - s.metrics.LastConfirmedBlock.Set(float64(currentBlock)) - s.metrics.SettlementsConfirmedCount.Add(float64(count)) - - if count > 0 { - log.Info().Int("count", count).Msg("marked settlement complete") +func (s *Settler) returnExecutor(ctx context.Context) error { +RESTART: + cctx, unsub := context.WithCancel(ctx) + returnsChan := s.settlerRegister.SubscribeReturns(cctx, batchSize) + + for { + select { + case <-ctx.Done(): + unsub() + return ctx.Err() + case returns, more := <-returnsChan: + if !more { + unsub() + goto RESTART } - lastBlock = currentBlock - } + err := func() error { + s.txMtx.Lock() + defer s.txMtx.Unlock() - }() + pendingTxns, err := s.settlerRegister.PendingTxnCount() + if err != nil { + return err + } - go func() { - defer close(doneChan) + if pendingTxns > allowedPendingTxnCount { + return errors.New("too many pending txns") + } - RESTART: - cctx, unsub := context.WithCancel(ctx) - settlementChan := s.settlerRegister.SubscribeSettlements(cctx) - returns := make([][32]byte, 0, batchSize) + opts, err := s.getTransactOpts(ctx) + if err != nil { + return err + } - for { - select { - case <-ctx.Done(): - unsub() - return - case settlement, more := <-settlementChan: - if !more { - unsub() - goto RESTART + bidIDs := make([][]byte, 0, len(returns.BidIDs)) + for _, bidID := range returns.BidIDs { + b := make([]byte, 32) + copy(b, bidID[:]) + bidIDs = append(bidIDs, b) } - err := func() error { - pendingTxns, err := s.settlerRegister.PendingTxnCount() - if err != nil { - return err - } - - if pendingTxns > allowedPendingTxnCount { - time.Sleep(5 * time.Second) - return errors.New("too many pending txns") - } - - var ( - commitmentIdx [32]byte - commitmentIndexes [][]byte - commitmentPostingTxn *types.Transaction - opts *bind.TransactOpts - ) - - // if we are batching returns, we don't want to post the txn until we have a full batch - if settlement.Type != SettlementTypeReturn || len(returns) == batchSize-1 { - opts, err = s.getTransactOpts(ctx) - if err != nil { - return err - } - } - - copy(commitmentIdx[:], settlement.CommitmentIdx) - - switch settlement.Type { - case SettlementTypeReward: - fallthrough - case SettlementTypeSlash: - commitmentPostingTxn, err = s.rollupClient.ProcessBuilderCommitmentForBlockNumber( - opts, - commitmentIdx, - big.NewInt(settlement.BlockNum), - settlement.Builder, - settlement.Type == SettlementTypeSlash, - ) - if err != nil { - return err - } - commitmentIndexes = [][]byte{commitmentIdx[:]} - case SettlementTypeReturn: - returns = append(returns, commitmentIdx) - if len(returns) == batchSize { - commitmentPostingTxn, err = s.rollupClient.UnlockFunds( - opts, - returns, - ) - if err != nil { - return err - } - for _, idx := range returns { - commitmentIndexes = append(commitmentIndexes, idx[:]) - } - // reset batch - returns = returns[:0] - } - } - - if commitmentPostingTxn == nil { - // if we are batching returns, we don't want to post the txn - // until we have a full batch - return nil - } - - err = s.settlerRegister.SettlementInitiated( - ctx, - commitmentIndexes, - commitmentPostingTxn.Hash(), - commitmentPostingTxn.Nonce(), - ) - if err != nil { - return err - } - - s.metrics.LastUsedNonce.Set(float64(commitmentPostingTxn.Nonce())) - s.metrics.SettlementsPostedCount.Inc() - s.metrics.CurrentSettlementL1Block.Set(float64(settlement.BlockNum)) - - log.Info(). - Int64("blockNum", settlement.BlockNum). - Str("txHash", commitmentPostingTxn.Hash().Hex()). - Str("builder", settlement.Builder). - Str("settlementType", string(settlement.Type)). - Msg("builder commitment processed") + log.Debug(). + Stringer("bidIDs", returns). + Int("count", len(returns.BidIDs)). + Msg("processing return") - return nil - }() + commitmentPostingTxn, err := s.rollupClient.UnlockFunds( + opts, + returns.BidIDs, + ) + if err != nil { + return fmt.Errorf("process return: %w nonce %d", err, opts.Nonce.Uint64()) + } + + err = s.settlerRegister.SettlementInitiated( + ctx, + bidIDs, + commitmentPostingTxn.Hash(), + commitmentPostingTxn.Nonce(), + ) if err != nil { - log.Error().Err(err).Msg("failed to process builder commitment") - unsub() - goto RESTART + return fmt.Errorf("failed to mark settlement initiated: %w", err) } + + s.metrics.LastUsedNonce.Set(float64(commitmentPostingTxn.Nonce())) + s.metrics.SettlementsPostedCount.Inc() + + log.Info(). + Str("txHash", commitmentPostingTxn.Hash().Hex()). + Int("batchSize", len(returns.BidIDs)). + Uint64("nonce", commitmentPostingTxn.Nonce()). + Msg("builder return processed") + + return nil + }() + if err != nil { + log.Error().Err(err).Msg("failed to process return") + unsub() + time.Sleep(5 * time.Second) + goto RESTART } } + } +} + +func (s *Settler) Start(ctx context.Context) <-chan struct{} { + doneChan := make(chan struct{}) + + eg, egCtx := errgroup.WithContext(ctx) + + eg.Go(func() error { + return s.settlementUpdater(egCtx) + }) + + eg.Go(func() error { + return s.settlementExecutor(egCtx) + }) + + eg.Go(func() error { + return s.returnExecutor(egCtx) + }) + + go func() { + defer close(doneChan) + if err := eg.Wait(); err != nil { + log.Error().Err(err).Msg("settler error") + } }() return doneChan diff --git a/pkg/settler/settler_test.go b/pkg/settler/settler_test.go index f7bfd1e..b86d5be 100644 --- a/pkg/settler/settler_test.go +++ b/pkg/settler/settler_test.go @@ -22,6 +22,7 @@ type testRegister struct { currentNonce atomic.Int64 pendingTxns atomic.Int32 settlementChan chan settler.Settlement + returnsChan chan settler.Return mu sync.Mutex settlementsInitiated [][]byte settlementsCompleted atomic.Int32 @@ -51,6 +52,22 @@ func (t *testRegister) SubscribeSettlements(ctx context.Context) <-chan settler. return sc } +func (t *testRegister) SubscribeReturns(ctx context.Context, _ int) <-chan settler.Return { + rc := make(chan settler.Return) + go func() { + for { + select { + case <-ctx.Done(): + return + case r := <-t.returnsChan: + rc <- r + } + } + }() + + return rc +} + func (t *testRegister) SettlementInitiated(ctx context.Context, commitmentIdx [][]byte, txHash common.Hash, nonce uint64) error { t.mu.Lock() defer t.mu.Unlock() @@ -190,6 +207,7 @@ func TestSettler(t *testing.T) { orcl := &testOracle{key: key} reg := &testRegister{ settlementChan: make(chan settler.Settlement), + returnsChan: make(chan settler.Return), } transactor := &testTransactor{} @@ -219,6 +237,7 @@ func TestSettler(t *testing.T) { BlockNum: 100, Builder: "0x1234", Amount: 1000, + BidID: common.HexToHash(fmt.Sprintf("0x%02d", i)).Bytes(), Type: sType, } @@ -254,16 +273,12 @@ func TestSettler(t *testing.T) { t.Fatal(err) } + returns := settler.Return{} + for i := 0; i < 10; i++ { - reg.settlementChan <- settler.Settlement{ - CommitmentIdx: big.NewInt(int64(i + 1)).Bytes(), - TxHash: "0x1234", - BlockNum: 100, - Builder: "0x1234", - Amount: 1000, - Type: settler.SettlementTypeReturn, - } + returns.BidIDs = append(returns.BidIDs, common.HexToHash(fmt.Sprintf("0x%02d", i))) } + reg.returnsChan <- returns if err := waitForCount(5*time.Second, 20, reg.settlementsInitiatedCount); err != nil { t.Fatal(err) diff --git a/pkg/store/store.go b/pkg/store/store.go index 85c760e..a5fbe33 100644 --- a/pkg/store/store.go +++ b/pkg/store/store.go @@ -28,6 +28,7 @@ CREATE TABLE IF NOT EXISTS settlements ( builder_address BYTEA, type settlement_type, amount NUMERIC(24, 0), + bid_id BYTEA, chainhash BYTEA, nonce BIGINT, settled BOOLEAN @@ -44,6 +45,7 @@ type Store struct { db *sql.DB winnerT chan struct{} settlerT chan struct{} + returnT chan struct{} } func NewStore(db *sql.DB) (*Store, error) { @@ -58,6 +60,7 @@ func NewStore(db *sql.DB) (*Store, error) { db: db, winnerT: make(chan struct{}), settlerT: make(chan struct{}), + returnT: make(chan struct{}), }, nil } @@ -75,6 +78,13 @@ func (s *Store) triggerSettler() { } } +func (s *Store) triggerReturn() { + select { + case s.returnT <- struct{}{}: + default: + } +} + func (s *Store) RegisterWinner(ctx context.Context, blockNum int64, winner string) error { insertStr := "INSERT INTO winners (block_number, builder_address, processed) VALUES ($1, $2, $3)" @@ -104,14 +114,17 @@ func (s *Store) SubscribeWinners(ctx context.Context) <-chan updater.BlockWinner var bWinner updater.BlockWinner err = results.Scan(&bWinner.BlockNumber, &bWinner.Winner) if err != nil { + _ = results.Close() continue RETRY } select { case <-ctx.Done(): + _ = results.Close() return case resChan <- bWinner: } } + _ = results.Close() select { case <-ctx.Done(): @@ -144,6 +157,7 @@ func (s *Store) AddSettlement( blockNum int64, amount uint64, builder string, + bidID []byte, settlementType settler.SettlementType, ) error { columns := []string{ @@ -153,6 +167,7 @@ func (s *Store) AddSettlement( "builder_address", "type", "amount", + "bid_id", "settled", "chainhash", "nonce", @@ -164,6 +179,7 @@ func (s *Store) AddSettlement( builder, settlementType, amount, + bidID, false, nil, 0, @@ -195,9 +211,10 @@ func (s *Store) SubscribeSettlements(ctx context.Context) <-chan settler.Settlem RETRY: for { queryStr := ` - SELECT commitment_index, transaction, block_number, builder_address, amount, type + SELECT commitment_index, transaction, block_number, builder_address, amount, bid_id, type FROM settlements - WHERE settled = false AND chainhash IS NULL` + WHERE settled = false AND chainhash IS NULL AND type != 'return' + ORDER BY block_number ASC` results, err := s.db.QueryContext(ctx, queryStr) if err != nil { @@ -213,19 +230,99 @@ func (s *Store) SubscribeSettlements(ctx context.Context) <-chan settler.Settlem &s.BlockNum, &s.Builder, &s.Amount, + &s.BidID, &s.Type, ) if err != nil { + _ = results.Close() continue RETRY } select { case <-ctx.Done(): + _ = results.Close() return case resChan <- s: } } + _ = results.Close() + + select { + case <-ctx.Done(): + return + case <-s.settlerT: + } + } + }() + + return resChan +} + +func (s *Store) SubscribeReturns(ctx context.Context, limit int) <-chan settler.Return { + resChan := make(chan settler.Return) + + go func() { + defer close(resChan) + + RETRY: + for { + queryStr := ` + SELECT DISTINCT bid_id, block_number + FROM settlements + WHERE settled = false AND chainhash IS NULL AND type = 'return' + AND block_number < (SELECT MAX(block_number) FROM settlements WHERE settled = true) + ORDER BY block_number ASC` + + results, err := s.db.QueryContext(ctx, queryStr) + if err != nil { + fmt.Println("error", err) + return + } + + returns := make([][]byte, 0, limit) + + copyReturns := func() [][32]byte { + bidIDs := make([][32]byte, len(returns)) + for idx, bidID := range returns { + bidIDs[idx] = [32]byte{} + copy(bidIDs[idx][:], bidID) + } + return bidIDs + } + + for results.Next() { + var r []byte + err = results.Scan(&r, new(int64)) + if err != nil { + _ = results.Close() + continue RETRY + } + + returns = append(returns, r) + if len(returns) == limit { + select { + case <-ctx.Done(): + _ = results.Close() + return + case resChan <- settler.Return{BidIDs: copyReturns()}: + returns = returns[:0] + } + } + } + + if len(returns) > 0 { + select { + case <-ctx.Done(): + _ = results.Close() + return + case resChan <- settler.Return{BidIDs: copyReturns()}: + returns = returns[:0] + } + } + + _ = results.Close() + select { case <-ctx.Done(): return @@ -239,16 +336,16 @@ func (s *Store) SubscribeSettlements(ctx context.Context) <-chan settler.Settlem func (s *Store) SettlementInitiated( ctx context.Context, - commitmentIndexes [][]byte, + bidIDs [][]byte, txHash common.Hash, nonce uint64, ) error { _, err := s.db.ExecContext( ctx, - "UPDATE settlements SET chainhash = $1, nonce = $2 WHERE commitment_index = ANY($3::BYTEA[])", + "UPDATE settlements SET chainhash = $1, nonce = $2 WHERE bid_id = ANY($3::BYTEA[])", txHash.Bytes(), nonce, - pq.Array(commitmentIndexes), + pq.Array(bidIDs), ) if err != nil { return err @@ -269,6 +366,7 @@ func (s *Store) MarkSettlementComplete(ctx context.Context, nonce uint64) (int, if err != nil { return 0, err } + s.triggerReturn() return int(count), nil } @@ -296,11 +394,12 @@ type BlockInfo struct { BlockNumber int64 Builder string NoOfCommitments int - TotalAmount int + NoOfBids int + TotalAmount sql.NullString NoOfRewards int - TotalRewards int + TotalRewards sql.NullString NoOfSlashes int - TotalSlashes int + TotalSlashes sql.NullString NoOfSettlements int } @@ -311,12 +410,18 @@ func (s *Store) ProcessedBlocks(limit, offset int) ([]BlockInfo, error) { winners.block_number, winners.builder_address, COUNT(settlements.commitment_index) AS commitment_count, - SUM(settlements.amount) AS total_amount, + COUNT(DISTINCT settlements.bid_id) AS bid_count, + (SELECT SUM(amount) FROM ( + SELECT DISTINCT ON (bid_id) bid_id, amount + FROM settlements sub_settlements + WHERE sub_settlements.block_number = winners.block_number + ORDER BY bid_id, block_number + ) AS distinct_amounts) AS total_amount, COUNT(settlements.type = 'reward' OR NULL) AS reward_count, SUM(settlements.amount) FILTER (WHERE settlements.type = 'reward') AS total_rewards, COUNT(settlements.type = 'slash' OR NULL) AS slash_count, SUM(settlements.amount) FILTER (WHERE settlements.type = 'slash') AS total_slashes, - COUNT(settlements.settled) AS settled_count + COUNT(settlements.settled) FILTER (WHERE settlements.settled = true) AS settled_count FROM winners LEFT JOIN @@ -341,6 +446,7 @@ func (s *Store) ProcessedBlocks(limit, offset int) ([]BlockInfo, error) { &b.BlockNumber, &b.Builder, &b.NoOfCommitments, + &b.NoOfBids, &b.TotalAmount, &b.NoOfRewards, &b.TotalRewards, @@ -358,6 +464,7 @@ func (s *Store) ProcessedBlocks(limit, offset int) ([]BlockInfo, error) { type CommitmentStats struct { TotalCount int + BidCount int RewardCount int SlashCount int SettlementsCompletedCount int @@ -368,13 +475,15 @@ func (s *Store) CommitmentStats() (CommitmentStats, error) { err := s.db.QueryRow(` SELECT COUNT(*), + COUNT(DISTINCT bid_id), COUNT(type = 'reward' OR NULL), COUNT(type = 'slash' OR NULL), - COUNT(settled) + COUNT(settled) FILTER (WHERE settled = true) FROM settlements `).Scan( &stats.TotalCount, + &stats.BidCount, &stats.RewardCount, &stats.SlashCount, &stats.SettlementsCompletedCount, diff --git a/pkg/store/store_test.go b/pkg/store/store_test.go index b3a2d73..2bac2ec 100644 --- a/pkg/store/store_test.go +++ b/pkg/store/store_test.go @@ -1,6 +1,7 @@ package store_test import ( + "bytes" "context" "database/sql" "fmt" @@ -83,6 +84,7 @@ func TestStore(t *testing.T) { BlockNum: 1, Amount: 2000000, Builder: winners[0].Winner, + BidID: common.HexToHash("0x01").Bytes(), Type: settler.SettlementTypeReward, }, { @@ -91,6 +93,7 @@ func TestStore(t *testing.T) { BlockNum: 1, Amount: 1000000, Builder: winners[0].Winner, + BidID: common.HexToHash("0x02").Bytes(), Type: settler.SettlementTypeSlash, }, { @@ -99,6 +102,7 @@ func TestStore(t *testing.T) { BlockNum: 1, Amount: 1000000, Builder: winners[1].Winner, + BidID: common.HexToHash("0x03").Bytes(), Type: settler.SettlementTypeReturn, }, { @@ -107,6 +111,7 @@ func TestStore(t *testing.T) { BlockNum: 2, Amount: 2000000, Builder: winners[1].Winner, + BidID: common.HexToHash("0x04").Bytes(), Type: settler.SettlementTypeReward, }, { @@ -115,6 +120,7 @@ func TestStore(t *testing.T) { BlockNum: 2, Amount: 1000000, Builder: winners[1].Winner, + BidID: common.HexToHash("0x05").Bytes(), Type: settler.SettlementTypeSlash, }, { @@ -123,6 +129,7 @@ func TestStore(t *testing.T) { BlockNum: 2, Amount: 1000000, Builder: winners[0].Winner, + BidID: common.HexToHash("0x04").Bytes(), Type: settler.SettlementTypeReturn, }, } @@ -159,9 +166,6 @@ func TestStore(t *testing.T) { // Subscribe to winners winnerChan := st.SubscribeWinners(ctx) - if err != nil { - t.Fatalf("Failed to subscribe to winners: %s", err) - } for i := 0; i < 2; i++ { winner := <-winnerChan @@ -196,9 +200,6 @@ func TestStore(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) winnerChan := st.SubscribeWinners(ctx) - if err != nil { - t.Fatalf("Failed to subscribe to winners: %s", err) - } winner := <-winnerChan if winner.BlockNumber != winners[1].BlockNumber { @@ -230,6 +231,7 @@ func TestStore(t *testing.T) { settlement.BlockNum, settlement.Amount, settlement.Builder, + settlement.BidID, settlement.Type, ) if err != nil { @@ -247,11 +249,8 @@ func TestStore(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) settlementChan := st.SubscribeSettlements(ctx) - if err != nil { - t.Fatalf("Failed to subscribe to settlements: %s", err) - } - for i := 0; i < 6; i++ { + for _, i := range []int{0, 1, 3, 4} { settlement := <-settlementChan if diff := cmp.Diff(settlement, settlements[i]); diff != "" { t.Fatalf("Unexpected settlement: (-want +have):\n%s", diff) @@ -272,10 +271,11 @@ func TestStore(t *testing.T) { t.Fatalf("Failed to create store: %s", err) } + settlementIdxs := []int{0, 1, 3, 4} indexes := make([][]byte, 2) - for i := 0; i < 3; i++ { - indexes[0] = settlements[2*i].CommitmentIdx - indexes[1] = settlements[2*i+1].CommitmentIdx + for i := 0; i < 2; i++ { + indexes[0] = settlements[settlementIdxs[2*i]].BidID + indexes[1] = settlements[settlementIdxs[2*i+1]].BidID err = st.SettlementInitiated( context.Background(), @@ -299,16 +299,16 @@ func TestStore(t *testing.T) { if err != nil { t.Fatalf("Failed to get last nonce: %s", err) } - if lastNonce != 3 { - t.Fatalf("Expected last nonce 3, got %d", lastNonce) + if lastNonce != 2 { + t.Fatalf("Expected last nonce 2, got %d", lastNonce) } pendingTxnCount, err := st.PendingTxnCount() if err != nil { t.Fatalf("Failed to get pending txn count: %s", err) } - if pendingTxnCount != 3 { - t.Fatalf("Expected pending txn count 3, got %d", pendingTxnCount) + if pendingTxnCount != 2 { + t.Fatalf("Expected pending txn count 2, got %d", pendingTxnCount) } }) @@ -318,18 +318,89 @@ func TestStore(t *testing.T) { t.Fatalf("Failed to create store: %s", err) } - count, err := st.MarkSettlementComplete(context.Background(), 4) + count, err := st.MarkSettlementComplete(context.Background(), 3) if err != nil { t.Fatalf("Failed to mark settlement complete: %s", err) } - if count != 6 { - t.Fatalf("Expected count 6, got %d", count) + if count != 5 { + t.Fatalf("Expected count 5, got %d", count) + } + + pendingTxnCount, err := st.PendingTxnCount() + if err != nil { + t.Fatalf("Failed to get pending txn count: %s", err) + } + if pendingTxnCount != 0 { + t.Fatalf("Expected pending txn count 0, got %d", pendingTxnCount) + } + }) + + t.Run("SubscribeReturns", func(t *testing.T) { + st, err := store.NewStore(db) + if err != nil { + t.Fatalf("Failed to create store: %s", err) + } + + ctx, cancel := context.WithCancel(context.Background()) + + returnChan := st.SubscribeReturns(ctx, 2) + + returnSettlement := <-returnChan + if len(returnSettlement.BidIDs) != 1 { + t.Fatalf("Expected 1 bid ID, got %d", len(returnSettlement.BidIDs)) + } + if !bytes.Equal(returnSettlement.BidIDs[0][:], settlements[2].BidID) { + t.Fatalf( + "Unexpected return settlement: want %x have %x\n", + settlements[2].BidID, + returnSettlement.BidIDs[0], + ) + } + + cancel() + + returnSettlement, ok := <-returnChan + if ok { + t.Fatalf("Expected channel to be closed, got %v", returnSettlement) + } + }) + + t.Run("SettleReturn", func(t *testing.T) { + st, err := store.NewStore(db) + if err != nil { + t.Fatalf("Failed to create store: %s", err) + } + + err = st.SettlementInitiated( + context.Background(), + [][]byte{settlements[2].BidID}, + common.HexToHash(fmt.Sprintf("0x%02d", 3)), + uint64(3), + ) + if err != nil { + t.Fatalf("Failed to initiate settlement: %s", err) } pendingTxnCount, err := st.PendingTxnCount() if err != nil { t.Fatalf("Failed to get pending txn count: %s", err) } + if pendingTxnCount != 1 { + t.Fatalf("Expected pending txn count 1, got %d", pendingTxnCount) + } + + count, err := st.MarkSettlementComplete(context.Background(), 4) + if err != nil { + t.Fatalf("Failed to mark settlement complete: %s", err) + } + if count != 1 { + t.Fatalf("Expected count 1, got %d", count) + } + + pendingTxnCount, err = st.PendingTxnCount() + if err != nil { + t.Fatalf("Failed to get pending txn count: %s", err) + } if pendingTxnCount != 0 { t.Fatalf("Expected pending txn count 0, got %d", pendingTxnCount) } @@ -348,6 +419,9 @@ func TestStore(t *testing.T) { if stats.TotalCount != 6 { t.Fatalf("Expected total count 6, got %d", stats.TotalCount) } + if stats.BidCount != 5 { + t.Fatalf("Expected bid count 5, got %d", stats.BidCount) + } if stats.RewardCount != 2 { t.Fatalf("Expected reward count 2, got %d", stats.RewardCount) } @@ -375,20 +449,23 @@ func TestStore(t *testing.T) { if block.NoOfCommitments != 3 { t.Fatalf("Expected no of commitments 3, got %d", block.NoOfCommitments) } - if block.TotalAmount != 4000000 { - t.Fatalf("Expected total amount 5000000, got %d", block.TotalAmount) + if block.NoOfBids != 3 { + t.Fatalf("Expected no of bids 3, got %d", block.NoOfBids) + } + if block.TotalAmount.String != "4000000" { + t.Fatalf("Expected total amount 4000000, got %s", block.TotalAmount.String) } if block.NoOfRewards != 1 { t.Fatalf("Expected no of rewards 1, got %d", block.NoOfRewards) } - if block.TotalRewards != 2000000 { - t.Fatalf("Expected total rewards 2000000, got %d", block.TotalRewards) + if block.TotalRewards.String != "2000000" { + t.Fatalf("Expected total rewards 2000000, got %s", block.TotalRewards.String) } if block.NoOfSlashes != 1 { - t.Fatalf("Expected no of slashes 2, got %d", block.NoOfSlashes) + t.Fatalf("Expected no of slashes 1, got %d", block.NoOfSlashes) } - if block.TotalSlashes != 1000000 { - t.Fatalf("Expected total slashes 2000000, got %d", block.TotalSlashes) + if block.TotalSlashes.String != "1000000" { + t.Fatalf("Expected total slashes 1000000, got %s", block.TotalSlashes.String) } if block.NoOfSettlements != 3 { t.Fatalf("Expected no of settlements 3, got %d", block.NoOfSettlements) diff --git a/pkg/updater/updater.go b/pkg/updater/updater.go index 28ca316..2704f40 100644 --- a/pkg/updater/updater.go +++ b/pkg/updater/updater.go @@ -31,6 +31,7 @@ type WinnerRegister interface { blockNum int64, amount uint64, builder string, + bidID []byte, settlementType settler.SettlementType, ) error } @@ -123,7 +124,7 @@ func (u *Updater) Start(ctx context.Context) <-chan struct{} { txnsInBlock := make(map[string]int) for posInBlock, tx := range blk.Transactions() { - txnsInBlock[tx.Hash().Hex()] = posInBlock + txnsInBlock[strings.TrimPrefix(tx.Hash().Hex(), "0x")] = posInBlock } commitmentIndexes, err := u.preconfClient.GetCommitmentsByBlockNumber( @@ -168,7 +169,8 @@ func (u *Updater) Start(ctx context.Context) <-chan struct{} { commitment.TxnHash, winner.BlockNumber, commitment.Bid, - winner.Winner, + commitment.Commiter.Hex(), + commitment.CommitmentHash[:], settlementType, ) if err != nil { diff --git a/pkg/updater/updater_test.go b/pkg/updater/updater_test.go index ba60290..9c219e1 100644 --- a/pkg/updater/updater_test.go +++ b/pkg/updater/updater_test.go @@ -3,8 +3,10 @@ package updater_test import ( "context" "errors" + "fmt" "hash" "math/big" + "strings" "testing" "time" @@ -78,13 +80,15 @@ func TestUpdater(t *testing.T) { if i%2 == 0 { commitments[string(idxBytes[:])] = preconf.PreConfCommitmentStorePreConfCommitment{ - Commiter: builderAddr, - TxnHash: txn.Hash().Hex(), + Commiter: builderAddr, + TxnHash: strings.TrimPrefix(txn.Hash().Hex(), "0x"), + CommitmentHash: common.HexToHash(fmt.Sprintf("0x%02d", i)), } } else { commitments[string(idxBytes[:])] = preconf.PreConfCommitmentStorePreConfCommitment{ - Commiter: otherBuilderAddr, - TxnHash: txn.Hash().Hex(), + Commiter: otherBuilderAddr, + TxnHash: strings.TrimPrefix(txn.Hash().Hex(), "0x"), + CommitmentHash: common.HexToHash(fmt.Sprintf("0x%02d", i)), } } } @@ -93,14 +97,15 @@ func TestUpdater(t *testing.T) { for i := 0; i < 10; i++ { idxBytes := getIdxBytes(int64(i + 10)) - bundle := txns[i].Hash().Hex() + bundle := strings.TrimPrefix(txns[i].Hash().Hex(), "0x") for j := i + 1; j < 10; j++ { - bundle += "," + txns[j].Hash().Hex() + bundle += "," + strings.TrimPrefix(txns[j].Hash().Hex(), "0x") } commitments[string(idxBytes[:])] = preconf.PreConfCommitmentStorePreConfCommitment{ - Commiter: builderAddr, - TxnHash: bundle, + Commiter: builderAddr, + TxnHash: bundle, + CommitmentHash: common.HexToHash(fmt.Sprintf("0x%02d", i)), } } @@ -150,9 +155,6 @@ func TestUpdater(t *testing.T) { if settlement.blockNum != 5 { t.Fatal("wrong block number") } - if settlement.builder != "test" { - t.Fatal("wrong builder") - } if settlement.settlementType == settler.SettlementTypeSlash { t.Fatal("should not be slash") } @@ -269,9 +271,6 @@ func TestUpdaterBundlesFailure(t *testing.T) { if settlement.blockNum != 5 { t.Fatal("wrong block number") } - if settlement.builder != "test" { - t.Fatal("wrong builder") - } if settlement.settlementType != settler.SettlementTypeSlash { t.Fatalf("should be slash, got %s", settlement.settlementType) } @@ -323,6 +322,7 @@ func (t *testWinnerRegister) AddSettlement( blockNum int64, amount uint64, builder string, + _ []byte, settlementType settler.SettlementType, ) error { t.settlements <- testSettlement{