Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Auto import images for embedded registry #10973

Draft
wants to merge 12 commits into
base: master
Choose a base branch
from
2 changes: 1 addition & 1 deletion .github/workflows/e2e.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ jobs:
strategy:
fail-fast: false
matrix:
etest: [startup, s3, btrfs, externalip, privateregistry, embeddedmirror, wasm]
etest: [autoimport, startup, s3, btrfs, externalip, privateregistry, embeddedmirror, wasm]
max-parallel: 3
steps:
- name: "Checkout"
Expand Down
54 changes: 54 additions & 0 deletions docs/adrs/add-auto-import-embedded-registry.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# Easy way for auto adding images to k3s

Date: 2024-10-2

## Status

Proposed

## Context

Since the feature for embedded registry, the users appeared with a question about having to manually import images, specially in edge environments.

As a result, there is a need for a folder who can handle this action, where every image there will be watched by a controller for changes or new images, this new images or new changes will be added to the containerd image store.

The controller will watch the agent/images folder that is the default folder for the images, as the first iteration about the controller he will mainly work with the default image folder, but in the future we can set to watch more folders.

The main idea for the controller is to create a map for the file infos maintaining the state for the files, with that we can see if a file was modified and if the size changed.

### Map to handle the state from the files

This map will have the entire filepath of the file in the `key` value, since we can get the value from the key with only the `event.Name`

```go
map[string]fs.FileInfo
```

### Why use fsnotify

With this library we can easily use for any linux distros without the need to port for a specify distro and can also run in windows.

The main idea for the watch will be taking care of the last time that was modified the image file.

fsnotify has a great toolset for handling changes in files, since the code will have a channel to receive events such as CREATE, RENAME, REMOVE and WRITE.

### How the controller will work with the events

When the controller receive a event saying that a file was created, he will add to the map and import the images if the event that he has received is not a directory and then import the image.

When the controller receive a event saying that a file was writen, he will verify if the file has the size changed and if the file has the time modified based on the time and size from the state.

When the controller receive a event saying that a file was renamed, or removed, he will delete this file from the state. when a file is renamed, it is created a new file with the same infos but with a the new name, so the watcher will sent for the controller a event saying that a file was created.

## Decision

- Not decided yet

## Consequences

Good:
- Better use of embedded containerd image store.
- Fsnotify it's a indirect dependency that upstream uses

Bad:
- The need for another dependency
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ require (
github.com/docker/docker v27.1.1+incompatible
github.com/erikdubbelboer/gspt v0.0.0-20190125194910-e68493906b83
github.com/flannel-io/flannel v0.25.6
github.com/fsnotify/fsnotify v1.7.0
github.com/go-bindata/go-bindata v3.1.2+incompatible
github.com/go-logr/logr v1.4.2
github.com/go-logr/stdr v1.2.3-0.20220714215716-96bad1d688c5
Expand Down Expand Up @@ -248,7 +249,6 @@ require (
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/flynn/noise v1.1.0 // indirect
github.com/francoispqt/gojay v1.2.13 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/ghodss/yaml v1.0.0 // indirect
github.com/go-errors/errors v1.4.2 // indirect
github.com/go-jose/go-jose/v4 v4.0.2 // indirect
Expand Down
15 changes: 12 additions & 3 deletions pkg/agent/containerd/containerd.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,14 @@ func Run(ctx context.Context, cfg *config.Node) error {
// any .txt files are processed as a list of images that should be pre-pulled from remote registries.
// If configured, imported images are retagged as being pulled from additional registries.
func PreloadImages(ctx context.Context, cfg *config.Node) error {
fileInfo, err := os.Stat(cfg.Images)
if os.IsNotExist(err) {
err := os.MkdirAll(cfg.Images, 0700)
if err != nil {
logrus.Errorf("Unable to create agent/images folder in %s: %v", cfg.Images, err)
return nil
} else if err != nil {
}

fileInfo, err := os.Stat(cfg.Images)
if err != nil {
logrus.Errorf("Unable to find images in %s: %v", cfg.Images, err)
return nil
}
Expand Down Expand Up @@ -176,6 +180,11 @@ func PreloadImages(ctx context.Context, cfg *config.Node) error {
}
logrus.Infof("Imported images from %s in %s", filePath, time.Since(start))
}

// create config and run watcher
watcher := createWatcher(ctx, cfg)
go watcher.run(ctx, cfg)

return nil
}

Expand Down
208 changes: 208 additions & 0 deletions pkg/agent/containerd/watcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
package containerd

import (
"context"
"io/fs"
"os"
"path/filepath"

"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/pkg/cri/constants"
"github.com/fsnotify/fsnotify"
"github.com/k3s-io/k3s/pkg/agent/cri"
"github.com/k3s-io/k3s/pkg/daemons/config"
"github.com/sirupsen/logrus"
"k8s.io/client-go/util/workqueue"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
)

type Watcher struct {
nodeConfig *config.Node
imagesMap map[string]fs.FileInfo
workqueue workqueue.TypedRateLimitingInterface[fsnotify.Event]
}

func createWatcher(ctx context.Context, cfg *config.Node) *Watcher {
return &Watcher{
nodeConfig: cfg,
imagesMap: make(map[string]fs.FileInfo),
workqueue: workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[fsnotify.Event]()),
}
}

func (w *Watcher) runWorker() {
for w.processNextEvent() {
}
}

func (w *Watcher) processNextEvent() bool {
_, shutdown := w.workqueue.Get()

if shutdown {
return false
}

if err := w.processEvent(event); err != nil {
logrus.Errorf("Process event: %v", err)
}

return true
}

func (w *Watcher) processEvent(event fsnotify.Event) error {
defer w.workqueue.Done(event)

switch event.Op {
case fsnotify.Write:
newStateFile, err := os.Stat(event.Name)
if err != nil {
logrus.Errorf("Error encountered while getting file %s info for event write: %s", event.Name, err.Error())
continue
}

// we do not want to handle directorys, only files
if newStateFile.IsDir() {
continue
}

lastStateFile := w.imagesMap[event.Name]
w.imagesMap[event.Name] = newStateFile

if (newStateFile.Size() != lastStateFile.Size()) && newStateFile.ModTime().After(lastStateFile.ModTime()) {
logrus.Infof("File met the requirements for import to containerd image store: %s", event.Name)
w.workqueue.Add(event)
// start := time.Now()
// if err := preloadFile(ctx, cfg, client, imageClient, event.Name); err != nil {
// logrus.Errorf("Error encountered while importing %s: %v", event.Name, err)
// continue
// }
// logrus.Infof("Imported images from %s in %s", event.Name, time.Since(start))
}
case fsnotify.Create:
info, err := os.Stat(event.Name)
if err != nil {
logrus.Errorf("Error encountered while getting file %s info for event Create: %v", event.Name, err)
continue
}

if info.IsDir() {
continue
}

w.imagesMap[event.Name] = info
logrus.Infof("File added to watcher controller: %s", event.Name)

w.workqueue.Add(event)

// start := time.Now()
// if err := preloadFile(ctx, cfg, client, imageClient, event.Name); err != nil {
// logrus.Errorf("Error encountered while importing %s: %v", event.Name, err)
// continue
// }
// logrus.Infof("Imported images from %s in %s", event.Name, time.Since(start))
case fsnotify.Rename:
delete(w.imagesMap, event.Name)
logrus.Infof("Removed file from the watcher controller: %s", event.Name)
case fsnotify.Remove:
w.workqueue.Add(event)
delete(w.imagesMap, event.Name)
logrus.Infof("Removed file from the watcher controller: %s", event.Name)
}
}
// if key, ok = obj.(string); !ok {
// logrus.Errorf("expected string in workqueue but got %#v", obj)
// w.workqueue.Forget(event)
// return nil
// }
// keyParts := strings.SplitN(key, "/", 2)
// if err := k.updateStatus(keyParts[0], keyParts[1]); err != nil {
// w.workqueue.AddRateLimited(event)
// return fmt.Errorf("error updating LoadBalancer Status for %s: %v, requeueing", key, err)
// }

c.workqueue.Forget(obj)
return nil

}

func (w *Watcher) handleCreateImages(event fsnotify.Event) {

}

// watcher is a controller that watch the agent/images folder
// to ensure that every new file is added to the watcher state
func (w *Watcher) run(ctx context.Context, cfg *config.Node) {
watcher, err := fsnotify.NewWatcher()
if err != nil {
logrus.Errorf("Error to create a watcher: %s", err.Error())
return
}

// Add agent/images path to the watcher.
err = watcher.Add(w.nodeConfig.Images)
if err != nil {
logrus.Errorf("Error when creating the watcher controller: %v", err)
return
}
//var ImagesWorkqueue workqueue.TypedRateLimitingInterface[fsnotify.Event]

client, err := Client(cfg.Containerd.Address)
if err != nil {
logrus.Errorf("Error to create containerd client: %v", err)
return
}

criConn, err := cri.Connection(ctx, cfg.Containerd.Address)
if err != nil {
logrus.Errorf("Error to create CRI connection: %v", err)
return
}

_ = runtimeapi.NewImageServiceClient(criConn)

defer watcher.Close()
defer client.Close()
defer criConn.Close()

fileInfos, err := os.ReadDir(w.nodeConfig.Images)
if err != nil {
logrus.Errorf("Unable to read images in %s: %v", w.nodeConfig.Images, err)
return
}

// Ensure that our images are imported into the correct namespace
ctx = namespaces.WithNamespace(ctx, constants.K8sContainerdNamespace)

// populate watcher map with the entrys from the directory
for _, dirEntry := range fileInfos {
if dirEntry.IsDir() {
continue
}

// get the file info to add to the state map
fileInfo, err := dirEntry.Info()
if err != nil {
logrus.Errorf("Error while getting the info from file: %v", err)
continue
}

// insert the file into the state map that will have the state from the file
w.imagesMap[filepath.Join(w.nodeConfig.Images, dirEntry.Name())] = fileInfo
}

for {
select {
case event, ok := <-watcher.Events:
if !ok {
return
}

w.workqueue.Add(event)
case err, ok := <-watcher.Errors:
if !ok {
return
}
logrus.Errorf("error in watcher controller: %v", err)
}
}
}
Loading
Loading