From 1c88d5a13bf0fe19f5773279a0161a310750a633 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Mart=C3=AD?= Date: Sat, 30 Dec 2023 00:48:52 +0100 Subject: [PATCH] all: various sync improvements Avoid using pointers to sync mutexes or maps in struct fields. We can already take the address of a struct field, and all of these methods on the struct type take a pointer receiver, so there's no need to add another pointer indirection. Note that bufioWithMutex now stores a non-pointer mutex, so it has to itself be a pointer in the sync.Map entries. This is fine, since Go's interface values always hold pointers, so we can't really avoid bufioWithMutex from being stored as a pointer. adminToken is a scalar value, so it can be an atomic instead. Note that there isn't atomic.String like e.g. atomic.Bool, so instead we use atomic.Pointer[string], which is still easier. While here, make the field names in MockBlockStore more intuitive, and it itself doesn't need to be a pointer field either. --- httprouter/apirest/apirest.go | 16 ++++++++-------- subpub/stream.go | 10 +++++----- test/testcommon/testutil/types.go | 26 +++++++++++++------------- vochain/app.go | 2 +- vochain/apptest.go | 3 --- vochain/state/state.go | 3 +-- 6 files changed, 28 insertions(+), 32 deletions(-) diff --git a/httprouter/apirest/apirest.go b/httprouter/apirest/apirest.go index 6fa28e674..83ab6e7cb 100644 --- a/httprouter/apirest/apirest.go +++ b/httprouter/apirest/apirest.go @@ -9,6 +9,7 @@ import ( "path" "strings" "sync" + "sync/atomic" "time" "go.vocdoni.io/dvote/httprouter" @@ -45,8 +46,7 @@ type API struct { router *httprouter.HTTProuter basePath string authTokens sync.Map - adminToken string - adminTokenLock sync.RWMutex + adminToken atomic.Pointer[string] verboseAuthLog bool } @@ -153,9 +153,11 @@ func (a *API) AuthorizeRequest(data any, accessType httprouter.AuthAccessType) ( } switch accessType { case httprouter.AccessTypeAdmin: - a.adminTokenLock.RLock() - defer a.adminTokenLock.RUnlock() - if msg.AuthToken != a.adminToken { + var adminToken string + if t := a.adminToken.Load(); t != nil { + adminToken = *t + } + if adminToken != "" && msg.AuthToken != adminToken { return false, fmt.Errorf("admin token not valid") } return true, nil @@ -262,9 +264,7 @@ func (a *API) RegisterMethod(pattern, HTTPmethod string, accessType string, hand // SetAdminToken sets the bearer admin token capable to execute admin handlers func (a *API) SetAdminToken(bearerToken string) { - a.adminTokenLock.Lock() - defer a.adminTokenLock.Unlock() - a.adminToken = bearerToken + a.adminToken.Store(&bearerToken) } // AddAuthToken adds a new bearer token capable to perform up to n requests diff --git a/subpub/stream.go b/subpub/stream.go index 4771b3d65..4d5ac1f81 100644 --- a/subpub/stream.go +++ b/subpub/stream.go @@ -14,7 +14,7 @@ import ( // bufioWithMutex is a *bufio.Writer with methods Lock() and Unlock() type bufioWithMutex struct { *bufio.Writer - *sync.Mutex + sync.Mutex } func (ps *SubPub) handleStream(stream network.Stream) { @@ -27,7 +27,7 @@ func (ps *SubPub) handleStream(stream network.Stream) { go ps.readHandler(stream) // ps.readHandler just deals with chans so is thread-safe // Create a buffer stream for concurrent, non blocking writes. - ps.streams.Store(peer, bufioWithMutex{bufio.NewWriter(stream), new(sync.Mutex)}) + ps.streams.Store(peer, &bufioWithMutex{Writer: bufio.NewWriter(stream)}) if fn := ps.OnPeerAdd; fn != nil { fn(peer) @@ -40,9 +40,9 @@ func (ps *SubPub) sendStreamMessage(address string, message []byte) error { if err != nil { return fmt.Errorf("cannot decode %s into a peerID: %w", address, err) } - value, found := ps.streams.Load(peerID) - stream, ok := value.(bufioWithMutex) // check type to avoid panics - if !found || !ok { + value, _ := ps.streams.Load(peerID) + stream, ok := value.(*bufioWithMutex) // check type to avoid panics + if !ok { return fmt.Errorf("stream for peer %s not found", peerID) } diff --git a/test/testcommon/testutil/types.go b/test/testcommon/testutil/types.go index 6f8931f0b..3fbc4a6de 100644 --- a/test/testcommon/testutil/types.go +++ b/test/testcommon/testutil/types.go @@ -12,28 +12,28 @@ import ( ) type MockBlockStore struct { - store *sync.Map - count atomic.Int64 + blockByHeight sync.Map // map[int64]*tmtypes.Block + height atomic.Int64 } func (b *MockBlockStore) Init() { log.Info("init mock block store") - b.store = new(sync.Map) - b.count.Store(0) } func (b *MockBlockStore) Height() int64 { - return b.count.Load() + return b.height.Load() } func (b *MockBlockStore) AddTxToBlock(tx []byte) { - count := b.count.Load() + count := b.height.Load() log.Infow("add tx to block", "height", count) - b.Get(count).Data.Txs = append(b.Get(count).Data.Txs, tx) + block := b.Get(count) + // Note that this append is not safe to do concurrently. + block.Txs = append(block.Txs, tx) } func (b *MockBlockStore) NewBlock(height int64) { - if count := b.count.Load(); height != count { + if count := b.height.Load(); height != count { panic(fmt.Sprintf("height is not the expected one (got:%d expected:%d)", height, count)) } log.Infow("new block", "height", height) @@ -44,12 +44,12 @@ func (b *MockBlockStore) NewBlock(height int64) { } func (b *MockBlockStore) EndBlock() int64 { - log.Infow("end block", "height", b.count.Load()) - return b.count.Add(1) + log.Infow("end block", "height", b.height.Load()) + return b.height.Add(1) } func (b *MockBlockStore) Get(height int64) *tmtypes.Block { - val, ok := b.store.Load(height) + val, ok := b.blockByHeight.Load(height) if !ok { return nil } @@ -58,7 +58,7 @@ func (b *MockBlockStore) Get(height int64) *tmtypes.Block { func (b *MockBlockStore) GetByHash(hash []byte) *tmtypes.Block { var block *tmtypes.Block - b.store.Range(func(key, value any) bool { + b.blockByHeight.Range(func(key, value any) bool { if bytes.Equal(value.(*tmtypes.Block).Hash().Bytes(), hash) { block = value.(*tmtypes.Block) return false @@ -69,5 +69,5 @@ func (b *MockBlockStore) GetByHash(hash []byte) *tmtypes.Block { } func (b *MockBlockStore) set(height int64, block *tmtypes.Block) { - b.store.Store(height, block) + b.blockByHeight.Store(height, block) } diff --git a/vochain/app.go b/vochain/app.go index 80af49a7b..945c713f6 100644 --- a/vochain/app.go +++ b/vochain/app.go @@ -97,7 +97,7 @@ type BaseApplication struct { prepareProposalLock sync.Mutex // testMockBlockStore is used for testing purposes only - testMockBlockStore *testutil.MockBlockStore + testMockBlockStore testutil.MockBlockStore } // pendingTxReference is used to store the block height where the transaction was accepted by the mempool, and the number diff --git a/vochain/apptest.go b/vochain/apptest.go index dd5ac540b..df8bc7f90 100644 --- a/vochain/apptest.go +++ b/vochain/apptest.go @@ -11,7 +11,6 @@ import ( tmtypes "github.com/cometbft/cometbft/types" "go.vocdoni.io/dvote/config" "go.vocdoni.io/dvote/db/metadb" - "go.vocdoni.io/dvote/test/testcommon/testutil" "go.vocdoni.io/proto/build/go/models" "google.golang.org/protobuf/proto" ) @@ -61,8 +60,6 @@ func TestBaseApplicationWithChainID(tb testing.TB, chainID string) *BaseApplicat // SetTestingMethods assigns fnGetBlockByHash, fnGetBlockByHeight, fnSendTx to use mockBlockStore func (app *BaseApplication) SetTestingMethods() { - app.testMockBlockStore = new(testutil.MockBlockStore) - app.testMockBlockStore.Init() app.SetFnGetBlockByHash(app.testMockBlockStore.GetByHash) app.SetFnGetBlockByHeight(app.testMockBlockStore.Get) app.SetFnGetTx(func(height uint32, txIndex int32) (*models.SignedTx, error) { diff --git a/vochain/state/state.go b/vochain/state/state.go index e8f098dbd..fe592a787 100644 --- a/vochain/state/state.go +++ b/vochain/state/state.go @@ -91,7 +91,7 @@ type State struct { ProcessBlockRegistry *ProcessBlockRegistry validSIKRoots [][]byte - mtxValidSIKRoots *sync.Mutex + mtxValidSIKRoots sync.Mutex } // NewState creates a new State @@ -141,7 +141,6 @@ func NewState(dbType, dataDir string) (*State, error) { db: s.NoState(true), state: s, } - s.mtxValidSIKRoots = &sync.Mutex{} if err := s.FetchValidSIKRoots(); err != nil { return nil, fmt.Errorf("cannot update valid SIK roots: %w", err) }