Skip to content

Commit

Permalink
all: various sync improvements
Browse files Browse the repository at this point in the history
Avoid using pointers to sync mutexes or maps in struct fields.
We can already take the address of a struct field,
and all of these methods on the struct type take a pointer receiver,
so there's no need to add another pointer indirection.

Note that bufioWithMutex now stores a non-pointer mutex,
so it has to itself be a pointer in the sync.Map entries.
This is fine, since Go's interface values always hold pointers,
so we can't really avoid bufioWithMutex from being stored as a pointer.

adminToken is a scalar value, so it can be an atomic instead.
Note that there isn't atomic.String like e.g. atomic.Bool,
so instead we use atomic.Pointer[string], which is still easier.

While here, make the field names in MockBlockStore more intuitive,
and it itself doesn't need to be a pointer field either.
  • Loading branch information
mvdan committed Jan 2, 2024
1 parent 5eaeb58 commit 1c88d5a
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 32 deletions.
16 changes: 8 additions & 8 deletions httprouter/apirest/apirest.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"path"
"strings"
"sync"
"sync/atomic"
"time"

"go.vocdoni.io/dvote/httprouter"
Expand Down Expand Up @@ -45,8 +46,7 @@ type API struct {
router *httprouter.HTTProuter
basePath string
authTokens sync.Map
adminToken string
adminTokenLock sync.RWMutex
adminToken atomic.Pointer[string]
verboseAuthLog bool
}

Expand Down Expand Up @@ -153,9 +153,11 @@ func (a *API) AuthorizeRequest(data any, accessType httprouter.AuthAccessType) (
}
switch accessType {
case httprouter.AccessTypeAdmin:
a.adminTokenLock.RLock()
defer a.adminTokenLock.RUnlock()
if msg.AuthToken != a.adminToken {
var adminToken string
if t := a.adminToken.Load(); t != nil {
adminToken = *t
}
if adminToken != "" && msg.AuthToken != adminToken {
return false, fmt.Errorf("admin token not valid")
}
return true, nil
Expand Down Expand Up @@ -262,9 +264,7 @@ func (a *API) RegisterMethod(pattern, HTTPmethod string, accessType string, hand

// SetAdminToken sets the bearer admin token capable to execute admin handlers
func (a *API) SetAdminToken(bearerToken string) {
a.adminTokenLock.Lock()
defer a.adminTokenLock.Unlock()
a.adminToken = bearerToken
a.adminToken.Store(&bearerToken)
}

// AddAuthToken adds a new bearer token capable to perform up to n requests
Expand Down
10 changes: 5 additions & 5 deletions subpub/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
// bufioWithMutex is a *bufio.Writer with methods Lock() and Unlock()
type bufioWithMutex struct {
*bufio.Writer
*sync.Mutex
sync.Mutex
}

func (ps *SubPub) handleStream(stream network.Stream) {
Expand All @@ -27,7 +27,7 @@ func (ps *SubPub) handleStream(stream network.Stream) {
go ps.readHandler(stream) // ps.readHandler just deals with chans so is thread-safe

// Create a buffer stream for concurrent, non blocking writes.
ps.streams.Store(peer, bufioWithMutex{bufio.NewWriter(stream), new(sync.Mutex)})
ps.streams.Store(peer, &bufioWithMutex{Writer: bufio.NewWriter(stream)})

if fn := ps.OnPeerAdd; fn != nil {
fn(peer)
Expand All @@ -40,9 +40,9 @@ func (ps *SubPub) sendStreamMessage(address string, message []byte) error {
if err != nil {
return fmt.Errorf("cannot decode %s into a peerID: %w", address, err)
}
value, found := ps.streams.Load(peerID)
stream, ok := value.(bufioWithMutex) // check type to avoid panics
if !found || !ok {
value, _ := ps.streams.Load(peerID)
stream, ok := value.(*bufioWithMutex) // check type to avoid panics
if !ok {
return fmt.Errorf("stream for peer %s not found", peerID)
}

Expand Down
26 changes: 13 additions & 13 deletions test/testcommon/testutil/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,28 +12,28 @@ import (
)

type MockBlockStore struct {
store *sync.Map
count atomic.Int64
blockByHeight sync.Map // map[int64]*tmtypes.Block
height atomic.Int64
}

func (b *MockBlockStore) Init() {
log.Info("init mock block store")
b.store = new(sync.Map)
b.count.Store(0)
}

func (b *MockBlockStore) Height() int64 {
return b.count.Load()
return b.height.Load()
}

func (b *MockBlockStore) AddTxToBlock(tx []byte) {
count := b.count.Load()
count := b.height.Load()
log.Infow("add tx to block", "height", count)
b.Get(count).Data.Txs = append(b.Get(count).Data.Txs, tx)
block := b.Get(count)
// Note that this append is not safe to do concurrently.
block.Txs = append(block.Txs, tx)
}

func (b *MockBlockStore) NewBlock(height int64) {
if count := b.count.Load(); height != count {
if count := b.height.Load(); height != count {
panic(fmt.Sprintf("height is not the expected one (got:%d expected:%d)", height, count))
}
log.Infow("new block", "height", height)
Expand All @@ -44,12 +44,12 @@ func (b *MockBlockStore) NewBlock(height int64) {
}

func (b *MockBlockStore) EndBlock() int64 {
log.Infow("end block", "height", b.count.Load())
return b.count.Add(1)
log.Infow("end block", "height", b.height.Load())
return b.height.Add(1)
}

func (b *MockBlockStore) Get(height int64) *tmtypes.Block {
val, ok := b.store.Load(height)
val, ok := b.blockByHeight.Load(height)
if !ok {
return nil
}
Expand All @@ -58,7 +58,7 @@ func (b *MockBlockStore) Get(height int64) *tmtypes.Block {

func (b *MockBlockStore) GetByHash(hash []byte) *tmtypes.Block {
var block *tmtypes.Block
b.store.Range(func(key, value any) bool {
b.blockByHeight.Range(func(key, value any) bool {
if bytes.Equal(value.(*tmtypes.Block).Hash().Bytes(), hash) {
block = value.(*tmtypes.Block)
return false
Expand All @@ -69,5 +69,5 @@ func (b *MockBlockStore) GetByHash(hash []byte) *tmtypes.Block {
}

func (b *MockBlockStore) set(height int64, block *tmtypes.Block) {
b.store.Store(height, block)
b.blockByHeight.Store(height, block)
}
2 changes: 1 addition & 1 deletion vochain/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ type BaseApplication struct {
prepareProposalLock sync.Mutex

// testMockBlockStore is used for testing purposes only
testMockBlockStore *testutil.MockBlockStore
testMockBlockStore testutil.MockBlockStore
}

// pendingTxReference is used to store the block height where the transaction was accepted by the mempool, and the number
Expand Down
3 changes: 0 additions & 3 deletions vochain/apptest.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
tmtypes "github.com/cometbft/cometbft/types"
"go.vocdoni.io/dvote/config"
"go.vocdoni.io/dvote/db/metadb"
"go.vocdoni.io/dvote/test/testcommon/testutil"
"go.vocdoni.io/proto/build/go/models"
"google.golang.org/protobuf/proto"
)
Expand Down Expand Up @@ -61,8 +60,6 @@ func TestBaseApplicationWithChainID(tb testing.TB, chainID string) *BaseApplicat

// SetTestingMethods assigns fnGetBlockByHash, fnGetBlockByHeight, fnSendTx to use mockBlockStore
func (app *BaseApplication) SetTestingMethods() {
app.testMockBlockStore = new(testutil.MockBlockStore)
app.testMockBlockStore.Init()
app.SetFnGetBlockByHash(app.testMockBlockStore.GetByHash)
app.SetFnGetBlockByHeight(app.testMockBlockStore.Get)
app.SetFnGetTx(func(height uint32, txIndex int32) (*models.SignedTx, error) {
Expand Down
3 changes: 1 addition & 2 deletions vochain/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ type State struct {
ProcessBlockRegistry *ProcessBlockRegistry

validSIKRoots [][]byte
mtxValidSIKRoots *sync.Mutex
mtxValidSIKRoots sync.Mutex
}

// NewState creates a new State
Expand Down Expand Up @@ -141,7 +141,6 @@ func NewState(dbType, dataDir string) (*State, error) {
db: s.NoState(true),
state: s,
}
s.mtxValidSIKRoots = &sync.Mutex{}
if err := s.FetchValidSIKRoots(); err != nil {
return nil, fmt.Errorf("cannot update valid SIK roots: %w", err)
}
Expand Down

0 comments on commit 1c88d5a

Please sign in to comment.