Skip to content

Commit

Permalink
[Fix serverlessworkflow#466] Implement switch
Browse files Browse the repository at this point in the history
Signed-off-by: Francisco Javier Tirado Sarti <[email protected]>
  • Loading branch information
fjtirado committed Nov 22, 2024
1 parent 11d0050 commit 9db247c
Show file tree
Hide file tree
Showing 16 changed files with 495 additions and 172 deletions.
18 changes: 16 additions & 2 deletions impl/src/main/java/io/serverlessworkflow/impl/TaskContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
package io.serverlessworkflow.impl;

import com.fasterxml.jackson.databind.JsonNode;
import io.serverlessworkflow.api.types.FlowDirective;
import io.serverlessworkflow.api.types.FlowDirectiveEnum;
import io.serverlessworkflow.api.types.TaskBase;

public class TaskContext<T extends TaskBase> {
Expand All @@ -26,11 +28,13 @@ public class TaskContext<T extends TaskBase> {
private JsonNode input;
private JsonNode output;
private JsonNode rawOutput;
private FlowDirective flowDirective;

public TaskContext(JsonNode rawInput, T task) {
this.rawInput = rawInput;
this.input = rawInput;
this.task = task;
this.flowDirective = task.getThen();
}

public void input(JsonNode input) {
Expand All @@ -54,6 +58,10 @@ public void rawOutput(JsonNode output) {
this.output = output;
}

public JsonNode rawOutput() {
return rawOutput;
}

public void output(JsonNode output) {
this.output = output;
}
Expand All @@ -62,7 +70,13 @@ public JsonNode output() {
return output;
}

public JsonNode rawOutput() {
return rawOutput;
public void flowDirective(FlowDirective flowDirective) {
this.flowDirective = flowDirective;
}

public FlowDirective flowDirective() {
return flowDirective == null
? new FlowDirective().withFlowDirectiveEnum(FlowDirectiveEnum.CONTINUE)
: flowDirective;
}
}
31 changes: 25 additions & 6 deletions impl/src/main/java/io/serverlessworkflow/impl/WorkflowContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,31 @@

public class WorkflowContext {
private final WorkflowPosition position;
private JsonNode context;
private final WorkflowDefinition definition;
private final JsonNode input;
private JsonNode current;
private JsonNode context;

private WorkflowContext(WorkflowPosition position, JsonNode input) {
private WorkflowContext(
WorkflowPosition position, WorkflowDefinition definition, JsonNode input) {
this.position = position;
this.definition = definition;
this.input = input;
this.current = input.deepCopy();
this.context = JsonUtils.mapper().createObjectNode();
}

public static Builder builder(JsonNode input) {
return new Builder(input);
public static Builder builder(WorkflowDefinition definition, JsonNode input) {
return new Builder(definition, input);
}

public static class Builder {
private WorkflowPosition position = new DefaultWorkflowPosition();
private WorkflowDefinition definition;
private JsonNode input;

private Builder(JsonNode input) {
private Builder(WorkflowDefinition definition, JsonNode input) {
this.definition = definition;
this.input = input;
}

Expand All @@ -47,7 +54,7 @@ public Builder position(WorkflowPosition position) {
}

public WorkflowContext build() {
return new WorkflowContext(position, input);
return new WorkflowContext(position, definition, input);
}
}

Expand All @@ -66,4 +73,16 @@ public void context(JsonNode context) {
public JsonNode rawInput() {
return input;
}

public void current(JsonNode output) {
this.current = output;
}

public JsonNode current() {
return current;
}

public WorkflowDefinition definition() {
return definition;
}
}
150 changes: 75 additions & 75 deletions impl/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,10 @@
package io.serverlessworkflow.impl;

import static io.serverlessworkflow.impl.WorkflowUtils.*;
import static io.serverlessworkflow.impl.json.JsonUtils.*;

import com.fasterxml.jackson.databind.JsonNode;
import io.serverlessworkflow.api.types.Input;
import io.serverlessworkflow.api.types.Output;
import io.serverlessworkflow.api.types.TaskBase;
import io.serverlessworkflow.api.types.TaskItem;
import io.serverlessworkflow.api.types.Workflow;
import io.serverlessworkflow.impl.executors.DefaultTaskExecutorFactory;
import io.serverlessworkflow.impl.executors.TaskExecutor;
Expand All @@ -34,52 +31,60 @@
import io.serverlessworkflow.impl.jsonschema.SchemaValidator;
import io.serverlessworkflow.impl.jsonschema.SchemaValidatorFactory;
import io.serverlessworkflow.resources.DefaultResourceLoaderFactory;
import io.serverlessworkflow.resources.ResourceLoader;
import io.serverlessworkflow.resources.ResourceLoaderFactory;
import java.nio.file.Path;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

public class WorkflowDefinition {

private final Workflow workflow;
private final Collection<WorkflowExecutionListener> listeners;
private Optional<SchemaValidator> inputSchemaValidator = Optional.empty();
private Optional<SchemaValidator> outputSchemaValidator = Optional.empty();
private Optional<WorkflowFilter> inputFilter = Optional.empty();
private Optional<WorkflowFilter> outputFilter = Optional.empty();
private final TaskExecutorFactory taskFactory;
private final ExpressionFactory exprFactory;
private final ResourceLoader resourceLoader;
private final SchemaValidatorFactory schemaValidatorFactory;
private final Map<String, TaskExecutor<? extends TaskBase>> taskExecutors =
new ConcurrentHashMap<>();

private WorkflowDefinition(
Workflow workflow,
Collection<WorkflowExecutionListener> listeners,
WorkflowFactories factories) {
TaskExecutorFactory taskFactory,
ResourceLoader resourceLoader,
ExpressionFactory exprFactory,
SchemaValidatorFactory schemaValidatorFactory) {
this.workflow = workflow;
this.listeners = listeners;
this.factories = factories;
this.taskFactory = taskFactory;
this.exprFactory = exprFactory;
this.schemaValidatorFactory = schemaValidatorFactory;
this.resourceLoader = resourceLoader;
if (workflow.getInput() != null) {
Input input = workflow.getInput();
this.inputSchemaValidator =
getSchemaValidator(
factories.getValidatorFactory(), schemaToNode(factories, input.getSchema()));
this.inputFilter = buildWorkflowFilter(factories.getExpressionFactory(), input.getFrom());
schemaValidatorFactory, schemaToNode(resourceLoader, input.getSchema()));
this.inputFilter = buildWorkflowFilter(exprFactory, input.getFrom());
}
if (workflow.getOutput() != null) {
Output output = workflow.getOutput();
this.outputSchemaValidator =
getSchemaValidator(
factories.getValidatorFactory(), schemaToNode(factories, output.getSchema()));
this.outputFilter = buildWorkflowFilter(factories.getExpressionFactory(), output.getAs());
schemaValidatorFactory, schemaToNode(resourceLoader, output.getSchema()));
this.outputFilter = buildWorkflowFilter(exprFactory, output.getAs());
}
}

private final Workflow workflow;
private final Collection<WorkflowExecutionListener> listeners;
private final WorkflowFactories factories;
private Optional<SchemaValidator> inputSchemaValidator = Optional.empty();
private Optional<SchemaValidator> outputSchemaValidator = Optional.empty();
private Optional<WorkflowFilter> inputFilter = Optional.empty();
private Optional<WorkflowFilter> outputFilter = Optional.empty();

private final Map<String, TaskExecutor<? extends TaskBase>> taskExecutors =
new ConcurrentHashMap<>();

public static class Builder {
private final Workflow workflow;
private TaskExecutorFactory taskFactory = DefaultTaskExecutorFactory.get();
Expand Down Expand Up @@ -127,18 +132,15 @@ public Builder withSchemaValidatorFactory(SchemaValidatorFactory factory) {
}

public WorkflowDefinition build() {
WorkflowDefinition def =
new WorkflowDefinition(
workflow,
listeners == null
? Collections.emptySet()
: Collections.unmodifiableCollection(listeners),
new WorkflowFactories(
taskFactory,
resourceLoaderFactory.getResourceLoader(path),
exprFactory,
schemaValidatorFactory));
return def;
return new WorkflowDefinition(
workflow,
listeners == null
? Collections.emptySet()
: Collections.unmodifiableCollection(listeners),
taskFactory,
resourceLoaderFactory.getResourceLoader(path),
exprFactory,
schemaValidatorFactory);
}
}

Expand All @@ -147,7 +149,7 @@ public static Builder builder(Workflow workflow) {
}

public WorkflowInstance execute(Object input) {
return new WorkflowInstance(JsonUtils.fromValue(input));
return new WorkflowInstance(this, JsonUtils.fromValue(input));
}

enum State {
Expand All @@ -156,50 +158,48 @@ enum State {
FINISHED
};

public class WorkflowInstance {

private JsonNode output;
private State state;
private WorkflowContext context;

private WorkflowInstance(JsonNode input) {
this.output = input;
inputSchemaValidator.ifPresent(v -> v.validate(input));
this.context = WorkflowContext.builder(input).build();
inputFilter.ifPresent(f -> output = f.apply(context, Optional.empty(), output));
this.state = State.STARTED;
processDo(workflow.getDo());
outputFilter.ifPresent(f -> output = f.apply(context, Optional.empty(), output));
outputSchemaValidator.ifPresent(v -> v.validate(output));
}
public Optional<SchemaValidator> inputSchemaValidator() {
return inputSchemaValidator;
}

private void processDo(List<TaskItem> tasks) {
context.position().addProperty("do");
int index = 0;
for (TaskItem task : tasks) {
context.position().addIndex(++index).addProperty(task.getName());
listeners.forEach(l -> l.onTaskStarted(context.position(), task.getTask()));
this.output =
taskExecutors
.computeIfAbsent(
context.position().jsonPointer(),
k -> factories.getTaskFactory().getTaskExecutor(task.getTask(), factories))
.apply(context, output);
listeners.forEach(l -> l.onTaskEnded(context.position(), task.getTask()));
context.position().back().back();
}
}
public Optional<WorkflowFilter> inputFilter() {
return inputFilter;
}

public State state() {
return state;
}
public Workflow workflow() {
return workflow;
}

public Object output() {
return toJavaValue(output);
}
public Collection<WorkflowExecutionListener> listeners() {
return listeners;
}

public Object outputAsJsonNode() {
return output;
}
public Map<String, TaskExecutor<? extends TaskBase>> taskExecutors() {
return taskExecutors;
}

public TaskExecutorFactory taskFactory() {
return taskFactory;
}

public Optional<WorkflowFilter> outputFilter() {
return outputFilter;
}

public Optional<SchemaValidator> outputSchemaValidator() {
return outputSchemaValidator;
}

public ExpressionFactory expressionFactory() {
return exprFactory;
}

public SchemaValidatorFactory validatorFactory() {
return schemaValidatorFactory;
}

public ResourceLoader resourceLoader() {

return resourceLoader;
}
}

This file was deleted.

Loading

0 comments on commit 9db247c

Please sign in to comment.