Skip to content

Commit

Permalink
Initial working appWatcher logic
Browse files Browse the repository at this point in the history
  • Loading branch information
Matt Morrison committed Jul 25, 2023
1 parent 38a529f commit 8ef2f6e
Show file tree
Hide file tree
Showing 10 changed files with 200 additions and 25 deletions.
9 changes: 9 additions & 0 deletions Earthfile
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,14 @@ build-binary:
RUN GOARM=${VARIANT#v} go build -ldflags "-X github.com/zapier/kubechecks/pkg.GitCommit=$GIT_COMMIT -X github.com/zapier/kubechecks/pkg.GitTag=$GIT_TAG" -o kubechecks
SAVE ARTIFACT kubechecks

build-debug-binary:
FROM +go-deps
ARG GOARCH=amd64
WORKDIR /src
COPY . /src
RUN GOARM=${VARIANT#v} go build -gcflags="all=-N -l" -ldflags "-X github.com/zapier/kubechecks/pkg.GitCommit=$GIT_COMMIT -X github.com/zapier/kubechecks/pkg.GitTag=$GIT_TAG" -o kubechecks
SAVE ARTIFACT kubechecks

docker:
FROM ubuntu:20.04
ARG TARGETVARIANT
Expand Down Expand Up @@ -134,6 +142,7 @@ docker-debug:
FROM +docker --GIT_TAG=debug --GIT_COMMIT=abcdef --CI_REGISTRY_IMAGE=$CI_REGISTRY_IMAGE --GOARCH=$GOARCH

COPY (+dlv/dlv --GOARCH=$GOARCH --VARIANT=$TARGETVARIANT) /usr/local/bin/dlv
COPY (+build-debug-binary/kubechecks --GOARCH=$GOARCH --VARIANT=$TARGETVARIANT) .

CMD ["/usr/local/bin/dlv", "--listen=:2345", "--api-version=2", "--headless=true", "--accept-multiclient", "exec", "--continue", "./kubechecks", "controller"]

Expand Down
4 changes: 2 additions & 2 deletions charts/kubechecks/Chart.yaml
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
apiVersion: v2
name: kubechecks
description: A Helm chart for kubechecks
version: 0.2.0
appVersion: "1.0.6"
version: 0.3.0
appVersion: "1.1.0"
type: application
maintainers:
- name: zapier
5 changes: 5 additions & 0 deletions cmd/controller_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cmd

import (
"fmt"
"github.com/zapier/kubechecks/pkg/config"
"os"
"os/signal"
"syscall"
Expand All @@ -26,6 +27,7 @@ var controllerCmd = &cobra.Command{
fmt.Println("Starting KubeChecks:", pkg.GitTag, pkg.GitCommit)

server := server.NewServer(&config.ServerConfig{
ArgoCdNamespace: viper.GetString("argocd-namespace"),
UrlPrefix: viper.GetString("webhook-url-prefix"),
WebhookSecret: viper.GetString("webhook-secret"),
})
Expand Down Expand Up @@ -69,6 +71,7 @@ func init() {
flags.Bool("show-debug-info", false, "Set to true to print debug info to the footer of MR comments (KUBECHECKS_SHOW_DEBUG_INFO).")
flags.Bool("enable-conftest", false, "Set to true to enable conftest policy checking of manifests (KUBECHECKS_ENABLE_CONFTEST).")
flags.String("label-filter", "", "(Optional) If set, The label that must be set on an MR (as \"kubechecks:<value>\") for kubechecks to process the merge request webhook (KUBECHECKS_LABEL_FILTER).")
flags.String("argocd-namespace", "argocd", "The namespace to watch for Application resources")
flags.String("openai-api-token", "", "OpenAI API Token (KUBECHECKS_OPENAI_API_TOKEN).")
flags.String("webhook-url-base", "", "The URL where KubeChecks receives webhooks from Gitlab")
flags.String("webhook-url-prefix", "", "If your application is running behind a proxy that uses path based routing, set this value to match the path prefix.")
Expand All @@ -86,10 +89,12 @@ func init() {
panicIfError(viper.BindPFlag("fallback-k8s-version", flags.Lookup("fallback-k8s-version")))
panicIfError(viper.BindPFlag("show-debug-info", flags.Lookup("show-debug-info")))
panicIfError(viper.BindPFlag("label-filter", flags.Lookup("label-filter")))
panicIfError(viper.BindPFlag("argocd-namespace", flags.Lookup("argocd-namespace")))
panicIfError(viper.BindPFlag("openai-api-token", flags.Lookup("openai-api-token")))
panicIfError(viper.BindPFlag("webhook-url-base", flags.Lookup("webhook-url-base")))
panicIfError(viper.BindPFlag("webhook-url-prefix", flags.Lookup("webhook-url-prefix")))
panicIfError(viper.BindPFlag("webhook-secret", flags.Lookup("webhook-secret")))
panicIfError(viper.BindPFlag("ensure-webhooks", flags.Lookup("ensure-webhooks")))
panicIfError(viper.BindPFlag("monitor-all-applications", flags.Lookup("monitor-all-applications")))

}
1 change: 1 addition & 0 deletions localdev/kubechecks/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ deployment:
KUBECHECKS_ENABLE_WEBHOOK_CONTROLLER: false
KUBECHECKS_ARGOCD_API_INSECURE: true
KUBECHECKS_ARGOCD_API_PATH_PREFIX : '/argocd'
KUBECHECKS_ARGOCD_NAMESPACE: 'kubechecks'
KUBECHECKS_WEBHOOK_URL_PREFIX: 'kubechecks'
KUBECHECKS_NAMESPACE: 'kubechecks'
KUBECHECKS_FALLBACK_K8S_VERSION: "1.25.0"
Expand Down
16 changes: 0 additions & 16 deletions localdev/terraform/modules/vcs_files/base_files/.kubechecks.yaml

This file was deleted.

2 changes: 1 addition & 1 deletion pkg/app_directory/app_directory.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func (d *AppDirectory) Count() int {
return len(d.appsMap)
}

func (d *AppDirectory) AddApp(app v1alpha1.Application) {
func (d *AppDirectory) AddApp(app *v1alpha1.Application) {
appName := app.Name

src := app.Spec.Source
Expand Down
2 changes: 1 addition & 1 deletion pkg/app_directory/app_directory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func TestPathsAreJoinedProperly(t *testing.T) {
},
}

rad.AddApp(app1)
rad.AddApp(&app1)

assert.Equal(t, map[string]ApplicationStub{
"test-app": {
Expand Down
162 changes: 162 additions & 0 deletions pkg/app_watcher/appwatcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
package app_watcher

import (
"context"
appclientset "github.com/argoproj/argo-cd/v2/pkg/client/clientset/versioned"
"github.com/rs/zerolog/log"
"github.com/zapier/kubechecks/pkg/config"
"k8s.io/client-go/tools/clientcmd"

"strings"
"time"

appv1 "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
applisters "github.com/argoproj/argo-cd/v2/pkg/client/listers/application/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
apiruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
)

// ApplicationWatcher is the controller that watches ArgoCD Application resources via the Kubernetes API
type ApplicationWatcher struct {
cfg *config.ServerConfig
applicationClientset appclientset.Interface
appInformer cache.SharedIndexInformer
appCache cache.InformerSynced
appLister applisters.ApplicationLister
}

// NewApplicationWatcher creates new instance of ApplicationWatcher.
func NewApplicationWatcher(cfg *config.ServerConfig) (*ApplicationWatcher, error) {
// this assumes kubechecks is running inside the cluster
kubeCfg, err := clientcmd.BuildConfigFromFlags("", "")
if err != nil {
log.Fatal().Msgf("Error building kubeconfig: %s", err.Error())
}

appClient := appclientset.NewForConfigOrDie(kubeCfg)

ctrl := ApplicationWatcher{
cfg: cfg,
applicationClientset: appClient,
}

appInformer, appLister := ctrl.newApplicationInformerAndLister()

ctrl.appInformer = appInformer
ctrl.appLister = appLister

return &ctrl, nil
}

// Run starts the Application CRD controller.
func (ctrl *ApplicationWatcher) Run(ctx context.Context, processors int) {
log.Info().Msg("starting Application Controller")

defer runtime.HandleCrash()

go ctrl.appInformer.Run(ctx.Done())

if !cache.WaitForCacheSync(ctx.Done(), ctrl.appInformer.HasSynced) {
log.Error().Msg("Timed out waiting for caches to sync")
return
}

<-ctx.Done()
}

// onAdd is the function executed when the informer notifies the
// presence of a new Application in the namespace
func (ctrl *ApplicationWatcher) onApplicationAdded(obj interface{}) {
if !canProcessApp(obj) {
return
}
key, err := cache.MetaNamespaceKeyFunc(obj)
if err != nil {
log.Error().Err(err).Msg("appwatcher: could not get key for added application")
}
log.Trace().Str("key", key).Msg("appwatcher: onApplicationAdded")
ctrl.cfg.VcsToArgoMap.AddApp(obj.(*appv1.Application))
}

func (ctrl *ApplicationWatcher) onApplicationUpdated(old, new interface{}) {
key, err := cache.MetaNamespaceKeyFunc(new)
if err != nil {
log.Warn().Err(err).Msg("appwatcher: could not get key for updated application")
}
// TODO
// have any of the Source repoURLs changed?
log.Trace().Str("key", key).Msg("appwatcher: onApplicationUpdated")
}

func (ctrl *ApplicationWatcher) onApplicationDeleted(obj interface{}) {
if !canProcessApp(obj) {
return
}
key, err := cache.MetaNamespaceKeyFunc(obj)
if err != nil {
log.Warn().Err(err).Msg("appwatcher: could not get key for deleted application")
}

log.Trace().Str("key", key).Msg("appwatcher: onApplicationDeleted")
}

/*
This Go function, named newApplicationInformerAndLister, is part of the ApplicationWatcher struct. It sets up a Kubernetes SharedIndexInformer and a Lister for Argo CD Applications.
A SharedIndexInformer is used to watch changes to a specific type of Kubernetes resource in an efficient manner. It significantly reduces the load on the Kubernetes API server by sharing and caching watches between all controllers that need to observe the object.
Listers use the data from the informer's cache to provide a read-optimized view of the cache which reduces the load on the API Server and hides some complexity.
*/
func (ctrl *ApplicationWatcher) newApplicationInformerAndLister() (cache.SharedIndexInformer, applisters.ApplicationLister) {
refreshTimeout := time.Second * 30
informer := cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (apiruntime.Object, error) {
return ctrl.applicationClientset.ArgoprojV1alpha1().Applications(ctrl.cfg.ArgoCdNamespace).List(context.TODO(), options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return ctrl.applicationClientset.ArgoprojV1alpha1().Applications(ctrl.cfg.ArgoCdNamespace).Watch(context.TODO(), options)
},
},
&appv1.Application{},
refreshTimeout,
cache.Indexers{
cache.NamespaceIndex: func(obj interface{}) ([]string, error) {
return cache.MetaNamespaceIndexFunc(obj)
},
},
)
lister := applisters.NewApplicationLister(informer.GetIndexer())
informer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: ctrl.onApplicationAdded,
UpdateFunc: ctrl.onApplicationUpdated,
DeleteFunc: ctrl.onApplicationDeleted,
},
)
return informer, lister
}

func canProcessApp(obj interface{}) bool {
app, ok := obj.(*appv1.Application)
if !ok {
return false
}

for _, src := range app.Spec.Sources {
if isGitRepo(src.RepoURL) {
return true
}
}

if !isGitRepo(app.Spec.Source.RepoURL) {
return false
}

return true
}

func isGitRepo(url string) bool {
return strings.Contains(url, "gitlab.com") || strings.Contains(url, "github.com")
}
2 changes: 1 addition & 1 deletion pkg/config/vcs_argo_app_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func (v2a *VcsToArgoMap) GetAppsInRepo(repoCloneUrl string) *app_directory.AppDi
return v2a.vcsAppStubsByRepo[repoUrl]
}

func (v2a *VcsToArgoMap) AddApp(app v1alpha1.Application) {
func (v2a *VcsToArgoMap) AddApp(app *v1alpha1.Application) {
if app.Spec.Source == nil {
return
}
Expand Down
22 changes: 18 additions & 4 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package server

import (
"context"
"github.com/zapier/kubechecks/pkg/app_watcher"
"github.com/zapier/kubechecks/pkg/config"
"net/url"
"strings"
Expand All @@ -24,10 +25,22 @@ var singleton *Server

type Server struct {
cfg *config.ServerConfig
appWatcher *app_watcher.ApplicationWatcher
}

func NewServer(cfg *config.ServerConfig) *Server {
cfg.VcsToArgoMap = config.NewVcsToArgoMap()

var appWatcher *app_watcher.ApplicationWatcher
if viper.GetBool("monitor-all-applications") {
var err error
appWatcher, err = app_watcher.NewApplicationWatcher(cfg)
if err != nil {
log.Fatal().Err(err).Msg("could not create ApplicationWatcher")
}
}
singleton = &Server{
appWatcher: appWatcher,
cfg: cfg,
}

Expand All @@ -39,6 +52,10 @@ func GetServer() *Server {
}

func (s *Server) Start() {
if s.appWatcher != nil {
go s.appWatcher.Run(context.Background(), 1)
}

if err := s.buildVcsToArgoMap(); err != nil {
log.Warn().Err(err).Msg("failed to build vcs app map from argo")
}
Expand Down Expand Up @@ -131,18 +148,15 @@ func (s *Server) buildVcsToArgoMap() error {

ctx := context.TODO()

result := config.NewVcsToArgoMap()

argoClient := argo_client.GetArgoClient()

apps, err := argoClient.GetApplications(ctx)
if err != nil {
return errors.Wrap(err, "failed to list applications")
}
for _, app := range apps.Items {
result.AddApp(app)
s.cfg.VcsToArgoMap.AddApp(&app)
}

s.cfg.VcsToArgoMap = result
return nil
}

0 comments on commit 8ef2f6e

Please sign in to comment.