Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry publishing attempts #750

Merged
merged 13 commits into from
Aug 10, 2023
30 changes: 22 additions & 8 deletions app/jobs/publish_feed_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,10 @@ class PublishFeedJob < ApplicationJob

attr_accessor :podcast, :episodes, :rss, :put_object, :copy_object

def perform(podcast)
# Since we don't currently have a way to retry failed attempts,
# and until something akin to https://github.com/PRX/feeder.prx.org/issues/714 lands
# the RSS publishing is extracted from the publishing pipeline semantics.
# TODO: recombine the publishing invocations (RSS, Apple) once we have some retry guarantees
podcast.feeds.each { |feed| save_file(podcast, feed) }
Copy link
Member Author

@svevang svevang Aug 9, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Returns the RSS publishing to its previous place in the partitioned concurrency semantics.

So now:

  • Apple publishing and delivery will block RSS, for those podcasts with that configured.
  • Only one thread will be publishing RSS at a time (partitioned by podcast).

def perform(podcast, pub_item)
# Consume the SQS message, return early, if we have racing threads trying to
# grab the current publishing pipeline.
return :mismatched if mismatched_publishing_item?(podcast, pub_item)

PublishingPipelineState.start!(podcast)
podcast.feeds.each { |feed| publish_feed(podcast, feed) }
Expand Down Expand Up @@ -46,9 +44,9 @@ def publish_apple(feed)
end

# Renders and uploads the feed's RSS, then records the pipeline transition.
#
# Saves the feed file (via save_file) before marking the pipeline as
# having published RSS, and returns the save_file result so callers can
# inspect the upload outcome. Commented-out remnants of the previous
# revision have been removed.
def publish_rss(podcast, feed)
  res = save_file(podcast, feed)
  PublishingPipelineState.publish_rss!(podcast)
  res
end

def save_file(podcast, feed, options = {})
Expand Down Expand Up @@ -85,4 +83,20 @@ def client
Aws::S3::Client.new
end
end

# Guards against racing publish jobs: returns true when the queue item
# this job was enqueued with is no longer the podcast's current
# unfinished item, logging the discrepancy.
#
# @param podcast [Podcast]
# @param pub_item [PublishingQueueItem] the item the job was enqueued with
# @return [Boolean] true when the job should bail out
def mismatched_publishing_item?(podcast, pub_item)
  current_pub_item = PublishingQueueItem.current_unfinished_item(podcast)

  mismatch = pub_item != current_pub_item

  if mismatch
    # Safe navigation: either side can be nil (e.g. the pipeline already
    # finished, so there is no current unfinished item) — logging must
    # not raise NoMethodError in that case.
    Rails.logger.error("Mismatched publishing_queue_item in PublishFeedJob", {
      podcast_id: podcast.id,
      incoming_publishing_item_id: pub_item&.id,
      current_publishing_item_id: current_pub_item&.id
    })
  end

  mismatch
end
end
1 change: 1 addition & 0 deletions app/models/podcast.rb
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ def self.by_prx_series(series)
# Release hook run for all podcasts: first expires stuck pipelines,
# then retries the latest failed pipelines, and finally releases any
# publishable episodes. All log output is tagged "Podcast.release!".
#
# @param options [Hash] forwarded to Episode.release_episodes!
def self.release!(options = {})
  Rails.logger.tagged("Podcast.release!") do
    PublishingPipelineState.expire_pipelines!
    PublishingPipelineState.retry_failed_pipelines!
    Episode.release_episodes!(options)
  end
end
Expand Down
26 changes: 20 additions & 6 deletions app/models/publishing_pipeline_state.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
class PublishingPipelineState < ApplicationRecord
TERMINAL_STATUSES = [:complete, :error, :expired].freeze
TERMINAL_FAILURE_STATUSES = [:error, :expired].freeze
UNIQUE_STATUSES = TERMINAL_STATUSES + [:created, :started]

# Handle the max timout for a publishing pipeline: Pub RSS job + Pub Apple job + a few extra minutes of flight
Expand All @@ -15,6 +16,11 @@ class PublishingPipelineState < ApplicationRecord

where(publishing_queue_item: pq_items)
}
# Pipeline states whose queue item is BOTH the latest attempted and the
# latest failed item for its podcast — i.e. the current pipeline failed.
scope :latest_failed_pipelines, -> {
  # Grab the latest attempted Publishing Item AND the latest failed Pub Item.
  # If that is a non-null intersection, then we have a current/latest+failed pipeline.
  where(publishing_queue_item_id: PublishingQueueItem.latest_attempted.latest_failed.select(:id))
}

scope :latest_by_queue_item, -> {
where(id: PublishingPipelineState
Expand Down Expand Up @@ -115,12 +121,11 @@ def self.attempt!(podcast, perform_later: true)
Rails.logger.info("Creating publishing pipeline for podcast #{podcast.id}", {podcast_id: podcast.id, queue_item_id: latest_unfinished_item.id})
PublishingPipelineState.create!(podcast: podcast, publishing_queue_item: latest_unfinished_item, status: :created)

Rails.logger.info("Initiating PublishFeedJob for podcast #{podcast.id}", {podcast_id: podcast.id, queue_item_id: latest_unfinished_item.id, perform_later: perform_later})
if perform_later
Rails.logger.info("Scheduling PublishFeedJob for podcast #{podcast.id}", {podcast_id: podcast.id})
PublishFeedJob.perform_later(podcast)
PublishFeedJob.perform_later(podcast, latest_unfinished_item)
else
Rails.logger.info("Performing PublishFeedJob for podcast #{podcast.id}", {podcast_id: podcast.id})
PublishFeedJob.perform_now(podcast)
PublishFeedJob.perform_now(podcast, latest_unfinished_item)
end
end
end
Expand Down Expand Up @@ -160,8 +165,17 @@ def self.expire!(podcast)

# Expires every pipeline that has exceeded its allowed run time,
# iterating podcast by podcast with tagged log output.
def self.expire_pipelines!
  Podcast.where(id: expired_pipelines.select(:podcast_id)).each do |podcast|
    # Log tag corrected: was misspelled "PublishingPipeLineState".
    Rails.logger.tagged("PublishingPipelineState.expire_pipelines!", "Podcast:#{podcast.id}") do
      expire!(podcast)
    end
  end
end

# Restarts the pipeline for each podcast whose most recent publishing
# attempt ended in a terminal failure (see latest_failed_pipelines),
# with tagged log output per podcast.
def self.retry_failed_pipelines!
  Podcast.where(id: latest_failed_pipelines.select(:podcast_id).distinct).each do |podcast|
    # Log tag corrected: was misspelled "PublishingPipeLineState".
    Rails.logger.tagged("PublishingPipelineState.retry_failed_pipelines!", "Podcast:#{podcast.id}") do
      start_pipeline!(podcast)
    end
  end
end

Expand Down
20 changes: 18 additions & 2 deletions app/models/publishing_queue_item.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,22 @@
class PublishingQueueItem < ApplicationRecord
scope :max_id_grouped, -> { group(:podcast_id).select("max(id) as id") }
scope :latest_attempted, -> { joins(:publishing_pipeline_states).order("publishing_pipeline_states.id desc") }
scope :latest_complete, -> { latest_attempted.where(publishing_pipeline_states: {status: PublishingPipelineState::TERMINAL_STATUSES}) }

# The single most recently attempted queue item per podcast (the max
# publishing_queue_item_id seen in pipeline states), newest id first.
scope :latest_attempted, -> {
  where(id: PublishingPipelineState.group(:podcast_id).select("max(publishing_queue_item_id)"))
  .order(id: :desc)
}
# Latest item per podcast that reached any terminal status
# (:complete, :error, :expired).
scope :latest_complete, -> {
  latest_by_status(PublishingPipelineState::TERMINAL_STATUSES)
}
# Latest item per podcast that reached a terminal failure
# status (:error, :expired).
scope :latest_failed, -> {
  latest_by_status(PublishingPipelineState::TERMINAL_FAILURE_STATUSES)
}

# Generic form: the latest queue item per podcast whose pipeline states
# include the given status(es), newest id first.
scope :latest_by_status, ->(status) {
  where(id: PublishingPipelineState.group(:podcast_id)
  .where(status: status)
  .select("max(publishing_queue_item_id)")).order(id: :desc)
}

has_many :publishing_pipeline_states
has_many :latest_state, -> { latest_by_queue_item }, class_name: "PublishingPipelineState"
Expand Down Expand Up @@ -51,6 +66,7 @@ def self.all_unfinished_items
FROM publishing_pipeline_states WHERE podcast_id = pqi.podcast_id AND status in (#{PublishingPipelineState.terminal_status_codes.join(",")})), -1)
AND podcast_id = pqi.podcast_id
) unfinished_podcast_items ON TRUE
ORDER BY unfinished_podcast_items.id DESC
) publishing_queue_items
SQL

Expand Down
16 changes: 15 additions & 1 deletion test/jobs/publish_feed_job_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,26 @@
PublishingPipelineState.start_pipeline!(podcast)
end

rss = job.perform(podcast)
pub_item = PublishingQueueItem.unfinished_items(podcast).first

rss = job.perform(podcast, pub_item)
refute_nil rss
refute_nil job.put_object
assert_nil job.copy_object
end
end

# The job must consume the message and return :mismatched (without
# publishing) when the queue item it was handed is no longer the
# podcast's current unfinished item.
it "will skip the publishing if the pub items are mismatched" do
  job.stub(:client, stub_client) do
    PublishFeedJob.stub(:perform_later, nil) do
      PublishingPipelineState.start_pipeline!(podcast)
    end

    # A fresh queue item that is not the one tracked by the running pipeline.
    pub_item = PublishingQueueItem.create(podcast: podcast)
    assert job.mismatched_publishing_item?(podcast, pub_item)
    assert_equal :mismatched, job.perform(podcast, pub_item)
  end
end
end

describe "publishing to apple" do
Expand Down
9 changes: 9 additions & 0 deletions test/models/podcast_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,15 @@
end
obj.verify
end

# Podcast.release! should invoke the failed-pipeline retry hook exactly once.
it "retries latest publishing pipelines with errors" do
  obj = MiniTest::Mock.new
  obj.expect :call, nil
  PublishingPipelineState.stub(:retry_failed_pipelines!, obj) do
    Podcast.release!
  end
  obj.verify
end
end
end

Expand Down
34 changes: 34 additions & 0 deletions test/models/publishing_pipeline_state_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,40 @@
end
end

# Coverage for PublishingPipelineState.retry_failed_pipelines!: it should
# restart pipelines whose latest attempt failed, and leave alone podcasts
# whose latest attempt is healthy even if an older attempt errored.
describe ".retry_failed_pipelines!" do
  it "should retry failed pipelines" do
    PublishingPipelineState.start_pipeline!(podcast)
    assert_equal ["created"], PublishingPipelineState.latest_pipeline(podcast).map(&:status)

    # it fails
    PublishingPipelineState.error!(podcast)
    assert_equal ["created", "error"].sort, PublishingPipelineState.latest_pipeline(podcast).map(&:status).sort

    # it retries: a fresh pipeline in the `created` state becomes the latest
    PublishingPipelineState.retry_failed_pipelines!
    assert_equal ["created"].sort, PublishingPipelineState.latest_pipeline(podcast).map(&:status).sort
  end

  it "ignores previously errored pipelines back in the queue" do
    # A failed pipeline
    PublishingPipelineState.start_pipeline!(podcast)
    PublishingPipelineState.error!(podcast)
    assert_equal ["created", "error"].sort, PublishingPipelineState.latest_pipeline(podcast).map(&:status).sort

    # A new pipeline
    PublishingPipelineState.start_pipeline!(podcast)
    PublishingPipelineState.publish_rss!(podcast)
    assert_equal ["created", "published_rss"], PublishingPipelineState.latest_pipeline(podcast).map(&:status)
    publishing_item = PublishingPipelineState.latest_pipeline(podcast).map(&:publishing_queue_item_id).uniq

    # it does not retry the errored pipeline
    PublishingPipelineState.retry_failed_pipelines!
    assert_equal ["created", "published_rss"].sort, PublishingPipelineState.latest_pipeline(podcast).map(&:status).sort
    # it's the same publishing item
    assert_equal publishing_item, PublishingPipelineState.latest_pipeline(podcast).map(&:publishing_queue_item_id).uniq
  end
end

describe "#publishing_queue_item" do
it "has one publish queue item per attempt state" do
pqi = PublishingQueueItem.create!(podcast: podcast)
Expand Down
58 changes: 57 additions & 1 deletion test/models/publishing_queue_item_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,67 @@
pqi2 = PublishingPipelineState.create!(podcast: podcast, publishing_queue_item: PublishingQueueItem.create!(podcast: podcast)).publishing_queue_item
pqi3 = PublishingPipelineState.create!(podcast: podcast, publishing_queue_item: PublishingQueueItem.create!(podcast: podcast)).publishing_queue_item

assert_equal [pqi3, pqi2, pqi1].sort, PublishingQueueItem.latest_attempted.sort
assert_equal [pqi1, pqi2, pqi3].sort, PublishingQueueItem.unfinished_items(podcast).sort
assert_equal [pqi3].sort, PublishingQueueItem.latest_attempted.sort
assert_equal pqi3.created_at, PublishingQueueItem.latest_attempted.first.created_at
end
end

# Coverage for PublishingQueueItem.latest_failed: the scope should track
# each podcast's most recent failed (errored or expired) queue item, and
# compose with .latest_attempted to identify a currently-failed pipeline.
describe ".latest_failed" do
  it "returns the most recent failed publishing attempt for each podcast" do
    pqi1 = PublishingPipelineState.create!(podcast: podcast, publishing_queue_item: PublishingQueueItem.create!(podcast: podcast)).publishing_queue_item
    PublishingPipelineState.error!(podcast)

    _pqi2 = PublishingPipelineState.create!(podcast: podcast, publishing_queue_item: PublishingQueueItem.create!(podcast: podcast)).publishing_queue_item
    PublishingPipelineState.complete!(podcast)

    pqi3 = PublishingPipelineState.create!(podcast: podcast, publishing_queue_item: PublishingQueueItem.create!(podcast: podcast)).publishing_queue_item

    assert_equal [pqi3].sort, PublishingQueueItem.unfinished_items(podcast).sort
    assert_equal [pqi1].sort, PublishingQueueItem.latest_failed.where(podcast: podcast)

    # once pqi3 errors, it supersedes pqi1 as the latest failure
    PublishingPipelineState.error!(podcast)
    assert_equal [pqi3].sort, PublishingQueueItem.latest_failed.where(podcast: podcast)
  end

  it "can be combined with other scopes to query the current failed item" do
    # create a failed item, transition to `created` pipeline state and then transition to `error`
    pqi1 = PublishingPipelineState.create!(podcast: podcast, publishing_queue_item: PublishingQueueItem.create!(podcast: podcast)).publishing_queue_item
    PublishingPipelineState.error!(podcast)

    assert_equal [pqi1].sort, PublishingQueueItem.latest_failed.where(podcast: podcast)
    assert_equal [pqi1].sort, PublishingQueueItem.latest_attempted.latest_failed.where(podcast: podcast)

    # a newer attempt exists, so the failed item is no longer "current"
    pqi2 = PublishingPipelineState.create!(podcast: podcast, publishing_queue_item: PublishingQueueItem.create!(podcast: podcast)).publishing_queue_item

    assert_equal [pqi1].sort, PublishingQueueItem.latest_failed.where(podcast: podcast)
    assert_equal [].sort, PublishingQueueItem.latest_attempted.latest_failed.where(podcast: podcast)

    PublishingPipelineState.error!(podcast)

    assert_equal [pqi2].sort, PublishingQueueItem.latest_failed.where(podcast: podcast)
    assert_equal [pqi2].sort, PublishingQueueItem.latest_attempted.latest_failed.where(podcast: podcast)

    _pqi3 = PublishingPipelineState.create!(podcast: podcast, publishing_queue_item: PublishingQueueItem.create!(podcast: podcast)).publishing_queue_item

    assert_equal [pqi2].sort, PublishingQueueItem.latest_failed.where(podcast: podcast)
    assert_equal [].sort, PublishingQueueItem.latest_attempted.latest_failed.where(podcast: podcast)
  end

  it "returns the most recent expired publishing attempt for each podcast" do
    # expiration counts as a terminal failure, same as :error
    pqi1 = PublishingPipelineState.create!(podcast: podcast, publishing_queue_item: PublishingQueueItem.create!(podcast: podcast)).publishing_queue_item
    PublishingPipelineState.expire!(podcast)

    assert_equal [pqi1].sort, PublishingQueueItem.latest_failed.where(podcast: podcast)
    assert_equal [pqi1].sort, PublishingQueueItem.latest_attempted.latest_failed.where(podcast: podcast)

    _pqi2 = PublishingPipelineState.create!(podcast: podcast, publishing_queue_item: PublishingQueueItem.create!(podcast: podcast)).publishing_queue_item

    assert_equal [pqi1].sort, PublishingQueueItem.latest_failed.where(podcast: podcast)
    assert_equal [].sort, PublishingQueueItem.latest_attempted.latest_failed.where(podcast: podcast)
  end
end

describe ".all_unfinished_items" do
let(:podcast2) { create(:podcast) }
it "returns the publishing queue items across all podcasts" do
Expand Down