Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cache slow/computed keys using an LRU #65

Merged
merged 1 commit into from
May 30, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 86 additions & 33 deletions solver/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

const (
runOnceLRUSize = 2000
slowCacheLRUSize = 5000
parallelGuardWait = time.Millisecond * 100
)

Expand Down Expand Up @@ -65,14 +66,46 @@ func (s *runOnceCtrl) hasRun(d digest.Digest, sessionID string) bool {
return ret
}

type slowCacheStore struct {
lru *simplelru.LRU
mu sync.Mutex
}

func newSlowCacheStore() *slowCacheStore {
lru, _ := simplelru.NewLRU(slowCacheLRUSize, nil) // Error impossible on positive first argument.
return &slowCacheStore{lru: lru}
}

func (s *slowCacheStore) get(cacheKey digest.Digest, refID string) (digest.Digest, bool) {
s.mu.Lock()
defer s.mu.Unlock()

key := fmt.Sprintf("%s:%s", cacheKey, refID)
v, ok := s.lru.Get(key)
if !ok {
return "", false
}

return v.(digest.Digest), true
}

func (s *slowCacheStore) set(cacheKey digest.Digest, refID string, slow digest.Digest) {
s.mu.Lock()
defer s.mu.Unlock()

key := fmt.Sprintf("%s:%s", cacheKey, refID)
s.lru.Add(key, slow)
}

type simpleSolver struct {
resolveOpFunc ResolveOpFunc
isRunOnceFunc IsRunOnceFunc
commitRefFunc CommitRefFunc
solver *Solver
parallelGuard *parallelGuard
resultSource ResultSource
runOnceCtrl *runOnceCtrl
resolveOpFunc ResolveOpFunc
isRunOnceFunc IsRunOnceFunc
commitRefFunc CommitRefFunc
solver *Solver
parallelGuard *parallelGuard
resultSource ResultSource
runOnceCtrl *runOnceCtrl
slowCacheStore *slowCacheStore
}

func newSimpleSolver(
Expand All @@ -83,13 +116,14 @@ func newSimpleSolver(
isRunOnceFunc IsRunOnceFunc,
) *simpleSolver {
return &simpleSolver{
parallelGuard: newParallelGuard(parallelGuardWait),
resolveOpFunc: resolveOpFunc,
commitRefFunc: commitRefFunc,
solver: solver,
resultSource: resultSource,
isRunOnceFunc: isRunOnceFunc,
runOnceCtrl: newRunOnceCtrl(),
parallelGuard: newParallelGuard(parallelGuardWait),
resolveOpFunc: resolveOpFunc,
commitRefFunc: commitRefFunc,
solver: solver,
resultSource: resultSource,
isRunOnceFunc: isRunOnceFunc,
runOnceCtrl: newRunOnceCtrl(),
slowCacheStore: newSlowCacheStore(),
}
}

Expand All @@ -103,6 +137,8 @@ func (s *simpleSolver) build(ctx context.Context, job *Job, e Edge) (CachedResul

runCacheMan := newCacheKeyManager()

closers := []func(context.Context) error{}

for _, d := range digests {
vertex, ok := vertices[d]
if !ok {
Expand All @@ -114,17 +150,14 @@ func (s *simpleSolver) build(ctx context.Context, job *Job, e Edge) (CachedResul
return nil, err
}

// Release previous result as this is not the final return value.
if ret != nil {
err := ret.Release(ctx)
if err != nil {
return nil, err
}
}
closers = append(closers, func(ctx context.Context) error {
return res.Release(ctx)
})

ret = res

// Hijack the CacheKey type in order to export a reference from the new cache key to the ref ID.
// Hijack the CacheKey type in order to export a reference from the new
// cache key to the ref ID.
expKeys = append(expKeys, ExportableCacheKey{
CacheKey: &CacheKey{
ID: res.ID(),
Expand All @@ -134,10 +167,22 @@ func (s *simpleSolver) build(ctx context.Context, job *Job, e Edge) (CachedResul
})
}

// Defer releasing of results until this build has finished to limit
// performance impact.
go func() {
ctx := context.Background()
for i := len(closers) - 1; i >= 0; i-- {
if err := closers[i](ctx); err != nil {
bklog.G(ctx).Warnf("failed to release: %v", err)
}
}
}()

return NewCachedResult(ret, expKeys), nil
}

func (s *simpleSolver) buildOne(ctx context.Context, runCacheMan *cacheKeyManager, d digest.Digest, vertex Vertex, job *Job, e Edge) (Result, digest.Digest, error) {

st := s.state(vertex, job)

// Add cache opts to context as they will be accessed by cache retrieval.
Expand Down Expand Up @@ -362,25 +407,33 @@ func (s *simpleSolver) preprocessInputs(ctx context.Context, runCacheMan *cacheK

// ComputeDigestFunc will usually checksum files. This is then used as
// part of the cache key to ensure it's consistent & distinct for this
// operation.
// operation. The key is then cached based on the key calculated from
// all ancestors & the result ID.
if dep.ComputeDigestFunc != nil {
compDigest, err := dep.ComputeDigestFunc(ctx, res, st)
if err != nil {
bklog.G(ctx).Warnf("failed to compute digest: %v", err)
return nil, err
cachedSlowKey, ok := s.slowCacheStore.get(cacheKey, res.ID())
if ok {
scm.deps[i].computed = cachedSlowKey
} else {
scm.deps[i].computed = compDigest
slowKey, err := dep.ComputeDigestFunc(ctx, res, st)
if err != nil {
bklog.G(ctx).Warnf("failed to compute digest: %v", err)
return nil, err
} else {
scm.deps[i].computed = slowKey
s.slowCacheStore.set(cacheKey, res.ID(), slowKey)
}
}
}

// The result can be released now that the preprocess & slow cache
// digest functions have been run. This is crucial as failing to do so
// will lead to full file copying from previously executed source
// operations.
err = res.Release(ctx)
if err != nil {
return nil, err
}
// operations. Releasing can be slow, so we run these concurrently.
go func() {
if err := res.Release(ctx); err != nil {
bklog.G(ctx).Warnf("failed to release result: %v", err)
}
}()

// Add input references to the struct as to link dependencies.
scm.inputs[i] = in.Vertex.Digest()
Expand Down
Loading