From 4dea3a48e8a4cc56debe634d62b758ae5b2c92ce Mon Sep 17 00:00:00 2001 From: Lyndon-Li Date: Fri, 9 Aug 2024 14:34:48 +0800 Subject: [PATCH 1/3] data mover ms smoking test Signed-off-by: Lyndon-Li --- pkg/cmd/cli/datamover/backup.go | 1 + pkg/cmd/cli/datamover/restore.go | 1 + pkg/controller/data_download_controller.go | 72 ++++---- .../data_download_controller_test.go | 49 +++++- pkg/controller/data_upload_controller.go | 75 +++++---- pkg/controller/data_upload_controller_test.go | 156 ++++++++++-------- pkg/datamover/backup_micro_service.go | 14 +- pkg/datamover/backup_micro_service_test.go | 6 +- pkg/datamover/restore_micro_service.go | 10 +- pkg/datamover/restore_micro_service_test.go | 6 +- pkg/datapath/file_system.go | 46 +++++- pkg/datapath/file_system_test.go | 2 + pkg/datapath/micro_service_watcher.go | 94 ++++++----- pkg/datapath/micro_service_watcher_test.go | 2 +- 14 files changed, 324 insertions(+), 210 deletions(-) diff --git a/pkg/cmd/cli/datamover/backup.go b/pkg/cmd/cli/datamover/backup.go index 35f483d921..ca600faab5 100644 --- a/pkg/cmd/cli/datamover/backup.go +++ b/pkg/cmd/cli/datamover/backup.go @@ -233,6 +233,7 @@ func (s *dataMoverBackup) runDataPath() { result, err := dpService.RunCancelableDataPath(s.ctx) if err != nil { + dpService.Shutdown() s.cancelFunc() funcExitWithMessage(s.logger, false, "Failed to run data path service for DataUpload %s: %v", s.config.duName, err) return diff --git a/pkg/cmd/cli/datamover/restore.go b/pkg/cmd/cli/datamover/restore.go index fb46abd409..fc74a64f15 100644 --- a/pkg/cmd/cli/datamover/restore.go +++ b/pkg/cmd/cli/datamover/restore.go @@ -223,6 +223,7 @@ func (s *dataMoverRestore) runDataPath() { result, err := dpService.RunCancelableDataPath(s.ctx) if err != nil { s.cancelFunc() + dpService.Shutdown() funcExitWithMessage(s.logger, false, "Failed to run data path service for DataDownload %s: %v", s.config.ddName, err) return } diff --git a/pkg/controller/data_download_controller.go b/pkg/controller/data_download_controller.go index f0c0c1728d..12365e03cc 100644 --- a/pkg/controller/data_download_controller.go +++ b/pkg/controller/data_download_controller.go @@ -217,9 +217,9 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request } else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseAccepted { if dd.Spec.Cancel { log.Debugf("Data download is been canceled %s in Phase %s", dd.GetName(), dd.Status.Phase) - r.TryCancelDataDownload(ctx, dd, "") + r.tryCancelAcceptedDataDownload(ctx, dd, "") } else if peekErr := r.restoreExposer.PeekExposed(ctx, getDataDownloadOwnerObject(dd)); peekErr != nil { - r.TryCancelDataDownload(ctx, dd, fmt.Sprintf("found a dataupload %s/%s with expose error: %s. mark it as cancel", dd.Namespace, dd.Name, peekErr)) + r.tryCancelAcceptedDataDownload(ctx, dd, fmt.Sprintf("found a dataupload %s/%s with expose error: %s. 
mark it as cancel", dd.Namespace, dd.Name, peekErr)) log.Errorf("Cancel dd %s/%s because of expose error %s", dd.Namespace, dd.Name, peekErr) } else if dd.Status.StartTimestamp != nil { if time.Since(dd.Status.StartTimestamp.Time) >= r.preparingTimeout { @@ -272,23 +272,35 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request return r.errorOut(ctx, dd, err, "error to create data path", log) } } + + if err := r.initCancelableDataPath(ctx, asyncBR, result, log); err != nil { + log.WithError(err).Warnf("Failed to init cancelable data path for %s, will close and retry", dd.Name) + + r.closeDataPath(ctx, dd.Name) + return r.errorOut(ctx, dd, err, "error initializing data path", log) + } + // Update status to InProgress original := dd.DeepCopy() dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseInProgress dd.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()} if err := r.client.Patch(ctx, dd, client.MergeFrom(original)); err != nil { - log.WithError(err).Error("Unable to update status to in progress") - return ctrl.Result{}, err + log.WithError(err).Warnf("Failed to update datadownload %s to InProgress, will close data path and retry", dd.Name) + + r.closeDataPath(ctx, dd.Name) + return ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, nil } log.Info("Data download is marked as in progress") - reconcileResult, err := r.runCancelableDataPath(ctx, asyncBR, dd, result, log) - if err != nil { - log.Errorf("Failed to run cancelable data path for %s with err %v", dd.Name, err) + if err := r.startCancelableDataPath(asyncBR, dd, result, log); err != nil { + log.WithError(err).Errorf("Failed to start cancelable data path for %s", dd.Name) + r.closeDataPath(ctx, dd.Name) + return r.errorOut(ctx, dd, err, "error starting data path", log) } - return reconcileResult, err + + return ctrl.Result{}, nil } else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseInProgress { log.Info("Data download is in progress") if dd.Spec.Cancel { @@ -331,27 +343,33 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request } } -func (r *DataDownloadReconciler) runCancelableDataPath(ctx context.Context, asyncBR datapath.AsyncBR, dd *velerov2alpha1api.DataDownload, res *exposer.ExposeResult, log logrus.FieldLogger) (reconcile.Result, error) { +func (r *DataDownloadReconciler) initCancelableDataPath(ctx context.Context, asyncBR datapath.AsyncBR, res *exposer.ExposeResult, log logrus.FieldLogger) error { + log.Info("Init cancelable dataDownload") + if err := asyncBR.Init(ctx, nil); err != nil { - return r.errorOut(ctx, dd, err, "error to initialize asyncBR", log) + return errors.Wrap(err, "error initializing asyncBR") } log.Infof("async restore init for pod %s, volume %s", res.ByPod.HostingPod.Name, res.ByPod.VolumeName) + return nil +} + +func (r *DataDownloadReconciler) startCancelableDataPath(asyncBR datapath.AsyncBR, dd *velerov2alpha1api.DataDownload, res *exposer.ExposeResult, log logrus.FieldLogger) error { + log.Info("Start cancelable dataDownload") + if err := asyncBR.StartRestore(dd.Spec.SnapshotID, datapath.AccessPoint{ ByPath: res.ByPod.VolumeName, }, dd.Spec.DataMoverConfig); err != nil { - return r.errorOut(ctx, dd, err, fmt.Sprintf("error starting async restore for pod %s, volume %s", res.ByPod.HostingPod, res.ByPod.VolumeName), log) + return errors.Wrapf(err, "error starting async restore for pod %s, volume %s", res.ByPod.HostingPod.Name, res.ByPod.VolumeName) } - log.Infof("Async restore started for pod %s, volume %s", 
res.ByPod.HostingPod, res.ByPod.VolumeName) - return ctrl.Result{}, nil + log.Info("Async restore started for pod %s, volume %s", res.ByPod.HostingPod.Name, res.ByPod.VolumeName) + return nil } func (r *DataDownloadReconciler) OnDataDownloadCompleted(ctx context.Context, namespace string, ddName string, result datapath.Result) { - defer func() { - go r.closeDataPath(ctx, ddName) - }() + defer r.dataPathMgr.RemoveAsyncBR(ddName) log := r.logger.WithField("datadownload", ddName) log.Info("Async fs restore data path completed") @@ -384,9 +402,7 @@ func (r *DataDownloadReconciler) OnDataDownloadCompleted(ctx context.Context, na } func (r *DataDownloadReconciler) OnDataDownloadFailed(ctx context.Context, namespace string, ddName string, err error) { - defer func() { - go r.closeDataPath(ctx, ddName) - }() + defer r.dataPathMgr.RemoveAsyncBR(ddName) log := r.logger.WithField("datadownload", ddName) @@ -396,16 +412,12 @@ func (r *DataDownloadReconciler) OnDataDownloadFailed(ctx context.Context, names if getErr := r.client.Get(ctx, types.NamespacedName{Name: ddName, Namespace: namespace}, &dd); getErr != nil { log.WithError(getErr).Warn("Failed to get data download on failure") } else { - if _, errOut := r.errorOut(ctx, &dd, err, "data path restore failed", log); err != nil { - log.WithError(err).Warnf("Failed to patch data download with err %v", errOut) - } + r.errorOut(ctx, &dd, err, "data path restore failed", log) } } func (r *DataDownloadReconciler) OnDataDownloadCancelled(ctx context.Context, namespace string, ddName string) { - defer func() { - go r.closeDataPath(ctx, ddName) - }() + defer r.dataPathMgr.RemoveAsyncBR(ddName) log := r.logger.WithField("datadownload", ddName) @@ -432,9 +444,9 @@ func (r *DataDownloadReconciler) OnDataDownloadCancelled(ctx context.Context, na } } -func (r *DataDownloadReconciler) TryCancelDataDownload(ctx context.Context, dd *velerov2alpha1api.DataDownload, message string) { +func (r *DataDownloadReconciler) tryCancelAcceptedDataDownload(ctx context.Context, dd *velerov2alpha1api.DataDownload, message string) { log := r.logger.WithField("datadownload", dd.Name) - log.Warn("Async fs backup data path canceled") + log.Warn("Accepted data download is canceled") succeeded, err := r.exclusiveUpdateDataDownload(ctx, dd, func(dataDownload *velerov2alpha1api.DataDownload) { dataDownload.Status.Phase = velerov2alpha1api.DataDownloadPhaseCanceled @@ -442,7 +454,10 @@ func (r *DataDownloadReconciler) TryCancelDataDownload(ctx context.Context, dd * dataDownload.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()} } dataDownload.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()} - dataDownload.Status.Message = message + + if message != "" { + dataDownload.Status.Message = message + } }) if err != nil { @@ -456,7 +471,6 @@ func (r *DataDownloadReconciler) TryCancelDataDownload(ctx context.Context, dd * // success update r.metrics.RegisterDataDownloadCancel(r.nodeName) r.restoreExposer.CleanUp(ctx, getDataDownloadOwnerObject(dd)) - r.closeDataPath(ctx, dd.Name) } func (r *DataDownloadReconciler) OnDataDownloadProgress(ctx context.Context, namespace string, ddName string, progress *uploader.Progress) { diff --git a/pkg/controller/data_download_controller_test.go b/pkg/controller/data_download_controller_test.go index c6c2697f7a..385870426f 100644 --- a/pkg/controller/data_download_controller_test.go +++ b/pkg/controller/data_download_controller_test.go @@ -192,6 +192,10 @@ func TestDataDownloadReconcile(t *testing.T) { isFSBRRestoreErr bool notNilExpose bool 
notMockCleanUp bool + mockInit bool + mockInitErr error + mockStart bool + mockStartErr error mockCancel bool mockClose bool expected *velerov2alpha1api.DataDownload @@ -264,13 +268,36 @@ func TestDataDownloadReconcile(t *testing.T) { expectedResult: &ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, }, { - name: "Unable to update status to in progress for data download", + name: "data path init error", dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(), targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(), - needErrs: []bool{false, false, false, true}, + mockInit: true, + mockInitErr: errors.New("fake-data-path-init-error"), + mockClose: true, notNilExpose: true, - notMockCleanUp: true, - expectedStatusMsg: "Patch error", + expectedStatusMsg: "error initializing asyncBR: fake-data-path-init-error", + }, + { + name: "Unable to update status to in progress for data download", + dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(), + targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(), + needErrs: []bool{false, false, false, true}, + mockInit: true, + mockClose: true, + notNilExpose: true, + notMockCleanUp: true, + expectedResult: &ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, + }, + { + name: "data path start error", + dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(), + targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(), + mockInit: true, + mockStart: true, + mockStartErr: errors.New("fake-data-path-start-error"), + mockClose: true, + notNilExpose: true, + expectedStatusMsg: "error starting async restore for pod test-name, volume test-pvc: fake-data-path-start-error", }, { name: "accept DataDownload error", @@ -399,6 +426,14 @@ func TestDataDownloadReconcile(t *testing.T) { datapath.MicroServiceBRWatcherCreator = func(kbclient.Client, kubernetes.Interface, manager.Manager, string, string, string, string, string, string, datapath.Callbacks, logrus.FieldLogger) datapath.AsyncBR { asyncBR := datapathmockes.NewAsyncBR(t) + if test.mockInit { + asyncBR.On("Init", mock.Anything, mock.Anything).Return(test.mockInitErr) + } + + if test.mockStart { + asyncBR.On("StartRestore", mock.Anything, mock.Anything, mock.Anything).Return(test.mockStartErr) + } + if test.mockCancel { asyncBR.On("Cancel").Return() } @@ -488,6 +523,10 @@ func TestDataDownloadReconcile(t *testing.T) { assert.True(t, true, apierrors.IsNotFound(err)) } + if !test.needCreateFSBR { + assert.Nil(t, r.dataPathMgr.GetAsyncBR(test.dd.Name)) + } + t.Logf("%s: \n %v \n", test.name, dd) }) } @@ -845,7 +884,7 @@ func TestTryCancelDataDownload(t *testing.T) { err = r.client.Create(ctx, test.dd) require.NoError(t, err) - r.TryCancelDataDownload(ctx, test.dd, "") + r.tryCancelAcceptedDataDownload(ctx, test.dd, "") if test.expectedErr == "" { assert.NoError(t, err) diff --git a/pkg/controller/data_upload_controller.go b/pkg/controller/data_upload_controller.go index 91413a8ccf..b10f1f6367 100644 --- a/pkg/controller/data_upload_controller.go +++ b/pkg/controller/data_upload_controller.go @@ -230,9 +230,9 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request) // we don't want to update CR into cancel status forcely as it may conflict with CR update in Expose action // we could retry when the CR requeue in periodcally log.Debugf("Data upload is been canceled %s in Phase %s", du.GetName(), du.Status.Phase) - 
r.TryCancelDataUpload(ctx, du, "") + r.tryCancelAcceptedDataUpload(ctx, du, "") } else if peekErr := ep.PeekExposed(ctx, getOwnerObject(du)); peekErr != nil { - r.TryCancelDataUpload(ctx, du, fmt.Sprintf("found a dataupload %s/%s with expose error: %s. mark it as cancel", du.Namespace, du.Name, peekErr)) + r.tryCancelAcceptedDataUpload(ctx, du, fmt.Sprintf("found a dataupload %s/%s with expose error: %s. mark it as cancel", du.Namespace, du.Name, peekErr)) log.Errorf("Cancel du %s/%s because of expose error %s", du.Namespace, du.Name, peekErr) } else if du.Status.StartTimestamp != nil { if time.Since(du.Status.StartTimestamp.Time) >= r.preparingTimeout { @@ -283,21 +283,35 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request) return r.errorOut(ctx, du, err, "error to create data path", log) } } + + if err := r.initCancelableDataPath(ctx, asyncBR, res, log); err != nil { + log.WithError(err).Warnf("Failed to init cancelable data path for %s, will close and retry", du.Name) + + r.closeDataPath(ctx, du.Name) + return r.errorOut(ctx, du, err, "error initializing data path", log) + } + // Update status to InProgress original := du.DeepCopy() du.Status.Phase = velerov2alpha1api.DataUploadPhaseInProgress du.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()} if err := r.client.Patch(ctx, du, client.MergeFrom(original)); err != nil { - return r.errorOut(ctx, du, err, "error updating dataupload status", log) + log.WithError(err).Warnf("Failed to update dataupload %s to InProgress, will data path close and retry", du.Name) + + r.closeDataPath(ctx, du.Name) + return ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, nil } log.Info("Data upload is marked as in progress") - result, err := r.runCancelableDataUpload(ctx, asyncBR, du, res, log) - if err != nil { - log.Errorf("Failed to run cancelable data path for %s with err %v", du.Name, err) + + if err := r.startCancelableDataPath(asyncBR, du, res, log); err != nil { + log.WithError(err).Errorf("Failed to start cancelable data path for %s", du.Name) r.closeDataPath(ctx, du.Name) + + return r.errorOut(ctx, du, err, "error starting data path", log) } - return result, err + + return ctrl.Result{}, nil } else if du.Status.Phase == velerov2alpha1api.DataUploadPhaseInProgress { log.Info("Data upload is in progress") if du.Spec.Cancel { @@ -340,29 +354,33 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request) } } -func (r *DataUploadReconciler) runCancelableDataUpload(ctx context.Context, asyncBR datapath.AsyncBR, du *velerov2alpha1api.DataUpload, res *exposer.ExposeResult, log logrus.FieldLogger) (reconcile.Result, error) { - log.Info("Run cancelable dataUpload") +func (r *DataUploadReconciler) initCancelableDataPath(ctx context.Context, asyncBR datapath.AsyncBR, res *exposer.ExposeResult, log logrus.FieldLogger) error { + log.Info("Init cancelable dataUpload") if err := asyncBR.Init(ctx, nil); err != nil { - return r.errorOut(ctx, du, err, "error to initialize asyncBR", log) + return errors.Wrap(err, "error initializing asyncBR") } log.Infof("async backup init for pod %s, volume %s", res.ByPod.HostingPod.Name, res.ByPod.VolumeName) + return nil +} + +func (r *DataUploadReconciler) startCancelableDataPath(asyncBR datapath.AsyncBR, du *velerov2alpha1api.DataUpload, res *exposer.ExposeResult, log logrus.FieldLogger) error { + log.Info("Start cancelable dataUpload") + if err := asyncBR.StartBackup(datapath.AccessPoint{ ByPath: res.ByPod.VolumeName, }, du.Spec.DataMoverConfig, nil); err != 
nil { - return r.errorOut(ctx, du, err, fmt.Sprintf("error starting async backup for pod %s, volume %s", res.ByPod.HostingPod, res.ByPod.VolumeName), log) + return errors.Wrapf(err, "error starting async backup for pod %s, volume %s", res.ByPod.HostingPod.Name, res.ByPod.VolumeName) } - log.Infof("Async backup started for pod %s, volume %s", res.ByPod.HostingPod, res.ByPod.VolumeName) - return ctrl.Result{}, nil + log.Info("Async backup started for pod %s, volume %s", res.ByPod.HostingPod.Name, res.ByPod.VolumeName) + return nil } func (r *DataUploadReconciler) OnDataUploadCompleted(ctx context.Context, namespace string, duName string, result datapath.Result) { - defer func() { - go r.closeDataPath(ctx, duName) - }() + defer r.dataPathMgr.RemoveAsyncBR(duName) log := r.logger.WithField("dataupload", duName) @@ -406,9 +424,7 @@ func (r *DataUploadReconciler) OnDataUploadCompleted(ctx context.Context, namesp } func (r *DataUploadReconciler) OnDataUploadFailed(ctx context.Context, namespace, duName string, err error) { - defer func() { - go r.closeDataPath(ctx, duName) - }() + defer r.dataPathMgr.RemoveAsyncBR(duName) log := r.logger.WithField("dataupload", duName) @@ -418,16 +434,12 @@ func (r *DataUploadReconciler) OnDataUploadFailed(ctx context.Context, namespace if getErr := r.client.Get(ctx, types.NamespacedName{Name: duName, Namespace: namespace}, &du); getErr != nil { log.WithError(getErr).Warn("Failed to get dataupload on failure") } else { - if _, errOut := r.errorOut(ctx, &du, err, "data path backup failed", log); err != nil { - log.WithError(err).Warnf("Failed to patch dataupload with err %v", errOut) - } + r.errorOut(ctx, &du, err, "data path backup failed", log) } } func (r *DataUploadReconciler) OnDataUploadCancelled(ctx context.Context, namespace string, duName string) { - defer func() { - go r.closeDataPath(ctx, duName) - }() + defer r.dataPathMgr.RemoveAsyncBR(duName) log := r.logger.WithField("dataupload", duName) @@ -453,17 +465,19 @@ func (r *DataUploadReconciler) OnDataUploadCancelled(ctx context.Context, namesp } } -// TryCancelDataUpload clear up resources only when update success -func (r *DataUploadReconciler) TryCancelDataUpload(ctx context.Context, du *velerov2alpha1api.DataUpload, message string) { +func (r *DataUploadReconciler) tryCancelAcceptedDataUpload(ctx context.Context, du *velerov2alpha1api.DataUpload, message string) { log := r.logger.WithField("dataupload", du.Name) - log.Warn("Async fs backup data path canceled") + log.Warn("Accepted data upload is canceled") succeeded, err := r.exclusiveUpdateDataUpload(ctx, du, func(dataUpload *velerov2alpha1api.DataUpload) { dataUpload.Status.Phase = velerov2alpha1api.DataUploadPhaseCanceled if dataUpload.Status.StartTimestamp.IsZero() { dataUpload.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()} } dataUpload.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()} - dataUpload.Status.Message = message + + if message != "" { + dataUpload.Status.Message = message + } }) if err != nil { @@ -478,7 +492,6 @@ func (r *DataUploadReconciler) TryCancelDataUpload(ctx context.Context, du *vele r.metrics.RegisterDataUploadCancel(r.nodeName) // cleans up any objects generated during the snapshot expose r.cleanUp(ctx, du, log) - r.closeDataPath(ctx, du.Name) } func (r *DataUploadReconciler) cleanUp(ctx context.Context, du *velerov2alpha1api.DataUpload, log *logrus.Entry) { @@ -692,7 +705,7 @@ func (r *DataUploadReconciler) errorOut(ctx context.Context, du *velerov2alpha1a } se.CleanUp(ctx, getOwnerObject(du), 
volumeSnapshotName, du.Spec.SourceNamespace) } else { - err = errors.Wrapf(err, "failed to clean up exposed snapshot with could not find %s snapshot exposer", du.Spec.SnapshotType) + log.Warnf("failed to clean up exposed snapshot could not find %s snapshot exposer", du.Spec.SnapshotType) } return ctrl.Result{}, r.updateStatusToFailed(ctx, du, err, msg, log) diff --git a/pkg/controller/data_upload_controller_test.go b/pkg/controller/data_upload_controller_test.go index ee73372b1b..ea7603b904 100644 --- a/pkg/controller/data_upload_controller_test.go +++ b/pkg/controller/data_upload_controller_test.go @@ -306,20 +306,16 @@ type fakeDataUploadFSBR struct { du *velerov2alpha1api.DataUpload kubeClient kbclient.Client clock clock.WithTickerAndDelayedExecution + initErr error + startErr error } func (f *fakeDataUploadFSBR) Init(ctx context.Context, param interface{}) error { - return nil + return f.initErr } func (f *fakeDataUploadFSBR) StartBackup(source datapath.AccessPoint, uploaderConfigs map[string]string, param interface{}) error { - du := f.du - original := f.du.DeepCopy() - du.Status.Phase = velerov2alpha1api.DataUploadPhaseCompleted - du.Status.CompletionTimestamp = &metav1.Time{Time: f.clock.Now()} - f.kubeClient.Patch(context.Background(), du, kbclient.MergeFrom(original)) - - return nil + return f.startErr } func (f *fakeDataUploadFSBR) StartRestore(snapshotID string, target datapath.AccessPoint, uploaderConfigs map[string]string) error { @@ -348,27 +344,24 @@ func TestReconcile(t *testing.T) { needErrs []bool peekErr error notCreateFSBR bool + fsBRInitErr error + fsBRStartErr error }{ { - name: "Dataupload is not initialized", - du: builder.ForDataUpload("unknown-ns", "unknown-name").Result(), - expectedProcessed: false, - expected: nil, - expectedRequeue: ctrl.Result{}, + name: "Dataupload is not initialized", + du: builder.ForDataUpload("unknown-ns", "unknown-name").Result(), + expectedRequeue: ctrl.Result{}, }, { - name: "Error get Dataupload", - du: builder.ForDataUpload(velerov1api.DefaultNamespace, "unknown-name").Result(), - expectedProcessed: false, - expected: nil, - expectedRequeue: ctrl.Result{}, - expectedErrMsg: "getting DataUpload: Get error", - needErrs: []bool{true, false, false, false}, + name: "Error get Dataupload", + du: builder.ForDataUpload(velerov1api.DefaultNamespace, "unknown-name").Result(), + expectedRequeue: ctrl.Result{}, + expectedErrMsg: "getting DataUpload: Get error", + needErrs: []bool{true, false, false, false}, }, { - name: "Unsupported data mover type", - du: dataUploadBuilder().DataMover("unknown type").Result(), - expectedProcessed: false, - expected: dataUploadBuilder().Phase("").Result(), - expectedRequeue: ctrl.Result{}, + name: "Unsupported data mover type", + du: dataUploadBuilder().DataMover("unknown type").Result(), + expected: dataUploadBuilder().Phase("").Result(), + expectedRequeue: ctrl.Result{}, }, { name: "Unknown type of snapshot exposer is not initialized", du: dataUploadBuilder().SnapshotType("unknown type").Result(), @@ -377,13 +370,12 @@ func TestReconcile(t *testing.T) { expectedRequeue: ctrl.Result{}, expectedErrMsg: "unknown type type of snapshot exposer is not exist", }, { - name: "Dataupload should be accepted", - du: dataUploadBuilder().Result(), - pod: builder.ForPod("fake-ns", dataUploadName).Volumes(&corev1.Volume{Name: "test-pvc"}).Result(), - pvc: builder.ForPersistentVolumeClaim("fake-ns", "test-pvc").Result(), - expectedProcessed: false, - expected: 
dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).Result(), - expectedRequeue: ctrl.Result{}, + name: "Dataupload should be accepted", + du: dataUploadBuilder().Result(), + pod: builder.ForPod("fake-ns", dataUploadName).Volumes(&corev1.Volume{Name: "test-pvc"}).Result(), + pvc: builder.ForPersistentVolumeClaim("fake-ns", "test-pvc").Result(), + expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).Result(), + expectedRequeue: ctrl.Result{}, }, { name: "Dataupload should fail to get PVC information", @@ -395,34 +387,31 @@ func TestReconcile(t *testing.T) { expectedErrMsg: "failed to get PVC", }, { - name: "Dataupload should be prepared", - du: dataUploadBuilder().SnapshotType(fakeSnapshotType).Result(), - expectedProcessed: false, - expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).Result(), - expectedRequeue: ctrl.Result{}, - }, { - name: "Dataupload prepared should be completed", - pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(), - du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).SnapshotType(fakeSnapshotType).Result(), - expectedProcessed: true, - expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseCompleted).Result(), - expectedRequeue: ctrl.Result{}, + name: "Dataupload should be prepared", + du: dataUploadBuilder().SnapshotType(fakeSnapshotType).Result(), + expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).Result(), + expectedRequeue: ctrl.Result{}, }, { - name: "Dataupload with not enabled cancel", - pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(), - du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).SnapshotType(fakeSnapshotType).Cancel(false).Result(), - expectedProcessed: false, - expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).Result(), - expectedRequeue: ctrl.Result{}, + name: "Dataupload prepared should be completed", + pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(), + du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).SnapshotType(fakeSnapshotType).Result(), + expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).Result(), + expectedRequeue: ctrl.Result{}, }, { - name: "Dataupload should be cancel", - pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(), - du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).SnapshotType(fakeSnapshotType).Cancel(true).Result(), - expectedProcessed: false, - expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseCanceling).Result(), - expectedRequeue: ctrl.Result{}, + name: "Dataupload with not enabled cancel", + pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(), + du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).SnapshotType(fakeSnapshotType).Cancel(false).Result(), + expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).Result(), + expectedRequeue: ctrl.Result{}, + }, + { + name: "Dataupload should be cancel", + pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(), + du: 
dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).SnapshotType(fakeSnapshotType).Cancel(true).Result(), + expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseCanceling).Result(), + expectedRequeue: ctrl.Result{}, }, { name: "Dataupload should be cancel with match node", @@ -445,19 +434,43 @@ func TestReconcile(t *testing.T) { du.Status.Node = "different_node" return du }(), - expectedProcessed: false, - expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).Result(), - expectedRequeue: ctrl.Result{}, - notCreateFSBR: true, + expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).Result(), + expectedRequeue: ctrl.Result{}, + notCreateFSBR: true, + }, + { + name: "runCancelableDataUpload is concurrent limited", + dataMgr: datapath.NewManager(0), + pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(), + du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).SnapshotType(fakeSnapshotType).Result(), + expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).Result(), + expectedRequeue: ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, }, { - name: "runCancelableDataUpload is concurrent limited", - dataMgr: datapath.NewManager(0), + name: "data path init error", pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(), du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).SnapshotType(fakeSnapshotType).Result(), - expectedProcessed: false, - expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).Result(), - expectedRequeue: ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, + fsBRInitErr: errors.New("fake-data-path-init-error"), + expectedProcessed: true, + expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseFailed).SnapshotType(fakeSnapshotType).Result(), + expectedErrMsg: "error initializing asyncBR: fake-data-path-init-error", + }, + { + name: "Unable to update status to in progress for data download", + pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(), + du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).SnapshotType(fakeSnapshotType).Result(), + needErrs: []bool{false, false, false, true}, + expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).SnapshotType(fakeSnapshotType).Result(), + expectedRequeue: ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, + }, + { + name: "data path start error", + pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(), + du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).SnapshotType(fakeSnapshotType).Result(), + fsBRStartErr: errors.New("fake-data-path-start-error"), + expectedProcessed: true, + expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseFailed).SnapshotType(fakeSnapshotType).Result(), + expectedErrMsg: "error starting async backup for pod dataupload-1, volume dataupload-1: fake-data-path-start-error", }, { name: "prepare timeout", @@ -480,7 +493,6 @@ func TestReconcile(t *testing.T) { du.DeletionTimestamp = &metav1.Time{Time: time.Now()} return du }(), - expectedProcessed: false, checkFunc: func(du velerov2alpha1api.DataUpload) bool { return du.Spec.Cancel }, @@ -496,7 +508,6 @@ func 
TestReconcile(t *testing.T) { du.DeletionTimestamp = &metav1.Time{Time: time.Now()} return du }(), - expectedProcessed: false, checkFunc: func(du velerov2alpha1api.DataUpload) bool { return !controllerutil.ContainsFinalizer(&du, DataUploadDownloadFinalizer) }, @@ -555,12 +566,16 @@ func TestReconcile(t *testing.T) { du: test.du, kubeClient: r.client, clock: r.Clock, + initErr: test.fsBRInitErr, + startErr: test.fsBRStartErr, } } } + testCreateFsBR := false if test.du.Status.Phase == velerov2alpha1api.DataUploadPhaseInProgress && !test.notCreateFSBR { if fsBR := r.dataPathMgr.GetAsyncBR(test.du.Name); fsBR == nil { + testCreateFsBR = true _, err := r.dataPathMgr.CreateMicroServiceBRWatcher(ctx, r.client, nil, nil, datapath.TaskTypeBackup, test.du.Name, velerov1api.DefaultNamespace, "", "", "", datapath.Callbacks{OnCancelled: r.OnDataUploadCancelled}, false, velerotest.NewLogger()) require.NoError(t, err) } @@ -605,6 +620,11 @@ func TestReconcile(t *testing.T) { if test.checkFunc != nil { assert.True(t, test.checkFunc(du)) } + + if !testCreateFsBR && du.Status.Phase != velerov2alpha1api.DataUploadPhaseInProgress { + assert.Nil(t, r.dataPathMgr.GetAsyncBR(test.du.Name)) + } + }) } } @@ -926,7 +946,7 @@ func TestTryCancelDataUpload(t *testing.T) { err = r.client.Create(ctx, test.dd) require.NoError(t, err) - r.TryCancelDataUpload(ctx, test.dd, "") + r.tryCancelAcceptedDataUpload(ctx, test.dd, "") if test.expectedErr == "" { assert.NoError(t, err) diff --git a/pkg/datamover/backup_micro_service.go b/pkg/datamover/backup_micro_service.go index cedacc0ce4..fa48b3523e 100644 --- a/pkg/datamover/backup_micro_service.go +++ b/pkg/datamover/backup_micro_service.go @@ -127,15 +127,13 @@ func (r *BackupMicroService) Init() error { return err } -var waitControllerTimeout time.Duration = time.Minute * 2 - func (r *BackupMicroService) RunCancelableDataPath(ctx context.Context) (string, error) { log := r.logger.WithFields(logrus.Fields{ "dataupload": r.dataUploadName, }) du := &velerov2alpha1api.DataUpload{} - err := wait.PollUntilContextTimeout(ctx, 500*time.Millisecond, waitControllerTimeout, true, func(ctx context.Context) (bool, error) { + err := wait.PollUntilContextCancel(ctx, 500*time.Millisecond, true, func(ctx context.Context) (bool, error) { err := r.client.Get(ctx, types.NamespacedName{ Namespace: r.namespace, Name: r.dataUploadName, @@ -241,8 +239,6 @@ func (r *BackupMicroService) Shutdown() { var funcMarshal = json.Marshal func (r *BackupMicroService) OnDataUploadCompleted(ctx context.Context, namespace string, duName string, result datapath.Result) { - defer r.closeDataPath(ctx, duName) - log := r.logger.WithField("dataupload", duName) backupBytes, err := funcMarshal(result.Backup) @@ -262,8 +258,6 @@ func (r *BackupMicroService) OnDataUploadCompleted(ctx context.Context, namespac } func (r *BackupMicroService) OnDataUploadFailed(ctx context.Context, namespace string, duName string, err error) { - defer r.closeDataPath(ctx, duName) - log := r.logger.WithField("dataupload", duName) log.WithError(err).Error("Async fs backup data path failed") @@ -274,8 +268,6 @@ func (r *BackupMicroService) OnDataUploadFailed(ctx context.Context, namespace s } func (r *BackupMicroService) OnDataUploadCancelled(ctx context.Context, namespace string, duName string) { - defer r.closeDataPath(ctx, duName) - log := r.logger.WithField("dataupload", duName) log.Warn("Async fs backup data path canceled") @@ -296,8 +288,6 @@ func (r *BackupMicroService) OnDataUploadProgress(ctx context.Context, namespace return } - 
log.Infof("Sending event for progress %v (%s)", progress, string(progressBytes)) - r.eventRecorder.Event(r.dataUpload, false, datapath.EventReasonProgress, string(progressBytes)) } @@ -313,7 +303,7 @@ func (r *BackupMicroService) closeDataPath(ctx context.Context, duName string) { func (r *BackupMicroService) cancelDataUpload(du *velerov2alpha1api.DataUpload) { r.logger.WithField("DataUpload", du.Name).Info("Data upload is being canceled") - r.eventRecorder.Event(du, false, "Canceling", "Canceling for data upload %s", du.Name) + r.eventRecorder.Event(du, false, datapath.EventReasonCancelling, "Canceling for data upload %s", du.Name) fsBackup := r.dataPathMgr.GetAsyncBR(du.Name) if fsBackup == nil { diff --git a/pkg/datamover/backup_micro_service_test.go b/pkg/datamover/backup_micro_service_test.go index 9db38d4c0d..cca428ee88 100644 --- a/pkg/datamover/backup_micro_service_test.go +++ b/pkg/datamover/backup_micro_service_test.go @@ -301,12 +301,12 @@ func TestRunCancelableDataPath(t *testing.T) { }{ { name: "no du", - ctx: context.Background(), + ctx: ctxTimeout, expectedErr: "error waiting for du: context deadline exceeded", }, { name: "du not in in-progress", - ctx: context.Background(), + ctx: ctxTimeout, kubeClientObj: []runtime.Object{du}, expectedErr: "error waiting for du: context deadline exceeded", }, @@ -412,8 +412,6 @@ func TestRunCancelableDataPath(t *testing.T) { return fsBR } - waitControllerTimeout = time.Second - if test.result != nil { go func() { time.Sleep(time.Millisecond * 500) diff --git a/pkg/datamover/restore_micro_service.go b/pkg/datamover/restore_micro_service.go index 508469701a..d0a4c6f50c 100644 --- a/pkg/datamover/restore_micro_service.go +++ b/pkg/datamover/restore_micro_service.go @@ -122,7 +122,7 @@ func (r *RestoreMicroService) RunCancelableDataPath(ctx context.Context) (string }) dd := &velerov2alpha1api.DataDownload{} - err := wait.PollUntilContextTimeout(ctx, 500*time.Millisecond, waitControllerTimeout, true, func(ctx context.Context) (bool, error) { + err := wait.PollUntilContextCancel(ctx, 500*time.Millisecond, true, func(ctx context.Context) (bool, error) { err := r.client.Get(ctx, types.NamespacedName{ Namespace: r.namespace, Name: r.dataDownloadName, @@ -214,8 +214,6 @@ func (r *RestoreMicroService) Shutdown() { } func (r *RestoreMicroService) OnDataDownloadCompleted(ctx context.Context, namespace string, ddName string, result datapath.Result) { - defer r.closeDataPath(ctx, ddName) - log := r.logger.WithField("datadownload", ddName) restoreBytes, err := funcMarshal(result.Restore) @@ -235,8 +233,6 @@ func (r *RestoreMicroService) OnDataDownloadCompleted(ctx context.Context, names } func (r *RestoreMicroService) OnDataDownloadFailed(ctx context.Context, namespace string, ddName string, err error) { - defer r.closeDataPath(ctx, ddName) - log := r.logger.WithField("datadownload", ddName) log.WithError(err).Error("Async fs restore data path failed") @@ -247,8 +243,6 @@ func (r *RestoreMicroService) OnDataDownloadFailed(ctx context.Context, namespac } func (r *RestoreMicroService) OnDataDownloadCancelled(ctx context.Context, namespace string, ddName string) { - defer r.closeDataPath(ctx, ddName) - log := r.logger.WithField("datadownload", ddName) log.Warn("Async fs restore data path canceled") @@ -284,7 +278,7 @@ func (r *RestoreMicroService) closeDataPath(ctx context.Context, ddName string) func (r *RestoreMicroService) cancelDataDownload(dd *velerov2alpha1api.DataDownload) { r.logger.WithField("DataDownload", dd.Name).Info("Data download is being 
canceled") - r.eventRecorder.Event(dd, false, "Canceling", "Canceling for data download %s", dd.Name) + r.eventRecorder.Event(dd, false, datapath.EventReasonCancelling, "Canceling for data download %s", dd.Name) fsBackup := r.dataPathMgr.GetAsyncBR(dd.Name) if fsBackup == nil { diff --git a/pkg/datamover/restore_micro_service_test.go b/pkg/datamover/restore_micro_service_test.go index e3ef8701da..8a3ed61e1f 100644 --- a/pkg/datamover/restore_micro_service_test.go +++ b/pkg/datamover/restore_micro_service_test.go @@ -254,12 +254,12 @@ func TestRunCancelableRestore(t *testing.T) { }{ { name: "no dd", - ctx: context.Background(), + ctx: ctxTimeout, expectedErr: "error waiting for dd: context deadline exceeded", }, { name: "dd not in in-progress", - ctx: context.Background(), + ctx: ctxTimeout, kubeClientObj: []runtime.Object{dd}, expectedErr: "error waiting for dd: context deadline exceeded", }, @@ -365,8 +365,6 @@ func TestRunCancelableRestore(t *testing.T) { return fsBR } - waitControllerTimeout = time.Second - if test.result != nil { go func() { time.Sleep(time.Millisecond * 500) diff --git a/pkg/datapath/file_system.go b/pkg/datapath/file_system.go index 762c91b188..5d3b54f281 100644 --- a/pkg/datapath/file_system.go +++ b/pkg/datapath/file_system.go @@ -18,6 +18,7 @@ package datapath import ( "context" + "sync" "github.com/pkg/errors" "github.com/sirupsen/logrus" @@ -66,6 +67,8 @@ type fileSystemBR struct { callbacks Callbacks jobName string requestorType string + wgDataPath sync.WaitGroup + dataPathLock sync.Mutex } func newFileSystemBR(jobName string, requestorType string, client client.Client, namespace string, callbacks Callbacks, log logrus.FieldLogger) AsyncBR { @@ -75,6 +78,7 @@ func newFileSystemBR(jobName string, requestorType string, client client.Client, client: client, namespace: namespace, callbacks: callbacks, + wgDataPath: sync.WaitGroup{}, log: log, } @@ -134,6 +138,23 @@ func (fs *fileSystemBR) Init(ctx context.Context, param interface{}) error { } func (fs *fileSystemBR) Close(ctx context.Context) { + if fs.cancel != nil { + fs.cancel() + } + + fs.log.WithField("user", fs.jobName).Info("Closing FileSystemBR") + + fs.wgDataPath.Wait() + + fs.close(ctx) + + fs.log.WithField("user", fs.jobName).Info("FileSystemBR is closed") +} + +func (fs *fileSystemBR) close(ctx context.Context) { + fs.dataPathLock.Lock() + defer fs.dataPathLock.Unlock() + if fs.uploaderProv != nil { if err := fs.uploaderProv.Close(ctx); err != nil { fs.log.Errorf("failed to close uploader provider with error %v", err) @@ -141,13 +162,6 @@ func (fs *fileSystemBR) Close(ctx context.Context) { fs.uploaderProv = nil } - - if fs.cancel != nil { - fs.cancel() - fs.cancel = nil - } - - fs.log.WithField("user", fs.jobName).Info("FileSystemBR is closed") } func (fs *fileSystemBR) StartBackup(source AccessPoint, uploaderConfig map[string]string, param interface{}) error { @@ -155,9 +169,18 @@ func (fs *fileSystemBR) StartBackup(source AccessPoint, uploaderConfig map[strin return errors.New("file system data path is not initialized") } + fs.wgDataPath.Add(1) + backupParam := param.(*FSBRStartParam) go func() { + fs.log.Info("Start data path backup") + + defer func() { + fs.close(context.Background()) + fs.wgDataPath.Done() + }() + snapshotID, emptySnapshot, err := fs.uploaderProv.RunBackup(fs.ctx, source.ByPath, backupParam.RealSource, backupParam.Tags, backupParam.ForceFull, backupParam.ParentSnapshot, source.VolMode, uploaderConfig, fs) @@ -182,7 +205,16 @@ func (fs *fileSystemBR) StartRestore(snapshotID string, 
target AccessPoint, uplo return errors.New("file system data path is not initialized") } + fs.wgDataPath.Add(1) + go func() { + fs.log.Info("Start data path restore") + + defer func() { + fs.close(context.Background()) + fs.wgDataPath.Done() + }() + err := fs.uploaderProv.RunRestore(fs.ctx, snapshotID, target.ByPath, target.VolMode, uploaderConfigs, fs) if err == provider.ErrorCanceled { diff --git a/pkg/datapath/file_system_test.go b/pkg/datapath/file_system_test.go index 85c6df08d9..fab33df1c0 100644 --- a/pkg/datapath/file_system_test.go +++ b/pkg/datapath/file_system_test.go @@ -96,6 +96,7 @@ func TestAsyncBackup(t *testing.T) { fs := newFileSystemBR("job-1", "test", nil, "velero", Callbacks{}, velerotest.NewLogger()).(*fileSystemBR) mockProvider := providerMock.NewProvider(t) mockProvider.On("RunBackup", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(test.result.Backup.SnapshotID, test.result.Backup.EmptySnapshot, test.err) + mockProvider.On("Close", mock.Anything).Return(nil) fs.uploaderProv = mockProvider fs.initialized = true fs.callbacks = test.callbacks @@ -179,6 +180,7 @@ func TestAsyncRestore(t *testing.T) { fs := newFileSystemBR("job-1", "test", nil, "velero", Callbacks{}, velerotest.NewLogger()).(*fileSystemBR) mockProvider := providerMock.NewProvider(t) mockProvider.On("RunRestore", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(test.err) + mockProvider.On("Close", mock.Anything).Return(nil) fs.uploaderProv = mockProvider fs.initialized = true fs.callbacks = test.callbacks diff --git a/pkg/datapath/micro_service_watcher.go b/pkg/datapath/micro_service_watcher.go index a5826459a4..8a129bde8f 100644 --- a/pkg/datapath/micro_service_watcher.go +++ b/pkg/datapath/micro_service_watcher.go @@ -46,11 +46,12 @@ const ( ErrCancelled = "data path is canceled" - EventReasonStarted = "Data-Path-Started" - EventReasonCompleted = "Data-Path-Completed" - EventReasonFailed = "Data-Path-Failed" - EventReasonCancelled = "Data-Path-Canceled" - EventReasonProgress = "Data-Path-Progress" + EventReasonStarted = "Data-Path-Started" + EventReasonCompleted = "Data-Path-Completed" + EventReasonFailed = "Data-Path-Failed" + EventReasonCancelled = "Data-Path-Canceled" + EventReasonProgress = "Data-Path-Progress" + EventReasonCancelling = "Data-Path-Canceling" ) type microServiceBRWatcher struct { @@ -76,6 +77,7 @@ type microServiceBRWatcher struct { podInformer ctrlcache.Informer eventHandler cache.ResourceEventHandlerRegistration podHandler cache.ResourceEventHandlerRegistration + watcherLock sync.Mutex } func newMicroServiceBRWatcher(client client.Client, kubeClient kubernetes.Interface, mgr manager.Manager, taskType string, taskName string, namespace string, @@ -121,8 +123,6 @@ func (ms *microServiceBRWatcher) Init(ctx context.Context, param interface{}) er return } - ms.log.Infof("Pushed adding event %s/%s, message %s for object %v", evt.Namespace, evt.Name, evt.Message, evt.InvolvedObject) - ms.eventCh <- evt }, UpdateFunc: func(_, obj interface{}) { @@ -131,8 +131,6 @@ func (ms *microServiceBRWatcher) Init(ctx context.Context, param interface{}) er return } - ms.log.Infof("Pushed updating event %s/%s, message %s for object %v", evt.Namespace, evt.Name, evt.Message, evt.InvolvedObject) - ms.eventCh <- evt }, }, @@ -177,12 +175,9 @@ func (ms *microServiceBRWatcher) Init(ctx context.Context, param interface{}) er } }() - ms.log.WithFields( - logrus.Fields{ 
- "taskType": ms.taskType, - "taskName": ms.taskName, - "thisPod": ms.thisPod, - }).Info("MicroServiceBR is initialized") + if err := ms.reEnsureThisPod(ctx); err != nil { + return err + } ms.eventInformer = eventInformer ms.podInformer = podInformer @@ -191,42 +186,56 @@ func (ms *microServiceBRWatcher) Init(ctx context.Context, param interface{}) er ms.ctx, ms.cancel = context.WithCancel(ctx) + ms.log.WithFields( + logrus.Fields{ + "taskType": ms.taskType, + "taskName": ms.taskName, + "thisPod": ms.thisPod, + }).Info("MicroServiceBR is initialized") + succeeded = true return nil + } func (ms *microServiceBRWatcher) Close(ctx context.Context) { if ms.cancel != nil { ms.cancel() - ms.cancel = nil } ms.log.WithField("taskType", ms.taskType).WithField("taskName", ms.taskName).Info("Closing MicroServiceBR") ms.wgWatcher.Wait() - if ms.eventInformer != nil && ms.eventHandler != nil { + ms.close() + + ms.log.WithField("taskType", ms.taskType).WithField("taskName", ms.taskName).Info("MicroServiceBR is closed") +} + +func (ms *microServiceBRWatcher) close() { + ms.watcherLock.Lock() + defer ms.watcherLock.Unlock() + + if ms.eventHandler != nil { if err := ms.eventInformer.RemoveEventHandler(ms.eventHandler); err != nil { ms.log.WithError(err).Warn("Failed to remove event handler") } + + ms.eventHandler = nil } - if ms.podInformer != nil && ms.podHandler != nil { + if ms.podHandler != nil { if err := ms.podInformer.RemoveEventHandler(ms.podHandler); err != nil { ms.log.WithError(err).Warn("Failed to remove pod handler") } - } - ms.log.WithField("taskType", ms.taskType).WithField("taskName", ms.taskName).Info("MicroServiceBR is closed") + ms.podHandler = nil + } } func (ms *microServiceBRWatcher) StartBackup(source AccessPoint, uploaderConfig map[string]string, param interface{}) error { - ms.log.Infof("Start watching backup ms for source %v", source) - - if err := ms.reEnsureThisPod(); err != nil { - return err - } + ms.log.Infof("Start watching backup ms for source %v", source.ByPath) ms.startWatch() @@ -234,20 +243,16 @@ func (ms *microServiceBRWatcher) StartBackup(source AccessPoint, uploaderConfig } func (ms *microServiceBRWatcher) StartRestore(snapshotID string, target AccessPoint, uploaderConfigs map[string]string) error { - ms.log.Infof("Start watching restore ms to target %v, from snapshot %s", target, snapshotID) - - if err := ms.reEnsureThisPod(); err != nil { - return err - } + ms.log.Infof("Start watching restore ms to target %s, from snapshot %s", target.ByPath, snapshotID) ms.startWatch() return nil } -func (ms *microServiceBRWatcher) reEnsureThisPod() error { +func (ms *microServiceBRWatcher) reEnsureThisPod(ctx context.Context) error { thisPod := &v1.Pod{} - if err := ms.client.Get(ms.ctx, types.NamespacedName{ + if err := ms.client.Get(ctx, types.NamespacedName{ Namespace: ms.namespace, Name: ms.thisPod, }, thisPod); err != nil { @@ -275,6 +280,11 @@ func (ms *microServiceBRWatcher) startWatch() { go func() { ms.log.Info("Start watching data path pod") + defer func() { + ms.close() + ms.wgWatcher.Done() + }() + var lastPod *v1.Pod watchLoop: @@ -291,14 +301,16 @@ func (ms *microServiceBRWatcher) startWatch() { } if lastPod == nil { - ms.log.Warn("Data path pod watch loop is canceled") - ms.wgWatcher.Done() + ms.log.Warn("Watch loop is cancelled on waiting data path pod") return } epilogLoop: for !ms.startedFromEvent || !ms.terminatedFromEvent { select { + case <-ms.ctx.Done(): + ms.log.Warn("Watch loop is cancelled on waiting final event") + return case 
<-time.After(eventWaitTimeout): break epilogLoop case evt := <-ms.eventCh: @@ -339,8 +351,6 @@ func (ms *microServiceBRWatcher) startWatch() { } logger.Info("Complete callback on data path pod termination") - - ms.wgWatcher.Done() }() } @@ -348,20 +358,22 @@ func (ms *microServiceBRWatcher) onEvent(evt *v1.Event) { switch evt.Reason { case EventReasonStarted: ms.startedFromEvent = true - ms.log.Infof("Received data path start message %s", evt.Message) + ms.log.Infof("Received data path start message: %s", evt.Message) case EventReasonProgress: ms.callbacks.OnProgress(ms.ctx, ms.namespace, ms.taskName, funcGetProgressFromMessage(evt.Message, ms.log)) case EventReasonCompleted: - ms.log.Infof("Received data path completed message %v", funcGetResultFromMessage(ms.taskType, evt.Message, ms.log)) + ms.log.Infof("Received data path completed message: %v", funcGetResultFromMessage(ms.taskType, evt.Message, ms.log)) ms.terminatedFromEvent = true case EventReasonCancelled: - ms.log.Infof("Received data path canceled message %s", evt.Message) + ms.log.Infof("Received data path canceled message: %s", evt.Message) ms.terminatedFromEvent = true case EventReasonFailed: - ms.log.Infof("Received data path failed message %s", evt.Message) + ms.log.Infof("Received data path failed message: %s", evt.Message) ms.terminatedFromEvent = true + case EventReasonCancelling: + ms.log.Infof("Received data path canceling message: %s", evt.Message) default: - ms.log.Debugf("Received event for data mover %s.[reason %s, message %s]", ms.taskName, evt.Reason, evt.Message) + ms.log.Infof("Received event for data path %s,reason: %s, message: %s", ms.taskName, evt.Reason, evt.Message) } } diff --git a/pkg/datapath/micro_service_watcher_test.go b/pkg/datapath/micro_service_watcher_test.go index f10f6b3310..f926363e07 100644 --- a/pkg/datapath/micro_service_watcher_test.go +++ b/pkg/datapath/micro_service_watcher_test.go @@ -102,7 +102,7 @@ func TestReEnsureThisPod(t *testing.T) { log: velerotest.NewLogger(), } - err := ms.reEnsureThisPod() + err := ms.reEnsureThisPod(context.Background()) if test.expectErr != "" { assert.EqualError(t, err, test.expectErr) } else { From ed0ef67c16dbd119365527e2581a6af505b555f1 Mon Sep 17 00:00:00 2001 From: Lyndon-Li Date: Wed, 14 Aug 2024 13:16:35 +0800 Subject: [PATCH 2/3] data mover ms smoke testing Signed-off-by: Lyndon-Li --- pkg/cmd/cli/datamover/backup.go | 1 + pkg/cmd/cli/datamover/restore.go | 3 +- pkg/cmd/cli/nodeagent/server.go | 6 +- pkg/controller/data_download_controller.go | 58 ++++++++----------- .../data_download_controller_test.go | 13 +---- pkg/controller/data_upload_controller.go | 24 +++----- pkg/controller/data_upload_controller_test.go | 21 +------ .../pod_volume_backup_controller.go | 6 +- .../pod_volume_restore_controller.go | 6 +- pkg/datapath/micro_service_watcher.go | 29 +--------- 10 files changed, 50 insertions(+), 117 deletions(-) diff --git a/pkg/cmd/cli/datamover/backup.go b/pkg/cmd/cli/datamover/backup.go index ca600faab5..4d704b04c1 100644 --- a/pkg/cmd/cli/datamover/backup.go +++ b/pkg/cmd/cli/datamover/backup.go @@ -224,6 +224,7 @@ func (s *dataMoverBackup) runDataPath() { err = dpService.Init() if err != nil { + dpService.Shutdown() s.cancelFunc() funcExitWithMessage(s.logger, false, "Failed to init data path service for DataUpload %s: %v", s.config.duName, err) return diff --git a/pkg/cmd/cli/datamover/restore.go b/pkg/cmd/cli/datamover/restore.go index fc74a64f15..244060cc9a 100644 --- a/pkg/cmd/cli/datamover/restore.go +++ 
b/pkg/cmd/cli/datamover/restore.go @@ -215,6 +215,7 @@ func (s *dataMoverRestore) runDataPath() { err = dpService.Init() if err != nil { + dpService.Shutdown() s.cancelFunc() funcExitWithMessage(s.logger, false, "Failed to init data path service for DataDownload %s: %v", s.config.ddName, err) return @@ -222,8 +223,8 @@ func (s *dataMoverRestore) runDataPath() { result, err := dpService.RunCancelableDataPath(s.ctx) if err != nil { - s.cancelFunc() dpService.Shutdown() + s.cancelFunc() funcExitWithMessage(s.logger, false, "Failed to run data path service for DataDownload %s: %v", s.config.ddName, err) return } diff --git a/pkg/cmd/cli/nodeagent/server.go b/pkg/cmd/cli/nodeagent/server.go index 181afbf690..f30c8df4aa 100644 --- a/pkg/cmd/cli/nodeagent/server.go +++ b/pkg/cmd/cli/nodeagent/server.go @@ -292,12 +292,12 @@ func (s *nodeAgentServer) run() { if s.dataPathConfigs != nil && len(s.dataPathConfigs.LoadAffinity) > 0 { loadAffinity = s.dataPathConfigs.LoadAffinity[0] } - dataUploadReconciler := controller.NewDataUploadReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.csiSnapshotClient.SnapshotV1(), s.dataPathMgr, loadAffinity, repoEnsurer, clock.RealClock{}, credentialGetter, s.nodeName, s.fileSystem, s.config.dataMoverPrepareTimeout, s.logger, s.metrics) + dataUploadReconciler := controller.NewDataUploadReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.csiSnapshotClient.SnapshotV1(), s.dataPathMgr, loadAffinity, clock.RealClock{}, s.nodeName, s.config.dataMoverPrepareTimeout, s.logger, s.metrics) if err = dataUploadReconciler.SetupWithManager(s.mgr); err != nil { s.logger.WithError(err).Fatal("Unable to create the data upload controller") } - dataDownloadReconciler := controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.dataPathMgr, repoEnsurer, credentialGetter, s.nodeName, s.config.dataMoverPrepareTimeout, s.logger, s.metrics) + dataDownloadReconciler := controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.dataPathMgr, s.nodeName, s.config.dataMoverPrepareTimeout, s.logger, s.metrics) if err = dataDownloadReconciler.SetupWithManager(s.mgr); err != nil { s.logger.WithError(err).Fatal("Unable to create the data download controller") } @@ -312,8 +312,6 @@ func (s *nodeAgentServer) run() { if err := dataDownloadReconciler.AttemptDataDownloadResume(s.ctx, s.mgr.GetClient(), s.logger.WithField("node", s.nodeName), s.namespace); err != nil { s.logger.WithError(errors.WithStack(err)).Error("failed to attempt data download resume") } - - s.logger.Info("Attempt complete to resume dataUploads and dataDownloads") }() s.logger.Info("Controllers starting...") diff --git a/pkg/controller/data_download_controller.go b/pkg/controller/data_download_controller.go index a161f60bd4..0b9805a0ac 100644 --- a/pkg/controller/data_download_controller.go +++ b/pkg/controller/data_download_controller.go @@ -39,7 +39,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" - "github.com/vmware-tanzu/velero/internal/credentials" "github.com/vmware-tanzu/velero/pkg/apis/velero/shared" velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1" @@ -47,45 +46,37 @@ import ( "github.com/vmware-tanzu/velero/pkg/datapath" "github.com/vmware-tanzu/velero/pkg/exposer" "github.com/vmware-tanzu/velero/pkg/metrics" - repository "github.com/vmware-tanzu/velero/pkg/repository" "github.com/vmware-tanzu/velero/pkg/uploader" - 
"github.com/vmware-tanzu/velero/pkg/util/filesystem" "github.com/vmware-tanzu/velero/pkg/util/kube" ) // DataDownloadReconciler reconciles a DataDownload object type DataDownloadReconciler struct { - client client.Client - kubeClient kubernetes.Interface - mgr manager.Manager - logger logrus.FieldLogger - credentialGetter *credentials.CredentialGetter - fileSystem filesystem.Interface - Clock clock.WithTickerAndDelayedExecution - restoreExposer exposer.GenericRestoreExposer - nodeName string - repositoryEnsurer *repository.Ensurer - dataPathMgr *datapath.Manager - preparingTimeout time.Duration - metrics *metrics.ServerMetrics + client client.Client + kubeClient kubernetes.Interface + mgr manager.Manager + logger logrus.FieldLogger + Clock clock.WithTickerAndDelayedExecution + restoreExposer exposer.GenericRestoreExposer + nodeName string + dataPathMgr *datapath.Manager + preparingTimeout time.Duration + metrics *metrics.ServerMetrics } func NewDataDownloadReconciler(client client.Client, mgr manager.Manager, kubeClient kubernetes.Interface, dataPathMgr *datapath.Manager, - repoEnsurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter, nodeName string, preparingTimeout time.Duration, logger logrus.FieldLogger, metrics *metrics.ServerMetrics) *DataDownloadReconciler { + nodeName string, preparingTimeout time.Duration, logger logrus.FieldLogger, metrics *metrics.ServerMetrics) *DataDownloadReconciler { return &DataDownloadReconciler{ - client: client, - kubeClient: kubeClient, - mgr: mgr, - logger: logger.WithField("controller", "DataDownload"), - credentialGetter: credentialGetter, - fileSystem: filesystem.NewFileSystem(), - Clock: &clock.RealClock{}, - nodeName: nodeName, - repositoryEnsurer: repoEnsurer, - restoreExposer: exposer.NewGenericRestoreExposer(kubeClient, logger), - dataPathMgr: dataPathMgr, - preparingTimeout: preparingTimeout, - metrics: metrics, + client: client, + kubeClient: kubeClient, + mgr: mgr, + logger: logger.WithField("controller", "DataDownload"), + Clock: &clock.RealClock{}, + nodeName: nodeName, + restoreExposer: exposer.NewGenericRestoreExposer(kubeClient, logger), + dataPathMgr: dataPathMgr, + preparingTimeout: preparingTimeout, + metrics: metrics, } } @@ -282,7 +273,7 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request } if err := r.initCancelableDataPath(ctx, asyncBR, result, log); err != nil { - log.WithError(err).Warnf("Failed to init cancelable data path for %s, will close and retry", dd.Name) + log.WithError(err).Errorf("Failed to init cancelable data path for %s", dd.Name) r.closeDataPath(ctx, dd.Name) return r.errorOut(ctx, dd, err, "error initializing data path", log) @@ -372,7 +363,7 @@ func (r *DataDownloadReconciler) startCancelableDataPath(asyncBR datapath.AsyncB return errors.Wrapf(err, "error starting async restore for pod %s, volume %s", res.ByPod.HostingPod.Name, res.ByPod.VolumeName) } - log.Info("Async restore started for pod %s, volume %s", res.ByPod.HostingPod.Name, res.ByPod.VolumeName) + log.Infof("Async restore started for pod %s, volume %s", res.ByPod.HostingPod.Name, res.ByPod.VolumeName) return nil } @@ -420,7 +411,7 @@ func (r *DataDownloadReconciler) OnDataDownloadFailed(ctx context.Context, names if getErr := r.client.Get(ctx, types.NamespacedName{Name: ddName, Namespace: namespace}, &dd); getErr != nil { log.WithError(getErr).Warn("Failed to get data download on failure") } else { - r.errorOut(ctx, &dd, err, "data path restore failed", log) + _, _ = r.errorOut(ctx, &dd, err, "data 
path restore failed", log) } } @@ -797,6 +788,7 @@ func (r *DataDownloadReconciler) AttemptDataDownloadResume(ctx context.Context, err := funcResumeCancellableDataRestore(r, ctx, dd, logger) if err == nil { + logger.WithField("dd", dd.Name).WithField("current node", r.nodeName).Info("Completed to resume in progress DD") continue } diff --git a/pkg/controller/data_download_controller_test.go b/pkg/controller/data_download_controller_test.go index d20db30f0c..bb9fe6f7c9 100644 --- a/pkg/controller/data_download_controller_test.go +++ b/pkg/controller/data_download_controller_test.go @@ -43,7 +43,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client/fake" - "github.com/vmware-tanzu/velero/internal/credentials" velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1" "github.com/vmware-tanzu/velero/pkg/builder" @@ -139,19 +138,9 @@ func initDataDownloadReconcilerWithError(objects []runtime.Object, needError ... return nil, err } - credentialFileStore, err := credentials.NewNamespacedFileStore( - fakeClient, - velerov1api.DefaultNamespace, - "/tmp/credentials", - fakeFS, - ) - if err != nil { - return nil, err - } - dataPathMgr := datapath.NewManager(1) - return NewDataDownloadReconciler(fakeClient, nil, fakeKubeClient, dataPathMgr, nil, &credentials.CredentialGetter{FromFile: credentialFileStore}, "test-node", time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil + return NewDataDownloadReconciler(fakeClient, nil, fakeKubeClient, dataPathMgr, "test-node", time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil } func TestDataDownloadReconcile(t *testing.T) { diff --git a/pkg/controller/data_upload_controller.go b/pkg/controller/data_upload_controller.go index c6a15ceca8..fb9e75709d 100644 --- a/pkg/controller/data_upload_controller.go +++ b/pkg/controller/data_upload_controller.go @@ -41,7 +41,6 @@ import ( snapshotter "github.com/kubernetes-csi/external-snapshotter/client/v7/clientset/versioned/typed/volumesnapshot/v1" - "github.com/vmware-tanzu/velero/internal/credentials" "github.com/vmware-tanzu/velero/pkg/apis/velero/shared" velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1" @@ -50,9 +49,7 @@ import ( "github.com/vmware-tanzu/velero/pkg/exposer" "github.com/vmware-tanzu/velero/pkg/metrics" "github.com/vmware-tanzu/velero/pkg/nodeagent" - "github.com/vmware-tanzu/velero/pkg/repository" "github.com/vmware-tanzu/velero/pkg/uploader" - "github.com/vmware-tanzu/velero/pkg/util/filesystem" "github.com/vmware-tanzu/velero/pkg/util/kube" ) @@ -69,11 +66,8 @@ type DataUploadReconciler struct { kubeClient kubernetes.Interface csiSnapshotClient snapshotter.SnapshotV1Interface mgr manager.Manager - repoEnsurer *repository.Ensurer Clock clocks.WithTickerAndDelayedExecution - credentialGetter *credentials.CredentialGetter nodeName string - fileSystem filesystem.Interface logger logrus.FieldLogger snapshotExposerList map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer dataPathMgr *datapath.Manager @@ -83,19 +77,16 @@ type DataUploadReconciler struct { } func NewDataUploadReconciler(client client.Client, mgr manager.Manager, kubeClient kubernetes.Interface, csiSnapshotClient snapshotter.SnapshotV1Interface, - dataPathMgr *datapath.Manager, loadAffinity *nodeagent.LoadAffinity, repoEnsurer *repository.Ensurer, clock clocks.WithTickerAndDelayedExecution, - cred *credentials.CredentialGetter, 
nodeName string, fs filesystem.Interface, preparingTimeout time.Duration, log logrus.FieldLogger, metrics *metrics.ServerMetrics) *DataUploadReconciler { + dataPathMgr *datapath.Manager, loadAffinity *nodeagent.LoadAffinity, clock clocks.WithTickerAndDelayedExecution, nodeName string, preparingTimeout time.Duration, + log logrus.FieldLogger, metrics *metrics.ServerMetrics) *DataUploadReconciler { return &DataUploadReconciler{ client: client, mgr: mgr, kubeClient: kubeClient, csiSnapshotClient: csiSnapshotClient, Clock: clock, - credentialGetter: cred, nodeName: nodeName, - fileSystem: fs, logger: log, - repoEnsurer: repoEnsurer, snapshotExposerList: map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer{velerov2alpha1api.SnapshotTypeCSI: exposer.NewCSISnapshotExposer(kubeClient, csiSnapshotClient, log)}, dataPathMgr: dataPathMgr, loadAffinity: loadAffinity, @@ -293,7 +284,7 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request) } if err := r.initCancelableDataPath(ctx, asyncBR, res, log); err != nil { - log.WithError(err).Warnf("Failed to init cancelable data path for %s, will close and retry", du.Name) + log.WithError(err).Errorf("Failed to init cancelable data path for %s", du.Name) r.closeDataPath(ctx, du.Name) return r.errorOut(ctx, du, err, "error initializing data path", log) @@ -383,7 +374,7 @@ func (r *DataUploadReconciler) startCancelableDataPath(asyncBR datapath.AsyncBR, return errors.Wrapf(err, "error starting async backup for pod %s, volume %s", res.ByPod.HostingPod.Name, res.ByPod.VolumeName) } - log.Info("Async backup started for pod %s, volume %s", res.ByPod.HostingPod.Name, res.ByPod.VolumeName) + log.Infof("Async backup started for pod %s, volume %s", res.ByPod.HostingPod.Name, res.ByPod.VolumeName) return nil } @@ -442,7 +433,7 @@ func (r *DataUploadReconciler) OnDataUploadFailed(ctx context.Context, namespace if getErr := r.client.Get(ctx, types.NamespacedName{Name: duName, Namespace: namespace}, &du); getErr != nil { log.WithError(getErr).Warn("Failed to get dataupload on failure") } else { - r.errorOut(ctx, &du, err, "data path backup failed", log) + _, _ = r.errorOut(ctx, &du, err, "data path backup failed", log) } } @@ -502,7 +493,7 @@ func (r *DataUploadReconciler) tryCancelAcceptedDataUpload(ctx context.Context, r.cleanUp(ctx, du, log) } -func (r *DataUploadReconciler) cleanUp(ctx context.Context, du *velerov2alpha1api.DataUpload, log *logrus.Entry) { +func (r *DataUploadReconciler) cleanUp(ctx context.Context, du *velerov2alpha1api.DataUpload, log logrus.FieldLogger) { ep, ok := r.snapshotExposerList[du.Spec.SnapshotType] if !ok { log.WithError(fmt.Errorf("%v type of snapshot exposer is not exist", du.Spec.SnapshotType)). 
@@ -650,7 +641,7 @@ func (r *DataUploadReconciler) errorOut(ctx context.Context, du *velerov2alpha1a } se.CleanUp(ctx, getOwnerObject(du), volumeSnapshotName, du.Spec.SourceNamespace) } else { - log.Warnf("failed to clean up exposed snapshot could not find %s snapshot exposer", du.Spec.SnapshotType) + log.Errorf("failed to clean up exposed snapshot could not find %s snapshot exposer", du.Spec.SnapshotType) } return ctrl.Result{}, r.updateStatusToFailed(ctx, du, err, msg, log) @@ -894,6 +885,7 @@ func (r *DataUploadReconciler) AttemptDataUploadResume(ctx context.Context, cli err := funcResumeCancellableDataBackup(r, ctx, du, logger) if err == nil { + logger.WithField("du", du.Name).WithField("current node", r.nodeName).Info("Completed to resume in progress DU") continue } diff --git a/pkg/controller/data_upload_controller_test.go b/pkg/controller/data_upload_controller_test.go index 8df98b60dc..e4c98f1968 100644 --- a/pkg/controller/data_upload_controller_test.go +++ b/pkg/controller/data_upload_controller_test.go @@ -47,7 +47,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/reconcile" - "github.com/vmware-tanzu/velero/internal/credentials" velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1" "github.com/vmware-tanzu/velero/pkg/builder" @@ -229,24 +228,9 @@ func initDataUploaderReconcilerWithError(needError ...error) (*DataUploadReconci fakeSnapshotClient := snapshotFake.NewSimpleClientset(vsObject, vscObj) fakeKubeClient := clientgofake.NewSimpleClientset(daemonSet) - fakeFS := velerotest.NewFakeFileSystem() - pathGlob := fmt.Sprintf("/host_pods/%s/volumes/*/%s", "", dataUploadName) - _, err = fakeFS.Create(pathGlob) - if err != nil { - return nil, err - } - credentialFileStore, err := credentials.NewNamespacedFileStore( - fakeClient, - velerov1api.DefaultNamespace, - "/tmp/credentials", - fakeFS, - ) - if err != nil { - return nil, err - } - return NewDataUploadReconciler(fakeClient, nil, fakeKubeClient, fakeSnapshotClient.SnapshotV1(), dataPathMgr, nil, nil, - testclocks.NewFakeClock(now), &credentials.CredentialGetter{FromFile: credentialFileStore}, "test-node", fakeFS, time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil + return NewDataUploadReconciler(fakeClient, nil, fakeKubeClient, fakeSnapshotClient.SnapshotV1(), dataPathMgr, nil, + testclocks.NewFakeClock(now), "test-node", time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil } func dataUploadBuilder() *builder.DataUploadBuilder { @@ -626,7 +610,6 @@ func TestReconcile(t *testing.T) { if !testCreateFsBR && du.Status.Phase != velerov2alpha1api.DataUploadPhaseInProgress { assert.Nil(t, r.dataPathMgr.GetAsyncBR(test.du.Name)) } - }) } } diff --git a/pkg/controller/pod_volume_backup_controller.go b/pkg/controller/pod_volume_backup_controller.go index e626df2b35..548ab0d0c6 100644 --- a/pkg/controller/pod_volume_backup_controller.go +++ b/pkg/controller/pod_volume_backup_controller.go @@ -202,7 +202,7 @@ func (r *PodVolumeBackupReconciler) Reconcile(ctx context.Context, req ctrl.Requ } func (r *PodVolumeBackupReconciler) OnDataPathCompleted(ctx context.Context, namespace string, pvbName string, result datapath.Result) { - defer r.closeDataPath(ctx, pvbName) + defer r.dataPathMgr.RemoveAsyncBR(pvbName) log := r.logger.WithField("pvb", pvbName) @@ -240,7 +240,7 @@ func (r *PodVolumeBackupReconciler) OnDataPathCompleted(ctx context.Context, nam } func (r 
*PodVolumeBackupReconciler) OnDataPathFailed(ctx context.Context, namespace, pvbName string, err error) { - defer r.closeDataPath(ctx, pvbName) + defer r.dataPathMgr.RemoveAsyncBR(pvbName) log := r.logger.WithField("pvb", pvbName) @@ -255,7 +255,7 @@ func (r *PodVolumeBackupReconciler) OnDataPathFailed(ctx context.Context, namesp } func (r *PodVolumeBackupReconciler) OnDataPathCancelled(ctx context.Context, namespace string, pvbName string) { - defer r.closeDataPath(ctx, pvbName) + defer r.dataPathMgr.RemoveAsyncBR(pvbName) log := r.logger.WithField("pvb", pvbName) diff --git a/pkg/controller/pod_volume_restore_controller.go b/pkg/controller/pod_volume_restore_controller.go index 3f285789db..c4a3e7451f 100644 --- a/pkg/controller/pod_volume_restore_controller.go +++ b/pkg/controller/pod_volume_restore_controller.go @@ -265,7 +265,7 @@ func getInitContainerIndex(pod *corev1api.Pod) int { } func (c *PodVolumeRestoreReconciler) OnDataPathCompleted(ctx context.Context, namespace string, pvrName string, result datapath.Result) { - defer c.closeDataPath(ctx, pvrName) + defer c.dataPathMgr.RemoveAsyncBR(pvrName) log := c.logger.WithField("pvr", pvrName) @@ -325,7 +325,7 @@ func (c *PodVolumeRestoreReconciler) OnDataPathCompleted(ctx context.Context, na } func (c *PodVolumeRestoreReconciler) OnDataPathFailed(ctx context.Context, namespace string, pvrName string, err error) { - defer c.closeDataPath(ctx, pvrName) + defer c.dataPathMgr.RemoveAsyncBR(pvrName) log := c.logger.WithField("pvr", pvrName) @@ -340,7 +340,7 @@ func (c *PodVolumeRestoreReconciler) OnDataPathFailed(ctx context.Context, names } func (c *PodVolumeRestoreReconciler) OnDataPathCancelled(ctx context.Context, namespace string, pvrName string) { - defer c.closeDataPath(ctx, pvrName) + defer c.dataPathMgr.RemoveAsyncBR(pvrName) log := c.logger.WithField("pvr", pvrName) diff --git a/pkg/datapath/micro_service_watcher.go b/pkg/datapath/micro_service_watcher.go index 8a129bde8f..d74ca2fc2c 100644 --- a/pkg/datapath/micro_service_watcher.go +++ b/pkg/datapath/micro_service_watcher.go @@ -103,8 +103,6 @@ func newMicroServiceBRWatcher(client client.Client, kubeClient kubernetes.Interf } func (ms *microServiceBRWatcher) Init(ctx context.Context, param interface{}) error { - succeeded := false - eventInformer, err := ms.mgr.GetCache().GetInformer(ctx, &v1.Event{}) if err != nil { return errors.Wrap(err, "error getting event informer") @@ -135,19 +133,10 @@ func (ms *microServiceBRWatcher) Init(ctx context.Context, param interface{}) er }, }, ) - if err != nil { return errors.Wrap(err, "error registering event handler") } - defer func() { - if !succeeded { - if err := eventInformer.RemoveEventHandler(eventHandler); err != nil { - ms.log.WithError(err).Warn("Failed to remove event handler") - } - } - }() - podHandler, err := podInformer.AddEventHandler( cache.ResourceEventHandlerFuncs{ UpdateFunc: func(_, obj interface{}) { @@ -162,19 +151,10 @@ func (ms *microServiceBRWatcher) Init(ctx context.Context, param interface{}) er }, }, ) - if err != nil { return errors.Wrap(err, "error registering pod handler") } - defer func() { - if !succeeded { - if err := podInformer.RemoveEventHandler(podHandler); err != nil { - ms.log.WithError(err).Warn("Failed to remove pod handler") - } - } - }() - if err := ms.reEnsureThisPod(ctx); err != nil { return err } @@ -193,10 +173,7 @@ func (ms *microServiceBRWatcher) Init(ctx context.Context, param interface{}) er "thisPod": ms.thisPod, }).Info("MicroServiceBR is initialized") - succeeded = true - return nil - } 
 func (ms *microServiceBRWatcher) Close(ctx context.Context) {
@@ -301,7 +278,7 @@ func (ms *microServiceBRWatcher) startWatch() {
 			}
 
 			if lastPod == nil {
-				ms.log.Warn("Watch loop is cancelled on waiting data path pod")
+				ms.log.Warn("Watch loop is canceled on waiting data path pod")
 				return
 			}
 
@@ -309,7 +286,7 @@ func (ms *microServiceBRWatcher) startWatch() {
 		for !ms.startedFromEvent || !ms.terminatedFromEvent {
 			select {
 			case <-ms.ctx.Done():
-				ms.log.Warn("Watch loop is cancelled on waiting final event")
+				ms.log.Warn("Watch loop is canceled on waiting final event")
 				return
 			case <-time.After(eventWaitTimeout):
 				break epilogLoop
@@ -373,7 +350,7 @@ func (ms *microServiceBRWatcher) onEvent(evt *v1.Event) {
 	case EventReasonCancelling:
 		ms.log.Infof("Received data path canceling message: %s", evt.Message)
 	default:
-		ms.log.Infof("Received event for data path %s,reason: %s, message: %s", ms.taskName, evt.Reason, evt.Message)
+		ms.log.Infof("Received event for data path %s, reason: %s, message: %s", ms.taskName, evt.Reason, evt.Message)
 	}
 }
 

From 0ed1a7fc8699a4205e538d8cf65c9b24ac9dd2ad Mon Sep 17 00:00:00 2001
From: Lyndon-Li
Date: Thu, 15 Aug 2024 15:06:31 +0800
Subject: [PATCH 3/3] data mover ms smoke testing

Signed-off-by: Lyndon-Li
---
 pkg/cmd/cli/nodeagent/server.go               | 38 ++++++++++++++++---
 pkg/controller/data_download_controller.go    |  8 ++--
 .../data_download_controller_test.go          |  2 +-
 pkg/controller/data_upload_controller.go      |  8 ++--
 pkg/controller/data_upload_controller_test.go |  2 +-
 5 files changed, 43 insertions(+), 15 deletions(-)

diff --git a/pkg/cmd/cli/nodeagent/server.go b/pkg/cmd/cli/nodeagent/server.go
index 513eb89de1..8aa52b1c56 100644
--- a/pkg/cmd/cli/nodeagent/server.go
+++ b/pkg/cmd/cli/nodeagent/server.go
@@ -61,6 +61,8 @@ import (
 	"github.com/vmware-tanzu/velero/pkg/repository"
 	"github.com/vmware-tanzu/velero/pkg/util/filesystem"
 	"github.com/vmware-tanzu/velero/pkg/util/logging"
+
+	cacheutil "k8s.io/client-go/tools/cache"
 )
 
 var (
@@ -309,14 +311,17 @@ func (s *nodeAgentServer) run() {
 	}
 
 	go func() {
-		s.mgr.GetCache().WaitForCacheSync(s.ctx)
+		if err := s.waitCacheForResume(); err != nil {
+			s.logger.WithError(err).Error("Failed to wait cache for resume, will not resume DU/DD")
+			return
+		}
 
-		if err := dataUploadReconciler.AttemptDataUploadResume(s.ctx, s.mgr.GetClient(), s.logger.WithField("node", s.nodeName), s.namespace); err != nil {
-			s.logger.WithError(errors.WithStack(err)).Error("failed to attempt data upload resume")
+		if err := dataUploadReconciler.AttemptDataUploadResume(s.ctx, s.logger.WithField("node", s.nodeName), s.namespace); err != nil {
+			s.logger.WithError(errors.WithStack(err)).Error("Failed to attempt data upload resume")
 		}
 
-		if err := dataDownloadReconciler.AttemptDataDownloadResume(s.ctx, s.mgr.GetClient(), s.logger.WithField("node", s.nodeName), s.namespace); err != nil {
-			s.logger.WithError(errors.WithStack(err)).Error("failed to attempt data download resume")
+		if err := dataDownloadReconciler.AttemptDataDownloadResume(s.ctx, s.logger.WithField("node", s.nodeName), s.namespace); err != nil {
+			s.logger.WithError(errors.WithStack(err)).Error("Failed to attempt data download resume")
 		}
 	}()
 
@@ -327,6 +332,29 @@ func (s *nodeAgentServer) run() {
 	}
 }
 
+func (s *nodeAgentServer) waitCacheForResume() error {
+	podInformer, err := s.mgr.GetCache().GetInformer(s.ctx, &v1.Pod{})
+	if err != nil {
+		return errors.Wrap(err, "error getting pod informer")
+	}
+
+	duInformer, err := s.mgr.GetCache().GetInformer(s.ctx, &velerov2alpha1api.DataUpload{})
+	if err != nil {
+		return errors.Wrap(err, "error getting du informer")
+	}
+
+	ddInformer, err := s.mgr.GetCache().GetInformer(s.ctx, &velerov2alpha1api.DataDownload{})
+	if err != nil {
+		return errors.Wrap(err, "error getting dd informer")
+	}
+
+	if !cacheutil.WaitForCacheSync(s.ctx.Done(), podInformer.HasSynced, duInformer.HasSynced, ddInformer.HasSynced) {
+		return errors.New("error waiting informer synced")
+	}
+
+	return nil
+}
+
 // validatePodVolumesHostPath validates that the pod volumes path contains a
 // directory for each Pod running on this node
 func (s *nodeAgentServer) validatePodVolumesHostPath(client kubernetes.Interface) error {
diff --git a/pkg/controller/data_download_controller.go b/pkg/controller/data_download_controller.go
index 0b9805a0ac..990d7455b4 100644
--- a/pkg/controller/data_download_controller.go
+++ b/pkg/controller/data_download_controller.go
@@ -767,9 +767,9 @@ func UpdateDataDownloadWithRetry(ctx context.Context, client client.Client, name
 }
 
 var funcResumeCancellableDataRestore = (*DataDownloadReconciler).resumeCancellableDataPath
 
-func (r *DataDownloadReconciler) AttemptDataDownloadResume(ctx context.Context, cli client.Client, logger *logrus.Entry, ns string) error {
+func (r *DataDownloadReconciler) AttemptDataDownloadResume(ctx context.Context, logger *logrus.Entry, ns string) error {
 	dataDownloads := &velerov2alpha1api.DataDownloadList{}
-	if err := cli.List(ctx, dataDownloads, &client.ListOptions{Namespace: ns}); err != nil {
+	if err := r.client.List(ctx, dataDownloads, &client.ListOptions{Namespace: ns}); err != nil {
 		r.logger.WithError(errors.WithStack(err)).Error("failed to list datadownloads")
 		return errors.Wrapf(err, "error to list datadownloads")
 	}
@@ -795,7 +795,7 @@ func (r *DataDownloadReconciler) AttemptDataDownloadResume(ctx context.Context,
 			logger.WithField("datadownload", dd.GetName()).WithError(err).Warn("Failed to resume data path for dd, have to cancel it")
 
 			resumeErr := err
-			err = UpdateDataDownloadWithRetry(ctx, cli, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name}, logger.WithField("datadownload", dd.Name),
+			err = UpdateDataDownloadWithRetry(ctx, r.client, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name}, logger.WithField("datadownload", dd.Name),
 				func(dataDownload *velerov2alpha1api.DataDownload) bool {
 					if dataDownload.Spec.Cancel {
 						return false
@@ -812,7 +812,7 @@
 		} else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseAccepted {
 			r.logger.WithField("datadownload", dd.GetName()).Warn("Cancel dd under Accepted phase")
 
-			err := UpdateDataDownloadWithRetry(ctx, cli, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name},
+			err := UpdateDataDownloadWithRetry(ctx, r.client, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name},
 				r.logger.WithField("datadownload", dd.Name), func(dataDownload *velerov2alpha1api.DataDownload) bool {
 					if dataDownload.Spec.Cancel {
 						return false
diff --git a/pkg/controller/data_download_controller_test.go b/pkg/controller/data_download_controller_test.go
index bb9fe6f7c9..a54135d034 100644
--- a/pkg/controller/data_download_controller_test.go
+++ b/pkg/controller/data_download_controller_test.go
@@ -1079,7 +1079,7 @@ func TestAttemptDataDownloadResume(t *testing.T) {
 			funcResumeCancellableDataBackup = dt.resumeCancellableDataPath
 
 			// Run the test
-			err = r.AttemptDataDownloadResume(ctx, r.client, r.logger.WithField("name", test.name), test.dd.Namespace)
+			err = r.AttemptDataDownloadResume(ctx, r.logger.WithField("name", test.name), test.dd.Namespace)
 
 			if test.expectedError != "" {
 				assert.EqualError(t, err, test.expectedError)
diff --git a/pkg/controller/data_upload_controller.go b/pkg/controller/data_upload_controller.go
index 4c26baf38c..22dc59bd53 100644
--- a/pkg/controller/data_upload_controller.go
+++ b/pkg/controller/data_upload_controller.go
@@ -867,9 +867,9 @@ func UpdateDataUploadWithRetry(ctx context.Context, client client.Client, namesp
 }
 
 var funcResumeCancellableDataBackup = (*DataUploadReconciler).resumeCancellableDataPath
 
-func (r *DataUploadReconciler) AttemptDataUploadResume(ctx context.Context, cli client.Client, logger *logrus.Entry, ns string) error {
+func (r *DataUploadReconciler) AttemptDataUploadResume(ctx context.Context, logger *logrus.Entry, ns string) error {
 	dataUploads := &velerov2alpha1api.DataUploadList{}
-	if err := cli.List(ctx, dataUploads, &client.ListOptions{Namespace: ns}); err != nil {
+	if err := r.client.List(ctx, dataUploads, &client.ListOptions{Namespace: ns}); err != nil {
 		r.logger.WithError(errors.WithStack(err)).Error("failed to list datauploads")
 		return errors.Wrapf(err, "error to list datauploads")
 	}
@@ -895,7 +895,7 @@ func (r *DataUploadReconciler) AttemptDataUploadResume(ctx context.Context, cli
 			logger.WithField("dataupload", du.GetName()).WithError(err).Warn("Failed to resume data path for du, have to cancel it")
 
 			resumeErr := err
-			err = UpdateDataUploadWithRetry(ctx, cli, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, logger.WithField("dataupload", du.Name),
+			err = UpdateDataUploadWithRetry(ctx, r.client, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, logger.WithField("dataupload", du.Name),
 				func(dataUpload *velerov2alpha1api.DataUpload) bool {
 					if dataUpload.Spec.Cancel {
 						return false
@@ -912,7 +912,7 @@ func (r *DataUploadReconciler) AttemptDataUploadResume(ctx context.Context, cli
 		} else if du.Status.Phase == velerov2alpha1api.DataUploadPhaseAccepted {
 			r.logger.WithField("dataupload", du.GetName()).Warn("Cancel du under Accepted phase")
 
-			err := UpdateDataUploadWithRetry(ctx, cli, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, r.logger.WithField("dataupload", du.Name),
+			err := UpdateDataUploadWithRetry(ctx, r.client, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, r.logger.WithField("dataupload", du.Name),
 				func(dataUpload *velerov2alpha1api.DataUpload) bool {
 					if dataUpload.Spec.Cancel {
 						return false
diff --git a/pkg/controller/data_upload_controller_test.go b/pkg/controller/data_upload_controller_test.go
index b373a7a6af..a6ee25574d 100644
--- a/pkg/controller/data_upload_controller_test.go
+++ b/pkg/controller/data_upload_controller_test.go
@@ -1127,7 +1127,7 @@ func TestAttemptDataUploadResume(t *testing.T) {
 			funcResumeCancellableDataBackup = dt.resumeCancellableDataPath
 
 			// Run the test
-			err = r.AttemptDataUploadResume(ctx, r.client, r.logger.WithField("name", test.name), test.du.Namespace)
+			err = r.AttemptDataUploadResume(ctx, r.logger.WithField("name", test.name), test.du.Namespace)
 
 			if test.expectedError != "" {
 				assert.EqualError(t, err, test.expectedError)
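// NOTE (reviewer aid, not part of the patch series): a minimal, self-contained
// sketch of the resume flow the changes above converge on -- block until the
// informer caches backing the resume logic have synced, then call the resume
// entry points, which no longer take a client.Client and instead use the
// reconcilers' own clients internally. The package, interface, and function
// names below are illustrative stand-ins, not Velero APIs.
package resumesketch

import (
	"context"

	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
	cacheutil "k8s.io/client-go/tools/cache"
)

// resumer captures the post-change signatures of AttemptDataUploadResume and
// AttemptDataDownloadResume.
type resumer interface {
	AttemptDataUploadResume(ctx context.Context, logger *logrus.Entry, ns string) error
	AttemptDataDownloadResume(ctx context.Context, logger *logrus.Entry, ns string) error
}

// resumeDataMovers mirrors the node-agent pattern: wait for the relevant
// informers (pod, DataUpload, DataDownload) to sync, or for the context to be
// canceled, then resume in-progress DataUploads and DataDownloads.
func resumeDataMovers(ctx context.Context, r resumer, log *logrus.Entry, ns string, synced ...cacheutil.InformerSynced) error {
	if !cacheutil.WaitForCacheSync(ctx.Done(), synced...) {
		return errors.New("error waiting informer synced")
	}

	if err := r.AttemptDataUploadResume(ctx, log, ns); err != nil {
		return errors.Wrap(err, "error resuming data uploads")
	}

	return errors.Wrap(r.AttemptDataDownloadResume(ctx, log, ns), "error resuming data downloads")
}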