diff --git a/impl/src/main/java/io/serverlessworkflow/impl/TaskContext.java b/impl/src/main/java/io/serverlessworkflow/impl/TaskContext.java index c9e28f12..fbb77a99 100644 --- a/impl/src/main/java/io/serverlessworkflow/impl/TaskContext.java +++ b/impl/src/main/java/io/serverlessworkflow/impl/TaskContext.java @@ -16,6 +16,8 @@ package io.serverlessworkflow.impl; import com.fasterxml.jackson.databind.JsonNode; +import io.serverlessworkflow.api.types.FlowDirective; +import io.serverlessworkflow.api.types.FlowDirectiveEnum; import io.serverlessworkflow.api.types.TaskBase; public class TaskContext { @@ -26,11 +28,13 @@ public class TaskContext { private JsonNode input; private JsonNode output; private JsonNode rawOutput; + private FlowDirective flowDirective; public TaskContext(JsonNode rawInput, T task) { this.rawInput = rawInput; this.input = rawInput; this.task = task; + this.flowDirective = task.getThen(); } public void input(JsonNode input) { @@ -54,6 +58,10 @@ public void rawOutput(JsonNode output) { this.output = output; } + public JsonNode rawOutput() { + return rawOutput; + } + public void output(JsonNode output) { this.output = output; } @@ -62,7 +70,13 @@ public JsonNode output() { return output; } - public JsonNode rawOutput() { - return rawOutput; + public void flowDirective(FlowDirective flowDirective) { + this.flowDirective = flowDirective; + } + + public FlowDirective flowDirective() { + return flowDirective == null + ? new FlowDirective().withFlowDirectiveEnum(FlowDirectiveEnum.CONTINUE) + : flowDirective; } } diff --git a/impl/src/main/java/io/serverlessworkflow/impl/WorkflowContext.java b/impl/src/main/java/io/serverlessworkflow/impl/WorkflowContext.java index de88d2d0..4f0f0f16 100644 --- a/impl/src/main/java/io/serverlessworkflow/impl/WorkflowContext.java +++ b/impl/src/main/java/io/serverlessworkflow/impl/WorkflowContext.java @@ -20,24 +20,31 @@ public class WorkflowContext { private final WorkflowPosition position; - private JsonNode context; + private final WorkflowDefinition definition; private final JsonNode input; + private JsonNode current; + private JsonNode context; - private WorkflowContext(WorkflowPosition position, JsonNode input) { + private WorkflowContext( + WorkflowPosition position, WorkflowDefinition definition, JsonNode input) { this.position = position; + this.definition = definition; this.input = input; + this.current = input.deepCopy(); this.context = JsonUtils.mapper().createObjectNode(); } - public static Builder builder(JsonNode input) { - return new Builder(input); + public static Builder builder(WorkflowDefinition definition, JsonNode input) { + return new Builder(definition, input); } public static class Builder { private WorkflowPosition position = new DefaultWorkflowPosition(); + private WorkflowDefinition definition; private JsonNode input; - private Builder(JsonNode input) { + private Builder(WorkflowDefinition definition, JsonNode input) { + this.definition = definition; this.input = input; } @@ -47,7 +54,7 @@ public Builder position(WorkflowPosition position) { } public WorkflowContext build() { - return new WorkflowContext(position, input); + return new WorkflowContext(position, definition, input); } } @@ -66,4 +73,16 @@ public void context(JsonNode context) { public JsonNode rawInput() { return input; } + + public void current(JsonNode output) { + this.current = output; + } + + public JsonNode current() { + return current; + } + + public WorkflowDefinition definition() { + return definition; + } } diff --git a/impl/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java b/impl/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java index afcebeba..cd64f5f1 100644 --- a/impl/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java +++ b/impl/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java @@ -16,13 +16,10 @@ package io.serverlessworkflow.impl; import static io.serverlessworkflow.impl.WorkflowUtils.*; -import static io.serverlessworkflow.impl.json.JsonUtils.*; -import com.fasterxml.jackson.databind.JsonNode; import io.serverlessworkflow.api.types.Input; import io.serverlessworkflow.api.types.Output; import io.serverlessworkflow.api.types.TaskBase; -import io.serverlessworkflow.api.types.TaskItem; import io.serverlessworkflow.api.types.Workflow; import io.serverlessworkflow.impl.executors.DefaultTaskExecutorFactory; import io.serverlessworkflow.impl.executors.TaskExecutor; @@ -34,52 +31,60 @@ import io.serverlessworkflow.impl.jsonschema.SchemaValidator; import io.serverlessworkflow.impl.jsonschema.SchemaValidatorFactory; import io.serverlessworkflow.resources.DefaultResourceLoaderFactory; +import io.serverlessworkflow.resources.ResourceLoader; import io.serverlessworkflow.resources.ResourceLoaderFactory; import java.nio.file.Path; import java.util.Collection; import java.util.Collections; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; public class WorkflowDefinition { + private final Workflow workflow; + private final Collection listeners; + private Optional inputSchemaValidator = Optional.empty(); + private Optional outputSchemaValidator = Optional.empty(); + private Optional inputFilter = Optional.empty(); + private Optional outputFilter = Optional.empty(); + private final TaskExecutorFactory taskFactory; + private final ExpressionFactory exprFactory; + private final ResourceLoader resourceLoader; + private final SchemaValidatorFactory schemaValidatorFactory; + private final Map> taskExecutors = + new ConcurrentHashMap<>(); + private WorkflowDefinition( Workflow workflow, Collection listeners, - WorkflowFactories factories) { + TaskExecutorFactory taskFactory, + ResourceLoader resourceLoader, + ExpressionFactory exprFactory, + SchemaValidatorFactory schemaValidatorFactory) { this.workflow = workflow; this.listeners = listeners; - this.factories = factories; + this.taskFactory = taskFactory; + this.exprFactory = exprFactory; + this.schemaValidatorFactory = schemaValidatorFactory; + this.resourceLoader = resourceLoader; if (workflow.getInput() != null) { Input input = workflow.getInput(); this.inputSchemaValidator = getSchemaValidator( - factories.getValidatorFactory(), schemaToNode(factories, input.getSchema())); - this.inputFilter = buildWorkflowFilter(factories.getExpressionFactory(), input.getFrom()); + schemaValidatorFactory, schemaToNode(resourceLoader, input.getSchema())); + this.inputFilter = buildWorkflowFilter(exprFactory, input.getFrom()); } if (workflow.getOutput() != null) { Output output = workflow.getOutput(); this.outputSchemaValidator = getSchemaValidator( - factories.getValidatorFactory(), schemaToNode(factories, output.getSchema())); - this.outputFilter = buildWorkflowFilter(factories.getExpressionFactory(), output.getAs()); + schemaValidatorFactory, schemaToNode(resourceLoader, output.getSchema())); + this.outputFilter = buildWorkflowFilter(exprFactory, output.getAs()); } } - private final Workflow workflow; - private final Collection listeners; - private final WorkflowFactories factories; - private Optional inputSchemaValidator = Optional.empty(); - private Optional outputSchemaValidator = Optional.empty(); - private Optional inputFilter = Optional.empty(); - private Optional outputFilter = Optional.empty(); - - private final Map> taskExecutors = - new ConcurrentHashMap<>(); - public static class Builder { private final Workflow workflow; private TaskExecutorFactory taskFactory = DefaultTaskExecutorFactory.get(); @@ -127,18 +132,15 @@ public Builder withSchemaValidatorFactory(SchemaValidatorFactory factory) { } public WorkflowDefinition build() { - WorkflowDefinition def = - new WorkflowDefinition( - workflow, - listeners == null - ? Collections.emptySet() - : Collections.unmodifiableCollection(listeners), - new WorkflowFactories( - taskFactory, - resourceLoaderFactory.getResourceLoader(path), - exprFactory, - schemaValidatorFactory)); - return def; + return new WorkflowDefinition( + workflow, + listeners == null + ? Collections.emptySet() + : Collections.unmodifiableCollection(listeners), + taskFactory, + resourceLoaderFactory.getResourceLoader(path), + exprFactory, + schemaValidatorFactory); } } @@ -147,7 +149,7 @@ public static Builder builder(Workflow workflow) { } public WorkflowInstance execute(Object input) { - return new WorkflowInstance(JsonUtils.fromValue(input)); + return new WorkflowInstance(this, JsonUtils.fromValue(input)); } enum State { @@ -156,50 +158,48 @@ enum State { FINISHED }; - public class WorkflowInstance { - - private JsonNode output; - private State state; - private WorkflowContext context; - - private WorkflowInstance(JsonNode input) { - this.output = input; - inputSchemaValidator.ifPresent(v -> v.validate(input)); - this.context = WorkflowContext.builder(input).build(); - inputFilter.ifPresent(f -> output = f.apply(context, Optional.empty(), output)); - this.state = State.STARTED; - processDo(workflow.getDo()); - outputFilter.ifPresent(f -> output = f.apply(context, Optional.empty(), output)); - outputSchemaValidator.ifPresent(v -> v.validate(output)); - } + public Optional inputSchemaValidator() { + return inputSchemaValidator; + } - private void processDo(List tasks) { - context.position().addProperty("do"); - int index = 0; - for (TaskItem task : tasks) { - context.position().addIndex(++index).addProperty(task.getName()); - listeners.forEach(l -> l.onTaskStarted(context.position(), task.getTask())); - this.output = - taskExecutors - .computeIfAbsent( - context.position().jsonPointer(), - k -> factories.getTaskFactory().getTaskExecutor(task.getTask(), factories)) - .apply(context, output); - listeners.forEach(l -> l.onTaskEnded(context.position(), task.getTask())); - context.position().back().back(); - } - } + public Optional inputFilter() { + return inputFilter; + } - public State state() { - return state; - } + public Workflow workflow() { + return workflow; + } - public Object output() { - return toJavaValue(output); - } + public Collection listeners() { + return listeners; + } - public Object outputAsJsonNode() { - return output; - } + public Map> taskExecutors() { + return taskExecutors; + } + + public TaskExecutorFactory taskFactory() { + return taskFactory; + } + + public Optional outputFilter() { + return outputFilter; + } + + public Optional outputSchemaValidator() { + return outputSchemaValidator; + } + + public ExpressionFactory expressionFactory() { + return exprFactory; + } + + public SchemaValidatorFactory validatorFactory() { + return schemaValidatorFactory; + } + + public ResourceLoader resourceLoader() { + + return resourceLoader; } } diff --git a/impl/src/main/java/io/serverlessworkflow/impl/WorkflowFactories.java b/impl/src/main/java/io/serverlessworkflow/impl/WorkflowFactories.java deleted file mode 100644 index 6b0408b5..00000000 --- a/impl/src/main/java/io/serverlessworkflow/impl/WorkflowFactories.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright 2020-Present The Serverless Workflow Specification Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.serverlessworkflow.impl; - -import io.serverlessworkflow.impl.executors.TaskExecutorFactory; -import io.serverlessworkflow.impl.expressions.ExpressionFactory; -import io.serverlessworkflow.impl.jsonschema.SchemaValidatorFactory; -import io.serverlessworkflow.resources.ResourceLoader; - -public class WorkflowFactories { - - private final TaskExecutorFactory taskFactory; - private final ResourceLoader resourceLoader; - private final ExpressionFactory expressionFactory; - private final SchemaValidatorFactory validatorFactory; - - public WorkflowFactories( - TaskExecutorFactory taskFactory, - ResourceLoader resourceLoader, - ExpressionFactory expressionFactory, - SchemaValidatorFactory validatorFactory) { - this.taskFactory = taskFactory; - this.resourceLoader = resourceLoader; - this.expressionFactory = expressionFactory; - this.validatorFactory = validatorFactory; - } - - public TaskExecutorFactory getTaskFactory() { - return taskFactory; - } - - public ResourceLoader getResourceLoader() { - return resourceLoader; - } - - public ExpressionFactory getExpressionFactory() { - return expressionFactory; - } - - public SchemaValidatorFactory getValidatorFactory() { - return validatorFactory; - } -} diff --git a/impl/src/main/java/io/serverlessworkflow/impl/WorkflowInstance.java b/impl/src/main/java/io/serverlessworkflow/impl/WorkflowInstance.java new file mode 100644 index 00000000..70979d03 --- /dev/null +++ b/impl/src/main/java/io/serverlessworkflow/impl/WorkflowInstance.java @@ -0,0 +1,53 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl; + +import static io.serverlessworkflow.impl.json.JsonUtils.toJavaValue; + +import com.fasterxml.jackson.databind.JsonNode; +import io.serverlessworkflow.impl.WorkflowDefinition.State; +import java.util.Optional; + +public class WorkflowInstance { + private State state; + private WorkflowContext context; + + public WorkflowInstance(WorkflowDefinition definition, JsonNode input) { + definition.inputSchemaValidator().ifPresent(v -> v.validate(input)); + context = WorkflowContext.builder(definition, input).build(); + definition + .inputFilter() + .ifPresent(f -> context.current(f.apply(context, Optional.empty(), context.current()))); + state = State.STARTED; + WorkflowUtils.processTaskList(definition.workflow().getDo(), context); + definition + .outputFilter() + .ifPresent(f -> context.current(f.apply(context, Optional.empty(), context.current()))); + definition.outputSchemaValidator().ifPresent(v -> v.validate(context.current())); + } + + public State state() { + return state; + } + + public Object output() { + return toJavaValue(context.current()); + } + + public Object outputAsJsonNode() { + return context.current(); + } +} diff --git a/impl/src/main/java/io/serverlessworkflow/impl/WorkflowUtils.java b/impl/src/main/java/io/serverlessworkflow/impl/WorkflowUtils.java index 787d17dc..47191272 100644 --- a/impl/src/main/java/io/serverlessworkflow/impl/WorkflowUtils.java +++ b/impl/src/main/java/io/serverlessworkflow/impl/WorkflowUtils.java @@ -19,21 +19,27 @@ import com.fasterxml.jackson.databind.ObjectMapper; import io.serverlessworkflow.api.WorkflowFormat; import io.serverlessworkflow.api.types.ExportAs; +import io.serverlessworkflow.api.types.FlowDirective; import io.serverlessworkflow.api.types.InputFrom; import io.serverlessworkflow.api.types.OutputAs; import io.serverlessworkflow.api.types.SchemaExternal; import io.serverlessworkflow.api.types.SchemaInline; import io.serverlessworkflow.api.types.SchemaUnion; +import io.serverlessworkflow.api.types.TaskBase; +import io.serverlessworkflow.api.types.TaskItem; import io.serverlessworkflow.impl.expressions.Expression; import io.serverlessworkflow.impl.expressions.ExpressionFactory; import io.serverlessworkflow.impl.expressions.ExpressionUtils; import io.serverlessworkflow.impl.json.JsonUtils; import io.serverlessworkflow.impl.jsonschema.SchemaValidator; import io.serverlessworkflow.impl.jsonschema.SchemaValidatorFactory; +import io.serverlessworkflow.resources.ResourceLoader; import io.serverlessworkflow.resources.StaticResource; import java.io.IOException; import java.io.InputStream; import java.io.UncheckedIOException; +import java.util.List; +import java.util.ListIterator; import java.util.Map; import java.util.Optional; @@ -46,14 +52,14 @@ public static Optional getSchemaValidator( return node.map(n -> validatorFactory.getValidator(n)); } - public static Optional schemaToNode(WorkflowFactories factories, SchemaUnion schema) { + public static Optional schemaToNode(ResourceLoader resourceLoader, SchemaUnion schema) { if (schema != null) { if (schema.getSchemaInline() != null) { SchemaInline inline = schema.getSchemaInline(); return Optional.of(JsonUtils.mapper().convertValue(inline.getDocument(), JsonNode.class)); } else if (schema.getSchemaExternal() != null) { SchemaExternal external = schema.getSchemaExternal(); - StaticResource resource = factories.getResourceLoader().loadStatic(external.getResource()); + StaticResource resource = resourceLoader.loadStatic(external.getResource()); ObjectMapper mapper = WorkflowFormat.fromFileName(resource.name()).mapper(); try (InputStream in = resource.open()) { return Optional.of(mapper.readTree(in)); @@ -89,9 +95,8 @@ public static Optional buildWorkflowFilter( private static WorkflowFilter buildWorkflowFilter( ExpressionFactory exprFactory, String str, Object object) { if (str != null) { - Expression expression = exprFactory.getExpression(str); - return expression::eval; - } else { + return buildWorkflowFilter(exprFactory, str); + } else if (object != null) { Object exprObj = ExpressionUtils.buildExpressionObject(object, exprFactory); return exprObj instanceof Map ? (w, t, n) -> @@ -99,5 +104,80 @@ private static WorkflowFilter buildWorkflowFilter( ExpressionUtils.evaluateExpressionMap((Map) exprObj, w, t, n)) : (w, t, n) -> JsonUtils.fromValue(object); } + throw new IllegalStateException("Both object and str are null"); + } + + private static TaskItem findTaskByName(ListIterator iter, String taskName) { + int currentIndex = iter.nextIndex(); + while (iter.hasPrevious()) { + TaskItem item = iter.previous(); + if (item.getName().equals(taskName)) { + return item; + } + } + while (iter.nextIndex() < currentIndex) { + iter.next(); + } + while (iter.hasNext()) { + TaskItem item = iter.next(); + if (item.getName().equals(taskName)) { + return item; + } + } + throw new IllegalArgumentException("Cannot find task with name " + taskName); + } + + public static void processTaskList(List tasks, WorkflowContext context) { + context.position().addProperty("do"); + if (!tasks.isEmpty()) { + ListIterator iter = tasks.listIterator(); + TaskItem nextTask = iter.next(); + while (nextTask != null) { + TaskItem task = nextTask; + context.position().addIndex(iter.nextIndex()).addProperty(task.getName()); + context + .definition() + .listeners() + .forEach(l -> l.onTaskStarted(context.position(), task.getTask())); + TaskContext taskContext = + context + .definition() + .taskExecutors() + .computeIfAbsent( + context.position().jsonPointer(), + k -> + context + .definition() + .taskFactory() + .getTaskExecutor(task.getTask(), context.definition())) + .apply(context, context.current()); + context.current(taskContext.output()); + FlowDirective flowDirective = taskContext.flowDirective(); + if (flowDirective.getFlowDirectiveEnum() != null) { + switch (flowDirective.getFlowDirectiveEnum()) { + case CONTINUE: + nextTask = iter.hasNext() ? iter.next() : null; + break; + case END: + case EXIT: + nextTask = null; + break; + } + } else { + nextTask = WorkflowUtils.findTaskByName(iter, flowDirective.getString()); + } + context + .definition() + .listeners() + .forEach(l -> l.onTaskEnded(context.position(), task.getTask())); + context.position().back(); + } + } + context.position().back(); + } + + public static WorkflowFilter buildWorkflowFilter(ExpressionFactory exprFactory, String str) { + Expression expression = exprFactory.getExpression(str); + return expression::eval; } } diff --git a/impl/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java b/impl/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java index 3ed5d6e6..625380df 100644 --- a/impl/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java +++ b/impl/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java @@ -24,7 +24,7 @@ import io.serverlessworkflow.api.types.TaskBase; import io.serverlessworkflow.impl.TaskContext; import io.serverlessworkflow.impl.WorkflowContext; -import io.serverlessworkflow.impl.WorkflowFactories; +import io.serverlessworkflow.impl.WorkflowDefinition; import io.serverlessworkflow.impl.WorkflowFilter; import io.serverlessworkflow.impl.jsonschema.SchemaValidator; import java.util.Optional; @@ -40,46 +40,47 @@ public abstract class AbstractTaskExecutor implements TaskEx private Optional outputSchemaValidator = Optional.empty(); private Optional contextSchemaValidator = Optional.empty(); - protected AbstractTaskExecutor(T task, WorkflowFactories holder) { + protected AbstractTaskExecutor(T task, WorkflowDefinition holder) { this.task = task; buildInputProcessors(holder); buildOutputProcessors(holder); buildContextProcessors(holder); } - private void buildInputProcessors(WorkflowFactories holder) { + private void buildInputProcessors(WorkflowDefinition holder) { if (task.getInput() != null) { Input input = task.getInput(); - this.inputProcessor = buildWorkflowFilter(holder.getExpressionFactory(), input.getFrom()); + this.inputProcessor = buildWorkflowFilter(holder.expressionFactory(), input.getFrom()); this.inputSchemaValidator = - getSchemaValidator(holder.getValidatorFactory(), schemaToNode(holder, input.getSchema())); + getSchemaValidator( + holder.validatorFactory(), schemaToNode(holder.resourceLoader(), input.getSchema())); } } - private void buildOutputProcessors(WorkflowFactories holder) { + private void buildOutputProcessors(WorkflowDefinition holder) { if (task.getOutput() != null) { Output output = task.getOutput(); - this.outputProcessor = buildWorkflowFilter(holder.getExpressionFactory(), output.getAs()); + this.outputProcessor = buildWorkflowFilter(holder.expressionFactory(), output.getAs()); this.outputSchemaValidator = getSchemaValidator( - holder.getValidatorFactory(), schemaToNode(holder, output.getSchema())); + holder.validatorFactory(), schemaToNode(holder.resourceLoader(), output.getSchema())); } } - private void buildContextProcessors(WorkflowFactories holder) { + private void buildContextProcessors(WorkflowDefinition holder) { if (task.getExport() != null) { Export export = task.getExport(); if (export.getAs() != null) { - this.contextProcessor = buildWorkflowFilter(holder.getExpressionFactory(), export.getAs()); + this.contextProcessor = buildWorkflowFilter(holder.expressionFactory(), export.getAs()); } this.contextSchemaValidator = getSchemaValidator( - holder.getValidatorFactory(), schemaToNode(holder, export.getSchema())); + holder.validatorFactory(), schemaToNode(holder.resourceLoader(), export.getSchema())); } } @Override - public JsonNode apply(WorkflowContext workflowContext, JsonNode rawInput) { + public TaskContext apply(WorkflowContext workflowContext, JsonNode rawInput) { TaskContext taskContext = new TaskContext<>(rawInput, task); inputSchemaValidator.ifPresent(s -> s.validate(taskContext.rawInput())); inputProcessor.ifPresent( @@ -97,9 +98,9 @@ public JsonNode apply(WorkflowContext workflowContext, JsonNode rawInput) { workflowContext.context( p.apply(workflowContext, Optional.of(taskContext), workflowContext.context()))); contextSchemaValidator.ifPresent(s -> s.validate(workflowContext.context())); - return taskContext.output(); + return taskContext; } protected abstract JsonNode internalExecute( - WorkflowContext workflow, TaskContext task, JsonNode node); + WorkflowContext workflow, TaskContext taskContext, JsonNode node); } diff --git a/impl/src/main/java/io/serverlessworkflow/impl/executors/DefaultTaskExecutorFactory.java b/impl/src/main/java/io/serverlessworkflow/impl/executors/DefaultTaskExecutorFactory.java index cb76e395..117a8ed2 100644 --- a/impl/src/main/java/io/serverlessworkflow/impl/executors/DefaultTaskExecutorFactory.java +++ b/impl/src/main/java/io/serverlessworkflow/impl/executors/DefaultTaskExecutorFactory.java @@ -18,7 +18,7 @@ import io.serverlessworkflow.api.types.CallTask; import io.serverlessworkflow.api.types.Task; import io.serverlessworkflow.api.types.TaskBase; -import io.serverlessworkflow.impl.WorkflowFactories; +import io.serverlessworkflow.impl.WorkflowDefinition; public class DefaultTaskExecutorFactory implements TaskExecutorFactory { @@ -30,12 +30,19 @@ public static TaskExecutorFactory get() { protected DefaultTaskExecutorFactory() {} - public TaskExecutor getTaskExecutor(Task task, WorkflowFactories factories) { + public TaskExecutor getTaskExecutor( + Task task, WorkflowDefinition definition) { if (task.getCallTask() != null) { CallTask callTask = task.getCallTask(); if (callTask.getCallHTTP() != null) { - return new HttpExecutor(callTask.getCallHTTP(), factories); + return new HttpExecutor(callTask.getCallHTTP(), definition); } + } else if (task.getSwitchTask() != null) { + return new SwitchExecutor(task.getSwitchTask(), definition); + } else if (task.getDoTask() != null) { + return new DoExecutor(task.getDoTask(), definition); + } else if (task.getSetTask() != null) { + return new SetExecutor(task.getSetTask(), definition); } throw new UnsupportedOperationException(task.get().getClass().getName() + " not supported yet"); } diff --git a/impl/src/main/java/io/serverlessworkflow/impl/executors/DoExecutor.java b/impl/src/main/java/io/serverlessworkflow/impl/executors/DoExecutor.java new file mode 100644 index 00000000..5a052351 --- /dev/null +++ b/impl/src/main/java/io/serverlessworkflow/impl/executors/DoExecutor.java @@ -0,0 +1,37 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors; + +import com.fasterxml.jackson.databind.JsonNode; +import io.serverlessworkflow.api.types.DoTask; +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowUtils; + +public class DoExecutor extends AbstractTaskExecutor { + + protected DoExecutor(DoTask task, WorkflowDefinition holder) { + super(task, holder); + } + + @Override + protected JsonNode internalExecute( + WorkflowContext workflow, TaskContext taskContext, JsonNode node) { + WorkflowUtils.processTaskList(task.getDo(), workflow); + return node; + } +} diff --git a/impl/src/main/java/io/serverlessworkflow/impl/executors/HttpExecutor.java b/impl/src/main/java/io/serverlessworkflow/impl/executors/HttpExecutor.java index e17fd8dc..ee57344d 100644 --- a/impl/src/main/java/io/serverlessworkflow/impl/executors/HttpExecutor.java +++ b/impl/src/main/java/io/serverlessworkflow/impl/executors/HttpExecutor.java @@ -24,7 +24,7 @@ import io.serverlessworkflow.api.types.UriTemplate; import io.serverlessworkflow.impl.TaskContext; import io.serverlessworkflow.impl.WorkflowContext; -import io.serverlessworkflow.impl.WorkflowFactories; +import io.serverlessworkflow.impl.WorkflowDefinition; import io.serverlessworkflow.impl.expressions.Expression; import io.serverlessworkflow.impl.expressions.ExpressionFactory; import io.serverlessworkflow.impl.expressions.ExpressionUtils; @@ -58,25 +58,25 @@ private interface RequestSupplier { JsonNode apply(Builder request, WorkflowContext workflow, TaskContext task, JsonNode node); } - public HttpExecutor(CallHTTP task, WorkflowFactories holder) { - super(task, holder); + public HttpExecutor(CallHTTP task, WorkflowDefinition definition) { + super(task, definition); HTTPArguments httpArgs = task.getWith(); - this.targetSupplier = getTargetSupplier(httpArgs.getEndpoint(), holder.getExpressionFactory()); + this.targetSupplier = getTargetSupplier(httpArgs.getEndpoint(), definition.expressionFactory()); this.headersMap = httpArgs.getHeaders() != null ? ExpressionUtils.buildExpressionMap( - httpArgs.getHeaders().getAdditionalProperties(), holder.getExpressionFactory()) + httpArgs.getHeaders().getAdditionalProperties(), definition.expressionFactory()) : Map.of(); this.queryMap = httpArgs.getQuery() != null ? ExpressionUtils.buildExpressionMap( - httpArgs.getQuery().getAdditionalProperties(), holder.getExpressionFactory()) + httpArgs.getQuery().getAdditionalProperties(), definition.expressionFactory()) : Map.of(); switch (httpArgs.getMethod().toUpperCase()) { case HttpMethod.POST: Object body = ExpressionUtils.buildExpressionObject( - httpArgs.getBody(), holder.getExpressionFactory()); + httpArgs.getBody(), definition.expressionFactory()); this.requestFunction = (request, workflow, context, node) -> request.post( diff --git a/impl/src/main/java/io/serverlessworkflow/impl/executors/SetExecutor.java b/impl/src/main/java/io/serverlessworkflow/impl/executors/SetExecutor.java new file mode 100644 index 00000000..aadbb677 --- /dev/null +++ b/impl/src/main/java/io/serverlessworkflow/impl/executors/SetExecutor.java @@ -0,0 +1,40 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors; + +import com.fasterxml.jackson.databind.JsonNode; +import io.serverlessworkflow.api.types.SetTask; +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.json.JsonUtils; +import io.serverlessworkflow.impl.json.MergeUtils; + +public class SetExecutor extends AbstractTaskExecutor { + + private JsonNode toBeSet; + + protected SetExecutor(SetTask task, WorkflowDefinition holder) { + super(task, holder); + this.toBeSet = JsonUtils.fromValue(task.getSet().getAdditionalProperties()); + } + + @Override + protected JsonNode internalExecute( + WorkflowContext workflow, TaskContext taskContext, JsonNode node) { + return MergeUtils.merge(toBeSet, node); + } +} diff --git a/impl/src/main/java/io/serverlessworkflow/impl/executors/SwitchExecutor.java b/impl/src/main/java/io/serverlessworkflow/impl/executors/SwitchExecutor.java new file mode 100644 index 00000000..78c4cc79 --- /dev/null +++ b/impl/src/main/java/io/serverlessworkflow/impl/executors/SwitchExecutor.java @@ -0,0 +1,49 @@ +package io.serverlessworkflow.impl.executors; + +import com.fasterxml.jackson.databind.JsonNode; +import io.serverlessworkflow.api.types.FlowDirective; +import io.serverlessworkflow.api.types.SwitchCase; +import io.serverlessworkflow.api.types.SwitchItem; +import io.serverlessworkflow.api.types.SwitchTask; +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowFilter; +import io.serverlessworkflow.impl.WorkflowUtils; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; + +public class SwitchExecutor extends AbstractTaskExecutor { + + private Map workflowFilters = new ConcurrentHashMap<>(); + private FlowDirective defaultDirective; + + protected SwitchExecutor(SwitchTask task, WorkflowDefinition holder) { + super(task, holder); + for (SwitchItem item : task.getSwitch()) { + SwitchCase switchCase = item.getSwitchCase(); + if (switchCase.getWhen() != null) { + workflowFilters.put( + switchCase, + WorkflowUtils.buildWorkflowFilter(holder.expressionFactory(), switchCase.getWhen())); + } else { + defaultDirective = switchCase.getThen(); + } + } + } + + @Override + protected JsonNode internalExecute( + WorkflowContext workflow, TaskContext taskContext, JsonNode node) { + for (Entry entry : workflowFilters.entrySet()) { + if (entry.getValue().apply(workflow, Optional.of(taskContext), node).asBoolean()) { + taskContext.flowDirective(entry.getKey().getThen()); + return node; + } + } + taskContext.flowDirective(defaultDirective); + return node; + } +} diff --git a/impl/src/main/java/io/serverlessworkflow/impl/executors/TaskExecutor.java b/impl/src/main/java/io/serverlessworkflow/impl/executors/TaskExecutor.java index 8c896385..62228199 100644 --- a/impl/src/main/java/io/serverlessworkflow/impl/executors/TaskExecutor.java +++ b/impl/src/main/java/io/serverlessworkflow/impl/executors/TaskExecutor.java @@ -17,8 +17,9 @@ import com.fasterxml.jackson.databind.JsonNode; import io.serverlessworkflow.api.types.TaskBase; +import io.serverlessworkflow.impl.TaskContext; import io.serverlessworkflow.impl.WorkflowContext; import java.util.function.BiFunction; public interface TaskExecutor - extends BiFunction {} + extends BiFunction> {} diff --git a/impl/src/main/java/io/serverlessworkflow/impl/executors/TaskExecutorFactory.java b/impl/src/main/java/io/serverlessworkflow/impl/executors/TaskExecutorFactory.java index 85cef4b1..8c399cf6 100644 --- a/impl/src/main/java/io/serverlessworkflow/impl/executors/TaskExecutorFactory.java +++ b/impl/src/main/java/io/serverlessworkflow/impl/executors/TaskExecutorFactory.java @@ -17,8 +17,8 @@ import io.serverlessworkflow.api.types.Task; import io.serverlessworkflow.api.types.TaskBase; -import io.serverlessworkflow.impl.WorkflowFactories; +import io.serverlessworkflow.impl.WorkflowDefinition; public interface TaskExecutorFactory { - TaskExecutor getTaskExecutor(Task task, WorkflowFactories factories); + TaskExecutor getTaskExecutor(Task task, WorkflowDefinition definition); } diff --git a/impl/src/test/java/io/serverlessworkflow/impl/WorkflowDefinitionTest.java b/impl/src/test/java/io/serverlessworkflow/impl/WorkflowDefinitionTest.java index c1dbd85f..35a0ba69 100644 --- a/impl/src/test/java/io/serverlessworkflow/impl/WorkflowDefinitionTest.java +++ b/impl/src/test/java/io/serverlessworkflow/impl/WorkflowDefinitionTest.java @@ -82,6 +82,39 @@ private static Stream provideParameters() { Arguments.of( "callPostHttp.yaml", Map.of("name", "Javierito", "status", "available"), - new Condition<>(o -> o.equals("Javierito"), "CallHttpPostCondition"))); + new Condition<>(o -> o.equals("Javierito"), "CallHttpPostCondition")), + Arguments.of( + "switch-then-string.yaml", + Map.of("orderType", "electronic"), + new Condition( + o -> + o.equals( + Map.of("orderType", "electronic", "validate", true, "status", "fulfilled")), + "switch-electronic")), + Arguments.of( + "switch-then-string.yaml", + Map.of("orderType", "physical"), + new Condition( + o -> + o.equals( + Map.of( + "orderType", + "physical", + "inventory", + "clear", + "items", + 1, + "address", + "Elmer St")), + "switch-physical")), + Arguments.of( + "switch-then-string.yaml", + Map.of("orderType", "unknown"), + new Condition( + o -> + o.equals( + Map.of( + "orderType", "unknown", "log", "warn", "message", "something's wrong")), + "switch-unknown"))); } } diff --git a/impl/src/test/resources/switch-then-string.yaml b/impl/src/test/resources/switch-then-string.yaml new file mode 100644 index 00000000..d9891a4d --- /dev/null +++ b/impl/src/test/resources/switch-then-string.yaml @@ -0,0 +1,45 @@ +document: + dsl: '1.0.0-alpha5' + namespace: test + name: sample-workflow + version: 0.1.0 +do: + - processOrder: + switch: + - case1: + when: .orderType == "electronic" + then: processElectronicOrder + - case2: + when: .orderType == "physical" + then: processPhysicalOrder + - default: + then: handleUnknownOrderType + - processElectronicOrder: + do: + - validatePayment: + set: + validate: true + - fulfillOrder: + set: + status: fulfilled + then: exit + - processPhysicalOrder: + do: + - checkInventory: + set: + inventory: clear + - packItems: + set: + items: 1 + - scheduleShipping: + set: + address: Elmer St + then: exit + - handleUnknownOrderType: + do: + - logWarning: + set: + log: warn + - notifyAdmin: + set: + message: something's wrong