Commit

Merge remote-tracking branch 'origin/main' into apple-retry-uploads
svevang committed Jul 25, 2023
2 parents 67c7d70 + 7d220c3 commit 53f8dd8
Showing 9 changed files with 83 additions and 44 deletions.
11 changes: 9 additions & 2 deletions app/jobs/publish_feed_job.rb
@@ -8,11 +8,18 @@ class PublishFeedJob < ApplicationJob
attr_accessor :podcast, :episodes, :rss, :put_object, :copy_object

def perform(podcast)
# Since we don't currently have a way to retry failed attempts,
# and until something akin to https://github.com/PRX/feeder.prx.org/issues/714 lands
# the RSS publishing is extracted from the publishing pipeline semantics.
# TODO: recombine the publishing invocations (RSS, Apple) once we have some retry guarantees
podcast.feeds.each { |feed| save_file(podcast, feed) }

PublishingPipelineState.start!(podcast)
podcast.feeds.each { |feed| publish_feed(podcast, feed) }
PublishingPipelineState.complete!(podcast)
rescue => e
PublishingPipelineState.error!(podcast)
Rails.logger.error("Error publishing podcast", {podcast_id: podcast.id, error: e.message, backtrace: e.backtrace.join("\n")})
raise e
ensure
PublishingPipelineState.settle_remaining!(podcast)
@@ -39,9 +46,9 @@ def publish_apple(feed)
end

def publish_rss(podcast, feed)
res = save_file(podcast, feed)
# res = save_file(podcast, feed)
PublishingPipelineState.publish_rss!(podcast)
res
# res
end

def save_file(podcast, feed, options = {})
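For context, PublishFeedJob#perform now reads as follows (reassembled from the hunk above; the trailing end is implied by the truncated diff, everything else is taken from the changes shown):

def perform(podcast)
  # Since we don't currently have a way to retry failed attempts,
  # and until something akin to https://github.com/PRX/feeder.prx.org/issues/714 lands
  # the RSS publishing is extracted from the publishing pipeline semantics.
  # TODO: recombine the publishing invocations (RSS, Apple) once we have some retry guarantees
  podcast.feeds.each { |feed| save_file(podcast, feed) }

  PublishingPipelineState.start!(podcast)
  podcast.feeds.each { |feed| publish_feed(podcast, feed) }
  PublishingPipelineState.complete!(podcast)
rescue => e
  PublishingPipelineState.error!(podcast)
  Rails.logger.error("Error publishing podcast", {podcast_id: podcast.id, error: e.message, backtrace: e.backtrace.join("\n")})
  raise e
ensure
  PublishingPipelineState.settle_remaining!(podcast)
end
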
9 changes: 6 additions & 3 deletions app/models/apple/api_waiting.rb
@@ -8,12 +8,15 @@ module ApiWaiting
def self.wait_for(remaining_records)
t_beg = Time.now.utc
loop do
# TODO: handle timeout
break [false, remaining_records] if Time.now.utc - t_beg > self::API_WAIT_TIMEOUT
Rails.logger.info(".wait_for", {remaining_records: remaining_records, have_waited: Time.now.utc - t_beg})

# Return `timeout == true` if we've waited too long
break [true, remaining_records] if Time.now.utc - t_beg > self::API_WAIT_TIMEOUT

remaining_records = yield(remaining_records)

break [true, []] if remaining_records.empty?
# All done, return `timeout == false`
break [false, []] if remaining_records.empty?

sleep(self::API_WAIT_INTERVAL)
end
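The return contract here is inverted: wait_for used to report [finished, remaining] and now reports [timed_out, remaining]. A minimal caller-side sketch of the new contract (SomeWaiter is a hypothetical stand-in for any class that mixes in Apple::ApiWaiting, like the TestWait double in the tests below):

timed_out, remaining = SomeWaiter.wait_for(records) do |still_pending|
  # Yield back only the records that still need work; an empty array ends the
  # loop, otherwise wait_for sleeps API_WAIT_INTERVAL and retries until API_WAIT_TIMEOUT.
  still_pending.reject { |record| record.done? }
end

raise "Timed out waiting" if timed_out
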
14 changes: 7 additions & 7 deletions app/models/apple/episode.rb
@@ -25,13 +25,13 @@ def self.wait_for_asset_state(api, eps)

remaining_eps.each do |ep|
Rails.logger.info("Waiting for audio asset state?", {episode_id: ep.feeder_id,
delivery_file_count: ep.podcast_delivery_files.count,
delivery_files_processed_errors: ep.podcast_delivery_files.all?(&:processed_errors?),
delivery_files_processed: ep.podcast_delivery_files.all?(&:processed?),
delivery_files_delivered: ep.podcast_delivery_files.all?(&:delivered?),
asset_state: ep.audio_asset_state,
has_podcast_audio: ep&.podcast_container&.has_podcast_audio?,
waiting_for_asset_state: ep.waiting_for_asset_state?})
delivery_file_count: ep.podcast_delivery_files.count,
delivery_files_processed_errors: ep.podcast_delivery_files.all?(&:processed_errors?),
delivery_files_processed: ep.podcast_delivery_files.all?(&:processed?),
delivery_files_delivered: ep.podcast_delivery_files.all?(&:delivered?),
asset_state: ep.audio_asset_state,
has_podcast_audio: ep&.podcast_container&.has_podcast_audio?,
waiting_for_asset_state: ep.waiting_for_asset_state?})
end

rem =
10 changes: 8 additions & 2 deletions app/models/apple/publisher.rb
@@ -113,7 +113,11 @@ def wait_for_upload_processing(eps)
Rails.logger.tagged("##{__method__}") do
pdfs = eps.map(&:podcast_delivery_files).flatten

Apple::PodcastDeliveryFile.wait_for_delivery_files(api, pdfs)
(waiting_timed_out, _) = Apple::PodcastDeliveryFile.wait_for_delivery(api, pdfs)
raise "Timed out waiting for delivery" if waiting_timed_out

(waiting_timed_out, _) = Apple::PodcastDeliveryFile.wait_for_processing(api, pdfs)
raise "Timed out waiting for processing" if waiting_timed_out

# Get the latest state of the podcast containers
# which should include synced files
@@ -124,7 +128,9 @@
def wait_for_asset_state(eps)
Rails.logger.tagged("##{__method__}") do
eps = eps.filter { |e| e.podcast_delivery_files.any?(&:api_marked_as_uploaded?) }
Apple::Episode.wait_for_asset_state(api, eps)

(waiting_timed_out, _) = Apple::Episode.wait_for_asset_state(api, eps)
raise "Timed out waiting for asset state" if waiting_timed_out
end
end

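Both wait calls above now destructure the [timed_out, remaining] pair and fail fast. A hedged sketch of that idiom factored into a helper (raise_on_timeout! is hypothetical and not part of this commit):

def raise_on_timeout!(label)
  timed_out, _remaining = yield
  raise "Timed out waiting for #{label}" if timed_out
end

# Usage, mirroring wait_for_upload_processing above:
#   raise_on_timeout!("delivery")   { Apple::PodcastDeliveryFile.wait_for_delivery(api, pdfs) }
#   raise_on_timeout!("processing") { Apple::PodcastDeliveryFile.wait_for_processing(api, pdfs) }
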
1 change: 1 addition & 0 deletions app/models/podcast.rb
@@ -54,6 +54,7 @@ def self.by_prx_series(series)
end

def self.release!(options = {})
Rails.logger.info("Podcast.release! called")
PublishingPipelineState.expire_pipelines!
Episode.release_episodes!(options)
end
51 changes: 33 additions & 18 deletions app/models/publishing_pipeline_state.rb
@@ -88,29 +88,40 @@ def self.most_recent_state(podcast)
end

def self.start_pipeline!(podcast)
PublishingQueueItem.ensure_queued!(podcast)
attempt!(podcast)
Rails.logger.tagged("PublishingPipeLineState.start_pipeline!") do
PublishingQueueItem.ensure_queued!(podcast)
attempt!(podcast)
end
end

# None of the methods that grab locks are threadsafe if we assume that
# creating published artifacts is non-idempotent (e.g. creating remote Apple
# resources)
def self.attempt!(podcast, perform_later: true)
podcast.with_publish_lock do
next if PublishingQueueItem.unfinished_items(podcast).empty?
next if PublishingQueueItem.current_unfinished_item(podcast).present?

# Dedupe the work, grab the latest unfinished item in the queue
latest_unfinished_item = PublishingQueueItem.unfinished_items(podcast).first

PublishingPipelineState.create!(podcast: podcast, publishing_queue_item: latest_unfinished_item, status: :created)

if perform_later
Rails.logger.info("Scheduling PublishFeedJob for podcast #{podcast.id}", {podcast_id: podcast.id})
PublishFeedJob.perform_later(podcast)
else
Rails.logger.info("Performing PublishFeedJob for podcast #{podcast.id}", {podcast_id: podcast.id})
PublishFeedJob.perform_now(podcast)
Rails.logger.tagged("PublishingPipeLineState.attempt!") do
podcast.with_publish_lock do
if PublishingQueueItem.unfinished_items(podcast).empty?
Rails.logger.info("Unfinished items empty, nothing to do", podcast_id: podcast.id)
next
end
if (curr_running_item = PublishingQueueItem.current_unfinished_item(podcast))
Rails.logger.info("Podcast's PublishingQueueItem already has running pipeline", podcast_id: podcast.id, running_queue_item: curr_running_item.id)
next
end

# Dedupe the work, grab the latest unfinished item in the queue
latest_unfinished_item = PublishingQueueItem.unfinished_items(podcast).first

Rails.logger.info("Creating publishing pipeline for podcast #{podcast.id}", {podcast_id: podcast.id, queue_item_id: latest_unfinished_item.id})
PublishingPipelineState.create!(podcast: podcast, publishing_queue_item: latest_unfinished_item, status: :created)

if perform_later
Rails.logger.info("Scheduling PublishFeedJob for podcast #{podcast.id}", {podcast_id: podcast.id})
PublishFeedJob.perform_later(podcast)
else
Rails.logger.info("Performing PublishFeedJob for podcast #{podcast.id}", {podcast_id: podcast.id})
PublishFeedJob.perform_now(podcast)
end
end
end
end
@@ -155,7 +166,9 @@ def self.expire_pipelines!
end

def self.settle_remaining!(podcast)
attempt!(podcast)
Rails.logger.tagged("PublishingPipeLineState.settle_remaining!") do
attempt!(podcast)
end
end

def self.complete?(podcast)
@@ -165,7 +178,9 @@ def self.complete?(podcast)
def self.state_transition(podcast, to_state)
podcast.with_publish_lock do
pqi = PublishingQueueItem.current_unfinished_item(podcast)
curr_running_item = PublishingQueueItem.current_unfinished_item(podcast)
if pqi.present?
Rails.logger.info("Transitioning podcast #{podcast.id} publishing pipeline to state #{to_state}", {podcast_id: podcast.id, to_state: to_state, running_queue_item: curr_running_item&.id})
PublishingPipelineState.create!(podcast: podcast, publishing_queue_item: pqi, status: to_state)
else
Rails.logger.error("Podcast #{podcast.id} has no unfinished work, cannot transition state", {podcast_id: podcast.id, to_state: to_state})
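Taken together with start_pipeline! above, the queue-then-attempt flow that this new logging instruments looks roughly like this (names come from the diff and the tests below; nothing new is introduced):

# Enqueue a new publishing request, then try to run it.
PublishingQueueItem.ensure_queued!(podcast)

# Under the podcast's publish lock, attempt! is a no-op when nothing is queued
# or when another queue item already has a running pipeline; otherwise it
# records a :created state and schedules PublishFeedJob.
PublishingPipelineState.attempt!(podcast)

# Synchronous variant, as used in the tests below:
PublishingPipelineState.attempt!(podcast, perform_later: false)
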
1 change: 1 addition & 0 deletions app/models/publishing_queue_item.rb
@@ -18,6 +18,7 @@ def self.delivery_status
end

def self.ensure_queued!(podcast)
Rails.logger.info("Creating new PublishingQueueItem", {podcast_id: podcast.id})
create!(podcast: podcast)
end

8 changes: 4 additions & 4 deletions test/models/apple/api_waiting_test.rb
@@ -25,7 +25,7 @@ class TestTimeout

step = 0

(finished_waiting, remaining) = TestWait.wait_for(records) do |remaining|
(timed_out, remaining) = TestWait.wait_for(records) do |remaining|
rem = remaining.dup

assert_equal intervals[step], rem
@@ -37,16 +37,16 @@ class TestTimeout
rem
end

assert_equal finished_waiting, true
assert_equal timed_out, false
assert_equal remaining, []
end

it "times out" do
(finished_waiting, remaining) = TestTimeout.wait_for(["a", "b", "c"]) do |remaining|
(timed_out, remaining) = TestTimeout.wait_for(["a", "b", "c"]) do |remaining|
remaining
end

assert_equal finished_waiting, false
assert_equal timed_out, true
assert_equal remaining, ["a", "b", "c"]
end
end
22 changes: 14 additions & 8 deletions test/models/publishing_pipeline_state_test.rb
@@ -153,8 +153,8 @@
assert_equal [pa1, pa2].sort, PublishingPipelineState.expired_pipelines.sort
PublishingPipelineState.expire_pipelines!

assert_equal ["created", "expired"], PublishingPipelineState.latest_pipeline(podcast).map(&:status)
assert_equal ["created", "expired"], PublishingPipelineState.latest_pipeline(podcast2).map(&:status)
assert_equal ["created", "expired"].sort, PublishingPipelineState.latest_pipeline(podcast).map(&:status).sort
assert_equal ["created", "expired"].sort, PublishingPipelineState.latest_pipeline(podcast2).map(&:status).sort

# All pipelines are in a terminal state
# There is nothing running:
@@ -197,8 +197,10 @@

describe "error!" do
it 'sets the status to "error"' do
PublishFeedJob.stub_any_instance(:publish_feed, ->(*args) { raise "error" }) do
assert_raises(RuntimeError) { PublishingPipelineState.attempt!(podcast, perform_later: false) }
PublishFeedJob.stub_any_instance(:save_file, nil) do
PublishFeedJob.stub_any_instance(:publish_feed, ->(*args) { raise "error" }) do
assert_raises(RuntimeError) { PublishingPipelineState.attempt!(podcast, perform_later: false) }
end
end

assert_equal ["created", "started", "error"].sort, PublishingPipelineState.where(podcast: podcast).map(&:status).sort
@@ -207,8 +209,10 @@

describe "complete!" do
it 'sets the status to "complete"' do
PublishFeedJob.stub_any_instance(:publish_feed, "pub!") do
PublishingPipelineState.attempt!(podcast, perform_later: false)
PublishFeedJob.stub_any_instance(:save_file, nil) do
PublishFeedJob.stub_any_instance(:publish_feed, "pub!") do
PublishingPipelineState.attempt!(podcast, perform_later: false)
end
end

assert_equal ["created", "started", "complete"].sort, PublishingPipelineState.where(podcast: podcast).map(&:status).sort
Expand All @@ -218,8 +222,10 @@
# And another request comes along to publish:
PublishingQueueItem.create!(podcast: podcast)

PublishFeedJob.stub_any_instance(:publish_feed, "pub!") do
PublishingPipelineState.attempt!(podcast, perform_later: false)
PublishFeedJob.stub_any_instance(:save_file, nil) do
PublishFeedJob.stub_any_instance(:publish_feed, "pub!") do
PublishingPipelineState.attempt!(podcast, perform_later: false)
end
end

PublishingQueueItem.create!(podcast: podcast)
