From e870e14b3a73a76d281ad61003369a76e14f2c75 Mon Sep 17 00:00:00 2001
From: Chris Seto
Date: Fri, 8 Nov 2024 15:58:37 -0500
Subject: [PATCH] implement ephemeral port reservation for k3d

Prior to this commit we discovered that CI can flake when standing up
multiple k3d clusters due to port conflicts.

This commit attempts to implement "port reservation" such that pkg/k3d
can always create k3d clusters without conflicts without coordination
across different processes.

It's unclear if this will actually work as engineering an intentional
failure is not feasible.
---
 operator/pkg/k3d/k3d.go      | 78 ++++++++++++++++++++++++++++++++++++
 operator/pkg/k3d/k3d_test.go | 47 ++++++++++++++++++++++
 2 files changed, 125 insertions(+)

diff --git a/operator/pkg/k3d/k3d.go b/operator/pkg/k3d/k3d.go
index bca973bdd..ef2c38f60 100644
--- a/operator/pkg/k3d/k3d.go
+++ b/operator/pkg/k3d/k3d.go
@@ -19,15 +19,19 @@ import (
 	"embed"
 	"fmt"
 	"io/fs"
+	"net"
 	"os"
 	"os/exec"
 	"slices"
+	"strconv"
 	"strings"
 	"sync"
+	"syscall"
 	"time"
 
 	"github.com/cockroachdb/errors"
 	"github.com/redpanda-data/helm-charts/pkg/kube"
+	"golang.org/x/sys/unix"
 	batchv1 "k8s.io/api/batch/v1"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
@@ -128,10 +132,16 @@ func NewCluster(name string, opts ...ClusterOpt) (*Cluster, error) {
 		opt.apply(config)
 	}
 
+	port, err := reserveEphemeralPort("tcp4")
+	if err != nil {
+		return nil, err
+	}
+
 	args := []string{
 		"cluster",
 		"create",
 		name,
+		fmt.Sprintf("--api-port=127.0.0.1:%d", port),
 		fmt.Sprintf("--agents=%d", config.agents),
 		fmt.Sprintf("--timeout=%s", config.timeout),
 		fmt.Sprintf("--image=%s", config.image),
@@ -332,3 +342,71 @@ var startupManifests = sync.OnceValue(func() []client.Object {
 
 	return objs
 })
+
+// reserveEphemeralPort attempts to "reserve" an ephemeral OS port by putting
+// it into a TIME_WAIT state which prevents the OS re-allocating it when
+// binding to port 0.
+//
+// network is handed to the listener as-is (NewCluster passes "tcp4"); the
+// listener itself is always bound to 127.0.0.1. The reserved port number is
+// returned.
+//
+// Big thanks to:
+// - https://github.com/libp2p/go-reuseport
+// - https://github.com/Yelp/ephemeral-port-reserve
+func reserveEphemeralPort(network string) (int, error) {
+	lc := net.ListenConfig{
+		Control: func(network, address string, c syscall.RawConn) error {
+			var optErr error
+			ctrlErr := c.Control(func(fd uintptr) {
+				// Setting SO_REUSEADDR allows this port to be rebound after we finish using it.
+				// It works roughly the same way on macos and linux.
+				optErr = unix.SetsockoptInt(int(fd), unix.SOL_SOCKET, unix.SO_REUSEADDR, 1)
+			})
+			return errors.Join(ctrlErr, optErr)
+		},
+	}
+
+	// Listen to port 0 to get an OS allocated port.
+	lis, err := lc.Listen(context.Background(), network, "127.0.0.1:0")
+	if err != nil {
+		return 0, errors.WithStack(err)
+	}
+	defer lis.Close()
+
+	// According to ephemeral-port-reserve, we need to accept a connection to
+	// actually put this port into the TIME_WAIT state.
+	//
+	// Dial before accepting: the handshake completes against the listen
+	// backlog, so no goroutine is needed and a failed dial returns an error
+	// instead of leaving Accept blocked forever.
+	client, err := net.Dial(lis.Addr().Network(), lis.Addr().String())
+	if err != nil {
+		return 0, errors.WithStack(err)
+	}
+	// Covers the early-return paths; the success path closes explicitly below.
+	defer client.Close()
+
+	server, err := lis.Accept()
+	if err != nil {
+		return 0, errors.WithStack(err)
+	}
+
+	// TIME_WAIT is entered by the side that closes first, so the accepted
+	// connection (whose local port is the one being reserved) must be closed
+	// before the dialing side. Closing the dialer first would leave the
+	// dialer's own ephemeral port in TIME_WAIT instead.
+	if err := server.Close(); err != nil {
+		return 0, errors.WithStack(err)
+	}
+	if err := client.Close(); err != nil {
+		return 0, errors.WithStack(err)
+	}
+
+	_, port, err := net.SplitHostPort(lis.Addr().String())
+	if err != nil {
+		return 0, errors.WithStack(err)
+	}
+
+	return strconv.Atoi(port)
+}
diff --git a/operator/pkg/k3d/k3d_test.go b/operator/pkg/k3d/k3d_test.go
index 997546d7f..d3066e434 100644
--- a/operator/pkg/k3d/k3d_test.go
+++ b/operator/pkg/k3d/k3d_test.go
@@ -13,6 +13,8 @@ import (
 	"errors"
 	"fmt"
+	"net"
+	"strconv"
 	"sync"
 	"testing"
 
 	"github.com/stretchr/testify/assert"
@@ -57,3 +59,48 @@ func TestMultiInstance(t *testing.T) {
 
 	assert.NoError(t, errors.Join(errs...))
 }
+
+func TestReservePort(t *testing.T) {
+	const network = "tcp4"
+	const iterations = 100
+
+	reserved := map[int]struct{}{}
+	for i := 0; i < iterations; i++ {
+		port, err := reserveEphemeralPort(network)
+		assert.NoError(t, err)
+		assert.NotZero(t, port)
+
+		// Assert that we can listen on the provided port.
+		lis, err := net.Listen(network, fmt.Sprintf("127.0.0.1:%d", port))
+		if assert.NoError(t, err) {
+			assert.NoError(t, lis.Close())
+		}
+
+		reserved[port] = struct{}{}
+	}
+
+	// Not the best test as failures are exceptionally unlikely to be
+	// reproducible.
+	// Bind a bunch of ephemeral ports and assert that we don't get allocated
+	// any of the ports we've reserved.
+	for i := 0; i < iterations; i++ {
+		lis, err := net.Listen(network, "127.0.0.1:0")
+		if !assert.NoError(t, err) {
+			continue
+		}
+
+		// Defer closing of this listener to ensure we always get new ports
+		// from listening to 0.
+		defer lis.Close()
+
+		_, portStr, err := net.SplitHostPort(lis.Addr().String())
+		assert.NoError(t, err)
+
+		port, err := strconv.Atoi(portStr)
+		assert.NoError(t, err)
+
+		t.Logf("%d", port)
+
+		assert.NotContains(t, reserved, port)
+	}
+}