fix(server): keep retrying connections till node/pod is deleted
- refactored the informer code and eliminated the for loop where we kept retrying the connection in every case
- we now keep retrying connections until a pod-deleted event is received
- handled 2 Go lint errors: S1033, S1000

Signed-off-by: daemon1024 <[email protected]>
daemon1024 committed Nov 13, 2024
1 parent 90cf7ba commit e3b04fd
Showing 2 changed files with 84 additions and 66 deletions.
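
Before the per-file diffs, a condensed, runnable sketch of the retry contract this commit introduces (illustrative names only — registry stands in for ClientList; not code from the commit): the informer registers a pod IP, removes it on a delete event, and the connect loop keeps retrying only while the entry is still present.

package main

import (
	"fmt"
	"sync"
	"time"
)

// registry plays the role of ClientList: an entry is added when a KubeArmor
// pod IP appears and removed on a pod-delete / IP-change event.
var (
	registry     = map[string]int{}
	registryLock = sync.RWMutex{}
)

// connect keeps retrying until the entry for nodeIP disappears, mirroring
// the new connectToKubeArmor loop.
func connect(nodeIP string) {
	for {
		registryLock.RLock()
		_, found := registry[nodeIP]
		registryLock.RUnlock()
		if !found {
			fmt.Println("entry removed; stop retrying:", nodeIP)
			return
		}
		// a real client would dial and health-check here; on failure it
		// simply backs off and tries again on the next iteration
		time.Sleep(100 * time.Millisecond)
	}
}

func main() {
	registryLock.Lock()
	registry["10.0.0.1"] = 1
	registryLock.Unlock()

	go connect("10.0.0.1")

	time.Sleep(300 * time.Millisecond)
	registryLock.Lock()
	delete(registry, "10.0.0.1") // what the informer's DeleteFunc now triggers
	registryLock.Unlock()
	time.Sleep(200 * time.Millisecond)
}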
13 changes: 13 additions & 0 deletions relay-server/server/k8sHandler.go
@@ -270,6 +270,19 @@ func (kh *K8sHandler) getKaPodInformer(ipsChan chan string) cache.SharedIndexInformer
 			if ok {
 				if newPod.Status.PodIP != "" {
 					ipsChan <- newPod.Status.PodIP
+					if oldPod, ok := old.(*corev1.Pod); ok {
+						if oldPod.Status.PodIP != newPod.Status.PodIP {
+							DeleteClientEntry(oldPod.Status.PodIP)
+						}
+					}
 				}
 			}
 		},
+		DeleteFunc: func(obj interface{}) {
+			pod, ok := obj.(*corev1.Pod)
+			if ok {
+				if pod.Status.PodIP != "" {
+					DeleteClientEntry(pod.Status.PodIP)
+				}
+			}
+		},
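A note on the DeleteFunc above: client-go can hand the handler a cache.DeletedFinalStateUnknown tombstone instead of a *corev1.Pod when the watch missed the actual delete, and the plain type assertion silently drops that event. A hedged sketch of the usual tombstone handling, reusing the identifiers from the hunk above (not part of this commit; assumes the file's existing corev1 and cache imports):

DeleteFunc: func(obj interface{}) {
	pod, ok := obj.(*corev1.Pod)
	if !ok {
		// the delete may arrive as a tombstone; unwrap it before giving up
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			return
		}
		if pod, ok = tombstone.Obj.(*corev1.Pod); !ok {
			return
		}
	}
	if pod.Status.PodIP != "" {
		DeleteClientEntry(pod.Status.PodIP)
	}
},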
137 changes: 71 additions & 66 deletions relay-server/server/relayServer.go
@@ -38,13 +38,12 @@ var Running bool
 var ClientList map[string]int
 
 // ClientListLock Lock
-var ClientListLock *sync.Mutex
+var ClientListLock *sync.RWMutex
 
 func init() {
 	Running = true
 	ClientList = map[string]int{}
-	ClientListLock = &sync.Mutex{}
-
+	ClientListLock = &sync.RWMutex{}
 }
 
 // ========== //
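
The switch to sync.RWMutex matches the access pattern introduced further down: each per-node goroutine only reads ClientList on every retry iteration, while writes happen only on informer events and when a new connection is registered, so readers no longer serialize each other. Condensed from the hunks below:

// readers: one per node, on every retry iteration
ClientListLock.RLock()
_, found := ClientList[nodeIP]
ClientListLock.RUnlock()

// writers: informer events and the feed loop only
ClientListLock.Lock()
delete(ClientList, nodeIP) // or ClientList[ip] = 1 when a new pod IP is seen
ClientListLock.Unlock()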
@@ -706,11 +705,7 @@ func DeleteClientEntry(nodeIP string) {
 	ClientListLock.Lock()
 	defer ClientListLock.Unlock()
 
-	_, exists := ClientList[nodeIP]
-
-	if exists {
-		delete(ClientList, nodeIP)
-	}
+	delete(ClientList, nodeIP)
 }
 
 // =============== //
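
This is the S1033 fix called out in the commit message: delete on a map key that is absent is already a no-op in Go, so the existence check around it added nothing. A minimal illustration (hypothetical map, not project code):

m := map[string]int{"10.0.0.1": 1}
delete(m, "10.0.0.2") // key absent: no-op, no panic
delete(m, "10.0.0.1") // key present: entry removed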
@@ -722,66 +717,78 @@ func connectToKubeArmor(nodeIP, port string) error {
 	// create connection info
 	server := nodeIP + ":" + port
 
-	defer DeleteClientEntry(nodeIP)
-
-	// create a client
-	client := NewClient(server)
-	if client == nil {
-		return nil
-	}
-
-	// do healthcheck
-	if ok := client.DoHealthCheck(); !ok {
-		kg.Warnf("Failed to check the liveness of KubeArmor's gRPC service (%s)", server)
-		return nil
-	}
-	kg.Printf("Checked the liveness of KubeArmor's gRPC service (%s)", server)
-
-	var wg sync.WaitGroup
-	stop := make(chan struct{})
-	errCh := make(chan error, 1)
-
-	// Start watching messages
-	wg.Add(1)
-	go func() {
-		client.WatchMessages(&wg, stop, errCh)
-	}()
-	kg.Print("Started to watch messages from " + server)
-
-	// Start watching alerts
-	wg.Add(1)
-	go func() {
-		client.WatchAlerts(&wg, stop, errCh)
-	}()
-	kg.Print("Started to watch alerts from " + server)
-
-	// Start watching logs
-	wg.Add(1)
-	go func() {
-		client.WatchLogs(&wg, stop, errCh)
-	}()
-	kg.Print("Started to watch logs from " + server)
-
-	// Wait for an error or all goroutines to finish
-	select {
-	case err := <-errCh:
-		close(stop) // Stop other goroutines
-		kg.Warn(err.Error())
-	case <-func() chan struct{} {
-		done := make(chan struct{})
-		go func() {
-			wg.Wait()
-			close(done)
-		}()
-		return done
-	}():
-		// All goroutines finished without error
-	}
-
-	if err := client.DestroyClient(); err != nil {
-		kg.Warnf("Failed to destroy the client (%s) %s", server, err.Error())
-	}
-	kg.Printf("Destroyed the client (%s)", server)
+	for Running {
+		ClientListLock.RLock()
+		_, found := ClientList[nodeIP]
+		ClientListLock.RUnlock()
+		if !found {
+			// KubeArmor with this IP is deleted or the IP has changed
+			// parent function will spawn a new goroutine accordingly
+			break
+		}
+
+		// create a client
+		client := NewClient(server)
+		if client == nil {
+			time.Sleep(5 * time.Second) // wait for 5 second before retrying
+			continue
+		}
+
+		// do healthcheck
+		if ok := client.DoHealthCheck(); !ok {
+			kg.Warnf("Failed to check the liveness of KubeArmor's gRPC service (%s)", server)
+			time.Sleep(5 * time.Second) // wait for 5 second before retrying
+			continue
+		}
+		kg.Printf("Checked the liveness of KubeArmor's gRPC service (%s)", server)
+
+		var wg sync.WaitGroup
+		stop := make(chan struct{})
+		errCh := make(chan error, 1)
+
+		// Start watching messages
+		wg.Add(1)
+		go func() {
+			client.WatchMessages(&wg, stop, errCh)
+		}()
+		kg.Print("Started to watch messages from " + server)
+
+		// Start watching alerts
+		wg.Add(1)
+		go func() {
+			client.WatchAlerts(&wg, stop, errCh)
+		}()
+		kg.Print("Started to watch alerts from " + server)
+
+		// Start watching logs
+		wg.Add(1)
+		go func() {
+			client.WatchLogs(&wg, stop, errCh)
+		}()
+		kg.Print("Started to watch logs from " + server)
+
+		// Wait for an error or all goroutines to finish
+		select {
+		case err := <-errCh:
+			close(stop) // Stop other goroutines
+			kg.Warn(err.Error())
+		case <-func() chan struct{} {
+			done := make(chan struct{})
+			go func() {
+				wg.Wait()
+				close(done)
+			}()
+			return done
+		}():
+			// All goroutines finished without error
+		}
+
+		if err := client.DestroyClient(); err != nil {
+			kg.Warnf("Failed to destroy the client (%s) %s", server, err.Error())
+		}
+
+		kg.Printf("Destroyed the client (%s)", server)
+	}
 
 	return nil
 }
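
For readers puzzling over the select above: the inline func() chan struct{} closure exists only to turn wg.Wait() into a channel receive so it can be raced against errCh. A small self-contained sketch of the same pattern (assumed example, not project code):

package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

func main() {
	var wg sync.WaitGroup
	errCh := make(chan error, 1)

	wg.Add(1)
	go func() {
		defer wg.Done()
		time.Sleep(50 * time.Millisecond)
		errCh <- errors.New("stream closed") // simulate one watcher failing
	}()

	// bridge wg.Wait() into a channel so it can be selected on
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()

	select {
	case err := <-errCh:
		fmt.Println("stopped on error:", err)
	case <-done:
		fmt.Println("all watchers finished cleanly")
	}
}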
Expand Down Expand Up @@ -810,15 +817,13 @@ func (rs *RelayServer) GetFeedsFromNodes() {
}

for Running {
select {
case ip := <-ipsChan:
ClientListLock.Lock()
if _, ok := ClientList[ip]; !ok {
ClientList[ip] = 1
go connectToKubeArmor(ip, rs.Port)
}
ClientListLock.Unlock()
ip := <-ipsChan
ClientListLock.Lock()
if _, ok := ClientList[ip]; !ok {
ClientList[ip] = 1
go connectToKubeArmor(ip, rs.Port)
}
ClientListLock.Unlock()
time.Sleep(10 * time.Second)
}
}
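
The GetFeedsFromNodes change above is the S1000 fix from the commit message: a select with a single case and no default is equivalent to a plain channel receive, so the loop now blocks on ipsChan directly. A minimal before/after sketch (assumed channel values, not project code):

package main

import "fmt"

func main() {
	ipsChan := make(chan string, 1)

	// before: a single-case select with no default (flagged by staticcheck S1000)
	ipsChan <- "10.0.0.2"
	select {
	case ip := <-ipsChan:
		fmt.Println("got", ip)
	}

	// after: the equivalent plain receive
	ipsChan <- "10.0.0.3"
	ip := <-ipsChan
	fmt.Println("got", ip)
}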
