Skip to content

Commit

Permalink
implement ephemeral port reservation for k3d
Browse files Browse the repository at this point in the history
Prior to this commit we discovered that CI can flake when standing up multiple
k3d clusters due to port conflicts.

This commit attempts to implement "port reservation" such that pkg/k3d can
always create k3d clusters without conflicts without coordination across
different processes.

It's unclear if this will actually work as engineering an intentional failure
is not feasible.
  • Loading branch information
chrisseto committed Nov 8, 2024
1 parent a1865c2 commit 8e942a3
Show file tree
Hide file tree
Showing 2 changed files with 129 additions and 0 deletions.
68 changes: 68 additions & 0 deletions operator/pkg/k3d/k3d.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,19 @@ import (
"embed"
"fmt"
"io/fs"
"net"
"os"
"os/exec"
"slices"
"strconv"
"strings"
"sync"
"syscall"
"time"

"github.com/cockroachdb/errors"
"github.com/redpanda-data/helm-charts/pkg/kube"
"golang.org/x/sys/unix"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
Expand Down Expand Up @@ -76,10 +80,16 @@ func NewCluster(name string) (*Cluster, error) {
image = override
}

port, err := reserveEphemeralPort("tcp4")
if err != nil {
return nil, err
}

args := []string{
"cluster",
"create",
name,
fmt.Sprintf("--api-port=127.0.0.1:%d", port),
fmt.Sprintf("--agents=%d", 3),
fmt.Sprintf("--timeout=%s", 30*time.Second),
fmt.Sprintf("--image=%s", image),
Expand Down Expand Up @@ -268,3 +278,61 @@ var startupManifests = sync.OnceValue(func() []client.Object {

return objs
})

// reserveEphemeralPort attempts to "reserve" an ephemeral OS port by putting
// it into a TIME_WAIT state which prevents the OS re-allocating it when
// binding to port 0.
// Big thanks to:
// - https://github.com/libp2p/go-reuseport
// - https://github.com/Yelp/ephemeral-port-reserve
func reserveEphemeralPort(network string) (int, error) {
lc := net.ListenConfig{
Control: func(network, address string, c syscall.RawConn) error {
var optErr error
ctrlErr := c.Control(func(fd uintptr) {
// Setting SO_REUSEADDR allows this port to be rebound after we finish using it.
// It works roughly the same way on macos and linux.
optErr = unix.SetsockoptInt(int(fd), unix.SOL_SOCKET, unix.SO_REUSEADDR, 1)
})
return errors.Join(ctrlErr, optErr)
},
}

// Listen to port 0 to get an OS allocated port.
lis, err := lc.Listen(context.Background(), network, "127.0.0.1:0")
if err != nil {
return 0, errors.WithStack(err)
}

defer lis.Close()

// According to ephemeral-port-reserve, we need to accept a connection to
// actually put this port into the TIME_WAIT state.
errCh := make(chan error, 1)
go func() {
conn, err := net.Dial(lis.Addr().Network(), lis.Addr().String())
if err != nil {
errCh <- err
return
}
errCh <- conn.Close()
}()

conn, err := lis.Accept()
if err != nil {
return 0, err
}

defer conn.Close()

if err := <-errCh; err != nil {
return 0, err
}

_, port, err := net.SplitHostPort(lis.Addr().String())
if err != nil {
return 0, err
}

return strconv.Atoi(port)
}
61 changes: 61 additions & 0 deletions operator/pkg/k3d/k3d_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

package k3d

import (
"fmt"
"net"
"strconv"
"testing"

"github.com/stretchr/testify/assert"
)

func TestReservePort(t *testing.T) {
const network = "tcp4"
const iterations = 100

reserved := map[int]struct{}{}
for i := 0; i < iterations; i++ {
port, err := reserveEphemeralPort("tcp4")
assert.NoError(t, err)
assert.NotZero(t, port)

// Assert that we can listen on the provided port.
lis, err := net.Listen("tcp4", fmt.Sprintf("127.0.0.1:%d", port))
assert.NoError(t, err)
assert.NoError(t, lis.Close())

reserved[port] = struct{}{}
}

// Not the best test as failures are exceptionally unlikely to be
// reproducible.
// Bind a bunch of ephemeral ports and assert that we don't get allocated
// any of the ports we've reserved.
for i := 0; i < iterations; i++ {
lis, err := net.Listen(network, "127.0.0.1:0")
assert.NoError(t, err)

// Defer closing of this listener to ensure we always get new ports
// from listening to 0.
defer lis.Close()

_, portStr, err := net.SplitHostPort(lis.Addr().String())
assert.NoError(t, err)

port, err := strconv.Atoi(portStr)
assert.NoError(t, err)

t.Logf("%d", port)

assert.NotContains(t, reserved, port)
}
}

0 comments on commit 8e942a3

Please sign in to comment.