Skip to content

Commit

Permalink
Merge pull request #39 from Flowpack/memory-leak-debugging
Browse files Browse the repository at this point in the history
!!! BUGFIX: fix huge memory leak where job output was stored in memory for a long time - Disables {{ .Output }} and {{ .Tasks.XYZ.Output }} placeholders
  • Loading branch information
skurfuerst authored Jan 4, 2023
2 parents ed281a2 + 75e2f63 commit e1790bb
Show file tree
Hide file tree
Showing 11 changed files with 245 additions and 43 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ bin
.env*

dist/
/heapdump*
72 changes: 72 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,44 @@

This is NOT a fully featured CI pipeline solution.

<!-- TOC -->
* [Badges](#badges)
* [Components](#components)
* [prunner (this repository)](#prunner--this-repository-)
* [prunner-ui](#prunner-ui)
* [Flowpack.Prunner](#flowpackprunner)
* [User guide](#user-guide)
* [Main concepts](#main-concepts)
* [A simple pipeline](#a-simple-pipeline)
* [Task dependencies](#task-dependencies)
* [Job variables](#job-variables)
* [Environment variables](#environment-variables)
* [Dotenv files](#dotenv-files)
* [Limiting concurrency](#limiting-concurrency)
* [The wait list](#the-wait-list)
* [Debounce jobs with a start delay](#debounce-jobs-with-a-start-delay)
* [Disabling fail-fast behavior](#disabling-fail-fast-behavior)
* [Configuring retention period](#configuring-retention-period)
* [Handling of child processes](#handling-of-child-processes)
* [Graceful shutdown](#graceful-shutdown)
* [Reloading definitions and watching for changes](#reloading-definitions-and-watching-for-changes)
* [Persistent job state](#persistent-job-state)
* [Running prunner](#running-prunner)
* [CLI Reference](#cli-reference)
* [Docker](#docker)
* [Development](#development)
* [Requirements](#requirements)
* [Running locally](#running-locally)
* [IDE Setup (IntelliJ/GoLand)](#ide-setup--intellijgoland-)
* [Building for different operating systems.](#building-for-different-operating-systems)
* [Running Tests](#running-tests)
* [Memory Leak Debugging](#memory-leak-debugging)
* [Generate OpenAPI (Swagger) spec](#generate-openapi--swagger--spec)
* [Releasing](#releasing)
* [Security concept](#security-concept)
* [License](#license)
<!-- TOC -->

## Badges

[![Release](https://img.shields.io/github/release/Flowpack/prunner.svg?style=for-the-badge)](https://github.com/Flowpack/prunner/releases/latest)
Expand Down Expand Up @@ -375,6 +413,25 @@ For interacting with the API, you need a JWT token which you can generate for de
```bash
go run ./cmd/prunner debug
```
### IDE Setup (IntelliJ/GoLand)

- Please install [Go](https://plugins.jetbrains.com/plugin/9568-go) Plugin in IntelliJ.
- In the Settings of IntelliJ: Activate `Languages & Frameworks -> Go -> Go Modules` - `Enable Go Modules Integration`
- Open a Go File. At the top of the screen the following message appears: `GOROOT is not defined` -> `Setup GOROOT` -> `/usr/local/opt/go/libexec`
- If autocompletion / syntax check shows lots of things red, try the following two steps:
- restart the IDE
- if this does not help, `File -> Invalidate Caches`

- Run / Debug in IDE:
- `Run -> Edit Configurations`
- `Add new Run Configuration` -> `Go Build`
- Files: `.../cmd/prunner/main.go`
- Working Directory: `.../`

- Tests:
- `Run -> Edit Configurations`
- `Add new Run Configuration` -> `Go Test`
- Test Kind: Package (otherwise you cannot set breakpoints)

### Building for different operating systems.

Expand Down Expand Up @@ -407,6 +464,21 @@ Then, to run the linter, use:
golangci-lint run
```

### Memory Leak Debugging

to find memory leaks, you can run `prunner` in the following way:

```bash
# start prunner in profiling mode with the config from test/memory_leak_debugging/pipelines.yml
./dev.sh memory-leak-start

# run a pipeline which creates many MB of log output (possibly multiple times)
./dev.sh start-pipeline memleak1

# analyze heap dump
./dev.sh analyze-heapdump
```

### Generate OpenAPI (Swagger) spec

An OpenAPI 2.0 spec is generated from the Go types and annotations in source code using the `go-swagger` tool (it is not
Expand Down
9 changes: 8 additions & 1 deletion app/debug_cmd.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package app

import (
"fmt"
"github.com/apex/log"
"github.com/go-chi/jwtauth/v5"
"github.com/urfave/cli/v2"
"os"
)

func newDebugCmd() *cli.Command {
Expand All @@ -21,7 +23,12 @@ func newDebugCmd() *cli.Command {
claims := make(map[string]interface{})
jwtauth.SetIssuedNow(claims)
_, tokenString, _ := tokenAuth.Encode(claims)
log.Infof("Send the following HTTP header for JWT authorization:\n Authorization: Bearer %s", tokenString)
if os.Getenv("MINIMAL_OUTPUT") == "1" {
// for scripting
fmt.Printf("Bearer %s", tokenString)
} else {
log.Infof("Send the following HTTP header for JWT authorization:\n Authorization: Bearer %s", tokenString)
}

return nil
},
Expand Down
72 changes: 72 additions & 0 deletions dev.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
#!/bin/bash
############################## DEV_SCRIPT_MARKER ##############################
# This script is used to document and run recurring tasks in development. #
# #
# You can run your tasks using the script `./dev some-task`. #
# You can install the Sandstorm Dev Script Runner and run your tasks from any #
# nested folder using `dev some-task`. #
# https://github.com/sandstorm/Sandstorm.DevScriptRunner #
###############################################################################

set -e

######### TASKS #########

function build() {
go build -o ./bin/prunner ./cmd/prunner
_log_success "Built ./bin/prunner"
}

function memory-leak-start() {
build


_log_success "Starting prunner on http://127.0.0.1:9009 with profiling enabled:"
_log_success " http://127.0.0.1:9009/debug/pprof/"
./bin/prunner --path test/memory_leak_debugging --verbose --enable-profiling
}

function start-pipeline {
export PIPELINE_NAME=$1

if [ "$PIPELINE_NAME" == "" ]; then
_log_error "PIPELINE_NAME must be set in call to start-pipeline"
exit 1
fi

_log_warning "Generating auth token"
TOKEN=$(MINIMAL_OUTPUT=1 go run ./cmd/prunner debug)

_log_warning "Starting pipeline $PIPELINE_NAME"

curl -XPOST -H "Authorization: $TOKEN" -H "Content-type: application/json" -d "{
\"pipeline\": \"$PIPELINE_NAME\"
}" 'http://127.0.0.1:9009/pipelines/schedule'

curl -XGET -H "Authorization: $TOKEN" -H "Content-type: application/json" \
'http://127.0.0.1:9009/pipelines/jobs' | jq .
}

function analyze-heapdump {
DUMPNAME=heapdump-$(date +%s)
curl -o $DUMPNAME http://localhost:9009/debug/pprof/heap?gc=1
#curl -o $DUMPNAME http://localhost:9009/debug/pprof/allocs
PORT=$(jot -r 1 2000 65000)
go tool pprof -http=:$PORT $DUMPNAME
}

####### Utilities #######

_log_success() {
printf "\033[0;32m%s\033[0m\n" "${1}"
}
_log_warning() {
printf "\033[1;33m%s\033[0m\n" "${1}"
}
_log_error() {
printf "\033[0;31m%s\033[0m\n" "${1}"
}

# THIS NEEDS TO BE LAST!!!
# this will run your tasks
"$@"
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ require (
github.com/taskctl/taskctl v1.3.1-0.20210426182424-d8747985c906
github.com/urfave/cli/v2 v2.4.0
gopkg.in/yaml.v2 v2.4.0
mvdan.cc/sh/v3 v3.4.3
mvdan.cc/sh/v3 v3.6.0
)

require (
Expand All @@ -44,9 +44,9 @@ require (
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/sirupsen/logrus v1.8.1 // indirect
golang.org/x/crypto v0.0.0-20220331220935-ae2d96664a29 // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
golang.org/x/sys v0.0.0-20220403205710-6acee93ad0eb // indirect
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.3.0 // indirect
golang.org/x/term v0.3.0 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
Expand Down
8 changes: 8 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,8 @@ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181122145206-62eef0e2fa9b/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand All @@ -222,12 +224,16 @@ golang.org/x/sys v0.0.0-20210925032602-92d5a993a665/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220403205710-6acee93ad0eb h1:PVGECzEo9Y3uOidtkHGdd347NjLtITfJFO9BxFpmRoo=
golang.org/x/sys v0.0.0-20220403205710-6acee93ad0eb/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.3.0 h1:w8ZOecv6NaNa/zC8944JTU3vz4u6Lagfk4RPQxv92NQ=
golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20191110171634-ad39bd3f0407/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210916214954-140adaaadfaf/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 h1:JGgROgKl9N8DuW20oFS5gxc+lE67/N3FcwmBPMe7ArY=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.3.0 h1:qoo4akIqOcDME5bhc/NgxUdovd6BSS2uMsVjB56q1xI=
golang.org/x/term v0.3.0/go.mod h1:q750SLmJuPmVoN1blW3UFBPREJfb1KmY3vwxfr+nFDA=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
Expand Down Expand Up @@ -264,3 +270,5 @@ mvdan.cc/editorconfig v0.2.0/go.mod h1:lvnnD3BNdBYkhq+B4uBuFFKatfp02eB6HixDvEz91
mvdan.cc/sh/v3 v3.1.1/go.mod h1:F+Vm4ZxPJxDKExMLhvjuI50oPnedVXpfjNSrusiTOno=
mvdan.cc/sh/v3 v3.4.3 h1:zbuKH7YH9cqU6PGajhFFXZY7dhPXcDr55iN/cUAqpuw=
mvdan.cc/sh/v3 v3.4.3/go.mod h1:p/tqPPI4Epfk2rICAe2RoaNd8HBSJ8t9Y2DA9yQlbzY=
mvdan.cc/sh/v3 v3.6.0 h1:gtva4EXJ0dFNvl5bHjcUEvws+KRcDslT8VKheTYkbGU=
mvdan.cc/sh/v3 v3.6.0/go.mod h1:U4mhtBLZ32iWhif5/lD+ygy1zrgaQhUu+XFy7C8+TTA=
28 changes: 22 additions & 6 deletions taskctl/executor.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package taskctl

import (
"bytes"
"context"
"io"
"os"
Expand All @@ -27,7 +26,6 @@ type PgidExecutor struct {
dir string
env []string
interp *interp.Runner
buf bytes.Buffer
}

// NewPgidExecutor creates new pgid executor
Expand All @@ -51,8 +49,21 @@ func NewPgidExecutor(stdin io.Reader, stdout, stderr io.Writer, killTimeout time
}

e.interp, err = interp.New(
interp.StdIO(stdin, io.MultiWriter(&e.buf, stdout), io.MultiWriter(&e.buf, stderr)),
// BEGIN MODIFICATION compared to taskctl/pkg/executor/executor.go

// in the original TaskRunner code, stdout and stderr are also stored in a Buffer inside e; and then
// returned as result of Execute(). This was needed to support the {{.Output}} variable in taskctl,
// which contained the output of the previous steps.
//
// For prunner, this lead to a huge memory leak, where all output was always kept in RAM.
//
// As we do not need to support the {{.Output}} feature, we can directly send the content to
// the stdout/stderr files.
interp.StdIO(stdin, stdout, stderr),
// we use a custom ExecHandler for overriding the process group handling
interp.ExecHandler(createExecHandler(killTimeout)),

// END MODIFICATION
)
if err != nil {
return nil, err
Expand Down Expand Up @@ -81,12 +92,14 @@ func (e *PgidExecutor) Execute(ctx context.Context, job *executor.Job) ([]byte,
job.Dir = e.dir
}

// BEGIN MODIFICATION compared to taskctl/pkg/executor/executor.go
jobID := job.Vars.Get(JobIDVariableName).(string)

log.
WithField("component", "executor").
WithField("jobID", jobID).
Debugf("Executing %q", command)
// END MODIFICATION

e.interp.Dir = job.Dir
e.interp.Env = expand.ListEnviron(env...)
Expand All @@ -101,13 +114,16 @@ func (e *PgidExecutor) Execute(ctx context.Context, job *executor.Job) ([]byte,
}
}()

offset := e.buf.Len()
// BEGIN MODIFICATION compared to taskctl/pkg/executor/executor.go
// we disable returning the contents as byte array (needed for {{.Output}} support of
// TaskCTL, which we removed because of Memory Leaks)
err = e.interp.Run(ctx, cmd)
if err != nil {
return e.buf.Bytes()[offset:], err
return []byte{}, err
}

return e.buf.Bytes()[offset:], nil
return []byte{}, nil
// END MODIFICATION
}

func execEnv(env expand.Environ) []string {
Expand Down
10 changes: 5 additions & 5 deletions taskctl/executor_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package taskctl_test

import (
"bytes"
"context"
"io/ioutil"
"testing"
Expand All @@ -22,14 +21,15 @@ func TestPgidExecutor_Execute(t *testing.T) {
to := 1 * time.Minute
job1.Timeout = &to

output, err := e.Execute(context.Background(), job1)
_, err = e.Execute(context.Background(), job1)
if err != nil {
t.Fatal(err)
}

if !bytes.Contains(output, []byte("success")) {
t.Error()
}
// Disabled because of memory leak, see executor.go
//if !bytes.Contains(output, []byte("success")) {
// t.Error()
//}

job1 = executor.NewJobFromCommand("exit 1")

Expand Down
Loading

0 comments on commit e1790bb

Please sign in to comment.