feat(pre-sync-mode): copy RWX PVCs without actually swapping them

Reduces the downtime of the actual migration. Typically used with
RWX PVCs to create all destination PVCs and rsync the data ahead of
time, without having to scale down the pods/deployments/statefulsets.

At a later stage, the actual swap of the PVCs can then be performed
much faster, as most of the data will already have been rsync'ed.

Signed-off-by: Clément Nussbaumer <[email protected]>

fix(pre-sync-mode): skip volumes not supporting RWX access mode

Signed-off-by: Clément Nussbaumer <[email protected]>

fix: report the proper matching PVC count

Signed-off-by: Clément Nussbaumer <[email protected]>

feat: add max-pvs=n flag

Signed-off-by: Clément Nussbaumer <[email protected]>

feat(pre-sync-mode): run a "prefetch" copy before actually migrating

Signed-off-by: Clément Nussbaumer <[email protected]>
clementnuss committed Oct 18, 2024
1 parent 6c58960 commit 696a8ce
Showing 4 changed files with 80 additions and 46 deletions.
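
The pre-sync workflow described in the commit messages above is driven by two new fields on the migrate Options struct, PreSyncMode and MaxPVs, both visible in the diff below. The following is a minimal sketch of invoking the library with pre-sync mode enabled; the module import path, the StorageClass names, the rsync image, and the exact Migrate signature beyond what this diff shows are assumptions.

package main

import (
	"context"
	"log"
	"os"

	"github.com/replicatedhq/pvmigrate/pkg/migrate" // assumed module path
	k8sclient "k8s.io/client-go/kubernetes"
)

// runPreSync copies data to the destination PVCs while workloads keep
// running, then lets Migrate scale down, re-sync, and swap the PVCs.
func runPreSync(ctx context.Context, clientset k8sclient.Interface) error {
	w := log.New(os.Stdout, "", 0)
	opts := migrate.Options{
		SourceSCName: "nfs-rwx",      // assumed source StorageClass
		DestSCName:   "ceph-rwx",     // assumed destination StorageClass
		RsyncImage:   "eeacms/rsync", // assumed rsync image
		PreSyncMode:  true,           // pre-copy before any scale-down
		MaxPVs:       5,              // handle at most 5 PVs this run (0 = unlimited)
	}
	return migrate.Migrate(ctx, w, clientset, opts)
}
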
2 changes: 2 additions & 0 deletions cmd/main.go
@@ -39,7 +39,9 @@ func main() {
flag.StringVar(&options.Namespace, "namespace", "", "only migrate PVCs within this namespace")
flag.BoolVar(&options.SetDefaults, "set-defaults", false, "change default storage class from source to dest")
flag.BoolVar(&options.VerboseCopy, "verbose-copy", false, "show output from the rsync command used to copy data between PVCs")
flag.BoolVar(&options.PreSyncMode, "pre-sync-mode", false, "create the new PVC and copy the data, then scale down, run another copy and finally swap the PVCs")
flag.BoolVar(&options.SkipSourceValidation, "skip-source-validation", false, "migrate from PVCs using a particular StorageClass name, even if that StorageClass does not exist")
flag.IntVar(&options.MaxPVs, "max-pvs", 0, "maximum number of PVs to process. default to 0 (unlimited)")
flag.IntVar(&podReadyTimeout, "pod-ready-timeout", 60, "length of time to wait (in seconds) for validation pod(s) to go into Ready phase")
flag.IntVar(&deletePVTimeout, "delete-pv-timeout", 300, "length of time to wait (in seconds) for backing PV to be removed when temporary PVC is deleted")
flag.BoolVar(&skipPreflightValidation, "skip-preflight-validation", false, "skip preflight migration validation on the destination storage provider")
4 changes: 2 additions & 2 deletions pkg/k8sutil/truncate.go
@@ -2,14 +2,14 @@ package k8sutil

import "fmt"

const nameSuffix = "-pvcmigrate"
const PVCNameSuffix = "-pvcmigrate"

// if the length after adding the suffix is more than 253 characters, we need to reduce that to fit within k8s limits
// pruning from the end runs the risk of dropping the '0'/'1'/etc of a statefulset's PVC name
// pruning from the front runs the risk of making a-replica-... and b-replica-... collide
// so this removes characters from the middle of the string
func NewPvcName(originalName string) string {
candidate := originalName + nameSuffix
candidate := originalName + PVCNameSuffix
if len(candidate) <= 253 {
return candidate
}
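
With the suffix constant now exported as PVCNameSuffix, other packages can recognize pvmigrate's destination PVCs (see the getPVCs change below). For reference, a small sketch of what NewPvcName produces; the import path is an assumption.

package main

import (
	"fmt"

	"github.com/replicatedhq/pvmigrate/pkg/k8sutil" // assumed module path
)

func main() {
	// Short names simply get the "-pvcmigrate" suffix appended.
	fmt.Println(k8sutil.NewPvcName("data-postgres-0")) // data-postgres-0-pvcmigrate

	// Names that would exceed 253 characters with the suffix are trimmed
	// from the middle, preserving the statefulset ordinal at the end and
	// the distinguishing prefix at the front (see the comment above).
}
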
104 changes: 64 additions & 40 deletions pkg/migrate/migrate.go
@@ -8,6 +8,7 @@ import (
"io"
"log"
"strconv"
"strings"
"text/tabwriter"
"time"

@@ -50,6 +51,8 @@ type Options struct {
Namespace string
SetDefaults bool
VerboseCopy bool
PreSyncMode bool
MaxPVs int
SkipSourceValidation bool
PodReadyTimeout time.Duration
DeletePVTimeout time.Duration
@@ -62,17 +65,28 @@ func Migrate(ctx context.Context, w *log.Logger, clientset k8sclient.Interface,
return err
}

matchingPVCs, namespaces, err := getPVCs(ctx, w, clientset, options.SourceSCName, options.DestSCName, options.Namespace)
if options.PreSyncMode {
w.Println("\nRunning in pre-sync-mode: we first copy the PVC live, without scaling down pods. Once that pre-sync is completed, we scale down, do another copy/sync and finally swap the PVCs.")
}

matchingPVCs, namespaces, err := getPVCs(ctx, w, clientset, &options)
if err != nil {
return err
}

updatedMatchingPVCs, err := scaleDownPods(ctx, w, clientset, matchingPVCs, time.Second*5)
if options.PreSyncMode {
err = copyAllPVCs(ctx, w, clientset, &options, matchingPVCs, 1*time.Second)
if err != nil {
return err
}
}

err = scaleDownPods(ctx, w, clientset, matchingPVCs, time.Second*5)
if err != nil {
return fmt.Errorf("failed to scale down pods: %w", err)
}

err = copyAllPVCs(ctx, w, clientset, options.SourceSCName, options.DestSCName, options.RsyncImage, updatedMatchingPVCs, options.VerboseCopy, time.Second, options.RsyncFlags)
err = copyAllPVCs(ctx, w, clientset, &options, matchingPVCs, 1*time.Second)
if err != nil {
return err
}
@@ -169,15 +183,15 @@ func swapDefaultStorageClasses(ctx context.Context, w *log.Logger, clientset k8s
return nil
}

func copyAllPVCs(ctx context.Context, w *log.Logger, clientset k8sclient.Interface, sourceSCName string, destSCName string, rsyncImage string, matchingPVCs map[string][]*corev1.PersistentVolumeClaim, verboseCopy bool, waitTime time.Duration, rsyncFlags []string) error {
func copyAllPVCs(ctx context.Context, w *log.Logger, clientset k8sclient.Interface, options *Options, matchingPVCs map[string][]*corev1.PersistentVolumeClaim, waitTime time.Duration) error {
// create a pod for each PVC migration, and wait for it to finish
w.Printf("\nCopying data from %s PVCs to %s PVCs\n", sourceSCName, destSCName)
w.Printf("\nCopying data from %s PVCs to %s PVCs\n", options.SourceSCName, options.DestSCName)
for ns, nsPvcs := range matchingPVCs {
for _, nsPvc := range nsPvcs {
sourcePvcName, destPvcName := nsPvc.Name, k8sutil.NewPvcName(nsPvc.Name)
w.Printf("Copying data from %s (%s) to %s in %s\n", sourcePvcName, nsPvc.Spec.VolumeName, destPvcName, ns)

err := copyOnePVC(ctx, w, clientset, ns, sourcePvcName, destPvcName, rsyncImage, verboseCopy, waitTime, rsyncFlags)
err := copyOnePVC(ctx, w, clientset, ns, sourcePvcName, destPvcName, options, waitTime)
if err != nil {
return fmt.Errorf("failed to copy PVC %s in %s: %w", nsPvc.Name, ns, err)
}
@@ -186,15 +200,15 @@ func copyAllPVCs(ctx context.Context, w *log.Logger, clientset k8sclient.Interfa
return nil
}

func copyOnePVC(ctx context.Context, w *log.Logger, clientset k8sclient.Interface, ns string, sourcePvcName string, destPvcName string, rsyncImage string, verboseCopy bool, waitTime time.Duration, rsyncFlags []string) error {
func copyOnePVC(ctx context.Context, w *log.Logger, clientset k8sclient.Interface, ns string, sourcePvcName string, destPvcName string, options *Options, waitTime time.Duration) error {
w.Printf("Determining the node to migrate PVC %s on\n", sourcePvcName)
nodeName, err := getDesiredNode(ctx, clientset, ns, sourcePvcName)
if err != nil {
return fmt.Errorf("failed to get node for PVC %s in %s: %w", sourcePvcName, ns, err)
}

w.Printf("Creating pvc migrator pod on node %s\n", nodeName)
createdPod, err := createMigrationPod(ctx, clientset, ns, sourcePvcName, destPvcName, rsyncImage, nodeName, rsyncFlags)
createdPod, err := createMigrationPod(ctx, clientset, ns, sourcePvcName, destPvcName, options.RsyncImage, nodeName, options.RsyncFlags)
if err != nil {
return err
}
@@ -278,13 +292,13 @@ func copyOnePVC(ctx context.Context, w *log.Logger, clientset k8sclient.Interfac
w.Printf("failed to read pod logs: %v\n", err)
break
}
if verboseCopy {
if options.VerboseCopy {
w.Printf(" %s\n", line)
} else {
_, _ = fmt.Fprintf(w.Writer(), ".") // one dot per line of output
}
}
if !verboseCopy {
if !options.VerboseCopy {
_, _ = fmt.Fprintf(w.Writer(), "done!\n") // add a newline at the end of the dots if not showing pod logs
}

@@ -416,7 +430,7 @@ func createMigrationPod(ctx context.Context, clientset k8sclient.Interface, ns s
// a map of namespaces to arrays of original PVCs
// an array of namespaces that the PVCs were found within
// an error, if one was encountered
func getPVCs(ctx context.Context, w *log.Logger, clientset k8sclient.Interface, sourceSCName, destSCName string, Namespace string) (map[string][]*corev1.PersistentVolumeClaim, []string, error) {
func getPVCs(ctx context.Context, w *log.Logger, clientset k8sclient.Interface, opts *Options) (map[string][]*corev1.PersistentVolumeClaim, []string, error) {
// get PVs using the specified storage provider
pvs, err := clientset.CoreV1().PersistentVolumes().List(ctx, metav1.ListOptions{})
if err != nil {
@@ -425,26 +439,38 @@ func getPVCs(ctx context.Context, w *log.Logger, clientset k8sclient.Interface,
matchingPVs := []corev1.PersistentVolume{}
pvsByName := map[string]corev1.PersistentVolume{}
for _, pv := range pvs.Items {
if pv.Spec.StorageClassName == sourceSCName {
if pv.Spec.StorageClassName == opts.SourceSCName {
matchingPVs = append(matchingPVs, pv)
pvsByName[pv.Name] = pv
} else {
w.Printf("PV %s does not match source SC %s, not migrating\n", pv.Name, sourceSCName)
w.Printf("PV %s does not match source SC %s, not migrating\n", pv.Name, opts.SourceSCName)
}
}

// get PVCs using specified PVs
matchingPVCsCount := 0
matchingPVCs := map[string][]*corev1.PersistentVolumeClaim{}
for _, pv := range matchingPVs {
if opts.MaxPVs > 0 && matchingPVCsCount >= opts.MaxPVs {
break
}
if pv.Spec.ClaimRef != nil {
if len(opts.Namespace) > 0 && pv.Spec.ClaimRef.Namespace != opts.Namespace {
continue // early continue, to prevent logging info regarding PV/PVCs in other namespaces
}

if strings.HasSuffix(pv.Spec.ClaimRef.Name, k8sutil.PVCNameSuffix) {
w.Printf("Skipping PV %s as the claiming PVC has the %s suffix", pv.Name, k8sutil.PVCNameSuffix)
continue
}

pvc, err := clientset.CoreV1().PersistentVolumeClaims(pv.Spec.ClaimRef.Namespace).Get(ctx, pv.Spec.ClaimRef.Name, metav1.GetOptions{})
if err != nil {
return nil, nil, fmt.Errorf("failed to get PVC for PV %s in %s: %w", pv.Spec.ClaimRef.Name, pv.Spec.ClaimRef.Namespace, err)
}

if pv.Spec.ClaimRef.Namespace == Namespace || Namespace == "" {
matchingPVCs[pv.Spec.ClaimRef.Namespace] = append(matchingPVCs[pv.Spec.ClaimRef.Namespace], pvc)
}
matchingPVCsCount++
matchingPVCs[pv.Spec.ClaimRef.Namespace] = append(matchingPVCs[pv.Spec.ClaimRef.Namespace], pvc)

} else {
return nil, nil, fmt.Errorf("PV %s does not have an associated PVC - resolve this before rerunning", pv.Name)
@@ -456,7 +482,7 @@ func getPVCs(ctx context.Context, w *log.Logger, clientset k8sclient.Interface,
pvcNamespaces = append(pvcNamespaces, idx)
}

w.Printf("\nFound %d matching PVCs to migrate across %d namespaces:\n", len(matchingPVCs), len(pvcNamespaces))
w.Printf("\nFound %d matching PVCs to migrate across %d namespaces:\n", matchingPVCsCount, len(pvcNamespaces))
tw := tabwriter.NewWriter(w.Writer(), 2, 2, 1, ' ', 0)
_, _ = fmt.Fprintf(tw, "namespace:\tpvc:\tpv:\tsize:\t\n")
for ns, nsPvcs := range matchingPVCs {
Expand All @@ -471,7 +497,7 @@ func getPVCs(ctx context.Context, w *log.Logger, clientset k8sclient.Interface,
}

// create new PVCs for each matching PVC
w.Printf("\nCreating new PVCs to migrate data to using the %s StorageClass\n", destSCName)
w.Printf("\nCreating new PVCs to migrate data to using the %s StorageClass\n", opts.DestSCName)
for ns, nsPvcs := range matchingPVCs {
for _, nsPvc := range nsPvcs {
newName := k8sutil.NewPvcName(nsPvc.Name)
@@ -493,7 +519,7 @@ func getPVCs(ctx context.Context, w *log.Logger, clientset k8sclient.Interface,
return nil, nil, fmt.Errorf("failed to find existing PVC: %w", err)
}
} else if existingPVC != nil {
if existingPVC.Spec.StorageClassName != nil && *existingPVC.Spec.StorageClassName == destSCName {
if existingPVC.Spec.StorageClassName != nil && *existingPVC.Spec.StorageClassName == opts.DestSCName {
existingSize := existingPVC.Spec.Resources.Requests.Storage().String()

if existingSize == desiredPvStorage.String() {
@@ -525,7 +551,7 @@ func getPVCs(ctx context.Context, w *log.Logger, clientset k8sclient.Interface,
},
},
Spec: corev1.PersistentVolumeClaimSpec{
StorageClassName: &destSCName,
StorageClassName: &opts.DestSCName,
Resources: corev1.VolumeResourceRequirements{
Requests: map[corev1.ResourceName]resource.Quantity{
corev1.ResourceStorage: desiredPvStorage,
@@ -675,17 +701,15 @@ func mutateSC(ctx context.Context, w *log.Logger, clientset k8sclient.Interface,
// if a pod is not created by pvmigrate, and is not controlled by a statefulset/deployment, this function will return an error.
// if waitForCleanup is true, after scaling down deployments/statefulsets it will wait for all pods to be deleted.
// It returns a map of namespace to PVCs and any errors encountered.
func scaleDownPods(ctx context.Context, w *log.Logger, clientset k8sclient.Interface, matchingPVCs map[string][]*corev1.PersistentVolumeClaim, checkInterval time.Duration) (map[string][]*corev1.PersistentVolumeClaim, error) {
// build new map with complete pvcCtx
updatedPVCs := matchingPVCs
func scaleDownPods(ctx context.Context, w *log.Logger, clientset k8sclient.Interface, matchingPVCs map[string][]*corev1.PersistentVolumeClaim, checkInterval time.Duration) error {

// get pods using specified PVCs
matchingPods := map[string][]corev1.Pod{}
matchingPodsCount := 0
for ns, nsPvcs := range matchingPVCs {
nsPods, err := clientset.CoreV1().Pods(ns).List(ctx, metav1.ListOptions{})
if err != nil {
return nil, fmt.Errorf("failed to get pods in %s: %w", ns, err)
return fmt.Errorf("failed to get pods in %s: %w", ns, err)
}
for _, nsPod := range nsPods.Items {
perPodLoop:
@@ -706,7 +730,7 @@ func scaleDownPods(ctx context.Context, w *log.Logger, clientset k8sclient.Inter
for ns, nsPvcs := range matchingPVCs {
nsPods, err := clientset.CoreV1().Pods(ns).List(ctx, metav1.ListOptions{})
if err != nil {
return nil, fmt.Errorf("failed to get pods in %s: %w", ns, err)
return fmt.Errorf("failed to get pods in %s: %w", ns, err)
}
for _, nsPod := range nsPods.Items {
for _, podVol := range nsPod.Spec.Volumes {
@@ -728,7 +752,7 @@ func scaleDownPods(ctx context.Context, w *log.Logger, clientset k8sclient.Inter
return volume.Annotations[sourceNodeAnnotation] == nsPod.Spec.NodeName
})
if err != nil {
return nil, fmt.Errorf("failed to annotate pv %s (backs pvc %s) with node name %s: %w", nsPvClaim.Spec.VolumeName, nsPvClaim.ObjectMeta.Name, nsPod.Spec.NodeName, err)
return fmt.Errorf("failed to annotate pv %s (backs pvc %s) with node name %s: %w", nsPvClaim.Spec.VolumeName, nsPvClaim.ObjectMeta.Name, nsPod.Spec.NodeName, err)
}
}
}
@@ -747,7 +771,7 @@ func scaleDownPods(ctx context.Context, w *log.Logger, clientset k8sclient.Inter
}
err := tw.Flush()
if err != nil {
return nil, fmt.Errorf("failed to print Pods: %w", err)
return fmt.Errorf("failed to print Pods: %w", err)
}

// get owners controlling specified pods
@@ -771,11 +795,11 @@ func scaleDownPods(ctx context.Context, w *log.Logger, clientset k8sclient.Inter
// this pod was created by pvmigrate, so it can be deleted by pvmigrate
err := clientset.CoreV1().Pods(ns).Delete(ctx, nsPod.Name, metav1.DeleteOptions{})
if err != nil {
return nil, fmt.Errorf("migration pod %s in %s leftover from a previous run failed to delete, please delete it before retrying: %w", nsPod.Name, ns, err)
return fmt.Errorf("migration pod %s in %s leftover from a previous run failed to delete, please delete it before retrying: %w", nsPod.Name, ns, err)
}
} else {
// TODO: handle properly
return nil, fmt.Errorf("pod %s in %s did not have any owners!\nPlease delete it before retrying", nsPod.Name, ns)
return fmt.Errorf("pod %s in %s did not have any owners!\nPlease delete it before retrying", nsPod.Name, ns)
}
}
}
@@ -791,7 +815,7 @@ func scaleDownPods(ctx context.Context, w *log.Logger, clientset k8sclient.Inter
case "StatefulSet":
ss, err := clientset.AppsV1().StatefulSets(ns).Get(ctx, ownerName, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("failed to get statefulset %s scale in %s: %w", ownerName, ns, err)
return fmt.Errorf("failed to get statefulset %s scale in %s: %w", ownerName, ns, err)
}

formerScale := int32(1)
@@ -812,24 +836,24 @@ func scaleDownPods(ctx context.Context, w *log.Logger, clientset k8sclient.Inter
w.Printf("scaling StatefulSet %s from %d to 0 in %s\n", ownerName, formerScale, ns)
_, err = clientset.AppsV1().StatefulSets(ns).Update(ctx, ss, metav1.UpdateOptions{})
if err != nil {
return nil, fmt.Errorf("failed to scale statefulset %s to zero in %s: %w", ownerName, ns, err)
return fmt.Errorf("failed to scale statefulset %s to zero in %s: %w", ownerName, ns, err)
}
case "ReplicaSet":
rs, err := clientset.AppsV1().ReplicaSets(ns).Get(ctx, ownerName, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("failed to get replicaset %s in %s: %w", ownerName, ns, err)
return fmt.Errorf("failed to get replicaset %s in %s: %w", ownerName, ns, err)
}

if len(rs.OwnerReferences) != 1 {
return nil, fmt.Errorf("expected 1 owner for replicaset %s in %s, found %d instead", ownerName, ns, len(rs.OwnerReferences))
return fmt.Errorf("expected 1 owner for replicaset %s in %s, found %d instead", ownerName, ns, len(rs.OwnerReferences))
}
if rs.OwnerReferences[0].Kind != "Deployment" {
return nil, fmt.Errorf("expected owner for replicaset %s in %s to be a deployment, found %s of kind %s instead", ownerName, ns, rs.OwnerReferences[0].Name, rs.OwnerReferences[0].Kind)
return fmt.Errorf("expected owner for replicaset %s in %s to be a deployment, found %s of kind %s instead", ownerName, ns, rs.OwnerReferences[0].Name, rs.OwnerReferences[0].Kind)
}

dep, err := clientset.AppsV1().Deployments(ns).Get(ctx, rs.OwnerReferences[0].Name, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("failed to get deployment %s scale in %s: %w", ownerName, ns, err)
return fmt.Errorf("failed to get deployment %s scale in %s: %w", ownerName, ns, err)
}

formerScale := int32(1)
@@ -850,10 +874,10 @@ func scaleDownPods(ctx context.Context, w *log.Logger, clientset k8sclient.Inter
w.Printf("scaling Deployment %s from %d to 0 in %s\n", ownerName, formerScale, ns)
_, err = clientset.AppsV1().Deployments(ns).Update(ctx, dep, metav1.UpdateOptions{})
if err != nil {
return nil, fmt.Errorf("failed to scale statefulset %s to zero in %s: %w", ownerName, ns, err)
return fmt.Errorf("failed to scale statefulset %s to zero in %s: %w", ownerName, ns, err)
}
default:
return nil, fmt.Errorf("scaling pods controlled by a %s is not supported, please delete the pods controlled by %s in %s before retrying", ownerKind, ownerKind, ns)
return fmt.Errorf("scaling pods controlled by a %s is not supported, please delete the pods controlled by %s in %s before retrying", ownerKind, ownerKind, ns)
}
}
}
@@ -867,15 +891,15 @@ checkPvcPodLoop:
for ns, nsPvcs := range matchingPVCs {
nsPods, err := clientset.CoreV1().Pods(ns).List(ctx, metav1.ListOptions{})
if err != nil {
return nil, fmt.Errorf("failed to get pods in %s: %w", ns, err)
return fmt.Errorf("failed to get pods in %s: %w", ns, err)
}
for _, nsPod := range nsPods.Items {
for _, podVol := range nsPod.Spec.Volumes {
if podVol.PersistentVolumeClaim != nil {
for _, nsClaim := range nsPvcs {
if podVol.PersistentVolumeClaim.ClaimName == nsClaim.Name {
if nsPod.CreationTimestamp.After(migrationStartTime) {
return nil, fmt.Errorf("pod %s in %s mounting %s was created at %s, after scale-down started at %s. It is likely that there is some other operator scaling this back up", nsPod.Name, ns, nsClaim.Name, nsPod.CreationTimestamp.Format(time.RFC3339), migrationStartTime.Format(time.RFC3339))
return fmt.Errorf("pod %s in %s mounting %s was created at %s, after scale-down started at %s. It is likely that there is some other operator scaling this back up", nsPod.Name, ns, nsClaim.Name, nsPod.CreationTimestamp.Format(time.RFC3339), migrationStartTime.Format(time.RFC3339))
}

w.Printf("Found pod %s in %s mounting to-be-migrated PVC %s, waiting\n", nsPod.Name, ns, nsClaim.Name)
@@ -892,7 +916,7 @@ func scaleDownPods(ctx context.Context, w *log.Logger, clientset k8sclient.Inter
}

w.Printf("All pods removed successfully\n")
return updatedPVCs, nil
return nil
}

func scaleUpPods(ctx context.Context, w *log.Logger, clientset k8sclient.Interface, namespaces []string) error {