
Commit

Fix race condition in mockCluster map
yujunz committed Jul 17, 2020
1 parent 6633dbd commit 9698675
Showing 3 changed files with 28 additions and 3 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
@@ -13,4 +13,4 @@ before_script:
- sh -c "sleep 5" # give Redis a(n additional) chance to start

script:
- - go test -v ./...
+ - go test -v -race ./...
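
For context, the added -race flag turns on Go's race detector for the test run. A minimal sketch (a hypothetical test, not part of this repository) of the kind of data race it reports: two goroutines writing the same map with no synchronization.

package example

import (
	"sync"
	"testing"
)

// TestUnsyncedMapWrite is flagged by `go test -race`: both goroutines write
// the same map without holding a lock.
func TestUnsyncedMapWrite(t *testing.T) {
	m := map[string]float64{}
	var wg sync.WaitGroup
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func(v int) {
			defer wg.Done()
			m["key"] = float64(v) // unsynchronized write
		}(i)
	}
	wg.Wait()
}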
28 changes: 26 additions & 2 deletions farm/mock_cluster_test.go
@@ -4,6 +4,7 @@ import (
"errors"
"reflect"
"sort"
"sync"
"sync/atomic"
"testing"

@@ -78,25 +79,31 @@ type mockCluster struct {
countScore int32
countKeys int32
countOpenChannels int32
+ mutex *sync.Mutex
}

var mockClusterIDs int32

func newMockCluster() *mockCluster {
return &mockCluster{
- id: atomic.AddInt32(&mockClusterIDs, 1),
- m:  map[string]map[string]float64{},
+ id:    atomic.AddInt32(&mockClusterIDs, 1),
+ m:     map[string]map[string]float64{},
+ mutex: &sync.Mutex{},
}
}

func newFailingMockCluster() *mockCluster {
return &mockCluster{
m: map[string]map[string]float64{},
failing: true,
+ mutex: &sync.Mutex{},
}
}

func (c *mockCluster) Insert(keyScoreMembers []common.KeyScoreMember) error {
+ c.mutex.Lock()
+ defer c.mutex.Unlock()
+
atomic.AddInt32(&c.countInsert, 1)
if c.failing {
return errors.New("failtown, population you")
@@ -129,6 +136,9 @@ func (c *mockCluster) SelectOffset(keys []string, offset, limit int) <-chan clus
}
atomic.AddInt32(&c.countOpenChannels, 1)
go func() {
+ c.mutex.Lock()
+ defer c.mutex.Unlock()
+
defer func() {
close(ch)
atomic.AddInt32(&c.countOpenChannels, -1)
@@ -193,6 +203,9 @@ func (a scoreMemberSlice) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a scoreMemberSlice) Less(i, j int) bool { return a[i].score > a[j].score }

func (c *mockCluster) Delete(keyScoreMembers []common.KeyScoreMember) error {
+ c.mutex.Lock()
+ defer c.mutex.Unlock()
+
atomic.AddInt32(&c.countDelete, 1)
if c.failing {
return errors.New("failtown, population you")
@@ -223,12 +236,16 @@ func (c *mockCluster) Delete(keyScoreMembers []common.KeyScoreMember) error {
// Score in this mock implementation will never return a score for
// deleted entries.
func (c *mockCluster) Score(keyMembers []common.KeyMember) (map[common.KeyMember]cluster.Presence, error) {
+ c.mutex.Lock()
+ defer c.mutex.Unlock()
+
atomic.AddInt32(&c.countScore, 1)
if c.failing {
return map[common.KeyMember]cluster.Presence{}, errors.New("failtown, population you")
}

m := map[common.KeyMember]cluster.Presence{}

for _, keyMember := range keyMembers {
members, ok := c.m[keyMember.Key]
if !ok {
@@ -250,11 +267,15 @@ func (c *mockCluster) Score(keyMembers []common.KeyMember) (map[common.KeyMember
}

func (c *mockCluster) Keys(batchSize int) <-chan []string {
+ c.mutex.Lock()
+ defer c.mutex.Unlock()
+
atomic.AddInt32(&c.countKeys, 1)

// Copy keys from c.m, so that at least after this method has returned,
// we don't run into issues with concurrent modifications.
a := make([]string, 0, len(c.m))

for key := range c.m {
a = append(a, key)
}
Expand All @@ -278,6 +299,9 @@ func (c *mockCluster) Keys(batchSize int) <-chan []string {
}

func (c *mockCluster) clear() {
+ c.mutex.Lock()
+ defer c.mutex.Unlock()
+
c.m = map[string]map[string]float64{}
}

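The locking added above follows the standard pattern for a map shared across goroutines: every method that touches c.m acquires the mutex first and releases it with defer. A minimal self-contained sketch of that pattern (names are illustrative, not taken from the repository):

package example

import "sync"

// store guards its map with a mutex so concurrent inserts and reads are safe.
type store struct {
	mutex sync.Mutex
	m     map[string]float64
}

func newStore() *store {
	return &store{m: map[string]float64{}}
}

// insert writes to the map under the lock.
func (s *store) insert(key string, score float64) {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	s.m[key] = score
}

// keys copies the key set under the lock, so callers can range over the
// result without racing against later inserts.
func (s *store) keys() []string {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	out := make([]string, 0, len(s.m))
	for k := range s.m {
		out = append(out, k)
	}
	return out
}

The commit stores the mutex as a *sync.Mutex field initialized in the constructors; a value field as sketched here works too, provided the struct is never copied.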
1 change: 1 addition & 0 deletions farm/repair_strategies_test.go
@@ -19,6 +19,7 @@ func TestAllRepairs(t *testing.T) {
// Make inserts, no repair.
first := common.KeyScoreMember{Key: "foo", Score: 1., Member: "bar"}
second := common.KeyScoreMember{Key: "foo", Score: 2.34, Member: "bar"}
+
farm.Insert([]common.KeyScoreMember{first}) // perfect insert
clusters[0].Insert([]common.KeyScoreMember{second}) // imperfect insert

