From 26e233fb0295c565c212d9cbc500e278f24da6bc Mon Sep 17 00:00:00 2001 From: Wojciech Chojnowski Date: Wed, 22 May 2024 16:10:00 +0200 Subject: [PATCH] DEVOPS-7803 restore log stream Re-establish connection while the pod is still running --- pkg/kube/pod.go | 7 +++-- pkg/kube/pod_controller.go | 61 +++++++++++++++++++++++++++++++++++++- 2 files changed, 65 insertions(+), 3 deletions(-) diff --git a/pkg/kube/pod.go b/pkg/kube/pod.go index 38086e3cef..ce86d4eb05 100644 --- a/pkg/kube/pod.go +++ b/pkg/kube/pod.go @@ -295,11 +295,14 @@ func DeletePod(ctx context.Context, cli kubernetes.Interface, pod *corev1.Pod) e return nil } -func StreamPodLogs(ctx context.Context, cli kubernetes.Interface, namespace, podName, containerName string) (io.ReadCloser, error) { +func StreamPodLogs(ctx context.Context, cli kubernetes.Interface, namespace, podName, containerName string, sinceTime *metav1.Time) (io.ReadCloser, error) { plo := &corev1.PodLogOptions{ Follow: true, Container: containerName, } + if sinceTime != nil { + plo.SinceTime = sinceTime + } return cli.CoreV1().Pods(namespace).GetLogs(podName, plo).Stream(ctx) } @@ -319,7 +322,7 @@ func GetPodLogs(ctx context.Context, cli kubernetes.Interface, namespace, podNam // getErrorFromLogs fetches logs from pod and constructs error containing last ten lines of log and specified error message func getErrorFromLogs(ctx context.Context, cli kubernetes.Interface, namespace, podName, containerName string, err error, errorMessage string) error { - r, logErr := StreamPodLogs(ctx, cli, namespace, podName, containerName) + r, logErr := StreamPodLogs(ctx, cli, namespace, podName, containerName, nil) if logErr != nil { return errors.Wrapf(logErr, "Failed to fetch logs from the pod") } diff --git a/pkg/kube/pod_controller.go b/pkg/kube/pod_controller.go index cd9aa296a4..be38df33a8 100644 --- a/pkg/kube/pod_controller.go +++ b/pkg/kube/pod_controller.go @@ -230,7 +230,66 @@ func (p *podController) StreamPodLogs(ctx context.Context) (io.ReadCloser, error return nil, ErrPodControllerPodNotStarted } - return StreamPodLogs(ctx, p.cli, p.pod.Namespace, p.pod.Name, ContainerNameFromPodOptsOrDefault(p.podOptions)) + return newRestoreLogStreamReader(ctx, p.cli, p.pod.Namespace, p.pod.Name, ContainerNameFromPodOptsOrDefault(p.podOptions)) +} + +// newRestoreLogStreamReader creates a new restoreLogStreamReader instance which is used to stream logs from a pod. +// restoreLogStreamReader will automatically try to establish a new log stream if the current one is closed and the pod is still alive. +// This wrapper has to exist as there is hardcoded 4h timeout in kubelet for streaming logs. +func newRestoreLogStreamReader(ctx context.Context, cli kubernetes.Interface, namespace, podName, containerName string) (io.ReadCloser, error) { + reader, err := StreamPodLogs(ctx, cli, namespace, podName, containerName, nil) + if err != nil { + return nil, err + } + return &restoreLogStreamReader{ + ctx: ctx, + cli: cli, + namespace: namespace, + podName: podName, + containerName: containerName, + reader: reader, + }, nil +} + +type restoreLogStreamReader struct { + ctx context.Context + cli kubernetes.Interface + namespace string + podName string + containerName string + reader io.ReadCloser + lastReadTime metav1.Time +} + +func (s *restoreLogStreamReader) Read(p []byte) (n int, err error) { + n, err = s.reader.Read(p) + defer func() { s.lastReadTime = metav1.Now() }() + if err == io.EOF { + pod, err := s.cli.CoreV1().Pods(s.namespace).Get(s.ctx, s.podName, metav1.GetOptions{}) + if err != nil { + return 0, err + } + + if pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodFailed { + return n, io.EOF + } + + err = s.reader.Close() + if err != nil { + return 0, err + } + s.reader, err = StreamPodLogs(s.ctx, s.cli, s.namespace, s.podName, s.containerName, &s.lastReadTime) + if err != nil { + return 0, err + } + + return s.reader.Read(p) + } + return n, err +} + +func (s *restoreLogStreamReader) Close() error { + return s.reader.Close() } // GetCommandExecutor returns PodCommandExecutor instance which is configured to execute commands within pod controlled