Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Producer#push always succeeds with async producers #57

Open
phrinx opened this issue Oct 22, 2014 · 4 comments
Open

Producer#push always succeeds with async producers #57

phrinx opened this issue Oct 22, 2014 · 4 comments

Comments

@phrinx
Copy link

phrinx commented Oct 22, 2014

This code below was executed while kafka service was shut down. No failure is returned, promise was fulfilled and on_success has been called which seems wrong!

[45] pry(main)> producer = Hermann::Producer.new(nil, ['localhost:2821'], {'request.required.acks' => '1', 'producer.type' => 'async'})
=> #<Hermann::Producer:0x4ff9c1a1 @brokers=["localhost:2821"], @children=[], @internal=#<Hermann::Provider::JavaProducer:0x16175a0c @producer=#<Java::KafkaJavaapiProducer::Producer:0x5718d528>>, @topic=nil>
[46] pry(main)> promise = producer.push('foo', topic: 'backup')
=> #<Concurrent::Promise:0x3953c8fb
 @children=[],
 @event=#<Concurrent::Event:0x0c5860a @condition=#<Concurrent::Condition:0x6c1d0dc2 @condition=#<ConditionVariable:0xe3e7226>>, @mutex=#<Mutex:0x476aefbf>, @set=true>,
 @executor=#<Concurrent::ThreadPoolExecutor:0xb727643 @executor=#<Java::JavaUtilConcurrent::ThreadPoolExecutor:0x58bcda24>, @max_queue=120, @overflow_policy=:abort>,
 @mutex=#<Mutex:0x63d5b757>,
 @on_fulfill=#<Proc:0x3a83f397@/Users/dkoehler/.rvm/gems/jruby-1.7.16@backup/gems/concurrent-ruby-0.7.0-java/lib/concurrent/promise.rb:38>,
 @on_reject=#<Proc:0x2ced5c84@/Users/dkoehler/.rvm/gems/jruby-1.7.16@backup/gems/concurrent-ruby-0.7.0-java/lib/concurrent/promise.rb:39>,
 @parent=nil,
 @promise_body=#<Proc:0x1354f527@/Users/dkoehler/.rvm/gems/jruby-1.7.16@backup/gems/hermann-0.18.1-java/lib/hermann/provider/java_producer.rb:48>,
 @state=:fulfilled,
 @value=nil>
[47] pry(main)> promise.rejected?
=> false
[48] pry(main)> promise.fulfilled?
=> true
[49] pry(main)> promise.on_success { p 'really?'}
"really?"
=> #<Concurrent::Promise:0x39d53fab
 @children=[],
 @event=#<Concurrent::Event:0x6e0bf178 @condition=#<Concurrent::Condition:0x521a35b9 @condition=#<ConditionVariable:0x36bd2ecb>>, @mutex=#<Mutex:0x3e4b2d80>, @set=true>,
 @executor=#<Concurrent::ThreadPoolExecutor:0xb727643 @executor=#<Java::JavaUtilConcurrent::ThreadPoolExecutor:0x58bcda24>, @max_queue=120, @overflow_policy=:abort>,
 @mutex=#<Mutex:0x39984e73>,
 @on_fulfill=#<Proc:0x3e467d8f@(pry):49>,
 @on_reject=#<Proc:0x5dd573a@/Users/dkoehler/.rvm/gems/jruby-1.7.16@backup/gems/concurrent-ruby-0.7.0-java/lib/concurrent/promise.rb:39>,
 @parent=
  #<Concurrent::Promise:0x3953c8fb
   @children=[#<Concurrent::Promise:0x39d53fab ...>],
   @event=#<Concurrent::Event:0x0c5860a @condition=#<Concurrent::Condition:0x6c1d0dc2 @condition=#<ConditionVariable:0xe3e7226>>, @mutex=#<Mutex:0x476aefbf>, @set=true>,
   @executor=#<Concurrent::ThreadPoolExecutor:0xb727643 @executor=#<Java::JavaUtilConcurrent::ThreadPoolExecutor:0x58bcda24>, @max_queue=120, @overflow_policy=:abort>,
   @mutex=#<Mutex:0x63d5b757>,
   @on_fulfill=#<Proc:0x3a83f397@/Users/dkoehler/.rvm/gems/jruby-1.7.16@backup/gems/concurrent-ruby-0.7.0-java/lib/concurrent/promise.rb:38>,
   @on_reject=#<Proc:0x2ced5c84@/Users/dkoehler/.rvm/gems/jruby-1.7.16@backup/gems/concurrent-ruby-0.7.0-java/lib/concurrent/promise.rb:39>,
   @parent=nil,
   @promise_body=#<Proc:0x1354f527@/Users/dkoehler/.rvm/gems/jruby-1.7.16@backup/gems/hermann-0.18.1-java/lib/hermann/provider/java_producer.rb:48>,
   @state=:fulfilled,
   @value=nil>,
 @promise_body=#<Proc:0x689a3d8@/Users/dkoehler/.rvm/gems/jruby-1.7.16@backup/gems/concurrent-ruby-0.7.0-java/lib/concurrent/promise.rb:41>,
 @state=:fulfilled,
 @value="really?">
@jamescway
Copy link
Contributor

Found this quote:

request.required.acks and producer.mode (sync or async) are orthogonal
configs, i.e. with async mode, the producer.send() call itself will not be
blocking on acks from the servers, but its async sending thread will still
be blocked for acks, and if it fails to send out the messages due to, say,
timing out on the acks, the sender will record the failures in the metrics.
So if you want your application to be paused or directly notified upon such
sending failures you probably should use sync, if your application do not
need to be paused, but just be monitored on such failure metrics you can
use async. Which acks value to use is then based on how much you want your
message to be persistent by trading latency.
Guozhang

@phrinx
Copy link
Author

phrinx commented Oct 22, 2014

I see. So basically using async producer there only way to get information about failures would be to access metrics those metrics. Would it make sense to expose a hermann API so metrics can be retrieved in the same way irrespective of whether the java lib or librdkafka is used?

@rtyler
Copy link
Member

rtyler commented Oct 23, 2014

@phrinx I think it would be feasible to expose those metrics in a sane way, but it's a non-trivial amount of work that I know we haven't looked too deeply into it yet.

How critical/important is the async usecase for your work right now?

@phrinx
Copy link
Author

phrinx commented Oct 23, 2014

I believe this issue is not very critical to me atm. For the time being I will instead use 'sync' producer mode with only registering callbacks for #on_success and #on_error on the promise returned by #push such that I get callback in case of error (which won't be the case in async mode due to this issue).

I understand that with this model I could silently loose messages in case of server shutdowns etc but that's an acceptable case for my application.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Development

No branches or pull requests

3 participants