Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor vrg-kubeobject functions #1712

136 changes: 78 additions & 58 deletions internal/controller/vrg_kubeobjects.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,84 +495,68 @@ func (v *VRGInstance) kubeObjectsCaptureStatus(status metav1.ConditionStatus, re
}
}

func (v *VRGInstance) kubeObjectsRecover(result *ctrl.Result,
s3StoreProfile ramen.S3StoreProfile, objectStorer ObjectStorer,
) error {
if v.kubeObjectProtectionDisabled("recovery") {
return nil
}
func (v *VRGInstance) getVRGFromS3Profile(s3ProfileName string) (*ramen.VolumeReplicationGroup, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have this function: GetLastKnownVRGPrimaryFromS3. Can we have a common function?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@BenamarMk that is a function in DRPC and this is a function in VRG as a method on VRGInstance, can I take up the refactoring in a later PR?

pathName := s3PathNamePrefix(v.instance.Namespace, v.instance.Name)

localS3StoreAccessor, err := v.findS3StoreAccessor(s3StoreProfile)
objectStore, _, err := v.reconciler.ObjStoreGetter.ObjectStore(
v.ctx, v.reconciler.APIReader, s3ProfileName, v.namespacedName, v.log)
if err != nil {
return err
return nil, fmt.Errorf("object store inaccessible for profile %v: %v", s3ProfileName, err)
}

vrg := v.instance
sourceVrgNamespaceName, sourceVrgName := vrg.Namespace, vrg.Name
sourcePathNamePrefix := s3PathNamePrefix(sourceVrgNamespaceName, sourceVrgName)
vrg := &ramen.VolumeReplicationGroup{}
if err := vrgObjectDownload(objectStore, pathName, vrg); err != nil {
return nil, fmt.Errorf("vrg download failed, vrg namespace:%v, vrg name: %v, s3Profile: %v, error: %v",
v.instance.Namespace, v.instance.Name, s3ProfileName, err)
}

sourceVrg := &ramen.VolumeReplicationGroup{}
if err := vrgObjectDownload(objectStorer, sourcePathNamePrefix, sourceVrg); err != nil {
v.log.Error(err, "Kube objects capture-to-recover-from identifier get error")
return vrg, nil
}

func (v *VRGInstance) kubeObjectsRecover(result *ctrl.Result, s3ProfileName string) error {
if v.kubeObjectProtectionDisabled("recovery") {
return nil
}

captureToRecoverFromIdentifier := sourceVrg.Status.KubeObjectProtection.CaptureToRecoverFrom
if captureToRecoverFromIdentifier == nil {
v.log.Info("Kube objects capture-to-recover-from identifier nil")
if v.instance.Spec.Action == "" {
v.log.Info("Skipping kube objects restore in fresh deployment case")

return nil
}

vrg.Status.KubeObjectProtection.CaptureToRecoverFrom = captureToRecoverFromIdentifier
veleroNamespaceName := v.veleroNamespaceName()
labels := util.OwnerLabels(vrg)
log := v.log.WithValues("number", captureToRecoverFromIdentifier.Number, "profile", localS3StoreAccessor.S3ProfileName)

captureRequestsStruct, err := v.reconciler.kubeObjects.ProtectRequestsGet(
v.ctx, v.reconciler.APIReader, veleroNamespaceName, labels)
sourceVrg, err := v.getVRGFromS3Profile(s3ProfileName)
if err != nil {
log.Error(err, "Kube objects capture requests query error")

return err
return fmt.Errorf("kube objects source VRG get error: %v", err)
}

recoverRequestsStruct, err := v.reconciler.kubeObjects.RecoverRequestsGet(
v.ctx, v.reconciler.APIReader, veleroNamespaceName, labels)
if err != nil {
log.Error(err, "Kube objects recover requests query error")

return err
captureToRecoverFromIdentifier := sourceVrg.Status.KubeObjectProtection.CaptureToRecoverFrom
if captureToRecoverFromIdentifier == nil {
return fmt.Errorf("kube objects source VRG capture-to-recover-from identifier nil: %v", err)
}

return v.kubeObjectsRecoveryStartOrResume(
result,
s3StoreAccessor{objectStorer, localS3StoreAccessor.S3StoreProfile},
sourceVrgNamespaceName, sourceVrgName, captureToRecoverFromIdentifier,
kubeobjects.RequestsMapKeyedByName(captureRequestsStruct),
kubeobjects.RequestsMapKeyedByName(recoverRequestsStruct),
veleroNamespaceName, labels, log,
)
v.instance.Status.KubeObjectProtection.CaptureToRecoverFrom = captureToRecoverFromIdentifier
log := v.log.WithValues("number", captureToRecoverFromIdentifier.Number, "profile", s3ProfileName)

return v.kubeObjectsRecoveryStartOrResume(result, s3ProfileName, captureToRecoverFromIdentifier, log)
}

func (v *VRGInstance) findS3StoreAccessor(s3StoreProfile ramen.S3StoreProfile) (s3StoreAccessor, error) {
func (v *VRGInstance) findS3StoreAccessor(s3ProfileName string) (s3StoreAccessor, error) {
for _, s3StoreAccessor := range v.s3StoreAccessors {
if s3StoreAccessor.S3StoreProfile.S3ProfileName == s3StoreProfile.S3ProfileName {
if s3StoreAccessor.S3StoreProfile.S3ProfileName == s3ProfileName {
return s3StoreAccessor, nil
}
}

return s3StoreAccessor{},
fmt.Errorf("s3StoreProfile (%s) not found in s3StoreAccessor list", s3StoreProfile.S3ProfileName)
fmt.Errorf("s3StoreProfile (%s) not found in s3StoreAccessor list", s3ProfileName)
}

func (v *VRGInstance) getRecoverOrProtectRequest(
captureRequests, recoverRequests map[string]kubeobjects.Request,
s3StoreAccessor s3StoreAccessor, sourceVrgNamespaceName, sourceVrgName string,
captureToRecoverFromIdentifier *ramen.KubeObjectsCaptureIdentifier,
groupNumber int, recoverGroup kubeobjects.RecoverSpec,
veleroNamespaceName string, labels map[string]string, log logr.Logger,
labels map[string]string, log logr.Logger,
) (kubeobjects.Request, bool, func() (kubeobjects.Request, error), func(kubeobjects.Request)) {
vrg := v.instance
annotations := map[string]string{}
Expand All @@ -591,7 +575,7 @@ func (v *VRGInstance) getRecoverOrProtectRequest(
s3StoreAccessor.S3CompatibleEndpoint, s3StoreAccessor.S3Bucket, s3StoreAccessor.S3Region, pathName,
s3StoreAccessor.VeleroNamespaceSecretKeyRef,
s3StoreAccessor.CACertificates,
recoverGroup.Spec, veleroNamespaceName,
recoverGroup.Spec, v.veleroNamespaceName(),
captureName,
labels, annotations)
},
Expand All @@ -615,7 +599,7 @@ func (v *VRGInstance) getRecoverOrProtectRequest(
s3StoreAccessor.S3CompatibleEndpoint, s3StoreAccessor.S3Bucket, s3StoreAccessor.S3Region, pathName,
s3StoreAccessor.VeleroNamespaceSecretKeyRef,
s3StoreAccessor.CACertificates,
recoverGroup, veleroNamespaceName,
recoverGroup, v.veleroNamespaceName(),
captureName, captureRequest,
recoverName,
labels, annotations)
Expand All @@ -627,16 +611,51 @@ func (v *VRGInstance) getRecoverOrProtectRequest(
}
}

func (v *VRGInstance) getCaptureRequests() (map[string]kubeobjects.Request, error) {
captureRequestsStruct, err := v.reconciler.kubeObjects.ProtectRequestsGet(
v.ctx, v.reconciler.APIReader, v.veleroNamespaceName(), util.OwnerLabels(v.instance))
if err != nil {
return nil, fmt.Errorf("kube objects capture requests query error: %v", err)
}

return kubeobjects.RequestsMapKeyedByName(captureRequestsStruct), nil
}

func (v *VRGInstance) getRecoverRequests() (map[string]kubeobjects.Request, error) {
recoverRequestsStruct, err := v.reconciler.kubeObjects.RecoverRequestsGet(
v.ctx, v.reconciler.APIReader, v.veleroNamespaceName(), util.OwnerLabels(v.instance))
if err != nil {
return nil, fmt.Errorf("kube objects recover requests query error: %v", err)
}

return kubeobjects.RequestsMapKeyedByName(recoverRequestsStruct), nil
}

func (v *VRGInstance) kubeObjectsRecoveryStartOrResume(
result *ctrl.Result, s3StoreAccessor s3StoreAccessor,
sourceVrgNamespaceName, sourceVrgName string,
result *ctrl.Result, s3ProfileName string,
captureToRecoverFromIdentifier *ramen.KubeObjectsCaptureIdentifier,
captureRequests, recoverRequests map[string]kubeobjects.Request,
veleroNamespaceName string, labels map[string]string, log logr.Logger,
log logr.Logger,
) error {
labels := util.OwnerLabels(v.instance)

captureRequests, err := v.getCaptureRequests()
if err != nil {
return err
}

recoverRequests, err := v.getRecoverRequests()
if err != nil {
return err
}

groups := v.recipeElements.RecoverWorkflow
requests := make([]kubeobjects.Request, len(groups))

s3StoreAccessor, err := v.findS3StoreAccessor(s3ProfileName)
if err != nil {
return fmt.Errorf("kube objects recovery couldn't build s3StoreAccessor: %v", err)
}

for groupNumber, recoverGroup := range groups {
rg := recoverGroup
log1 := log.WithValues("group", groupNumber, "name", rg.BackupName)
Expand All @@ -646,9 +665,9 @@ func (v *VRGInstance) kubeObjectsRecoveryStartOrResume(
return fmt.Errorf("check hook execution failed during restore %s: %v", rg.Hook.Name, err)
}
} else {
if err := v.executeRecoverGroup(result, s3StoreAccessor, sourceVrgNamespaceName,
sourceVrgName, captureToRecoverFromIdentifier, captureRequests,
recoverRequests, veleroNamespaceName, labels, groupNumber, rg,
if err := v.executeRecoverGroup(result, s3StoreAccessor,
captureToRecoverFromIdentifier, captureRequests,
recoverRequests, labels, groupNumber, rg,
requests, log1); err != nil {
return err
}
Expand All @@ -659,21 +678,22 @@ func (v *VRGInstance) kubeObjectsRecoveryStartOrResume(
duration := time.Since(startTime.Time)
log.Info("Kube objects recovered", "groups", len(groups), "start", startTime, "duration", duration)

return v.kubeObjectsRecoverRequestsDelete(result, veleroNamespaceName, labels)
return v.kubeObjectsRecoverRequestsDelete(result, v.veleroNamespaceName(), labels)
}

func (v *VRGInstance) executeRecoverGroup(result *ctrl.Result, s3StoreAccessor s3StoreAccessor,
sourceVrgNamespaceName, sourceVrgName string,
captureToRecoverFromIdentifier *ramen.KubeObjectsCaptureIdentifier,
captureRequests, recoverRequests map[string]kubeobjects.Request,
veleroNamespaceName string, labels map[string]string, groupNumber int,
labels map[string]string, groupNumber int,
rg kubeobjects.RecoverSpec, requests []kubeobjects.Request, log1 logr.Logger,
) error {
sourceVrgName := v.instance.Name
sourceVrgNamespaceName := v.instance.Namespace
request, ok, submit, cleanup := v.getRecoverOrProtectRequest(
captureRequests, recoverRequests, s3StoreAccessor,
sourceVrgNamespaceName, sourceVrgName,
captureToRecoverFromIdentifier,
groupNumber, rg, veleroNamespaceName, labels, log1,
groupNumber, rg, labels, log1,
)

var err error
Expand Down
6 changes: 2 additions & 4 deletions internal/controller/vrg_volrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -2010,9 +2010,7 @@ func (v *VRGInstance) restorePVsAndPVCsFromS3(result *ctrl.Result) (int, error)

var objectStore ObjectStorer

var s3StoreProfile ramendrv1alpha1.S3StoreProfile

objectStore, s3StoreProfile, err = v.reconciler.ObjStoreGetter.ObjectStore(
objectStore, _, err = v.reconciler.ObjStoreGetter.ObjectStore(
v.ctx, v.reconciler.APIReader, s3ProfileName, v.namespacedName, v.log)
if err != nil {
v.log.Error(err, "Kube objects recovery object store inaccessible", "profile", s3ProfileName)
Expand Down Expand Up @@ -2046,7 +2044,7 @@ func (v *VRGInstance) restorePVsAndPVCsFromS3(result *ctrl.Result) (int, error)

v.log.Info(fmt.Sprintf("Restored %d PVs and %d PVCs using profile %s", pvCount, pvcCount, s3ProfileName))

return pvCount + pvcCount, v.kubeObjectsRecover(result, s3StoreProfile, objectStore)
return pvCount + pvcCount, v.kubeObjectsRecover(result, s3ProfileName)
}

if NoS3 {
Expand Down