Skip to content

Commit

Permalink
Add konnectivity-client dialer
Browse files Browse the repository at this point in the history
  • Loading branch information
rafi committed Aug 28, 2023
1 parent 84a854f commit 230a1ac
Show file tree
Hide file tree
Showing 6 changed files with 297 additions and 6 deletions.
9 changes: 9 additions & 0 deletions cmd/csi/csi.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,15 @@ func Start(opt *csiOption) error {
csi.WithDriverMode(opt.DriverMode),
csi.WithUseNodeHostname(opt.UseNodeHostname),
csi.WithEnableSpdk(opt.EnableSpdk),
csi.WithKonnectivity(
opt.KonnectivityUDS,
opt.KonnectivityProxyHost,
opt.KonnectivityProxyPort,
opt.KonnectivityProxyMode,
opt.KonnectivityClientCert,
opt.KonnectivityClientKey,
opt.KonnectivityCACert,
),
)
if err := driver.Run(); err != nil {
return err
Expand Down
14 changes: 14 additions & 0 deletions cmd/csi/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ type csiOption struct {
EnableSpdk bool
ExtenderSchedulerNames []string
FrameworkSchedulerNames []string
KonnectivityUDS string
KonnectivityProxyHost string
KonnectivityProxyPort int
KonnectivityProxyMode string
KonnectivityClientCert string
KonnectivityClientKey string
KonnectivityCACert string
}

func (option *csiOption) addFlags(fs *pflag.FlagSet) {
Expand All @@ -53,4 +60,11 @@ func (option *csiOption) addFlags(fs *pflag.FlagSet) {
fs.BoolVar(&option.EnableSpdk, "enable-spdk", true, "enable spdk or not")
fs.StringSliceVar(&option.ExtenderSchedulerNames, "extender-scheduler-names", []string{"default-scheduler"}, "extender scheduler names")
fs.StringSliceVar(&option.FrameworkSchedulerNames, "framework-scheduler-names", []string{}, "framework scheduler names")
fs.StringVar(&option.KonnectivityUDS, "konnectivity-uds", "", "apiserver-network-proxy unix socket path")
fs.StringVar(&option.KonnectivityProxyHost, "konnectivity-proxy-host", "", "apiserver-network-proxy server host")
fs.IntVar(&option.KonnectivityProxyPort, "konnectivity-proxy-port", 0, "apiserver-network-proxy server port")
fs.StringVar(&option.KonnectivityProxyMode, "konnectivity-proxy-mode", "", "apiserver-network-proxy proxy mode")
fs.StringVar(&option.KonnectivityClientCert, "konnectivity-client-cert", "", "apiserver-network-proxy client cert")
fs.StringVar(&option.KonnectivityClientKey, "konnectivity-client-key", "", "apiserver-network-proxy client key")
fs.StringVar(&option.KonnectivityCACert, "konnectivity-ca-cert", "", "apiserver-network-proxy CA cert")
}
24 changes: 19 additions & 5 deletions pkg/csi/client/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ func MustRunThisWhenTest() {
}

// NewGrpcConnection lvm connection
func NewGrpcConnection(address string, timeout time.Duration) (Connection, error) {
conn, err := connect(address, timeout)
func NewGrpcConnection(address string, timeout time.Duration, konnectivityOpts GrpcProxyClientOptions) (Connection, error) {
conn, err := connect(address, timeout, konnectivityOpts)
if err != nil {
return nil, err
}
Expand All @@ -87,8 +87,10 @@ func (c *workerConnection) Close() error {
return c.conn.Close()
}

func connect(address string, timeout time.Duration) (*grpc.ClientConn, error) {
func connect(address string, timeout time.Duration, proxyOpts GrpcProxyClientOptions) (*grpc.ClientConn, error) {
log.V(6).Infof("New Connecting to %s", address)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
// only for unit test
var bufDialerFunc func(context.Context, string) (net.Conn, error)
dialOptions := []grpc.DialOption{
Expand All @@ -101,6 +103,20 @@ func connect(address string, timeout time.Duration) (*grpc.ClientConn, error) {
}
dialOptions = append(dialOptions, grpc.WithContextDialer(bufDialerFunc))
}
// setup konnectivity proxy connection
if proxyOpts.ProxyHost != "" || proxyOpts.ProxyUDSName != "" {
var err error
var proxyDialer proxyFunc
if proxyOpts.ProxyUDSName == "" {
proxyDialer, err = getKonnectivityMTLSDialer(ctx, address, timeout, proxyOpts)
} else {
proxyDialer, err = getKonnectivityUDSDialer(ctx, address, timeout, proxyOpts)
}
if err != nil {
return nil, fmt.Errorf("failed to setup konnectivity dialer: %w", err)
}
dialOptions = append(dialOptions, grpc.WithContextDialer(proxyDialer))
}
// if strings.HasPrefix(address, "/") {
// dialOptions = append(dialOptions, grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
// return net.DialTimeout("unix", addr, timeout)
Expand All @@ -118,8 +134,6 @@ func connect(address string, timeout time.Duration) (*grpc.ClientConn, error) {
if err != nil {
return nil, err
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
for {
if !conn.WaitForStateChange(ctx, conn.GetState()) {
log.Warningf("Connection to %s timed out", address)
Expand Down
213 changes: 213 additions & 0 deletions pkg/csi/client/konnectivity.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package client

import (
"bufio"
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"net"
"net/http"
"os"
"path/filepath"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
log "k8s.io/klog/v2"
"sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client"
)

type GrpcProxyClientOptions struct {
Mode string
ClientCert string
ClientKey string
CACert string
ProxyUDSName string
ProxyHost string
ProxyPort int
}

type proxyFunc func(ctx context.Context, addr string) (net.Conn, error)

func getKonnectivityUDSDialer(ctx context.Context, address string, timeout time.Duration, o GrpcProxyClientOptions) (func(ctx context.Context, addr string) (net.Conn, error), error) {
log.Infof("using konnectivity UDS dialer")

var proxyConn net.Conn
var userAgent = "csi-open-local"
var err error

switch o.Mode {
case "grpc":
dialOption := grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) {
c, err := net.DialTimeout("unix", o.ProxyUDSName, timeout)
if err != nil {
log.ErrorS(err, "failed to create connection to uds", "name", o.ProxyUDSName)
}
return c, err
})
tunnel, err := client.CreateSingleUseGrpcTunnelWithContext(
context.TODO(),
ctx,
o.ProxyUDSName,
dialOption,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithUserAgent(userAgent),
)
if err != nil {
return nil, fmt.Errorf("failed to create tunnel %s, got %v", o.ProxyUDSName, err)
}

proxyConn, err = tunnel.DialContext(ctx, "tcp", address)
if err != nil {
return nil, fmt.Errorf("failed to dial request %s, got %v", address, err)
}
case "http-connect":
proxyConn, err = net.Dial("unix", o.ProxyUDSName)
if err != nil {
return nil, fmt.Errorf("dialing proxy %q failed: %v", o.ProxyUDSName, err)
}
fmt.Fprintf(proxyConn, "CONNECT %s HTTP/1.1\r\nHost: %s\r\nUser-Agent: %s\r\n\r\n", address, "127.0.0.1", userAgent)
br := bufio.NewReader(proxyConn)
res, err := http.ReadResponse(br, nil)
if err != nil {
return nil, fmt.Errorf("reading HTTP response from CONNECT to %s via uds proxy %s failed: %v",
address, o.ProxyUDSName, err)
}
if res.StatusCode != 200 {
return nil, fmt.Errorf("proxy error from %s while dialing %s: %v", o.ProxyUDSName, address, res.Status)
}

// It's safe to discard the bufio.Reader here and return the
// original TCP conn directly because we only use this for
// TLS, and in TLS the client speaks first, so we know there's
// no unbuffered data. But we can double-check.
if br.Buffered() > 0 {
return nil, fmt.Errorf("unexpected %d bytes of buffered data from CONNECT uds proxy %q",
br.Buffered(), o.ProxyUDSName)
}
default:
return nil, fmt.Errorf("failed to process mode %s", o.Mode)
}

return func(ctx context.Context, addr string) (net.Conn, error) {
return proxyConn, nil
}, nil
}

func getKonnectivityMTLSDialer(ctx context.Context, address string, _ time.Duration, o GrpcProxyClientOptions) (func(ctx context.Context, addr string) (net.Conn, error), error) {
log.Infof("using konnectivity mTLS dialer")

tlsConfig, err := getClientTLSConfig(o.CACert, o.ClientCert, o.ClientKey, o.ProxyHost, nil)
if err != nil {
return nil, err
}

var proxyConn net.Conn
switch o.Mode {
case "grpc":
transportCreds := credentials.NewTLS(tlsConfig)
dialOption := grpc.WithTransportCredentials(transportCreds)
serverAddress := fmt.Sprintf("%s:%d", o.ProxyHost, o.ProxyPort)
tunnel, err := client.CreateSingleUseGrpcTunnelWithContext(context.TODO(), ctx, serverAddress, dialOption)
if err != nil {
return nil, fmt.Errorf("failed to create tunnel %s, got %v", serverAddress, err)
}

proxyConn, err = tunnel.DialContext(ctx, "tcp", address)
if err != nil {
return nil, fmt.Errorf("failed to dial request %s, got %v", address, err)
}
case "http-connect":
proxyAddress := fmt.Sprintf("%s:%d", o.ProxyHost, o.ProxyPort)
proxyConn, err = tls.Dial("tcp", proxyAddress, tlsConfig)
if err != nil {
return nil, fmt.Errorf("dialing proxy %q failed: %v", proxyAddress, err)
}
fmt.Fprintf(proxyConn, "CONNECT %s HTTP/1.1\r\nHost: %s\r\n\r\n", address, "127.0.0.1")
br := bufio.NewReader(proxyConn)
res, err := http.ReadResponse(br, nil)
if err != nil {
return nil, fmt.Errorf("reading HTTP response from CONNECT to %s via proxy %s failed: %v",
address, proxyAddress, err)
}
if res.StatusCode != 200 {
return nil, fmt.Errorf("proxy error from %s while dialing %s: %v", proxyAddress, address, res.Status)
}

// It's safe to discard the bufio.Reader here and return the
// original TCP conn directly because we only use this for
// TLS, and in TLS the client speaks first, so we know there's
// no unbuffered data. But we can double-check.
if br.Buffered() > 0 {
return nil, fmt.Errorf("unexpected %d bytes of buffered data from CONNECT proxy %q",
br.Buffered(), proxyAddress)
}
default:
return nil, fmt.Errorf("failed to process mode %s", o.Mode)
}

return func(ctx context.Context, addr string) (net.Conn, error) {
return proxyConn, nil
}, nil
}

// getCACertPool loads CA certificates to pool
func getCACertPool(caFile string) (*x509.CertPool, error) {
certPool := x509.NewCertPool()
caCert, err := os.ReadFile(filepath.Clean(caFile))
if err != nil {
return nil, fmt.Errorf("failed to read CA cert %s: %v", caFile, err)
}
ok := certPool.AppendCertsFromPEM(caCert)
if !ok {
return nil, fmt.Errorf("failed to append CA cert to the cert pool")
}
return certPool, nil
}

// getClientTLSConfig returns tlsConfig based on x509 certs
func getClientTLSConfig(caFile, certFile, keyFile, serverName string, protos []string) (*tls.Config, error) {
certPool, err := getCACertPool(caFile)
if err != nil {
return nil, err
}

tlsConfig := &tls.Config{
RootCAs: certPool,
MinVersion: tls.VersionTLS12,
}
if len(protos) != 0 {
tlsConfig.NextProtos = protos
}
if certFile == "" && keyFile == "" {
// return TLS config based on CA only
return tlsConfig, nil
}

cert, err := tls.LoadX509KeyPair(certFile, keyFile)
if err != nil {
return nil, fmt.Errorf("failed to load X509 key pair %s and %s: %v", certFile, keyFile, err)
}

tlsConfig.ServerName = serverName
tlsConfig.Certificates = []tls.Certificate{cert}
return tlsConfig, nil
}
16 changes: 15 additions & 1 deletion pkg/csi/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -779,7 +779,21 @@ func (cs *controllerServer) getNodeConn(nodeSelected string) (client.Connection,
log.Errorf("CreateVolume: Get node %s address with error: %s", nodeSelected, err.Error())
return nil, err
}
conn, err := client.NewGrpcConnection(addr, time.Duration(cs.options.grpcConnectionTimeout*int(time.Second)))

var proxyOpts client.GrpcProxyClientOptions
if cs.options.konnectivityProxyHost != "" || cs.options.konnectivityUDS != "" {
proxyOpts = client.GrpcProxyClientOptions{
Mode: cs.options.konnectivityProxyMode,
ProxyHost: cs.options.konnectivityProxyHost,
ProxyPort: cs.options.konnectivityProxyPort,
ProxyUDSName: cs.options.konnectivityUDS,
ClientCert: cs.options.konnectivityClientCert,
ClientKey: cs.options.konnectivityClientKey,
CACert: cs.options.konnectivityCACert,
}
}

conn, err := client.NewGrpcConnection(addr, time.Duration(cs.options.grpcConnectionTimeout*int(time.Second)), proxyOpts)
return conn, err
}

Expand Down
27 changes: 27 additions & 0 deletions pkg/csi/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,14 @@ type driverOptions struct {
extenderSchedulerNames []string
frameworkSchedulerNames []string

konnectivityUDS string
konnectivityProxyHost string
konnectivityProxyPort int
konnectivityProxyMode string
konnectivityClientCert string
konnectivityClientKey string
konnectivityCACert string

kubeclient kubernetes.Interface
localclient clientset.Interface
snapclient snapshot.Interface
Expand All @@ -60,6 +68,13 @@ var defaultDriverOptions = driverOptions{
useNodeHostname: false,
extenderSchedulerNames: []string{"default-scheduler"},
frameworkSchedulerNames: []string{},
konnectivityUDS: "",
konnectivityProxyHost: "",
konnectivityProxyPort: 0,
konnectivityProxyMode: "",
konnectivityClientCert: "",
konnectivityClientKey: "",
konnectivityCACert: "",
}

// Option configures a Driver
Expand Down Expand Up @@ -207,6 +222,18 @@ func WithSnapshotClient(snapclient snapshot.Interface) Option {
}
}

func WithKonnectivity(uds, host string, port int, mode, clientCert, clientKey, caCert string) Option {
return func(o *driverOptions) {
o.konnectivityUDS = uds
o.konnectivityProxyHost = host
o.konnectivityProxyPort = port
o.konnectivityProxyMode = mode
o.konnectivityClientCert = clientCert
o.konnectivityClientKey = clientKey
o.konnectivityCACert = caCert
}
}

func ParseEndpoint(endpoint string) (string, string, error) {
u, err := url.Parse(endpoint)
if err != nil {
Expand Down

0 comments on commit 230a1ac

Please sign in to comment.