diff --git a/operator/pkg/k3d/k3d.go b/operator/pkg/k3d/k3d.go index bca973bdd..ef2c38f60 100644 --- a/operator/pkg/k3d/k3d.go +++ b/operator/pkg/k3d/k3d.go @@ -19,15 +19,19 @@ import ( "embed" "fmt" "io/fs" + "net" "os" "os/exec" "slices" + "strconv" "strings" "sync" + "syscall" "time" "github.com/cockroachdb/errors" "github.com/redpanda-data/helm-charts/pkg/kube" + "golang.org/x/sys/unix" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -128,10 +132,16 @@ func NewCluster(name string, opts ...ClusterOpt) (*Cluster, error) { opt.apply(config) } + port, err := reserveEphemeralPort("tcp4") + if err != nil { + return nil, err + } + args := []string{ "cluster", "create", name, + fmt.Sprintf("--api-port=127.0.0.1:%d", port), fmt.Sprintf("--agents=%d", config.agents), fmt.Sprintf("--timeout=%s", config.timeout), fmt.Sprintf("--image=%s", config.image), @@ -332,3 +342,61 @@ var startupManifests = sync.OnceValue(func() []client.Object { return objs }) + +// reserveEphemeralPort attempts to "reserve" an ephemeral OS port by putting +// it into a TIME_WAIT state which prevents the OS re-allocating it when +// binding to port 0. +// Big thanks to: +// - https://github.com/libp2p/go-reuseport +// - https://github.com/Yelp/ephemeral-port-reserve +func reserveEphemeralPort(network string) (int, error) { + lc := net.ListenConfig{ + Control: func(network, address string, c syscall.RawConn) error { + var optErr error + ctrlErr := c.Control(func(fd uintptr) { + // Setting SO_REUSEADDR allows this port to be rebound after we finish using it. + // It works roughly the same way on macos and linux. + optErr = unix.SetsockoptInt(int(fd), unix.SOL_SOCKET, unix.SO_REUSEADDR, 1) + }) + return errors.Join(ctrlErr, optErr) + }, + } + + // Listen to port 0 to get an OS allocated port. + lis, err := lc.Listen(context.Background(), network, "127.0.0.1:0") + if err != nil { + return 0, errors.WithStack(err) + } + + defer lis.Close() + + // According to ephemeral-port-reserve, we need to accept a connection to + // actually put this port into the TIME_WAIT state. + errCh := make(chan error, 1) + go func() { + conn, err := net.Dial(lis.Addr().Network(), lis.Addr().String()) + if err != nil { + errCh <- err + return + } + errCh <- conn.Close() + }() + + conn, err := lis.Accept() + if err != nil { + return 0, err + } + + defer conn.Close() + + if err := <-errCh; err != nil { + return 0, err + } + + _, port, err := net.SplitHostPort(lis.Addr().String()) + if err != nil { + return 0, err + } + + return strconv.Atoi(port) +} diff --git a/operator/pkg/k3d/k3d_test.go b/operator/pkg/k3d/k3d_test.go index 997546d7f..d3066e434 100644 --- a/operator/pkg/k3d/k3d_test.go +++ b/operator/pkg/k3d/k3d_test.go @@ -13,6 +13,8 @@ import ( "errors" "fmt" "sync" + "net" + "strconv" "testing" "github.com/stretchr/testify/assert" @@ -57,3 +59,45 @@ func TestMultiInstance(t *testing.T) { assert.NoError(t, errors.Join(errs...)) } + +func TestReservePort(t *testing.T) { + const network = "tcp4" + const iterations = 100 + + reserved := map[int]struct{}{} + for i := 0; i < iterations; i++ { + port, err := reserveEphemeralPort("tcp4") + assert.NoError(t, err) + assert.NotZero(t, port) + + // Assert that we can listen on the provided port. + lis, err := net.Listen("tcp4", fmt.Sprintf("127.0.0.1:%d", port)) + assert.NoError(t, err) + assert.NoError(t, lis.Close()) + + reserved[port] = struct{}{} + } + + // Not the best test as failures are exceptionally unlikely to be + // reproducible. + // Bind a bunch of ephemeral ports and assert that we don't get allocated + // any of the ports we've reserved. + for i := 0; i < iterations; i++ { + lis, err := net.Listen(network, "127.0.0.1:0") + assert.NoError(t, err) + + // Defer closing of this listener to ensure we always get new ports + // from listening to 0. + defer lis.Close() + + _, portStr, err := net.SplitHostPort(lis.Addr().String()) + assert.NoError(t, err) + + port, err := strconv.Atoi(portStr) + assert.NoError(t, err) + + t.Logf("%d", port) + + assert.NotContains(t, reserved, port) + } +}