From 348d2f723f913f09cc084067ba2b2adbd9eb0c69 Mon Sep 17 00:00:00 2001 From: Dev Ojha Date: Tue, 28 May 2024 15:02:30 +0900 Subject: [PATCH 01/30] perf(types): 3x speedup MakePartSet (#3117) This PR adds some benchmarks, and significantly speeds up types.MakePartSet, and Partset.AddPart. (Used by the block proposer, and every consensus instance) It does so by doing two things: - Saving mutexes on the newly created bit array, by defaulting every value to True (rather than setting it in a loop that goes through a mutex) - Uses the same hash object throughout, and avoids an extra copy of every leaf. (main speedup) I do the same hash optimization for proof.Verify, which is used in the add block part codepath for both the proposer and every full node. New: ``` BenchmarkMakePartSet/nParts=1-12 38616 29817 ns/op 568 B/op 12 allocs/op BenchmarkMakePartSet/nParts=2-12 19888 59866 ns/op 1000 B/op 22 allocs/op BenchmarkMakePartSet/nParts=3-12 12979 95691 ns/op 1528 B/op 33 allocs/op BenchmarkMakePartSet/nParts=4-12 8688 128192 ns/op 2024 B/op 44 allocs/op BenchmarkMakePartSet/nParts=5-12 7308 155224 ns/op 2888 B/op 57 allocs/op ``` Old: ``` BenchmarkMakePartSet/nParts=1-12 16647 106545 ns/op 74169 B/op 12 allocs/op BenchmarkMakePartSet/nParts=2-12 10000 106361 ns/op 148329 B/op 23 allocs/op BenchmarkMakePartSet/nParts=3-12 6992 337644 ns/op 222587 B/op 35 allocs/op BenchmarkMakePartSet/nParts=4-12 3488 480109 ns/op 296811 B/op 47 allocs/op BenchmarkMakePartSet/nParts=5-12 2228 557768 ns/op 371404 B/op 61 allocs/op ``` System wide, this is definitely not our issue (looks like roughly .1ms per blockpart), but still definitely useful time to remove --- - [x] Tests written/updated - [x] Changelog entry added in `.changelog` (we use [unclog](https://github.com/informalsystems/unclog) to manage our changelog) - [x] Updated relevant documentation (`docs/` or `spec/`) and code comments - [x] Title follows the [Conventional Commits](https://www.conventionalcommits.org/en/v1.0.0/) spec (cherry picked from commit 7b1c1f898cc04d624e742bf14cea54f9d4f55d7c) --- ...3117-significantly-speedup-make-partset.md | 4 +- crypto/merkle/bench_test.go | 22 ++++++++++ crypto/merkle/proof.go | 40 +++++++++++++------ crypto/merkle/proof_value.go | 2 +- types/part_set.go | 3 +- types/part_set_test.go | 13 ++++++ 6 files changed, 67 insertions(+), 17 deletions(-) diff --git a/.changelog/unreleased/improvements/3117-significantly-speedup-make-partset.md b/.changelog/unreleased/improvements/3117-significantly-speedup-make-partset.md index 9a0c481c46..c929308418 100644 --- a/.changelog/unreleased/improvements/3117-significantly-speedup-make-partset.md +++ b/.changelog/unreleased/improvements/3117-significantly-speedup-make-partset.md @@ -1,2 +1,2 @@ -- `[types]` Significantly speedup types.MakePartSet and types.AddPart, which are used in creating a block proposal - ([\#3117](https://github.com/cometbft/cometbft/issues/3117)) +- [`types`] Significantly speedup types.MakePartSet and types.AddPart, which are used in creating a block proposal + ([\#3117](https://github.com/cometbft/cometbft/issues/3117) diff --git a/crypto/merkle/bench_test.go b/crypto/merkle/bench_test.go index 0520bd2389..c6566c9da6 100644 --- a/crypto/merkle/bench_test.go +++ b/crypto/merkle/bench_test.go @@ -40,3 +40,25 @@ func BenchmarkInnerHash(b *testing.B) { b.Fatal("Benchmark did not run!") } } + +// Benchmark the time it takes to hash a 64kb leaf, which is the size of +// a block part. +// This helps determine whether its worth parallelizing this hash for the proposer. +func BenchmarkLeafHash64kb(b *testing.B) { + b.ReportAllocs() + leaf := make([]byte, 64*1024) + hash := sha256.New() + + for i := 0; i < b.N; i++ { + leaf[0] = byte(i) + got := leafHashOpt(hash, leaf) + if g, w := len(got), sha256.Size; g != w { + b.Fatalf("size discrepancy: got %d, want %d", g, w) + } + sink = got + } + + if sink == nil { + b.Fatal("Benchmark did not run!") + } +} diff --git a/crypto/merkle/proof.go b/crypto/merkle/proof.go index 30313f7d34..3265a8479f 100644 --- a/crypto/merkle/proof.go +++ b/crypto/merkle/proof.go @@ -4,6 +4,7 @@ import ( "bytes" "errors" "fmt" + "hash" cmtcrypto "github.com/cometbft/cometbft/api/cometbft/crypto/v1" "github.com/cometbft/cometbft/crypto/tmhash" @@ -91,13 +92,14 @@ func (sp *Proof) Verify(rootHash []byte, leaf []byte) error { Err: errors.New("negative proof index"), } } - leafHash := leafHash(leaf) + hash := tmhash.New() + leafHash := leafHashOpt(hash, leaf) if !bytes.Equal(sp.LeafHash, leafHash) { return ErrInvalidHash{ Err: fmt.Errorf("leaf %x, want %x", sp.LeafHash, leafHash), } } - computedHash, err := sp.computeRootHash() + computedHash, err := sp.computeRootHash(hash) if err != nil { return ErrInvalidHash{ Err: fmt.Errorf("compute root hash: %w", err), @@ -111,9 +113,19 @@ func (sp *Proof) Verify(rootHash []byte, leaf []byte) error { return nil } +// Compute the root hash given a leaf hash. Panics in case of errors. +func (sp *Proof) ComputeRootHash() []byte { + computedHash, err := sp.computeRootHash(tmhash.New()) + if err != nil { + panic(fmt.Errorf("ComputeRootHash errored %w", err)) + } + return computedHash +} + // Compute the root hash given a leaf hash. -func (sp *Proof) computeRootHash() ([]byte, error) { +func (sp *Proof) computeRootHash(hash hash.Hash) ([]byte, error) { return computeHashFromAunts( + hash, sp.Index, sp.Total, sp.LeafHash, @@ -200,7 +212,7 @@ func ProofFromProto(pb *cmtcrypto.Proof) (*Proof, error) { // Use the leafHash and innerHashes to get the root merkle hash. // If the length of the innerHashes slice isn't exactly correct, the result is nil. // Recursive impl. -func computeHashFromAunts(index, total int64, leafHash []byte, innerHashes [][]byte) ([]byte, error) { +func computeHashFromAunts(hash hash.Hash, index, total int64, leafHash []byte, innerHashes [][]byte) ([]byte, error) { if index >= total || index < 0 || total <= 0 { return nil, fmt.Errorf("invalid index %d and/or total %d", index, total) } @@ -218,18 +230,18 @@ func computeHashFromAunts(index, total int64, leafHash []byte, innerHashes [][]b } numLeft := getSplitPoint(total) if index < numLeft { - leftHash, err := computeHashFromAunts(index, numLeft, leafHash, innerHashes[:len(innerHashes)-1]) + leftHash, err := computeHashFromAunts(hash, index, numLeft, leafHash, innerHashes[:len(innerHashes)-1]) if err != nil { return nil, err } - return innerHash(leftHash, innerHashes[len(innerHashes)-1]), nil + return innerHashOpt(hash, leftHash, innerHashes[len(innerHashes)-1]), nil } - rightHash, err := computeHashFromAunts(index-numLeft, total-numLeft, leafHash, innerHashes[:len(innerHashes)-1]) + rightHash, err := computeHashFromAunts(hash, index-numLeft, total-numLeft, leafHash, innerHashes[:len(innerHashes)-1]) if err != nil { return nil, err } - return innerHash(innerHashes[len(innerHashes)-1], rightHash), nil + return innerHashOpt(hash, innerHashes[len(innerHashes)-1], rightHash), nil } } @@ -266,18 +278,22 @@ func (spn *ProofNode) FlattenAunts() [][]byte { // trails[0].Hash is the leaf hash for items[0]. // trails[i].Parent.Parent....Parent == root for all i. func trailsFromByteSlices(items [][]byte) (trails []*ProofNode, root *ProofNode) { + return trailsFromByteSlicesInternal(tmhash.New(), items) +} + +func trailsFromByteSlicesInternal(hash hash.Hash, items [][]byte) (trails []*ProofNode, root *ProofNode) { // Recursive impl. switch len(items) { case 0: return []*ProofNode{}, &ProofNode{emptyHash(), nil, nil, nil} case 1: - trail := &ProofNode{leafHash(items[0]), nil, nil, nil} + trail := &ProofNode{leafHashOpt(hash, items[0]), nil, nil, nil} return []*ProofNode{trail}, trail default: k := getSplitPoint(int64(len(items))) - lefts, leftRoot := trailsFromByteSlices(items[:k]) - rights, rightRoot := trailsFromByteSlices(items[k:]) - rootHash := innerHash(leftRoot.Hash, rightRoot.Hash) + lefts, leftRoot := trailsFromByteSlicesInternal(hash, items[:k]) + rights, rightRoot := trailsFromByteSlicesInternal(hash, items[k:]) + rootHash := innerHashOpt(hash, leftRoot.Hash, rightRoot.Hash) root := &ProofNode{rootHash, nil, nil, nil} leftRoot.Parent = root leftRoot.Right = rightRoot diff --git a/crypto/merkle/proof_value.go b/crypto/merkle/proof_value.go index dbb9600d9d..676d156060 100644 --- a/crypto/merkle/proof_value.go +++ b/crypto/merkle/proof_value.go @@ -104,7 +104,7 @@ func (op ValueOp) Run(args [][]byte) ([][]byte, error) { } } - rootHash, err := op.Proof.computeRootHash() + rootHash, err := op.Proof.computeRootHash(tmhash.New()) if err != nil { return nil, err } diff --git a/types/part_set.go b/types/part_set.go index eca1a3723b..4223a196a7 100644 --- a/types/part_set.go +++ b/types/part_set.go @@ -180,7 +180,6 @@ func NewPartSetFromData(data []byte, partSize uint32) *PartSet { total := (uint32(len(data)) + partSize - 1) / partSize parts := make([]*Part, total) partsBytes := make([][]byte, total) - partsBitArray := bits.NewBitArray(int(total)) for i := uint32(0); i < total; i++ { part := &Part{ Index: i, @@ -188,13 +187,13 @@ func NewPartSetFromData(data []byte, partSize uint32) *PartSet { } parts[i] = part partsBytes[i] = part.Bytes - partsBitArray.SetIndex(int(i), true) } // Compute merkle proofs root, proofs := merkle.ProofsFromByteSlices(partsBytes) for i := uint32(0); i < total; i++ { parts[i].Proof = *proofs[i] } + partsBitArray := bits.NewBitArrayFromFn(int(total), func(int) bool { return true }) return &PartSet{ total: total, hash: root, diff --git a/types/part_set_test.go b/types/part_set_test.go index 24f914bfe0..af2da7debc 100644 --- a/types/part_set_test.go +++ b/types/part_set_test.go @@ -1,6 +1,7 @@ package types import ( + "fmt" "io" "testing" @@ -219,3 +220,15 @@ func TestPartProtoBuf(t *testing.T) { } } } + +func BenchmarkMakePartSet(b *testing.B) { + for nParts := 1; nParts <= 5; nParts++ { + b.Run(fmt.Sprintf("nParts=%d", nParts), func(b *testing.B) { + data := cmtrand.Bytes(testPartSize * nParts) + b.ResetTimer() + for i := 0; i < b.N; i++ { + NewPartSetFromData(data, testPartSize) + } + }) + } +} From 119254ce7725f006d56980951641bfc3e340af93 Mon Sep 17 00:00:00 2001 From: Dev Ojha Date: Fri, 31 May 2024 12:36:25 +0900 Subject: [PATCH 02/30] Comment out expensive debug logs for now (cherry picked from commit 3dcd8601d6d7f50a3d5a60c4eabd67d61c7db8a9) --- internal/consensus/state.go | 14 +++++++------- p2p/conn/connection.go | 8 +++++--- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/internal/consensus/state.go b/internal/consensus/state.go index 06e4650d31..70bb4f5870 100644 --- a/internal/consensus/state.go +++ b/internal/consensus/state.go @@ -2286,7 +2286,7 @@ func (cs *State) addVote(vote *types.Vote, peerID p2p.ID) (added bool, err error return added, err } - cs.Logger.Debug("Added vote to last precommits", "last_commit", cs.LastCommit.StringShort()) + // cs.Logger.Debug("added vote to last precommits", "last_commit", cs.LastCommit.StringShort()) if err := cs.eventBus.PublishEventVote(types.EventDataVote{Vote: vote}); err != nil { return added, err } @@ -2376,7 +2376,7 @@ func (cs *State) addVote(vote *types.Vote, peerID p2p.ID) (added bool, err error switch vote.Type { case types.PrevoteType: prevotes := cs.Votes.Prevotes(vote.Round) - cs.Logger.Debug("Added vote to prevote", "vote", vote, "prevotes", prevotes.StringShort()) + // cs.Logger.Debug("added vote to prevote", "vote", vote, "prevotes", prevotes.StringShort()) // Check to see if >2/3 of the voting power on the network voted for any non-nil block. if blockID, ok := prevotes.TwoThirdsMajority(); ok && !blockID.IsNil() { @@ -2391,11 +2391,11 @@ func (cs *State) addVote(vote *types.Vote, peerID p2p.ID) (added bool, err error cs.ValidBlock = cs.ProposalBlock cs.ValidBlockParts = cs.ProposalBlockParts } else { - cs.Logger.Debug( - "Valid block we do not know about; set ProposalBlock=nil", - "proposal", log.NewLazyBlockHash(cs.ProposalBlock), - "block_id", blockID.Hash, - ) + // cs.Logger.Debug( + // "valid block we do not know about; set ProposalBlock=nil", + // "proposal", log.NewLazyBlockHash(cs.ProposalBlock), + // "block_id", blockID.Hash, + // ) // we're getting the wrong block cs.ProposalBlock = nil diff --git a/p2p/conn/connection.go b/p2p/conn/connection.go index 438008750c..9f8cebb530 100644 --- a/p2p/conn/connection.go +++ b/p2p/conn/connection.go @@ -360,7 +360,7 @@ func (c *MConnection) Send(chID byte, msgBytes []byte) bool { return false } - c.Logger.Debug("Send", "channel", chID, "conn", c, "msgBytes", log.NewLazySprintf("%X", msgBytes)) + // c.Logger.Debug("Send", "channel", chID, "conn", c, "msgBytes", log.NewLazySprintf("%X", msgBytes)) // Send message to channel. channel, ok := c.channelsIdx[chID] @@ -376,9 +376,11 @@ func (c *MConnection) Send(chID byte, msgBytes []byte) bool { case c.send <- struct{}{}: default: } - } else { - c.Logger.Debug("Send failed", "channel", chID, "conn", c, "msgBytes", log.NewLazySprintf("%X", msgBytes)) } + + // else { + // // c.Logger.Debug("Send failed", "channel", chID, "conn", c, "msgBytes", log.NewLazySprintf("%X", msgBytes)) + // } return success } From 3725dec5dfc25ef1750c91dea0ffbd450949a906 Mon Sep 17 00:00:00 2001 From: Dev Ojha Date: Fri, 31 May 2024 12:55:28 +0900 Subject: [PATCH 03/30] Comment out expensive logs (cherry picked from commit b3fabed69aef9a728959a7c1e5d1ce101a0344a9) --- internal/consensus/reactor.go | 12 ++++++------ p2p/conn/connection.go | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/internal/consensus/reactor.go b/internal/consensus/reactor.go index 33b1ee99fb..a4db8053f9 100644 --- a/internal/consensus/reactor.go +++ b/internal/consensus/reactor.go @@ -1450,12 +1450,12 @@ func (ps *PeerState) SetHasVote(vote *types.Vote) { } func (ps *PeerState) setHasVote(height int64, round int32, voteType types.SignedMsgType, index int32) { - ps.logger.Debug("setHasVote", - "peerH/R", - log.NewLazySprintf("%d/%d", ps.PRS.Height, ps.PRS.Round), - "H/R", - log.NewLazySprintf("%d/%d", height, round), - "type", voteType, "index", index) + // ps.logger.Debug("setHasVote", + // "peerH/R", + // log.NewLazySprintf("%d/%d", ps.PRS.Height, ps.PRS.Round), + // "H/R", + // log.NewLazySprintf("%d/%d", height, round), + // "type", voteType, "index", index) // NOTE: some may be nil BitArrays -> no side effects. psVotes := ps.getVoteBitArray(height, round, voteType) diff --git a/p2p/conn/connection.go b/p2p/conn/connection.go index 9f8cebb530..b048a578a0 100644 --- a/p2p/conn/connection.go +++ b/p2p/conn/connection.go @@ -391,7 +391,7 @@ func (c *MConnection) TrySend(chID byte, msgBytes []byte) bool { return false } - c.Logger.Debug("TrySend", "channel", chID, "conn", c, "msgBytes", log.NewLazySprintf("%X", msgBytes)) + // c.Logger.Debug("TrySend", "channel", chID, "conn", c, "msgBytes", log.NewLazySprintf("%X", msgBytes)) // Send message to channel. channel, ok := c.channelsIdx[chID] From 095494503c4eb884d2eadc0814aaf8adb14c4ffb Mon Sep 17 00:00:00 2001 From: Dev Ojha Date: Tue, 4 Jun 2024 08:42:04 -0500 Subject: [PATCH 04/30] Run broadcast routines out of process (cherry picked from commit bfb00e71ae64d4c0b34a13be8e61160aa0a81e46) --- internal/consensus/reactor.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/consensus/reactor.go b/internal/consensus/reactor.go index a4db8053f9..e04bc67553 100644 --- a/internal/consensus/reactor.go +++ b/internal/consensus/reactor.go @@ -421,21 +421,21 @@ func (conR *Reactor) subscribeToBroadcastEvents() { const subscriber = "consensus-reactor" if err := conR.conS.evsw.AddListenerForEvent(subscriber, types.EventNewRoundStep, func(data cmtevents.EventData) { - conR.broadcastNewRoundStepMessage(data.(*cstypes.RoundState)) + go func() { conR.broadcastNewRoundStepMessage(data.(*cstypes.RoundState)) }() }); err != nil { conR.Logger.Error("Error adding listener for events (NewRoundStep)", "err", err) } if err := conR.conS.evsw.AddListenerForEvent(subscriber, types.EventValidBlock, func(data cmtevents.EventData) { - conR.broadcastNewValidBlockMessage(data.(*cstypes.RoundState)) + go func() { conR.broadcastNewValidBlockMessage(data.(*cstypes.RoundState)) }() }); err != nil { conR.Logger.Error("Error adding listener for events (ValidBlock)", "err", err) } if err := conR.conS.evsw.AddListenerForEvent(subscriber, types.EventVote, func(data cmtevents.EventData) { - conR.broadcastHasVoteMessage(data.(*types.Vote)) + go func() { conR.broadcastHasVoteMessage(data.(*types.Vote)) }() }); err != nil { conR.Logger.Error("Error adding listener for events (Vote)", "err", err) } From 10142c84a9b2c2c08a6aedb04eb68608e621f3fa Mon Sep 17 00:00:00 2001 From: Dev Ojha Date: Tue, 4 Jun 2024 09:05:47 -0500 Subject: [PATCH 05/30] Fix bug, add Changelog (cherry picked from commit 853f76cd7d0f34f295388d97190fa162a0755557) --- internal/consensus/reactor.go | 37 +++++++++++++++++++++-------------- 1 file changed, 22 insertions(+), 15 deletions(-) diff --git a/internal/consensus/reactor.go b/internal/consensus/reactor.go index e04bc67553..00f0f0b9a4 100644 --- a/internal/consensus/reactor.go +++ b/internal/consensus/reactor.go @@ -421,21 +421,21 @@ func (conR *Reactor) subscribeToBroadcastEvents() { const subscriber = "consensus-reactor" if err := conR.conS.evsw.AddListenerForEvent(subscriber, types.EventNewRoundStep, func(data cmtevents.EventData) { - go func() { conR.broadcastNewRoundStepMessage(data.(*cstypes.RoundState)) }() + conR.broadcastNewRoundStepMessage(data.(*cstypes.RoundState)) }); err != nil { conR.Logger.Error("Error adding listener for events (NewRoundStep)", "err", err) } if err := conR.conS.evsw.AddListenerForEvent(subscriber, types.EventValidBlock, func(data cmtevents.EventData) { - go func() { conR.broadcastNewValidBlockMessage(data.(*cstypes.RoundState)) }() + conR.broadcastNewValidBlockMessage(data.(*cstypes.RoundState)) }); err != nil { conR.Logger.Error("Error adding listener for events (ValidBlock)", "err", err) } if err := conR.conS.evsw.AddListenerForEvent(subscriber, types.EventVote, func(data cmtevents.EventData) { - go func() { conR.broadcastHasVoteMessage(data.(*types.Vote)) }() + conR.broadcastHasVoteMessage(data.(*types.Vote)) }); err != nil { conR.Logger.Error("Error adding listener for events (Vote)", "err", err) } @@ -455,10 +455,12 @@ func (conR *Reactor) unsubscribeFromBroadcastEvents() { func (conR *Reactor) broadcastNewRoundStepMessage(rs *cstypes.RoundState) { nrsMsg := makeRoundStepMessage(rs) - conR.Switch.Broadcast(p2p.Envelope{ - ChannelID: StateChannel, - Message: nrsMsg, - }) + go func() { + conR.Switch.Broadcast(p2p.Envelope{ + ChannelID: StateChannel, + Message: nrsMsg, + }) + }() } func (conR *Reactor) broadcastNewValidBlockMessage(rs *cstypes.RoundState) { @@ -470,10 +472,12 @@ func (conR *Reactor) broadcastNewValidBlockMessage(rs *cstypes.RoundState) { BlockParts: rs.ProposalBlockParts.BitArray().ToProto(), IsCommit: rs.Step == cstypes.RoundStepCommit, } - conR.Switch.Broadcast(p2p.Envelope{ - ChannelID: StateChannel, - Message: csMsg, - }) + go func() { + conR.Switch.Broadcast(p2p.Envelope{ + ChannelID: StateChannel, + Message: csMsg, + }) + }() } // Broadcasts HasVoteMessage to peers that care. @@ -484,10 +488,13 @@ func (conR *Reactor) broadcastHasVoteMessage(vote *types.Vote) { Type: vote.Type, Index: vote.ValidatorIndex, } - conR.Switch.Broadcast(p2p.Envelope{ - ChannelID: StateChannel, - Message: msg, - }) + + go func() { + conR.Switch.Broadcast(p2p.Envelope{ + ChannelID: StateChannel, + Message: msg, + }) + }() /* // TODO: Make this broadcast more selective. for _, peer := range conR.Switch.Peers().Copy() { From 818b2782ff2718c2e42e5a32ee5fdb64ebfa2357 Mon Sep 17 00:00:00 2001 From: Dev Ojha Date: Tue, 4 Jun 2024 09:32:41 -0500 Subject: [PATCH 06/30] Remove broadcast return channel (cherry picked from commit 9156eec9e2415afdedb8ffd75e88ead4cb25ed24) --- internal/blocksync/reactor.go | 2 +- internal/consensus/reactor.go | 8 ++++---- p2p/switch.go | 35 ++++++++++------------------------- p2p/switch_test.go | 19 ++++--------------- statesync/reactor.go | 2 +- 5 files changed, 20 insertions(+), 46 deletions(-) diff --git a/internal/blocksync/reactor.go b/internal/blocksync/reactor.go index aa6f09dc72..6d8027039c 100644 --- a/internal/blocksync/reactor.go +++ b/internal/blocksync/reactor.go @@ -556,7 +556,7 @@ FOR_LOOP: // BroadcastStatusRequest broadcasts `BlockStore` base and height. func (bcR *Reactor) BroadcastStatusRequest() { - bcR.Switch.Broadcast(p2p.Envelope{ + bcR.Switch.BroadcastEnvelope(p2p.Envelope{ ChannelID: BlocksyncChannel, Message: &bcproto.StatusRequest{}, }) diff --git a/internal/consensus/reactor.go b/internal/consensus/reactor.go index 00f0f0b9a4..7e3beefd87 100644 --- a/internal/consensus/reactor.go +++ b/internal/consensus/reactor.go @@ -456,7 +456,7 @@ func (conR *Reactor) unsubscribeFromBroadcastEvents() { func (conR *Reactor) broadcastNewRoundStepMessage(rs *cstypes.RoundState) { nrsMsg := makeRoundStepMessage(rs) go func() { - conR.Switch.Broadcast(p2p.Envelope{ + conR.Switch.BroadcastEnvelope(p2p.Envelope{ ChannelID: StateChannel, Message: nrsMsg, }) @@ -473,7 +473,7 @@ func (conR *Reactor) broadcastNewValidBlockMessage(rs *cstypes.RoundState) { IsCommit: rs.Step == cstypes.RoundStepCommit, } go func() { - conR.Switch.Broadcast(p2p.Envelope{ + conR.Switch.BroadcastEnvelope(p2p.Envelope{ ChannelID: StateChannel, Message: csMsg, }) @@ -490,7 +490,7 @@ func (conR *Reactor) broadcastHasVoteMessage(vote *types.Vote) { } go func() { - conR.Switch.Broadcast(p2p.Envelope{ + conR.Switch.BroadcastEnvelope(p2p.Envelope{ ChannelID: StateChannel, Message: msg, }) @@ -526,7 +526,7 @@ func (conR *Reactor) broadcastHasProposalBlockPartMessage(partMsg *BlockPartMess Round: partMsg.Round, Index: int32(partMsg.Part.Index), } - conR.Switch.Broadcast(p2p.Envelope{ + conR.Switch.BroadcastEnvelope(p2p.Envelope{ ChannelID: StateChannel, Message: msg, }) diff --git a/p2p/switch.go b/p2p/switch.go index 4c1b2cb3e5..ced9278e33 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -4,7 +4,6 @@ import ( "errors" "fmt" "math" - "sync" "time" "github.com/cosmos/gogoproto/proto" @@ -271,32 +270,18 @@ func (sw *Switch) OnStop() { // success values for each attempted send (false if times out). Channel will be // closed once msg bytes are sent to all peers (or time out). // -// NOTE: Broadcast uses goroutines, so order of broadcast may not be preserved. -func (sw *Switch) Broadcast(e Envelope) chan bool { +// NOTE: BroadcastEnvelope uses goroutines, so order of broadcast may not be preserved. +func (sw *Switch) BroadcastEnvelope(e Envelope) { sw.Logger.Debug("Broadcast", "channel", e.ChannelID) - var wg sync.WaitGroup - successChan := make(chan bool, sw.peers.Size()) - - sw.peers.ForEach(func(p Peer) { - wg.Add(1) // Incrementing by one is safer. - go func(peer Peer) { - defer wg.Done() - success := peer.Send(e) - // For rare cases where PeerSet changes between a call to `peers.Size()` and `peers.ForEach()`. - select { - case successChan <- success: - default: - } - }(p) - }) - - go func() { - wg.Wait() - close(successChan) - }() - - return successChan + for _, peer := range sw.peers.list { + go func(p Peer) { + // TODO: We don't use the success value. Should most behavior + // really be TrySend? + success := p.Send(e) + _ = success + }(peer) + } } // NumPeers returns the count of outbound/inbound and outbound-dialing peers. diff --git a/p2p/switch_test.go b/p2p/switch_test.go index 74f421dabc..6540b7522a 100644 --- a/p2p/switch_test.go +++ b/p2p/switch_test.go @@ -156,9 +156,9 @@ func TestSwitches(t *testing.T) { }, }, } - s1.Broadcast(Envelope{ChannelID: byte(0x00), Message: ch0Msg}) - s1.Broadcast(Envelope{ChannelID: byte(0x01), Message: ch1Msg}) - s1.Broadcast(Envelope{ChannelID: byte(0x02), Message: ch2Msg}) + s1.BroadcastEnvelope(Envelope{ChannelID: byte(0x00), Message: ch0Msg}) + s1.BroadcastEnvelope(Envelope{ChannelID: byte(0x01), Message: ch1Msg}) + s1.BroadcastEnvelope(Envelope{ChannelID: byte(0x02), Message: ch2Msg}) assertMsgReceivedWithTimeout(t, ch0Msg, byte(0x00), @@ -841,22 +841,11 @@ func BenchmarkSwitchBroadcast(b *testing.B) { b.ResetTimer() - numSuccess, numFailure := 0, 0 - // Send random message from foo channel to another for i := 0; i < b.N; i++ { chID := byte(i % 4) - successChan := s1.Broadcast(Envelope{ChannelID: chID}) - for s := range successChan { - if s { - numSuccess++ - } else { - numFailure++ - } - } + s1.BroadcastEnvelope(Envelope{ChannelID: chID}) } - - b.Logf("success: %v, failure: %v", numSuccess, numFailure) } func TestSwitchRemovalErr(t *testing.T) { diff --git a/statesync/reactor.go b/statesync/reactor.go index 9b71ffe4f2..55f83484ce 100644 --- a/statesync/reactor.go +++ b/statesync/reactor.go @@ -277,7 +277,7 @@ func (r *Reactor) Sync(stateProvider StateProvider, discoveryTime time.Duration) r.Logger.Debug("Requesting snapshots from known peers") // Request snapshots from all currently connected peers - r.Switch.Broadcast(p2p.Envelope{ + r.Switch.BroadcastEnvelope(p2p.Envelope{ ChannelID: SnapshotChannel, Message: &ssproto.SnapshotsRequest{}, }) From 2b69f2814dfe1f2f3f1ad0772f7c850b6e4bf8cc Mon Sep 17 00:00:00 2001 From: Dev Ojha Date: Wed, 5 Jun 2024 13:55:59 -0500 Subject: [PATCH 07/30] backport remaining commits (cherry picked from commit 423f7b6149dfb458074ef4d156b608d16c793e52) --- .../3184-remove-PeerSendBytesTotal-metric.md | 2 ++ docs/explanation/core/metrics.md | 1 - p2p/metrics.gen.go | 7 ------- p2p/metrics.go | 2 -- p2p/peer.go | 7 +------ 5 files changed, 3 insertions(+), 16 deletions(-) create mode 100644 .changelog/unreleased/breaking-changes/3184-remove-PeerSendBytesTotal-metric.md diff --git a/.changelog/unreleased/breaking-changes/3184-remove-PeerSendBytesTotal-metric.md b/.changelog/unreleased/breaking-changes/3184-remove-PeerSendBytesTotal-metric.md new file mode 100644 index 0000000000..7368fe02cc --- /dev/null +++ b/.changelog/unreleased/breaking-changes/3184-remove-PeerSendBytesTotal-metric.md @@ -0,0 +1,2 @@ +`[p2p]` Remove `PeerSendBytesTotal` metric as it is costly to track, +and not that informative in debugging. ([\#3184](https://github.com/cometbft/cometbft/issues/3184)) \ No newline at end of file diff --git a/docs/explanation/core/metrics.md b/docs/explanation/core/metrics.md index d41796a554..245e82c810 100644 --- a/docs/explanation/core/metrics.md +++ b/docs/explanation/core/metrics.md @@ -56,7 +56,6 @@ The following metrics are available: | p2p\_message\_receive\_bytes\_total | Counter | message\_type | Number of bytes received from all peers per message type | | p2p\_peers | Gauge | | Number of peers node's connected to | | p2p\_peer\_receive\_bytes\_total | Counter | peer\_id, chID | Number of bytes per channel received from a given peer | -| p2p\_peer\_send\_bytes\_total | Counter | peer\_id, chID | Number of bytes per channel sent to a given peer | | p2p\_peer\_pending\_send\_bytes | Gauge | peer\_id | Number of pending bytes to be sent to a given peer | | p2p\_num\_txs | Gauge | peer\_id | Number of transactions submitted by each peer\_id | | p2p\_pending\_send\_bytes | Gauge | peer\_id | Amount of data pending to be sent to peer | diff --git a/p2p/metrics.gen.go b/p2p/metrics.gen.go index e452f16535..bdd4fda872 100644 --- a/p2p/metrics.gen.go +++ b/p2p/metrics.gen.go @@ -26,12 +26,6 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics { Name: "peer_receive_bytes_total", Help: "Number of bytes received from a given peer.", }, append(labels, "peer_id", "chID")).With(labelsAndValues...), - PeerSendBytesTotal: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ - Namespace: namespace, - Subsystem: MetricsSubsystem, - Name: "peer_send_bytes_total", - Help: "Number of bytes sent to a given peer.", - }, append(labels, "peer_id", "chID")).With(labelsAndValues...), PeerPendingSendBytes: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ Namespace: namespace, Subsystem: MetricsSubsystem, @@ -63,7 +57,6 @@ func NopMetrics() *Metrics { return &Metrics{ Peers: discard.NewGauge(), PeerReceiveBytesTotal: discard.NewCounter(), - PeerSendBytesTotal: discard.NewCounter(), PeerPendingSendBytes: discard.NewGauge(), NumTxs: discard.NewGauge(), MessageReceiveBytesTotal: discard.NewCounter(), diff --git a/p2p/metrics.go b/p2p/metrics.go index 5121857acc..6ba7592636 100644 --- a/p2p/metrics.go +++ b/p2p/metrics.go @@ -28,8 +28,6 @@ type Metrics struct { Peers metrics.Gauge // Number of bytes received from a given peer. PeerReceiveBytesTotal metrics.Counter `metrics_labels:"peer_id,chID"` - // Number of bytes sent to a given peer. - PeerSendBytesTotal metrics.Counter `metrics_labels:"peer_id,chID"` // Pending bytes to be sent to a given peer. PeerPendingSendBytes metrics.Gauge `metrics_labels:"peer_id"` // Number of transactions submitted by each peer. diff --git a/p2p/peer.go b/p2p/peer.go index 145bced19a..48af3c0cfb 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -282,12 +282,7 @@ func (p *peer) send(chID byte, msg proto.Message, sendFunc func(byte, []byte) bo } res := sendFunc(chID, msgBytes) if res { - p.metrics.PeerSendBytesTotal. - With("peer_id", string(p.ID()), "chID", p.mlc.ChIDToMetricLabel(chID)). - Add(float64(len(msgBytes))) - p.metrics.MessageSendBytesTotal. - With("message_type", metricLabelValue). - Add(float64(len(msgBytes))) + p.metrics.MessageSendBytesTotal.With("message_type", metricLabelValue).Add(float64(len(msgBytes))) } return res } From 1923de1830eaf4c2ba1d7f57ec819b5214f8377e Mon Sep 17 00:00:00 2001 From: Dev Ojha Date: Sat, 15 Jun 2024 14:35:32 -0400 Subject: [PATCH 08/30] Backport #3211 --- ...ake-cs-reactor-no-longer-takes-cs-locks.md | 4 ++ internal/consensus/reactor.go | 43 ++++++++++++------- internal/consensus/state.go | 10 ++++- 3 files changed, 40 insertions(+), 17 deletions(-) create mode 100644 .changelog/unreleased/improvements/3211-make-cs-reactor-no-longer-takes-cs-locks.md diff --git a/.changelog/unreleased/improvements/3211-make-cs-reactor-no-longer-takes-cs-locks.md b/.changelog/unreleased/improvements/3211-make-cs-reactor-no-longer-takes-cs-locks.md new file mode 100644 index 0000000000..72fae64598 --- /dev/null +++ b/.changelog/unreleased/improvements/3211-make-cs-reactor-no-longer-takes-cs-locks.md @@ -0,0 +1,4 @@ +- `[consensus]` Make the consensus reactor no longer have packets on receive take the consensus lock. +Consensus will now update the reactor's view after every relevant change through the existing +synchronous event bus subscription. + ([\#3211](https://github.com/cometbft/cometbft/pull/3211)) diff --git a/internal/consensus/reactor.go b/internal/consensus/reactor.go index 7e3beefd87..3a090d5a7c 100644 --- a/internal/consensus/reactor.go +++ b/internal/consensus/reactor.go @@ -46,8 +46,9 @@ type Reactor struct { waitSync atomic.Bool eventBus *types.EventBus - rsMtx cmtsync.Mutex - rs *cstypes.RoundState + rsMtx cmtsync.RWMutex + rs *cstypes.RoundState + initialHeight int64 // under rsMtx Metrics *Metrics } @@ -267,9 +268,9 @@ func (conR *Reactor) Receive(e p2p.Envelope) { case StateChannel: switch msg := msg.(type) { case *NewRoundStepMessage: - conR.conS.mtx.Lock() - initialHeight := conR.conS.state.InitialHeight - conR.conS.mtx.Unlock() + conR.rsMtx.RLock() + initialHeight := conR.initialHeight + conR.rsMtx.RUnlock() if err = msg.ValidateHeight(initialHeight); err != nil { conR.Logger.Error("Peer sent us invalid msg", "peer", e.Src, "msg", msg, "err", err) conR.Switch.StopPeerForError(e.Src, err) @@ -283,10 +284,9 @@ func (conR *Reactor) Receive(e p2p.Envelope) { case *HasProposalBlockPartMessage: ps.ApplyHasProposalBlockPartMessage(msg) case *VoteSetMaj23Message: - cs := conR.conS - cs.mtx.Lock() - height, votes := cs.Height, cs.Votes - cs.mtx.Unlock() + conR.rsMtx.RLock() + height, votes := conR.rs.Height, conR.rs.Votes + conR.rsMtx.RUnlock() if height != msg.Height { return } @@ -351,9 +351,9 @@ func (conR *Reactor) Receive(e p2p.Envelope) { switch msg := msg.(type) { case *VoteMessage: cs := conR.conS - cs.mtx.RLock() - height, valSize, lastCommitSize := cs.Height, cs.Validators.Size(), cs.LastCommit.Size() - cs.mtx.RUnlock() + conR.rsMtx.RLock() + height, valSize, lastCommitSize := conR.rs.Height, conR.rs.Validators.Size(), conR.rs.LastCommit.Size() + conR.rsMtx.RUnlock() ps.EnsureVoteBitArrays(height, valSize) ps.EnsureVoteBitArrays(height-1, lastCommitSize) ps.SetHasVote(msg.Vote) @@ -372,10 +372,9 @@ func (conR *Reactor) Receive(e p2p.Envelope) { } switch msg := msg.(type) { case *VoteSetBitsMessage: - cs := conR.conS - cs.mtx.Lock() - height, votes := cs.Height, cs.Votes - cs.mtx.Unlock() + conR.rsMtx.RLock() + height, votes := conR.rs.Height, conR.rs.Votes + conR.rsMtx.RUnlock() if height == msg.Height { var ourVotes *bits.BitArray @@ -421,6 +420,7 @@ func (conR *Reactor) subscribeToBroadcastEvents() { const subscriber = "consensus-reactor" if err := conR.conS.evsw.AddListenerForEvent(subscriber, types.EventNewRoundStep, func(data cmtevents.EventData) { + conR.updateRoundStateNoCsLock() conR.broadcastNewRoundStepMessage(data.(*cstypes.RoundState)) }); err != nil { conR.Logger.Error("Error adding listener for events (NewRoundStep)", "err", err) @@ -428,6 +428,7 @@ func (conR *Reactor) subscribeToBroadcastEvents() { if err := conR.conS.evsw.AddListenerForEvent(subscriber, types.EventValidBlock, func(data cmtevents.EventData) { + conR.updateRoundStateNoCsLock() conR.broadcastNewValidBlockMessage(data.(*cstypes.RoundState)) }); err != nil { conR.Logger.Error("Error adding listener for events (ValidBlock)", "err", err) @@ -435,6 +436,7 @@ func (conR *Reactor) subscribeToBroadcastEvents() { if err := conR.conS.evsw.AddListenerForEvent(subscriber, types.EventVote, func(data cmtevents.EventData) { + conR.updateRoundStateNoCsLock() conR.broadcastHasVoteMessage(data.(*types.Vote)) }); err != nil { conR.Logger.Error("Error adding listener for events (Vote)", "err", err) @@ -562,10 +564,19 @@ func (conR *Reactor) updateRoundStateRoutine() { rs := conR.conS.GetRoundState() conR.rsMtx.Lock() conR.rs = rs + conR.initialHeight = conR.conS.state.InitialHeight conR.rsMtx.Unlock() } } +func (conR *Reactor) updateRoundStateNoCsLock() { + rs := conR.conS.getRoundState() + conR.rsMtx.Lock() + conR.rs = rs + conR.initialHeight = conR.conS.state.InitialHeight + conR.rsMtx.Unlock() +} + func (conR *Reactor) getRoundState() *cstypes.RoundState { conR.rsMtx.Lock() defer conR.rsMtx.Unlock() diff --git a/internal/consensus/state.go b/internal/consensus/state.go index 70bb4f5870..45147e5580 100644 --- a/internal/consensus/state.go +++ b/internal/consensus/state.go @@ -244,10 +244,18 @@ func (cs *State) GetLastHeight() int64 { } // GetRoundState returns a shallow copy of the internal consensus state. +// This function is thread-safe. func (cs *State) GetRoundState() *cstypes.RoundState { cs.mtx.RLock() - rs := cs.RoundState // copy + rs := cs.getRoundState() cs.mtx.RUnlock() + return rs +} + +// getRoundState returns a shallow copy of the internal consensus state. +// This function is not thread-safe. Use GetRoundState for the thread-safe version. +func (cs *State) getRoundState() *cstypes.RoundState { + rs := cs.RoundState // copy return &rs } From 9712c457a30efd8da23892a3de0f3064da0e7938 Mon Sep 17 00:00:00 2001 From: Dev Ojha Date: Sat, 15 Jun 2024 16:16:51 -0400 Subject: [PATCH 09/30] Fix Race --- internal/consensus/reactor.go | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/internal/consensus/reactor.go b/internal/consensus/reactor.go index 3a090d5a7c..75de02edf2 100644 --- a/internal/consensus/reactor.go +++ b/internal/consensus/reactor.go @@ -41,11 +41,10 @@ const ( type Reactor struct { p2p.BaseReactor // BaseService + p2p.Switch - conS *State - - waitSync atomic.Bool - eventBus *types.EventBus - + conS *State + mtx cmtsync.RWMutex + waitSync atomic.Bool + eventBus *types.EventBus rsMtx cmtsync.RWMutex rs *cstypes.RoundState initialHeight int64 // under rsMtx @@ -561,11 +560,13 @@ func (conR *Reactor) updateRoundStateRoutine() { if !conR.IsRunning() { return } - rs := conR.conS.GetRoundState() - conR.rsMtx.Lock() + conR.conS.mtx.RLock() + rs, initialHeight := conR.conS.getRoundState(), conR.conS.state.InitialHeight + conR.conS.mtx.RUnlock() + conR.mtx.Lock() conR.rs = rs - conR.initialHeight = conR.conS.state.InitialHeight - conR.rsMtx.Unlock() + conR.initialHeight = initialHeight + conR.mtx.Unlock() } } From aaeb60d14f3c8dbd7fb5ab9a10f78003553efc74 Mon Sep 17 00:00:00 2001 From: Dev Ojha Date: Sat, 15 Jun 2024 16:24:36 -0400 Subject: [PATCH 10/30] bp #3157 --- .../3157-late-votes-dont-take-cs-mtx.md | 4 ++++ internal/consensus/reactor.go | 20 +++++++++++++------ 2 files changed, 18 insertions(+), 6 deletions(-) create mode 100644 .changelog/unreleased/improvements/3157-late-votes-dont-take-cs-mtx.md diff --git a/.changelog/unreleased/improvements/3157-late-votes-dont-take-cs-mtx.md b/.changelog/unreleased/improvements/3157-late-votes-dont-take-cs-mtx.md new file mode 100644 index 0000000000..fa6c49a057 --- /dev/null +++ b/.changelog/unreleased/improvements/3157-late-votes-dont-take-cs-mtx.md @@ -0,0 +1,4 @@ +- [`consensus`] Check for late votes in the reactor, preventing them from entering the +single-threaded consensus logic. This change is a performance optimization that reduces the number +of redundant votes that are processed by the consensus logic. + ([\#3157](https://github.com/cometbft/cometbft/issues/3157) diff --git a/internal/consensus/reactor.go b/internal/consensus/reactor.go index 75de02edf2..55f1720545 100644 --- a/internal/consensus/reactor.go +++ b/internal/consensus/reactor.go @@ -350,14 +350,22 @@ func (conR *Reactor) Receive(e p2p.Envelope) { switch msg := msg.(type) { case *VoteMessage: cs := conR.conS - conR.rsMtx.RLock() - height, valSize, lastCommitSize := conR.rs.Height, conR.rs.Validators.Size(), conR.rs.LastCommit.Size() - conR.rsMtx.RUnlock() - ps.EnsureVoteBitArrays(height, valSize) - ps.EnsureVoteBitArrays(height-1, lastCommitSize) + conR.mtx.RLock() + height, round := conR.rs.Height, conR.rs.Round + conR.mtx.RUnlock() ps.SetHasVote(msg.Vote) - cs.peerMsgQueue <- msgInfo{msg, e.Src.ID(), time.Time{}} + // if vote is late, and is not a precommit for the last block, mark it late and return. + isLate := msg.Vote.Height < height || (msg.Vote.Height == height && msg.Vote.Round < round) + if isLate { + notLastBlockPrecommit := msg.Vote.Type != types.PrecommitType || msg.Vote.Height != height-1 + if notLastBlockPrecommit { + cs.metrics.MarkLateVote(msg.Vote.Type) + return + } + } + + cs.peerMsgQueue <- msgInfo{msg, e.Src.ID(), cmttime.Now()} default: // don't punish (leave room for soft upgrades) From 5a67d238de16c541e37390cdb7b8d025a0086acb Mon Sep 17 00:00:00 2001 From: Dev Ojha Date: Sat, 15 Jun 2024 18:34:49 -0400 Subject: [PATCH 11/30] Speedup tests that were hitting timeouts --- internal/consensus/invalid_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/consensus/invalid_test.go b/internal/consensus/invalid_test.go index 1ef066ba79..40c67f0306 100644 --- a/internal/consensus/invalid_test.go +++ b/internal/consensus/invalid_test.go @@ -51,7 +51,7 @@ func TestReactorInvalidPrecommit(t *testing.T) { } byzVal.mtx.Unlock() - // wait for a bunch of blocks + // wait for a bunch of blocks, from each validator // TODO: make this tighter by ensuring the halt happens by block 2 for i := 0; i < 10; i++ { timeoutWaitGroup(n, func(j int) { From 468c6f06f2d23c530fe6d4c5529bd51483262351 Mon Sep 17 00:00:00 2001 From: Dev Ojha Date: Sat, 15 Jun 2024 20:21:17 -0400 Subject: [PATCH 12/30] bp #3161 --- ...lockpart-redundancy-checks-into-reactor.md | 2 + internal/consensus/reactor.go | 13 ++++- internal/consensus/state.go | 55 ++++++++++++++----- 3 files changed, 55 insertions(+), 15 deletions(-) create mode 100644 .changelog/unreleased/improvements/3161-move-blockpart-redundancy-checks-into-reactor.md diff --git a/.changelog/unreleased/improvements/3161-move-blockpart-redundancy-checks-into-reactor.md b/.changelog/unreleased/improvements/3161-move-blockpart-redundancy-checks-into-reactor.md new file mode 100644 index 0000000000..285c906c7e --- /dev/null +++ b/.changelog/unreleased/improvements/3161-move-blockpart-redundancy-checks-into-reactor.md @@ -0,0 +1,2 @@ +- [`consensus`] Move blockpart redundancy/lateness checks into the reactor, preventing late / duplicate parts from blocking consensus. + ([\#3161](https://github.com/cometbft/cometbft/issues/3161) diff --git a/internal/consensus/reactor.go b/internal/consensus/reactor.go index 55f1720545..caefc92d06 100644 --- a/internal/consensus/reactor.go +++ b/internal/consensus/reactor.go @@ -337,7 +337,18 @@ func (conR *Reactor) Receive(e p2p.Envelope) { case *BlockPartMessage: ps.SetHasProposalBlockPart(msg.Height, msg.Round, int(msg.Part.Index)) conR.Metrics.BlockParts.With("peer_id", string(e.Src.ID())).Add(1) - conR.conS.peerMsgQueue <- msgInfo{msg, e.Src.ID(), time.Time{}} + + conR.mtx.RLock() + height, blockParts := conR.rs.Height, conR.rs.ProposalBlockParts + conR.mtx.RUnlock() + + allowFutureBlockPart := true + ok := allowProcessingProposalBlockPart(msg, conR.Logger, conR.Metrics, height, blockParts, allowFutureBlockPart, e.Src.ID()) + if !ok { + return + } + + conR.conS.peerMsgQueue <- msgInfo{msg, e.Src.ID(), time.Now()} default: conR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg))) } diff --git a/internal/consensus/state.go b/internal/consensus/state.go index 45147e5580..284453358b 100644 --- a/internal/consensus/state.go +++ b/internal/consensus/state.go @@ -2087,31 +2087,58 @@ func (cs *State) defaultSetProposal(proposal *types.Proposal, recvTime time.Time return nil } -// NOTE: block is not necessarily valid. -// Asynchronously triggers either enterPrevote (before we timeout of propose) or tryFinalizeCommit, -// once we have the full block. -func (cs *State) addProposalBlockPart(msg *BlockPartMessage, peerID p2p.ID) (added bool, err error) { +// checks if we should allow processing the proposal block part. +// Shared code between reactor and state machine. +// This must not modify csBlockParts, only take read-only accesses to it. +// Returns true if the block part is not old or duplicated. +func allowProcessingProposalBlockPart(msg *BlockPartMessage, logger log.Logger, metrics *Metrics, csHeight int64, csBlockParts *types.PartSet, allowFutureHeights bool, peerID p2p.ID) bool { height, round, part := msg.Height, msg.Round, msg.Part - // Blocks might be reused, so round mismatch is OK - if cs.Height != height { - cs.Logger.Debug("Received block part from wrong height", "height", height, "round", round) - cs.metrics.BlockGossipPartsReceived.With("matches_current", "false").Add(1) - return false, nil + // Blocks might be reused, so round mismatch is OK. Meant for reactor, where we may get + // future block parts while the proposal for the next block is still in message queue. + if allowFutureHeights && height > csHeight { + return true + } + if csHeight != height { + logger.Debug("received block part from wrong height", "height", height, "round", round) + metrics.BlockGossipPartsReceived.With("matches_current", "false").Add(1) + return false } // We're not expecting a block part. - if cs.ProposalBlockParts == nil { - cs.metrics.BlockGossipPartsReceived.With("matches_current", "false").Add(1) + if csBlockParts == nil { + if allowFutureHeights { + return true + } + metrics.BlockGossipPartsReceived.With("matches_current", "false").Add(1) // NOTE: this can happen when we've gone to a higher round and // then receive parts from the previous round - not necessarily a bad peer. - cs.Logger.Debug( - "Received a block part when we are not expecting any", + logger.Debug( + "received a block part when we are not expecting any", "height", height, "round", round, "index", part.Index, "peer", peerID, ) + return false + } + + if csBlockParts.IsComplete() || csBlockParts.GetPart(int(part.Index)) != nil { + return false + } + + return true +} + +// NOTE: block is not necessarily valid. +// Asynchronously triggers either enterPrevote (before we timeout of propose) or tryFinalizeCommit, +// once we have the full block. +func (cs *State) addProposalBlockPart(msg *BlockPartMessage, peerID p2p.ID) (added bool, err error) { + part := msg.Part + // TODO: better handle block parts for future heights, by saving them and processing them later. + allowFutureBlockPart := false + ok := allowProcessingProposalBlockPart(msg, cs.Logger, cs.metrics, cs.Height, cs.ProposalBlockParts, allowFutureBlockPart, peerID) + if !ok { return false, nil } @@ -2133,7 +2160,7 @@ func (cs *State) addProposalBlockPart(msg *BlockPartMessage, peerID p2p.ID) (add } count, total := cs.ProposalBlockParts.Count(), cs.ProposalBlockParts.Total() - cs.Logger.Debug("Receive block part", "height", height, "round", round, + cs.Logger.Debug("Receive block part", "height", cs.Height, "round", cs.Round, "index", part.Index, "count", count, "total", total, "from", peerID) maxBytes := cs.state.ConsensusParams.Block.MaxBytes From f7dd15daa3cadc2c272abfe2a7e1fca51ab93657 Mon Sep 17 00:00:00 2001 From: Dev Ojha Date: Sat, 15 Jun 2024 20:27:26 -0400 Subject: [PATCH 13/30] Fix data race --- internal/consensus/state.go | 2 +- types/part_set.go | 6 ++++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/internal/consensus/state.go b/internal/consensus/state.go index 284453358b..2852d790d7 100644 --- a/internal/consensus/state.go +++ b/internal/consensus/state.go @@ -2123,7 +2123,7 @@ func allowProcessingProposalBlockPart(msg *BlockPartMessage, logger log.Logger, return false } - if csBlockParts.IsComplete() || csBlockParts.GetPart(int(part.Index)) != nil { + if csBlockParts.IsCompleteMtx() || csBlockParts.GetPart(int(part.Index)) != nil { return false } diff --git a/types/part_set.go b/types/part_set.go index 4223a196a7..4f5ff70519 100644 --- a/types/part_set.go +++ b/types/part_set.go @@ -322,6 +322,12 @@ func (ps *PartSet) IsComplete() bool { return ps.count == ps.total } +func (ps *PartSet) IsCompleteMtx() bool { + ps.mtx.Lock() + defer ps.mtx.Unlock() + return ps.count == ps.total +} + func (ps *PartSet) GetReader() io.Reader { if !ps.IsComplete() { panic("Cannot GetReader() on incomplete PartSet") From b880795de6a71769cddbe4d46e534947862f9ad0 Mon Sep 17 00:00:00 2001 From: Dev Ojha Date: Thu, 20 Jun 2024 15:40:21 -0400 Subject: [PATCH 14/30] Revert reactor change (somehow hit a failure) --- internal/consensus/reactor.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/internal/consensus/reactor.go b/internal/consensus/reactor.go index caefc92d06..8b6c7ca645 100644 --- a/internal/consensus/reactor.go +++ b/internal/consensus/reactor.go @@ -338,15 +338,15 @@ func (conR *Reactor) Receive(e p2p.Envelope) { ps.SetHasProposalBlockPart(msg.Height, msg.Round, int(msg.Part.Index)) conR.Metrics.BlockParts.With("peer_id", string(e.Src.ID())).Add(1) - conR.mtx.RLock() - height, blockParts := conR.rs.Height, conR.rs.ProposalBlockParts - conR.mtx.RUnlock() - - allowFutureBlockPart := true - ok := allowProcessingProposalBlockPart(msg, conR.Logger, conR.Metrics, height, blockParts, allowFutureBlockPart, e.Src.ID()) - if !ok { - return - } + // conR.mtx.RLock() + // height, blockParts := conR.rs.Height, conR.rs.ProposalBlockParts + // conR.mtx.RUnlock() + + // allowFutureBlockPart := true + // ok := allowProcessingProposalBlockPart(msg, conR.Logger, conR.Metrics, height, blockParts, allowFutureBlockPart, e.Src.ID()) + // if !ok { + // return + // } conR.conS.peerMsgQueue <- msgInfo{msg, e.Src.ID(), time.Now()} default: From 1fd6499d40bab0507a6d90a05ed31407171d9814 Mon Sep 17 00:00:00 2001 From: Dev Ojha Date: Thu, 20 Jun 2024 15:53:47 -0400 Subject: [PATCH 15/30] More proper fix --- internal/consensus/reactor.go | 18 +++++++++--------- internal/consensus/state.go | 4 ++++ 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/internal/consensus/reactor.go b/internal/consensus/reactor.go index 8b6c7ca645..caefc92d06 100644 --- a/internal/consensus/reactor.go +++ b/internal/consensus/reactor.go @@ -338,15 +338,15 @@ func (conR *Reactor) Receive(e p2p.Envelope) { ps.SetHasProposalBlockPart(msg.Height, msg.Round, int(msg.Part.Index)) conR.Metrics.BlockParts.With("peer_id", string(e.Src.ID())).Add(1) - // conR.mtx.RLock() - // height, blockParts := conR.rs.Height, conR.rs.ProposalBlockParts - // conR.mtx.RUnlock() - - // allowFutureBlockPart := true - // ok := allowProcessingProposalBlockPart(msg, conR.Logger, conR.Metrics, height, blockParts, allowFutureBlockPart, e.Src.ID()) - // if !ok { - // return - // } + conR.mtx.RLock() + height, blockParts := conR.rs.Height, conR.rs.ProposalBlockParts + conR.mtx.RUnlock() + + allowFutureBlockPart := true + ok := allowProcessingProposalBlockPart(msg, conR.Logger, conR.Metrics, height, blockParts, allowFutureBlockPart, e.Src.ID()) + if !ok { + return + } conR.conS.peerMsgQueue <- msgInfo{msg, e.Src.ID(), time.Now()} default: diff --git a/internal/consensus/state.go b/internal/consensus/state.go index 2852d790d7..c237fe5fca 100644 --- a/internal/consensus/state.go +++ b/internal/consensus/state.go @@ -2123,6 +2123,10 @@ func allowProcessingProposalBlockPart(msg *BlockPartMessage, logger log.Logger, return false } + if part.Index >= csBlockParts.Total() { + return false + } + if csBlockParts.IsCompleteMtx() || csBlockParts.GetPart(int(part.Index)) != nil { return false } From 5f565c9a27e46700613f8a139e1cde7542060c7b Mon Sep 17 00:00:00 2001 From: Dev Ojha Date: Wed, 26 Jun 2024 11:16:44 -0500 Subject: [PATCH 16/30] perf(p2p/secretconn): Buffer secret connection writes #3346 (#115) * Buffer secret connection writes * Add changelog * Add changelog v2 --- .../3346-buffer-secret-conn-writs.md | 2 + CHANGELOG.md | 9 ++++ p2p/conn/evil_secret_connection_test.go | 15 ++++--- p2p/conn/secret_connection.go | 43 ++++++++++++------- 4 files changed, 48 insertions(+), 21 deletions(-) create mode 100644 .changelog/unreleased/improvements/3346-buffer-secret-conn-writs.md diff --git a/.changelog/unreleased/improvements/3346-buffer-secret-conn-writs.md b/.changelog/unreleased/improvements/3346-buffer-secret-conn-writs.md new file mode 100644 index 0000000000..e597b88f64 --- /dev/null +++ b/.changelog/unreleased/improvements/3346-buffer-secret-conn-writs.md @@ -0,0 +1,2 @@ +- `[p2p/secretconn]` Speedup secretconnection large writes, by buffering the write to the underlying connection. + ([\#3346](https://github.com/cometbft/cometbft/pull/3346)) \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md index e59416dcc6..9600f6f3a7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,15 @@ # CHANGELOG +<<<<<<< HEAD ## Unreleased +======= +## v0.37.4-v25-osmo-10 + +* [#115](https://github.com/osmosis-labs/cometbft/pull/115) perf(p2p/secretconn): Buffer secret connection writes (#3346) + + +## v0.37.4-v25-osmo-9 +>>>>>>> 9f773defd ( perf(p2p/secretconn): Buffer secret connection writes #3346 (#115)) *July 1, 2024* diff --git a/p2p/conn/evil_secret_connection_test.go b/p2p/conn/evil_secret_connection_test.go index c59b27d8c4..678f589739 100644 --- a/p2p/conn/evil_secret_connection_test.go +++ b/p2p/conn/evil_secret_connection_test.go @@ -1,6 +1,7 @@ package conn import ( + "bufio" "bytes" "errors" "io" @@ -222,12 +223,14 @@ func (c *evilConn) signChallenge() []byte { b := &buffer{} c.secretConn = &SecretConnection{ - conn: b, - recvBuffer: nil, - recvNonce: new([aeadNonceSize]byte), - sendNonce: new([aeadNonceSize]byte), - recvAead: recvAead, - sendAead: sendAead, + underlyingConn: b, + connReader: b, + connWriter: bufio.NewWriterSize(b, 65536), + recvBuffer: nil, + recvNonce: new([aeadNonceSize]byte), + sendNonce: new([aeadNonceSize]byte), + recvAead: recvAead, + sendAead: sendAead, } c.buffer = b diff --git a/p2p/conn/secret_connection.go b/p2p/conn/secret_connection.go index 6dc626ef9e..07f555c869 100644 --- a/p2p/conn/secret_connection.go +++ b/p2p/conn/secret_connection.go @@ -1,6 +1,7 @@ package conn import ( + "bufio" "bytes" "crypto/cipher" crand "crypto/rand" @@ -43,6 +44,9 @@ const ( labelEphemeralUpperPublicKey = "EPHEMERAL_UPPER_PUBLIC_KEY" labelDHSecret = "DH_SECRET" labelSecretConnectionMac = "SECRET_CONNECTION_MAC" + + defaultWriteBufferSize = 1024 * 1024 + defaultReadBufferSize = 65536 ) var ( @@ -66,7 +70,10 @@ type SecretConnection struct { sendAead cipher.AEAD remPubKey crypto.PubKey - conn io.ReadWriteCloser + + underlyingConn io.ReadWriteCloser + connWriter *bufio.Writer + connReader io.Reader // net.Conn must be thread safe: // https://golang.org/pkg/net/#Conn. @@ -141,12 +148,14 @@ func MakeSecretConnection(conn io.ReadWriteCloser, locPrivKey crypto.PrivKey) (* } sc := &SecretConnection{ - conn: conn, - recvBuffer: nil, - recvNonce: new([aeadNonceSize]byte), - sendNonce: new([aeadNonceSize]byte), - recvAead: recvAead, - sendAead: sendAead, + underlyingConn: conn, + connWriter: bufio.NewWriterSize(conn, defaultWriteBufferSize), + connReader: conn, + recvBuffer: nil, + recvNonce: new([aeadNonceSize]byte), + sendNonce: new([aeadNonceSize]byte), + recvAead: recvAead, + sendAead: sendAead, } // Sign the challenge bytes for authentication. @@ -210,7 +219,7 @@ func (sc *SecretConnection) Write(data []byte) (n int, err error) { incrNonce(sc.sendNonce) // end encryption - _, err = sc.conn.Write(sealedFrame) + _, err = sc.connWriter.Write(sealedFrame) if err != nil { return err } @@ -220,6 +229,7 @@ func (sc *SecretConnection) Write(data []byte) (n int, err error) { return n, err } } + sc.connWriter.Flush() return n, err } @@ -238,7 +248,7 @@ func (sc *SecretConnection) Read(data []byte) (n int, err error) { // read off the conn sealedFrame := pool.Get(aeadSizeOverhead + totalFrameSize) defer pool.Put(sealedFrame) - _, err = io.ReadFull(sc.conn, sealedFrame) + _, err = io.ReadFull(sc.connReader, sealedFrame) if err != nil { return n, err } @@ -270,16 +280,19 @@ func (sc *SecretConnection) Read(data []byte) (n int, err error) { } // Implements net.Conn. -func (sc *SecretConnection) Close() error { return sc.conn.Close() } -func (sc *SecretConnection) LocalAddr() net.Addr { return sc.conn.(net.Conn).LocalAddr() } -func (sc *SecretConnection) RemoteAddr() net.Addr { return sc.conn.(net.Conn).RemoteAddr() } -func (sc *SecretConnection) SetDeadline(t time.Time) error { return sc.conn.(net.Conn).SetDeadline(t) } +func (sc *SecretConnection) Close() error { return sc.underlyingConn.Close() } +func (sc *SecretConnection) LocalAddr() net.Addr { return sc.underlyingConn.(net.Conn).LocalAddr() } +func (sc *SecretConnection) RemoteAddr() net.Addr { return sc.underlyingConn.(net.Conn).RemoteAddr() } +func (sc *SecretConnection) SetDeadline(t time.Time) error { + return sc.underlyingConn.(net.Conn).SetDeadline(t) +} + func (sc *SecretConnection) SetReadDeadline(t time.Time) error { - return sc.conn.(net.Conn).SetReadDeadline(t) + return sc.underlyingConn.(net.Conn).SetReadDeadline(t) } func (sc *SecretConnection) SetWriteDeadline(t time.Time) error { - return sc.conn.(net.Conn).SetWriteDeadline(t) + return sc.underlyingConn.(net.Conn).SetWriteDeadline(t) } func genEphKeys() (ephPub, ephPriv *[32]byte) { From 6153ad5d7bb5c058ab8a698eca1d2c78236c1cc6 Mon Sep 17 00:00:00 2001 From: Dev Ojha Date: Wed, 3 Jul 2024 15:10:23 +0100 Subject: [PATCH 17/30] Switch HasVote gossip to TrySend (#120) --- internal/consensus/reactor.go | 2 +- p2p/switch.go | 14 ++++++++++++++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/internal/consensus/reactor.go b/internal/consensus/reactor.go index caefc92d06..c292a3a1ee 100644 --- a/internal/consensus/reactor.go +++ b/internal/consensus/reactor.go @@ -510,7 +510,7 @@ func (conR *Reactor) broadcastHasVoteMessage(vote *types.Vote) { } go func() { - conR.Switch.BroadcastEnvelope(p2p.Envelope{ + conR.Switch.TryBroadcast(p2p.Envelope{ ChannelID: StateChannel, Message: msg, }) diff --git a/p2p/switch.go b/p2p/switch.go index ced9278e33..e30a561b42 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -284,6 +284,20 @@ func (sw *Switch) BroadcastEnvelope(e Envelope) { } } +// TryBroadcast runs a go routine for each attempted send. +// +// NOTE: TryBroadcast uses goroutines, so order of broadcast may not be preserved. +func (sw *Switch) TryBroadcast(e Envelope) { + sw.Logger.Debug("TryBroadcast", "channel", e.ChannelID) + + peers := sw.peers.List() + for _, peer := range peers { + go func(p Peer) { + p.TrySendEnvelope(e) + }(peer) + } +} + // NumPeers returns the count of outbound/inbound and outbound-dialing peers. // unconditional peers are not counted here. func (sw *Switch) NumPeers() (outbound, inbound, dialing int) { From 390fe593ec87873fce81c718843b19552f749f43 Mon Sep 17 00:00:00 2001 From: Dev Ojha Date: Wed, 3 Jul 2024 15:10:58 +0100 Subject: [PATCH 18/30] Remove one more expensive debug log --- p2p/conn/connection.go | 2 +- p2p/switch.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/p2p/conn/connection.go b/p2p/conn/connection.go index b048a578a0..71d8f4a8a7 100644 --- a/p2p/conn/connection.go +++ b/p2p/conn/connection.go @@ -881,7 +881,7 @@ func (ch *Channel) writePacketMsgTo(w protoio.Writer) (n int, err error) { // complete. NOTE message bytes may change on next call to recvPacketMsg. // Not goroutine-safe. func (ch *Channel) recvPacketMsg(packet tmp2p.PacketMsg) ([]byte, error) { - ch.Logger.Debug("Read PacketMsg", "conn", ch.conn, "packet", packet) + // ch.Logger.Debug("Read PacketMsg", "conn", ch.conn, "packet", packet) recvCap, recvReceived := ch.desc.RecvMessageCapacity, len(ch.recving)+len(packet.Data) if recvCap < recvReceived { return nil, fmt.Errorf("received message exceeds available capacity: %v < %v", recvCap, recvReceived) diff --git a/p2p/switch.go b/p2p/switch.go index e30a561b42..a30bdd0684 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -290,10 +290,10 @@ func (sw *Switch) BroadcastEnvelope(e Envelope) { func (sw *Switch) TryBroadcast(e Envelope) { sw.Logger.Debug("TryBroadcast", "channel", e.ChannelID) - peers := sw.peers.List() + peers := sw.peers.list for _, peer := range peers { go func(p Peer) { - p.TrySendEnvelope(e) + p.TrySend(e) }(peer) } } From 04815278e72329f0038f7a41947eac8b8f8f98ae Mon Sep 17 00:00:00 2001 From: Dev Ojha Date: Wed, 3 Jul 2024 18:10:16 +0100 Subject: [PATCH 19/30] one more expensive debug log comment-out --- p2p/peer.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/p2p/peer.go b/p2p/peer.go index 48af3c0cfb..fd903e676e 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -311,13 +311,13 @@ func (p *peer) hasChannel(chID byte) bool { } // NOTE: probably will want to remove this // but could be helpful while the feature is new - p.Logger.Debug( - "Unknown channel for peer", - "channel", - chID, - "channels", - p.channels, - ) + // p.Logger.Debug( + // "Unknown channel for peer", + // "channel", + // chID, + // "channels", + // p.channels, + // ) return false } From c2847a1b65342a632d75815557eb79c00ddfcf62 Mon Sep 17 00:00:00 2001 From: Dev Ojha Date: Wed, 3 Jul 2024 23:04:56 +0100 Subject: [PATCH 20/30] Backport 3411, batch per-messagetype bytes received/sent metrics (#121) --- p2p/metrics.go | 70 +++++++++++++++++++++++++++++++++++++++++++++++--- p2p/peer.go | 49 +++++++++++++++++++++++------------ 2 files changed, 99 insertions(+), 20 deletions(-) diff --git a/p2p/metrics.go b/p2p/metrics.go index 6ba7592636..492cff6ee1 100644 --- a/p2p/metrics.go +++ b/p2p/metrics.go @@ -56,12 +56,70 @@ func (m *metricsLabelCache) ChIDToMetricLabel(chID byte) string { return m.chIDLabelNames[chID] } +type peerPendingMetricsCache struct { + mtx *sync.Mutex + perMessageCache map[reflect.Type]peerPendingMetricsCacheEntry +} + +type peerPendingMetricsCacheEntry struct { + label string + pendingSendBytes int + pendingRecvBytes int +} + +func peerPendingMetricsCacheFromMlc(mlc *metricsLabelCache) *peerPendingMetricsCache { + pendingCache := &peerPendingMetricsCache{ + mtx: &sync.Mutex{}, + perMessageCache: make(map[reflect.Type]peerPendingMetricsCacheEntry), + } + if mlc != nil { + mlc.mtx.RLock() + for k, v := range mlc.messageLabelNames { + pendingCache.perMessageCache[k] = peerPendingMetricsCacheEntry{label: v} + } + mlc.mtx.RUnlock() + } + return pendingCache +} + +func (c *peerPendingMetricsCache) AddPendingSendBytes(msgType reflect.Type, addBytes int) { + c.mtx.Lock() + defer c.mtx.Unlock() + if entry, ok := c.perMessageCache[msgType]; ok { + entry.pendingSendBytes += addBytes + c.perMessageCache[msgType] = entry + } else { + c.perMessageCache[msgType] = peerPendingMetricsCacheEntry{ + label: buildLabel(msgType), + pendingSendBytes: addBytes, + } + } +} + +func (c *peerPendingMetricsCache) AddPendingRecvBytes(msgType reflect.Type, addBytes int) { + c.mtx.Lock() + defer c.mtx.Unlock() + if entry, ok := c.perMessageCache[msgType]; ok { + entry.pendingRecvBytes += addBytes + c.perMessageCache[msgType] = entry + } else { + c.perMessageCache[msgType] = peerPendingMetricsCacheEntry{ + label: buildLabel(msgType), + pendingRecvBytes: addBytes, + } + } +} + +func getMsgType(i any) reflect.Type { + return reflect.TypeOf(i) +} + // ValueToMetricLabel is a method that is used to produce a prometheus label value of the golang // type that is passed in. // This method uses a map on the Metrics struct so that each label name only needs // to be produced once to prevent expensive string operations. func (m *metricsLabelCache) ValueToMetricLabel(i any) string { - t := reflect.TypeOf(i) + t := getMsgType(i) m.mtx.RLock() if s, ok := m.messageLabelNames[t]; ok { @@ -70,15 +128,19 @@ func (m *metricsLabelCache) ValueToMetricLabel(i any) string { } m.mtx.RUnlock() - s := t.String() - ss := valueToLabelRegexp.FindStringSubmatch(s) - l := fmt.Sprintf("%s_%s", ss[1], ss[2]) + l := buildLabel(t) m.mtx.Lock() defer m.mtx.Unlock() m.messageLabelNames[t] = l return l } +func buildLabel(msgType reflect.Type) string { + s := msgType.String() + ss := valueToLabelRegexp.FindStringSubmatch(s) + return fmt.Sprintf("%s_%s", ss[1], ss[2]) +} + func newMetricsLabelCache() *metricsLabelCache { return &metricsLabelCache{ mtx: &sync.RWMutex{}, diff --git a/p2p/peer.go b/p2p/peer.go index fd903e676e..451a9d36c2 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -120,8 +120,9 @@ type peer struct { // User data Data *cmap.CMap - metrics *Metrics - mlc *metricsLabelCache + metrics *Metrics + metricsTicker *time.Ticker + pendingMetrics *peerPendingMetricsCache // When removal of a peer fails, we set this flag removalAttemptFailed bool @@ -141,12 +142,13 @@ func newPeer( options ...PeerOption, ) *peer { p := &peer{ - peerConn: pc, - nodeInfo: nodeInfo, - channels: nodeInfo.(DefaultNodeInfo).Channels, - Data: cmap.NewCMap(), - metrics: NopMetrics(), - mlc: mlc, + peerConn: pc, + nodeInfo: nodeInfo, + channels: nodeInfo.(DefaultNodeInfo).Channels, + Data: cmap.NewCMap(), + metricsTicker: time.NewTicker(metricsTickerDuration), + metrics: NopMetrics(), + pendingMetrics: peerPendingMetricsCacheFromMlc(mlc), } p.mconn = createMConnection( @@ -271,7 +273,7 @@ func (p *peer) send(chID byte, msg proto.Message, sendFunc func(byte, []byte) bo } else if !p.hasChannel(chID) { return false } - metricLabelValue := p.mlc.ValueToMetricLabel(msg) + msgType := getMsgType(msg) if w, ok := msg.(types.Wrapper); ok { msg = w.Wrap() } @@ -282,7 +284,7 @@ func (p *peer) send(chID byte, msg proto.Message, sendFunc func(byte, []byte) bo } res := sendFunc(chID, msgBytes) if res { - p.metrics.MessageSendBytesTotal.With("message_type", metricLabelValue).Add(float64(len(msgBytes))) + p.pendingMetrics.AddPendingSendBytes(msgType, len(msgBytes)) } return res } @@ -378,6 +380,26 @@ func (p *peer) metricsReporter() { } p.metrics.PeerPendingSendBytes.With("peer_id", string(p.ID())).Set(sendQueueSize) + // Report per peer, per message total bytes, since the last interval + func() { + p.pendingMetrics.mtx.Lock() + defer p.pendingMetrics.mtx.Unlock() + for msgType, entry := range p.pendingMetrics.perMessageCache { + if entry.pendingSendBytes > 0 { + p.metrics.MessageSendBytesTotal. + With("message_type", entry.label). + Add(float64(entry.pendingSendBytes)) + entry.pendingSendBytes = 0 + } + if entry.pendingRecvBytes > 0 { + p.metrics.MessageReceiveBytesTotal. + With("message_type", entry.label). + Add(float64(entry.pendingRecvBytes)) + entry.pendingRecvBytes = 0 + } + p.pendingMetrics.perMessageCache[msgType] = entry + } + }() case <-p.Quit(): return } @@ -415,12 +437,7 @@ func createMConnection( panic(fmt.Sprintf("unwrapping message: %v", err)) } } - p.metrics.PeerReceiveBytesTotal. - With("peer_id", string(p.ID()), "chID", p.mlc.ChIDToMetricLabel(chID)). - Add(float64(len(msgBytes))) - p.metrics.MessageReceiveBytesTotal. - With("message_type", p.mlc.ValueToMetricLabel(msg)). - Add(float64(len(msgBytes))) + p.pendingMetrics.AddPendingRecvBytes(getMsgType(msg), len(msgBytes)) reactor.Receive(Envelope{ ChannelID: chID, Src: p, From af1810fb4041c0139d27d9fa24d1d697f125195b Mon Sep 17 00:00:00 2001 From: Dev Ojha Date: Wed, 3 Jul 2024 23:18:12 +0100 Subject: [PATCH 21/30] Secret conn remove pool buffer, align with upstream (#123) * Secret conn remove pool buffer, align with upstream * add changelog --- ...-remove-pool-buffer-usage-in-secretconn.md | 2 + CHANGELOG.md | 17 +++++ go.mod | 1 - go.sum | 2 - p2p/conn/evil_secret_connection_test.go | 19 +++--- p2p/conn/secret_connection.go | 68 +++++++++---------- 6 files changed, 63 insertions(+), 46 deletions(-) create mode 100644 .changelog/unreleased/improvements/3403-remove-pool-buffer-usage-in-secretconn.md diff --git a/.changelog/unreleased/improvements/3403-remove-pool-buffer-usage-in-secretconn.md b/.changelog/unreleased/improvements/3403-remove-pool-buffer-usage-in-secretconn.md new file mode 100644 index 0000000000..4069a79ef3 --- /dev/null +++ b/.changelog/unreleased/improvements/3403-remove-pool-buffer-usage-in-secretconn.md @@ -0,0 +1,2 @@ +- `[p2p/conn]` Remove the usage of a synchronous pool of buffers in secret connection, storing instead the buffer in the connection struct. This reduces the synchronization primitive usage, speeding up the code. + ([\#3403](https://github.com/cometbft/cometbft/issues/3403)) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9600f6f3a7..d775593865 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,8 +1,25 @@ # CHANGELOG +<<<<<<< HEAD <<<<<<< HEAD ## Unreleased ======= +======= +## Unreleased (I think) + +* [#123](https://github.com/osmosis-labs/cometbft/pull/123) perf(p2p/conn): Remove unneeded global pool buffers in secret connection #3403 +* perf(p2p): Delete expensive debug log already slated for deletion #3412 +* perf(p2p): Reduce the p2p metrics overhead. #3411 +* commit f663bd35153b0b366c1e1e6b41e7f2dcff7963fd : one more debug log deletion +* [#120](https://github.com/osmosis-labs/cometbft/pull/120) perf(consensus): Use TrySend for hasVote/HasBlockPart messages #3407 + + +## v0.37.4-v25-osmo-11 + +* [#117](https://github.com/osmosis-labs/cometbft/pull/117) fix(mempool)!: stop accepting TXs in the mempool if we can't keep up +* [#118](https://github.com/osmosis-labs/cometbft/pull/118) perf: bring back stop recheck + +>>>>>>> d941920d2 (Secret conn remove pool buffer, align with upstream (#123)) ## v0.37.4-v25-osmo-10 * [#115](https://github.com/osmosis-labs/cometbft/pull/115) perf(p2p/secretconn): Buffer secret connection writes (#3346) diff --git a/go.mod b/go.mod index 8b0816eef3..c9497f64eb 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,6 @@ require ( github.com/google/orderedcode v0.0.1 github.com/gorilla/websocket v1.5.3 github.com/lib/pq v1.10.9 - github.com/libp2p/go-buffer-pool v0.1.0 github.com/minio/highwayhash v1.0.2 github.com/ory/dockertest v3.3.5+incompatible github.com/pkg/errors v0.9.1 diff --git a/go.sum b/go.sum index 1fbaa5f9a7..6675dc06ca 100644 --- a/go.sum +++ b/go.sum @@ -257,8 +257,6 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= -github.com/libp2p/go-buffer-pool v0.1.0 h1:oK4mSFcQz7cTQIfqbe4MIj9gLW+mnanjyFtc6cdF0Y8= -github.com/libp2p/go-buffer-pool v0.1.0/go.mod h1:N+vh8gMqimBzdKkSMVuydVDq+UV5QTWy5HSiZacSbPg= github.com/linxGnu/grocksdb v1.8.14 h1:HTgyYalNwBSG/1qCQUIott44wU5b2Y9Kr3z7SK5OfGQ= github.com/linxGnu/grocksdb v1.8.14/go.mod h1:QYiYypR2d4v63Wj1adOOfzglnoII0gLj3PNh4fZkcFA= github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY= diff --git a/p2p/conn/evil_secret_connection_test.go b/p2p/conn/evil_secret_connection_test.go index 678f589739..7c096a3274 100644 --- a/p2p/conn/evil_secret_connection_test.go +++ b/p2p/conn/evil_secret_connection_test.go @@ -223,14 +223,17 @@ func (c *evilConn) signChallenge() []byte { b := &buffer{} c.secretConn = &SecretConnection{ - underlyingConn: b, - connReader: b, - connWriter: bufio.NewWriterSize(b, 65536), - recvBuffer: nil, - recvNonce: new([aeadNonceSize]byte), - sendNonce: new([aeadNonceSize]byte), - recvAead: recvAead, - sendAead: sendAead, + conn: b, + connWriter: bufio.NewWriter(b), + recvBuffer: nil, + recvNonce: new([aeadNonceSize]byte), + sendNonce: new([aeadNonceSize]byte), + recvAead: recvAead, + sendAead: sendAead, + recvFrame: make([]byte, totalFrameSize), + recvSealedFrame: make([]byte, totalFrameSize+aeadSizeOverhead), + sendFrame: make([]byte, totalFrameSize), + sendSealedFrame: make([]byte, totalFrameSize+aeadSizeOverhead), } c.buffer = b diff --git a/p2p/conn/secret_connection.go b/p2p/conn/secret_connection.go index 07f555c869..3d44756e4e 100644 --- a/p2p/conn/secret_connection.go +++ b/p2p/conn/secret_connection.go @@ -15,7 +15,6 @@ import ( "time" gogotypes "github.com/cosmos/gogoproto/types" - pool "github.com/libp2p/go-buffer-pool" "github.com/oasisprotocol/curve25519-voi/primitives/merlin" "golang.org/x/crypto/chacha20poly1305" "golang.org/x/crypto/curve25519" @@ -71,9 +70,8 @@ type SecretConnection struct { remPubKey crypto.PubKey - underlyingConn io.ReadWriteCloser - connWriter *bufio.Writer - connReader io.Reader + conn io.ReadWriteCloser + connWriter *bufio.Writer // net.Conn must be thread safe: // https://golang.org/pkg/net/#Conn. @@ -82,12 +80,16 @@ type SecretConnection struct { // are independent, so we can use two mtxs. // All .Read are covered by recvMtx, // all .Write are covered by sendMtx. - recvMtx cmtsync.Mutex - recvBuffer []byte - recvNonce *[aeadNonceSize]byte - - sendMtx cmtsync.Mutex - sendNonce *[aeadNonceSize]byte + recvMtx cmtsync.Mutex + recvBuffer []byte + recvNonce *[aeadNonceSize]byte + recvFrame []byte + recvSealedFrame []byte + + sendMtx cmtsync.Mutex + sendNonce *[aeadNonceSize]byte + sendFrame []byte + sendSealedFrame []byte } // MakeSecretConnection performs handshake and returns a new authenticated @@ -148,14 +150,17 @@ func MakeSecretConnection(conn io.ReadWriteCloser, locPrivKey crypto.PrivKey) (* } sc := &SecretConnection{ - underlyingConn: conn, - connWriter: bufio.NewWriterSize(conn, defaultWriteBufferSize), - connReader: conn, - recvBuffer: nil, - recvNonce: new([aeadNonceSize]byte), - sendNonce: new([aeadNonceSize]byte), - recvAead: recvAead, - sendAead: sendAead, + conn: conn, + connWriter: bufio.NewWriterSize(conn, defaultWriteBufferSize), + recvBuffer: nil, + recvNonce: new([aeadNonceSize]byte), + sendNonce: new([aeadNonceSize]byte), + recvAead: recvAead, + sendAead: sendAead, + recvFrame: make([]byte, totalFrameSize), + recvSealedFrame: make([]byte, aeadSizeOverhead+totalFrameSize), + sendFrame: make([]byte, totalFrameSize), + sendSealedFrame: make([]byte, aeadSizeOverhead+totalFrameSize), } // Sign the challenge bytes for authentication. @@ -193,15 +198,10 @@ func (sc *SecretConnection) RemotePubKey() crypto.PubKey { func (sc *SecretConnection) Write(data []byte) (n int, err error) { sc.sendMtx.Lock() defer sc.sendMtx.Unlock() + sealedFrame, frame := sc.sendSealedFrame, sc.sendFrame for 0 < len(data) { if err := func() error { - sealedFrame := pool.Get(aeadSizeOverhead + totalFrameSize) - frame := pool.Get(totalFrameSize) - defer func() { - pool.Put(sealedFrame) - pool.Put(frame) - }() var chunk []byte if dataMaxSize < len(data) { chunk = data[:dataMaxSize] @@ -246,17 +246,15 @@ func (sc *SecretConnection) Read(data []byte) (n int, err error) { } // read off the conn - sealedFrame := pool.Get(aeadSizeOverhead + totalFrameSize) - defer pool.Put(sealedFrame) - _, err = io.ReadFull(sc.connReader, sealedFrame) + sealedFrame := sc.recvSealedFrame + _, err = io.ReadFull(sc.conn, sealedFrame) if err != nil { return n, err } // decrypt the frame. // reads and updates the sc.recvNonce - frame := pool.Get(totalFrameSize) - defer pool.Put(frame) + frame := sc.recvFrame _, err = sc.recvAead.Open(frame[:0], sc.recvNonce[:], sealedFrame, nil) if err != nil { return n, fmt.Errorf("failed to decrypt SecretConnection: %w", err) @@ -280,19 +278,19 @@ func (sc *SecretConnection) Read(data []byte) (n int, err error) { } // Implements net.Conn. -func (sc *SecretConnection) Close() error { return sc.underlyingConn.Close() } -func (sc *SecretConnection) LocalAddr() net.Addr { return sc.underlyingConn.(net.Conn).LocalAddr() } -func (sc *SecretConnection) RemoteAddr() net.Addr { return sc.underlyingConn.(net.Conn).RemoteAddr() } +func (sc *SecretConnection) Close() error { return sc.conn.Close() } +func (sc *SecretConnection) LocalAddr() net.Addr { return sc.conn.(net.Conn).LocalAddr() } +func (sc *SecretConnection) RemoteAddr() net.Addr { return sc.conn.(net.Conn).RemoteAddr() } func (sc *SecretConnection) SetDeadline(t time.Time) error { - return sc.underlyingConn.(net.Conn).SetDeadline(t) + return sc.conn.(net.Conn).SetDeadline(t) } func (sc *SecretConnection) SetReadDeadline(t time.Time) error { - return sc.underlyingConn.(net.Conn).SetReadDeadline(t) + return sc.conn.(net.Conn).SetReadDeadline(t) } func (sc *SecretConnection) SetWriteDeadline(t time.Time) error { - return sc.underlyingConn.(net.Conn).SetWriteDeadline(t) + return sc.conn.(net.Conn).SetWriteDeadline(t) } func genEphKeys() (ephPub, ephPriv *[32]byte) { From 1cb097af7859bd9dfad72e62e2e39dfdcd5bf3e8 Mon Sep 17 00:00:00 2001 From: Dev Ojha Date: Wed, 3 Jul 2024 11:18:30 +0100 Subject: [PATCH 22/30] perf(p2p): Only update send monitor once per batch packet msg send (#3382) Small optimization to outbound packet gossip, I expect this to be a 1-2% speedup to outbound packet gossip as is right now. Will test on mainnet soon This is safe as outbound packet gossip is single threaded per peer as is right now. Technically makes the send monitor marginally less real time, but this is irrelevant as the send monitor works on 20ms sliding windows anyway --- - [ ] Tests written/updated - [x] Changelog entry added in `.changelog` (we use [unclog](https://github.com/informalsystems/unclog) to manage our changelog) - [ ] Updated relevant documentation (`docs/` or `spec/`) and code comments - [x] Title follows the [Conventional Commits](https://www.conventionalcommits.org/en/v1.0.0/) spec --------- Co-authored-by: Anton Kaliaev Co-authored-by: Daniel (cherry picked from commit 20d863053637005edbcfa61f2f724888b6f78d5d) --- .../3382-single-send-monitor-per-packet.md | 2 ++ p2p/conn/connection.go | 21 ++++++++++++------- 2 files changed, 16 insertions(+), 7 deletions(-) create mode 100644 .changelog/unreleased/improvements/3382-single-send-monitor-per-packet.md diff --git a/.changelog/unreleased/improvements/3382-single-send-monitor-per-packet.md b/.changelog/unreleased/improvements/3382-single-send-monitor-per-packet.md new file mode 100644 index 0000000000..efa5e3cc27 --- /dev/null +++ b/.changelog/unreleased/improvements/3382-single-send-monitor-per-packet.md @@ -0,0 +1,2 @@ +- `[p2p/conn]` Update send monitor, used for sending rate limiting, once per batch of packets sent + ([\#3382](https://github.com/cometbft/cometbft/pull/3382)) diff --git a/p2p/conn/connection.go b/p2p/conn/connection.go index 71d8f4a8a7..25ddd4989d 100644 --- a/p2p/conn/connection.go +++ b/p2p/conn/connection.go @@ -523,16 +523,23 @@ func (c *MConnection) sendSomePacketMsgs(w protoio.Writer) bool { // Returns true if messages from channels were exhausted. func (c *MConnection) sendBatchPacketMsgs(w protoio.Writer, batchSize int) bool { // Send a batch of PacketMsgs. + totalBytesWritten := 0 + defer func() { + if totalBytesWritten > 0 { + c.sendMonitor.Update(totalBytesWritten) + } + }() for i := 0; i < batchSize; i++ { channel := selectChannelToGossipOn(c.channels) // nothing to send across any channel. if channel == nil { return true } - err := c.sendPacketMsgOnChannel(w, channel) + bytesWritten, err := c.sendPacketMsgOnChannel(w, channel) if err { return true } + totalBytesWritten += bytesWritten } return false } @@ -564,18 +571,18 @@ func selectChannelToGossipOn(channels []*Channel) *Channel { return leastChannel } -func (c *MConnection) sendPacketMsgOnChannel(w protoio.Writer, sendChannel *Channel) bool { +// returns (num_bytes_written, error_occurred). +func (c *MConnection) sendPacketMsgOnChannel(w protoio.Writer, sendChannel *Channel) (int, bool) { // Make & send a PacketMsg from this channel - _n, err := sendChannel.writePacketMsgTo(w) + n, err := sendChannel.writePacketMsgTo(w) if err != nil { c.Logger.Error("Failed to write PacketMsg", "err", err) c.stopForError(err) - return true + return n, true } - // TODO: Change this to only do one update for the entire bawtch. - c.sendMonitor.Update(_n) + // TODO: Change this to only add flush signals at the start and end of the batch. c.flushTimer.Set() - return false + return n, false } // recvRoutine reads PacketMsgs and reconstructs the message using the channels' "recving" buffer. From eb265f54a807d2e8fcc406668671b2ee8012f88d Mon Sep 17 00:00:00 2001 From: Dev Ojha Date: Thu, 4 Jul 2024 11:09:37 +0100 Subject: [PATCH 23/30] Don't remarshal within broadcast (#125) * Don't remarshal within broadcast * fix one more mock * Remove concurrency as its now net negative --- p2p/mock/peer.go | 7 ++++--- p2p/mocks/peer.go | 24 ++++++++++++------------ p2p/peer.go | 25 ++++++++++++++++--------- p2p/peer_set_test.go | 33 +++++++++++++++++---------------- p2p/switch.go | 16 +++++++++++++--- p2p/types.go | 24 ++++++++++++++++++++++++ 6 files changed, 86 insertions(+), 43 deletions(-) diff --git a/p2p/mock/peer.go b/p2p/mock/peer.go index 0d6882cd44..b4b443c199 100644 --- a/p2p/mock/peer.go +++ b/p2p/mock/peer.go @@ -42,9 +42,10 @@ func NewPeer(ip net.IP) *Peer { return mp } -func (mp *Peer) FlushStop() { mp.Stop() } //nolint:errcheck //ignore error -func (*Peer) TrySend(_ p2p.Envelope) bool { return true } -func (*Peer) Send(_ p2p.Envelope) bool { return true } +func (mp *Peer) FlushStop() { mp.Stop() } //nolint:errcheck //ignore error +func (*Peer) TrySend(_ p2p.Envelope) bool { return true } +func (*Peer) Send(_ p2p.Envelope) bool { return true } +func (*Peer) TrySendMarshalled(_ p2p.MarshalledEnvelope) bool { return true } func (mp *Peer) NodeInfo() p2p.NodeInfo { return p2p.DefaultNodeInfo{ DefaultNodeID: mp.addr.ID, diff --git a/p2p/mocks/peer.go b/p2p/mocks/peer.go index bce2c24d03..168fe46c80 100644 --- a/p2p/mocks/peer.go +++ b/p2p/mocks/peer.go @@ -433,16 +433,16 @@ func (_m *Peer) TrySend(e p2p.Envelope) bool { return r0 } -// NewPeer creates a new instance of Peer. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -// The first argument is typically a *testing.T value. -func NewPeer(t interface { - mock.TestingT - Cleanup(func()) -}) *Peer { - mock := &Peer{} - mock.Mock.Test(t) - - t.Cleanup(func() { mock.AssertExpectations(t) }) - - return mock +// TrySendMarshalled provides a mock function with given fields: _a0 +func (_m *Peer) TrySendMarshalled(_a0 p2p.MarshalledEnvelope) bool { + ret := _m.Called(_a0) + + var r0 bool + if rf, ok := ret.Get(0).(func(p2p.MarshalledEnvelope) bool); ok { + r0 = rf(_a0) + } else { + r0 = ret.Get(0).(bool) + } + + return r0 } diff --git a/p2p/peer.go b/p2p/peer.go index 451a9d36c2..0d6a0093fc 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -39,6 +39,7 @@ type Peer interface { Send(e Envelope) bool TrySend(e Envelope) bool + TrySendMarshalled(me MarshalledEnvelope) bool Set(key string, value any) Get(key string) any @@ -259,20 +260,17 @@ func (p *peer) Send(e Envelope) bool { return p.send(e.ChannelID, e.Message, p.mconn.Send) } -// TrySend msg bytes to the channel identified by chID byte. Immediately returns -// false if the send queue is full. -// -// thread safe. +func (p *peer) TrySendMarshalled(e MarshalledEnvelope) bool { + return p.sendMarshalled(e.ChannelID, getMsgType(e.Message), e.MarshalledMessage, p.mconn.TrySend) +} + +// TrySend attempts to sends the message in the envelope on the channel specified by the +// envelope. Returns false immediately if the connection's internal queue is full. func (p *peer) TrySend(e Envelope) bool { return p.send(e.ChannelID, e.Message, p.mconn.TrySend) } func (p *peer) send(chID byte, msg proto.Message, sendFunc func(byte, []byte) bool) bool { - if !p.IsRunning() { - return false - } else if !p.hasChannel(chID) { - return false - } msgType := getMsgType(msg) if w, ok := msg.(types.Wrapper); ok { msg = w.Wrap() @@ -282,6 +280,15 @@ func (p *peer) send(chID byte, msg proto.Message, sendFunc func(byte, []byte) bo p.Logger.Error("marshaling message to send", "error", err) return false } + return p.sendMarshalled(chID, msgType, msgBytes, sendFunc) +} + +func (p *peer) sendMarshalled(chID byte, msgType reflect.Type, msgBytes []byte, sendFunc func(byte, []byte) bool) bool { + if !p.IsRunning() { + return false + } else if !p.hasChannel(chID) { + return false + } res := sendFunc(chID, msgBytes) if res { p.pendingMetrics.AddPendingSendBytes(msgType, len(msgBytes)) diff --git a/p2p/peer_set_test.go b/p2p/peer_set_test.go index d9c209bf7b..9e377ea7f3 100644 --- a/p2p/peer_set_test.go +++ b/p2p/peer_set_test.go @@ -18,22 +18,23 @@ type mockPeer struct { id ID } -func (mp *mockPeer) FlushStop() { mp.Stop() } //nolint:errcheck // ignore error -func (*mockPeer) TrySend(Envelope) bool { return true } -func (*mockPeer) Send(Envelope) bool { return true } -func (*mockPeer) NodeInfo() NodeInfo { return DefaultNodeInfo{} } -func (*mockPeer) Status() ConnectionStatus { return ConnectionStatus{} } -func (mp *mockPeer) ID() ID { return mp.id } -func (*mockPeer) IsOutbound() bool { return false } -func (*mockPeer) IsPersistent() bool { return true } -func (*mockPeer) Get(s string) any { return s } -func (*mockPeer) Set(string, any) {} -func (mp *mockPeer) RemoteIP() net.IP { return mp.ip } -func (*mockPeer) SocketAddr() *NetAddress { return nil } -func (mp *mockPeer) RemoteAddr() net.Addr { return &net.TCPAddr{IP: mp.ip, Port: 8800} } -func (*mockPeer) CloseConn() error { return nil } -func (*mockPeer) SetRemovalFailed() {} -func (*mockPeer) GetRemovalFailed() bool { return false } +func (mp *mockPeer) FlushStop() { mp.Stop() } //nolint:errcheck // ignore error +func (*mockPeer) TrySend(Envelope) bool { return true } +func (*mockPeer) TrySendMarshalled(MarshalledEnvelope) bool { return true } +func (*mockPeer) Send(Envelope) bool { return true } +func (*mockPeer) NodeInfo() NodeInfo { return DefaultNodeInfo{} } +func (*mockPeer) Status() ConnectionStatus { return ConnectionStatus{} } +func (mp *mockPeer) ID() ID { return mp.id } +func (*mockPeer) IsOutbound() bool { return false } +func (*mockPeer) IsPersistent() bool { return true } +func (*mockPeer) Get(s string) any { return s } +func (*mockPeer) Set(string, any) {} +func (mp *mockPeer) RemoteIP() net.IP { return mp.ip } +func (*mockPeer) SocketAddr() *NetAddress { return nil } +func (mp *mockPeer) RemoteAddr() net.Addr { return &net.TCPAddr{IP: mp.ip, Port: 8800} } +func (*mockPeer) CloseConn() error { return nil } +func (*mockPeer) SetRemovalFailed() {} +func (*mockPeer) GetRemovalFailed() bool { return false } // Returns a mock peer. func newMockPeer(ip net.IP) *mockPeer { diff --git a/p2p/switch.go b/p2p/switch.go index a30bdd0684..9b92859f6b 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -290,11 +290,21 @@ func (sw *Switch) BroadcastEnvelope(e Envelope) { func (sw *Switch) TryBroadcast(e Envelope) { sw.Logger.Debug("TryBroadcast", "channel", e.ChannelID) + marshalMsg := e.Message + if wrapper, ok := e.Message.(Wrapper); ok { + marshalMsg = wrapper.Wrap() + } + marshalledMsg, err := proto.Marshal(marshalMsg) + if err != nil { + return + } + marshalledEnvelope := MarshalledEnvelope{ + Envelope: e, + MarshalledMessage: marshalledMsg, + } peers := sw.peers.list for _, peer := range peers { - go func(p Peer) { - p.TrySend(e) - }(peer) + peer.TrySendMarshalled(marshalledEnvelope) } } diff --git a/p2p/types.go b/p2p/types.go index 153c5d7b38..56fe5717d0 100644 --- a/p2p/types.go +++ b/p2p/types.go @@ -20,6 +20,30 @@ type Envelope struct { ChannelID byte } +// MarshalledEnvelope contains a proto message, its marshaled message, with sender routing info. +type MarshalledEnvelope struct { + Envelope + MarshalledMessage []byte +} + +// Unwrapper is a Protobuf message that can contain a variety of inner messages +// (e.g. via oneof fields). If a Channel's message type implements Unwrapper, the +// p2p layer will automatically unwrap inbound messages so that reactors do not have to do this themselves. +type Unwrapper interface { + proto.Message + + // Unwrap will unwrap the inner message contained in this message. + Unwrap() (proto.Message, error) +} + +// Wrapper is a companion type to Unwrapper. It is a Protobuf message that can contain a variety of inner messages. The p2p layer will automatically wrap outbound messages so that the reactors do not have to do it themselves. +type Wrapper interface { + proto.Message + + // Wrap will take the underlying message and wrap it in its wrapper type. + Wrap() proto.Message +} + var ( _ types.Wrapper = &tmp2p.PexRequest{} _ types.Wrapper = &tmp2p.PexAddrs{} From 91b9121412d2afa90a2ebeaa1193ef212b61471e Mon Sep 17 00:00:00 2001 From: itsdevbear Date: Thu, 4 Jul 2024 13:42:21 -0400 Subject: [PATCH 24/30] fix merge issue --- internal/consensus/reactor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/consensus/reactor.go b/internal/consensus/reactor.go index c292a3a1ee..1816c3ca29 100644 --- a/internal/consensus/reactor.go +++ b/internal/consensus/reactor.go @@ -376,7 +376,7 @@ func (conR *Reactor) Receive(e p2p.Envelope) { } } - cs.peerMsgQueue <- msgInfo{msg, e.Src.ID(), cmttime.Now()} + cs.peerMsgQueue <- msgInfo{msg, e.Src.ID(), time.Time{}} default: // don't punish (leave room for soft upgrades) From d3e09a7ca6e3ea247a8067ecde28847b843c8f57 Mon Sep 17 00:00:00 2001 From: itsdevbear Date: Thu, 4 Jul 2024 13:52:21 -0400 Subject: [PATCH 25/30] perf(consensus): Use TrySend for hasVote/HasBlockPart messages #3407 --- internal/consensus/reactor.go | 4 ++-- p2p/switch.go | 23 +++++------------------ 2 files changed, 7 insertions(+), 20 deletions(-) diff --git a/internal/consensus/reactor.go b/internal/consensus/reactor.go index 1816c3ca29..7e930656a8 100644 --- a/internal/consensus/reactor.go +++ b/internal/consensus/reactor.go @@ -493,7 +493,7 @@ func (conR *Reactor) broadcastNewValidBlockMessage(rs *cstypes.RoundState) { IsCommit: rs.Step == cstypes.RoundStepCommit, } go func() { - conR.Switch.BroadcastEnvelope(p2p.Envelope{ + conR.Switch.TryBroadcast(p2p.Envelope{ ChannelID: StateChannel, Message: csMsg, }) @@ -546,7 +546,7 @@ func (conR *Reactor) broadcastHasProposalBlockPartMessage(partMsg *BlockPartMess Round: partMsg.Round, Index: int32(partMsg.Part.Index), } - conR.Switch.BroadcastEnvelope(p2p.Envelope{ + conR.Switch.TryBroadcast(p2p.Envelope{ ChannelID: StateChannel, Message: msg, }) diff --git a/p2p/switch.go b/p2p/switch.go index 9b92859f6b..7433f32863 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -276,8 +276,6 @@ func (sw *Switch) BroadcastEnvelope(e Envelope) { for _, peer := range sw.peers.list { go func(p Peer) { - // TODO: We don't use the success value. Should most behavior - // really be TrySend? success := p.Send(e) _ = success }(peer) @@ -290,22 +288,11 @@ func (sw *Switch) BroadcastEnvelope(e Envelope) { func (sw *Switch) TryBroadcast(e Envelope) { sw.Logger.Debug("TryBroadcast", "channel", e.ChannelID) - marshalMsg := e.Message - if wrapper, ok := e.Message.(Wrapper); ok { - marshalMsg = wrapper.Wrap() - } - marshalledMsg, err := proto.Marshal(marshalMsg) - if err != nil { - return - } - marshalledEnvelope := MarshalledEnvelope{ - Envelope: e, - MarshalledMessage: marshalledMsg, - } - peers := sw.peers.list - for _, peer := range peers { - peer.TrySendMarshalled(marshalledEnvelope) - } + sw.peers.ForEach(func(p Peer) { + go func(peer Peer) { + peer.TrySend(e) + }(p) + }) } // NumPeers returns the count of outbound/inbound and outbound-dialing peers. From e78166588fb63783d7dce5348b6e35034f2162a2 Mon Sep 17 00:00:00 2001 From: itsdevbear Date: Thu, 4 Jul 2024 15:06:40 -0400 Subject: [PATCH 26/30] remove try send marshalled --- internal/blocksync/reactor.go | 2 +- internal/consensus/reactor.go | 2 +- p2p/mock/peer.go | 7 +++---- p2p/mocks/peer.go | 14 -------------- p2p/peer.go | 5 ----- p2p/peer_set_test.go | 33 ++++++++++++++++----------------- p2p/switch.go | 4 ++-- p2p/switch_test.go | 8 ++++---- statesync/reactor.go | 2 +- 9 files changed, 28 insertions(+), 49 deletions(-) diff --git a/internal/blocksync/reactor.go b/internal/blocksync/reactor.go index 6d8027039c..aa6f09dc72 100644 --- a/internal/blocksync/reactor.go +++ b/internal/blocksync/reactor.go @@ -556,7 +556,7 @@ FOR_LOOP: // BroadcastStatusRequest broadcasts `BlockStore` base and height. func (bcR *Reactor) BroadcastStatusRequest() { - bcR.Switch.BroadcastEnvelope(p2p.Envelope{ + bcR.Switch.Broadcast(p2p.Envelope{ ChannelID: BlocksyncChannel, Message: &bcproto.StatusRequest{}, }) diff --git a/internal/consensus/reactor.go b/internal/consensus/reactor.go index 7e930656a8..75e3f6cfd1 100644 --- a/internal/consensus/reactor.go +++ b/internal/consensus/reactor.go @@ -476,7 +476,7 @@ func (conR *Reactor) unsubscribeFromBroadcastEvents() { func (conR *Reactor) broadcastNewRoundStepMessage(rs *cstypes.RoundState) { nrsMsg := makeRoundStepMessage(rs) go func() { - conR.Switch.BroadcastEnvelope(p2p.Envelope{ + conR.Switch.Broadcast(p2p.Envelope{ ChannelID: StateChannel, Message: nrsMsg, }) diff --git a/p2p/mock/peer.go b/p2p/mock/peer.go index b4b443c199..0d6882cd44 100644 --- a/p2p/mock/peer.go +++ b/p2p/mock/peer.go @@ -42,10 +42,9 @@ func NewPeer(ip net.IP) *Peer { return mp } -func (mp *Peer) FlushStop() { mp.Stop() } //nolint:errcheck //ignore error -func (*Peer) TrySend(_ p2p.Envelope) bool { return true } -func (*Peer) Send(_ p2p.Envelope) bool { return true } -func (*Peer) TrySendMarshalled(_ p2p.MarshalledEnvelope) bool { return true } +func (mp *Peer) FlushStop() { mp.Stop() } //nolint:errcheck //ignore error +func (*Peer) TrySend(_ p2p.Envelope) bool { return true } +func (*Peer) Send(_ p2p.Envelope) bool { return true } func (mp *Peer) NodeInfo() p2p.NodeInfo { return p2p.DefaultNodeInfo{ DefaultNodeID: mp.addr.ID, diff --git a/p2p/mocks/peer.go b/p2p/mocks/peer.go index 168fe46c80..ba8c700421 100644 --- a/p2p/mocks/peer.go +++ b/p2p/mocks/peer.go @@ -432,17 +432,3 @@ func (_m *Peer) TrySend(e p2p.Envelope) bool { return r0 } - -// TrySendMarshalled provides a mock function with given fields: _a0 -func (_m *Peer) TrySendMarshalled(_a0 p2p.MarshalledEnvelope) bool { - ret := _m.Called(_a0) - - var r0 bool - if rf, ok := ret.Get(0).(func(p2p.MarshalledEnvelope) bool); ok { - r0 = rf(_a0) - } else { - r0 = ret.Get(0).(bool) - } - - return r0 -} diff --git a/p2p/peer.go b/p2p/peer.go index 0d6a0093fc..e72508f538 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -39,7 +39,6 @@ type Peer interface { Send(e Envelope) bool TrySend(e Envelope) bool - TrySendMarshalled(me MarshalledEnvelope) bool Set(key string, value any) Get(key string) any @@ -260,10 +259,6 @@ func (p *peer) Send(e Envelope) bool { return p.send(e.ChannelID, e.Message, p.mconn.Send) } -func (p *peer) TrySendMarshalled(e MarshalledEnvelope) bool { - return p.sendMarshalled(e.ChannelID, getMsgType(e.Message), e.MarshalledMessage, p.mconn.TrySend) -} - // TrySend attempts to sends the message in the envelope on the channel specified by the // envelope. Returns false immediately if the connection's internal queue is full. func (p *peer) TrySend(e Envelope) bool { diff --git a/p2p/peer_set_test.go b/p2p/peer_set_test.go index 9e377ea7f3..d9c209bf7b 100644 --- a/p2p/peer_set_test.go +++ b/p2p/peer_set_test.go @@ -18,23 +18,22 @@ type mockPeer struct { id ID } -func (mp *mockPeer) FlushStop() { mp.Stop() } //nolint:errcheck // ignore error -func (*mockPeer) TrySend(Envelope) bool { return true } -func (*mockPeer) TrySendMarshalled(MarshalledEnvelope) bool { return true } -func (*mockPeer) Send(Envelope) bool { return true } -func (*mockPeer) NodeInfo() NodeInfo { return DefaultNodeInfo{} } -func (*mockPeer) Status() ConnectionStatus { return ConnectionStatus{} } -func (mp *mockPeer) ID() ID { return mp.id } -func (*mockPeer) IsOutbound() bool { return false } -func (*mockPeer) IsPersistent() bool { return true } -func (*mockPeer) Get(s string) any { return s } -func (*mockPeer) Set(string, any) {} -func (mp *mockPeer) RemoteIP() net.IP { return mp.ip } -func (*mockPeer) SocketAddr() *NetAddress { return nil } -func (mp *mockPeer) RemoteAddr() net.Addr { return &net.TCPAddr{IP: mp.ip, Port: 8800} } -func (*mockPeer) CloseConn() error { return nil } -func (*mockPeer) SetRemovalFailed() {} -func (*mockPeer) GetRemovalFailed() bool { return false } +func (mp *mockPeer) FlushStop() { mp.Stop() } //nolint:errcheck // ignore error +func (*mockPeer) TrySend(Envelope) bool { return true } +func (*mockPeer) Send(Envelope) bool { return true } +func (*mockPeer) NodeInfo() NodeInfo { return DefaultNodeInfo{} } +func (*mockPeer) Status() ConnectionStatus { return ConnectionStatus{} } +func (mp *mockPeer) ID() ID { return mp.id } +func (*mockPeer) IsOutbound() bool { return false } +func (*mockPeer) IsPersistent() bool { return true } +func (*mockPeer) Get(s string) any { return s } +func (*mockPeer) Set(string, any) {} +func (mp *mockPeer) RemoteIP() net.IP { return mp.ip } +func (*mockPeer) SocketAddr() *NetAddress { return nil } +func (mp *mockPeer) RemoteAddr() net.Addr { return &net.TCPAddr{IP: mp.ip, Port: 8800} } +func (*mockPeer) CloseConn() error { return nil } +func (*mockPeer) SetRemovalFailed() {} +func (*mockPeer) GetRemovalFailed() bool { return false } // Returns a mock peer. func newMockPeer(ip net.IP) *mockPeer { diff --git a/p2p/switch.go b/p2p/switch.go index 7433f32863..f3b061c21e 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -270,8 +270,8 @@ func (sw *Switch) OnStop() { // success values for each attempted send (false if times out). Channel will be // closed once msg bytes are sent to all peers (or time out). // -// NOTE: BroadcastEnvelope uses goroutines, so order of broadcast may not be preserved. -func (sw *Switch) BroadcastEnvelope(e Envelope) { +// NOTE: Broadcast uses goroutines, so order of broadcast may not be preserved. +func (sw *Switch) Broadcast(e Envelope) { sw.Logger.Debug("Broadcast", "channel", e.ChannelID) for _, peer := range sw.peers.list { diff --git a/p2p/switch_test.go b/p2p/switch_test.go index 6540b7522a..02bb9d98be 100644 --- a/p2p/switch_test.go +++ b/p2p/switch_test.go @@ -156,9 +156,9 @@ func TestSwitches(t *testing.T) { }, }, } - s1.BroadcastEnvelope(Envelope{ChannelID: byte(0x00), Message: ch0Msg}) - s1.BroadcastEnvelope(Envelope{ChannelID: byte(0x01), Message: ch1Msg}) - s1.BroadcastEnvelope(Envelope{ChannelID: byte(0x02), Message: ch2Msg}) + s1.Broadcast(Envelope{ChannelID: byte(0x00), Message: ch0Msg}) + s1.Broadcast(Envelope{ChannelID: byte(0x01), Message: ch1Msg}) + s1.Broadcast(Envelope{ChannelID: byte(0x02), Message: ch2Msg}) assertMsgReceivedWithTimeout(t, ch0Msg, byte(0x00), @@ -844,7 +844,7 @@ func BenchmarkSwitchBroadcast(b *testing.B) { // Send random message from foo channel to another for i := 0; i < b.N; i++ { chID := byte(i % 4) - s1.BroadcastEnvelope(Envelope{ChannelID: chID}) + s1.Broadcast(Envelope{ChannelID: chID}) } } diff --git a/statesync/reactor.go b/statesync/reactor.go index 55f83484ce..9b71ffe4f2 100644 --- a/statesync/reactor.go +++ b/statesync/reactor.go @@ -277,7 +277,7 @@ func (r *Reactor) Sync(stateProvider StateProvider, discoveryTime time.Duration) r.Logger.Debug("Requesting snapshots from known peers") // Request snapshots from all currently connected peers - r.Switch.BroadcastEnvelope(p2p.Envelope{ + r.Switch.Broadcast(p2p.Envelope{ ChannelID: SnapshotChannel, Message: &ssproto.SnapshotsRequest{}, }) From 03209cc77a6f10ac66915e194be7fa4b4733ec2e Mon Sep 17 00:00:00 2001 From: itsdevbear Date: Thu, 4 Jul 2024 15:13:29 -0400 Subject: [PATCH 27/30] feat(reactor): remove mtx --- internal/consensus/reactor.go | 30 ++++++++++++++---------------- 1 file changed, 14 insertions(+), 16 deletions(-) diff --git a/internal/consensus/reactor.go b/internal/consensus/reactor.go index 75e3f6cfd1..7fd30c1925 100644 --- a/internal/consensus/reactor.go +++ b/internal/consensus/reactor.go @@ -37,19 +37,17 @@ const ( // ----------------------------------------------------------------------------- +// Reactor defines a reactor for the consensus service. // Reactor defines a reactor for the consensus service. type Reactor struct { p2p.BaseReactor // BaseService + p2p.Switch - - conS *State - mtx cmtsync.RWMutex - waitSync atomic.Bool - eventBus *types.EventBus - rsMtx cmtsync.RWMutex - rs *cstypes.RoundState - initialHeight int64 // under rsMtx - - Metrics *Metrics + conS *State + waitSync atomic.Bool + eventBus *types.EventBus + rsMtx cmtsync.RWMutex + rs *cstypes.RoundState + Metrics *Metrics + initialHeight int64 } type ReactorOption func(*Reactor) @@ -338,9 +336,9 @@ func (conR *Reactor) Receive(e p2p.Envelope) { ps.SetHasProposalBlockPart(msg.Height, msg.Round, int(msg.Part.Index)) conR.Metrics.BlockParts.With("peer_id", string(e.Src.ID())).Add(1) - conR.mtx.RLock() + conR.conS.mtx.RLock() height, blockParts := conR.rs.Height, conR.rs.ProposalBlockParts - conR.mtx.RUnlock() + conR.conS.mtx.RUnlock() allowFutureBlockPart := true ok := allowProcessingProposalBlockPart(msg, conR.Logger, conR.Metrics, height, blockParts, allowFutureBlockPart, e.Src.ID()) @@ -361,9 +359,9 @@ func (conR *Reactor) Receive(e p2p.Envelope) { switch msg := msg.(type) { case *VoteMessage: cs := conR.conS - conR.mtx.RLock() + conR.conS.mtx.RLock() height, round := conR.rs.Height, conR.rs.Round - conR.mtx.RUnlock() + conR.conS.mtx.RUnlock() ps.SetHasVote(msg.Vote) // if vote is late, and is not a precommit for the last block, mark it late and return. @@ -582,10 +580,10 @@ func (conR *Reactor) updateRoundStateRoutine() { conR.conS.mtx.RLock() rs, initialHeight := conR.conS.getRoundState(), conR.conS.state.InitialHeight conR.conS.mtx.RUnlock() - conR.mtx.Lock() + conR.conS.mtx.Lock() conR.rs = rs conR.initialHeight = initialHeight - conR.mtx.Unlock() + conR.conS.mtx.Unlock() } } From 7cd01a07018d50a32964dff0335625054cd8e5fe Mon Sep 17 00:00:00 2001 From: itsdevbear Date: Thu, 4 Jul 2024 15:34:28 -0400 Subject: [PATCH 28/30] https://github.com/osmosis-labs/cometbft/pull/126 --- p2p/conn/connection.go | 59 +++++++++++++++++++++++------------------- 1 file changed, 33 insertions(+), 26 deletions(-) diff --git a/p2p/conn/connection.go b/p2p/conn/connection.go index 25ddd4989d..690611f751 100644 --- a/p2p/conn/connection.go +++ b/p2p/conn/connection.go @@ -781,6 +781,10 @@ type Channel struct { sending []byte recentlySent int64 // exponential moving average + nextPacketMsg *tmp2p.PacketMsg + nextP2pWrapperPacketMsg *tmp2p.Packet_PacketMsg + nextPacket *tmp2p.Packet + maxPacketMsgPayloadSize int Logger log.Logger @@ -795,6 +799,9 @@ func newChannel(conn *MConnection, desc ChannelDescriptor) *Channel { conn: conn, desc: desc, sendQueue: make(chan []byte, desc.SendQueueCapacity), + nextPacketMsg: &tmp2p.PacketMsg{ChannelID: int32(desc.ID)}, + nextP2pWrapperPacketMsg: &tmp2p.Packet_PacketMsg{}, + nextPacket: &tmp2p.Packet{}, recving: make([]byte, 0, desc.RecvBufferCapacity), maxPacketMsgPayloadSize: conn.config.MaxPacketMsgPayloadSize, } @@ -854,29 +861,28 @@ func (ch *Channel) isSendPending() bool { return true } -// Creates a new PacketMsg to send. +// Updates the nextPacket proto message for us to send. // Not goroutine-safe. -func (ch *Channel) nextPacketMsg() tmp2p.PacketMsg { - packet := tmp2p.PacketMsg{ChannelID: int32(ch.desc.ID)} +func (ch *Channel) updateNextPacket() { maxSize := ch.maxPacketMsgPayloadSize if len(ch.sending) <= maxSize { - packet.Data = ch.sending - packet.EOF = true + ch.nextPacketMsg.Data = ch.sending + ch.nextPacketMsg.EOF = true ch.sending = nil atomic.AddInt32(&ch.sendQueueSize, -1) // decrement sendQueueSize } else { - packet.Data = ch.sending[:maxSize] - packet.EOF = false + ch.nextPacketMsg.Data = ch.sending[:maxSize] + ch.nextPacketMsg.EOF = false ch.sending = ch.sending[maxSize:] } - return packet + wrapPacketMsgWithBuffers(ch.nextPacketMsg, ch.nextP2pWrapperPacketMsg, ch.nextPacket) } // Writes next PacketMsg to w and updates c.recentlySent. // Not goroutine-safe. func (ch *Channel) writePacketMsgTo(w protoio.Writer) (n int, err error) { - packet := ch.nextPacketMsg() - n, err = w.WriteMsg(mustWrapPacket(&packet)) + ch.updateNextPacket() + n, err = w.WriteMsg(ch.nextPacket) if err != nil { return 0, err } @@ -920,32 +926,33 @@ func (ch *Channel) updateStats() { // mustWrapPacket takes a packet kind (oneof) and wraps it in a tmp2p.Packet message. func mustWrapPacket(pb proto.Message) *tmp2p.Packet { - var msg tmp2p.Packet + msg := &tmp2p.Packet{} + mustWrapPacketInto(pb, msg) + return msg +} +func mustWrapPacketInto(pb proto.Message, dst *tmp2p.Packet) { switch pb := pb.(type) { - case *tmp2p.Packet: // already a packet - msg = *pb + case *tmp2p.Packet: // already a packet, make a copy + *dst = *pb case *tmp2p.PacketPing: - msg = tmp2p.Packet{ - Sum: &tmp2p.Packet_PacketPing{ - PacketPing: pb, - }, + dst.Sum = &tmp2p.Packet_PacketPing{ + PacketPing: pb, } case *tmp2p.PacketPong: - msg = tmp2p.Packet{ - Sum: &tmp2p.Packet_PacketPong{ - PacketPong: pb, - }, + dst.Sum = &tmp2p.Packet_PacketPong{ + PacketPong: pb, } case *tmp2p.PacketMsg: - msg = tmp2p.Packet{ - Sum: &tmp2p.Packet_PacketMsg{ - PacketMsg: pb, - }, + dst.Sum = &tmp2p.Packet_PacketMsg{ + PacketMsg: pb, } default: panic(fmt.Errorf("unknown packet type %T", pb)) } +} - return &msg +func wrapPacketMsgWithBuffers(pb *tmp2p.PacketMsg, sumArg *tmp2p.Packet_PacketMsg, dst *tmp2p.Packet) { + sumArg.PacketMsg = pb + dst.Sum = sumArg } From fd81024ca57e8633c4466df2ef6b5f1ed19733ff Mon Sep 17 00:00:00 2001 From: itsdevbear Date: Thu, 4 Jul 2024 15:42:00 -0400 Subject: [PATCH 29/30] fix(reactor): make locks consistent with main --- internal/consensus/reactor.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/internal/consensus/reactor.go b/internal/consensus/reactor.go index 7fd30c1925..1345200a93 100644 --- a/internal/consensus/reactor.go +++ b/internal/consensus/reactor.go @@ -336,9 +336,9 @@ func (conR *Reactor) Receive(e p2p.Envelope) { ps.SetHasProposalBlockPart(msg.Height, msg.Round, int(msg.Part.Index)) conR.Metrics.BlockParts.With("peer_id", string(e.Src.ID())).Add(1) - conR.conS.mtx.RLock() + conR.rsMtx.RLock() height, blockParts := conR.rs.Height, conR.rs.ProposalBlockParts - conR.conS.mtx.RUnlock() + conR.rsMtx.RUnlock() allowFutureBlockPart := true ok := allowProcessingProposalBlockPart(msg, conR.Logger, conR.Metrics, height, blockParts, allowFutureBlockPart, e.Src.ID()) @@ -359,9 +359,9 @@ func (conR *Reactor) Receive(e p2p.Envelope) { switch msg := msg.(type) { case *VoteMessage: cs := conR.conS - conR.conS.mtx.RLock() + conR.rsMtx.RLock() height, round := conR.rs.Height, conR.rs.Round - conR.conS.mtx.RUnlock() + conR.rsMtx.RUnlock() ps.SetHasVote(msg.Vote) // if vote is late, and is not a precommit for the last block, mark it late and return. @@ -577,13 +577,13 @@ func (conR *Reactor) updateRoundStateRoutine() { if !conR.IsRunning() { return } - conR.conS.mtx.RLock() + conR.rsMtx.RLock() rs, initialHeight := conR.conS.getRoundState(), conR.conS.state.InitialHeight - conR.conS.mtx.RUnlock() - conR.conS.mtx.Lock() + conR.rsMtx.RUnlock() + conR.rsMtx.Lock() conR.rs = rs conR.initialHeight = initialHeight - conR.conS.mtx.Unlock() + conR.rsMtx.Unlock() } } From b5c9a511a3ea77cb1afd536200063022c738b859 Mon Sep 17 00:00:00 2001 From: itsdevbear Date: Thu, 4 Jul 2024 15:48:58 -0400 Subject: [PATCH 30/30] bet --- internal/autofile/group.go | 32 +++++++++++++++++--------------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/internal/autofile/group.go b/internal/autofile/group.go index e59917ace1..0eea30af5d 100644 --- a/internal/autofile/group.go +++ b/internal/autofile/group.go @@ -16,6 +16,8 @@ import ( "github.com/cometbft/cometbft/libs/service" ) +var indexedFilePattern = regexp.MustCompile(`^.+\.([0-9]{3,})$`) + const ( defaultGroupCheckDuration = 5000 * time.Millisecond defaultHeadSizeLimit = 10 * 1024 * 1024 // 10MB @@ -59,7 +61,7 @@ type Group struct { headBuf *bufio.Writer Dir string // Directory that contains .Head ticker *time.Ticker - mtx sync.Mutex + mtx sync.RWMutex headSizeLimit int64 totalSizeLimit int64 groupCheckDuration time.Duration @@ -170,29 +172,29 @@ func (g *Group) Close() { // HeadSizeLimit returns the current head size limit. func (g *Group) HeadSizeLimit() int64 { - g.mtx.Lock() - defer g.mtx.Unlock() + g.mtx.RLock() + defer g.mtx.RUnlock() return g.headSizeLimit } // TotalSizeLimit returns total size limit of the group. func (g *Group) TotalSizeLimit() int64 { - g.mtx.Lock() - defer g.mtx.Unlock() + g.mtx.RLock() + defer g.mtx.RUnlock() return g.totalSizeLimit } // MaxIndex returns index of the last file in the group. func (g *Group) MaxIndex() int { - g.mtx.Lock() - defer g.mtx.Unlock() + g.mtx.RLock() + defer g.mtx.RUnlock() return g.maxIndex } // MinIndex returns index of the first file in the group. func (g *Group) MinIndex() int { - g.mtx.Lock() - defer g.mtx.Unlock() + g.mtx.RLock() + defer g.mtx.RUnlock() return g.minIndex } @@ -219,8 +221,8 @@ func (g *Group) WriteLine(line string) error { // Buffered returns the size of the currently buffered data. func (g *Group) Buffered() int { - g.mtx.Lock() - defer g.mtx.Unlock() + g.mtx.RLock() + defer g.mtx.RUnlock() return g.headBuf.Buffered() } @@ -347,8 +349,8 @@ type GroupInfo struct { // Returns info after scanning all files in g.Head's dir. func (g *Group) ReadGroupInfo() GroupInfo { - g.mtx.Lock() - defer g.mtx.Unlock() + g.mtx.RLock() + defer g.mtx.RUnlock() return g.readGroupInfo() } @@ -380,7 +382,7 @@ func (g *Group) readGroupInfo() GroupInfo { } else if strings.HasPrefix(fileInfo.Name(), headBase) { fileSize := fileInfo.Size() totalSize += fileSize - indexedFilePattern := regexp.MustCompile(`^.+\.([0-9]{3,})$`) + submatch := indexedFilePattern.FindSubmatch([]byte(fileInfo.Name())) if len(submatch) != 0 { // Matches @@ -422,7 +424,7 @@ func filePathForIndex(headPath string, index int, maxIndex int) string { // GroupReader provides an interface for reading from a Group. type GroupReader struct { *Group - mtx sync.Mutex + mtx sync.RWMutex curIndex int curFile *os.File curReader *bufio.Reader