Skip to content

Commit

Permalink
Fixes #37319 - Allow setting IDs for outputs
Browse files Browse the repository at this point in the history
and switch runner output interface to be keyword based.
  • Loading branch information
adamruzicka committed Jul 18, 2024
1 parent c2964a6 commit 08bba2c
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 15 deletions.
18 changes: 10 additions & 8 deletions lib/smart_proxy_dynflow/continuous_output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,20 @@ def humanize
raw_outputs.map { |output| output['output'] }.join("\n")
end

def add_exception(context, exception, timestamp = Time.now.getlocal)
add_output(context + ": #{exception.class} - #{exception.message}", 'debug', timestamp)
def add_exception(context, exception, timestamp: Time.now.getlocal, id: nil)
add_output(context + ": #{exception.class} - #{exception.message}", 'debug', timestamp: timestamp, id: id)
end

def add_output(*args)
add_raw_output(self.class.format_output(*args))
def add_output(...)
add_raw_output(self.class.format_output(...))
end

def self.format_output(message, type = 'debug', timestamp = Time.now.getlocal)
{ 'output_type' => type,
'output' => message,
'timestamp' => timestamp.to_f }
def self.format_output(message, type = 'debug', timestamp: Time.now.getlocal, id: nil)
base = { 'output_type' => type,
'output' => message,
'timestamp' => timestamp.to_f }
base['id'] = id if id
base
end
end
end
4 changes: 2 additions & 2 deletions lib/smart_proxy_dynflow/runner/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ def timeout_interval
# or nil for no timeout
end

def publish_data(data, type)
@continuous_output.add_output(data, type)
def publish_data(...)
@continuous_output.add_output(...)
end

def publish_exception(context, exception, fatal = true)
Expand Down
8 changes: 4 additions & 4 deletions lib/smart_proxy_dynflow/runner/parent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,16 @@ def host_action(identifier)
Dynflow::Action::Suspended.new OpenStruct.new(options)
end

def broadcast_data(data, type)
@outputs.each_value { |output| output.add_output(data, type) }
def broadcast_data(...)
@outputs.each_value { |output| output.add_output(...) }
end

def publish_data(_data, _type)
true
end

def publish_data_for(identifier, data, type)
@outputs[identifier].add_output(data, type)
def publish_data_for(identifier, data, type, **kwargs)
@outputs[identifier].add_output(data, type, **kwargs)
end

def dispatch_exception(context, exception)
Expand Down
87 changes: 86 additions & 1 deletion test/runner_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,39 @@ class RunnerTest < Minitest::Spec
it 'returns a hash with outputs' do
message = 'a message'
type = 'stdout'
runner.publish_data(message, type)
now = Time.now
runner.publish_data(message, type, timestamp: now)
updates = runner.generate_updates
_(updates.keys).must_equal [suspended_action]
update = updates.values.first
_(update.exit_status).must_be :nil?
_(update.continuous_output.raw_outputs.count).must_equal 1
end

it 'accepts timestamp' do
now = Time.now
runner.publish_data('message', 'stdout')
update = runner.generate_updates.values.first
_(update.continuous_output.raw_outputs.first['timestamp']).must_be_instance_of Float
_(update.continuous_output.raw_outputs.first['timestamp']).wont_equal now.to_f

runner.publish_data('message', 'stdout', timestamp: now)
update = runner.generate_updates.values.first
_(update.continuous_output.raw_outputs.first['timestamp']).must_be_instance_of Float
_(update.continuous_output.raw_outputs.first['timestamp']).must_equal now.to_f
end

it 'accepts id' do
runner.publish_data('message', 'stdout')
update = runner.generate_updates.values.first
_(update.continuous_output.raw_outputs.first.key?('id')).must_equal false

id = 5
runner.publish_data('message', 'stdout', id: id)
update = runner.generate_updates.values.first
_(update.continuous_output.raw_outputs.first['id']).must_equal id
end

it 'works in compatibility mode' do
runner = Base.new
message = 'a message'
Expand Down Expand Up @@ -80,13 +105,73 @@ class RunnerTest < Minitest::Spec
runner.publish_data_for('foo', 'message', 'stdout')
_(runner.generate_updates.keys.count).must_equal 1
end

it 'accepts timestamp' do
now = Time.now
runner.publish_data_for('foo', 'message', 'stdout')
update = runner.generate_updates.values.first
_(update.continuous_output.raw_outputs.first['timestamp']).must_be_instance_of Float
_(update.continuous_output.raw_outputs.first['timestamp']).wont_equal now.to_f

runner.publish_data_for('foo', 'message', 'stdout', timestamp: now)
update = runner.generate_updates.values.first
_(update.continuous_output.raw_outputs.first['timestamp']).must_be_instance_of Float
_(update.continuous_output.raw_outputs.first['timestamp']).must_equal now.to_f
end

it 'accepts id' do
runner.publish_data_for('foo', 'message', 'stdout')
update = runner.generate_updates.values.first
_(update.continuous_output.raw_outputs.first.key?('id')).must_equal false

id = 5
runner.publish_data_for('foo', 'message', 'stdout', id: id)
update = runner.generate_updates.values.first
_(update.continuous_output.raw_outputs.first['id']).must_equal id
end
end

describe '#broadcast_data' do
it 'publishes data for all hosts' do
runner.broadcast_data('message', 'stdout')
_(runner.generate_updates.keys.count).must_equal 2
end

it 'accepts timestamp' do
now = Time.now
runner.broadcast_data('message', 'stdout')
updates = runner.generate_updates.values
_(updates.count).must_equal 2
updates.each do |update|
_(update.continuous_output.raw_outputs.first['timestamp']).must_be_instance_of Float
_(update.continuous_output.raw_outputs.first['timestamp']).wont_equal now.to_f
end

runner.broadcast_data('message', 'stdout', timestamp: now)
updates = runner.generate_updates.values
_(updates.count).must_equal 2
updates.each do |update|
_(update.continuous_output.raw_outputs.first['timestamp']).must_be_instance_of Float
_(update.continuous_output.raw_outputs.first['timestamp']).must_equal now.to_f
end
end

it 'accepts id' do
runner.broadcast_data('message', 'stdout')
updates = runner.generate_updates.values
_(updates.count).must_equal 2
updates.each do |update|
_(update.continuous_output.raw_outputs.first.key?('id')).must_equal false
end

id = 5
runner.broadcast_data('message', 'stdout', id: id)
updates = runner.generate_updates.values
_(updates.count).must_equal 2
updates.each do |update|
_(update.continuous_output.raw_outputs.first['id']).must_equal id
end
end
end

describe '#publish_exception' do
Expand Down

0 comments on commit 08bba2c

Please sign in to comment.