From 55cb80a113c79523443db6b0188cd58c6472d784 Mon Sep 17 00:00:00 2001 From: Julien Duchesne Date: Fri, 4 Oct 2024 12:44:46 -0400 Subject: [PATCH] Address PR comments - Rename CHANGELOG - Mutex lock on shutdown rather than write - Wait when workers are ended rather than for each write --- CHANGELOG.md | 2 +- kv/memberlist/tcp_transport.go | 37 ++++++++++++++++++++-------------- 2 files changed, 23 insertions(+), 16 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b75628878..61b546627 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -230,7 +230,7 @@ * [ENHANCEMENT] Runtimeconfig: support gzip-compressed files with `.gz` extension. #571 * [ENHANCEMENT] grpcclient: Support custom gRPC compressors. #583 * [ENHANCEMENT] Adapt `metrics.SendSumOfGaugesPerTenant` to use `metrics.MetricOption`. #584 -* [ENHANCEMENT] Memberlist: Make `WriteTo` non-blocking. #525 +* [ENHANCEMENT] Memberlist: Add concurrency to the transport's WriteTo method. #525 * [CHANGE] Backoff: added `Backoff.ErrCause()` which is like `Backoff.Err()` but returns the context cause if backoff is terminated because the context has been canceled. #538 * [BUGFIX] spanlogger: Support multiple tenant IDs. #59 * [BUGFIX] Memberlist: fixed corrupted packets when sending compound messages with more than 255 messages or messages bigger than 64KB. #85 diff --git a/kv/memberlist/tcp_transport.go b/kv/memberlist/tcp_transport.go index c74e50d17..bd13d9393 100644 --- a/kv/memberlist/tcp_transport.go +++ b/kv/memberlist/tcp_transport.go @@ -19,7 +19,6 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" - "go.uber.org/atomic" dstls "github.com/grafana/dskit/crypto/tls" "github.com/grafana/dskit/flagext" @@ -100,11 +99,11 @@ type TCPTransport struct { tcpListeners []net.Listener tlsConfig *tls.Config - writeMu sync.RWMutex writeCh chan writeRequest writeWG sync.WaitGroup - shutdown atomic.Int32 + shutdown bool + shutdownMu sync.RWMutex advertiseMu sync.RWMutex advertiseAddr string @@ -145,6 +144,7 @@ func NewTCPTransport(config TCPTransportConfig, logger log.Logger, registerer pr } for i := 0; i < concurrentWrites; i++ { + t.writeWG.Add(1) go t.writeWorker() } @@ -227,7 +227,10 @@ func (t *TCPTransport) tcpListen(tcpLn net.Listener) { for { conn, err := tcpLn.Accept() if err != nil { - if s := t.shutdown.Load(); s == 1 { + t.shutdownMu.RLock() + isShuttingDown := t.shutdown + t.shutdownMu.RUnlock() + if isShuttingDown { break } @@ -446,17 +449,17 @@ func (t *TCPTransport) getAdvertisedAddr() string { // WriteTo is a packet-oriented interface that fires off the given // payload to the given address. func (t *TCPTransport) WriteTo(b []byte, addr string) (time.Time, error) { - if t.shutdown.Load() == 1 { + t.shutdownMu.RLock() + defer t.shutdownMu.RUnlock() // Unlock at the end to protect the chan + if t.shutdown { return time.Time{}, errors.New("transport is shutting down") } - t.writeMu.RLock() - defer t.writeMu.RUnlock() - t.writeWG.Add(1) t.writeCh <- writeRequest{b: b, addr: addr} return time.Now(), nil } func (t *TCPTransport) writeWorker() { + defer t.writeWG.Done() for req := range t.writeCh { b, addr := req.b, req.addr t.sentPackets.Inc() @@ -473,7 +476,6 @@ func (t *TCPTransport) writeWorker() { } logLevel.Log("msg", "WriteTo failed", "addr", addr, "err", err) } - t.writeWG.Done() } } @@ -591,24 +593,29 @@ func (t *TCPTransport) StreamCh() <-chan net.Conn { // transport a chance to clean up any listeners. // This will avoid log spam about errors when we shut down. func (t *TCPTransport) Shutdown() error { + t.shutdownMu.Lock() // This will avoid log spam about errors when we shut down. - if old := t.shutdown.Swap(1); old == 1 { + if t.shutdown { + t.shutdownMu.Unlock() return nil // already shut down } + // Set the shutdown flag and close the write channel. + t.shutdown = true + close(t.writeCh) + t.shutdownMu.Unlock() + // Rip through all the connections and shut them down. for _, conn := range t.tcpListeners { _ = conn.Close() } + // Wait until all write workers have finished. + t.writeWG.Wait() + // Block until all the listener threads have died. t.wg.Wait() - // Wait until the write channel is empty and close it (to end the writeWorker goroutines). - t.writeMu.Lock() - defer t.writeMu.Unlock() - t.writeWG.Wait() - close(t.writeCh) return nil }