Remove the Velero generated informer from PVB and PVR.
Signed-off-by: Xun Jiang <[email protected]>
Xun Jiang committed Nov 1, 2023
1 parent b94b8da commit fefcb66
Showing 9 changed files with 96 additions and 60 deletions.
1 change: 1 addition & 0 deletions changelogs/unreleased/7041-blackpiglet
@@ -0,0 +1 @@
Remove the Velero generated client.
10 changes: 10 additions & 0 deletions pkg/cmd/server/server.go
@@ -256,6 +256,7 @@ type server struct {
dynamicClient dynamic.Interface
csiSnapshotClient *snapshotv1client.Clientset
csiSnapshotLister snapshotv1listers.VolumeSnapshotLister
kubeWatchClient ctrlclient.WithWatch
ctx context.Context
cancelFunc context.CancelFunc
logger logrus.FieldLogger
@@ -305,6 +306,11 @@ func newServer(f client.Factory, config serverConfig, logger *logrus.Logger) (*s
return nil, err
}

kubeWatchClient, err := f.KubebuilderWatchClient()
if err != nil {
return nil, err
}

pluginRegistry := process.NewRegistry(config.pluginDir, logger, logger.Level)
if err := pluginRegistry.DiscoverPlugins(); err != nil {
return nil, err
@@ -383,6 +389,7 @@ func newServer(f client.Factory, config serverConfig, logger *logrus.Logger) (*s
veleroClient: veleroClient,
discoveryClient: veleroClient.Discovery(),
dynamicClient: dynamicClient,
kubeWatchClient: kubeWatchClient,
ctx: ctx,
cancelFunc: cancelFunc,
logger: logger,
@@ -740,6 +747,7 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
s.kubeClient.CoreV1(),
s.kubeClient.CoreV1(),
s.kubeClient.CoreV1(),
s.kubeWatchClient,
s.logger,
),
s.config.podVolumeOperationTimeout,
@@ -822,6 +830,7 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
s.kubeClient.CoreV1(),
s.kubeClient.CoreV1(),
s.kubeClient.CoreV1(),
s.kubeWatchClient,
s.logger,
),
s.config.podVolumeOperationTimeout,
@@ -922,6 +931,7 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
s.kubeClient.CoreV1(),
s.kubeClient.CoreV1(),
s.kubeClient,
s.kubeWatchClient,
s.logger,
),
s.config.podVolumeOperationTimeout,
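
The kubeWatchClient threaded through the server and into the pod volume factories above is a controller-runtime client.WithWatch returned by f.KubebuilderWatchClient(). A minimal sketch of building such a watch-capable client with controller-runtime's NewWithWatch follows; the helper name and scheme wiring are assumptions for illustration, not the Velero client factory implementation.

// Sketch only: one way to construct a watch-capable controller-runtime
// client comparable to what f.KubebuilderWatchClient() is expected to return.
package example

import (
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/client-go/rest"
	ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"

	velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
)

func newWatchClient(cfg *rest.Config) (ctrlclient.WithWatch, error) {
	scheme := runtime.NewScheme()
	// The scheme must know the Velero API types so PodVolumeBackup and
	// PodVolumeRestore lists can be listed and watched.
	if err := velerov1api.AddToScheme(scheme); err != nil {
		return nil, err
	}
	return ctrlclient.NewWithWatch(cfg, ctrlclient.Options{Scheme: scheme})
}
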
9 changes: 7 additions & 2 deletions pkg/podvolume/backupper.go
@@ -103,12 +103,13 @@ func newBackupper(
ctx context.Context,
repoLocker *repository.RepoLocker,
repoEnsurer *repository.Ensurer,
podVolumeBackupInformer cache.SharedIndexInformer,
sharedInformer cache.SharedInformer,
veleroClient clientset.Interface,
pvcClient corev1client.PersistentVolumeClaimsGetter,
pvClient corev1client.PersistentVolumesGetter,
podClient corev1client.PodsGetter,
uploaderType string,
backup *velerov1api.Backup,
log logrus.FieldLogger,
) *backupper {
b := &backupper{
@@ -124,11 +125,15 @@
results: make(map[string]chan *velerov1api.PodVolumeBackup),
}

podVolumeBackupInformer.AddEventHandler(
sharedInformer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
UpdateFunc: func(_, obj interface{}) {
pvb := obj.(*velerov1api.PodVolumeBackup)

if pvb.GetLabels()[velerov1api.BackupUIDLabel] != string(backup.UID) {
return
}

if pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseCompleted || pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseFailed {
b.resultsLock.Lock()
defer b.resultsLock.Unlock()
55 changes: 28 additions & 27 deletions pkg/podvolume/backupper_factory.go
@@ -18,18 +18,18 @@ package podvolume

import (
"context"
"fmt"
"time"

"github.com/pkg/errors"
"github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"

velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
clientset "github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned"
velerov1informers "github.com/vmware-tanzu/velero/pkg/generated/informers/externalversions/velero/v1"
"github.com/vmware-tanzu/velero/pkg/repository"
"github.com/vmware-tanzu/velero/pkg/util/kube"
)

// BackupperFactory can construct pod volumes backuppers.
@@ -45,41 +45,42 @@ func NewBackupperFactory(
pvcClient corev1client.PersistentVolumeClaimsGetter,
pvClient corev1client.PersistentVolumesGetter,
podClient corev1client.PodsGetter,
kubeWatchClient ctrlclient.WithWatch,
log logrus.FieldLogger,
) BackupperFactory {
return &backupperFactory{
repoLocker: repoLocker,
repoEnsurer: repoEnsurer,
veleroClient: veleroClient,
pvcClient: pvcClient,
pvClient: pvClient,
podClient: podClient,
log: log,
repoLocker: repoLocker,
repoEnsurer: repoEnsurer,
veleroClient: veleroClient,
pvcClient: pvcClient,
pvClient: pvClient,
podClient: podClient,
kubeWatchClient: kubeWatchClient,
log: log,
}
}

type backupperFactory struct {
repoLocker *repository.RepoLocker
repoEnsurer *repository.Ensurer
veleroClient clientset.Interface
pvcClient corev1client.PersistentVolumeClaimsGetter
pvClient corev1client.PersistentVolumesGetter
podClient corev1client.PodsGetter
log logrus.FieldLogger
repoLocker *repository.RepoLocker
repoEnsurer *repository.Ensurer
veleroClient clientset.Interface
pvcClient corev1client.PersistentVolumeClaimsGetter
pvClient corev1client.PersistentVolumesGetter
podClient corev1client.PodsGetter
kubeWatchClient ctrlclient.WithWatch
log logrus.FieldLogger
}

func (bf *backupperFactory) NewBackupper(ctx context.Context, backup *velerov1api.Backup, uploaderType string) (Backupper, error) {
informer := velerov1informers.NewFilteredPodVolumeBackupInformer(
bf.veleroClient,
backup.Namespace,
0,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
func(opts *metav1.ListOptions) {
opts.LabelSelector = fmt.Sprintf("%s=%s", velerov1api.BackupUIDLabel, backup.UID)
},
)
lw := kube.InternalLW{
Client: bf.kubeWatchClient,
Namespace: backup.Namespace,
ObjectList: new(velerov1api.PodVolumeBackupList),
}

informer := cache.NewSharedInformer(&lw, &velerov1api.PodVolumeBackup{}, time.Second)

b := newBackupper(ctx, bf.repoLocker, bf.repoEnsurer, informer, bf.veleroClient, bf.pvcClient, bf.pvClient, bf.podClient, uploaderType, bf.log)
b := newBackupper(ctx, bf.repoLocker, bf.repoEnsurer, informer, bf.veleroClient, bf.pvcClient, bf.pvClient, bf.podClient, uploaderType, backup, bf.log)

go informer.Run(ctx.Done())
if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced) {
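
The factory no longer uses the generated NewFilteredPodVolumeBackupInformer; it builds a plain SharedInformer over kube.InternalLW, a ListerWatcher backed by the controller-runtime WithWatch client. Because the list-watch is no longer filtered by a per-backup label selector, newBackupper now drops events whose BackupUIDLabel does not match the current backup (see the handler change above). The sketch below approximates such a ListerWatcher; it is illustrative only, not the pkg/util/kube source.

// Illustrative sketch of a cache.ListerWatcher driven by a controller-runtime
// WithWatch client, approximating what kube.InternalLW provides. Names outside
// the diff are assumptions.
package example

import (
	"context"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/watch"
	ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
)

type internalLW struct {
	Client     ctrlclient.WithWatch
	Namespace  string
	ObjectList ctrlclient.ObjectList
}

// List satisfies cache.ListerWatcher by delegating to the typed client.
func (lw *internalLW) List(options metav1.ListOptions) (runtime.Object, error) {
	err := lw.Client.List(context.Background(), lw.ObjectList,
		&ctrlclient.ListOptions{Namespace: lw.Namespace, Raw: &options})
	return lw.ObjectList, err
}

// Watch satisfies cache.ListerWatcher by opening a watch through the same client.
func (lw *internalLW) Watch(options metav1.ListOptions) (watch.Interface, error) {
	return lw.Client.Watch(context.Background(), lw.ObjectList,
		&ctrlclient.ListOptions{Namespace: lw.Namespace, Raw: &options})
}
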
4 changes: 3 additions & 1 deletion pkg/podvolume/backupper_test.go
@@ -641,12 +641,14 @@ func TestBackupPodVolumes(t *testing.T) {
}
var veleroClient versioned.Interface = fakeVeleroClient

fakeKubeWatchClient := velerotest.NewFakeControllerRuntimeWatchClient(t, test.kubeClientObj...)

ensurer := repository.NewEnsurer(fakeCtlClient, velerotest.NewLogger(), time.Millisecond)

backupObj := builder.ForBackup(velerov1api.DefaultNamespace, "fake-backup").Result()
backupObj.Spec.StorageLocation = test.bsl

factory := NewBackupperFactory(repository.NewRepoLocker(), ensurer, veleroClient, kubeClient.CoreV1(), kubeClient.CoreV1(), kubeClient.CoreV1(), velerotest.NewLogger())
factory := NewBackupperFactory(repository.NewRepoLocker(), ensurer, veleroClient, kubeClient.CoreV1(), kubeClient.CoreV1(), kubeClient.CoreV1(), fakeKubeWatchClient, velerotest.NewLogger())
bp, err := factory.NewBackupper(ctx, backupObj, test.uploaderType)

require.NoError(t, err)
8 changes: 6 additions & 2 deletions pkg/podvolume/restorer.go
@@ -72,11 +72,12 @@ func newRestorer(
ctx context.Context,
repoLocker *repository.RepoLocker,
repoEnsurer *repository.Ensurer,
podVolumeRestoreInformer cache.SharedIndexInformer,
sharedInformer cache.SharedInformer,
veleroClient clientset.Interface,
pvcClient corev1client.PersistentVolumeClaimsGetter,
podClient corev1client.PodsGetter,
kubeClient kubernetes.Interface,
restore *velerov1api.Restore,
log logrus.FieldLogger,
) *restorer {
r := &restorer{
@@ -92,10 +93,13 @@
log: log,
}

podVolumeRestoreInformer.AddEventHandler(
sharedInformer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
UpdateFunc: func(_, obj interface{}) {
pvr := obj.(*velerov1api.PodVolumeRestore)
if pvr.GetLabels()[velerov1api.RestoreUIDLabel] != string(restore.UID) {
return
}

if pvr.Status.Phase == velerov1api.PodVolumeRestorePhaseCompleted || pvr.Status.Phase == velerov1api.PodVolumeRestorePhaseFailed {
r.resultsLock.Lock()
55 changes: 28 additions & 27 deletions pkg/podvolume/restorer_factory.go
@@ -18,19 +18,19 @@ package podvolume

import (
"context"
"fmt"
"time"

"github.com/pkg/errors"
"github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"

velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
clientset "github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned"
velerov1informers "github.com/vmware-tanzu/velero/pkg/generated/informers/externalversions/velero/v1"
"github.com/vmware-tanzu/velero/pkg/repository"
"github.com/vmware-tanzu/velero/pkg/util/kube"
)

// RestorerFactory can construct pod volumes restorers.
@@ -45,40 +45,41 @@ func NewRestorerFactory(repoLocker *repository.RepoLocker,
pvcClient corev1client.PersistentVolumeClaimsGetter,
podClient corev1client.PodsGetter,
kubeClient kubernetes.Interface,
kubeWatchClient ctrlclient.WithWatch,
log logrus.FieldLogger) RestorerFactory {
return &restorerFactory{
repoLocker: repoLocker,
repoEnsurer: repoEnsurer,
veleroClient: veleroClient,
pvcClient: pvcClient,
podClient: podClient,
kubeClient: kubeClient,
log: log,
repoLocker: repoLocker,
repoEnsurer: repoEnsurer,
veleroClient: veleroClient,
pvcClient: pvcClient,
podClient: podClient,
kubeClient: kubeClient,
kubeWatchClient: kubeWatchClient,
log: log,
}
}

type restorerFactory struct {
repoLocker *repository.RepoLocker
repoEnsurer *repository.Ensurer
veleroClient clientset.Interface
pvcClient corev1client.PersistentVolumeClaimsGetter
podClient corev1client.PodsGetter
kubeClient kubernetes.Interface
log logrus.FieldLogger
repoLocker *repository.RepoLocker
repoEnsurer *repository.Ensurer
veleroClient clientset.Interface
pvcClient corev1client.PersistentVolumeClaimsGetter
podClient corev1client.PodsGetter
kubeClient kubernetes.Interface
kubeWatchClient ctrlclient.WithWatch
log logrus.FieldLogger
}

func (rf *restorerFactory) NewRestorer(ctx context.Context, restore *velerov1api.Restore) (Restorer, error) {
informer := velerov1informers.NewFilteredPodVolumeRestoreInformer(
rf.veleroClient,
restore.Namespace,
0,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
func(opts *metav1.ListOptions) {
opts.LabelSelector = fmt.Sprintf("%s=%s", velerov1api.RestoreUIDLabel, restore.UID)
},
)
lw := kube.InternalLW{
Client: rf.kubeWatchClient,
Namespace: restore.Namespace,
ObjectList: new(velerov1api.PodVolumeRestoreList),
}

informer := cache.NewSharedInformer(&lw, &velerov1api.PodVolumeRestore{}, time.Second)

r := newRestorer(ctx, rf.repoLocker, rf.repoEnsurer, informer, rf.veleroClient, rf.pvcClient, rf.podClient, rf.kubeClient, rf.log)
r := newRestorer(ctx, rf.repoLocker, rf.repoEnsurer, informer, rf.veleroClient, rf.pvcClient, rf.podClient, rf.kubeClient, restore, rf.log)

go informer.Run(ctx.Done())
if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced) {
5 changes: 4 additions & 1 deletion pkg/podvolume/restorer_test.go
@@ -418,11 +418,14 @@ func TestRestorePodVolumes(t *testing.T) {
}
var veleroClient versioned.Interface = fakeVeleroClient

fakeKubeWatchClient := velerotest.NewFakeControllerRuntimeWatchClient(t, test.kubeClientObj...)

ensurer := repository.NewEnsurer(fakeCtlClient, velerotest.NewLogger(), time.Millisecond)

restoreObj := builder.ForRestore(velerov1api.DefaultNamespace, "fake-restore").Result()

factory := NewRestorerFactory(repository.NewRepoLocker(), ensurer, veleroClient, kubeClient.CoreV1(), kubeClient.CoreV1(), kubeClient, velerotest.NewLogger())
factory := NewRestorerFactory(repository.NewRepoLocker(), ensurer, veleroClient, kubeClient.CoreV1(),
kubeClient.CoreV1(), kubeClient, fakeKubeWatchClient, velerotest.NewLogger())
rs, err := factory.NewRestorer(ctx, restoreObj)

require.NoError(t, err)
9 changes: 9 additions & 0 deletions pkg/test/fake_controller_runtime_client.go
@@ -21,6 +21,7 @@ import (

snapshotv1api "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1"
"github.com/stretchr/testify/require"
appsv1api "k8s.io/api/apps/v1"
corev1api "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -38,6 +39,8 @@ func NewFakeControllerRuntimeClientBuilder(t *testing.T) *k8sfake.ClientBuilder
require.NoError(t, err)
err = corev1api.AddToScheme(scheme)
require.NoError(t, err)
err = appsv1api.AddToScheme(scheme)
require.NoError(t, err)
err = snapshotv1api.AddToScheme(scheme)
require.NoError(t, err)
return k8sfake.NewClientBuilder().WithScheme(scheme)
@@ -51,7 +54,13 @@ func NewFakeControllerRuntimeClient(t *testing.T, initObjs ...runtime.Object) cl
require.NoError(t, err)
err = corev1api.AddToScheme(scheme)
require.NoError(t, err)
err = appsv1api.AddToScheme(scheme)
require.NoError(t, err)
err = snapshotv1api.AddToScheme(scheme)
require.NoError(t, err)
return k8sfake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects(initObjs...).Build()
}

func NewFakeControllerRuntimeWatchClient(t *testing.T, initObjs ...runtime.Object) client.WithWatch {
return NewFakeControllerRuntimeClientBuilder(t).WithRuntimeObjects(initObjs...).Build()
}
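
The new helper returns a fake client that satisfies client.WithWatch, so unit tests can pass it wherever the server would pass its real kubeWatchClient. A hypothetical usage, with the test name and wiring invented for illustration:

package example

import (
	"testing"

	velerotest "github.com/vmware-tanzu/velero/pkg/test"
)

// Hypothetical test: the name and body are illustrative; only
// NewFakeControllerRuntimeWatchClient comes from the diff above.
func TestWatchClientWiring(t *testing.T) {
	watchClient := velerotest.NewFakeControllerRuntimeWatchClient(t)
	// The fake satisfies client.WithWatch, so it can stand in for the
	// server's kubeWatchClient when constructing the pod volume factories.
	_ = watchClient
}
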
