
Add support for multiple gRPC client management #93

Merged (10 commits) — Oct 27, 2023
2 changes: 1 addition & 1 deletion Dockerfile
@@ -1,5 +1,5 @@
# Build the manager binary
FROM golang:1.19 as builder
FROM golang:1.21 as builder

LABEL org.opencontainers.image.source=https://github.com/kubernetes-sigs/blixt
LABEL org.opencontainers.image.description="An experimental layer 4 load-balancer built using eBPF/XDP with ebpf-go \
172 changes: 172 additions & 0 deletions controllers/dataplane_controller.go
@@ -0,0 +1,172 @@
package controllers

import (
"context"
"fmt"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"

dataplane "github.com/kubernetes-sigs/blixt/internal/dataplane/client"
"github.com/kubernetes-sigs/blixt/pkg/vars"
)

//+kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=gateways,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=gateways/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=gateways/finalizers,verbs=update

//+kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=core,resources=services/status,verbs=get

//+kubebuilder:rbac:groups=core,resources=endpoints,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=core,resources=endpoints/status,verbs=get

// DataplaneReconciler reconciles the dataplane pods.
type DataplaneReconciler struct {
client.Client
scheme *runtime.Scheme

backendsClientManager *dataplane.BackendsClientManager

updates chan event.GenericEvent
}

func NewDataplaneReconciler(client client.Client, schema *runtime.Scheme, manager *dataplane.BackendsClientManager) *DataplaneReconciler {
return &DataplaneReconciler{
Client: client,
scheme: schema,
backendsClientManager: manager,
updates: make(chan event.GenericEvent, 1),
}
}

var (
podOwnerKey = ".metadata.controller"
apiGVStr = appsv1.SchemeGroupVersion.String()
)

// SetupWithManager loads the controller into the provided controller manager.
func (r *DataplaneReconciler) SetupWithManager(mgr ctrl.Manager) error {

// In order to allow our reconciler to quickly look up Pods by their owner, we’ll
// need an index. We declare an index key that we can later use with the client
// as a pseudo-field name, and then describe how to extract the indexed value from
// the Pod object. The indexer will automatically take care of namespaces for us,
// so we just have to extract the owner name if the Pod has a DaemonSet owner.
if err := mgr.GetFieldIndexer().IndexField(context.Background(), &corev1.Pod{}, podOwnerKey, func(rawObj client.Object) []string {
// grab the pod object, extract the owner...
pod := rawObj.(*corev1.Pod)
owner := metav1.GetControllerOf(pod)
if owner == nil {
return nil
}
// ...make sure it's a DaemonSet...
if owner.APIVersion != apiGVStr || owner.Kind != "DaemonSet" {
return nil
}

// ...and if so, return it
return []string{owner.Name}
}); err != nil {
return err
}

return ctrl.NewControllerManagedBy(mgr).
For(&appsv1.DaemonSet{},
builder.WithPredicates(predicate.NewPredicateFuncs(r.daemonsetHasMatchingAnnotations)),
Member:

Can you possibly filter this even more? I.e., only care about events where the DaemonSet's status changes, since that's where we get the ready pods from anyway?

status:
  currentNumberScheduled: 1
  desiredNumberScheduled: 1
  numberAvailable: 1
  numberMisscheduled: 0
  numberReady: 1
  observedGeneration: 1
  updatedNumberScheduled: 1

levikobi (Member, Author) — Aug 22, 2023:

I'm still relatively new to the tooling. Did you mean something like this?

return ctrl.NewControllerManagedBy(mgr).
	For(&appsv1.DaemonSet{},
		builder.WithPredicates(predicate.NewPredicateFuncs(r.daemonsetHasMatchingAnnotations)),
	).
	WithEventFilter(predicate.Funcs{
		UpdateFunc: func(e event.UpdateEvent) bool {
			return true
		},
	}).
	Complete(r)

Member:

Yeah, close to that; see https://sdk.operatorframework.io/docs/building-operators/golang/references/event-filtering/

In bpfd we do something like:

// Only reconcile if a bpfprogram has been created for the controller's program type.
func BpfProgramTypePredicate(kind string) predicate.Funcs {
	return predicate.Funcs{
		GenericFunc: func(e event.GenericEvent) bool {
			return e.Object.(*bpfdiov1alpha1.BpfProgram).Spec.Type == kind
		},
		CreateFunc: func(e event.CreateEvent) bool {
			return e.Object.(*bpfdiov1alpha1.BpfProgram).Spec.Type == kind
		},
		UpdateFunc: func(e event.UpdateEvent) bool {
			return e.ObjectNew.(*bpfdiov1alpha1.BpfProgram).Spec.Type == kind
		},
		DeleteFunc: func(e event.DeleteEvent) bool {
			return e.Object.(*bpfdiov1alpha1.BpfProgram).Spec.Type == kind
		},
	}
}

Member:

(which is a bit verbose but has worked for us in the past)
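The suggestion in this thread boils down to an UpdateFunc that compares the old and new DaemonSet status and drops events where only metadata changed. Here is a minimal, dependency-free sketch of that comparison; the `DaemonSet` and `DaemonSetStatus` structs below are local stand-ins for the `appsv1` types, and in the real controller this check would live inside a `predicate.Funcs{UpdateFunc: ...}` wired in via `builder.WithPredicates`:

```go
package main

import (
	"fmt"
	"reflect"
)

// DaemonSetStatus is a stand-in for appsv1.DaemonSetStatus, carrying
// only the fields the reconciler cares about here.
type DaemonSetStatus struct {
	NumberReady        int32
	NumberAvailable    int32
	ObservedGeneration int64
}

// DaemonSet is a stand-in for appsv1.DaemonSet.
type DaemonSet struct {
	Status DaemonSetStatus
}

// statusChanged mirrors what the UpdateFunc could do: only let the
// event through when the status block actually changed, so label- or
// annotation-only updates never trigger a reconcile.
func statusChanged(oldDS, newDS *DaemonSet) bool {
	return !reflect.DeepEqual(oldDS.Status, newDS.Status)
}

func main() {
	before := &DaemonSet{Status: DaemonSetStatus{NumberReady: 0}}
	after := &DaemonSet{Status: DaemonSetStatus{NumberReady: 1}}

	fmt.Println(statusChanged(before, before)) // same status: filtered out
	fmt.Println(statusChanged(before, after))  // ready count changed: reconcile
}
```

A deep-equality check over a sub-struct like this is the same idea controller-runtime's own helpers (e.g. `predicate.AnnotationChangedPredicate`) use, just pointed at `.Status` instead of metadata.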

).
WithEventFilter(predicate.Funcs{
UpdateFunc: func(e event.UpdateEvent) bool {
return true
},
}).
Complete(r)
}

func (r *DataplaneReconciler) daemonsetHasMatchingAnnotations(obj client.Object) bool {
log := log.FromContext(context.Background())

daemonset, ok := obj.(*appsv1.DaemonSet)
if !ok {
log.Error(fmt.Errorf("received unexpected type in daemonset watch predicates: %T", obj), "THIS SHOULD NEVER HAPPEN!")
return false
}

// determine if this is a blixt daemonset
matchLabels := daemonset.Spec.Selector.MatchLabels
app, ok := matchLabels["app"]
if !ok || app != vars.DefaultDataPlaneAppLabel {
return false
}

// verify that it's the dataplane daemonset
component, ok := matchLabels["component"]
if !ok || component != vars.DefaultDataPlaneComponentLabel {
return false
}

return true
}

// Reconcile provisions (and de-provisions) resources relevant to this controller.
func (r *DataplaneReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := log.FromContext(ctx)

ds := new(appsv1.DaemonSet)
if err := r.Client.Get(ctx, req.NamespacedName, ds); err != nil {
if errors.IsNotFound(err) {
logger.Info("DataplaneReconciler", "reconcile status", "object enqueued no longer exists, skipping")
return ctrl.Result{}, nil
}
return ctrl.Result{}, err
}

var childPods corev1.PodList
if err := r.List(ctx, &childPods, client.InNamespace(req.Namespace), client.MatchingFields{podOwnerKey: req.Name}); err != nil {
logger.Error(err, "DataplaneReconciler", "reconcile status", "unable to list child pods")
return ctrl.Result{}, err
}

readyPodByNN := make(map[types.NamespacedName]corev1.Pod)
for _, pod := range childPods.Items {
for _, container := range pod.Status.ContainerStatuses {
if container.Name == vars.DefaultDataPlaneComponentLabel && container.Ready {
key := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
readyPodByNN[key] = pod
}
}
}

logger.Info("DataplaneReconciler", "reconcile status", "setting updated backends client list", "num ready pods", len(readyPodByNN))
updated, err := r.backendsClientManager.SetClientsList(ctx, readyPodByNN)
if updated {
logger.Info("DataplaneReconciler", "reconcile status", "backends client list updated, sending generic event")
select {
case r.updates <- event.GenericEvent{Object: ds}:
logger.Info("DataplaneReconciler", "reconcile status", "generic event sent")
default:
logger.Info("DataplaneReconciler", "reconcile status", "generic event skipped - channel is full")
}
}
if err != nil {
logger.Error(err, "DataplaneReconciler", "reconcile status", "partial failure for backends client list update")
return ctrl.Result{Requeue: true}, err
}

logger.Info("DataplaneReconciler", "reconcile status", "done")
return ctrl.Result{}, nil
}

func (r *DataplaneReconciler) GetUpdates() <-chan event.GenericEvent {
return r.updates
}
42 changes: 16 additions & 26 deletions controllers/tcproute_controller.go
@@ -23,15 +23,16 @@ import (
"time"

"github.com/go-logr/logr"
appsv1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/source"
gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2"
gatewayv1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1"

@@ -52,7 +53,9 @@ type TCPRouteReconciler struct {
client.Client
Member:

We no longer need RBAC for DaemonSets here, right? (on the //+kubebuilder lines above)

Member:

Can we make sure that all controller-specific RBAC tags live in their dedicated controller file, and that any RBAC tags shared by all of the controllers go in the dataplane controller?

Sorry, I know it wasn't really a problem you introduced :/

Scheme *runtime.Scheme

log logr.Logger
log logr.Logger
ClientReconcileRequestChan <-chan event.GenericEvent
BackendsClientManager *dataplane.BackendsClientManager
}

// SetupWithManager sets up the controller with the Manager.
@@ -61,8 +64,8 @@ func (r *TCPRouteReconciler) SetupWithManager(mgr ctrl.Manager) error {

return ctrl.NewControllerManagedBy(mgr).
For(&gatewayv1alpha2.TCPRoute{}).
Watches(
&appsv1.DaemonSet{},
WatchesRawSource(
&source.Channel{Source: r.ClientReconcileRequestChan},
handler.EnqueueRequestsFromMapFunc(r.mapDataPlaneDaemonsetToTCPRoutes),
).
Watches(
@@ -202,18 +205,11 @@ func (r *TCPRouteReconciler) ensureTCPRouteConfiguredInDataPlane(ctx context.Con
return err
}

// TODO: add multiple endpoint support https://github.com/Kong/blixt/issues/46
dataplaneClient, err := dataplane.NewDataPlaneClient(context.Background(), r.Client)
if err != nil {
if _, err = r.BackendsClientManager.Update(ctx, targets); err != nil {
return err
}

confirmation, err := dataplaneClient.Update(context.Background(), targets)
if err != nil {
return err
}

r.log.Info(fmt.Sprintf("successful data-plane UPDATE, confirmation: %s", confirmation.String()))
r.log.Info("successful data-plane UPDATE")

return nil
}
@@ -230,24 +226,17 @@ func (r *TCPRouteReconciler) ensureTCPRouteDeletedInDataPlane(ctx context.Contex
return err
}

// TODO: add multiple endpoint support https://github.com/Kong/blixt/issues/46
dataplaneClient, err := dataplane.NewDataPlaneClient(context.Background(), r.Client)
if err != nil {
return err
}

// since we currently only support one TCPRoute per Gateway, we can delete the vip (gateway)
// entry from the dataplane. this won't fly when we end up adding support for multiple TCPRoutes
// per Gateway.
confirmation, err := dataplaneClient.Delete(context.Background(), &dataplane.Vip{
vip := dataplane.Vip{
Ip: gatewayIP,
Port: gwPort,
})
if err != nil {
}

// delete the target from the dataplane
if _, err = r.BackendsClientManager.Delete(ctx, &vip); err != nil {
return err
}

r.log.Info(fmt.Sprintf("successful data-plane DELETE, confirmation: %s", confirmation.String()))
r.log.Info("successful data-plane DELETE")

oldFinalizers := tcproute.GetFinalizers()
newFinalizers := make([]string, 0, len(oldFinalizers)-1)
@@ -259,4 +248,5 @@ func (r *TCPRouteReconciler) ensureTCPRouteDeletedInDataPlane(ctx context.Contex
tcproute.SetFinalizers(newFinalizers)

return r.Client.Update(ctx, tcproute)

}
41 changes: 15 additions & 26 deletions controllers/udproute_controller.go
@@ -23,15 +23,16 @@ import (
"time"

"github.com/go-logr/logr"
appsv1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/source"
gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2"
gatewayv1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1"

@@ -52,7 +53,9 @@ type UDPRouteReconciler struct {
client.Client
Scheme *runtime.Scheme

log logr.Logger
log logr.Logger
ClientReconcileRequestChan <-chan event.GenericEvent
BackendsClientManager *dataplane.BackendsClientManager
}

// SetupWithManager sets up the controller with the Manager.
Expand All @@ -61,8 +64,8 @@ func (r *UDPRouteReconciler) SetupWithManager(mgr ctrl.Manager) error {

return ctrl.NewControllerManagedBy(mgr).
For(&gatewayv1alpha2.UDPRoute{}).
Watches(
&appsv1.DaemonSet{},
WatchesRawSource(
&source.Channel{Source: r.ClientReconcileRequestChan},
handler.EnqueueRequestsFromMapFunc(r.mapDataPlaneDaemonsetToUDPRoutes),
).
Watches(
@@ -202,18 +205,11 @@ func (r *UDPRouteReconciler) ensureUDPRouteConfiguredInDataPlane(ctx context.Con
return err
}

// TODO: add multiple endpoint support https://github.com/kubernetes-sigs/blixt/issues/46
dataplaneClient, err := dataplane.NewDataPlaneClient(context.Background(), r.Client)
if err != nil {
if _, err = r.BackendsClientManager.Update(ctx, targets); err != nil {
return err
}

confirmation, err := dataplaneClient.Update(context.Background(), targets)
if err != nil {
return err
}

r.log.Info(fmt.Sprintf("successful data-plane UPDATE, confirmation: %s", confirmation.String()))
r.log.Info("successful data-plane UPDATE")

return nil
}
@@ -230,24 +226,17 @@ func (r *UDPRouteReconciler) ensureUDPRouteDeletedInDataPlane(ctx context.Contex
return err
}

// TODO: add multiple endpoint support https://github.com/kubernetes-sigs/blixt/issues/46
dataplaneClient, err := dataplane.NewDataPlaneClient(context.Background(), r.Client)
if err != nil {
return err
}

// since we currently only support one UDPRoute per Gateway, we can delete the vip (gateway)
// entry from the dataplane. this won't fly when we end up adding support for multiple UDPRoutes
// per Gateway.
confirmation, err := dataplaneClient.Delete(context.Background(), &dataplane.Vip{
vip := dataplane.Vip{
Ip: gatewayIP,
Port: gwPort,
})
if err != nil {
}

// delete the target from the dataplane
if _, err = r.BackendsClientManager.Delete(ctx, &vip); err != nil {
return err
}

r.log.Info(fmt.Sprintf("successful data-plane DELETE, confirmation: %s", confirmation.String()))
r.log.Info("successful data-plane DELETE")

oldFinalizers := udproute.GetFinalizers()
newFinalizers := make([]string, 0, len(oldFinalizers)-1)
1 change: 1 addition & 0 deletions go.mod
@@ -6,6 +6,7 @@ require (
github.com/go-logr/logr v1.2.4
github.com/go-logr/stdr v1.2.2
github.com/google/uuid v1.3.1
github.com/kong/blixt v0.2.1
github.com/kong/kubernetes-testing-framework v0.39.1
github.com/onsi/ginkgo/v2 v2.13.0
github.com/onsi/gomega v1.28.1
2 changes: 2 additions & 0 deletions go.sum
@@ -120,6 +120,8 @@ github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnr
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/kong/blixt v0.2.1 h1:QMBmHziwPhxgbUDl+Be+5cbQ6tbOT9JqGJJF8GtZs0U=
github.com/kong/blixt v0.2.1/go.mod h1:N6gFG9pVhbrva7dzadaWJycGDkG6EBFgybE1hJQoIR0=
github.com/kong/kubernetes-testing-framework v0.39.1 h1:30dTVe0Muda3r6NAMQHvdGLuB+nkhZRXnJA8AJjuvO4=
github.com/kong/kubernetes-testing-framework v0.39.1/go.mod h1:12TQ5gAkZhuxh47IJcW03iumky1X/T7ZCStuClQ1vzs=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=