Skip to content

Commit

Permalink
Fix TCP Stream Handler State Leak (#446)
Browse files Browse the repository at this point in the history
* Fix TCP Stream Handler State Leak

TCP stream handler state was never garbage collected on disconnect
resulting in memory exhaustion after sufficient TCP life cycles.

The following changes have been made to enable this garbage collection:

* source/tcp now creates a special registration and token which
corresponds to all stream handler events.

* src/thread has been extended with a new `spawn2` function which operates
as `spawn`, but takes an additional `mio::SetReadiness` which is flagged
readable after the given closure returns control.

* thread::Stoppable has been extended to include a new `ready` function
which returns true when joining the given thread is guaranteed not block.
`ready` always returns false on threads created with `spawn`.

* Introduce ThreadPool Abstraction

New abstraction captures notion of a thread pool, wrapping existing
thread abstractions such that threads can by dynamically
spawned, tracked, and garbage collected.  Mutexes and Arcs are leveraged
to ensure thread safety.

* Guard Against None Values in join_ready.

* Add Missing Count Functions to TokenSlab.

Changes were missed in previous commits.

* Remove StreamHandler.run and Clarify Comments.

StreamHandler.run was no longer useful as ThreadPool encapsulates
signaling when thread completion.
  • Loading branch information
pulltab authored Jul 2, 2018
1 parent 6945c2c commit b770703
Show file tree
Hide file tree
Showing 3 changed files with 163 additions and 40 deletions.
87 changes: 57 additions & 30 deletions src/source/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use std::io::ErrorKind;
use std::marker::PhantomData;
use std::net::ToSocketAddrs;
use thread;
use thread::Stoppable;
use util;

/// Configured for the `metric::Telemetry` source.
Expand Down Expand Up @@ -50,7 +49,9 @@ pub trait TCPStreamHandler: 'static + Default + Clone + Sync + Send {
/// State for a TCP backed source.
pub struct TCP<H> {
listeners: util::TokenSlab<mio::net::TcpListener>,
handlers: Vec<thread::ThreadHandle>,
stream_events: mio::Registration,
stream_events_token: mio::Token,
handlers: thread::ThreadPool,
phantom: PhantomData<H>,
}

Expand All @@ -60,6 +61,11 @@ where
{
/// Constructs and starts a new TCP source.
fn init(config: TCPConfig) -> Self {
// Create registrations and for all TCP interfaces and stream handlers.
//
// Note - Due to restrictions in mio, we must construct these registrations
// here as we are assuming this function is called directly from the main
// process. Registrations must be bound to a mio poller by the subordiante thread.
let addrs = (config.host.as_str(), config.port).to_socket_addrs();
let mut listeners = util::TokenSlab::<mio::net::TcpListener>::new();
match addrs {
Expand All @@ -83,9 +89,14 @@ where
}
};

let (stream_events, stream_events_readiness) = mio::Registration::new2();
let stream_events_token = mio::Token::from(listeners.count());
let thread_pool = thread::ThreadPool::new(Some(stream_events_readiness));
TCP {
listeners: listeners,
handlers: Vec::new(),
stream_events: stream_events,
stream_events_token: stream_events_token,
handlers: thread_pool,
phantom: PhantomData,
}
}
Expand All @@ -101,7 +112,16 @@ where
) {
error!("Failed to register {:?} - {:?}!", listener, e);
}
}
};

if let Err(e) = poller.register(
&self.stream_events,
self.stream_events_token,
mio::Ready::readable(),
mio::PollOpt::edge(),
) {
error!("Failed to register stream events - {:?}!", e);
};

self.accept_loop(chans, &poller)
}
Expand All @@ -119,21 +139,27 @@ where
Ok(_num_events) => for event in events {
match event.token() {
constants::SYSTEM => {
for handler in self.handlers {
handler.shutdown();
}

self.handlers.shutdown();
util::send(&mut chans, metric::Event::Shutdown);
return;
}
listener_token => {
if let Err(e) =
self.spawn_stream_handlers(&chans, listener_token)
{
let listener = &self.listeners[listener_token];
error!("Failed to spawn stream handlers! {:?}", e);
error!("Deregistering listener for {:?} due to unrecoverable error!", *listener);
let _ = poll.deregister(listener);
if listener_token == self.stream_events_token {
// Mio event corresponding to a StreamHandler.
// Currently, the only StreamHandler event flags
// the StreamHandler as terminated. Cleanup state.
let ready = self.handlers.join_ready();
trace!("Removed {:?} terminated stream handlers.", ready.len());

} else {
if let Err(e) =
self.spawn_stream_handlers(&chans, listener_token)
{
let listener = &self.listeners[listener_token];
error!("Failed to spawn stream handlers! {:?}", e);
error!("Deregistering listener for {:?} due to unrecoverable error!", *listener);
let _ = poll.deregister(listener);
}
}
}
}
Expand All @@ -151,23 +177,24 @@ where
loop {
match listener.accept() {
Ok((stream, _addr)) => {
// Actually spawn the stream handler
let rchans = chans.to_owned();
let new_stream = thread::spawn(move |poller| {
// Note - Stream handlers are allowed to crash without
// compromising Cernan's ability to gracefully shutdown.
poller
.register(
&stream,
mio::Token(0),
mio::Ready::readable(),
mio::PollOpt::edge(),
)
.unwrap();
self.handlers.spawn(
move |poller| {
// Note - Stream handlers are allowed to crash without
// compromising Cernan's ability to gracefully shutdown.
poller
.register(
&stream,
mio::Token(0),
mio::Ready::readable(),
mio::PollOpt::edge(),
)
.unwrap();

let mut handler = H::new();
handler.handle_stream(rchans, &poller, stream);
});
self.handlers.push(new_stream);
let mut handler = H::new();
handler.handle_stream(rchans, &poller, stream);
});
}
Err(e) => match e.kind() {
ErrorKind::ConnectionAborted
Expand Down
108 changes: 98 additions & 10 deletions src/thread.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,25 @@
//! Mio enabled threading library.
use constants;
use mio;
use std::option;
use std::thread;
use std::sync;
use util;

/// Event polling structure. Alias of `mio::Poll`.
pub type Poll = mio::Poll;
/// Events buffer type. Alias of `mio::Events`.
pub type Events = mio::Events;


/// Mio enabled thread state.
pub struct ThreadHandle {
/// JoinHandle for the executing thread.
pub handle: thread::JoinHandle<()>,

/// Readiness signal used to notify the given thread when an event is ready
/// to be consumed on the SYSTEM channel.
readiness: mio::SetReadiness,
shutdown_event: mio::SetReadiness,
}

/// Trait for stoppable processes.
Expand All @@ -38,7 +42,7 @@ impl Stoppable for ThreadHandle {
/// Note - It is the responsability of the developer to ensure
/// that thread logic polls for events occuring on the SYSTEM token.
fn shutdown(self) {
self.readiness
self.shutdown_event
.set_readiness(mio::Ready::readable())
.expect("Failed to notify child thread of shutdown!");
self.join();
Expand All @@ -50,23 +54,107 @@ pub fn spawn<F>(f: F) -> ThreadHandle
where
F: Send + 'static + FnOnce(mio::Poll) -> (),
{
let poller = mio::Poll::new().unwrap();
let (registration, readiness) = mio::Registration::new2();

let child_poller = mio::Poll::new().unwrap();
let (shutdown_event_registration, shutdown_event) = mio::Registration::new2();
ThreadHandle {
readiness: readiness,

shutdown_event: shutdown_event,
handle: thread::spawn(move || {
poller
child_poller
.register(
&registration,
&shutdown_event_registration,
constants::SYSTEM,
mio::Ready::readable(),
mio::PollOpt::edge(),
)
.expect("Failed to register system pipe");

f(poller);
f(child_poller);
}),
}
}

/// mio Eventable ThreadPool.
pub struct ThreadPool {
/// thread_id counter.
thread_id: usize,
/// Listing of all the joinable threads in the pool.
joinable: sync::Arc<sync::Mutex<Vec<usize>>>,
/// Mapping of thread_id to ThreadHandle.
threads: util::HashMap<usize, ThreadHandle>,
/// Mio readiness flagging when threads finish execution.
thread_event_readiness: option::Option<mio::SetReadiness>,
}


impl ThreadPool {

/// Construct a new ThreadPool.
pub fn new(thread_events_readiness: option::Option<mio::SetReadiness>) -> Self {
ThreadPool {
thread_id: 0,
joinable: sync::Arc::new(sync::Mutex::new(Vec::new())),
thread_event_readiness: thread_events_readiness,
threads: util::HashMap::default(),
}
}

/// Spawn a new thread and assign it to the pool.
pub fn spawn<F>(&mut self, f: F) -> usize
where
F: Send + 'static + FnOnce(mio::Poll) -> (),
{
let id = self.next_thread_id();
let joinable_arc = self.joinable.clone();
let thread_event_readiness = self.thread_event_readiness.clone();
let handler =
spawn(move |poller| {
f(poller);

let mut joinable = joinable_arc.lock().unwrap();
joinable.push(id);

if let Some(readiness) = thread_event_readiness {
readiness
.set_readiness(mio::Ready::readable())
.expect("Failed to flag readiness for ThreadPool event!");
}
});
self.threads.insert(id, handler);
id
}

fn next_thread_id(&mut self) -> usize
{
let thread_id = self.thread_id;
self.thread_id += 1;
thread_id
}

/// Block on completion of all executing threads.
pub fn join(mut self) -> Vec<usize>
{
self.threads.drain().for_each(|(_, h)| h.join());
self.join_ready()
}

/// Join all completed threads.
pub fn join_ready(&mut self) -> Vec<usize>
{
let mut joinable = self.joinable.lock().unwrap();
let mut joined = Vec::new();
while let Some(id) = joinable.pop() {
if let Some(handle) = self.threads.remove(&id) {
handle.join();
}
joined.push(id);
}
joined
}

/// Serially signal shutdown and block for completion of all threads.
pub fn shutdown(mut self) -> Vec<usize>
{
self.threads.drain().for_each(|(_, h)| h.shutdown());
self.join_ready()
}
}
8 changes: 8 additions & 0 deletions src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ fn token_to_idx(token: &mio::Token) -> usize {

/// Wrapper around Slab
pub struct TokenSlab<E: mio::Evented> {
token_count: usize,
tokens: slab::Slab<E>,
}

Expand Down Expand Up @@ -125,6 +126,7 @@ impl<E: mio::Evented> TokenSlab<E> {
/// of constants::SYSTEM.
pub fn new() -> TokenSlab<E> {
TokenSlab {
token_count: 0,
tokens: slab::Slab::with_capacity(token_to_idx(&constants::SYSTEM)),
}
}
Expand All @@ -134,10 +136,16 @@ impl<E: mio::Evented> TokenSlab<E> {
self.tokens.iter()
}

/// Return the number of tokens stored in the TokenSlab.
pub fn count(&self) -> usize {
self.token_count
}

/// Inserts a new Evented into the slab, returning a mio::Token
/// corresponding to the index of the newly inserted type.
pub fn insert(&mut self, thing: E) -> mio::Token {
let idx = self.tokens.insert(thing);
self.token_count += 1;
mio::Token::from(idx)
}
}

0 comments on commit b770703

Please sign in to comment.