diff --git a/README.md b/README.md index b9dcf7b..7607cde 100644 --- a/README.md +++ b/README.md @@ -120,6 +120,13 @@ Maximum number of worker threads. Default: number of logical cores +#### `--no-use_threads` + +Do not use threads, process each message using raw pub/sub event +callback. + +Default: use threads + #### `--project_id=PROJECT_ID`, `--credentials=KEYFILE_PATH` Credentials of Google Cloud Platform. Please see [the document](https://github.com/GoogleCloudPlatform/google-cloud-ruby/blob/master/AUTHENTICATION.md) for details. diff --git a/exe/activejob-google_cloud_pubsub-worker b/exe/activejob-google_cloud_pubsub-worker index 65b9a2e..88cfff0 100755 --- a/exe/activejob-google_cloud_pubsub-worker +++ b/exe/activejob-google_cloud_pubsub-worker @@ -24,6 +24,10 @@ worker_args = parser.define_by_keywords( } ) +parser.on '--[no-]threads' do |o| + worker_args[:use_threads] = o +end + pubsub_args = parser.define_by_keywords( {}, Google::Cloud::Pubsub.method(:new), diff --git a/lib/activejob-google_cloud_pubsub/worker.rb b/lib/activejob-google_cloud_pubsub/worker.rb index 56d4f9e..2c2269e 100644 --- a/lib/activejob-google_cloud_pubsub/worker.rb +++ b/lib/activejob-google_cloud_pubsub/worker.rb @@ -13,40 +13,53 @@ class Worker using PubsubExtension - def initialize(queue: 'default', min_threads: 0, max_threads: Concurrent.processor_count, pubsub: Google::Cloud::Pubsub.new, logger: Logger.new($stdout)) + def initialize(queue: 'default', min_threads: 0, max_threads: Concurrent.processor_count, use_threads: true, pubsub: Google::Cloud::Pubsub.new, logger: Logger.new($stdout)) @queue_name = queue @min_threads = min_threads @max_threads = max_threads + @use_threads = use_threads @pubsub = pubsub @logger = logger end def run - pool = Concurrent::ThreadPoolExecutor.new(min_threads: @min_threads, max_threads: @max_threads, max_queue: -1) + if @use_threads + pool = Concurrent::ThreadPoolExecutor.new(min_threads: @min_threads, max_threads: @max_threads, max_queue: -1) + else + @logger&.warn "Running without threads" + end @pubsub.subscription_for(@queue_name).listen {|message| @logger&.info "Message(#{message.message_id}) was received." - begin - Concurrent::Promise.execute(args: message, executor: pool) {|msg| - process_or_delay msg - }.rescue {|e| - @logger&.error e - } - rescue Concurrent::RejectedExecutionError - Concurrent::Promise.execute(args: message) {|msg| - msg.delay! 10.seconds.to_i - - @logger&.info "Message(#{msg.message_id}) was rescheduled after 10 seconds because the thread pool is full." - }.rescue {|e| - @logger&.error e - } + if @use_threads + run_concurrent message, pool + else + process message end }.start sleep end + def run_concurrent(message, pool) + begin + Concurrent::Promise.execute(args: message, executor: pool) {|msg| + process_or_delay msg + }.rescue {|e| + @logger&.error e + } + rescue Concurrent::RejectedExecutionError + Concurrent::Promise.execute(args: message) {|msg| + msg.delay! 10.seconds.to_i + + @logger&.info "Message(#{msg.message_id}) was rescheduled after 10 seconds because the thread pool is full." + }.rescue {|e| + @logger&.error e + } + end + end + def ensure_subscription @pubsub.subscription_for @queue_name