diff --git a/Dockerfile b/Dockerfile
index 5c5e1ca1..47126c5d 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,5 +1,5 @@
 # Build the manager binary
-FROM golang:1.19 as builder
+FROM golang:1.21 as builder
 
 LABEL org.opencontainers.image.source=https://github.com/kubernetes-sigs/blixt
 LABEL org.opencontainers.image.description="An experimental layer 4 load-balancer built using eBPF/XDP with ebpf-go \
diff --git a/controllers/dataplane_controller.go b/controllers/dataplane_controller.go
new file mode 100644
index 00000000..e13080e1
--- /dev/null
+++ b/controllers/dataplane_controller.go
@@ -0,0 +1,172 @@
+package controllers
+
+import (
+    "context"
+    "fmt"
+
+    appsv1 "k8s.io/api/apps/v1"
+    corev1 "k8s.io/api/core/v1"
+    "k8s.io/apimachinery/pkg/api/errors"
+    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "k8s.io/apimachinery/pkg/runtime"
+    "k8s.io/apimachinery/pkg/types"
+    ctrl "sigs.k8s.io/controller-runtime"
+    "sigs.k8s.io/controller-runtime/pkg/builder"
+    "sigs.k8s.io/controller-runtime/pkg/client"
+    "sigs.k8s.io/controller-runtime/pkg/event"
+    "sigs.k8s.io/controller-runtime/pkg/log"
+    "sigs.k8s.io/controller-runtime/pkg/predicate"
+
+    dataplane "github.com/kubernetes-sigs/blixt/internal/dataplane/client"
+    "github.com/kubernetes-sigs/blixt/pkg/vars"
+)
+
+//+kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=gateways,verbs=get;list;watch;create;update;patch;delete
+//+kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=gateways/status,verbs=get;update;patch
+//+kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=gateways/finalizers,verbs=update
+
+//+kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch;delete
+//+kubebuilder:rbac:groups=core,resources=services/status,verbs=get
+
+//+kubebuilder:rbac:groups=core,resources=endpoints,verbs=get;list;watch;create;update;patch;delete
+//+kubebuilder:rbac:groups=core,resources=endpoints/status,verbs=get
+
+// DataplaneReconciler reconciles the dataplane pods.
+type DataplaneReconciler struct {
+    client.Client
+    scheme *runtime.Scheme
+
+    backendsClientManager *dataplane.BackendsClientManager
+
+    updates chan event.GenericEvent
+}
+
+func NewDataplaneReconciler(client client.Client, scheme *runtime.Scheme, manager *dataplane.BackendsClientManager) *DataplaneReconciler {
+    return &DataplaneReconciler{
+        Client:                client,
+        scheme:                scheme,
+        backendsClientManager: manager,
+        updates:               make(chan event.GenericEvent, 1),
+    }
+}
+
+var (
+    podOwnerKey = ".metadata.controller"
+    apiGVStr    = appsv1.SchemeGroupVersion.String()
+)
+
+// SetupWithManager loads the controller into the provided controller manager.
+func (r *DataplaneReconciler) SetupWithManager(mgr ctrl.Manager) error {
+    // In order to allow our reconciler to quickly look up Pods by their owner, we'll
+    // need an index. We declare an index key that we can later use with the client
+    // as a pseudo-field name, and then describe how to extract the indexed value from
+    // the Pod object. The indexer will automatically take care of namespaces for us,
+    // so we just have to extract the owner name if the Pod has a DaemonSet owner.
+    if err := mgr.GetFieldIndexer().IndexField(context.Background(), &corev1.Pod{}, podOwnerKey, func(rawObj client.Object) []string {
+        // grab the pod object, extract the owner...
+        pod := rawObj.(*corev1.Pod)
+        owner := metav1.GetControllerOf(pod)
+        if owner == nil {
+            return nil
+        }
+        // ...make sure it's a DaemonSet...
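+        // (ReplicaSets and StatefulSets share the apps/v1 group/version, so the
+        // Kind check below is what actually narrows this down to DaemonSets)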
+        if owner.APIVersion != apiGVStr || owner.Kind != "DaemonSet" {
+            return nil
+        }
+
+        // ...and if so, return it
+        return []string{owner.Name}
+    }); err != nil {
+        return err
+    }
+
+    return ctrl.NewControllerManagedBy(mgr).
+        For(&appsv1.DaemonSet{},
+            builder.WithPredicates(predicate.NewPredicateFuncs(r.daemonsetHasMatchingAnnotations)),
+        ).
+        WithEventFilter(predicate.Funcs{
+            UpdateFunc: func(e event.UpdateEvent) bool {
+                return true
+            },
+        }).
+        Complete(r)
+}
+
+func (r *DataplaneReconciler) daemonsetHasMatchingAnnotations(obj client.Object) bool {
+    log := log.FromContext(context.Background())
+
+    daemonset, ok := obj.(*appsv1.DaemonSet)
+    if !ok {
+        log.Error(fmt.Errorf("received unexpected type in daemonset watch predicates: %T", obj), "THIS SHOULD NEVER HAPPEN!")
+        return false
+    }
+
+    // determine if this is a blixt daemonset
+    matchLabels := daemonset.Spec.Selector.MatchLabels
+    app, ok := matchLabels["app"]
+    if !ok || app != vars.DefaultDataPlaneAppLabel {
+        return false
+    }
+
+    // verify that it's the dataplane daemonset
+    component, ok := matchLabels["component"]
+    if !ok || component != vars.DefaultDataPlaneComponentLabel {
+        return false
+    }
+
+    return true
+}
+
+// Reconcile provisions (and de-provisions) resources relevant to this controller.
+func (r *DataplaneReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
+    logger := log.FromContext(ctx)
+
+    ds := new(appsv1.DaemonSet)
+    if err := r.Client.Get(ctx, req.NamespacedName, ds); err != nil {
+        if errors.IsNotFound(err) {
+            logger.Info("DataplaneReconciler", "reconcile status", "object enqueued no longer exists, skipping")
+            return ctrl.Result{}, nil
+        }
+        return ctrl.Result{}, err
+    }
+
+    var childPods corev1.PodList
+    if err := r.List(ctx, &childPods, client.InNamespace(req.Namespace), client.MatchingFields{podOwnerKey: req.Name}); err != nil {
+        logger.Error(err, "DataplaneReconciler", "reconcile status", "unable to list child pods")
+        return ctrl.Result{}, err
+    }
+
+    readyPodByNN := make(map[types.NamespacedName]corev1.Pod)
+    for _, pod := range childPods.Items {
+        for _, container := range pod.Status.ContainerStatuses {
+            if container.Name == vars.DefaultDataPlaneComponentLabel && container.Ready {
+                key := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
+                readyPodByNN[key] = pod
+            }
+        }
+    }
+
+    logger.Info("DataplaneReconciler", "reconcile status", "setting updated backends client list", "num ready pods", len(readyPodByNN))
+    updated, err := r.backendsClientManager.SetClientsList(ctx, readyPodByNN)
+    if updated {
+        logger.Info("DataplaneReconciler", "reconcile status", "backends client list updated, sending generic event")
+        select {
+        case r.updates <- event.GenericEvent{Object: ds}:
+            logger.Info("DataplaneReconciler", "reconcile status", "generic event sent")
+        default:
+            logger.Info("DataplaneReconciler", "reconcile status", "generic event skipped - channel is full")
+        }
+    }
+    if err != nil {
+        logger.Error(err, "DataplaneReconciler", "reconcile status", "partial failure for backends client list update")
+        return ctrl.Result{Requeue: true}, err
+    }
+
+    logger.Info("DataplaneReconciler", "reconcile status", "done")
+    return ctrl.Result{}, nil
+}
+
+func (r *DataplaneReconciler) GetUpdates() <-chan event.GenericEvent {
+    return r.updates
+}
diff --git a/controllers/tcproute_controller.go b/controllers/tcproute_controller.go
index ca9b61b8..a141d670 100644
--- a/controllers/tcproute_controller.go
+++ b/controllers/tcproute_controller.go
@@ -23,15 +23,16 @@ import (
"time" "github.com/go-logr/logr" - appsv1 "k8s.io/api/apps/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/source" gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2" gatewayv1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1" @@ -52,7 +53,9 @@ type TCPRouteReconciler struct { client.Client Scheme *runtime.Scheme - log logr.Logger + log logr.Logger + ClientReconcileRequestChan <-chan event.GenericEvent + BackendsClientManager *dataplane.BackendsClientManager } // SetupWithManager sets up the controller with the Manager. @@ -61,8 +64,8 @@ func (r *TCPRouteReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&gatewayv1alpha2.TCPRoute{}). - Watches( - &appsv1.DaemonSet{}, + WatchesRawSource( + &source.Channel{Source: r.ClientReconcileRequestChan}, handler.EnqueueRequestsFromMapFunc(r.mapDataPlaneDaemonsetToTCPRoutes), ). Watches( @@ -202,18 +205,11 @@ func (r *TCPRouteReconciler) ensureTCPRouteConfiguredInDataPlane(ctx context.Con return err } - // TODO: add multiple endpoint support https://github.com/Kong/blixt/issues/46 - dataplaneClient, err := dataplane.NewDataPlaneClient(context.Background(), r.Client) - if err != nil { + if _, err = r.BackendsClientManager.Update(ctx, targets); err != nil { return err } - confirmation, err := dataplaneClient.Update(context.Background(), targets) - if err != nil { - return err - } - - r.log.Info(fmt.Sprintf("successful data-plane UPDATE, confirmation: %s", confirmation.String())) + r.log.Info("successful data-plane UPDATE") return nil } @@ -230,24 +226,17 @@ func (r *TCPRouteReconciler) ensureTCPRouteDeletedInDataPlane(ctx context.Contex return err } - // TODO: add multiple endpoint support https://github.com/Kong/blixt/issues/46 - dataplaneClient, err := dataplane.NewDataPlaneClient(context.Background(), r.Client) - if err != nil { - return err - } - - // since we currently only support one TCPRoute per Gateway, we can delete the vip (gateway) - // entry from the dataplane. this won't fly when we end up adding support for multiple TCPRoutes - // per Gateway. 
-    confirmation, err := dataplaneClient.Delete(context.Background(), &dataplane.Vip{
+    vip := dataplane.Vip{
         Ip:   gatewayIP,
         Port: gwPort,
-    })
-    if err != nil {
+    }
+
+    // delete the target from the dataplane
+    if _, err = r.BackendsClientManager.Delete(ctx, &vip); err != nil {
         return err
     }
 
-    r.log.Info(fmt.Sprintf("successful data-plane DELETE, confirmation: %s", confirmation.String()))
+    r.log.Info("successful data-plane DELETE")
 
     oldFinalizers := tcproute.GetFinalizers()
     newFinalizers := make([]string, 0, len(oldFinalizers)-1)
@@ -259,4 +248,5 @@ func (r *TCPRouteReconciler) ensureTCPRouteDeletedInDataPlane(ctx context.Contex
 
     tcproute.SetFinalizers(newFinalizers)
     return r.Client.Update(ctx, tcproute)
+
 }
diff --git a/controllers/udproute_controller.go b/controllers/udproute_controller.go
index 2d7a2d75..e01a1b89 100644
--- a/controllers/udproute_controller.go
+++ b/controllers/udproute_controller.go
@@ -23,15 +23,16 @@ import (
     "time"
 
     "github.com/go-logr/logr"
-    appsv1 "k8s.io/api/apps/v1"
     "k8s.io/apimachinery/pkg/api/errors"
     "k8s.io/apimachinery/pkg/runtime"
     "k8s.io/apimachinery/pkg/types"
     ctrl "sigs.k8s.io/controller-runtime"
     "sigs.k8s.io/controller-runtime/pkg/client"
     "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
+    "sigs.k8s.io/controller-runtime/pkg/event"
     "sigs.k8s.io/controller-runtime/pkg/handler"
     "sigs.k8s.io/controller-runtime/pkg/log"
+    "sigs.k8s.io/controller-runtime/pkg/source"
 
     gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2"
     gatewayv1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1"
@@ -52,7 +53,9 @@ type UDPRouteReconciler struct {
     client.Client
     Scheme *runtime.Scheme
 
-    log logr.Logger
+    log                        logr.Logger
+    ClientReconcileRequestChan <-chan event.GenericEvent
+    BackendsClientManager      *dataplane.BackendsClientManager
 }
 
 // SetupWithManager sets up the controller with the Manager.
@@ -61,8 +64,8 @@ func (r *UDPRouteReconciler) SetupWithManager(mgr ctrl.Manager) error {
 
     return ctrl.NewControllerManagedBy(mgr).
         For(&gatewayv1alpha2.UDPRoute{}).
-        Watches(
-            &appsv1.DaemonSet{},
+        WatchesRawSource(
+            &source.Channel{Source: r.ClientReconcileRequestChan},
             handler.EnqueueRequestsFromMapFunc(r.mapDataPlaneDaemonsetToUDPRoutes),
         ).
         Watches(
@@ -202,18 +205,11 @@ func (r *UDPRouteReconciler) ensureUDPRouteConfiguredInDataPlane(ctx context.Con
         return err
     }
 
-    // TODO: add multiple endpoint support https://github.com/kubernetes-sigs/blixt/issues/46
-    dataplaneClient, err := dataplane.NewDataPlaneClient(context.Background(), r.Client)
-    if err != nil {
+    if _, err = r.BackendsClientManager.Update(ctx, targets); err != nil {
         return err
     }
 
-    confirmation, err := dataplaneClient.Update(context.Background(), targets)
-    if err != nil {
-        return err
-    }
-
-    r.log.Info(fmt.Sprintf("successful data-plane UPDATE, confirmation: %s", confirmation.String()))
+    r.log.Info("successful data-plane UPDATE")
 
     return nil
 }
@@ -230,24 +226,17 @@ func (r *UDPRouteReconciler) ensureUDPRouteDeletedInDataPlane(ctx context.Contex
         return err
     }
 
-    // TODO: add multiple endpoint support https://github.com/kubernetes-sigs/blixt/issues/46
-    dataplaneClient, err := dataplane.NewDataPlaneClient(context.Background(), r.Client)
-    if err != nil {
-        return err
-    }
-
-    // since we currently only support one UDPRoute per Gateway, we can delete the vip (gateway)
-    // entry from the dataplane. this won't fly when we end up adding support for multiple UDPRoutes
-    // per Gateway.
-    confirmation, err := dataplaneClient.Delete(context.Background(), &dataplane.Vip{
+    vip := dataplane.Vip{
         Ip:   gatewayIP,
         Port: gwPort,
-    })
-    if err != nil {
+    }
+
+    // delete the target from the dataplane
+    if _, err = r.BackendsClientManager.Delete(ctx, &vip); err != nil {
         return err
     }
 
-    r.log.Info(fmt.Sprintf("successful data-plane DELETE, confirmation: %s", confirmation.String()))
+    r.log.Info("successful data-plane DELETE")
 
     oldFinalizers := udproute.GetFinalizers()
     newFinalizers := make([]string, 0, len(oldFinalizers)-1)
diff --git a/go.mod b/go.mod
index a0aa25f9..ceb63be6 100644
--- a/go.mod
+++ b/go.mod
@@ -6,6 +6,7 @@ require (
 	github.com/go-logr/logr v1.2.4
 	github.com/go-logr/stdr v1.2.2
 	github.com/google/uuid v1.3.1
+	github.com/kong/blixt v0.2.1
 	github.com/kong/kubernetes-testing-framework v0.39.1
 	github.com/onsi/ginkgo/v2 v2.13.0
 	github.com/onsi/gomega v1.28.1
diff --git a/go.sum b/go.sum
index f0e1ac51..8af5acd1 100644
--- a/go.sum
+++ b/go.sum
@@ -120,6 +120,8 @@ github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnr
 github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
 github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
 github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
+github.com/kong/blixt v0.2.1 h1:QMBmHziwPhxgbUDl+Be+5cbQ6tbOT9JqGJJF8GtZs0U=
+github.com/kong/blixt v0.2.1/go.mod h1:N6gFG9pVhbrva7dzadaWJycGDkG6EBFgybE1hJQoIR0=
 github.com/kong/kubernetes-testing-framework v0.39.1 h1:30dTVe0Muda3r6NAMQHvdGLuB+nkhZRXnJA8AJjuvO4=
 github.com/kong/kubernetes-testing-framework v0.39.1/go.mod h1:12TQ5gAkZhuxh47IJcW03iumky1X/T7ZCStuClQ1vzs=
 github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
diff --git a/internal/dataplane/client/client.go b/internal/dataplane/client/client.go
index 6e0fc1bc..25bff534 100644
--- a/internal/dataplane/client/client.go
+++ b/internal/dataplane/client/client.go
@@ -18,64 +18,212 @@ package client
 
 import (
     "context"
+    "errors"
     "fmt"
+    "sync"
 
+    "github.com/go-logr/logr"
     "google.golang.org/grpc"
+    "google.golang.org/grpc/credentials/insecure"
     corev1 "k8s.io/api/core/v1"
-    "sigs.k8s.io/controller-runtime/pkg/client"
+    "k8s.io/apimachinery/pkg/types"
+    "k8s.io/client-go/kubernetes"
+    "k8s.io/client-go/rest"
+    "sigs.k8s.io/controller-runtime/pkg/log"
 
     "github.com/kubernetes-sigs/blixt/pkg/vars"
 )
 
-// NewDataPlaneClient provides a new client for communicating with the grpc API
-// of the data-plane given a function which can provide the API endpoint.
-func NewDataPlaneClient(ctx context.Context, c client.Client) (BackendsClient, error) {
-    endpoints, err := GetDataPlaneEndpointsFromDefaultPods(ctx, c)
+// clientInfo encapsulates the gathered information about a BackendsClient
+// along with the gRPC client connection.
+type clientInfo struct {
+    conn   *grpc.ClientConn
+    client BackendsClient
+    name   string
+}
+
+// BackendsClientManager manages the connections and interactions with
+// the available BackendsClient servers.
+type BackendsClientManager struct {
+    log       logr.Logger
+    clientset *kubernetes.Clientset
+
+    mu      sync.RWMutex
+    clients map[types.NamespacedName]clientInfo
+}
+
+// NewBackendsClientManager returns an initialized instance of BackendsClientManager.
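+// The resulting manager is shared by the route reconcilers, which fan their
+// Update and Delete calls out to every dataplane pod it is connected to.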
+func NewBackendsClientManager(config *rest.Config) (*BackendsClientManager, error) {
+    clientset, err := kubernetes.NewForConfig(config)
     if err != nil {
         return nil, err
     }
-    if len(endpoints) < 1 {
-        return nil, fmt.Errorf("no endpoints could be found for the dataplane API")
+    return &BackendsClientManager{
+        log:       log.FromContext(context.Background()),
+        clientset: clientset,
+        mu:        sync.RWMutex{},
+        clients:   map[types.NamespacedName]clientInfo{},
+    }, nil
+}
+
+func (c *BackendsClientManager) SetClientsList(ctx context.Context, readyPods map[types.NamespacedName]corev1.Pod) (bool, error) {
+    // TODO: close and connect to the different clients concurrently.
+
+    clientListUpdated := false
+    var err error
+
+    // Remove old clients
+    for nn, backendInfo := range c.clients {
+        if _, ok := readyPods[nn]; !ok {
+            c.mu.Lock()
+            delete(c.clients, nn)
+            c.mu.Unlock()
+
+            if closeErr := backendInfo.conn.Close(); closeErr != nil {
+                err = errors.Join(err, closeErr)
+                continue
+            }
+            clientListUpdated = true
+        }
 	}
-    if len(endpoints) > 1 {
-        return nil, fmt.Errorf("TODO: multiple endpoints not currently supported")
+    // Add new clients
+    for _, pod := range readyPods {
+        key := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
+        if _, ok := c.clients[key]; !ok {
+
+            if pod.Status.PodIP == "" {
+                continue
+            }
+
+            endpoint := fmt.Sprintf("%s:%d", pod.Status.PodIP, vars.DefaultDataPlaneAPIPort)
+            c.log.Info("BackendsClientManager", "status", "connecting", "pod", pod.GetName(), "endpoint", endpoint)
+
+            conn, dialErr := grpc.DialContext(ctx, endpoint, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock())
+            if dialErr != nil {
+                c.log.Error(dialErr, "BackendsClientManager", "status", "connection failure", "pod", pod.GetName())
+                err = errors.Join(err, dialErr)
+                continue
+            }
+
+            c.mu.Lock()
+            c.clients[key] = clientInfo{
+                conn:   conn,
+                client: NewBackendsClient(conn),
+                name:   pod.Name,
+            }
+            c.mu.Unlock()
+
+            c.log.Info("BackendsClientManager", "status", "connected", "pod", pod.GetName())
+
+            clientListUpdated = true
+        }
 	}
-    endpoint := endpoints[0]
-    // TODO: mTLS https://github.com/Kong/blixt/issues/50
-    conn, err := grpc.Dial(endpoint, grpc.WithInsecure(), grpc.WithBlock()) //nolint:staticcheck
-    if err != nil {
-        return nil, err
+    return clientListUpdated, err
+}
+
+func (c *BackendsClientManager) Close() {
+    c.log.Info("BackendsClientManager", "status", "shutting down")
+
+    c.mu.Lock()
+    defer c.mu.Unlock()
+
+    var wg sync.WaitGroup
+    wg.Add(len(c.clients))
+
+    for key, cc := range c.clients {
+        go func(cc clientInfo) {
+            defer wg.Done()
+            cc.conn.Close()
+        }(cc)
+
+        delete(c.clients, key)
 	}
-    client := NewBackendsClient(conn)
+    wg.Wait()
 
-    return client, nil
+    c.log.Info("BackendsClientManager", "status", "shutdown completed")
 }
 
-// GetDataPlaneEndpointsFromDefaultPods provides a list of endpoints for the
-// dataplane API assuming all the default deployment settings (e.g., namespace,
-// API port, e.t.c.).
-func GetDataPlaneEndpointsFromDefaultPods(ctx context.Context, c client.Client) (endpoints []string, err error) {
-    pods := new(corev1.PodList)
-    if err = c.List(context.Background(), pods, client.MatchingLabels{
-        "app":       vars.DefaultDataPlaneAppLabel,
-        "component": vars.DefaultDataPlaneComponentLabel,
-    }, client.InNamespace(vars.DefaultNamespace)); err != nil {
-        return
+func (c *BackendsClientManager) getClientsInfo() []clientInfo {
+    c.mu.RLock()
+    defer c.mu.RUnlock()
+
+    backends := make([]clientInfo, 0, len(c.clients))
+    for _, backendClient := range c.clients {
+        backends = append(backends, backendClient)
 	}
 
-    for _, pod := range pods.Items {
-        if pod.Status.PodIP == "" {
-            err = fmt.Errorf("pod %s/%s doesn't have an IP yet", pod.Namespace, pod.Name)
-            return
-        }
+    return backends
+}
+
+// Update sends an update request to all available BackendsClient servers concurrently.
+func (c *BackendsClientManager) Update(ctx context.Context, in *Targets, opts ...grpc.CallOption) (*Confirmation, error) {
+    clientsInfo := c.getClientsInfo()
+
+    var wg sync.WaitGroup
+    wg.Add(len(clientsInfo))
+
+    errs := make(chan error, len(clientsInfo))
+
+    for _, ci := range clientsInfo {
+        go func(ci clientInfo) {
+            defer wg.Done()
+
+            conf, err := ci.client.Update(ctx, in, opts...)
+            if err != nil {
+                c.log.Error(err, "BackendsClientManager", "operation", "update", "pod", ci.name)
+                errs <- err
+                return
+            }
+            c.log.Info("BackendsClientManager", "operation", "update", "pod", ci.name, "confirmation", conf.Confirmation)
+        }(ci)
+    }
+
+    wg.Wait()
+    close(errs)
+
+    var err error
+    for e := range errs {
+        err = errors.Join(err, e)
+    }
+
+    return nil, err
+}
+
+// Delete sends a delete request to all available BackendsClient servers concurrently.
+func (c *BackendsClientManager) Delete(ctx context.Context, in *Vip, opts ...grpc.CallOption) (*Confirmation, error) {
+    clientsInfo := c.getClientsInfo()
+
+    var wg sync.WaitGroup
+    wg.Add(len(clientsInfo))
+
+    errs := make(chan error, len(clientsInfo))
+
+    for _, ci := range clientsInfo {
+        go func(ci clientInfo) {
+            defer wg.Done()
+
+            conf, err := ci.client.Delete(ctx, in, opts...)
+            if err != nil {
+                c.log.Error(err, "BackendsClientManager", "operation", "delete", "pod", ci.name)
+                errs <- err
+                return
+            }
+            c.log.Info("BackendsClientManager", "operation", "delete", "pod", ci.name, "confirmation", conf.Confirmation)
+        }(ci)
+    }
+
+    wg.Wait()
+    close(errs)
 
-        newEndpoint := fmt.Sprintf("%s:%d", pod.Status.PodIP, vars.DefaultDataPlaneAPIPort)
-        endpoints = append(endpoints, newEndpoint)
+    var err error
+    for e := range errs {
+        err = errors.Join(err, e)
 	}
 
-    return
+    return nil, err
 }
diff --git a/main.go b/main.go
index 08e7bc0a..9c0219dd 100644
--- a/main.go
+++ b/main.go
@@ -17,6 +17,7 @@ limitations under the License.
 package main
 
 import (
+    "context"
     "flag"
     "os"
 
@@ -28,6 +29,7 @@ import (
     utilruntime "k8s.io/apimachinery/pkg/util/runtime"
     clientgoscheme "k8s.io/client-go/kubernetes/scheme"
     ctrl "sigs.k8s.io/controller-runtime"
+    "sigs.k8s.io/controller-runtime/pkg/event"
     "sigs.k8s.io/controller-runtime/pkg/healthz"
     "sigs.k8s.io/controller-runtime/pkg/log/zap"
     "sigs.k8s.io/controller-runtime/pkg/metrics/server"
@@ -35,6 +37,7 @@ import (
     gatewayv1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1"
 
     "github.com/kubernetes-sigs/blixt/controllers"
+    "github.com/kubernetes-sigs/blixt/internal/dataplane/client"
     //+kubebuilder:scaffold:imports
 )
 
@@ -68,7 +71,8 @@ func main() {
 
     ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts)))
 
-    mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
+    cfg := ctrl.GetConfigOrDie()
+    mgr, err := ctrl.NewManager(cfg, ctrl.Options{
         Scheme: scheme,
         Metrics: server.Options{
             BindAddress: metricsAddr,
@@ -93,6 +97,22 @@ func main() {
         os.Exit(1)
     }
 
+    clientsManager, err := client.NewBackendsClientManager(cfg)
+    if err != nil {
+        setupLog.Error(err, "unable to create backends client manager")
+        os.Exit(1)
+    }
+    defer clientsManager.Close()
+
+    dataplaneReconciler := controllers.NewDataplaneReconciler(mgr.GetClient(), mgr.GetScheme(), clientsManager)
+    if err = dataplaneReconciler.SetupWithManager(mgr); err != nil {
+        setupLog.Error(err, "unable to create controller", "controller", "Dataplane")
+        os.Exit(1)
+    }
+
+    ctx := ctrl.SetupSignalHandler()
+    udpReconcileRequestChan, tcpReconcileRequestChan := tee(ctx, dataplaneReconciler.GetUpdates())
+
     if err = (&controllers.GatewayReconciler{
         Client: mgr.GetClient(),
         Scheme: mgr.GetScheme(),
@@ -108,15 +128,19 @@ func main() {
         os.Exit(1)
     }
     if err = (&controllers.UDPRouteReconciler{
-        Client: mgr.GetClient(),
-        Scheme: mgr.GetScheme(),
+        Client:                     mgr.GetClient(),
+        Scheme:                     mgr.GetScheme(),
+        ClientReconcileRequestChan: udpReconcileRequestChan,
+        BackendsClientManager:      clientsManager,
     }).SetupWithManager(mgr); err != nil {
         setupLog.Error(err, "unable to create controller", "controller", "UDPRoute")
         os.Exit(1)
     }
     if err = (&controllers.TCPRouteReconciler{
-        Client: mgr.GetClient(),
-        Scheme: mgr.GetScheme(),
+        Client:                     mgr.GetClient(),
+        Scheme:                     mgr.GetScheme(),
+        ClientReconcileRequestChan: tcpReconcileRequestChan,
+        BackendsClientManager:      clientsManager,
     }).SetupWithManager(mgr); err != nil {
         setupLog.Error(err, "unable to create controller", "controller", "TCPRoute")
         os.Exit(1)
@@ -133,8 +157,60 @@ func main() {
     }
 
     setupLog.Info("starting manager")
-    if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
+    if err := mgr.Start(ctx); err != nil {
         setupLog.Error(err, "problem running manager")
         os.Exit(1)
     }
 }
+
+// tee consumes the received channel and mirrors the messages into two new channels.
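+// Both outputs are unbuffered: the mirroring goroutine blocks until each event
+// has been received on both channels (or the context is cancelled), so both
+// returned channels must be actively consumed.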
+func tee(ctx context.Context, in <-chan event.GenericEvent) (_, _ <-chan event.GenericEvent) {
+    out1, out2 := make(chan event.GenericEvent), make(chan event.GenericEvent)
+
+    orDone := func(ctx context.Context, in <-chan event.GenericEvent) <-chan event.GenericEvent {
+        out := make(chan event.GenericEvent)
+        go func() {
+            defer close(out)
+
+            for {
+                select {
+                case <-ctx.Done():
+                    return
+                case i, ok := <-in:
+                    if !ok {
+                        return
+                    }
+                    select {
+                    case out <- i:
+                    case <-ctx.Done():
+                    }
+                }
+            }
+        }()
+        return out
+    }
+
+    go func() {
+        defer close(out1)
+        defer close(out2)
+
+        for val := range orDone(ctx, in) {
+            select {
+            case <-ctx.Done():
+                return
+            case out1 <- val:
+                select {
+                case <-ctx.Done():
+                case out2 <- val:
+                }
+
+            case out2 <- val:
+                select {
+                case <-ctx.Done():
+                case out1 <- val:
+                }
+            }
+        }
+    }()
+    return out1, out2
+}
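
For reference, a minimal, self-contained sketch of the tee fan-out used above, specialized to `int` instead of `event.GenericEvent` so it runs standalone; the consumer names are illustrative and not part of this change:

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// tee mirrors every value received from in onto two unbuffered output
// channels, stopping when in is closed or ctx is cancelled. Same shape as
// the event.GenericEvent tee in main.go, reduced to int for a demo.
func tee(ctx context.Context, in <-chan int) (<-chan int, <-chan int) {
	out1, out2 := make(chan int), make(chan int)
	go func() {
		defer close(out1)
		defer close(out2)
		for {
			select {
			case <-ctx.Done():
				return
			case v, ok := <-in:
				if !ok {
					return
				}
				// Mirror v to both outputs; the outer select lets whichever
				// consumer is ready first receive before the other one.
				select {
				case <-ctx.Done():
					return
				case out1 <- v:
					select {
					case <-ctx.Done():
					case out2 <- v:
					}
				case out2 <- v:
					select {
					case <-ctx.Done():
					case out1 <- v:
					}
				}
			}
		}
	}()
	return out1, out2
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	in := make(chan int)
	a, b := tee(ctx, in)

	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		for v := range a {
			fmt.Println("consumer A got", v)
		}
	}()
	go func() {
		defer wg.Done()
		for v := range b {
			fmt.Println("consumer B got", v)
		}
	}()

	in <- 1
	in <- 2
	close(in)
	wg.Wait()
}
```

Because the outputs are unbuffered, a value is only mirrored once both consumers have picked up the previous one, which is why main.go hands each output to exactly one reconciler that is guaranteed to drain it.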