DEVOPS-7803 restore log stream #2972

Open
wants to merge 1 commit into
base: master
7 changes: 5 additions & 2 deletions pkg/kube/pod.go
@@ -295,11 +295,14 @@ func DeletePod(ctx context.Context, cli kubernetes.Interface, pod *corev1.Pod) e
return nil
}

-func StreamPodLogs(ctx context.Context, cli kubernetes.Interface, namespace, podName, containerName string) (io.ReadCloser, error) {
+func StreamPodLogs(ctx context.Context, cli kubernetes.Interface, namespace, podName, containerName string, sinceTime *metav1.Time) (io.ReadCloser, error) {
plo := &corev1.PodLogOptions{
Follow: true,
Container: containerName,
}
if sinceTime != nil {
plo.SinceTime = sinceTime
}
return cli.CoreV1().Pods(namespace).GetLogs(podName, plo).Stream(ctx)
}

@@ -319,7 +322,7 @@ func GetPodLogs(ctx context.Context, cli kubernetes.Interface, namespace, podNam

// getErrorFromLogs fetches logs from pod and constructs error containing last ten lines of log and specified error message
func getErrorFromLogs(ctx context.Context, cli kubernetes.Interface, namespace, podName, containerName string, err error, errorMessage string) error {
-r, logErr := StreamPodLogs(ctx, cli, namespace, podName, containerName)
+r, logErr := StreamPodLogs(ctx, cli, namespace, podName, containerName, nil)
if logErr != nil {
return errors.Wrapf(logErr, "Failed to fetch logs from the pod")
}
61 changes: 60 additions & 1 deletion pkg/kube/pod_controller.go
@@ -230,7 +230,66 @@ func (p *podController) StreamPodLogs(ctx context.Context) (io.ReadCloser, error
return nil, ErrPodControllerPodNotStarted
}

-return StreamPodLogs(ctx, p.cli, p.pod.Namespace, p.pod.Name, ContainerNameFromPodOptsOrDefault(p.podOptions))
+return newRestoreLogStreamReader(ctx, p.cli, p.pod.Namespace, p.pod.Name, ContainerNameFromPodOptsOrDefault(p.podOptions))
}

// newRestoreLogStreamReader creates a new restoreLogStreamReader instance which is used to stream logs from a pod.
// restoreLogStreamReader will automatically try to establish a new log stream if the current one is closed and the pod is still alive.
// This wrapper is needed because the kubelet has a hardcoded 4h timeout for streaming logs.
func newRestoreLogStreamReader(ctx context.Context, cli kubernetes.Interface, namespace, podName, containerName string) (io.ReadCloser, error) {
reader, err := StreamPodLogs(ctx, cli, namespace, podName, containerName, nil)
if err != nil {
return nil, err
}
return &restoreLogStreamReader{
ctx: ctx,
cli: cli,
namespace: namespace,
podName: podName,
containerName: containerName,
reader: reader,
}, nil
}

type restoreLogStreamReader struct {
ctx context.Context
cli kubernetes.Interface
namespace string
podName string
containerName string
reader io.ReadCloser
lastReadTime metav1.Time
}

func (s *restoreLogStreamReader) Read(p []byte) (n int, err error) {
n, err = s.reader.Read(p)
defer func() { s.lastReadTime = metav1.Now() }()
Contributor

It looks like we take the timestamp after the stream closes. My main concern in #2903 (comment) was that if a log line is written at the moment the stream closes, we might lose some records.
Do you have a way to reproduce the failure, so that we have at least empirical proof that this approach works?

Author

Yes, there is a way to reproduce it, and I did. Unfortunately, each test takes 4 hours, because I simply run an action from a blueprint that uses a long sleep and echo.

In manual tests, this approach always resulted in the last line of the log being duplicated, regardless of how frequently the pod logged. The last line was repeated even when it was the only line written in those 4 hours.

Contributor

Right. Should we then add some deduplication for that case (if we can)?

Author

Deduplication would add considerable overhead for what we are trying to achieve. The problem manifests only after 4 hours, and in my opinion a single duplicated line in tasks that run longer than 4 hours does not justify adding deduplication logic. That logic would require keeping the last read line in memory and comparing it after reconnecting. Additionally, we should not assume that lines in our users' logs can be told apart, which would complicate even such a simple comparison.

Therefore, in my opinion, we should accept this single duplicated line, which occurs only on rare occasions.
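
For reference, the comparison described above could look roughly like the following sketch. It is not part of this PR; `dropReplayedLine` is a hypothetical helper, and it assumes the log is line-delimited and already split into strings. The second call shows the ambiguity mentioned in the comment: when a pod legitimately emits identical lines, the helper cannot tell a replayed line from a new one.

```go
package main

import "fmt"

// dropReplayedLine removes the first line of a resumed stream when it equals
// the last line delivered before the reconnect. Hypothetical helper, shown
// only to illustrate the comparison discussed in the thread.
func dropReplayedLine(lastLine string, resumed []string) []string {
	if len(resumed) > 0 && resumed[0] == lastLine {
		return resumed[1:]
	}
	return resumed
}

func main() {
	// Replay case: the resumed stream starts with a line we already have.
	fmt.Println(dropReplayedLine("step 2 done", []string{"step 2 done", "step 3 done"}))

	// Ambiguous case: if "tick" here is a genuinely new line rather than a
	// replay, it is dropped even though it should have been kept.
	fmt.Println(dropReplayedLine("tick", []string{"tick"}))
}
```

The cost is one retained line per stream plus a comparison on every reconnect, and the result is still only a heuristic for logs with repeated content.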

if err == io.EOF {
pod, err := s.cli.CoreV1().Pods(s.namespace).Get(s.ctx, s.podName, metav1.GetOptions{})
if err != nil {
return 0, err
}

if pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodFailed {
return n, io.EOF
}

err = s.reader.Close()
if err != nil {
return 0, err
}
s.reader, err = StreamPodLogs(s.ctx, s.cli, s.namespace, s.podName, s.containerName, &s.lastReadTime)
if err != nil {
return 0, err
}

return s.reader.Read(p)
hairyhum marked this conversation as resolved.
}
return n, err
}

func (s *restoreLogStreamReader) Close() error {
return s.reader.Close()
}
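
The reconnect-on-EOF pattern used by `restoreLogStreamReader` can be exercised without a cluster. The sketch below is a standard-library stand-in, not the code from this PR: `reopenReader`, its `open` and `alive` callbacks, and `readAllSegments` are hypothetical names, where `open` plays the role of re-establishing the log stream and `alive` the role of the pod phase check. It does not model `sinceTime`, and it hands any bytes that arrive together with `io.EOF` to the caller before reconnecting.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// reopenReader reads from a stream that may end early and reopens it on
// io.EOF for as long as alive reports true.
type reopenReader struct {
	open  func() (io.Reader, error) // hypothetical: opens a fresh stream
	alive func() bool               // hypothetical: is the source still running?
	cur   io.Reader
}

func (r *reopenReader) Read(p []byte) (int, error) {
	if len(p) == 0 {
		return 0, nil
	}
	for {
		if r.cur == nil {
			if !r.alive() {
				return 0, io.EOF // source finished: report a real end of stream
			}
			cur, err := r.open()
			if err != nil {
				return 0, err
			}
			r.cur = cur
		}
		n, err := r.cur.Read(p)
		if err == io.EOF {
			r.cur = nil // reopen, or finish, on the next pass
			err = nil
		}
		if n > 0 || err != nil {
			return n, err
		}
	}
}

// readAllSegments simulates a source whose stream is cut after every segment.
func readAllSegments(segments []string) (string, error) {
	next := 0
	r := &reopenReader{
		open: func() (io.Reader, error) {
			s := segments[next]
			next++
			return strings.NewReader(s), nil
		},
		alive: func() bool { return next < len(segments) },
	}
	out, err := io.ReadAll(r)
	return string(out), err
}

func main() {
	out, err := readAllSegments([]string{"line 1\n", "line 2\n"})
	fmt.Printf("%q %v\n", out, err) // "line 1\nline 2\n" <nil>
}
```

The caller sees one continuous stream and only receives `io.EOF` once the source is no longer alive.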

// GetCommandExecutor returns PodCommandExecutor instance which is configured to execute commands within pod controlled