Skip to content

Commit

Permalink
Wait for namespace to terminate before restoring
Browse files Browse the repository at this point in the history
This change also waits for persistent volumes that are or had been bound
to pods/persistent volume claims in a terminating namespace to be fully
deleted before trying to restore them from a snapshot.

Fixes vmware-tanzu#691

Signed-off-by: Nolan Brubaker <[email protected]>
  • Loading branch information
Nolan Brubaker committed Oct 31, 2018
1 parent 7d497e6 commit 69347be
Show file tree
Hide file tree
Showing 3 changed files with 310 additions and 75 deletions.
118 changes: 67 additions & 51 deletions pkg/cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,25 +28,6 @@ import (
"sync"
"time"

"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
kubeerrs "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
corev1informers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"

api "github.com/heptio/ark/pkg/apis/ark/v1"
"github.com/heptio/ark/pkg/backup"
"github.com/heptio/ark/pkg/buildinfo"
Expand All @@ -67,6 +48,25 @@ import (
"github.com/heptio/ark/pkg/util/kube"
"github.com/heptio/ark/pkg/util/logging"
"github.com/heptio/ark/pkg/util/stringslice"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
kubeerrs "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
kubeinformers "k8s.io/client-go/informers"
corev1informers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
)

const (
Expand All @@ -75,14 +75,15 @@ const (

defaultBackupSyncPeriod = time.Minute
defaultPodVolumeOperationTimeout = 60 * time.Minute
defaultNamespaceTimeout = 10 * time.Minute
)

type serverConfig struct {
pluginDir, metricsAddress, defaultBackupLocation string
backupSyncPeriod, podVolumeOperationTimeout time.Duration
restoreResourcePriorities []string
defaultVolumeSnapshotLocations map[string]string
restoreOnly bool
pluginDir, metricsAddress, defaultBackupLocation string
backupSyncPeriod, podVolumeOperationTimeout, namespaceTimeout time.Duration
restoreResourcePriorities []string
defaultVolumeSnapshotLocations map[string]string
restoreOnly bool
}

func NewCommand() *cobra.Command {
Expand All @@ -97,6 +98,7 @@ func NewCommand() *cobra.Command {
backupSyncPeriod: defaultBackupSyncPeriod,
podVolumeOperationTimeout: defaultPodVolumeOperationTimeout,
restoreResourcePriorities: defaultRestorePriorities,
namespaceTimeout: defaultNamespaceTimeout,
}
)

Expand Down Expand Up @@ -152,6 +154,7 @@ func NewCommand() *cobra.Command {
command.Flags().StringSliceVar(&config.restoreResourcePriorities, "restore-resource-priorities", config.restoreResourcePriorities, "desired order of resource restores; any resource not in the list will be restored alphabetically after the prioritized resources")
command.Flags().StringVar(&config.defaultBackupLocation, "default-backup-storage-location", config.defaultBackupLocation, "name of the default backup storage location")
command.Flags().Var(&volumeSnapshotLocations, "default-volume-snapshot-locations", "list of unique volume providers and default volume snapshot location (provider1:location-01,provider2:location-02,...)")
command.Flags().DurationVar(&config.namespaceTimeout, "namespace-timeout", config.namespaceTimeout, "duration to wait on namespace termination before failing a restore")

return command
}
Expand All @@ -171,24 +174,25 @@ func getServerNamespace(namespaceFlag *pflag.Flag) string {
}

type server struct {
namespace string
metricsAddress string
kubeClientConfig *rest.Config
kubeClient kubernetes.Interface
arkClient clientset.Interface
discoveryClient discovery.DiscoveryInterface
discoveryHelper arkdiscovery.Helper
dynamicClient dynamic.Interface
sharedInformerFactory informers.SharedInformerFactory
ctx context.Context
cancelFunc context.CancelFunc
logger logrus.FieldLogger
logLevel logrus.Level
pluginRegistry plugin.Registry
pluginManager plugin.Manager
resticManager restic.RepositoryManager
metrics *metrics.ServerMetrics
config serverConfig
namespace string
metricsAddress string
kubeClientConfig *rest.Config
kubeClient kubernetes.Interface
arkClient clientset.Interface
discoveryClient discovery.DiscoveryInterface
discoveryHelper arkdiscovery.Helper
dynamicClient dynamic.Interface
sharedInformerFactory informers.SharedInformerFactory
kubeSharedInformerFactory kubeinformers.SharedInformerFactory
ctx context.Context
cancelFunc context.CancelFunc
logger logrus.FieldLogger
logLevel logrus.Level
pluginRegistry plugin.Registry
pluginManager plugin.Manager
resticManager restic.RepositoryManager
metrics *metrics.ServerMetrics
config serverConfig
}

func newServer(namespace, baseName string, config serverConfig, logger *logrus.Logger) (*server, error) {
Expand Down Expand Up @@ -224,14 +228,15 @@ func newServer(namespace, baseName string, config serverConfig, logger *logrus.L
ctx, cancelFunc := context.WithCancel(context.Background())

s := &server{
namespace: namespace,
metricsAddress: config.metricsAddress,
kubeClientConfig: clientConfig,
kubeClient: kubeClient,
arkClient: arkClient,
discoveryClient: arkClient.Discovery(),
dynamicClient: dynamicClient,
sharedInformerFactory: informers.NewSharedInformerFactoryWithOptions(arkClient, 0, informers.WithNamespace(namespace)),
namespace: namespace,
metricsAddress: config.metricsAddress,
kubeClientConfig: clientConfig,
kubeClient: kubeClient,
arkClient: arkClient,
discoveryClient: arkClient.Discovery(),
dynamicClient: dynamicClient,
sharedInformerFactory: informers.NewSharedInformerFactoryWithOptions(arkClient, 0, informers.WithNamespace(namespace)),
kubeSharedInformerFactory: kubeinformers.NewSharedInformerFactory(kubeClient, 0),
ctx: ctx,
cancelFunc: cancelFunc,
logger: logger,
Expand Down Expand Up @@ -584,20 +589,31 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
backupDeletionController.Run(ctx, 1)
wg.Done()
}()

}

restorer, err := restore.NewKubernetesRestorer(
s.discoveryHelper,
client.NewDynamicFactory(s.dynamicClient),
s.config.restoreResourcePriorities,
s.kubeClient.CoreV1().Namespaces(),
s.kubeSharedInformerFactory.Core().V1().Namespaces(),
s.kubeClient.CoreV1().PersistentVolumes(),
s.kubeSharedInformerFactory.Core().V1().PersistentVolumes(),
s.resticManager,
s.config.podVolumeOperationTimeout,
s.config.namespaceTimeout,
s.logger,
)
cmd.CheckError(err)

// Start the informer factory and wait for cache sync here so that the event handlers added in NewKubernetesRestorer are ready by the time
// the restore controller is started
s.logger.Debugln("Starting kube shared informer factory")
go s.kubeSharedInformerFactory.Start(ctx.Done())
s.logger.Debugln("Waiting for kubesharedinformer to sync")
cache.WaitForCacheSync(ctx.Done(), s.kubeSharedInformerFactory.Core().V1().Namespaces().Informer().HasSynced)
s.logger.Debugln("kubesharedinformer synced")

restoreController := controller.NewRestoreController(
s.namespace,
s.sharedInformerFactory.Ark().V1().Restores(),
Expand Down
Loading

0 comments on commit 69347be

Please sign in to comment.