diff --git a/cmd/flags.go b/cmd/flags.go index 13f67509..644c5d14 100644 --- a/cmd/flags.go +++ b/cmd/flags.go @@ -40,6 +40,8 @@ func newBoolOpts() DocOpt[bool] { return DocOpt[bool]{} } +func newInt64Opts() DocOpt[int64] { return DocOpt[int64]{} } + func newStringOpts() DocOpt[string] { return DocOpt[string]{} } @@ -48,6 +50,10 @@ func newStringSliceOpts() DocOpt[[]string] { return DocOpt[[]string]{} } +func int64Flag(flags *pflag.FlagSet, name, usage string, opts ...DocOpt[int64]) { + addFlag(name, usage, opts, flags.Int64, flags.Int64P) +} + func stringFlag(flags *pflag.FlagSet, name, usage string, opts ...DocOpt[string]) { addFlag(name, usage, opts, flags.String, flags.StringP) } diff --git a/cmd/root.go b/cmd/root.go index 52fc3698..ee21bdc1 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -91,6 +91,12 @@ func init() { stringFlag(flags, "worst-preupgrade-state", "The worst state that can be returned from preupgrade checks.", newStringOpts(). withDefault("panic")) + int64Flag(flags, "max-queue-size", "Size of app diff check queue.", + newInt64Opts(). + withDefault(1024)) + int64Flag(flags, "max-concurrenct-checks", "Number of concurrent checks to run.", + newInt64Opts(). + withDefault(32)) panicIfError(viper.BindPFlags(flags)) setupLogOutput() diff --git a/docs/usage.md b/docs/usage.md index eaae0d27..d95acd28 100644 --- a/docs/usage.md +++ b/docs/usage.md @@ -47,6 +47,8 @@ The full list of supported environment variables is described below: |`KUBECHECKS_KUBERNETES_CONFIG`|Path to your kubernetes config file, used to monitor applications.|| |`KUBECHECKS_LABEL_FILTER`|(Optional) If set, The label that must be set on an MR (as "kubechecks:") for kubechecks to process the merge request webhook.|| |`KUBECHECKS_LOG_LEVEL`|Set the log output level. One of error, warn, info, debug, trace.|`info`| +|`KUBECHECKS_MAX_CONCURRENCT_CHECKS`|Number of concurrent checks to run.|`32`| +|`KUBECHECKS_MAX_QUEUE_SIZE`|Size of app diff check queue.|`1024`| |`KUBECHECKS_MONITOR_ALL_APPLICATIONS`|Monitor all applications in argocd automatically.|`false`| |`KUBECHECKS_OPENAI_API_TOKEN`|OpenAI API Token.|| |`KUBECHECKS_OTEL_COLLECTOR_HOST`|The OpenTelemetry collector host.|| diff --git a/pkg/config/config.go b/pkg/config/config.go index da456565..235a409f 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -59,6 +59,8 @@ type ServerConfig struct { SchemasLocations []string `mapstructure:"schemas-location"` ShowDebugInfo bool `mapstructure:"show-debug-info"` TidyOutdatedCommentsMode string `mapstructure:"tidy-outdated-comments-mode"` + MaxQueueSize int64 `mapstructure:"max-queue-size"` + MaxConcurrenctChecks int `mapstructure:"max-concurrenct-checks"` } func New() (ServerConfig, error) { diff --git a/pkg/events/check.go b/pkg/events/check.go index 2379797d..9af2ea38 100644 --- a/pkg/events/check.go +++ b/pkg/events/check.go @@ -32,11 +32,10 @@ import ( var tracer = otel.Tracer("pkg/events") type CheckEvent struct { - fileList []string // What files have changed in this PR/MR - pullRequest vcs.PullRequest - logger zerolog.Logger - workerLimits int - vcsNote *msg.Message + fileList []string // What files have changed in this PR/MR + pullRequest vcs.PullRequest + logger zerolog.Logger + vcsNote *msg.Message affectedItems affected_apps.AffectedItems @@ -234,17 +233,12 @@ func (ce *CheckEvent) Process(ctx context.Context) error { // Concurrently process all apps, with a corresponding error channel for reporting back failures ce.addedAppsSet = make(map[string]v1alpha1.Application) - ce.appChannel = make(chan *v1alpha1.Application, len(ce.affectedItems.Applications)*2) - - // If the number of affected apps that we have is less than our worker limit, lower the worker limit - if ce.workerLimits > len(ce.affectedItems.Applications) { - ce.workerLimits = len(ce.affectedItems.Applications) - } + ce.appChannel = make(chan *v1alpha1.Application, ce.ctr.Config.MaxQueueSize) // We make one comment per run, containing output for all the apps ce.vcsNote = ce.createNote(ctx) - for w := 0; w <= ce.workerLimits; w++ { + for w := 0; w <= ce.ctr.Config.MaxConcurrenctChecks; w++ { go ce.appWorkers(ctx, w) } diff --git a/pkg/events/runner.go b/pkg/events/runner.go index 5fe70e7d..642740e5 100644 --- a/pkg/events/runner.go +++ b/pkg/events/runner.go @@ -88,7 +88,7 @@ func (r *Runner) Run(ctx context.Context, desc string, fn checkFunction, worstSt result, err := fn(ctx, r.Request) logger.Info(). Err(err). - Uint8("result", uint8(result.State)). + Str("result", result.State.BareString()). Msg("check result") if err != nil { @@ -99,10 +99,6 @@ func (r *Runner) Run(ctx context.Context, desc string, fn checkFunction, worstSt } addToAppMessage(result) - - logger.Info(). - Str("result", result.State.BareString()). - Msgf("check done") }() }