Skip to content

Commit

Permalink
Merge pull request #238 from peter-wangxu/fix_konnnectivity
Browse files Browse the repository at this point in the history
  • Loading branch information
peter-wangxu authored Sep 26, 2023
2 parents 1b28a09 + a25ef3f commit 4e7d041
Show file tree
Hide file tree
Showing 8 changed files with 50 additions and 27 deletions.
28 changes: 18 additions & 10 deletions pkg/csi/client/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,14 @@ func (c *workerConnection) Close() error {
return c.conn.Close()
}

func connect(address string, timeout time.Duration, proxyOpts GrpcProxyClientOptions) (*grpc.ClientConn, error) {
log.V(6).Infof("New Connecting to %s", address)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
// connect create new connection to the `address`
// if proxyOpts.ProxyUDSName or proxyOpts.ProxyHost is not empty, a proxied connection returned
// currently, the proxy server can only be konnnectivity
func connect(address string, timeout time.Duration, proxyOpts GrpcProxyClientOptions) (conn *grpc.ClientConn, err error) {
log.V(6).Infof("New Connecting to remote address %s ...", address)
connectCtx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

// only for unit test
var bufDialerFunc func(context.Context, string) (net.Conn, error)
dialOptions := []grpc.DialOption{
Expand All @@ -103,18 +107,22 @@ func connect(address string, timeout time.Duration, proxyOpts GrpcProxyClientOpt
}
dialOptions = append(dialOptions, grpc.WithContextDialer(bufDialerFunc))
}
// end for unit test
// setup konnectivity proxy connection
var proxyAddress string
if proxyOpts.ProxyHost != "" || proxyOpts.ProxyUDSName != "" {
var err error
var proxyDialer proxyFunc
if proxyOpts.ProxyUDSName == "" {
proxyDialer, err = getKonnectivityMTLSDialer(ctx, address, timeout, proxyOpts)
proxyAddress = proxyOpts.ProxyHost
proxyDialer, err = getKonnectivityMTLSDialer(connectCtx, address, proxyOpts)
} else {
proxyDialer, err = getKonnectivityUDSDialer(ctx, address, timeout, proxyOpts)
proxyAddress = proxyOpts.ProxyUDSName
proxyDialer, err = getKonnectivityUDSDialer(connectCtx, address, proxyOpts)
}
if err != nil {
return nil, fmt.Errorf("failed to setup konnectivity dialer: %w", err)
return nil, fmt.Errorf("failed to setup konnectivity dialer(%q): %w", address, err)
}
log.Infof("connected to proxy server %q.", proxyAddress)
dialOptions = append(dialOptions, grpc.WithContextDialer(proxyDialer))
}
// if strings.HasPrefix(address, "/") {
Expand All @@ -129,14 +137,14 @@ func connect(address string, timeout time.Duration, proxyOpts GrpcProxyClientOpt
return net.DialTimeout("unix", addr, timeout)
}))
}
conn, err := grpc.Dial(address, dialOptions...)
conn, err = grpc.Dial(address, dialOptions...)

if err != nil {
return nil, err
}
for {
if !conn.WaitForStateChange(ctx, conn.GetState()) {
log.Warningf("Connection to %s timed out", address)
if !conn.WaitForStateChange(connectCtx, conn.GetState()) {
log.Warningf("Connection to %s timed out, subsequent calls might fail due to this.", address)
return conn, nil // return nil, subsequent GetPluginInfo will show the real connection error
}
if conn.GetState() == connectivity.Ready {
Expand Down
13 changes: 6 additions & 7 deletions pkg/csi/client/konnectivity.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"net/http"
"os"
"path/filepath"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
Expand All @@ -47,7 +46,7 @@ type GrpcProxyClientOptions struct {

type proxyFunc func(ctx context.Context, addr string) (net.Conn, error)

func getKonnectivityUDSDialer(ctx context.Context, address string, timeout time.Duration, o GrpcProxyClientOptions) (func(ctx context.Context, addr string) (net.Conn, error), error) {
func getKonnectivityUDSDialer(ctx context.Context, address string,o GrpcProxyClientOptions) (func(ctx context.Context, addr string) (net.Conn, error), error) {
log.Infof("using konnectivity UDS dialer")

var proxyConn net.Conn
Expand All @@ -57,15 +56,15 @@ func getKonnectivityUDSDialer(ctx context.Context, address string, timeout time.
switch o.Mode {
case "grpc":
dialOption := grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) {
c, err := net.DialTimeout("unix", o.ProxyUDSName, timeout)
c, err := net.Dial("unix", o.ProxyUDSName)
if err != nil {
log.ErrorS(err, "failed to create connection to uds", "name", o.ProxyUDSName)
}
return c, err
})
tunnel, err := client.CreateSingleUseGrpcTunnelWithContext(
context.TODO(),
ctx,
ctx,// create context should follow grpc timeout configuration
context.TODO(), // tunnel context use context.TODO()
o.ProxyUDSName,
dialOption,
grpc.WithTransportCredentials(insecure.NewCredentials()),
Expand Down Expand Up @@ -112,7 +111,7 @@ func getKonnectivityUDSDialer(ctx context.Context, address string, timeout time.
}, nil
}

func getKonnectivityMTLSDialer(ctx context.Context, address string, _ time.Duration, o GrpcProxyClientOptions) (func(ctx context.Context, addr string) (net.Conn, error), error) {
func getKonnectivityMTLSDialer(ctx context.Context, address string, o GrpcProxyClientOptions) (func(ctx context.Context, addr string) (net.Conn, error), error) {
log.Infof("using konnectivity mTLS dialer")

tlsConfig, err := getClientTLSConfig(o.CACert, o.ClientCert, o.ClientKey, o.ProxyHost, nil)
Expand All @@ -126,7 +125,7 @@ func getKonnectivityMTLSDialer(ctx context.Context, address string, _ time.Durat
transportCreds := credentials.NewTLS(tlsConfig)
dialOption := grpc.WithTransportCredentials(transportCreds)
serverAddress := fmt.Sprintf("%s:%d", o.ProxyHost, o.ProxyPort)
tunnel, err := client.CreateSingleUseGrpcTunnelWithContext(context.TODO(), ctx, serverAddress, dialOption)
tunnel, err := client.CreateSingleUseGrpcTunnelWithContext(ctx, context.TODO(), serverAddress, dialOption)
if err != nil {
return nil, fmt.Errorf("failed to create tunnel %s, got %v", serverAddress, err)
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/csi/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
return nil, status.Errorf(codes.Internal, "CreateVolume: fail to get lv %s from node %s: %s", req.Name, nodeName, err.Error())
} else {
if lvName == "" {
log.Info("CreateVolume: volume %s not found, creating volume on node %s", volumeID, nodeName)
log.Infof("CreateVolume: volume %s is not found, creating volume on node %s", volumeID, nodeName)
outstr, err := conn.CreateVolume(ctx, options)
if err != nil {
return nil, status.Errorf(codes.Internal, "CreateVolume: fail to create lv %s(options: %v): %s", utils.GetNameKey(vgName, volumeID), options, err.Error())
Expand Down Expand Up @@ -767,6 +767,7 @@ func (cs *controllerServer) newCreateSnapshotResponse(snapshotId, sourceVolumeId
}, nil
}

// getNodeConn creates a new connection to the lvmd for the `nodeSelected` node
func (cs *controllerServer) getNodeConn(nodeSelected string) (client.Connection, error) {
node, err := cs.nodeLister.Get(nodeSelected)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/csi/nodeutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,15 +476,15 @@ func (ns *nodeServer) createLvm(vgName, volumeID, lvmType, unit string, pvSize i
log.Errorf("createVolume:: lvcreate command %s error: %v", cmd, err)
return err
}
log.Infof("Successful Create Striping LVM volume: %s, with command: %s", volumeID, cmd)
log.Infof("Successfully Create Striping LVM volume: %s, with command: %s", volumeID, cmd)
} else if lvmType == LinearType {
cmd := fmt.Sprintf("%s lvcreate -n %s -L %d%s -Wy -y %s", localtype.NsenterCmd, volumeID, pvSize, unit, vgName)
_, err := ns.osTool.RunCommand(cmd)
if err != nil {
log.Errorf("createVolume:: lvcreate linear command %s error: %v", cmd, err)
return err
}
log.Infof("Successful Create Linear LVM volume: %s, with command: %s", volumeID, cmd)
log.Infof("Successfully Create Linear LVM volume: %s, with command: %s", volumeID, cmd)
}
return nil
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/scheduler/algorithm/algo/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,7 @@ func ProcessDevicePVC(pod *corev1.Pod, pvcs []*corev1.PersistentVolumeClaim, nod

func ScoreInlineLVMVolume(pod *corev1.Pod, node *corev1.Node, ctx *algorithm.SchedulingContext) (score int, units []cache.AllocatedUnit, err error) {
if pod != nil {
klog.Infof("allocating lvm volume for pod %s", utils.GetName(pod.ObjectMeta))
klog.Infof("scoring lvm volume for pod %s", utils.GetName(pod.ObjectMeta))
}

fits, units, err := HandleInlineLVMVolume(ctx, node, pod)
Expand All @@ -550,7 +550,7 @@ func ScoreLVMVolume(pod *corev1.Pod, pvcs []*corev1.PersistentVolumeClaim, node
return
}
if pod != nil {
klog.Infof("allocating lvm volume for pod %s", utils.GetName(pod.ObjectMeta))
klog.Infof("scoring lvm volume for pod %s on node %s", utils.GetName(pod.ObjectMeta), node.Name)
}

fits, units, err := ProcessLVMPVCPriority(pod, pvcs, node, ctx)
Expand Down Expand Up @@ -782,7 +782,7 @@ func ScoreMountPointVolume(
return
}
if pod != nil {
klog.Infof("allocating mount point volume for pod %s", utils.GetName(pod.ObjectMeta))
klog.Infof("scoring mount point volume for pod %s", utils.GetName(pod.ObjectMeta))
}

klog.V(6).Infof("pvcs: %#v, node: %#v", pvcs, node)
Expand Down Expand Up @@ -818,7 +818,7 @@ func ScoreDeviceVolume(
return
}
if pod != nil {
klog.Infof("allocating device volume for pod %s", utils.GetName(pod.ObjectMeta))
klog.Infof("scoring device volume for pod %s", utils.GetName(pod.ObjectMeta))
}

klog.V(6).Infof("pvcs: %#v, node: %#v", pvcs, node)
Expand Down
4 changes: 2 additions & 2 deletions pkg/scheduler/algorithm/predicates/capacity.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,10 @@ func CapacityPredicate(ctx *algorithm.SchedulingContext, pod *corev1.Pod, node *
return false, err
}
if !fits {
klog.Info("pod %s not fit node %s readonly snapshot", utils.GetName(pod.ObjectMeta))
klog.Infof("pod %s not fit node %s readonly snapshot", utils.GetName(pod.ObjectMeta), node.Name)
return false, errors.NewSnapshotError(pkg.VolumeTypeLVM)
} else {
klog.Info("pod %s fit node %s readonly snapshot!", utils.GetName(pod.ObjectMeta))
klog.Infof("pod %s fit node %s readonly snapshot!", utils.GetName(pod.ObjectMeta), node.Name)
}

if len(lvmPVCs) <= 0 && len(mpPVCs) <= 0 && len(devicePVCs) <= 0 && !containInlineVolume {
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/algorithm/predicates/predicates.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func Predicates(Ctx *algorithm.SchedulingContext, PredicateFuncs []PredicateFunc
for _, pre := range PredicateFuncs {
fits, err = pre(Ctx, pod, node)
isError, failReasons := normalizeError(err)
log.Infof("fits: %t,failReasons: %s, err: %+v", fits, failReasons, err)
log.Infof("[predicate=%s]fits: %t,failReasons: %s, err: %+v", utils.GetFuncName(pre, false), fits, failReasons, err)

if isError && err != nil {
log.Errorf("scheduling terminated for %s: %s", utils.GetName(pod.ObjectMeta), err.Error())
Expand Down
15 changes: 15 additions & 0 deletions pkg/utils/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"os"
"os/exec"
"reflect"
"runtime"
"strings"
"time"

Expand Down Expand Up @@ -933,3 +934,17 @@ func GetNameKey(nameSpace, name string) string {
func GetName(meta metav1.ObjectMeta) string {
return GetNameKey(meta.Namespace, meta.Name)
}

// GetFuncName returns funcname in the form of 'github.com/alibaba/open-local/pkg/scheduler/algorithm/predicates.CapacityPredicate'
// if `full=true`, otherwise 'predicates.CapacityPredicate'
func GetFuncName(funcName interface{}, full bool) string {
origin := runtime.FuncForPC(reflect.ValueOf(funcName).Pointer()).Name()
if !full {
funcs := strings.Split(origin, "/")
if len(funcs) > 1 {
return funcs[len(funcs)-1]
}
return funcs[0]
}
return origin
}

0 comments on commit 4e7d041

Please sign in to comment.