Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor daemon to allow testing platform detection #196

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 5 additions & 131 deletions cmd/daemon/daemon.go
Original file line number Diff line number Diff line change
@@ -1,125 +1,21 @@
package main

import (
"context"
"errors"
"flag"
"fmt"
"io"
"net"
"os"

daemon "github.com/openshift/dpu-operator/internal/daemon"
nfdevicehandler "github.com/openshift/dpu-operator/internal/daemon/device-handler/nf-device-handler"
sriovdevicehandler "github.com/openshift/dpu-operator/internal/daemon/device-handler/sriov-device-handler"
deviceplugin "github.com/openshift/dpu-operator/internal/daemon/device-plugin"
"github.com/openshift/dpu-operator/internal/platform"
"github.com/openshift/dpu-operator/internal/utils"
"go.uber.org/zap/zapcore"

"github.com/go-logr/logr"
"github.com/openshift/dpu-operator/internal/daemon/plugin"
v1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
)

var ()

type Daemon interface {
Listen() (net.Listener, error)
ListenAndServe() error
Serve(listen net.Listener) error
Stop()
}

func isDpuMode(log logr.Logger, mode string) (bool, error) {
if mode == "host" {
return false, nil
} else if mode == "dpu" {
return true, nil
} else if mode == "auto" {
platform := platform.NewPlatformInfo()
detectedDpuMode, err := platform.IsDpu()
if err != nil {
return false, fmt.Errorf("Failed to query platform info: %v", err)
}
log.Info("Autodetected mode", "isDPU", detectedDpuMode)
return detectedDpuMode, nil
} else {
return false, errors.New("Invalid mode")
}
}

func createDaemon(dpuMode bool, config *rest.Config, vspImages map[string]string, client client.Client) (Daemon, error) {
platform := platform.NewPlatformInfo()
plugin, err := platform.VspPlugin(dpuMode, vspImages, client)
if err != nil {
return nil, err
}

if dpuMode {
deviceHandler := nfdevicehandler.NewNfDeviceHandler()
dp := deviceplugin.NewDevicePlugin(deviceHandler)
return daemon.NewDpuDaemon(plugin, dp, config), nil
} else {
deviceHandler := sriovdevicehandler.NewSriovDeviceHandler()
dp := deviceplugin.NewDevicePlugin(deviceHandler)
return daemon.NewHostDaemon(plugin, dp), nil
}
}

func copyFile(src, dst string) error {
sourceFile, err := os.Open(src)
if err != nil {
return err
}
defer sourceFile.Close()

destinationFile, err := os.Create(dst)
if err != nil {
return err
}
defer destinationFile.Close()

_, err = io.Copy(destinationFile, sourceFile)
if err != nil {
return err
}

return nil
}

func makeExecutable(file string) error {
info, err := os.Stat(file)
if err != nil {
return err
}

newMode := info.Mode() | 0111

if err := os.Chmod(file, newMode); err != nil {
return err
}

return nil
}

func prepareCni(path string) error {
err := copyFile("/dpu-cni", path)
if err != nil {
return err
}
return makeExecutable(path)
}

func main() {
var mode string
var err error
var dpuMode bool
flag.StringVar(&mode, "mode", "", "Mode for the daemon, can be either host or dpu")
opts := zap.Options{
Development: true,
Expand All @@ -137,35 +33,13 @@ func main() {
Scheme: scheme.Scheme,
})

ce := utils.NewClusterEnvironment(client)
flavour, err := ce.Flavour(context.TODO())
if err != nil {
log.Error(err, "Failed to get cluster flavour")
return
}
log.Info("Detected OpenShift", "flavour", flavour)
pm := utils.NewPathManager("/")
cniPath, err := pm.CniPath(flavour)
if err != nil {
log.Error(err, "Failed to get cni path")
return
}
err = prepareCni(cniPath)
if err != nil {
log.Error(err, "Failed to prepare CNI binary", "path", cniPath)
return
}
log.Info("Prepared CNI binary", "path", cniPath)
dpuMode, err = isDpuMode(log, mode)
if err != nil {
log.Error(err, "Failed to parse mode")
log.Error(err, "Failed to create client")
return
}

vspImages := plugin.CreateVspImagesMap(true, log)
daemon, err := createDaemon(dpuMode, config, vspImages, client)
if err != nil {
log.Error(err, "Failed to start daemon")
return
}
daemon.ListenAndServe()

d := daemon.NewDaemon(mode, client, scheme.Scheme, vspImages, config)
d.Run()
}
130 changes: 130 additions & 0 deletions internal/daemon/daemon.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package daemon

import (
"context"
"errors"
"fmt"
"net"

nfdevicehandler "github.com/openshift/dpu-operator/internal/daemon/device-handler/nf-device-handler"
sriovdevicehandler "github.com/openshift/dpu-operator/internal/daemon/device-handler/sriov-device-handler"
deviceplugin "github.com/openshift/dpu-operator/internal/daemon/device-plugin"
"github.com/openshift/dpu-operator/internal/platform"
"github.com/openshift/dpu-operator/internal/utils"

"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/rest"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
)

var ()

type SideManager interface {
Listen() (net.Listener, error)
ListenAndServe() error
Serve(listen net.Listener) error
Stop()
}

func createDaemon(dpuMode bool, config *rest.Config, vspImages map[string]string, client client.Client) (SideManager, error) {
platform := platform.NewPlatformInfo()
plugin, err := platform.VspPlugin(dpuMode, vspImages, client)
if err != nil {
return nil, err
}

if dpuMode {
deviceHandler := nfdevicehandler.NewNfDeviceHandler()
dp := deviceplugin.NewDevicePlugin(deviceHandler)
return NewDpuSideManger(plugin, dp, config), nil
} else {
deviceHandler := sriovdevicehandler.NewSriovDeviceHandler()
dp := deviceplugin.NewDevicePlugin(deviceHandler)
return NewHostSideManager(plugin, dp), nil
}
}

type Daemon struct {
client client.Client
mode string
pm *utils.PathManager
log logr.Logger
vspImages map[string]string
config *rest.Config
}

func NewDaemon(mode string, client client.Client, scheme *runtime.Scheme, vspImages map[string]string, config *rest.Config) Daemon {
log := ctrl.Log.WithName("Daemon")
return Daemon{
client: client,
mode: mode,
pm: utils.NewPathManager("/"),
log: log,
vspImages: vspImages,
config: config,
}
}

func (d *Daemon) Run() {
ce := utils.NewClusterEnvironment(d.client)
flavour, err := ce.Flavour(context.TODO())
if err != nil {
d.log.Error(err, "Failed to get cluster flavour")
return
}
d.log.Info("Detected OpenShift", "flavour", flavour)
err = d.prepareCni(flavour)
if err != nil {
return
}
dpuMode, err := d.isDpuMode()
if err != nil {
d.log.Error(err, "Failed to parse mode")
return
}
daemon, err := createDaemon(dpuMode, d.config, d.vspImages, d.client)
if err != nil {
d.log.Error(err, "Failed to start daemon")
return
}
daemon.ListenAndServe()
}

func (d *Daemon) prepareCni(flavour utils.Flavour) error {
cniPath, err := d.pm.CniPath(flavour)
if err != nil {
d.log.Error(err, "Failed to get cni path")
return err
}
err = utils.CopyFile("/dpu-cni", cniPath)
if err != nil {
d.log.Error(err, "Failed to prepare CNI binary", "path", cniPath)
return err
}
err = utils.MakeExecutable(cniPath)
if err != nil {
return err
}
d.log.Info("Prepared CNI binary", "path", cniPath)
return nil
}

func (d *Daemon) isDpuMode() (bool, error) {
if d.mode == "host" {
return false, nil
} else if d.mode == "dpu" {
return true, nil
} else if d.mode == "auto" {
platform := platform.NewPlatformInfo()
detectedDpuMode, err := platform.IsDpu()
if err != nil {
return false, fmt.Errorf("Failed to query platform info: %v", err)
}
d.log.Info("Autodetected mode", "isDPU", detectedDpuMode)
return detectedDpuMode, nil
} else {
return false, errors.New("Invalid mode")
}
}
28 changes: 14 additions & 14 deletions internal/daemon/dpudaemon.go → internal/daemon/dpusidemanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/metrics/server"
)

type DpuDaemon struct {
type DpuSideManager struct {
pb.UnimplementedBridgePortServiceServer
pb2.UnimplementedDeviceServiceServer

Expand All @@ -46,19 +46,19 @@ type DpuDaemon struct {
pathManager utils.PathManager
}

func (s *DpuDaemon) CreateBridgePort(context context.Context, bpr *pb.CreateBridgePortRequest) (*pb.BridgePort, error) {
func (s *DpuSideManager) CreateBridgePort(context context.Context, bpr *pb.CreateBridgePortRequest) (*pb.BridgePort, error) {
s.log.Info("Passing CreateBridgePort", "name", bpr.BridgePort.Name)
return s.vsp.CreateBridgePort(bpr)
}

func (s *DpuDaemon) DeleteBridgePort(context context.Context, bpr *pb.DeleteBridgePortRequest) (*emptypb.Empty, error) {
func (s *DpuSideManager) DeleteBridgePort(context context.Context, bpr *pb.DeleteBridgePortRequest) (*emptypb.Empty, error) {
s.log.Info("Passing DeleteBridgePort", "name", bpr.Name)
err := s.vsp.DeleteBridgePort(bpr)
return &emptypb.Empty{}, err
}

func NewDpuDaemon(vsp plugin.VendorPlugin, dp deviceplugin.DevicePlugin, config *rest.Config, opts ...func(*DpuDaemon)) *DpuDaemon {
d := &DpuDaemon{
func NewDpuSideManger(vsp plugin.VendorPlugin, dp deviceplugin.DevicePlugin, config *rest.Config, opts ...func(*DpuSideManager)) *DpuSideManager {
d := &DpuSideManager{
vsp: vsp,
dp: dp,
pathManager: *utils.NewPathManager("/"),
Expand All @@ -75,13 +75,13 @@ func NewDpuDaemon(vsp plugin.VendorPlugin, dp deviceplugin.DevicePlugin, config
return d
}

func WithPathManager(pathManager utils.PathManager) func(*DpuDaemon) {
return func(d *DpuDaemon) {
func WithPathManager(pathManager utils.PathManager) func(*DpuSideManager) {
return func(d *DpuSideManager) {
d.pathManager = pathManager
}
}

func (d *DpuDaemon) cniCmdNfAddHandler(req *cnitypes.PodRequest) (*cni100.Result, error) {
func (d *DpuSideManager) cniCmdNfAddHandler(req *cnitypes.PodRequest) (*cni100.Result, error) {
d.log.Info("cniCmdNfAddHandler")
res, err := networkfn.CmdAdd(req)
if err != nil {
Expand All @@ -98,7 +98,7 @@ func (d *DpuDaemon) cniCmdNfAddHandler(req *cnitypes.PodRequest) (*cni100.Result
return res, nil
}

func (d *DpuDaemon) cniCmdNfDelHandler(req *cnitypes.PodRequest) (*cni100.Result, error) {
func (d *DpuSideManager) cniCmdNfDelHandler(req *cnitypes.PodRequest) (*cni100.Result, error) {
d.log.Info("cniCmdNfDelHandler")
err := networkfn.CmdDel(req)
if err != nil {
Expand All @@ -118,7 +118,7 @@ func (d *DpuDaemon) cniCmdNfDelHandler(req *cnitypes.PodRequest) (*cni100.Result
return nil, nil
}

func (d *DpuDaemon) Listen() (net.Listener, error) {
func (d *DpuSideManager) Listen() (net.Listener, error) {
d.startedWg.Add(1)
d.log.Info("Starting DpuDaemon")
d.setupReconcilers()
Expand Down Expand Up @@ -151,7 +151,7 @@ func (d *DpuDaemon) Listen() (net.Listener, error) {
return lis, err
}

func (d *DpuDaemon) ListenAndServe() error {
func (d *DpuSideManager) ListenAndServe() error {
listener, err := d.Listen()

if err != nil {
Expand All @@ -162,7 +162,7 @@ func (d *DpuDaemon) ListenAndServe() error {
return d.Serve(listener)
}

func (d *DpuDaemon) Serve(listener net.Listener) error {
func (d *DpuSideManager) Serve(listener net.Listener) error {

d.wg.Add(1)
go func() {
Expand Down Expand Up @@ -232,12 +232,12 @@ func (d *DpuDaemon) Serve(listener net.Listener) error {
return err
}

func (d *DpuDaemon) Stop() {
func (d *DpuSideManager) Stop() {
d.done <- nil
d.startedWg.Wait()
}

func (d *DpuDaemon) setupReconcilers() {
func (d *DpuSideManager) setupReconcilers() {
if d.manager == nil {
t := time.Duration(0)

Expand Down
Loading