Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(reaper): refactor to allow retries and fix races #2728

Merged
merged 1 commit into from
Oct 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions container.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,17 @@ type DeprecatedContainer interface {

// Container allows getting info about and controlling a single container instance
type Container interface {
GetContainerID() string // get the container id from the provider
Endpoint(context.Context, string) (string, error) // get proto://ip:port string for the lowest exposed port
PortEndpoint(context.Context, nat.Port, string) (string, error) // get proto://ip:port string for the given exposed port
Host(context.Context) (string, error) // get host where the container port is exposed
Inspect(context.Context) (*types.ContainerJSON, error) // get container info
MappedPort(context.Context, nat.Port) (nat.Port, error) // get externally mapped port for a container port
Ports(context.Context) (nat.PortMap, error) // Deprecated: Use c.Inspect(ctx).NetworkSettings.Ports instead
SessionID() string // get session id
IsRunning() bool // IsRunning returns true if the container is running, false otherwise.
Start(context.Context) error // start the container
Stop(context.Context, *time.Duration) error // stop the container
GetContainerID() string // get the container id from the provider
Endpoint(context.Context, string) (string, error) // get proto://ip:port string for the lowest exposed port
PortEndpoint(ctx context.Context, port nat.Port, proto string) (string, error) // get proto://ip:port string for the given exposed port
Host(context.Context) (string, error) // get host where the container port is exposed
Inspect(context.Context) (*types.ContainerJSON, error) // get container info
MappedPort(context.Context, nat.Port) (nat.Port, error) // get externally mapped port for a container port
Ports(context.Context) (nat.PortMap, error) // Deprecated: Use c.Inspect(ctx).NetworkSettings.Ports instead
SessionID() string // get session id
IsRunning() bool // IsRunning returns true if the container is running, false otherwise.
Start(context.Context) error // start the container
Stop(context.Context, *time.Duration) error // stop the container

// Terminate stops and removes the container and its image if it was built and not flagged as kept.
Terminate(ctx context.Context) error
Expand Down
102 changes: 51 additions & 51 deletions docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func (c *DockerContainer) Inspect(ctx context.Context) (*types.ContainerJSON, er
func (c *DockerContainer) MappedPort(ctx context.Context, port nat.Port) (nat.Port, error) {
inspect, err := c.Inspect(ctx)
if err != nil {
return "", err
return "", fmt.Errorf("inspect: %w", err)
}
if inspect.ContainerJSONBase.HostConfig.NetworkMode == "host" {
return port, nil
Expand All @@ -204,7 +204,7 @@ func (c *DockerContainer) MappedPort(ctx context.Context, port nat.Port) (nat.Po
return nat.NewPort(k.Proto(), p[0].HostPort)
}

return "", errors.New("port not found")
return "", errdefs.NotFound(fmt.Errorf("port %q not found", port))
}

// Deprecated: use c.Inspect(ctx).NetworkSettings.Ports instead.
Expand Down Expand Up @@ -980,9 +980,7 @@ func (p *DockerProvider) BuildImage(ctx context.Context, img ImageBuildInfo) (st
}

// CreateContainer fulfils a request for a container without starting it
func (p *DockerProvider) CreateContainer(ctx context.Context, req ContainerRequest) (Container, error) {
var err error

func (p *DockerProvider) CreateContainer(ctx context.Context, req ContainerRequest) (con Container, err error) { //nolint:nonamedreturns // Needed for error checking.
mdelapenya marked this conversation as resolved.
Show resolved Hide resolved
// defer the close of the Docker client connection the soonest
defer p.Close()

Expand Down Expand Up @@ -1027,22 +1025,23 @@ func (p *DockerProvider) CreateContainer(ctx context.Context, req ContainerReque
// the reaper does not need to start a reaper for itself
isReaperContainer := strings.HasSuffix(imageName, config.ReaperDefaultImage)
if !p.config.RyukDisabled && !isReaperContainer {
r, err := reuseOrCreateReaper(context.WithValue(ctx, core.DockerHostContextKey, p.host), core.SessionID(), p)
r, err := spawner.reaper(context.WithValue(ctx, core.DockerHostContextKey, p.host), core.SessionID(), p)
if err != nil {
return nil, fmt.Errorf("%w: creating reaper failed", err)
return nil, fmt.Errorf("reaper: %w", err)
}
termSignal, err = r.Connect()

termSignal, err := r.Connect()
if err != nil {
return nil, fmt.Errorf("%w: connecting to reaper failed", err)
return nil, fmt.Errorf("reaper connect: %w", err)
}
}

// Cleanup on error, otherwise set termSignal to nil before successful return.
defer func() {
if termSignal != nil {
termSignal <- true
}
}()
// Cleanup on error.
defer func() {
if err != nil {
termSignal <- true
}
}()
}

if err = req.Validate(); err != nil {
return nil, err
Expand Down Expand Up @@ -1108,10 +1107,9 @@ func (p *DockerProvider) CreateContainer(ctx context.Context, req ContainerReque
}

if !isReaperContainer {
// add the labels that the reaper will use to terminate the container to the request
for k, v := range core.DefaultLabels(core.SessionID()) {
req.Labels[k] = v
}
// Add the labels that identify this as a testcontainers container and
// allow the reaper to terminate it if requested.
AddGenericLabels(req.Labels)
}

dockerInput := &container.Config{
Expand Down Expand Up @@ -1205,9 +1203,6 @@ func (p *DockerProvider) CreateContainer(ctx context.Context, req ContainerReque
return nil, err
}

// Disable cleanup on success
termSignal = nil

return c, nil
}

Expand Down Expand Up @@ -1256,7 +1251,7 @@ func (p *DockerProvider) waitContainerCreation(ctx context.Context, name string)
)
}

func (p *DockerProvider) ReuseOrCreateContainer(ctx context.Context, req ContainerRequest) (Container, error) {
func (p *DockerProvider) ReuseOrCreateContainer(ctx context.Context, req ContainerRequest) (con Container, err error) { //nolint:nonamedreturns // Needed for error check.
mdelapenya marked this conversation as resolved.
Show resolved Hide resolved
c, err := p.findContainerByName(ctx, req.Name)
if err != nil {
return nil, err
Expand All @@ -1279,14 +1274,22 @@ func (p *DockerProvider) ReuseOrCreateContainer(ctx context.Context, req Contain

var termSignal chan bool
if !p.config.RyukDisabled {
r, err := reuseOrCreateReaper(context.WithValue(ctx, core.DockerHostContextKey, p.host), sessionID, p)
r, err := spawner.reaper(context.WithValue(ctx, core.DockerHostContextKey, p.host), sessionID, p)
if err != nil {
return nil, fmt.Errorf("reaper: %w", err)
}
termSignal, err = r.Connect()

termSignal, err := r.Connect()
if err != nil {
return nil, fmt.Errorf("%w: connecting to reaper failed", err)
return nil, fmt.Errorf("reaper connect: %w", err)
}

// Cleanup on error.
defer func() {
if err != nil {
termSignal <- true
}
}()
}

// default hooks include logger hook and pre-create hook
Expand Down Expand Up @@ -1454,9 +1457,7 @@ func daemonHost(ctx context.Context, p *DockerProvider) (string, error) {

// Deprecated: use network.New instead
// CreateNetwork returns the object representing a new network identified by its name
func (p *DockerProvider) CreateNetwork(ctx context.Context, req NetworkRequest) (Network, error) {
var err error

func (p *DockerProvider) CreateNetwork(ctx context.Context, req NetworkRequest) (net Network, err error) { //nolint:nonamedreturns // Needed for error check.
// defer the close of the Docker client connection the soonest
defer p.Close()

Expand Down Expand Up @@ -1485,31 +1486,30 @@ func (p *DockerProvider) CreateNetwork(ctx context.Context, req NetworkRequest)

var termSignal chan bool
if !p.config.RyukDisabled {
r, err := reuseOrCreateReaper(context.WithValue(ctx, core.DockerHostContextKey, p.host), sessionID, p)
r, err := spawner.reaper(context.WithValue(ctx, core.DockerHostContextKey, p.host), sessionID, p)
if err != nil {
return nil, fmt.Errorf("%w: creating network reaper failed", err)
return nil, fmt.Errorf("reaper: %w", err)
}
termSignal, err = r.Connect()

termSignal, err := r.Connect()
if err != nil {
return nil, fmt.Errorf("%w: connecting to network reaper failed", err)
return nil, fmt.Errorf("reaper connect: %w", err)
}
}

// add the labels that the reaper will use to terminate the network to the request
for k, v := range core.DefaultLabels(sessionID) {
req.Labels[k] = v
// Cleanup on error.
defer func() {
if err != nil {
termSignal <- true
}
}()
}

// Cleanup on error, otherwise set termSignal to nil before successful return.
defer func() {
if termSignal != nil {
termSignal <- true
}
}()
// add the labels that the reaper will use to terminate the network to the request
core.AddDefaultLabels(sessionID, req.Labels)

response, err := p.client.NetworkCreate(ctx, req.Name, nc)
if err != nil {
return &DockerNetwork{}, err
return &DockerNetwork{}, fmt.Errorf("create network: %w", err)
}

n := &DockerNetwork{
Expand All @@ -1520,9 +1520,6 @@ func (p *DockerProvider) CreateNetwork(ctx context.Context, req NetworkRequest)
provider: p,
}

// Disable cleanup on success
termSignal = nil

return n, nil
}

Expand Down Expand Up @@ -1592,9 +1589,12 @@ func (p *DockerProvider) getDefaultNetwork(ctx context.Context, cli client.APICl
_, err = cli.NetworkCreate(ctx, reaperNetwork, network.CreateOptions{
Driver: Bridge,
Attachable: true,
Labels: core.DefaultLabels(core.SessionID()),
Labels: GenericLabels(),
})
if err != nil {
// If the network already exists, we can ignore the error as that can
// happen if we are running multiple tests in parallel and we only
// need to ensure that the network exists.
if err != nil && !errdefs.IsConflict(err) {
return "", err
}
}
Expand Down Expand Up @@ -1632,7 +1632,7 @@ func containerFromDockerResponse(ctx context.Context, response types.Container)
// populate the raw representation of the container
jsonRaw, err := ctr.inspectRawContainer(ctx)
if err != nil {
return nil, err
return nil, fmt.Errorf("inspect raw container: %w", err)
}

// the health status of the container, if any
Expand Down
4 changes: 1 addition & 3 deletions docker_mounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,7 @@ func mapToDockerMounts(containerMounts ContainerMounts) []mount.Mount {
Labels: make(map[string]string),
}
}
for k, v := range GenericLabels() {
containerMount.VolumeOptions.Labels[k] = v
}
AddGenericLabels(containerMount.VolumeOptions.Labels)
}

mounts = append(mounts, containerMount)
Expand Down
12 changes: 11 additions & 1 deletion generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,17 @@ type GenericProvider interface {
ImageProvider
}

// GenericLabels returns a map of labels that can be used to identify containers created by this library
// GenericLabels returns a map of labels that can be used to identify resources
// created by this library. This includes the standard LabelSessionID if the
// reaper is enabled, otherwise this is excluded to prevent resources being
// incorrectly reaped.
func GenericLabels() map[string]string {
return core.DefaultLabels(core.SessionID())
}

// AddGenericLabels adds the generic labels to target.
func AddGenericLabels(target map[string]string) {
stevenh marked this conversation as resolved.
Show resolved Hide resolved
for k, v := range GenericLabels() {
target[k] = v
}
}
44 changes: 37 additions & 7 deletions internal/core/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,53 @@ import (
"strings"

"github.com/testcontainers/testcontainers-go/internal"
"github.com/testcontainers/testcontainers-go/internal/config"
)

const (
LabelBase = "org.testcontainers"
LabelLang = LabelBase + ".lang"
LabelReaper = LabelBase + ".reaper"
LabelRyuk = LabelBase + ".ryuk"
// LabelBase is the base label for all testcontainers labels.
LabelBase = "org.testcontainers"

// LabelLang specifies the language which created the test container.
LabelLang = LabelBase + ".lang"

// LabelReaper identifies the container as a reaper.
LabelReaper = LabelBase + ".reaper"

// LabelRyuk identifies the container as a ryuk.
LabelRyuk = LabelBase + ".ryuk"

// LabelSessionID specifies the session ID of the container.
LabelSessionID = LabelBase + ".sessionId"
LabelVersion = LabelBase + ".version"

// LabelVersion specifies the version of testcontainers which created the container.
LabelVersion = LabelBase + ".version"

// LabelReap specifies the container should be reaped by the reaper.
LabelReap = LabelBase + ".reap"
)

// DefaultLabels returns the standard set of labels which
// includes LabelSessionID if the reaper is enabled.
func DefaultLabels(sessionID string) map[string]string {
return map[string]string{
labels := map[string]string{
LabelBase: "true",
LabelLang: "go",
LabelSessionID: sessionID,
LabelVersion: internal.Version,
LabelSessionID: sessionID,
}

if !config.Read().RyukDisabled {
labels[LabelReap] = "true"
}

return labels
}

// AddDefaultLabels adds the default labels for sessionID to target.
func AddDefaultLabels(sessionID string, target map[string]string) {
stevenh marked this conversation as resolved.
Show resolved Hide resolved
for k, v := range DefaultLabels(sessionID) {
target[k] = v
}
}

Expand Down
2 changes: 1 addition & 1 deletion lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type ContainerRequestHook func(ctx context.Context, req ContainerRequest) error
// - Terminating
// - Terminated
// For that, it will receive a Container, modify it and return an error if needed.
type ContainerHook func(ctx context.Context, container Container) error
type ContainerHook func(ctx context.Context, ctr Container) error

// ContainerLifecycleHooks is a struct that contains all the hooks that can be used
// to modify the container lifecycle. All the container lifecycle hooks except the PreCreates hooks
Expand Down
18 changes: 0 additions & 18 deletions modules/compose/compose.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,23 +153,6 @@ func NewDockerComposeWith(opts ...ComposeStackOption) (*dockerCompose, error) {
return nil, fmt.Errorf("initialize docker client: %w", err)
}

reaperProvider, err := testcontainers.NewDockerProvider()
if err != nil {
return nil, fmt.Errorf("failed to create reaper provider for compose: %w", err)
}

var composeReaper *testcontainers.Reaper
if !reaperProvider.Config().Config.RyukDisabled {
// NewReaper is deprecated: we need to find a way to create the reaper for compose
// bypassing the deprecation.
r, err := testcontainers.NewReaper(context.Background(), testcontainers.SessionID(), reaperProvider, "")
if err != nil {
return nil, fmt.Errorf("failed to create reaper for compose: %w", err)
}

composeReaper = r
}

composeAPI := &dockerCompose{
name: composeOptions.Identifier,
configs: composeOptions.Paths,
Expand All @@ -182,7 +165,6 @@ func NewDockerComposeWith(opts ...ComposeStackOption) (*dockerCompose, error) {
containers: make(map[string]*testcontainers.DockerContainer),
networks: make(map[string]*testcontainers.DockerNetwork),
sessionID: testcontainers.SessionID(),
reaper: composeReaper,
}

return composeAPI, nil
Expand Down
Loading