Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Feature/double hbone #1429

Draft
wants to merge 8 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 33 additions & 17 deletions src/proxy/outbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,40 +219,56 @@ impl OutboundConnection {
req: &Request,
connection_stats: &ConnectionResult,
) -> Result<(), Error> {
// Outer HBONE
let upgraded = Box::pin(self.send_hbone_request(remote_addr, req)).await?;

// Inner HBONE
let upgraded = TokioH2Stream::new(upgraded);
// TODO: dst should take a hostname? and upstream_sans currently contains E/W Gateway certs
let inner_workload = pool::WorkloadKey {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will reorganize later.

src_id: req.source.identity(),
dst_id: req.upstream_sans.clone(),
src: remote_addr.ip(),
dst: req.actual_destination,
};
let request = self.create_hbone_request(remote_addr, req);
let workload_key = &inner_workload;

let (_conn_client, inner_upgraded, drain_tx, driver_task) = self
.pool
.send_request_unpooled(upgraded, &inner_workload, request)
// To shut down inner HTTP2 connection
let (drain_tx, drain_rx) = tokio::sync::watch::channel(false);
let key = workload_key.clone();
let dest = rustls::pki_types::ServerName::IpAddress(key.dst.ip().into());
let cert = self
.pi
.local_workload_information
.fetch_certificate()
.await?;
let inner_upgraded = inner_upgraded?;
// FIXME The following isn't great because it will also contain the identity of the E/W gateways
let connector = cert.outbound_connector(req.upstream_sans.clone())?;

// Do actual IO as late as possible
// Outer HBONE
let upgraded = Box::pin(self.send_hbone_request(remote_addr, req)).await?;
// Wrap upgraded to implement tokio's Async{Write,Read}
let upgraded = TokioH2Stream::new(upgraded);
let tls_stream = connector.connect(upgraded, dest).await?;

let (sender, driver_task) =
super::h2::client::spawn_connection(self.pi.cfg.clone(), tls_stream, drain_rx).await?;
let mut connection = super::pool::ConnClient {
sender,
wl_key: key,
};
let inner_upgraded = connection.sender.send_request(request).await?;
let res = copy::copy_bidirectional(
copy::TcpStreamSplitter(stream),
inner_upgraded,
connection_stats,
)
.await;

// This always drops ungracefully
// drop(conn_client);
// tokio::time::sleep(std::time::Duration::from_secs(1)).await;
// drain_tx.send(true).unwrap();
// tokio::time::sleep(std::time::Duration::from_secs(1)).await;
drain_tx.send(true).unwrap();
let _ = drain_tx.send(true);
let _ = driver_task.await;
// this sleep is important, so we have a race condition somewhere
// tokio::time::sleep(std::time::Duration::from_secs(1)).await;
// Here, there is an implicit, drop(conn_client).
// Its really important that this happens AFTER driver_task finishes.
// Otherwise, TLS connections do not terminate gracefully.
//
// drop(conn_client);
res
}

Expand Down
23 changes: 17 additions & 6 deletions src/proxy/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ struct PoolState {
// This is merely a counter to track the overall number of conns this pool spawns
// to ensure we get unique poolkeys-per-new-conn, it is not a limit
pool_global_conn_count: AtomicI32,
pub spawner: ConnSpawner,
spawner: ConnSpawner,
}

struct ConnSpawner {
Expand Down Expand Up @@ -413,17 +413,28 @@ impl WorkloadHBONEPool {
Error,
> {
let (tx, rx) = tokio::sync::watch::channel(false);
let (mut connection, driver_task) = self
let key = workload_key.clone();
let dest = rustls::pki_types::ServerName::IpAddress(key.dst.ip().into());
let cert = self
.state
.spawner
.new_unpooled_conn(workload_key.clone(), stream, rx)
.local_workload
.fetch_certificate()
.await?;
let connector = cert.outbound_connector(vec![])?;
let tls_stream = connector.connect(stream, dest).await?;
let (sender, driver_drain) =
h2::client::spawn_connection(self.state.spawner.cfg.clone(), tls_stream, rx).await?;
let mut connection = ConnClient {
sender,
wl_key: key,
};

Ok((
connection.clone(),
connection.sender.send_request(request).await,
tx,
driver_task,
driver_drain,
))
}

Expand Down Expand Up @@ -568,9 +579,9 @@ impl WorkloadHBONEPool {
// A sort of faux-client, that represents a single checked-out 'request sender' which might
// send requests over some underlying stream using some underlying http/2 client
pub struct ConnClient {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixme

sender: H2ConnectClient,
pub sender: H2ConnectClient,
// A WL key may have many clients, but every client has no more than one WL key
wl_key: WorkloadKey, // the WL key associated with this client.
pub wl_key: WorkloadKey, // the WL key associated with this client.
}

impl ConnClient {
Expand Down