Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallel state #291

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions examples/parallel.asl
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
{
"Comment": "Parallel Example.",
"StartAt": "FunWithMath",
"States": {
"FunWithMath": {
"Type": "Parallel",
"End": true,
"Branches": [
{
"StartAt": "Add",
"States": {
"Add": {
"Type": "Task",
"Resource": "docker://docker.io/agrare/sleep:latest",
"End": true
}
}
},
{
"StartAt": "Subtract",
"States": {
"Subtract": {
"Type": "Task",
"Resource": "docker://docker.io/agrare/sleep:latest",
"End": true
}
}
}
]
}
}
}
2 changes: 2 additions & 0 deletions lib/floe.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
require_relative "floe/workflow"
# mixins used by workflow components
require_relative "floe/workflow/error_matcher_mixin"
require_relative "floe/workflow/branch"
require_relative "floe/workflow/catcher"
require_relative "floe/workflow/choice_rule"
require_relative "floe/workflow/choice_rule/not"
Expand All @@ -29,6 +30,7 @@
require_relative "floe/workflow/retrier"
require_relative "floe/workflow/state"
# mixins used by states
require_relative "floe/workflow/states/child_workflow_mixin"
require_relative "floe/workflow/states/input_output_mixin"
require_relative "floe/workflow/states/non_terminal_mixin"
require_relative "floe/workflow/states/retry_catch_mixin"
Expand Down
8 changes: 8 additions & 0 deletions lib/floe/workflow/branch.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# frozen_string_literal: true

module Floe
class Workflow
class Branch < Floe::WorkflowBase
end
end
end
58 changes: 58 additions & 0 deletions lib/floe/workflow/states/child_workflow_mixin.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# frozen_string_literal: true

module Floe
class Workflow
module States
module ChildWorkflowMixin
def run_nonblock!(context)
start(context) unless context.state_started?

step_nonblock!(context) while running?(context)
return Errno::EAGAIN unless ready?(context)

finish(context) if ended?(context)
end

def finish(context)
if success?(context)
result = each_child_context(context).map(&:output)
context.output = process_output(context, result)
else
error = parse_error(context)
retry_state!(context, error) || catch_error!(context, error) || fail_workflow!(context, error)
end

super
end

def ready?(context)
!context.state_started? || each_child_workflow(context).any? { |wf, ctx| wf.step_nonblock_ready?(ctx) }
end

def wait_until(context)
each_child_workflow(context).filter_map { |wf, ctx| wf.wait_until(ctx) }.min
end

def waiting?(context)
each_child_workflow(context).any? { |wf, ctx| wf.waiting?(ctx) }
end

def running?(context)
!ended?(context)
end

def ended?(context)
each_child_context(context).all?(&:ended?)
end

def success?(context)
each_child_context(context).none?(&:failed?)
end

def each_child_context(context)
context.state[child_context_key].map { |ctx| Context.new(ctx) }
end
end
end
end
end
64 changes: 15 additions & 49 deletions lib/floe/workflow/states/map.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ module Floe
class Workflow
module States
class Map < Floe::Workflow::State
include ChildWorkflowMixin
include InputOutputMixin
include NonTerminalMixin
include RetryCatchMixin
Expand Down Expand Up @@ -53,53 +54,12 @@ def start(context)
context.state["ItemProcessorContext"] = input.map { |item| Context.new({"Execution" => {"Id" => context.execution["Id"]}}, :input => item.to_json).to_h }
end

def finish(context)
if success?(context)
result = each_item_processor(context).map(&:output)
context.output = process_output(context, result)
else
error = parse_error(context)
retry_state!(context, error) || catch_error!(context, error) || fail_workflow!(context, error)
end

super
end

def run_nonblock!(context)
start(context) unless context.state_started?

step_nonblock!(context) while running?(context)
return Errno::EAGAIN unless ready?(context)

finish(context) if ended?(context)
end

def end?
@end
end

def ready?(context)
!context.state_started? || each_item_processor(context).any? { |ctx| item_processor.step_nonblock_ready?(ctx) }
end

def wait_until(context)
each_item_processor(context).filter_map { |ctx| item_processor.wait_until(ctx) }.min
end

def waiting?(context)
each_item_processor(context).any? { |ctx| item_processor.waiting?(ctx) }
end

def running?(context)
!ended?(context)
end

def ended?(context)
each_item_processor(context).all?(&:ended?)
end

def success?(context)
contexts = each_item_processor(context)
contexts = each_child_context(context)
num_failed = contexts.count(&:failed?)
total = contexts.count

Expand All @@ -120,25 +80,27 @@ def success?(context)

private

def each_item_processor(context)
context.state["ItemProcessorContext"].map { |ctx| Context.new(ctx) }
end

def step_nonblock!(context)
each_item_processor(context).each do |ctx|
each_child_context(context).each do |ctx|
# If this iteration isn't already running and we can't start any more
next if !ctx.started? && concurrency_exceeded?(context)

item_processor.run_nonblock(ctx) if item_processor.step_nonblock_ready?(ctx)
end
end

def each_child_workflow(context)
each_child_context(context).map do |ctx|
[item_processor, Context.new(ctx)]
end
end

def concurrency_exceeded?(context)
max_concurrency && num_running(context) >= max_concurrency
end

def num_running(context)
each_item_processor(context).count(&:running?)
each_child_context(context).count(&:running?)
end

def parse_error(context)
Expand All @@ -148,10 +110,14 @@ def parse_error(context)
if tolerated_failure_count || tolerated_failure_percentage
{"Error" => "States.ExceedToleratedFailureThreshold"}
else
each_item_processor(context).detect(&:failed?)&.output || {"Error" => "States.Error"}
each_child_context(context).detect(&:failed?)&.output || {"Error" => "States.Error"}
end
end

def child_context_key
"ItemProcessorContext"
end

def validate_state!(workflow)
validate_state_next!(workflow)
invalid_field_error!("MaxConcurrency", @max_concurrency, "must be greater than 0") if @max_concurrency && @max_concurrency <= 0
Expand Down
67 changes: 65 additions & 2 deletions lib/floe/workflow/states/parallel.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,72 @@ module Floe
class Workflow
module States
class Parallel < Floe::Workflow::State
def initialize(*)
include ChildWorkflowMixin
include InputOutputMixin
include NonTerminalMixin
include RetryCatchMixin

attr_reader :end, :next, :parameters, :input_path, :output_path, :result_path,
:result_selector, :retry, :catch, :branches

def initialize(workflow, name, payload)
super

missing_field_error!("Branches") if payload["Branches"].nil?

@next = payload["Next"]
@end = !!payload["End"]
@parameters = PayloadTemplate.new(payload["Parameters"]) if payload["Parameters"]
@input_path = Path.new(payload.fetch("InputPath", "$"))
@output_path = Path.new(payload.fetch("OutputPath", "$"))
@result_path = ReferencePath.new(payload.fetch("ResultPath", "$"))
@result_selector = PayloadTemplate.new(payload["ResultSelector"]) if payload["ResultSelector"]
@retry = payload["Retry"].to_a.map { |retrier| Retrier.new(retrier) }
@catch = payload["Catch"].to_a.map { |catcher| Catcher.new(catcher) }
@branches = payload["Branches"].map { |branch| Branch.new(branch) }

validate_state!(workflow)
end

def start(context)
super
raise NotImplementedError

input = process_input(context)

context.state["BranchContext"] = branches.map { |_branch| Context.new({"Execution" => {"Id" => context.execution["Id"]}}, :input => input.to_json).to_h }
end

def end?
@end
end

private

def step_nonblock!(context)
each_child_workflow(context).each do |wf, ctx|
wf.run_nonblock(ctx) if wf.step_nonblock_ready?(ctx)
end
end

def each_child_workflow(context)
branches.filter_map.with_index do |branch, i|
ctx = context.state.dig("BranchContext", i)
next if ctx.nil?

[branch, Context.new(ctx)]
end
end

def parse_error(context)
each_child_context(context).detect(&:failed?)&.output || {"Error" => "States.Error"}
end

def child_context_key
"BranchContext"
end

def validate_state!(workflow)
validate_state_next!(workflow)
end
end
end
Expand Down
25 changes: 25 additions & 0 deletions spec/shared/workflow_base_shared.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
shared_examples_for "WorkflowBase" do
it "raises an exception for missing States field" do
payload = {"StartAt" => "Missing"}
expect { described_class.new(payload, ["Map"]) }
.to raise_error(Floe::InvalidWorkflowError, "Map does not have required field \"States\"")
end

it "raises an exception for missing StartAt field" do
payload = {"States" => {"First" => {"Type" => "Succeed"}}}
expect { described_class.new(payload, ["Map"]) }
.to raise_error(Floe::InvalidWorkflowError, "Map does not have required field \"StartAt\"")
end

it "raises an exception if StartAt isn't in States" do
payload = {"StartAt" => "First", "States" => {"Second" => {"Type" => "Succeed"}}}
expect { described_class.new(payload, ["Map"]) }
.to raise_error(Floe::InvalidWorkflowError, "Map field \"StartAt\" value \"First\" is not found in \"States\"")
end

it "raises an exception if a Next state isn't in States" do
payload = {"StartAt" => "First", "States" => {"First" => {"Type" => "Pass", "Next" => "Last"}}}
expect { described_class.new(payload, ["Map"]) }
.to raise_error(Floe::InvalidWorkflowError, "States.First field \"Next\" value \"Last\" is not found in \"States\"")
end
end
1 change: 1 addition & 0 deletions spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
end

Dir['./spec/support/**/*.rb'].sort.each { |f| require f }
Dir['./spec/shared/**/*.rb'].sort.each { |f| require f }

# See https://rubydoc.info/gems/rspec-core/RSpec/Core/Configuration
RSpec.configure do |config|
Expand Down
3 changes: 3 additions & 0 deletions spec/workflow/branch_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
RSpec.describe Floe::Workflow::Branch do
include_examples "WorkflowBase"
end
24 changes: 1 addition & 23 deletions spec/workflow/item_processor_spec.rb
Original file line number Diff line number Diff line change
@@ -1,25 +1,3 @@
RSpec.describe Floe::Workflow::ItemProcessor do
it "raises an exception for missing States field" do
payload = {"StartAt" => "Missing"}
expect { described_class.new(payload, ["Map"]) }
.to raise_error(Floe::InvalidWorkflowError, "Map does not have required field \"States\"")
end

it "raises an exception for missing StartAt field" do
payload = {"States" => {"First" => {"Type" => "Succeed"}}}
expect { described_class.new(payload, ["Map"]) }
.to raise_error(Floe::InvalidWorkflowError, "Map does not have required field \"StartAt\"")
end

it "raises an exception if StartAt isn't in States" do
payload = {"StartAt" => "First", "States" => {"Second" => {"Type" => "Succeed"}}}
expect { described_class.new(payload, ["Map"]) }
.to raise_error(Floe::InvalidWorkflowError, "Map field \"StartAt\" value \"First\" is not found in \"States\"")
end

it "raises an exception if a Next state isn't in States" do
payload = {"StartAt" => "First", "States" => {"First" => {"Type" => "Pass", "Next" => "Last"}}}
expect { described_class.new(payload, ["Map"]) }
.to raise_error(Floe::InvalidWorkflowError, "States.First field \"Next\" value \"Last\" is not found in \"States\"")
end
include_examples "WorkflowBase"
end
Loading