Skip to content

Commit

Permalink
fix: avoid retrieving container logs again, when expected information…
Browse files Browse the repository at this point in the history
… is complete (#5868)
  • Loading branch information
rangoo94 authored Sep 25, 2024
1 parent 8a9a8d0 commit 4e2a0ee
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 5 deletions.
12 changes: 9 additions & 3 deletions pkg/testworkflows/testworkflowcontroller/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func getContainerLogsStream(ctx context.Context, clientSet kubernetes.Interface,
return stream, nil
}

func WatchContainerLogs(parentCtx context.Context, clientSet kubernetes.Interface, namespace, podName, containerName string, bufferSize int, isDone func() bool) <-chan ChannelMessage[ContainerLog] {
func WatchContainerLogs(parentCtx context.Context, clientSet kubernetes.Interface, namespace, podName, containerName string, bufferSize int, isDone func() bool, isLastHint func(*instructions.Instruction) bool) <-chan ChannelMessage[ContainerLog] {
ctx, ctxCancel := context.WithCancel(parentCtx)
ch := make(chan ChannelMessage[ContainerLog], bufferSize)
var mu sync.Mutex
Expand Down Expand Up @@ -253,6 +253,7 @@ func WatchContainerLogs(parentCtx context.Context, clientSet kubernetes.Interfac
readerAnyContent := false
tsReader := newTimestampReader()
lastTs := time.Now()
completed := false

hasNewLine := false

Expand Down Expand Up @@ -286,8 +287,10 @@ func WatchContainerLogs(parentCtx context.Context, clientSet kubernetes.Interfac

// If the stream is finished,
// either the logfile has been rotated, or the container actually finished.
// Assume that only if there was EOF without any logs since, the container is done.
if err == io.EOF && !readerAnyContent {
// Consider the container is done only when either:
// - there was EOF without any logs since, or
// - the last expected instruction was already delivered
if err == io.EOF && (!readerAnyContent || completed) {
return
}

Expand Down Expand Up @@ -361,6 +364,9 @@ func WatchContainerLogs(parentCtx context.Context, clientSet kubernetes.Interfac
item := ContainerLog{Time: lastTs}
if isHint {
item.Hint = instruction
if !completed && isLastHint(instruction) {
completed = true
}
} else {
item.Output = instruction
}
Expand Down
14 changes: 12 additions & 2 deletions pkg/testworkflows/testworkflowcontroller/watchinstrumentedpod.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/kubeshop/testkube/cmd/testworkflow-init/constants"
"github.com/kubeshop/testkube/cmd/testworkflow-init/data"
"github.com/kubeshop/testkube/cmd/testworkflow-init/instructions"
"github.com/kubeshop/testkube/pkg/api/v1/testkube"
"github.com/kubeshop/testkube/pkg/testworkflows/testworkflowcontroller/watchers"
"github.com/kubeshop/testkube/pkg/testworkflows/testworkflowprocessor/stage"
Expand Down Expand Up @@ -96,7 +97,7 @@ func WatchInstrumentedPod(parentCtx context.Context, clientSet kubernetes.Interf
notifier.Error(fmt.Errorf("cannot read execution instructions: %v", err))
return
}
refs, _ := ExtractRefsFromActionGroup(actions)
refs, endRefs := ExtractRefsFromActionGroup(actions)
initialRefs := make([]string, len(actions))
for i := range refs {
for j := range refs[i] {
Expand All @@ -119,6 +120,12 @@ func WatchInstrumentedPod(parentCtx context.Context, clientSet kubernetes.Interf
aborted := false
container := fmt.Sprintf("%d", containerIndex+1)

// Determine the last ref in this container, so we can confirm that the logs have been read until end
lastRef := endRefs[containerIndex][len(endRefs[containerIndex])-1]
if lastRef == "" && len(endRefs[containerIndex]) > 1 {
lastRef = endRefs[containerIndex][len(endRefs[containerIndex])-2]
}

// Wait until the Container is started
currentPodEventsIndex = 0
for ok := true; ok; _, ok = <-updatesCh {
Expand Down Expand Up @@ -148,10 +155,13 @@ func WatchInstrumentedPod(parentCtx context.Context, clientSet kubernetes.Interf
lastStarted = refs[containerIndex][0]

// Read the Container logs
isLastHint := func(hint *instructions.Instruction) bool {
return hint != nil && hint.Ref == lastRef && hint.Name == constants.InstructionEnd
}
isDone := func() bool {
return opts.DisableFollow || watcher.State().ContainerFinished(container) || watcher.State().Completed()
}
for v := range WatchContainerLogs(ctx, clientSet, watcher.State().Namespace(), watcher.State().PodName(), container, 10, isDone) {
for v := range WatchContainerLogs(ctx, clientSet, watcher.State().Namespace(), watcher.State().PodName(), container, 10, isDone, isLastHint) {
if v.Error != nil {
notifier.Error(v.Error)
continue
Expand Down

0 comments on commit 4e2a0ee

Please sign in to comment.