Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Thread pools for callbacks #30

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ Networking usually means pushing lots of bytes around and in Ruby it's easy to m
The [examples](https://github.com/iconara/ione/tree/master/examples) directory has some examples of what you can do with Ione, for example:

* [redis_client](https://github.com/iconara/ione/tree/master/examples/redis_client) is a more or less full featured Redis client that uses most of Ione's features.
* [http_client](https://github.com/iconara/ione/tree/master/examples/http_client) is a simplistic HTTP client that uses Ione and [http_parser.rb](http://rubygems.org/gems/http_parser.rb) to make HTTP GET request. It also shows how to make TLS connections.
* [http_client](https://github.com/iconara/ione/tree/master/examples/http_client) is a simplistic HTTP client that uses Ione and [http_parser.rb](http://rubygems.org/gems/http_parser.rb) to make HTTP GET request. It also shows how to make TLS connections, and how to provide a thread pool for callbacks.
* [cql-rb](https://github.com/iconara/cql-rb) is a high performance Cassandra driver and where Ione was originally developed.
* [cassandra-driver](https://github.com/datastax/ruby-driver) is the successor to cql-rb.
* [ione-rpc](https://github.com/iconara/ione-rpc) is a RPC framework built on Ione. It makes it reasonably easy to build networked applications without having to reinvent the wheel.
Expand Down
2 changes: 2 additions & 0 deletions examples/http_client/README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# Ione HTTP client example

A simplistic HTTP client that uses Ione and [http_parser.rb](http://rubygems.org/gems/http_parser.rb) to make HTTP GET request. It also supports HTTPS.

This example also uses a thread pool to avoid blocking the reactor when the HTTP response is parsed. It's purpose is to move the protocol processing off of the reactor thread, not to parallelize it, and that means that a simple single-threaded implementation is sufficient.
45 changes: 42 additions & 3 deletions examples/http_client/lib/ione/http_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@
require 'ione'
require 'http_parser'
require 'uri'
require 'thread'


module Ione
class HttpClient
def initialize(cert_store=nil)
@reactor = Io::IoReactor.new
@thread_pool = SingleThreadPool.new
@reactor = Io::IoReactor.new(thread_pool: @thread_pool)
if cert_store
@cert_store = cert_store
else
Expand All @@ -18,11 +20,11 @@ def initialize(cert_store=nil)
end

def start
@reactor.start.map(self)
@thread_pool.start.then { @reactor.start }.map(self)
end

def stop
@reactor.stop.map(self)
@reactor.stop.then { @thread_pool.stop }.map(self)
end

def get(url, headers={})
Expand Down Expand Up @@ -97,4 +99,41 @@ def initialize(status, headers, body)
@body = body
end
end

class SingleThreadPool
StoppedError = Class.new(StandardError)

def initialize
@queue = Queue.new
@stopped_promise = Promise.new
end

def submit(&task)
if @stopped
Future.failed(StoppedError.new('Thread pool stopped'))
else
promise = Promise.new
@queue << [task, promise]
promise.future
end
end

def start
@thread = Thread.start do
until (job = @queue.pop) == :die
task, promise = job
promise.try(&task)
end
@stopped_promise.fulfill
end
Future.resolved
end

def stop
@stopped = true
@queue.clear
@queue << :die
@stopped_promise.future
end
end
end
1 change: 1 addition & 0 deletions lib/ione.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ module Ione

require 'ione/future'
require 'ione/byte_buffer'
require 'ione/thread_pool'
require 'ione/io'
20 changes: 17 additions & 3 deletions lib/ione/io/acceptor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@ class Acceptor
attr_reader :backlog

# @private
def initialize(host, port, backlog, unblocker, reactor, socket_impl=nil)
def initialize(host, port, backlog, unblocker, thread_pool, reactor, socket_impl=nil)
@host = host
@port = port
@backlog = backlog
@unblocker = unblocker
@reactor = reactor
@thread_pool = thread_pool
@socket_impl = socket_impl || ServerSocket
@accept_listeners = []
@lock = Mutex.new
Expand All @@ -30,6 +31,15 @@ def initialize(host, port, backlog, unblocker, reactor, socket_impl=nil)

# Register a listener to be notified when client connections are accepted
#
# It is very important that you don't do any heavy lifting in the callback
# since it by default is called from the IO reactor thread, and as long as
# the callback is working the reactor can't handle any IO and no other
# callbacks can be called. However, if you have provided a thread pool to
# your reactor then each call to the callback will be submitted to that
# pool and you're free to do as much work as you want.
#
# Errors raised by the callback will be ignored.
#
# @yieldparam [Ione::Io::ServerConnection] the connection to the client
def on_accept(&listener)
@lock.synchronize do
Expand Down Expand Up @@ -107,7 +117,7 @@ def writable?
# @private
def read
client_socket, host, port = accept
connection = ServerConnection.new(client_socket, host, port, @unblocker)
connection = ServerConnection.new(client_socket, host, port, @unblocker, @thread_pool)
@reactor.accept(connection)
notify_accept_listeners(connection)
end
Expand Down Expand Up @@ -135,7 +145,11 @@ def accept

def notify_accept_listeners(connection)
listeners = @lock.synchronize { @accept_listeners }
listeners.each { |l| l.call(connection) rescue nil }
listeners.each do |listener|
@thread_pool.submit do
listener.call(connection)
end
end
end
end
end
Expand Down
17 changes: 12 additions & 5 deletions lib/ione/io/base_connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@ class BaseConnection
attr_reader :host, :port

# @private
def initialize(host, port, unblocker)
def initialize(host, port, unblocker, thread_pool)
@host = host
@port = port
@unblocker = unblocker
@thread_pool = thread_pool
@state = CONNECTING_STATE
@writable = false
@lock = Mutex.new
Expand Down Expand Up @@ -111,9 +112,11 @@ def writable?
# yourself in your protocol handler.
#
# It is very important that you don't do any heavy lifting in the callback
# since it is called from the IO reactor thread, and as long as the
# callback is working the reactor can't handle any IO and no other
# callbacks can be called.
# since it by default is called from the IO reactor thread, and as long as
# the callback is working the reactor can't handle any IO and no other
# callbacks can be called. However, if you have provided a thread pool to
# your reactor then each call to the callback will be submitted to that
# pool and you're free to do as much work as you want.
#
# Errors raised by the callback will be ignored.
#
Expand Down Expand Up @@ -185,7 +188,11 @@ def flush
# @private
def read
new_data = @io.read_nonblock(65536)
@data_listener.call(new_data) if @data_listener
if @data_listener
@thread_pool.submit do
@data_listener.call(new_data)
end
end
rescue => e
close(e)
end
Expand Down
4 changes: 2 additions & 2 deletions lib/ione/io/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ class Connection < BaseConnection
attr_reader :connection_timeout

# @private
def initialize(host, port, connection_timeout, unblocker, clock, socket_impl=Socket)
super(host, port, unblocker)
def initialize(host, port, connection_timeout, unblocker, thread_pool, clock, socket_impl=Socket)
super(host, port, unblocker, thread_pool)
@connection_timeout = connection_timeout
@clock = clock
@socket_impl = socket_impl
Expand Down
30 changes: 24 additions & 6 deletions lib/ione/io/io_reactor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,21 @@ module Io
# run in the reactor thread, and every cycle you use there is a cycle which
# can't be used to handle IO.
#
# You can provide the reactor with a thread pool to use when callbacks. This
# way you don't have to worry about the protocol parsing blocking the
# reactor. A thread pool is any object that responds to `#submit`, takes a
# block and returns a future which resolves to the value of calling the
# block. It's completely up to the implementation when and how the block is
# called. The default implementation simply calls the block immediately.
#
# The callbacks that are called in the thread pool are `Connection#on_data`,
# `ServerConnection#on_data` ({BaseConnection#on_data}) and
# {Acceptor#on_accept}.
#
# If you provide a thread pool with more than one thread multiple chunks
# from the same connection can be processed in parallel, and it's up to you
# to serialize the processing per connection.
#
# The IO reactor is completely protocol agnostic, and it's up to you to
# create objects that can interpret the bytes received from remote hosts,
# and to send the correct commands back. The way this works is that when you
Expand Down Expand Up @@ -89,9 +104,12 @@ class IoReactor

# Initializes a new IO reactor.
#
# @param options [Hash] only used to inject behaviour during tests
# @param options [Hash]
# @option options [#submit] :thread_pool (nil) a thread pool which will
# be used for (some) callbacks
def initialize(options={})
@options = options
@options = options.dup
@thread_pool = options.delete(:thread_pool) || NULL_THREAD_POOL
@clock = options[:clock] || Time
@state = PENDING_STATE
@error_listeners = []
Expand Down Expand Up @@ -238,14 +256,14 @@ def connect(host, port, options=nil, &block)
timeout = options[:timeout] || 5
ssl = options[:ssl]
end
connection = Connection.new(host, port, timeout, @unblocker, @clock)
connection = Connection.new(host, port, timeout, @unblocker, @thread_pool, @clock)
f = connection.connect
@io_loop.add_socket(connection)
@unblocker.unblock if running?
if ssl
f = f.flat_map do
ssl_context = ssl == true ? nil : ssl
upgraded_connection = SslConnection.new(host, port, connection.to_io, @unblocker, ssl_context)
upgraded_connection = SslConnection.new(host, port, connection.to_io, @unblocker, @thread_pool, ssl_context)
ff = upgraded_connection.connect
@io_loop.remove_socket(connection)
@io_loop.add_socket(upgraded_connection)
Expand Down Expand Up @@ -320,9 +338,9 @@ def bind(host, port, options=nil, &block)
ssl_context = options[:ssl]
end
if ssl_context
server = SslAcceptor.new(host, port, backlog, @unblocker, self, ssl_context)
server = SslAcceptor.new(host, port, backlog, @unblocker, @thread_pool, self, ssl_context)
else
server = Acceptor.new(host, port, backlog, @unblocker, self)
server = Acceptor.new(host, port, backlog, @unblocker, @thread_pool, self)
end
f = server.bind
@io_loop.add_socket(server)
Expand Down
4 changes: 2 additions & 2 deletions lib/ione/io/server_connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ module Io
# @since v1.1.0
class ServerConnection < BaseConnection
# @private
def initialize(socket, host, port, unblocker)
super(host, port, unblocker)
def initialize(socket, host, port, unblocker, thread_pool)
super(host, port, unblocker, thread_pool)
@io = socket
@state = CONNECTED_STATE
end
Expand Down
6 changes: 3 additions & 3 deletions lib/ione/io/ssl_acceptor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@ module Ione
module Io
# @private
class SslAcceptor < Acceptor
def initialize(host, port, backlog, unblocker, reactor, ssl_context, socket_impl=nil, ssl_socket_impl=nil)
super(host, port, backlog, unblocker, reactor, socket_impl)
def initialize(host, port, backlog, unblocker, thread_pool, reactor, ssl_context, socket_impl=nil, ssl_socket_impl=nil)
super(host, port, backlog, unblocker, thread_pool, reactor, socket_impl)
@ssl_context = ssl_context
@ssl_socket_impl = ssl_socket_impl
end

def read
client_socket, host, port = accept
connection = SslServerConnection.new(client_socket, host, port, @unblocker, @ssl_context, method(:notify_accept_listeners), @ssl_socket_impl)
connection = SslServerConnection.new(client_socket, host, port, @unblocker, @thread_pool, @ssl_context, method(:notify_accept_listeners), @ssl_socket_impl)
@reactor.accept(connection)
end
end
Expand Down
4 changes: 2 additions & 2 deletions lib/ione/io/ssl_connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ module Ione
module Io
# @private
class SslConnection < BaseConnection
def initialize(host, port, io, unblocker, ssl_context=nil, socket_impl=OpenSSL::SSL::SSLSocket)
super(host, port, unblocker)
def initialize(host, port, io, unblocker, thread_pool, ssl_context=nil, socket_impl=OpenSSL::SSL::SSLSocket)
super(host, port, unblocker, thread_pool)
@socket_impl = socket_impl
@ssl_context = ssl_context
@raw_io = io
Expand Down
4 changes: 2 additions & 2 deletions lib/ione/io/ssl_server_connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ class SslServerConnection < ServerConnection
ACCEPTING_STATE = 0
ESTABLISHED_STATE = 1

def initialize(socket, host, port, unblocker, ssl_context, accept_callback, ssl_socket_impl=nil)
super(socket, host, port, unblocker)
def initialize(socket, host, port, unblocker, thread_pool, ssl_context, accept_callback, ssl_socket_impl=nil)
super(socket, host, port, unblocker, thread_pool)
@ssl_context = ssl_context
@accept_callback = accept_callback
@ssl_socket_impl = ssl_socket_impl || OpenSSL::SSL::SSLSocket
Expand Down
18 changes: 18 additions & 0 deletions lib/ione/thread_pool.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# encoding: utf-8

module Ione
# A null implementation of a thread pool whose {#submit} calls the given block
# immediately and returns a future resolved with its value.
#
# @private
class NullThreadPool
# @return [Ione::Future] a future that resolves to the value of the given block
def submit(&task)
Future.resolved(task.call)
rescue => e
Future.failed(e)
end
end

NULL_THREAD_POOL = NullThreadPool.new
end
21 changes: 20 additions & 1 deletion spec/ione/io/acceptor_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ module Ione
module Io
describe Acceptor do
let :acceptor do
described_class.new('example.com', 4321, backlog, unblocker, reactor, socket_impl)
described_class.new('example.com', 4321, backlog, unblocker, thread_pool, reactor, socket_impl)
end

let :backlog do
Expand All @@ -18,6 +18,10 @@ module Io
double(:unblocker)
end

let :thread_pool do
FakeThreadPool.new(true)
end

let :reactor do
double(:reactor)
end
Expand Down Expand Up @@ -202,6 +206,21 @@ module Io
received_connection2.port.should == 3333
end

it 'calls the accept listeners in the provided thread pool' do
thread_pool.auto_run = false
called1 = false
called2 = false
acceptor.on_accept { |c| called1 = true }
acceptor.on_accept { |c| called2 = true }
acceptor.bind
acceptor.read
called1.should be_false
called2.should be_false
thread_pool.run_all
called1.should be_true
called2.should be_true
end

it 'ignores exceptions raised by the connection callback' do
called = false
acceptor.on_accept { |c| raise 'bork!' }
Expand Down
11 changes: 11 additions & 0 deletions spec/ione/io/connection_common.rb
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,17 @@
data.should == 'foo bar'
end

it 'calls the data listener in the provided thread pool' do
thread_pool.auto_run = false
socket.should_receive(:read_nonblock).with(instance_of(Fixnum)).and_return('foo bar')
data = nil
handler.on_data { |d| data = d }
handler.read
data.should be_nil
thread_pool.run_all
data.should eq('foo bar')
end

context 'when #read_nonblock raises an error' do
before do
socket.stub(:close)
Expand Down
Loading