ProcessWorkflowTask is not stopped on worker.Stop() #1706

Open
ndtretyak opened this issue Nov 8, 2024 · 3 comments · May be fixed by #1707

Comments

Contributor

ndtretyak commented Nov 8, 2024

Expected Behavior

ProcessWorkflowTask should stop when worker.Stop() is called.

Actual Behavior

A WorkflowTask started by a stopped worker continues to send heartbeats, blocking further workflow execution.

Steps to Reproduce the Problem

The following code is based on the server's per-namespace worker implementation. It starts the worker, stops it, and then starts it again using the same client.

The workflow executes a series of local activities, similar to the server's Scheduler workflow.

If the worker is stopped during the execution of a local activity, we will see a "canceled" error in the logs:

2024/11/08 10:15:49 ERROR Activity failed

However, this error is ignored by the workflow, which will continue processing the current WorkflowTask and will schedule another local activity. This activity will not be executed since the worker is stopped. ProcessWorkflowTask will remain active, though, and will keep sending heartbeats until either a network timeout occurs or the history size limit is reached.

Heartbeats can be seen in the logs:

2024/11/08 10:16:28 DEBUG Force RespondWorkflowTaskCompleted

package main

import (
	"context"
	"fmt"
	"time"

	"go.temporal.io/api/enums/v1"
	"go.temporal.io/sdk/client"
	"go.temporal.io/sdk/worker"
	"go.temporal.io/sdk/workflow"
)

const (
	hostPort   = "localhost:7233"
	namespace  = "default"
	taskQueue  = "default"
	workflowID = "test-workflow"

	sleepDuration = 100 * time.Millisecond
	nActivities   = 10
)

type Worker struct {
	parentClient client.Client
	client       client.Client
	worker       worker.Worker
}

func NewWorker(c client.Client) *Worker {
	return &Worker{parentClient: c}
}

func (w *Worker) Start() error {
	c, err := newTemporalClientFromExisting(w.parentClient)
	if err != nil {
		return fmt.Errorf("create client: %w", err)
	}
	tw := newTemporalWorker(c)
	err = tw.Start()
	if err != nil {
		return fmt.Errorf("start worker: %w", err)
	}
	w.client = c
	w.worker = tw
	return nil
}

func (w *Worker) Stop() {
	w.worker.Stop()
	w.client.Close()
}

func newTemporalClient() (client.Client, error) {
	return client.Dial(client.Options{
		HostPort:  hostPort,
		Namespace: namespace,
	})
}

func newTemporalClientFromExisting(c client.Client) (client.Client, error) {
	return client.NewClientFromExisting(c, client.Options{})
}

func newTemporalWorker(c client.Client) worker.Worker {
	w := worker.New(c, taskQueue, worker.Options{})
	w.RegisterWorkflow(Workflow)
	w.RegisterActivity(Activity)
	return w
}

func Workflow(ctx workflow.Context) error {
	for {
		for range nActivities {
			ctx := workflow.WithLocalActivityOptions(ctx, workflow.LocalActivityOptions{
				StartToCloseTimeout: time.Minute,
			})
			err := workflow.ExecuteLocalActivity(ctx, Activity).Get(ctx, nil)
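			if err != nil {
				// The error (including "canceled" after worker.Stop()) is only
				// logged, so the loop goes on to schedule the next local activity.
				workflow.GetLogger(ctx).Error("Activity failed.", "Error", err)
			}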
		}
		workflow.Sleep(ctx, sleepDuration)
		if workflow.GetInfo(ctx).GetContinueAsNewSuggested() {
			return workflow.NewContinueAsNewError(ctx, Workflow)
		}
	}
}

// Activity sleeps, then reports whether its context was canceled in the meantime.
func Activity(ctx context.Context) error {
	time.Sleep(sleepDuration)
	return ctx.Err()
}

func startWorkflow(ctx context.Context, c client.Client) error {
	opts := client.StartWorkflowOptions{
		ID:                    workflowID,
		TaskQueue:             taskQueue,
		WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING,
	}
	_, err := c.ExecuteWorkflow(ctx, opts, Workflow)
	return err
}

func main() {
	ctx := context.Background()
	c, err := newTemporalClient()
	if err != nil {
		fmt.Printf("Error creating client: %v\n", err)
		return // c is nil here; continuing would panic
	}

	err = startWorkflow(ctx, c)
	if err != nil {
		fmt.Printf("Error starting workflow: %v\n", err)
		return
	}
	fmt.Println("Workflow started.")

	w := NewWorker(c)
	for {
		err = w.Start()
		if err != nil {
			fmt.Println("Error starting worker:", err)
			return
		}
		time.Sleep(33 * sleepDuration)
		w.Stop()
	}
}

Specifications

  • Version:
  • Platform:

ndtretyak linked a pull request on Nov 12, 2024 that will close this issue
Member

cretz commented Nov 12, 2024

I am not sure we want workflow heartbeating to stop just because the poller stopped. Heartbeating is used by long-running local activities. Just because the poller stopped doesn't mean we want to interrupt existing tasks (which can include local activity retries if the policy wants it). A worker stopping still allows all workflow and activity tasks to complete.

If the local activity worker is already stopped, then yes, there may be some other issue concerning not properly waiting for the local activity to complete on worker shutdown.
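
To illustrate the shutdown behavior described here (stop accepting new tasks, but let accepted ones finish), here is a minimal stdlib-only sketch. It is a model, not the SDK's implementation: `drainingWorker`, `Submit`, and `runDrainDemo` are hypothetical names introduced for this example.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// drainingWorker is a hypothetical model of a worker that drains on Stop.
// It is not the Temporal SDK's worker type.
type drainingWorker struct {
	mu       sync.Mutex
	stopped  bool
	inFlight sync.WaitGroup
}

// Submit starts task in its own goroutine unless the worker is stopped.
// It reports whether the task was accepted.
func (w *drainingWorker) Submit(task func()) bool {
	w.mu.Lock()
	defer w.mu.Unlock()
	if w.stopped {
		return false
	}
	w.inFlight.Add(1)
	go func() {
		defer w.inFlight.Done()
		task()
	}()
	return true
}

// Stop rejects new tasks, then blocks until every accepted task has finished.
func (w *drainingWorker) Stop() {
	w.mu.Lock()
	w.stopped = true
	w.mu.Unlock()
	w.inFlight.Wait()
}

// runDrainDemo submits one slow task, stops the worker, and then tries to
// submit another task after the stop.
func runDrainDemo() (completed, acceptedAfterStop bool) {
	w := &drainingWorker{}
	w.Submit(func() {
		time.Sleep(20 * time.Millisecond)
		completed = true
	})
	w.Stop() // returns only after the slow task is done
	acceptedAfterStop = w.Submit(func() {})
	return completed, acceptedAfterStop
}

func main() {
	completed, acceptedAfterStop := runDrainDemo()
	fmt.Println("in-flight task completed:", completed)
	fmt.Println("task accepted after stop:", acceptedAfterStop)
}
```

In this model a task is either rejected up front or guaranteed to run to completion. The hang reported in this issue corresponds to a third state, where a task is accepted but never run.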

Contributor Author

ndtretyak commented Nov 12, 2024

As I understand it, there are at least three ways for the local activity task to "hang forever":

  1. If the task is created after laTunnel (which uses the same stop channel as the worker) has already closed, then sendTask will return false, leaving the local activity task unstarted.

if !w.laTunnel.sendTask(task) {

  2. If the task is created before laTunnel closes, but after the local activity worker has already stopped, the task is sent to taskCh, but no one is listening. Additionally, this select might choose to send even if stopCh has already closed.

func (lat *localActivityTunnel) sendTask(task *localActivityTask) bool {
	select {
	case lat.taskCh <- task:
		return true
	case <-lat.stopCh:
		return false
	}
}

  3. If the task is polled by the local activity worker just before worker.Stop() is called, it will be dropped.

func (latp *localActivityTaskPoller) ProcessTask(task interface{}) error {
	if latp.stopping() {
		return errStop
	}

Since the local activity worker is stopped first, I thought it would be fine to simply stop sending heartbeats.
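
The select behavior mentioned in point 2 can be reproduced in isolation. The sketch below uses a hypothetical `tunnel` type that stands in for localActivityTunnel, and it assumes taskCh has free buffer space (so the send case is ready). When both cases are ready, Go's select picks one pseudo-randomly, so a stopped tunnel still accepts some tasks. `sendTaskStopFirst` is one possible mitigation, not the SDK's code.

```go
package main

import "fmt"

// tunnel is a hypothetical stand-in for the SDK's localActivityTunnel.
type tunnel struct {
	taskCh chan int
	stopCh chan struct{}
}

// sendTask mirrors the select quoted above: when both cases are ready,
// Go chooses between them pseudo-randomly.
func (t *tunnel) sendTask(task int) bool {
	select {
	case t.taskCh <- task:
		return true
	case <-t.stopCh:
		return false
	}
}

// sendTaskStopFirst checks stopCh before attempting the send, so a tunnel
// that is already stopped never accepts a task.
func (t *tunnel) sendTaskStopFirst(task int) bool {
	select {
	case <-t.stopCh:
		return false
	default:
	}
	return t.sendTask(task)
}

// countAccepted performs n sends, each against a fresh tunnel that is already
// stopped and has room in taskCh, and returns how many sends were accepted.
func countAccepted(n int, stopFirst bool) int {
	accepted := 0
	for i := 0; i < n; i++ {
		t := &tunnel{taskCh: make(chan int, 1), stopCh: make(chan struct{})}
		close(t.stopCh)
		var ok bool
		if stopFirst {
			ok = t.sendTaskStopFirst(i)
		} else {
			ok = t.sendTask(i)
		}
		if ok {
			accepted++
		}
	}
	return accepted
}

func main() {
	fmt.Println("plain select, accepted after stop:", countAccepted(1000, false))
	fmt.Println("stop-first check, accepted after stop:", countAccepted(1000, true))
}
```

The stop-first check only closes the window for a tunnel that is already stopped. A stop that lands between the check and the send can still leave a task in taskCh with no consumer, so it does not address the other two cases.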

Member

cretz commented Nov 12, 2024

Hrmm, I suspect we may be prematurely stopping the local activity worker before its tasks are done. Let me confer with the team, but I agree there's no need to continue workflow task heartbeating if the local activity is not running.
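
As a rough model of that idea, the sketch below keeps heartbeating while waiting for a local activity result, but gives up once it is told the local activity worker has stopped. It uses only the standard library and is not the SDK's or the linked PR's implementation: `waitWithHeartbeat`, `laStoppedCh`, and `runHeartbeatDemo` are hypothetical names.

```go
package main

import (
	"fmt"
	"time"
)

// waitWithHeartbeat calls beat every interval until either the local activity
// result arrives or the (hypothetical) local-activity-worker stop signal fires.
func waitWithHeartbeat(resultCh, laStoppedCh <-chan struct{}, interval time.Duration, beat func()) string {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-resultCh:
			return "completed"
		case <-laStoppedCh:
			// Nothing can produce a result any more, so stop heartbeating.
			return "stopped"
		case <-ticker.C:
			beat()
		}
	}
}

// runHeartbeatDemo models a dropped local activity: resultCh is never
// signalled, and the stop signal arrives after 50ms.
func runHeartbeatDemo() (outcome string, beats int) {
	resultCh := make(chan struct{})
	laStoppedCh := make(chan struct{})
	time.AfterFunc(50*time.Millisecond, func() { close(laStoppedCh) })
	outcome = waitWithHeartbeat(resultCh, laStoppedCh, 10*time.Millisecond, func() { beats++ })
	return outcome, beats
}

func main() {
	outcome, beats := runHeartbeatDemo()
	fmt.Println("outcome:", outcome, "heartbeats sent:", beats)
}
```

Without the laStoppedCh case, the loop in this model would heartbeat indefinitely, which matches the repeated "Force RespondWorkflowTaskCompleted" log lines in the report.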
