diff --git a/operator/pkg/k3d/k3d.go b/operator/pkg/k3d/k3d.go index 03439d158..998a9e068 100644 --- a/operator/pkg/k3d/k3d.go +++ b/operator/pkg/k3d/k3d.go @@ -19,15 +19,19 @@ import ( "embed" "fmt" "io/fs" + "net" "os" "os/exec" "slices" + "strconv" "strings" "sync" + "syscall" "time" "github.com/cockroachdb/errors" "github.com/redpanda-data/helm-charts/pkg/kube" + "golang.org/x/sys/unix" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -76,10 +80,16 @@ func NewCluster(name string) (*Cluster, error) { image = override } + port, err := reserveEphemeralPort("tcp4") + if err != nil { + return nil, err + } + args := []string{ "cluster", "create", name, + fmt.Sprintf("--api-port=127.0.0.1:%d", port), fmt.Sprintf("--agents=%d", 3), fmt.Sprintf("--timeout=%s", 30*time.Second), fmt.Sprintf("--image=%s", image), @@ -268,3 +278,61 @@ var startupManifests = sync.OnceValue(func() []client.Object { return objs }) + +// reserveEphemeralPort attempts to "reserve" an ephemeral OS port by putting +// it into a TIME_WAIT state which prevents the OS re-allocating it when +// binding to port 0. +// Big thanks to: +// - https://github.com/libp2p/go-reuseport +// - https://github.com/Yelp/ephemeral-port-reserve +func reserveEphemeralPort(network string) (int, error) { + lc := net.ListenConfig{ + Control: func(network, address string, c syscall.RawConn) error { + var optErr error + ctrlErr := c.Control(func(fd uintptr) { + // Setting SO_REUSEADDR allows this port to be rebound after we finish using it. + // It works roughly the same way on macos and linux. + optErr = unix.SetsockoptInt(int(fd), unix.SOL_SOCKET, unix.SO_REUSEADDR, 1) + }) + return errors.Join(ctrlErr, optErr) + }, + } + + // Listen to port 0 to get an OS allocated port. + lis, err := lc.Listen(context.Background(), network, "127.0.0.1:0") + if err != nil { + return 0, errors.WithStack(err) + } + + defer lis.Close() + + // According to ephemeral-port-reserve, we need to accept a connection to + // actually put this port into the TIME_WAIT state. + errCh := make(chan error, 1) + go func() { + conn, err := net.Dial(lis.Addr().Network(), lis.Addr().String()) + if err != nil { + errCh <- err + return + } + errCh <- conn.Close() + }() + + conn, err := lis.Accept() + if err != nil { + return 0, err + } + + defer conn.Close() + + if err := <-errCh; err != nil { + return 0, err + } + + _, port, err := net.SplitHostPort(lis.Addr().String()) + if err != nil { + return 0, err + } + + return strconv.Atoi(port) +} diff --git a/operator/pkg/k3d/k3d_test.go b/operator/pkg/k3d/k3d_test.go new file mode 100644 index 000000000..67422d399 --- /dev/null +++ b/operator/pkg/k3d/k3d_test.go @@ -0,0 +1,61 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +package k3d + +import ( + "fmt" + "net" + "strconv" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestReservePort(t *testing.T) { + const network = "tcp4" + const iterations = 100 + + reserved := map[int]struct{}{} + for i := 0; i < iterations; i++ { + port, err := reserveEphemeralPort("tcp4") + assert.NoError(t, err) + assert.NotZero(t, port) + + // Assert that we can listen on the provided port. + lis, err := net.Listen("tcp4", fmt.Sprintf("127.0.0.1:%d", port)) + assert.NoError(t, err) + assert.NoError(t, lis.Close()) + + reserved[port] = struct{}{} + } + + // Not the best test as failures are exceptionally unlikely to be + // reproducible. + // Bind a bunch of ephemeral ports and assert that we don't get allocated + // any of the ports we've reserved. + for i := 0; i < iterations; i++ { + lis, err := net.Listen(network, "127.0.0.1:0") + assert.NoError(t, err) + + // Defer closing of this listener to ensure we always get new ports + // from listening to 0. + defer lis.Close() + + _, portStr, err := net.SplitHostPort(lis.Addr().String()) + assert.NoError(t, err) + + port, err := strconv.Atoi(portStr) + assert.NoError(t, err) + + t.Logf("%d", port) + + assert.NotContains(t, reserved, port) + } +}