diff --git a/pkg/kube/dialer.go b/pkg/kube/dialer.go
index 5f110d2f8d..cf1f8918ee 100644
--- a/pkg/kube/dialer.go
+++ b/pkg/kube/dialer.go
@@ -140,7 +140,7 @@ func (p *PodDialer) DialContext(ctx context.Context, network string, address str
 		p.cleanupConnection(pod)
 	}
 
-	return wrapConn(ctx, onClose, network, address, dataStream, errorStream), nil
+	return wrapConn(onClose, network, address, dataStream, errorStream), nil
 }
 
 // parseDNS attempts to determine the intended pod to target, currently the
@@ -282,13 +282,12 @@ type conn struct {
 	onClose func()
 
 	errCh  chan error
-	stopCh chan error
 	closed atomic.Bool
 }
 
 var _ net.Conn = (*conn)(nil)
 
-func wrapConn(ctx context.Context, onClose func(), network, remote string, s, err httpstream.Stream) *conn {
+func wrapConn(onClose func(), network, remote string, s, err httpstream.Stream) *conn {
 	c := &conn{
 		dataStream:  s,
 		errorStream: err,
@@ -296,24 +295,13 @@ func wrapConn(ctx context.Context, onClose func(), network, remote string, s, er
 		remote:      remote,
 		onClose:     onClose,
 		errCh:       make(chan error, 1),
-		stopCh:      make(chan error),
 	}
 
 	go c.pollErrors()
-	go c.checkCancelation(ctx)
 
 	return c
 }
 
-func (c *conn) checkCancelation(ctx context.Context) {
-	select {
-	case <-ctx.Done():
-		c.writeError(ctx.Err())
-		c.Close()
-	case <-c.stopCh:
-	}
-}
-
 func (c *conn) pollErrors() {
 	defer c.Close()
 
@@ -389,10 +377,6 @@ func (c *conn) Close() error {
 	// call our onClose cleanup handler
 	defer c.onClose()
 
-	// signal to any underlying goroutines that we are
-	// stopping
-	defer close(c.stopCh)
-
 	// closing the underlying connection should cause
 	// our error stream reading routine to stop
 	c.errorStream.Reset()
diff --git a/pkg/kube/dialer_test.go b/pkg/kube/dialer_test.go
index 0bdfabe69a..2cfc2bf0de 100644
--- a/pkg/kube/dialer_test.go
+++ b/pkg/kube/dialer_test.go
@@ -3,6 +3,7 @@ package kube
 import (
 	"context"
 	"crypto/tls"
+	"io"
 	"net"
 	"net/http"
 	"testing"
@@ -154,8 +155,31 @@ func TestDialer(t *testing.T) {
 		"http://name.service.default.svc.cluster.local.",
 	} {
 		t.Run(host, func(t *testing.T) {
-			_, err = httpClient.Get(host)
-			require.NoError(t, err)
+			// Test the pooling behavior of HTTPClient by making requests to
+			// the same hosts a few times.
+			for i := 0; i < 5; i++ {
+				func() {
+					// Run in a closure so we have a context scoped to the life
+					// time of each request we make, which is distinct from the
+					// lifetime of the connection due to http.Transport's
+					// connection pooling.
+					ctx, cancel := context.WithCancel(context.Background())
+					defer cancel()
+
+					req, err := http.NewRequestWithContext(ctx, http.MethodGet, host, nil)
+					require.NoError(t, err)
+
+					resp, err := httpClient.Do(req)
+					require.NoError(t, err)
+
+					defer resp.Body.Close()
+
+					_, err = io.ReadAll(resp.Body)
+					require.NoError(t, err)
+
+					require.Equal(t, http.StatusOK, resp.StatusCode)
+				}()
+			}
 		})
 	}
 }
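
For context on why the cancellation goroutine is dropped above: http.Transport pools and reuses connections across requests, so a net.Conn returned by DialContext can outlive the context of the request that triggered the dial. The standalone sketch below is not part of this change; the httptest server and the logging dialer wrapper are illustrative only. It shows that cancelling the first request's context after the response is read leaves the pooled connection usable for the next request, which appears to be the motivation for removing checkCancelation and stopCh from conn.

```go
package main

import (
	"context"
	"fmt"
	"io"
	"net"
	"net/http"
	"net/http/httptest"
)

func main() {
	// Illustrative backend; any HTTP server would do.
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintln(w, "ok")
	}))
	defer srv.Close()

	transport := &http.Transport{
		// The dial context is scoped to the request that triggered the dial,
		// but the returned net.Conn is pooled and reused by later requests.
		DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
			conn, err := (&net.Dialer{}).DialContext(ctx, network, addr)
			if err != nil {
				return nil, err
			}
			go func() {
				<-ctx.Done()
				// A wrapper that closed conn here would tear down a pooled,
				// still-healthy connection as soon as the first request's
				// context was cancelled.
				fmt.Println("dial context done:", ctx.Err())
			}()
			return conn, nil
		},
	}
	client := &http.Client{Transport: transport}

	for i := 0; i < 2; i++ {
		func() {
			ctx, cancel := context.WithCancel(context.Background())
			defer cancel() // ends the request context, not the connection

			req, _ := http.NewRequestWithContext(ctx, http.MethodGet, srv.URL, nil)
			resp, err := client.Do(req)
			if err != nil {
				fmt.Println("request", i, "failed:", err)
				return
			}
			io.Copy(io.Discard, resp.Body) // drain so the conn returns to the pool
			resp.Body.Close()
			fmt.Println("request", i, "status:", resp.StatusCode)
		}()
	}
}
```

Both requests succeed even though the first dial context is cancelled between them; only the first request triggers a dial, and the second reuses the idle connection from the pool.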