diff --git a/app/jobs/publish_feed_job.rb b/app/jobs/publish_feed_job.rb index d795a23c8..a9e788d7c 100644 --- a/app/jobs/publish_feed_job.rb +++ b/app/jobs/publish_feed_job.rb @@ -7,12 +7,10 @@ class PublishFeedJob < ApplicationJob attr_accessor :podcast, :episodes, :rss, :put_object, :copy_object - def perform(podcast) - # Since we don't current have a way to retry failed attempts, - # and until somthing akin to https://github.com/PRX/feeder.prx.org/issues/714 lands - # the RSS publishing is extracted from the publishing pipeline semantics. - # TODO: recombine the publishing invocations (RSS, Apple) once we have some retry guarantees - podcast.feeds.each { |feed| save_file(podcast, feed) } + def perform(podcast, pub_item) + # Consume the SQS message, return early, if we have racing threads trying to + # grab the current publishing pipeline. + return :mismatched if mismatched_publishing_item?(podcast, pub_item) PublishingPipelineState.start!(podcast) podcast.feeds.each { |feed| publish_feed(podcast, feed) } @@ -46,9 +44,9 @@ def publish_apple(feed) end def publish_rss(podcast, feed) - # res = save_file(podcast, feed) + res = save_file(podcast, feed) PublishingPipelineState.publish_rss!(podcast) - # res + res end def save_file(podcast, feed, options = {}) @@ -85,4 +83,20 @@ def client Aws::S3::Client.new end end + + def mismatched_publishing_item?(podcast, pub_item) + current_pub_item = PublishingQueueItem.current_unfinished_item(podcast) + + mismatch = pub_item != current_pub_item + + if mismatch + Rails.logger.error("Mismatched publishing_queue_item in PublishFeedJob", { + podcast_id: podcast.id, + incoming_publishing_item_id: pub_item.id, + current_publishing_item_id: current_pub_item.id + }) + end + + mismatch + end end diff --git a/app/models/podcast.rb b/app/models/podcast.rb index 212c9c712..dd538a9b8 100644 --- a/app/models/podcast.rb +++ b/app/models/podcast.rb @@ -58,6 +58,7 @@ def self.by_prx_series(series) def self.release!(options = {}) Rails.logger.tagged("Podcast.release!") do PublishingPipelineState.expire_pipelines! + PublishingPipelineState.retry_failed_pipelines! Episode.release_episodes!(options) end end diff --git a/app/models/publishing_pipeline_state.rb b/app/models/publishing_pipeline_state.rb index 363425a41..6a7d94b90 100644 --- a/app/models/publishing_pipeline_state.rb +++ b/app/models/publishing_pipeline_state.rb @@ -1,5 +1,6 @@ class PublishingPipelineState < ApplicationRecord TERMINAL_STATUSES = [:complete, :error, :expired].freeze + TERMINAL_FAILURE_STATUSES = [:error, :expired].freeze UNIQUE_STATUSES = TERMINAL_STATUSES + [:created, :started] # Handle the max timout for a publishing pipeline: Pub RSS job + Pub Apple job + a few extra minutes of flight @@ -15,6 +16,11 @@ class PublishingPipelineState < ApplicationRecord where(publishing_queue_item: pq_items) } + scope :latest_failed_pipelines, -> { + # Grab the latest attempted Publishing Item AND the latest failed Pub Item. + # If that is a non-null intersection, then we have a current/latest+failed pipeline. + where(publishing_queue_item_id: PublishingQueueItem.latest_attempted.latest_failed.select(:id)) + } scope :latest_by_queue_item, -> { where(id: PublishingPipelineState @@ -115,12 +121,11 @@ def self.attempt!(podcast, perform_later: true) Rails.logger.info("Creating publishing pipeline for podcast #{podcast.id}", {podcast_id: podcast.id, queue_item_id: latest_unfinished_item.id}) PublishingPipelineState.create!(podcast: podcast, publishing_queue_item: latest_unfinished_item, status: :created) + Rails.logger.info("Initiating PublishFeedJob for podcast #{podcast.id}", {podcast_id: podcast.id, queue_item_id: latest_unfinished_item.id, perform_later: perform_later}) if perform_later - Rails.logger.info("Scheduling PublishFeedJob for podcast #{podcast.id}", {podcast_id: podcast.id}) - PublishFeedJob.perform_later(podcast) + PublishFeedJob.perform_later(podcast, latest_unfinished_item) else - Rails.logger.info("Performing PublishFeedJob for podcast #{podcast.id}", {podcast_id: podcast.id}) - PublishFeedJob.perform_now(podcast) + PublishFeedJob.perform_now(podcast, latest_unfinished_item) end end end @@ -160,8 +165,17 @@ def self.expire!(podcast) def self.expire_pipelines! Podcast.where(id: expired_pipelines.select(:podcast_id)).each do |podcast| - Rails.logger.error("Cleaning up expired publishing pipeline for podcast #{podcast.id}", {podcast_id: podcast.id}) - expire!(podcast) + Rails.logger.tagged("PublishingPipeLineState.expire_pipelines!", "Podcast:#{podcast.id}") do + expire!(podcast) + end + end + end + + def self.retry_failed_pipelines! + Podcast.where(id: latest_failed_pipelines.select(:podcast_id).distinct).each do |podcast| + Rails.logger.tagged("PublishingPipeLineState.retry_failed_pipelines!", "Podcast:#{podcast.id}") do + start_pipeline!(podcast) + end end end diff --git a/app/models/publishing_queue_item.rb b/app/models/publishing_queue_item.rb index 2fe567288..78b2897b0 100644 --- a/app/models/publishing_queue_item.rb +++ b/app/models/publishing_queue_item.rb @@ -1,7 +1,21 @@ class PublishingQueueItem < ApplicationRecord scope :max_id_grouped, -> { group(:podcast_id).select("max(id) as id") } - scope :latest_attempted, -> { joins(:publishing_pipeline_states).order("publishing_pipeline_states.id desc") } - scope :latest_complete, -> { latest_attempted.where(publishing_pipeline_states: {status: PublishingPipelineState::TERMINAL_STATUSES}) } + + scope :latest_attempted, -> { + where(id: PublishingPipelineState.group(:podcast_id).select("max(publishing_queue_item_id)")) + } + scope :latest_complete, -> { + latest_by_status(PublishingPipelineState::TERMINAL_STATUSES) + } + scope :latest_failed, -> { + latest_by_status(PublishingPipelineState::TERMINAL_FAILURE_STATUSES) + } + + scope :latest_by_status, ->(status) { + where(id: PublishingPipelineState.group(:podcast_id) + .where(status: status) + .select("max(publishing_queue_item_id)")) + } has_many :publishing_pipeline_states has_many :latest_state, -> { latest_by_queue_item }, class_name: "PublishingPipelineState" @@ -51,6 +65,7 @@ def self.all_unfinished_items FROM publishing_pipeline_states WHERE podcast_id = pqi.podcast_id AND status in (#{PublishingPipelineState.terminal_status_codes.join(",")})), -1) AND podcast_id = pqi.podcast_id ) unfinished_podcast_items ON TRUE + ORDER BY unfinished_podcast_items.id DESC ) publishing_queue_items SQL diff --git a/test/jobs/publish_feed_job_test.rb b/test/jobs/publish_feed_job_test.rb index a96823d4e..8752e0a79 100644 --- a/test/jobs/publish_feed_job_test.rb +++ b/test/jobs/publish_feed_job_test.rb @@ -35,12 +35,26 @@ PublishingPipelineState.start_pipeline!(podcast) end - rss = job.perform(podcast) + pub_item = PublishingQueueItem.unfinished_items(podcast).first + + rss = job.perform(podcast, pub_item) refute_nil rss refute_nil job.put_object assert_nil job.copy_object end end + + it "will skip the publishing if the pub items are mismatched" do + job.stub(:client, stub_client) do + PublishFeedJob.stub(:perform_later, nil) do + PublishingPipelineState.start_pipeline!(podcast) + end + + pub_item = PublishingQueueItem.create(podcast: podcast) + assert job.mismatched_publishing_item?(podcast, pub_item) + assert_equal :mismatched, job.perform(podcast, pub_item) + end + end end describe "publishing to apple" do diff --git a/test/models/podcast_test.rb b/test/models/podcast_test.rb index 439285df8..b75d6251b 100644 --- a/test/models/podcast_test.rb +++ b/test/models/podcast_test.rb @@ -96,6 +96,15 @@ end obj.verify end + + it "retries latest publishing pipelines with errors" do + obj = MiniTest::Mock.new + obj.expect :call, nil + PublishingPipelineState.stub(:retry_failed_pipelines!, obj) do + Podcast.release! + end + obj.verify + end end end diff --git a/test/models/publishing_pipeline_state_test.rb b/test/models/publishing_pipeline_state_test.rb index e0485f597..9fcdc8bd7 100644 --- a/test/models/publishing_pipeline_state_test.rb +++ b/test/models/publishing_pipeline_state_test.rb @@ -166,6 +166,40 @@ end end + describe ".retry_failed_pipelines!" do + it "should retry failed pipelines" do + PublishingPipelineState.start_pipeline!(podcast) + assert_equal ["created"], PublishingPipelineState.latest_pipeline(podcast).map(&:status) + + # it fails + PublishingPipelineState.error!(podcast) + assert_equal ["created", "error"].sort, PublishingPipelineState.latest_pipeline(podcast).map(&:status).sort + + # it retries + PublishingPipelineState.retry_failed_pipelines! + assert_equal ["created"].sort, PublishingPipelineState.latest_pipeline(podcast).map(&:status).sort + end + + it "ignores previously errored pipelines back in the queue" do + # A failed pipeline + PublishingPipelineState.start_pipeline!(podcast) + PublishingPipelineState.error!(podcast) + assert_equal ["created", "error"].sort, PublishingPipelineState.latest_pipeline(podcast).map(&:status).sort + + # A new pipeline + PublishingPipelineState.start_pipeline!(podcast) + PublishingPipelineState.publish_rss!(podcast) + assert_equal ["created", "published_rss"], PublishingPipelineState.latest_pipeline(podcast).map(&:status) + publishing_item = PublishingPipelineState.latest_pipeline(podcast).map(&:publishing_queue_item_id).uniq + + # it does not retry the errored pipeline + PublishingPipelineState.retry_failed_pipelines! + assert_equal ["created", "published_rss"].sort, PublishingPipelineState.latest_pipeline(podcast).map(&:status).sort + # it's the same publishing item + assert_equal publishing_item, PublishingPipelineState.latest_pipeline(podcast).map(&:publishing_queue_item_id).uniq + end + end + describe "#publishing_queue_item" do it "has one publish queue item per attempt state" do pqi = PublishingQueueItem.create!(podcast: podcast) diff --git a/test/models/publishing_queue_item_test.rb b/test/models/publishing_queue_item_test.rb index 1d9bbb94b..d9059956c 100644 --- a/test/models/publishing_queue_item_test.rb +++ b/test/models/publishing_queue_item_test.rb @@ -35,11 +35,67 @@ pqi2 = PublishingPipelineState.create!(podcast: podcast, publishing_queue_item: PublishingQueueItem.create!(podcast: podcast)).publishing_queue_item pqi3 = PublishingPipelineState.create!(podcast: podcast, publishing_queue_item: PublishingQueueItem.create!(podcast: podcast)).publishing_queue_item - assert_equal [pqi3, pqi2, pqi1].sort, PublishingQueueItem.latest_attempted.sort + assert_equal [pqi1, pqi2, pqi3].sort, PublishingQueueItem.unfinished_items(podcast).sort + assert_equal [pqi3].sort, PublishingQueueItem.latest_attempted.sort assert_equal pqi3.created_at, PublishingQueueItem.latest_attempted.first.created_at end end + describe ".latest_failed" do + it "returns the most recent failed publishing attempt for each podcast" do + pqi1 = PublishingPipelineState.create!(podcast: podcast, publishing_queue_item: PublishingQueueItem.create!(podcast: podcast)).publishing_queue_item + PublishingPipelineState.error!(podcast) + + _pqi2 = PublishingPipelineState.create!(podcast: podcast, publishing_queue_item: PublishingQueueItem.create!(podcast: podcast)).publishing_queue_item + PublishingPipelineState.complete!(podcast) + + pqi3 = PublishingPipelineState.create!(podcast: podcast, publishing_queue_item: PublishingQueueItem.create!(podcast: podcast)).publishing_queue_item + + assert_equal [pqi3].sort, PublishingQueueItem.unfinished_items(podcast).sort + assert_equal [pqi1].sort, PublishingQueueItem.latest_failed.where(podcast: podcast) + + PublishingPipelineState.error!(podcast) + assert_equal [pqi3].sort, PublishingQueueItem.latest_failed.where(podcast: podcast) + end + + it "can be combined with other scopes to query the current failed item" do + # create a failed item, transition to `created` pipeline state and then transition to `error` + pqi1 = PublishingPipelineState.create!(podcast: podcast, publishing_queue_item: PublishingQueueItem.create!(podcast: podcast)).publishing_queue_item + PublishingPipelineState.error!(podcast) + + assert_equal [pqi1].sort, PublishingQueueItem.latest_failed.where(podcast: podcast) + assert_equal [pqi1].sort, PublishingQueueItem.latest_attempted.latest_failed.where(podcast: podcast) + + pqi2 = PublishingPipelineState.create!(podcast: podcast, publishing_queue_item: PublishingQueueItem.create!(podcast: podcast)).publishing_queue_item + + assert_equal [pqi1].sort, PublishingQueueItem.latest_failed.where(podcast: podcast) + assert_equal [].sort, PublishingQueueItem.latest_attempted.latest_failed.where(podcast: podcast) + + PublishingPipelineState.error!(podcast) + + assert_equal [pqi2].sort, PublishingQueueItem.latest_failed.where(podcast: podcast) + assert_equal [pqi2].sort, PublishingQueueItem.latest_attempted.latest_failed.where(podcast: podcast) + + _pqi3 = PublishingPipelineState.create!(podcast: podcast, publishing_queue_item: PublishingQueueItem.create!(podcast: podcast)).publishing_queue_item + + assert_equal [pqi2].sort, PublishingQueueItem.latest_failed.where(podcast: podcast) + assert_equal [].sort, PublishingQueueItem.latest_attempted.latest_failed.where(podcast: podcast) + end + + it "returns the most recent expired publishing attempt for each podcast" do + pqi1 = PublishingPipelineState.create!(podcast: podcast, publishing_queue_item: PublishingQueueItem.create!(podcast: podcast)).publishing_queue_item + PublishingPipelineState.expire!(podcast) + + assert_equal [pqi1].sort, PublishingQueueItem.latest_failed.where(podcast: podcast) + assert_equal [pqi1].sort, PublishingQueueItem.latest_attempted.latest_failed.where(podcast: podcast) + + _pqi2 = PublishingPipelineState.create!(podcast: podcast, publishing_queue_item: PublishingQueueItem.create!(podcast: podcast)).publishing_queue_item + + assert_equal [pqi1].sort, PublishingQueueItem.latest_failed.where(podcast: podcast) + assert_equal [].sort, PublishingQueueItem.latest_attempted.latest_failed.where(podcast: podcast) + end + end + describe ".all_unfinished_items" do let(:podcast2) { create(:podcast) } it "returns the publishing queue items across all podcasts" do