From 7bf5315277c20b4b859cffa7169eca542e29ddf6 Mon Sep 17 00:00:00 2001 From: hainenber Date: Sat, 17 Feb 2024 15:16:39 +0700 Subject: [PATCH 1/6] fix(loki/src/k8s_events): correctly update health state for the component during operation Signed-off-by: hainenber --- CHANGELOG.md | 3 ++ .../kubernetes_events/kubernetes_events.go | 40 +++++++++++++++++-- 2 files changed, 40 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b9988a7fe3f2..e2cf98c623cd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -135,6 +135,9 @@ v0.40.0 (2024-02-27) - Fix issue where registry was not being properly deleted. (@mattdurham) +- Fix bug where `loki.source.kubernetes_events` was unable to register as unhealthy + when there are failures in its underlying informers. (@hainenber) + ### Other changes - Removed support for Windows 2012 in line with Microsoft end of life. (@mattdurham) diff --git a/component/loki/source/kubernetes_events/kubernetes_events.go b/component/loki/source/kubernetes_events/kubernetes_events.go index e06247f05b27..54dac08a9a9e 100644 --- a/component/loki/source/kubernetes_events/kubernetes_events.go +++ b/component/loki/source/kubernetes_events/kubernetes_events.go @@ -95,11 +95,15 @@ type Component struct { receiversMut sync.RWMutex receivers []loki.LogsReceiver + + healthMut sync.RWMutex + health component.Health } var ( - _ component.Component = (*Component)(nil) - _ component.DebugComponent = (*Component)(nil) + _ component.Component = (*Component)(nil) + _ component.DebugComponent = (*Component)(nil) + _ component.HealthComponent = (*Component)(nil) ) // New creates a new loki.source.kubernetes_events component. 
@@ -154,6 +158,7 @@ func (c *Component) Run(ctx context.Context) error { c.tasksMut.RUnlock() if err := c.runner.ApplyTasks(ctx, tasks); err != nil { + c.setHealth(err) level.Error(c.log).Log("msg", "failed to apply event watchers", "err", err) } } @@ -182,7 +187,10 @@ func (c *Component) Run(ctx context.Context) error { cancel() }) - return rg.Run() + err := rg.Run() + c.setHealth(err) + + return err } // Update implements component.Component. @@ -257,3 +265,29 @@ func (c *Component) DebugInfo() interface{} { } return info } + +// CurrentHealth implements component.HealthComponent +func (c *Component) CurrentHealth() component.Health { + c.healthMut.RLock() + defer c.healthMut.RUnlock() + return c.health +} + +func (c *Component) setHealth(err error) { + c.healthMut.Lock() + defer c.healthMut.Unlock() + + if err == nil { + c.health = component.Health{ + Health: component.HealthTypeHealthy, + Message: "component is ready", + UpdateTime: time.Now(), + } + } else { + c.health = component.Health{ + Health: component.HealthTypeUnhealthy, + Message: fmt.Sprintf("component encounters error: %s", err), + UpdateTime: time.Now(), + } + } +} From c90ec1b2443b9ce9cf04fce913f86fe41bf5eef2 Mon Sep 17 00:00:00 2001 From: hainenber Date: Mon, 19 Feb 2024 23:36:59 +0700 Subject: [PATCH 2/6] feat(pkg/runner): change runner's interface to return error Signed-off-by: hainenber --- component/loki/source/docker/runner.go | 6 +++--- component/loki/source/kubernetes/kubetail/tailer.go | 8 ++++++-- .../loki/source/kubernetes_events/event_controller.go | 5 ++++- pkg/runner/runner.go | 2 +- pkg/runner/runner_test.go | 4 +++- 5 files changed, 17 insertions(+), 8 deletions(-) diff --git a/component/loki/source/docker/runner.go b/component/loki/source/docker/runner.go index c010a02feef2..069bd2d9f2b5 100644 --- a/component/loki/source/docker/runner.go +++ b/component/loki/source/docker/runner.go @@ -94,7 +94,7 @@ func newTailer(l log.Logger, task *tailerTask) *tailer { } } -func (t *tailer) 
Run(ctx context.Context) { +func (t *tailer) Run(ctx context.Context) error { ch, chErr := t.opts.client.ContainerWait(ctx, t.target.Name(), container.WaitConditionNextExit) t.target.StartIfNotRunning() @@ -108,10 +108,10 @@ func (t *tailer) Run(ctx context.Context) { // refresh. level.Error(t.log).Log("msg", "could not set up a wait request to the Docker client", "error", err) t.target.Stop() - return + return err case <-ch: t.target.Stop() - return + return nil } } diff --git a/component/loki/source/kubernetes/kubetail/tailer.go b/component/loki/source/kubernetes/kubetail/tailer.go index 6ea2a2d9b3fb..ba0e10e79512 100644 --- a/component/loki/source/kubernetes/kubetail/tailer.go +++ b/component/loki/source/kubernetes/kubetail/tailer.go @@ -89,7 +89,7 @@ var retailBackoff = backoff.Config{ MaxBackoff: time.Minute, } -func (t *tailer) Run(ctx context.Context) { +func (t *tailer) Run(ctx context.Context) error { ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -111,18 +111,22 @@ func (t *tailer) Run(ctx context.Context) { terminated, err := t.containerTerminated(ctx) if terminated { // The container shut down and won't come back; we can stop tailing it. 
- return + return nil } else if err != nil { level.Warn(t.log).Log("msg", "could not determine if container terminated; will retry tailing", "err", err) + return err } } if err != nil { t.target.Report(time.Now().UTC(), err) level.Warn(t.log).Log("msg", "tailer stopped; will retry", "err", err) + return err } bo.Wait() } + + return nil } func (t *tailer) tail(ctx context.Context, handler loki.EntryHandler) error { diff --git a/component/loki/source/kubernetes_events/event_controller.go b/component/loki/source/kubernetes_events/event_controller.go index 0f45c3dc62ae..15857ff384ec 100644 --- a/component/loki/source/kubernetes_events/event_controller.go +++ b/component/loki/source/kubernetes_events/event_controller.go @@ -79,7 +79,7 @@ func newEventController(task eventControllerTask) *eventController { } } -func (ctrl *eventController) Run(ctx context.Context) { +func (ctrl *eventController) Run(ctx context.Context) error { defer ctrl.handler.Stop() level.Info(ctrl.log).Log("msg", "watching events for namespace", "namespace", ctrl.task.Namespace) @@ -87,7 +87,10 @@ func (ctrl *eventController) Run(ctx context.Context) { if err := ctrl.runError(ctx); err != nil { level.Error(ctrl.log).Log("msg", "event watcher exited with error", "err", err) + return err } + + return nil } func (ctrl *eventController) runError(ctx context.Context) error { diff --git a/pkg/runner/runner.go b/pkg/runner/runner.go index a61590f46a61..b949995bce36 100644 --- a/pkg/runner/runner.go +++ b/pkg/runner/runner.go @@ -32,7 +32,7 @@ type Worker interface { // Run starts a Worker, blocking until the provided ctx is canceled or a // fatal error occurs. Run is guaranteed to be called exactly once for any // given Worker. - Run(ctx context.Context) + Run(ctx context.Context) error } // The Runner manages a set of running Workers based on an active set of tasks. 
diff --git a/pkg/runner/runner_test.go b/pkg/runner/runner_test.go index ea06dcff80ca..726093cbf47c 100644 --- a/pkg/runner/runner_test.go +++ b/pkg/runner/runner_test.go @@ -101,9 +101,11 @@ type genericWorker struct { var _ runner.Worker = (*genericWorker)(nil) -func (w *genericWorker) Run(ctx context.Context) { +func (w *genericWorker) Run(ctx context.Context) error { w.workerCount.Inc() defer w.workerCount.Dec() <-ctx.Done() + + return nil } From 8e7825dc688c08c1489bf47741dbe6fa42b0f2dd Mon Sep 17 00:00:00 2001 From: hainenber Date: Mon, 19 Feb 2024 23:38:44 +0700 Subject: [PATCH 3/6] feat(pkg/runner): collect worker errors + a method to get them Signed-off-by: hainenber --- pkg/runner/runner.go | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/pkg/runner/runner.go b/pkg/runner/runner.go index b949995bce36..4234f7522c5f 100644 --- a/pkg/runner/runner.go +++ b/pkg/runner/runner.go @@ -42,8 +42,10 @@ type Runner[TaskType Task] struct { ctx context.Context cancel context.CancelFunc - running sync.WaitGroup - workers *hashMap + running sync.WaitGroup + workers *hashMap + workerErrs []error + workerErrMut sync.RWMutex } // Internal types used to implement the Runner. 
@@ -163,7 +165,12 @@ func (s *Runner[TaskType]) ApplyTasks(ctx context.Context, tt []TaskType) error go func() { defer s.running.Done() defer close(newWorker.Exited) - newWorker.Worker.Run(workerCtx) + // Gather error encountered by worker when running + if err := newWorker.Worker.Run(workerCtx); err != nil { + s.workerErrMut.Lock() + s.workerErrs = append(s.workerErrs, err) + s.workerErrMut.Unlock() + } }() _ = s.workers.Add(newTask) @@ -203,3 +210,10 @@ func (s *Runner[TaskType]) Stop() { s.cancel() s.running.Wait() } + +// GetWorkerErrors returns errors encountered by workers when they run assigned tasks +func (s *Runner[TaskType]) GetWorkerErrors() []error { + s.workerErrMut.RLock() + defer s.workerErrMut.RUnlock() + return s.workerErrs +} From 6152c7b3b237616a58a21fe20b03f3ccbc07c128 Mon Sep 17 00:00:00 2001 From: hainenber Date: Mon, 19 Feb 2024 23:39:11 +0700 Subject: [PATCH 4/6] feat(loki/src/k8s_events): check on bubbled up errors to set health accordingly Signed-off-by: hainenber --- .../loki/source/kubernetes_events/kubernetes_events.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/component/loki/source/kubernetes_events/kubernetes_events.go b/component/loki/source/kubernetes_events/kubernetes_events.go index 54dac08a9a9e..ca1f0684808b 100644 --- a/component/loki/source/kubernetes_events/kubernetes_events.go +++ b/component/loki/source/kubernetes_events/kubernetes_events.go @@ -161,6 +161,16 @@ func (c *Component) Run(ctx context.Context) error { c.setHealth(err) level.Error(c.log).Log("msg", "failed to apply event watchers", "err", err) } + + // Check on bubbled up errors encountered by the workers when running applied tasks + // and set component health accordingly + appliedTaskErrorString := "" + for _, err := range c.runner.GetWorkerErrors() { + appliedTaskErrorString += err.Error() + "\n" + } + if appliedTaskErrorString != "" { + c.setHealth(fmt.Errorf(appliedTaskErrorString)) + } } } }, func(_ error) { From 
61693df00b01ffafa66e83c9b8fd79a9b34bd95f Mon Sep 17 00:00:00 2001 From: hainenber Date: Thu, 29 Feb 2024 23:43:28 +0700 Subject: [PATCH 5/6] fix(loki/src/k8s_events): revert interface change + add background actor to set health based on applied task result Signed-off-by: hainenber --- component/loki/source/docker/runner.go | 6 ++-- .../loki/source/kubernetes/kubetail/tailer.go | 8 ++---- .../kubernetes_events/event_controller.go | 20 ++++++++++--- .../kubernetes_events/kubernetes_events.go | 23 ++++++++++++--- pkg/runner/runner.go | 28 ++++++------------- pkg/runner/runner_test.go | 4 +-- 6 files changed, 49 insertions(+), 40 deletions(-) diff --git a/component/loki/source/docker/runner.go b/component/loki/source/docker/runner.go index 069bd2d9f2b5..c010a02feef2 100644 --- a/component/loki/source/docker/runner.go +++ b/component/loki/source/docker/runner.go @@ -94,7 +94,7 @@ func newTailer(l log.Logger, task *tailerTask) *tailer { } } -func (t *tailer) Run(ctx context.Context) error { +func (t *tailer) Run(ctx context.Context) { ch, chErr := t.opts.client.ContainerWait(ctx, t.target.Name(), container.WaitConditionNextExit) t.target.StartIfNotRunning() @@ -108,10 +108,10 @@ func (t *tailer) Run(ctx context.Context) error { // refresh. 
level.Error(t.log).Log("msg", "could not set up a wait request to the Docker client", "error", err) t.target.Stop() - return err + return case <-ch: t.target.Stop() - return nil + return } } diff --git a/component/loki/source/kubernetes/kubetail/tailer.go b/component/loki/source/kubernetes/kubetail/tailer.go index ba0e10e79512..6ea2a2d9b3fb 100644 --- a/component/loki/source/kubernetes/kubetail/tailer.go +++ b/component/loki/source/kubernetes/kubetail/tailer.go @@ -89,7 +89,7 @@ var retailBackoff = backoff.Config{ MaxBackoff: time.Minute, } -func (t *tailer) Run(ctx context.Context) error { +func (t *tailer) Run(ctx context.Context) { ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -111,22 +111,18 @@ func (t *tailer) Run(ctx context.Context) error { terminated, err := t.containerTerminated(ctx) if terminated { // The container shut down and won't come back; we can stop tailing it. - return nil + return } else if err != nil { level.Warn(t.log).Log("msg", "could not determine if container terminated; will retry tailing", "err", err) - return err } } if err != nil { t.target.Report(time.Now().UTC(), err) level.Warn(t.log).Log("msg", "tailer stopped; will retry", "err", err) - return err } bo.Wait() } - - return nil } func (t *tailer) tail(ctx context.Context, handler loki.EntryHandler) error { diff --git a/component/loki/source/kubernetes_events/event_controller.go b/component/loki/source/kubernetes_events/event_controller.go index 15857ff384ec..cb1d12fa0e5a 100644 --- a/component/loki/source/kubernetes_events/event_controller.go +++ b/component/loki/source/kubernetes_events/event_controller.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "strings" + "sync" "time" "github.com/cespare/xxhash/v2" @@ -58,6 +59,9 @@ type eventController struct { positionsKey string initTimestamp time.Time + + taskErr error + taskErrMut sync.RWMutex } func newEventController(task eventControllerTask) *eventController { @@ -79,18 +83,20 @@ func newEventController(task 
eventControllerTask) *eventController { } } -func (ctrl *eventController) Run(ctx context.Context) error { +func (ctrl *eventController) Run(ctx context.Context) { defer ctrl.handler.Stop() level.Info(ctrl.log).Log("msg", "watching events for namespace", "namespace", ctrl.task.Namespace) defer level.Info(ctrl.log).Log("msg", "stopping watcher for events", "namespace", ctrl.task.Namespace) - if err := ctrl.runError(ctx); err != nil { + err := ctrl.runError(ctx) + if err != nil { level.Error(ctrl.log).Log("msg", "event watcher exited with error", "err", err) - return err } - return nil + ctrl.taskErrMut.Lock() + ctrl.taskErr = err + ctrl.taskErrMut.Unlock() } func (ctrl *eventController) runError(ctx context.Context) error { @@ -346,6 +352,12 @@ func (ctrl *eventController) DebugInfo() controllerInfo { } } +func (ctrl *eventController) GetTaskError() error { + ctrl.taskErrMut.RLock() + defer ctrl.taskErrMut.RUnlock() + return ctrl.taskErr +} + type controllerInfo struct { Namespace string `river:"namespace,attr"` LastTimestamp time.Time `river:"last_event_timestamp,attr"` diff --git a/component/loki/source/kubernetes_events/kubernetes_events.go b/component/loki/source/kubernetes_events/kubernetes_events.go index ca1f0684808b..1bf803371e6e 100644 --- a/component/loki/source/kubernetes_events/kubernetes_events.go +++ b/component/loki/source/kubernetes_events/kubernetes_events.go @@ -161,15 +161,30 @@ func (c *Component) Run(ctx context.Context) error { c.setHealth(err) level.Error(c.log).Log("msg", "failed to apply event watchers", "err", err) } + } + } + }, func(_ error) { + cancel() + }) - // Check on bubbled up errors encountered by the workers when running applied tasks - // and set component health accordingly + // Actor to set component health through errors from applied tasks. 
+ ticker := time.NewTicker(500 * time.Millisecond) + rg.Add(func() error { + for { + select { + case <-ctx.Done(): + return nil + case <-ticker.C: appliedTaskErrorString := "" - for _, err := range c.runner.GetWorkerErrors() { - appliedTaskErrorString += err.Error() + "\n" + for _, worker := range c.runner.Workers() { + if taskError := worker.(*eventController).GetTaskError(); taskError != nil { + appliedTaskErrorString += taskError.Error() + "\n" + } } if appliedTaskErrorString != "" { c.setHealth(fmt.Errorf(appliedTaskErrorString)) + } else { + c.setHealth(nil) } } } diff --git a/pkg/runner/runner.go b/pkg/runner/runner.go index 4234f7522c5f..7f910c45105e 100644 --- a/pkg/runner/runner.go +++ b/pkg/runner/runner.go @@ -32,7 +32,7 @@ type Worker interface { // Run starts a Worker, blocking until the provided ctx is canceled or a // fatal error occurs. Run is guaranteed to be called exactly once for any // given Worker. - Run(ctx context.Context) error + Run(ctx context.Context) } // The Runner manages a set of running Workers based on an active set of tasks. @@ -42,10 +42,8 @@ type Runner[TaskType Task] struct { ctx context.Context cancel context.CancelFunc - running sync.WaitGroup - workers *hashMap - workerErrs []error - workerErrMut sync.RWMutex + running sync.WaitGroup + workers *hashMap } // Internal types used to implement the Runner. @@ -65,8 +63,9 @@ type ( // workerTask implements Task for it to be used in a hashMap; two workerTasks // are equal if their underlying Tasks are equal. 
workerTask struct { - Worker *scheduledWorker - Task Task + Worker *scheduledWorker + Task Task + TaskErr error } ) @@ -165,12 +164,8 @@ func (s *Runner[TaskType]) ApplyTasks(ctx context.Context, tt []TaskType) error go func() { defer s.running.Done() defer close(newWorker.Exited) - // Gather error encountered by worker when running - if err := newWorker.Worker.Run(workerCtx); err != nil { - s.workerErrMut.Lock() - s.workerErrs = append(s.workerErrs, err) - s.workerErrMut.Unlock() - } + // Gather error encountered by worker when running the defined task. + newWorker.Worker.Run(workerCtx) }() _ = s.workers.Add(newTask) @@ -210,10 +205,3 @@ func (s *Runner[TaskType]) Stop() { s.cancel() s.running.Wait() } - -// GetWorkerErrors returns errors encountered by workers when they run assigned tasks -func (s *Runner[TaskType]) GetWorkerErrors() []error { - s.workerErrMut.RLock() - defer s.workerErrMut.RUnlock() - return s.workerErrs -} diff --git a/pkg/runner/runner_test.go b/pkg/runner/runner_test.go index 726093cbf47c..ea06dcff80ca 100644 --- a/pkg/runner/runner_test.go +++ b/pkg/runner/runner_test.go @@ -101,11 +101,9 @@ type genericWorker struct { var _ runner.Worker = (*genericWorker)(nil) -func (w *genericWorker) Run(ctx context.Context) error { +func (w *genericWorker) Run(ctx context.Context) { w.workerCount.Inc() defer w.workerCount.Dec() <-ctx.Done() - - return nil } From 91abff7829bff7eff6b5cf6181c5b32bd2690877 Mon Sep 17 00:00:00 2001 From: hainenber Date: Thu, 29 Feb 2024 23:48:26 +0700 Subject: [PATCH 6/6] chore: remove codes from previous approach Signed-off-by: hainenber --- pkg/runner/runner.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/pkg/runner/runner.go b/pkg/runner/runner.go index 7f910c45105e..a61590f46a61 100644 --- a/pkg/runner/runner.go +++ b/pkg/runner/runner.go @@ -63,9 +63,8 @@ type ( // workerTask implements Task for it to be used in a hashMap; two workerTasks // are equal if their underlying Tasks are equal. 
workerTask struct { - Worker *scheduledWorker - Task Task - TaskErr error + Worker *scheduledWorker + Task Task } ) @@ -164,7 +163,6 @@ func (s *Runner[TaskType]) ApplyTasks(ctx context.Context, tt []TaskType) error go func() { defer s.running.Done() defer close(newWorker.Exited) - // Gather error encountered by worker when running the defined task. newWorker.Worker.Run(workerCtx) }()