kubernetes.go

package main

import (
	"fmt"
	"log"
	"os"
	"path/filepath"

	drainer "github.com/openshift/kubernetes-drain"
	corev1 "k8s.io/api/core/v1"
	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
)

const clusterAutoscalerScaleDownDisabledFlag = "cluster-autoscaler.kubernetes.io/scale-down-disabled"
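
// kubernetesReadiness checks node readiness and drains nodes through the
// Kubernetes API; it is the implementation returned by kubeGetReadinessHandler.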
type kubernetesReadiness struct {
	clientset        *kubernetes.Clientset
	ignoreDaemonSets bool
	deleteLocalData  bool
}
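
// getUnreadyCount returns how many of the given hostnames correspond to nodes
// that are not yet Ready. The ids argument is not used by this implementation.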
func (k *kubernetesReadiness) getUnreadyCount(hostnames []string, ids []string) (int, error) {
	hostHash := map[string]bool{}
	for _, h := range hostnames {
		hostHash[h] = true
	}
	/*
		In AWS, the `name` of the node *always* is the internal private DNS name,
		and you can get a node by name by doing Nodes().Get(name).
		In other words, the `name` of the node is set independently and does not care what
		the kubelet had for --hostname-override.
		However, if you want multiple nodes, you need to use the List() interface.
		This interface does not accept multiple hostnames; it lists everything, subject only to a filter.
		The filter, however, can filter only on labels, and not on the name.
		We _should_ be able to just filter on the kubernetes.io/hostname label, but this label *does*
		respect --hostname-override, and we do not know whether that is set. Oops.
		Thus, for now, we are stuck doing multiple Get() calls, one for each hostname, or doing a List() of all nodes.
	*/
	nodes, err := k.clientset.CoreV1().Nodes().List(v1.ListOptions{})
	if err != nil {
		return 0, fmt.Errorf("Unexpected error getting nodes for cluster: %v", err)
	}
	unReadyCount := 0
	for _, n := range nodes.Items {
		// first make sure that this is one of the new nodes we care about
		if _, ok := hostHash[n.ObjectMeta.Name]; !ok {
			continue
		}
		// next check its status: the node counts as unready unless the last
		// condition in its status list is of type NodeReady
		conditions := n.Status.Conditions
		if conditions[len(conditions)-1].Type != corev1.NodeReady {
			unReadyCount++
		}
	}
	return unReadyCount, nil
}
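
// prepareTermination drains each of the given nodes before they are terminated.
// If drain is false it does nothing; drainForce is passed through to the drainer.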
func (k *kubernetesReadiness) prepareTermination(hostnames []string, ids []string, drain, drainForce bool) error {
	// get the node reference - first need the hostname
	var (
		node *corev1.Node
		err  error
	)
	// Skip drain
	if !drain {
		return nil
	}
	for _, h := range hostnames {
		node, err = k.clientset.CoreV1().Nodes().Get(h, v1.GetOptions{})
		if err != nil {
			return fmt.Errorf("Unexpected error getting kubernetes node %s: %v", h, err)
		}
		// set options and drain nodes
		err = drainer.Drain(k.clientset, []*corev1.Node{node}, &drainer.DrainOptions{
			IgnoreDaemonsets:   k.ignoreDaemonSets,
			GracePeriodSeconds: -1,
			Force:              drainForce,
			DeleteLocalData:    k.deleteLocalData,
		})
		if err != nil {
			return fmt.Errorf("Unexpected error draining kubernetes node %s: %v", h, err)
		}
	}
	return nil
}
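
// kubeGetClientset returns a Kubernetes clientset, preferring in-cluster
// configuration and falling back to a local kubeconfig when not running inside
// a cluster. It returns nil with no error when kubernetesEnabled is false.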
func kubeGetClientset(kubernetesEnabled bool) (*kubernetes.Clientset, error) {
	// if it is *explicitly* set to false, then do nothing
	if !kubernetesEnabled {
		return nil, nil
	}
	// creates the in-cluster config
	config, err := rest.InClusterConfig()
	if err != nil {
		if err == rest.ErrNotInCluster {
			// not running inside a cluster, so fall back to a local kubeconfig
			config, err = getKubeOutOfCluster()
			if err != nil {
				return nil, err
			}
		} else {
			return nil, fmt.Errorf("Error getting kubernetes config from within cluster: %v", err)
		}
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		return nil, err
	}
	return clientset, nil
}
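
// getKubeOutOfCluster builds a client configuration from KUBECONFIG, or from
// ~/.kube/config when KUBECONFIG is not set.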
func getKubeOutOfCluster() (*rest.Config, error) {
	kubeconfig := os.Getenv("KUBECONFIG")
	if kubeconfig == "" {
		if home := homeDir(); home != "" {
			kubeconfig = filepath.Join(home, ".kube", "config")
		} else {
			return nil, fmt.Errorf("No KUBECONFIG provided and no home directory available")
		}
	}
	// use the current context in kubeconfig
	config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
	if err != nil {
		return nil, fmt.Errorf("Error building kubernetes config from %s: %v", kubeconfig, err)
	}
	return config, nil
}
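
// homeDir returns the current user's home directory on both unix and windows.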
func homeDir() string {
	if h := os.Getenv("HOME"); h != "" {
		return h
	}
	return os.Getenv("USERPROFILE") // windows
}
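
// kubeGetReadinessHandler returns a kubernetesReadiness handler, or nil when
// Kubernetes support is disabled.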
func kubeGetReadinessHandler(kubernetesEnabled, ignoreDaemonSets, deleteLocalData bool) (readiness, error) {
	clientset, err := kubeGetClientset(kubernetesEnabled)
	if err != nil {
		log.Fatalf("Error getting kubernetes connection: %v", err)
	}
	if clientset == nil {
		return nil, nil
	}
	return &kubernetesReadiness{clientset: clientset, ignoreDaemonSets: ignoreDaemonSets, deleteLocalData: deleteLocalData}, nil
}

// setScaleDownDisabledAnnotation sets the "cluster-autoscaler.kubernetes.io/scale-down-disabled" annotation
// on the listed nodes if required. Returns the list of hostnames where the annotation
// was applied.
func setScaleDownDisabledAnnotation(kubernetesEnabled bool, hostnames []string) ([]string, error) {
	// get the node reference - first need the hostname
	var (
		node      *corev1.Node
		err       error
		key       = clusterAutoscalerScaleDownDisabledFlag
		annotated = []string{}
	)
	clientset, err := kubeGetClientset(kubernetesEnabled)
	if err != nil {
		log.Fatalf("Error getting kubernetes connection: %v", err)
	}
	if clientset == nil {
		return annotated, nil
	}
	nodes := clientset.CoreV1().Nodes()
	for _, h := range hostnames {
		node, err = nodes.Get(h, v1.GetOptions{})
		if err != nil {
			return annotated, fmt.Errorf("Unexpected error getting kubernetes node %s: %v", h, err)
		}
		// only annotate and update the node if the annotation is not already set
		annotations := node.GetAnnotations()
		if value := annotations[key]; value != "true" {
			annotations[key] = "true"
			node.SetAnnotations(annotations)
			_, err := nodes.Update(node)
			if err != nil {
				return annotated, err
			}
			annotated = append(annotated, h)
		}
	}
	return annotated, nil
}
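
// removeScaleDownDisabledAnnotation removes the "cluster-autoscaler.kubernetes.io/scale-down-disabled"
// annotation from the listed nodes wherever it is present.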
func removeScaleDownDisabledAnnotation(kubernetesEnabled bool, hostnames []string) error {
	// get the node reference - first need the hostname
	var (
		node *corev1.Node
		err  error
		key  = clusterAutoscalerScaleDownDisabledFlag
	)
	clientset, err := kubeGetClientset(kubernetesEnabled)
	if err != nil {
		log.Fatalf("Error getting kubernetes connection: %v", err)
	}
	if clientset == nil {
		return nil
	}
	nodes := clientset.CoreV1().Nodes()
	for _, h := range hostnames {
		node, err = nodes.Get(h, v1.GetOptions{})
		if err != nil {
			return fmt.Errorf("Unexpected error getting kubernetes node %s: %v", h, err)
		}
		annotations := node.GetAnnotations()
		if _, ok := annotations[key]; ok {
			delete(annotations, key)
			node.SetAnnotations(annotations)
			_, err := nodes.Update(node)
			if err != nil {
				return err
			}
		}
	}
	return nil
}