Skip to content

Commit

Permalink
Supporting retry on error feature for subflow activity
Browse files Browse the repository at this point in the history
  • Loading branch information
vnalawad-tibco committed Oct 10, 2024
1 parent 8ea4c88 commit 0d5fd8a
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 28 deletions.
19 changes: 4 additions & 15 deletions instance/ind_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -571,11 +571,8 @@ func (inst *IndependentInstance) handleTaskError(taskBehavior model.TaskBehavior
host, ok := containerInst.host.(*TaskInst)

if ok {
behavior := inst.flowModel.GetDefaultTaskBehavior()
if typeID := host.task.TypeID(); typeID != "" {
behavior = inst.flowModel.GetTaskBehavior(typeID)
}
inst.handleTaskError(behavior, host, err)
host.returnError = err
inst.scheduleEval(host)
}
} else {
taskInst.appendErrorData(err)
Expand Down Expand Up @@ -637,16 +634,8 @@ func (inst *IndependentInstance) HandleGlobalError(containerInst *Instance, err
host, ok := containerInst.host.(*TaskInst)

if ok {
behavior := inst.flowModel.GetDefaultTaskBehavior()
if typeID := host.task.TypeID(); typeID != "" {
behavior = inst.flowModel.GetTaskBehavior(typeID)
}

inst.handleTaskError(behavior, host, err)

//fail the task

//inst.scheduleEval(host)
host.returnError = err
inst.scheduleEval(host)
}
} else {
inst.returnError = err
Expand Down
8 changes: 6 additions & 2 deletions instance/taskinst.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package instance

import (
"fmt"
"github.com/project-flogo/flow/support"
"runtime/debug"
"time"

"github.com/project-flogo/flow/support"

"github.com/project-flogo/core/activity"
"github.com/project-flogo/core/data"
"github.com/project-flogo/core/data/coerce"
Expand Down Expand Up @@ -492,6 +493,9 @@ func (ti *TaskInst) PostEvalActivity() (done bool, evalErr error) {
return false, evalErr
}
}
if ti.returnError != nil {
return false, ti.returnError
}

if done {

Expand Down Expand Up @@ -615,7 +619,7 @@ func NewErrorObj(taskId string, msg string) map[string]interface{} {
return map[string]interface{}{"activity": taskId, "message": msg, "type": "unknown", "code": "", "data": nil}
}

//DEPRECATED
// DEPRECATED
type LegacyCtx struct {
task *TaskInst
}
Expand Down
42 changes: 31 additions & 11 deletions model/simple/taskbehavior.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,18 +155,38 @@ func evalActivity(ctx model.TaskContext) (bool, error) {
func (tb *TaskBehavior) PostEval(ctx model.TaskContext) (evalResult model.EvalResult, err error) {
ctx.FlowLogger().Debugf("PostEval Task '%s'", ctx.Task().ID())
_, err = ctx.PostEvalActivity()

if err != nil {
//// check if error returned is retriable
//if errVal, ok := err.(*activity.Error); ok && errVal.Retriable() {
// // check if task is configured to retry on error
// retryData, rerr := getRetryData(ctx)
// if rerr != nil {
// return model.EvalFail, rerr
// }
// if retryData.Count > 0 {
// return retryPostEval(ctx, retryData), nil
// }
//}
if _, ok := err.(*activity.Error); ok && ctx.Task().RetryOnErrConfig() != nil {
retryData, rerr := getRetryData(ctx)
if rerr != nil {
return model.EvalFail, rerr
}

if t, ok := ctx.(*instance.TaskInst); ok {
if t.GetTracingContext() != nil {
t.GetTracingContext().SetTag("retry_enabled", true)
t.GetTracingContext().SetTag("retries_remaining", retryData.Count)
t.GetTracingContext().SetTag("retry_interval_ms", retryData.Interval)
// Complete previous trace except last. Last trace will completed in the caller.
if retryData.Count > 0 {
_ = trace.GetTracer().FinishTrace(t.GetTracingContext(), err)
}
}
}

if retryData != nil && retryData.Count > 0 {
done, err := retryEval(ctx, retryData)
if err != nil {
return model.EvalFail, err
} else if done {
return model.EvalDone, nil
} else {
return model.EvalWait, nil
}
}
}
ref := activity.GetRef(ctx.Task().ActivityConfig().Activity)
ctx.FlowLogger().Errorf("Error post evaluating activity '%s'[%s] - %s", ctx.Task().ID(), ref, err.Error())
//ctx.SetStatus(model.TaskStatusFailed)
Expand Down Expand Up @@ -421,7 +441,7 @@ func getEnterCode(linkInst model.LinkInstance) int {
return 4
}

//SortTaskEntries Sort by EnterCode, keeping original order or equal elements.
// SortTaskEntries Sort by EnterCode, keeping original order or equal elements.
func SortTaskEntries(entries []*model.TaskEntry) {
sort.SliceStable(entries, func(i, j int) bool {
return entries[i].EnterCode < entries[j].EnterCode
Expand Down

0 comments on commit 0d5fd8a

Please sign in to comment.