Skip to content

Commit

Permalink
Implement locality aware load balancing (istio#868)
Browse files Browse the repository at this point in the history
* Implement locality aware load balancing

Fixes istio#862

* Update docs

* Add tests and pick better names

* fixes
  • Loading branch information
howardjohn authored Apr 4, 2024
1 parent 5cecb70 commit f918a76
Show file tree
Hide file tree
Showing 11 changed files with 481 additions and 41 deletions.
56 changes: 56 additions & 0 deletions proto/workload.proto
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,52 @@ message Service {
// Waypoint is the waypoint proxy for this service. When set, all incoming requests must go
// through the waypoint.
GatewayAddress waypoint = 7;

// Load balancing policy for selecting endpoints.
// Note: this applies only to connecting directly to the workload; when waypoints are used, the waypoint's load_balancing
// configuration is used.
LoadBalancing load_balancing = 8;
}

message LoadBalancing {
enum Scope {
UNSPECIFIED_SCOPE = 0;
// Prefer traffic in the same region.
REGION = 1;
// Prefer traffic in the same zone.
ZONE = 2;
// Prefer traffic in the same subzone.
SUBZONE = 3;
// Prefer traffic on the same node.
NODE = 4;
// Prefer traffic in the same cluster.
CLUSTER = 5;
// Prefer traffic in the same network.
NETWORK = 6;
}
enum Mode {
UNSPECIFIED_MODE = 0;

// In STRICT mode, only endpoints that meets all of the routing preferences will be considered.
// This can be used, for instance, to keep traffic ONLY within the same cluster/node/region.
// This should be used with caution, as it can result in all traffic being dropped if there is no matching endpoints,
// even if there are endpoints outside of the preferences.
STRICT = 1;
// In FAILOVER mode, endpoint selection will prefer endpoints that match all preferences, but failover to groups of endpoints
// that match less (or, eventually, none) preferences.
// For instance, with `[NETWORK, REGION, ZONE]`, we will send to:
// 1. Endpoints matching `[NETWORK, REGION, ZONE]`
// 2. Endpoints matching `[NETWORK, REGION]`
// 3. Endpoints matching `[NETWORK]`
// 4. Any endpoints
FAILOVER = 2;
}

// routing_preference defines what scopes we want to keep traffic within.
// The `mode` determines how these routing preferences are handled
repeated Scope routing_preference = 1;
// mode defines how we should handle the routing preferences.
Mode mode = 2;
}

// Workload represents a workload - an endpoint (or collection behind a hostname).
Expand Down Expand Up @@ -116,6 +162,7 @@ message Workload {
// a workload that backs a Kubernetes service will typically have only endpoints. A
// workload that backs a headless Kubernetes service, however, will have both
// addresses as well as a hostname used for direct access to the headless endpoint.
// TODO: support this field
string hostname = 21;

// Network represents the network this workload is on. This may be elided for the default network.
Expand Down Expand Up @@ -178,10 +225,19 @@ message Workload {
// The cluster ID that the workload instance belongs to
string cluster_id = 18;

// The Locality defines information about where a workload is geographically deployed
Locality locality = 24;

// Reservations for deleted fields.
reserved 15;
}

message Locality {
string region = 1;
string zone = 2;
string subzone = 3;
}

enum WorkloadStatus {
// Workload is healthy and ready to serve traffic.
HEALTHY = 0;
Expand Down
12 changes: 11 additions & 1 deletion src/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,8 @@ mod tests {
use crate::xds::istio::security::StringMatch as XdsStringMatch;
use crate::xds::istio::workload::gateway_address::Destination as XdsDestination;
use crate::xds::istio::workload::GatewayAddress as XdsGatewayAddress;
use crate::xds::istio::workload::LoadBalancing as XdsLoadBalancing;
use crate::xds::istio::workload::Locality as XdsLocality;
use crate::xds::istio::workload::NetworkAddress as XdsNetworkAddress;
use crate::xds::istio::workload::Port as XdsPort;
use crate::xds::istio::workload::PortList as XdsPortList;
Expand Down Expand Up @@ -637,6 +639,11 @@ mod tests {
}],
},
)]),
locality: Some(XdsLocality {
region: "region".to_string(),
zone: "zone".to_string(),
subzone: "subezone".to_string(),
}),
// ..Default::default() // intentionally don't default. we want all fields populated
};

Expand All @@ -654,7 +661,10 @@ mod tests {
}],
subject_alt_names: vec!["SAN1".to_string(), "SAN2".to_string()],
waypoint: None,
// ..Default::default() // intentionally don't default. we want all fields populated
load_balancing: Some(XdsLoadBalancing {
routing_preference: vec![1, 2],
mode: 1,
}), // ..Default::default() // intentionally don't default. we want all fields populated
};

let auth = XdsAuthorization {
Expand Down
7 changes: 4 additions & 3 deletions src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -661,9 +661,7 @@ mod tests {
let w = mock_default_gateway_workload();
let s = mock_default_gateway_service();
let mut state = state::ProxyState::default();
if let Err(err) = state.workloads.insert(w) {
panic!("received error inserting workload: {}", err);
}
state.workloads.insert(w);
state.services.insert(s);
let state = state::DemandProxyState::new(
Arc::new(RwLock::new(state)),
Expand Down Expand Up @@ -741,6 +739,7 @@ mod tests {
authorization_policies: Vec::new(),
native_tunnel: false,
application_tunnel: None,
locality: Default::default(),
}
}

Expand Down Expand Up @@ -769,6 +768,7 @@ mod tests {
authorization_policies: Vec::new(),
native_tunnel: false,
application_tunnel: None,
locality: Default::default(),
}
}

Expand Down Expand Up @@ -806,6 +806,7 @@ mod tests {
endpoints,
subject_alt_names: vec![],
waypoint: None,
load_balancer: None,
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/proxy/inbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -785,6 +785,7 @@ mod tests {
.collect(),
subject_alt_names: vec![format!("{name}.default.svc.cluster.local")],
waypoint: waypoint.service_attached(),
load_balancer: None,
}
});

Expand Down Expand Up @@ -816,7 +817,7 @@ mod tests {
state.services.insert(svc);
}
for wl in workloads {
state.workloads.insert(wl)?;
state.workloads.insert(wl);
}

Ok(DemandProxyState::new(
Expand Down
12 changes: 6 additions & 6 deletions src/proxy/outbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ impl OutboundConnection {
let waypoint_us = self
.pi
.state
.fetch_upstream(&self.pi.cfg.network, waypoint_vip)
.fetch_upstream(&self.pi.cfg.network, &source_workload, waypoint_vip)
.await
.ok_or(proxy::Error::UnknownWaypoint(
"unable to determine waypoint upstream".to_string(),
Expand All @@ -414,7 +414,7 @@ impl OutboundConnection {
let waypoint_ip = self
.pi
.state
.load_balance(
.pick_workload_destination(
&waypoint_workload,
&source_workload,
self.pi.metrics.clone(),
Expand Down Expand Up @@ -447,7 +447,7 @@ impl OutboundConnection {
let us = self
.pi
.state
.fetch_upstream(&source_workload.network, target)
.fetch_upstream(&source_workload.network, &source_workload, target)
.await;
if us.is_none() {
// For case no upstream found, passthrough it
Expand All @@ -469,7 +469,7 @@ impl OutboundConnection {
let workload_ip = self
.pi
.state
.load_balance(
.pick_workload_destination(
&mutable_us.workload,
&source_workload,
self.pi.metrics.clone(),
Expand All @@ -491,7 +491,7 @@ impl OutboundConnection {
match self
.pi
.state
.fetch_waypoint(&mutable_us.workload, workload_ip)
.fetch_waypoint(&mutable_us.workload, &source_workload, workload_ip)
.await
{
Ok(None) => {} // workload doesn't have a waypoint; this is fine
Expand All @@ -500,7 +500,7 @@ impl OutboundConnection {
let waypoint_ip = self
.pi
.state
.load_balance(
.pick_workload_destination(
&waypoint_workload,
&source_workload,
self.pi.metrics.clone(),
Expand Down
Loading

0 comments on commit f918a76

Please sign in to comment.