Skip to content

Commit

Permalink
gazette: make Router synchronous and lazy, and clear Read header
Browse files Browse the repository at this point in the history
Simplify Router by starting a Channel immediately, and instead emphasize
RPC timeouts rather than connection timeouts.

There are a variety of conditions where connection timeouts don't really
work, such as when a server has bound its port but is not actively
serving it. Currently this causes tonic to block indefinitely.

Also, Channel embeds a reconnection behavior if the transport is broken,
so it's already the case that Channels are connecting in the background
and we should expect to see corresponding delays.

So, have a single behavior for Channels: they start immediately but may
have connection delay in the background.

- Back out connection-readiness route selection in favor of random balancing.
- Distinguish local vs non-local Channels: in the future, we should use this
  to implement selective compression over non-local Channels.

Also, upon an error a Read stream must clear its ReadRequest Header.
The rationale is that the current Header could point to a Gazette broker
which has permanently stopped, and connections will time out repeatedly.
Clearing the Header causes route discovery to restart using
the base service address.
  • Loading branch information
jgraettinger committed Oct 29, 2024
1 parent e35d266 commit e0c6fb7
Show file tree
Hide file tree
Showing 9 changed files with 64 additions and 106 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions crates/agent/src/proxy_connectors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ impl<L: runtime::LogHandler> ProxyConnectors<L> {

let mut proxy_client =
proto_grpc::runtime::connector_proxy_client::ConnectorProxyClient::with_interceptor(
gazette::dial_channel(reactor_address).await?,
gazette::dial_channel(reactor_address)?,
metadata.clone(),
);
let mut proxy_responses = proxy_client
Expand Down Expand Up @@ -238,7 +238,7 @@ impl<L: runtime::LogHandler> ProxyConnectors<L> {
};

Ok((
gazette::dial_channel(&address).await?,
gazette::dial_channel(&address)?,
metadata,
(cancel_tx, log_loop),
))
Expand Down
1 change: 1 addition & 0 deletions crates/gazette/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ futures = { workspace = true }
futures-core = { workspace = true }
hyper-util = { workspace = true }
jsonwebtoken = { workspace = true }
rand = { workspace = true }
reqwest = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion crates/gazette/src/journal/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ impl Client {
router: &crate::Router,
req: &broker::ListRequest,
) -> crate::Result<tonic::Streaming<broker::ListResponse>> {
let mut client = self.into_sub(router.route(None, false, &self.default).await?);
let mut client = self.into_sub(router.route(None, false, &self.default)?);
Ok(client.list(req.clone()).await?.into_inner())
}
}
Expand Down
8 changes: 5 additions & 3 deletions crates/gazette/src/journal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ impl Client {

/// Invoke the Gazette journal Apply API.
pub async fn apply(&self, req: broker::ApplyRequest) -> crate::Result<broker::ApplyResponse> {
let mut client = self.into_sub(self.router.route(None, false, &self.default).await?);
let mut client = self.into_sub(self.router.route(None, false, &self.default)?);

let resp = client
.apply(req)
Expand All @@ -66,7 +66,7 @@ impl Client {
&self,
req: broker::FragmentsRequest,
) -> crate::Result<broker::FragmentsResponse> {
let mut client = self.into_sub(self.router.route(None, false, &self.default).await?);
let mut client = self.into_sub(self.router.route(None, false, &self.default)?);

let resp = client
.list_fragments(req)
Expand All @@ -77,11 +77,13 @@ impl Client {
check_ok(resp.status(), resp)
}

fn into_sub(&self, channel: Channel) -> SubClient {
fn into_sub(&self, (channel, _local): (Channel, bool)) -> SubClient {
proto_grpc::broker::journal_client::JournalClient::with_interceptor(
channel,
self.metadata.clone(),
)
// TODO(johnny): Use `_local` to selectively enable LZ4 compression
// when traversing a non-local zone.
}
}

Expand Down
10 changes: 8 additions & 2 deletions crates/gazette/src/journal/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ impl Client {
// Surface error to the caller, which can either drop us
// or poll us again to retry.
() = co.yield_(Err(err)).await;
// Restart route discovery.
req.header = None;
}
}
}
Expand All @@ -49,7 +51,7 @@ impl Client {
write_head: &mut i64,
) -> crate::Result<()> {
let route = req.header.as_ref().and_then(|hdr| hdr.route.as_ref());
let mut client = self.into_sub(self.router.route(route, false, &self.default).await?);
let mut client = self.into_sub(self.router.route(route, false, &self.default)?);

// Fetch metadata first before we start the actual read.
req.metadata_only = true;
Expand Down Expand Up @@ -89,7 +91,9 @@ impl Client {
}
match (resp.status(), &resp.fragment, resp.content.is_empty()) {
// Metadata response telling us of a new fragment being read.
(broker::Status::Ok, Some(_fragment), true) => {
(broker::Status::Ok, Some(fragment), true) => {
tracing::trace!(fragment=?ops::DebugJson(fragment), "read fragment metadata");

// Offset jumps happen if content is removed from the middle of a journal,
// or when reading from the journal head (offset -1).
if req.offset != resp.offset {
Expand Down Expand Up @@ -138,6 +142,8 @@ async fn read_fragment_url(
.and_then(reqwest::Response::error_for_status)
.map_err(Error::FetchFragment)?;

tracing::trace!(fragment=?ops::DebugJson(&fragment), "started direct fragment read");

let raw_reader = response
// Map into a Stream<Item = Result<Bytes, _>>.
.bytes_stream()
Expand Down
27 changes: 15 additions & 12 deletions crates/gazette/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,18 @@ impl Error {

pub type Result<T> = std::result::Result<T, Error>;

/// Dial a gRPC endpoint with opinionated defaults and
/// Lazily dial a gRPC endpoint with opinionated defaults and
/// support for TLS and Unix Domain Sockets.
pub async fn dial_channel(endpoint: &str) -> Result<tonic::transport::Channel> {
pub fn dial_channel(endpoint: &str) -> Result<tonic::transport::Channel> {
use std::time::Duration;

let ep = tonic::transport::Endpoint::from_shared(endpoint.to_string())
.map_err(|_err| Error::InvalidEndpoint(endpoint.to_string()))?
// Note this connect_timeout accounts only for TCP connection time and
// does not apply to time required for TLS or HTTP/2 transport start,
// which can block indefinitely if the server is bound but not listening.
// Callers MUST implement per-RPC timeouts if that's important.
// This timeout is only a best-effort sanity check.
.connect_timeout(Duration::from_secs(5))
.keep_alive_timeout(Duration::from_secs(120))
.keep_alive_while_idle(true)
Expand All @@ -93,17 +98,15 @@ pub async fn dial_channel(endpoint: &str) -> Result<tonic::transport::Channel> {
.assume_http2(true),
)?;

let channel = match ep.uri().scheme_str() {
Some("unix") => {
ep.connect_with_connector(tower::util::service_fn(|uri: tonic::transport::Uri| {
connect_unix(uri)
}))
.await?
}
Some("https" | "http") => ep.connect().await?,
let channel =
match ep.uri().scheme_str() {
Some("unix") => ep.connect_with_connector_lazy(tower::util::service_fn(
|uri: tonic::transport::Uri| connect_unix(uri),
)),
Some("https" | "http") => ep.connect_lazy(),

_ => return Err(Error::InvalidEndpoint(endpoint.to_string())),
};
_ => return Err(Error::InvalidEndpoint(endpoint.to_string())),
};

Ok(channel)
}
Expand Down
109 changes: 27 additions & 82 deletions crates/gazette/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,6 @@ use std::collections::HashMap;
use std::sync::Arc;
use tonic::transport::Channel;

// DialState represents a Channel which may be:
// - Ready (if Some)
// - Currently being dialed (if locked)
// - Neither (None and not locked).
// Ready channels also track their number of uses since the last sweep.
type DialState = Arc<futures::lock::Mutex<Option<(Channel, usize)>>>;

/// Router facilitates dispatching requests to designated members of
/// a dynamic serving topology, by maintaining ready Channels to
/// member endpoints which may be dynamically discovered over time.
Expand All @@ -20,7 +13,7 @@ pub struct Router {
inner: Arc<Inner>,
}
struct Inner {
states: std::sync::Mutex<HashMap<MemberId, DialState>>,
states: std::sync::Mutex<HashMap<MemberId, (Channel, bool)>>,
zone: String,
}

Expand All @@ -39,60 +32,39 @@ impl Router {
}

/// Map an optional broker::Route and indication of whether the "primary"
/// member is required into a ready Channel for use in the dispatch of an RPC.
/// member is required into a ready Channel for use in the dispatch of an RPC,
/// and a boolean which is set if and only if the Channel is in our local zone.
///
/// route() will prefer to send requests to a ready member Channel if possible,
/// or will dial new Channels if required by the `route` and `primary` requirement.
pub async fn route(
/// route() dial new Channels as required by the `route` and `primary` requirement.
/// Use sweep() to periodically clean up Channels which are no longer in use.
pub fn route(
&self,
route: Option<&broker::Route>,
primary: bool,
default: &MemberId,
) -> Result<Channel, Error> {
let (index, state) = self.pick(route, primary, &default);
) -> Result<(Channel, bool), Error> {
let index = pick(route, primary, &self.inner.zone);

// Acquire MemberId-specific, async-aware lock.
let mut state = state.lock().await;
let id = match index {
Some(index) => &route.unwrap().members[index],
None => default,
};
let mut states = self.inner.states.lock().unwrap();

// Fast path: client is dialed and ready.
if let Some((ref client, uses)) = &mut *state {
*uses += 1;
return Ok(client.clone());
// Is the channel already started?
if let Some((channel, mark)) = states.get_mut(id) {
*mark = true;
return Ok((channel.clone(), id.zone == self.inner.zone));
}

// Slow path: start dialing the endpoint.
// Start dialing the endpoint.
let channel = super::dial_channel(match index {
Some(index) => &route.unwrap().endpoints[index],
None => &default.suffix,
})
.await?;
})?;
states.insert(id.clone(), (channel.clone(), true));

*state = Some((channel.clone(), 1));

Ok(channel)
}

fn pick(
&self,
route: Option<&broker::Route>,
primary: bool,
default: &MemberId,
) -> (Option<usize>, DialState) {
// Acquire non-async lock which *cannot* be held across an await point.
let mut states = self.inner.states.lock().unwrap();
let index = pick(route, primary, &self.inner.zone, &states);

let id = match index {
Some(index) => &route.unwrap().members[index],
None => default,
};

let state = match states.get(id) {
Some(value) => value.clone(),
None => states.entry(id.clone()).or_default().clone(),
};

(index, state)
Ok((channel, id.zone == self.inner.zone))
}

// Identify Channels which have not been used since the preceding sweep, and close them.
Expand All @@ -101,32 +73,19 @@ impl Router {
pub fn sweep(&self) {
let mut states = self.inner.states.lock().unwrap();

states.retain(|id, state| {
// Retain entries which are currently connecting.
let Some(mut state) = state.try_lock() else {
return true;
};
// Drop entries which are not connected.
let Some((_client, uses)) = &mut *state else {
return false;
};
states.retain(|id, (_channel, mark)| {
// Drop entries which have not been used since the last sweep.
if *uses == 0 {
if !*mark {
tracing::debug!(?id, "dropping idle member connection");
return false;
}
*uses = 0; // Mark for next sweep.
*mark = false; // Mark for next sweep.
true
});
}
}

fn pick(
route: Option<&broker::Route>,
primary: bool,
zone: &str,
states: &HashMap<MemberId, DialState>,
) -> Option<usize> {
fn pick(route: Option<&broker::Route>, primary: bool, zone: &str) -> Option<usize> {
let default_route = broker::Route::default();
let route = route.unwrap_or(&default_route);

Expand All @@ -136,28 +95,14 @@ fn pick(
.zip(route.endpoints.iter())
.enumerate()
.max_by_key(|(index, (id, _endpoint))| {
let connected = if let Some(state) = states.get(id) {
if let Some(state) = state.try_lock() {
if let Some(_conn) = state.as_ref() {
true // Transport is ready.
} else {
false // Transport is not ready and no task is starting it.
}
} else {
true // Another task has started this transport.
}
} else {
false // Transport has not been started.
};

// Member selection criteria:
(
// If we want the primary, then prefer the primary.
primary && *index as i32 == route.primary,
// Prefer members in our same zone.
zone == id.zone,
// Prefer members which are already connected.
connected,
// Randomize over members to balance load.
rand::random::<u8>(),
)
})
.map(|(index, _)| index)
Expand Down
8 changes: 4 additions & 4 deletions crates/gazette/src/shard/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl Client {
&self,
req: consumer::ListRequest,
) -> Result<consumer::ListResponse, crate::Error> {
let mut client = self.into_sub(self.router.route(None, false, &self.default).await?);
let mut client = self.into_sub(self.router.route(None, false, &self.default)?);

let resp = client
.list(req)
Expand All @@ -60,7 +60,7 @@ impl Client {
&self,
req: consumer::ApplyRequest,
) -> Result<consumer::ApplyResponse, crate::Error> {
let mut client = self.into_sub(self.router.route(None, false, &self.default).await?);
let mut client = self.into_sub(self.router.route(None, false, &self.default)?);

let resp = client
.apply(req)
Expand All @@ -76,7 +76,7 @@ impl Client {
&self,
req: consumer::UnassignRequest,
) -> Result<consumer::UnassignResponse, crate::Error> {
let mut client = self.into_sub(self.router.route(None, false, &self.default).await?);
let mut client = self.into_sub(self.router.route(None, false, &self.default)?);

let resp = client
.unassign(req)
Expand All @@ -87,7 +87,7 @@ impl Client {
check_ok(resp.status(), resp)
}

fn into_sub(&self, channel: Channel) -> SubClient {
fn into_sub(&self, (channel, _local): (Channel, bool)) -> SubClient {
proto_grpc::consumer::shard_client::ShardClient::with_interceptor(
channel,
self.metadata.clone(),
Expand Down

0 comments on commit e0c6fb7

Please sign in to comment.