Set or update peerClasses for VRG #1638

Open · wants to merge 3 commits into main
232 changes: 194 additions & 38 deletions internal/controller/drplacementcontrol.go
@@ -15,6 +15,7 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
clrapiv1beta1 "open-cluster-management.io/api/cluster/v1beta1"

rmn "github.com/ramendr/ramen/api/v1alpha1"
@@ -297,7 +298,7 @@ func (d *DRPCInstance) startDeploying(homeCluster, homeClusterNamespace string)
d.setProgression(rmn.ProgressionCreatingMW)
// Create VRG first, to leverage user PlacementRule decision to skip placement and move to cleanup
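// createVRGManifestWork returns an AlreadyExists error if a matching VRG ManifestWork is already
// present; that case is treated as success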
err := d.createVRGManifestWork(homeCluster, rmn.Primary)
if err != nil {
if err != nil && !errors.IsAlreadyExists(err) {
return false, err
}

@@ -1512,28 +1513,49 @@ func (d *DRPCInstance) updatePreferredDecision() {
}
}

// createVRGManifestWork is called to create a new VRG ManifestWork on homeCluster
func (d *DRPCInstance) createVRGManifestWork(homeCluster string, repState rmn.ReplicationState) error {
// TODO: check if the VRG MW exists here as a less expensive way to validate that the Namespace exists
err := d.ensureNamespaceManifestWork(homeCluster)
if err != nil {
return fmt.Errorf("createVRGManifestWork couldn't ensure namespace '%s' on cluster %s exists",
d.vrgNamespace, homeCluster)
}

// Safety latch to ensure VRG MW is not present
vrg, err := d.getVRGFromManifestWork(homeCluster)
if (err != nil && !errors.IsNotFound(err)) || vrg != nil {
if err != nil {
return fmt.Errorf("error (%w) determining if ManifestWork for VolumeReplicationGroup resource "+
"exists on cluster %s", err, homeCluster)
}

if vrg.Spec.ReplicationState != repState {
return fmt.Errorf("ManifestWork for VolumeReplicationGroup resource "+
"exists with mismatching state (%s) on cluster %s",
vrg.Spec.ReplicationState, homeCluster)
}

return fmt.Errorf("VolumeReplicationGroup ManifestWork for cluster %s in state %s exists (%w)",
homeCluster, string(vrg.Spec.ReplicationState), errors.NewAlreadyExists(
schema.GroupResource{
Group: rmn.GroupVersion.Group,
Resource: "VolumeReplicationGroup",
}, vrg.Name))
}

// create VRG ManifestWork
d.log.Info("Creating VRG ManifestWork", "ReplicationState", repState,
"Last State:", d.getLastDRState(), "cluster", homeCluster)

vrg := d.newVRG(homeCluster, repState)

newVRG := d.newVRG(homeCluster, repState, nil)
annotations := make(map[string]string)

annotations[DRPCNameAnnotation] = d.instance.Name
annotations[DRPCNamespaceAnnotation] = d.instance.Namespace

if _, err := d.mwu.CreateOrUpdateVRGManifestWork(
d.instance.Name, d.vrgNamespace,
homeCluster, vrg, annotations); err != nil {
homeCluster, newVRG, annotations); err != nil {
d.log.Error(err, "failed to create or update VolumeReplicationGroup manifest")

return fmt.Errorf("failed to create or update VolumeReplicationGroup manifest in namespace %s (%w)", homeCluster, err)
@@ -1543,35 +1565,164 @@ }
}

// ensureVRGManifestWork ensures that the VRG ManifestWork exists and matches the current VRG state.
// TODO: This may be safe only when the VRG is primary - check if callers use this correctly.
func (d *DRPCInstance) ensureVRGManifestWork(homeCluster string) error {
d.log.Info("Ensure VRG ManifestWork",
"Last State:", d.getLastDRState(), "cluster", homeCluster)

mw, mwErr := d.mwu.FindManifestWorkByType(rmnutil.MWTypeVRG, homeCluster)
if mwErr != nil {
if errors.IsNotFound(mwErr) {
cachedVrg := d.vrgs[homeCluster]
if cachedVrg == nil {
return fmt.Errorf("failed to get vrg from cluster %s", homeCluster)
}

return d.createVRGManifestWork(homeCluster, cachedVrg.Spec.ReplicationState)
return fmt.Errorf("failed to find ManifestWork for VolumeReplicationGroup from cluster %s", homeCluster)
}

return fmt.Errorf("ensure VRG ManifestWork failed (%w)", mwErr)
return fmt.Errorf("error (%w) in finding ManifestWork for VolumeReplicationGroup from cluster %s",
mwErr, homeCluster)
}

vrg, err := rmnutil.ExtractVRGFromManifestWork(mw)
if err != nil {
return fmt.Errorf("error extracting VRG from ManifestWork for cluster %s. Error: %w", homeCluster, err)
}

d.updateVRGOptionalFields(vrg, homeCluster)
// Safety latch to ensure VRG to update is Primary
if vrg.Spec.ReplicationState != rmn.Primary {
return fmt.Errorf("invalid update for VolumeReplicationGroup in %s spec.replicationState on cluster %s",
vrg.Spec.ReplicationState, homeCluster)
}

// Safety latch to ensure a view exists for the existing VRG ManifestWork
if d.vrgs[homeCluster] == nil {
return fmt.Errorf("missing VolumeReplicationGroup view for cluster %s, while attempting to update an instance",
homeCluster)
}

d.updateVRGOptionalFields(vrg, d.vrgs[homeCluster], homeCluster)

return d.mwu.UpdateVRGManifestWork(vrg, mw)
}

// hasPeerClass finds a peer in the passed-in list of peerClasses and returns true if a peer matches the passed-in
// storage class name and represents the clusters in the clusterIDs list.
// Also see peerClassMatchesPeer
func hasPeerClass(vrgPeerClasses []rmn.PeerClass, scName string, clusterIDs []string) bool {
for peerClassVRGIdx := range vrgPeerClasses {
if (vrgPeerClasses[peerClassVRGIdx].StorageClassName == scName) &&
(slices.Equal(vrgPeerClasses[peerClassVRGIdx].ClusterIDs, clusterIDs)) {
return true
}
}

return false
}

// updatePeers see updateVRGDRTypeSpec
func updatePeers(
vrgFromView *rmn.VolumeReplicationGroup,
vrgPeerClasses, policyPeerClasses []rmn.PeerClass,
) []rmn.PeerClass {
peerClasses := vrgPeerClasses

for pvcIdx := range vrgFromView.Status.ProtectedPVCs {
for policyPeerClassIdx := range policyPeerClasses {
if policyPeerClasses[policyPeerClassIdx].StorageClassName ==
*vrgFromView.Status.ProtectedPVCs[pvcIdx].StorageClassName {
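// The policy has a peerClass for this protected PVC's storage class; add it to the VRG
// only if the VRG does not already carry a matching peerClass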
if hasPeerClass(
vrgPeerClasses,
*vrgFromView.Status.ProtectedPVCs[pvcIdx].StorageClassName,
policyPeerClasses[policyPeerClassIdx].ClusterIDs,
) {
break
}

peerClasses = append(
peerClasses,
policyPeerClasses[policyPeerClassIdx],
)
}
}
}

return peerClasses
}

// updateVRGAsyncSpec see updateVRGDRTypeSpec
func (d *DRPCInstance) updateVRGAsyncSpec(vrgFromView, vrg *rmn.VolumeReplicationGroup) {
// vrg will be updated with final contents of asyncSpec
asyncSpec := d.newVRGSpecAsync()
if len(asyncSpec.PeerClasses) == 0 {
// Retain peerClasses from VRG
if vrg.Spec.Async != nil && len(vrg.Spec.Async.PeerClasses) != 0 {
asyncSpec.PeerClasses = vrg.Spec.Async.PeerClasses
}

vrg.Spec.Async = asyncSpec

return
}

// If there is no async spec in VRG, update it with the generated spec
// NOTE: Cannot happen! The VRG is type Async and is being updated, so Async cannot be nil; this is just a safety check
if vrg.Spec.Async == nil {
vrg.Spec.Async = asyncSpec

return
}

asyncSpec.PeerClasses = updatePeers(vrgFromView, vrg.Spec.Async.PeerClasses, d.drPolicy.Status.Async.PeerClasses)

// TODO: prune peerClasses not in policy and not in use by VRG

vrg.Spec.Async = asyncSpec
}

// updateVRGSyncSpec see updateVRGDRTypeSpec
func (d *DRPCInstance) updateVRGSyncSpec(vrgFromView, vrg *rmn.VolumeReplicationGroup) {
// vrg will be updated with final contents of syncSpec
syncSpec := d.newVRGSpecSync()
if len(syncSpec.PeerClasses) == 0 {
// Retain peerClasses from VRG
if vrg.Spec.Sync != nil && len(vrg.Spec.Sync.PeerClasses) != 0 {
syncSpec.PeerClasses = vrg.Spec.Sync.PeerClasses
}

vrg.Spec.Sync = syncSpec

return
}

// If there is no sync spec in VRG, update it with the generated spec
// NOTE: Cannot happen! The VRG is type Sync and is being updated, so Sync cannot be nil; this is just a safety check
if vrg.Spec.Sync == nil {
vrg.Spec.Sync = syncSpec

return
}

syncSpec.PeerClasses = updatePeers(vrgFromView, vrg.Spec.Sync.PeerClasses, d.drPolicy.Status.Sync.PeerClasses)

// TODO: prune peerClasses not in policy and not in use by VRG

vrg.Spec.Sync = syncSpec
}

// updateVRGDRTypeSpec updates VRG Sync/Async spec based on the DR type.
// Update works to ensure VRG is updated with peerClasses that it requires, based on reported PVCs that the VRG is
// attempting to protect. If a VRG is attempting to protect a PVC for which it is lacking a peerClass and that is
// available as part of the DRPolicy, its peerClasses are updated. For existing peerClasses the VRG information is
// not updated; this is done to avoid any protection mechanism conflicts. For example, if a VRG carried a peerClass
// without the replicationID (i.e., it would choose to protect the PVC using VolSync and VolumeSnapshots), then it is not
// updated with a peerClass that NOW supports native VolumeReplication, as that would void existing protection.
// To change replication schemes a workload needs to be DR disabled and then re-enabled to catch up to the latest
// available peer information for an SC.
func (d *DRPCInstance) updateVRGDRTypeSpec(vrgFromCluster, generatedVRG *rmn.VolumeReplicationGroup) {
switch d.drType {
case DRTypeSync:
d.updateVRGSyncSpec(vrgFromCluster, generatedVRG)
case DRTypeAsync:
d.updateVRGAsyncSpec(vrgFromCluster, generatedVRG)
}
}

// updateVRGOptionalFields ensures that the optional fields in the VRG object are up to date.
// This function does not modify the following fields:
// - ObjectMeta.Name
@@ -1584,7 +1735,7 @@ func (d *DRPCInstance) ensureVRGManifestWork(homeCluster string) error {
//
// These fields are either set during the initial creation of the VRG (e.g., name and namespace)
// or updated as needed, such as the PrepareForFinalSync and RunFinalSync fields.
func (d *DRPCInstance) updateVRGOptionalFields(vrg *rmn.VolumeReplicationGroup, homeCluster string) {
func (d *DRPCInstance) updateVRGOptionalFields(vrg, vrgFromView *rmn.VolumeReplicationGroup, homeCluster string) {
vrg.ObjectMeta.Annotations = map[string]string{
DestinationClusterAnnotationKey: homeCluster,
DoNotDeletePVCAnnotation: d.instance.GetAnnotations()[DoNotDeletePVCAnnotation],
@@ -1595,10 +1746,20 @@ func (d *DRPCInstance) updateVRGOptionalFields(vrg *rmn.VolumeReplicationGroup,
vrg.Spec.ProtectedNamespaces = d.instance.Spec.ProtectedNamespaces
vrg.Spec.S3Profiles = AvailableS3Profiles(d.drClusters)
vrg.Spec.KubeObjectProtection = d.instance.Spec.KubeObjectProtection
vrg.Spec.Async = d.generateVRGSpecAsync()
vrg.Spec.Sync = d.generateVRGSpecSync()
vrg.Spec.VolSync.Disabled = d.volSyncDisabled
d.setVRGAction(vrg)

// If vrgFromView is nil, then vrg is newly generated and its Sync/Async spec is updated unconditionally
if vrgFromView == nil {
switch d.drType {
case DRTypeSync:
vrg.Spec.Sync = d.newVRGSpecSync()
case DRTypeAsync:
vrg.Spec.Async = d.newVRGSpecAsync()
}
} else {
d.updateVRGDRTypeSpec(vrgFromView, vrg)
}
}

func (d *DRPCInstance) ensurePlacement(homeCluster string) error {
@@ -1633,7 +1794,11 @@ func (d *DRPCInstance) setVRGAction(vrg *rmn.VolumeReplicationGroup) {
vrg.Spec.Action = action
}

func (d *DRPCInstance) newVRG(dstCluster string, repState rmn.ReplicationState) rmn.VolumeReplicationGroup {
func (d *DRPCInstance) newVRG(
dstCluster string,
repState rmn.ReplicationState,
vrgFromView *rmn.VolumeReplicationGroup,
) rmn.VolumeReplicationGroup {
vrg := rmn.VolumeReplicationGroup{
TypeMeta: metav1.TypeMeta{Kind: "VolumeReplicationGroup", APIVersion: "ramendr.openshift.io/v1alpha1"},
ObjectMeta: metav1.ObjectMeta{
@@ -1646,34 +1811,25 @@ func (d *DRPCInstance) newVRG(dstCluster string, repState rmn.ReplicationState)
},
}

d.updateVRGOptionalFields(&vrg, dstCluster)
d.updateVRGOptionalFields(&vrg, vrgFromView, dstCluster)

return vrg
}

func (d *DRPCInstance) generateVRGSpecAsync() *rmn.VRGAsyncSpec {
if dRPolicySupportsRegional(d.drPolicy, d.drClusters) {
return &rmn.VRGAsyncSpec{
ReplicationClassSelector: d.drPolicy.Spec.ReplicationClassSelector,
VolumeSnapshotClassSelector: d.drPolicy.Spec.VolumeSnapshotClassSelector,
VolumeGroupSnapshotClassSelector: d.drPolicy.Spec.VolumeGroupSnapshotClassSelector,
SchedulingInterval: d.drPolicy.Spec.SchedulingInterval,
}
func (d *DRPCInstance) newVRGSpecAsync() *rmn.VRGAsyncSpec {
return &rmn.VRGAsyncSpec{
ReplicationClassSelector: d.drPolicy.Spec.ReplicationClassSelector,
VolumeSnapshotClassSelector: d.drPolicy.Spec.VolumeSnapshotClassSelector,
VolumeGroupSnapshotClassSelector: d.drPolicy.Spec.VolumeGroupSnapshotClassSelector,
SchedulingInterval: d.drPolicy.Spec.SchedulingInterval,
PeerClasses: d.drPolicy.Status.Async.PeerClasses,
}

return nil
}

func (d *DRPCInstance) generateVRGSpecSync() *rmn.VRGSyncSpec {
if d.drType == DRTypeSync {
return &rmn.VRGSyncSpec{}
func (d *DRPCInstance) newVRGSpecSync() *rmn.VRGSyncSpec {
return &rmn.VRGSyncSpec{
PeerClasses: d.drPolicy.Status.Sync.PeerClasses,
}

return nil
}

func dRPolicySupportsRegional(drpolicy *rmn.DRPolicy, drClusters []rmn.DRCluster) bool {
return rmnutil.DrpolicyRegionNamesAsASet(drpolicy, drClusters).Len() > 1
}

func dRPolicySupportsMetro(drpolicy *rmn.DRPolicy, drclusters []rmn.DRCluster) (
22 changes: 14 additions & 8 deletions internal/controller/drplacementcontrolvolsync.go
@@ -145,7 +145,7 @@ func (d *DRPCInstance) createOrUpdateVolSyncDestManifestWork(srcCluster string)
annotations[DRPCNameAnnotation] = d.instance.Name
annotations[DRPCNamespaceAnnotation] = d.instance.Namespace

vrg, err := d.refreshRDSpec(srcCluster, dstCluster)
vrg, err := d.refreshVRGSecondarySpec(srcCluster, dstCluster)
if err != nil {
return ctrlutil.OperationResultNone, err
}
@@ -169,22 +169,28 @@ func (d *DRPCInstance) createOrUpdateVolSyncDestManifestWork(srcCluster string)
return ctrlutil.OperationResultNone, nil
}

func (d *DRPCInstance) refreshRDSpec(srcCluster, dstCluster string) (*rmn.VolumeReplicationGroup, error) {
func (d *DRPCInstance) refreshVRGSecondarySpec(srcCluster, dstCluster string) (*rmn.VolumeReplicationGroup, error) {
d.setProgression(rmn.ProgressionSettingupVolsyncDest)

srcVRG, found := d.vrgs[srcCluster]
srcVRGView, found := d.vrgs[srcCluster]
if !found {
return nil, fmt.Errorf("failed to find source VolSync VRG in cluster %s. VRGs %v", srcCluster, d.vrgs)
}

if len(srcVRG.Status.ProtectedPVCs) == 0 {
d.log.Info("ProtectedPVCs on pirmary cluster is empty")
srcVRG, err := d.getVRGFromManifestWork(srcCluster)
if err != nil {
return nil, fmt.Errorf("failed to find source VRG ManifestWork in cluster %s", srcCluster)
}

dstVRG := d.newVRG(dstCluster, rmn.Secondary, nil)
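// Generate a fresh Secondary VRG; with a nil vrgFromView its Sync/Async spec is taken directly from the DRPolicy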

return nil, WaitForSourceCluster
if len(srcVRGView.Status.ProtectedPVCs) != 0 {
d.resetRDSpec(srcVRGView, &dstVRG)
}

dstVRG := d.newVRG(dstCluster, rmn.Secondary)
d.resetRDSpec(srcVRG, &dstVRG)
// Update destination VRG peerClasses with the source peerClasses, such that when the secondary is promoted to
// primary during failover or relocate actions, it uses the same peerClasses as the primary
dstVRG.Spec.Async.PeerClasses = srcVRG.Spec.Async.PeerClasses

return &dstVRG, nil
}
Expand Down