From 017445f4fdc90566b6bac9c69fb6c88f177a64fb Mon Sep 17 00:00:00 2001 From: Chris Seto Date: Thu, 29 Aug 2024 13:31:43 -0400 Subject: [PATCH] kube: remove link between conn and ctx's lifecycles Previously, the lifetime of the connection returned by `DialContext` was bound to the `context.Context` passed to it: cancelling the context closed the connection. This resulted in connection poolers such as `http.Transport` reusing closed connections. The `context.Context` passed to dial functions is meant to bound the time taken to establish the connection, not the lifetime of the connection. It's up to callers to ensure that `.Close` is eventually called. --- pkg/kube/dialer.go | 20 ++------------------ pkg/kube/dialer_test.go | 28 ++++++++++++++++++++++++++-- 2 files changed, 28 insertions(+), 20 deletions(-) diff --git a/pkg/kube/dialer.go b/pkg/kube/dialer.go index 5f110d2f8d..cf1f8918ee 100644 --- a/pkg/kube/dialer.go +++ b/pkg/kube/dialer.go @@ -140,7 +140,7 @@ func (p *PodDialer) DialContext(ctx context.Context, network string, address str p.cleanupConnection(pod) } - return wrapConn(ctx, onClose, network, address, dataStream, errorStream), nil + return wrapConn(onClose, network, address, dataStream, errorStream), nil } // parseDNS attempts to determine the intended pod to target, currently the @@ -282,13 +282,12 @@ type conn struct { onClose func() errCh chan error - stopCh chan error closed atomic.Bool } var _ net.Conn = (*conn)(nil) -func wrapConn(ctx context.Context, onClose func(), network, remote string, s, err httpstream.Stream) *conn { +func wrapConn(onClose func(), network, remote string, s, err httpstream.Stream) *conn { c := &conn{ dataStream: s, errorStream: err, @@ -296,24 +295,13 @@ func wrapConn(ctx context.Context, onClose func(), network, remote string, s, er remote: remote, onClose: onClose, errCh: make(chan error, 1), - stopCh: make(chan error), } go c.pollErrors() - go c.checkCancelation(ctx) return c } -func (c *conn) checkCancelation(ctx context.Context) { - select { - case <-ctx.Done(): - c.writeError(ctx.Err()) 
- c.Close() - case <-c.stopCh: - } -} - func (c *conn) pollErrors() { defer c.Close() @@ -389,10 +377,6 @@ func (c *conn) Close() error { // call our onClose cleanup handler defer c.onClose() - // signal to any underlying goroutines that we are - // stopping - defer close(c.stopCh) - // closing the underlying connection should cause // our error stream reading routine to stop c.errorStream.Reset() diff --git a/pkg/kube/dialer_test.go b/pkg/kube/dialer_test.go index 0bdfabe69a..2cfc2bf0de 100644 --- a/pkg/kube/dialer_test.go +++ b/pkg/kube/dialer_test.go @@ -3,6 +3,7 @@ package kube import ( "context" "crypto/tls" + "io" "net" "net/http" "testing" @@ -154,8 +155,31 @@ func TestDialer(t *testing.T) { "http://name.service.default.svc.cluster.local.", } { t.Run(host, func(t *testing.T) { - _, err = httpClient.Get(host) - require.NoError(t, err) + // Test the pooling behavior of HTTPClient by making requests to + // the same hosts a few times. + for i := 0; i < 5; i++ { + func() { + // Run in a closure so we have a context scoped to the life + // time of each request we make, which is distinct from the + // lifetime of the connection due to http.Transport's + // connection pooling. + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, host, nil) + require.NoError(t, err) + + resp, err := httpClient.Do(req) + require.NoError(t, err) + + defer resp.Body.Close() + + _, err = io.ReadAll(resp.Body) + require.NoError(t, err) + + require.Equal(t, http.StatusOK, resp.StatusCode) + }() + } }) } }