Skip to content

Commit

Permalink
Flatten error handling
Browse files Browse the repository at this point in the history
remove sync_blocks_rss from state flow
  • Loading branch information
svevang committed Oct 14, 2024
1 parent 46592c9 commit e349c74
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 92 deletions.
45 changes: 32 additions & 13 deletions app/jobs/publish_feed_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,32 +19,51 @@ def perform(podcast, pub_item)
podcast.feeds.each { |feed| publish_rss(podcast, feed) }
PublishingPipelineState.complete!(podcast)
rescue Apple::AssetStateTimeoutError => e
PublishingPipelineState.error!(podcast)
Rails.logger.error("Asset processing timeout", {podcast_id: podcast.id, error: e.message, backtrace: e.backtrace.join("\n")})
handle_apple_timeout_error(podcast, e)
rescue => e
PublishingPipelineState.error!(podcast)
# TODO, we can remove this once we have a better way to track errors
Rails.logger.error("Error publishing podcast", {podcast_id: podcast.id, error: e.message, backtrace: e.backtrace.join("\n")})
handle_error(podcast, e)
ensure
PublishingPipelineState.settle_remaining!(podcast)
end

def publish_apple(podcast, feed)
return unless feed.publish_to_apple?
res = PublishAppleJob.do_perform(feed.apple_config)
PublishingPipelineState.publish_apple!(podcast)
res
rescue => e
NewRelic::Agent.notice_error(e)
res = PublishingPipelineState.error_apple!(podcast)
raise e if feed.apple_config.sync_blocks_rss?
res

begin
res = PublishAppleJob.do_perform(podcast.apple_config)
PublishingPipelineState.publish_apple!(podcast)
res
rescue => e
PublishingPipelineState.error_apple!(podcast)
NewRelic::Agent.notice_error(e)
raise e
end
end

def publish_rss(podcast, feed)
res = save_file(podcast, feed)
PublishingPipelineState.publish_rss!(podcast)
res
rescue => e
handle_rss_error(podcast, feed, e)
end

def handle_apple_timeout_error(podcast, error, raise_error: true)
PublishingPipelineState.error!(podcast)
Rails.logger.error("Asset processing timeout", {podcast_id: podcast.id, error: error.message, backtrace: error&.backtrace&.join("\n")})
raise error
end

def handle_rss_error(podcast, feed, error)
PublishingPipelineState.error!(podcast)
Rails.logger.error("Error publishing RSS", {podcast_id: podcast.id, feed_id: feed.id, error: error.message, backtrace: error&.backtrace&.join("\n")})
raise error
end

def handle_error(podcast, error)
PublishingPipelineState.error!(podcast)
Rails.logger.error("Error publishing podcast", {podcast_id: podcast.id, error: error.message, backtrace: error&.backtrace&.join("\n")})
raise error
end

def save_file(podcast, feed, options = {})
Expand Down
165 changes: 87 additions & 78 deletions test/jobs/publish_feed_job_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
let(:episode) { create(:episode, prx_uri: "/api/v1/stories/87683") }
let(:podcast) { episode.podcast }
let(:feed) { create(:feed, podcast: podcast, slug: "adfree") }
let(:private_feed) { create(:private_feed, podcast: podcast) }
let(:private_feed) { create(:apple_feed, podcast: podcast) }

let(:job) { PublishFeedJob.new }

Expand All @@ -25,71 +25,109 @@
end
end

it "can process publishing a podcast" do
it "transitions to the error state upon rss error" do
job.stub(:s3_client, stub_client) do
PublishFeedJob.stub(:perform_later, nil) do
PublishingPipelineState.start_pipeline!(podcast)
end

pub_item = PublishingQueueItem.unfinished_items(podcast).first

rss = job.perform(podcast, pub_item)
refute_nil rss
refute_nil job.put_object
assert_nil job.copy_object
PublishingPipelineState.start_pipeline!(podcast)
assert_raises(RuntimeError) { job.handle_rss_error(podcast, feed, RuntimeError.new("rss error")) }
assert_equal ["created", "error"], PublishingPipelineState.where(podcast: podcast).latest_pipelines.pluck(:status)
end
end

it "will skip the publishing if the pub items are mismatched" do
it "transitions to the error state upon general error" do
# Simulate some method blowing up
job.stub(:s3_client, stub_client) do
PublishFeedJob.stub(:perform_later, nil) do
PublishingPipelineState.start_pipeline!(podcast)
PublishingPipelineState.start_pipeline!(podcast)
PublishingPipelineState.stub(:publish_rss!, ->(*, **) { raise "some general" }) do
assert_raises(RuntimeError) { job.handle_rss_error(podcast, feed, RuntimeError.new("some general")) }
assert_equal ["created", "error"], PublishingPipelineState.where(podcast: podcast).latest_pipelines.pluck(:status)
end

pub_item = PublishingQueueItem.create(podcast: podcast)
assert job.mismatched_publishing_item?(podcast, pub_item)
assert_equal :mismatched, job.perform(podcast, pub_item)
end
end

it "will skip the publishing if the pub items are null" do
job.stub(:s3_client, stub_client) do
assert PublishingQueueItem.unfinished_items(podcast).empty?
describe "validations of the publishing pipeline" do
it "can process publishing a podcast" do
job.stub(:s3_client, stub_client) do
PublishFeedJob.stub(:perform_later, nil) do
PublishingPipelineState.start_pipeline!(podcast)
end

assert job.null_publishing_item?(podcast, nil)
assert_equal :null, job.perform(podcast, nil)
pub_item = PublishingQueueItem.unfinished_items(podcast).first

# There is no currently running publishing pipeline
pub_item = PublishingQueueItem.create(podcast: podcast)
assert job.null_publishing_item?(podcast, pub_item)
assert_equal :null, job.perform(podcast, pub_item)
rss = job.perform(podcast, pub_item)
refute_nil rss
refute_nil job.put_object
assert_nil job.copy_object
end
end

# `settle_remaining` is called at the end of the publishing job
# This means pub_item has been picked up and scheduled
assert_equal "created", pub_item.reload.last_pipeline_state
PublishingPipelineState.complete!(podcast)
assert_equal "complete", pub_item.reload.last_pipeline_state
it "will skip the publishing if the pub items are mismatched" do
job.stub(:s3_client, stub_client) do
PublishFeedJob.stub(:perform_later, nil) do
PublishingPipelineState.start_pipeline!(podcast)
end

# Start a pipeline: Create publishing item and transition that item's pipeline to :created
queue_item = PublishingPipelineState.start_pipeline!(podcast)
pub_item = PublishingQueueItem.create(podcast: podcast)
assert job.mismatched_publishing_item?(podcast, pub_item)
assert_equal :mismatched, job.perform(podcast, pub_item)
end
end

it "will skip the publishing if the pub items are null" do
job.stub(:s3_client, stub_client) do
assert PublishingQueueItem.unfinished_items(podcast).empty?

assert job.null_publishing_item?(podcast, nil)
assert_equal :null, job.perform(podcast, nil)

refute job.null_publishing_item?(podcast, queue_item)
res = job.perform(podcast, queue_item)
refute_equal :null, res
# There is no currently running publishing pipeline
pub_item = PublishingQueueItem.create(podcast: podcast)
assert job.null_publishing_item?(podcast, pub_item)

PublishAppleJob.stub(:do_perform, :publishing_apple!) do
assert_equal :null, job.perform(podcast, pub_item)
end

# `settle_remaining` is called at the end of the publishing job
# This means pub_item has been picked up and scheduled
assert_equal "created", pub_item.reload.last_pipeline_state
PublishingPipelineState.complete!(podcast)
assert_equal "complete", pub_item.reload.last_pipeline_state

# Start a pipeline: Create publishing item and transition that item's pipeline to :created
queue_item = PublishingPipelineState.start_pipeline!(podcast)

refute job.null_publishing_item?(podcast, queue_item)
PublishAppleJob.stub(:do_perform, :publishing_apple!) do
refute_equal :null, job.perform(podcast, queue_item)
end
end
end
end
end

describe "publishing to apple" do
it "does not schedule publishing to apple if the feed is non-apple" do
assert_nil job.publish_apple(podcast, private_feed)
let(:apple_feed) { private_feed }
let(:apple_config) { podcast.apple_config }

before do
assert private_feed.persisted?
assert podcast.reload.apple_config.present?
end

describe "when the apple config is present" do
let(:apple_feed) { create(:apple_feed, podcast: podcast) }
it "does not schedule publishing to apple if the apple config prevents it" do
podcast.apple_config.update!(publish_enabled: false)
assert_nil job.publish_apple(podcast, apple_feed)
end

it "does not schedule publishing to apple if the apple config is disabled" do
apple_config.update!(publish_enabled: false)
assert_nil job.publish_apple(podcast, apple_feed)
end

describe "when the apple config is present" do
it "does not schedule publishing to apple if the config is marked as not publishable" do
apple_feed.apple_config.update!(publish_enabled: false)
podcast.apple_config.update!(publish_enabled: false)

assert_nil job.publish_apple(podcast, apple_feed)
end

Expand All @@ -101,61 +139,32 @@
end
end

it "Performs the apple publishing job based regardless of sync_blocks_rss flag" do
assert apple_feed.apple_config.present?
assert apple_feed.apple_config.publish_enabled

# stub the two possible ways the job can be called
# perform_later is not used.
PublishAppleJob.stub(:perform_later, :perform_later) do
PublishAppleJob.stub(:do_perform, :do_perform) do
apple_feed.apple_config.update!(sync_blocks_rss: true)

assert_equal :do_perform, job.publish_apple(podcast, apple_feed)

apple_feed.apple_config.update!(sync_blocks_rss: false)
feed.reload
assert_equal :do_perform, job.publish_apple(podcast, apple_feed)
end
end
end

describe "when the apple publishing fails" do
before do
# Simulate a publishing attempt
PublishingQueueItem.create!(podcast: feed.podcast)
PublishingPipelineState.attempt!(feed.podcast)
PublishingPipelineState.start!(feed.podcast)
end

it "raises an error if the apple publishing fails" do
assert apple_feed.apple_config.present?
assert apple_feed.apple_config.publish_enabled

PublishAppleJob.stub(:do_perform, ->(*, **) { raise "some apple error" }) do
# it raises
assert_raises(RuntimeError) { job.publish_apple(podcast, apple_feed) }
assert_raises(RuntimeError) { PublishingPipelineState.attempt!(feed.podcast, perform_later: false) }

assert_equal ["created", "started", "error_apple"].sort, PublishingPipelineState.where(podcast: feed.podcast).latest_pipelines.pluck(:status).sort
assert_equal ["created", "started", "error", "error_apple"].sort, PublishingPipelineState.where(podcast: feed.podcast).latest_pipelines.pluck(:status).sort
end
end

it "does not raise an error if the apple publishing is not blocking RSS" do
it "raises an error if the apple publishing times out" do
assert apple_feed.apple_config.present?
assert apple_feed.apple_config.publish_enabled
apple_feed.apple_config.update!(sync_blocks_rss: false)

mock = Minitest::Mock.new
mock.expect(:call, nil, [RuntimeError])
PublishAppleJob.stub(:do_perform, ->(*, **) { raise Apple::AssetStateTimeoutError.new([]) }) do
assert_raises(Apple::AssetStateTimeoutError) { PublishingPipelineState.attempt!(feed.podcast, perform_later: false) }

PublishAppleJob.stub(:do_perform, ->(*, **) { raise "some apple error" }) do
NewRelic::Agent.stub(:notice_error, mock) do
job.publish_apple(podcast, apple_feed)
end
assert_equal ["created", "started", "error", "error_apple"].sort, PublishingPipelineState.where(podcast: feed.podcast).latest_pipelines.pluck(:status).sort
end
assert_equal ["created", "started", "error_apple"].sort, PublishingPipelineState.where(podcast: feed.podcast).latest_pipelines.pluck(:status).sort

mock.verify
end
end
end
Expand Down
3 changes: 2 additions & 1 deletion test/models/publishing_pipeline_state_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,8 @@
PublishFeedJob.stub_any_instance(:save_file, nil) do
PublishFeedJob.stub_any_instance(:publish_apple, ->(*args) { raise "error" }) do
pqi = PublishingQueueItem.ensure_queued!(podcast)
PublishingPipelineState.attempt!(podcast, perform_later: false)

assert_raises(RuntimeError) { PublishingPipelineState.attempt!(podcast, perform_later: false) }
end
end

Expand Down

0 comments on commit e349c74

Please sign in to comment.