Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow setting custom behavior when rate limit exceeded #16

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@ spec/reports
test/tmp
test/version_tmp
tmp
.DS_Store
24 changes: 22 additions & 2 deletions lib/sidekiq/throttler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,28 @@ def call(worker, msg, queue)
yield
end

rate_limit.exceeded do |delay|
worker.class.perform_in(delay, *msg['args'])
# We now allow for explicitly setting the exceeded behavior.
# By default (or with a :retry value), the exceeded behavior follows the
# previous behavior and retries the job.
#
# Otherwise, it attempts to use the behavior specified in the class options
# The specified behavior should be a a proc that takes up to 4 arguments:
# 1: the period of the rate limiter
# 2: the worker for the job
# 3: the message payload
# 4: the queue the job was pulled from
# NB: The exceeded behavior passed *MUST* be a proc if you are using less
# then 4 arguments. Because the rate limiter itself always passes four
# arguments it doesn't work with a lambda.

worker_options = (worker.class.get_sidekiq_options['throttle'] || {}).stringify_keys

if worker_options['exceeded'].nil? || worker_options['exceeded'] == :retry
rate_limit.exceeded do |delay|
worker.class.perform_in(delay, *msg['args'])
end
else
rate_limit.exceeded &worker_options['exceeded']
end

rate_limit.execute
Expand Down
3 changes: 2 additions & 1 deletion lib/sidekiq/throttler/rate_limit.rb
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,8 @@ def execute
return @within_bounds.call unless can_throttle?

if exceeded?
@exceeded.call(period)
# passing a bunch of variables for use in the exceeded behavior
@exceeded.call(period, worker, payload, queue)
else
increment
@within_bounds.call
Expand Down
20 changes: 20 additions & 0 deletions spec/app/workers/lolz_worker2.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
class LolzWorker2
include Sidekiq::Worker

# immediately requeue
sidekiq_options throttle: {
threshold: 10,
period: 1.minute,
# specify an exceeded behavior which immediately requeues the job instead
# of retrying it.
exceeded: Proc.new {
|delay, worker, payload|
worker.class.perform_async(payload)
}
}

def perform(name)
puts "LolzWorker2!!!"
puts "OHAI #{name.upcase}!"
end
end
4 changes: 3 additions & 1 deletion spec/sidekiq/throttler/rate_limit_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,9 @@

it 'calls the exceeded callback with the configured #period' do
callback = Proc.new {}
expect(callback).to receive(:call).with(rate_limit.period)
#expect(callback).to receive(:call).with(rate_limit.period)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you forgot this

expect(callback).to receive(:call).with(rate_limit.period, worker, payload, queue)


rate_limit.exceeded(&callback)
rate_limit.execute
Expand Down
10 changes: 10 additions & 0 deletions spec/sidekiq/throttler_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@
LolzWorker.new
end

let(:worker2) do
LolzWorker2.new
end

let(:options) do
{ storage: :memory }
end
Expand Down Expand Up @@ -50,6 +54,12 @@
expect(worker.class).to receive(:perform_in).with(1.minute, *message['args'])
throttler.call(worker, message, queue)
end

it 'properly performs the behavior specificed in exceeded option' do
Sidekiq::Throttler::RateLimit.any_instance.should_receive(:exceeded?).and_return(true)
worker2.class.should_receive(:perform_async).with(*message['args'])
throttler.call(worker2, message, queue)
end
end
end
end