From 2794dcfd7c8ca885982e71e6d1c6884f80798d60 Mon Sep 17 00:00:00 2001 From: Adam Ruzicka Date: Fri, 5 Apr 2024 12:58:53 +0200 Subject: [PATCH] Fixes #37319 - Allow setting IDs for outputs and switch runner output interface to be keyword based. --- lib/smart_proxy_dynflow/continuous_output.rb | 18 ++-- lib/smart_proxy_dynflow/runner/base.rb | 4 +- lib/smart_proxy_dynflow/runner/parent.rb | 8 +- test/runner_test.rb | 87 +++++++++++++++++++- 4 files changed, 102 insertions(+), 15 deletions(-) diff --git a/lib/smart_proxy_dynflow/continuous_output.rb b/lib/smart_proxy_dynflow/continuous_output.rb index 5bfa511..cfe5c0d 100644 --- a/lib/smart_proxy_dynflow/continuous_output.rb +++ b/lib/smart_proxy_dynflow/continuous_output.rb @@ -37,18 +37,20 @@ def humanize raw_outputs.map { |output| output['output'] }.join("\n") end - def add_exception(context, exception, timestamp = Time.now.getlocal) - add_output(context + ": #{exception.class} - #{exception.message}", 'debug', timestamp) + def add_exception(context, exception, timestamp: Time.now.getlocal, id: nil) + add_output(context + ": #{exception.class} - #{exception.message}", 'debug', timestamp: timestamp, id: id) end - def add_output(*args) - add_raw_output(self.class.format_output(*args)) + def add_output(*args, **kwargs) + add_raw_output(self.class.format_output(*args, **kwargs)) end - def self.format_output(message, type = 'debug', timestamp = Time.now.getlocal) - { 'output_type' => type, - 'output' => message, - 'timestamp' => timestamp.to_f } + def self.format_output(message, type = 'debug', timestamp: Time.now.getlocal, id: nil) + base = { 'output_type' => type, + 'output' => message, + 'timestamp' => timestamp.to_f } + base['id'] = id if id + base end end end diff --git a/lib/smart_proxy_dynflow/runner/base.rb b/lib/smart_proxy_dynflow/runner/base.rb index ac5341a..372e9e7 100644 --- a/lib/smart_proxy_dynflow/runner/base.rb +++ b/lib/smart_proxy_dynflow/runner/base.rb @@ -58,8 +58,8 @@ def timeout_interval # or nil for no timeout end - def publish_data(data, type) - @continuous_output.add_output(data, type) + def publish_data(...) + @continuous_output.add_output(...) end def publish_exception(context, exception, fatal = true) diff --git a/lib/smart_proxy_dynflow/runner/parent.rb b/lib/smart_proxy_dynflow/runner/parent.rb index fe81dd7..61955f9 100644 --- a/lib/smart_proxy_dynflow/runner/parent.rb +++ b/lib/smart_proxy_dynflow/runner/parent.rb @@ -35,16 +35,16 @@ def host_action(identifier) Dynflow::Action::Suspended.new OpenStruct.new(options) end - def broadcast_data(data, type) - @outputs.each_value { |output| output.add_output(data, type) } + def broadcast_data(...) + @outputs.each_value { |output| output.add_output(...) } end def publish_data(_data, _type) true end - def publish_data_for(identifier, data, type) - @outputs[identifier].add_output(data, type) + def publish_data_for(identifier, data, type, **kwargs) + @outputs[identifier].add_output(data, type, **kwargs) end def dispatch_exception(context, exception) diff --git a/test/runner_test.rb b/test/runner_test.rb index 3961021..7fc034d 100644 --- a/test/runner_test.rb +++ b/test/runner_test.rb @@ -19,7 +19,8 @@ class RunnerTest < Minitest::Spec it 'returns a hash with outputs' do message = 'a message' type = 'stdout' - runner.publish_data(message, type) + now = Time.now + runner.publish_data(message, type, timestamp: now) updates = runner.generate_updates _(updates.keys).must_equal [suspended_action] update = updates.values.first @@ -27,6 +28,30 @@ class RunnerTest < Minitest::Spec _(update.continuous_output.raw_outputs.count).must_equal 1 end + it 'accepts timestamp' do + now = Time.now + runner.publish_data('message', 'stdout') + update = runner.generate_updates.values.first + _(update.continuous_output.raw_outputs.first['timestamp']).must_be_instance_of Float + _(update.continuous_output.raw_outputs.first['timestamp']).wont_equal now.to_f + + runner.publish_data('message', 'stdout', timestamp: now) + update = runner.generate_updates.values.first + _(update.continuous_output.raw_outputs.first['timestamp']).must_be_instance_of Float + _(update.continuous_output.raw_outputs.first['timestamp']).must_equal now.to_f + end + + it 'accepts id' do + runner.publish_data('message', 'stdout') + update = runner.generate_updates.values.first + _(update.continuous_output.raw_outputs.first.key?('id')).must_equal false + + id = 5 + runner.publish_data('message', 'stdout', id: id) + update = runner.generate_updates.values.first + _(update.continuous_output.raw_outputs.first['id']).must_equal id + end + it 'works in compatibility mode' do runner = Base.new message = 'a message' @@ -80,6 +105,30 @@ class RunnerTest < Minitest::Spec runner.publish_data_for('foo', 'message', 'stdout') _(runner.generate_updates.keys.count).must_equal 1 end + + it 'accepts timestamp' do + now = Time.now + runner.publish_data_for('foo', 'message', 'stdout') + update = runner.generate_updates.values.first + _(update.continuous_output.raw_outputs.first['timestamp']).must_be_instance_of Float + _(update.continuous_output.raw_outputs.first['timestamp']).wont_equal now.to_f + + runner.publish_data_for('foo', 'message', 'stdout', timestamp: now) + update = runner.generate_updates.values.first + _(update.continuous_output.raw_outputs.first['timestamp']).must_be_instance_of Float + _(update.continuous_output.raw_outputs.first['timestamp']).must_equal now.to_f + end + + it 'accepts id' do + runner.publish_data_for('foo', 'message', 'stdout') + update = runner.generate_updates.values.first + _(update.continuous_output.raw_outputs.first.key?('id')).must_equal false + + id = 5 + runner.publish_data_for('foo', 'message', 'stdout', id: id) + update = runner.generate_updates.values.first + _(update.continuous_output.raw_outputs.first['id']).must_equal id + end end describe '#broadcast_data' do @@ -87,6 +136,42 @@ class RunnerTest < Minitest::Spec runner.broadcast_data('message', 'stdout') _(runner.generate_updates.keys.count).must_equal 2 end + + it 'accepts timestamp' do + now = Time.now + runner.broadcast_data('message', 'stdout') + updates = runner.generate_updates.values + _(updates.count).must_equal 2 + updates.each do |update| + _(update.continuous_output.raw_outputs.first['timestamp']).must_be_instance_of Float + _(update.continuous_output.raw_outputs.first['timestamp']).wont_equal now.to_f + end + + runner.broadcast_data('message', 'stdout', timestamp: now) + updates = runner.generate_updates.values + _(updates.count).must_equal 2 + updates.each do |update| + _(update.continuous_output.raw_outputs.first['timestamp']).must_be_instance_of Float + _(update.continuous_output.raw_outputs.first['timestamp']).must_equal now.to_f + end + end + + it 'accepts id' do + runner.broadcast_data('message', 'stdout') + updates = runner.generate_updates.values + _(updates.count).must_equal 2 + updates.each do |update| + _(update.continuous_output.raw_outputs.first.key?('id')).must_equal false + end + + id = 5 + runner.broadcast_data('message', 'stdout', id: id) + updates = runner.generate_updates.values + _(updates.count).must_equal 2 + updates.each do |update| + _(update.continuous_output.raw_outputs.first['id']).must_equal id + end + end end describe '#publish_exception' do