Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

filter import resources use labelSelector #376

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion simulator/cmd/simulator/simulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func startSimulator() error {
// This must be called after `StartScheduler`
timeoutCtx, timeoutCancel := context.WithTimeout(ctx, importTimeout)
defer timeoutCancel()
if err := dic.OneshotClusterResourceImporter().ImportClusterResources(timeoutCtx); err != nil {
if err := dic.OneshotClusterResourceImporter().ImportClusterResources(timeoutCtx, cfg.ImportLabel); err != nil {
return xerrors.Errorf("import from the target cluster: %w", err)
}
}
Expand Down
2 changes: 2 additions & 0 deletions simulator/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type Config struct {
// ExternalImportEnabled indicates whether the simulator will import resources from a target cluster once
// when it's started.
ExternalImportEnabled bool
ImportLabel map[string]string
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need a comment

Suggested change
ImportLabel map[string]string
// ImportLabel indicates xxxx
ImportLabel map[string]string

// ExternalImportEnabled indicates whether the simulator will keep syncing resources from a target cluster.
ResourceSyncEnabled bool
// ExternalKubeClientCfg is KubeConfig to get resources from external cluster.
Expand Down Expand Up @@ -105,6 +106,7 @@ func NewConfig() (*Config, error) {
CorsAllowedOriginList: corsAllowedOriginList,
InitialSchedulerCfg: initialschedulerCfg,
ExternalImportEnabled: externalimportenabled,
ImportLabel: configYaml.ImportLabel,
ExternalKubeClientCfg: externalKubeClientCfg,
ExternalSchedulerEnabled: externalSchedEnabled,
ResourceSyncEnabled: resourceSyncEnabled,
Expand Down
2 changes: 2 additions & 0 deletions simulator/config/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ type SimulatorConfiguration struct {
// Note, this is still a beta feature.
ExternalImportEnabled bool `json:"externalImportEnabled,omitempty"`

ImportLabel map[string]string `json:"importLabel,omitempty"`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto


// This variable indicates whether the simulator will
// sync resources from an user cluster's or not.
ResourceSyncEnabled bool `json:"resourceSyncEnabled,omitempty"`
Expand Down
6 changes: 3 additions & 3 deletions simulator/oneshotimporter/importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type Service struct {

type ReplicateService interface {
// Snap will be used to export resources from target cluster.
Snap(ctx context.Context, opts ...snapshot.Option) (*snapshot.ResourcesForSnap, error)
Snap(ctx context.Context, importLabel map[string]string, opts ...snapshot.Option) (*snapshot.ResourcesForSnap, error)
// Load will be used to import resources the from data which was exported.
Load(ctx context.Context, resources *snapshot.ResourcesForLoad, opts ...snapshot.Option) error
IgnoreErr() snapshot.Option
Expand All @@ -40,8 +40,8 @@ func NewService(e ReplicateService, i ReplicateService) *Service {
// Note: this method doesn't handle scheduler configuration.
// If you want to use the scheduler configuration along with the imported resources on the simulator,
// you need to set the path of the scheduler configuration file to `kubeSchedulerConfigPath` value in the Simulator Server Configuration.
func (s *Service) ImportClusterResources(ctx context.Context) error {
expRes, err := s.exportService.Snap(ctx)
func (s *Service) ImportClusterResources(ctx context.Context, importLabel map[string]string) error {
expRes, err := s.exportService.Snap(ctx, importLabel)
if err != nil {
return xerrors.Errorf("call Snap of the exportService: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions simulator/server/di/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type SchedulerService interface {

// SnapshotService represents a service for exporting/importing resources on the simulator.
type SnapshotService interface {
Snap(ctx context.Context, opts ...snapshot.Option) (*snapshot.ResourcesForSnap, error)
Snap(ctx context.Context, importLabel map[string]string, opts ...snapshot.Option) (*snapshot.ResourcesForSnap, error)
Load(ctx context.Context, resources *snapshot.ResourcesForLoad, opts ...snapshot.Option) error
IgnoreErr() snapshot.Option
}
Expand All @@ -35,7 +35,7 @@ type ResetService interface {

// OneShotClusterResourceImporter represents a service to import resources from an target cluster when starting the simulator.
type OneShotClusterResourceImporter interface {
ImportClusterResources(ctx context.Context) error
ImportClusterResources(ctx context.Context, importLabel map[string]string) error
}

// ResourceSyncer represents a service to constantly sync resources from an target cluster.
Expand Down
3 changes: 2 additions & 1 deletion simulator/server/handler/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ func NewSnapshotHandler(s di.SnapshotService) *SnapshotHandler {
func (h *SnapshotHandler) Snap(c echo.Context) error {
ctx := c.Request().Context()

rs, err := h.service.Snap(ctx)
var label map[string]string
rs, err := h.service.Snap(ctx, label)
if err != nil {
klog.Errorf("failed to save all resources: %+v", err)
return echo.NewHTTPError(http.StatusInternalServerError)
Expand Down
111 changes: 87 additions & 24 deletions simulator/snapshot/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,29 +100,29 @@ func (s *Service) IgnoreSchedulerConfiguration() Option {
}

// get gets all resources from each service.
func (s *Service) get(ctx context.Context, opts options) (*ResourcesForSnap, error) {
func (s *Service) get(ctx context.Context, importLabel map[string]string, opts options) (*ResourcesForSnap, error) {
errgrp := util.NewErrGroupWithSemaphore(ctx)
resources := ResourcesForSnap{}

if err := s.listPods(ctx, &resources, errgrp, opts); err != nil {
if err := s.listPods(ctx, &resources, errgrp, importLabel, opts); err != nil {
return nil, xerrors.Errorf("call listPods: %w", err)
}
if err := s.listNodes(ctx, &resources, errgrp, opts); err != nil {
if err := s.listNodes(ctx, &resources, errgrp, importLabel, opts); err != nil {
return nil, xerrors.Errorf("call listNodes: %w", err)
}
if err := s.listPvs(ctx, &resources, errgrp, opts); err != nil {
if err := s.listPvs(ctx, &resources, errgrp, importLabel, opts); err != nil {
return nil, xerrors.Errorf("call listPvs: %w", err)
}
if err := s.listPvcs(ctx, &resources, errgrp, opts); err != nil {
if err := s.listPvcs(ctx, &resources, errgrp, importLabel, opts); err != nil {
return nil, xerrors.Errorf("call listPvcs: %w", err)
}
if err := s.listStorageClasses(ctx, &resources, errgrp, opts); err != nil {
if err := s.listStorageClasses(ctx, &resources, errgrp, importLabel, opts); err != nil {
return nil, xerrors.Errorf("call listStorageClasses: %w", err)
}
if err := s.listPcs(ctx, &resources, errgrp, opts); err != nil {
if err := s.listPcs(ctx, &resources, errgrp, importLabel, opts); err != nil {
return nil, xerrors.Errorf("call listPcs: %w", err)
}
if err := s.listNamespaces(ctx, &resources, errgrp, opts); err != nil {
if err := s.listNamespaces(ctx, &resources, errgrp, importLabel, opts); err != nil {
return nil, xerrors.Errorf("call listNamespaces: %w", err)
}
if err := s.getSchedulerConfig(&resources, errgrp, opts); err != nil {
Expand All @@ -136,12 +136,12 @@ func (s *Service) get(ctx context.Context, opts options) (*ResourcesForSnap, err
}

// Snap exports all resources as one data.
func (s *Service) Snap(ctx context.Context, opts ...Option) (*ResourcesForSnap, error) {
func (s *Service) Snap(ctx context.Context, importLabel map[string]string, opts ...Option) (*ResourcesForSnap, error) {
options := options{}
for _, o := range opts {
o.apply(&options)
}
resources, err := s.get(ctx, options)
resources, err := s.get(ctx, importLabel, options)
if err != nil {
return nil, xerrors.Errorf("failed to get(): %w", err)
}
Expand Down Expand Up @@ -214,9 +214,18 @@ func (s *Service) Load(ctx context.Context, resources *ResourcesForLoad, opts ..
return nil
}

func (s *Service) listPods(ctx context.Context, r *ResourcesForSnap, eg *util.SemaphoredErrGroup, opts options) error {
func (s *Service) listPods(ctx context.Context, r *ResourcesForSnap, eg *util.SemaphoredErrGroup, importLabel map[string]string, opts options) error {
if err := eg.Go(func() error {
pods, err := s.client.CoreV1().Pods(metav1.NamespaceAll).List(ctx, metav1.ListOptions{})
labelSelector := metav1.LabelSelector{
MatchLabels: importLabel,
}
selector, err := metav1.LabelSelectorAsSelector(&labelSelector)
if err != nil {
return xerrors.Errorf("failed to create label selector: %w", err)
}
pods, err := s.client.CoreV1().Pods(metav1.NamespaceAll).List(ctx, metav1.ListOptions{
LabelSelector: selector.String(),
})
if err != nil {
if !opts.ignoreErr {
return xerrors.Errorf("call list Pod: %w", err)
Expand All @@ -232,9 +241,18 @@ func (s *Service) listPods(ctx context.Context, r *ResourcesForSnap, eg *util.Se
return nil
}

func (s *Service) listNodes(ctx context.Context, r *ResourcesForSnap, eg *util.SemaphoredErrGroup, opts options) error {
func (s *Service) listNodes(ctx context.Context, r *ResourcesForSnap, eg *util.SemaphoredErrGroup, importLabel map[string]string, opts options) error {
if err := eg.Go(func() error {
nodes, err := s.client.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
labelSelector := metav1.LabelSelector{
MatchLabels: importLabel,
}
selector, err := metav1.LabelSelectorAsSelector(&labelSelector)
if err != nil {
return xerrors.Errorf("failed to create label selector: %w", err)
}
nodes, err := s.client.CoreV1().Nodes().List(ctx, metav1.ListOptions{
LabelSelector: selector.String(),
})
if err != nil {
if !opts.ignoreErr {
return xerrors.Errorf("call list Node: %w", err)
Expand All @@ -250,9 +268,18 @@ func (s *Service) listNodes(ctx context.Context, r *ResourcesForSnap, eg *util.S
return nil
}

func (s *Service) listPvs(ctx context.Context, r *ResourcesForSnap, eg *util.SemaphoredErrGroup, opts options) error {
func (s *Service) listPvs(ctx context.Context, r *ResourcesForSnap, eg *util.SemaphoredErrGroup, importLabel map[string]string, opts options) error {
if err := eg.Go(func() error {
pvs, err := s.client.CoreV1().PersistentVolumes().List(ctx, metav1.ListOptions{})
labelSelector := metav1.LabelSelector{
MatchLabels: importLabel,
}
selector, err := metav1.LabelSelectorAsSelector(&labelSelector)
if err != nil {
return xerrors.Errorf("failed to create label selector: %w", err)
}
pvs, err := s.client.CoreV1().PersistentVolumes().List(ctx, metav1.ListOptions{
LabelSelector: selector.String(),
})
if err != nil {
if !opts.ignoreErr {
return xerrors.Errorf("call list PersistentVolume: %w", err)
Expand All @@ -268,9 +295,18 @@ func (s *Service) listPvs(ctx context.Context, r *ResourcesForSnap, eg *util.Sem
return nil
}

func (s *Service) listPvcs(ctx context.Context, r *ResourcesForSnap, eg *util.SemaphoredErrGroup, opts options) error {
func (s *Service) listPvcs(ctx context.Context, r *ResourcesForSnap, eg *util.SemaphoredErrGroup, importLabel map[string]string, opts options) error {
if err := eg.Go(func() error {
pvcs, err := s.client.CoreV1().PersistentVolumeClaims(metav1.NamespaceAll).List(ctx, metav1.ListOptions{})
labelSelector := metav1.LabelSelector{
MatchLabels: importLabel,
}
selector, err := metav1.LabelSelectorAsSelector(&labelSelector)
if err != nil {
return xerrors.Errorf("failed to create label selector: %w", err)
}
pvcs, err := s.client.CoreV1().PersistentVolumeClaims(metav1.NamespaceAll).List(ctx, metav1.ListOptions{
LabelSelector: selector.String(),
})
if err != nil {
if !opts.ignoreErr {
return xerrors.Errorf("call list PersistentVolumeClaim: %w", err)
Expand All @@ -286,9 +322,18 @@ func (s *Service) listPvcs(ctx context.Context, r *ResourcesForSnap, eg *util.Se
return nil
}

func (s *Service) listStorageClasses(ctx context.Context, r *ResourcesForSnap, eg *util.SemaphoredErrGroup, opts options) error {
func (s *Service) listStorageClasses(ctx context.Context, r *ResourcesForSnap, eg *util.SemaphoredErrGroup, importLabel map[string]string, opts options) error {
if err := eg.Go(func() error {
scs, err := s.client.StorageV1().StorageClasses().List(ctx, metav1.ListOptions{})
labelSelector := metav1.LabelSelector{
MatchLabels: importLabel,
}
selector, err := metav1.LabelSelectorAsSelector(&labelSelector)
if err != nil {
return xerrors.Errorf("failed to create label selector: %w", err)
}
scs, err := s.client.StorageV1().StorageClasses().List(ctx, metav1.ListOptions{
LabelSelector: selector.String(),
})
if err != nil {
if !opts.ignoreErr {
return xerrors.Errorf("call list StorageClass: %w", err)
Expand All @@ -304,9 +349,18 @@ func (s *Service) listStorageClasses(ctx context.Context, r *ResourcesForSnap, e
return nil
}

func (s *Service) listPcs(ctx context.Context, r *ResourcesForSnap, eg *util.SemaphoredErrGroup, opts options) error {
func (s *Service) listPcs(ctx context.Context, r *ResourcesForSnap, eg *util.SemaphoredErrGroup, importLabel map[string]string, opts options) error {
if err := eg.Go(func() error {
pcs, err := s.client.SchedulingV1().PriorityClasses().List(ctx, metav1.ListOptions{})
labelSelector := metav1.LabelSelector{
MatchLabels: importLabel,
}
selector, err := metav1.LabelSelectorAsSelector(&labelSelector)
if err != nil {
return xerrors.Errorf("failed to create label selector: %w", err)
}
pcs, err := s.client.SchedulingV1().PriorityClasses().List(ctx, metav1.ListOptions{
LabelSelector: selector.String(),
})
if err != nil {
if !opts.ignoreErr {
return xerrors.Errorf("call list PriorityClass: %w", err)
Expand All @@ -328,9 +382,18 @@ func (s *Service) listPcs(ctx context.Context, r *ResourcesForSnap, eg *util.Sem
return nil
}

func (s *Service) listNamespaces(ctx context.Context, r *ResourcesForSnap, eg *util.SemaphoredErrGroup, opts options) error {
func (s *Service) listNamespaces(ctx context.Context, r *ResourcesForSnap, eg *util.SemaphoredErrGroup, importLabel map[string]string, opts options) error {
if err := eg.Go(func() error {
nss, err := s.client.CoreV1().Namespaces().List(ctx, metav1.ListOptions{})
labelSelector := metav1.LabelSelector{
MatchLabels: importLabel,
}
selector, err := metav1.LabelSelectorAsSelector(&labelSelector)
if err != nil {
return xerrors.Errorf("failed to create label selector: %w", err)
}
nss, err := s.client.CoreV1().Namespaces().List(ctx, metav1.ListOptions{
LabelSelector: selector.String(),
})
if err != nil {
if !opts.ignoreErr {
return xerrors.Errorf("call list Namespace: %w", err)
Expand Down
54 changes: 52 additions & 2 deletions simulator/snapshot/snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ func TestService_Snap(t *testing.T) {
name string
prepareEachServiceMockFn func(*mock_snapshot.MockSchedulerService)
prepareFakeClientSetFn func() *fake.Clientset
labels map[string]string
wantReturn func() *ResourcesForSnap
wantErr error
}{
Expand Down Expand Up @@ -520,6 +521,54 @@ func TestService_Snap(t *testing.T) {
return r
},
},
{
name: "Snap all Pods with different namespaces use labelSelector",
prepareEachServiceMockFn: func(ss *mock_snapshot.MockSchedulerService) {
ss.EXPECT().GetSchedulerConfig().Return(&configv1.KubeSchedulerConfiguration{}, nil)
},
prepareFakeClientSetFn: func() *fake.Clientset {
c := fake.NewSimpleClientset()
ctx := context.Background()
// add test data.
invokeResourcesFn(ctx, c, SettingClientFuncMap{
pod: func(ctx context.Context, c *fake.Clientset) {
c.CoreV1().Pods(testNamespace1).Create(ctx, &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod1",
Labels: map[string]string{
"test": "test1",
},
},
Spec: corev1.PodSpec{
NodeName: "node1",
},
}, metav1.CreateOptions{})
c.CoreV1().Pods(testNamespace2).Create(ctx, &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod2",
},
Spec: corev1.PodSpec{
NodeName: "node1",
},
}, metav1.CreateOptions{})
},
}, defaultFuncs)
return c
},
labels: map[string]string{
"test": "test1",
},
wantReturn: func() *ResourcesForSnap {
r := defaultResForSnapFn()
r.Pods = []corev1.Pod{
{
ObjectMeta: metav1.ObjectMeta{Name: "pod1", Namespace: testNamespace1},
Spec: corev1.PodSpec{NodeName: "node1"},
},
}
return r
},
},
}
for _, tt := range tests {
tt := tt
Expand All @@ -531,7 +580,7 @@ func TestService_Snap(t *testing.T) {

s := NewService(fakeClientset, mockSchedulerSvc)
tt.prepareEachServiceMockFn(mockSchedulerSvc)
r, err := s.Snap(context.Background())
r, err := s.Snap(context.Background(), tt.labels)

var diffResponse string
if tt.wantReturn != nil {
Expand Down Expand Up @@ -750,7 +799,8 @@ func TestService_Snap_IgnoreErrOption(t *testing.T) {
mockSchedulerSvc := mock_snapshot.NewMockSchedulerService(ctrl)
s := NewService(fakeClientset, mockSchedulerSvc)
tt.prepareEachServiceMockFn(mockSchedulerSvc)
r, err := s.Snap(context.Background(), s.IgnoreErr())
var labels map[string]string
r, err := s.Snap(context.Background(), labels, s.IgnoreErr())

var diffResponse string
if tt.wantReturn != nil {
Expand Down