Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

increase channels size and goroutine limits #175

Merged
merged 2 commits into from
Mar 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions cmd/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ func newBoolOpts() DocOpt[bool] {
return DocOpt[bool]{}
}

func newInt64Opts() DocOpt[int64] { return DocOpt[int64]{} }

func newStringOpts() DocOpt[string] {
return DocOpt[string]{}
}
Expand All @@ -48,6 +50,10 @@ func newStringSliceOpts() DocOpt[[]string] {
return DocOpt[[]string]{}
}

func int64Flag(flags *pflag.FlagSet, name, usage string, opts ...DocOpt[int64]) {
addFlag(name, usage, opts, flags.Int64, flags.Int64P)
}

func stringFlag(flags *pflag.FlagSet, name, usage string, opts ...DocOpt[string]) {
addFlag(name, usage, opts, flags.String, flags.StringP)
}
Expand Down
6 changes: 6 additions & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,12 @@ func init() {
stringFlag(flags, "worst-preupgrade-state", "The worst state that can be returned from preupgrade checks.",
newStringOpts().
withDefault("panic"))
int64Flag(flags, "max-queue-size", "Size of app diff check queue.",
newInt64Opts().
withDefault(1024))
int64Flag(flags, "max-concurrenct-checks", "Number of concurrent checks to run.",
newInt64Opts().
withDefault(32))

panicIfError(viper.BindPFlags(flags))
setupLogOutput()
Expand Down
2 changes: 2 additions & 0 deletions docs/usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ The full list of supported environment variables is described below:
|`KUBECHECKS_KUBERNETES_CONFIG`|Path to your kubernetes config file, used to monitor applications.||
|`KUBECHECKS_LABEL_FILTER`|(Optional) If set, The label that must be set on an MR (as "kubechecks:<value>") for kubechecks to process the merge request webhook.||
|`KUBECHECKS_LOG_LEVEL`|Set the log output level. One of error, warn, info, debug, trace.|`info`|
|`KUBECHECKS_MAX_CONCURRENCT_CHECKS`|Number of concurrent checks to run.|`32`|
|`KUBECHECKS_MAX_QUEUE_SIZE`|Size of app diff check queue.|`1024`|
|`KUBECHECKS_MONITOR_ALL_APPLICATIONS`|Monitor all applications in argocd automatically.|`false`|
|`KUBECHECKS_OPENAI_API_TOKEN`|OpenAI API Token.||
|`KUBECHECKS_OTEL_COLLECTOR_HOST`|The OpenTelemetry collector host.||
Expand Down
2 changes: 2 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ type ServerConfig struct {
SchemasLocations []string `mapstructure:"schemas-location"`
ShowDebugInfo bool `mapstructure:"show-debug-info"`
TidyOutdatedCommentsMode string `mapstructure:"tidy-outdated-comments-mode"`
MaxQueueSize int64 `mapstructure:"max-queue-size"`
MaxConcurrenctChecks int `mapstructure:"max-concurrenct-checks"`
}

func New() (ServerConfig, error) {
Expand Down
18 changes: 6 additions & 12 deletions pkg/events/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,10 @@ import (
var tracer = otel.Tracer("pkg/events")

type CheckEvent struct {
fileList []string // What files have changed in this PR/MR
pullRequest vcs.PullRequest
logger zerolog.Logger
workerLimits int
vcsNote *msg.Message
fileList []string // What files have changed in this PR/MR
pullRequest vcs.PullRequest
logger zerolog.Logger
vcsNote *msg.Message

affectedItems affected_apps.AffectedItems

Expand Down Expand Up @@ -234,17 +233,12 @@ func (ce *CheckEvent) Process(ctx context.Context) error {

// Concurrently process all apps, with a corresponding error channel for reporting back failures
ce.addedAppsSet = make(map[string]v1alpha1.Application)
ce.appChannel = make(chan *v1alpha1.Application, len(ce.affectedItems.Applications)*2)

// If the number of affected apps that we have is less than our worker limit, lower the worker limit
if ce.workerLimits > len(ce.affectedItems.Applications) {
ce.workerLimits = len(ce.affectedItems.Applications)
}
ce.appChannel = make(chan *v1alpha1.Application, ce.ctr.Config.MaxQueueSize)

// We make one comment per run, containing output for all the apps
ce.vcsNote = ce.createNote(ctx)

for w := 0; w <= ce.workerLimits; w++ {
for w := 0; w <= ce.ctr.Config.MaxConcurrenctChecks; w++ {
go ce.appWorkers(ctx, w)
}

Expand Down
6 changes: 1 addition & 5 deletions pkg/events/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (r *Runner) Run(ctx context.Context, desc string, fn checkFunction, worstSt
result, err := fn(ctx, r.Request)
logger.Info().
Err(err).
Uint8("result", uint8(result.State)).
Str("result", result.State.BareString()).
Msg("check result")

if err != nil {
Expand All @@ -99,10 +99,6 @@ func (r *Runner) Run(ctx context.Context, desc string, fn checkFunction, worstSt
}

addToAppMessage(result)

logger.Info().
Str("result", result.State.BareString()).
Msgf("check done")
}()
}

Expand Down
Loading