From 8e942a363fdf60a4db16d0d8dbb1b7af0695e530 Mon Sep 17 00:00:00 2001 From: Chris Seto Date: Fri, 8 Nov 2024 15:58:37 -0500 Subject: [PATCH] implement ephemeral port reservation for k3d Prior to this commit we discovered that CI can flake when standing up multiple k3d clusters due to port conflicts. This commit attempts to implement "port reservation" such that pkg/k3d can always create k3d clusters without conflicts without coordination across different processes. It's unclear if this will actually work as engineering an intentional failure is not feasible. --- operator/pkg/k3d/k3d.go | 68 ++++++++++++++++++++++++++++++++++++ operator/pkg/k3d/k3d_test.go | 61 ++++++++++++++++++++++++++++++++ 2 files changed, 129 insertions(+) create mode 100644 operator/pkg/k3d/k3d_test.go diff --git a/operator/pkg/k3d/k3d.go b/operator/pkg/k3d/k3d.go index 03439d158..998a9e068 100644 --- a/operator/pkg/k3d/k3d.go +++ b/operator/pkg/k3d/k3d.go @@ -19,15 +19,19 @@ import ( "embed" "fmt" "io/fs" + "net" "os" "os/exec" "slices" + "strconv" "strings" "sync" + "syscall" "time" "github.com/cockroachdb/errors" "github.com/redpanda-data/helm-charts/pkg/kube" + "golang.org/x/sys/unix" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -76,10 +80,16 @@ func NewCluster(name string) (*Cluster, error) { image = override } + port, err := reserveEphemeralPort("tcp4") + if err != nil { + return nil, err + } + args := []string{ "cluster", "create", name, + fmt.Sprintf("--api-port=127.0.0.1:%d", port), fmt.Sprintf("--agents=%d", 3), fmt.Sprintf("--timeout=%s", 30*time.Second), fmt.Sprintf("--image=%s", image), @@ -268,3 +278,61 @@ var startupManifests = sync.OnceValue(func() []client.Object { return objs }) + +// reserveEphemeralPort attempts to "reserve" an ephemeral OS port by putting +// it into a TIME_WAIT state which prevents the OS re-allocating it when +// binding to port 0. +// Big thanks to: +// - https://github.com/libp2p/go-reuseport +// - https://github.com/Yelp/ephemeral-port-reserve +func reserveEphemeralPort(network string) (int, error) { + lc := net.ListenConfig{ + Control: func(network, address string, c syscall.RawConn) error { + var optErr error + ctrlErr := c.Control(func(fd uintptr) { + // Setting SO_REUSEADDR allows this port to be rebound after we finish using it. + // It works roughly the same way on macos and linux. + optErr = unix.SetsockoptInt(int(fd), unix.SOL_SOCKET, unix.SO_REUSEADDR, 1) + }) + return errors.Join(ctrlErr, optErr) + }, + } + + // Listen to port 0 to get an OS allocated port. + lis, err := lc.Listen(context.Background(), network, "127.0.0.1:0") + if err != nil { + return 0, errors.WithStack(err) + } + + defer lis.Close() + + // According to ephemeral-port-reserve, we need to accept a connection to + // actually put this port into the TIME_WAIT state. + errCh := make(chan error, 1) + go func() { + conn, err := net.Dial(lis.Addr().Network(), lis.Addr().String()) + if err != nil { + errCh <- err + return + } + errCh <- conn.Close() + }() + + conn, err := lis.Accept() + if err != nil { + return 0, err + } + + defer conn.Close() + + if err := <-errCh; err != nil { + return 0, err + } + + _, port, err := net.SplitHostPort(lis.Addr().String()) + if err != nil { + return 0, err + } + + return strconv.Atoi(port) +} diff --git a/operator/pkg/k3d/k3d_test.go b/operator/pkg/k3d/k3d_test.go new file mode 100644 index 000000000..67422d399 --- /dev/null +++ b/operator/pkg/k3d/k3d_test.go @@ -0,0 +1,61 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +package k3d + +import ( + "fmt" + "net" + "strconv" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestReservePort(t *testing.T) { + const network = "tcp4" + const iterations = 100 + + reserved := map[int]struct{}{} + for i := 0; i < iterations; i++ { + port, err := reserveEphemeralPort("tcp4") + assert.NoError(t, err) + assert.NotZero(t, port) + + // Assert that we can listen on the provided port. + lis, err := net.Listen("tcp4", fmt.Sprintf("127.0.0.1:%d", port)) + assert.NoError(t, err) + assert.NoError(t, lis.Close()) + + reserved[port] = struct{}{} + } + + // Not the best test as failures are exceptionally unlikely to be + // reproducible. + // Bind a bunch of ephemeral ports and assert that we don't get allocated + // any of the ports we've reserved. + for i := 0; i < iterations; i++ { + lis, err := net.Listen(network, "127.0.0.1:0") + assert.NoError(t, err) + + // Defer closing of this listener to ensure we always get new ports + // from listening to 0. + defer lis.Close() + + _, portStr, err := net.SplitHostPort(lis.Addr().String()) + assert.NoError(t, err) + + port, err := strconv.Atoi(portStr) + assert.NoError(t, err) + + t.Logf("%d", port) + + assert.NotContains(t, reserved, port) + } +}