diff --git a/pkg/cmd/server/server.go b/pkg/cmd/server/server.go
index c5e7d5709d..911ff63829 100644
--- a/pkg/cmd/server/server.go
+++ b/pkg/cmd/server/server.go
@@ -28,25 +28,6 @@ import (
     "sync"
     "time"
 
-    "github.com/pkg/errors"
-    "github.com/prometheus/client_golang/prometheus/promhttp"
-    "github.com/sirupsen/logrus"
-    "github.com/spf13/cobra"
-    "github.com/spf13/pflag"
-    apierrors "k8s.io/apimachinery/pkg/api/errors"
-    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-    "k8s.io/apimachinery/pkg/labels"
-    "k8s.io/apimachinery/pkg/types"
-    kubeerrs "k8s.io/apimachinery/pkg/util/errors"
-    "k8s.io/apimachinery/pkg/util/sets"
-    "k8s.io/apimachinery/pkg/util/wait"
-    "k8s.io/client-go/discovery"
-    "k8s.io/client-go/dynamic"
-    corev1informers "k8s.io/client-go/informers/core/v1"
-    "k8s.io/client-go/kubernetes"
-    "k8s.io/client-go/rest"
-    "k8s.io/client-go/tools/cache"
-
     api "github.com/heptio/ark/pkg/apis/ark/v1"
     "github.com/heptio/ark/pkg/backup"
     "github.com/heptio/ark/pkg/buildinfo"
@@ -67,6 +48,25 @@ import (
     "github.com/heptio/ark/pkg/util/kube"
     "github.com/heptio/ark/pkg/util/logging"
     "github.com/heptio/ark/pkg/util/stringslice"
+    "github.com/pkg/errors"
+    "github.com/prometheus/client_golang/prometheus/promhttp"
+    "github.com/sirupsen/logrus"
+    "github.com/spf13/cobra"
+    "github.com/spf13/pflag"
+    apierrors "k8s.io/apimachinery/pkg/api/errors"
+    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "k8s.io/apimachinery/pkg/labels"
+    "k8s.io/apimachinery/pkg/types"
+    kubeerrs "k8s.io/apimachinery/pkg/util/errors"
+    "k8s.io/apimachinery/pkg/util/sets"
+    "k8s.io/apimachinery/pkg/util/wait"
+    "k8s.io/client-go/discovery"
+    "k8s.io/client-go/dynamic"
+    kubeinformers "k8s.io/client-go/informers"
+    corev1informers "k8s.io/client-go/informers/core/v1"
+    "k8s.io/client-go/kubernetes"
+    "k8s.io/client-go/rest"
+    "k8s.io/client-go/tools/cache"
 )
 
 const (
@@ -75,14 +75,15 @@
 
     defaultBackupSyncPeriod          = time.Minute
     defaultPodVolumeOperationTimeout = 60 * time.Minute
+    defaultNamespaceTimeout          = 10 * time.Minute
 )
 
 type serverConfig struct {
-    pluginDir, metricsAddress, defaultBackupLocation string
-    backupSyncPeriod, podVolumeOperationTimeout      time.Duration
-    restoreResourcePriorities                        []string
-    defaultVolumeSnapshotLocations                   map[string]string
-    restoreOnly                                      bool
+    pluginDir, metricsAddress, defaultBackupLocation              string
+    backupSyncPeriod, podVolumeOperationTimeout, namespaceTimeout time.Duration
+    restoreResourcePriorities                                     []string
+    defaultVolumeSnapshotLocations                                map[string]string
+    restoreOnly                                                   bool
 }
 
 func NewCommand() *cobra.Command {
@@ -97,6 +98,7 @@ func NewCommand() *cobra.Command {
             backupSyncPeriod:          defaultBackupSyncPeriod,
             podVolumeOperationTimeout: defaultPodVolumeOperationTimeout,
             restoreResourcePriorities: defaultRestorePriorities,
+            namespaceTimeout:          defaultNamespaceTimeout,
         }
     )
 
@@ -152,6 +154,7 @@
     command.Flags().StringSliceVar(&config.restoreResourcePriorities, "restore-resource-priorities", config.restoreResourcePriorities, "desired order of resource restores; any resource not in the list will be restored alphabetically after the prioritized resources")
     command.Flags().StringVar(&config.defaultBackupLocation, "default-backup-storage-location", config.defaultBackupLocation, "name of the default backup storage location")
     command.Flags().Var(&volumeSnapshotLocations, "default-volume-snapshot-locations", "list of unique volume providers and default volume snapshot location (provider1:location-01,provider2:location-02,...)")
+    command.Flags().DurationVar(&config.namespaceTimeout, "namespace-timeout", config.namespaceTimeout, "duration to wait on namespace termination before failing a restore")
 
     return command
 }
@@ -171,24 +174,25 @@ func getServerNamespace(namespaceFlag *pflag.Flag) string {
 }
 
 type server struct {
-    namespace             string
-    metricsAddress        string
-    kubeClientConfig      *rest.Config
-    kubeClient            kubernetes.Interface
-    arkClient             clientset.Interface
-    discoveryClient       discovery.DiscoveryInterface
-    discoveryHelper       arkdiscovery.Helper
-    dynamicClient         dynamic.Interface
-    sharedInformerFactory informers.SharedInformerFactory
-    ctx                   context.Context
-    cancelFunc            context.CancelFunc
-    logger                logrus.FieldLogger
-    logLevel              logrus.Level
-    pluginRegistry        plugin.Registry
-    pluginManager         plugin.Manager
-    resticManager         restic.RepositoryManager
-    metrics               *metrics.ServerMetrics
-    config                serverConfig
+    namespace                 string
+    metricsAddress            string
+    kubeClientConfig          *rest.Config
+    kubeClient                kubernetes.Interface
+    arkClient                 clientset.Interface
+    discoveryClient           discovery.DiscoveryInterface
+    discoveryHelper           arkdiscovery.Helper
+    dynamicClient             dynamic.Interface
+    sharedInformerFactory     informers.SharedInformerFactory
+    kubeSharedInformerFactory kubeinformers.SharedInformerFactory
+    ctx                       context.Context
+    cancelFunc                context.CancelFunc
+    logger                    logrus.FieldLogger
+    logLevel                  logrus.Level
+    pluginRegistry            plugin.Registry
+    pluginManager             plugin.Manager
+    resticManager             restic.RepositoryManager
+    metrics                   *metrics.ServerMetrics
+    config                    serverConfig
 }
 
 func newServer(namespace, baseName string, config serverConfig, logger *logrus.Logger) (*server, error) {
@@ -224,14 +228,15 @@ func newServer(namespace, baseName string, config serverConfig, logger *logrus.L
     ctx, cancelFunc := context.WithCancel(context.Background())
 
     s := &server{
-        namespace:             namespace,
-        metricsAddress:        config.metricsAddress,
-        kubeClientConfig:      clientConfig,
-        kubeClient:            kubeClient,
-        arkClient:             arkClient,
-        discoveryClient:       arkClient.Discovery(),
-        dynamicClient:         dynamicClient,
-        sharedInformerFactory: informers.NewSharedInformerFactoryWithOptions(arkClient, 0, informers.WithNamespace(namespace)),
+        namespace:                 namespace,
+        metricsAddress:            config.metricsAddress,
+        kubeClientConfig:          clientConfig,
+        kubeClient:                kubeClient,
+        arkClient:                 arkClient,
+        discoveryClient:           arkClient.Discovery(),
+        dynamicClient:             dynamicClient,
+        sharedInformerFactory:     informers.NewSharedInformerFactoryWithOptions(arkClient, 0, informers.WithNamespace(namespace)),
+        kubeSharedInformerFactory: kubeinformers.NewSharedInformerFactory(kubeClient, 0),
         ctx:        ctx,
         cancelFunc: cancelFunc,
         logger:     logger,
@@ -584,7 +589,6 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
             backupDeletionController.Run(ctx, 1)
             wg.Done()
         }()
-
     }
 
     restorer, err := restore.NewKubernetesRestorer(
@@ -592,12 +596,24 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
         client.NewDynamicFactory(s.dynamicClient),
         s.config.restoreResourcePriorities,
         s.kubeClient.CoreV1().Namespaces(),
+        s.kubeSharedInformerFactory.Core().V1().Namespaces(),
+        s.kubeClient.CoreV1().PersistentVolumes(),
+        s.kubeSharedInformerFactory.Core().V1().PersistentVolumes(),
        s.resticManager,
         s.config.podVolumeOperationTimeout,
+        s.config.namespaceTimeout,
         s.logger,
     )
     cmd.CheckError(err)
 
+    // Start the informer and wait for cache sync here so that the event handlers added in NewKubernetesRestorer
+    // are ready by the time the restore controller is started.
+    s.logger.Debugln("Starting kube shared informer factory")
+    go s.kubeSharedInformerFactory.Start(ctx.Done())
+    s.logger.Debugln("Waiting for kube shared informer factory to sync")
+    cache.WaitForCacheSync(ctx.Done(), s.kubeSharedInformerFactory.Core().V1().Namespaces().Informer().HasSynced)
+    s.logger.Debugln("Kube shared informer factory synced")
+
     restoreController := controller.NewRestoreController(
         s.namespace,
         s.sharedInformerFactory.Ark().V1().Restores(),
diff --git a/pkg/restore/restore.go b/pkg/restore/restore.go
index 8fe1b31282..04e9b7ec30 100644
--- a/pkg/restore/restore.go
+++ b/pkg/restore/restore.go
@@ -30,6 +30,19 @@ import (
     "sync"
     "time"
 
+    api "github.com/heptio/ark/pkg/apis/ark/v1"
+    "github.com/heptio/ark/pkg/client"
+    "github.com/heptio/ark/pkg/cloudprovider"
+    "github.com/heptio/ark/pkg/discovery"
+    listers "github.com/heptio/ark/pkg/generated/listers/ark/v1"
+    "github.com/heptio/ark/pkg/kuberesource"
+    "github.com/heptio/ark/pkg/restic"
+    "github.com/heptio/ark/pkg/util/boolptr"
+    "github.com/heptio/ark/pkg/util/collections"
+    "github.com/heptio/ark/pkg/util/filesystem"
+    "github.com/heptio/ark/pkg/util/kube"
+    arksync "github.com/heptio/ark/pkg/util/sync"
+    "github.com/heptio/ark/pkg/volume"
     "github.com/pkg/errors"
     "github.com/sirupsen/logrus"
     "k8s.io/api/core/v1"
@@ -43,21 +56,9 @@ import (
     kubeerrs "k8s.io/apimachinery/pkg/util/errors"
     "k8s.io/apimachinery/pkg/util/sets"
     "k8s.io/apimachinery/pkg/watch"
+    corev1informers "k8s.io/client-go/informers/core/v1"
     corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
-
-    api "github.com/heptio/ark/pkg/apis/ark/v1"
-    "github.com/heptio/ark/pkg/client"
-    "github.com/heptio/ark/pkg/cloudprovider"
-    "github.com/heptio/ark/pkg/discovery"
-    listers "github.com/heptio/ark/pkg/generated/listers/ark/v1"
-    "github.com/heptio/ark/pkg/kuberesource"
-    "github.com/heptio/ark/pkg/restic"
-    "github.com/heptio/ark/pkg/util/boolptr"
-    "github.com/heptio/ark/pkg/util/collections"
-    "github.com/heptio/ark/pkg/util/filesystem"
-    "github.com/heptio/ark/pkg/util/kube"
-    arksync "github.com/heptio/ark/pkg/util/sync"
-    "github.com/heptio/ark/pkg/volume"
+    "k8s.io/client-go/tools/cache"
 )
 
 type BlockStoreGetter interface {
@@ -86,8 +87,12 @@ type kubernetesRestorer struct {
     discoveryHelper       discovery.Helper
     dynamicFactory        client.DynamicFactory
     namespaceClient       corev1.NamespaceInterface
+    pvClient              corev1.PersistentVolumeInterface
     resticRestorerFactory restic.RestorerFactory
     resticTimeout         time.Duration
+    namespaceTimeout      time.Duration
+    nsWaiter              *namespaceWaiter
+    pvWaiter              *pvWaiter
     resourcePriorities    []string
     fileSystem            filesystem.Interface
     logger                logrus.FieldLogger
@@ -157,20 +162,77 @@ func NewKubernetesRestorer(
     dynamicFactory client.DynamicFactory,
     resourcePriorities []string,
     namespaceClient corev1.NamespaceInterface,
+    namespaceInformer corev1informers.NamespaceInformer,
+    pvClient corev1.PersistentVolumeInterface,
+    pvInformer corev1informers.PersistentVolumeInformer,
     resticRestorerFactory restic.RestorerFactory,
     resticTimeout time.Duration,
+    namespaceTimeout time.Duration,
     logger logrus.FieldLogger,
 ) (Restorer, error) {
-    return &kubernetesRestorer{
+    kr := &kubernetesRestorer{
         discoveryHelper:       discoveryHelper,
         dynamicFactory:        dynamicFactory,
         namespaceClient:       namespaceClient,
+        pvClient:              pvClient,
         resticRestorerFactory: resticRestorerFactory,
         resticTimeout:         resticTimeout,
+        namespaceTimeout:      namespaceTimeout,
+        nsWaiter:              &namespaceWaiter{chans: make(map[string]chan *v1.Namespace)},
+        pvWaiter:              &pvWaiter{chans: make(map[string]chan *v1.PersistentVolume)},
         resourcePriorities:    resourcePriorities,
         logger:                logger,
         fileSystem:            filesystem.NewFileSystem(),
-    }, nil
+    }
+
+    namespaceInformer.Informer().AddEventHandler(
+        cache.ResourceEventHandlerFuncs{
+            DeleteFunc: func(obj interface{}) {
+                // DeletedFinalStateUnknown is a status that an object is given if an informer missed the actual delete event but caught up later.
+                if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
+                    obj = tombstone.Obj
+                }
+
+                ns, ok := obj.(*v1.Namespace)
+                // See if the NS is being watched for delete events. If not, no-op.
+                deleteChan, ok := kr.nsWaiter.getDeleteChan(ns.Name)
+                if !ok {
+                    return
+                }
+
+                // If the name of the namespace is registered in the waiter's map,
+                // add the NS object to the channel and wait for it to be received.
+                deleteChan <- ns
+                kr.nsWaiter.removeDeleteChan(ns.Name)
+            },
+        },
+    )
+
+    // Watch for PVs being deleted, in case we need to wait on them.
+    pvInformer.Informer().AddEventHandler(
+        cache.ResourceEventHandlerFuncs{
+            DeleteFunc: func(obj interface{}) {
+                if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
+                    obj = tombstone.Obj
+                }
+
+                pv, ok := obj.(*v1.PersistentVolume)
+                if !ok {
+                    return
+                }
+
+                updateChan, ok := kr.pvWaiter.getDeleteChan(pv.Name)
+                if !ok {
+                    return
+                }
+
+                updateChan <- pv
+                kr.pvWaiter.removeDeleteChan(pv.Name)
+            },
+        },
+    )
+
+    return kr, nil
 }
 
 // Restore executes a restore into the target Kubernetes cluster according to the restore spec
@@ -254,12 +316,16 @@ func (kr *kubernetesRestorer) Restore(
         dynamicFactory:   kr.dynamicFactory,
         fileSystem:       kr.fileSystem,
         namespaceClient:  kr.namespaceClient,
+        nsWaiter:         kr.nsWaiter,
+        pvClient:         kr.pvClient,
+        pvWaiter:         kr.pvWaiter,
         actions:          resolvedActions,
         blockStoreGetter: blockStoreGetter,
         resticRestorer:   resticRestorer,
         pvsToProvision:   sets.NewString(),
         pvRestorer:       pvRestorer,
         volumeSnapshots:  volumeSnapshots,
+        namespaceTimeout: kr.namespaceTimeout,
     }
 
     return restoreCtx.execute()
@@ -326,6 +392,97 @@ func resolveActions(actions []ItemAction, helper discovery.Helper) ([]resolvedAc
     return resolved, nil
 }
 
+// namespaceWaiter encapsulates channels for namespaces waiting to terminate, along with a mutex guarding the map.
+type namespaceWaiter struct {
+    chansLock sync.Mutex
+    chans     map[string]chan *v1.Namespace
+}
+
+func (nsw *namespaceWaiter) getDeleteChan(ns string) (chan *v1.Namespace, bool) {
+    nsw.chansLock.Lock()
+    defer nsw.chansLock.Unlock()
+
+    ch, ok := nsw.chans[ns]
+    if !ok {
+        return nil, ok
+    }
+    return ch, ok
+}
+
+func (nsw *namespaceWaiter) addDeleteChan(ns string) {
+    nsw.chansLock.Lock()
+    defer nsw.chansLock.Unlock()
+
+    nsw.chans[ns] = make(chan *v1.Namespace)
+}
+
+func (nsw *namespaceWaiter) removeDeleteChan(ns string) {
+    nsw.chansLock.Lock()
+    defer nsw.chansLock.Unlock()
+
+    delete(nsw.chans, ns)
+}
+
+func (nsw *namespaceWaiter) waitForDelete(ns string, timeout time.Duration) (*v1.Namespace, error) {
+    deleteChan, ok := nsw.getDeleteChan(ns)
+    if !ok {
+        return nil, errors.Errorf("Namespace %s not found in list of channels waiting to delete", ns)
+    }
+
+    select {
+    case <-time.After(timeout):
+        return nil, errors.Errorf("Namespace %s did not terminate before the allotted timeout.", ns)
+    case res := <-deleteChan:
+        return res, nil
+    }
+}
+
+// pvWaiter encapsulates waiting for a PV with a reclaim policy of 'Delete' to exit the 'Released' state.
+// This is to avoid the case where a PV that has been released but not yet deleted does not get restored.
+type pvWaiter struct {
+    chansLock sync.Mutex
+    chans     map[string]chan *v1.PersistentVolume
+}
+
+func (pvw *pvWaiter) getDeleteChan(name string) (chan *v1.PersistentVolume, bool) {
+    pvw.chansLock.Lock()
+    defer pvw.chansLock.Unlock()
+
+    ch, ok := pvw.chans[name]
+    if !ok {
+        return nil, ok
+    }
+    return ch, ok
+}
+
+func (pvw *pvWaiter) addDeleteChan(name string) {
+    pvw.chansLock.Lock()
+    defer pvw.chansLock.Unlock()
+
+    pvw.chans[name] = make(chan *v1.PersistentVolume)
+}
+
+func (pvw *pvWaiter) removeDeleteChan(name string) {
+    pvw.chansLock.Lock()
+    defer pvw.chansLock.Unlock()
+
+    delete(pvw.chans, name)
+}
+
+func (pvw *pvWaiter) waitForDelete(name string, timeout time.Duration) (*v1.PersistentVolume, error) {
+    deleteChan, ok := pvw.getDeleteChan(name)
+    if !ok {
+        return nil, errors.Errorf("PV %s not found in list of channels waiting to delete", name)
+    }
+
+    select {
+    case <-time.After(timeout):
+        return nil, errors.Errorf("PV %s was not deleted before the allotted timeout.", name)
+    case res := <-deleteChan:
+        return res, nil
+    }
+}
+
 type context struct {
     backup       *api.Backup
     backupReader io.Reader
@@ -336,6 +493,9 @@ type context struct {
     dynamicFactory        client.DynamicFactory
     fileSystem            filesystem.Interface
     namespaceClient       corev1.NamespaceInterface
+    nsWaiter              *namespaceWaiter
+    pvClient              corev1.PersistentVolumeInterface
+    pvWaiter              *pvWaiter
     actions               []resolvedAction
     blockStoreGetter      BlockStoreGetter
     resticRestorer        restic.Restorer
@@ -345,6 +505,7 @@ type context struct {
     pvsToProvision        sets.String
     pvRestorer            PVRestorer
     volumeSnapshots       []*volume.Snapshot
+    namespaceTimeout      time.Duration
 }
 
 func (ctx *context) execute() (api.RestoreResult, api.RestoreResult) {
@@ -409,6 +570,8 @@ func (ctx *context) restoreFromDir(dir string) (api.RestoreResult, api.RestoreRe
     for _, resource := range ctx.prioritizedResources {
         // we don't want to explicitly restore namespace API objs because we'll handle
         // them as a special case prior to restoring anything into them
+        // They're a special case since a user may choose to restore only pods, but those pods need to
+        // be placed in a namespace.
         if resource == kuberesource.Namespaces {
             continue
         }
@@ -474,11 +637,36 @@ func (ctx *context) restoreFromDir(dir string) (api.RestoreResult, api.RestoreRe
         if !existingNamespaces.Has(mappedNsName) {
             logger := ctx.log.WithField("namespace", nsName)
             ns := getNamespace(logger, filepath.Join(dir, api.ResourcesDir, "namespaces", api.ClusterScopedDir, nsName+".json"), mappedNsName)
-            if _, err := kube.EnsureNamespaceExists(ns, ctx.namespaceClient); err != nil {
+            nsIsReady, err := kube.EnsureNamespaceExistsAndIsReady(ns, ctx.namespaceClient)
+            if err != nil {
                 addArkError(&errs, err)
                 continue
             }
 
+            if !nsIsReady {
+                tempNs, err := ctx.namespaceClient.Get(mappedNsName, metav1.GetOptions{})
+                if err != nil && apierrors.IsNotFound(err) {
+                    addArkError(&errs, err)
+                    continue
+                }
+
+                if isNSTerminating(tempNs) {
+                    logger.Infof("Namespace still terminating, waiting")
+
+                    ctx.nsWaiter.addDeleteChan(mappedNsName)
+                    _, err = ctx.nsWaiter.waitForDelete(mappedNsName, ctx.namespaceTimeout)
+
+                    if err != nil {
+                        addArkError(&errs, err)
+                        return warnings, errs
+                    }
+
+                    // If we got here, the termination was successful, so go ahead and try to make the NS again.
+ logger.Infof("Termination successful, restoring namespace") + nsIsReady, err = kube.EnsureNamespaceExistsAndIsReady(ns, ctx.namespaceClient) + } + } + // keep track of namespaces that we know exist so we don't // have to try to create them multiple times existingNamespaces.Insert(mappedNsName) @@ -696,10 +884,19 @@ func (ctx *context) restoreResource(resource, namespace, resourcePath string) (a // Check if the PV exists in the cluster before attempting to create // a volume from the snapshot, in order to avoid orphaned volumes (GH #609) - pv, err := resourceClient.Get(name, metav1.GetOptions{}) + pv, err := ctx.pvClient.Get(name, metav1.GetOptions{}) if err != nil && !apierrors.IsNotFound(err) { addToResult(&errs, namespace, fmt.Errorf("error checking existence for PV %s: %v", name, err)) - continue + } + + if isPVReleasedOrBound(pv) { + ctx.log.Infof("Persistent volume %q still in use, waiting for deletion", name) + ctx.pvWaiter.addDeleteChan(name) + _, err := ctx.pvWaiter.waitForDelete(name, ctx.namespaceTimeout) + if err != nil { + addArkError(&errs, err) + } + ctx.log.Infof("Persistent volume %q deleted, proceeding", name) } // PV's existence will be recorded later. Just skip the volume restore logic. @@ -1068,6 +1265,20 @@ func (r *pvRestorer) executePVAction(obj *unstructured.Unstructured) (*unstructu return updated2, nil } +func isNSTerminating(ns *v1.Namespace) bool { + if ns == nil { + return false + } + return ns.Status.Phase == v1.NamespaceTerminating +} + +func isPVReleasedOrBound(pv *v1.PersistentVolume) bool { + if pv == nil { + return false + } + + return pv.Status.Phase == "Released" || pv.Status.Phase == "Bound" +} func isPVReady(obj runtime.Unstructured) bool { phase, err := collections.GetString(obj.UnstructuredContent(), "status.phase") if err != nil { diff --git a/pkg/util/kube/utils.go b/pkg/util/kube/utils.go index 0e26437290..e816326cb4 100644 --- a/pkg/util/kube/utils.go +++ b/pkg/util/kube/utils.go @@ -35,15 +35,23 @@ func NamespaceAndName(objMeta metav1.Object) string { return fmt.Sprintf("%s/%s", objMeta.GetNamespace(), objMeta.GetName()) } -// EnsureNamespaceExists attempts to create the provided Kubernetes namespace. It returns two values: -// a bool indicating whether or not the namespace was created, and an error if the create failed +// EnsureNamespaceExistsAndIsReady attempts to create the provided Kubernetes namespace. It returns two values: +// a bool indicating whether or not the namespace is ready, and an error if the create failed // for a reason other than that the namespace already exists. Note that in the case where the -// namespace already exists, this function will return (false, nil). -func EnsureNamespaceExists(namespace *corev1api.Namespace, client corev1client.NamespaceInterface) (bool, error) { +// namespace already exists and is not ready, this function will return (false, nil). +func EnsureNamespaceExistsAndIsReady(namespace *corev1api.Namespace, client corev1client.NamespaceInterface) (bool, error) { if _, err := client.Create(namespace); err == nil { return true, nil } else if apierrors.IsAlreadyExists(err) { - return false, nil + // Do a follow up Get because Create returns an uninitialized namespace object, not the one that exists. 
+        ns, err := client.Get(namespace.Name, metav1.GetOptions{})
+        if err != nil {
+            return false, errors.Wrapf(err, "error getting namespace %s", namespace.Name)
+        }
+        if ns.Status.Phase == corev1api.NamespaceTerminating {
+            return false, nil
+        }
+        return true, nil
     } else {
         return false, errors.Wrapf(err, "error creating namespace %s", namespace.Name)
     }
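
Note for reviewers: the standalone Go sketch below is not part of the diff. It illustrates the channel-map "wait for delete" pattern that namespaceWaiter and pvWaiter implement above — a mutex-guarded map of per-name channels that an informer DeleteFunc signals and that a restore goroutine selects on against a timeout. All names here (deleteWaiter, notify, simulate goroutine) are illustrative, and unlike the PR it signals by closing the channel rather than sending the deleted object.

// waiter_sketch.go — minimal sketch of the wait-for-delete pattern, under the
// assumptions stated in the note above; not the PR's actual implementation.
package main

import (
	"fmt"
	"sync"
	"time"
)

// deleteWaiter maps an object name to a channel that is closed once that
// object's delete event has been observed (in the PR, by an informer's DeleteFunc).
type deleteWaiter struct {
	mu    sync.Mutex
	chans map[string]chan struct{}
}

func newDeleteWaiter() *deleteWaiter {
	return &deleteWaiter{chans: make(map[string]chan struct{})}
}

// add registers interest in a delete event for name.
func (w *deleteWaiter) add(name string) {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.chans[name] = make(chan struct{})
}

// notify is what a DeleteFunc handler would call: signal any waiter and deregister.
func (w *deleteWaiter) notify(name string) {
	w.mu.Lock()
	ch, ok := w.chans[name]
	if ok {
		delete(w.chans, name)
	}
	w.mu.Unlock()
	if ok {
		close(ch)
	}
}

// wait blocks until the delete event arrives or the timeout elapses.
func (w *deleteWaiter) wait(name string, timeout time.Duration) error {
	w.mu.Lock()
	ch, ok := w.chans[name]
	w.mu.Unlock()
	if !ok {
		return fmt.Errorf("%s was never registered for delete notification", name)
	}
	select {
	case <-ch:
		return nil
	case <-time.After(timeout):
		return fmt.Errorf("%s was not deleted within %s", name, timeout)
	}
}

func main() {
	w := newDeleteWaiter()
	w.add("my-namespace")

	// Simulate the informer observing the namespace's deletion after 100ms.
	go func() {
		time.Sleep(100 * time.Millisecond)
		w.notify("my-namespace")
	}()

	if err := w.wait("my-namespace", time.Second); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("namespace deleted; safe to restore")
}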