Skip to content

Commit

Permalink
kube: remove link between conn and ctx's lifecycles
Browse files Browse the repository at this point in the history
Previously, the `context.Context` passed to `DialContext` was bound to the
lifetime of the connection. This resulted in connection poolers such as
http.Transport reusing closed connections. The `context.Context` passed to dial
functions is meant to bound the time to establish the connection not the
lifetime of the connection. It's up to callers to ensure that .Close is
eventually called.
  • Loading branch information
chrisseto committed Aug 30, 2024
1 parent f8ea0ae commit 017445f
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 20 deletions.
20 changes: 2 additions & 18 deletions pkg/kube/dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func (p *PodDialer) DialContext(ctx context.Context, network string, address str
p.cleanupConnection(pod)
}

return wrapConn(ctx, onClose, network, address, dataStream, errorStream), nil
return wrapConn(onClose, network, address, dataStream, errorStream), nil
}

// parseDNS attempts to determine the intended pod to target, currently the
Expand Down Expand Up @@ -282,38 +282,26 @@ type conn struct {
onClose func()

errCh chan error
stopCh chan error
closed atomic.Bool
}

var _ net.Conn = (*conn)(nil)

func wrapConn(ctx context.Context, onClose func(), network, remote string, s, err httpstream.Stream) *conn {
func wrapConn(onClose func(), network, remote string, s, err httpstream.Stream) *conn {
c := &conn{
dataStream: s,
errorStream: err,
network: network,
remote: remote,
onClose: onClose,
errCh: make(chan error, 1),
stopCh: make(chan error),
}

go c.pollErrors()
go c.checkCancelation(ctx)

return c
}

func (c *conn) checkCancelation(ctx context.Context) {
select {
case <-ctx.Done():
c.writeError(ctx.Err())
c.Close()
case <-c.stopCh:
}
}

func (c *conn) pollErrors() {
defer c.Close()

Expand Down Expand Up @@ -389,10 +377,6 @@ func (c *conn) Close() error {
// call our onClose cleanup handler
defer c.onClose()

// signal to any underlying goroutines that we are
// stopping
defer close(c.stopCh)

// closing the underlying connection should cause
// our error stream reading routine to stop
c.errorStream.Reset()
Expand Down
28 changes: 26 additions & 2 deletions pkg/kube/dialer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package kube
import (
"context"
"crypto/tls"
"io"
"net"
"net/http"
"testing"
Expand Down Expand Up @@ -154,8 +155,31 @@ func TestDialer(t *testing.T) {
"http://name.service.default.svc.cluster.local.",
} {
t.Run(host, func(t *testing.T) {
_, err = httpClient.Get(host)
require.NoError(t, err)
// Test the pooling behavior of HTTPClient by making requests to
// the same hosts a few times.
for i := 0; i < 5; i++ {
func() {
// Run in a closure so we have a context scoped to the life
// time of each request we make, which is distinct from the
// lifetime of the connection due to http.Transport's
// connection pooling.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

req, err := http.NewRequestWithContext(ctx, http.MethodGet, host, nil)
require.NoError(t, err)

resp, err := httpClient.Do(req)
require.NoError(t, err)

defer resp.Body.Close()

_, err = io.ReadAll(resp.Body)
require.NoError(t, err)

require.Equal(t, http.StatusOK, resp.StatusCode)
}()
}
})
}
}

0 comments on commit 017445f

Please sign in to comment.