From fe5bdb6196cb11a2abc3db3ad9a87c4c0b6fda5a Mon Sep 17 00:00:00 2001 From: Paulin Todev Date: Tue, 20 Aug 2024 18:05:45 +0100 Subject: [PATCH] Cleanup loki.process on update --- CHANGELOG.md | 4 ++ internal/component/loki/process/process.go | 6 +- .../component/loki/process/process_test.go | 55 ++++++++++++++++++- 3 files changed, 61 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c47cbbf4b1f1..b7c2c8ebb7f8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,10 @@ internal API changes are not present. Main (unreleased) ----------------- +### Bugfixes + +- Fix a memory leak which would occur any time `loki.process` had its configuration reloaded. (@ptodev) + v0.42.0 (2024-07-24) ------------------------- diff --git a/internal/component/loki/process/process.go b/internal/component/loki/process/process.go index b7d0e8eb1b47..ecc863762fc4 100644 --- a/internal/component/loki/process/process.go +++ b/internal/component/loki/process/process.go @@ -90,7 +90,6 @@ func (c *Component) Run(ctx context.Context) error { if c.entryHandler != nil { c.entryHandler.Stop() } - close(c.processIn) c.mut.RUnlock() }() wg := &sync.WaitGroup{} @@ -127,8 +126,9 @@ func (c *Component) Update(args component.Arguments) error { if err != nil { return err } - c.entryHandler = loki.NewEntryHandler(c.processOut, func() { pipeline.Cleanup() }) - c.processIn = pipeline.Wrap(c.entryHandler).Chan() + entryHandler := loki.NewEntryHandler(c.processOut, func() { pipeline.Cleanup() }) + c.entryHandler = pipeline.Wrap(entryHandler) + c.processIn = c.entryHandler.Chan() c.stages = newArgs.Stages } diff --git a/internal/component/loki/process/process_test.go b/internal/component/loki/process/process_test.go index 5256bbcbca0f..3c18a7f513f7 100644 --- a/internal/component/loki/process/process_test.go +++ b/internal/component/loki/process/process_test.go @@ -487,7 +487,60 @@ func TestDeadlockWithFrequentUpdates(t *testing.T) { // Run everything for a while time.Sleep(1 * time.Second) - require.WithinDuration(t, time.Now(), lastSend.Load().(time.Time), 300*time.Millisecond) +// Make sure there are no goroutine leaks when the config is updated. +// Goroutine leaks often cause memory leaks. +func TestLeakyUpdate(t *testing.T) { + defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start")) + + tester := newTester(t) + defer tester.stop() + + forwardArgs := ` + // This will be filled later + forward_to = []` + + numLogsToSend := 1 + + cfg1 := ` + stage.metrics { + metric.counter { + name = "paulin_test1" + action = "inc" + match_all = true + } + }` + forwardArgs + + cfg2 := ` + stage.metrics { + metric.counter { + name = "paulin_test2" + action = "inc" + match_all = true + } + }` + forwardArgs + + metricsTemplate1 := ` + # HELP loki_process_custom_paulin_test1 + # TYPE loki_process_custom_paulin_test1 counter + loki_process_custom_paulin_test1{filename="/var/log/pods/agent/agent/1.log",foo="bar"} %d + ` + + metricsTemplate2 := ` + # HELP loki_process_custom_paulin_test2 + # TYPE loki_process_custom_paulin_test2 counter + loki_process_custom_paulin_test2{filename="/var/log/pods/agent/agent/1.log",foo="bar"} %d + ` + + metrics1 := fmt.Sprintf(metricsTemplate1, numLogsToSend) + metrics2 := fmt.Sprintf(metricsTemplate2, numLogsToSend) + + tester.updateAndTest(numLogsToSend, cfg1, "", metrics1) + tester.updateAndTest(numLogsToSend, cfg2, "", metrics2) + + for i := 0; i < 100; i++ { + tester.updateAndTest(numLogsToSend, cfg1, "", metrics1) + tester.updateAndTest(numLogsToSend, cfg2, "", metrics2) + } } func TestMetricsStageRefresh(t *testing.T) {