From abb84c99a6a379d6d631b1d902df26c888772edc Mon Sep 17 00:00:00 2001 From: Mike Date: Sat, 11 May 2024 15:15:29 -0700 Subject: [PATCH] Custom inline cache implementation --- cache/remotecache/import.go | 5 +- cache/remotecache/inline.go | 253 ++++++++++++++++++++++ cache/remotecache/registry/inline.go | 58 +++++ cmd/buildkitd/main.go | 1 + control/control.go | 3 + executor/runcexecutor/monitor_stats.go | 2 +- exporter/containerimage/exptypes/types.go | 1 + exporter/containerimage/writer.go | 20 +- exporter/earthlyoutputs/export.go | 7 + solver/jobs.go | 37 ++-- solver/llbsolver/bridge.go | 120 +++++----- solver/llbsolver/inline.go | 94 ++++++++ solver/llbsolver/solver.go | 58 +++-- solver/simple.go | 244 +++++++++------------ worker/simple.go | 113 ++++++++-- 15 files changed, 768 insertions(+), 248 deletions(-) create mode 100644 cache/remotecache/inline.go create mode 100644 cache/remotecache/registry/inline.go create mode 100644 solver/llbsolver/inline.go diff --git a/cache/remotecache/import.go b/cache/remotecache/import.go index 99b9695f8..22244ea45 100644 --- a/cache/remotecache/import.go +++ b/cache/remotecache/import.go @@ -302,8 +302,9 @@ type image struct { Rootfs struct { DiffIDs []digest.Digest `json:"diff_ids"` } `json:"rootfs"` - Cache []byte `json:"moby.buildkit.cache.v0"` - History []struct { + Cache []byte `json:"moby.buildkit.cache.v0"` + EarthlyInlineCache []byte `json:"earthly.inlinecache.v0"` + History []struct { Created *time.Time `json:"created,omitempty"` CreatedBy string `json:"created_by,omitempty"` EmptyLayer bool `json:"empty_layer,omitempty"` diff --git a/cache/remotecache/inline.go b/cache/remotecache/inline.go new file mode 100644 index 000000000..fafeafa74 --- /dev/null +++ b/cache/remotecache/inline.go @@ -0,0 +1,253 @@ +package remotecache + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/containerd/containerd/content" + "github.com/containerd/containerd/images" + "github.com/containerd/containerd/labels" + v1 "github.com/moby/buildkit/cache/remotecache/v1" + "github.com/moby/buildkit/solver" + "github.com/moby/buildkit/util/bklog" + "github.com/moby/buildkit/util/imageutil" + "github.com/moby/buildkit/util/progress" + "github.com/moby/buildkit/worker" + digest "github.com/opencontainers/go-digest" + ocispecs "github.com/opencontainers/image-spec/specs-go/v1" + "github.com/pkg/errors" +) + +// earthlyInlineCacheItem stores a relation between a simple solver cache key & +// a remote image descriptor. Used for inline caching. +type earthlyInlineCacheItem struct { + Key digest.Digest `json:"cacheKey"` + Descriptor digest.Digest `json:"descriptor"` +} + +// EarthlyInlineCacheRemotes produces a map of cache keys to remote sources by +// parsing inline-cache metadata from a remote image's config data. 
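+//
+// A rough usage sketch (the surrounding variables here are illustrative, not
+// part of this package): given the image's top-level manifest descriptor and
+// a content.Provider for its blobs, the returned map can be fed into a remote
+// result source, which is how processImports in solver/llbsolver/bridge.go
+// consumes it:
+//
+//	remotes, err := EarthlyInlineCacheRemotes(ctx, provider, desc, w)
+//	if err != nil {
+//		return err
+//	}
+//	for cacheKey, remote := range remotes {
+//		remoteSource.AddResult(ctx, cacheKey, remote) // e.g. a *worker.WorkerRemoteSource
+//	}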
+func EarthlyInlineCacheRemotes(ctx context.Context, provider content.Provider, desc ocispecs.Descriptor, w worker.Worker) (map[digest.Digest]*solver.Remote, error) { + dt, err := readBlob(ctx, provider, desc) + if err != nil { + return nil, err + } + + manifestType, err := imageutil.DetectManifestBlobMediaType(dt) + if err != nil { + return nil, err + } + + layerDone := progress.OneOff(ctx, fmt.Sprintf("inferred cache manifest type: %s", manifestType)) + layerDone(nil) + + configDesc, err := configDescriptor(dt, manifestType) + if err != nil { + return nil, err + } + + if configDesc.Digest != "" { + return nil, errors.New("expected empty digest value") + } + + m := map[digest.Digest][]byte{} + + if err := allDistributionManifests(ctx, provider, dt, m); err != nil { + return nil, err + } + + remotes := map[digest.Digest]*solver.Remote{} + + for _, dt := range m { + var m ocispecs.Manifest + + if err := json.Unmarshal(dt, &m); err != nil { + return nil, errors.Wrap(err, "failed to unmarshal manifest") + } + + if m.Config.Digest == "" || len(m.Layers) == 0 { + continue + } + + p, err := content.ReadBlob(ctx, provider, m.Config) + if err != nil { + return nil, errors.Wrap(err, "failed to read blob") + } + + var img image + + if err := json.Unmarshal(p, &img); err != nil { + return nil, errors.Wrap(err, "failed to unmarshal image") + } + + if len(img.Rootfs.DiffIDs) != len(m.Layers) { + bklog.G(ctx).Warnf("invalid image with mismatching manifest and config") + continue + } + + if img.EarthlyInlineCache == nil { + continue + } + + cacheItems := []earthlyInlineCacheItem{} + if err := json.Unmarshal(img.EarthlyInlineCache, &cacheItems); err != nil { + return nil, errors.Wrap(err, "failed to unmarshal cache items") + } + + layers, err := preprocessLayers(img, m) + if err != nil { + return nil, err + } + + found := extractRemotes(provider, cacheItems, layers) + for key, remote := range found { + remotes[key] = remote + } + } + + return remotes, nil +} + +// extractRemotes constructs a list of descriptors--which represent the layer +// chain for the given digest--for each of the items discovered in the inline +// metadata. +func extractRemotes(provider content.Provider, cacheItems []earthlyInlineCacheItem, layers []ocispecs.Descriptor) map[digest.Digest]*solver.Remote { + + remotes := map[digest.Digest]*solver.Remote{} + + for _, cacheItem := range cacheItems { + descs := []ocispecs.Descriptor{} + + found := false + for _, layer := range layers { + descs = append(descs, layer) + if layer.Digest == cacheItem.Descriptor { + found = true + break + } + } + + if found { + remote := &solver.Remote{ + Descriptors: descs, + Provider: provider, + } + + remotes[cacheItem.Key] = remote + } + } + + return remotes +} + +// preprocessLayers adds custom annotations which are used later when +// reconstructing the ref. 
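+//
+// Concretely, each returned descriptor is annotated with "buildkit/createdat"
+// and "buildkit/description" (when the corresponding history entry provides
+// them) plus labels.LabelUncompressed pointing at the layer's diff ID, so the
+// layers can later be turned back into a ref with the usual metadata intact.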
+func preprocessLayers(img image, m ocispecs.Manifest) ([]ocispecs.Descriptor, error) { + createdDates, createdMsg, err := parseCreatedLayerInfo(img) + if err != nil { + return nil, err + } + + n := len(m.Layers) + + if len(createdDates) != n { + return nil, errors.New("unexpected creation dates length") + } + + if len(createdMsg) != n { + return nil, errors.New("unexpected creation messages length") + } + + if len(img.Rootfs.DiffIDs) != n { + return nil, errors.New("unexpected rootfs diff IDs") + } + + ret := []ocispecs.Descriptor{} + + for i, layer := range m.Layers { + if layer.Annotations == nil { + layer.Annotations = map[string]string{} + } + + if createdAt := createdDates[i]; createdAt != "" { + layer.Annotations["buildkit/createdat"] = createdAt + } + + if createdBy := createdMsg[i]; createdBy != "" { + layer.Annotations["buildkit/description"] = createdBy + } + + layer.Annotations[labels.LabelUncompressed] = img.Rootfs.DiffIDs[i].String() + + ret = append(ret, layer) + } + + return ret, nil +} + +// configDescriptor parses and returns the correct manifest for the given manifest type. +func configDescriptor(dt []byte, manifestType string) (ocispecs.Descriptor, error) { + var configDesc ocispecs.Descriptor + + switch manifestType { + case images.MediaTypeDockerSchema2ManifestList, ocispecs.MediaTypeImageIndex: + var mfst ocispecs.Index + if err := json.Unmarshal(dt, &mfst); err != nil { + return ocispecs.Descriptor{}, err + } + + for _, m := range mfst.Manifests { + if m.MediaType == v1.CacheConfigMediaTypeV0 { + configDesc = m + continue + } + } + case images.MediaTypeDockerSchema2Manifest, ocispecs.MediaTypeImageManifest: + var mfst ocispecs.Manifest + if err := json.Unmarshal(dt, &mfst); err != nil { + return ocispecs.Descriptor{}, err + } + + if mfst.Config.MediaType == v1.CacheConfigMediaTypeV0 { + configDesc = mfst.Config + } + default: + return ocispecs.Descriptor{}, errors.Errorf("unsupported or uninferrable manifest type %s", manifestType) + } + + return configDesc, nil +} + +// allDistributionManifests pulls all manifest data & linked manifests using the provider. 
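+//
+// The result map is keyed by manifest digest; when dt is an image index the
+// function recurses into each referenced manifest, so nested indexes are
+// flattened into m and already-seen digests are skipped.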
+func allDistributionManifests(ctx context.Context, provider content.Provider, dt []byte, m map[digest.Digest][]byte) error { + mt, err := imageutil.DetectManifestBlobMediaType(dt) + if err != nil { + return err + } + + switch mt { + case images.MediaTypeDockerSchema2Manifest, ocispecs.MediaTypeImageManifest: + m[digest.FromBytes(dt)] = dt + case images.MediaTypeDockerSchema2ManifestList, ocispecs.MediaTypeImageIndex: + var index ocispecs.Index + if err := json.Unmarshal(dt, &index); err != nil { + return errors.WithStack(err) + } + + for _, d := range index.Manifests { + if _, ok := m[d.Digest]; ok { + continue + } + p, err := content.ReadBlob(ctx, provider, d) + if err != nil { + return errors.WithStack(err) + } + if err := allDistributionManifests(ctx, provider, p, m); err != nil { + return err + } + } + } + + return nil +} diff --git a/cache/remotecache/registry/inline.go b/cache/remotecache/registry/inline.go new file mode 100644 index 000000000..267e1dca4 --- /dev/null +++ b/cache/remotecache/registry/inline.go @@ -0,0 +1,58 @@ +package registry + +import ( + "context" + "strconv" + + "github.com/containerd/containerd/remotes/docker" + "github.com/moby/buildkit/cache/remotecache" + "github.com/moby/buildkit/session" + "github.com/moby/buildkit/solver" + "github.com/moby/buildkit/util/contentutil" + "github.com/moby/buildkit/util/resolver" + "github.com/moby/buildkit/util/resolver/limited" + "github.com/moby/buildkit/worker" + digest "github.com/opencontainers/go-digest" + "github.com/pkg/errors" +) + +// EarthlyInlineCacheRemotes fetches a group of remote sources based on values +// discovered in a remote image's inline-cache metadata field. +func EarthlyInlineCacheRemotes(ctx context.Context, sm *session.Manager, w worker.Worker, hosts docker.RegistryHosts, g session.Group, attrs map[string]string) (map[digest.Digest]*solver.Remote, error) { + ref, err := canonicalizeRef(attrs[attrRef]) + if err != nil { + return nil, err + } + + refString := ref.String() + + insecure := false + if v, ok := attrs[attrInsecure]; ok { + val, err := strconv.ParseBool(v) + if err != nil { + return nil, errors.Wrapf(err, "failed to parse %s", attrInsecure) + } + insecure = val + } + + scope, hosts := registryConfig(hosts, ref, "pull", insecure) + remote := resolver.DefaultPool.GetResolver(hosts, refString, scope, sm, g) + + xref, desc, err := remote.Resolve(ctx, refString) + if err != nil { + return nil, err + } + + fetcher, err := remote.Fetcher(ctx, xref) + if err != nil { + return nil, err + } + + src := &withDistributionSourceLabel{ + Provider: contentutil.FromFetcher(limited.Default.WrapFetcher(fetcher, refString)), + ref: refString, + source: w.ContentStore(), + } + + return remotecache.EarthlyInlineCacheRemotes(ctx, src, desc, w) +} diff --git a/cmd/buildkitd/main.go b/cmd/buildkitd/main.go index 0b0d61498..c59c2cc12 100644 --- a/cmd/buildkitd/main.go +++ b/cmd/buildkitd/main.go @@ -844,6 +844,7 @@ func newController(c *cli.Context, cfg *config.Config, shutdownCh chan struct{}) ContentStore: w.ContentStore(), HistoryConfig: cfg.History, RootDir: cfg.Root, + RegistryHosts: resolverFn, }) } diff --git a/control/control.go b/control/control.go index 7b7a158e8..d3422f0aa 100644 --- a/control/control.go +++ b/control/control.go @@ -11,6 +11,7 @@ import ( contentapi "github.com/containerd/containerd/api/services/content/v1" "github.com/containerd/containerd/content" + "github.com/containerd/containerd/remotes/docker" "github.com/containerd/containerd/services/content/contentserver" 
"github.com/distribution/reference" "github.com/hashicorp/go-multierror" @@ -69,6 +70,7 @@ type Opt struct { ContentStore *containerdsnapshot.Store HistoryConfig *config.HistoryConfig RootDir string + RegistryHosts docker.RegistryHosts } type Controller struct { // TODO: ControlService @@ -107,6 +109,7 @@ func NewController(opt Opt) (*Controller, error) { Entitlements: opt.Entitlements, HistoryQueue: hq, RootDir: opt.RootDir, + RegistryHosts: opt.RegistryHosts, }) if err != nil { return nil, errors.Wrap(err, "failed to create solver") diff --git a/executor/runcexecutor/monitor_stats.go b/executor/runcexecutor/monitor_stats.go index 931ae041d..e1a03e5c1 100644 --- a/executor/runcexecutor/monitor_stats.go +++ b/executor/runcexecutor/monitor_stats.go @@ -53,7 +53,7 @@ func (w *runcExecutor) monitorContainerStats(ctx context.Context, id string, sam for { select { case <-ctx.Done(): - bklog.G(ctx).Infof("stats collection context done: %v", ctx.Err()) + bklog.G(ctx).Debugf("stats collection context done: %v", ctx.Err()) return case <-timer.C: // Initial sleep will give container the chance to start. stats, err := w.runc.Stats(ctx, id) diff --git a/exporter/containerimage/exptypes/types.go b/exporter/containerimage/exptypes/types.go index c4d5721ea..83a85c379 100644 --- a/exporter/containerimage/exptypes/types.go +++ b/exporter/containerimage/exptypes/types.go @@ -11,6 +11,7 @@ const ( ExporterImageConfigDigestKey = "containerimage.config.digest" ExporterImageDescriptorKey = "containerimage.descriptor" ExporterInlineCache = "containerimage.inlinecache" + EarthlyInlineCache = "earthly.inlinecache" ExporterPlatformsKey = "refs.platforms" ) diff --git a/exporter/containerimage/writer.go b/exporter/containerimage/writer.go index 0c1e91bf1..22525b579 100644 --- a/exporter/containerimage/writer.go +++ b/exporter/containerimage/writer.go @@ -138,6 +138,7 @@ func (ic *ImageWriter) Commit(ctx context.Context, inp *exporter.Source, session config := exptypes.ParseKey(inp.Metadata, exptypes.ExporterImageConfigKey, p) inlineCache := exptypes.ParseKey(inp.Metadata, exptypes.ExporterInlineCache, p) + earthlyInlineCache := exptypes.ParseKey(inp.Metadata, exptypes.EarthlyInlineCache, p) remote := &remotes[0] if opts.RewriteTimestamp { remote, err = ic.rewriteRemoteWithEpoch(ctx, opts, remote) @@ -145,7 +146,7 @@ func (ic *ImageWriter) Commit(ctx context.Context, inp *exporter.Source, session return nil, err } } - mfstDesc, configDesc, err := ic.commitDistributionManifest(ctx, opts, ref, config, remote, annotations, inlineCache, opts.Epoch, session.NewGroup(sessionID)) + mfstDesc, configDesc, err := ic.commitDistributionManifest(ctx, opts, ref, config, remote, annotations, inlineCache, earthlyInlineCache, opts.Epoch, session.NewGroup(sessionID)) if err != nil { return nil, err } @@ -203,6 +204,7 @@ func (ic *ImageWriter) Commit(ctx context.Context, inp *exporter.Source, session } config := exptypes.ParseKey(inp.Metadata, exptypes.ExporterImageConfigKey, p) inlineCache := exptypes.ParseKey(inp.Metadata, exptypes.ExporterInlineCache, p) + earthlyInlineCache := exptypes.ParseKey(inp.Metadata, exptypes.EarthlyInlineCache, p) remote := &remotes[remotesMap[p.ID]] if remote == nil { @@ -218,7 +220,7 @@ func (ic *ImageWriter) Commit(ctx context.Context, inp *exporter.Source, session } } - desc, _, err := ic.commitDistributionManifest(ctx, opts, r, config, remote, opts.Annotations.Platform(&p.Platform), inlineCache, opts.Epoch, session.NewGroup(sessionID)) + desc, _, err := ic.commitDistributionManifest(ctx, opts, r, 
config, remote, opts.Annotations.Platform(&p.Platform), inlineCache, earthlyInlineCache, opts.Epoch, session.NewGroup(sessionID)) if err != nil { return nil, err } @@ -388,7 +390,7 @@ func (ic *ImageWriter) rewriteRemoteWithEpoch(ctx context.Context, opts *ImageCo }, nil } -func (ic *ImageWriter) commitDistributionManifest(ctx context.Context, opts *ImageCommitOpts, ref cache.ImmutableRef, config []byte, remote *solver.Remote, annotations *Annotations, inlineCache []byte, epoch *time.Time, sg session.Group) (*ocispecs.Descriptor, *ocispecs.Descriptor, error) { +func (ic *ImageWriter) commitDistributionManifest(ctx context.Context, opts *ImageCommitOpts, ref cache.ImmutableRef, config []byte, remote *solver.Remote, annotations *Annotations, inlineCache, earthlyInlineCache []byte, epoch *time.Time, sg session.Group) (*ocispecs.Descriptor, *ocispecs.Descriptor, error) { if len(config) == 0 { var err error config, err = defaultImageConfig() @@ -407,7 +409,7 @@ func (ic *ImageWriter) commitDistributionManifest(ctx context.Context, opts *Ima return nil, nil, err } - config, err = patchImageConfig(config, remote.Descriptors, history, inlineCache, epoch) + config, err = patchImageConfig(config, remote.Descriptors, history, inlineCache, earthlyInlineCache, epoch) if err != nil { return nil, nil, err } @@ -633,7 +635,7 @@ func parseHistoryFromConfig(dt []byte) ([]ocispecs.History, error) { return config.History, nil } -func patchImageConfig(dt []byte, descs []ocispecs.Descriptor, history []ocispecs.History, cache []byte, epoch *time.Time) ([]byte, error) { +func patchImageConfig(dt []byte, descs []ocispecs.Descriptor, history []ocispecs.History, cache, earthlyInlineCache []byte, epoch *time.Time) ([]byte, error) { m := map[string]json.RawMessage{} if err := json.Unmarshal(dt, &m); err != nil { return nil, errors.Wrap(err, "failed to parse image config for patch") @@ -701,6 +703,14 @@ func patchImageConfig(dt []byte, descs []ocispecs.Descriptor, history []ocispecs m["moby.buildkit.cache.v0"] = dt } + if earthlyInlineCache != nil { + dt, err := json.Marshal(earthlyInlineCache) + if err != nil { + return nil, err + } + m["earthly.inlinecache.v0"] = dt + } + dt, err = json.Marshal(m) return dt, errors.Wrap(err, "failed to marshal config after patch") } diff --git a/exporter/earthlyoutputs/export.go b/exporter/earthlyoutputs/export.go index 06c74cb8a..c4c02df7c 100644 --- a/exporter/earthlyoutputs/export.go +++ b/exporter/earthlyoutputs/export.go @@ -268,6 +268,13 @@ func (e *imageExporterInstance) Export(ctx context.Context, src *exporter.Source simpleMd[exptypes.ExporterInlineCache] = inlineCache } + // TODO: Remove the above (legacy) option. 
+ earthlyInlineCacheK := fmt.Sprintf("%s/%s", exptypes.EarthlyInlineCache, k) + earthlyInlineCache, ok := src.Metadata[earthlyInlineCacheK] + if ok { + simpleMd[exptypes.EarthlyInlineCache] = earthlyInlineCache + } + opts := e.opts as, _, err := containerimage.ParseAnnotations(simpleMd) if err != nil { diff --git a/solver/jobs.go b/solver/jobs.go index 5116644d1..b23a853a6 100644 --- a/solver/jobs.go +++ b/solver/jobs.go @@ -256,37 +256,36 @@ type Job struct { } type SolverOpt struct { - ResolveOpFunc ResolveOpFunc - DefaultCache CacheManager - WorkerResultGetter workerResultGetter - CommitRefFunc CommitRefFunc - RootDir string + ResolveOpFunc ResolveOpFunc + DefaultCache CacheManager + ResultSource ResultSource + RefIDStore *RefIDStore + CommitRefFunc CommitRefFunc } func NewSolver(opts SolverOpt) *Solver { if opts.DefaultCache == nil { opts.DefaultCache = NewInMemoryCacheManager() } - jl := &Solver{ + solver := &Solver{ jobs: make(map[string]*Job), actives: make(map[digest.Digest]*state), opts: opts, index: newEdgeIndex(), } - // TODO: This should be hoisted up a few layers as not to be bound to the - // original solver. For now, we just need a convenient place to initialize - // it once. - c, err := newDiskCache(opts.WorkerResultGetter, opts.RootDir) - if err != nil { - panic(err) // TODO: Handle error appropriately once the new solver code is moved. - } - simple := newSimpleSolver(opts.ResolveOpFunc, opts.CommitRefFunc, jl, c) - jl.simple = simple - - jl.s = newScheduler(jl) - jl.updateCond = sync.NewCond(jl.mu.RLocker()) - return jl + simple := newSimpleSolver( + opts.ResolveOpFunc, + opts.CommitRefFunc, + solver, + opts.RefIDStore, + opts.ResultSource, + ) + solver.simple = simple + + solver.s = newScheduler(solver) + solver.updateCond = sync.NewCond(solver.mu.RLocker()) + return solver } func (jl *Solver) setEdge(e Edge, newEdge *edge) { diff --git a/solver/llbsolver/bridge.go b/solver/llbsolver/bridge.go index 8bfd96e46..e445c9539 100644 --- a/solver/llbsolver/bridge.go +++ b/solver/llbsolver/bridge.go @@ -8,9 +8,11 @@ import ( "time" "github.com/containerd/containerd/platforms" + "github.com/containerd/containerd/remotes/docker" "github.com/mitchellh/hashstructure/v2" "github.com/moby/buildkit/cache" "github.com/moby/buildkit/cache/remotecache" + "github.com/moby/buildkit/cache/remotecache/registry" "github.com/moby/buildkit/client" "github.com/moby/buildkit/client/llb" "github.com/moby/buildkit/exporter" @@ -31,7 +33,6 @@ import ( "github.com/moby/buildkit/worker" digest "github.com/opencontainers/go-digest" "github.com/pkg/errors" - "golang.org/x/sync/errgroup" ) type llbBridge struct { @@ -43,6 +44,10 @@ type llbBridge struct { cms map[string]solver.CacheManager cmsMu sync.Mutex sm *session.Manager + registryHosts docker.RegistryHosts + workerRemoteSource *worker.WorkerRemoteSource + importDone map[string]chan struct{} + importMu sync.Mutex } func (b *llbBridge) Warn(ctx context.Context, dgst digest.Digest, msg string, opts frontend.WarnOpts) error { @@ -78,11 +83,6 @@ func (b *llbBridge) loadResult(ctx context.Context, def *pb.Definition, cacheImp return nil, err } - // TODO FIXME earthly-specific wait group is required to ensure the remotecache/registry's ResolveCacheImporterFunc can run - // which requires the session to remain open in order to get dockerhub (or any other registry) credentials. 
- // It seems like the cleaner approach is to bake this in somewhere into the edge or Load - eg, _ := errgroup.WithContext(ctx) - srcPol, err := loadSourcePolicy(b.builder) if err != nil { return nil, err @@ -94,62 +94,13 @@ func (b *llbBridge) loadResult(ctx context.Context, def *pb.Definition, cacheImp } polEngine = sourcepolicy.NewEngine(pol) - if err != nil { - return nil, err - } - } - var cms []solver.CacheManager - for _, im := range cacheImports { - cmID, err := cmKey(im) - if err != nil { - return nil, err - } - b.cmsMu.Lock() - var cm solver.CacheManager - if prevCm, ok := b.cms[cmID]; !ok { - func(cmID string, im gw.CacheOptionsEntry) { - cm = newLazyCacheManager(cmID, func() (solver.CacheManager, error) { - var cmNew solver.CacheManager - if err := inBuilderContext(context.TODO(), b.builder, "importing cache manifest from "+cmID, "", func(ctx context.Context, g session.Group) error { - resolveCI, ok := b.resolveCacheImporterFuncs[im.Type] - if !ok { - return errors.Errorf("unknown cache importer: %s", im.Type) - } - ci, desc, err := resolveCI(ctx, g, im.Attrs) - if err != nil { - return errors.Wrapf(err, "failed to configure %v cache importer", im.Type) - } - cmNew, err = ci.Resolve(ctx, desc, cmID, w) - return err - }); err != nil { - bklog.G(ctx).Debugf("error while importing cache manifest from cmId=%s: %v", cmID, err) - return nil, err - } - return cmNew, nil - }) - - cmInst := cm - eg.Go(func() error { - if lcm, ok := cmInst.(*lazyCacheManager); ok { - lcm.wait() - } - return nil - }) - }(cmID, im) - b.cms[cmID] = cm - } else { - cm = prevCm - } - cms = append(cms, cm) - b.cmsMu.Unlock() - } - err = eg.Wait() - if err != nil { - return nil, err } + + b.processImports(ctx, cacheImports, w) + dpc := &detectPrunedCacheID{} - edge, err := Load(ctx, def, polEngine, dpc.Load, ValidateEntitlements(ent), WithCacheSources(cms), NormalizeRuntimePlatforms(), WithValidateCaps()) + edge, err := Load(ctx, def, polEngine, dpc.Load, ValidateEntitlements(ent), NormalizeRuntimePlatforms(), WithValidateCaps()) if err != nil { return nil, errors.Wrap(err, "failed to load LLB") } @@ -173,6 +124,57 @@ func (b *llbBridge) loadResult(ctx context.Context, def *pb.Definition, cacheImp return res, nil } +func (b *llbBridge) processImports(ctx context.Context, cacheImports []gw.CacheOptionsEntry, w worker.Worker) { + var importRefs []string + + // Earthly custom inline cache handling. Other cache import types are ignored. 
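+	// Only registry-type imports are handled: the "ref" attribute names the
+	// image to resolve, doubles as the de-duplication key in importDone, and
+	// any remotes discovered are registered with workerRemoteSource so later
+	// cache-key lookups can resolve against them.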
+ for _, cacheImport := range cacheImports { + if cacheImport.Type != "registry" { + continue + } + + importRef := cacheImport.Attrs["ref"] + importRefs = append(importRefs, importRef) + + b.importMu.Lock() + _, ok := b.importDone[importRef] + if ok { + b.importMu.Unlock() + continue + } + done := make(chan struct{}) + b.importDone[importRef] = done + b.importMu.Unlock() + + remotes := map[digest.Digest]*solver.Remote{} + name := fmt.Sprintf("importing cache manifest from %s", importRef) + + err := inBuilderContext(ctx, b.builder, name, "", func(ctx context.Context, g session.Group) error { + var err error + remotes, err = registry.EarthlyInlineCacheRemotes(ctx, b.sm, w, b.registryHosts, g, cacheImport.Attrs) + return err + }) + if err != nil { + bklog.G(ctx).Warnf("failed to import cache manifest from %s", importRef) + } + + if len(remotes) > 0 { + for cacheKey, remote := range remotes { + b.workerRemoteSource.AddResult(ctx, cacheKey, remote) + } + } + + close(done) + } + + for _, importRef := range importRefs { + b.importMu.Lock() + done := b.importDone[importRef] + b.importMu.Unlock() + <-done + } +} + // getExporter is earthly specific code which extracts the configured exporter // from the job's metadata func (b *llbBridge) getExporter(ctx context.Context) (*ExporterRequest, error) { diff --git a/solver/llbsolver/inline.go b/solver/llbsolver/inline.go new file mode 100644 index 000000000..133aa13e3 --- /dev/null +++ b/solver/llbsolver/inline.go @@ -0,0 +1,94 @@ +package llbsolver + +import ( + "context" + "encoding/json" + "fmt" + + cacheconfig "github.com/moby/buildkit/cache/config" + "github.com/moby/buildkit/exporter" + "github.com/moby/buildkit/exporter/containerimage/exptypes" + "github.com/moby/buildkit/session" + "github.com/moby/buildkit/solver" + "github.com/moby/buildkit/solver/result" + "github.com/moby/buildkit/worker" + digest "github.com/opencontainers/go-digest" + "github.com/pkg/errors" +) + +type earthlyInlineCacheItem struct { + Key digest.Digest `json:"cacheKey"` + Descriptor digest.Digest `json:"descriptor"` +} + +// earthlyInlineCache attaches custom "inline cache" metadata which can be used +// by a new build to load image layer blobs and use them as cache results. +func earthlyInlineCache(ctx context.Context, job *solver.Job, exp exporter.ExporterInstance, cached *result.Result[solver.CachedResult]) (map[string][]byte, error) { + if cached.Ref != nil { + return nil, errors.New("unexpected ref") + } + + meta := map[string][]byte{} + + err := inBuilderContext(ctx, job, "preparing layers for inline cache", job.SessionID+"-cache-inline", func(ctx context.Context, _ session.Group) error { + for k, res := range cached.Refs { + val, err := earthlyInlineCacheDigests(ctx, job, exp, res) + if err != nil { + return err + } + meta[fmt.Sprintf("%s/%s", exptypes.EarthlyInlineCache, k)] = val + } + return nil + }) + + if err != nil { + return nil, err + } + + return meta, nil +} + +// earthlyInlineCacheDigests creates a map of computed cache keys to manifest +// layer hashes which will be used to load inline cache blobs. 
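+//
+// The returned bytes are a JSON array of earthlyInlineCacheItem values, e.g.
+// (digests shortened for illustration):
+//
+//	[{"cacheKey":"sha256:aaaa…","descriptor":"sha256:bbbb…"}]
+//
+// which the import path in cache/remotecache/inline.go decodes back into
+// cache-key to layer-descriptor mappings.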
+func earthlyInlineCacheDigests(ctx context.Context, job *solver.Job, exp exporter.ExporterInstance, res solver.CachedResult) ([]byte, error) { + workerRef, ok := res.Sys().(*worker.WorkerRef) + if !ok { + return nil, errors.Errorf("invalid reference: %T", res.Sys()) + } + + sess := session.NewGroup(job.SessionID) + + remotes, err := workerRef.GetRemotes(ctx, true, cacheconfig.RefConfig{Compression: exp.Config().Compression()}, false, sess) + if err != nil || len(remotes) == 0 { + return nil, nil + } + + var ( + remote = remotes[0] + cacheItems = []earthlyInlineCacheItem{} + cacheKeys = res.CacheKeys() + ) + + for i := 0; i < len(cacheKeys) && i < len(remote.Descriptors); i++ { + cacheItems = append(cacheItems, earthlyInlineCacheItem{ + Key: cacheKeys[i].Digest(), + Descriptor: remote.Descriptors[i].Digest, + }) + } + + val, err := json.Marshal(cacheItems) + if err != nil { + return nil, err + } + + return val, nil +} + +func hasInlineCacheExporter(exporters []RemoteCacheExporter) bool { + for _, exp := range exporters { + if _, ok := asInlineCache(exp.Exporter); ok { + return true + } + } + return false +} diff --git a/solver/llbsolver/solver.go b/solver/llbsolver/solver.go index 5cc90aa5f..12dae8052 100644 --- a/solver/llbsolver/solver.go +++ b/solver/llbsolver/solver.go @@ -9,6 +9,7 @@ import ( "sync" "time" + "github.com/containerd/containerd/remotes/docker" intoto "github.com/in-toto/in-toto-golang/in_toto" slsa02 "github.com/in-toto/in-toto-golang/in_toto/slsa_provenance/v0.2" controlapi "github.com/moby/buildkit/api/services/control" @@ -80,6 +81,7 @@ type Opt struct { HistoryQueue *HistoryQueue ResourceMonitor *resources.Monitor RootDir string + RegistryHosts docker.RegistryHosts } type Solver struct { @@ -94,6 +96,8 @@ type Solver struct { entitlements []string history *HistoryQueue sysSampler *resources.Sampler[*resourcetypes.SysSample] + registryHosts docker.RegistryHosts + workerRemoteSource *worker.WorkerRemoteSource } // Processor defines a processing function to be applied after solving, but @@ -101,6 +105,13 @@ type Solver struct { type Processor func(ctx context.Context, result *Result, s *Solver, j *solver.Job, usage *resources.SysSampler) (*Result, error) func New(opt Opt) (*Solver, error) { + defaultWorker, err := opt.WorkerController.GetDefault() + if err != nil { + return nil, err + } + + remoteSource := worker.NewWorkerRemoteSource(defaultWorker) + s := &Solver{ workerController: opt.WorkerController, resolveWorker: defaultResolver(opt.WorkerController), @@ -111,6 +122,8 @@ func New(opt Opt) (*Solver, error) { sm: opt.SessionManager, entitlements: opt.Entitlements, history: opt.HistoryQueue, + registryHosts: opt.RegistryHosts, + workerRemoteSource: remoteSource, } sampler, err := resources.NewSysSampler() @@ -119,12 +132,22 @@ func New(opt Opt) (*Solver, error) { } s.sysSampler = sampler + refIDStore, err := solver.NewRefIDStore(opt.RootDir) + if err != nil { + return nil, err + } + + sources := worker.NewCombinedResultSource( + worker.NewWorkerResultSource(opt.WorkerController, refIDStore), + remoteSource, + ) + s.solver = solver.NewSolver(solver.SolverOpt{ - ResolveOpFunc: s.resolver(), - DefaultCache: opt.CacheManager, - WorkerResultGetter: worker.NewWorkerResultGetter(opt.WorkerController), - CommitRefFunc: worker.FinalizeRef, - RootDir: opt.RootDir, + ResolveOpFunc: s.resolver(), + DefaultCache: opt.CacheManager, + ResultSource: sources, + CommitRefFunc: worker.FinalizeRef, + RefIDStore: refIDStore, }) return s, nil } @@ -148,6 +171,10 @@ func (s *Solver) 
bridge(b solver.Builder) *provenanceBridge { resolveCacheImporterFuncs: s.resolveCacheImporterFuncs, cms: map[string]solver.CacheManager{}, sm: s.sm, + registryHosts: s.registryHosts, + workerRemoteSource: s.workerRemoteSource, + importDone: map[string]chan struct{}{}, + importMu: sync.Mutex{}, }} } @@ -557,16 +584,17 @@ func (s *Solver) Solve(ctx context.Context, id string, sessionID string, req fro return nil, err } - cacheExporters, inlineCacheExporter := splitCacheExporters(exp.CacheExporters) - + cacheExporters, _ := splitCacheExporters(exp.CacheExporters) var exporterResponse map[string]string if e := exp.Exporter; e != nil { - meta, err := runInlineCacheExporter(ctx, e, inlineCacheExporter, j, cached) - if err != nil { - return nil, err - } - for k, v := range meta { - inp.AddMeta(k, v) + if hasInlineCacheExporter(exp.CacheExporters) { + meta, err := earthlyInlineCache(ctx, j, e, cached) + if err != nil { + return nil, errors.Wrap(err, "failed prepare inline cache") + } + for k, v := range meta { + inp.AddMeta(k, v) + } } if err := inBuilderContext(ctx, j, e.Name(), j.SessionID+"-export", func(ctx context.Context, _ session.Group) error { @@ -577,6 +605,7 @@ func (s *Solver) Solve(ctx context.Context, id string, sessionID string, req fro } } + // Deprecated. Can be removed later. cacheExporterResponse, err := runCacheExporters(ctx, cacheExporters, j, cached, inp) if err != nil { return nil, err @@ -602,6 +631,7 @@ func (s *Solver) Solve(ctx context.Context, id string, sessionID string, req fro }, nil } +// Deprecated. Can be removed later. func runCacheExporters(ctx context.Context, exporters []RemoteCacheExporter, j *solver.Job, cached *result.Result[solver.CachedResult], inp *result.Result[cache.ImmutableRef]) (map[string]string, error) { eg, ctx := errgroup.WithContext(ctx) g := session.NewGroup(j.SessionID) @@ -654,6 +684,7 @@ func runCacheExporters(ctx context.Context, exporters []RemoteCacheExporter, j * return cacheExporterResponse, nil } +// Deprecated. Can be removed later. func runInlineCacheExporter(ctx context.Context, e exporter.ExporterInstance, inlineExporter *RemoteCacheExporter, j *solver.Job, cached *result.Result[solver.CachedResult]) (map[string][]byte, error) { meta := map[string][]byte{} if inlineExporter == nil { @@ -835,6 +866,7 @@ func asInlineCache(e remotecache.Exporter) (inlineCacheExporter, bool) { return ie, ok } +// Deprecated. Can be removed later. func inlineCache(ctx context.Context, e remotecache.Exporter, res solver.CachedResult, compressionopt compression.Config, g session.Group) ([]byte, error) { ie, ok := asInlineCache(e) if !ok { diff --git a/solver/simple.go b/solver/simple.go index 4ecb3e369..274545d38 100644 --- a/solver/simple.go +++ b/solver/simple.go @@ -18,29 +18,41 @@ import ( bolt "go.etcd.io/bbolt" ) -var ErrRefNotFound = errors.New("ref not found") - // CommitRefFunc can be used to finalize a Result's ImmutableRef. type CommitRefFunc func(ctx context.Context, result Result) error +// ResultSource can be any source (local or remote) that allows one to load a +// Result using a cache key digest. 
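+//
+// In this patch it is implemented by worker.WorkerResultSource (local refs
+// resolved through the RefIDStore), worker.WorkerRemoteSource (remotes
+// registered from inline-cache imports) and worker.CombinedResultSource,
+// which tries a list of sources in order.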
+type ResultSource interface { + Load(ctx context.Context, cacheKey digest.Digest) (Result, bool, error) +} + type simpleSolver struct { resolveOpFunc ResolveOpFunc commitRefFunc CommitRefFunc solver *Solver parallelGuard *parallelGuard - resultCache resultCache + refIDStore *RefIDStore + resultSource ResultSource cacheKeyManager *cacheKeyManager mu sync.Mutex } -func newSimpleSolver(resolveOpFunc ResolveOpFunc, commitRefFunc CommitRefFunc, solver *Solver, cache resultCache) *simpleSolver { +func newSimpleSolver( + resolveOpFunc ResolveOpFunc, + commitRefFunc CommitRefFunc, + solver *Solver, + refIDStore *RefIDStore, + resultSource ResultSource, +) *simpleSolver { return &simpleSolver{ cacheKeyManager: newCacheKeyManager(), - resultCache: cache, parallelGuard: newParallelGuard(time.Millisecond * 100), resolveOpFunc: resolveOpFunc, commitRefFunc: commitRefFunc, solver: solver, + refIDStore: refIDStore, + resultSource: resultSource, } } @@ -49,7 +61,8 @@ func (s *simpleSolver) build(ctx context.Context, job *Job, e Edge) (CachedResul // Ordered list of vertices to build. digests, vertices := s.exploreVertices(e) - var ret CachedResult + var ret Result + var expKeys []ExportableCacheKey for _, d := range digests { vertex, ok := vertices[d] @@ -57,20 +70,29 @@ func (s *simpleSolver) build(ctx context.Context, job *Job, e Edge) (CachedResul return nil, errors.Errorf("digest %s not found", d) } - res, expCacheKeys, err := s.buildOne(ctx, d, vertex, job, e) + res, cacheKey, err := s.buildOne(ctx, d, vertex, job, e) if err != nil { return nil, err } - ret = NewCachedResult(res, expCacheKeys) + ret = res + + // Hijack the CacheKey type in order to export a reference from the new cache key to the ref ID. + expKeys = append(expKeys, ExportableCacheKey{ + CacheKey: &CacheKey{ + ID: res.ID(), + digest: cacheKey, + }, + Exporter: nil, // We're not using an exporter here. + }) } - return ret, nil + return NewCachedResult(ret, expKeys), nil } -func (s *simpleSolver) buildOne(ctx context.Context, d digest.Digest, vertex Vertex, job *Job, e Edge) (Result, []ExportableCacheKey, error) { +func (s *simpleSolver) buildOne(ctx context.Context, d digest.Digest, vertex Vertex, job *Job, e Edge) (Result, digest.Digest, error) { // Ensure we don't have multiple threads working on the same digest. - wait, done := s.parallelGuard.acquire(ctx, d.String()) + wait, done := s.parallelGuard.acquire(ctx, d) defer done() <-wait @@ -82,37 +104,33 @@ func (s *simpleSolver) buildOne(ctx context.Context, d digest.Digest, vertex Ver // CacheMap populates required fields in SourceOp. 
cm, err := st.op.CacheMap(ctx, int(e.Index)) if err != nil { - return nil, nil, err + return nil, "", err } inputs, err := s.preprocessInputs(ctx, st, vertex, cm.CacheMap, job) if err != nil { notifyError(ctx, st, false, err) - return nil, nil, err + return nil, "", err } - cacheKey, err := s.cacheKeyManager.cacheKey(ctx, d.String()) + cacheKey, err := s.cacheKeyManager.cacheKey(ctx, d) if err != nil { - return nil, nil, err + return nil, "", err } - v, ok, err := s.resultCache.get(ctx, cacheKey) + v, ok, err := s.resultSource.Load(ctx, cacheKey) if err != nil { - return nil, nil, err - } - - expCacheKeys := []ExportableCacheKey{ - {Exporter: &simpleExporter{cacheKey: cacheKey}}, + return nil, "", err } if ok && v != nil { notifyError(ctx, st, true, nil) - return v, expCacheKeys, nil + return v, cacheKey, nil } results, _, err := st.op.Exec(ctx, inputs) if err != nil { - return nil, nil, err + return nil, "", err } // Ensure all results are finalized (committed to cache). It may be better @@ -120,18 +138,18 @@ func (s *simpleSolver) buildOne(ctx context.Context, d digest.Digest, vertex Ver for _, res := range results { err = s.commitRefFunc(ctx, res) if err != nil { - return nil, nil, err + return nil, "", err } } res := results[int(e.Index)] - err = s.resultCache.set(ctx, cacheKey, res) + err = s.refIDStore.Set(ctx, cacheKey, res.ID()) if err != nil { - return nil, nil, err + return nil, "", err } - return res, expCacheKeys, nil + return res, cacheKey, nil } func notifyError(ctx context.Context, st *state, cached bool, err error) { @@ -222,9 +240,9 @@ func (s *simpleSolver) preprocessInputs(ctx context.Context, st *state, vertex V // This struct is used to reconstruct a cache key from an LLB digest & all // parents using consistent digests that depend on the full dependency chain. scm := simpleCacheMap{ - digest: cm.Digest.String(), + digest: cm.Digest, deps: make([]cacheMapDep, len(cm.Deps)), - inputs: make([]string, len(cm.Deps)), + inputs: make([]digest.Digest, len(cm.Deps)), } // By default we generate a cache key that's not salted as the keys need to @@ -239,22 +257,22 @@ func (s *simpleSolver) preprocessInputs(ctx context.Context, st *state, vertex V for i, in := range vertex.Inputs() { - digest := in.Vertex.Digest().String() + d := in.Vertex.Digest() // Compute a cache key given the LLB digest value. - cacheKey, err := s.cacheKeyManager.cacheKey(ctx, digest) + cacheKey, err := s.cacheKeyManager.cacheKey(ctx, d) if err != nil { return nil, err } // Lookup the result for that cache key. - res, ok, err := s.resultCache.get(ctx, cacheKey) + res, ok, err := s.resultSource.Load(ctx, cacheKey) if err != nil { return nil, err } if !ok { - return nil, errors.Errorf("result not found for digest: %s", digest) + return nil, errors.Errorf("result not found for digest: %s", d) } dep := cm.Deps[i] @@ -269,7 +287,7 @@ func (s *simpleSolver) preprocessInputs(ctx context.Context, st *state, vertex V // Add selectors (usually file references) to the struct. scm.deps[i] = cacheMapDep{ - selector: dep.Selector.String(), + selector: dep.Selector, } // ComputeDigestFunc will usually checksum files. This is then used as @@ -281,66 +299,66 @@ func (s *simpleSolver) preprocessInputs(ctx context.Context, st *state, vertex V bklog.G(ctx).Warnf("failed to compute digest: %v", err) return nil, err } else { - scm.deps[i].computed = compDigest.String() + scm.deps[i].computed = compDigest } } // Add input references to the struct as to link dependencies. 
- scm.inputs[i] = in.Vertex.Digest().String() + scm.inputs[i] = in.Vertex.Digest() // Add the cached result to the input set. These inputs are used to // reconstruct dependencies (mounts, etc.) for a new container run. inputs = append(inputs, res) } - s.cacheKeyManager.add(vertex.Digest().String(), &scm) + s.cacheKeyManager.add(vertex.Digest(), &scm) return inputs, nil } type cacheKeyManager struct { - cacheMaps map[string]*simpleCacheMap + cacheMaps map[digest.Digest]*simpleCacheMap mu sync.Mutex } type cacheMapDep struct { - selector string - computed string + selector digest.Digest + computed digest.Digest } type simpleCacheMap struct { - digest string - inputs []string + digest digest.Digest + inputs []digest.Digest deps []cacheMapDep salt string } func newCacheKeyManager() *cacheKeyManager { return &cacheKeyManager{ - cacheMaps: map[string]*simpleCacheMap{}, + cacheMaps: map[digest.Digest]*simpleCacheMap{}, } } -func (m *cacheKeyManager) add(key string, s *simpleCacheMap) { +func (m *cacheKeyManager) add(d digest.Digest, s *simpleCacheMap) { m.mu.Lock() - m.cacheMaps[key] = s + m.cacheMaps[d] = s m.mu.Unlock() } // cacheKey recursively generates a cache key based on a sequence of ancestor // operations & their cacheable values. -func (m *cacheKeyManager) cacheKey(ctx context.Context, digest string) (string, error) { +func (m *cacheKeyManager) cacheKey(ctx context.Context, d digest.Digest) (digest.Digest, error) { h := sha256.New() - err := m.cacheKeyRecurse(ctx, digest, h) + err := m.cacheKeyRecurse(ctx, d, h) if err != nil { return "", err } - return fmt.Sprintf("%x", h.Sum(nil)), nil + return newDigest(fmt.Sprintf("%x", h.Sum(nil))), nil } -func (m *cacheKeyManager) cacheKeyRecurse(ctx context.Context, d string, h hash.Hash) error { +func (m *cacheKeyManager) cacheKeyRecurse(ctx context.Context, d digest.Digest, h hash.Hash) error { m.mu.Lock() c, ok := m.cacheMaps[d] m.mu.Unlock() @@ -359,13 +377,13 @@ func (m *cacheKeyManager) cacheKeyRecurse(ctx context.Context, d string, h hash. } } - io.WriteString(h, c.digest) + io.WriteString(h, c.digest.String()) for _, dep := range c.deps { if dep.selector != "" { - io.WriteString(h, dep.selector) + io.WriteString(h, dep.selector.String()) } if dep.computed != "" { - io.WriteString(h, dep.computed) + io.WriteString(h, dep.computed.String()) } } @@ -374,15 +392,15 @@ func (m *cacheKeyManager) cacheKeyRecurse(ctx context.Context, d string, h hash. type parallelGuard struct { wait time.Duration - active map[string]struct{} + active map[digest.Digest]struct{} mu sync.Mutex } func newParallelGuard(wait time.Duration) *parallelGuard { - return ¶llelGuard{wait: wait, active: map[string]struct{}{}} + return ¶llelGuard{wait: wait, active: map[digest.Digest]struct{}{}} } -func (f *parallelGuard) acquire(ctx context.Context, d string) (<-chan struct{}, func()) { +func (f *parallelGuard) acquire(ctx context.Context, d digest.Digest) (<-chan struct{}, func()) { ch := make(chan struct{}) @@ -426,62 +444,30 @@ func (f *parallelGuard) acquire(ctx context.Context, d string) (<-chan struct{}, return ch, closer } -type resultCache interface { - set(ctx context.Context, key string, r Result) error - get(ctx context.Context, key string) (Result, bool, error) -} - -type inMemCache struct { - cache map[string]Result - mu sync.Mutex -} - -func newInMemCache() *inMemCache { - return &inMemCache{cache: map[string]Result{}} +// RefIDStore uses a BoltDB database to store links from computed cache keys to +// worker ref IDs. 
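+//
+// A minimal usage sketch (error handling elided; ctx, rootDir, cacheKey and
+// res are assumed to be in scope): the simple solver calls Set after an
+// operation finishes, and result sources call Get on later builds to map a
+// cache key back to a worker ref ID:
+//
+//	store, _ := NewRefIDStore(rootDir)
+//	_ = store.Set(ctx, cacheKey, res.ID())
+//	id, ok, _ := store.Get(ctx, cacheKey)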
+type RefIDStore struct { + db *bolt.DB + bucketName string + rootDir string } -func (c *inMemCache) set(ctx context.Context, key string, r Result) error { - c.mu.Lock() - c.cache[key] = r - c.mu.Unlock() - return nil -} - -func (c *inMemCache) get(ctx context.Context, key string) (Result, bool, error) { - c.mu.Lock() - r, ok := c.cache[key] - c.mu.Unlock() - return r, ok, nil -} - -var _ resultCache = &inMemCache{} - -type diskCache struct { - resultGetter workerResultGetter - db *bolt.DB - bucketName string - rootDir string -} - -type workerResultGetter interface { - Get(ctx context.Context, id string) (Result, error) -} - -func newDiskCache(resultGetter workerResultGetter, rootDir string) (*diskCache, error) { - c := &diskCache{ - bucketName: "ids", - resultGetter: resultGetter, - rootDir: rootDir, +// NewRefIDStore creates and returns a new store and initializes a BoltDB +// instance in the specified root directory. +func NewRefIDStore(rootDir string) (*RefIDStore, error) { + r := &RefIDStore{ + bucketName: "ids", + rootDir: rootDir, } - err := c.init() + err := r.init() if err != nil { return nil, err } - return c, nil + return r, nil } -func (c *diskCache) init() error { - db, err := bolt.Open(filepath.Join(c.rootDir, "ids.db"), 0755, nil) +func (r *RefIDStore) init() error { + db, err := bolt.Open(filepath.Join(r.rootDir, "ids.db"), 0755, nil) if err != nil { return err } @@ -492,56 +478,42 @@ func (c *diskCache) init() error { if err != nil { return err } - c.db = db + r.db = db return nil } -func (c *diskCache) set(ctx context.Context, key string, r Result) error { - return c.db.Update(func(tx *bolt.Tx) error { - b := tx.Bucket([]byte(c.bucketName)) - return b.Put([]byte(key), []byte(r.ID())) +// Set a cache key digest to the value of the worker ref ID. +func (r *RefIDStore) Set(ctx context.Context, key digest.Digest, id string) error { + return r.db.Update(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte(r.bucketName)) + return b.Put([]byte(key), []byte(id)) }) } -func (c *diskCache) get(ctx context.Context, key string) (Result, bool, error) { +// Get a worker ref ID given a cache key digest. 
+func (r *RefIDStore) Get(ctx context.Context, cacheKey digest.Digest) (string, bool, error) { var id string - err := c.db.View(func(tx *bolt.Tx) error { - b := tx.Bucket([]byte(c.bucketName)) - id = string(b.Get([]byte(key))) + err := r.db.View(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte(r.bucketName)) + id = string(b.Get([]byte(cacheKey))) return nil }) if err != nil { - return nil, false, err + return "", false, err } if id == "" { - return nil, false, nil + return "", false, nil } - res, err := c.resultGetter.Get(ctx, id) - if err != nil { - if errors.Is(err, ErrRefNotFound) { - if err := c.delete(ctx, key); err != nil { - bklog.G(ctx).Warnf("failed to delete cache key: %v", err) - } - return nil, false, nil - } - return nil, false, err - } - return res, true, nil + return id, true, nil } -func (c *diskCache) delete(_ context.Context, key string) error { - return c.db.Update(func(tx *bolt.Tx) error { - b := tx.Bucket([]byte(c.bucketName)) +func (r *RefIDStore) delete(_ context.Context, key string) error { + return r.db.Update(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte(r.bucketName)) return b.Delete([]byte(key)) }) } -var _ resultCache = &diskCache{} - -type simpleExporter struct { - cacheKey string -} - -func (s *simpleExporter) ExportTo(ctx context.Context, t CacheExporterTarget, opt CacheExportOpt) ([]CacheExporterRecord, error) { - return nil, nil +func newDigest(s string) digest.Digest { + return digest.NewDigestFromEncoded(digest.SHA256, s) } diff --git a/worker/simple.go b/worker/simple.go index dec48e1d3..a5a4a4344 100644 --- a/worker/simple.go +++ b/worker/simple.go @@ -2,47 +2,66 @@ package worker import ( "context" + "sync" "github.com/moby/buildkit/cache" "github.com/moby/buildkit/solver" "github.com/moby/buildkit/util/bklog" + digest "github.com/opencontainers/go-digest" ) -// WorkerResultGetter abstracts the work involved in loading a Result from a +// RefIDSource allows the caller to translate between a cache key and a worker ref ID. +type RefIDSource interface { + Get(ctx context.Context, cacheKey digest.Digest) (string, bool, error) +} + +// WorkerResultSource abstracts the work involved in loading a Result from a // worker using a ref ID. -type WorkerResultGetter struct { - wc *Controller +type WorkerResultSource struct { + wc *Controller + ids RefIDSource } -// NewWorkerResultGetter creates and returns a new *WorkerResultGetter. -func NewWorkerResultGetter(wc *Controller) *WorkerResultGetter { - return &WorkerResultGetter{wc: wc} +// NewWorkerResultSource creates and returns a new *WorkerResultSource. +func NewWorkerResultSource(wc *Controller, ids RefIDSource) *WorkerResultSource { + return &WorkerResultSource{wc: wc, ids: ids} } -// Get a cached results from a worker. -func (w *WorkerResultGetter) Get(ctx context.Context, id string) (solver.Result, error) { +// Load a cached result from a worker. 
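+//
+// The lookup is two-step: the cache key is translated to a worker ref ID via
+// the RefIDSource, then the ref is loaded from the owning worker; a missing
+// mapping or an unloadable ref is reported as a miss (false) rather than an
+// error.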
+func (w *WorkerResultSource) Load(ctx context.Context, cacheKey digest.Digest) (solver.Result, bool, error) { + id, ok, err := w.ids.Get(ctx, cacheKey) + if err != nil { + return nil, false, err + } + + if !ok { + return nil, false, nil + } + workerID, refID, err := parseWorkerRef(id) if err != nil { - return nil, err + return nil, false, err } worker, err := w.wc.Get(workerID) if err != nil { - return nil, err + return nil, false, err } ref, err := worker.LoadRef(ctx, refID, false) if err != nil { if cache.IsNotFound(err) { bklog.G(ctx).Warnf("could not load ref from worker: %v", err) - return nil, solver.ErrRefNotFound + return nil, false, nil } - return nil, err + return nil, false, err } - return NewWorkerRefResult(ref, worker), nil + return NewWorkerRefResult(ref, worker), true, nil } +var _ solver.ResultSource = &WorkerResultSource{} + // FinalizeRef is a convenience function that calls Finalize on a Result's // ImmutableRef. The 'worker' package cannot be imported by 'solver' due to an // import cycle, so this function is passed in with solver.SolverOpt. @@ -56,3 +75,71 @@ func FinalizeRef(ctx context.Context, res solver.Result) error { } return nil } + +// WorkerRemoteSource can be used to fetch a remote worker source. +type WorkerRemoteSource struct { + worker Worker + remotes map[digest.Digest]*solver.Remote + mu sync.Mutex +} + +// NewWorkerRemoteSource creates and returns a remote result source. +func NewWorkerRemoteSource(worker Worker) *WorkerRemoteSource { + return &WorkerRemoteSource{ + worker: worker, + remotes: map[digest.Digest]*solver.Remote{}, + } +} + +// Load a Result from the worker. +func (w *WorkerRemoteSource) Load(ctx context.Context, cacheKey digest.Digest) (solver.Result, bool, error) { + w.mu.Lock() + remote, ok := w.remotes[cacheKey] + w.mu.Unlock() + + if !ok { + return nil, false, nil + } + + ref, err := w.worker.FromRemote(ctx, remote) + if err != nil { + return nil, false, err + } + + return NewWorkerRefResult(ref, w.worker), true, nil +} + +// AddResult adds a solver.Remote source for the given cache key. +func (w *WorkerRemoteSource) AddResult(ctx context.Context, cacheKey digest.Digest, remote *solver.Remote) { + w.mu.Lock() + defer w.mu.Unlock() + w.remotes[cacheKey] = remote +} + +var _ solver.ResultSource = &WorkerRemoteSource{} + +// CombinedResultSource implements solver.ResultSource over a list of sources. +type CombinedResultSource struct { + sources []solver.ResultSource +} + +// NewCombinedResultSource creates and returns a new source from a list of sources. +func NewCombinedResultSource(sources ...solver.ResultSource) *CombinedResultSource { + return &CombinedResultSource{sources: sources} +} + +// Load attempts to load a Result from all underlying sources. +func (c *CombinedResultSource) Load(ctx context.Context, cacheKey digest.Digest) (solver.Result, bool, error) { + for _, source := range c.sources { + res, ok, err := source.Load(ctx, cacheKey) + if err != nil { + return nil, false, err + } + if ok { + return res, true, nil + } + } + return nil, false, nil +} + +var _ solver.ResultSource = &CombinedResultSource{}
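+
+// Wiring sketch: solver/llbsolver/solver.go combines the two sources so local
+// worker refs are tried first and inline-cache remotes act as a fallback
+// (names shown are illustrative; package qualifiers are omitted since this
+// file is in package worker):
+//
+//	sources := NewCombinedResultSource(
+//		NewWorkerResultSource(workerController, refIDStore),
+//		remoteSource,
+//	)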