Skip to content

Commit

Permalink
fix: [TKC-2561] do not merge
Browse files Browse the repository at this point in the history
  • Loading branch information
povilasv committed Oct 14, 2024
1 parent 2847b2f commit 09147b7
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 1 deletion.
4 changes: 4 additions & 0 deletions cmd/kubectl-testkube/commands/testworkflows/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,11 @@ func NewRunTestWorkflowCmd() *cobra.Command {
if outputPretty {
ui.NL()
if watchEnabled {
fmt.Println("uiWatch")
exitCode = uiWatch(execution, client)
ui.NL()
if downloadArtifactsEnabled {
fmt.Println("downlaod artifact")
tests.DownloadTestWorkflowArtifacts(execution.Id, downloadDir, format, masks, client, outputPretty)
}
} else {
Expand Down Expand Up @@ -130,7 +132,9 @@ func NewRunTestWorkflowCmd() *cobra.Command {
}

func uiWatch(execution testkube.TestWorkflowExecution, client apiclientv1.Client) int {
fmt.Println("watch logs")
result, err := watchTestWorkflowLogs(execution.Id, execution.Signature, client)
fmt.Println("watch logs done", err, result)
ui.ExitOnError("reading test workflow execution logs", err)

// Apply the result in the execution
Expand Down
7 changes: 6 additions & 1 deletion internal/app/api/v1/testworkflowexecutions.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,20 +456,25 @@ func (s *TestkubeAPI) GetTestWorkflowNotificationsStream(ctx context.Context, ex
// Check for the logs
ctrl, err := testworkflowcontroller.New(ctx, s.Clientset, execution.GetNamespace(s.Namespace), execution.Id, execution.ScheduledAt)
if err != nil {
return nil, err
return nil, errors.Wrap(err, "failed to create test workflow controller")
}

fmt.Println("GetTestWorkflowNotificationsStream", executionID)
// Stream the notifications
ch := make(chan testkube.TestWorkflowExecutionNotification)
go func() {
for n := range ctrl.Watch(ctx) {
if n.Error == nil {
ch <- n.Value.ToInternal()
} else {
s.Log.Errorw("failed to watch logs", "error", n.Error)
}
}
ctrl.StopController()
close(ch)
}()
fmt.Println("GetTestWorkflowNotificationsStream done", executionID)

return ch, nil
}

Expand Down
1 change: 1 addition & 0 deletions pkg/agent/testworkflows.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ func (ag *Agent) executeWorkflowNotificationsRequest(ctx context.Context, req *c
}
}
if err != nil {
ag.logger.Errorf("error executing workflow notifications request: %s", err.Error())
message := fmt.Sprintf("cannot get pod logs: %s", err.Error())
ag.testWorkflowNotificationsResponseBuffer <- &cloud.TestWorkflowNotificationsResponse{
StreamId: req.StreamId,
Expand Down
3 changes: 3 additions & 0 deletions pkg/api/v1/client/direct_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,8 @@ func (t DirectClient[A]) GetLogsV2(uri string, logs chan events.Log) error {

// GetTestWorkflowExecutionNotifications returns logs stream from job pods, based on job pods logs
func (t DirectClient[A]) GetTestWorkflowExecutionNotifications(uri string, notifications chan testkube.TestWorkflowExecutionNotification) error {

fmt.Println("get", uri)
req, err := http.NewRequest(http.MethodGet, uri, nil)
if err != nil {
return err
Expand All @@ -234,6 +236,7 @@ func (t DirectClient[A]) GetTestWorkflowExecutionNotifications(uri string, notif
defer close(notifications)
defer resp.Body.Close()

fmt.Println("StreamToTestWorkflowExecutionNotificationsChannel")
StreamToTestWorkflowExecutionNotificationsChannel(resp.Body, notifications)
}()

Expand Down
1 change: 1 addition & 0 deletions pkg/api/v1/client/testworkflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ func (c TestWorkflowClient) ExecuteTestWorkflow(name string, request testkube.Te
func (c TestWorkflowClient) GetTestWorkflowExecutionNotifications(id string) (notifications chan testkube.TestWorkflowExecutionNotification, err error) {
notifications = make(chan testkube.TestWorkflowExecutionNotification)
uri := c.testWorkflowTransport.GetURI("/test-workflow-executions/%s/notifications", id)
fmt.Println("uri", uri)
err = c.testWorkflowTransport.GetTestWorkflowExecutionNotifications(uri, notifications)
return notifications, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func WatchInstrumentedPod(parentCtx context.Context, clientSet kubernetes.Interf

// Stop immediately after the operation is canceled
if ctx.Err() != nil {
fmt.Println("Context error 1", ctx.Err(), context.Cause(ctx))
return
}

Expand Down Expand Up @@ -148,6 +149,7 @@ func WatchInstrumentedPod(parentCtx context.Context, clientSet kubernetes.Interf

// Stop immediately after the operation is canceled
if ctx.Err() != nil {
fmt.Println("Context error 2", ctx.Err(), context.Cause(ctx))
return
}

Expand Down Expand Up @@ -185,6 +187,7 @@ func WatchInstrumentedPod(parentCtx context.Context, clientSet kubernetes.Interf

// Stop immediately after the operation is canceled
if ctx.Err() != nil {
fmt.Println("Context error 3", ctx.Err(), context.Cause(ctx))
return
}

Expand All @@ -198,6 +201,7 @@ func WatchInstrumentedPod(parentCtx context.Context, clientSet kubernetes.Interf

// Stop immediately after the operation is canceled
if ctx.Err() != nil {
fmt.Println("Context error 4", ctx.Err(), context.Cause(ctx))
return
}

Expand Down Expand Up @@ -240,6 +244,7 @@ func WatchInstrumentedPod(parentCtx context.Context, clientSet kubernetes.Interf

// Stop immediately after the operation is canceled
if ctx.Err() != nil {
fmt.Println("Context error 5", ctx.Err(), context.Cause(ctx))
return
}

Expand Down

0 comments on commit 09147b7

Please sign in to comment.