Skip to content

Commit

Permalink
Fix goroutine leaks in other unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
ptodev committed Aug 20, 2024
1 parent fe5bdb6 commit 1df216d
Showing 1 changed file with 159 additions and 58 deletions.
217 changes: 159 additions & 58 deletions internal/component/loki/process/process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"os"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -316,6 +317,8 @@ stage.labels {
}

func TestEntrySentToTwoProcessComponents(t *testing.T) {
defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"))

// Set up two different loki.process components.
stg1 := `
forward_to = []
Expand All @@ -337,13 +340,16 @@ stage.static_labels {
args1.ForwardTo = []loki.LogsReceiver{ch1}
args2.ForwardTo = []loki.LogsReceiver{ch2}

ctx, ctxCancel := context.WithCancel(context.Background())
defer ctxCancel()

// Start the loki.process components.
tc1, err := componenttest.NewControllerFromID(util.TestLogger(t), "loki.process")
require.NoError(t, err)
tc2, err := componenttest.NewControllerFromID(util.TestLogger(t), "loki.process")
require.NoError(t, err)
go func() { require.NoError(t, tc1.Run(componenttest.TestContext(t), args1)) }()
go func() { require.NoError(t, tc2.Run(componenttest.TestContext(t), args2)) }()
go func() { require.NoError(t, tc1.Run(ctx, args1)) }()
go func() { require.NoError(t, tc2.Run(ctx, args2)) }()
require.NoError(t, tc1.WaitExports(time.Second))
require.NoError(t, tc2.WaitExports(time.Second))

Expand All @@ -357,7 +363,7 @@ stage.static_labels {
require.NoError(t, err)

go func() {
err := ctrl.Run(context.Background(), lsf.Arguments{
err := ctrl.Run(ctx, lsf.Arguments{
Targets: []discovery.Target{{"__path__": f.Name(), "somelbl": "somevalue"}},
ForwardTo: []loki.LogsReceiver{
tc1.Exports().(Exports).Receiver,
Expand Down Expand Up @@ -395,68 +401,102 @@ stage.static_labels {
}
}

func TestDeadlockWithFrequentUpdates(t *testing.T) {
stg := `stage.json {
expressions = {"output" = "log", stream = "stream", timestamp = "time", "extra" = "" }
drop_malformed = true
}
stage.json {
expressions = { "user" = "" }
source = "extra"
}
stage.labels {
values = {
stream = "",
user = "",
ts = "timestamp",
}
}`
type testFrequentUpdate struct {
t *testing.T
c *Component

// Unmarshal the River relabel rules into a custom struct, as we don't have
// an easy way to refer to a loki.LogsReceiver value for the forward_to
// argument.
type cfg struct {
Stages []stages.StageConfig `river:"stage,enum"`
receiver1 loki.LogsReceiver
receiver2 loki.LogsReceiver

keepSending atomic.Bool
keepReceiving atomic.Bool
keepUpdating atomic.Bool

wgLogSend sync.WaitGroup
wgRun sync.WaitGroup
wgUpdate sync.WaitGroup

lastSend atomic.Value

stop func()
}

func startTestFrequentUpdate(t *testing.T, cfg string) *testFrequentUpdate {
res := testFrequentUpdate{
t: t,
receiver1: loki.NewLogsReceiver(),
receiver2: loki.NewLogsReceiver(),
}
var stagesCfg cfg
err := river.Unmarshal([]byte(stg), &stagesCfg)

ctx, cancel := context.WithCancel(context.Background())

res.keepSending.Store(true)
res.keepReceiving.Store(true)
res.keepUpdating.Store(true)

res.stop = func() {
res.keepUpdating.Store(false)
res.wgUpdate.Wait()

res.keepSending.Store(false)
res.wgLogSend.Wait()

cancel()
res.wgRun.Wait()

close(res.receiver1.Chan())
close(res.receiver2.Chan())
}

var args Arguments
err := river.Unmarshal([]byte(cfg), &args)
require.NoError(t, err)

ch1, ch2 := loki.NewLogsReceiver(), loki.NewLogsReceiver()
args.ForwardTo = []loki.LogsReceiver{res.receiver1, res.receiver2}

// Create and run the component, so that it can process and forwards logs.
opts := component.Options{
Logger: util.TestFlowLogger(t),
Registerer: prometheus.NewRegistry(),
OnStateChange: func(e component.Exports) {},
}
args := Arguments{
ForwardTo: []loki.LogsReceiver{ch1, ch2},
Stages: stagesCfg.Stages,
}

c, err := New(opts, args)
res.c, err = New(opts, args)
require.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go c.Run(ctx)

var lastSend atomic.Value
// Drain received logs
res.wgRun.Add(1)
go func() {
res.c.Run(ctx)
res.wgRun.Done()
}()

return &res
}

// Continuously receive the logs from both channels
func (r *testFrequentUpdate) drainLogs() {
drainLogs := func() {
r.lastSend.Store(time.Now())
}

r.wgLogSend.Add(1)
go func() {
for {
for r.keepReceiving.Load() {
select {
case <-ch1.Chan():
lastSend.Store(time.Now())
case <-ch2.Chan():
lastSend.Store(time.Now())
case <-r.receiver1.Chan():
drainLogs()
case <-r.receiver2.Chan():
drainLogs()
}
}
r.wgLogSend.Done()
}()
}

// Continuously send entries to both channels
// Continuously send entries to both channels
func (r *testFrequentUpdate) sendLogs() {
r.wgLogSend.Add(1)
go func() {
for {
for r.keepSending.Load() {
ts := time.Now()
logEntry := loki.Entry{
Labels: model.LabelSet{"filename": "/var/log/pods/agent/agent/1.log", "foo": "bar"},
Expand All @@ -465,28 +505,89 @@ func TestDeadlockWithFrequentUpdates(t *testing.T) {
Line: logline,
},
}
c.receiver.Chan() <- logEntry
select {
case r.c.receiver.Chan() <- logEntry:
default:
// continue
}
}
r.keepReceiving.Store(false)
r.wgLogSend.Done()
}()
}

// Call Updates
args1 := Arguments{
ForwardTo: []loki.LogsReceiver{ch1},
Stages: stagesCfg.Stages,
}
args2 := Arguments{
ForwardTo: []loki.LogsReceiver{ch2},
Stages: stagesCfg.Stages,
}
func (r *testFrequentUpdate) updateContinuously(cfg1, cfg2 string) {
var args1 Arguments
err := river.Unmarshal([]byte(cfg1), &args1)
require.NoError(r.t, err)
args1.ForwardTo = []loki.LogsReceiver{r.receiver1}

var args2 Arguments
err = river.Unmarshal([]byte(cfg2), &args2)
require.NoError(r.t, err)
args2.ForwardTo = []loki.LogsReceiver{r.receiver2}

r.wgUpdate.Add(1)
go func() {
for {
c.Update(args1)
c.Update(args2)
for r.keepUpdating.Load() {
r.c.Update(args1)
r.c.Update(args2)
}
r.wgUpdate.Done()
}()
}

func TestDeadlockWithFrequentUpdates(t *testing.T) {
defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"))

cfg1 := `stage.json {
expressions = {"output" = "log", stream = "stream", timestamp = "time", "extra" = "" }
drop_malformed = true
}
stage.json {
expressions = { "user" = "" }
source = "extra"
}
stage.labels {
values = {
stream = "",
user = "",
ts = "timestamp",
}
}
forward_to = []`

cfg2 := `stage.json {
expressions = {"output" = "log", stream = "stream", timestamp = "time", "extra" = "" }
drop_malformed = true
}
stage.labels {
values = {
stream = "",
ts = "timestamp",
}
}
forward_to = []`

r := startTestFrequentUpdate(t, `forward_to = []`)

// Continuously send entries to both channels
r.sendLogs()

// Continuously receive entries on both channels
r.drainLogs()

// Call Updates
r.updateContinuously(cfg1, cfg2)

// Run everything for a while
time.Sleep(1 * time.Second)
require.WithinDuration(t, time.Now(), r.lastSend.Load().(time.Time), 300*time.Millisecond)

// Clean up
r.stop()
}

// Make sure there are no goroutine leaks when the config is updated.
// Goroutine leaks often cause memory leaks.
func TestLeakyUpdate(t *testing.T) {
Expand Down

0 comments on commit 1df216d

Please sign in to comment.