diff --git a/app/jobs/publish_feed_job.rb b/app/jobs/publish_feed_job.rb index d1cfcd93d..d795a23c8 100644 --- a/app/jobs/publish_feed_job.rb +++ b/app/jobs/publish_feed_job.rb @@ -8,11 +8,18 @@ class PublishFeedJob < ApplicationJob attr_accessor :podcast, :episodes, :rss, :put_object, :copy_object def perform(podcast) + # Since we don't currently have a way to retry failed attempts, + # and until something akin to https://github.com/PRX/feeder.prx.org/issues/714 lands + # the RSS publishing is extracted from the publishing pipeline semantics. + # TODO: recombine the publishing invocations (RSS, Apple) once we have some retry guarantees + podcast.feeds.each { |feed| save_file(podcast, feed) } + PublishingPipelineState.start!(podcast) podcast.feeds.each { |feed| publish_feed(podcast, feed) } PublishingPipelineState.complete!(podcast) rescue => e PublishingPipelineState.error!(podcast) + Rails.logger.error("Error publishing podcast", {podcast_id: podcast.id, error: e.message, backtrace: e.backtrace.join("\n")}) raise e ensure PublishingPipelineState.settle_remaining!(podcast) @@ -39,9 +46,9 @@ def publish_apple(feed) end def publish_rss(podcast, feed) - res = save_file(podcast, feed) + # res = save_file(podcast, feed) PublishingPipelineState.publish_rss!(podcast) - res + # res end def save_file(podcast, feed, options = {}) diff --git a/app/models/apple/api_waiting.rb b/app/models/apple/api_waiting.rb index ba9778c05..fc34d1378 100644 --- a/app/models/apple/api_waiting.rb +++ b/app/models/apple/api_waiting.rb @@ -8,12 +8,15 @@ module ApiWaiting def self.wait_for(remaining_records) t_beg = Time.now.utc loop do - # TODO: handle timeout - break [false, remaining_records] if Time.now.utc - t_beg > self::API_WAIT_TIMEOUT + Rails.logger.info(".wait_for", {remaining_records: remaining_records, have_waited: Time.now.utc - t_beg}) + + # Return `timeout == true` if we've waited too long + break [true, remaining_records] if Time.now.utc - t_beg > self::API_WAIT_TIMEOUT 
remaining_records = yield(remaining_records) - break [true, []] if remaining_records.empty? + # All done, return `timeout == false` + break [false, []] if remaining_records.empty? sleep(self::API_WAIT_INTERVAL) end diff --git a/app/models/apple/episode.rb b/app/models/apple/episode.rb index d55ca38ea..9808d11c9 100644 --- a/app/models/apple/episode.rb +++ b/app/models/apple/episode.rb @@ -25,13 +25,13 @@ def self.wait_for_asset_state(api, eps) remaining_eps.each do |ep| Rails.logger.info("Waiting for audio asset state?", {episode_id: ep.feeder_id, - delivery_file_count: ep.podcast_delivery_files.count, - delivery_files_processed_errors: ep.podcast_delivery_files.all?(&:processed_errors?), - delivery_files_processed: ep.podcast_delivery_files.all?(&:processed?), - delivery_files_delivered: ep.podcast_delivery_files.all?(&:delivered?), - asset_state: ep.audio_asset_state, - has_podcast_audio: ep&.podcast_container&.has_podcast_audio?, - waiting_for_asset_state: ep.waiting_for_asset_state?}) + delivery_file_count: ep.podcast_delivery_files.count, + delivery_files_processed_errors: ep.podcast_delivery_files.all?(&:processed_errors?), + delivery_files_processed: ep.podcast_delivery_files.all?(&:processed?), + delivery_files_delivered: ep.podcast_delivery_files.all?(&:delivered?), + asset_state: ep.audio_asset_state, + has_podcast_audio: ep&.podcast_container&.has_podcast_audio?, + waiting_for_asset_state: ep.waiting_for_asset_state?}) end rem = diff --git a/app/models/apple/publisher.rb b/app/models/apple/publisher.rb index 49e1b7555..e3eb38296 100644 --- a/app/models/apple/publisher.rb +++ b/app/models/apple/publisher.rb @@ -113,7 +113,11 @@ def wait_for_upload_processing(eps) Rails.logger.tagged("##{__method__}") do pdfs = eps.map(&:podcast_delivery_files).flatten - Apple::PodcastDeliveryFile.wait_for_delivery_files(api, pdfs) + (waiting_timed_out, _) = Apple::PodcastDeliveryFile.wait_for_delivery(api, pdfs) + raise "Timed out waiting for delivery" if 
waiting_timed_out + + (waiting_timed_out, _) = Apple::PodcastDeliveryFile.wait_for_processing(api, pdfs) + raise "Timed out waiting for processing" if waiting_timed_out # Get the latest state of the podcast containers # which should include synced files @@ -124,7 +128,9 @@ def wait_for_upload_processing(eps) def wait_for_asset_state(eps) Rails.logger.tagged("##{__method__}") do eps = eps.filter { |e| e.podcast_delivery_files.any?(&:api_marked_as_uploaded?) } - Apple::Episode.wait_for_asset_state(api, eps) + + (waiting_timed_out, _) = Apple::Episode.wait_for_asset_state(api, eps) + raise "Timed out waiting for asset state" if waiting_timed_out end end diff --git a/app/models/podcast.rb b/app/models/podcast.rb index fd589f114..930eb6993 100644 --- a/app/models/podcast.rb +++ b/app/models/podcast.rb @@ -54,6 +54,7 @@ def self.by_prx_series(series) end def self.release!(options = {}) + Rails.logger.info("Podcast.release! called") PublishingPipelineState.expire_pipelines! Episode.release_episodes!(options) end diff --git a/app/models/publishing_pipeline_state.rb b/app/models/publishing_pipeline_state.rb index 4e64f3c77..ec1b236e4 100644 --- a/app/models/publishing_pipeline_state.rb +++ b/app/models/publishing_pipeline_state.rb @@ -88,29 +88,40 @@ def self.most_recent_state(podcast) end def self.start_pipeline!(podcast) - PublishingQueueItem.ensure_queued!(podcast) - attempt!(podcast) + Rails.logger.tagged("PublishingPipeLineState.start_pipeline!") do + PublishingQueueItem.ensure_queued!(podcast) + attempt!(podcast) + end end # None of the methods that grab locks are threadsafe if we assume that # creating published artifacts is non-idempotent (e.g. creating remote Apple # resources) def self.attempt!(podcast, perform_later: true) - podcast.with_publish_lock do - next if PublishingQueueItem.unfinished_items(podcast).empty? - next if PublishingQueueItem.current_unfinished_item(podcast).present? 
- - # Dedupe the work, grab the latest unfinished item in the queue - latest_unfinished_item = PublishingQueueItem.unfinished_items(podcast).first - - PublishingPipelineState.create!(podcast: podcast, publishing_queue_item: latest_unfinished_item, status: :created) - - if perform_later - Rails.logger.info("Scheduling PublishFeedJob for podcast #{podcast.id}", {podcast_id: podcast.id}) - PublishFeedJob.perform_later(podcast) - else - Rails.logger.info("Performing PublishFeedJob for podcast #{podcast.id}", {podcast_id: podcast.id}) - PublishFeedJob.perform_now(podcast) + Rails.logger.tagged("PublishingPipeLineState.attempt!") do + podcast.with_publish_lock do + if PublishingQueueItem.unfinished_items(podcast).empty? + Rails.logger.info("Unfinished items empty, nothing to do", podcast_id: podcast.id) + next + end + if (curr_running_item = PublishingQueueItem.current_unfinished_item(podcast)) + Rails.logger.info("Podcast's PublishingQueueItem already has running pipeline", podcast_id: podcast.id, running_queue_item: curr_running_item.id) + next + end + + # Dedupe the work, grab the latest unfinished item in the queue + latest_unfinished_item = PublishingQueueItem.unfinished_items(podcast).first + + Rails.logger.info("Creating publishing pipeline for podcast #{podcast.id}", {podcast_id: podcast.id, queue_item_id: latest_unfinished_item.id}) + PublishingPipelineState.create!(podcast: podcast, publishing_queue_item: latest_unfinished_item, status: :created) + + if perform_later + Rails.logger.info("Scheduling PublishFeedJob for podcast #{podcast.id}", {podcast_id: podcast.id}) + PublishFeedJob.perform_later(podcast) + else + Rails.logger.info("Performing PublishFeedJob for podcast #{podcast.id}", {podcast_id: podcast.id}) + PublishFeedJob.perform_now(podcast) + end end end end @@ -155,7 +166,9 @@ def self.expire_pipelines! 
end def self.settle_remaining!(podcast) - attempt!(podcast) + Rails.logger.tagged("PublishingPipeLineState.settle_remaining!") do + attempt!(podcast) + end end def self.complete?(podcast) @@ -165,7 +178,9 @@ def self.complete?(podcast) def self.state_transition(podcast, to_state) podcast.with_publish_lock do pqi = PublishingQueueItem.current_unfinished_item(podcast) + curr_running_item = PublishingQueueItem.current_unfinished_item(podcast) if pqi.present? + Rails.logger.info("Transitioning podcast #{podcast.id} publishing pipeline to state #{to_state}", {podcast_id: podcast.id, to_state: to_state, running_queue_item: curr_running_item&.id}) PublishingPipelineState.create!(podcast: podcast, publishing_queue_item: pqi, status: to_state) else Rails.logger.error("Podcast #{podcast.id} has no unfinished work, cannot transition state", {podcast_id: podcast.id, to_state: to_state}) diff --git a/app/models/publishing_queue_item.rb b/app/models/publishing_queue_item.rb index f98af237c..2fe567288 100644 --- a/app/models/publishing_queue_item.rb +++ b/app/models/publishing_queue_item.rb @@ -18,6 +18,7 @@ def self.delivery_status end def self.ensure_queued!(podcast) + Rails.logger.info("Creating new PublishingQueueItem", {podcast_id: podcast.id}) create!(podcast: podcast) end diff --git a/test/models/apple/api_waiting_test.rb b/test/models/apple/api_waiting_test.rb index e6ac4f8d9..dbac3278c 100644 --- a/test/models/apple/api_waiting_test.rb +++ b/test/models/apple/api_waiting_test.rb @@ -25,7 +25,7 @@ class TestTimeout step = 0 - (finished_waiting, remaining) = TestWait.wait_for(records) do |remaining| + (timed_out, remaining) = TestWait.wait_for(records) do |remaining| rem = remaining.dup assert_equal intervals[step], rem @@ -37,16 +37,16 @@ class TestTimeout rem end - assert_equal finished_waiting, true + assert_equal timed_out, false assert_equal remaining, [] end it "times out" do - (finished_waiting, remaining) = TestTimeout.wait_for(["a", "b", "c"]) do |remaining| + 
(timed_out, remaining) = TestTimeout.wait_for(["a", "b", "c"]) do |remaining| remaining end - assert_equal finished_waiting, false + assert_equal timed_out, true assert_equal remaining, ["a", "b", "c"] end end diff --git a/test/models/publishing_pipeline_state_test.rb b/test/models/publishing_pipeline_state_test.rb index 112668eab..de027e701 100644 --- a/test/models/publishing_pipeline_state_test.rb +++ b/test/models/publishing_pipeline_state_test.rb @@ -153,8 +153,8 @@ assert_equal [pa1, pa2].sort, PublishingPipelineState.expired_pipelines.sort PublishingPipelineState.expire_pipelines! - assert_equal ["created", "expired"], PublishingPipelineState.latest_pipeline(podcast).map(&:status) - assert_equal ["created", "expired"], PublishingPipelineState.latest_pipeline(podcast2).map(&:status) + assert_equal ["created", "expired"].sort, PublishingPipelineState.latest_pipeline(podcast).map(&:status).sort + assert_equal ["created", "expired"].sort, PublishingPipelineState.latest_pipeline(podcast2).map(&:status).sort # All pipelines are in a terminal state # There is nothing running: @@ -197,8 +197,10 @@ describe "error!" do it 'sets the status to "error"' do - PublishFeedJob.stub_any_instance(:publish_feed, ->(*args) { raise "error" }) do - assert_raises(RuntimeError) { PublishingPipelineState.attempt!(podcast, perform_later: false) } + PublishFeedJob.stub_any_instance(:save_file, nil) do + PublishFeedJob.stub_any_instance(:publish_feed, ->(*args) { raise "error" }) do + assert_raises(RuntimeError) { PublishingPipelineState.attempt!(podcast, perform_later: false) } + end end assert_equal ["created", "started", "error"].sort, PublishingPipelineState.where(podcast: podcast).map(&:status).sort @@ -207,8 +209,10 @@ describe "complete!" 
do it 'sets the status to "complete"' do - PublishFeedJob.stub_any_instance(:publish_feed, "pub!") do - PublishingPipelineState.attempt!(podcast, perform_later: false) + PublishFeedJob.stub_any_instance(:save_file, nil) do + PublishFeedJob.stub_any_instance(:publish_feed, "pub!") do + PublishingPipelineState.attempt!(podcast, perform_later: false) + end end assert_equal ["created", "started", "complete"].sort, PublishingPipelineState.where(podcast: podcast).map(&:status).sort @@ -218,8 +222,10 @@ # And another request comes along to publish: PublishingQueueItem.create!(podcast: podcast) - PublishFeedJob.stub_any_instance(:publish_feed, "pub!") do - PublishingPipelineState.attempt!(podcast, perform_later: false) + PublishFeedJob.stub_any_instance(:save_file, nil) do + PublishFeedJob.stub_any_instance(:publish_feed, "pub!") do + PublishingPipelineState.attempt!(podcast, perform_later: false) + end end PublishingQueueItem.create!(podcast: podcast)