From 6a914ee4d5e5471760e4106919db521fa6df3d4a Mon Sep 17 00:00:00 2001 From: Adam Grare Date: Wed, 23 Oct 2024 11:14:24 -0400 Subject: [PATCH 1/3] Add shared workflow_base specs --- spec/shared/workflow_base_shared.rb | 25 +++++++++++++++++++++++++ spec/spec_helper.rb | 1 + spec/workflow/item_processor_spec.rb | 24 +----------------------- 3 files changed, 27 insertions(+), 23 deletions(-) create mode 100644 spec/shared/workflow_base_shared.rb diff --git a/spec/shared/workflow_base_shared.rb b/spec/shared/workflow_base_shared.rb new file mode 100644 index 00000000..53e33b42 --- /dev/null +++ b/spec/shared/workflow_base_shared.rb @@ -0,0 +1,25 @@ +shared_examples_for "WorkflowBase" do + it "raises an exception for missing States field" do + payload = {"StartAt" => "Missing"} + expect { described_class.new(payload, ["Map"]) } + .to raise_error(Floe::InvalidWorkflowError, "Map does not have required field \"States\"") + end + + it "raises an exception for missing StartAt field" do + payload = {"States" => {"First" => {"Type" => "Succeed"}}} + expect { described_class.new(payload, ["Map"]) } + .to raise_error(Floe::InvalidWorkflowError, "Map does not have required field \"StartAt\"") + end + + it "raises an exception if StartAt isn't in States" do + payload = {"StartAt" => "First", "States" => {"Second" => {"Type" => "Succeed"}}} + expect { described_class.new(payload, ["Map"]) } + .to raise_error(Floe::InvalidWorkflowError, "Map field \"StartAt\" value \"First\" is not found in \"States\"") + end + + it "raises an exception if a Next state isn't in States" do + payload = {"StartAt" => "First", "States" => {"First" => {"Type" => "Pass", "Next" => "Last"}}} + expect { described_class.new(payload, ["Map"]) } + .to raise_error(Floe::InvalidWorkflowError, "States.First field \"Next\" value \"Last\" is not found in \"States\"") + end +end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index e0044d97..5146a7b0 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -19,6 +19,7 @@ end Dir['./spec/support/**/*.rb'].sort.each { |f| require f } +Dir['./spec/shared/**/*.rb'].sort.each { |f| require f } # See https://rubydoc.info/gems/rspec-core/RSpec/Core/Configuration RSpec.configure do |config| diff --git a/spec/workflow/item_processor_spec.rb b/spec/workflow/item_processor_spec.rb index 0dd52dbc..c00c8610 100644 --- a/spec/workflow/item_processor_spec.rb +++ b/spec/workflow/item_processor_spec.rb @@ -1,25 +1,3 @@ RSpec.describe Floe::Workflow::ItemProcessor do - it "raises an exception for missing States field" do - payload = {"StartAt" => "Missing"} - expect { described_class.new(payload, ["Map"]) } - .to raise_error(Floe::InvalidWorkflowError, "Map does not have required field \"States\"") - end - - it "raises an exception for missing StartAt field" do - payload = {"States" => {"First" => {"Type" => "Succeed"}}} - expect { described_class.new(payload, ["Map"]) } - .to raise_error(Floe::InvalidWorkflowError, "Map does not have required field \"StartAt\"") - end - - it "raises an exception if StartAt isn't in States" do - payload = {"StartAt" => "First", "States" => {"Second" => {"Type" => "Succeed"}}} - expect { described_class.new(payload, ["Map"]) } - .to raise_error(Floe::InvalidWorkflowError, "Map field \"StartAt\" value \"First\" is not found in \"States\"") - end - - it "raises an exception if a Next state isn't in States" do - payload = {"StartAt" => "First", "States" => {"First" => {"Type" => "Pass", "Next" => "Last"}}} - expect { described_class.new(payload, ["Map"]) } - .to raise_error(Floe::InvalidWorkflowError, "States.First field \"Next\" value \"Last\" is not found in \"States\"") - end + include_examples "WorkflowBase" end From 7d1de74b43ed6a31eef2ef6904bb52fc3861460f Mon Sep 17 00:00:00 2001 From: Adam Grare Date: Tue, 8 Oct 2024 15:07:11 -0400 Subject: [PATCH 2/3] Parallel State --- examples/parallel.asl | 32 +++ lib/floe.rb | 1 + lib/floe/workflow/branch.rb | 8 + lib/floe/workflow/states/parallel.rb | 111 ++++++++- spec/workflow/branch_spec.rb | 3 + spec/workflow/states/parallel_spec.rb | 346 ++++++++++++++++++++++++++ 6 files changed, 499 insertions(+), 2 deletions(-) create mode 100644 examples/parallel.asl create mode 100644 lib/floe/workflow/branch.rb create mode 100644 spec/workflow/branch_spec.rb create mode 100644 spec/workflow/states/parallel_spec.rb diff --git a/examples/parallel.asl b/examples/parallel.asl new file mode 100644 index 00000000..f04444d9 --- /dev/null +++ b/examples/parallel.asl @@ -0,0 +1,32 @@ +{ + "Comment": "Parallel Example.", + "StartAt": "FunWithMath", + "States": { + "FunWithMath": { + "Type": "Parallel", + "End": true, + "Branches": [ + { + "StartAt": "Add", + "States": { + "Add": { + "Type": "Task", + "Resource": "docker://docker.io/agrare/sleep:latest", + "End": true + } + } + }, + { + "StartAt": "Subtract", + "States": { + "Subtract": { + "Type": "Task", + "Resource": "docker://docker.io/agrare/sleep:latest", + "End": true + } + } + } + ] + } + } +} diff --git a/lib/floe.rb b/lib/floe.rb index 007ddb83..5aa1d789 100644 --- a/lib/floe.rb +++ b/lib/floe.rb @@ -12,6 +12,7 @@ require_relative "floe/workflow" # mixins used by workflow components require_relative "floe/workflow/error_matcher_mixin" +require_relative "floe/workflow/branch" require_relative "floe/workflow/catcher" require_relative "floe/workflow/choice_rule" require_relative "floe/workflow/choice_rule/not" diff --git a/lib/floe/workflow/branch.rb b/lib/floe/workflow/branch.rb new file mode 100644 index 00000000..e9b66d2e --- /dev/null +++ b/lib/floe/workflow/branch.rb @@ -0,0 +1,8 @@ +# frozen_string_literal: true + +module Floe + class Workflow + class Branch < Floe::WorkflowBase + end + end +end diff --git a/lib/floe/workflow/states/parallel.rb b/lib/floe/workflow/states/parallel.rb index 997f4e78..777905ba 100644 --- a/lib/floe/workflow/states/parallel.rb +++ b/lib/floe/workflow/states/parallel.rb @@ -4,9 +4,116 @@ module Floe class Workflow module States class Parallel < Floe::Workflow::State - def initialize(*) + include InputOutputMixin + include NonTerminalMixin + include RetryCatchMixin + + attr_reader :end, :next, :parameters, :input_path, :output_path, :result_path, + :result_selector, :retry, :catch, :branches + + def initialize(workflow, name, payload) + super + + missing_field_error!("Branches") if payload["Branches"].nil? + + @next = payload["Next"] + @end = !!payload["End"] + @parameters = PayloadTemplate.new(payload["Parameters"]) if payload["Parameters"] + @input_path = Path.new(payload.fetch("InputPath", "$")) + @output_path = Path.new(payload.fetch("OutputPath", "$")) + @result_path = ReferencePath.new(payload.fetch("ResultPath", "$")) + @result_selector = PayloadTemplate.new(payload["ResultSelector"]) if payload["ResultSelector"] + @retry = payload["Retry"].to_a.map { |retrier| Retrier.new(retrier) } + @catch = payload["Catch"].to_a.map { |catcher| Catcher.new(catcher) } + @branches = payload["Branches"].map { |branch| Branch.new(branch) } + + validate_state!(workflow) + end + + def start(context) + super + + input = process_input(context) + + context.state["BranchContext"] = branches.map { |_branch| Context.new({"Execution" => {"Id" => context.execution["Id"]}}, :input => input.to_json).to_h } + end + + def finish(context) + if success?(context) + result = each_branch_context(context).map(&:output) + context.output = process_output(context, result) + else + error = parse_error(context) + retry_state!(context, error) || catch_error!(context, error) || fail_workflow!(context, error) + end + super - raise NotImplementedError + end + + def run_nonblock!(context) + start(context) unless context.state_started? + + step_nonblock!(context) while running?(context) + return Errno::EAGAIN unless ready?(context) + + finish(context) if ended?(context) + end + + def end? + @end + end + + def ready?(context) + !context.state_started? || each_branch(context).any? { |branch, ctx| branch.step_nonblock_ready?(ctx) } + end + + def wait_until(context) + each_branch(context).filter_map { |branch, ctx| branch.wait_until(ctx) }.min + end + + def waiting?(context) + each_branch(context).any? { |branch, ctx| branch.waiting?(ctx) } + end + + def running?(context) + !ended?(context) + end + + def ended?(context) + each_branch_context(context).all?(&:ended?) + end + + def success?(context) + each_branch_context(context).none?(&:failed?) + end + + private + + def step_nonblock!(context) + each_branch(context).each do |branch, ctx| + branch.run_nonblock(ctx) if branch.step_nonblock_ready?(ctx) + end + end + + def each_branch(context) + branches.filter_map.with_index do |branch, i| + ctx = context.state.dig("BranchContext", i) + next if ctx.nil? + + [branch, Context.new(ctx)] + end + end + + def each_branch_context(context) + context.state["BranchContext"].map { |ctx| Context.new(ctx) } + end + + def parse_error(context) + each_branch_context(context).detect(&:failed?)&.output || {"Error" => "States.Error"} + end + + def validate_state!(workflow) + validate_state_next!(workflow) end end end diff --git a/spec/workflow/branch_spec.rb b/spec/workflow/branch_spec.rb new file mode 100644 index 00000000..8751462d --- /dev/null +++ b/spec/workflow/branch_spec.rb @@ -0,0 +1,3 @@ +RSpec.describe Floe::Workflow::Branch do + include_examples "WorkflowBase" +end diff --git a/spec/workflow/states/parallel_spec.rb b/spec/workflow/states/parallel_spec.rb new file mode 100644 index 00000000..da864782 --- /dev/null +++ b/spec/workflow/states/parallel_spec.rb @@ -0,0 +1,346 @@ +RSpec.describe Floe::Workflow::States::Parallel do + let(:input) { [3, 2] } + let(:ctx) { Floe::Workflow::Context.new(:input => input.to_json) } + let(:state) { workflow.start_workflow.current_state } + let(:workflow) do + payload = { + "FunWithMath" => { + "Type" => "Parallel", + "Branches" => [ + { + "StartAt" => "Add", + "States" => { + "Add" => { + "Type" => "Pass", + "Parameters" => {"result.$" => "States.MathAdd($.[0], $.[1])"}, + "OutputPath" => "$.result", + "End" => true + } + } + }, + { + "StartAt" => "ArrayLength", + "States" => { + "ArrayLength" => { + "Type" => "Pass", + "Parameters" => {"result.$" => "States.ArrayLength($.)"}, + "OutputPath" => "$.result", + "End" => true + } + } + } + ], + "Next" => "NextState" + }, + "NextState" => { + "Type" => "Succeed" + } + } + + make_workflow(ctx, payload) + end + + describe "#initialize" do + it "builds the Parallel state object" do + expect { workflow }.not_to raise_error + end + + it "raises an InvalidWorkflowError with a missing Branches parameter" do + payload = { + "FunWithMath" => { + "Type" => "Parallel", + "End" => true + } + } + + expect { make_workflow(ctx, payload) } + .to raise_error(Floe::InvalidWorkflowError, "States.FunWithMath does not have required field \"Branches\"") + end + + it "raises an InvalidWorkflowError with a missing Next and End" do + payload = { + "FunWithMath" => { + "Type" => "Parallel", + "Branches" => [ + { + "StartAt" => "Add", + "States" => { + "Add" => { + "Type" => "Pass", + "End" => true + } + } + } + ] + } + } + + expect { make_workflow(ctx, payload) } + .to raise_error(Floe::InvalidWorkflowError, "States.FunWithMath does not have required field \"Next\"") + end + + it "raises an InvalidWorkflowError if a state in a Branch attempts to transition to a state in the outer workflow" do + payload = { + "FunWithMath" => { + "Type" => "Parallel", + "Branches" => [ + { + "StartAt" => "Add", + "States" => { + "Add" => { + "Type" => "Pass", + "Next" => "PassState" + } + } + } + ], + "Next" => "PassState" + }, + "PassState" => { + "Type" => "Pass", + "Next" => "SucceedState" + }, + "SucceedState" => { + "Type" => "Succeed" + } + } + + expect { make_workflow(ctx, payload) } + .to raise_error(Floe::InvalidWorkflowError, "States.Add field \"Next\" value \"PassState\" is not found in \"States\"") + end + + it "raises an InvalidWorkflowError if a state in a Branch attemps to transition to a state in another branch" do + payload = { + "FunWithMath" => { + "Type" => "Parallel", + "Branches" => [ + { + "StartAt" => "Add", + "States" => { + "Add" => { + "Type" => "Pass", + "Next" => "Subtract" + } + } + }, + { + "StartAt" => "Subtract", + "States" => { + "Add" => { + "Type" => "Pass", + "End" => true + } + } + } + ], + "Next" => "PassState" + }, + "PassState" => { + "Type" => "Pass", + "Next" => "SucceedState" + }, + "SucceedState" => { + "Type" => "Succeed" + } + } + + expect { make_workflow(ctx, payload) } + .to raise_error(Floe::InvalidWorkflowError, "States.Add field \"Next\" value \"Subtract\" is not found in \"States\"") + end + end + + it "#end?" do + expect(state.end?).to be false + end + + describe "#start" do + it "initializes branch context for each branch" do + state.start(ctx) + expect(ctx.state["BranchContext"].length).to eq(2) + end + + it "copies the execution id into the branch contexts" do + state.start(ctx) + + expect(ctx.state.dig("BranchContext", 0, "Execution", "Id")).to eq(ctx.dig("Execution", "Id")) + expect(ctx.state.dig("BranchContext", 1, "Execution", "Id")).to eq(ctx.dig("Execution", "Id")) + end + + it "copies the state input into the branch contexts" do + state.start(ctx) + + expect(ctx.state.dig("BranchContext", 0, "Execution", "Input")).to eq(ctx.dig("State", "Input")) + expect(ctx.state.dig("BranchContext", 1, "Execution", "Input")).to eq(ctx.dig("State", "Input")) + end + end + + describe "#finish" do + before { state.start(ctx) } + + context "with all successful branches" do + before do + ctx.state["BranchContext"][0]["State"] = {"Output" => {"foo" => "bar"}} + ctx.state["BranchContext"][1]["State"] = {"Output" => 2} + end + + it "sets the state output to an array of the branch output" do + state.finish(ctx) + + expect(ctx.failed?).to be_falsey + expect(ctx.output).to eq([{"foo" => "bar"}, 2]) + end + end + + context "with one failed branch" do + before do + ctx.state["BranchContext"][0]["State"] = {"Output" => {"foo" => "bar"}} + ctx.state["BranchContext"][1]["State"] = {"Output" => {"Error" => "States.TaskFailed"}} + end + + it "sets the state as failed" do + state.finish(ctx) + + expect(ctx.failed?).to be_truthy + expect(ctx.output).to eq("Error" => "States.TaskFailed") + end + end + end + + describe "#run_nonblock!" do + it "sets next to NextState" do + loop while state.run_nonblock!(ctx) != 0 + expect(ctx.next_state).to eq("NextState") + end + + it "sets the context output to array of branch outputs" do + loop while state.run_nonblock!(ctx) != 0 + expect(ctx.output).to eq([5, 2]) + end + end + + describe "#ready?" do + context "before the state has started" do + it "returns truthy" do + expect(state.ready?(ctx)).to be_truthy + end + end + + context "with the state started" do + before { state.start(ctx) } + + context "before the branches have started" do + it "returns truthy" do + expect(state.ready?(ctx)).to be_truthy + end + end + end + end + + describe "#running?" do + before { state.start(ctx) } + + context "with one branch not ended" do + before { ctx.state["BranchContext"][0]["Execution"]["EndTime"] = Time.now.utc } + + it "returns truthy" do + expect(state.running?(ctx)).to be_truthy + end + end + + context "with all branches ended" do + before { ctx.state["BranchContext"].each { |ctx| ctx["Execution"]["EndTime"] = Time.now.utc } } + + it "returns false" do + expect(state.running?(ctx)).to be_falsey + end + end + + context "with some branches not ended" do + before { ctx.state["BranchContext"][0]["Execution"]["EndTime"] = Time.now.utc } + + it "returns true" do + expect(state.running?(ctx)).to be_truthy + end + end + end + + describe "#waiting?" do + context "with no branches waiting" do + it "returns falsey" do + expect(state.waiting?(ctx)).to be_falsey + end + end + + context "with one branch waiting" do + before do + state.start(ctx) + ctx.state["BranchContext"][0]["State"]["Name"] = "Add" + ctx.state["BranchContext"][0]["State"]["WaitUntil"] = (Time.now + 3_600).iso8601 + end + + it "returns truthy" do + expect(state.waiting?(ctx)).to be_truthy + end + end + + context "with one branch done waiting" do + before do + state.start(ctx) + ctx.state["BranchContext"][0]["State"]["Name"] = "Add" + ctx.state["BranchContext"][0]["State"]["WaitUntil"] = (Time.now - 3_600).iso8601 + end + + it "returns falsey" do + expect(state.waiting?(ctx)).to be_falsey + end + end + end + + describe "#wait_until" do + context "with no branches waiting" do + it "returns falsey" do + expect(state.wait_until(ctx)).to be_falsey + end + end + + context "with one branch waiting" do + let(:wait_until) { (Time.now + 3_600).iso8601 } + + before do + state.start(ctx) + ctx.state["BranchContext"][0]["State"]["Name"] = "Add" + ctx.state["BranchContext"][0]["State"]["WaitUntil"] = wait_until + end + + it "returns the time to wait until" do + expect(state.wait_until(ctx)).to eq(Time.parse(wait_until)) + end + end + end + + describe "#success?" do + before { state.start(ctx) } + + context "with all successful branches" do + before do + ctx.state["BranchContext"][0]["State"] = {"Output" => {"foo" => "bar"}} + ctx.state["BranchContext"][1]["State"] = {"Output" => 2} + end + + it "returns truthy" do + expect(state.success?(ctx)).to be_truthy + end + end + + context "with one failed branch" do + before do + ctx.state["BranchContext"][0]["State"] = {"Output" => {"foo" => "bar"}} + ctx.state["BranchContext"][1]["State"] = {"Output" => {"Error" => "States.TaskFailed"}} + end + + it "returns falsey" do + expect(state.success?(ctx)).to be_falsey + end + end + end +end From ecce6cfb4c00e6b9696f2e18041888c9c8f97566 Mon Sep 17 00:00:00 2001 From: Adam Grare Date: Fri, 25 Oct 2024 13:14:55 -0400 Subject: [PATCH 3/3] Add ChildWorkflowMixin --- lib/floe.rb | 1 + .../workflow/states/child_workflow_mixin.rb | 58 +++++++++++++++++ lib/floe/workflow/states/map.rb | 64 +++++-------------- lib/floe/workflow/states/parallel.rb | 60 +++-------------- 4 files changed, 82 insertions(+), 101 deletions(-) create mode 100644 lib/floe/workflow/states/child_workflow_mixin.rb diff --git a/lib/floe.rb b/lib/floe.rb index 5aa1d789..6cfe559e 100644 --- a/lib/floe.rb +++ b/lib/floe.rb @@ -30,6 +30,7 @@ require_relative "floe/workflow/retrier" require_relative "floe/workflow/state" # mixins used by states +require_relative "floe/workflow/states/child_workflow_mixin" require_relative "floe/workflow/states/input_output_mixin" require_relative "floe/workflow/states/non_terminal_mixin" require_relative "floe/workflow/states/retry_catch_mixin" diff --git a/lib/floe/workflow/states/child_workflow_mixin.rb b/lib/floe/workflow/states/child_workflow_mixin.rb new file mode 100644 index 00000000..7d1fd3cb --- /dev/null +++ b/lib/floe/workflow/states/child_workflow_mixin.rb @@ -0,0 +1,58 @@ +# frozen_string_literal: true + +module Floe + class Workflow + module States + module ChildWorkflowMixin + def run_nonblock!(context) + start(context) unless context.state_started? + + step_nonblock!(context) while running?(context) + return Errno::EAGAIN unless ready?(context) + + finish(context) if ended?(context) + end + + def finish(context) + if success?(context) + result = each_child_context(context).map(&:output) + context.output = process_output(context, result) + else + error = parse_error(context) + retry_state!(context, error) || catch_error!(context, error) || fail_workflow!(context, error) + end + + super + end + + def ready?(context) + !context.state_started? || each_child_workflow(context).any? { |wf, ctx| wf.step_nonblock_ready?(ctx) } + end + + def wait_until(context) + each_child_workflow(context).filter_map { |wf, ctx| wf.wait_until(ctx) }.min + end + + def waiting?(context) + each_child_workflow(context).any? { |wf, ctx| wf.waiting?(ctx) } + end + + def running?(context) + !ended?(context) + end + + def ended?(context) + each_child_context(context).all?(&:ended?) + end + + def success?(context) + each_child_context(context).none?(&:failed?) + end + + def each_child_context(context) + context.state[child_context_key].map { |ctx| Context.new(ctx) } + end + end + end + end +end diff --git a/lib/floe/workflow/states/map.rb b/lib/floe/workflow/states/map.rb index 65d62fdb..e1af12cd 100644 --- a/lib/floe/workflow/states/map.rb +++ b/lib/floe/workflow/states/map.rb @@ -4,6 +4,7 @@ module Floe class Workflow module States class Map < Floe::Workflow::State + include ChildWorkflowMixin include InputOutputMixin include NonTerminalMixin include RetryCatchMixin @@ -53,53 +54,12 @@ def start(context) context.state["ItemProcessorContext"] = input.map { |item| Context.new({"Execution" => {"Id" => context.execution["Id"]}}, :input => item.to_json).to_h } end - def finish(context) - if success?(context) - result = each_item_processor(context).map(&:output) - context.output = process_output(context, result) - else - error = parse_error(context) - retry_state!(context, error) || catch_error!(context, error) || fail_workflow!(context, error) - end - - super - end - - def run_nonblock!(context) - start(context) unless context.state_started? - - step_nonblock!(context) while running?(context) - return Errno::EAGAIN unless ready?(context) - - finish(context) if ended?(context) - end - def end? @end end - def ready?(context) - !context.state_started? || each_item_processor(context).any? { |ctx| item_processor.step_nonblock_ready?(ctx) } - end - - def wait_until(context) - each_item_processor(context).filter_map { |ctx| item_processor.wait_until(ctx) }.min - end - - def waiting?(context) - each_item_processor(context).any? { |ctx| item_processor.waiting?(ctx) } - end - - def running?(context) - !ended?(context) - end - - def ended?(context) - each_item_processor(context).all?(&:ended?) - end - def success?(context) - contexts = each_item_processor(context) + contexts = each_child_context(context) num_failed = contexts.count(&:failed?) total = contexts.count @@ -120,12 +80,8 @@ def success?(context) private - def each_item_processor(context) - context.state["ItemProcessorContext"].map { |ctx| Context.new(ctx) } - end - def step_nonblock!(context) - each_item_processor(context).each do |ctx| + each_child_context(context).each do |ctx| # If this iteration isn't already running and we can't start any more next if !ctx.started? && concurrency_exceeded?(context) @@ -133,12 +89,18 @@ def step_nonblock!(context) end end + def each_child_workflow(context) + each_child_context(context).map do |ctx| + [item_processor, Context.new(ctx)] + end + end + def concurrency_exceeded?(context) max_concurrency && num_running(context) >= max_concurrency end def num_running(context) - each_item_processor(context).count(&:running?) + each_child_context(context).count(&:running?) end def parse_error(context) @@ -148,10 +110,14 @@ def parse_error(context) if tolerated_failure_count || tolerated_failure_percentage {"Error" => "States.ExceedToleratedFailureThreshold"} else - each_item_processor(context).detect(&:failed?)&.output || {"Error" => "States.Error"} + each_child_context(context).detect(&:failed?)&.output || {"Error" => "States.Error"} end end + def child_context_key + "ItemProcessorContext" + end + def validate_state!(workflow) validate_state_next!(workflow) invalid_field_error!("MaxConcurrency", @max_concurrency, "must be greater than 0") if @max_concurrency && @max_concurrency <= 0 diff --git a/lib/floe/workflow/states/parallel.rb b/lib/floe/workflow/states/parallel.rb index 777905ba..29818fb1 100644 --- a/lib/floe/workflow/states/parallel.rb +++ b/lib/floe/workflow/states/parallel.rb @@ -4,6 +4,7 @@ module Floe class Workflow module States class Parallel < Floe::Workflow::State + include ChildWorkflowMixin include InputOutputMixin include NonTerminalMixin include RetryCatchMixin @@ -38,64 +39,19 @@ def start(context) context.state["BranchContext"] = branches.map { |_branch| Context.new({"Execution" => {"Id" => context.execution["Id"]}}, :input => input.to_json).to_h } end - def finish(context) - if success?(context) - result = each_branch_context(context).map(&:output) - context.output = process_output(context, result) - else - error = parse_error(context) - retry_state!(context, error) || catch_error!(context, error) || fail_workflow!(context, error) - end - - super - end - - def run_nonblock!(context) - start(context) unless context.state_started? - - step_nonblock!(context) while running?(context) - return Errno::EAGAIN unless ready?(context) - - finish(context) if ended?(context) - end - def end? @end end - def ready?(context) - !context.state_started? || each_branch(context).any? { |branch, ctx| branch.step_nonblock_ready?(ctx) } - end - - def wait_until(context) - each_branch(context).filter_map { |branch, ctx| branch.wait_until(ctx) }.min - end - - def waiting?(context) - each_branch(context).any? { |branch, ctx| branch.waiting?(ctx) } - end - - def running?(context) - !ended?(context) - end - - def ended?(context) - each_branch_context(context).all?(&:ended?) - end - - def success?(context) - each_branch_context(context).none?(&:failed?) - end - private def step_nonblock!(context) - each_branch(context).each do |branch, ctx| - branch.run_nonblock(ctx) if branch.step_nonblock_ready?(ctx) + each_child_workflow(context).each do |wf, ctx| + wf.run_nonblock(ctx) if wf.step_nonblock_ready?(ctx) end end - def each_branch(context) + def each_child_workflow(context) branches.filter_map.with_index do |branch, i| ctx = context.state.dig("BranchContext", i) next if ctx.nil? @@ -104,12 +60,12 @@ def each_branch(context) end end - def each_branch_context(context) - context.state["BranchContext"].map { |ctx| Context.new(ctx) } + def parse_error(context) + each_child_context(context).detect(&:failed?)&.output || {"Error" => "States.Error"} end - def parse_error(context) - each_branch_context(context).detect(&:failed?)&.output || {"Error" => "States.Error"} + def child_context_key + "BranchContext" end def validate_state!(workflow)