feat(pre-sync-mode): add an optional pre-copy stage to reduce downtime
This allows reducing the downtime of the actual migration: we create the
destination PVCs and rsync the data ahead of time, without having to
scale down the pods/deployments/statefulsets.
Once that copy/rsync stage is complete, we proceed by scaling down the
pods, doing another copy/rsync (to ensure data consistency), and finally
swapping the PVCs and scaling back up.

fix: report the correct count of matching PVCs
feat: add max-pvs=n flag
feat(pre-sync-mode): run a "prefetch" copy before actually migrating

Signed-off-by: Clément Nussbaumer <[email protected]>
clementnuss committed Oct 18, 2024
1 parent 6c58960 commit 2ced425
Showing 4 changed files with 80 additions and 46 deletions.
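The commit message describes a two-phase flow: an optional live pre-copy, then a short scale-down window for the final sync and swap. Before the per-file diff, here is a minimal runnable sketch of that ordering; the stub functions are hypothetical stand-ins for the real steps in pkg/migrate/migrate.go, not the project's API:

```go
package main

import "fmt"

// Hypothetical stubs standing in for the real pvmigrate steps; they exist
// only to show the ordering that pre-sync-mode introduces.
func copyAllPVCs(stage string) { fmt.Println("copy/rsync pass:", stage) }
func scaleDownPods()           { fmt.Println("scaling down pods/deployments/statefulsets") }
func swapPVCs()                { fmt.Println("swapping source and destination PVCs") }
func scaleUpPods()             { fmt.Println("scaling workloads back up") }

func main() {
	preSyncMode := true // corresponds to the new --pre-sync-mode flag

	if preSyncMode {
		// phase 1: live pre-copy; workloads keep running the whole time
		copyAllPVCs("pre-sync")
	}
	scaleDownPods()           // downtime starts here
	copyAllPVCs("final sync") // short: rsync only transfers the delta since the pre-sync
	swapPVCs()
	scaleUpPods() // downtime ends here
}
```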
2 changes: 2 additions & 0 deletions cmd/main.go
@@ -39,7 +39,9 @@ func main() {
flag.StringVar(&options.Namespace, "namespace", "", "only migrate PVCs within this namespace")
flag.BoolVar(&options.SetDefaults, "set-defaults", false, "change default storage class from source to dest")
flag.BoolVar(&options.VerboseCopy, "verbose-copy", false, "show output from the rsync command used to copy data between PVCs")
flag.BoolVar(&options.PreSyncMode, "pre-sync-mode", false, "create the new PVCs and copy the data, then scale down, run another copy, and finally swap the PVCs")
flag.BoolVar(&options.SkipSourceValidation, "skip-source-validation", false, "migrate from PVCs using a particular StorageClass name, even if that StorageClass does not exist")
flag.IntVar(&options.MaxPVs, "max-pvs", 0, "maximum number of PVs to process; defaults to 0 (unlimited)")
flag.IntVar(&podReadyTimeout, "pod-ready-timeout", 60, "length of time to wait (in seconds) for validation pod(s) to go into Ready phase")
flag.IntVar(&deletePVTimeout, "delete-pv-timeout", 300, "length of time to wait (in seconds) for backing PV to be removed when temporary PVC is deleted")
flag.BoolVar(&skipPreflightValidation, "skip-preflight-validation", false, "skip preflight migration validation on the destination storage provider")
4 changes: 2 additions & 2 deletions pkg/k8sutil/truncate.go
@@ -2,14 +2,14 @@ package k8sutil

import "fmt"

const nameSuffix = "-pvcmigrate"
const PVCNameSuffix = "-pvcmigrate"

// if the length after adding the suffix is more than 253 characters, we need to reduce that to fit within k8s limits
// pruning from the end runs the risk of dropping the '0'/'1'/etc of a statefulset's PVC name
// pruning from the front runs the risk of making a-replica-... and b-replica-... collide
// so this removes characters from the middle of the string
func NewPvcName(originalName string) string {
candidate := originalName + nameSuffix
candidate := originalName + PVCNameSuffix
if len(candidate) <= 253 {
return candidate
}
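The comments above explain why NewPvcName prunes from the middle when originalName plus "-pvcmigrate" would exceed Kubernetes' 253-character object-name limit; the pruning code itself is collapsed in this diff view. A self-contained sketch of the idea, assuming a simple half-and-half split (the real split point may differ):

```go
package main

import "fmt"

const pvcNameSuffix = "-pvcmigrate"
const maxNameLen = 253 // Kubernetes object name limit

// newPvcName sketches middle pruning: keep the start (so a-replica-...
// and b-replica-... stay distinct) and the end (so a statefulset's
// -0/-1 ordinal survives), dropping characters from the middle.
func newPvcName(originalName string) string {
	candidate := originalName + pvcNameSuffix
	if len(candidate) <= maxNameLen {
		return candidate
	}
	keep := maxNameLen / 2 // assumed split point
	return candidate[:keep] + candidate[len(candidate)-(maxNameLen-keep):]
}

func main() {
	fmt.Println(newPvcName("data-myapp-0")) // data-myapp-0-pvcmigrate
}
```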
104 changes: 64 additions & 40 deletions pkg/migrate/migrate.go
@@ -8,6 +8,7 @@ import (
"io"
"log"
"strconv"
"strings"
"text/tabwriter"
"time"

@@ -50,6 +51,8 @@ type Options struct {
Namespace string
SetDefaults bool
VerboseCopy bool
PreSyncMode bool
MaxPVs int
SkipSourceValidation bool
PodReadyTimeout time.Duration
DeletePVTimeout time.Duration
@@ -62,17 +65,28 @@ func Migrate(ctx context.Context, w *log.Logger, clientset k8sclient.Interface,
return err
}

matchingPVCs, namespaces, err := getPVCs(ctx, w, clientset, options.SourceSCName, options.DestSCName, options.Namespace)
if options.PreSyncMode {
w.Println("\nRunning in pre-sync-mode: we first copy the PVC live, without scaling down pods. Once that pre-sync is completed, we scale down, do another copy/sync and finally swap the PVCs.")
}

matchingPVCs, namespaces, err := getPVCs(ctx, w, clientset, &options)
if err != nil {
return err
}

updatedMatchingPVCs, err := scaleDownPods(ctx, w, clientset, matchingPVCs, time.Second*5)
if options.PreSyncMode {
err = copyAllPVCs(ctx, w, clientset, &options, matchingPVCs, 1*time.Second)
if err != nil {
return err
}
}

err = scaleDownPods(ctx, w, clientset, matchingPVCs, time.Second*5)
if err != nil {
return fmt.Errorf("failed to scale down pods: %w", err)
}

err = copyAllPVCs(ctx, w, clientset, options.SourceSCName, options.DestSCName, options.RsyncImage, updatedMatchingPVCs, options.VerboseCopy, time.Second, options.RsyncFlags)
err = copyAllPVCs(ctx, w, clientset, &options, matchingPVCs, 1*time.Second)
if err != nil {
return err
}
@@ -169,15 +183,15 @@ func swapDefaultStorageClasses(ctx context.Context, w *log.Logger, clientset k8s
return nil
}

func copyAllPVCs(ctx context.Context, w *log.Logger, clientset k8sclient.Interface, sourceSCName string, destSCName string, rsyncImage string, matchingPVCs map[string][]*corev1.PersistentVolumeClaim, verboseCopy bool, waitTime time.Duration, rsyncFlags []string) error {
func copyAllPVCs(ctx context.Context, w *log.Logger, clientset k8sclient.Interface, options *Options, matchingPVCs map[string][]*corev1.PersistentVolumeClaim, waitTime time.Duration) error {
// create a pod for each PVC migration, and wait for it to finish
w.Printf("\nCopying data from %s PVCs to %s PVCs\n", sourceSCName, destSCName)
w.Printf("\nCopying data from %s PVCs to %s PVCs\n", options.SourceSCName, options.DestSCName)
for ns, nsPvcs := range matchingPVCs {
for _, nsPvc := range nsPvcs {
sourcePvcName, destPvcName := nsPvc.Name, k8sutil.NewPvcName(nsPvc.Name)
w.Printf("Copying data from %s (%s) to %s in %s\n", sourcePvcName, nsPvc.Spec.VolumeName, destPvcName, ns)

err := copyOnePVC(ctx, w, clientset, ns, sourcePvcName, destPvcName, rsyncImage, verboseCopy, waitTime, rsyncFlags)
err := copyOnePVC(ctx, w, clientset, ns, sourcePvcName, destPvcName, options, waitTime)
if err != nil {
return fmt.Errorf("failed to copy PVC %s in %s: %w", nsPvc.Name, ns, err)
}
@@ -186,15 +200,15 @@ func copyAllPVCs(ctx context.Context, w *log.Logger, clientset k8sclient.Interfa
return nil
}

func copyOnePVC(ctx context.Context, w *log.Logger, clientset k8sclient.Interface, ns string, sourcePvcName string, destPvcName string, rsyncImage string, verboseCopy bool, waitTime time.Duration, rsyncFlags []string) error {
func copyOnePVC(ctx context.Context, w *log.Logger, clientset k8sclient.Interface, ns string, sourcePvcName string, destPvcName string, options *Options, waitTime time.Duration) error {
w.Printf("Determining the node to migrate PVC %s on\n", sourcePvcName)
nodeName, err := getDesiredNode(ctx, clientset, ns, sourcePvcName)
if err != nil {
return fmt.Errorf("failed to get node for PVC %s in %s: %w", sourcePvcName, ns, err)
}

w.Printf("Creating pvc migrator pod on node %s\n", nodeName)
createdPod, err := createMigrationPod(ctx, clientset, ns, sourcePvcName, destPvcName, rsyncImage, nodeName, rsyncFlags)
createdPod, err := createMigrationPod(ctx, clientset, ns, sourcePvcName, destPvcName, options.RsyncImage, nodeName, options.RsyncFlags)
if err != nil {
return err
}
@@ -278,13 +292,13 @@ func copyOnePVC(ctx context.Context, w *log.Logger, clientset k8sclient.Interfac
w.Printf("failed to read pod logs: %v\n", err)
break
}
if verboseCopy {
if options.VerboseCopy {
w.Printf(" %s\n", line)
} else {
_, _ = fmt.Fprintf(w.Writer(), ".") // one dot per line of output
}
}
if !verboseCopy {
if !options.VerboseCopy {
_, _ = fmt.Fprintf(w.Writer(), "done!\n") // add a newline at the end of the dots if not showing pod logs
}

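createMigrationPod, collapsed in this diff, is what copyOnePVC launches: a one-shot pod pinned to the node that can attach the source volume, mounting both PVCs and running rsync between them. A hedged sketch of such a pod spec; the mount paths, image, and rsync arguments here are illustrative assumptions, not pvmigrate's actual values:

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// sketchMigrationPod builds a one-shot pod that mounts the source and
// destination PVCs and rsyncs between them (illustrative values only).
func sketchMigrationPod(ns, sourcePvcName, destPvcName, rsyncImage, nodeName string) *corev1.Pod {
	pvcVol := func(name, claim string) corev1.Volume {
		return corev1.Volume{
			Name: name,
			VolumeSource: corev1.VolumeSource{
				PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ClaimName: claim},
			},
		}
	}
	return &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{GenerateName: "migrate-", Namespace: ns},
		Spec: corev1.PodSpec{
			NodeName:      nodeName, // pin to the node that can attach the source volume
			RestartPolicy: corev1.RestartPolicyNever,
			Volumes:       []corev1.Volume{pvcVol("source", sourcePvcName), pvcVol("dest", destPvcName)},
			Containers: []corev1.Container{{
				Name:    "rsync",
				Image:   rsyncImage,
				Command: []string{"rsync", "-a", "--delete", "/source/", "/dest/"},
				VolumeMounts: []corev1.VolumeMount{
					{Name: "source", MountPath: "/source", ReadOnly: true},
					{Name: "dest", MountPath: "/dest"},
				},
			}},
		},
	}
}

func main() {
	pod := sketchMigrationPod("default", "data-0", "data-0-pvcmigrate", "some-rsync-image", "node-a")
	fmt.Println(pod.ObjectMeta.GenerateName, pod.Spec.Containers[0].Command)
}
```

Note the --delete in the assumed rsync invocation: on the second, post-scale-down pass it would remove files deleted since the pre-sync, which is what makes the final copy a consistent sync rather than a plain re-copy.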
@@ -416,7 +430,7 @@ func createMigrationPod(ctx context.Context, clientset k8sclient.Interface, ns s
// a map of namespaces to arrays of original PVCs
// an array of namespaces that the PVCs were found within
// an error, if one was encountered
func getPVCs(ctx context.Context, w *log.Logger, clientset k8sclient.Interface, sourceSCName, destSCName string, Namespace string) (map[string][]*corev1.PersistentVolumeClaim, []string, error) {
func getPVCs(ctx context.Context, w *log.Logger, clientset k8sclient.Interface, opts *Options) (map[string][]*corev1.PersistentVolumeClaim, []string, error) {
// get PVs using the specified storage provider
pvs, err := clientset.CoreV1().PersistentVolumes().List(ctx, metav1.ListOptions{})
if err != nil {
@@ -425,26 +439,38 @@ func getPVCs(ctx context.Context, w *log.Logger, clientset k8sclient.Interface,
matchingPVs := []corev1.PersistentVolume{}
pvsByName := map[string]corev1.PersistentVolume{}
for _, pv := range pvs.Items {
if pv.Spec.StorageClassName == sourceSCName {
if pv.Spec.StorageClassName == opts.SourceSCName {
matchingPVs = append(matchingPVs, pv)
pvsByName[pv.Name] = pv
} else {
w.Printf("PV %s does not match source SC %s, not migrating\n", pv.Name, sourceSCName)
w.Printf("PV %s does not match source SC %s, not migrating\n", pv.Name, opts.SourceSCName)
}
}

// get PVCs using specified PVs
matchingPVCsCount := 0
matchingPVCs := map[string][]*corev1.PersistentVolumeClaim{}
for _, pv := range matchingPVs {
if opts.MaxPVs > 0 && matchingPVCsCount >= opts.MaxPVs {
break
}
if pv.Spec.ClaimRef != nil {
if len(opts.Namespace) > 0 && pv.Spec.ClaimRef.Namespace != opts.Namespace {
continue // skip early, to avoid logging info about PVs/PVCs in other namespaces
}

if strings.HasSuffix(pv.Spec.ClaimRef.Name, k8sutil.PVCNameSuffix) {
w.Printf("Skipping PV %s as the claiming PVC has the %s suffix", pv.Name, k8sutil.PVCNameSuffix)
continue
}

pvc, err := clientset.CoreV1().PersistentVolumeClaims(pv.Spec.ClaimRef.Namespace).Get(ctx, pv.Spec.ClaimRef.Name, metav1.GetOptions{})
if err != nil {
return nil, nil, fmt.Errorf("failed to get PVC for PV %s in %s: %w", pv.Spec.ClaimRef.Name, pv.Spec.ClaimRef.Namespace, err)
}

if pv.Spec.ClaimRef.Namespace == Namespace || Namespace == "" {
matchingPVCs[pv.Spec.ClaimRef.Namespace] = append(matchingPVCs[pv.Spec.ClaimRef.Namespace], pvc)
}
matchingPVCsCount++
matchingPVCs[pv.Spec.ClaimRef.Namespace] = append(matchingPVCs[pv.Spec.ClaimRef.Namespace], pvc)

} else {
return nil, nil, fmt.Errorf("PV %s does not have an associated PVC - resolve this before rerunning", pv.Name)
@@ -456,7 +482,7 @@ func getPVCs(ctx context.Context, w *log.Logger, clientset k8sclient.Interface,
pvcNamespaces = append(pvcNamespaces, idx)
}

w.Printf("\nFound %d matching PVCs to migrate across %d namespaces:\n", len(matchingPVCs), len(pvcNamespaces))
w.Printf("\nFound %d matching PVCs to migrate across %d namespaces:\n", matchingPVCsCount, len(pvcNamespaces))
tw := tabwriter.NewWriter(w.Writer(), 2, 2, 1, ' ', 0)
_, _ = fmt.Fprintf(tw, "namespace:\tpvc:\tpv:\tsize:\t\n")
for ns, nsPvcs := range matchingPVCs {
Expand All @@ -471,7 +497,7 @@ func getPVCs(ctx context.Context, w *log.Logger, clientset k8sclient.Interface,
}

// create new PVCs for each matching PVC
w.Printf("\nCreating new PVCs to migrate data to using the %s StorageClass\n", destSCName)
w.Printf("\nCreating new PVCs to migrate data to using the %s StorageClass\n", opts.DestSCName)
for ns, nsPvcs := range matchingPVCs {
for _, nsPvc := range nsPvcs {
newName := k8sutil.NewPvcName(nsPvc.Name)
@@ -493,7 +519,7 @@ func getPVCs(ctx context.Context, w *log.Logger, clientset k8sclient.Interface,
return nil, nil, fmt.Errorf("failed to find existing PVC: %w", err)
}
} else if existingPVC != nil {
if existingPVC.Spec.StorageClassName != nil && *existingPVC.Spec.StorageClassName == destSCName {
if existingPVC.Spec.StorageClassName != nil && *existingPVC.Spec.StorageClassName == opts.DestSCName {
existingSize := existingPVC.Spec.Resources.Requests.Storage().String()

if existingSize == desiredPvStorage.String() {
@@ -525,7 +551,7 @@ func getPVCs(ctx context.Context, w *log.Logger, clientset k8sclient.Interface,
},
},
Spec: corev1.PersistentVolumeClaimSpec{
StorageClassName: &destSCName,
StorageClassName: &opts.DestSCName,
Resources: corev1.VolumeResourceRequirements{
Requests: map[corev1.ResourceName]resource.Quantity{
corev1.ResourceStorage: desiredPvStorage,
@@ -675,17 +701,15 @@ func mutateSC(ctx context.Context, w *log.Logger, clientset k8sclient.Interface,
// if a pod is not created by pvmigrate, and is not controlled by a statefulset/deployment, this function will return an error.
// if waitForCleanup is true, after scaling down deployments/statefulsets it will wait for all pods to be deleted.
// It returns an error if one is encountered.
func scaleDownPods(ctx context.Context, w *log.Logger, clientset k8sclient.Interface, matchingPVCs map[string][]*corev1.PersistentVolumeClaim, checkInterval time.Duration) (map[string][]*corev1.PersistentVolumeClaim, error) {
// build new map with complete pvcCtx
updatedPVCs := matchingPVCs
func scaleDownPods(ctx context.Context, w *log.Logger, clientset k8sclient.Interface, matchingPVCs map[string][]*corev1.PersistentVolumeClaim, checkInterval time.Duration) error {

// get pods using specified PVCs
matchingPods := map[string][]corev1.Pod{}
matchingPodsCount := 0
for ns, nsPvcs := range matchingPVCs {
nsPods, err := clientset.CoreV1().Pods(ns).List(ctx, metav1.ListOptions{})
if err != nil {
return nil, fmt.Errorf("failed to get pods in %s: %w", ns, err)
return fmt.Errorf("failed to get pods in %s: %w", ns, err)
}
for _, nsPod := range nsPods.Items {
perPodLoop:
@@ -706,7 +730,7 @@ func scaleDownPods(ctx context.Context, w *log.Logger, clientset k8sclient.Inter
for ns, nsPvcs := range matchingPVCs {
nsPods, err := clientset.CoreV1().Pods(ns).List(ctx, metav1.ListOptions{})
if err != nil {
return nil, fmt.Errorf("failed to get pods in %s: %w", ns, err)
return fmt.Errorf("failed to get pods in %s: %w", ns, err)
}
for _, nsPod := range nsPods.Items {
for _, podVol := range nsPod.Spec.Volumes {
@@ -728,7 +752,7 @@ func scaleDownPods(ctx context.Context, w *log.Logger, clientset k8sclient.Inter
return volume.Annotations[sourceNodeAnnotation] == nsPod.Spec.NodeName
})
if err != nil {
return nil, fmt.Errorf("failed to annotate pv %s (backs pvc %s) with node name %s: %w", nsPvClaim.Spec.VolumeName, nsPvClaim.ObjectMeta.Name, nsPod.Spec.NodeName, err)
return fmt.Errorf("failed to annotate pv %s (backs pvc %s) with node name %s: %w", nsPvClaim.Spec.VolumeName, nsPvClaim.ObjectMeta.Name, nsPod.Spec.NodeName, err)
}
}
}
@@ -747,7 +771,7 @@ func scaleDownPods(ctx context.Context, w *log.Logger, clientset k8sclient.Inter
}
err := tw.Flush()
if err != nil {
return nil, fmt.Errorf("failed to print Pods: %w", err)
return fmt.Errorf("failed to print Pods: %w", err)
}

// get owners controlling specified pods
@@ -771,11 +795,11 @@ func scaleDownPods(ctx context.Context, w *log.Logger, clientset k8sclient.Inter
// this pod was created by pvmigrate, so it can be deleted by pvmigrate
err := clientset.CoreV1().Pods(ns).Delete(ctx, nsPod.Name, metav1.DeleteOptions{})
if err != nil {
return nil, fmt.Errorf("migration pod %s in %s leftover from a previous run failed to delete, please delete it before retrying: %w", nsPod.Name, ns, err)
return fmt.Errorf("migration pod %s in %s leftover from a previous run failed to delete, please delete it before retrying: %w", nsPod.Name, ns, err)
}
} else {
// TODO: handle properly
return nil, fmt.Errorf("pod %s in %s did not have any owners!\nPlease delete it before retrying", nsPod.Name, ns)
return fmt.Errorf("pod %s in %s did not have any owners!\nPlease delete it before retrying", nsPod.Name, ns)
}
}
}
@@ -791,7 +815,7 @@ func scaleDownPods(ctx context.Context, w *log.Logger, clientset k8sclient.Inter
case "StatefulSet":
ss, err := clientset.AppsV1().StatefulSets(ns).Get(ctx, ownerName, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("failed to get statefulset %s scale in %s: %w", ownerName, ns, err)
return fmt.Errorf("failed to get statefulset %s scale in %s: %w", ownerName, ns, err)
}

formerScale := int32(1)
@@ -812,24 +836,24 @@ func scaleDownPods(ctx context.Context, w *log.Logger, clientset k8sclient.Inter
w.Printf("scaling StatefulSet %s from %d to 0 in %s\n", ownerName, formerScale, ns)
_, err = clientset.AppsV1().StatefulSets(ns).Update(ctx, ss, metav1.UpdateOptions{})
if err != nil {
return nil, fmt.Errorf("failed to scale statefulset %s to zero in %s: %w", ownerName, ns, err)
return fmt.Errorf("failed to scale statefulset %s to zero in %s: %w", ownerName, ns, err)
}
case "ReplicaSet":
rs, err := clientset.AppsV1().ReplicaSets(ns).Get(ctx, ownerName, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("failed to get replicaset %s in %s: %w", ownerName, ns, err)
return fmt.Errorf("failed to get replicaset %s in %s: %w", ownerName, ns, err)
}

if len(rs.OwnerReferences) != 1 {
return nil, fmt.Errorf("expected 1 owner for replicaset %s in %s, found %d instead", ownerName, ns, len(rs.OwnerReferences))
return fmt.Errorf("expected 1 owner for replicaset %s in %s, found %d instead", ownerName, ns, len(rs.OwnerReferences))
}
if rs.OwnerReferences[0].Kind != "Deployment" {
return nil, fmt.Errorf("expected owner for replicaset %s in %s to be a deployment, found %s of kind %s instead", ownerName, ns, rs.OwnerReferences[0].Name, rs.OwnerReferences[0].Kind)
return fmt.Errorf("expected owner for replicaset %s in %s to be a deployment, found %s of kind %s instead", ownerName, ns, rs.OwnerReferences[0].Name, rs.OwnerReferences[0].Kind)
}

dep, err := clientset.AppsV1().Deployments(ns).Get(ctx, rs.OwnerReferences[0].Name, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("failed to get deployment %s scale in %s: %w", ownerName, ns, err)
return fmt.Errorf("failed to get deployment %s scale in %s: %w", ownerName, ns, err)
}

formerScale := int32(1)
@@ -850,10 +874,10 @@ func scaleDownPods(ctx context.Context, w *log.Logger, clientset k8sclient.Inter
w.Printf("scaling Deployment %s from %d to 0 in %s\n", ownerName, formerScale, ns)
_, err = clientset.AppsV1().Deployments(ns).Update(ctx, dep, metav1.UpdateOptions{})
if err != nil {
return nil, fmt.Errorf("failed to scale statefulset %s to zero in %s: %w", ownerName, ns, err)
return fmt.Errorf("failed to scale statefulset %s to zero in %s: %w", ownerName, ns, err)
}
default:
return nil, fmt.Errorf("scaling pods controlled by a %s is not supported, please delete the pods controlled by %s in %s before retrying", ownerKind, ownerKind, ns)
return fmt.Errorf("scaling pods controlled by a %s is not supported, please delete the pods controlled by %s in %s before retrying", ownerKind, ownerKind, ns)
}
}
}
@@ -867,15 +891,15 @@ checkPvcPodLoop:
for ns, nsPvcs := range matchingPVCs {
nsPods, err := clientset.CoreV1().Pods(ns).List(ctx, metav1.ListOptions{})
if err != nil {
return nil, fmt.Errorf("failed to get pods in %s: %w", ns, err)
return fmt.Errorf("failed to get pods in %s: %w", ns, err)
}
for _, nsPod := range nsPods.Items {
for _, podVol := range nsPod.Spec.Volumes {
if podVol.PersistentVolumeClaim != nil {
for _, nsClaim := range nsPvcs {
if podVol.PersistentVolumeClaim.ClaimName == nsClaim.Name {
if nsPod.CreationTimestamp.After(migrationStartTime) {
return nil, fmt.Errorf("pod %s in %s mounting %s was created at %s, after scale-down started at %s. It is likely that there is some other operator scaling this back up", nsPod.Name, ns, nsClaim.Name, nsPod.CreationTimestamp.Format(time.RFC3339), migrationStartTime.Format(time.RFC3339))
return fmt.Errorf("pod %s in %s mounting %s was created at %s, after scale-down started at %s. It is likely that there is some other operator scaling this back up", nsPod.Name, ns, nsClaim.Name, nsPod.CreationTimestamp.Format(time.RFC3339), migrationStartTime.Format(time.RFC3339))
}

w.Printf("Found pod %s in %s mounting to-be-migrated PVC %s, waiting\n", nsPod.Name, ns, nsClaim.Name)
@@ -892,7 +916,7 @@ checkPvcPodLoop:
}

w.Printf("All pods removed successfully\n")
return updatedPVCs, nil
return nil
}

func scaleUpPods(ctx context.Context, w *log.Logger, clientset k8sclient.Interface, namespaces []string) error {