diff --git a/simulator/go.mod b/simulator/go.mod index bd62f8e7..06c4fe4b 100644 --- a/simulator/go.mod +++ b/simulator/go.mod @@ -43,6 +43,7 @@ require ( github.com/spf13/cobra v1.7.0 github.com/stretchr/testify v1.9.0 go.etcd.io/etcd/client/v3 v3.5.10 + golang.org/x/exp v0.0.0-20220722155223-a9213eeb770e golang.org/x/sync v0.8.0 golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 gopkg.in/yaml.v2 v2.4.0 @@ -56,6 +57,7 @@ require ( k8s.io/kube-scheduler v0.30.4 k8s.io/kubernetes v1.30.4 k8s.io/utils v0.0.0-20230726121419-3b25d923346b + sigs.k8s.io/kube-scheduler-wasm-extension/scheduler v0.0.0-20241111155045-2ce7072312c8 ) require ( @@ -121,6 +123,7 @@ require ( github.com/prometheus/procfs v0.10.1 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/stoewer/go-strcase v1.2.0 // indirect + github.com/tetratelabs/wazero v1.7.2 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/valyala/fasttemplate v1.2.1 // indirect go.etcd.io/etcd/api/v3 v3.5.10 // indirect @@ -138,7 +141,6 @@ require ( go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.26.0 // indirect golang.org/x/crypto v0.26.0 // indirect - golang.org/x/exp v0.0.0-20220722155223-a9213eeb770e // indirect golang.org/x/mod v0.17.0 // indirect golang.org/x/net v0.28.0 // indirect golang.org/x/oauth2 v0.22.0 // indirect diff --git a/simulator/go.sum b/simulator/go.sum index 9dcb15bb..c7c086fa 100644 --- a/simulator/go.sum +++ b/simulator/go.sum @@ -220,6 +220,8 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/tetratelabs/wazero v1.7.2 h1:1+z5nXJNwMLPAWaTePFi49SSTL0IMx/i3Fg8Yc25GDc= +github.com/tetratelabs/wazero v1.7.2/go.mod h1:ytl6Zuh20R/eROuyDaGPkp82O9C/DJfXAwJfQ3X6/7Y= github.com/tmc/grpc-websocket-proxy v0.0.0-20220101234140-673ab2c3ae75 h1:6fotK7otjonDflCTK0BCfls4SPy3NcCVb5dqqmbRknE= github.com/tmc/grpc-websocket-proxy v0.0.0-20220101234140-673ab2c3ae75/go.mod h1:KO6IkyS8Y3j8OdNO85qEYBsRPuteD+YciPomcXdrMnk= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= @@ -430,6 +432,8 @@ k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 h1:BZqlfIlq5YbRMFko6/PM7F k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340/go.mod h1:yD4MZYeKMBwQKVht279WycxKyM84kkAx2DPrTXaeb98= k8s.io/kube-scheduler v0.30.4 h1:6H+NfNuJ4RvUdSD1dtnJBXuYGAp+ETGZaxFk4sLGPdU= k8s.io/kube-scheduler v0.30.4/go.mod h1:7MMICziKySYQQ7rOFGBsSkn1PE3urPwsxlp6uwU+NhU= +k8s.io/kubectl v0.30.4 h1:jU+6WSvcSgzs07H624RA2L79FnqppyV6BKI+dHWDzyo= +k8s.io/kubectl v0.30.4/go.mod h1:4KnGCshO4fFxd/tncWcbKH3Nj9wtoFYwMYPj8CUnduE= k8s.io/kubelet v0.30.4 h1:2TP59RVxuWuKpD58gQ6qow1Oy2Ys2uOH4hfSD/qv5EQ= k8s.io/kubelet v0.30.4/go.mod h1:v0lRl+1y2NNId5OlFiJ1rhjXc9D8Tp7PqvQYJS7W/L0= k8s.io/kubernetes v1.30.4 h1:LfWX7JNmT9Hp8uFVHsB9gQCZesjcWTQ02PHwMz6dGqk= @@ -442,6 +446,8 @@ sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.29.0 h1:/U5vjBbQn3RCh sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.29.0/go.mod h1:z7+wmGM2dfIiLRfrC6jb5kV2Mq/sK1ZP303cxzkV5Y4= sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd h1:EDPBXCAspyGV4jQlpZSudPeMmr1bNJefnuqLsRAsHZo= sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0= +sigs.k8s.io/kube-scheduler-wasm-extension/scheduler v0.0.0-20241111155045-2ce7072312c8 h1:jOyiY8zbvXtuMb7ImqNcSFtVOmnXhWQ8pxhECz95PKk= +sigs.k8s.io/kube-scheduler-wasm-extension/scheduler v0.0.0-20241111155045-2ce7072312c8/go.mod h1:N0BrfQ58AMkBbmjeGupvhUv/FFErpuYYs8D1YPPyY0Q= sigs.k8s.io/structured-merge-diff/v4 v4.4.1 h1:150L+0vs/8DA78h1u02ooW1/fFq/Lwr+sGiqlzvrtq4= sigs.k8s.io/structured-merge-diff/v4 v4.4.1/go.mod h1:N8hJocpFajUSSeSJ9bOZ77VzejKZaXsTtZo4/u7Io08= sigs.k8s.io/yaml v1.3.0 h1:a2VclLzOGrwOHDiV8EfBGhvjHvP46CtW5j6POvhYGGo= diff --git a/simulator/pkg/debuggablescheduler/debuggable_scheduler.go b/simulator/pkg/debuggablescheduler/debuggable_scheduler.go index 2343fbee..433a007f 100644 --- a/simulator/pkg/debuggablescheduler/debuggable_scheduler.go +++ b/simulator/pkg/debuggablescheduler/debuggable_scheduler.go @@ -57,6 +57,12 @@ func NewConfigs() (Configs, error) { return Configs{}, xerrors.Errorf("load scheduler config: %w", err) } + // Register wasm plugins to the wasm registry. + // This _needs_ to happen before the scheduler configuration is converted. + if err := simulatorschedulerconfig.RegisterWasmPlugins(versionedcfg); err != nil { + return Configs{}, xerrors.Errorf("register wasm plugins: %w", err) + } + versioned, err := scheduler.ConvertConfigurationForSimulator(versionedcfg) if err != nil { return Configs{}, xerrors.Errorf("convert scheduler config to apply: %w", err) diff --git a/simulator/scheduler/config/wasm.go b/simulator/scheduler/config/wasm.go new file mode 100644 index 00000000..8ab97257 --- /dev/null +++ b/simulator/scheduler/config/wasm.go @@ -0,0 +1,58 @@ +package config + +import ( + "golang.org/x/xerrors" + "k8s.io/apimachinery/pkg/util/sets" + configv1 "k8s.io/kube-scheduler/config/v1" + "k8s.io/kubernetes/pkg/scheduler/apis/config" + "k8s.io/kubernetes/pkg/scheduler/apis/config/scheme" + "k8s.io/kubernetes/pkg/scheduler/framework/runtime" + wasm "sigs.k8s.io/kube-scheduler-wasm-extension/scheduler/plugin" +) + +// RegisterWasmPlugins registers wasm plugins from the given configuration. +func RegisterWasmPlugins(versionedCfg *configv1.KubeSchedulerConfiguration) error { + cfg := config.KubeSchedulerConfiguration{} + if err := scheme.Scheme.Convert(versionedCfg, &cfg, nil); err != nil { + return xerrors.Errorf("convert configuration: %w", err) + } + + registry, err := getWasmRegistryFromUnversionedConfig(&cfg) + if err != nil { + return err + } + + SetOutOfTreeRegistries(registry) + + return nil +} + +// getWasmRegistryFromUnversionedConfig registers wasm plugins from the given unversioned configuration. +func getWasmRegistryFromUnversionedConfig(cfg *config.KubeSchedulerConfiguration) (runtime.Registry, error) { + registry := runtime.Registry{} + + for _, profile := range cfg.Profiles { + wasmplugins := sets.New[string]() + // look for the wasm plugin in the plugin config. + for _, config := range profile.PluginConfig { + if err := runtime.DecodeInto(config.Args, &wasm.PluginConfig{}); err != nil { + // not wasm plugin. + continue + } + + wasmplugins.Insert(config.Name) + } + + // look for the wasm plugin in the enabled plugins. + // (assuming that the wasm plugin is specified as a multi-point plugin.) + for _, plugin := range profile.Plugins.MultiPoint.Enabled { + if wasmplugins.Has(plugin.Name) { + if err := registry.Register(plugin.Name, wasm.PluginFactory(plugin.Name)); err != nil { + return nil, xerrors.Errorf("register plugin %s: %w", plugin.Name, err) + } + } + } + } + + return registry, nil +} diff --git a/simulator/scheduler/config/wasm_test.go b/simulator/scheduler/config/wasm_test.go new file mode 100644 index 00000000..8a2fd01e --- /dev/null +++ b/simulator/scheduler/config/wasm_test.go @@ -0,0 +1,121 @@ +package config + +import ( + "testing" + + "github.com/stretchr/testify/require" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/kubernetes/pkg/scheduler/apis/config" +) + +func TestGetWasmRegistryFromUnversionedConfig(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + cfg *config.KubeSchedulerConfiguration + expected int + }{ + { + name: "no profiles", + cfg: &config.KubeSchedulerConfiguration{}, + expected: 0, + }, + { + name: "no wasm plugins", + cfg: &config.KubeSchedulerConfiguration{ + Profiles: []config.KubeSchedulerProfile{ + { + PluginConfig: []config.PluginConfig{ + { + Name: "DefaultPreemption", + Args: &config.DefaultPreemptionArgs{}, + }, + }, + Plugins: &config.Plugins{ + MultiPoint: config.PluginSet{ + Enabled: []config.Plugin{ + {Name: "DefaultPreemption"}, + }, + }, + }, + }, + }, + }, + expected: 0, + }, + { + name: "one wasm plugin", + cfg: &config.KubeSchedulerConfiguration{ + Profiles: []config.KubeSchedulerProfile{ + { + PluginConfig: []config.PluginConfig{ + { + Name: "DefaultPreemption", + Args: &config.DefaultPreemptionArgs{}, + }, + {Name: "wasmPlugin", Args: &runtime.Unknown{ + ContentType: runtime.ContentTypeJSON, + Raw: []byte(`{"guestURL":"http://example.com/plugin.wasm"}`), + }}, + }, + Plugins: &config.Plugins{ + MultiPoint: config.PluginSet{ + Enabled: []config.Plugin{ + {Name: "DefaultPreemption"}, + {Name: "wasmPlugin"}, + }, + }, + }, + }, + }, + }, + expected: 1, + }, + { + name: "multiple wasm plugins", + cfg: &config.KubeSchedulerConfiguration{ + Profiles: []config.KubeSchedulerProfile{ + { + PluginConfig: []config.PluginConfig{ + { + Name: "DefaultPreemption", + Args: &config.DefaultPreemptionArgs{}, + }, + {Name: "wasmPlugin1", Args: &runtime.Unknown{ + ContentType: runtime.ContentTypeJSON, + Raw: []byte(`{"guestURL":"http://example.com/plugin1.wasm"}`), + }}, + {Name: "wasmPlugin2", Args: &runtime.Unknown{ + ContentType: runtime.ContentTypeJSON, + Raw: []byte(`{"guestURL":"http://example.com/plugin2.wasm"}`), + }}, + }, + Plugins: &config.Plugins{ + MultiPoint: config.PluginSet{ + Enabled: []config.Plugin{ + {Name: "DefaultPreemption"}, + {Name: "wasmPlugin1"}, + {Name: "wasmPlugin2"}, + }, + }, + }, + }, + }, + }, + expected: 2, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + registry, err := getWasmRegistryFromUnversionedConfig(tt.cfg) + require.NoError(t, err, "check error") + if len(registry) != tt.expected { + t.Errorf("expected %d plugins, got %d", tt.expected, len(registry)) + } + }) + } +}