diff --git a/Cargo.lock b/Cargo.lock index f39624b317..ff10f534e9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2566,6 +2566,7 @@ dependencies = [ "pin-project-lite", "proto-gazette", "proto-grpc", + "rand 0.8.5", "reqwest", "serde_json", "simd-doc", diff --git a/crates/agent/src/proxy_connectors.rs b/crates/agent/src/proxy_connectors.rs index 642c6f9dad..f9473bc6a7 100644 --- a/crates/agent/src/proxy_connectors.rs +++ b/crates/agent/src/proxy_connectors.rs @@ -201,7 +201,7 @@ impl ProxyConnectors { let mut proxy_client = proto_grpc::runtime::connector_proxy_client::ConnectorProxyClient::with_interceptor( - gazette::dial_channel(reactor_address).await?, + gazette::dial_channel(reactor_address)?, metadata.clone(), ); let mut proxy_responses = proxy_client @@ -238,7 +238,7 @@ impl ProxyConnectors { }; Ok(( - gazette::dial_channel(&address).await?, + gazette::dial_channel(&address)?, metadata, (cancel_tx, log_loop), )) diff --git a/crates/gazette/Cargo.toml b/crates/gazette/Cargo.toml index 94a4870f6d..1045a9976e 100644 --- a/crates/gazette/Cargo.toml +++ b/crates/gazette/Cargo.toml @@ -20,6 +20,7 @@ futures = { workspace = true } futures-core = { workspace = true } hyper-util = { workspace = true } jsonwebtoken = { workspace = true } +rand = { workspace = true } reqwest = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } diff --git a/crates/gazette/src/journal/list.rs b/crates/gazette/src/journal/list.rs index 142bcce412..dbfdc43b18 100644 --- a/crates/gazette/src/journal/list.rs +++ b/crates/gazette/src/journal/list.rs @@ -51,7 +51,7 @@ impl Client { router: &crate::Router, req: &broker::ListRequest, ) -> crate::Result> { - let mut client = self.into_sub(router.route(None, false, &self.default).await?); + let mut client = self.into_sub(router.route(None, false, &self.default)?); Ok(client.list(req.clone()).await?.into_inner()) } } diff --git a/crates/gazette/src/journal/mod.rs b/crates/gazette/src/journal/mod.rs index 
db47ad7c1d..1f800ef7b2 100644 --- a/crates/gazette/src/journal/mod.rs +++ b/crates/gazette/src/journal/mod.rs @@ -50,7 +50,7 @@ impl Client { /// Invoke the Gazette journal Apply API. pub async fn apply(&self, req: broker::ApplyRequest) -> crate::Result { - let mut client = self.into_sub(self.router.route(None, false, &self.default).await?); + let mut client = self.into_sub(self.router.route(None, false, &self.default)?); let resp = client .apply(req) @@ -66,7 +66,7 @@ impl Client { &self, req: broker::FragmentsRequest, ) -> crate::Result { - let mut client = self.into_sub(self.router.route(None, false, &self.default).await?); + let mut client = self.into_sub(self.router.route(None, false, &self.default)?); let resp = client .list_fragments(req) @@ -77,11 +77,13 @@ impl Client { check_ok(resp.status(), resp) } - fn into_sub(&self, channel: Channel) -> SubClient { + fn into_sub(&self, (channel, _local): (Channel, bool)) -> SubClient { proto_grpc::broker::journal_client::JournalClient::with_interceptor( channel, self.metadata.clone(), ) + // TODO(johnny): Use `_local` to selectively enable LZ4 compression + // when traversing a non-local zone. } } diff --git a/crates/gazette/src/journal/read.rs b/crates/gazette/src/journal/read.rs index ef50199fff..8c9d4e2bf5 100644 --- a/crates/gazette/src/journal/read.rs +++ b/crates/gazette/src/journal/read.rs @@ -36,6 +36,8 @@ impl Client { // Surface error to the caller, which can either drop us // or poll us again to retry. () = co.yield_(Err(err)).await; + // Restart route discovery. + req.header = None; } } } @@ -49,7 +51,7 @@ impl Client { write_head: &mut i64, ) -> crate::Result<()> { let route = req.header.as_ref().and_then(|hdr| hdr.route.as_ref()); - let mut client = self.into_sub(self.router.route(route, false, &self.default).await?); + let mut client = self.into_sub(self.router.route(route, false, &self.default)?); // Fetch metadata first before we start the actual read. 
req.metadata_only = true; @@ -89,7 +91,9 @@ impl Client { } match (resp.status(), &resp.fragment, resp.content.is_empty()) { // Metadata response telling us of a new fragment being read. - (broker::Status::Ok, Some(_fragment), true) => { + (broker::Status::Ok, Some(fragment), true) => { + tracing::trace!(fragment=?ops::DebugJson(fragment), "read fragment metadata"); + // Offset jumps happen if content is removed from the middle of a journal, // or when reading from the journal head (offset -1). if req.offset != resp.offset { @@ -138,6 +142,8 @@ async fn read_fragment_url( .and_then(reqwest::Response::error_for_status) .map_err(Error::FetchFragment)?; + tracing::trace!(fragment=?ops::DebugJson(&fragment), "started direct fragment read"); + let raw_reader = response // Map into a Stream>. .bytes_stream() diff --git a/crates/gazette/src/lib.rs b/crates/gazette/src/lib.rs index 5ed060f0b4..db36bb6910 100644 --- a/crates/gazette/src/lib.rs +++ b/crates/gazette/src/lib.rs @@ -77,13 +77,18 @@ impl Error { pub type Result = std::result::Result; -/// Dial a gRPC endpoint with opinionated defaults and +/// Lazily dial a gRPC endpoint with opinionated defaults and /// support for TLS and Unix Domain Sockets. -pub async fn dial_channel(endpoint: &str) -> Result { +pub fn dial_channel(endpoint: &str) -> Result { use std::time::Duration; let ep = tonic::transport::Endpoint::from_shared(endpoint.to_string()) .map_err(|_err| Error::InvalidEndpoint(endpoint.to_string()))? + // Note this connect_timeout accounts only for TCP connection time and + // does not apply to time required for TLS or HTTP/2 transport start, + // which can block indefinitely if the server is bound but not listening. + // Callers MUST implement per-RPC timeouts if that's important. + // This timeout is only a best-effort sanity check. 
.connect_timeout(Duration::from_secs(5)) .keep_alive_timeout(Duration::from_secs(120)) .keep_alive_while_idle(true) @@ -93,17 +98,15 @@ pub async fn dial_channel(endpoint: &str) -> Result { .assume_http2(true), )?; - let channel = match ep.uri().scheme_str() { - Some("unix") => { - ep.connect_with_connector(tower::util::service_fn(|uri: tonic::transport::Uri| { - connect_unix(uri) - })) - .await? - } - Some("https" | "http") => ep.connect().await?, + let channel = + match ep.uri().scheme_str() { + Some("unix") => ep.connect_with_connector_lazy(tower::util::service_fn( + |uri: tonic::transport::Uri| connect_unix(uri), + )), + Some("https" | "http") => ep.connect_lazy(), - _ => return Err(Error::InvalidEndpoint(endpoint.to_string())), - }; + _ => return Err(Error::InvalidEndpoint(endpoint.to_string())), + }; Ok(channel) } diff --git a/crates/gazette/src/router.rs b/crates/gazette/src/router.rs index 5564f4d3df..3125491a59 100644 --- a/crates/gazette/src/router.rs +++ b/crates/gazette/src/router.rs @@ -5,13 +5,6 @@ use std::collections::HashMap; use std::sync::Arc; use tonic::transport::Channel; -// DialState represents a Channel which may be: -// - Ready (if Some) -// - Currently being dialed (if locked) -// - Neither (None and not locked). -// Ready channels also track their number of uses since the last sweep. -type DialState = Arc>>; - /// Router facilitates dispatching requests to designated members of /// a dynamic serving topology, by maintaining ready Channels to /// member endpoints which may be dynamically discovered over time. @@ -20,7 +13,7 @@ pub struct Router { inner: Arc, } struct Inner { - states: std::sync::Mutex>, + states: std::sync::Mutex>, zone: String, } @@ -39,60 +32,39 @@ impl Router { } /// Map an optional broker::Route and indication of whether the "primary" - /// member is required into a ready Channel for use in the dispatch of an RPC. 
+ /// member is required into a ready Channel for use in the dispatch of an RPC, + /// and a boolean which is set if and only if the Channel is in our local zone. /// - /// route() will prefer to send requests to a ready member Channel if possible, - /// or will dial new Channels if required by the `route` and `primary` requirement. - pub async fn route( + /// route() dials new Channels as required by the `route` and `primary` requirement. + /// Use sweep() to periodically clean up Channels which are no longer in use. + pub fn route( &self, route: Option<&broker::Route>, primary: bool, default: &MemberId, - ) -> Result { - let (index, state) = self.pick(route, primary, &default); + ) -> Result<(Channel, bool), Error> { + let index = pick(route, primary, &self.inner.zone); - // Acquire MemberId-specific, async-aware lock. - let mut state = state.lock().await; + let id = match index { + Some(index) => &route.unwrap().members[index], + None => default, + }; + let mut states = self.inner.states.lock().unwrap(); - // Fast path: client is dialed and ready. - if let Some((ref client, uses)) = &mut *state { - *uses += 1; - return Ok(client.clone()); + // Is the channel already started? + if let Some((channel, mark)) = states.get_mut(id) { + *mark = true; + return Ok((channel.clone(), id.zone == self.inner.zone)); } - // Slow path: start dialing the endpoint. + // Start dialing the endpoint. let channel = super::dial_channel(match index { Some(index) => &route.unwrap().endpoints[index], None => &default.suffix, - }) - .await?; + })?; + states.insert(id.clone(), (channel.clone(), true)); - *state = Some((channel.clone(), 1)); - - Ok(channel) - } - - fn pick( - &self, - route: Option<&broker::Route>, - primary: bool, - default: &MemberId, - ) -> (Option, DialState) { - // Acquire non-async lock which *cannot* be held across an await point. 
- let mut states = self.inner.states.lock().unwrap(); - let index = pick(route, primary, &self.inner.zone, &states); - - let id = match index { - Some(index) => &route.unwrap().members[index], - None => default, - }; - - let state = match states.get(id) { - Some(value) => value.clone(), - None => states.entry(id.clone()).or_default().clone(), - }; - - (index, state) + Ok((channel, id.zone == self.inner.zone)) } // Identify Channels which have not been used since the preceding sweep, and close them. @@ -101,32 +73,19 @@ impl Router { pub fn sweep(&self) { let mut states = self.inner.states.lock().unwrap(); - states.retain(|id, state| { - // Retain entries which are currently connecting. - let Some(mut state) = state.try_lock() else { - return true; - }; - // Drop entries which are not connected. - let Some((_client, uses)) = &mut *state else { - return false; - }; + states.retain(|id, (_channel, mark)| { // Drop entries which have not been used since the last sweep. - if *uses == 0 { + if !*mark { tracing::debug!(?id, "dropping idle member connection"); return false; } - *uses = 0; // Mark for next sweep. + *mark = false; // Mark for next sweep. true }); } } -fn pick( - route: Option<&broker::Route>, - primary: bool, - zone: &str, - states: &HashMap, -) -> Option { +fn pick(route: Option<&broker::Route>, primary: bool, zone: &str) -> Option { let default_route = broker::Route::default(); let route = route.unwrap_or(&default_route); @@ -136,28 +95,14 @@ fn pick( .zip(route.endpoints.iter()) .enumerate() .max_by_key(|(index, (id, _endpoint))| { - let connected = if let Some(state) = states.get(id) { - if let Some(state) = state.try_lock() { - if let Some(_conn) = state.as_ref() { - true // Transport is ready. - } else { - false // Transport is not ready and no task is starting it. - } - } else { - true // Another task has started this transport. - } - } else { - false // Transport has not been started. 
- }; - // Member selection criteria: ( // If we want the primary, then prefer the primary. primary && *index as i32 == route.primary, // Prefer members in our same zone. zone == id.zone, - // Prefer members which are already connected. - connected, + // Randomize over members to balance load. + rand::random::(), ) }) .map(|(index, _)| index) diff --git a/crates/gazette/src/shard/mod.rs b/crates/gazette/src/shard/mod.rs index aa54058f77..1cb1df43f8 100644 --- a/crates/gazette/src/shard/mod.rs +++ b/crates/gazette/src/shard/mod.rs @@ -44,7 +44,7 @@ impl Client { &self, req: consumer::ListRequest, ) -> Result { - let mut client = self.into_sub(self.router.route(None, false, &self.default).await?); + let mut client = self.into_sub(self.router.route(None, false, &self.default)?); let resp = client .list(req) @@ -60,7 +60,7 @@ impl Client { &self, req: consumer::ApplyRequest, ) -> Result { - let mut client = self.into_sub(self.router.route(None, false, &self.default).await?); + let mut client = self.into_sub(self.router.route(None, false, &self.default)?); let resp = client .apply(req) @@ -76,7 +76,7 @@ impl Client { &self, req: consumer::UnassignRequest, ) -> Result { - let mut client = self.into_sub(self.router.route(None, false, &self.default).await?); + let mut client = self.into_sub(self.router.route(None, false, &self.default)?); let resp = client .unassign(req) @@ -87,7 +87,7 @@ impl Client { check_ok(resp.status(), resp) } - fn into_sub(&self, channel: Channel) -> SubClient { + fn into_sub(&self, (channel, _local): (Channel, bool)) -> SubClient { proto_grpc::consumer::shard_client::ShardClient::with_interceptor( channel, self.metadata.clone(),