Skip to content

Commit

Permalink
Added support for endpointslices
Browse files Browse the repository at this point in the history
Signed-off-by: ajaychoudhary-hotstar <[email protected]>
  • Loading branch information
ajaychoudhary-hotstar committed Oct 28, 2024
1 parent 02e8831 commit 8c602fd
Show file tree
Hide file tree
Showing 12 changed files with 168 additions and 114 deletions.
6 changes: 4 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ it will be removed; but as it won't be user-visible this isn't considered a brea
instead of the Mapping name, which could reduce the cache's effectiveness. This has been fixed so
that the correct key is used. ([Incorrect Cache Key for Mapping])

- Change: Updated Emissary-Ingress to use EndpointSlices instead of Endpoints to support more than 1000 Backends

[Incorrect Cache Key for Mapping]: https://github.com/emissary-ingress/emissary/issues/5714

## [3.9.0] November 13, 2023
Expand Down Expand Up @@ -401,7 +403,7 @@ it will be removed; but as it won't be user-visible this isn't considered a brea
releases, or a `Host` with or without a `TLSContext` as in prior 2.y releases.

- Bugfix: Prior releases of Emissary-ingress had the arbitrary limitation that a `TCPMapping` cannot
be used on the same port that HTTP is served on, even if TLS+SNI would make this possible.
be used on the same port that HTTP is served on, even if TLS+SNI would make this possible.
Emissary-ingress now allows `TCPMappings` to be used on the same `Listener` port as HTTP `Hosts`,
as long as that `Listener` terminates TLS.

Expand Down Expand Up @@ -567,7 +569,7 @@ it will be removed; but as it won't be user-visible this isn't considered a brea
releases, or a `Host` with or without a `TLSContext` as in prior 2.y releases.

- Bugfix: Prior releases of Emissary-ingress had the arbitrary limitation that a `TCPMapping` cannot
be used on the same port that HTTP is served on, even if TLS+SNI would make this possible.
be used on the same port that HTTP is served on, even if TLS+SNI would make this possible.
Emissary-ingress now allows `TCPMappings` to be used on the same `Listener` port as HTTP `Hosts`,
as long as that `Listener` terminates TLS.

Expand Down
43 changes: 26 additions & 17 deletions cmd/entrypoint/endpoint_routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,22 @@ func makeEndpoints(ctx context.Context, ksnap *snapshot.KubernetesSnapshot, cons

result := map[string][]*ambex.Endpoint{}

for _, k8sEp := range ksnap.Endpoints {
svc, ok := k8sServices[key(k8sEp)]
if !ok {
continue
}
for _, ep := range k8sEndpointsToAmbex(k8sEp, svc) {
result[ep.ClusterName] = append(result[ep.ClusterName], ep)
svcEndpointSlices := map[string][]*kates.EndpointSlice{}

// Collect all the EndpointSlices for each service
for _, k8sEndpointSlice := range ksnap.EndpointSlices {
svcKey := fmt.Sprintf("%s:%s", k8sEndpointSlice.Namespace, k8sEndpointSlice.Labels["kubernetes.io/service-name"])
svcEndpointSlices[svcKey] = append(svcEndpointSlices[svcKey], k8sEndpointSlice)
break
}
//Map each service to its corresponding endpoints from all its EndpointSlices
for svcKey, svc := range k8sServices {
if slices, ok := svcEndpointSlices[svcKey]; ok {
for _, slice := range slices {
for _, ep := range k8sEndpointsToAmbex(slice, svc) {
result[ep.ClusterName] = append(result[ep.ClusterName], ep)
}
}
}
}

Expand All @@ -43,7 +52,7 @@ func key(resource kates.Object) string {
return fmt.Sprintf("%s:%s", resource.GetNamespace(), resource.GetName())
}

func k8sEndpointsToAmbex(ep *kates.Endpoints, svc *kates.Service) (result []*ambex.Endpoint) {
func k8sEndpointsToAmbex(endpointSlice *kates.EndpointSlice, svc *kates.Service) (result []*ambex.Endpoint) {
portmap := map[string][]string{}
for _, p := range svc.Spec.Ports {
port := fmt.Sprintf("%d", p.Port)
Expand All @@ -64,29 +73,29 @@ func k8sEndpointsToAmbex(ep *kates.Endpoints, svc *kates.Service) (result []*amb
}
}

for _, subset := range ep.Subsets {
for _, port := range subset.Ports {
if port.Protocol == kates.ProtocolTCP || port.Protocol == kates.ProtocolUDP {
for _, endpoint := range endpointSlice.Endpoints {
for _, port := range endpointSlice.Ports {
if *port.Protocol == kates.ProtocolTCP || *port.Protocol == kates.ProtocolUDP {
portNames := map[string]bool{}
candidates := []string{fmt.Sprintf("%d", port.Port), port.Name, ""}
candidates := []string{fmt.Sprintf("%d", *port.Port), *port.Name, ""}
for _, c := range candidates {
if pns, ok := portmap[c]; ok {
for _, pn := range pns {
portNames[pn] = true
}
}
}
for _, addr := range subset.Addresses {
for _, address := range endpoint.Addresses {
for pn := range portNames {
sep := "/"
if pn == "" {
sep = ""
}
result = append(result, &ambex.Endpoint{
ClusterName: fmt.Sprintf("k8s/%s/%s%s%s", ep.Namespace, ep.Name, sep, pn),
Ip: addr.IP,
Port: uint32(port.Port),
Protocol: string(port.Protocol),
ClusterName: fmt.Sprintf("k8s/%s/%s%s%s", svc.Namespace, svc.Name, sep, pn),
Ip: address,
Port: uint32(*port.Port),
Protocol: string(*port.Protocol),
})
}
}
Expand Down
65 changes: 40 additions & 25 deletions cmd/entrypoint/endpoint_routing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ func TestEndpointRouting(t *testing.T) {
// Create Mapping, Service, and Endpoints resources to start.
assert.NoError(t, f.Upsert(makeMapping("default", "foo", "/foo", "foo", "endpoint")))
assert.NoError(t, f.Upsert(makeService("default", "foo")))
subset, err := makeSubset(8080, "1.2.3.4")
endpoint, port, err := makeSliceEndpoint(8080, "1.2.3.4")
require.NoError(t, err)
assert.NoError(t, f.Upsert(makeEndpoints("default", "foo", subset)))
assert.NoError(t, f.Upsert(makeEndpointSlice("default", "foo", "foo", endpoint, port)))
f.Flush()
snap, err := f.GetSnapshot(HasMapping("default", "foo"))
require.NoError(t, err)
Expand Down Expand Up @@ -57,9 +57,9 @@ service: foo
resolver: endpoint`,
}
assert.NoError(t, f.Upsert(svc))
subset, err := makeSubset(8080, "1.2.3.4")
endpoint, port, err := makeSliceEndpoint(8080, "1.2.3.4")
require.NoError(t, err)
assert.NoError(t, f.Upsert(makeEndpoints("default", "foo", subset)))
assert.NoError(t, f.Upsert(makeEndpointSlice("default", "foo", "foo", endpoint, port)))
f.Flush()
snap, err := f.GetSnapshot(HasService("default", "foo"))
require.NoError(t, err)
Expand Down Expand Up @@ -97,9 +97,9 @@ func TestEndpointRoutingMultiplePorts(t *testing.T) {
},
},
}))
subset, err := makeSubset("cleartext", 8080, "encrypted", 8443, "1.2.3.4")
endpoint, port, err := makeSliceEndpoint(8080, "1.2.3.4")
require.NoError(t, err)
assert.NoError(t, f.Upsert(makeEndpoints("default", "foo", subset)))
assert.NoError(t, f.Upsert(makeEndpointSlice("default", "foo", "foo", endpoint, port)))
f.Flush()
snap, err := f.GetSnapshot(HasMapping("default", "foo"))
require.NoError(t, err)
Expand Down Expand Up @@ -155,9 +155,9 @@ func TestEndpointRoutingIP(t *testing.T) {
func TestEndpointRoutingMappingCreation(t *testing.T) {
f := entrypoint.RunFake(t, entrypoint.FakeConfig{}, nil)
assert.NoError(t, f.Upsert(makeService("default", "foo")))
subset, err := makeSubset(8080, "1.2.3.4")
endpoint, port, err := makeSliceEndpoint(8080, "1.2.3.4")
require.NoError(t, err)
assert.NoError(t, f.Upsert(makeEndpoints("default", "foo", subset)))
assert.NoError(t, f.Upsert(makeEndpointSlice("default", "foo", "foo", endpoint, port)))
f.Flush()
f.AssertEndpointsEmpty(timeout)
assert.NoError(t, f.UpsertYAML(`
Expand Down Expand Up @@ -242,36 +242,51 @@ func makeService(namespace, name string) *kates.Service {
}
}

func makeEndpoints(namespace, name string, subsets ...kates.EndpointSubset) *kates.Endpoints {
return &kates.Endpoints{
TypeMeta: kates.TypeMeta{Kind: "Endpoints"},
ObjectMeta: kates.ObjectMeta{Namespace: namespace, Name: name},
Subsets: subsets,
func makeEndpointSlice(namespace, name, serviceName string, endpoint []kates.Endpoint, port []kates.EndpointSlicePort) *kates.EndpointSlice {
return &kates.EndpointSlice{
TypeMeta: kates.TypeMeta{Kind: "EndpointSlices", APIVersion: "v1.discovery.k8s.io"},
ObjectMeta: kates.ObjectMeta{
Namespace: namespace,
Name: name,
Labels: map[string]string{
"kubernetes.io/service-name": serviceName,
},
},
Endpoints: endpoint,
Ports: port,
}
}

// makeSubset provides a convenient way to kubernetes EndpointSubset resources. Any int args are
// ports, any ip address strings are addresses, and no ip address strings are used as the port name
// for any ports that follow them in the arg list.
func makeSubset(args ...interface{}) (kates.EndpointSubset, error) {
func makeSliceEndpoint(args ...interface{}) ([]kates.Endpoint, []kates.EndpointSlicePort, error) {
var endpoints []kates.Endpoint
var ports []kates.EndpointSlicePort
portName := ""
var ports []kates.EndpointPort
var addrs []kates.EndpointAddress

for _, arg := range args {
switch v := arg.(type) {
case int:
ports = append(ports, kates.EndpointPort{Name: portName, Port: int32(v), Protocol: kates.ProtocolTCP})
ports = append(ports, kates.EndpointSlicePort{Name: &portName, Port: int32Ptr(int32(v)), Protocol: protocolPtr(kates.ProtocolTCP)})
case string:
IP := net.ParseIP(v)
if IP == nil {
portName = v
if IP != nil {
endpoints = append(endpoints, kates.Endpoint{
Addresses: []string{v},
})
} else {
addrs = append(addrs, kates.EndpointAddress{IP: v})
portName = v // Assume it's a port name if not an IP address
}
default:
return kates.EndpointSubset{}, fmt.Errorf("unrecognized type: %T", v)
return nil, nil, fmt.Errorf("unrecognized type: %T", v)
}
}

return kates.EndpointSubset{Addresses: addrs, Ports: ports}, nil
return endpoints, ports, nil
}

func int32Ptr(i int32) *int32 {
return &i
}

func protocolPtr(p kates.Protocol) *kates.Protocol {
return &p
}
21 changes: 18 additions & 3 deletions cmd/entrypoint/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type endpointRoutingInfo struct {
module moduleResolver
endpointWatches map[string]bool // A set to track the subset of kubernetes endpoints we care about.
previousWatches map[string]bool
endpointSlices []*kates.EndpointSlice
}

type ResolverType int
Expand All @@ -47,7 +48,7 @@ func (rt ResolverType) String() string {

// newEndpointRoutingInfo creates a shiny new struct to hold information about
// resolvers in use and such.
func newEndpointRoutingInfo() endpointRoutingInfo {
func newEndpointRoutingInfo(endpointSlices []*kates.EndpointSlice) endpointRoutingInfo {
return endpointRoutingInfo{
// resolverTypes keeps track of the type of every resolver in the system.
// It starts out empty.
Expand All @@ -59,6 +60,7 @@ func newEndpointRoutingInfo() endpointRoutingInfo {
resolverTypes: make(map[string]ResolverType),
// Track which endpoints we actually want to watch.
endpointWatches: make(map[string]bool),
endpointSlices: endpointSlices,
}
}

Expand All @@ -71,6 +73,7 @@ func (eri *endpointRoutingInfo) reconcileEndpointWatches(ctx context.Context, s
eri.module = moduleResolver{}
eri.previousWatches = eri.endpointWatches
eri.endpointWatches = map[string]bool{}
eri.endpointSlices = s.EndpointSlices

// Phase one processes all the configuration stuff that Mappings depend on. Right now this
// includes Modules and Resolvers. When we are done with Phase one we have processed enough
Expand Down Expand Up @@ -228,7 +231,13 @@ func (eri *endpointRoutingInfo) checkMapping(ctx context.Context, mapping *amb.M

if eri.resolverTypes[resolver] == KubernetesEndpointResolver {
svc, ns, _ := eri.module.parseService(ctx, mapping, service, mapping.GetNamespace())
eri.endpointWatches[fmt.Sprintf("%s:%s", ns, svc)] = true
for _, endpointSlice := range eri.endpointSlices {
// Check if this EndpointSlice matches the target service and namespace
if endpointSlice.Namespace == ns && endpointSlice.Labels["kubernetes.io/service-name"] == svc {
eri.endpointWatches[fmt.Sprintf("%s:%s", ns, endpointSlice.Name)] = true

}
}
}
}

Expand All @@ -247,7 +256,13 @@ func (eri *endpointRoutingInfo) checkTCPMapping(ctx context.Context, tcpmapping

if eri.resolverTypes[resolver] == KubernetesEndpointResolver {
svc, ns, _ := eri.module.parseService(ctx, tcpmapping, service, tcpmapping.GetNamespace())
eri.endpointWatches[fmt.Sprintf("%s:%s", ns, svc)] = true
for _, endpointSlice := range eri.endpointSlices {
// Check if this EndpointSlice matches the target service and namespace
if endpointSlice.Namespace == ns && endpointSlice.Labels["kubernetes.io/service-name"] == svc {
eri.endpointWatches[fmt.Sprintf("%s:%s", ns, endpointSlice.Name)] = true

}
}
}
}

Expand Down
9 changes: 5 additions & 4 deletions cmd/entrypoint/interesting_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,11 @@ func GetInterestingTypes(ctx context.Context, serverTypeList []kates.APIResource
//
// Note that we pull `secrets.v1.` in to "K8sSecrets". ReconcileSecrets will pull
// over the ones we need into "Secrets" and "Endpoints" respectively.
"Services": {{typename: "services.v1."}}, // New in Kubernetes 0.16.0 (2015-04-28) (v1beta{1..3} before that)
"Endpoints": {{typename: "endpoints.v1.", fieldselector: endpointFs}}, // New in Kubernetes 0.16.0 (2015-04-28) (v1beta{1..3} before that)
"K8sSecrets": {{typename: "secrets.v1."}}, // New in Kubernetes 0.16.0 (2015-04-28) (v1beta{1..3} before that)
"ConfigMaps": {{typename: "configmaps.v1.", fieldselector: configMapFs}},
"Services": {{typename: "services.v1."}}, // New in Kubernetes 0.16.0 (2015-04-28) (v1beta{1..3} before that)
"Endpoints": {{typename: "endpoints.v1.", fieldselector: endpointFs}}, // New in Kubernetes 0.16.0 (2015-04-28) (v1beta{1..3} before that)
"EndpointSlices": {{typename: "endpointslices.v1.discovery.k8s.io", fieldselector: endpointFs}},
"K8sSecrets": {{typename: "secrets.v1."}}, // New in Kubernetes 0.16.0 (2015-04-28) (v1beta{1..3} before that)
"ConfigMaps": {{typename: "configmaps.v1.", fieldselector: configMapFs}},
"Ingresses": {
{typename: "ingresses.v1beta1.extensions"}, // New in Kubernetes 1.2.0 (2016-03-16), gone in Kubernetes 1.22.0 (2021-08-04)
{typename: "ingresses.v1beta1.networking.k8s.io"}, // New in Kubernetes 1.14.0 (2019-03-25), gone in Kubernetes 1.22.0 (2021-08-04)
Expand Down
58 changes: 30 additions & 28 deletions cmd/entrypoint/testdata/custom-endpoints.yaml
Original file line number Diff line number Diff line change
@@ -1,46 +1,48 @@
---
# All the IP addresses, pod names, etc., are basically made up. These
# aren't meant to be functional, just to exercise the machinery of
# aren't meant to be functional, just to exercise the machinery of
# filting things in the watcher.
apiVersion: v1
kind: Endpoints
apiVersion: discovery.k8s.io/v1
kind: EndpointSlice
metadata:
name: random-1
subsets:
endpoints:
- addresses:
- ip: 10.42.0.55
nodeName: flynn-2a
targetRef:
kind: Pod
name: random-6db467b4d7-zzzz1
- ip: 10.42.0.56
nodeName: flynn-2b
targetRef:
kind: Pod
name: random-6db467b4d7-zzzz1
- "10.42.0.55"
nodeName: flynn-2a
targetRef:
kind: Pod
name: random-6db467b4d7-zzzz1
- addresses:
- "10.42.0.56"
nodeName: flynn-2b
targetRef:
kind: Pod
name: random-6db467b4d7-zzzz1
ports:
- port: 5000
protocol: TCP
---
# All the IP addresses, pod names, etc., are basically made up. These
# aren't meant to be functional, just to exercise the machinery of
# aren't meant to be functional, just to exercise the machinery of
# filting things in the watcher.
apiVersion: v1
kind: Endpoints
apiVersion: discovery.k8s.io/v1
kind: EndpointSlice
metadata:
name: random-2
subsets:
endpoints:
- addresses:
- "10.42.0.65"
nodeName: flynn-2a
targetRef:
kind: Pod
name: rande-6db467b4d7-zzzz2
- addresses:
- ip: 10.42.0.65
nodeName: flynn-2a
targetRef:
kind: Pod
name: rande-6db467b4d7-zzzz2
- ip: 10.42.0.66
nodeName: flynn-2b
targetRef:
kind: Pod
name: rande-6db467b4d7-zzzz2
- "10.42.0.66"
nodeName: flynn-2b
targetRef:
kind: Pod
name: rande-6db467b4d7-zzzz2
ports:
- port: 5000
protocol: TCP
Loading

0 comments on commit 8c602fd

Please sign in to comment.