From cad319dd5dea67d3ecda4bc008b332bcfea63a87 Mon Sep 17 00:00:00 2001
From: Jean-Philippe Evrard
Date: Thu, 10 Oct 2024 22:56:33 +0200
Subject: [PATCH] Make locks more generic

Implementation details of the lock should not leak into the calling
methods. Without this patch, calls are a bit more complex and error
handling is harder to find. This is a problem for long-term maintenance,
as it is tougher to refactor the locks without impacting main.
Decoupling the two (the main usage of the lock, and the locks
themselves) will allow us to introduce other kinds of locks easily.

I solve this by inlining into the daemonsetlock package:
- all the methods for managing locks from the main.go functions. Those
  were mostly doing error handling, and that code becomes a no-op by
  introducing multiple daemonsetlock types
- the lock release delay, which is now part of the lock info

I also did not like the pattern in the Test method, which took a
reference to nodeMeta: it was not clear whether Test was storing the
current metadata of the node or returning the current state. (Metadata
here only means unschedulable.) The problem I saw was that the metadata
was silently mutated from a lock Test method, which was not obvious.
Instead, I chose to explicitly return the lock data.

I also made it explicit that the Acquire method passes the node
metadata as structured information rather than an interface{}. This is
a bit more fragile at runtime, but I prefer very explicit errors when
the locks are incorrect over having to deal with unvalidated data.

The lock release delay used to be part of the rebootAsRequired loop; I
believe it makes more sense as part of the Release method itself, for
readability. It hides the delay in the implementation detail, but it
keeps the rebootAsRequired goroutine more readable.

Instead of passing rebootDelay as a parameter of rebootAsRequired, this
refactor moves creation of the lock object into main, close to all the
other variables, and then passes the lock object to rebootAsRequired.
This makes the call to rebootAsRequired clearer, and the lock now
encompasses everything needed to acquire, release, or get info about
the lock.
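For illustration, a minimal sketch (not part of the diff) of how a
caller is expected to use the new Lock interface. The Lock, NodeMeta
and LockAnnotationValue types match the ones added in
pkg/daemonsetlock below; the import path, the helper function name and
the use of the standard log package are assumptions made for the
example only.

    // Sketch only: caller-side usage of the generic Lock interface.
    package main

    import (
    	"log"

    	"github.com/kubereboot/kured/pkg/daemonsetlock" // assumed import path
    )

    func exampleLockUsage(lock daemonsetlock.Lock, unschedulable bool) {
    	// Node metadata is now passed as a structured type, not interface{}.
    	meta := daemonsetlock.NodeMeta{Unschedulable: unschedulable}

    	holding, lockData, err := lock.Holding()
    	if err != nil {
    		log.Printf("error testing lock: %v", err)
    	}

    	if !holding {
    		acquired, holder, err := lock.Acquire(meta)
    		if err != nil {
    			log.Printf("error acquiring lock: %v", err)
    		}
    		if !acquired {
    			log.Printf("lock already held: %v", holder)
    			return
    		}
    	} else {
    		// Holding returns the stored lock data instead of mutating the
    		// caller's metadata, e.g. lockData.Metadata.Unschedulable.
    		_ = lockData
    	}

    	// Release applies any configured release delay internally before
    	// removing the lock annotation.
    	if err := lock.Release(); err != nil {
    		log.Printf("error releasing lock: %v", err)
    	}
    }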
Signed-off-by: Jean-Philippe Evrard --- cmd/kured/main.go | 163 +++++-------- pkg/daemonsetlock/daemonsetlock.go | 306 +++++++++++++++--------- pkg/daemonsetlock/daemonsetlock_test.go | 22 +- 3 files changed, 262 insertions(+), 229 deletions(-) diff --git a/cmd/kured/main.go b/cmd/kured/main.go index 1ab4b8424..96e229f7d 100644 --- a/cmd/kured/main.go +++ b/cmd/kured/main.go @@ -224,18 +224,6 @@ func main() { log.Warnf(err.Error()) } - log.Infof("Lock Annotation: %s/%s:%s", dsNamespace, dsName, lockAnnotation) - if lockTTL > 0 { - log.Infof("Lock TTL set, lock will expire after: %v", lockTTL) - } else { - log.Info("Lock TTL not set, lock will remain until being released") - } - if lockReleaseDelay > 0 { - log.Infof("Lock release delay set, lock release will be delayed by: %v", lockReleaseDelay) - } else { - log.Info("Lock release delay not set, lock will be released immediately after rebooting") - } - log.Infof("PreferNoSchedule taint: %s", preferNoScheduleTaintName) // This should be printed from blocker list instead of only blocking pod selectors @@ -278,7 +266,30 @@ func main() { checker = checkers.NewFileRebootChecker(rebootSentinelFile) } - go rebootAsRequired(nodeID, rebooter, checker, window, lockTTL, lockReleaseDelay) + config, err := rest.InClusterConfig() + if err != nil { + log.Fatal(err) + } + + client, err := kubernetes.NewForConfig(config) + if err != nil { + log.Fatal(err) + } + + log.Infof("Lock Annotation: %s/%s:%s", dsNamespace, dsName, lockAnnotation) + if lockTTL > 0 { + log.Infof("Lock TTL set, lock will expire after: %v", lockTTL) + } else { + log.Info("Lock TTL not set, lock will remain until being released") + } + if lockReleaseDelay > 0 { + log.Infof("Lock release delay set, lock release will be delayed by: %v", lockReleaseDelay) + } else { + log.Info("Lock release delay not set, lock will be released immediately after rebooting") + } + lock := daemonsetlock.New(client, nodeID, dsNamespace, dsName, lockAnnotation, lockTTL, concurrency, lockReleaseDelay) + + go rebootAsRequired(nodeID, rebooter, checker, window, lock, client) go maintainRebootRequiredMetric(nodeID, checker) http.Handle("/metrics", promhttp.Handler()) @@ -400,68 +411,6 @@ func stripQuotes(str string) string { return str } -func holding(lock *daemonsetlock.DaemonSetLock, metadata interface{}, isMultiLock bool) bool { - var holding bool - var err error - if isMultiLock { - holding, err = lock.TestMultiple() - } else { - holding, err = lock.Test(metadata) - } - if err != nil { - log.Fatalf("Error testing lock: %v", err) - } - if holding { - log.Infof("Holding lock") - } - return holding -} - -func acquire(lock *daemonsetlock.DaemonSetLock, metadata interface{}, TTL time.Duration, maxOwners int) bool { - var holding bool - var holder string - var err error - if maxOwners > 1 { - var holders []string - holding, holders, err = lock.AcquireMultiple(metadata, TTL, maxOwners) - holder = strings.Join(holders, ",") - } else { - holding, holder, err = lock.Acquire(metadata, TTL) - } - switch { - case err != nil: - log.Fatalf("Error acquiring lock: %v", err) - return false - case !holding: - log.Warnf("Lock already held: %v", holder) - return false - default: - log.Infof("Acquired reboot lock") - return true - } -} - -func throttle(releaseDelay time.Duration) { - if releaseDelay > 0 { - log.Infof("Delaying lock release by %v", releaseDelay) - time.Sleep(releaseDelay) - } -} - -func release(lock *daemonsetlock.DaemonSetLock, isMultiLock bool) { - log.Infof("Releasing lock") - - var err error - if isMultiLock { - 
err = lock.ReleaseMultiple() - } else { - err = lock.Release() - } - if err != nil { - log.Fatalf("Error releasing lock: %v", err) - } -} - func drain(client *kubernetes.Clientset, node *v1.Node) error { nodename := node.GetName() @@ -537,11 +486,6 @@ func maintainRebootRequiredMetric(nodeID string, checker checkers.Checker) { } } -// nodeMeta is used to remember information across reboots -type nodeMeta struct { - Unschedulable bool `json:"unschedulable"` -} - func addNodeAnnotations(client *kubernetes.Clientset, nodeID string, annotations map[string]string) error { node, err := client.CoreV1().Nodes().Get(context.TODO(), nodeID, metav1.GetOptions{}) if err != nil { @@ -616,30 +560,23 @@ func updateNodeLabels(client *kubernetes.Clientset, node *v1.Node, labels []stri } } -func rebootAsRequired(nodeID string, rebooter reboot.Rebooter, checker checkers.Checker, window *timewindow.TimeWindow, TTL time.Duration, releaseDelay time.Duration) { - config, err := rest.InClusterConfig() - if err != nil { - log.Fatal(err) - } - - client, err := kubernetes.NewForConfig(config) - if err != nil { - log.Fatal(err) - } - - lock := daemonsetlock.New(client, nodeID, dsNamespace, dsName, lockAnnotation) +func rebootAsRequired(nodeID string, rebooter reboot.Rebooter, checker checkers.Checker, window *timewindow.TimeWindow, lock daemonsetlock.Lock, client *kubernetes.Clientset) { - nodeMeta := nodeMeta{} source := rand.NewSource(time.Now().UnixNano()) tick := delaytick.New(source, 1*time.Minute) for range tick { - if holding(lock, &nodeMeta, concurrency > 1) { + holding, lockData, err := lock.Holding() + if err != nil { + log.Errorf("Error testing lock: %v", err) + } + if holding { node, err := client.CoreV1().Nodes().Get(context.TODO(), nodeID, metav1.GetOptions{}) if err != nil { log.Errorf("Error retrieving node object via k8s API: %v", err) continue } - if !nodeMeta.Unschedulable { + + if !lockData.Metadata.Unschedulable { err = uncordon(client, node) if err != nil { log.Errorf("Unable to uncordon %s: %v, will continue to hold lock and retry uncordon", node.GetName(), err) @@ -665,8 +602,12 @@ func rebootAsRequired(nodeID string, rebooter reboot.Rebooter, checker checkers. } } } - throttle(releaseDelay) - release(lock, concurrency > 1) + + err = lock.Release() + if err != nil { + log.Errorf("Error releasing lock, will retry: %v", err) + continue + } break } else { break @@ -705,7 +646,8 @@ func rebootAsRequired(nodeID string, rebooter reboot.Rebooter, checker checkers. if err != nil { log.Fatalf("Error retrieving node object via k8s API: %v", err) } - nodeMeta.Unschedulable = node.Spec.Unschedulable + + nodeMeta := daemonsetlock.NodeMeta{Unschedulable: node.Spec.Unschedulable} var timeNowString string if annotateNodes { @@ -738,17 +680,32 @@ func rebootAsRequired(nodeID string, rebooter reboot.Rebooter, checker checkers. } log.Infof("Reboot required%s", rebootRequiredBlockCondition) - if !holding(lock, &nodeMeta, concurrency > 1) && !acquire(lock, &nodeMeta, TTL, concurrency) { - // Prefer to not schedule pods onto this node to avoid draing the same pod multiple times. - preferNoScheduleTaint.Enable() - continue + holding, _, err := lock.Holding() + if err != nil { + log.Errorf("Error testing lock: %v", err) + } + + if !holding { + acquired, holder, err := lock.Acquire(nodeMeta) + if err != nil { + log.Errorf("Error acquiring lock: %v", err) + } + if !acquired { + log.Warnf("Lock already held: %v", holder) + // Prefer to not schedule pods onto this node to avoid draing the same pod multiple times. 
+ preferNoScheduleTaint.Enable() + continue + } } err = drain(client, node) if err != nil { if !forceReboot { log.Errorf("Unable to cordon or drain %s: %v, will release lock and retry cordon and drain before rebooting when lock is next acquired", node.GetName(), err) - release(lock, concurrency > 1) + err = lock.Release() + if err != nil { + log.Errorf("Error releasing lock: %v", err) + } log.Infof("Performing a best-effort uncordon after failed cordon and drain") uncordon(client, node) continue diff --git a/pkg/daemonsetlock/daemonsetlock.go b/pkg/daemonsetlock/daemonsetlock.go index 2d8949b5a..f9386bb06 100644 --- a/pkg/daemonsetlock/daemonsetlock.go +++ b/pkg/daemonsetlock/daemonsetlock.go @@ -4,6 +4,8 @@ import ( "context" "encoding/json" "fmt" + log "github.com/sirupsen/logrus" + "strings" "time" v1 "k8s.io/api/apps/v1" @@ -18,6 +20,21 @@ const ( k8sAPICallRetryTimeout = 5 * time.Minute // How long to wait until we determine that the k8s API is definitively unavailable ) +type Lock interface { + Acquire(NodeMeta) (bool, string, error) + Release() error + Holding() (bool, LockAnnotationValue, error) +} + +type GenericLock struct { + TTL time.Duration + releaseDelay time.Duration +} + +type NodeMeta struct { + Unschedulable bool `json:"unschedulable"` +} + // DaemonSetLock holds all necessary information to do actions // on the kured ds which holds lock info through annotations. type DaemonSetLock struct { @@ -28,25 +45,92 @@ type DaemonSetLock struct { annotation string } -type lockAnnotationValue struct { +// DaemonSetSingleLock holds all necessary information to do actions +// on the kured ds which holds lock info through annotations. +type DaemonSetSingleLock struct { + GenericLock + DaemonSetLock +} + +// DaemonSetMultiLock holds all necessary information to do actions +// on the kured ds which holds lock info through annotations, valid +// for multiple nodes +type DaemonSetMultiLock struct { + GenericLock + DaemonSetLock + maxOwners int +} + +// LockAnnotationValue contains the lock data, +// which allows persistence across reboots, particularily recording if the +// node was already unschedulable before kured reboot. +// To be modified when using another type of lock storage. 
+type LockAnnotationValue struct { NodeID string `json:"nodeID"` - Metadata interface{} `json:"metadata,omitempty"` + Metadata NodeMeta `json:"metadata,omitempty"` Created time.Time `json:"created"` TTL time.Duration `json:"TTL"` } type multiLockAnnotationValue struct { MaxOwners int `json:"maxOwners"` - LockAnnotations []lockAnnotationValue `json:"locks"` + LockAnnotations []LockAnnotationValue `json:"locks"` } // New creates a daemonsetLock object containing the necessary data for follow up k8s requests -func New(client *kubernetes.Clientset, nodeID, namespace, name, annotation string) *DaemonSetLock { - return &DaemonSetLock{client, nodeID, namespace, name, annotation} +func New(client *kubernetes.Clientset, nodeID, namespace, name, annotation string, TTL time.Duration, concurrency int, lockReleaseDelay time.Duration) Lock { + if concurrency > 1 { + return &DaemonSetMultiLock{ + GenericLock: GenericLock{ + TTL: TTL, + releaseDelay: lockReleaseDelay, + }, + DaemonSetLock: DaemonSetLock{ + client: client, + nodeID: nodeID, + namespace: namespace, + name: name, + annotation: annotation, + }, + maxOwners: concurrency, + } + } else { + return &DaemonSetSingleLock{ + GenericLock: GenericLock{ + TTL: TTL, + releaseDelay: lockReleaseDelay, + }, + DaemonSetLock: DaemonSetLock{ + client: client, + nodeID: nodeID, + namespace: namespace, + name: name, + annotation: annotation, + }, + } + } +} + +// GetDaemonSet returns the named DaemonSet resource from the DaemonSetLock's configured client +func (dsl *DaemonSetLock) GetDaemonSet(sleep, timeout time.Duration) (*v1.DaemonSet, error) { + var ds *v1.DaemonSet + var lastError error + err := wait.PollImmediate(sleep, timeout, func() (bool, error) { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + if ds, lastError = dsl.client.AppsV1().DaemonSets(dsl.namespace).Get(ctx, dsl.name, metav1.GetOptions{}); lastError != nil { + return false, nil + } + return true, nil + }) + if err != nil { + return nil, fmt.Errorf("Timed out trying to get daemonset %s in namespace %s: %v", dsl.name, dsl.namespace, lastError) + } + return ds, nil } // Acquire attempts to annotate the kured daemonset with lock info from instantiated DaemonSetLock using client-go -func (dsl *DaemonSetLock) Acquire(metadata interface{}, TTL time.Duration) (bool, string, error) { +func (dsl *DaemonSetSingleLock) Acquire(nodeMetadata NodeMeta) (bool, string, error) { for { ds, err := dsl.GetDaemonSet(k8sAPICallRetrySleep, k8sAPICallRetryTimeout) if err != nil { @@ -55,7 +139,7 @@ func (dsl *DaemonSetLock) Acquire(metadata interface{}, TTL time.Duration) (bool valueString, exists := ds.ObjectMeta.Annotations[dsl.annotation] if exists { - value := lockAnnotationValue{} + value := LockAnnotationValue{} if err := json.Unmarshal([]byte(valueString), &value); err != nil { return false, "", err } @@ -68,7 +152,7 @@ func (dsl *DaemonSetLock) Acquire(metadata interface{}, TTL time.Duration) (bool if ds.ObjectMeta.Annotations == nil { ds.ObjectMeta.Annotations = make(map[string]string) } - value := lockAnnotationValue{NodeID: dsl.nodeID, Metadata: metadata, Created: time.Now().UTC(), TTL: TTL} + value := LockAnnotationValue{NodeID: dsl.nodeID, Metadata: nodeMetadata, Created: time.Now().UTC(), TTL: dsl.TTL} valueBytes, err := json.Marshal(&value) if err != nil { return false, "", err @@ -89,49 +173,78 @@ func (dsl *DaemonSetLock) Acquire(metadata interface{}, TTL time.Duration) (bool } } -// AcquireMultiple creates and annotates the daemonset with a multiple owner lock 
-func (dsl *DaemonSetLock) AcquireMultiple(metadata interface{}, TTL time.Duration, maxOwners int) (bool, []string, error) { +// Test attempts to check the kured daemonset lock status (existence, expiry) from instantiated DaemonSetLock using client-go +func (dsl *DaemonSetSingleLock) Holding() (bool, LockAnnotationValue, error) { + var lockData LockAnnotationValue + ds, err := dsl.GetDaemonSet(k8sAPICallRetrySleep, k8sAPICallRetryTimeout) + if err != nil { + return false, lockData, fmt.Errorf("timed out trying to get daemonset %s in namespace %s: %w", dsl.name, dsl.namespace, err) + } + + valueString, exists := ds.ObjectMeta.Annotations[dsl.annotation] + if exists { + value := LockAnnotationValue{} + if err := json.Unmarshal([]byte(valueString), &value); err != nil { + return false, lockData, err + } + + if !ttlExpired(value.Created, value.TTL) { + return value.NodeID == dsl.nodeID, value, nil + } + } + + return false, lockData, nil +} + +// Release attempts to remove the lock data from the kured ds annotations using client-go +func (dsl *DaemonSetSingleLock) Release() error { + if dsl.releaseDelay > 0 { + log.Infof("Waiting %v before releasing lock", dsl.releaseDelay) + time.Sleep(dsl.releaseDelay) + } for { ds, err := dsl.GetDaemonSet(k8sAPICallRetrySleep, k8sAPICallRetryTimeout) if err != nil { - return false, []string{}, fmt.Errorf("timed out trying to get daemonset %s in namespace %s: %w", dsl.name, dsl.namespace, err) + return fmt.Errorf("timed out trying to get daemonset %s in namespace %s: %w", dsl.name, dsl.namespace, err) } - annotation := multiLockAnnotationValue{} valueString, exists := ds.ObjectMeta.Annotations[dsl.annotation] if exists { - if err := json.Unmarshal([]byte(valueString), &annotation); err != nil { - return false, []string{}, fmt.Errorf("error getting multi lock: %w", err) + value := LockAnnotationValue{} + if err := json.Unmarshal([]byte(valueString), &value); err != nil { + return err } - } - lockPossible, newAnnotation := dsl.canAcquireMultiple(annotation, metadata, TTL, maxOwners) - if !lockPossible { - return false, nodeIDsFromMultiLock(newAnnotation), nil + if value.NodeID != dsl.nodeID { + return fmt.Errorf("Not lock holder: %v", value.NodeID) + } + } else { + return fmt.Errorf("Lock not held") } - if ds.ObjectMeta.Annotations == nil { - ds.ObjectMeta.Annotations = make(map[string]string) - } - newAnnotationBytes, err := json.Marshal(&newAnnotation) - if err != nil { - return false, []string{}, fmt.Errorf("error marshalling new annotation lock: %w", err) - } - ds.ObjectMeta.Annotations[dsl.annotation] = string(newAnnotationBytes) + delete(ds.ObjectMeta.Annotations, dsl.annotation) - _, err = dsl.client.AppsV1().DaemonSets(dsl.namespace).Update(context.Background(), ds, metav1.UpdateOptions{}) + _, err = dsl.client.AppsV1().DaemonSets(dsl.namespace).Update(context.TODO(), ds, metav1.UpdateOptions{}) if err != nil { if se, ok := err.(*errors.StatusError); ok && se.ErrStatus.Reason == metav1.StatusReasonConflict { + // Something else updated the resource between us reading and writing - try again soon time.Sleep(time.Second) continue } else { - return false, []string{}, fmt.Errorf("error updating daemonset with multi lock: %w", err) + return err } } - return true, nodeIDsFromMultiLock(newAnnotation), nil + return nil } } +func ttlExpired(created time.Time, ttl time.Duration) bool { + if ttl > 0 && time.Since(created) >= ttl { + return true + } + return false +} + func nodeIDsFromMultiLock(annotation multiLockAnnotationValue) []string { nodeIDs := 
make([]string, 0, len(annotation.LockAnnotations)) for _, nodeLock := range annotation.LockAnnotations { @@ -140,7 +253,7 @@ func nodeIDsFromMultiLock(annotation multiLockAnnotationValue) []string { return nodeIDs } -func (dsl *DaemonSetLock) canAcquireMultiple(annotation multiLockAnnotationValue, metadata interface{}, TTL time.Duration, maxOwners int) (bool, multiLockAnnotationValue) { +func (dsl *DaemonSetLock) canAcquireMultiple(annotation multiLockAnnotationValue, metadata NodeMeta, TTL time.Duration, maxOwners int) (bool, multiLockAnnotationValue) { newAnnotation := multiLockAnnotationValue{MaxOwners: maxOwners} freeSpace := false if annotation.LockAnnotations == nil || len(annotation.LockAnnotations) < maxOwners { @@ -162,7 +275,7 @@ func (dsl *DaemonSetLock) canAcquireMultiple(annotation multiLockAnnotationValue if freeSpace { newAnnotation.LockAnnotations = append( newAnnotation.LockAnnotations, - lockAnnotationValue{ + LockAnnotationValue{ NodeID: dsl.nodeID, Metadata: metadata, Created: time.Now().UTC(), @@ -175,92 +288,80 @@ func (dsl *DaemonSetLock) canAcquireMultiple(annotation multiLockAnnotationValue return false, multiLockAnnotationValue{} } -// Test attempts to check the kured daemonset lock status (existence, expiry) from instantiated DaemonSetLock using client-go -func (dsl *DaemonSetLock) Test(metadata interface{}) (bool, error) { - ds, err := dsl.GetDaemonSet(k8sAPICallRetrySleep, k8sAPICallRetryTimeout) - if err != nil { - return false, fmt.Errorf("timed out trying to get daemonset %s in namespace %s: %w", dsl.name, dsl.namespace, err) - } +// Acquire creates and annotates the daemonset with a multiple owner lock +func (dsl *DaemonSetMultiLock) Acquire(nodeMetaData NodeMeta) (bool, string, error) { + for { + ds, err := dsl.GetDaemonSet(k8sAPICallRetrySleep, k8sAPICallRetryTimeout) + if err != nil { + return false, "", fmt.Errorf("timed out trying to get daemonset %s in namespace %s: %w", dsl.name, dsl.namespace, err) + } - valueString, exists := ds.ObjectMeta.Annotations[dsl.annotation] - if exists { - value := lockAnnotationValue{Metadata: metadata} - if err := json.Unmarshal([]byte(valueString), &value); err != nil { - return false, err + annotation := multiLockAnnotationValue{} + valueString, exists := ds.ObjectMeta.Annotations[dsl.annotation] + if exists { + if err := json.Unmarshal([]byte(valueString), &annotation); err != nil { + return false, "", fmt.Errorf("error getting multi lock: %w", err) + } } - if !ttlExpired(value.Created, value.TTL) { - return value.NodeID == dsl.nodeID, nil + lockPossible, newAnnotation := dsl.canAcquireMultiple(annotation, nodeMetaData, dsl.TTL, dsl.maxOwners) + if !lockPossible { + return false, strings.Join(nodeIDsFromMultiLock(newAnnotation), ","), nil } - } - return false, nil + if ds.ObjectMeta.Annotations == nil { + ds.ObjectMeta.Annotations = make(map[string]string) + } + newAnnotationBytes, err := json.Marshal(&newAnnotation) + if err != nil { + return false, "", fmt.Errorf("error marshalling new annotation lock: %w", err) + } + ds.ObjectMeta.Annotations[dsl.annotation] = string(newAnnotationBytes) + + _, err = dsl.client.AppsV1().DaemonSets(dsl.namespace).Update(context.Background(), ds, metav1.UpdateOptions{}) + if err != nil { + if se, ok := err.(*errors.StatusError); ok && se.ErrStatus.Reason == metav1.StatusReasonConflict { + time.Sleep(time.Second) + continue + } else { + return false, "", fmt.Errorf("error updating daemonset with multi lock: %w", err) + } + } + return true, 
strings.Join(nodeIDsFromMultiLock(newAnnotation), ","), nil + } } // TestMultiple attempts to check the kured daemonset lock status for multi locks -func (dsl *DaemonSetLock) TestMultiple() (bool, error) { +func (dsl *DaemonSetMultiLock) Holding() (bool, LockAnnotationValue, error) { + var lockdata LockAnnotationValue ds, err := dsl.GetDaemonSet(k8sAPICallRetrySleep, k8sAPICallRetryTimeout) if err != nil { - return false, fmt.Errorf("timed out trying to get daemonset %s in namespace %s: %w", dsl.name, dsl.namespace, err) + return false, lockdata, fmt.Errorf("timed out trying to get daemonset %s in namespace %s: %w", dsl.name, dsl.namespace, err) } valueString, exists := ds.ObjectMeta.Annotations[dsl.annotation] if exists { value := multiLockAnnotationValue{} if err := json.Unmarshal([]byte(valueString), &value); err != nil { - return false, err + return false, lockdata, err } for _, nodeLock := range value.LockAnnotations { if nodeLock.NodeID == dsl.nodeID && !ttlExpired(nodeLock.Created, nodeLock.TTL) { - return true, nil + return true, nodeLock, nil } } } - return false, nil + return false, lockdata, nil } -// Release attempts to remove the lock data from the kured ds annotations using client-go -func (dsl *DaemonSetLock) Release() error { - for { - ds, err := dsl.GetDaemonSet(k8sAPICallRetrySleep, k8sAPICallRetryTimeout) - if err != nil { - return fmt.Errorf("timed out trying to get daemonset %s in namespace %s: %w", dsl.name, dsl.namespace, err) - } - - valueString, exists := ds.ObjectMeta.Annotations[dsl.annotation] - if exists { - value := lockAnnotationValue{} - if err := json.Unmarshal([]byte(valueString), &value); err != nil { - return err - } - - if value.NodeID != dsl.nodeID { - return fmt.Errorf("Not lock holder: %v", value.NodeID) - } - } else { - return fmt.Errorf("Lock not held") - } - - delete(ds.ObjectMeta.Annotations, dsl.annotation) - - _, err = dsl.client.AppsV1().DaemonSets(dsl.namespace).Update(context.TODO(), ds, metav1.UpdateOptions{}) - if err != nil { - if se, ok := err.(*errors.StatusError); ok && se.ErrStatus.Reason == metav1.StatusReasonConflict { - // Something else updated the resource between us reading and writing - try again soon - time.Sleep(time.Second) - continue - } else { - return err - } - } - return nil +// Release attempts to remove the lock data for a single node from the multi node annotation +func (dsl *DaemonSetMultiLock) Release() error { + if dsl.releaseDelay > 0 { + log.Infof("Waiting %v before releasing lock", dsl.releaseDelay) + time.Sleep(dsl.releaseDelay) } -} - -// ReleaseMultiple attempts to remove the lock data from the kured ds annotations using client-go -func (dsl *DaemonSetLock) ReleaseMultiple() error { for { ds, err := dsl.GetDaemonSet(k8sAPICallRetrySleep, k8sAPICallRetryTimeout) if err != nil { @@ -307,28 +408,3 @@ func (dsl *DaemonSetLock) ReleaseMultiple() error { return nil } } - -// GetDaemonSet returns the named DaemonSet resource from the DaemonSetLock's configured client -func (dsl *DaemonSetLock) GetDaemonSet(sleep, timeout time.Duration) (*v1.DaemonSet, error) { - var ds *v1.DaemonSet - var lastError error - err := wait.PollImmediate(sleep, timeout, func() (bool, error) { - ctx, cancel := context.WithTimeout(context.Background(), timeout) - defer cancel() - if ds, lastError = dsl.client.AppsV1().DaemonSets(dsl.namespace).Get(ctx, dsl.name, metav1.GetOptions{}); lastError != nil { - return false, nil - } - return true, nil - }) - if err != nil { - return nil, fmt.Errorf("Timed out trying to get daemonset %s in 
namespace %s: %v", dsl.name, dsl.namespace, lastError) - } - return ds, nil -} - -func ttlExpired(created time.Time, ttl time.Duration) bool { - if ttl > 0 && time.Since(created) >= ttl { - return true - } - return false -} diff --git a/pkg/daemonsetlock/daemonsetlock_test.go b/pkg/daemonsetlock/daemonsetlock_test.go index b21bcc23c..f68a81e28 100644 --- a/pkg/daemonsetlock/daemonsetlock_test.go +++ b/pkg/daemonsetlock/daemonsetlock_test.go @@ -66,7 +66,7 @@ func TestCanAcquireMultiple(t *testing.T) { current: multiLockAnnotationValue{}, desired: multiLockAnnotationValue{ MaxOwners: 2, - LockAnnotations: []lockAnnotationValue{ + LockAnnotations: []LockAnnotationValue{ {NodeID: node1Name}, }, }, @@ -80,13 +80,13 @@ func TestCanAcquireMultiple(t *testing.T) { maxOwners: 2, current: multiLockAnnotationValue{ MaxOwners: 2, - LockAnnotations: []lockAnnotationValue{ + LockAnnotations: []LockAnnotationValue{ {NodeID: node2Name}, }, }, desired: multiLockAnnotationValue{ MaxOwners: 2, - LockAnnotations: []lockAnnotationValue{ + LockAnnotations: []LockAnnotationValue{ {NodeID: node1Name}, {NodeID: node2Name}, }, @@ -101,7 +101,7 @@ func TestCanAcquireMultiple(t *testing.T) { maxOwners: 2, current: multiLockAnnotationValue{ MaxOwners: 2, - LockAnnotations: []lockAnnotationValue{ + LockAnnotations: []LockAnnotationValue{ { NodeID: node2Name, Created: time.Now().UTC().Add(-1 * time.Minute), @@ -116,7 +116,7 @@ func TestCanAcquireMultiple(t *testing.T) { }, desired: multiLockAnnotationValue{ MaxOwners: 2, - LockAnnotations: []lockAnnotationValue{ + LockAnnotations: []LockAnnotationValue{ {NodeID: node2Name}, {NodeID: node3Name}, }, @@ -131,7 +131,7 @@ func TestCanAcquireMultiple(t *testing.T) { maxOwners: 2, current: multiLockAnnotationValue{ MaxOwners: 2, - LockAnnotations: []lockAnnotationValue{ + LockAnnotations: []LockAnnotationValue{ { NodeID: node2Name, Created: time.Now().UTC().Add(-1 * time.Hour), @@ -146,7 +146,7 @@ func TestCanAcquireMultiple(t *testing.T) { }, desired: multiLockAnnotationValue{ MaxOwners: 2, - LockAnnotations: []lockAnnotationValue{ + LockAnnotations: []LockAnnotationValue{ {NodeID: node1Name}, {NodeID: node3Name}, }, @@ -161,7 +161,7 @@ func TestCanAcquireMultiple(t *testing.T) { maxOwners: 2, current: multiLockAnnotationValue{ MaxOwners: 2, - LockAnnotations: []lockAnnotationValue{ + LockAnnotations: []LockAnnotationValue{ { NodeID: node2Name, Created: time.Now().UTC().Add(-1 * time.Hour), @@ -176,17 +176,17 @@ func TestCanAcquireMultiple(t *testing.T) { }, desired: multiLockAnnotationValue{ MaxOwners: 2, - LockAnnotations: []lockAnnotationValue{ + LockAnnotations: []LockAnnotationValue{ {NodeID: node1Name}, }, }, lockPossible: true, }, } - + nm := NodeMeta{Unschedulable: false} for _, testCase := range testCases { t.Run(testCase.name, func(t *testing.T) { - lockPossible, actual := testCase.daemonSetLock.canAcquireMultiple(testCase.current, struct{}{}, time.Minute, testCase.maxOwners) + lockPossible, actual := testCase.daemonSetLock.canAcquireMultiple(testCase.current, nm, time.Minute, testCase.maxOwners) if lockPossible != testCase.lockPossible { t.Fatalf( "unexpected result for lock possible (got %t expected %t new annotation %v",