Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[wip] Switch to using kubelet config file for all supported flags #10433

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,6 @@ func main() {
}

if err := app.Run(configfilearg.MustParse(os.Args)); err != nil && !errors.Is(err, context.Canceled) {
logrus.Fatal(err)
logrus.Fatalf("Error: %v", err)
}
}
2 changes: 1 addition & 1 deletion cmd/k3s/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func main() {
}

if err := app.Run(os.Args); err != nil && !errors.Is(err, context.Canceled) {
logrus.Fatal(err)
logrus.Fatalf("Error: %v", err)
}
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,6 @@ func main() {
}

if err := app.Run(configfilearg.MustParse(os.Args)); err != nil && !errors.Is(err, context.Canceled) {
logrus.Fatal(err)
logrus.Fatalf("Error: %v", err)
}
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ require (
k8s.io/klog/v2 v2.130.1
k8s.io/kube-proxy v0.0.0
k8s.io/kubectl v0.31.1-rc.1
k8s.io/kubelet v0.31.1
k8s.io/kubernetes v1.31.1
k8s.io/utils v0.0.0-20240711033017-18e509b52bc8
sigs.k8s.io/cri-tools v0.0.0-00010101000000-000000000000
Expand Down Expand Up @@ -477,7 +478,6 @@ require (
k8s.io/kube-controller-manager v0.0.0 // indirect
k8s.io/kube-openapi v0.0.0-20240730131305-7a9a4e85957e // indirect
k8s.io/kube-scheduler v0.0.0 // indirect
k8s.io/kubelet v0.31.1 // indirect
k8s.io/metrics v0.0.0 // indirect
k8s.io/mount-utils v0.31.1 // indirect
k8s.io/pod-security-admission v0.0.0 // indirect
Expand Down
7 changes: 7 additions & 0 deletions pkg/agent/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,12 @@ func get(ctx context.Context, envInfo *cmds.Agent, proxy proxy.Proxy) (*config.N
return nil, err
}

// Ensure kubelet config dir exists
kubeletConfigDir := filepath.Join(envInfo.DataDir, "agent", "etc", "kubelet.conf.d")
if err := os.MkdirAll(kubeletConfigDir, 0700); err != nil {
return nil, err
}

nodeConfig := &config.Node{
Docker: envInfo.Docker,
SELinux: envInfo.EnableSELinux,
Expand Down Expand Up @@ -562,6 +568,7 @@ func get(ctx context.Context, envInfo *cmds.Agent, proxy proxy.Proxy) (*config.N
nodeConfig.AgentConfig.ClusterDomain = controlConfig.ClusterDomain
nodeConfig.AgentConfig.ResolvConf = locateOrGenerateResolvConf(envInfo)
nodeConfig.AgentConfig.ClientCA = clientCAFile
nodeConfig.AgentConfig.KubeletConfigDir = kubeletConfigDir
nodeConfig.AgentConfig.KubeConfigKubelet = kubeconfigKubelet
nodeConfig.AgentConfig.KubeConfigKubeProxy = kubeconfigKubeproxy
nodeConfig.AgentConfig.KubeConfigK3sController = kubeconfigK3sController
Expand Down
201 changes: 198 additions & 3 deletions pkg/daemons/agent/agent.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,36 @@
package agent

import (
"bytes"
"context"
"fmt"
"math/rand"
"net"
"os"
"path/filepath"
"strings"
"time"

"github.com/k3s-io/k3s/pkg/agent/config"
"github.com/k3s-io/k3s/pkg/agent/proxy"
"github.com/k3s-io/k3s/pkg/agent/util"
daemonconfig "github.com/k3s-io/k3s/pkg/daemons/config"
"github.com/k3s-io/k3s/pkg/daemons/executor"
"github.com/k3s-io/k3s/pkg/version"
"github.com/otiai10/copy"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/component-base/logs"
logsapi "k8s.io/component-base/logs/api/v1"
logsv1 "k8s.io/component-base/logs/api/v1"
_ "k8s.io/component-base/metrics/prometheus/restclient" // for client metric registration
_ "k8s.io/component-base/metrics/prometheus/version" // for version metric registration
kubeletconfig "k8s.io/kubelet/config/v1beta1"
"k8s.io/kubernetes/pkg/util/taints"
utilsnet "k8s.io/utils/net"
utilsptr "k8s.io/utils/ptr"
"sigs.k8s.io/yaml"
)

func Agent(ctx context.Context, nodeConfig *daemonconfig.Node, proxy proxy.Proxy) error {
Expand All @@ -24,7 +40,7 @@ func Agent(ctx context.Context, nodeConfig *daemonconfig.Node, proxy proxy.Proxy
defer logs.FlushLogs()

if err := startKubelet(ctx, &nodeConfig.AgentConfig); err != nil {
return err
return errors.Wrap(err, "failed to start kubelet")
}

go func() {
Expand All @@ -46,9 +62,21 @@ func startKubeProxy(ctx context.Context, cfg *daemonconfig.Agent) error {
}

func startKubelet(ctx context.Context, cfg *daemonconfig.Agent) error {
argsMap := kubeletArgs(cfg)
argsMap, defaultConfig, err := kubeletArgsAndConfig(cfg)
if err != nil {
return errors.Wrap(err, "prepare default configuration drop-in")
}

extraArgs, err := extractConfigArgs(cfg.KubeletConfigDir, cfg.ExtraKubeletArgs, defaultConfig)
if err != nil {
return errors.Wrap(err, "prepare user configuration drop-ins")
}

if err := writeKubeletConfig(cfg.KubeletConfigDir, defaultConfig); err != nil {
return errors.Wrap(err, "generate default kubelet configuration drop-in")
}

args := daemonconfig.GetArgs(argsMap, cfg.ExtraKubeletArgs)
args := daemonconfig.GetArgs(argsMap, extraArgs)
logrus.Infof("Running kubelet %s", daemonconfig.ArgString(args))

return executor.Kubelet(ctx, args)
Expand All @@ -67,3 +95,170 @@ func ImageCredProvAvailable(cfg *daemonconfig.Agent) bool {
}
return true
}

// extractConfigArgs strips out any --config or --config-dir flags from the
// provided args list, and if set, copies the content of the file or dir into
// the target drop-in directory.
func extractConfigArgs(path string, extraArgs []string, config *kubeletconfig.KubeletConfiguration) ([]string, error) {
args := make([]string, 0, len(extraArgs))
strippedArgs := map[string]string{}
var skipVal bool
for i := range extraArgs {
if skipVal {
skipVal = false
continue
}

var val string
key := strings.TrimPrefix(extraArgs[i], "--")
if k, v, ok := strings.Cut(key, "="); ok {
// key=val pair
key = k
val = v
} else if len(extraArgs) > i+1 {
// key in this arg, value in next arg
val = extraArgs[i+1]
skipVal = true
}

switch key {
case "config", "config-dir":
if val == "" {
return nil, fmt.Errorf("value required for kubelet-arg --%s", key)
}
strippedArgs[key] = val
default:
args = append(args, extraArgs[i])
}
}

// copy the config file into our managed config dir, unless its already in there
if strippedArgs["config"] != "" && !strings.HasPrefix(strippedArgs["config"], path) {
if err := util.CopyFile(strippedArgs["config"], filepath.Join(path, "10-cli-config.conf"), false); err != nil {
return nil, errors.Wrap(err, "copy config into managed drop-in dir")
}
}
// copy the config-dir into our managed config dir, unless its already in there
if strippedArgs["config-dir"] != "" && !strings.HasPrefix(strippedArgs["config-dir"], path) {
if err := copy.Copy(strippedArgs["config"], filepath.Join(path, "20-cli-config-dir"), copy.Options{PreserveOwner: true}); err != nil {
return nil, errors.Wrap(err, "copy config-dir into managed drop-in dir")
}
}
return args, nil
}

// writeKubeletConfig marshals the provided KubeletConfiguration object into a
// drop-in config file in the target drop-in directory.
func writeKubeletConfig(path string, config *kubeletconfig.KubeletConfiguration) error {
b, err := yaml.Marshal(config)
if err != nil {
return err
}

// replace resolvConf with resolverConfig until Kubernetes 1.32
// ref: https://github.com/kubernetes/kubernetes/pull/127421
b = bytes.ReplaceAll(b, []byte("resolvConf: "), []byte("resolverConfig: "))
return os.WriteFile(filepath.Join(path, "00-"+version.Program+"-defaults.conf"), b, 0600)
}

func defaultKubeletConfig(cfg *daemonconfig.Agent) (*kubeletconfig.KubeletConfiguration, error) {
bindAddress := "127.0.0.1"
isIPv6 := utilsnet.IsIPv6(net.ParseIP([]string{cfg.NodeIP}[0]))
if isIPv6 {
bindAddress = "::1"
}

defaultConfig := &kubeletconfig.KubeletConfiguration{
TypeMeta: metav1.TypeMeta{
APIVersion: "kubelet.config.k8s.io/v1beta1",
Kind: "KubeletConfiguration",
},
CPUManagerReconcilePeriod: metav1.Duration{Duration: time.Second * 10},
CgroupDriver: "cgroupfs",
ClusterDomain: cfg.ClusterDomain,
EvictionPressureTransitionPeriod: metav1.Duration{Duration: time.Minute * 5},
FailSwapOn: utilsptr.To(false),
FileCheckFrequency: metav1.Duration{Duration: time.Second * 20},
HTTPCheckFrequency: metav1.Duration{Duration: time.Second * 20},
HealthzBindAddress: bindAddress,
ImageMinimumGCAge: metav1.Duration{Duration: time.Minute * 2},
NodeStatusReportFrequency: metav1.Duration{Duration: time.Minute * 5},
NodeStatusUpdateFrequency: metav1.Duration{Duration: time.Second * 10},
ProtectKernelDefaults: cfg.ProtectKernelDefaults,
ReadOnlyPort: 0,
RuntimeRequestTimeout: metav1.Duration{Duration: time.Minute * 2},
StreamingConnectionIdleTimeout: metav1.Duration{Duration: time.Hour * 4},
SyncFrequency: metav1.Duration{Duration: time.Minute},
VolumeStatsAggPeriod: metav1.Duration{Duration: time.Minute},
EvictionHard: map[string]string{
"imagefs.available": "5%",
"nodefs.available": "5%",
},
EvictionMinimumReclaim: map[string]string{
"imagefs.available": "10%",
"nodefs.available": "10%",
},
Authentication: kubeletconfig.KubeletAuthentication{
Anonymous: kubeletconfig.KubeletAnonymousAuthentication{
Enabled: utilsptr.To(false),
},
Webhook: kubeletconfig.KubeletWebhookAuthentication{
Enabled: utilsptr.To(true),
CacheTTL: metav1.Duration{Duration: time.Minute * 2},
},
},
Authorization: kubeletconfig.KubeletAuthorization{
Mode: kubeletconfig.KubeletAuthorizationModeWebhook,
Webhook: kubeletconfig.KubeletWebhookAuthorization{
CacheAuthorizedTTL: metav1.Duration{Duration: time.Minute * 5},
CacheUnauthorizedTTL: metav1.Duration{Duration: time.Second * 30},
},
},
Logging: logsv1.LoggingConfiguration{
Format: "text",
Verbosity: logsv1.VerbosityLevel(cfg.VLevel),
FlushFrequency: logsv1.TimeOrMetaDuration{
Duration: metav1.Duration{Duration: time.Second * 5},
SerializeAsString: true,
},
},
}

if cfg.ListenAddress != "" {
defaultConfig.Address = cfg.ListenAddress
}

if cfg.ClientCA != "" {
defaultConfig.Authentication.X509.ClientCAFile = cfg.ClientCA
}

if cfg.ServingKubeletCert != "" && cfg.ServingKubeletKey != "" {
defaultConfig.TLSCertFile = cfg.ServingKubeletCert
defaultConfig.TLSPrivateKeyFile = cfg.ServingKubeletKey
}

for _, addr := range cfg.ClusterDNSs {
defaultConfig.ClusterDNS = append(defaultConfig.ClusterDNS, addr.String())
}

if cfg.ResolvConf != "" {
defaultConfig.ResolverConfig = utilsptr.To(cfg.ResolvConf)
}

if cfg.PodManifests != "" && defaultConfig.StaticPodPath == "" {
defaultConfig.StaticPodPath = cfg.PodManifests
}
if err := os.MkdirAll(defaultConfig.StaticPodPath, 0750); err != nil {
return nil, errors.Wrapf(err, "failed to create static pod manifest dir %s", defaultConfig.StaticPodPath)
}

if t, _, err := taints.ParseTaints(cfg.NodeTaints); err != nil {
return nil, errors.Wrap(err, "failed to parse node taints")
} else {
defaultConfig.RegisterWithTaints = t
}

logsv1.VModuleConfigurationPflag(&defaultConfig.Logging.VModule).Set(cfg.VModule)

return defaultConfig, nil
}
Loading
Loading