Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/hash annotations #583

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/redisoperator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (m *Main) Run() error {
}

// Create kubernetes service.
k8sservice := k8s.New(k8sClient, customClient, aeClientset, m.logger, metricsRecorder)
k8sservice := k8s.New(k8sClient, customClient, aeClientset, m.logger, metricsRecorder, m.flags.EnableObjectHashing)

// Create the redis clients
redisClient := redis.New(metricsRecorder)
Expand Down
2 changes: 2 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type CMDFlags struct {
K8sQueriesBurstable int
Concurrency int
LogLevel string
EnableObjectHashing bool
}

// Init initializes and parse the flags
Expand All @@ -39,6 +40,7 @@ func (c *CMDFlags) Init() {
// reference: https://github.com/spotahome/kooper/blob/master/controller/controller.go#L89
flag.IntVar(&c.Concurrency, "concurrency", 3, "Number of conccurent workers meant to process events")
flag.StringVar(&c.LogLevel, "log-level", "info", "set log level")
flag.BoolVar(&c.EnableObjectHashing, "enable-hash", false, "Add hashed annotations to k8s objects, apply changes only when theres a diff.")
// Parse flags
flag.Parse()

Expand Down
8 changes: 8 additions & 0 deletions service/k8s/configmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,14 @@ func (p *ConfigMapService) CreateOrUpdateConfigMap(namespace string, configMap *
return err
}

if hashingEnabled() {
if !shouldUpdate(configMap, storedConfigMap) {
p.logger.Debugf("%v/%v configmap is upto date, no need to apply changes...", configMap.Namespace, configMap.Name)
return nil
}
p.logger.Debugf("%v/%v configmap has a different resource hash, updating the object...", configMap.Namespace, configMap.Name)
addHashAnnotation(configMap)
}
// Already exists, need to Update.
// Set the correct resource version to ensure we are on the latest version. This way the only valid
// namespace is our spec(https://github.com/kubernetes/community/blob/master/contributors/devel/api-conventions.md#concurrency-control-and-consistency),
Expand Down
9 changes: 9 additions & 0 deletions service/k8s/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,15 @@ func (d *DeploymentService) CreateOrUpdateDeployment(namespace string, deploymen
return err
}

if hashingEnabled() {
if !shouldUpdate(deployment, storedDeployment) {
d.logger.Debugf("%v/%v deployment is upto date, no need to apply changes...", deployment.Namespace, deployment.Name)
return nil
}
d.logger.Debugf("%v/%v deployment has a different resource hash, updating the object...", deployment.Namespace, deployment.Name)
addHashAnnotation(deployment)
}

// Already exists, need to Update.
// Set the correct resource version to ensure we are on the latest version. This way the only valid
// namespace is our spec(https://github.com/kubernetes/community/blob/master/contributors/devel/api-conventions.md#concurrency-control-and-consistency),
Expand Down
64 changes: 64 additions & 0 deletions service/k8s/hash_annotations.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package k8s

import (
"crypto/sha256"
"encoding/base64"
"hash"

"github.com/davecgh/go-spew/spew"
)

// taken from https://github.com/k8ssandra/cass-operator/blob/master/pkg/utils/hash_annotation.go

type Annotated interface {
GetAnnotations() map[string]string
SetAnnotations(annotations map[string]string)
GetName() string
}

const resourceHashAnnotationKey = "databases.spotahome.com/resource-hash"

// Create hash of a given object

func addHashAnnotation(r Annotated) {
hash := deepHashString(r)
m := r.GetAnnotations()
if m == nil {
m = map[string]string{}
}
m[resourceHashAnnotationKey] = hash
r.SetAnnotations(m)
}

func deepHashString(obj interface{}) string {
hasher := sha256.New()
deepHashObject(hasher, obj)
hashBytes := hasher.Sum([]byte{})
b64Hash := base64.StdEncoding.EncodeToString(hashBytes)
return b64Hash
}

// DeepHashObject writes specified object to hash using the spew library
// which follows pointers and prints actual values of the nested objects
// ensuring the hash does not change when a pointer changes.
func deepHashObject(hasher hash.Hash, objectToWrite interface{}) {
hasher.Reset()
printer := spew.ConfigState{
Indent: " ",
SortKeys: true,
DisableMethods: true,
SpewKeys: true,
}
printer.Fprintf(hasher, "%#v", objectToWrite)
}

func shouldUpdate(desired Annotated, stored Annotated) bool {

storedHash, exists := stored.GetAnnotations()[resourceHashAnnotationKey]
if !exists {
return true
}
desiredHash := deepHashString(desired)

return desiredHash != storedHash
}
11 changes: 10 additions & 1 deletion service/k8s/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,14 @@ type Services interface {
StatefulSet
}

var (
objectHashingEnabled bool
)

func hashingEnabled() bool {
return objectHashingEnabled
}

type services struct {
ConfigMap
Secret
Expand All @@ -35,7 +43,8 @@ type services struct {
}

// New returns a new Kubernetes service.
func New(kubecli kubernetes.Interface, crdcli redisfailoverclientset.Interface, apiextcli apiextensionscli.Interface, logger log.Logger, metricsRecorder metrics.Recorder) Services {
func New(kubecli kubernetes.Interface, crdcli redisfailoverclientset.Interface, apiextcli apiextensionscli.Interface, logger log.Logger, metricsRecorder metrics.Recorder, enableHashing bool) Services {
objectHashingEnabled = enableHashing
return &services{
ConfigMap: NewConfigMapService(kubecli, logger, metricsRecorder),
Secret: NewSecretService(kubecli, logger, metricsRecorder),
Expand Down
9 changes: 9 additions & 0 deletions service/k8s/poddisruptionbudget.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,15 @@ func (p *PodDisruptionBudgetService) CreateOrUpdatePodDisruptionBudget(namespace
return err
}

if hashingEnabled() {
if !shouldUpdate(podDisruptionBudget, storedPodDisruptionBudget) {
p.logger.Debugf("%v/%v pdb is upto date, no need to apply changes...", podDisruptionBudget.Namespace, podDisruptionBudget.Name)
return nil
}
p.logger.Debugf("%v/%v pdb has a different resource hash, updating the object...", podDisruptionBudget.Namespace, podDisruptionBudget.Name)
addHashAnnotation(podDisruptionBudget)
}

// Already exists, need to Update.
// Set the correct resource version to ensure we are on the latest version. This way the only valid
// namespace is our spec(https://github.com/kubernetes/community/blob/master/contributors/devel/api-conventions.md#concurrency-control-and-consistency),
Expand Down
17 changes: 17 additions & 0 deletions service/k8s/rbac.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,14 @@ func (r *RBACService) CreateOrUpdateRole(namespace string, role *rbacv1.Role) er
return err
}

if hashingEnabled() {
if !shouldUpdate(role, storedRole) {
r.logger.Debugf("%v/%v role is upto date, no need to apply changes...", role.Namespace, role.Name)
return nil
}
r.logger.Debugf("%v/%v role has a different resource hash, updating the object...", role.Namespace, role.Name)
addHashAnnotation(role)
}
// Already exists, need to Update.
// Set the correct resource version to ensure we are on the latest version. This way the only valid
// namespace is our spec(https://github.com/kubernetes/community/blob/master/contributors/devel/api-conventions.md#concurrency-control-and-consistency),
Expand Down Expand Up @@ -148,6 +156,15 @@ func (r *RBACService) CreateOrUpdateRoleBinding(namespace string, binding *rbacv
return err
}

if hashingEnabled() {
if !shouldUpdate(binding, storedBinding) {
r.logger.Debugf("%v/%v rolebinding is upto date, no need to apply changes...", binding.Namespace, binding.Name)
return nil
}
r.logger.Debugf("%v/%v rolebinding has a different resource hash, updating the object...", binding.Namespace, binding.Name)
addHashAnnotation(binding)
}

// Check if the role ref has changed, roleref updates are not allowed, if changed then delete and create again the role binding.
// https://github.com/kubernetes/kubernetes/blob/0f0a5223dfc75337d03c9b80ae552ae8ef138eeb/pkg/apis/rbac/validation/validation.go#L157-L159
if storedBinding.RoleRef != binding.RoleRef {
Expand Down
9 changes: 9 additions & 0 deletions service/k8s/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,15 @@ func (s *ServiceService) CreateOrUpdateService(namespace string, service *corev1
return err
}

if hashingEnabled() {
if !shouldUpdate(service, storedService) {
s.logger.Debugf("%v/%v service is upto date, no need to apply changes...", service.Namespace, service.Name)
return nil
}
s.logger.Debugf("%v/%v service has a different resource hash, updating the object...", service.Namespace, service.Name)
addHashAnnotation(service)
}

// Already exists, need to Update.
// Set the correct resource version to ensure we are on the latest version. This way the only valid
// namespace is our spec(https://github.com/kubernetes/community/blob/master/contributors/devel/api-conventions.md#concurrency-control-and-consistency),
Expand Down
19 changes: 13 additions & 6 deletions service/k8s/statefulset.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,19 +97,15 @@ func (s *StatefulSetService) UpdateStatefulSet(namespace string, statefulSet *ap
// CreateOrUpdateStatefulSet will update the statefulset or create it if does not exist
func (s *StatefulSetService) CreateOrUpdateStatefulSet(namespace string, statefulSet *appsv1.StatefulSet) error {
storedStatefulSet, err := s.GetStatefulSet(namespace, statefulSet.Name)

if err != nil {
// If no resource we need to create.
if errors.IsNotFound(err) {
addHashAnnotation(statefulSet)
return s.CreateStatefulSet(namespace, statefulSet)
}
return err
}

// Already exists, need to Update.
// Set the correct resource version to ensure we are on the latest version. This way the only valid
// namespace is our spec(https://github.com/kubernetes/community/blob/master/contributors/devel/api-conventions.md#concurrency-control-and-consistency),
// we will replace the current namespace state.
statefulSet.ResourceVersion = storedStatefulSet.ResourceVersion
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

resource version always changes after apply, causing sts to have perpetual diff

// resize pvc
// 1.Get the data already stored internally
// 2.Get the desired data
Expand Down Expand Up @@ -171,6 +167,17 @@ func (s *StatefulSetService) CreateOrUpdateStatefulSet(namespace string, statefu
// set stored.volumeClaimTemplates
statefulSet.Spec.VolumeClaimTemplates = storedStatefulSet.Spec.VolumeClaimTemplates
statefulSet.Annotations = util.MergeAnnotations(storedStatefulSet.Annotations, statefulSet.Annotations)

if hashingEnabled() {
delete(statefulSet.Annotations, resourceHashAnnotationKey) // this will be regenerated if changes are required.
if !shouldUpdate(statefulSet, storedStatefulSet) {
s.logger.Debugf("%v/%v statefulset is upto date, no need to apply changes...", statefulSet.Namespace, statefulSet.Name)
return nil
}
s.logger.Debugf("%v/%v statefulset has a different resource hash, updating the object...", statefulSet.Namespace, statefulSet.Name)
addHashAnnotation(statefulSet)
}

return s.UpdateStatefulSet(namespace, statefulSet)
}

Expand Down
2 changes: 1 addition & 1 deletion test/integration/redisfailover/creation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func TestRedisFailover(t *testing.T) {
}

// Create kubernetes service.
k8sservice := k8s.New(k8sClient, customClient, aeClientset, log.Dummy, metrics.Dummy)
k8sservice := k8s.New(k8sClient, customClient, aeClientset, log.Dummy, metrics.Dummy, true)

// Prepare namespace
prepErr := clients.prepareNS()
Expand Down