Skip to content

Commit

Permalink
Fix DRPC stuck in 'peer not ready' when VolSync is disabled
Browse files Browse the repository at this point in the history
  • Loading branch information
BenamarMk committed May 18, 2022
1 parent 28dfcc3 commit 8ea33c7
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 33 deletions.
70 changes: 41 additions & 29 deletions controllers/drplacementcontrol.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func (d *DRPCInstance) RunInitialDeployment() (bool, error) {
metav1.ConditionTrue, rmn.ReasonSuccess, "Ready")
}

d.instance.Status.Progression = ""
d.instance.Status.Progression = "Completed"

if d.instance.Status.ActionDuration == nil {
duration := time.Since(d.instance.Status.ActionStartTime.Time)
Expand Down Expand Up @@ -368,7 +368,7 @@ func (d *DRPCInstance) RunFailover() (bool, error) {
return !done, err
}

d.instance.Status.Progression = ""
d.instance.Status.Progression = "Completed"

if d.instance.Status.ActionDuration == nil {
duration := time.Since(d.instance.Status.ActionStartTime.Time)
Expand Down Expand Up @@ -505,7 +505,7 @@ func (d *DRPCInstance) getCurrentHomeClusterName() string {
// - Relocate!
//nolint:funlen
func (d *DRPCInstance) RunRelocate() (bool, error) { //nolint:gocognit,cyclop
d.log.Info("Entering RunRelocate", "state", d.getLastDRState())
d.log.Info("Entering RunRelocate", "state", d.getLastDRState(), "progression", d.getProgression())

if !d.isRelocatingOrRelocated() {
d.instance.Status.ActionStartTime = &metav1.Time{Time: time.Now()}
Expand Down Expand Up @@ -539,7 +539,7 @@ func (d *DRPCInstance) RunRelocate() (bool, error) { //nolint:gocognit,cyclop
return !done, err
}

d.instance.Status.Progression = ""
d.instance.Status.Progression = "Completed"

if d.instance.Status.ActionDuration == nil {
duration := time.Since(d.instance.Status.ActionStartTime.Time)
Expand Down Expand Up @@ -579,15 +579,23 @@ func (d *DRPCInstance) RunRelocate() (bool, error) { //nolint:gocognit,cyclop
return d.relocate(preferredCluster, preferredClusterNamespace, rmn.Relocating)
}

func (d *DRPCInstance) ensureCleanupAndVolSyncReplicationSetup(targetCluster string) error {
func (d *DRPCInstance) ensureCleanupAndVolSyncReplicationSetup(srcCluster string) error {
// If we have VolSync replication, this is the perfect time to reset the RDSpec
// on the primary. This will cause the RD to be cleared on the primary
err := d.ResetVolSyncRDOnPrimary(targetCluster)
err := d.ResetVolSyncRDOnPrimary(srcCluster)
if err != nil {
return err
}

clusterToSkip := targetCluster
// Check if the reset has already been applied. ResetVolSyncRDOnPrimary resets the VRG
// in the MW, but the VRGs in the vrgs slice are fetched using MCV.
vrg, ok := d.vrgs[srcCluster]
if !ok || len(vrg.Spec.VolSync.RDSpec) != 0 {
return fmt.Errorf(fmt.Sprintf("Waiting for RDSpec count on cluster %s to go to zero. VRG OK? %v",
srcCluster, ok))
}

clusterToSkip := srcCluster

err = d.EnsureCleanup(clusterToSkip)
if err != nil {
Expand All @@ -596,7 +604,7 @@ func (d *DRPCInstance) ensureCleanupAndVolSyncReplicationSetup(targetCluster str

// After we ensured peers are clean, The VolSync ReplicationSource (RS) will automatically get
// created, but for the ReplicationDestination, we need to explicitly tell the VRG to create it.
err = d.EnsureVolSyncReplicationSetup(targetCluster)
err = d.EnsureVolSyncReplicationSetup(srcCluster)
if err != nil {
return err
}
Expand Down Expand Up @@ -960,7 +968,7 @@ func (d *DRPCInstance) createVRGManifestWorkAsPrimary(targetCluster string) (boo
return false, nil
}

err := d.updateVRGState(targetCluster, rmn.Primary)
_, err := d.updateVRGState(targetCluster, rmn.Primary)
if err != nil {
d.log.Info(fmt.Sprintf("Failed to update VRG to primary on cluster %s. Err (%v)", targetCluster, err))

Expand Down Expand Up @@ -1011,7 +1019,7 @@ func (d *DRPCInstance) moveVRGToSecondaryEverywhere() bool {
failedCount := 0

for _, clusterName := range rmnutil.DrpolicyClusterNames(d.drPolicy) {
err := d.updateVRGState(clusterName, rmn.Secondary)
_, err := d.updateVRGState(clusterName, rmn.Secondary)
if err != nil {
if errors.IsNotFound(err) {
continue
Expand Down Expand Up @@ -1041,7 +1049,7 @@ func (d *DRPCInstance) moveVRGToSecondaryOnPeers(clusterToSkip string) error {
continue
}

err := d.updateVRGState(clusterName, rmn.Secondary)
_, err := d.updateVRGState(clusterName, rmn.Secondary)
if err != nil {
d.log.Info(fmt.Sprintf("Failed to update VRG to secondary on cluster %s. Err (%v)", clusterName, err))

Expand Down Expand Up @@ -1350,11 +1358,6 @@ func (d *DRPCInstance) EnsureCleanup(clusterToSkip string) error {

d.log.Info(fmt.Sprintf("PeerReady Condition %v", condition))

err := d.moveVRGToSecondaryOnPeers(clusterToSkip)
if err != nil {
return fmt.Errorf("failed to ensure VRG is secondary on peers (%w)", err)
}

// IFF we have VolSync PVCs, then no need to clean up
homeCluster := clusterToSkip

Expand All @@ -1364,7 +1367,13 @@ func (d *DRPCInstance) EnsureCleanup(clusterToSkip string) error {
}

if repReq {
d.log.Info("No need to clean up secondaries. VolSync needs both VRGs")
d.log.Info("VolSync needs both VRGs. No need to clean up secondary")
d.log.Info("Ensure secondary on peer")

err := d.moveVRGToSecondaryOnPeers(clusterToSkip)
if err != nil {
return fmt.Errorf("failed to ensure VRG is secondary on peers (%w)", err)
}

peersReady := true

Expand Down Expand Up @@ -1449,13 +1458,12 @@ func (d *DRPCInstance) ensureVRGManifestWorkOnClusterDeleted(clusterName string)
return !done, fmt.Errorf("failed to retrieve ManifestWork (%w)", err)
}

// if !IsManifestInAppliedState(mw) {
// d.log.Info(fmt.Sprintf("ManifestWork %s/%s NOT in Applied state", mw.Namespace, mw.Name))
// // Wait for MW to be applied. The DRPC reconciliation will be called then
// return done, nil
// }

// d.log.Info("VRG ManifestWork is in Applied state", "name", mw.Name, "cluster", clusterName)
// If .spec.ReplicateSpec has not already been updated to secondary, then update it.
// If we do update it to secondary, then we have to wait for the MW to be applied
updated, err := d.updateVRGState(clusterName, rmn.Secondary)
if err != nil || updated {
return !done, err
}

if d.ensureVRGIsSecondaryOnCluster(clusterName) {
err := d.mwu.DeleteManifestWorksForCluster(clusterName)
Expand Down Expand Up @@ -1614,19 +1622,19 @@ func (d *DRPCInstance) ensureVRGDeleted(clusterName string) bool {
return false
}

func (d *DRPCInstance) updateVRGState(clusterName string, state rmn.ReplicationState) error {
func (d *DRPCInstance) updateVRGState(clusterName string, state rmn.ReplicationState) (bool, error) {
d.log.Info(fmt.Sprintf("Updating VRG to secondary for cluster %s", clusterName))

vrg, err := d.getVRGFromManifestWork(clusterName)
if err != nil {
return fmt.Errorf("failed to update VRG state. ClusterName %s (%w)",
return false, fmt.Errorf("failed to update VRG state. ClusterName %s (%w)",
clusterName, err)
}

if vrg.Spec.ReplicationState == state {
d.log.Info(fmt.Sprintf("VRG %s already %s on this cluster %s", vrg.Name, state, clusterName))

return nil
return false, nil
}

vrg.Spec.ReplicationState = state
Expand All @@ -1638,12 +1646,12 @@ func (d *DRPCInstance) updateVRGState(clusterName string, state rmn.ReplicationS

err = d.updateManifestWork(clusterName, vrg)
if err != nil {
return err
return false, err
}

d.log.Info(fmt.Sprintf("Updated VRG %s running in cluster %s to secondary", vrg.Name, clusterName))

return nil
return true, nil
}

func (d *DRPCInstance) updateVRGToPrepareForFinalSync(clusterName string) error {
Expand Down Expand Up @@ -1865,6 +1873,10 @@ func (d *DRPCInstance) getLastDRState() rmn.DRState {
return d.instance.Status.Phase
}

func (d *DRPCInstance) getProgression() string {
return d.instance.Status.Progression
}

//nolint:exhaustive
func (d *DRPCInstance) getRequeueDuration() time.Duration {
d.log.Info("Getting requeue duration", "last known DR state", d.getLastDRState())
Expand Down
5 changes: 1 addition & 4 deletions controllers/drplacementcontrolvolsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,6 @@ func (d *DRPCInstance) ensureVolSyncReplicationCommon(srcCluster string) error {
return fmt.Errorf("%w", err)
}

d.instance.Status.Progression = ""

return nil
}

Expand Down Expand Up @@ -125,12 +123,11 @@ func (d *DRPCInstance) ensureVolSyncReplicationDestination(srcCluster string) er
}
}

d.log.Info(fmt.Sprintf("Ensured VolSync replication destination for cluster %s", dstCluster))
// TODO: Should we handle more than one dstVRG? For now, just settle for one.
break
}

d.instance.Status.Progression = ""

return nil
}

Expand Down

0 comments on commit 8ea33c7

Please sign in to comment.