Skip to content

Commit

Permalink
Cleanup microcluster/k8s when scaling down cluster nodes (#15)
Browse files Browse the repository at this point in the history
  • Loading branch information
bschimke95 authored Jun 28, 2024
1 parent 12c38eb commit 7bac9bc
Show file tree
Hide file tree
Showing 12 changed files with 152 additions and 69 deletions.
11 changes: 10 additions & 1 deletion bootstrap/api/v1beta2/ck8sconfig_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,22 @@ type CK8sControlPlaneConfig struct {

// MicroclusterPort is the port to use for microcluster. If unset, 2380 (etcd peer) will be used.
// +optional
MicroclusterPort int `json:"microclusterPort,omitempty"`
MicroclusterPort *int `json:"microclusterPort,omitempty"`

// ExtraKubeAPIServerArgs is extra arguments to add to kube-apiserver.
// +optional
ExtraKubeAPIServerArgs map[string]*string `json:"extraKubeAPIServerArgs,omitempty"`
}

// GetMicroclusterPort returns the port to use for microcluster.
// If the receiver is nil or MicroclusterPort is unset, 2380 (etcd peer) is
// returned. An explicitly configured value is returned unchanged.
func (c *CK8sControlPlaneConfig) GetMicroclusterPort() int {
	// Guard the receiver as well as the field: the method has a pointer
	// receiver, so without this check a nil config would panic on the field
	// access instead of falling back to the default.
	if c == nil || c.MicroclusterPort == nil {
		return 2380
	}
	return *c.MicroclusterPort
}

// CK8sConfigStatus defines the observed state of CK8sConfig.
type CK8sConfigStatus struct {
// Ready indicates the BootstrapData field is ready to be consumed
Expand Down
25 changes: 5 additions & 20 deletions bootstrap/controllers/ck8sconfig_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,22 +220,12 @@ func (r *CK8sConfigReconciler) joinControlplane(ctx context.Context, scope *Scop
// injects into config.Version values from top level object
r.reconcileTopLevelObjectSettings(scope.Cluster, machine, scope.Config)

authToken, err := token.Lookup(ctx, r.Client, client.ObjectKeyFromObject(scope.Cluster))
if err != nil {
conditions.MarkFalse(scope.Config, bootstrapv1.DataSecretAvailableCondition, bootstrapv1.DataSecretGenerationFailedReason, clusterv1.ConditionSeverityWarning, err.Error())
return err
}

if authToken == nil {
return fmt.Errorf("auth token not yet generated")
}

workloadCluster, err := r.managementCluster.GetWorkloadCluster(ctx, util.ObjectKey(scope.Cluster))
workloadCluster, err := r.managementCluster.GetWorkloadCluster(ctx, util.ObjectKey(scope.Cluster), scope.Config.Spec.ControlPlaneConfig.GetMicroclusterPort())
if err != nil {
return fmt.Errorf("failed to create remote cluster client: %w", err)
}

joinToken, err := workloadCluster.NewControlPlaneJoinToken(ctx, *authToken, scope.Config.Spec.ControlPlaneConfig.MicroclusterPort, scope.Config.Name)
joinToken, err := workloadCluster.NewControlPlaneJoinToken(ctx, scope.Config.Name)
if err != nil {
return fmt.Errorf("failed to request join token: %w", err)
}
Expand Down Expand Up @@ -303,12 +293,12 @@ func (r *CK8sConfigReconciler) joinWorker(ctx context.Context, scope *Scope) err
return fmt.Errorf("auth token not yet generated")
}

workloadCluster, err := r.managementCluster.GetWorkloadCluster(ctx, util.ObjectKey(scope.Cluster))
workloadCluster, err := r.managementCluster.GetWorkloadCluster(ctx, util.ObjectKey(scope.Cluster), scope.Config.Spec.ControlPlaneConfig.GetMicroclusterPort())
if err != nil {
return fmt.Errorf("failed to create remote cluster client: %w", err)
}

joinToken, err := workloadCluster.NewWorkerJoinToken(ctx, *authToken, scope.Config.Spec.ControlPlaneConfig.MicroclusterPort, scope.Config.Name)
joinToken, err := workloadCluster.NewWorkerJoinToken(ctx)
if err != nil {
return fmt.Errorf("failed to request join token: %w", err)
}
Expand Down Expand Up @@ -473,12 +463,7 @@ func (r *CK8sConfigReconciler) handleClusterNotInitialized(ctx context.Context,
return ctrl.Result{}, err
}

var microclusterPort int
microclusterPort = scope.Config.Spec.ControlPlaneConfig.MicroclusterPort
if microclusterPort == 0 {
microclusterPort = 2380
}

microclusterPort := scope.Config.Spec.ControlPlaneConfig.GetMicroclusterPort()
ds, err := ck8s.RenderK8sdProxyDaemonSetManifest(ck8s.K8sdProxyDaemonSetInput{K8sdPort: microclusterPort})
if err != nil {
return ctrl.Result{}, fmt.Errorf("failed to render k8sd-proxy daemonset: %w", err)
Expand Down
8 changes: 5 additions & 3 deletions controlplane/controllers/ck8scontrolplane_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ func (r *CK8sControlPlaneReconciler) SetupWithManager(ctx context.Context, mgr c

if r.managementClusterUncached == nil {
r.managementClusterUncached = &ck8s.Management{
Client: mgr.GetAPIReader(),
Client: mgr.GetClient(),
K8sdDialTimeout: r.K8sdDialTimeout,
}
}
Expand Down Expand Up @@ -378,7 +378,8 @@ func (r *CK8sControlPlaneReconciler) updateStatus(ctx context.Context, kcp *cont
}
}

workloadCluster, err := r.managementCluster.GetWorkloadCluster(ctx, util.ObjectKey(cluster))
microclusterPort := kcp.Spec.CK8sConfigSpec.ControlPlaneConfig.GetMicroclusterPort()
workloadCluster, err := r.managementCluster.GetWorkloadCluster(ctx, util.ObjectKey(cluster), microclusterPort)
if err != nil {
return fmt.Errorf("failed to create remote cluster client: %w", err)
}
Expand Down Expand Up @@ -665,7 +666,8 @@ func (r *CK8sControlPlaneReconciler) reconcileControlPlaneConditions(ctx context
return nil
}

workloadCluster, err := r.managementCluster.GetWorkloadCluster(ctx, util.ObjectKey(controlPlane.Cluster))
microclusterPort := controlPlane.KCP.Spec.CK8sConfigSpec.ControlPlaneConfig.GetMicroclusterPort()
workloadCluster, err := r.managementCluster.GetWorkloadCluster(ctx, util.ObjectKey(controlPlane.Cluster), microclusterPort)
if err != nil {
return fmt.Errorf("cannot get remote client to workload cluster: %w", err)
}
Expand Down
14 changes: 6 additions & 8 deletions controlplane/controllers/machine_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"time"

"github.com/canonical/cluster-api-k8s/pkg/ck8s"
"github.com/go-logr/logr"
"github.com/pkg/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -24,10 +25,7 @@ type MachineReconciler struct {

K8sdDialTimeout time.Duration

// NOTE(neoaggelos): See note below
/**
managementCluster ck8s.ManagementCluster
**/
}

func (r *MachineReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, log *logr.Logger) error {
Expand All @@ -36,15 +34,16 @@ func (r *MachineReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manag
Build(r)

// NOTE(neoaggelos): See note below
/**
if r.managementCluster == nil {
r.managementCluster = &ck8s.Management{
Client: r.Client,
EtcdDialTimeout: r.EtcdDialTimeout,
EtcdCallTimeout: r.EtcdCallTimeout,
K8sdDialTimeout: r.K8sdDialTimeout,
/*
EtcdDialTimeout: r.EtcdDialTimeout,
EtcdCallTimeout: r.EtcdCallTimeout,
*/
}
}
**/

return err
}
Expand Down Expand Up @@ -101,7 +100,6 @@ func (r *MachineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
logger.Info("unable to get cluster.")
return ctrl.Result{}, errors.Wrapf(err, "unable to get cluster")
}
workloadCluster, err := r.managementCluster.GetWorkloadCluster(ctx, util.ObjectKey(cluster))
if err != nil {
logger.Error(err, "failed to create client to workload cluster")
Expand Down
13 changes: 13 additions & 0 deletions controlplane/controllers/remediation.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
kerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/klog/v2"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
"sigs.k8s.io/cluster-api/util"
"sigs.k8s.io/cluster-api/util/annotations"
"sigs.k8s.io/cluster-api/util/collections"
"sigs.k8s.io/cluster-api/util/conditions"
Expand Down Expand Up @@ -236,6 +237,18 @@ func (r *CK8sControlPlaneReconciler) reconcileUnhealthyMachines(ctx context.Cont
**/
}

microclusterPort := controlPlane.KCP.Spec.CK8sConfigSpec.ControlPlaneConfig.GetMicroclusterPort()
clusterObjectKey := util.ObjectKey(controlPlane.Cluster)
workloadCluster, err := r.managementCluster.GetWorkloadCluster(ctx, clusterObjectKey, microclusterPort)
if err != nil {
log.Error(err, "failed to create client to workload cluster")
return ctrl.Result{}, errors.Wrapf(err, "failed to create client to workload cluster")
}

if err := workloadCluster.RemoveMachineFromCluster(ctx, machineToBeRemediated); err != nil {
log.Error(err, "failed to remove machine from microcluster")
}

// Delete the machine
if err := r.Client.Delete(ctx, machineToBeRemediated); err != nil {
conditions.MarkFalse(machineToBeRemediated, clusterv1.MachineOwnerRemediatedCondition, clusterv1.RemediationFailedReason, clusterv1.ConditionSeverityError, err.Error())
Expand Down
12 changes: 12 additions & 0 deletions controlplane/controllers/scale.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,18 @@ func (r *CK8sControlPlaneReconciler) scaleDownControlPlane(
}
**/

microclusterPort := controlPlane.KCP.Spec.CK8sConfigSpec.ControlPlaneConfig.GetMicroclusterPort()
clusterObjectKey := util.ObjectKey(cluster)
workloadCluster, err := r.managementCluster.GetWorkloadCluster(ctx, clusterObjectKey, microclusterPort)
if err != nil {
logger.Error(err, "failed to create client to workload cluster")
return ctrl.Result{}, errors.Wrapf(err, "failed to create client to workload cluster")
}

if err := workloadCluster.RemoveMachineFromCluster(ctx, machineToDelete); err != nil {
logger.Error(err, "failed to remove machine from microcluster")
}

logger = logger.WithValues("machine", machineToDelete)
if err := r.Client.Delete(ctx, machineToDelete); err != nil && !apierrors.IsNotFound(err) {
logger.Error(err, "Failed to delete control plane machine")
Expand Down
2 changes: 1 addition & 1 deletion controlplane/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func main() {
flag.DurationVar(&syncPeriod, "sync-period", 10*time.Minute,
"The minimum interval at which watched resources are reconciled (e.g. 15m)")

flag.DurationVar(&k8sdDialTimeout, "k8sd-dial-timeout-duration", 10*time.Second,
flag.DurationVar(&k8sdDialTimeout, "k8sd-dial-timeout-duration", 60*time.Second,
"Duration that the proxy client waits at most to establish a connection with k8sd")

flag.Parse()
Expand Down
7 changes: 7 additions & 0 deletions pkg/ck8s/api/cluster_node.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package apiv1

// RemoveNodeRequest is used to request to remove a node from the cluster.
// It is a plain data type whose fields are serialized to JSON under the keys
// given in the struct tags.
type RemoveNodeRequest struct {
	// Name is serialized under the JSON key "name".
	// NOTE(review): presumably the name of the node to remove — confirm against the k8sd API.
	Name string `json:"name"`
	// Force is serialized under the JSON key "force"; the key is always
	// emitted because the tag has no omitempty option.
	// NOTE(review): presumably asks for a forced removal — exact semantics are not visible here, confirm against the k8sd API.
	Force bool `json:"force"`
}
20 changes: 17 additions & 3 deletions pkg/ck8s/management_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"time"

"github.com/canonical/cluster-api-k8s/pkg/token"
"k8s.io/client-go/kubernetes/scheme"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
"sigs.k8s.io/cluster-api/controllers/remote"
Expand All @@ -17,14 +18,14 @@ type ManagementCluster interface {
client.Reader

GetMachinesForCluster(ctx context.Context, cluster client.ObjectKey, filters ...collections.Func) (collections.Machines, error)
GetWorkloadCluster(ctx context.Context, clusterKey client.ObjectKey) (*Workload, error)
GetWorkloadCluster(ctx context.Context, clusterKey client.ObjectKey, microclusterPort int) (*Workload, error)
}

// Management holds operations on the management cluster.
type Management struct {
ManagementCluster

Client client.Reader
Client client.Client

K8sdDialTimeout time.Duration
}
Expand Down Expand Up @@ -70,7 +71,7 @@ const (

// GetWorkloadCluster builds a cluster object.
// The cluster comes with an etcd client generator to connect to any etcd pod living on a managed machine.
func (m *Management) GetWorkloadCluster(ctx context.Context, clusterKey client.ObjectKey) (*Workload, error) {
func (m *Management) GetWorkloadCluster(ctx context.Context, clusterKey client.ObjectKey, microclusterPort int) (*Workload, error) {
restConfig, err := remote.RESTConfig(ctx, CK8sControlPlaneControllerName, m.Client, clusterKey)
if err != nil {
return nil, fmt.Errorf("failed to get config: %w", err)
Expand All @@ -87,10 +88,21 @@ func (m *Management) GetWorkloadCluster(ctx context.Context, clusterKey client.O
return nil, &RemoteClusterConnectionError{Name: clusterKey.String(), Err: err}
}

authToken, err := token.Lookup(ctx, m.Client, clusterKey)
if err != nil {
return nil, fmt.Errorf("failed to lookup auth token: %w", err)
}

if authToken == nil {
return nil, fmt.Errorf("auth token not yet generated")
}

workload := &Workload{
authToken: *authToken,
Client: c,
ClientRestConfig: restConfig,
K8sdClientGenerator: g,
microclusterPort: microclusterPort,

/**
CoreDNSMigrator: &CoreDNSMigrator{},
Expand Down Expand Up @@ -155,3 +167,5 @@ func (m *Management) getEtcdCAKeyPair(ctx context.Context, clusterKey client.Obj
return crtData, keyData, nil
}
**/

// Compile-time assertion that *Management satisfies the ManagementCluster interface.
var _ ManagementCluster = (*Management)(nil)
Loading

0 comments on commit 7bac9bc

Please sign in to comment.