Skip to content

Commit

Permalink
Add ChildWorkflowMixin
Browse files Browse the repository at this point in the history
  • Loading branch information
agrare committed Oct 25, 2024
1 parent 7d1de74 commit ecce6cf
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 101 deletions.
1 change: 1 addition & 0 deletions lib/floe.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
require_relative "floe/workflow/retrier"
require_relative "floe/workflow/state"
# mixins used by states
require_relative "floe/workflow/states/child_workflow_mixin"
require_relative "floe/workflow/states/input_output_mixin"
require_relative "floe/workflow/states/non_terminal_mixin"
require_relative "floe/workflow/states/retry_catch_mixin"
Expand Down
58 changes: 58 additions & 0 deletions lib/floe/workflow/states/child_workflow_mixin.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# frozen_string_literal: true

module Floe
class Workflow
module States
module ChildWorkflowMixin
def run_nonblock!(context)
start(context) unless context.state_started?

step_nonblock!(context) while running?(context)
return Errno::EAGAIN unless ready?(context)

finish(context) if ended?(context)
end

def finish(context)
if success?(context)
result = each_child_context(context).map(&:output)
context.output = process_output(context, result)
else
error = parse_error(context)
retry_state!(context, error) || catch_error!(context, error) || fail_workflow!(context, error)
end

super
end

def ready?(context)
!context.state_started? || each_child_workflow(context).any? { |wf, ctx| wf.step_nonblock_ready?(ctx) }
end

def wait_until(context)
each_child_workflow(context).filter_map { |wf, ctx| wf.wait_until(ctx) }.min
end

def waiting?(context)
each_child_workflow(context).any? { |wf, ctx| wf.waiting?(ctx) }
end

def running?(context)
!ended?(context)
end

def ended?(context)
each_child_context(context).all?(&:ended?)
end

def success?(context)
each_child_context(context).none?(&:failed?)
end

def each_child_context(context)
context.state[child_context_key].map { |ctx| Context.new(ctx) }
end
end
end
end
end
64 changes: 15 additions & 49 deletions lib/floe/workflow/states/map.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ module Floe
class Workflow
module States
class Map < Floe::Workflow::State
include ChildWorkflowMixin
include InputOutputMixin
include NonTerminalMixin
include RetryCatchMixin
Expand Down Expand Up @@ -53,53 +54,12 @@ def start(context)
context.state["ItemProcessorContext"] = input.map { |item| Context.new({"Execution" => {"Id" => context.execution["Id"]}}, :input => item.to_json).to_h }
end

def finish(context)
if success?(context)
result = each_item_processor(context).map(&:output)
context.output = process_output(context, result)
else
error = parse_error(context)
retry_state!(context, error) || catch_error!(context, error) || fail_workflow!(context, error)
end

super
end

def run_nonblock!(context)
start(context) unless context.state_started?

step_nonblock!(context) while running?(context)
return Errno::EAGAIN unless ready?(context)

finish(context) if ended?(context)
end

def end?
@end
end

def ready?(context)
!context.state_started? || each_item_processor(context).any? { |ctx| item_processor.step_nonblock_ready?(ctx) }
end

def wait_until(context)
each_item_processor(context).filter_map { |ctx| item_processor.wait_until(ctx) }.min
end

def waiting?(context)
each_item_processor(context).any? { |ctx| item_processor.waiting?(ctx) }
end

def running?(context)
!ended?(context)
end

def ended?(context)
each_item_processor(context).all?(&:ended?)
end

def success?(context)
contexts = each_item_processor(context)
contexts = each_child_context(context)
num_failed = contexts.count(&:failed?)
total = contexts.count

Expand All @@ -120,25 +80,27 @@ def success?(context)

private

def each_item_processor(context)
context.state["ItemProcessorContext"].map { |ctx| Context.new(ctx) }
end

def step_nonblock!(context)
each_item_processor(context).each do |ctx|
each_child_context(context).each do |ctx|
# If this iteration isn't already running and we can't start any more
next if !ctx.started? && concurrency_exceeded?(context)

item_processor.run_nonblock(ctx) if item_processor.step_nonblock_ready?(ctx)
end
end

def each_child_workflow(context)
each_child_context(context).map do |ctx|
[item_processor, Context.new(ctx)]
end
end

def concurrency_exceeded?(context)
max_concurrency && num_running(context) >= max_concurrency
end

def num_running(context)
each_item_processor(context).count(&:running?)
each_child_context(context).count(&:running?)
end

def parse_error(context)
Expand All @@ -148,10 +110,14 @@ def parse_error(context)
if tolerated_failure_count || tolerated_failure_percentage
{"Error" => "States.ExceedToleratedFailureThreshold"}
else
each_item_processor(context).detect(&:failed?)&.output || {"Error" => "States.Error"}
each_child_context(context).detect(&:failed?)&.output || {"Error" => "States.Error"}
end
end

def child_context_key
"ItemProcessorContext"
end

def validate_state!(workflow)
validate_state_next!(workflow)
invalid_field_error!("MaxConcurrency", @max_concurrency, "must be greater than 0") if @max_concurrency && @max_concurrency <= 0
Expand Down
60 changes: 8 additions & 52 deletions lib/floe/workflow/states/parallel.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ module Floe
class Workflow
module States
class Parallel < Floe::Workflow::State
include ChildWorkflowMixin
include InputOutputMixin
include NonTerminalMixin
include RetryCatchMixin
Expand Down Expand Up @@ -38,64 +39,19 @@ def start(context)
context.state["BranchContext"] = branches.map { |_branch| Context.new({"Execution" => {"Id" => context.execution["Id"]}}, :input => input.to_json).to_h }
end

def finish(context)
if success?(context)
result = each_branch_context(context).map(&:output)
context.output = process_output(context, result)
else
error = parse_error(context)
retry_state!(context, error) || catch_error!(context, error) || fail_workflow!(context, error)
end

super
end

def run_nonblock!(context)
start(context) unless context.state_started?

step_nonblock!(context) while running?(context)
return Errno::EAGAIN unless ready?(context)

finish(context) if ended?(context)
end

def end?
@end
end

def ready?(context)
!context.state_started? || each_branch(context).any? { |branch, ctx| branch.step_nonblock_ready?(ctx) }
end

def wait_until(context)
each_branch(context).filter_map { |branch, ctx| branch.wait_until(ctx) }.min
end

def waiting?(context)
each_branch(context).any? { |branch, ctx| branch.waiting?(ctx) }
end

def running?(context)
!ended?(context)
end

def ended?(context)
each_branch_context(context).all?(&:ended?)
end

def success?(context)
each_branch_context(context).none?(&:failed?)
end

private

def step_nonblock!(context)
each_branch(context).each do |branch, ctx|
branch.run_nonblock(ctx) if branch.step_nonblock_ready?(ctx)
each_child_workflow(context).each do |wf, ctx|
wf.run_nonblock(ctx) if wf.step_nonblock_ready?(ctx)
end
end

def each_branch(context)
def each_child_workflow(context)
branches.filter_map.with_index do |branch, i|
ctx = context.state.dig("BranchContext", i)
next if ctx.nil?
Expand All @@ -104,12 +60,12 @@ def each_branch(context)
end
end

def each_branch_context(context)
context.state["BranchContext"].map { |ctx| Context.new(ctx) }
def parse_error(context)
each_child_context(context).detect(&:failed?)&.output || {"Error" => "States.Error"}
end

def parse_error(context)
each_branch_context(context).detect(&:failed?)&.output || {"Error" => "States.Error"}
def child_context_key
"BranchContext"
end

def validate_state!(workflow)
Expand Down

0 comments on commit ecce6cf

Please sign in to comment.