Skip to content

Commit

Permalink
Address PR comments
Browse files Browse the repository at this point in the history
- Rename CHANGELOG
- Mutex lock on shutdown rather than write
- Wait when workers are ended rather than for each write
  • Loading branch information
julienduchesne committed Oct 4, 2024
1 parent 4d2ac93 commit 55cb80a
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 16 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@
* [ENHANCEMENT] Runtimeconfig: support gzip-compressed files with `.gz` extension. #571
* [ENHANCEMENT] grpcclient: Support custom gRPC compressors. #583
* [ENHANCEMENT] Adapt `metrics.SendSumOfGaugesPerTenant` to use `metrics.MetricOption`. #584
* [ENHANCEMENT] Memberlist: Make `WriteTo` non-blocking. #525
* [ENHANCEMENT] Memberlist: Add concurrency to the transport's WriteTo method. #525
* [CHANGE] Backoff: added `Backoff.ErrCause()` which is like `Backoff.Err()` but returns the context cause if backoff is terminated because the context has been canceled. #538
* [BUGFIX] spanlogger: Support multiple tenant IDs. #59
* [BUGFIX] Memberlist: fixed corrupted packets when sending compound messages with more than 255 messages or messages bigger than 64KB. #85
Expand Down
37 changes: 22 additions & 15 deletions kv/memberlist/tcp_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"go.uber.org/atomic"

dstls "github.com/grafana/dskit/crypto/tls"
"github.com/grafana/dskit/flagext"
Expand Down Expand Up @@ -100,11 +99,11 @@ type TCPTransport struct {
tcpListeners []net.Listener
tlsConfig *tls.Config

writeMu sync.RWMutex
writeCh chan writeRequest
writeWG sync.WaitGroup

shutdown atomic.Int32
shutdown bool
shutdownMu sync.RWMutex

advertiseMu sync.RWMutex
advertiseAddr string
Expand Down Expand Up @@ -145,6 +144,7 @@ func NewTCPTransport(config TCPTransportConfig, logger log.Logger, registerer pr
}

for i := 0; i < concurrentWrites; i++ {
t.writeWG.Add(1)
go t.writeWorker()
}

Expand Down Expand Up @@ -227,7 +227,10 @@ func (t *TCPTransport) tcpListen(tcpLn net.Listener) {
for {
conn, err := tcpLn.Accept()
if err != nil {
if s := t.shutdown.Load(); s == 1 {
t.shutdownMu.RLock()
isShuttingDown := t.shutdown
t.shutdownMu.RUnlock()
if isShuttingDown {
break
}

Expand Down Expand Up @@ -446,17 +449,17 @@ func (t *TCPTransport) getAdvertisedAddr() string {
// WriteTo is a packet-oriented interface that fires off the given
// payload to the given address.
func (t *TCPTransport) WriteTo(b []byte, addr string) (time.Time, error) {
if t.shutdown.Load() == 1 {
t.shutdownMu.RLock()
defer t.shutdownMu.RUnlock() // Unlock at the end to protect the chan
if t.shutdown {
return time.Time{}, errors.New("transport is shutting down")
}
t.writeMu.RLock()
defer t.writeMu.RUnlock()
t.writeWG.Add(1)
t.writeCh <- writeRequest{b: b, addr: addr}
return time.Now(), nil
}

func (t *TCPTransport) writeWorker() {
defer t.writeWG.Done()
for req := range t.writeCh {
b, addr := req.b, req.addr
t.sentPackets.Inc()
Expand All @@ -473,7 +476,6 @@ func (t *TCPTransport) writeWorker() {
}
logLevel.Log("msg", "WriteTo failed", "addr", addr, "err", err)
}
t.writeWG.Done()
}
}

Expand Down Expand Up @@ -591,24 +593,29 @@ func (t *TCPTransport) StreamCh() <-chan net.Conn {
// transport a chance to clean up any listeners.
// This will avoid log spam about errors when we shut down.
func (t *TCPTransport) Shutdown() error {
t.shutdownMu.Lock()
// This will avoid log spam about errors when we shut down.
if old := t.shutdown.Swap(1); old == 1 {
if t.shutdown {
t.shutdownMu.Unlock()
return nil // already shut down
}

// Set the shutdown flag and close the write channel.
t.shutdown = true
close(t.writeCh)
t.shutdownMu.Unlock()

// Rip through all the connections and shut them down.
for _, conn := range t.tcpListeners {
_ = conn.Close()
}

// Wait until all write workers have finished.
t.writeWG.Wait()

// Block until all the listener threads have died.
t.wg.Wait()

// Wait until the write channel is empty and close it (to end the writeWorker goroutines).
t.writeMu.Lock()
defer t.writeMu.Unlock()
t.writeWG.Wait()
close(t.writeCh)
return nil
}

Expand Down

0 comments on commit 55cb80a

Please sign in to comment.