Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 8044: generic restore - allow to ignore WaitForFirstConsumer #8550

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelogs/unreleased/8550-Lyndon-Li
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix issue #8044, allow users to ignore WaitForFirtConsumer volumes' requirement of waiting for pod schedule for restorePVC of data mover
8 changes: 7 additions & 1 deletion pkg/cmd/cli/nodeagent/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,13 @@
s.logger.WithError(err).Fatal("Unable to create the data upload controller")
}

dataDownloadReconciler := controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.dataPathMgr, podResources, s.nodeName, s.config.dataMoverPrepareTimeout, s.logger, s.metrics)
var restorePVCConfig nodeagent.RestorePVC
if s.dataPathConfigs != nil && s.dataPathConfigs.RestorePVCConfig != nil {
restorePVCConfig = *s.dataPathConfigs.RestorePVCConfig
s.logger.Infof("Using customized restorePVC config %v", restorePVCConfig)
}

Check warning on line 360 in pkg/cmd/cli/nodeagent/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/cmd/cli/nodeagent/server.go#L356-L360

Added lines #L356 - L360 were not covered by tests

dataDownloadReconciler := controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.dataPathMgr, restorePVCConfig, podResources, s.nodeName, s.config.dataMoverPrepareTimeout, s.logger, s.metrics)

Check warning on line 362 in pkg/cmd/cli/nodeagent/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/cmd/cli/nodeagent/server.go#L362

Added line #L362 was not covered by tests
if err = dataDownloadReconciler.SetupWithManager(s.mgr); err != nil {
s.logger.WithError(err).Fatal("Unable to create the data download controller")
}
Expand Down
14 changes: 12 additions & 2 deletions pkg/controller/data_download_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,15 @@ type DataDownloadReconciler struct {
restoreExposer exposer.GenericRestoreExposer
nodeName string
dataPathMgr *datapath.Manager
restorePVCConfig nodeagent.RestorePVC
podResources v1.ResourceRequirements
preparingTimeout time.Duration
metrics *metrics.ServerMetrics
}

func NewDataDownloadReconciler(client client.Client, mgr manager.Manager, kubeClient kubernetes.Interface, dataPathMgr *datapath.Manager,
podResources v1.ResourceRequirements, nodeName string, preparingTimeout time.Duration, logger logrus.FieldLogger, metrics *metrics.ServerMetrics) *DataDownloadReconciler {
restorePVCConfig nodeagent.RestorePVC, podResources v1.ResourceRequirements, nodeName string, preparingTimeout time.Duration,
logger logrus.FieldLogger, metrics *metrics.ServerMetrics) *DataDownloadReconciler {
return &DataDownloadReconciler{
client: client,
kubeClient: kubeClient,
Expand All @@ -79,6 +81,7 @@ func NewDataDownloadReconciler(client client.Client, mgr manager.Manager, kubeCl
Clock: &clock.RealClock{},
nodeName: nodeName,
restoreExposer: exposer.NewGenericRestoreExposer(kubeClient, logger),
restorePVCConfig: restorePVCConfig,
dataPathMgr: dataPathMgr,
podResources: podResources,
preparingTimeout: preparingTimeout,
Expand Down Expand Up @@ -194,7 +197,14 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
// Expose() will trigger to create one pod whose volume is restored by a given volume snapshot,
// but the pod maybe is not in the same node of the current controller, so we need to return it here.
// And then only the controller who is in the same node could do the rest work.
err = r.restoreExposer.Expose(ctx, getDataDownloadOwnerObject(dd), dd.Spec.TargetVolume.PVC, dd.Spec.TargetVolume.Namespace, hostingPodLabels, r.podResources, dd.Spec.OperationTimeout.Duration)
err = r.restoreExposer.Expose(ctx, getDataDownloadOwnerObject(dd), exposer.GenericRestoreExposeParam{
TargetPVCName: dd.Spec.TargetVolume.PVC,
SourceNamespace: dd.Spec.TargetVolume.Namespace,
HostingPodLabels: hostingPodLabels,
Resources: r.podResources,
ExposeTimeout: dd.Spec.OperationTimeout.Duration,
RestorePVCConfig: r.restorePVCConfig,
})
if err != nil {
if err := r.client.Get(ctx, req.NamespacedName, dd); err != nil {
if !apierrors.IsNotFound(err) {
Expand Down
5 changes: 3 additions & 2 deletions pkg/controller/data_download_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (
datapathmockes "github.com/vmware-tanzu/velero/pkg/datapath/mocks"
"github.com/vmware-tanzu/velero/pkg/exposer"
"github.com/vmware-tanzu/velero/pkg/metrics"
"github.com/vmware-tanzu/velero/pkg/nodeagent"
velerotest "github.com/vmware-tanzu/velero/pkg/test"
"github.com/vmware-tanzu/velero/pkg/uploader"

Expand Down Expand Up @@ -140,7 +141,7 @@ func initDataDownloadReconcilerWithError(objects []runtime.Object, needError ...

dataPathMgr := datapath.NewManager(1)

return NewDataDownloadReconciler(fakeClient, nil, fakeKubeClient, dataPathMgr, corev1.ResourceRequirements{}, "test-node", time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil
return NewDataDownloadReconciler(fakeClient, nil, fakeKubeClient, dataPathMgr, nodeagent.RestorePVC{}, corev1.ResourceRequirements{}, "test-node", time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil
}

func TestDataDownloadReconcile(t *testing.T) {
Expand Down Expand Up @@ -959,7 +960,7 @@ func (dt *ddResumeTestHelper) resumeCancellableDataPath(_ *DataUploadReconciler,
return dt.resumeErr
}

func (dt *ddResumeTestHelper) Expose(context.Context, corev1.ObjectReference, string, string, map[string]string, corev1.ResourceRequirements, time.Duration) error {
func (dt *ddResumeTestHelper) Expose(context.Context, corev1.ObjectReference, exposer.GenericRestoreExposeParam) error {
return nil
}

Expand Down
39 changes: 30 additions & 9 deletions pkg/exposer/generic_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,31 @@ import (
"github.com/vmware-tanzu/velero/pkg/util/kube"
)

// GenericRestoreExposeParam define the input param for Generic Restore Expose
type GenericRestoreExposeParam struct {
// TargetPVCName is the target volume name to be restored
TargetPVCName string

// SourceNamespace is the original namespace of the volume that the snapshot is taken for
SourceNamespace string

// HostingPodLabels is the labels that are going to apply to the hosting pod
HostingPodLabels map[string]string

// Resources defines the resource requirements of the hosting pod
Resources corev1.ResourceRequirements

// ExposeTimeout specifies the timeout for the entire expose process
ExposeTimeout time.Duration

// RestorePVCConfig is the config for restorePVC (intermediate PVC) of generic restore
RestorePVCConfig nodeagent.RestorePVC
}

// GenericRestoreExposer is the interfaces for a generic restore exposer
type GenericRestoreExposer interface {
// Expose starts the process to a restore expose, the expose process may take long time
Expose(context.Context, corev1.ObjectReference, string, string, map[string]string, corev1.ResourceRequirements, time.Duration) error
Expose(context.Context, corev1.ObjectReference, GenericRestoreExposeParam) error

// GetExposed polls the status of the expose.
// If the expose is accessible by the current caller, it waits the expose ready and returns the expose result.
Expand Down Expand Up @@ -74,25 +95,25 @@ type genericRestoreExposer struct {
log logrus.FieldLogger
}

func (e *genericRestoreExposer) Expose(ctx context.Context, ownerObject corev1.ObjectReference, targetPVCName string, sourceNamespace string, hostingPodLabels map[string]string, resources corev1.ResourceRequirements, timeout time.Duration) error {
func (e *genericRestoreExposer) Expose(ctx context.Context, ownerObject corev1.ObjectReference, param GenericRestoreExposeParam) error {
curLog := e.log.WithFields(logrus.Fields{
"owner": ownerObject.Name,
"target PVC": targetPVCName,
"source namespace": sourceNamespace,
"target PVC": param.TargetPVCName,
"source namespace": param.SourceNamespace,
})

selectedNode, targetPVC, err := kube.WaitPVCConsumed(ctx, e.kubeClient.CoreV1(), targetPVCName, sourceNamespace, e.kubeClient.StorageV1(), timeout)
selectedNode, targetPVC, err := kube.WaitPVCConsumed(ctx, e.kubeClient.CoreV1(), param.TargetPVCName, param.SourceNamespace, e.kubeClient.StorageV1(), param.ExposeTimeout, param.RestorePVCConfig.IgnoreWaitForFirstConsumer)
if err != nil {
return errors.Wrapf(err, "error to wait target PVC consumed, %s/%s", sourceNamespace, targetPVCName)
return errors.Wrapf(err, "error to wait target PVC consumed, %s/%s", param.SourceNamespace, param.TargetPVCName)
}

curLog.WithField("target PVC", targetPVCName).WithField("selected node", selectedNode).Info("Target PVC is consumed")
curLog.WithField("target PVC", param.TargetPVCName).WithField("selected node", selectedNode).Info("Target PVC is consumed")

if kube.IsPVCBound(targetPVC) {
return errors.Errorf("Target PVC %s/%s has already been bound, abort", sourceNamespace, targetPVCName)
return errors.Errorf("Target PVC %s/%s has already been bound, abort", param.SourceNamespace, param.TargetPVCName)
}

restorePod, err := e.createRestorePod(ctx, ownerObject, targetPVC, timeout, hostingPodLabels, selectedNode, resources)
restorePod, err := e.createRestorePod(ctx, ownerObject, targetPVC, param.ExposeTimeout, param.HostingPodLabels, selectedNode, param.Resources)
if err != nil {
return errors.Wrapf(err, "error to create restore pod")
}
Expand Down
7 changes: 6 additions & 1 deletion pkg/exposer/generic_restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,12 @@ func TestRestoreExpose(t *testing.T) {
}
}

err := exposer.Expose(context.Background(), ownerObject, test.targetPVCName, test.sourceNamespace, map[string]string{}, corev1.ResourceRequirements{}, time.Millisecond)
err := exposer.Expose(context.Background(), ownerObject, GenericRestoreExposeParam{
TargetPVCName: test.targetPVCName,
SourceNamespace: test.sourceNamespace,
HostingPodLabels: map[string]string{},
Resources: corev1.ResourceRequirements{},
ExposeTimeout: time.Millisecond})
assert.EqualError(t, err, test.err)
})
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/exposer/mocks/GenericRestoreExposer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions pkg/nodeagent/node_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ type BackupPVC struct {
SPCNoRelabeling bool `json:"spcNoRelabeling,omitempty"`
}

type RestorePVC struct {
// IgnoreWaitForFirstConsumer indicates to ignore the schedule status of restorePod if the PVC is in WaitForFirstConsumer mode
IgnoreWaitForFirstConsumer bool `json:"ignoreWaitForFirstConsumer,omitempty"`
}

type Configs struct {
// LoadConcurrency is the config for data path load concurrency per node.
LoadConcurrency *LoadConcurrency `json:"loadConcurrency,omitempty"`
Expand All @@ -88,6 +93,9 @@ type Configs struct {
// BackupPVCConfig is the config for backupPVC (intermediate PVC) of snapshot data movement
BackupPVCConfig map[string]BackupPVC `json:"backupPVC,omitempty"`

// RestoreVCConfig is the config for restorePVC (intermediate PVC) of generic restore
RestorePVCConfig *RestorePVC `json:"restorePVC,omitempty"`

// PodResources is the resource config for various types of pods launched by node-agent, i.e., data mover pods.
PodResources *kube.PodResources `json:"podResources,omitempty"`
}
Expand Down
24 changes: 13 additions & 11 deletions pkg/util/kube/pvc_pv.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ func SetPVReclaimPolicy(ctx context.Context, pvGetter corev1client.CoreV1Interfa
// nothing if the consuming doesn't affect the PV provision.
// The latest PVC and the selected node will be returned.
func WaitPVCConsumed(ctx context.Context, pvcGetter corev1client.CoreV1Interface, pvc string, namespace string,
storageClient storagev1.StorageV1Interface, timeout time.Duration) (string, *corev1api.PersistentVolumeClaim, error) {
storageClient storagev1.StorageV1Interface, timeout time.Duration, ignoreConsume bool) (string, *corev1api.PersistentVolumeClaim, error) {
selectedNode := ""
var updated *corev1api.PersistentVolumeClaim
var storageClass *storagev1api.StorageClass
Expand All @@ -281,18 +281,20 @@ func WaitPVCConsumed(ctx context.Context, pvcGetter corev1client.CoreV1Interface
return false, errors.Wrapf(err, "error to get pvc %s/%s", namespace, pvc)
}

if tmpPVC.Spec.StorageClassName != nil && storageClass == nil {
storageClass, err = storageClient.StorageClasses().Get(ctx, *tmpPVC.Spec.StorageClassName, metav1.GetOptions{})
if err != nil {
return false, errors.Wrapf(err, "error to get storage class %s", *tmpPVC.Spec.StorageClassName)
if !ignoreConsume {
if tmpPVC.Spec.StorageClassName != nil && storageClass == nil {
storageClass, err = storageClient.StorageClasses().Get(ctx, *tmpPVC.Spec.StorageClassName, metav1.GetOptions{})
if err != nil {
return false, errors.Wrapf(err, "error to get storage class %s", *tmpPVC.Spec.StorageClassName)
}
}
}

if storageClass != nil {
if storageClass.VolumeBindingMode != nil && *storageClass.VolumeBindingMode == storagev1api.VolumeBindingWaitForFirstConsumer {
selectedNode = tmpPVC.Annotations[KubeAnnSelectedNode]
if selectedNode == "" {
return false, nil
if storageClass != nil {
if storageClass.VolumeBindingMode != nil && *storageClass.VolumeBindingMode == storagev1api.VolumeBindingWaitForFirstConsumer {
selectedNode = tmpPVC.Annotations[KubeAnnSelectedNode]
if selectedNode == "" {
return false, nil
}
}
}
}
Expand Down
29 changes: 20 additions & 9 deletions pkg/util/kube/pvc_pv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,14 +188,15 @@ func TestWaitPVCConsumed(t *testing.T) {
}

tests := []struct {
name string
pvcName string
pvcNamespace string
kubeClientObj []runtime.Object
kubeReactors []reactor
expectedPVC *corev1api.PersistentVolumeClaim
selectedNode string
err string
name string
pvcName string
pvcNamespace string
kubeClientObj []runtime.Object
kubeReactors []reactor
expectedPVC *corev1api.PersistentVolumeClaim
selectedNode string
ignoreWaitForFirstConsumer bool
err string
}{
{
name: "get pvc error",
Expand All @@ -212,6 +213,16 @@ func TestWaitPVCConsumed(t *testing.T) {
},
expectedPVC: pvcObject,
},
{
name: "success when ignore wait for first consumer",
pvcName: "fake-pvc-2",
pvcNamespace: "fake-namespace",
ignoreWaitForFirstConsumer: true,
kubeClientObj: []runtime.Object{
pvcObjectWithSC,
},
expectedPVC: pvcObjectWithSC,
},
{
name: "get sc fail",
pvcName: "fake-pvc-2",
Expand Down Expand Up @@ -274,7 +285,7 @@ func TestWaitPVCConsumed(t *testing.T) {

var kubeClient kubernetes.Interface = fakeKubeClient

selectedNode, pvc, err := WaitPVCConsumed(context.Background(), kubeClient.CoreV1(), test.pvcName, test.pvcNamespace, kubeClient.StorageV1(), time.Millisecond)
selectedNode, pvc, err := WaitPVCConsumed(context.Background(), kubeClient.CoreV1(), test.pvcName, test.pvcNamespace, kubeClient.StorageV1(), time.Millisecond, test.ignoreWaitForFirstConsumer)

if err != nil {
assert.EqualError(t, err, test.err)
Expand Down
Loading