WIP: Support for deleting keys in memberlist KV store. #605

Draft · wants to merge 17 commits into base: main

Changes from all commits (17 commits)
6c9bdd3  Initial support for deleting keys in memberlist KV store. (pstibrany, Oct 9, 2024)
caa0691  Initial support for deleting keys in memberlist KV store. (pstibrany, Oct 9, 2024)
2e104a8  cache: Add ability to manually advance "now" for mock cache (#601) (56quarters, Oct 9, 2024)
006b024  Broadcast Delete change. (pstibrany, Oct 9, 2024)
f4d4811  Really fix flaky `TestMultipleClients` tests (#603) (julienduchesne, Oct 10, 2024)
879ff5a  Add concurrency to the memberlist transport's `WriteTo` method (#525) (aldernero, Oct 10, 2024)
a36b0cc  `TestMultipleClients*`: Poll for the memberlist status for 10 seconds… (julienduchesne, Oct 11, 2024)
8e7752e  Memberlist: support for debouncing notifications (#592) (seizethedave, Oct 11, 2024)
cf9b0bc  `ReusableGoroutinesPool`: Fix datarace on `Close` (#607) (julienduchesne, Oct 11, 2024)
619c421  spanlogger: fix double logging of caller information (#604) (charleskorn, Oct 13, 2024)
d3f80b0  Remove mutex from ReusableGoroutinesPool (#610) (colega, Oct 14, 2024)
21f60cf  memberlist `WriteTo`: Track dropped packets properly (#611) (julienduchesne, Oct 15, 2024)
9018c0e  Tests: Listen on `localhost` instead of `0.0.0.0` (#609) (julienduchesne, Oct 21, 2024)
3259202  Initial support for deleting keys in memberlist KV store. (pstibrany, Oct 9, 2024)
18a3348  Initial support for deleting keys in memberlist KV store. (pstibrany, Oct 9, 2024)
5c59354  Broadcast Delete change. (pstibrany, Oct 9, 2024)
1408522  Merge remote-tracking branch 'origin/Initial-support-for-deleting-key… (aldernero, Oct 21, 2024)
CHANGELOG.md (5 changes: 4 additions, 1 deletion)

@@ -80,6 +80,7 @@
 * [CHANGE] Changed `ShouldLog()` function signature in `middleware.OptionalLogging` interface to `ShouldLog(context.Context) (bool, string)`: the returned `string` contains an optional reason. When reason is valued, `GRPCServerLog` adds `(<reason>)` suffix to the error. #514
 * [CHANGE] Cache: Remove superfluous `cache.RemoteCacheClient` interface and unify all caches using the `cache.Cache` interface. #520
 * [CHANGE] Updated the minimum required Go version to 1.21. #540
+* [CHANGE] Backoff: added `Backoff.ErrCause()` which is like `Backoff.Err()` but returns the context cause if backoff is terminated because the context has been canceled. #538
 * [CHANGE] memberlist: Metric `memberlist_client_messages_in_broadcast_queue` is now split into `queue="local"` and `queue="gossip"` values. #539
 * [CHANGE] memberlist: Failure to fast-join a cluster via contacting a node is now logged at `info` instead of `debug`. #585
 * [CHANGE] `Service.AddListener` and `Manager.AddListener` now return function for stopping the listener. #564
@@ -232,7 +233,9 @@
 * [ENHANCEMENT] grpcclient: Support custom gRPC compressors. #583
 * [ENHANCEMENT] Adapt `metrics.SendSumOfGaugesPerTenant` to use `metrics.MetricOption`. #584
 * [ENHANCEMENT] Cache: Add `.Add()` and `.Set()` methods to cache clients. #591
-* [CHANGE] Backoff: added `Backoff.ErrCause()` which is like `Backoff.Err()` but returns the context cause if backoff is terminated because the context has been canceled. #538
+* [ENHANCEMENT] Cache: Add `.Advance()` methods to mock cache clients for easier testing of TTLs. #601
+* [ENHANCEMENT] Memberlist: Add concurrency to the transport's WriteTo method. #525
+* [ENHANCEMENT] Memberlist: Notifications can now be processed once per interval specified by `-memberlist.notify-interval` to reduce notify storms in large clusters. #592
 * [BUGFIX] spanlogger: Support multiple tenant IDs. #59
 * [BUGFIX] Memberlist: fixed corrupted packets when sending compound messages with more than 255 messages or messages bigger than 64KB. #85
 * [BUGFIX] Ring: `ring_member_ownership_percent` and `ring_tokens_owned` metrics are not updated on scale down. #109
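
For context on the `Backoff.ErrCause()` entry above (#538), a minimal sketch of how it differs from `Backoff.Err()`. It assumes the `github.com/grafana/dskit/backoff` package and the usual retry-loop shape; the exact error wording is illustrative, not authoritative.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/grafana/dskit/backoff"
)

var errDone = errors.New("work no longer needed")

func main() {
	// Cancel the context with an explicit cause while the retry loop runs.
	ctx, cancel := context.WithCancelCause(context.Background())
	go func() {
		time.Sleep(30 * time.Millisecond)
		cancel(errDone)
	}()

	b := backoff.New(ctx, backoff.Config{
		MinBackoff: 10 * time.Millisecond,
		MaxBackoff: 100 * time.Millisecond,
	})
	for b.Ongoing() {
		// ... attempt the operation here, break on success ...
		b.Wait()
	}

	fmt.Println(b.Err())                          // reports that the context was canceled
	fmt.Println(errors.Is(b.ErrCause(), errDone)) // true: surfaces the specific cause
}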
cache/mock.go (30 changes: 23 additions, 7 deletions)

@@ -19,25 +19,26 @@ var (
 type MockCache struct {
 	mu    sync.Mutex
 	cache map[string]Item
+	now   time.Time
 }
 
 func NewMockCache() *MockCache {
-	c := &MockCache{}
+	c := &MockCache{now: time.Now()}
 	c.Flush()
 	return c
 }
 
 func (m *MockCache) SetAsync(key string, value []byte, ttl time.Duration) {
 	m.mu.Lock()
 	defer m.mu.Unlock()
-	m.cache[key] = Item{Data: value, ExpiresAt: time.Now().Add(ttl)}
+	m.cache[key] = Item{Data: value, ExpiresAt: m.now.Add(ttl)}
 }
 
 func (m *MockCache) SetMultiAsync(data map[string][]byte, ttl time.Duration) {
 	m.mu.Lock()
 	defer m.mu.Unlock()
 
-	exp := time.Now().Add(ttl)
+	exp := m.now.Add(ttl)
 	for key, val := range data {
 		m.cache[key] = Item{Data: val, ExpiresAt: exp}
 	}
@@ -46,19 +47,19 @@ func (m *MockCache) SetMultiAsync(data map[string][]byte, ttl time.Duration) {
 func (m *MockCache) Set(_ context.Context, key string, value []byte, ttl time.Duration) error {
 	m.mu.Lock()
 	defer m.mu.Unlock()
-	m.cache[key] = Item{Data: value, ExpiresAt: time.Now().Add(ttl)}
+	m.cache[key] = Item{Data: value, ExpiresAt: m.now.Add(ttl)}
 	return nil
 }
 
 func (m *MockCache) Add(_ context.Context, key string, value []byte, ttl time.Duration) error {
 	m.mu.Lock()
 	defer m.mu.Unlock()
 
-	if _, ok := m.cache[key]; ok {
+	if i, ok := m.cache[key]; ok && i.ExpiresAt.After(m.now) {
 		return ErrNotStored
 	}
 
-	m.cache[key] = Item{Data: value, ExpiresAt: time.Now().Add(ttl)}
+	m.cache[key] = Item{Data: value, ExpiresAt: m.now.Add(ttl)}
 	return nil
 }
 
@@ -68,7 +69,7 @@ func (m *MockCache) GetMulti(_ context.Context, keys []string, _ ...Option) map[
 
 	found := make(map[string][]byte, len(keys))
 
-	now := time.Now()
+	now := m.now
 	for _, k := range keys {
 		v, ok := m.cache[k]
 		if ok && now.Before(v.ExpiresAt) {
@@ -107,13 +108,22 @@ func (m *MockCache) Delete(_ context.Context, key string) error {
 	return nil
 }
 
+// Flush removes all entries from the cache
 func (m *MockCache) Flush() {
 	m.mu.Lock()
 	defer m.mu.Unlock()
 
 	m.cache = map[string]Item{}
 }
+
+// Advance changes "now" by the given duration
+func (m *MockCache) Advance(d time.Duration) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+
+	m.now = m.now.Add(d)
+}
 
 // InstrumentedMockCache is a mocked cache implementation which also tracks the number
 // of times its functions are called.
type InstrumentedMockCache struct {
@@ -172,10 +182,16 @@ func (m *InstrumentedMockCache) GetItems() map[string]Item {
 	return m.cache.GetItems()
 }
 
+// Flush removes all entries from the cache
 func (m *InstrumentedMockCache) Flush() {
 	m.cache.Flush()
 }
+
+// Advance changes "now" by the given duration
+func (m *InstrumentedMockCache) Advance(d time.Duration) {
+	m.cache.Advance(d)
+}
 
 func (m *InstrumentedMockCache) CountStoreCalls() int {
 	return int(m.storeCount.Load())
 }
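
The new `now` field makes the mock cache's clock manual, which is what `Advance()` exposes. A small usage sketch based only on the methods visible in this diff (module path assumed to be `github.com/grafana/dskit/cache`):

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/grafana/dskit/cache"
)

func main() {
	ctx := context.Background()
	c := cache.NewMockCache()

	_ = c.Set(ctx, "k", []byte("v"), time.Minute)

	// Before the TTL elapses the key is returned, and Add refuses to
	// overwrite the live entry.
	fmt.Println(len(c.GetMulti(ctx, []string{"k"})))        // 1
	fmt.Println(c.Add(ctx, "k", []byte("v2"), time.Minute)) // ErrNotStored

	// Move the mock clock past the TTL: lookups now miss and Add succeeds,
	// exercising the expiry check added to Add() above.
	c.Advance(2 * time.Minute)
	fmt.Println(len(c.GetMulti(ctx, []string{"k"})))        // 0
	fmt.Println(c.Add(ctx, "k", []byte("v2"), time.Minute)) // <nil>
}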
concurrency/worker.go (21 changes: 15 additions, 6 deletions)

@@ -5,20 +5,27 @@ package concurrency
 // If all workers are busy, Go() will spawn a new goroutine to run the workload.
 func NewReusableGoroutinesPool(size int) *ReusableGoroutinesPool {
 	p := &ReusableGoroutinesPool{
-		jobs: make(chan func()),
+		jobs:   make(chan func()),
+		closed: make(chan struct{}),
 	}
 	for i := 0; i < size; i++ {
 		go func() {
-			for f := range p.jobs {
-				f()
+			for {
+				select {
+				case f := <-p.jobs:
+					f()
+				case <-p.closed:
+					return
+				}
 			}
 		}()
 	}
 	return p
 }
 
 type ReusableGoroutinesPool struct {
-	jobs chan func()
+	jobs   chan func()
+	closed chan struct{}
 }
 
 // Go will run the given function in a worker of the pool.
@@ -32,7 +39,9 @@ func (p *ReusableGoroutinesPool) Go(f func()) {
 }
 
 // Close stops the workers of the pool.
-// No new Do() calls should be performed after calling Close().
+// No new Go() calls should be performed after calling Close().
 // Close does NOT wait for all jobs to finish, it is the caller's responsibility to ensure that in the provided workloads.
 // Close is intended to be used in tests to ensure that no goroutines are leaked.
-func (p *ReusableGoroutinesPool) Close() { close(p.jobs) }
+func (p *ReusableGoroutinesPool) Close() {
+	close(p.closed)
+}
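
Closing a dedicated `closed` channel instead of the `jobs` channel is what fixes the data race from #607: a `Go()` call racing with `Close()` can no longer panic by sending on a closed channel. A minimal usage sketch of the documented contract (module path assumed to be `github.com/grafana/dskit/concurrency`):

package main

import (
	"sync"

	"github.com/grafana/dskit/concurrency"
)

func main() {
	pool := concurrency.NewReusableGoroutinesPool(4)
	// Close only tells idle workers to exit; per the comment above, it does
	// not wait for in-flight jobs, so completion is tracked by the caller.
	defer pool.Close()

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		pool.Go(func() {
			defer wg.Done()
			// ... workload ...
		})
	}
	wg.Wait()
}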
concurrency/worker_test.go (28 changes: 28 additions, 0 deletions)

@@ -4,10 +4,12 @@ import (
 	"regexp"
 	"runtime"
 	"strings"
+	"sync"
 	"testing"
 	"time"
 
 	"github.com/stretchr/testify/require"
+	"go.uber.org/atomic"
 )
 
 func TestReusableGoroutinesPool(t *testing.T) {
@@ -59,3 +61,29 @@
 	}
 	t.Fatalf("expected %d goroutines after closing, got %d", 0, countGoroutines())
 }
+
+// TestReusableGoroutinesPool_Race tests that Close() and Go() can be called concurrently.
+func TestReusableGoroutinesPool_Race(t *testing.T) {
+	w := NewReusableGoroutinesPool(2)
+
+	var runCountAtomic atomic.Int32
+	const maxMsgCount = 10
+
+	var testWG sync.WaitGroup
+	testWG.Add(1)
+	go func() {
+		defer testWG.Done()
+		for i := 0; i < maxMsgCount; i++ {
+			w.Go(func() {
+				runCountAtomic.Add(1)
+			})
+			time.Sleep(10 * time.Millisecond)
+		}
+	}()
+	time.Sleep(10 * time.Millisecond)
+	w.Close()     // close the pool
+	testWG.Wait() // wait for the test to finish
+
+	runCt := int(runCountAtomic.Load())
+	require.Equal(t, runCt, 10, "expected all functions to run")
+}
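
As a regression test for the #607 data race, this is mainly meaningful when run under the race detector, i.e. with `go test -race ./concurrency/...`.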
crypto/tls/test/tls_integration_test.go (64 changes: 36 additions, 28 deletions)

@@ -13,10 +13,12 @@ import (
 	"path/filepath"
 	"runtime"
 	"strconv"
+	"strings"
 	"testing"
 	"time"
 
 	"github.com/gogo/status"
+	"github.com/hashicorp/go-cleanhttp"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
@@ -103,6 +105,7 @@ func newIntegrationClientServer(
 
 	serv, err := server.New(cfg)
 	require.NoError(t, err)
+	defer serv.Shutdown()
 
 	serv.HTTP.HandleFunc("/hello", func(w http.ResponseWriter, _ *http.Request) {
 		fmt.Fprintf(w, "OK")
@@ -115,36 +118,42 @@
 		require.NoError(t, err)
 	}()
 
-	// Wait until the server is up and running
-	assert.Eventually(t, func() bool {
-		conn, err := net.DialTimeout("tcp", httpAddr.String(), 1*time.Second)
-		if err != nil {
-			t.Logf("error dialing http: %v", err)
-			return false
-		}
-		defer conn.Close()
-		grpcConn, err := net.DialTimeout("tcp", grpcAddr.String(), 1*time.Second)
-		if err != nil {
-			t.Logf("error dialing grpc: %v", err)
-			return false
-		}
-		defer grpcConn.Close()
-		return true
-	}, 2500*time.Millisecond, 1*time.Second, "server is not up")
-
 	httpURL := fmt.Sprintf("https://localhost:%d/hello", httpAddr.Port)
 	grpcHost := net.JoinHostPort("localhost", strconv.Itoa(grpcAddr.Port))
 
 	for _, tc := range tcs {
-		tlsClientConfig, err := tc.tlsConfig.GetTLSConfig()
-		require.NoError(t, err)
-
 		// HTTP
 		t.Run("HTTP/"+tc.name, func(t *testing.T) {
-			transport := &http.Transport{TLSClientConfig: tlsClientConfig}
+			tlsClientConfig, err := tc.tlsConfig.GetTLSConfig()
+			require.NoError(t, err)
+
+			transport := cleanhttp.DefaultTransport()
+			transport.TLSClientConfig = tlsClientConfig
 			client := &http.Client{Transport: transport}
 
-			resp, err := client.Get(httpURL)
+			cancellableCtx, cancel := context.WithCancel(context.Background())
+			defer cancel()
+
+			req, err := http.NewRequestWithContext(cancellableCtx, http.MethodGet, httpURL, nil)
+			require.NoError(t, err)
+
+			resp, err := client.Do(req)
+			// We retry the request a few times in case of a TCP reset (and we're expecting an error)
+			// Sometimes, the server resets the connection rather than sending the TLS error
+			// Seems that even Google have issues with RST flakiness: https://go-review.googlesource.com/c/go/+/527196
+			isRST := func(err error) bool {
+				if err == nil {
+					return false
+				}
+				return strings.Contains(err.Error(), "connection reset by peer") || strings.Contains(err.Error(), "broken pipe")
+			}
+			for i := 0; i < 3 && isRST(err) && tc.httpExpectError != nil; i++ {
+				time.Sleep(100 * time.Millisecond)
+				resp, err = client.Do(req)
+				if err == nil {
+					defer resp.Body.Close()
+				}
+			}
 			if err == nil {
 				defer resp.Body.Close()
 			}
@@ -175,16 +184,18 @@
 			dialOptions = append([]grpc.DialOption{grpc.WithDefaultCallOptions(clientConfig.CallOptions()...)}, dialOptions...)
 
 			conn, err := grpc.NewClient(grpcHost, dialOptions...)
-			assert.NoError(t, err, tc.name)
-			require.NoError(t, err, tc.name)
+			require.NoError(t, err, tc.name)
			defer conn.Close()
 
+			cancellableCtx, cancel := context.WithCancel(context.Background())
+			defer cancel()
+
 			client := grpc_health_v1.NewHealthClient(conn)
 
 			// TODO: Investigate why the client doesn't really receive the
 			// error about the bad certificate from the server side and just
 			// see connection closed instead
-			resp, err := client.Check(context.TODO(), &grpc_health_v1.HealthCheckRequest{})
+			resp, err := client.Check(cancellableCtx, &grpc_health_v1.HealthCheckRequest{})
 			if tc.grpcExpectError != nil {
 				tc.grpcExpectError(t, err)
 				return
@@ -194,10 +205,7 @@
 			assert.Equal(t, grpc_health_v1.HealthCheckResponse_SERVING, resp.Status)
 		}
 	})
-
 	}
-
-	serv.Shutdown()
 }
 
 func TestServerWithoutTlsEnabled(t *testing.T) {