diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..0bd9ead --- /dev/null +++ b/.gitignore @@ -0,0 +1,10 @@ +/.bundle/ +/.yardoc +/Gemfile.lock +/_yardoc/ +/coverage/ +/doc/ +/pkg/ +/spec/reports/ +/tmp/ +dump.rdb diff --git a/.rspec b/.rspec new file mode 100644 index 0000000..8c18f1a --- /dev/null +++ b/.rspec @@ -0,0 +1,2 @@ +--format documentation +--color diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..03ee64f --- /dev/null +++ b/.travis.yml @@ -0,0 +1,5 @@ +sudo: false +language: ruby +rvm: + - 2.3.1 +before_install: gem install bundler -v 1.12.5 diff --git a/Gemfile b/Gemfile new file mode 100644 index 0000000..c766fd2 --- /dev/null +++ b/Gemfile @@ -0,0 +1,4 @@ +source 'https://rubygems.org' + +# Specify your gem's dependencies in sidekiq-batch.gemspec +gemspec diff --git a/Guardfile b/Guardfile new file mode 100644 index 0000000..ac65cab --- /dev/null +++ b/Guardfile @@ -0,0 +1,4 @@ +guard 'rspec', cmd: 'rspec --color' do + watch(%r{^lib/(.+)\.rb$}) { |m| "spec/#{m[1]}_spec.rb" } + watch(%r|^spec/(.*)_spec\.rb|) +end diff --git a/LICENSE.txt b/LICENSE.txt new file mode 100644 index 0000000..622384f --- /dev/null +++ b/LICENSE.txt @@ -0,0 +1,21 @@ +The MIT License (MIT) + +Copyright (c) 2016 Breamware + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..5324182 --- /dev/null +++ b/README.md @@ -0,0 +1,32 @@ +# Sidekiq::Batch + +Simple Sidekiq Batch Job implementation. + +## Installation + +Add this line to your application's Gemfile: + +```ruby +gem 'sidekiq-batch' +``` + +And then execute: + + $ bundle + +Or install it yourself as: + + $ gem install sidekiq-batch + +## Usage + +Sidekiq Batch is drop-in replacement for the API from Sidekiq PRO. See https://github.com/mperham/sidekiq/wiki/Batches for usage. + +## Contributing + +Bug reports and pull requests are welcome on GitHub at https://github.com/breamware/sidekiq-batch. + + +## License + +The gem is available as open source under the terms of the [MIT License](http://opensource.org/licenses/MIT). diff --git a/Rakefile b/Rakefile new file mode 100644 index 0000000..b7e9ed5 --- /dev/null +++ b/Rakefile @@ -0,0 +1,6 @@ +require "bundler/gem_tasks" +require "rspec/core/rake_task" + +RSpec::Core::RakeTask.new(:spec) + +task :default => :spec diff --git a/lib/sidekiq/batch.rb b/lib/sidekiq/batch.rb new file mode 100644 index 0000000..23c7df5 --- /dev/null +++ b/lib/sidekiq/batch.rb @@ -0,0 +1,79 @@ +require 'securerandom' +require 'sidekiq' + +require 'sidekiq/batch/callback' +require 'sidekiq/batch/middleware' +require 'sidekiq/batch/status' +require 'sidekiq/batch/version' + +module Sidekiq + class Batch + class NoBlockGivenError < StandardError; end + + attr_reader :bid, :description, :callback_queue + + def initialize(existing_bid = nil) + @bid = existing_bid || SecureRandom.urlsafe_base64(10) + Sidekiq.redis { |r| r.set("#{bid}-to_process", 0) } + end + + def description=(description) + @description = description + Sidekiq.redis { |r| r.hset(bid, 'description', description) } + end + + def callback_queue=(callback_queue) + @callback_queue = callback_queue + Sidekiq.redis { |r| r.hset(bid, 'callback_queue', callback_queue) } + end + + def on(event, callback, options = {}) + return unless %w(success complete).include?(event.to_s) + Sidekiq.redis do |r| + r.hset(bid, "callback_#{event}", callback) + r.hset(bid, "callback_#{event}_opts", options.to_json) + end + end + + def jobs + raise NoBlockGivenError unless block_given? + + Batch.increment_job_queue(bid) + Thread.current[:bid] = bid + yield + Batch.process_successful_job(bid) + end + + class << self + def process_failed_job(bid, jid) + to_process = Sidekiq.redis do |r| + r.multi do + r.sadd("#{bid}-failed", jid) + r.scard("#{bid}-failed") + r.get("#{bid}-to_process") + end + end + if to_process[2].to_i == to_process[1].to_i + Callback.call_if_needed(:complete, bid) + end + end + + def process_successful_job(bid) + to_process = Sidekiq.redis do |r| + r.multi do + r.decr("#{bid}-to_process") + r.get("#{bid}-to_process") + end + end + if to_process[1].to_i == 0 + Callback.call_if_needed(:success, bid) + Callback.call_if_needed(:complete, bid) + end + end + + def increment_job_queue(bid) + Sidekiq.redis { |r| r.incr("#{bid}-to_process") } + end + end + end +end diff --git a/lib/sidekiq/batch/callback.rb b/lib/sidekiq/batch/callback.rb new file mode 100644 index 0000000..eb3644e --- /dev/null +++ b/lib/sidekiq/batch/callback.rb @@ -0,0 +1,40 @@ +module Sidekiq + class Batch + module Callback + class Worker + include Sidekiq::Worker + + def perform(clazz, event, opts, bid) + return unless %w(success complete).include?(event) + instance = clazz.constantize.send(:new) rescue nil + return unless instance + instance.send("on_#{event}", Status.new(bid), opts) rescue nil + end + end + + class << self + def call_if_needed(event, bid) + needed = Sidekiq.redis do |r| + r.multi do + r.hget(bid, event) + r.hset(bid, event, true) + end + end + return if 'true' == needed[0] + callback, opts, queue = Sidekiq.redis do |r| + r.hmget(bid, + "callback_#{event}", "callback_#{event}_opts", + 'callback_queue') + end + return unless callback + opts = JSON.parse(opts) if opts + opts ||= {} + queue ||= 'default' + Sidekiq::Client.push('class' => Sidekiq::Batch::Callback::Worker, + 'args' => [callback, event, opts, bid], + 'queue' => queue) + end + end + end + end +end diff --git a/lib/sidekiq/batch/middleware.rb b/lib/sidekiq/batch/middleware.rb new file mode 100644 index 0000000..d4775aa --- /dev/null +++ b/lib/sidekiq/batch/middleware.rb @@ -0,0 +1,51 @@ +module Sidekiq + class Batch + module Middleware + def self.extended(base) + base.class_eval do + register_middleware + end + end + + def register_middleware + Sidekiq.configure_server do |config| + config.client_middleware do |chain| + chain.add ClientMiddleware + end + config.server_middleware do |chain| + chain.add ClientMiddleware + chain.add ServerMiddleware + end + end + end + + class ClientMiddleware + def call(_worker, msg, _queue, _redis_pool = nil) + if (bid = Thread.current[:bid]) + Batch.increment_job_queue(bid) if + msg[:bid] = bid + end + yield + end + end + + class ServerMiddleware + def call(_worker, msg, _queue) + if (bid = msg['bid']) + begin + yield + Batch.process_successful_job(bid) + rescue + Batch.process_failed_job(bid, msg['jid']) + raise + end + else + yield + end + end + end + end + end +end + +Sidekiq::Batch::Middleware.send(:extend, Sidekiq::Batch::Middleware) diff --git a/lib/sidekiq/batch/status.rb b/lib/sidekiq/batch/status.rb new file mode 100644 index 0000000..52b9721 --- /dev/null +++ b/lib/sidekiq/batch/status.rb @@ -0,0 +1,34 @@ +module Sidekiq + class Batch + class Status + attr_reader :bid, :total, :failures, :created_at, :failure_info + + def initialize(bid) + @bid = bid + end + + def join + raise "Not supported" + end + + def pending + Sidekiq.redis { |r| r.get("#{bid}-to_process") }.to_i + end + + def complete? + 'true' == Sidekiq.redis { |r| r.hget(bid, 'complete') } + end + + def data + { + total: total, + failures: failures, + pending: pending, + created_at: created_at, + complete: complete?, + failure_info: failure_info + } + end + end + end +end diff --git a/lib/sidekiq/batch/version.rb b/lib/sidekiq/batch/version.rb new file mode 100644 index 0000000..7d8aca3 --- /dev/null +++ b/lib/sidekiq/batch/version.rb @@ -0,0 +1,5 @@ +module Sidekiq + class Batch + VERSION = '0.1.0.pre'.freeze + end +end diff --git a/sidekiq-batch.gemspec b/sidekiq-batch.gemspec new file mode 100644 index 0000000..4931279 --- /dev/null +++ b/sidekiq-batch.gemspec @@ -0,0 +1,27 @@ +# coding: utf-8 +lib = File.expand_path('../lib', __FILE__) +$LOAD_PATH.unshift(lib) unless $LOAD_PATH.include?(lib) +require 'sidekiq/batch/version' + +Gem::Specification.new do |spec| + spec.name = "sidekiq-batch" + spec.version = Sidekiq::Batch::VERSION + spec.authors = ["Marcin Naglik"] + spec.email = ["marcin.naglik@gmail.com"] + + spec.summary = "Sidekiq Batch Jobs" + spec.description = "Sidekiq Batch Jobs Implementation" + spec.homepage = "http://github.com/breamware/sidekiq-batch" + spec.license = "MIT" + + spec.files = `git ls-files -z`.split("\x0").reject { |f| f.match(%r{^(test|spec|features)/}) } + spec.bindir = "exe" + spec.executables = spec.files.grep(%r{^exe/}) { |f| File.basename(f) } + spec.require_paths = ["lib"] + + spec.add_dependency "sidekiq", "~> 4" + + spec.add_development_dependency "bundler", "~> 1.12" + spec.add_development_dependency "rake", "~> 10.0" + spec.add_development_dependency "rspec", "~> 3.0" +end diff --git a/spec/integration/integration.rb b/spec/integration/integration.rb new file mode 100644 index 0000000..edc743a --- /dev/null +++ b/spec/integration/integration.rb @@ -0,0 +1,31 @@ +require 'sidekiq/batch' + +class TestWorker + include Sidekiq::Worker + + def perform + end +end + +class MyCallback + def on_success(status, options) + puts "Success #{options} #{status.data}" + end + + def on_complete(status, options) + puts "Complete #{options} #{status.data}" + end +end + +batch = Sidekiq::Batch.new +batch.description = 'Test batch' +batch.callback_queue = :default +batch.on(:success, MyCallback, to: 'success@gmail.com') +batch.on(:complete, MyCallback, to: 'complete@gmail.com') + +batch.jobs do + 10.times do + TestWorker.perform_async + end +end +puts Sidekiq::Batch::Status.new(batch.bid).data diff --git a/spec/sidekiq/batch/callback_spec.rb b/spec/sidekiq/batch/callback_spec.rb new file mode 100644 index 0000000..4d00fbe --- /dev/null +++ b/spec/sidekiq/batch/callback_spec.rb @@ -0,0 +1,39 @@ +require 'spec_helper' + +describe Sidekiq::Batch::Callback::Worker do + describe '#perfom' do + it 'does not do anything if it cannot find the callback class' do + subject.perform('SampleCallback', 'complete', {}, 'ABCD') + end + + it 'does not do anything if event is different from complete or success' do + class SampleCallback; end + expect(SampleCallback).not_to receive(:new) + subject.perform('SampleCallback', 'ups', {}, 'ABCD') + end + + it 'creates instance when class and event' do + class SampleCallback; end + expect(SampleCallback).to receive(:new) + subject.perform('SampleCallback', 'success', {}, 'ABCD') + end + + it 'calls on_success if defined' do + callback_instance = double('SampleCallback') + class SampleCallback; end + expect(SampleCallback).to receive(:new).and_return(callback_instance) + expect(callback_instance).to receive(:on_success) + .with(instance_of(Sidekiq::Batch::Status), {}) + subject.perform('SampleCallback', 'success', {}, 'ABCD') + end + + it 'calls on_complete if defined' do + callback_instance = double('SampleCallback') + class SampleCallback; end + expect(SampleCallback).to receive(:new).and_return(callback_instance) + expect(callback_instance).to receive(:on_complete) + .with(instance_of(Sidekiq::Batch::Status), {}) + subject.perform('SampleCallback', 'complete', {}, 'ABCD') + end + end +end diff --git a/spec/sidekiq/batch/middleware_spec.rb b/spec/sidekiq/batch/middleware_spec.rb new file mode 100644 index 0000000..41f1e75 --- /dev/null +++ b/spec/sidekiq/batch/middleware_spec.rb @@ -0,0 +1,77 @@ +require 'spec_helper' + +describe Sidekiq::Batch::Middleware do + describe Sidekiq::Batch::Middleware::ServerMiddleware do + context 'when without batch' do + it 'just yields' do + yielded = false + expect(Sidekiq::Batch).not_to receive(:process_successful_job) + expect(Sidekiq::Batch).not_to receive(:process_failed_job) + subject.call(nil, {}, nil) { yielded = true } + expect(yielded).to be_truthy + end + end + + context 'when in batch' do + let(:bid) { 'SAMPLEBID' } + context 'when successful' do + it 'yields' do + yielded = false + subject.call(nil, { 'bid' => bid }, nil) { yielded = true } + expect(yielded).to be_truthy + end + + it 'calls process_successful_job' do + expect(Sidekiq::Batch).to receive(:process_successful_job).with(bid) + subject.call(nil, { 'bid' => bid }, nil) {} + end + end + + context 'when failed' do + it 'calls process_failed_job and reraises exception' do + reraised = false + expect(Sidekiq::Batch).to receive(:process_failed_job) + begin + subject.call(nil, { 'bid' => bid }, nil) { raise 'ERR' } + rescue + reraised = true + end + expect(reraised).to be_truthy + end + end + end + end + + describe Sidekiq::Batch::Middleware::ClientMiddleware do + context 'when without batch' do + it 'just yields' do + yielded = false + expect(Sidekiq::Batch).not_to receive(:increment_job_queue) + subject.call(nil, nil, nil) { yielded = true } + expect(yielded).to be_truthy + end + end + + context 'when in batch' do + let(:bid) { 'SAMPLEBID' } + before { Thread.current[:bid] = bid } + + it 'yields' do + yielded = false + subject.call(nil, {}, nil) { yielded = true } + expect(yielded).to be_truthy + end + + it 'increments job queue' do + expect(Sidekiq::Batch).to receive(:increment_job_queue).with(bid) + subject.call(nil, {}, nil) {} + end + + it 'assigns bid to msg' do + msg = {} + subject.call(nil, msg, nil) {} + expect(msg[:bid]).to eq(bid) + end + end + end +end diff --git a/spec/sidekiq/batch/status_spec.rb b/spec/sidekiq/batch/status_spec.rb new file mode 100644 index 0000000..f212912 --- /dev/null +++ b/spec/sidekiq/batch/status_spec.rb @@ -0,0 +1,4 @@ +require 'spec_helper' + +describe Sidekiq::Batch::Status do +end diff --git a/spec/sidekiq/batch_spec.rb b/spec/sidekiq/batch_spec.rb new file mode 100644 index 0000000..e6ec2d5 --- /dev/null +++ b/spec/sidekiq/batch_spec.rb @@ -0,0 +1,54 @@ +require 'spec_helper' + +describe Sidekiq::Batch do + it 'has a version number' do + expect(Sidekiq::Batch::VERSION).not_to be nil + end + + describe '#initialize' do + subject { described_class } + + it 'creates bid when called without it' do + expect(subject.new.bid).not_to be_nil + end + + it 'reuses bid when called with it' do + batch = subject.new('dayPO5KxuRXXxw') + expect(batch.bid).to eq('dayPO5KxuRXXxw') + end + end + + describe '#description' do + let(:description) { 'custom description' } + before { subject.description = description } + + it 'sets descriptions' do + expect(subject.description).to eq(description) + end + + it 'persists description' do + expect(Sidekiq.redis { |r| r.hget(subject.bid, 'description') }) + .to eq(description) + end + end + + describe '#callback_queue' do + let(:callback_queue) { 'custom_queue' } + before { subject.callback_queue = callback_queue } + + it 'sets callback_queue' do + expect(subject.callback_queue).to eq(callback_queue) + end + + it 'persists callback_queue' do + expect(Sidekiq.redis { |r| r.hget(subject.bid, 'callback_queue') }) + .to eq(callback_queue) + end + end + + describe '#jobs' do + it 'throws error if no block given' do + expect { subject.jobs }.to raise_error Sidekiq::Batch::NoBlockGivenError + end + end +end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb new file mode 100644 index 0000000..541117e --- /dev/null +++ b/spec/spec_helper.rb @@ -0,0 +1,2 @@ +$LOAD_PATH.unshift File.expand_path('../../lib', __FILE__) +require 'sidekiq/batch'