Upstream inconsistent graph state fix #58

Closed
wants to merge 7 commits into from
5 changes: 4 additions & 1 deletion executor/runcexecutor/executor.go
@@ -323,6 +323,8 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root executor.Mount,
}
}

statsCtx, statsCancel := context.WithCancel(context.Background())

trace.SpanFromContext(ctx).AddEvent("Container created")
err = w.run(ctx, id, bundle, process, func() {
startedOnce.Do(func() {
@@ -331,7 +333,7 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root executor.Mount,
close(started)
}
if process.StatsStream != nil {
go w.monitorContainerStats(ctx, id, w.sampleFrequency, process.StatsStream) // earthly-specific
go w.monitorContainerStats(statsCtx, id, w.sampleFrequency, process.StatsStream) // earthly-specific
}
if rec != nil {
rec.Start()
@@ -340,6 +342,7 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root executor.Mount,
}, true)

releaseContainer := func(ctx context.Context) error {
statsCancel()
err := w.runc.Delete(ctx, id, &runc.DeleteOpts{})
err1 := namespace.Close()
if err == nil {
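For illustration only (not part of the diff): the executor.go change above detaches the stats goroutine's lifetime from the run context by deriving statsCtx from context.Background(), and stops it explicitly via statsCancel inside releaseContainer. A minimal, standalone sketch of that lifetime pattern, with hypothetical names (monitor, releaseResource):

package main

import (
    "context"
    "fmt"
    "time"
)

// monitor samples until its context is cancelled.
func monitor(ctx context.Context) {
    ticker := time.NewTicker(10 * time.Millisecond)
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            fmt.Println("monitor stopped:", ctx.Err())
            return
        case <-ticker.C:
            fmt.Println("sample")
        }
    }
}

func main() {
    // Detach the monitor's lifetime from any request-scoped context.
    statsCtx, statsCancel := context.WithCancel(context.Background())
    go monitor(statsCtx)

    // ... run the workload ...
    time.Sleep(35 * time.Millisecond)

    // Cancel only when the resource is released, mirroring releaseContainer above.
    releaseResource := func() { statsCancel() }
    releaseResource()
    time.Sleep(20 * time.Millisecond) // give the goroutine time to observe cancellation
}

The point of the design is that the sampling goroutine is torn down together with the container rather than with whichever context happened to start it.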
51 changes: 29 additions & 22 deletions executor/runcexecutor/monitor_stats.go
@@ -46,33 +46,40 @@ func writeStatsToStream(w io.Writer, stats *runc.Stats) error {

func (w *runcExecutor) monitorContainerStats(ctx context.Context, id string, sampleFrequency time.Duration, statsWriter io.WriteCloser) {
numFailuresAllowed := 10
for {
// sleep at the top of the loop to give it time to start
time.Sleep(sampleFrequency)

stats, err := w.runc.Stats(ctx, id)
if err != nil {
if errors.Is(err, context.Canceled) {
timer := time.NewTimer(sampleFrequency)
defer timer.Stop()

for {
select {
case <-ctx.Done():
bklog.G(ctx).Debugf("stats collection context done: %v", ctx.Err())
return
case <-timer.C:
stats, err := w.runc.Stats(ctx, id)
if err != nil {
if errors.Is(err, context.Canceled) {
return
}
if numFailuresAllowed > 0 {
// allow the initial calls to runc.Stats to fail, for cases where the program didn't start within the initial
// sampleFrequency; this should only occur under heavy workloads
bklog.G(ctx).Warnf("ignoring runc stats collection error: %s", err)
numFailuresAllowed--
continue
}
bklog.G(ctx).Errorf("runc stats collection error: %s", err)
return
}
if numFailuresAllowed > 0 {
// allow the initial calls to runc.Stats to fail, for cases where the program didn't start within the initial
// sampleFrequency; this should only occur under heavy workloads
bklog.G(ctx).Warnf("ignoring runc stats collection error: %s", err)
numFailuresAllowed--
continue
}
bklog.G(ctx).Errorf("runc stats collection error: %s", err)
return
}

// once runc.Stats has succeeded, don't ignore future errors
numFailuresAllowed = 0
// once runc.Stats has succeeded, don't ignore future errors
numFailuresAllowed = 0

err = writeStatsToStream(statsWriter, stats)
if err != nil {
bklog.G(ctx).Errorf("failed to send runc stats to client-stream: %s", err)
return
err = writeStatsToStream(statsWriter, stats)
if err != nil {
bklog.G(ctx).Errorf("failed to send runc stats to client-stream: %s", err)
return
}
}
}
}
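For illustration only (not part of the diff): the rewritten loop above waits on ctx.Done() alongside the timer, so the goroutine exits promptly when its context is cancelled instead of sleeping unconditionally, and it tolerates a bounded number of early runc.Stats failures before treating errors as fatal. A simplified sketch of that polling-with-a-failure-budget pattern, using a time.Ticker for repeated samples and a hypothetical poll function:

package main

import (
    "context"
    "errors"
    "fmt"
    "time"
)

var errNotReady = errors.New("not ready")

// pollLoop samples fn until ctx is cancelled, allowing a few initial failures
// (e.g. the target has not started yet) before giving up on errors.
func pollLoop(ctx context.Context, every time.Duration, fn func() error) {
    failuresAllowed := 3
    ticker := time.NewTicker(every)
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            if err := fn(); err != nil {
                if failuresAllowed > 0 {
                    failuresAllowed--
                    continue // ignore early errors
                }
                fmt.Println("giving up:", err)
                return
            }
            // after the first success, stop ignoring errors
            failuresAllowed = 0
            fmt.Println("sampled ok")
        }
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
    defer cancel()
    start := time.Now()
    pollLoop(ctx, 10*time.Millisecond, func() error {
        if time.Since(start) < 25*time.Millisecond {
            return errNotReady
        }
        return nil
    })
}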
62 changes: 60 additions & 2 deletions solver/jobs.go
@@ -10,6 +10,7 @@ import (
"github.com/moby/buildkit/identity"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/solver/errdefs"
"github.com/moby/buildkit/util/bklog"
"github.com/moby/buildkit/util/flightcontrol"
"github.com/moby/buildkit/util/progress"
"github.com/moby/buildkit/util/progress/controller"
@@ -156,7 +157,7 @@ func (s *state) getEdge(index Index) *edge {
return e
}

func (s *state) setEdge(index Index, targetEdge *edge) {
func (s *state) setEdge(index Index, targetEdge *edge, targetState *state) {
s.mu.Lock()
defer s.mu.Unlock()
e, ok := s.edges[index]
@@ -172,6 +173,60 @@ func (s *state) setEdge(index Index, targetEdge *edge) {
s.edges[index] = e
}
targetEdge.takeOwnership(e)

if targetState != nil {
targetState.addJobs(s, map[*state]struct{}{})

if _, ok := targetState.allPw[s.mpw]; !ok {
targetState.mpw.Add(s.mpw)
targetState.allPw[s.mpw] = struct{}{}
}
}
}

// addJobs recursively adds jobs to state and all its ancestors. currently
// only used during edge merges to add jobs from the source of the merge to the
// target and its ancestors.
// requires that Solver.mu is read-locked and srcState.mu is locked
func (s *state) addJobs(srcState *state, memo map[*state]struct{}) {
if _, ok := memo[s]; ok {
return
}
memo[s] = struct{}{}

s.mu.Lock()
defer s.mu.Unlock()

for j := range srcState.jobs {
s.jobs[j] = struct{}{}
}

for _, inputEdge := range s.vtx.Inputs() {
inputState, ok := s.solver.actives[inputEdge.Vertex.Digest()]
if !ok {
bklog.G(context.TODO()).
WithField("vertex_digest", inputEdge.Vertex.Digest()).
Error("input vertex not found during addJobs")
continue
}
inputState.addJobs(srcState, memo)

// tricky case: if the inputState's edge was *already* merged we should
// also add jobs to the merged edge's state
mergedInputEdge := inputState.getEdge(inputEdge.Index)
if mergedInputEdge == nil || mergedInputEdge.edge.Vertex.Digest() == inputEdge.Vertex.Digest() {
// not merged
continue
}
mergedInputState, ok := s.solver.actives[mergedInputEdge.edge.Vertex.Digest()]
if !ok {
bklog.G(context.TODO()).
WithField("vertex_digest", mergedInputEdge.edge.Vertex.Digest()).
Error("merged input vertex not found during addJobs")
continue
}
mergedInputState.addJobs(srcState, memo)
}
}

func (s *state) combinedCacheManager() CacheManager {
@@ -325,7 +380,10 @@ func (jl *Solver) setEdge(e Edge, targetEdge *edge) {
return
}

st.setEdge(e.Index, targetEdge)
// targetSt may be nil here; that is intentional and handled in st.setEdge
targetSt := jl.actives[targetEdge.edge.Vertex.Digest()]

st.setEdge(e.Index, targetEdge, targetSt)
}

func (jl *Solver) getState(e Edge) *state {
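For illustration only (not part of the diff): the heart of the jobs.go change is that when an edge is merged, the source state's job references are propagated to the merge target's state and, recursively, to all of its input states (including inputs that were themselves already merged), with a memo set preventing repeated visits. A simplified sketch of that propagation over a toy dependency graph, with hypothetical node and job types:

package main

import "fmt"

type job struct{ name string }

// node is a stand-in for solver state: it tracks which jobs reference it
// and which nodes it depends on.
type node struct {
    name   string
    jobs   map[*job]struct{}
    inputs []*node
}

// addJobsFrom copies src's jobs onto n and all of n's transitive inputs,
// using memo to avoid revisiting shared nodes.
func (n *node) addJobsFrom(src *node, memo map[*node]struct{}) {
    if _, ok := memo[n]; ok {
        return
    }
    memo[n] = struct{}{}
    for j := range src.jobs {
        n.jobs[j] = struct{}{}
    }
    for _, in := range n.inputs {
        in.addJobsFrom(src, memo)
    }
}

func main() {
    newNode := func(name string, inputs ...*node) *node {
        return &node{name: name, jobs: map[*job]struct{}{}, inputs: inputs}
    }
    dep := newNode("dep")
    target := newNode("target", dep)
    source := newNode("source")

    j1 := &job{name: "job1"}
    source.jobs[j1] = struct{}{}

    // Merging source's edge into target: target (and its inputs) must now be
    // kept alive by job1 as well, otherwise discarding the job that created
    // target could delete states that job1 still depends on.
    target.addJobsFrom(source, map[*node]struct{}{})

    fmt.Println(len(target.jobs), len(dep.jobs)) // 1 1
}

Without this propagation, discarding the job that originally owned the merge target could remove active states that a merged-in job still needs; the TestStaleEdgeMerge test below exercises exactly that discard-and-rebuild sequence.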
134 changes: 134 additions & 0 deletions solver/scheduler_test.go
@@ -3447,6 +3447,140 @@ func TestUnknownBuildID(t *testing.T) {
require.Contains(t, err.Error(), "no such job")
}

func TestStaleEdgeMerge(t *testing.T) {
// should not be possible to merge to an edge no longer in the actives map
t.Parallel()
ctx := context.TODO()

s := NewSolver(SolverOpt{
ResolveOpFunc: testOpResolver,
})
defer s.Close()

depV0 := vtxConst(1, vtxOpt{name: "depV0"})
depV1 := vtxConst(1, vtxOpt{name: "depV1"})
depV2 := vtxConst(1, vtxOpt{name: "depV2"})

// These should all end up edge merged
v0 := vtxAdd(2, vtxOpt{name: "v0", inputs: []Edge{
{Vertex: depV0},
}})
v1 := vtxAdd(2, vtxOpt{name: "v1", inputs: []Edge{
{Vertex: depV1},
}})
v2 := vtxAdd(2, vtxOpt{name: "v2", inputs: []Edge{
{Vertex: depV2},
}})

j0, err := s.NewJob("job0")
require.NoError(t, err)
g0 := Edge{Vertex: v0}
res, err := j0.Build(ctx, g0)
require.NoError(t, err)
require.NotNil(t, res)

require.Contains(t, s.actives, v0.Digest())
require.Contains(t, s.actives[v0.Digest()].jobs, j0)
require.Contains(t, s.actives, depV0.Digest())
require.Contains(t, s.actives[depV0.Digest()].jobs, j0)

// this edge should be merged with the one from j0
j1, err := s.NewJob("job1")
require.NoError(t, err)
g1 := Edge{Vertex: v1}
res, err = j1.Build(ctx, g1)
require.NoError(t, err)
require.NotNil(t, res)

require.Contains(t, s.actives, v0.Digest())
require.Contains(t, s.actives[v0.Digest()].jobs, j0)
require.Contains(t, s.actives[v0.Digest()].jobs, j1)
require.Contains(t, s.actives, depV0.Digest())
require.Contains(t, s.actives[depV0.Digest()].jobs, j0)
require.Contains(t, s.actives[depV0.Digest()].jobs, j1)

require.Contains(t, s.actives, v1.Digest())
require.NotContains(t, s.actives[v1.Digest()].jobs, j0)
require.Contains(t, s.actives[v1.Digest()].jobs, j1)
require.Contains(t, s.actives, depV1.Digest())
require.NotContains(t, s.actives[depV1.Digest()].jobs, j0)
require.Contains(t, s.actives[depV1.Digest()].jobs, j1)

// discard j0, verify that v0 is still active and its state contains j1 since j1's
// edge was merged to v0's state
require.NoError(t, j0.Discard())

require.Contains(t, s.actives, v0.Digest())
require.NotContains(t, s.actives[v0.Digest()].jobs, j0)
require.Contains(t, s.actives[v0.Digest()].jobs, j1)
require.Contains(t, s.actives, depV0.Digest())
require.NotContains(t, s.actives[depV0.Digest()].jobs, j0)
require.Contains(t, s.actives[depV0.Digest()].jobs, j1)

require.Contains(t, s.actives, v1.Digest())
require.NotContains(t, s.actives[v1.Digest()].jobs, j0)
require.Contains(t, s.actives[v1.Digest()].jobs, j1)
require.Contains(t, s.actives, depV1.Digest())
require.NotContains(t, s.actives[depV1.Digest()].jobs, j0)
require.Contains(t, s.actives[depV1.Digest()].jobs, j1)

// verify another job can still merge
j2, err := s.NewJob("job2")
require.NoError(t, err)
g2 := Edge{Vertex: v2}
res, err = j2.Build(ctx, g2)
require.NoError(t, err)
require.NotNil(t, res)

require.Contains(t, s.actives, v0.Digest())
require.Contains(t, s.actives[v0.Digest()].jobs, j1)
require.Contains(t, s.actives[v0.Digest()].jobs, j2)
require.Contains(t, s.actives, depV0.Digest())
require.Contains(t, s.actives[depV0.Digest()].jobs, j1)
require.Contains(t, s.actives[depV0.Digest()].jobs, j2)

require.Contains(t, s.actives, v1.Digest())
require.Contains(t, s.actives[v1.Digest()].jobs, j1)
require.NotContains(t, s.actives[v1.Digest()].jobs, j2)
require.Contains(t, s.actives, depV1.Digest())
require.Contains(t, s.actives[depV1.Digest()].jobs, j1)
require.NotContains(t, s.actives[depV1.Digest()].jobs, j2)

require.Contains(t, s.actives, v2.Digest())
require.NotContains(t, s.actives[v2.Digest()].jobs, j1)
require.Contains(t, s.actives[v2.Digest()].jobs, j2)
require.Contains(t, s.actives, depV2.Digest())
require.NotContains(t, s.actives[depV2.Digest()].jobs, j1)
require.Contains(t, s.actives[depV2.Digest()].jobs, j2)

// discard j1, verify only referenced edges still exist
require.NoError(t, j1.Discard())

require.Contains(t, s.actives, v0.Digest())
require.NotContains(t, s.actives[v0.Digest()].jobs, j1)
require.Contains(t, s.actives[v0.Digest()].jobs, j2)
require.Contains(t, s.actives, depV0.Digest())
require.NotContains(t, s.actives[depV0.Digest()].jobs, j1)
require.Contains(t, s.actives[depV0.Digest()].jobs, j2)

require.NotContains(t, s.actives, v1.Digest())
require.NotContains(t, s.actives, depV1.Digest())

require.Contains(t, s.actives, v2.Digest())
require.Contains(t, s.actives[v2.Digest()].jobs, j2)
require.Contains(t, s.actives, depV2.Digest())
require.Contains(t, s.actives[depV2.Digest()].jobs, j2)

// discard the last job and verify everything was removed now
require.NoError(t, j2.Discard())
require.NotContains(t, s.actives, v0.Digest())
require.NotContains(t, s.actives, v1.Digest())
require.NotContains(t, s.actives, v2.Digest())
require.NotContains(t, s.actives, depV0.Digest())
require.NotContains(t, s.actives, depV1.Digest())
require.NotContains(t, s.actives, depV2.Digest())
}

func generateSubGraph(nodes int) (Edge, int) {
if nodes == 1 {
value := rand.Int() % 500 //nolint:gosec
2 changes: 2 additions & 0 deletions util/progress/multiwriter.go
@@ -18,6 +18,8 @@ type MultiWriter struct {
meta map[string]interface{}
}

var _ rawProgressWriter = &MultiWriter{}

func NewMultiWriter(opts ...WriterOption) *MultiWriter {
mw := &MultiWriter{
writers: map[rawProgressWriter]struct{}{},
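For illustration only (not part of the diff): the added var _ rawProgressWriter = &MultiWriter{} line is the standard Go compile-time assertion that a type implements an interface; the build fails if *MultiWriter ever stops satisfying rawProgressWriter, and the blank identifier discards the value. A generic sketch of the idiom:

package main

import "fmt"

type Writer interface {
    Write(msg string)
}

type ConsoleWriter struct{}

func (ConsoleWriter) Write(msg string) { fmt.Println(msg) }

// Compile-time check: the build breaks here if ConsoleWriter stops
// satisfying Writer. No value is kept because of the blank identifier.
var _ Writer = ConsoleWriter{}

func main() {
    var w Writer = ConsoleWriter{}
    w.Write("hello")
}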