Skip to content

Commit

Permalink
feat: support the wasm extension in the debuggable scheduler (#393)
Browse files Browse the repository at this point in the history
  • Loading branch information
jgiannuzzi authored Nov 12, 2024
1 parent 70484f1 commit b3c7a44
Show file tree
Hide file tree
Showing 5 changed files with 194 additions and 1 deletion.
4 changes: 3 additions & 1 deletion simulator/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ require (
github.com/spf13/cobra v1.7.0
github.com/stretchr/testify v1.9.0
go.etcd.io/etcd/client/v3 v3.5.10
golang.org/x/exp v0.0.0-20220722155223-a9213eeb770e
golang.org/x/sync v0.8.0
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2
gopkg.in/yaml.v2 v2.4.0
Expand All @@ -56,6 +57,7 @@ require (
k8s.io/kube-scheduler v0.30.4
k8s.io/kubernetes v1.30.4
k8s.io/utils v0.0.0-20230726121419-3b25d923346b
sigs.k8s.io/kube-scheduler-wasm-extension/scheduler v0.0.0-20241111155045-2ce7072312c8
)

require (
Expand Down Expand Up @@ -121,6 +123,7 @@ require (
github.com/prometheus/procfs v0.10.1 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/stoewer/go-strcase v1.2.0 // indirect
github.com/tetratelabs/wazero v1.7.2 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasttemplate v1.2.1 // indirect
go.etcd.io/etcd/api/v3 v3.5.10 // indirect
Expand All @@ -138,7 +141,6 @@ require (
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.26.0 // indirect
golang.org/x/crypto v0.26.0 // indirect
golang.org/x/exp v0.0.0-20220722155223-a9213eeb770e // indirect
golang.org/x/mod v0.17.0 // indirect
golang.org/x/net v0.28.0 // indirect
golang.org/x/oauth2 v0.22.0 // indirect
Expand Down
6 changes: 6 additions & 0 deletions simulator/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,8 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/tetratelabs/wazero v1.7.2 h1:1+z5nXJNwMLPAWaTePFi49SSTL0IMx/i3Fg8Yc25GDc=
github.com/tetratelabs/wazero v1.7.2/go.mod h1:ytl6Zuh20R/eROuyDaGPkp82O9C/DJfXAwJfQ3X6/7Y=
github.com/tmc/grpc-websocket-proxy v0.0.0-20220101234140-673ab2c3ae75 h1:6fotK7otjonDflCTK0BCfls4SPy3NcCVb5dqqmbRknE=
github.com/tmc/grpc-websocket-proxy v0.0.0-20220101234140-673ab2c3ae75/go.mod h1:KO6IkyS8Y3j8OdNO85qEYBsRPuteD+YciPomcXdrMnk=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
Expand Down Expand Up @@ -430,6 +432,8 @@ k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 h1:BZqlfIlq5YbRMFko6/PM7F
k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340/go.mod h1:yD4MZYeKMBwQKVht279WycxKyM84kkAx2DPrTXaeb98=
k8s.io/kube-scheduler v0.30.4 h1:6H+NfNuJ4RvUdSD1dtnJBXuYGAp+ETGZaxFk4sLGPdU=
k8s.io/kube-scheduler v0.30.4/go.mod h1:7MMICziKySYQQ7rOFGBsSkn1PE3urPwsxlp6uwU+NhU=
k8s.io/kubectl v0.30.4 h1:jU+6WSvcSgzs07H624RA2L79FnqppyV6BKI+dHWDzyo=
k8s.io/kubectl v0.30.4/go.mod h1:4KnGCshO4fFxd/tncWcbKH3Nj9wtoFYwMYPj8CUnduE=
k8s.io/kubelet v0.30.4 h1:2TP59RVxuWuKpD58gQ6qow1Oy2Ys2uOH4hfSD/qv5EQ=
k8s.io/kubelet v0.30.4/go.mod h1:v0lRl+1y2NNId5OlFiJ1rhjXc9D8Tp7PqvQYJS7W/L0=
k8s.io/kubernetes v1.30.4 h1:LfWX7JNmT9Hp8uFVHsB9gQCZesjcWTQ02PHwMz6dGqk=
Expand All @@ -442,6 +446,8 @@ sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.29.0 h1:/U5vjBbQn3RCh
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.29.0/go.mod h1:z7+wmGM2dfIiLRfrC6jb5kV2Mq/sK1ZP303cxzkV5Y4=
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd h1:EDPBXCAspyGV4jQlpZSudPeMmr1bNJefnuqLsRAsHZo=
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0=
sigs.k8s.io/kube-scheduler-wasm-extension/scheduler v0.0.0-20241111155045-2ce7072312c8 h1:jOyiY8zbvXtuMb7ImqNcSFtVOmnXhWQ8pxhECz95PKk=
sigs.k8s.io/kube-scheduler-wasm-extension/scheduler v0.0.0-20241111155045-2ce7072312c8/go.mod h1:N0BrfQ58AMkBbmjeGupvhUv/FFErpuYYs8D1YPPyY0Q=
sigs.k8s.io/structured-merge-diff/v4 v4.4.1 h1:150L+0vs/8DA78h1u02ooW1/fFq/Lwr+sGiqlzvrtq4=
sigs.k8s.io/structured-merge-diff/v4 v4.4.1/go.mod h1:N8hJocpFajUSSeSJ9bOZ77VzejKZaXsTtZo4/u7Io08=
sigs.k8s.io/yaml v1.3.0 h1:a2VclLzOGrwOHDiV8EfBGhvjHvP46CtW5j6POvhYGGo=
Expand Down
6 changes: 6 additions & 0 deletions simulator/pkg/debuggablescheduler/debuggable_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ func NewConfigs() (Configs, error) {
return Configs{}, xerrors.Errorf("load scheduler config: %w", err)
}

// Register wasm plugins to the wasm registry.
// This _needs_ to happen before the scheduler configuration is converted.
if err := simulatorschedulerconfig.RegisterWasmPlugins(versionedcfg); err != nil {
return Configs{}, xerrors.Errorf("register wasm plugins: %w", err)
}

versioned, err := scheduler.ConvertConfigurationForSimulator(versionedcfg)
if err != nil {
return Configs{}, xerrors.Errorf("convert scheduler config to apply: %w", err)
Expand Down
58 changes: 58 additions & 0 deletions simulator/scheduler/config/wasm.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package config

import (
"golang.org/x/xerrors"
"k8s.io/apimachinery/pkg/util/sets"
configv1 "k8s.io/kube-scheduler/config/v1"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/apis/config/scheme"
"k8s.io/kubernetes/pkg/scheduler/framework/runtime"
wasm "sigs.k8s.io/kube-scheduler-wasm-extension/scheduler/plugin"
)

// RegisterWasmPlugins registers wasm plugins from the given configuration.
func RegisterWasmPlugins(versionedCfg *configv1.KubeSchedulerConfiguration) error {
cfg := config.KubeSchedulerConfiguration{}
if err := scheme.Scheme.Convert(versionedCfg, &cfg, nil); err != nil {
return xerrors.Errorf("convert configuration: %w", err)
}

registry, err := getWasmRegistryFromUnversionedConfig(&cfg)
if err != nil {
return err
}

SetOutOfTreeRegistries(registry)

return nil
}

// getWasmRegistryFromUnversionedConfig registers wasm plugins from the given unversioned configuration.
func getWasmRegistryFromUnversionedConfig(cfg *config.KubeSchedulerConfiguration) (runtime.Registry, error) {
registry := runtime.Registry{}

for _, profile := range cfg.Profiles {
wasmplugins := sets.New[string]()
// look for the wasm plugin in the plugin config.
for _, config := range profile.PluginConfig {
if err := runtime.DecodeInto(config.Args, &wasm.PluginConfig{}); err != nil {
// not wasm plugin.
continue
}

wasmplugins.Insert(config.Name)
}

// look for the wasm plugin in the enabled plugins.
// (assuming that the wasm plugin is specified as a multi-point plugin.)
for _, plugin := range profile.Plugins.MultiPoint.Enabled {
if wasmplugins.Has(plugin.Name) {
if err := registry.Register(plugin.Name, wasm.PluginFactory(plugin.Name)); err != nil {
return nil, xerrors.Errorf("register plugin %s: %w", plugin.Name, err)
}
}
}
}

return registry, nil
}
121 changes: 121 additions & 0 deletions simulator/scheduler/config/wasm_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package config

import (
"testing"

"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
)

func TestGetWasmRegistryFromUnversionedConfig(t *testing.T) {
t.Parallel()

tests := []struct {
name string
cfg *config.KubeSchedulerConfiguration
expected int
}{
{
name: "no profiles",
cfg: &config.KubeSchedulerConfiguration{},
expected: 0,
},
{
name: "no wasm plugins",
cfg: &config.KubeSchedulerConfiguration{
Profiles: []config.KubeSchedulerProfile{
{
PluginConfig: []config.PluginConfig{
{
Name: "DefaultPreemption",
Args: &config.DefaultPreemptionArgs{},
},
},
Plugins: &config.Plugins{
MultiPoint: config.PluginSet{
Enabled: []config.Plugin{
{Name: "DefaultPreemption"},
},
},
},
},
},
},
expected: 0,
},
{
name: "one wasm plugin",
cfg: &config.KubeSchedulerConfiguration{
Profiles: []config.KubeSchedulerProfile{
{
PluginConfig: []config.PluginConfig{
{
Name: "DefaultPreemption",
Args: &config.DefaultPreemptionArgs{},
},
{Name: "wasmPlugin", Args: &runtime.Unknown{
ContentType: runtime.ContentTypeJSON,
Raw: []byte(`{"guestURL":"http://example.com/plugin.wasm"}`),
}},
},
Plugins: &config.Plugins{
MultiPoint: config.PluginSet{
Enabled: []config.Plugin{
{Name: "DefaultPreemption"},
{Name: "wasmPlugin"},
},
},
},
},
},
},
expected: 1,
},
{
name: "multiple wasm plugins",
cfg: &config.KubeSchedulerConfiguration{
Profiles: []config.KubeSchedulerProfile{
{
PluginConfig: []config.PluginConfig{
{
Name: "DefaultPreemption",
Args: &config.DefaultPreemptionArgs{},
},
{Name: "wasmPlugin1", Args: &runtime.Unknown{
ContentType: runtime.ContentTypeJSON,
Raw: []byte(`{"guestURL":"http://example.com/plugin1.wasm"}`),
}},
{Name: "wasmPlugin2", Args: &runtime.Unknown{
ContentType: runtime.ContentTypeJSON,
Raw: []byte(`{"guestURL":"http://example.com/plugin2.wasm"}`),
}},
},
Plugins: &config.Plugins{
MultiPoint: config.PluginSet{
Enabled: []config.Plugin{
{Name: "DefaultPreemption"},
{Name: "wasmPlugin1"},
{Name: "wasmPlugin2"},
},
},
},
},
},
},
expected: 2,
},
}

for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
registry, err := getWasmRegistryFromUnversionedConfig(tt.cfg)
require.NoError(t, err, "check error")
if len(registry) != tt.expected {
t.Errorf("expected %d plugins, got %d", tt.expected, len(registry))
}
})
}
}

0 comments on commit b3c7a44

Please sign in to comment.